Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
max-parallel: 1
fail-fast: true
matrix:
python: [3.9.21, 3.10.16, 3.11.11, 3.12.9, 3.13.2]
python: ["3.9", "3.10", "3.11", "3.12", "3.13"]
timeout-minutes: 5
steps:
- name: Checkout repository
Expand Down
41 changes: 37 additions & 4 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: python
version: 10.6.1
version: 10.6.2
schema: 1
scm: github.com/pubnub/python
sdks:
Expand All @@ -18,7 +18,7 @@ sdks:
distributions:
- distribution-type: library
distribution-repository: package
package-name: pubnub-10.6.1
package-name: pubnub-10.6.2
location: https://pypi.org/project/pubnub/
supported-platforms:
supported-operating-systems:
Expand Down Expand Up @@ -94,8 +94,8 @@ sdks:
-
distribution-type: library
distribution-repository: git release
package-name: pubnub-10.6.1
location: https://github.com/pubnub/python/releases/download/10.6.1/pubnub-10.6.1.tar.gz
package-name: pubnub-10.6.2
location: https://github.com/pubnub/python/releases/download/10.6.2/pubnub-10.6.2.tar.gz
supported-platforms:
supported-operating-systems:
Linux:
Expand Down Expand Up @@ -169,6 +169,39 @@ sdks:
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md
is-required: Required
changelog:
- date: 2026-03-26
version: 10.6.2
changes:
- type: bug
text: "Ensure `PubNubAsyncioException` always carries a valid `PNStatus` with error data instead of `None`."
- type: bug
text: "Handle cases where status or `error_data` is `None` instead of raising `AttributeError`."
- type: bug
text: "Match `PubNubAsyncioException` which is what `request_future` actually returns on failure."
- type: bug
text: "Handle `-1 (unlimited)` correctly since `attempts > -1` was always `true`, causing immediate give-up."
- type: bug
text: "Use delay class defaults instead of config value which could be `None` causing `TypeError` on comparison."
- type: bug
text: "Prevent falling through to start a heartbeat after deciding to give up."
- type: bug
text: "Set all four timeout fields explicitly instead of a 2-tuple that left write and pool unset."
- type: bug
text: "On macOS and Linux, `time.monotonic()` does not advance during system sleep, causing socket and `asyncio` timeouts (310s subscribe) to stall for hours of wall-clock time. Add `time.time()`-based deadline checks that detect sleep and cancel stale requests within ~5s of wake."
- type: bug
text: "Use `asyncio.wait()` with periodic `time.time()` checks instead of a single monotonic-based `wait_for()`, yielding to the event loop between checks."
- type: bug
text: "Persistent single daemon thread monitors `time.time()` every 5s and closes the `httpx` session when the wall-clock deadline passes, interrupting the blocking socket read. Tracks deadlines per calling thread so concurrent requests (e.g., subscribe + publish) don't interfere. Only armed for long-timeout requests (>30s). Session is recreated for subsequent requests."
- type: improvement
text: "Cover both `asyncio` and threads paths simulated clock jumps, normal passthrough, clean watchdog shutdown, per-thread deadline isolation, concurrent request independence, cleanup, and exception propagation."
- type: improvement
text: "Ensure `pubnub.stop()` always runs to prevent non-daemon threads from blocking process exit."
- type: improvement
text: "Enable presence heartbeat and use unique channel names so presence registers on the server."
- type: improvement
text: "Restore `cipher_key` after use in `send_file` and pass it explicitly to `download_file`."
- type: improvement
text: "Avoid collisions with stale data from prior test runs."
- date: 2026-02-10
version: 10.6.1
changes:
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
## 10.6.2
March 26 2026

#### Fixed
- Ensure `PubNubAsyncioException` always carries a valid `PNStatus` with error data instead of `None`.
- Handle cases where status or `error_data` is `None` instead of raising `AttributeError`.
- Match `PubNubAsyncioException` which is what `request_future` actually returns on failure.
- Handle `-1 (unlimited)` correctly since `attempts > -1` was always `true`, causing immediate give-up.
- Use delay class defaults instead of config value which could be `None` causing `TypeError` on comparison.
- Prevent falling through to start a heartbeat after deciding to give up.
- Set all four timeout fields explicitly instead of a 2-tuple that left write and pool unset.
- On macOS and Linux, `time.monotonic()` does not advance during system sleep, causing socket and `asyncio` timeouts (310s subscribe) to stall for hours of wall-clock time. Add `time.time()`-based deadline checks that detect sleep and cancel stale requests within ~5s of wake.
- Use `asyncio.wait()` with periodic `time.time()` checks instead of a single monotonic-based `wait_for()`, yielding to the event loop between checks.
- Persistent single daemon thread monitors `time.time()` every 5s and closes the `httpx` session when the wall-clock deadline passes, interrupting the blocking socket read. Tracks deadlines per calling thread so concurrent requests (e.g., subscribe + publish) don't interfere. Only armed for long-timeout requests (>30s). Session is recreated for subsequent requests.

#### Modified
- Cover both `asyncio` and threads paths simulated clock jumps, normal passthrough, clean watchdog shutdown, per-thread deadline isolation, concurrent request independence, cleanup, and exception propagation.
- Ensure `pubnub.stop()` always runs to prevent non-daemon threads from blocking process exit.
- Enable presence heartbeat and use unique channel names so presence registers on the server.
- Restore `cipher_key` after use in `send_file` and pass it explicitly to `download_file`.
- Avoid collisions with stale data from prior test runs.

## 10.6.1
February 10 2026

Expand Down
65 changes: 45 additions & 20 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pubnub.endpoints.presence.leave import Leave
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNReconnectionPolicy
from pubnub.exceptions import PubNubException
from pubnub.exceptions import PubNubAsyncioException, PubNubException
from pubnub.features import feature_enabled
from pubnub.models.server.subscribe import SubscribeMessage
from pubnub.pubnub import PubNub
Expand Down Expand Up @@ -80,9 +80,10 @@ async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0
request.timetoken(0)
response = await request.future()

if isinstance(response, Exception):
if isinstance(response, PubNubAsyncioException):
self.logger.warning(f'Handshake failed: {str(response)}')
handshake_failure = events.HandshakeFailureEvent(response, 1, timetoken=timetoken)
reason = response.status.error_data if response.status and response.status.error_data else str(response)
handshake_failure = events.HandshakeFailureEvent(reason, 1, timetoken=timetoken)
self.event_engine.trigger(handshake_failure)
elif response.status.error:
self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}')
Expand Down Expand Up @@ -184,8 +185,15 @@ def calculate_reconnection_delay(self, attempts):

return delay

def _should_give_up(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.NONE:
return True
if self.max_retry_attempts == -1:
return False
return attempts > self.max_retry_attempts

def run(self):
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
if self._should_give_up(self.invocation.attempts):
self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts)
else:
attempts = self.invocation.attempts
Expand Down Expand Up @@ -214,9 +222,10 @@ async def delayed_reconnect_async(self, delay, attempt):

response = await request.future()

if isinstance(response, PubNubException):
if isinstance(response, PubNubAsyncioException):
self.logger.warning(f'Reconnect failed: {str(response)}')
self.failure(str(response), attempt, self.get_timetoken())
reason = response.status.error_data if response.status and response.status.error_data else str(response)
self.failure(reason, attempt, self.get_timetoken())

elif response.status.error:
self.logger.warning(f'Reconnect failed: {response.status.error_data.__dict__}')
Expand Down Expand Up @@ -302,10 +311,11 @@ async def heartbeat(self, channels, groups, stop_event):

response = await request.future()

if isinstance(response, PubNubException):
if isinstance(response, PubNubAsyncioException):
self.logger.warning(f'Heartbeat failed: {str(response)}')
reason = response.status.error_data if response.status and response.status.error_data else str(response)
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data, attempt=1))
reason=reason, attempt=1))
elif response.status and response.status.error:
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
Expand Down Expand Up @@ -345,18 +355,36 @@ async def leave(self, channels, groups, stop_event):
leave_request = Leave(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
leave = await leave_request.future()

if leave.status.error:
self.logger.warning(f'Heartbeat failed: {leave.status.error_data.__dict__}')
if isinstance(leave, PubNubAsyncioException):
self.logger.warning(f'Leave failed: {str(leave)}')
elif leave.status and leave.status.error:
self.logger.warning(f'Leave failed: {leave.status.error_data.__dict__}')


class HeartbeatDelayedEffect(Effect):
def __init__(self, pubnub_instance, event_engine_instance,
invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None:
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
self.interval = pubnub_instance.config.reconnection_interval

if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
elif self.reconnection_policy is PNReconnectionPolicy.LINEAR:
self.max_retry_attempts = LinearDelay.MAX_RETRIES
else:
self.max_retry_attempts = 0

if pubnub_instance.config.maximum_reconnection_retries is not None:
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries

def _should_give_up(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.NONE:
return True
if self.max_retry_attempts == -1:
return False
return attempts > self.max_retry_attempts

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
Expand All @@ -368,23 +396,19 @@ def calculate_reconnection_delay(self, attempts):
return delay

def run(self):
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
if self._should_give_up(self.invocation.attempts):
self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels,
groups=self.invocation.groups,
reason=self.invocation.reason,
attempt=self.invocation.attempts))
return

if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()
self.run_async(self.heartbeat(channels=self.invocation.channels, groups=self.invocation.groups,
attempt=self.invocation.attempts, stop_event=self.stop_event))

async def heartbeat(self, channels, groups, attempt, stop_event):
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels,
groups=self.invocation.groups,
reason=self.invocation.reason,
attempt=self.invocation.attempts))

channels = list(filter(lambda ch: not ch.endswith('-pnpres'), self.invocation.channels))
groups = list(filter(lambda gr: not gr.endswith('-pnpres'), self.invocation.groups))
Expand All @@ -395,12 +419,13 @@ async def heartbeat(self, channels, groups, attempt, stop_event):
await asyncio.sleep(delay)

response = await request.future()
if isinstance(response, PubNubException):
if isinstance(response, PubNubAsyncioException):
self.logger.warning(f'Heartbeat failed: {str(response)}')
reason = response.status.error_data if response.status and response.status.error_data else str(response)
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data,
reason=reason,
attempt=attempt))
elif response.status.error:
elif response.status and response.status.error:
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data,
Expand Down
4 changes: 3 additions & 1 deletion pubnub/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def __init__(self, result, status):
self.status = status

def __str__(self):
return str(self.status.error_data.exception)
if self.status and hasattr(self.status, 'error_data') and self.status.error_data:
return str(self.status.error_data.exception)
return f"PubNubAsyncioException(result={self.result}, status={self.status})"

@staticmethod
def is_error():
Expand Down
12 changes: 8 additions & 4 deletions pubnub/pubnub.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,14 @@ def stop(self):
Raises:
Exception: If subscription manager is not enabled
"""
if self._subscription_manager is not None:
self._subscription_manager.stop()
else:
raise Exception("Subscription manager is not enabled for this instance")
try:
if self._subscription_manager is not None:
self._subscription_manager.stop()
else:
raise Exception("Subscription manager is not enabled for this instance")
finally:
if hasattr(self._request_handler, 'close'):
self._request_handler.close()

def request_deferred(self, options_func):
raise NotImplementedError
Expand Down
26 changes: 24 additions & 2 deletions pubnub/pubnub_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ async def main():
from pubnub.event_engine.models import events, states

from pubnub.models.consumer.common import PNStatus
from pubnub.models.consumer.pn_error_data import PNErrorData
from pubnub.dtos import SubscribeOperation, UnsubscribeOperation
from pubnub.event_engine.statemachine import StateMachine
from pubnub.endpoints.presence.heartbeat import Heartbeat
from pubnub.endpoints.presence.leave import Leave
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.pubnub_core import PubNubCore
from pubnub.request_handlers.base import BaseRequestHandler
from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler
from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler, WallClockTimeoutError
from pubnub.workers import SubscribeMessageWorker
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
from pubnub import utils
Expand Down Expand Up @@ -234,9 +235,30 @@ async def request_future(self, options_func, cancellation_event):
res = await self._request_handler.async_request(options_func, cancellation_event)
return res
except PubNubException as e:
if e.status is not None:
status = e.status
else:
status = PNStatus()
status.category = PNStatusCategory.PNBadRequestCategory
status.error = True
status.error_data = PNErrorData(str(e), e)
status.status_code = e._status_code if e._status_code != 0 else None
return PubNubAsyncioException(
result=None,
status=status
)
except WallClockTimeoutError:
return PubNubAsyncioException(
result=None,
status=e.status
status=options_func().create_status(
PNStatusCategory.PNTimeoutCategory,
None,
None,
exception=PubNubException(
pn_error=PNERR_CLIENT_TIMEOUT,
errormsg="Wall-clock deadline exceeded (system sleep detected)"
)
)
)
except asyncio.TimeoutError:
return PubNubAsyncioException(
Expand Down
Loading
Loading