Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Use a buffer rather than reading the Twitter response one byte at a time... #144

Merged
merged 3 commits into from

2 participants

@danfairs

.... This actually respects the buffer_size option that was already there, but unused.

This improves performance from processing about 25 tweets/sec on my 2.8 GHz Core 2 Duo MacBook Pro pegging a core at 99% up to processing about 50/sec using 10% CPU.

@joshthecoder
Owner

Wow that's night and day in terms of CPU usage. Thanks for fixing that up.

@joshthecoder joshthecoder merged commit a370558 into tweepy:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 31, 2012
  1. Use a buffer rather than reading the Twitter response one byte at a t…

    Dan Fairs authored
    …ime. This actually respects the buffer_size option that was already there, but unused.
Commits on Feb 4, 2012
This page is out of date. Refresh to see the latest.
Showing with 27 additions and 16 deletions.
  1. +27 −16 tweepy/streaming.py
View
43 tweepy/streaming.py
@@ -135,24 +135,31 @@ def _run(self):
if exception:
raise
- def _read_loop(self, resp):
- while self.running:
- if resp.isclosed():
- break
-
- # read length
- data = ''
- while True:
- c = resp.read(1)
- if c == '\n':
- break
- data += c
- data = data.strip()
-
- # read data and pass into listener
- if self.listener.on_data(data) is False:
+ def _data(self, data):
+ for d in [dt for dt in data.split('\n') if dt]:
+ if self.listener.on_data(d) is False:
self.running = False
+ def _read_loop(self, resp):
+ buf = ''
+ while self.running and not resp.isclosed():
+ c = resp.read(self.buffer_size)
+ idx = c.rfind('\n')
+ if idx > -1:
+ # There is an index. Store the tail part for later,
+ # and process the head part as messages. We use idx + 1
+ # as we dont' actually want to store the newline.
+ data = buf + c[:idx]
+ buf = c[idx + 1:]
+ self._data(data)
+ else:
+ # No newline found, so we add this to our accumulated
+ # buffer
+ buf += c
+
+ if resp.isclosed():
+ self.on_closed(resp)
+
def _start(self, async):
self.running = True
if async:
@@ -160,6 +167,10 @@ def _start(self, async):
else:
self._run()
+ def on_closed(self, resp):
+ """ Called when the response has been closed by Twitter """
+ pass
+
def userstream(self, count=None, async=False, secure=True):
if self.running:
raise TweepError('Stream object already connected!')
Something went wrong with that request. Please try again.