Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions ld_eventsource/actions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
from typing import Optional
from typing import Any, Dict, Optional

from ld_eventsource.errors import ExceptionWithHeaders


class Action:
Expand Down Expand Up @@ -110,9 +112,25 @@ class Start(Action):
Instances of this class are only available from :attr:`.SSEClient.all`.
A ``Start`` is returned for the first successful connection. If the client reconnects
after a failure, there will be a :class:`.Fault` followed by a ``Start``.

Each ``Start`` action may include HTTP response headers from the connection. These headers
are available via the :attr:`headers` property. On reconnection, a new ``Start`` will be
emitted with the headers from the new connection, which may differ from the previous one.
"""

pass
def __init__(self, headers: Optional[Dict[str, Any]] = None):
self._headers = headers

@property
def headers(self) -> Optional[Dict[str, Any]]:
"""
The HTTP response headers from the stream connection, if available.

The headers dict uses case-insensitive keys (via urllib3's HTTPHeaderDict).

:return: the response headers, or ``None`` if not available
"""
return self._headers


class Fault(Action):
Expand All @@ -125,6 +143,9 @@ class Fault(Action):
connection attempt has failed or an existing connection has been closed. The SSEClient
will attempt to reconnect if you either call :meth:`.SSEClient.start()`
or simply continue reading events after this point.

When the error includes HTTP response headers (such as for :class:`.HTTPStatusError`
or :class:`.HTTPContentTypeError`), they are accessible via the :attr:`headers` property.
"""

def __init__(self, error: Optional[Exception]):
Expand All @@ -138,3 +159,18 @@ def error(self) -> Optional[Exception]:
in an orderly way after sending an EOF chunk as defined by chunked transfer encoding.
"""
return self.__error

@property
def headers(self) -> Optional[Dict[str, Any]]:
"""
The HTTP response headers from the failed connection, if available.

This property returns headers when the error is an exception that includes them,
such as :class:`.HTTPStatusError` or :class:`.HTTPContentTypeError`. For other
error types or when the stream ended normally, this returns ``None``.

:return: the response headers, or ``None`` if not available
"""
if isinstance(self.__error, ExceptionWithHeaders):
return self.__error.headers
return None
21 changes: 17 additions & 4 deletions ld_eventsource/config/connect_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dataclasses import dataclass
from logging import Logger
from typing import Callable, Iterator, Optional, Union
from typing import Any, Callable, Dict, Iterator, Optional, Union

from urllib3 import PoolManager

Expand Down Expand Up @@ -96,9 +96,10 @@ class ConnectionResult:
The return type of :meth:`ConnectionClient.connect()`.
"""

def __init__(self, stream: Iterator[bytes], closer: Optional[Callable]):
def __init__(self, stream: Iterator[bytes], closer: Optional[Callable], headers: Optional[Dict[str, Any]] = None):
self.__stream = stream
self.__closer = closer
self.__headers = headers

@property
def stream(self) -> Iterator[bytes]:
Expand All @@ -107,6 +108,18 @@ def stream(self) -> Iterator[bytes]:
"""
return self.__stream

@property
def headers(self) -> Optional[Dict[str, Any]]:
"""
The HTTP response headers, if available.

For HTTP connections, this contains the headers from the SSE stream response.
For non-HTTP connections, this will be ``None``.

The headers dict uses case-insensitive keys (via urllib3's HTTPHeaderDict).
"""
return self.__headers

def close(self):
"""
Does whatever is necessary to release the connection.
Expand Down Expand Up @@ -139,8 +152,8 @@ def __init__(self, params: _HttpConnectParams, logger: Logger):
self.__impl = _HttpClientImpl(params, logger)

def connect(self, last_event_id: Optional[str]) -> ConnectionResult:
stream, closer = self.__impl.connect(last_event_id)
return ConnectionResult(stream, closer)
stream, closer, headers = self.__impl.connect(last_event_id)
return ConnectionResult(stream, closer, headers)

def close(self):
self.__impl.close()
Expand Down
38 changes: 36 additions & 2 deletions ld_eventsource/errors.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,62 @@
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class ExceptionWithHeaders(Protocol):
"""
Protocol for exceptions that include HTTP response headers.
This allows type-safe access to headers from error responses without
using hasattr checks.
"""

@property
def headers(self) -> Optional[Dict[str, Any]]:
"""The HTTP response headers associated with this exception."""
raise NotImplementedError


class HTTPStatusError(Exception):
"""
This exception indicates that the client was able to connect to the server, but that
the HTTP response had an error status.
When available, the response headers are accessible via the :attr:`headers` property.
"""

def __init__(self, status: int):
def __init__(self, status: int, headers: Optional[Dict[str, Any]] = None):
super().__init__("HTTP error %d" % status)
self._status = status
self._headers = headers

@property
def status(self) -> int:
return self._status

@property
def headers(self) -> Optional[Dict[str, Any]]:
"""The HTTP response headers, if available."""
return self._headers


class HTTPContentTypeError(Exception):
"""
This exception indicates that the HTTP response did not have the expected content
type of `"text/event-stream"`.
When available, the response headers are accessible via the :attr:`headers` property.
"""

def __init__(self, content_type: str):
def __init__(self, content_type: str, headers: Optional[Dict[str, Any]] = None):
super().__init__("invalid content type \"%s\"" % content_type)
self._content_type = content_type
self._headers = headers

@property
def content_type(self) -> str:
return self._content_type

@property
def headers(self) -> Optional[Dict[str, Any]]:
"""The HTTP response headers, if available."""
return self._headers
14 changes: 9 additions & 5 deletions ld_eventsource/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from logging import Logger
from typing import Callable, Iterator, Optional, Tuple
from typing import Any, Callable, Dict, Iterator, Optional, Tuple, cast
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

from urllib3 import PoolManager
Expand Down Expand Up @@ -60,7 +60,7 @@ def __init__(self, params: _HttpConnectParams, logger: Logger):
self.__should_close_pool = params.pool is not None
self.__logger = logger

def connect(self, last_event_id: Optional[str]) -> Tuple[Iterator[bytes], Callable]:
def connect(self, last_event_id: Optional[str]) -> Tuple[Iterator[bytes], Callable, Dict[str, Any]]:
url = self.__params.url
if self.__params.query_params is not None:
qp = self.__params.query_params()
Expand Down Expand Up @@ -100,13 +100,17 @@ def connect(self, last_event_id: Optional[str]) -> Tuple[Iterator[bytes], Callab
reason: Optional[Exception] = e.reason
if reason is not None:
raise reason # e.reason is the underlying I/O error

# Capture headers early so they're available for both error and success cases
response_headers = cast(Dict[str, Any], resp.headers)

if resp.status >= 400 or resp.status == 204:
raise HTTPStatusError(resp.status)
raise HTTPStatusError(resp.status, response_headers)
content_type = resp.headers.get('Content-Type', None)
if content_type is None or not str(content_type).startswith(
"text/event-stream"
):
raise HTTPContentTypeError(content_type or '')
raise HTTPContentTypeError(content_type or '', response_headers)

stream = resp.stream(_CHUNK_SIZE)

Expand All @@ -117,7 +121,7 @@ def close():
pass
resp.release_conn()

return stream, close
return stream, close, response_headers

def close(self):
if self.__should_close_pool:
Expand Down
16 changes: 12 additions & 4 deletions ld_eventsource/sse_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class SSEClient:
:meth:`.RetryDelayStrategy.default()`, this delay will double with each subsequent retry,
and will also have a pseudo-random jitter subtracted. You can customize this behavior with
``retry_delay_strategy``.

**HTTP Response Headers:**
When using HTTP-based connections, the response headers from each connection are available
via the :attr:`.Start.headers` property when reading from :attr:`all`. Each time the client
connects or reconnects, a :class:`.Start` action is emitted containing the headers from that
specific connection. This allows you to access server metadata such as rate limits, session
identifiers, or custom headers.
"""

def __init__(
Expand Down Expand Up @@ -178,9 +185,10 @@ def all(self) -> Iterable[Action]:
# Reading implies starting the stream if it isn't already started. We might also
# be restarting since we could have been interrupted at any time.
while self.__connection_result is None:
fault = self._try_start(True)
result = self._try_start(True)
# return either a Start action or a Fault action
yield Start() if fault is None else fault
if result is not None:
yield result

lines = _BufferedLineReader.lines_from(self.__connection_result.stream)
reader = _SSEReader(lines, self.__last_event_id, None)
Expand Down Expand Up @@ -263,7 +271,7 @@ def _compute_next_retry_delay(self):
self.__current_retry_delay_strategy.apply(self.__base_retry_delay)
)

def _try_start(self, can_return_fault: bool) -> Optional[Fault]:
def _try_start(self, can_return_fault: bool) -> Union[None, Start, Fault]:
if self.__connection_result is not None:
return None
while True:
Expand Down Expand Up @@ -297,7 +305,7 @@ def _try_start(self, can_return_fault: bool) -> Optional[Fault]:
self._retry_reset_baseline = time.time()
self.__current_error_strategy = self.__base_error_strategy
self.__interrupted = False
return None
return Start(self.__connection_result.headers)

@property
def last_event_id(self) -> Optional[str]:
Expand Down
9 changes: 5 additions & 4 deletions ld_eventsource/testing/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,17 @@ def apply(self) -> ConnectionResult:


class RespondWithStream(MockConnectionHandler):
def __init__(self, stream: Iterable[bytes]):
def __init__(self, stream: Iterable[bytes], headers: Optional[dict] = None):
self.__stream = stream
self.__headers = headers

def apply(self) -> ConnectionResult:
return ConnectionResult(stream=self.__stream.__iter__(), closer=None)
return ConnectionResult(stream=self.__stream.__iter__(), closer=None, headers=self.__headers)


class RespondWithData(RespondWithStream):
def __init__(self, data: str):
super().__init__([bytes(data, 'utf-8')])
def __init__(self, data: str, headers: Optional[dict] = None):
super().__init__([bytes(data, 'utf-8')], headers)


class ExpectNoMoreRequests(MockConnectionHandler):
Expand Down
Loading