Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions ldclient/sse_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@


class SSEClient(object):
def __init__(self, url, last_id=None, retry=3000, session=None, **kwargs):
def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeout=300, session=None, **kwargs):
self.url = url
self.last_id = last_id
self.retry = retry
self._connect_timeout = connect_timeout
self._read_timeout = read_timeout

# Optional support for passing in a requests.Session()
self.session = session
Expand All @@ -46,7 +48,12 @@ def _connect(self):

# Use session if set. Otherwise fall back to requests module.
requester = self.session or requests
self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
self.resp = requester.get(
self.url,
stream=True,
timeout=(self._connect_timeout, self._read_timeout),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the read_timeout here means that the request will throw a timeout error if no data is received for 5 minutes.. thus implementing heartbeat detection.

**self.requests_kwargs)

self.resp_file = self.resp.raw

# TODO: Ensure we're handling redirects. Might also stick the 'origin'
Expand Down
15 changes: 12 additions & 3 deletions ldclient/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
from threading import Thread

import backoff
import requests

from ldclient.interfaces import UpdateProcessor
from ldclient.sse_client import SSEClient
from ldclient.util import _stream_headers, log

# allows for up to 5 minutes to elapse without any data sent across the stream. The heartbeats sent as comments on the
# stream will keep this from triggering
stream_read_timeout = 5 * 60


class StreamingUpdateProcessor(Thread, UpdateProcessor):
def __init__(self, config, requester, store, ready):
Expand All @@ -30,9 +34,14 @@ def run(self):
def _backoff_expo():
return backoff.expo(max_value=30)

@backoff.on_exception(_backoff_expo, requests.exceptions.RequestException, max_tries=None, jitter=backoff.full_jitter)
@backoff.on_exception(_backoff_expo, BaseException, max_tries=None, jitter=backoff.full_jitter)
def _connect(self):
messages = SSEClient(self._uri, verify=self._config.verify_ssl, headers=_stream_headers(self._config.sdk_key))
messages = SSEClient(
self._uri,
verify=self._config.verify_ssl,
headers=_stream_headers(self._config.sdk_key),
connect_timeout=self._config.connect_timeout,
read_timeout=stream_read_timeout)
for msg in messages:
if not self._running:
break
Expand Down