Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Improve social feed importing

  • Loading branch information...
commit 4d09c4d165bf9b3cf74f04db27be20060cde7454 1 parent 94c297d
@juyrjola juyrjola authored
View
110 importers/backends/django.py
@@ -21,6 +21,7 @@
from web.geo.models import *
from web.political.models import *
from web.social.utils import FeedUpdater, UpdateError
+from web.social.models import BrokenFeed
class DjangoBackend(Backend):
def submit_elections(self, elections):
@@ -127,24 +128,72 @@ def _validate_fb_feed(self, candidate, feed_name):
person_name = unicode(candidate.person).encode('utf8')
feed_name = unicode(feed_name).encode('utf8')
self.logger.debug("%s: Validating FB feed %s" % (person_name, feed_name))
+
+ # Attempt to find the feed with different parameters
+ search_args = [
+ {'origin_id__iexact': feed_name},
+ {'account_name__iexact': feed_name}
+ ]
+
+ cf = None
+ for args in search_args:
+ try:
+ cf = CandidateFeed.objects.get(type='FB', **args)
+ self.logger.debug("%s: Feed %s found" % (person_name, feed_name))
+ if cf.candidate != candidate:
+ other_name = unicode(candidate.person).encode('utf8')
+ self.logger.warning("%s: Found feed (%s) was for %s" %
+ (person_name, feed_name, other_name))
+ if not self.replace:
+ return
+ break
+ except CandidateFeed.DoesNotExist:
+ pass
+
+ # Check if the feed was previously marked broken.
+ bf = None
+ if not cf:
+ try:
+ bf = BrokenFeed.objects.get(type='FB', origin_id=feed_name)
+ self.logger.debug("%s: FB feed %s marked broken" % (person_name, feed_name))
+ if not self.replace:
+ return
+ except BrokenFeed.DoesNotExist:
+ pass
+
+ # Attempt to download data from FB and mark the feed
+ # as broken if we encounter trouble.
try:
graph = self.feed_updater.fb_graph.get(feed_name)
except pyfaceb.exceptions.FBHTTPException as e:
- print e
+ if not cf and not bf:
+ bf = BrokenFeed(type='FB', origin_id=feed_name)
+ bf.reason = e.message[0:49]
+ bf.save()
return
if not 'category' in graph:
self.logger.warning('%s: FB %s: not a page' % (person_name, feed_name))
+ assert not cf
+ if not bf:
+ bf = BrokenFeed(type='FB', origin_id=feed_name)
+ bf.reason = "not-page"
+ bf.save()
return
+
+ # Now we know the feed is valid. If a BrokenFeed object exists,
+ # remove it.
+ if bf:
+ bf.delete()
+
origin_id = graph['id']
- if CandidateFeed.objects.filter(type='FB', origin_id=origin_id).count():
- self.logger.warning('%s: FB %s: already exists' % (person_name, feed_name))
- return
+ if not cf:
+ try:
+ cf = CandidateFeed.objects.get(type='FB', origin_id=origin_id)
+ assert cf.candidate == candidate
+ except CandidateFeed.DoesNotExist:
+ assert CandidateFeed.objects.filter(candidate=candidate, type='FB').count() == 0
+ cf = CandidateFeed(candidate=candidate, type='FB')
- try:
- cf = CandidateFeed.objects.get(candidate=candidate, type='FB')
- return
- except CandidateFeed.DoesNotExist:
- cf = CandidateFeed(candidate=candidate, type='FB')
cf.origin_id = origin_id
cf.account_name = graph.get('username', None)
cf.save()
@@ -156,26 +205,47 @@ def _validate_twitter_feed(self, candidate, feed_name):
twitter = self.feed_updater.twitter
if feed_name.isdigit():
- args = {'user_id': feed_name}
+ tw_args = {'user_id': feed_name}
+ orm_args = {'origin_id': feed_name}
else:
- args = {'screen_name': feed_name}
+ tw_args = {'screen_name': feed_name}
+ orm_args = {'account_name__iexact': feed_name}
+
+ try:
+ cf = CandidateFeed.objects.get(type='TW', **orm_args)
+ self.logger.debug("%s: Feed %s found" % (person_name, feed_name))
+ if cf.candidate != candidate:
+ other_name = unicode(candidate.person).encode('utf8')
+ self.logger.warning("%s: Found feed (%s) was for %s" %
+ (person_name, feed_name, other_name))
+ if not self.replace:
+ return
+ except CandidateFeed.DoesNotExist:
+ cf = None
+ pass
+
+ # Check if the feed was previously marked broken.
+ bf = None
+ if not cf:
+ try:
+ bf = BrokenFeed.objects.get(type='TW', origin_id=feed_name)
+ self.logger.debug("%s: TW feed %s marked broken" % (person_name, feed_name))
+ if not self.replace:
+ return
+ except BrokenFeed.DoesNotExist:
+ pass
try:
- res = twitter.showUser(**args)
+ res = twitter.showUser(**tw_args)
except TwythonError as e:
self.logger.error('Twitter error: %s', e)
return
origin_id = str(res['id'])
- if CandidateFeed.objects.filter(type='TW', origin_id=origin_id).count():
- self.logger.warning('%s: TW %s: already exists' % (person_name, feed_name))
- return
-
- try:
- cf = CandidateFeed.objects.get(candidate=candidate, type='TW')
- return
- except CandidateFeed.DoesNotExist:
+ if not cf:
+ assert CandidateFeed.objects.filter(candidate=candidate, type='TW').count() == 0
cf = CandidateFeed(candidate=candidate, type='TW')
+
cf.origin_id = origin_id
cf.account_name = res.get('screen_name', None)
cf.save()
View
32 web/social/management/commands/update_social.py
@@ -13,37 +13,9 @@
class Command(BaseCommand):
help = "Update social media feeds"
- def update_twitter(self):
- feed_list = Feed.objects.filter(type='TW')
- # check only feeds that haven't been updated for two hours
- update_dt = datetime.datetime.now() - datetime.timedelta(hours=2)
- feed_list = feed_list.filter(Q(last_update__lt=update_dt) | Q(last_update__isnull=True))
- for feed in feed_list:
- try:
- self.updater.process_twitter_timeline(feed)
- except UpdateError as e:
- feed.update_error_count += 1
- feed.save()
- if not e.can_retry:
- break
-
- def update_facebook(self):
+ def handle(self, *args, **options):
import requests_cache
requests_cache.configure("update-social")
-
- feed_list = Feed.objects.filter(type='FB')
- # check only feeds that haven't been updated for two hours
- update_dt = datetime.datetime.now() - datetime.timedelta(hours=2)
- feed_list = feed_list.filter(Q(last_update__lt=update_dt) | Q(last_update__isnull=True))
- for feed in feed_list:
- try:
- self.updater.process_facebook_timeline(feed)
- except UpdateError:
- feed.update_error_count += 1
- feed.save()
-
- def handle(self, *args, **options):
self.logger = logging.getLogger(__name__)
self.updater = FeedUpdater(self.logger)
- self.update_twitter()
- self.update_facebook()
+ self.updater.update_feeds()
View
80 web/social/migrations/0003_auto__add_brokenfeed__add_unique_brokenfeed_type_origin_id.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+import datetime
+from south.db import db
+from south.v2 import SchemaMigration
+from django.db import models
+
+
+class Migration(SchemaMigration):
+
+ def forwards(self, orm):
+ # Adding model 'BrokenFeed'
+ db.create_table('social_brokenfeed', (
+ ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+ ('type', self.gf('django.db.models.fields.CharField')(max_length=2)),
+ ('origin_id', self.gf('django.db.models.fields.CharField')(max_length=50, db_index=True)),
+ ('account_name', self.gf('django.db.models.fields.CharField')(max_length=50, null=True)),
+ ('check_time', self.gf('django.db.models.fields.DateTimeField')(auto_now=True, blank=True)),
+ ('reason', self.gf('django.db.models.fields.CharField')(max_length=50)),
+ ))
+ db.send_create_signal('social', ['BrokenFeed'])
+
+ # Adding unique constraint on 'BrokenFeed', fields ['type', 'origin_id']
+ db.create_unique('social_brokenfeed', ['type', 'origin_id'])
+
+
+ def backwards(self, orm):
+ # Removing unique constraint on 'BrokenFeed', fields ['type', 'origin_id']
+ db.delete_unique('social_brokenfeed', ['type', 'origin_id'])
+
+ # Deleting model 'BrokenFeed'
+ db.delete_table('social_brokenfeed')
+
+
+ models = {
+ 'social.apitoken': {
+ 'Meta': {'object_name': 'ApiToken'},
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'token': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+ 'type': ('django.db.models.fields.CharField', [], {'max_length': '2'}),
+ 'updated_time': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'})
+ },
+ 'social.brokenfeed': {
+ 'Meta': {'unique_together': "(('type', 'origin_id'),)", 'object_name': 'BrokenFeed'},
+ 'account_name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True'}),
+ 'check_time': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'origin_id': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
+ 'reason': ('django.db.models.fields.CharField', [], {'max_length': '50'}),
+ 'type': ('django.db.models.fields.CharField', [], {'max_length': '2'})
+ },
+ 'social.feed': {
+ 'Meta': {'unique_together': "(('type', 'origin_id'),)", 'object_name': 'Feed'},
+ 'account_name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True'}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'interest': ('django.db.models.fields.PositiveIntegerField', [], {'null': 'True'}),
+ 'last_update': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'db_index': 'True'}),
+ 'origin_id': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
+ 'picture': ('django.db.models.fields.URLField', [], {'max_length': '250', 'null': 'True'}),
+ 'type': ('django.db.models.fields.CharField', [], {'max_length': '2'}),
+ 'update_error_count': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0'})
+ },
+ 'social.update': {
+ 'Meta': {'ordering': "['-created_time']", 'unique_together': "(('feed', 'origin_id'),)", 'object_name': 'Update'},
+ 'created_time': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
+ 'feed': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['social.Feed']"}),
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'interest': ('django.db.models.fields.PositiveIntegerField', [], {'null': 'True'}),
+ 'origin_id': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
+ 'picture': ('django.db.models.fields.URLField', [], {'max_length': '250', 'null': 'True'}),
+ 'share_caption': ('django.db.models.fields.CharField', [], {'max_length': '500', 'null': 'True'}),
+ 'share_description': ('django.db.models.fields.CharField', [], {'max_length': '500', 'null': 'True'}),
+ 'share_link': ('django.db.models.fields.URLField', [], {'max_length': '250', 'null': 'True'}),
+ 'share_title': ('django.db.models.fields.CharField', [], {'max_length': '250', 'null': 'True'}),
+ 'sub_type': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
+ 'text': ('django.db.models.fields.CharField', [], {'max_length': '4000', 'null': 'True'}),
+ 'type': ('django.db.models.fields.CharField', [], {'max_length': '30'})
+ }
+ }
+
+ complete_apps = ['social']
View
11 web/social/models.py
@@ -27,6 +27,17 @@ def update_from_origin(self):
self.interest = feed_info.get('likes', None)
return feed_info
class BrokenFeed(models.Model):
    """A feed identifier that was checked and found unusable.

    Rows are unique per (type, origin_id), so a given identifier is
    recorded at most once per feed type.
    """

    # Feed type code; takes the same choices as Feed.type.
    type = models.CharField(max_length=2, choices=Feed.TYPE_CHOICES)
    # Identifier of the feed that failed the check.
    origin_id = models.CharField(max_length=50, db_index=True)
    # Optional account name for the feed; may be NULL.
    account_name = models.CharField(max_length=50, null=True)
    # auto_now: refreshed every time the row is saved.
    check_time = models.DateTimeField(auto_now=True)
    # Short description of why the feed is considered broken.
    reason = models.CharField(max_length=50)

    class Meta:
        unique_together = (('type', 'origin_id'),)
+
+
class Update(models.Model):
feed = models.ForeignKey(Feed, db_index=True)
text = models.CharField(max_length=4000, null=True)
View
111 web/social/utils.py
@@ -6,6 +6,7 @@
import email.utils
import urllib
import calendar
+import time
from twython import Twython, TwythonError
import pyfaceb
import dateutil.parser
@@ -13,11 +14,12 @@
from django.conf import settings
from django.db import transaction
+from django.db.models import Q
from social.models import ApiToken, Feed, Update
class UpdateError(Exception):
- def __init__(self, msg, can_retry=False):
- self.can_retry = can_retry
+ def __init__(self, msg, can_continue=False):
+ self.can_continue = can_continue
super(UpdateError, self).__init__(msg)
TOKEN_URL = "https://graph.facebook.com/oauth/access_token?" + \
@@ -46,10 +48,18 @@ def get_facebook_token():
'TWITTER_ACCESS_TOKEN_SECRET': 'oauth_token_secret'
}
def _append_without_dupes(feed_list, feed_list_b):
    """Append to ``feed_list`` each item of ``feed_list_b`` not already in it.

    Items are compared by their ``pk`` attribute.  ``feed_list`` is
    modified in place: its existing items keep their order and new items
    are added in the order they appear in ``feed_list_b``.  Nothing is
    returned.
    """
    seen_pks = set(f.pk for f in feed_list)
    for f in feed_list_b:
        if f.pk in seen_pks:
            continue
        # Remember the key as soon as it is appended so that an item
        # repeated inside feed_list_b itself is only added once.
        seen_pks.add(f.pk)
        feed_list.append(f)
+
class FeedUpdater(object):
def __init__(self, logger=None):
if not logger:
- logger = logging.getLogger("feed-updater")
+ logger = logging.getLogger(__name__)
self.logger = logger
token = get_facebook_token()
self.fb_graph = pyfaceb.FBGraph(token)
@@ -92,8 +102,36 @@ def subscribe_to_twitter_list(self, base_name, feeds):
if 'Not Found' in e.msg:
break
def find_feeds_to_update(self, feed_type=None):
    """Return the list of feeds that are due for an update.

    The list is assembled from three queries, each ordered by
    ``last_update``, concatenated in this priority order with feeds
    already present (same primary key) left out:

    1. feeds that have never been updated,
    2. feeds last updated more than two days ago,
    3. feeds last updated more than two hours ago that have at least
       one update created within the last week.

    If ``feed_type`` is truthy, only feeds of that type are considered.
    """
    candidates = Feed.objects.order_by('last_update')
    if feed_type:
        candidates = candidates.filter(type=feed_type)

    # Highest priority: feeds with no recorded update at all.
    never_updated = candidates.filter(Q(last_update__isnull=True))
    to_update = list(never_updated)
    self.logger.debug("%d feeds that have never been updated" % len(to_update))

    # Next: anything that has gone two days without an update.
    stale_cutoff = datetime.datetime.now() - datetime.timedelta(days=2)
    stale = list(candidates.filter(last_update__lt=stale_cutoff))
    self.logger.debug("%d feeds that haven't been updated in a while" % len(stale))
    _append_without_dupes(to_update, stale)

    # Last: feeds idle for two hours that are still active, i.e. have
    # posts dating from the last week.  The join on updates can yield
    # the same feed several times, hence distinct().
    recent_cutoff = datetime.datetime.now() - datetime.timedelta(hours=2)
    week_ago = datetime.datetime.now() - datetime.timedelta(weeks=1)
    is_due = Q(last_update__lt=recent_cutoff)
    is_active = Q(update__created_time__gt=week_ago)
    active = list(candidates.filter(is_due & is_active).distinct())
    self.logger.debug("%d feeds that are active" % len(active))
    _append_without_dupes(to_update, active)

    self.logger.debug("updating a total of %d feeds" % len(to_update))

    return to_update
+
@transaction.commit_on_success
- def process_twitter_timeline(self, feed):
+ def process_twitter_feed(self, feed):
self.logger.info("Processing Twitter feed %s" % feed.account_name)
user_id = feed.origin_id
args = {'user_id': user_id, 'username': user_id, 'count': 200,
@@ -104,7 +142,7 @@ def process_twitter_timeline(self, feed):
except TwythonError as e:
self.logger.error("Got Twitter exception: %s" % e)
if "Rate limit exceeded" in e.msg:
- raise UpdateError("Rate limit exceeded", can_retry=False)
+ raise UpdateError("Rate limit exceeded", can_continue=False)
raise UpdateError(e.msg)
feed.interest = info['followers_count']
feed.picture = info['profile_image_url']
@@ -122,7 +160,7 @@ def process_twitter_timeline(self, feed):
except TwythonError as e:
self.logger.error("Got Twitter exception: %s" % e)
if "Rate limit exceeded" in e.msg:
- raise UpdateError("Rate limit exceeded", can_retry=False)
+ raise UpdateError("Rate limit exceeded", can_continue=False)
raise UpdateError(e.msg)
break
if 'error' in tweets:
@@ -163,18 +201,34 @@ def process_twitter_timeline(self, feed):
feed.last_update = datetime.datetime.now()
feed.save()
def _fb_get(self, url):
    """Fetch ``url`` through ``self.fb_graph``, retrying transient errors.

    Up to three attempts are made.  An ``FBHTTPException`` whose message
    contains '#803' or '2500' is treated as transient and retried after
    a short pause; any other ``FBHTTPException``, or a transient one on
    the last attempt, is converted to ``UpdateError``.

    Returns whatever ``self.fb_graph.get(url)`` returns.
    Raises ``UpdateError`` carrying the message of the failing exception.
    """
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            return self.fb_graph.get(url)
        except pyfaceb.exceptions.FBHTTPException as e:
            self.logger.error("%s" % e)
            # Some errors seem to be transient; only those are retried.
            transient = '#803' in e.message or '2500' in e.message
            if not transient or attempt == max_attempts:
                # Bail out with the real error rather than looping on
                # a permanent failure or sleeping after the last try.
                raise UpdateError(e.message)
            self.logger.error("Retrying")
            # Sleep for a while before continuing.
            time.sleep(0.5)
+
@transaction.commit_on_success
- def process_facebook_timeline(self, feed, full_update=False):
+ def process_facebook_feed(self, feed, full_update=False):
self.logger.info('Processing feed %s: %s' % (feed.account_name, feed.origin_id))
# First update the feed itself
url = '%s&fields=picture,likes,about' % feed.origin_id
- try:
- feed_info = self.fb_graph.get(url)
- except pyfaceb.exceptions.FBHTTPException as e:
- self.logger.error("%s" % e)
- raise UpdateError(e.message)
-
+ feed_info = self._fb_get(url)
feed.picture = feed_info.get('picture', {}).get('data', {}).get('url', None)
feed.interest = feed_info.get('likes', None)
@@ -186,11 +240,7 @@ def process_facebook_timeline(self, feed, full_update=False):
url = '%s/posts&limit=%d' % (feed.origin_id, count)
while True:
self.logger.info('Fetching %s' % url)
- try:
- g = self.fb_graph.get(url)
- except pyfaceb.exceptions.FBHTTPException as e:
- self.logger.error("%s" % e)
- raise UpdateError(e.message)
+ g = self._fb_get(url)
found = False
for post in g['data']:
# Sanity check
@@ -275,6 +325,31 @@ def process_facebook_timeline(self, feed, full_update=False):
feed.last_update = datetime.datetime.now()
feed.save()
def process_feed(self, feed):
    """Update one feed using the handler that matches ``feed.type``.

    "TW" feeds go to ``process_twitter_feed`` and "FB" feeds to
    ``process_facebook_feed``; the handler's return value is passed
    through.

    Raises ``ValueError`` for any other type.  An explicit raise is used
    instead of ``assert`` so the check still runs under ``python -O``
    and an unknown feed is never handed to the Facebook handler.
    """
    if feed.type == "TW":
        return self.process_twitter_feed(feed)
    if feed.type == "FB":
        return self.process_facebook_feed(feed)
    raise ValueError("Unsupported feed type: %r" % (feed.type,))
+
def update_feeds(self):
    """Update all due feeds, Twitter first and then Facebook.

    For each feed type the due feeds are looked up with
    ``find_feeds_to_update`` and processed one by one.  When processing
    raises ``UpdateError`` the feed's ``update_error_count`` is
    incremented and saved; if the error is not marked ``can_continue``
    the remaining feeds of that type are skipped, but the next feed type
    is still processed.
    """
    for feed_type in ("TW", "FB"):
        # The lookup happens per type, right before that type is
        # processed, so the Facebook query runs after the Twitter pass.
        for feed in self.find_feeds_to_update(feed_type):
            try:
                self.process_feed(feed)
            except UpdateError as e:
                feed.update_error_count += 1
                feed.save()
                if not e.can_continue:
                    # Give up on this feed type only.
                    break
def get_facebook_graph(graph_id):
token = get_facebook_token()
Please sign in to comment.
Something went wrong with that request. Please try again.