Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A Simpler Fix to the Streaming Code due to Changes from Twitter on Jan. 13, 2014. #196

Merged
merged 27 commits into from
Feb 3, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
128ec04
Ignore .idea files from PyCharm.
adonoho Jan 14, 2014
8631806
Do not send gzip headers for streaming calls. (ref: RouxRC [73efaca])
adonoho Jan 15, 2014
ff3ca19
Fix the uri extension by attribute name.
adonoho Jan 15, 2014
25ea832
Change some default attributes and add TwitterHTTPError (ref: RouxRC …
adonoho Jan 15, 2014
d488eec
Test for delimiters in the stream and removes them. Add comments.
adonoho Jan 15, 2014
95d4980
Update to use OAuth, take in command line arguments and modify the im…
adonoho Jan 15, 2014
d908997
Move the potentially uninitialized values out of the if test.
adonoho Jan 16, 2014
ef99d73
Increase the size of the read buffer to be larger than the average tw…
adonoho Jan 16, 2014
7333aa5
Add support for both user and site streams.
adonoho Jan 17, 2014
57aa6d8
Bring HTTP chunk downloading into its own separate method.
adonoho Jan 17, 2014
54555a7
Cosmetic edit
adonoho Jan 20, 2014
b8cdd54
Merge branch 'fix-stream' into pr-fix-stream
adonoho Jan 20, 2014
c0fc741
Minimize string decoding and move to use a bytearray for the buffer. …
adonoho Jan 23, 2014
02bce53
Cosmetic edits.
adonoho Jan 24, 2014
db75126
Merge branch 'fix-stream' into pr-fix-stream
adonoho Jan 24, 2014
2693800
Move recv_chunk() into a stand alone function. Further minimize memor…
adonoho Jan 26, 2014
e28a1da
Move variables out of the iterator class and into the generator funct…
adonoho Jan 26, 2014
23dcd46
As Twitter appears to send complete JSON in the chunks, we can simpli…
adonoho Jan 27, 2014
28a8ef6
Further refine socket management.
adonoho Jan 28, 2014
cd2fbdf
Bump the version number.
adonoho Jan 28, 2014
c20d1a8
Remove all keep-alive delimiters to allow the hangup patch to function.
adonoho Jan 28, 2014
0d92536
Remove socket timeout mutation code.
adonoho Jan 28, 2014
b01fa3f
Set a timeout on the main sample stream to test that code path.
adonoho Jan 28, 2014
443e409
Handle HTTP chunks that only contain keep-alive delimiters.
adonoho Jan 28, 2014
3e782f6
Add comments detailing why we can avoid handling certain edge cases i…
adonoho Jan 29, 2014
a8880f9
Clarify the comment about edge cases.
adonoho Jan 29, 2014
12bb62d
Merge branch 'fix-stream' into pr-fix-stream
adonoho Jan 30, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ twitter3.egg-info
*~
dist
build
.idea
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages
import sys, os

version = '1.10.2'
version = '1.10.3'

install_requires = [
# -*- Extra requirements: -*-
Expand Down
13 changes: 7 additions & 6 deletions twitter/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class TwitterCall(object):

def __init__(
self, auth, format, domain, callable_cls, uri="",
uriparts=None, secure=True, timeout=None):
uriparts=None, secure=True, timeout=None, gzip=False):
self.auth = auth
self.format = format
self.domain = domain
Expand All @@ -137,6 +137,7 @@ def __init__(
self.uriparts = uriparts
self.secure = secure
self.timeout = timeout
self.gzip = gzip

def __getattr__(self, k):
try:
Expand All @@ -145,9 +146,9 @@ def __getattr__(self, k):
def extend_call(arg):
return self.callable_cls(
auth=self.auth, format=self.format, domain=self.domain,
callable_cls=self.callable_cls, timeout=self.timeout, uriparts=self.uriparts \
+ (arg,),
secure=self.secure)
callable_cls=self.callable_cls, timeout=self.timeout,
secure=self.secure, gzip=self.gzip,
uriparts=self.uriparts + (arg,))
if k == "_":
return extend_call
else:
Expand Down Expand Up @@ -194,13 +195,13 @@ def __call__(self, **kwargs):
uriBase = "http%s://%s/%s%s%s" %(
secure_str, self.domain, uri, dot, self.format)

headers = {'Accept-Encoding': 'gzip'}
headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
body = None; arg_data = None
if self.auth:
headers.update(self.auth.generate_headers())
arg_data = self.auth.encode_params(uriBase, method, kwargs)
if method == 'GET':
uriBase += '?' + arg_data
body = None
else:
body = arg_data.encode('utf8')

Expand Down
94 changes: 61 additions & 33 deletions twitter/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,89 @@

from .api import TwitterCall, wrap_response, TwitterHTTPError

def recv_chunk(sock): # -> bytearray:

buf = sock.recv(8) # Scan for an up to 16MiB chunk size (0xffffff).
crlf = buf.find(b'\r\n') # Find the HTTP chunk size.

if crlf > 0: # If there is a length, then process it

remaining = int(buf[:crlf], 16) # Decode the chunk size.

start = crlf + 2 # Add in the length of the header's CRLF pair.
end = len(buf) - start

chunk = bytearray(remaining)

if remaining <= 2: # E.g. an HTTP chunk with just a keep-alive delimiter.
chunk[:remaining] = buf[start:start + remaining]
# There are several edge cases (remaining == [3-6]) as the chunk size exceeds the length
# of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The
# shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases
# and eliminates the need to address them.
else: # There is more to read in the chunk.
chunk[:end] = buf[start:]
chunk[end:] = sock.recv(remaining - end)
sock.recv(2) # Read the trailing CRLF pair. Throw it away.

return chunk

return bytearray()

## recv_chunk()


class TwitterJSONIter(object):

def __init__(self, handle, uri, arg_data, block=True, timeout=None):
self.decoder = json.JSONDecoder()
self.handle = handle
self.buf = b""
self.uri = uri
self.arg_data = arg_data
self.block = block
self.timeout = timeout
self.timer = time.time()


def __iter__(self):
if sys.version_info >= (3, 0):
sock = self.handle.fp.raw._sock
else:
sock = self.handle.fp._sock.fp._sock
sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if not self.block or self.timeout:
sock.setblocking(False)
sock.setblocking(self.block and not self.timeout)
buf = u''
json_decoder = json.JSONDecoder()
timer = time.time()
while True:
try:
utf8_buf = self.buf.decode('utf8').lstrip()
res, ptr = self.decoder.raw_decode(utf8_buf)
self.buf = utf8_buf[ptr:].encode('utf8')
buf = buf.lstrip()
res, ptr = json_decoder.raw_decode(buf)
buf = buf[ptr:]
yield wrap_response(res, self.handle.headers)
self.timer = time.time()
timer = time.time()
continue
except ValueError as e:
if self.block:
pass
else:
yield None
except urllib_error.HTTPError as e:
raise TwitterHTTPError(e, uri, self.format, arg_data)
# this is a non-blocking read (ie, it will return if any data is available)
if self.block: pass
else: yield None
try:
buf = buf.lstrip() # Remove any keep-alive delimiters to detect hangups.
if self.timeout:
ready_to_read = select.select([sock], [], [], self.timeout)
if ready_to_read[0]:
self.buf += sock.recv(1024)
if time.time() - self.timer > self.timeout:
yield {"timeout":True}
else:
yield {"timeout":True}
buf += recv_chunk(sock).decode('utf-8') # This is a non-blocking read.
if time.time() - timer > self.timeout:
yield {'timeout': True}
else: yield {'timeout': True}
else:
self.buf += sock.recv(1024)
buf += recv_chunk(sock).decode('utf-8')
if not buf and self.block:
yield {'hangup': True}
except SSLError as e:
if (not self.block or self.timeout) and (e.errno == 2):
# Apparently this means there was nothing in the socket buf
pass
else:
raise
# Error from a non-blocking read of an empty buffer.
if (not self.block or self.timeout) and (e.errno == 2): pass
else: raise

def handle_stream_response(req, uri, arg_data, block, timeout=None):
handle = urllib_request.urlopen(req,)
try:
handle = urllib_request.urlopen(req,)
except urllib_error.HTTPError as e:
raise TwitterHTTPError(e, uri, 'json', arg_data)
return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))

class TwitterStreamCallWithTimeout(TwitterCall):
Expand Down Expand Up @@ -119,4 +147,4 @@ def __init__(
TwitterStreamCall.__init__(
self, auth=auth, format="json", domain=domain,
callable_cls=call_cls,
secure=secure, uriparts=uriparts, timeout=timeout)
secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
53 changes: 41 additions & 12 deletions twitter/stream_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,59 @@

USAGE

twitter-stream-example <username> <password>
stream-example -t <token> -ts <token_secret> -ck <consumer_key> -cs <consumer_secret>

"""

from __future__ import print_function

import sys
import argparse

from .stream import TwitterStream
from .auth import UserPassAuth
from .util import printNicely
from twitter.stream import TwitterStream
from twitter.oauth import OAuth
from twitter.util import printNicely

def main(args=sys.argv[1:]):
if not args[1:]:
print(__doc__)
return 1

# When using twitter stream you must authorize. UserPass or OAuth.
stream = TwitterStream(auth=UserPassAuth(args[0], args[1]))
def parse_arguments():

parser = argparse.ArgumentParser()

parser.add_argument('-t', '--token', help='The Twitter Access Token.')
parser.add_argument('-ts', '--token_secret', help='The Twitter Access Token Secret.')
parser.add_argument('-ck', '--consumer_key', help='The Twitter Consumer Key.')
parser.add_argument('-cs', '--consumer_secret', help='The Twitter Consumer Secret.')
parser.add_argument('-us', '--user_stream', action='store_true', help='Connect to the user stream endpoint.')
parser.add_argument('-ss', '--site_stream', action='store_true', help='Connect to the site stream endpoint.')

return parser.parse_args()

## parse_arguments()


def main():

args = parse_arguments()

# When using twitter stream you must authorize.
auth = OAuth(args.token, args.token_secret, args.consumer_key, args.consumer_secret)
if args.user_stream:
stream = TwitterStream(auth=auth, domain='userstream.twitter.com')
tweet_iter = stream.user()
elif args.site_stream:
stream = TwitterStream(auth=auth, domain='sitestream.twitter.com')
tweet_iter = stream.site()
else:
stream = TwitterStream(auth=auth, timeout=60.0)
tweet_iter = stream.statuses.sample()

# Iterate over the sample stream.
tweet_iter = stream.statuses.sample()
for tweet in tweet_iter:
# You must test that your tweet has text. It might be a delete
# or data message.
if tweet.get('text'):
printNicely(tweet['text'])

## main()

if __name__ == '__main__':
main()