Skip to content

Commit

Permalink
Checkpoint on progress elsewhere in the project
Browse files Browse the repository at this point in the history
  • Loading branch information
lmorchard committed Nov 27, 2011
1 parent 57200e1 commit 7a41f3e
Show file tree
Hide file tree
Showing 11 changed files with 429 additions and 158 deletions.
22 changes: 20 additions & 2 deletions OUTLINE.markdown
@@ -1,7 +1,16 @@
# Tinkering with Activity Streams

* Preface
* The Activity Stream Data Model
* Part I:
* What are Activity Streams?
* Gathering Tools for Tinkering
* node.js
* Python
* CouchDB
* RabbitMQ
* Celery
* Vagrant & Puppet?

* Creating
* Publishing
* Republishing
Expand All @@ -18,8 +27,14 @@

## Inbox

* The Lifestreams data model seems a lot like CouchDB
* Substreams & agents = a lot like map functions
* Summarize = a lot like reduce functions


* Hacks
* rss/atom-to-as-json converter
* convert RSS/Atom feed to AS-JSON
* fill in missing fields with inferences and user-supplied metadata
* CouchApp to post to an AS?
* S3 app to post to an AS?
* status notes
Expand All @@ -29,4 +44,7 @@
* from facebook
* from twitter
* from google+
* webfinger seems handy
* How do replies work?
* salmon?
* ostatus?
223 changes: 140 additions & 83 deletions book/ch01-what-are-activity-streams.asciidoc

Large diffs are not rendered by default.

84 changes: 61 additions & 23 deletions code/nodejs/twas/cli/play.js
Expand Up @@ -18,6 +18,7 @@ var Play = ({
QUEUE_CONCURRENCY: 32,
BUFFER_SIZE: 100 * 1024,
FETCH_MAX_AGE: 1 * 60 * 60 * 1000,
MAX_ERRORS: 3,
FEEDSUBSCRIPTION_BY_PROFILE_ID_VIEW:
'main/FeedSubscription-by-profile-id',

Expand All @@ -29,14 +30,15 @@ var Play = ({
$this.logger = new (winston.Logger)({
transports: [
new (winston.transports.Console)({
//level: 'info',
// level: 'info',
level: 'debug',
colorize: true
})
]
});
$this.logger.cli();
$this.logger.setLevels(winston.config.syslog.levels);
$this.logger
.cli()
.setLevels(winston.config.syslog.levels);

$this.fetch_queue = async.queue(
_.bind($this.fetchOneFeed, $this),
Expand All @@ -51,7 +53,9 @@ var Play = ({
$this.logger.info(results.length + ' subscriptions fetched.');
for (var i=0,result; result=results[i]; i++) {
var sub = result.value;
$this.fetch_queue.push(sub);
if (sub.url && !sub.disabled) {
$this.fetch_queue.push(sub);
}
}
});

Expand All @@ -70,11 +74,12 @@ var Play = ({
// Ensure resource data, creating it from defaults if necessary.
hr = _.extend({
_id: hr_doc_id,
_rev: null,
doc_type: hr_doc_type,
url: sub.url,
headers: {},
last_error: null,
last_fetch_time: 0
status_history: []
}, hr || {});

// Check whether this resource was fetched too recently to
Expand All @@ -84,7 +89,6 @@ var Play = ({
// $this.logger.debug("SKIP " + sub.url);
return next();
}
hr.last_fetch_time = now;
sub.last_fetch_time = now;

// Set up request headers for conditional GET, if the right
Expand Down Expand Up @@ -122,31 +126,50 @@ var Play = ({
res.on('end', function () {
$this.logger.info("("+res.statusCode+") GET " + sub.url);

if (304 == res.statusCode) {
// Skip other updates for 304.
var content = null;

if (200 == res.statusCode) {
hr.status = res.statusCode;
hr.headers = res.headers;
hr.last_error = null;
return $this.updateRecords(sub, hr, null, next);
content = Buffer.concat(chunks);
}

hr.status = res.statusCode;
hr.headers = res.headers;
if (301 == res.statusCode) {
// On a 301 Moved, change the subscription URL and
// queue another fetch after updating DB records.
if (hr.headers['location'] && sub.url != hr.headers['location']) {
sub.url = hr.headers['location'];
var old_next = next;
next = function () {
$this.fetch_queue.push(sub);
old_next();
};
$this.logger.debug("REQUEUE " + sub.url);
}
}

// TODO: Handle 302 temporary redirects.
if (302 == res.statusCode) {
hr.last_error = null;
}

if (200 == res.statusCode) {
if (304 == res.statusCode) {
// Skip other updates for 304.
hr.last_error = null;
var content = Buffer.concat(chunks);
return $this.updateRecords(sub, hr, content, next);
}

// TODO: Handle 3xx redirects.
// TODO: Disable feeds with 4xx and 5xx

return $this.updateRecords(sub, hr, null, next);
return $this.updateRecords(sub, hr, content, next);
});

});

// Register error handler for HTTP GET request.
req.on('error', function (e) {
$this.logger.error("ERROR "+sub.url+" "+e.code+" "+e);
hr.status = 999;
hr.last_error = ''+e;
return $this.updateRecords(sub, hr, null, next);
});
Expand All @@ -161,6 +184,26 @@ var Play = ({
// record after the HTTP request.
updateRecords: function (sub, hr, content, next) {
var $this = this;

// Rotate the status history for this resource
var h = hr.status_history;
h.push(hr.status);
if (h.length > 10) { h.shift(); }

// Count recent consecutive errors
var ct = 0;
for (var i=h.length-1; i>=0; i--) {
if (h[i] >= 400) { ct++; }
else { break; }
}

// Update the resource and disable sub if too many errors
hr.status_history = h;
if (ct > $this.MAX_ERRORS) {
$this.logger.error("TOO MANY ERRORS, DISABLING " + sub.url);
sub.disabled = true;
}

async.waterfall([
function (wf_next) {
// First, update the subscription record.
Expand All @@ -169,13 +212,8 @@ var Play = ({
},
function (wf_next) {
// Second, update or create the resource record.
if (hr._rev) {
$this.db.save(hr._id, hr._rev, hr,
function (err, res) { wf_next(); });
} else {
$this.db.save(hr._id, hr,
function (err, res) { wf_next(); });
}
$this.db.save(hr._id, hr._rev, hr,
function (err, res) { wf_next(); });
},
function (wf_next) {
// Third, if we have content for the resource,
Expand Down
Expand Up @@ -7,7 +7,11 @@
from django.core.management.base import BaseCommand, CommandError
from manage import path

from main.models import FeedSubscription
from main.models import Profile, FeedSubscription


PROFILES_BY_USER_NAME_VIEW = 'main/Profile-by-user-name'


class Command(BaseCommand):
help = 'Create new subscription'
Expand All @@ -20,14 +24,24 @@ class Command(BaseCommand):
action='store', dest='url', help='URL'),
make_option('-l', '--link',
action='store', dest='link', help='Link'),
make_option('-p', '--profile',
action='store', dest='profile', help='Profile'),
)
can_import_settings = True

def handle(self, *arg, **kwargs):

# Grab the profile for supplied user name
p = Profile.view(PROFILES_BY_USER_NAME_VIEW,
key=kwargs['profile']).first()
if not p:
raise CommandError('No profile named %s found' %
kwargs['profile'])

sub = FeedSubscription.get_or_create('%s:%s' %
('FeedSubscription', kwargs['url']))
sub.feed_type = kwargs['type']
sub.profile_id = p._id
sub.url = kwargs['url']
sub.link = kwargs['link']
sub.title = kwargs['title']
Expand Down
18 changes: 13 additions & 5 deletions code/python/twas/apps/main/management/commands/fetchfeeds.py
Expand Up @@ -13,7 +13,7 @@

from twas.utils import opml

from main.models import FeedSubscription, HttpResource
from main.models import Profile, FeedSubscription, HttpResource

PROFILES_BY_USER_NAME_VIEW = 'main/Profile-by-user-name'
FEEDSUBSCRIPTION_BY_PROFILE_ID_VIEW = 'main/FeedSubscription-by-profile-id'
Expand Down Expand Up @@ -70,10 +70,6 @@ def fetchSubscription(self, s):
r = requests.get(s.url, headers=h, timeout=10.0)
print "\t%s" % (r.status_code)

if 304 == r.status_code:
# 304 Not Modified means no need to update.
return

if 200 == r.status_code:
# Try grabbing response content,
# clear error if successful.
Expand All @@ -84,7 +80,19 @@ def fetchSubscription(self, s):
content = r.content
content_type = r.headers['content-type']

if 304 == r.status_code:
# 304 Not Modified means no need to update.
return

# TODO: Handle 3xx redirects.
if 301 == r.status_code:
s.url = hr.headers['location']
s.save()
return self.fetchSubscription(s)

# TODO: Disable feeds with 4xx and 5xx
if 404 == r.status_code:
s.disabled = True

except Exception, e:
# If there was any exception at all, save it.
Expand Down
2 changes: 2 additions & 0 deletions code/python/twas/apps/main/management/commands/opmlimport.py
Expand Up @@ -8,9 +8,11 @@

from main.models import Profile, FeedSubscription


PROFILES_BY_USER_NAME_VIEW = 'main/Profile-by-user-name'
FEEDSUBSCRIPTION_BY_PROFILE_ID_VIEW = 'main/FeedSubscription-by-profile-id'


class Command(BaseCommand):
help = 'Import OPML subscriptions'
args = 'filename'
Expand Down
95 changes: 95 additions & 0 deletions code/python/twas/apps/main/management/commands/parsefeeds.py
@@ -0,0 +1,95 @@
import sys
import os
import base64
import time
import json
from uuid import uuid4

from datetime import tzinfo, timedelta, datetime
from optparse import make_option

import requests

from django.db import connection
from django.core.management.base import BaseCommand, CommandError
from manage import path

from twas.utils import opml

import couchdb
from main.models import Profile, FeedSubscription, HttpResource


# CouchDB view names used as the first argument of the ``view()`` lookups in
# this module (profiles by user name, subscriptions by profile id, and stored
# stream items by stream URL).
PROFILES_BY_USER_NAME_VIEW = 'main/Profile-by-user-name'
FEEDSUBSCRIPTION_BY_PROFILE_ID_VIEW = 'main/FeedSubscription-by-profile-id'
ITEMS_BY_STREAM_URL = 'main/ActivityStreamItem-by-stream-url'
# NOTE(review): the value equals one hour only if the unit is milliseconds;
# nothing in the visible part of this file reads this constant -- confirm
# whether it is still needed here.
FETCH_MAX_AGE = 60 * 60 * 1000 # 1 hour


class TZ(tzinfo):
    """Fixed zero-offset (UTC) time zone.

    Attached to naive UTC datetimes so that ``isoformat()`` emits an explicit
    ``+00:00`` offset.  All three ``tzinfo`` hooks are implemented: the base
    class versions of ``dst()`` and ``tzname()`` raise ``NotImplementedError``,
    which would break calls such as ``timetuple()`` or ``strftime('%Z')``.
    """

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return timedelta(minutes=0)

    def dst(self, dt):
        """Return the daylight-saving adjustment; UTC never has one."""
        return timedelta(0)

    def tzname(self, dt):
        """Return the display name of this zone."""
        return 'UTC'


class Command(BaseCommand):
    """Parse already-fetched AS-JSON feeds into ActivityStreamItem documents.

    For the profile selected with ``-u/--user``, every subscription whose
    ``feed_type`` is ``'as-json'`` has the stored body of its HttpResource
    parsed, and each item found is created or updated in the ``twas``
    database.
    """
    help = 'Refresh subscribed feeds'
    option_list = BaseCommand.option_list + (
        make_option('-u', '--user',
                    action='store', dest='user_name',
                    default="",
                    help='User name'),
    )
    can_import_settings = True

    def handle(self, *arg, **kwargs):
        """Entry point: parse all AS-JSON subscriptions of one profile.

        Reads ``kwargs['user_name']`` (the ``--user`` option).

        Raises:
            CommandError: if no profile matches the supplied user name.
        """
        # couchdb.Server() is built with no arguments, so the server location
        # is whatever default the couchdb library uses.
        self.couch = couchdb.Server()
        self.db = self.couch['twas']

        # Grab the profile for supplied user name
        p = Profile.view(PROFILES_BY_USER_NAME_VIEW,
                         key=kwargs['user_name']).first()
        if not p:
            raise CommandError('No profile named %s found' %
                               kwargs['user_name'])

        subs = FeedSubscription.view(FEEDSUBSCRIPTION_BY_PROFILE_ID_VIEW,
                                     key=p._id).all()
        for s in subs:
            # Only Activity Streams JSON feeds are handled by this command.
            if s['feed_type'] == 'as-json':
                self.parseJSON(s)

    def parseJSON(self, s):
        """Parse the stored body of one subscription and upsert its items.

        Args:
            s: a FeedSubscription; its ``url`` identifies both the
                HttpResource to read and the stream the items belong to.

        Returns:
            True if the feed was parsed and every item was stored, False if
            the resource was skipped (status other than 200) or at least one
            item could not be saved.
        """
        # One timestamp for the whole run so all items of a pass agree.
        dt_now = datetime.utcnow().replace(tzinfo=TZ()).isoformat()

        # Fetch the HttpResource
        # NOTE(review): behaviour when no resource exists for this URL
        # depends on HttpResource.get(), which is not visible here -- confirm.
        hr_id = HttpResource.new_id(s.url)
        hr = HttpResource.get(hr_id)
        if hr.status != 200:
            return False

        # Parse the JSON from the HttpResource
        content = hr.fetch_attachment('body')
        data = json.loads(content)
        # A document without an 'items' member is treated as an empty stream
        # instead of aborting the whole command with a KeyError.
        items = data.get('items') or []
        # Single-argument parenthesised form prints the same text whether
        # print is a statement or a function.
        print("Parsed %s items from %s" % (len(items), s.url))
        if not items:
            return True

        # Fetch previous items for this stream URL, if any.
        prev_items = self.db.view(ITEMS_BY_STREAM_URL,
                                  key=s.url)
        items_by_id = dict((i.id, i.value) for i in prev_items)

        # Create / update items in DB from the stream.
        new_items = []
        for item in items:
            _id = 'ActivityStreamItem:%s:%s' % (s.url, item['id'])
            # Start from the previously stored value (if any) so fields such
            # as 'parsed_first_at' survive, then overlay the fresh feed data.
            db_item = items_by_id.get(_id, {})
            db_item.update(item)
            db_item['_id'] = _id
            db_item['doc_type'] = 'ActivityStreamItem'
            db_item['stream_url'] = s.url
            if 'parsed_first_at' not in db_item:
                db_item['parsed_first_at'] = dt_now
            db_item['parsed_update_at'] = dt_now
            new_items.append(db_item)

        # The bulk update reports one (success, docid, rev_or_exc) tuple per
        # document; surface failures instead of silently dropping them.
        all_saved = True
        for success, doc_id, rev_or_exc in self.db.update(new_items):
            if not success:
                all_saved = False
                sys.stderr.write("Failed to save %s: %s\n" %
                                 (doc_id, rev_or_exc))
        return all_saved

0 comments on commit 7a41f3e

Please sign in to comment.