Skip to content

Commit

Permalink
Add custom re-subscribe method
Browse files Browse the repository at this point in the history
  • Loading branch information
cgabard committed Jun 13, 2018
1 parent 1fc1a20 commit ad7a392
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 160 deletions.
3 changes: 2 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ You can install development dependencies with:
Release history
---------------

- **v. 0.3.0**: Remove date-time parsing of the replay mixin.
- **v. 0.3.0**: Remove date-time parsing of the replay mixin, Allow client to
customize ``ReSubscribeMixin`` retry conditions.
- **v. 0.2.0**: Add refresh token authentication
- **v. 0.1.1**: Add documentation and initial typing information.
- **v. 0.1.0**: Initial release.
Expand Down
71 changes: 48 additions & 23 deletions aio_sf_streaming/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Mixins module: Provide various mixins modules
"""
import asyncio
import datetime
import enum
import logging
from typing import Union
Expand Down Expand Up @@ -95,14 +94,10 @@ async def messages(self) -> JSONObject:

# Create a task : do not wait the replay id is stored to
# reconnect as soon as possible
self.loop.create_task(
self.store_replay_id(channel, replay_id, creation_time)
)
self.loop.create_task(self.store_replay_id(channel, replay_id, creation_time))
yield message

async def store_replay_id(
self, channel: str, replay_id: int, creation_time: str
) -> None:
async def store_replay_id(self, channel: str, replay_id: int, creation_time: str) -> None:
"""
Callback called to store a replay id. You should override this method
to implement your custom logic.
Expand Down Expand Up @@ -175,10 +170,7 @@ async def messages(self) -> JSONObject:
channel = message["channel"]

# If asked, perform a new handshake
if (
channel.startswith("/meta/")
and message.get("error") == "403::Unknown client"
):
if channel.startswith("/meta/") and message.get("error") == "403::Unknown client":
logger.info("Disconnected, do new handshake")
await self.handshake()
continue
Expand Down Expand Up @@ -224,25 +216,58 @@ def __init__(self, retry_sub_duration: float = 0.1, **kwargs):
super().__init__(**kwargs)
self.retry_sub_duration = retry_sub_duration

async def should_retry_on_exception(self, channel: str, exception: Exception) -> bool:
"""
Callback called to process an exception raised during subscription.
Return a boolean if we must retry. If ``False`` is returned, the exception will be
propagated to caller.
By-default, do return always ``False``.
:param channel: Channel name
:param exception: The exception raised
"""
return False

async def should_retry_on_error_response(self, channel: str, response: JSONObject) -> bool:
"""
Callback called to process a response with and error message.
Return a boolean if we must retry. If ``False`` is returned, the response will be
returned to caller.
By-default, retry on known 'server unavailable' response.
:param channel: Channel name
:param response: The response received
"""
return (
response[0]
.get("ext", {})
.get("sfdc", {})
.get("failureReason", "")
.startswith("SERVER_UNAVAILABLE")
)

async def subscribe(self, channel: str) -> JSONList:
"""
See :py:func:`BaseSalesforceStreaming.subscribe`
"""
while True:
response = await super().subscribe(channel)

if not response or response[0]["successful"]:
try:
response = await super().subscribe(channel)
except Exception as e:
should_retry = await self.should_retry_on_exception(channel, e)
if not should_retry:
raise
else:
if response and response[0]["successful"]:
return response
else:
should_retry = await self.should_retry_on_error_response(channel, response)

if not should_retry:
return response

# If not the known error, return
if not (
response[0]
.get("ext", {})
.get("sfdc", {})
.get("failureReason", "")
.startswith("SERVER_UNAVAILABLE")
):
return response
await asyncio.sleep(self.retry_sub_duration)


Expand Down
8 changes: 2 additions & 6 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ The package is separated in 5 mains modules:
- To authenticate on Salesforce, you must use one connector that add
authentication capability to :py:class:`BaseSalesforceStreaming`. See
:ref:`connectors` section for a list of available connectors.
- :ref:`mixins` extend :py:class:`BaseSalesforceStreaming` capabilities and
- Finally, :ref:`mixins` extend :py:class:`BaseSalesforceStreaming` capabilities and
can be added easily as opt-in option by sub classing.
- Finally, :ref:`utils` provide some internal utilities functions.


.. _main_interface:

Expand Down Expand Up @@ -132,7 +130,5 @@ Mixins
.. autoclass:: AutoReconnectMixin

.. autoclass:: ReSubscribeMixin


.. _utils:
:members: should_retry_on_exception, should_retry_on_error_response

3 changes: 2 additions & 1 deletion docs/release.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ Release notes
=============

v. 0.3.0
Remove date-time parsing of the replay mixin.
- Remove date-time parsing of the replay mixin.
- Allow client to customize ``ReSubscribeMixin`` retry conditions.
v. 0.2.0
Minor release : Add refresh token authentication
v. 0.1.1
Expand Down

0 comments on commit ad7a392

Please sign in to comment.