Permalink
Browse files

Implemented Celery MQ (using Redis) on user details lookups.

  • Loading branch information...
1 parent f3077e2 commit 1d872f05e1961c5d2cd9ee93836a056d436780fe @peterbe committed Nov 2, 2011
Showing with 275 additions and 97 deletions.
  1. +9 −1 README.md
  2. +7 −1 bin/run_shell.py
  3. +15 −0 celeryconfig.py
  4. +48 −86 handlers.py
  5. +35 −0 models.py
  6. +2 −0 requirements.txt
  7. +57 −0 tasks.py
  8. +8 −8 templates/following.html
  9. +7 −0 tests/base.py
  10. +87 −1 tests/test_handlers.py
View
@@ -11,4 +11,12 @@ Running tests
Run this:
- python bin/_run_tests.py --logging=error
+ $ python bin/_run_tests.py --logging=error
+
+
+Running celeryd
+---------------
+
+Run celeryd like this:
+
+ $ celeryd --loglevel=INFO
View
@@ -1,7 +1,13 @@
#!/usr/bin/env python
import code, re
-import here
+try:
+ import here
+except ImportError:
+ import sys
+ import os.path as op
+ sys.path.insert(0, op.abspath(op.join(op.dirname(__file__), '..')))
+ import here
if __name__ == '__main__':
View
@@ -0,0 +1,15 @@
+import here
+# http://docs.celeryproject.org/en/latest/tutorials/otherqueues.html#redis
+BROKER_TRANSPORT = "redis"
+
+import settings
+BROKER_HOST = settings.REDIS_HOST
+BROKER_PORT = settings.REDIS_PORT
+BROKER_VHOST = "0" # Maps to database number.
+
+CELERY_IGNORE_RESULT = True
+
+CELERY_IMPORTS = ("tasks", )
+
+import os
+CELERY_ALWAYS_EAGER = bool(os.environ.get('ALWAYS_EAGER', False))
View
@@ -14,7 +14,7 @@
from tornado.escape import json_decode, json_encode
from pymongo.objectid import InvalidId, ObjectId
import utils
-
+import tasks
from models import User, Tweeter
@@ -61,50 +61,10 @@ def save_following(self, source_username, dest_username, result):
def save_tweeter_user(self, user):
user_id = user['id']
tweeter = self.db.Tweeter.find_one({'user_id': user_id})
- _save = False
if not tweeter:
tweeter = self.db.Tweeter()
tweeter['user_id'] = user_id
- _save = True
-
- if tweeter['name'] != user['name']:
- tweeter['name'] = user['name']
- _save = True
-
- if tweeter['username'] != user['screen_name']:
- tweeter['username'] = user['screen_name']
- _save = True
-
- if tweeter['followers'] != user['followers_count']:
- tweeter['followers'] = user['followers_count']
- _save = True
-
- if tweeter['following'] != user['friends_count']:
- tweeter['following'] = user['friends_count']
- _save = True
-
- def parse_status_date(dstr):
- dstr = re.sub('\+\d{1,4}', '', dstr)
- return datetime.datetime.strptime(
- dstr,
- '%a %b %d %H:%M:%S %Y'
- )
- last_tweet_date = None
- if 'status' in user:
- last_tweet_date = user['status']['created_at']
- last_tweet_date = parse_status_date(last_tweet_date)
- if tweeter['last_tweet_date'] != last_tweet_date:
- tweeter['last_tweet_date'] = last_tweet_date
- _save = True
-
- ratio_before = tweeter['ratio']
- ratio = tweeter.set_ratio()
- if ratio != ratio_before:
- _save = True
-
- if _save:
- tweeter.save()
-
+ Tweeter.update_tweeter(tweeter, user)
return tweeter
def assert_tweeter_user(self, user):
@@ -520,53 +480,51 @@ def _fetch_info(self, options, username=None):
if username is None:
username = options['username']
- key = 'info:%s' % username
- value = self.redis.get(key)
+ def age(d):
+ return (datetime.datetime.utcnow() - d).seconds
- if value is None:
- user = self.db.User.find_one({'username': options['this_username']})
- access_token = user['access_token']
+ tweeter = self.db.Tweeter.find_one({'username': username})
+ current_user = self.get_current_user()
+ if not tweeter:
+ access_token = current_user['access_token']
result = yield tornado.gen.Task(self.twitter_request,
"/users/show",
screen_name=username,
access_token=access_token)
- if result:
- self.save_tweeter_user(result)
- else:
- result = json_decode(value)
- self.assert_tweeter_user(result)
- key = None
- if result is None:
+ tweeter = self.save_tweeter_user(result)
+ elif age(tweeter['modify_date']) > 3600:
+ tasks.refresh_user_info.delay(
+ username, current_user['access_token'])
+
+ if not tweeter:
options['error'] = "Unable to look up info for %s" % username
self._render(options)
return
- if isinstance(result, basestring):
- result = json_decode(result)
- if key:
- self.redis.setex(key, json_encode(result), 60 * 60)
+
if 'info' not in options:
- options['info'] = {options['username']: result}
+ options['info'] = {options['username']: tweeter}
self._fetch_info(options, username=options['this_username'])
else:
- options['info'][options['this_username']] = result
+ options['info'][options['this_username']] = tweeter
self._render(options)
def _render(self, options):
- if 'error' not in options:
- if options['follows']:
- page_title = '%s follows me'
- else:
- page_title = '%s is too cool for me'
- self._set_ratio(options, 'username')
- self._set_ratio(options, 'this_username')
- options['page_title'] = page_title % options['username']
- options['perm_url'] = self.get_following_perm_url(
- options['username'], options['this_username'])
- self.render('following.html', **options)
- else:
+ if 'error' in options:
options['page_title'] = 'Error :('
self.render('following_error.html', **options)
+ return
+
+ if options['follows']:
+ page_title = '%s follows me'
+ else:
+ page_title = '%s is too cool for me'
+ options['page_title'] = page_title % options['username']
+ options['perm_url'] = self.get_following_perm_url(
+ options['username'],
+ options['this_username']
+ )
+ self.render('following.html', **options)
def _set_ratio(self, options, key):
value = options[key]
@@ -680,11 +638,16 @@ class FollowingComparedtoHandler(FollowingHandler):
@tornado.gen.engine
def get(self, username, compared_to):
options = {'compared_to': compared_to}
- tweeter = self.db.Tweeter.find_by_username(self.db, username)
- compared_tweeter = self.db.Tweeter.find_by_username(self.db, compared_to)
+ tweeter = self.db.Tweeter.find_one({'username': username})
+ compared_tweeter = self.db.Tweeter.find_one({'username': compared_to})
+
+ def age(d):
+ return (datetime.datetime.utcnow() - d).seconds
+
current_user = self.get_current_user()
if current_user:
+
# if we don't have tweeter info on any of them, fetch it
if not tweeter:
# fetch it
@@ -693,12 +656,19 @@ def get(self, username, compared_to):
screen_name=username,
access_token=current_user['access_token'])
tweeter = self.save_tweeter_user(result)
+ elif age(tweeter['modify_date']) > 3600:
+ tasks.refresh_user_info.delay(
+ username, current_user['access_token'])
+
if not compared_tweeter:
result = yield tornado.gen.Task(self.twitter_request,
"/users/show",
screen_name=compared_to,
access_token=current_user['access_token'])
compared_tweeter = self.save_tweeter_user(result)
+ elif age(compared_tweeter['modify_date']) > 3600:
+ tasks.refresh_user_info.delay(
+ compared_to, current_user['access_token'])
elif not tweeter or not compared_tweeter:
options = {
@@ -717,8 +687,8 @@ def get(self, username, compared_to):
value = self.redis.get(key)
if value is None:
following = (self.db.Following
- .find_one({'user': tweeter['_id'],
- 'follows': compared_tweeter['_id']}))
+ .find_one({'user': tweeter['_id'],
+ 'follows': compared_tweeter['_id']}))
if following:
options['follows'] = following['following']
else:
@@ -735,19 +705,11 @@ def get(self, username, compared_to):
(username, compared_to))
options['info'] = {
- username: {
- 'followers_count': tweeter['followers'],
- 'friends_count': tweeter['following'],
- },
- compared_to: {
- 'followers_count': compared_tweeter['followers'],
- 'friends_count': compared_tweeter['following'],
- }
+ username: tweeter,
+ compared_to: compared_tweeter
}
options['username'] = username
options['this_username'] = compared_to
- self._set_ratio(options, 'username')
- self._set_ratio(options, 'this_username')
options['compared_to'] = compared_to
options['perm_url'] = self.get_following_perm_url(
options['username'], options['this_username'])
View
@@ -2,6 +2,9 @@
import datetime
from pymongo.objectid import ObjectId
from mongolite import Connection, Document
+
+
+
connection = Connection()
class BaseDocument(Document):
@@ -57,6 +60,38 @@ def find_by_username(db, username):
tweeter = db.Tweeter.find_one({'username': re.compile(re.escape(username), re.I)})
return tweeter
+ @staticmethod
+ def update_tweeter(tweeter, user):
+ if tweeter['name'] != user['name']:
+ tweeter['name'] = user['name']
+
+ if tweeter['username'] != user['screen_name']:
+ tweeter['username'] = user['screen_name']
+
+ if tweeter['followers'] != user['followers_count']:
+ tweeter['followers'] = user['followers_count']
+
+ if tweeter['following'] != user['friends_count']:
+ tweeter['following'] = user['friends_count']
+
+ def parse_status_date(dstr):
+ dstr = re.sub('\+\d{1,4}', '', dstr)
+ return datetime.datetime.strptime(
+ dstr,
+ '%a %b %d %H:%M:%S %Y'
+ )
+ last_tweet_date = None
+ if 'status' in user:
+ last_tweet_date = user['status']['created_at']
+ last_tweet_date = parse_status_date(last_tweet_date)
+ if tweeter['last_tweet_date'] != last_tweet_date:
+ tweeter['last_tweet_date'] = last_tweet_date
+
+ ratio_before = tweeter['ratio']
+ tweeter.set_ratio()
+ tweeter.save()
+
+
@connection.register
class Following(BaseDocument):
View
@@ -2,3 +2,5 @@ redis
tornado
mongolite
mock
+tornado-utils
+Celery
View
@@ -0,0 +1,57 @@
+import logging
+import tornado.escape
+import tornado.auth
+import tornado.ioloop
+from celery.task import task
+from celery import conf
+import settings
+from models import Tweeter, connection
+
+
+
+@task
+def refresh_user_info(*args, **kwargs):
+ try:
+ _refresh_user_info(*args, **kwargs)
+ except:
+ logging.error("_refresh_user_info() failed", exc_info=True)
+ if conf.ALWAYS_EAGER:
+ raise
+
+def _refresh_user_info(username, access_token):
+ #from time import sleep; sleep(5)
+ uu = UserUpdate()
+ def cb(r, *args, **kwargs):
+ try:
+ uu.callback(username, r)
+ finally:
+ if not conf.ALWAYS_EAGER:
+ tornado.ioloop.IOLoop.instance().stop()
+ uu.twitter_request("/users/show", cb, access_token=access_token,
+ screen_name=username)
+ if not conf.ALWAYS_EAGER:
+ tornado.ioloop.IOLoop.instance().start()
+
+
+class UserUpdate(tornado.auth.TwitterMixin):
+ def __init__(self):
+ self.settings = dict(
+ twitter_consumer_key=settings.TWITTER_CONSUMER_KEY,
+ twitter_consumer_secret=settings.TWITTER_CONSUMER_SECRET,
+ )
+
+ @property
+ def db(self):
+ return connection[settings.DATABASE_NAME]
+
+ def require_setting(self, key, error):
+ assert key in self.settings, "%s (%s)" % (error, key)
+
+ def async_callback(self, func, callback):
+ return callback
+
+ def callback(self, username, response):
+ result = tornado.escape.json_decode(response.body)
+ tweeter = self.db.Tweeter.find_one({'user_id': result['id']})
+ assert tweeter['username'].lower() == username.lower()
+ Tweeter.update_tweeter(tweeter, result)
Oops, something went wrong. Retry.

0 comments on commit 1d872f0

Please sign in to comment.