Skip to content
Browse files

Flushing the buffer will now restart the timer.

  • Loading branch information...
1 parent 77ceeee commit 47ae41322199146a7cf33b88c6996aff545497fd @nvie committed May 25, 2010
Showing with 23 additions and 15 deletions.
  1. +22 −14 drainers/buffered.py
  2. +1 −1 tests/test_buffered.py
View
36 drainers/buffered.py
@@ -67,6 +67,10 @@ def my_buffered_callback(lines):
def buffer(self):
return self._buffer
+ @property
+ def timer(self):
+ return self._timer
+
def _should_flush(self):
# If neither chunk_size or flush_timeout is set, behave
# like a regular Drainer
@@ -84,7 +88,7 @@ def _wrapper(self, line, is_err):
self.buffer.append(tuple)
if self._should_flush():
- self._flush()
+ self._flush(restart_timer=True)
def _empty_buffer(self):
'''Empty the buffer and return a copy of it.'''
@@ -93,28 +97,32 @@ def _empty_buffer(self):
bufcopy.append(self.buffer.pop(0))
return bufcopy
- def _flush(self):
+ def _flush(self, restart_timer=True):
+ if self.timer and restart_timer:
+ self._destroy_timer()
+
if len(self.buffer) > 0:
bufcopy = self._empty_buffer()
self._orig_read_event_cb(bufcopy)
+ if self.timer and restart_timer:
+ self._create_timer()
+
def _create_timer(self):
if not self.flush_timeout is None:
- self._timer = threading.Timer(self.flush_timeout, self._flush_and_reset)
+ self._timer = threading.Timer(self.flush_timeout, self._awake_from_timer)
self._timer.daemon = True
self._timer.start()
def _destroy_timer(self):
- if not self._timer is None:
- self._timer.cancel()
-
- def _reset_timer(self):
- if not self._timer is None:
- self._destroy_timer()
- self._create_timer()
-
- def _flush_and_reset(self):
- self._flush()
+ if self.timer:
+ self.timer.cancel()
+
+ def _awake_from_timer(self):
+ # Don't let flush restart the timer. This function is only
+ # called by a Timer that has gone off, so cancelling it has no
+ # effect from here. Instead, we create a new Timer when done.
+ self._flush(restart_timer=False)
self._create_timer()
def start(self):
@@ -127,6 +135,6 @@ def start(self):
self._create_timer()
result = super(BufferedDrainer, self).start()
self._destroy_timer()
- self._flush()
+ self._flush(restart_timer=False)
return result
View
2 tests/test_buffered.py
@@ -20,7 +20,7 @@ def collect(self, lines):
self.chunks.append(lines)
called_functions = [fname for _, _, fname, _ in traceback.extract_stack()]
- if '_flush_and_reset' in called_functions:
+ if '_awake_from_timer' in called_functions:
self.triggered_by_timer += 1
elif '_wrapper' in called_functions:
self.triggered_by_chunk_size_exceeded += 1

0 comments on commit 47ae413

Please sign in to comment.
Something went wrong with that request. Please try again.