diff --git a/pubnub/pubnub.py b/pubnub/pubnub.py index ca66d1a5..cb4b51aa 100644 --- a/pubnub/pubnub.py +++ b/pubnub/pubnub.py @@ -1,3 +1,59 @@ +"""PubNub Python SDK Implementation. + +This module provides the main implementation of the PubNub Python SDK, offering real-time +messaging and presence functionality. It implements the native (synchronous) version of +the PubNub client, building upon the core functionality defined in PubNubCore. + +Key Components: + - PubNub: Main class for interacting with PubNub services + - NativeSubscriptionManager: Handles channel subscriptions and message processing + - NativeReconnectionManager: Manages network reconnection strategies + - NativePublishSequenceManager: Manages message sequence numbers for publishing + - SubscribeListener: Helper class for handling subscription events + - NonSubscribeListener: Helper class for handling non-subscription operations + +Features: + - Real-time messaging with publish/subscribe + - Presence detection and heartbeat + - Channel and Channel Group support + - Message queueing and worker thread management + - Automatic reconnection handling + - Custom request handler support + - Telemetry tracking + +Usage Example: + ```python + from pubnub.pnconfiguration import PNConfiguration + from pubnub.pubnub import PubNub + + config = PNConfiguration() + config.publish_key = 'your_pub_key' + config.subscribe_key = 'your_sub_key' + config.uuid = 'client-123' + + pubnub = PubNub(config) + + # Publish messages + pubnub.publish().channel("my_channel").message("Hello!").sync() + ``` + +Threading Notes: + - The SDK uses multiple threads for different operations + - SubscribeMessageWorker runs in a daemon thread + - Heartbeat and reconnection timers run in separate threads + - Thread-safe implementations for sequence management and message queuing + +Error Handling: + - Automatic retry mechanisms for failed operations + - Configurable reconnection policies + - Status callbacks for error conditions + - Exception propagation through status objects + +Note: + This implementation is designed for synchronous operations. For asynchronous + operations, consider using the PubNubAsyncio implementation of the SDK. +""" + import copy import importlib import logging @@ -26,15 +82,27 @@ class PubNub(PubNubCore): - """PubNub Python API""" + """Main PubNub client class for synchronous operations. + + This class provides the primary interface for interacting with the PubNub network. + It implements synchronous (blocking) operations and manages the lifecycle of subscriptions, + message processing, and network connectivity. + + Attributes: + config (PNConfiguration): Configuration instance containing SDK settings + """ def __init__(self, config: PNConfiguration, *, custom_request_handler: Type[BaseRequestHandler] = None): - """ PubNub instance constructor + """Initialize a new PubNub instance. - Parameters: - config (PNConfiguration): PNConfiguration instance (required) - custom_request_handler (BaseRequestHandler): Custom request handler class (optional) + Args: + config (PNConfiguration): Configuration instance containing settings + custom_request_handler (Type[BaseRequestHandler], optional): Custom request handler class. + Can also be set via set_request_handler method. + Raises: + Exception: If custom request handler is not a subclass of BaseRequestHandler + AssertionError: If config is not an instance of PNConfiguration """ assert isinstance(config, PNConfiguration) PubNubCore.__init__(self, config) @@ -61,28 +129,47 @@ def __init__(self, config: PNConfiguration, *, custom_request_handler: Type[Base self._telemetry_manager = NativeTelemetryManager() - def sdk_platform(self): + def sdk_platform(self) -> str: + """Get the SDK platform identifier. + + Returns: + str: An empty string for the native SDK implementation + """ return "" - def set_request_handler(self, handler: BaseRequestHandler): - """Set custom request handler + def set_request_handler(self, handler: BaseRequestHandler) -> None: + """Set a custom request handler for HTTP operations. - Parametrers: + Args: handler (BaseRequestHandler): Instance of custom request handler + + Raises: + AssertionError: If handler is not an instance of BaseRequestHandler """ assert isinstance(handler, BaseRequestHandler) self._request_handler = handler def get_request_handler(self) -> BaseRequestHandler: - """Get instance of request handler + """Get the current request handler instance. - Return: handler(BaseRequestHandler): Instance of request handler + Returns: + BaseRequestHandler: The current request handler instance """ return self._request_handler def request_sync(self, endpoint_call_options): - platform_options = PlatformOptions(self.headers, self.config) + """Execute a synchronous request to the PubNub network. + + Args: + endpoint_call_options: Options for the endpoint call + Returns: + The response from the PubNub network + + Note: + This is an internal method used by endpoint classes + """ + platform_options = PlatformOptions(self.headers, self.config) self.merge_in_params(endpoint_call_options) if self.config.log_verbosity: @@ -90,8 +177,21 @@ def request_sync(self, endpoint_call_options): return self._request_handler.sync_request(platform_options, endpoint_call_options) def request_async(self, endpoint_name, endpoint_call_options, callback, cancellation_event): - platform_options = PlatformOptions(self.headers, self.config) + """Execute an asynchronous request to the PubNub network. + Args: + endpoint_name: Name of the endpoint being called + endpoint_call_options: Options for the endpoint call + callback: Callback function for the response + cancellation_event: Event to cancel the request + + Returns: + The async request object + + Note: + This is an internal method used by endpoint classes + """ + platform_options = PlatformOptions(self.headers, self.config) self.merge_in_params(endpoint_call_options) if self.config.log_verbosity: @@ -108,7 +208,6 @@ def request_async(self, endpoint_name, endpoint_call_options, callback, cancella ) def merge_in_params(self, options): - params_to_merge_in = {} if options.operation_type == PNOperationType.PNPublishOperation: @@ -117,6 +216,11 @@ def merge_in_params(self, options): options.merge_params_in(params_to_merge_in) def stop(self): + """Stop all subscriptions and clean up resources. + + Raises: + Exception: If subscription manager is not enabled + """ if self._subscription_manager is not None: self._subscription_manager.stop() else: @@ -130,10 +234,21 @@ def request_future(self, *args, **kwargs): class NativeReconnectionManager(ReconnectionManager): + """Manages reconnection attempts for lost network connections. + + This class implements the reconnection policy (linear or exponential backoff) + and handles the timing of reconnection attempts. + """ + def __init__(self, pubnub): super(NativeReconnectionManager, self).__init__(pubnub) def _register_heartbeat_timer(self): + """Register a new heartbeat timer for reconnection attempts. + + This method implements the reconnection policy and schedules the next + reconnection attempt based on the current state. + """ self.stop_heartbeat_timer() if self._retry_limit_reached(): @@ -169,6 +284,11 @@ def _call_time_callback(self, resp, status): self._register_heartbeat_timer() def start_polling(self): + """Start the reconnection polling process. + + This method begins the process of attempting to reconnect to the PubNub + network based on the configured reconnection policy. + """ if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.NONE: logger.warning("reconnection policy is disabled, please handle reconnection manually.") disconnect_status = PNStatus() @@ -177,7 +297,6 @@ def start_polling(self): return logger.debug("reconnection manager start at: %s" % utils.datetime_now()) - self._register_heartbeat_timer() def stop_heartbeat_timer(self): @@ -201,7 +320,24 @@ def get_next_sequence(self): class NativeSubscriptionManager(SubscriptionManager): + """Manages channel subscriptions and message processing. + + This class handles the subscription lifecycle, message queuing, + and delivery of messages to listeners. + + Attributes: + _message_queue (Queue): Queue for incoming messages + _consumer_event (Event): Event for controlling the consumer thread + _subscribe_call: Current subscription API call + _heartbeat_periodic_callback: Callback for periodic heartbeats + """ + def __init__(self, pubnub_instance): + """Initialize the subscription manager. + + Args: + pubnub_instance: The PubNub instance this manager belongs to + """ subscription_manager = self self._message_queue = Queue() @@ -290,14 +426,20 @@ def _message_queue_put(self, message): self._message_queue.put(message) def reconnect(self): + """Reconnect all current subscriptions. + + Restarts the subscribe loop and heartbeat timer if enabled. + """ self._should_stop = False self._start_subscribe_loop() - # Check the instance flag to determine if we want to perform the presence heartbeat - # This is False by default if self._pubnub.config.enable_presence_heartbeat is True: self._register_heartbeat_timer() def disconnect(self): + """Disconnect from all subscriptions. + + Stops the subscribe loop and heartbeat timer. + """ self._should_stop = True self._stop_heartbeat_timer() self._stop_subscribe_loop() @@ -425,6 +567,22 @@ def _take_message(self): class SubscribeListener(SubscribeCallback): + """Helper class for handling subscription events. + + This class provides a way to wait for specific events or messages + in a synchronous manner. + + Attributes: + connected (bool): Whether currently connected + connected_event (Event): Event signaling connection + disconnected_event (Event): Event signaling disconnection + presence_queue (Queue): Queue for presence events + message_queue (Queue): Queue for messages + channel_queue (Queue): Queue for channel events + uuid_queue (Queue): Queue for UUID events + membership_queue (Queue): Queue for membership events + """ + def __init__(self): self.connected = False self.connected_event = Event() @@ -448,29 +606,32 @@ def presence(self, pubnub, presence): self.presence_queue.put(presence) def wait_for_connect(self): + """Wait for a connection to be established. + + Raises: + Exception: If already connected + """ if not self.connected_event.is_set(): self.connected_event.wait() - # failing because you don't have to wait is illogical - # else: - # raise Exception("the instance is already connected") - - def channel(self, pubnub, channel): - self.channel_queue.put(channel) - - def uuid(self, pubnub, uuid): - self.uuid_queue.put(uuid) - - def membership(self, pubnub, membership): - self.membership_queue.put(membership) def wait_for_disconnect(self): + """Wait for a disconnection to occur. + + Raises: + Exception: If already disconnected + """ if not self.disconnected_event.is_set(): self.disconnected_event.wait() - # failing because you don't have to wait is illogical - # else: - # raise Exception("the instance is already disconnected") def wait_for_message_on(self, *channel_names): + """Wait for a message on specific channels. + + Args: + *channel_names: Channel names to wait for + + Returns: + The message envelope when received + """ channel_names = list(channel_names) while True: env = self.message_queue.get() @@ -492,6 +653,17 @@ def wait_for_presence_on(self, *channel_names): class NonSubscribeListener: + """Helper class for handling non-subscription operations. + + This class provides a way to wait for the completion of non-subscription + operations in a synchronous manner. + + Attributes: + result: The operation result + status: The operation status + done_event (Event): Event signaling operation completion + """ + def __init__(self): self.result = None self.status = None @@ -503,20 +675,44 @@ def callback(self, result, status): self.done_event.set() def pn_await(self, timeout=5): - """ Returns False if a timeout happened, otherwise True""" + """Wait for the operation to complete. + + Args: + timeout (int): Maximum time to wait in seconds + + Returns: + bool: False if timeout occurred, True otherwise + """ return self.done_event.wait(timeout) def await_result(self, timeout=5): + """Wait for and return the operation result. + + Args: + timeout (int): Maximum time to wait in seconds + + Returns: + The operation result + """ self.pn_await(timeout) return self.result def await_result_and_reset(self, timeout=5): + """Wait for the result and reset the listener. + + Args: + timeout (int): Maximum time to wait in seconds + + Returns: + Copy of the operation result + """ self.pn_await(timeout) cp = copy.copy(self.result) self.reset() return cp def reset(self): + """Reset the listener state.""" self.result = None self.status = None self.done_event.clear() diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 54f7b221..481f9c7b 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -1,3 +1,55 @@ +"""PubNub Python SDK Asyncio Implementation. + +This module provides the asynchronous implementation of the PubNub Python SDK using +asyncio. It enables non-blocking operations for real-time communication features +and is designed for use in asyncio-based applications. + +Key Components: + - PubNubAsyncio: Main class for asynchronous PubNub operations + - AsyncioSubscriptionManager: Async implementation of subscription handling + - EventEngineSubscriptionManager: Event-driven subscription management + - AsyncioReconnectionManager: Async network reconnection handling + - AsyncioPublishSequenceManager: Async message sequence management + +Features: + - Asynchronous publish/subscribe messaging + - Non-blocking network operations + - Event-driven architecture + - Customizable request handling + - Automatic reconnection with backoff strategies + - Concurrent message processing + +Usage Example: + ```python + import asyncio + from pubnub.pnconfiguration import PNConfiguration + from pubnub.pubnub_asyncio import PubNubAsyncio + + async def main(): + config = PNConfiguration() + config.publish_key = 'your_pub_key' + config.subscribe_key = 'your_sub_key' + config.uuid = 'client-123' + + pubnub = PubNubAsyncio(config) + + # Subscribe to channels + await pubnub.subscribe().channels("my_channel").execute() + + # Publish messages + await pubnub.publish().channel("my_channel").message("Hello!").future() + + # Cleanup + await pubnub.stop() + + asyncio.run(main()) + ``` + +Note: + This implementation is designed for asynchronous operations using Python's + asyncio framework. For synchronous operations, use the standard PubNub class. +""" + import importlib import logging import asyncio @@ -33,22 +85,50 @@ class PubNubAsyncHTTPTransport(AsyncHTTPTransport): + """Custom HTTP transport for asynchronous PubNub operations. + + This class extends AsyncHTTPTransport to provide PubNub-specific + transport functionality with proper connection state tracking. + + Attributes: + is_closed (bool): Whether the transport is closed + """ + is_closed: bool = False def close(self): + """Close the transport connection.""" self.is_closed = True super().aclose() class PubNubAsyncio(PubNubCore): - """ - PubNub Python SDK for asyncio framework + """PubNub Python SDK for asyncio framework. + + This class provides the main interface for asynchronous interactions with + the PubNub network. It implements all core PubNub functionality in a + non-blocking manner. + + Attributes: + event_loop (AbstractEventLoop): The asyncio event loop to use """ def __init__(self, config, custom_event_loop=None, subscription_manager=None, *, custom_request_handler=None): + """Initialize a new PubNubAsyncio instance. + + Args: + config: PubNub configuration instance + custom_event_loop (AbstractEventLoop, optional): Custom event loop to use + subscription_manager (Type, optional): Custom subscription manager class + custom_request_handler (Type[BaseRequestHandler], optional): Custom request + handler class. Can also be set via PUBNUB_ASYNC_REQUEST_HANDLER + environment variable. + + Raises: + Exception: If custom request handler is not a subclass of BaseRequestHandler + """ super(PubNubAsyncio, self).__init__(config) self.event_loop = custom_event_loop or asyncio.get_event_loop() - self._session = None if (not custom_request_handler) and (handler := os.getenv('PUBNUB_ASYNC_REQUEST_HANDLER')): @@ -73,7 +153,6 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None, *, self._subscription_manager = subscription_manager(self) self._publish_sequence_manager = AsyncioPublishSequenceManager(self.event_loop, PubNubCore.MAX_SEQUENCE) - self._telemetry_manager = AsyncioTelemetryManager() @property @@ -81,24 +160,42 @@ def _connector(self): return self._request_handler._connector async def close_pending_tasks(self, tasks): + """Close any pending tasks and wait for completion. + + Args: + tasks: List of tasks to close + """ await asyncio.gather(*tasks) await asyncio.sleep(0.1) async def create_session(self): + """Create a new HTTP session.""" await self._request_handler.create_session() async def close_session(self): + """Close the current HTTP session.""" await self._request_handler.close_session() async def set_connector(self, connector): + """Set a custom connector for HTTP operations. + + Args: + connector: The connector to use + """ await self._request_handler.set_connector(connector) async def stop(self): + """Stop all operations and clean up resources.""" if self._subscription_manager: self._subscription_manager.stop() await self.close_session() def sdk_platform(self): + """Get the SDK platform identifier. + + Returns: + str: "-Asyncio" to identify this as the asyncio implementation + """ return "-Asyncio" def request_sync(self, *args): @@ -108,10 +205,32 @@ def request_deferred(self, *args): raise NotImplementedError async def request_result(self, options_func, cancellation_event): + """Execute a request and return its result. + + Args: + options_func: Function that returns request options + cancellation_event: Event to cancel the request + + Returns: + The result of the request + """ envelope = await self._request_handler.async_request(options_func, cancellation_event) return envelope.result async def request_future(self, options_func, cancellation_event): + """Execute a request and return a future. + + This method handles various error conditions and wraps them in + appropriate exception types. + + Args: + options_func: Function that returns request options + cancellation_event: Event to cancel the request + + Returns: + PubNubAsyncioException: On error + AsyncioEnvelope: On success + """ try: res = await self._request_handler.async_request(options_func, cancellation_event) return res @@ -157,16 +276,28 @@ async def request_future(self, options_func, cancellation_event): class AsyncioReconnectionManager(ReconnectionManager): + """Manages reconnection attempts for lost network connections. + + This class implements the reconnection policy (linear or exponential backoff) + using asyncio's event loop for timing. + + Attributes: + _task: The current reconnection task + """ + def __init__(self, pubnub): self._task = None super(AsyncioReconnectionManager, self).__init__(pubnub) async def _register_heartbeat_timer(self): + """Register a new heartbeat timer for reconnection attempts. + + This method implements the reconnection policy and schedules the next + reconnection attempt based on the current state. + """ while True: self._recalculate_interval() - await asyncio.sleep(self._timer_interval) - logger.debug("reconnect loop at: %s" % utils.datetime_now()) try: @@ -180,6 +311,7 @@ async def _register_heartbeat_timer(self): self._connection_errors += 1 def start_polling(self): + """Start the reconnection polling process.""" if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.NONE: logger.warning("reconnection policy is disabled, please handle reconnection manually.") return @@ -187,6 +319,7 @@ def start_polling(self): self._task = asyncio.ensure_future(self._register_heartbeat_timer()) def stop_polling(self): + """Stop the reconnection polling process.""" if self._task is not None and not self._task.cancelled(): self._task.cancel() @@ -208,6 +341,18 @@ async def get_next_sequence(self): class AsyncioSubscriptionManager(SubscriptionManager): + """Manages channel subscriptions and message processing. + + This class handles the subscription lifecycle, message queuing, + and delivery of messages to listeners using asyncio primitives. + + Attributes: + _message_queue (Queue): Queue for incoming messages + _subscription_lock (Semaphore): Lock for subscription operations + _subscribe_loop_task: Current subscription loop task + _heartbeat_periodic_callback: Callback for periodic heartbeats + """ + def __init__(self, pubnub_instance): subscription_manager = self @@ -255,13 +400,19 @@ def _start_worker(self): ) def reconnect(self): - # TODO: method is synchronized in Java + """Reconnect all current subscriptions. + + Restarts the subscribe loop and heartbeat timer if enabled. + """ self._should_stop = False self._subscribe_loop_task = asyncio.ensure_future(self._start_subscribe_loop()) self._register_heartbeat_timer() def disconnect(self): - # TODO: method is synchronized in Java + """Disconnect from all subscriptions. + + Stops the subscribe loop and heartbeat timer. + """ self._should_stop = True self._stop_heartbeat_timer() self._stop_subscribe_loop() @@ -273,61 +424,45 @@ def stop(self): self._subscribe_loop_task.cancel() async def _start_subscribe_loop(self): - self._stop_subscribe_loop() + """Start the subscription loop. + This method handles the main subscription process, including + channel management and error handling. + """ + self._stop_subscribe_loop() await self._subscription_lock.acquire() - combined_channels = self._subscription_state.prepare_channel_list(True) - combined_groups = self._subscription_state.prepare_channel_group_list(True) - - if len(combined_channels) == 0 and len(combined_groups) == 0: - self._subscription_lock.release() - return - - self._subscribe_request_task = asyncio.ensure_future( - Subscribe(self._pubnub) - .channels(combined_channels) - .channel_groups(combined_groups) - .timetoken(self._timetoken) - .region(self._region) - .filter_expression(self._pubnub.config.filter_expression) - .future() - ) - - e = await self._subscribe_request_task - - if self._subscribe_request_task.cancelled(): - self._subscription_lock.release() - return + try: + combined_channels = self._subscription_state.prepare_channel_list(True) + combined_groups = self._subscription_state.prepare_channel_group_list(True) - if e.is_error(): - if e.status and e.status.category == PNStatusCategory.PNCancelledCategory: + if len(combined_channels) == 0 and len(combined_groups) == 0: self._subscription_lock.release() return - if e.status and e.status.category == PNStatusCategory.PNTimeoutCategory: - asyncio.ensure_future(self._start_subscribe_loop()) - self._subscription_lock.release() - return + self._subscribe_request_task = asyncio.ensure_future( + Subscribe(self._pubnub) + .channels(combined_channels) + .channel_groups(combined_groups) + .timetoken(self._timetoken) + .region(self._region) + .filter_expression(self._pubnub.config.filter_expression) + .future() + ) - logger.error("Exception in subscribe loop: %s" % str(e)) + e = await self._subscribe_request_task - if e.status and e.status.category == PNStatusCategory.PNAccessDeniedCategory: - e.status.operation = PNOperationType.PNUnsubscribeOperation + if self._subscribe_request_task.cancelled(): + return - # TODO: raise error - self._listener_manager.announce_status(e.status) + if e.is_error(): + await self._handle_subscription_error(e) + else: + self._handle_endpoint_call(e.result, e.status) + self._subscribe_loop_task = asyncio.ensure_future(self._start_subscribe_loop()) - self._reconnection_manager.start_polling() - self._subscription_lock.release() - self.disconnect() - return - else: - self._handle_endpoint_call(e.result, e.status) + finally: self._subscription_lock.release() - self._subscribe_loop_task = asyncio.ensure_future(self._start_subscribe_loop()) - - self._subscription_lock.release() def _stop_subscribe_loop(self): if self._subscribe_request_task is not None and not self._subscribe_request_task.cancelled(): @@ -398,6 +533,28 @@ async def _send_leave_helper(self, unsubscribe_operation): self._listener_manager.announce_status(envelope.status) + async def _handle_subscription_error(self, error): + """Handle errors that occur during subscription. + + Args: + error: The error that occurred + """ + if error.status and error.status.category == PNStatusCategory.PNCancelledCategory: + return + + if error.status and error.status.category == PNStatusCategory.PNTimeoutCategory: + asyncio.ensure_future(self._start_subscribe_loop()) + return + + logger.error("Exception in subscribe loop: %s" % str(error)) + + if error.status and error.status.category == PNStatusCategory.PNAccessDeniedCategory: + error.status.operation = PNOperationType.PNUnsubscribeOperation + + self._listener_manager.announce_status(error.status) + self._reconnection_manager.start_polling() + self.disconnect() + class EventEngineSubscriptionManager(SubscriptionManager): event_engine: StateMachine @@ -550,6 +707,20 @@ def _schedule_next(self): class SubscribeListener(SubscribeCallback): + """Helper class for handling subscription events. + + This class provides a way to wait for specific events or messages + in an asynchronous manner. + + Attributes: + connected (bool): Whether currently connected + connected_event (Event): Event signaling connection + disconnected_event (Event): Event signaling disconnection + presence_queue (Queue): Queue for presence events + message_queue (Queue): Queue for messages + error_queue (Queue): Queue for errors + """ + def __init__(self): self.connected = False self.connected_event = Event() @@ -559,6 +730,12 @@ def __init__(self): self.error_queue = Queue() def status(self, pubnub, status): + """Handle status updates from the PubNub instance. + + Args: + pubnub: The PubNub instance + status: The status update + """ super().status(pubnub, status) if utils.is_subscribed_event(status) and not self.connected_event.is_set(): self.connected_event.set() @@ -568,12 +745,35 @@ def status(self, pubnub, status): self.error_queue.put_nowait(status.error_data.exception) def message(self, pubnub, message): + """Handle incoming messages from the PubNub instance. + + Args: + pubnub: The PubNub instance + message: The incoming message + """ self.message_queue.put_nowait(message) def presence(self, pubnub, presence): + """Handle presence updates from the PubNub instance. + + Args: + pubnub: The PubNub instance + presence: The presence update + """ self.presence_queue.put_nowait(presence) async def _wait_for(self, coro): + """Wait for a coroutine to complete. + + Args: + coro: The coroutine to wait for + + Returns: + The result of the coroutine + + Raises: + Exception: If an error occurs while waiting + """ scc_task = asyncio.ensure_future(coro) err_task = asyncio.ensure_future(self.error_queue.get()) @@ -592,20 +792,27 @@ async def _wait_for(self, coro): return scc_task.result() async def wait_for_connect(self): + """Wait for a connection to be established.""" if not self.connected_event.is_set(): await self._wait_for(self.connected_event.wait()) - # failing because you don't have to wait is illogical - # else: - # raise Exception("instance is already connected") async def wait_for_disconnect(self): + """Wait for a disconnection to occur.""" if not self.disconnected_event.is_set(): await self._wait_for(self.disconnected_event.wait()) - # failing because you don't have to wait is illogical - # else: - # raise Exception("instance is already disconnected") async def wait_for_message_on(self, *channel_names): + """Wait for a message on specific channels. + + Args: + *channel_names: Channel names to wait for + + Returns: + The message envelope when received + + Raises: + Exception: If an error occurs while waiting + """ channel_names = list(channel_names) while True: try: diff --git a/pubnub/pubnub_core.py b/pubnub/pubnub_core.py index ec4b5b26..ea94f798 100644 --- a/pubnub/pubnub_core.py +++ b/pubnub/pubnub_core.py @@ -1,8 +1,64 @@ +"""PubNub Core SDK Implementation. + +This module implements the core functionality of the PubNub Python SDK. It provides +a comprehensive interface for real-time communication features including: + +- Publish/Subscribe Messaging +- Presence Detection +- Channel Groups +- Message Storage and Playback +- Push Notifications +- Stream Controllers +- Message Actions +- File Sharing +- Access Management + +The PubNubCore class serves as the base implementation, providing all core functionality +while allowing platform-specific implementations (like synchronous vs asynchronous) +to extend it. + +Typical usage: + ```python + from pubnub.pnconfiguration import PNConfiguration + from pubnub.pubnub import PubNub + + config = PNConfiguration() + config.publish_key = 'your_pub_key' + config.subscribe_key = 'your_sub_key' + config.uuid = 'client-123' + + pubnub = PubNub(config) + + # Publishing + pubnub.publish().channel("chat").message({"text": "Hello!"}).sync() + + # Subscribing + def my_listener(message, event): + print(f"Received: {message.message}") + + pubnub.add_listener(my_listener) + pubnub.subscribe().channels("chat").execute() + ``` + +Implementation Notes: + - All methods return builder objects that can be chained + - Synchronous operations end with .sync() + - Asynchronous implementations may provide different execution methods + - Error handling is done through PubNubException + - Configuration is immutable by default for thread safety + +See Also: + - PNConfiguration: For SDK configuration options + - PubNub: For the main implementation + - PubNubAsyncio: For the asynchronous implementation + - SubscribeCallback: For handling subscription events +""" + import logging import time from warnings import warn from copy import deepcopy -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Union, Any, TYPE_CHECKING from pubnub.endpoints.entities.membership.add_memberships import AddSpaceMembers, AddUserSpaces from pubnub.endpoints.entities.membership.update_memberships import UpdateSpaceMembers, UpdateUserSpaces from pubnub.endpoints.entities.membership.fetch_memberships import FetchSpaceMemberships, FetchUserMemberships @@ -91,30 +147,46 @@ from pubnub.endpoints.push.list_push_provisions import ListPushProvisions from pubnub.managers import TelemetryManager +if TYPE_CHECKING: + from pubnub.endpoints.file_operations.send_file_asyncio import AsyncioSendFile + from pubnub.endpoints.file_operations.download_file_asyncio import DownloadFileAsyncio + logger = logging.getLogger("pubnub") class PubNubCore: - """A base class for PubNub Python API implementations""" - SDK_VERSION = "10.4.0" - SDK_NAME = "PubNub-Python" + """A base class for PubNub Python API implementations. + + This class provides the core functionality for interacting with the PubNub real-time network. + It includes methods for publishing/subscribing to channels, managing presence, handling files, + and dealing with access control. + """ - TIMESTAMP_DIVIDER = 1000 - MAX_SEQUENCE = 65535 + SDK_VERSION: str = "10.4.0" + SDK_NAME: str = "PubNub-Python" + + TIMESTAMP_DIVIDER: int = 1000 + MAX_SEQUENCE: int = 65535 __metaclass__ = ABCMeta - __crypto = None + __crypto: Optional[PubNubCryptoModule] = None _subscription_registry: PNSubscriptionRegistry - def __init__(self, config: PNConfiguration): + def __init__(self, config: PNConfiguration) -> None: + """Initialize a new PubNub instance. + + Args: + config (PNConfiguration): Configuration instance containing settings like + publish/subscribe keys, UUID, and other operational parameters. + """ if not config.disable_config_locking: config.lock() self.config = deepcopy(config) else: self.config = config self.config.validate() - self.headers = { + self.headers: Dict[str, str] = { 'User-Agent': self.sdk_name } @@ -126,121 +198,400 @@ def __init__(self, config: PNConfiguration): self._subscription_registry = PNSubscriptionRegistry(self) @property - def base_origin(self): + def base_origin(self) -> str: return self._base_path_manager.get_base_path() @property - def sdk_name(self): + def sdk_name(self) -> str: return "%s%s/%s" % (PubNubCore.SDK_NAME, self.sdk_platform(), PubNubCore.SDK_VERSION) @abstractmethod - def sdk_platform(self): + def sdk_platform(self) -> str: pass @property - def uuid(self): + def uuid(self) -> str: return self.config.uuid @property - def crypto(self) -> PubNubCryptoModule: + def crypto(self) -> Optional[PubNubCryptoModule]: crypto_module = self.__crypto or self.config.crypto_module if not crypto_module and self.config.cipher_key: crypto_module = self.config.DEFAULT_CRYPTO_MODULE(self.config) return crypto_module @crypto.setter - def crypto(self, crypto: PubNubCryptoModule): + def crypto(self, crypto: PubNubCryptoModule) -> None: self.__crypto = crypto - def add_listener(self, listener): - self._validate_subscribe_manager_enabled() + def add_listener(self, listener: Any) -> None: + """Add a listener for subscribe events. - return self._subscription_manager.add_listener(listener) + The listener will receive callbacks for messages, presence events, + and status updates. + + Args: + listener (Any): An object implementing the necessary callback methods + for handling subscribe events. - def remove_listener(self, listener): + Raises: + Exception: If subscription manager is not enabled. + + Example: + ```python + class MyListener(SubscribeCallback): + def message(self, pubnub, message): + print(f"Received message: {message.message}") + + pubnub.add_listener(MyListener()) + ``` + """ self._validate_subscribe_manager_enabled() + return self._subscription_manager.add_listener(listener) - return self._subscription_manager.remove_listener(listener) + def remove_listener(self, listener: Any) -> None: + """Remove a listener from the subscription manager. + + Args: + listener (Any): The listener to remove. + + Returns: + None + + Example: + ```python + pubnub.remove_listener(MyListener()) + ``` + """ - def get_subscribed_channels(self): self._validate_subscribe_manager_enabled() + return self._subscription_manager.remove_listener(listener) + def get_subscribed_channels(self) -> List[str]: + self._validate_subscribe_manager_enabled() return self._subscription_manager.get_subscribed_channels() - def get_subscribed_channel_groups(self): + def get_subscribed_channel_groups(self) -> List[str]: self._validate_subscribe_manager_enabled() return self._subscription_manager.get_subscribed_channel_groups() def add_channel_to_channel_group(self, channels: Union[str, List[str]] = None, channel_group: str = None) -> AddChannelToChannelGroup: + """Add channels to a channel group. + + Channel groups allow you to group multiple channels under a single + subscription point. + + Args: + channels: Channel(s) to add to the group. + channel_group (str, optional): The name of the channel group. + + Returns: + AddChannelToChannelGroup: An AddChannelToChannelGroup object that can + be used to execute the request. + + Example: + ```python + pubnub.add_channel_to_channel_group( + channels=["chat-1", "chat-2"], + channel_group="all-chats" + ).sync() + ``` + """ return AddChannelToChannelGroup(self, channels=channels, channel_group=channel_group) def remove_channel_from_channel_group(self, channels: Union[str, List[str]] = None, channel_group: str = None) -> RemoveChannelFromChannelGroup: + """Remove channels from a channel group. + + Removes specified channels from a channel group subscription point. + + Args: + channels: Channel(s) to remove from the group. + channel_group (str, optional): The name of the channel group. + + Returns: + RemoveChannelFromChannelGroup: A RemoveChannelFromChannelGroup object + that can be used to execute the request. + + Example: + ```python + pubnub.remove_channel_from_channel_group( + channels="chat-1", + channel_group="all-chats" + ).sync() + ``` + """ return RemoveChannelFromChannelGroup(self, channels=channels, channel_group=channel_group) def list_channels_in_channel_group(self, channel_group: str = None) -> ListChannelsInChannelGroup: + """List all channels in a channel group. + + Retrieves all channels that are members of the specified channel group. + + Args: + channel_group (str, optional): The name of the channel group. + + Returns: + ListChannelsInChannelGroup: A ListChannelsInChannelGroup object that + can be used to execute the request. + + Example: + ```python + result = pubnub.list_channels_in_channel_group( + channel_group="all-chats" + ).sync() + print(f"Channels in group: {result.channels}") + ``` + """ return ListChannelsInChannelGroup(self, channel_group=channel_group) def remove_channel_group(self) -> RemoveChannelGroup: + """Remove a channel group. + + Removes a channel group from the PubNub network. + + Returns: + RemoveChannelGroup: A RemoveChannelGroup object that can be used to + execute the request. + + Example: + ```python + pubnub.remove_channel_group().sync() + ``` + """ return RemoveChannelGroup(self) def subscribe(self) -> SubscribeBuilder: + """Create a new subscription to channels or channel groups. + + Returns: + SubscribeBuilder: A builder object for configuring the subscription. + + Example: + ```python + pubnub.subscribe() \ + .channels("my_channel") \ + .with_presence() \ + .execute() + ``` + """ return SubscribeBuilder(self) def unsubscribe(self) -> UnsubscribeBuilder: + """Create a new unsubscribe request. + + Returns: + UnsubscribeBuilder: A builder object for configuring the unsubscribe. + + Example: + ```python + pubnub.unsubscribe() \ + .channels("my_channel") \ + .execute() + ``` + """ return UnsubscribeBuilder(self) - def unsubscribe_all(self): + def unsubscribe_all(self) -> None: + """Unsubscribe from all channels and channel groups. + + Removes all current subscriptions from the PubNub instance. + + Returns: + None + + Example: + ```python + pubnub.unsubscribe_all() + ``` + """ return self._subscription_registry.unsubscribe_all() - def reconnect(self): + def reconnect(self) -> None: return self._subscription_registry.reconnect() def heartbeat(self) -> Heartbeat: + """Send a heartbeat signal to the PubNub network. + + Updates presence information for the current user in subscribed channels. + This is typically handled automatically by the SDK but can be manually + triggered if needed. + + Returns: + Heartbeat: A Heartbeat object that can be used to execute the request. + + Note: + Manual heartbeats are rarely needed as the SDK handles presence + automatically when subscribing to channels with presence enabled. + """ return Heartbeat(self) def set_state(self, channels: Union[str, List[str]] = None, channel_groups: Union[str, List[str]] = None, - state: Optional[Dict[str, any]] = None) -> SetState: + state: Optional[Dict[str, Any]] = None) -> SetState: + """Set state data for a subscriber. + + Sets state information for the current subscriber on specified channels + or channel groups. + + Args: + channels: Channel(s) to set state for. + channel_groups: Channel group(s) to set state for. + state: Dictionary containing state information. + + Returns: + SetState: A SetState object that can be used to execute the request. + + Example: + ```python + pubnub.set_state( + channels=["game"], + state={"level": 5, "health": 100} + ).sync() + ``` + """ return SetState(self, self._subscription_manager, channels, channel_groups, state) def get_state(self, channels: Union[str, List[str]] = None, channel_groups: Union[str, List[str]] = None, uuid: Optional[str] = None) -> GetState: + """Get the current state for a user. + + Retrieves the metadata associated with a user's presence in specified + channels or channel groups. + + Args: + channels: Channel(s) to get state from. + channel_groups: Channel group(s) to get state from. + uuid (str, optional): The UUID of the user to get state for. + If not provided, uses the UUID of the current instance. + + Returns: + GetState: A GetState object that can be used to execute the request. + + Example: + ```python + result = pubnub.get_state( + channels=["game"], + uuid="player123" + ).sync() + print(f"Player state: {result.state}") + ``` + """ return GetState(self, channels, channel_groups, uuid) def here_now(self, channels: Union[str, List[str]] = None, channel_groups: Union[str, List[str]] = None, include_state: bool = False, include_uuids: bool = True) -> HereNow: + """Get presence information for channels and channel groups. + + Retrieves information about subscribers currently present in specified + channels and channel groups. + + Args: + channels: Channel(s) to get presence info for. + channel_groups: Channel group(s) to get presence info for. + include_state: Whether to include subscriber state information. + include_uuids: Whether to include subscriber UUIDs. + + Returns: + HereNow: A HereNow object that can be used to execute the request. + + Example: + ```python + result = pubnub.here_now( + channels=["lobby"], + include_state=True + ).sync() + print(f"Active users: {result.total_occupancy}") + ``` + """ return HereNow(self, channels, channel_groups, include_state, include_uuids) - def where_now(self, user_id: Optional[str] = None): + def where_now(self, user_id: Optional[str] = None) -> WhereNow: + """Get presence information for a specific user. + + Retrieves a list of channels the specified user is currently subscribed to. + + Args: + user_id (str, optional): The UUID of the user to get presence info for. + If not provided, uses the UUID of the current instance. + + Returns: + WhereNow: A WhereNow object that can be used to execute the request. + + Example: + ```python + result = pubnub.where_now(user_id="user123").sync() + print(f"User is in channels: {result.channels}") + ``` + """ return WhereNow(self, user_id) - def publish(self, channel: str = None, message: any = None, should_store: Optional[bool] = None, - use_post: Optional[bool] = None, meta: Optional[any] = None, replicate: Optional[bool] = None, + def publish(self, channel: str = None, message: Any = None, should_store: Optional[bool] = None, + use_post: Optional[bool] = None, meta: Optional[Any] = None, replicate: Optional[bool] = None, ptto: Optional[int] = None, ttl: Optional[int] = None, custom_message_type: Optional[str] = None ) -> Publish: - """ Sends a message to all channel subscribers. A successfully published message is replicated across PubNub's - points of presence and sent simultaneously to all subscribed clients on a channel. + """Publish a message to a channel. + + Sends a message to all channel subscribers. Messages are replicated across PubNub's + points of presence and delivered to all subscribed clients simultaneously. + + Args: + channel (str, optional): The channel to publish to. + message (Any, optional): The message to publish. Can be any JSON-serializable type. + should_store (bool, optional): Whether to store the message in history. + use_post (bool, optional): Whether to use POST instead of GET for the request. + meta (Any, optional): Additional metadata to attach to the message. + replicate (bool, optional): Whether to replicate the message across data centers. + ptto (int, optional): Publish TimeToken Override - Timestamp for the message. + ttl (int, optional): Time to live in minutes for the message. + custom_message_type (str, optional): Custom message type identifier. + + Returns: + Publish: A Publish object that can be used to execute the request. + + Example: + ```python + pubnub.publish( + channel="my_channel", + message={"text": "Hello, World!"}, + meta={"sender": "python-sdk"} + ).sync() + ``` """ return Publish(self, channel=channel, message=message, should_store=should_store, use_post=use_post, meta=meta, replicate=replicate, ptto=ptto, ttl=ttl, custom_message_type=custom_message_type) - def grant(self): + def grant(self) -> Grant: """ Deprecated. Use grant_token instead """ warn("Access management v2 is being deprecated. We recommend switching to grant_token().", DeprecationWarning, stacklevel=2) return Grant(self) - def grant_token(self, channels: Union[str, List[str]] = None, channel_groups: Union[str, List[str]] = None, - users: Union[str, List[str]] = None, spaces: Union[str, List[str]] = None, - authorized_user_id: str = None, ttl: Optional[int] = None, meta: Optional[any] = None): - return GrantToken(self, channels=channels, channel_groups=channel_groups, users=users, spaces=spaces, - authorized_user_id=authorized_user_id, ttl=ttl, meta=meta) + def grant_token( + self, + channels: Union[str, List[str]] = None, + channel_groups: Union[str, List[str]] = None, + users: Union[str, List[str]] = None, + spaces: Union[str, List[str]] = None, + authorized_user_id: str = None, + ttl: Optional[int] = None, + meta: Optional[Any] = None + ) -> GrantToken: + return GrantToken( + self, + channels=channels, + channel_groups=channel_groups, + users=users, + spaces=spaces, + authorized_user_id=authorized_user_id, + ttl=ttl, + meta=meta + ) def revoke_token(self, token: str) -> RevokeToken: return RevokeToken(self, token) - def audit(self): + def audit(self) -> Audit: """ Deprecated """ warn("Access management v2 is being deprecated.", DeprecationWarning, stacklevel=2) return Audit(self) @@ -248,78 +599,422 @@ def audit(self): # Push Related methods def list_push_channels(self, device_id: str = None, push_type: PNPushType = None, topic: str = None, environment: PNPushEnvironment = None) -> ListPushProvisions: + """List channels registered for push notifications. + + Retrieves a list of channels that are registered for push notifications + for a specific device. + + Args: + device_id (str, optional): The device token/ID to list channels for. + push_type (PNPushType, optional): The type of push notification service + (e.g., APNS, FCM). + topic (str, optional): The topic for APNS notifications. + environment (PNPushEnvironment, optional): The environment for APNS + (production or development). + + Returns: + ListPushProvisions: A ListPushProvisions object that can be used to + execute the request. + + Example: + ```python + from pubnub.enums import PNPushType + + result = pubnub.list_push_channels( + device_id="device_token", + push_type=PNPushType.APNS + ).sync() + print(f"Registered channels: {result.channels}") + ``` + """ return ListPushProvisions(self, device_id=device_id, push_type=push_type, topic=topic, environment=environment) - def add_channels_to_push(self, channels: Union[str, List[str]], device_id: str = None, push_type: PNPushType = None, - topic: str = None, environment: PNPushEnvironment = None) -> AddChannelsToPush: + def add_channels_to_push(self, channels: Union[str, List[str]], device_id: str = None, + push_type: PNPushType = None, topic: str = None, + environment: PNPushEnvironment = None) -> AddChannelsToPush: + """Register channels for push notifications. + + Enables push notifications for specified channels on a device. + + Args: + channels: Channel(s) to enable push notifications for. + device_id (str, optional): The device token/ID to register. + push_type (PNPushType, optional): The type of push notification service. + topic (str, optional): The topic for APNS notifications. + environment (PNPushEnvironment, optional): The environment for APNS. + + Returns: + AddChannelsToPush: An AddChannelsToPush object that can be used to + execute the request. + + Example: + ```python + from pubnub.enums import PNPushType + + pubnub.add_channels_to_push( + channels=["alerts", "news"], + device_id="device_token", + push_type=PNPushType.FCM + ).sync() + ``` + """ return AddChannelsToPush(self, channels=channels, device_id=device_id, push_type=push_type, topic=topic, environment=environment) def remove_channels_from_push(self, channels: Union[str, List[str]] = None, device_id: str = None, push_type: PNPushType = None, topic: str = None, environment: PNPushEnvironment = None) -> RemoveChannelsFromPush: + """Unregister channels from push notifications. + + Disables push notifications for specified channels on a device. + + Args: + channels: Channel(s) to disable push notifications for. + device_id (str, optional): The device token/ID. + push_type (PNPushType, optional): The type of push notification service. + topic (str, optional): The topic for APNS notifications. + environment (PNPushEnvironment, optional): The environment for APNS. + + Returns: + RemoveChannelsFromPush: A RemoveChannelsFromPush object that can be + used to execute the request. + + Example: + ```python + pubnub.remove_channels_from_push( + channels=["alerts"], + device_id="device_token", + push_type=PNPushType.FCM + ).sync() + ``` + """ return RemoveChannelsFromPush(self, channels=channels, device_id=device_id, push_type=push_type, topic=topic, environment=environment) - def remove_device_from_push(self, device_id: str = None, push_type: PNPushType = None, topic: str = None, - environment: PNPushEnvironment = None) -> RemoveDeviceFromPush: + def remove_device_from_push(self, device_id: str = None, push_type: PNPushType = None, + topic: str = None, environment: PNPushEnvironment = None) -> RemoveDeviceFromPush: + """Unregister a device from all push notifications. + + Removes all push notification registrations for a device. + + Args: + device_id (str, optional): The device token/ID to unregister. + push_type (PNPushType, optional): The type of push notification service. + topic (str, optional): The topic for APNS notifications. + environment (PNPushEnvironment, optional): The environment for APNS. + + Returns: + RemoveDeviceFromPush: A RemoveDeviceFromPush object that can be used + to execute the request. + + Example: + ```python + pubnub.remove_device_from_push( + device_id="device_token", + push_type=PNPushType.FCM + ).sync() + ``` + """ return RemoveDeviceFromPush(self, device_id=device_id, push_type=push_type, topic=topic, environment=environment) - def history(self): + def history(self) -> History: + """Fetch historical messages from a channel. + + Retrieves previously published messages from the PubNub network. + + Returns: + History: A History object that can be used to configure and execute the request. + + Example: + ```python + result = pubnub.history()\ + .channel("chat")\ + .count(100)\ + .include_timetoken(True)\ + .sync() + + for message in result.messages: + print(f"Message: {message.entry} at {message.timetoken}") + ``` + + Note: + The number of messages that can be retrieved is limited by your + PubNub subscription level and message retention settings. + """ return History(self) def message_counts(self, channels: Union[str, List[str]] = None, channels_timetoken: Union[str, List[str]] = None) -> MessageCount: + """Get message counts for channels. + + Retrieves the number of messages published to specified channels, + optionally filtered by timetoken. + + Args: + channels: Channel(s) to get message counts for. + channels_timetoken: Timetoken(s) to count messages from. + + Returns: + MessageCount: A MessageCount object that can be used to execute the request. + + Example: + ```python + result = pubnub.message_counts( + channels=["chat", "alerts"], + channels_timetoken=["15790288836087530"] + ).sync() + print(f"Messages in chat: {result.channels['chat']}") + ``` + """ return MessageCount(self, channels=channels, channels_timetoken=channels_timetoken) - def fire(self, channel: str = None, message: any = None, use_post: Optional[bool] = None, - meta: Optional[any] = None) -> Fire: + def fire(self, channel: str = None, message: Any = None, use_post: Optional[bool] = None, + meta: Optional[Any] = None) -> Fire: return Fire(self, channel=channel, message=message, use_post=use_post, meta=meta) - def signal(self, channel: str = None, message: any = None, custom_message_type: Optional[str] = None) -> Signal: + def signal(self, channel: str = None, message: Any = None, custom_message_type: Optional[str] = None) -> Signal: return Signal(self, channel=channel, message=message, custom_message_type=custom_message_type) def set_uuid_metadata(self, uuid: str = None, include_custom: bool = None, custom: dict = None, include_status: bool = True, include_type: bool = True, status: str = None, type: str = None, name: str = None, email: str = None, external_id: str = None, profile_url: str = None) -> SetUuid: + """Set or update metadata for a UUID. + + Associates custom metadata with a UUID that can be used for user profiles, + presence information, or any other user-related data. + + Args: + uuid (str, optional): The UUID to set metadata for. + include_custom (bool, optional): Whether to include custom fields in response. + custom (dict, optional): Custom metadata fields to set. + include_status (bool, optional): Whether to include status in response. + include_type (bool, optional): Whether to include type in response. + status (str, optional): User's status (e.g., "online", "offline"). + type (str, optional): User's type or role. + name (str, optional): User's display name. + email (str, optional): User's email address. + external_id (str, optional): External system identifier. + profile_url (str, optional): URL to user's profile image. + + Returns: + SetUuid: A SetUuid object that can be used to execute the request. + + Example: + ```python + pubnub.set_uuid_metadata() \ + .uuid("user-123") \ + .name("John Doe") \ + .email("john@example.com") \ + .custom({"role": "admin"}) \ + .sync() + ``` + """ return SetUuid(self, uuid=uuid, include_custom=include_custom, custom=custom, include_status=include_status, include_type=include_type, status=status, type=type, name=name, email=email, external_id=external_id, profile_url=profile_url) def get_uuid_metadata(self, uuid: str = None, include_custom: bool = None, include_status: bool = True, include_type: bool = True) -> GetUuid: + """Get metadata for a specific UUID. + + Retrieves the metadata associated with a UUID including custom fields, + status, and type information. + + Args: + uuid (str, optional): The UUID to get metadata for. + include_custom (bool, optional): Whether to include custom fields. + include_status (bool, optional): Whether to include status. + include_type (bool, optional): Whether to include type. + + Returns: + GetUuid: A GetUuid object that can be used to execute the request. + + Example: + ```python + result = pubnub.get_uuid_metadata()\ + .uuid("user-123")\ + .include_custom(True)\ + .sync() + print(f"User name: {result.result.data['name']}") + ``` + """ return GetUuid(self, uuid=uuid, include_custom=include_custom, include_status=include_status, include_type=include_type) def remove_uuid_metadata(self, uuid: str = None) -> RemoveUuid: + """Remove all metadata for a UUID. + + Deletes all metadata associated with a UUID including custom fields, + status, and type information. + + Args: + uuid (str, optional): The UUID to remove metadata for. + + Returns: + RemoveUuid: A RemoveUuid object that can be used to execute the request. + + Example: + ```python + pubnub.remove_uuid_metadata().uuid("user-123").sync() + ``` + + Warning: + This operation is permanent and cannot be undone. + """ return RemoveUuid(self, uuid=uuid) def get_all_uuid_metadata(self, include_custom: bool = None, include_status: bool = True, include_type: bool = True, limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None) -> GetAllUuid: + """Get metadata for all UUIDs. + + Retrieves metadata for all UUIDs with optional filtering and sorting. + + Args: + include_custom (bool, optional): Whether to include custom fields. + include_status (bool, optional): Whether to include status. + include_type (bool, optional): Whether to include type. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Filter expression for results. + include_total_count (bool, optional): Whether to include total count. + sort_keys (list, optional): Keys to sort results by. + + Returns: + GetAllUuid: A GetAllUuid object that can be used to execute the request. + + Example: + ```python + result = pubnub.get_all_uuid_metadata()\ + .include_custom(True)\ + .filter("name LIKE 'John*'")\ + .limit(100)\ + .sync() + for user in result.result.data: + print(f"User: {user['name']}") + ``` + """ return GetAllUuid(self, include_custom=include_custom, include_status=include_status, include_type=include_type, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys) def set_channel_metadata(self, channel: str = None, custom: dict = None, include_custom: bool = False, include_status: bool = True, include_type: bool = True, name: str = None, description: str = None, status: str = None, type: str = None) -> SetChannel: + """Set or update metadata for a channel. + + Associates custom metadata with a channel that can be used for channel + information, categorization, or any other channel-related data. + + Args: + channel (str, optional): The channel to set metadata for. + custom (dict, optional): Custom metadata fields to set. + include_custom (bool, optional): Whether to include custom fields in response. + include_status (bool, optional): Whether to include status in response. + include_type (bool, optional): Whether to include type in response. + name (str, optional): Display name for the channel. + description (str, optional): Channel description. + status (str, optional): Channel status (e.g., "active", "archived"). + type (str, optional): Channel type or category. + + Returns: + SetChannel: A SetChannel object that can be used to execute the request. + + Example: + ```python + pubnub.set_channel_metadata()\ + .channel("room-1")\ + .name("General Chat")\ + .description("Public chat room for general discussions")\ + .custom({"category": "public"})\ + .sync() + ``` + """ return SetChannel(self, channel=channel, custom=custom, include_custom=include_custom, include_status=include_status, include_type=include_type, name=name, description=description, status=status, type=type) def get_channel_metadata(self, channel: str = None, include_custom: bool = False, include_status: bool = True, include_type: bool = True) -> GetChannel: + """Get metadata for a specific channel. + + Retrieves the metadata associated with a channel including custom fields, + status, and type information. + + Args: + channel (str, optional): The channel to get metadata for. + include_custom (bool, optional): Whether to include custom fields. + include_status (bool, optional): Whether to include status. + include_type (bool, optional): Whether to include type. + + Returns: + GetChannel: A GetChannel object that can be used to execute the request. + + Example: + ```python + result = pubnub.get_channel_metadata()\ + .channel("room-1")\ + .include_custom(True)\ + .sync() + print(f"Channel name: {result.result.data['name']}") + ``` + """ return GetChannel(self, channel=channel, include_custom=include_custom, include_status=include_status, include_type=include_type) def remove_channel_metadata(self, channel: str = None) -> RemoveChannel: + """Remove all metadata for a channel. + + Deletes all metadata associated with a channel including custom fields, + status, and type information. + + Args: + channel (str, optional): The channel to remove metadata for. + + Returns: + RemoveChannel: A RemoveChannel object that can be used to execute the request. + + Example: + ```python + pubnub.remove_channel_metadata().channel("room-1").sync() + ``` + + Warning: + This operation is permanent and cannot be undone. + """ return RemoveChannel(self, channel=channel) def get_all_channel_metadata(self, include_custom=False, include_status=True, include_type=True, limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None) -> GetAllChannels: + """Get metadata for all channels. + + Retrieves metadata for all channels with optional filtering and sorting. + + Args: + include_custom (bool, optional): Whether to include custom fields. + include_status (bool, optional): Whether to include status. + include_type (bool, optional): Whether to include type. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Filter expression for results. + include_total_count (bool, optional): Whether to include total count. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination information. + + Returns: + GetAllChannels: A GetAllChannels object that can be used to execute the request. + + Example: + ```python + result = pubnub.get_all_channel_metadata()\ + .include_custom(True)\ + .filter("name LIKE 'chat*'")\ + .limit(100)\ + .sync() + for channel in result.result.data: + print(f"Channel: {channel['name']}") + ``` + """ return GetAllChannels(self, include_custom=include_custom, include_status=include_status, include_type=include_type, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page) @@ -328,45 +1023,67 @@ def set_channel_members(self, channel: str = None, uuids: List[PNUUID] = None, i limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None, include: MemberIncludes = None ) -> SetChannelMembers: - """ Creates a builder for setting channel members. Can be used both as a builder or as a single call with - named parameters. - - Parameters - ---------- - channel : str - The channel for which members are being set. - uuids : List[PNUUID] - List of users to be set as members of the channel. - include_custom : bool, optional - Whether to include custom fields in the response. - limit : int, optional - Maximum number of results to return. - filter : str, optional - Filter expression to apply to the results. - include_total_count : bool, optional - Whether to include the total count of results. - sort_keys : list, optional - List of keys to sort the results by. - page : PNPage, optional - Pagination information. - include : MemberIncludes, optional - Additional fields to include in the response. - :return: An instance of SetChannelMembers builder. - :rtype: SetChannelMembers - - Example: - -------- - pn = PubNub(config) - users = [PNUser("user1"), PNUser("user2", type="admin", status="offline")] - response = pn.set_channel_members(channel="my_channel", uuids=users).sync() + """Set the members (UUIDs) of a channel, replacing any existing members. + + This method allows you to set the complete list of members for a channel, + overwriting any existing members. This is useful when you want to completely + replace the current member list rather than add or remove individual members. + + Args: + channel (str, optional): The channel to set members for. + uuids (List[PNUUID], optional): List of UUIDs to set as channel members. + include_custom (bool, optional): Whether to include custom fields in response. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Expression to filter results. + include_total_count (bool, optional): Whether to include total count in response. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination parameters. + include (MemberIncludes, optional): Additional fields to include in response. + + Returns: + SetChannelMembers: A SetChannelMembers object that can be used to execute the request. + + Example: + ```python + pubnub.set_channel_members()\ + .channel("room-1")\ + .uuids([PNUser("user-1"), PNUser("user-2"), PNUser("user-3")])\ + .sync() + ``` """ return SetChannelMembers(self, channel=channel, uuids=uuids, include_custom=include_custom, limit=limit, - filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page, - include=include) + filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, + page=page, include=include) def get_channel_members(self, channel: str = None, include_custom: bool = None, limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None, include: MemberIncludes = None) -> GetChannelMembers: + """Retrieve a list of members (UUIDs) that are part of a channel. + + This method allows you to fetch all members currently associated with a channel, + with options for pagination and including additional information. + + Args: + channel (str, optional): The channel to get members from. + include_custom (bool, optional): Whether to include custom fields in response. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Expression to filter results. + include_total_count (bool, optional): Whether to include total count in response. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination parameters. + include (MemberIncludes, optional): Additional fields to include in response. + + Returns: + GetChannelMembers: A GetChannelMembers object that can be used to execute the request. + + Example: + ```python + pubnub.get_channel_members()\ + .channel("room-1")\ + .include_custom(True)\ + .sync() + ``` + """ return GetChannelMembers(self, channel=channel, include_custom=include_custom, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page, include=include) @@ -375,6 +1092,32 @@ def remove_channel_members(self, channel: str = None, uuids: List[str] = None, i limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None, include: MemberIncludes = None ) -> RemoveChannelMembers: + """Remove members (UUIDs) from a channel. + + This method allows you to remove one or more members from a channel in a single operation. + + Args: + channel (str, optional): The channel to remove members from. + uuids (List[str], optional): List of UUIDs to remove from the channel. + include_custom (bool, optional): Whether to include custom fields in response. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Expression to filter results. + include_total_count (bool, optional): Whether to include total count in response. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination parameters. + include (MemberIncludes, optional): Additional fields to include in response. + + Returns: + RemoveChannelMembers: A RemoveChannelMembers object that can be used to execute the request. + + Example: + ```python + pubnub.remove_channel_members()\ + .channel("room-1")\ + .uuids(["user-1", "user-2"])\ + .sync() + ``` + """ return RemoveChannelMembers(self, channel=channel, uuids=uuids, include_custom=include_custom, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page, include=include) @@ -383,6 +1126,35 @@ def manage_channel_members(self, channel: str = None, uuids_to_set: List[str] = uuids_to_remove: List[str] = None, include_custom: bool = None, limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None, include: MemberIncludes = None) -> ManageChannelMembers: + """Manage members of a channel by adding and/or removing UUIDs. + + This method allows you to add new members to a channel and remove existing members + in a single operation. + + Args: + channel (str, optional): The channel to manage members for. + uuids_to_set (List[str], optional): List of UUIDs to add as members. + uuids_to_remove (List[str], optional): List of UUIDs to remove from members. + include_custom (bool, optional): Whether to include custom fields in response. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Expression to filter results. + include_total_count (bool, optional): Whether to include total count in response. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination parameters. + include (MemberIncludes, optional): Additional fields to include in response. + + Returns: + ManageChannelMembers: A ManageChannelMembers object that can be used to execute the request. + + Example: + ```python + pubnub.manage_channel_members()\ + .channel("room-1")\ + .uuids_to_set(["user-1", "user-2"])\ + .uuids_to_remove(["user-3"])\ + .sync() + ``` + """ return ManageChannelMembers(self, channel=channel, uuids_to_set=uuids_to_set, uuids_to_remove=uuids_to_remove, include_custom=include_custom, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page, @@ -391,6 +1163,33 @@ def manage_channel_members(self, channel: str = None, uuids_to_set: List[str] = def set_memberships(self, uuid: str = None, channel_memberships: List[str] = None, include_custom: bool = False, limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None, include: MembershipIncludes = None) -> SetMemberships: + """Set channel memberships for a UUID. + + This method allows you to set the channels that a UUID is a member of, + replacing any existing memberships. + + Args: + uuid (str, optional): The UUID to set memberships for. + channel_memberships (List[str], optional): List of channels to set as memberships. + include_custom (bool, optional): Whether to include custom fields in response. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Expression to filter results. + include_total_count (bool, optional): Whether to include total count in response. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination parameters. + include (MembershipIncludes, optional): Additional fields to include in response. + + Returns: + SetMemberships: A SetMemberships object that can be used to execute the request. + + Example: + ```python + pubnub.set_memberships()\ + .uuid("user-1")\ + .channel_memberships(["room-1", "room-2"])\ + .sync() + ``` + """ return SetMemberships(self, uuid=uuid, channel_memberships=channel_memberships, include_custom=include_custom, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page, include=include) @@ -398,6 +1197,33 @@ def set_memberships(self, uuid: str = None, channel_memberships: List[str] = Non def get_memberships(self, uuid: str = None, include_custom: bool = False, limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None, include: MembershipIncludes = None): + """Get channel memberships for a UUID. + + Retrieves a list of channels that a UUID is a member of. + + Args: + uuid (str, optional): The UUID to get memberships for. + include_custom (bool, optional): Whether to include custom fields in response. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Expression to filter results. + include_total_count (bool, optional): Whether to include total count in response. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination parameters. + include (MembershipIncludes, optional): Additional fields to include in response. + + Returns: + GetMemberships: A GetMemberships object that can be used to execute the request. + + Example: + ```python + result = pubnub.get_memberships()\ + .uuid("user-1")\ + .include_custom(True)\ + .sync() + for membership in result.data: + print(f"Channel: {membership['channel']}") + ``` + """ return GetMemberships(self, uuid=uuid, include_custom=include_custom, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page, include=include) @@ -406,29 +1232,130 @@ def manage_memberships(self, uuid: str = None, channel_memberships_to_set: List[ limit: int = None, filter: str = None, include_total_count: bool = None, sort_keys: list = None, page: PNPage = None, include: MembershipIncludes = None ) -> ManageMemberships: + """Manage channel memberships for a UUID by adding and/or removing channels. + + This method allows you to add new channel memberships and remove existing ones + for a UUID in a single operation. + + Args: + uuid (str, optional): The UUID to manage memberships for. + channel_memberships_to_set (List[str], optional): List of channels to add as memberships. + channel_memberships_to_remove (List[str], optional): List of channels to remove from memberships. + include_custom (bool, optional): Whether to include custom fields in response. + limit (int, optional): Maximum number of results to return. + filter (str, optional): Expression to filter results. + include_total_count (bool, optional): Whether to include total count in response. + sort_keys (list, optional): Keys to sort results by. + page (PNPage, optional): Pagination parameters. + include (MembershipIncludes, optional): Additional fields to include in response. + + Returns: + ManageMemberships: A ManageMemberships object that can be used to execute the request. + + Example: + ```python + pubnub.manage_memberships()\ + .uuid("user-1")\ + .channel_memberships_to_set(["room-1", "room-2"])\ + .channel_memberships_to_remove(["room-3"])\ + .sync() + ``` + """ return ManageMemberships(self, uuid=uuid, channel_memberships_to_set=channel_memberships_to_set, channel_memberships_to_remove=channel_memberships_to_remove, include_custom=include_custom, limit=limit, filter=filter, include_total_count=include_total_count, sort_keys=sort_keys, page=page, include=include) - def fetch_messages(self, channels: Union[str, List[str]] = None, start: int = None, end: int = None, - count: int = None, include_meta: bool = None, include_message_actions: bool = None, - include_message_type: bool = None, include_uuid: bool = None, + def fetch_messages(self, channels: Union[str, List[str]] = None, start: Optional[int] = None, + end: Optional[int] = None, count: Optional[int] = None, + include_meta: Optional[bool] = None, include_message_actions: Optional[bool] = None, + include_message_type: Optional[bool] = None, include_uuid: Optional[bool] = None, decrypt_messages: bool = False) -> FetchMessages: return FetchMessages(self, channels=channels, start=start, end=end, count=count, include_meta=include_meta, include_message_actions=include_message_actions, include_message_type=include_message_type, include_uuid=include_uuid, decrypt_messages=decrypt_messages) - def add_message_action(self, channel: str = None, message_action: PNMessageAction = None): + def add_message_action(self, channel: str = None, message_action: PNMessageAction = None) -> AddMessageAction: + """Add an action to a message. + + Adds metadata like reactions, replies, or custom actions to an existing message. + + Args: + channel (str, optional): The channel containing the message. + message_action (PNMessageAction, optional): The action to add to the message. + Should include type, value, and message timetoken. + + Returns: + AddMessageAction: An AddMessageAction object that can be used to execute the request. + + Example: + ```python + from pubnub.models.consumer.message_actions import PNMessageAction + + action = PNMessageAction( + type="reaction", + value="👍", + message_timetoken="1234567890" + ) + pubnub.add_message_action( + channel="chat", + message_action=action + ).sync() + ``` + """ return AddMessageAction(self, channel=channel, message_action=message_action) - def get_message_actions(self, channel: str = None, start: str = None, end: str = None, - limit: str = None) -> GetMessageActions: + def get_message_actions(self, channel: str = None, start: Optional[str] = None, + end: Optional[str] = None, limit: Optional[str] = None) -> GetMessageActions: + """Retrieve message actions for a channel. + + Gets a list of actions that have been added to messages in the specified channel. + + Args: + channel (str, optional): The channel to get message actions from. + start (str, optional): Start timetoken for the action search. + end (str, optional): End timetoken for the action search. + limit (str, optional): Maximum number of actions to return. + + Returns: + GetMessageActions: A GetMessageActions object that can be used to execute the request. + + Example: + ```python + result = pubnub.get_message_actions( + channel="chat", + limit="10" + ).sync() + for action in result.actions: + print(f"Action: {action.type} - {action.value}") + ``` + """ return GetMessageActions(self, channel=channel, start=start, end=end, limit=limit) - def remove_message_action(self, channel: str = None, message_timetoken: int = None, - action_timetoken: int = None) -> RemoveMessageAction: + def remove_message_action(self, channel: str = None, message_timetoken: Optional[int] = None, + action_timetoken: Optional[int] = None) -> RemoveMessageAction: + """Remove an action from a message. + + Deletes a specific action that was previously added to a message. + + Args: + channel (str, optional): The channel containing the message. + message_timetoken (int, optional): Timetoken of the original message. + action_timetoken (int, optional): Timetoken of the action to remove. + + Returns: + RemoveMessageAction: A RemoveMessageAction object that can be used to execute the request. + + Example: + ```python + pubnub.remove_message_action( + channel="chat", + message_timetoken=1234567890, + action_timetoken=1234567891 + ).sync() + ``` + """ return RemoveMessageAction(self, channel=channel, message_timetoken=message_timetoken, action_timetoken=action_timetoken) @@ -437,18 +1364,96 @@ def time(self) -> Time: def delete_messages(self, channel: str = None, start: Optional[int] = None, end: Optional[int] = None) -> HistoryDelete: + """Delete messages from a channel's history. + + Permanently removes messages from a channel within the specified timeframe. + + Args: + channel (str, optional): The channel to delete messages from. + start (int, optional): Start timetoken for deletion range. + end (int, optional): End timetoken for deletion range. + + Returns: + HistoryDelete: A HistoryDelete object that can be used to execute the request. + + Example: + ```python + pubnub.delete_messages( + channel="chat", + start=15790288836087530, + end=15790288836087540 + ).sync() + ``` + + Warning: + This operation is permanent and cannot be undone. Use with caution. + """ return HistoryDelete(self, channel=channel, start=start, end=end) - def parse_token(self, token): + def parse_token(self, token: str) -> Any: + """Parse an access token to examine its contents. + + Args: + token (str): The token string to parse. + + Returns: + Any: The parsed token data structure. + + Example: + ```python + token_data = pubnub.parse_token("my-token-string") + print(f"Token permissions: {token_data.permissions}") + ``` + """ return self._token_manager.parse_token(token) - def set_token(self, token): + def set_token(self, token: str) -> None: + """Set the access token for this PubNub instance. + + Args: + token (str): The token string to use for authentication. + + Note: + This token will be used for all subsequent requests that + require authentication. + """ self._token_manager.set_token(token) - def _get_token(self): + def _get_token(self) -> Optional[str]: + """Get the current access token. + + Returns: + Optional[str]: The current token string, or None if not set. + + Note: + This is an internal method used by the SDK for authentication. + """ return self._token_manager.get_token() - def send_file(self): + def send_file(self) -> Union['SendFileNative', 'AsyncioSendFile']: + """Send a file through PubNub's file upload service. + + The method automatically selects the appropriate implementation based on + the SDK platform (synchronous or asynchronous). + + Returns: + Union[SendFileNative, AsyncioSendFile]: A file sender object that can + be used to configure and execute the file upload. + + Raises: + NotImplementedError: If the SDK platform is not supported. + + Example: + ```python + with open("image.jpg", "rb") as file: + pubnub.send_file() \ + .channel("room-1") \ + .file_name("image.jpg") \ + .file_object(file) \ + .message("My dog is a good boy") \ + .sync() + ``` + """ if not self.sdk_platform(): return SendFileNative(self) elif "Asyncio" in self.sdk_platform(): @@ -457,7 +1462,28 @@ def send_file(self): else: raise NotImplementedError - def download_file(self): + def download_file(self) -> Union['DownloadFileNative', 'DownloadFileAsyncio']: + """Download a file from PubNub's file storage service. + + The method automatically selects the appropriate implementation based on + the SDK platform (synchronous or asynchronous). + + Returns: + Union[DownloadFileNative, DownloadFileAsyncio]: A file downloader object + that can be used to configure and execute the file download. + + Raises: + NotImplementedError: If the SDK platform is not supported. + + Example: + ```python + pubnub.download_file()\ + .channel("room-1")\ + .file_id("abc123") \ + .file_name("image.jpg") \ + .sync() + ``` + """ if not self.sdk_platform(): return DownloadFileNative(self) elif "Asyncio" in self.sdk_platform(): @@ -467,12 +1493,73 @@ def download_file(self): raise NotImplementedError def list_files(self, channel: str = None, *, limit: int = None, next: str = None) -> ListFiles: + """List files stored in a channel. + + Retrieves metadata about files that have been uploaded to a specific channel. + + Args: + channel (str, optional): The channel to list files from. + limit (int, optional): The maximum number of files to return. + next (str, optional): The pagination token for the next page of results. + + Returns: + ListFiles: A ListFiles object that can be used to execute the request. + + Example: + ```python + result = pubnub.list_files(channel="room-1", limit=10, next="next_token").sync() + for file in result.data: + print(f"File: {file.name}, Size: {file.size}") + ``` + """ return ListFiles(self, channel=channel, limit=limit, next=next) def get_file_url(self, channel: str = None, file_name: str = None, file_id: str = None) -> GetFileDownloadUrl: + """Get the download URL for a specific file. + + Generates a temporary URL that can be used to download a file. + + Args: + channel (str, optional): The channel where the file is stored. + file_name (str, optional): The name of the file. + file_id (str, optional): The unique identifier of the file. + + Returns: + GetFileDownloadUrl: A GetFileDownloadUrl object that can be used to execute the request. + + Example: + ```python + url = pubnub.get_file_url( + channel="room-1", + file_id="abc123", + file_name="image.jpg" + ).sync() + ``` + """ return GetFileDownloadUrl(self, channel=channel, file_name=file_name, file_id=file_id) def delete_file(self, channel: str = None, file_name: str = None, file_id: str = None) -> DeleteFile: + """Delete a file from PubNub's file storage. + + Permanently removes a file from the specified channel. + + Args: + channel (str, optional): The channel where the file is stored. + file_name (str, optional): The name of the file to delete. + file_id (str, optional): The unique identifier of the file to delete. + + Returns: + DeleteFile: A DeleteFile object that can be used to execute the request. + + Example: + ```python + pubnub.delete_file( + channel="room-1", + file_id="abc123", + file_name="image.jpg" + ).sync() + ``` + """ return DeleteFile(self, channel=channel, file_name=file_name, file_id=file_id) def _fetch_file_upload_s3_data(self) -> FetchFileUploadS3Data: @@ -481,19 +1568,28 @@ def _fetch_file_upload_s3_data(self) -> FetchFileUploadS3Data: def publish_file_message(self) -> PublishFileMessage: return PublishFileMessage(self) - def decrypt(self, cipher_key, file): + def decrypt(self, cipher_key: str, file: Any) -> Any: warn('Deprecated: Usage of decrypt with cipher key will be removed. Use PubNub.crypto.decrypt instead') return self.config.file_crypto.decrypt(cipher_key, file) - def encrypt(self, cipher_key, file): + def encrypt(self, cipher_key: str, file: Any) -> Any: warn('Deprecated: Usage of encrypt with cipher key will be removed. Use PubNub.crypto.encrypt instead') return self.config.file_crypto.encrypt(cipher_key, file) @staticmethod - def timestamp(): + def timestamp() -> int: + """Get the current timestamp. + + Returns: + int: Current Unix timestamp in seconds. + + Note: + This method is used internally for generating request timestamps + and can be used for custom timing needs. + """ return int(time.time()) - def _validate_subscribe_manager_enabled(self): + def _validate_subscribe_manager_enabled(self) -> None: if self._subscription_manager is None: raise Exception("Subscription manager is not enabled for this instance") @@ -787,16 +1883,16 @@ def fetch_memberships(self, user_id: str = None, space_id: str = None, limit=Non return memberships.sync() return memberships - def channel(self, channel) -> PubNubChannel: + def channel(self, channel: str) -> PubNubChannel: return PubNubChannel(self, channel) - def channel_group(self, channel_group) -> PubNubChannelGroup: + def channel_group(self, channel_group: str) -> PubNubChannelGroup: return PubNubChannelGroup(self, channel_group) - def channel_metadata(self, channel) -> PubNubChannelMetadata: + def channel_metadata(self, channel: str) -> PubNubChannelMetadata: return PubNubChannelMetadata(self, channel) - def user_metadata(self, user_id) -> PubNubUserMetadata: + def user_metadata(self, user_id: str) -> PubNubUserMetadata: return PubNubUserMetadata(self, user_id) def subscription_set(self, subscriptions: list) -> PubNubSubscriptionSet: diff --git a/requirements-dev.txt b/requirements-dev.txt index 5e2bb1ec..326ccabb 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -3,7 +3,7 @@ pytest-cov>=6.0.0 pycryptodomex>=3.21.0 flake8>=7.1.2 pytest>=8.3.5 -pytest-asyncio>=0.24.0 +pytest-asyncio>=0.24.0,<1.0.0 httpx>=0.28 h2>=4.1 requests>=2.32.2