diff --git a/src/sghi/etl/commons/sinks.py b/src/sghi/etl/commons/sinks.py index fde502d..49c9b93 100644 --- a/src/sghi/etl/commons/sinks.py +++ b/src/sghi/etl/commons/sinks.py @@ -99,7 +99,7 @@ def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]: class NullSink(Sink[_PDT], Generic[_PDT]): """A :class:`Sink` that discards all the data it receives. - Like to ``dev/null`` on Unix, instances of this ``Sink`` discard all data + Like ``dev/null`` on Unix, instances of this ``Sink`` discard all data drained to them but report the drain operation as successful. This is mostly useful as a placeholder or where further consumption of processed data is not required. @@ -190,7 +190,7 @@ class ScatterSink(Sink[_PDT], Generic[_PDT]): Disposing instances of this class also disposes of their embedded sinks. .. admonition:: Regarding retry safety - :class: tip + :class: caution Instances of this ``Sink`` are **NOT SAFE** to retry. """ @@ -324,8 +324,8 @@ def drain(self, processed_data: _PDT) -> None: """ self._logger.info("Draining processed data to all available sinks.") - with self._executor as executor: - futures = executor.execute(processed_data) + executor = self._executor.__enter__() + futures = executor.execute(processed_data) self._result_gatherer(futures) @@ -357,9 +357,9 @@ def dispose(self) -> None: def _sink_to_task(self, s: Sink[_PDT]) -> Task[_PDT, None]: @task def do_drain(processed_data: _PDT) -> None: - with s as _s: - drain = self._retry_policy_factory().retry(_s.drain) - return drain(processed_data) + _s = s.__enter__() + drain = self._retry_policy_factory().retry(_s.drain) + return drain(processed_data) return do_drain @@ -390,7 +390,7 @@ class SplitSink(Sink[Sequence[_PDT]], Generic[_PDT]): Disposing instances of this class also disposes of their embedded sinks. .. admonition:: Regarding retry safety - :class: tip + :class: caution Instances of this ``Sink`` are **NOT SAFE** to retry. """ # noqa: D205 @@ -550,8 +550,8 @@ def drain(self, processed_data: Sequence[_PDT]) -> None: "to all available sinks." ) - with self._executor as executor: - futures = executor.execute(processed_data) + executor = self._executor.__enter__() + futures = executor.execute(processed_data) self._result_gatherer(futures) @@ -587,9 +587,9 @@ def _sink_to_task( ) -> Task[Sequence[_PDT], None]: @task def do_drain(processed_data: Sequence[_PDT]) -> None: - with s as _s: - drain = self._retry_policy_factory().retry(_s.drain) - return drain(processed_data[i]) + _s = s.__enter__() + drain = self._retry_policy_factory().retry(_s.drain) + return drain(processed_data[i]) return do_drain