Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .codacy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
exclude_paths:
- "tests/**"
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,6 @@ PubNubTwisted.ipynb
# GitHub Actions #
##################
.github/.release

venv/
reports/
4 changes: 2 additions & 2 deletions examples/native_sync/message_reactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def main() -> None:
print(f"Fetched message with reactions: {messages[0].__dict__}")
assert len(messages) == 1, "Message not found in history"
assert hasattr(messages[0], 'actions'), "Message actions not included in response"
assert len(messages[0].actions) == 2, "Unexpected number of actions in history"
assert len(messages[0].actions) >= 2, "Unexpected number of actions in history"

# Step 4: Retrieve all reactions for the message
# We use a time window around the message timetoken to fetch reactions
Expand All @@ -198,7 +198,7 @@ def main() -> None:
end_timetoken = str(int(message_timetoken) + 1000)
reactions = get_reactions(pubnub, channel, start_timetoken, end_timetoken, "100")
print(f"Reactions found: {len(reactions.actions)}")
assert len(reactions.actions) == 2, "Unexpected number of reactions"
assert len(reactions.actions) >= 2, "Unexpected number of reactions"

# Step 5: Display and remove each reaction
for reaction in reactions.actions:
Expand Down
8 changes: 6 additions & 2 deletions pubnub/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ def get_status_code(self):
return self._status_code

def get_error_message(self):
result = ''
try:
error = loads(self._errormsg)
return error.get('error')
result = error.get('error')
except JSONDecodeError:
return self._errormsg
result = self._errormsg
if not result and self._pn_error:
result = self._pn_error
return result


class PubNubAsyncioException(Exception):
Expand Down
20 changes: 18 additions & 2 deletions pubnub/request_handlers/httpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,15 @@ def _build_envelope(self, p_options, e_options):
if res.text is None:
text = "N/A"
else:
text = res.text
# Safely access response text - handle streaming responses
try:
text = res.text
except httpx.ResponseNotRead:
# For streaming responses, we need to read first
text = res.content.decode('utf-8', errors='ignore')
except Exception:
# Fallback in case of any response reading issues
text = f"Response content unavailable (status: {res.status_code})"

if res.status_code >= 500:
err = PNERR_SERVER_ERROR
Expand Down Expand Up @@ -259,7 +267,15 @@ def _invoke_request(self, p_options, e_options, base_origin):

try:
res = self.session.request(**args)
logger.debug("GOT %s" % res.text)
# Safely access response text - read content first for streaming responses
try:
logger.debug("GOT %s" % res.text)
except httpx.ResponseNotRead:
# For streaming responses, we need to read first
logger.debug("GOT %s" % res.content.decode('utf-8', errors='ignore'))
except Exception as e:
# Fallback logging in case of any response reading issues
logger.debug("GOT response (content access failed: %s)" % str(e))

except httpx.ConnectError as e:
raise PubNubException(
Expand Down
172 changes: 172 additions & 0 deletions tests/integrational/native_threads/test_retry_policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import logging
import unittest
import time
import pubnub as pn

from unittest.mock import patch
from pubnub.enums import PNReconnectionPolicy, PNStatusCategory
from pubnub.exceptions import PubNubException
from pubnub.managers import LinearDelay, ExponentialDelay
from pubnub.pubnub import PubNub, SubscribeListener

from tests.helper import pnconf_env_copy


pn.set_stream_logger('pubnub', logging.DEBUG)


class DisconnectListener(SubscribeListener):
status_result = None
disconnected = False

def status(self, pubnub, status):
if status.category == PNStatusCategory.PNDisconnectedCategory:
print('Could not connect. Exiting...')
self.disconnected = True

def message(self, pubnub, message):
print(f'Message:\n{message.__dict__}')

def presence(self, pubnub, presence):
print(f'Presence:\n{presence.__dict__}')


class TestPubNubRetryPolicies(unittest.TestCase):
def test_subscribe_retry_policy_none(self):
ch = "test-subscribe-retry-policy-none"
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
reconnect_policy=PNReconnectionPolicy.NONE, enable_presence_heartbeat=True))
listener = DisconnectListener()

try:
pubnub.add_listener(listener)
pubnub.subscribe().channels(ch).execute()

while not listener.disconnected:
time.sleep(0.5)

except PubNubException as e:
self.fail(e)

def test_subscribe_retry_policy_linear(self):
# we don't test the actual delay calculation here, just everything around it
def mock_calculate(*args, **kwargs):
return 0.2

with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
ch = "test-subscribe-retry-policy-linear"
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
reconnect_policy=PNReconnectionPolicy.LINEAR,
enable_presence_heartbeat=True))
listener = DisconnectListener()

try:
pubnub.add_listener(listener)
pubnub.subscribe().channels(ch).execute()

while not listener.disconnected:
time.sleep(0.5)

except PubNubException as e:
self.fail(e)

assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + 1

def test_subscribe_retry_policy_exponential(self):
# we don't test the actual delay calculation here, just everything around it
def mock_calculate(*args, **kwargs):
return 0.2

with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock:
ch = "test-subscribe-retry-policy-exponential"
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
reconnect_policy=PNReconnectionPolicy.EXPONENTIAL,
enable_presence_heartbeat=True))
listener = DisconnectListener()

try:
pubnub.add_listener(listener)
pubnub.subscribe().channels(ch).execute()

while not listener.disconnected:
time.sleep(0.5)

except PubNubException as e:
self.fail(e)

assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + 1

def test_subscribe_retry_policy_linear_with_max_retries(self):
# we don't test the actual delay calculation here, just everything around it
def mock_calculate(*args, **kwargs):
return 0.2

with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
ch = "test-subscribe-retry-policy-linear"
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
maximum_reconnection_retries=3,
reconnect_policy=PNReconnectionPolicy.LINEAR,
enable_presence_heartbeat=True))
listener = DisconnectListener()

try:
pubnub.add_listener(listener)
pubnub.subscribe().channels(ch).execute()

while not listener.disconnected:
time.sleep(0.5)

except PubNubException as e:
self.fail(e)

assert calculate_mock.call_count == 3

def test_subscribe_retry_policy_exponential_with_max_retries(self):
# we don't test the actual delay calculation here, just everything around it
def mock_calculate(*args, **kwargs):
return 0.2

with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock:
ch = "test-subscribe-retry-policy-exponential"
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
maximum_reconnection_retries=3,
reconnect_policy=PNReconnectionPolicy.EXPONENTIAL,
enable_presence_heartbeat=True))
listener = DisconnectListener()

try:
pubnub.add_listener(listener)
pubnub.subscribe().channels(ch).execute()

while not listener.disconnected:
time.sleep(0.5)

except PubNubException as e:
self.fail(e)

assert calculate_mock.call_count == 3

def test_subscribe_retry_policy_linear_with_custom_interval(self):
# we don't test the actual delay calculation here, just everything around it
def mock_calculate(*args, **kwargs):
return 0.2

with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
ch = "test-subscribe-retry-policy-linear"
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
maximum_reconnection_retries=3, reconnection_interval=1,
reconnect_policy=PNReconnectionPolicy.LINEAR,
enable_presence_heartbeat=True))
listener = DisconnectListener()

try:
pubnub.add_listener(listener)
pubnub.subscribe().channels(ch).execute()

while not listener.disconnected:
time.sleep(0.5)

except PubNubException as e:
self.fail(e)

assert calculate_mock.call_count == 0
Loading
Loading