diff --git a/ldclient/sse_client.py b/ldclient/sse_client.py index 3534a846..d83982fa 100644 --- a/ldclient/sse_client.py +++ b/ldclient/sse_client.py @@ -16,10 +16,12 @@ class SSEClient(object): - def __init__(self, url, last_id=None, retry=3000, session=None, **kwargs): + def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeout=300, session=None, **kwargs): self.url = url self.last_id = last_id self.retry = retry + self._connect_timeout = connect_timeout + self._read_timeout = read_timeout # Optional support for passing in a requests.Session() self.session = session @@ -46,7 +48,12 @@ def _connect(self): # Use session if set. Otherwise fall back to requests module. requester = self.session or requests - self.resp = requester.get(self.url, stream=True, **self.requests_kwargs) + self.resp = requester.get( + self.url, + stream=True, + timeout=(self._connect_timeout, self._read_timeout), + **self.requests_kwargs) + self.resp_file = self.resp.raw # TODO: Ensure we're handling redirects. Might also stick the 'origin' diff --git a/ldclient/streaming.py b/ldclient/streaming.py index a2fa6c93..f6ce61c2 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -4,11 +4,15 @@ from threading import Thread import backoff -import requests + from ldclient.interfaces import UpdateProcessor from ldclient.sse_client import SSEClient from ldclient.util import _stream_headers, log +# allows for up to 5 minutes to elapse without any data sent across the stream. The heartbeats sent as comments on the +# stream will keep this from triggering +stream_read_timeout = 5 * 60 + class StreamingUpdateProcessor(Thread, UpdateProcessor): def __init__(self, config, requester, store, ready): @@ -30,9 +34,14 @@ def run(self): def _backoff_expo(): return backoff.expo(max_value=30) - @backoff.on_exception(_backoff_expo, requests.exceptions.RequestException, max_tries=None, jitter=backoff.full_jitter) + @backoff.on_exception(_backoff_expo, BaseException, max_tries=None, jitter=backoff.full_jitter) def _connect(self): - messages = SSEClient(self._uri, verify=self._config.verify_ssl, headers=_stream_headers(self._config.sdk_key)) + messages = SSEClient( + self._uri, + verify=self._config.verify_ssl, + headers=_stream_headers(self._config.sdk_key), + connect_timeout=self._config.connect_timeout, + read_timeout=stream_read_timeout) for msg in messages: if not self._running: break