diff --git a/app/core/transport.py b/app/core/transport.py index 32e06dd..ad2889e 100644 --- a/app/core/transport.py +++ b/app/core/transport.py @@ -23,7 +23,14 @@ class Transport(Disposable, metaclass=ABCMeta): - """Represents the flow of data between an IDR Server and this app.""" + """ + Represents the flow of data between an IDR Server and this app. + + .. note:: + It is highly recommended that implementors of this interface make their + implementations thread safe as a transport is likely to be used in a + concurrent context. + """ @abstractmethod def fetch_data_source_extracts( diff --git a/app/lib/transports/http/http_transport.py b/app/lib/transports/http/http_transport.py index 5927f7f..5d26d58 100644 --- a/app/lib/transports/http/http_transport.py +++ b/app/lib/transports/http/http_transport.py @@ -1,4 +1,5 @@ import logging +from threading import RLock from typing import Any, Mapping, Optional, Sequence from requests.auth import AuthBase @@ -53,6 +54,9 @@ class HTTPTransport(Transport): An implementation of the :class:`transport ` interface that uses the HTTP protocol for data transmission between an IDR Server and IDR client. + + .. note:: + This implementation is thread safe. """ def __init__( @@ -87,6 +91,9 @@ def __init__( } ) self._session.hooks["response"].append(_log_response) + self._lock: RLock = RLock() + # Mutable state. This properties should be guarded under a lock when + # being modified. self._auth: AuthBase = _NoAuth() self._is_closed: bool = False @@ -96,8 +103,10 @@ def is_disposed(self) -> bool: def dispose(self) -> None: _LOGGER.debug("Closing transport") - self._is_closed = True - self._session.close() + if not self._is_closed: + with self._lock: + self._is_closed = True + self._session.close() # FETCH DATA SOURCE EXTRACTS # ------------------------------------------------------------------------- @@ -263,15 +272,16 @@ def _make_request(self, request: HTTPRequestParams) -> Response: # if the status is among the re-authentication trigger status and # if so, re-authenticate and then retry this request. if response.status_code in self._api_dialect.auth_trigger_statuses: - _LOGGER.debug( - 'Encountered an authentication trigger status("%d"), ' - "re-authenticating.", - response.status_code, - ) - self._auth = self._authenticate() - _LOGGER.debug( - "Re-authentication successful, retrying the request." - ) + with self._lock: + _LOGGER.debug( + 'Encountered an authentication trigger status("%d"), ' + "re-authenticating.", + response.status_code, + ) + self._auth = self._authenticate() + _LOGGER.debug( + "Re-authentication successful, retrying the request." + ) # FIXME: This could lead into a stack overflow, revisit this. return self._make_request(request)