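"""
Tornado application that fans a redis pub/sub feed of tweets out to browsers
over a WebSocket endpoint (/stream) and a long-polling endpoint (/poll),
keeping a small in-memory cache of recent messages and prefetching recent
tweets from the Twitter search API at startup when keywords are configured.

Use -p/--port to override the port in SETTINGS.
"""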
from tornado.web import RequestHandler, Application, asynchronous
from tornado.httpclient import AsyncHTTPClient
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop
from toredis.client import Client
import json
import uuid
import time
import optparse
import logging
import urllib
import urlparse

SETTINGS = {
    "port": 8888,
    "twitter_url": "/1/statuses/sample.json",

    # Arg / cookie settings
    "client_id_cookie": "client_id",
    "last_time_argument": "last_time",

    # Redis arguments
    "redis_host": "localhost",
24 "redis_pub_channel": "tweetwatcher",
25
26 # cache settings (should use redis for this later!!)
27 "max_cache": 50
}

try:
    from settings import SETTINGS as local_settings
    SETTINGS.update(local_settings)
except ImportError:
    # No local settings
    pass

OPTIONS = optparse.OptionParser()
OPTIONS.add_option("-p", "--port", dest="port", type="int",
                   default=8888, help="the server port")

CLIENTS = {}  # in memory list of this Tornado instance's connections
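# In-memory cache of recent messages, bounded by SETTINGS["max_cache"].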
CACHE = []

class PageHandler(RequestHandler):
    """ Just a simple handler for loading the base index page. """

    def get(self):
        host = self.request.host
        if ":" not in host:
            host += ":%s" % SETTINGS["port"]
        self.render("index.htm", host=host)

class PollHandler(RequestHandler):
    """ Long-polling endpoint for cached and live messages. """

    @asynchronous
    def get(self):
        """ Long polling group """
        self.client_id = self.get_cookie(SETTINGS["client_id_cookie"])
        last_time = int(self.get_argument(SETTINGS["last_time_argument"], 0))
        if not self.client_id:
            self.client_id = uuid.uuid4().hex
            self.set_cookie(SETTINGS["client_id_cookie"], self.client_id)
        CLIENTS[self.client_id] = self
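        # Reply immediately with any cached messages newer than the client's
        # last poll; otherwise leave the connection open until handle_message
        # pushes a new message to this handler.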
        messages = {
            "type": "messages",
            "messages": [],
            "time": int(time.time())
        }
        for msg in CACHE[:]:
            if msg["time"] > last_time:
                messages["messages"].append(msg)
        if messages["messages"]:
            # This request is finished, so it can no longer receive pushes.
            del CLIENTS[self.client_id]
            return self.finish(messages)

    def write_message(self, message):
        """ Write a response and close connection """
        del CLIENTS[self.client_id]
        self.finish({
            "type": "messages",
            "messages": [message],
            "time": int(time.time())
        })


class StreamHandler(WebSocketHandler):
    """ Watches the Twitter stream """

    client_id = None

    def open(self):
        """ Creates the client and watches stream """
        self.client_id = uuid.uuid4().hex
        CLIENTS[self.client_id] = self
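        # Replay the cached backlog so a newly connected WebSocket client
        # immediately sees recent messages.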
        for msg in CACHE:
            self.write_message(msg)

    def send_message(self, message):
        """ Send a message to the watching clients """
        msg_type, msg_channel, msg_value = message
        if msg_type == "message":
            self.write_message(json.loads(msg_value))

    def on_message(self, message):
        """ Just a heartbeat, no real purpose """
        pass

    def on_close(self):
        """ Removes a client from the connection list """
        del CLIENTS[self.client_id]
        print "Client %s removed." % self.client_id


def handle_message(message):
    """ Handle a publish message """
    msg_type, msg_channel, msg_value = message
    if msg_type != "message":
        # Nothing important right now
        return
    message = json.loads(msg_value)
    CACHE.append(message)
    # Trim the shared cache to the configured maximum.
    while len(CACHE) > SETTINGS["max_cache"]:
        CACHE.pop(0)
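    # Fan the message out to every connected long-poll and WebSocket client.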
    for client in CLIENTS.values():
        try:
            client.write_message(message)
        except Exception as exc:
            # Client is already closed?
            logging.error("%s", exc)


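# NOTE: this parses the response of the old search.twitter.com Search API:
# a top-level "results" list whose entries carry "text", "from_user" and
# "profile_image_url" fields.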
def cache_prefetched_tweets(response):
    """ Fills cache with new messages """
    if response.code != 200:
        logging.error("Invalid twitter response: %s" % response.code)
        return
    results = json.loads(response.body)["results"]
    results.reverse()
    for result in results:
        try:
            text = result["text"]
            name = ""
            username = result["from_user"]
            avatar = result["profile_image_url"]
        except KeyError:
            logging.error("Invalid tweet response %s" % result)
            continue
        CACHE.append({
            "type": "tweet",
            "text": text,
            "name": name,
            "username": username,
            "avatar": avatar,
            "time": 1  # placeholder timestamp for prefetched tweets
        })


def prefetch_tweets(ioloop):
    """ Prefetches the tweets for the configured keyword """
    client = AsyncHTTPClient()
    twitter_url_parsed = urlparse.urlparse(SETTINGS["twitter_url"])
    twitter_query = urlparse.parse_qs(twitter_url_parsed.query)
    twitter_keywords = twitter_query.get("track")
    if not twitter_keywords:
        logging.warning("No twitter keywords -- not prefetching.")
        return
    params = {
        "result_type": "recent",
        "rpp": SETTINGS["max_cache"],
        "with_twitter_user_id": "true",
        "q": " ".join(twitter_keywords)
    }
    query_params = urllib.urlencode(params)
    client.fetch("https://search.twitter.com/search.json?%s" % query_params,
                 callback=cache_prefetched_tweets)
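
# Nothing in this module publishes to SETTINGS["redis_pub_channel"]; the app
# only subscribes to it. The helper below is a hypothetical sketch of what a
# producer could look like, assuming the plain redis-py client and the same
# message fields that cache_prefetched_tweets stores. It is illustrative only
# and is never called here.
def example_publish_tweet(text, username, avatar, name=""):
    """ Hypothetical producer: publish one tweet to the pub/sub channel. """
    import redis  # assumption: the redis-py package is available
    connection = redis.StrictRedis(host=SETTINGS["redis_host"])
    payload = {
        "type": "tweet",
        "text": text,
        "name": name,
        "username": username,
        "avatar": avatar,
        "time": int(time.time())
    }
    connection.publish(SETTINGS["redis_pub_channel"], json.dumps(payload))
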

def main():
    """ Start the application and Twitter stream monitor """
    options, args = OPTIONS.parse_args()
    if options.port:
        SETTINGS["port"] = options.port
    ioloop = IOLoop.instance()
    prefetch_tweets(ioloop)
    app = Application([
        (r"/", PageHandler),
        (r"/stream", StreamHandler),
        (r"/poll", PollHandler)
    ], static_path="static", debug=True)
    app.listen(SETTINGS["port"])
    client = Client()  # redis connection for all connected clients
    client.connect()
    client.subscribe(SETTINGS["redis_pub_channel"], callback=handle_message)
    # Monitor Twitter Stream once for all clients
    ioloop.start()


if __name__ == "__main__":
    main()