Skip to content

Commit

Permalink
Add immediate flushing if we accumulate too much data.
Browse files Browse the repository at this point in the history
  • Loading branch information
pythonspeed committed Sep 18, 2023
1 parent 8ffe969 commit c4a41a4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
3 changes: 3 additions & 0 deletions src/twisted/protocols/test/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -1862,6 +1862,9 @@ def test_writes_get_aggregated(self, writes: list[Union[bytes, None]]):
else:
length_so_far += len(value)
aggregate.write(value)
if length_so_far > 64_000:
lengths.append(length_so_far)
length_so_far = 0
aggregate.flush()
if length_so_far != 0:
lengths.append(length_so_far)
Expand Down
26 changes: 20 additions & 6 deletions src/twisted/protocols/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from twisted.internet._sslverify import _setAcceptableProtocols
from twisted.internet.interfaces import (
IConsumer,
IDelayedCall,
IHandshakeListener,
ILoggingContext,
INegotiated,
Expand Down Expand Up @@ -706,19 +707,32 @@ def __init__(self, write: Callable[[bytes],object], clock: IReactorTime):
self._write = write
self._clock = clock
self._buffer = []
self._bufferLen = 0
self._scheduled : Optional[IDelayedCall] = None

def write(self, data: bytes) -> None:
"""Buffer the data or write it immediately."""
was_empty = len(self._buffer) == 0
# TODO might want logic that flushes large writes immediately to reduce
# memory usage...
"""
Buffer the data, or write it immediately if we've accumulated enough to
make it worth it.
Accumulating too much data can result in higher memory usage.
"""
self._buffer.append(data)
if was_empty:
self._clock.callLater(0, self.flush)
self._bufferLen += len(data)
if self._bufferLen > 64_000:
self.flush()
if self._scheduled is None:
self._scheduled = self._clock.callLater(0, self._scheduledFlush)

def _scheduledFlush(self) -> None:
"""Called in next reactor iteration."""
self._scheduled = None
self.flush()

def flush(self) -> None:
"""Flush any buffered writes."""
if self._buffer:
self._bufferLen = 0
self._write(b"".join(self._buffer))
del self._buffer[:]

Expand Down

0 comments on commit c4a41a4

Please sign in to comment.