forked from python-twitter-tools/twitter
-
Notifications
You must be signed in to change notification settings - Fork 4
/
stream.py
96 lines (82 loc) · 3.36 KB
/
stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
try:
import urllib.request as urllib_request
import urllib.error as urllib_error
import io
except ImportError:
import urllib2 as urllib_request
import urllib2 as urllib_error
import json
from ssl import SSLError
from .api import TwitterCall, wrap_response
class TwitterJSONIter(object):
def __init__(self, handle, uri, arg_data, block=True):
self.decoder = json.JSONDecoder()
self.handle = handle
self.buf = b""
self.block = block
def __iter__(self):
sock = self.handle.fp._sock.fp._sock
if not self.block:
sock.setblocking(False)
while True:
try:
utf8_buf = self.buf.decode('utf8').lstrip()
res, ptr = self.decoder.raw_decode(utf8_buf)
self.buf = utf8_buf[ptr:].encode('utf8')
yield wrap_response(res, self.handle.headers)
continue
except ValueError as e:
if self.block:
pass
else:
yield None
except urllib_error.HTTPError as e:
raise TwitterHTTPError(e, uri, self.format, arg_data)
# this is a non-blocking read (ie, it will return if any data is available)
try:
self.buf += sock.recv(1024)
except SSLError as e:
if (not self.block) and (e.errno == 2):
# Apparently this means there was nothing in the socket buf
pass
else:
raise
def handle_stream_response(req, uri, arg_data, block):
handle = urllib_request.urlopen(req,)
return iter(TwitterJSONIter(handle, uri, arg_data, block))
class TwitterStreamCall(TwitterCall):
def _handle_response(self, req, uri, arg_data):
return handle_stream_response(req, uri, arg_data, block=True)
class TwitterStreamCallNonBlocking(TwitterCall):
def _handle_response(self, req, uri, arg_data):
return handle_stream_response(req, uri, arg_data, block=False)
class TwitterStream(TwitterStreamCall):
"""
The TwitterStream object is an interface to the Twitter Stream API
(stream.twitter.com). This can be used pretty much the same as the
Twitter class except the result of calling a method will be an
iterator that yields objects decoded from the stream. For
example::
twitter_stream = TwitterStream(auth=UserPassAuth('joe', 'joespassword'))
iterator = twitter_stream.statuses.sample()
for tweet in iterator:
...do something with this tweet...
The iterator will yield tweets forever and ever (until the stream
breaks at which point it raises a TwitterHTTPError.)
The `block` parameter controls if the stream is blocking. Default
is blocking (True). When set to False, the iterator will
occasionally yield None when there is no available message.
"""
def __init__(
self, domain="stream.twitter.com", secure=True, auth=None,
api_version='1', block=True):
uriparts = ()
uriparts += (str(api_version),)
if block:
call_cls = TwitterStreamCall
else:
call_cls = TwitterStreamCallNonBlocking
TwitterStreamCall.__init__(
self, auth=auth, format="json", domain=domain,
callable_cls=call_cls,
secure=secure, uriparts=uriparts)