Skip to content

Commit

Permalink
Merge pull request #561 from michaelbrooks/stream-disconnection
Browse files Browse the repository at this point in the history
Prevent infinite read loop in tweepy Stream
  • Loading branch information
joshthecoder committed Feb 21, 2015
2 parents f8fa8ec + 74bdefd commit 485f64c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
41 changes: 41 additions & 0 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,47 @@ def test_read_tweet(self):
self.assertEqual('24\n', buf.read_line())
self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))

def test_read_empty_buffer(self):
"""
Requests can be closed by twitter.
The ReadBuffer should not loop infinitely when this happens.
Instead it should return and let the outer _read_loop handle it.
"""

# If the test fails, we are in danger of an infinite loop
# so we need to do some work to block that from happening
class InfiniteLoopException(Exception):
pass

self.called_count = 0
call_limit = 5
def on_read(chunk_size):
self.called_count += 1

if self.called_count > call_limit:
# we have failed
raise InfiniteLoopException("Oops, read() was called a bunch of times")

return ""

# Create a fake stream
stream = six.StringIO('')

# Mock it's read function so it can't be called too many times
mock_read = MagicMock(side_effect=on_read)

try:
with patch.multiple(stream, create=True, read=mock_read, closed=True):
# Now the stream can't call 'read' more than call_limit times
# and it looks like a requests stream that is closed
buf = ReadBuffer(stream, 50)
buf.read_line("\n")
except InfiniteLoopException:
self.fail("ReadBuffer.read_line tried to loop infinitely.")

# The mocked function not have been called at all since the stream looks closed
self.assertEqual(mock_read.call_count, 0)


class TweepyStreamBackoffTests(unittest.TestCase):
def setUp(self):
Expand Down
10 changes: 5 additions & 5 deletions tweepy/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,15 @@ def __init__(self, stream, chunk_size):
self._chunk_size = chunk_size

def read_len(self, length):
while True:
while not self._stream.closed:
if len(self._buffer) >= length:
return self._pop(length)
read_len = max(self._chunk_size, length - len(self._buffer))
self._buffer += self._stream.read(read_len).decode("ascii")

def read_line(self, sep='\n'):
start = 0
while True:
while not self._stream.closed:
loc = self._buffer.find(sep, start)
if loc >= 0:
return self._pop(loc + len(sep))
Expand Down Expand Up @@ -292,9 +292,9 @@ def _data(self, data):
def _read_loop(self, resp):
buf = ReadBuffer(resp.raw, self.chunk_size)

while self.running:
while self.running and not resp.raw.closed:
length = 0
while True:
while not resp.raw.closed:
line = buf.read_line().strip()
if not line:
self.listener.keep_alive() # keep-alive new lines are expected
Expand Down Expand Up @@ -334,7 +334,7 @@ def _read_loop(self, resp):
# self._data(next_status_obj.decode('utf-8'))


if resp.raw._fp.isclosed():
if resp.raw.closed:
self.on_closed(resp)

def _start(self, async):
Expand Down

0 comments on commit 485f64c

Please sign in to comment.