Skip to content

Commit

Permalink
fix(http): make the HTTP transport threadsafe
Browse files Browse the repository at this point in the history
Fix the bug on the `app.lib.http.http_transport.HTTPTransport` class causing the transport to close prematurely when used in concurrent contexts.
  • Loading branch information
kennedykori committed Sep 21, 2022
1 parent 9c520c2 commit 42cbb55
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
9 changes: 8 additions & 1 deletion app/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@


class Transport(Disposable, metaclass=ABCMeta):
"""Represents the flow of data between an IDR Server and this app."""
"""
Represents the flow of data between an IDR Server and this app.
.. note::
It is highly recommended that implementors of this interface make their
implementations thread safe as a transport is likely to be used in a
concurrent context.
"""

@abstractmethod
def fetch_data_source_extracts(
Expand Down
32 changes: 21 additions & 11 deletions app/lib/transports/http/http_transport.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from threading import RLock
from typing import Any, Mapping, Optional, Sequence

from requests.auth import AuthBase
Expand Down Expand Up @@ -53,6 +54,9 @@ class HTTPTransport(Transport):
An implementation of the :class:`transport <Transport>` interface that uses
the HTTP protocol for data transmission between an IDR Server and IDR
client.
.. note::
This implementation is thread safe.
"""

def __init__(
Expand Down Expand Up @@ -87,6 +91,9 @@ def __init__(
}
)
self._session.hooks["response"].append(_log_response)
self._lock: RLock = RLock()
# Mutable state. This properties should be guarded under a lock when
# being modified.
self._auth: AuthBase = _NoAuth()
self._is_closed: bool = False

Expand All @@ -96,8 +103,10 @@ def is_disposed(self) -> bool:

def dispose(self) -> None:
_LOGGER.debug("Closing transport")
self._is_closed = True
self._session.close()
if not self._is_closed:
with self._lock:
self._is_closed = True
self._session.close()

# FETCH DATA SOURCE EXTRACTS
# -------------------------------------------------------------------------
Expand Down Expand Up @@ -263,15 +272,16 @@ def _make_request(self, request: HTTPRequestParams) -> Response:
# if the status is among the re-authentication trigger status and
# if so, re-authenticate and then retry this request.
if response.status_code in self._api_dialect.auth_trigger_statuses:
_LOGGER.debug(
'Encountered an authentication trigger status("%d"), '
"re-authenticating.",
response.status_code,
)
self._auth = self._authenticate()
_LOGGER.debug(
"Re-authentication successful, retrying the request."
)
with self._lock:
_LOGGER.debug(
'Encountered an authentication trigger status("%d"), '
"re-authenticating.",
response.status_code,
)
self._auth = self._authenticate()
_LOGGER.debug(
"Re-authentication successful, retrying the request."
)
# FIXME: This could lead into a stack overflow, revisit this.
return self._make_request(request)

Expand Down

0 comments on commit 42cbb55

Please sign in to comment.