experimental nodejs feed subscription daemon

commit 9b95c1482e61bd469ba27fad9ad846b17c6ad24c 1 parent 53caff1
@maxogden authored
20 LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2011 Max Ogden
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
98 README.md
@@ -1,97 +1 @@
-# PubSubHubbub Subscribing for CouchDB
-
-Make CouchDB accept and store PubSubHubbub feeds! Original concept by @rdfturtle (Tyler Gillies)
-
-# Why is this cool?
-
-So I reckon you have some old and busted RSS or ATOM feeds that you need to pull from whenever you want updates, right?
-
-Well, little buddy, it's time to enter the world of push!
-
-Hook your pull feeds into the magic that is PubSubHubbub (superfeedr.com does this for you... _for free!_) and they will push to a specific PubSubHubbub subscriber endpoint whenever they have new content.
-
-You might be asking "where am I going to get a valid PubSubHubbub subscriber endpoint to store all of these awesome feed pushes?" Well, I'm glad you asked!
-
-With Couchpubtato, you can make any Couch database act like a valid PubSubHubbub subscriber endpoint.
-
-# I still don't get it
-
-OK, so let's say you want to embed a feed of upcoming calendar events on your _sweet blog_, but the calendar page only has a junky RSS feed! You can use Superfeedr to push new RSS updates in realtime to a CouchDB of your choice, which turns your previously junky RSS feed into a full JSONP-enabled API for maximum client side widget goodness!
-
-Here's an example of this whole thingy working: I log all Portland, OR 911 calls and make them available as ActivityStreams here: <code>http://pdxapi.com/pdx911/feed?descending=true&limit=5</code>
-
-# ActivityStreams
-
-By default this will convert any incoming XML RSS/ATOM feed data into JSON [ActivityStreams](http://activitystrea.ms) format. ActivityStreams are the new hotness; pure XML ATOM/RSS is old and busted. Here's an example of an ActivityStreams-formatted feed item:
-
- {
- "postedTime":"2010-10-14T00:58:32Z",
- "object": {
- "permalinkUrl": "http://rss.cnn.com/~r/rss/cnn_latest/~3/s52R1lImWu0/index.html",
- "objectType": "article",
- "summary": "O'Donnell, Coons stage feisty debate in Delaware"
- },
- "verb": "post",
- "actor": {
- "permalinkUrl": "http://rss.cnn.com/~r/rss/cnn_latest/~3/s52R1lImWu0/index.html",
- "objectType": "service",
- "displayName": "CNN.com Recently Published/Updated"
- }
- }
-
-# Quick install
-
-* Get a free hosted Couch from [CouchOne](http://couchone.com/get) and a free Superfeedr subscriber account from [Superfeedr](http://superfeedr.com)
-* Make a new database on your couch: <code>curl -X PUT http://YOURCOUCH/DBNAME</code>
-* Copy Couchpubtato to the new db: <code>curl -X POST http://YOURCOUCH/_replicate -d '{"source":"http://max.couchone.com/apps","target":"DBNAME", "doc_ids":["_design/push"]}'</code>
-* Tell Superfeedr to store an XML feed in your Couch: <code>curl -X POST http://superfeedr.com/hubbub -u'SUPERFEEDRUSERNAME:SUPERFEEDRPASSWORD' -d'hub.mode=subscribe' -d'hub.verify=sync' -d'hub.topic=YOURFEEDURL' -d'hub.callback=http://YOURCOUCH/DBNAME/_design/push/_rewrite/xml' -D-</code>
-
-# In-depth install
-
-You can use any CouchDB hosted publicly (so that Superfeedr can post updates to it), but I'll assume you're working with a free Couch hosted by [CouchOne](http://couchone.com/get). You'll have to create a new database to store your data. I'll call mine <code>awesome-events</code>. You can do this from http://YOURCOUCH/_utils.
-
-You can either replicate the couchapp from my couch [max.couchone.com/apps/_design/push](http://max.couchone.com/apps/_design/push) (quickest option) or, if you want to hack on the Couchpubtato source code first, you'll need to install the [CouchApp command line utility](http://couchapp.org/page/installing) and check out this repo.
-
-If you want to hack on Couchpubtato or build it yourself, once you have the couchapp utility working, <code>git clone</code> this repo, cd into the folder, and run <code>couchapp init</code>. To upload Couchpubtato into your couch, run <code>couchapp push http://YOURCOUCH/DATABASENAME</code>. Otherwise see the Quick install section above.
-
-When you push Couchpubtato into your Couch, it will enhance your new database with the magical PubSubHubbub sprinkles contained in Couchpubtato and teach your database how to store PubSubHubbub data.
-
-Once your database is ready to store data, we need to use Superfeedr to hook a feed up to dump into your new PubSubHubbub-enabled Couch database.
-
-Now go get a free Subscriber account at [Superfeedr](http://superfeedr.com).
-
-To subscribe to a feed, use the following <code>curl</code> command:
-
-<code>curl -X POST http://superfeedr.com/hubbub -u'SUPERFEEDRUSERNAME:SUPERFEEDRPASSWORD' -d'hub.mode=subscribe' -d'hub.verify=sync' -d'hub.topic=YOURFEEDURL' -d'hub.callback=http://YOURCOUCH/DATABASENAME/_design/push/_rewrite/xml' -D-</code>
-
-It should return a <code>204</code> if everything was fine; if not, the response body will indicate what was wrong. If you don't like curl, PubSubHubbub libraries also exist in many languages.
-
-Now any feed updates will get sent to your Couch and will be converted and saved as JSON ActivityStreams objects.
-
-# Known issues
-
-Superfeedr will send multiple feed updates in a single update, so if 3 new items were posted to your feed, it will send a single XML update containing 3 items. Couch currently can only create a single document for each update, so you may see documents that contain multiple feed items. A more desirable situation would be a 1:1 ratio of documents in Couch to feed items.
-
-# License
-
-The MIT License
-
-Copyright (c) 2010 Max Ogden and Tyler Gillies
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
+work in progress
1  _id
@@ -1 +0,0 @@
-_design/push
35 child.js
@@ -0,0 +1,35 @@
+// from https://github.com/szayat/node.couch.js
+
+var Script = process.binding('evals').Script;
+var stdin = process.openStdin();
+
+stdin.setEncoding('utf8');
+
+var buffer = '';
+var listener;
+
+var loadModule = function (doc) {
+ var wrapper = "(function (exports, require, module, __filename, __dirname) { "
+ + doc.changes
+ + "\n});";
+
+ var module = {exports:{},id:'changes'}
+
+ var compiledWrapper = process.compile(wrapper, doc.changes);
+ var p = compiledWrapper.apply(doc, [module.exports, require, module]);
+ return module.exports;
+}
+
+stdin.on('data', function (chunk) {
+ buffer += chunk.toString();
+ while (buffer.indexOf('\n') !== -1) {
+    var line = buffer.slice(0, buffer.indexOf('\n'));
+ buffer = buffer.slice(buffer.indexOf('\n') + 1);
+ var obj = JSON.parse(line);
+ if ((obj[0]) === "ddoc") {
+ listener = loadModule(obj[1]).listener;
+ } else if (obj[0] === "change") {
+ listener(obj[1],obj[2]);
+ }
+ }
+});
4 couchapp.json
@@ -1,4 +0,0 @@
-{
- "name": "CouchApp PubSubHubbub Subscriptions",
- "description": "A Simple CouchApp PubSub Subscriber"
-}
7 couchapp/.couchappignore
@@ -0,0 +1,7 @@
+[
+ // filenames matching these regexps will not be pushed to the database
+ // uncomment to activate; separate entries with ","
+ // ".*~$"
+ // ".*\\.swp$"
+ // ".*\\.bak$"
+]
19 couchapp/changes.js
@@ -0,0 +1,19 @@
+var sys = require('sys')
+, couchdb = require('./lib/node-couchdb-min/couchdb')
+, archiver = require('./feed-archiver');
+
+exports.listener = function(change) {
+ if (!change.doc) return;
+
+ if (change.doc.feed && change.doc.db) {
+
+ var feedSaveFunc = function (data) {
+ sys.debug(sys.inspect(data));
+ for (var item in data.channel.item) {
+ archiver.saveItem(data.channel.item[item], new couchdb.Db(change.doc.db), "description");
+ }
+ }
+
+ archiver.processFeed(change.doc.feed, feedSaveFunc);
+ }
+}
54 feed-archiver.js
@@ -0,0 +1,54 @@
+#!/usr/bin/env node
+
+var fs = require('fs')
+ , sys = require('sys')
+ , xml2js = require('xml2js')
+ , url = require('url')
+ , http = require('http')
+ , crypto = require('crypto')
+ , parser = new xml2js.Parser();
+
+var debug = false;
+
+exports.processFeed = function(feedUrl, feedSaveFunc) {
+
+  // use a fresh parser per feed so 'end' listeners don't pile up on the shared one
+  var parser = new xml2js.Parser();
+  var feed = url.parse(feedUrl);
+
+ var feedClient = http.createClient(80, feed.host);
+
+ var request = feedClient.request("GET", feed.pathname, {'host' : feed.host});
+ request.end();
+
+ request.on('response', function(response) {
+ response.setEncoding('utf8');
+
+    var data = ''; // must start as a string, or the first += yields "undefined<xml...>"
+
+ response.on('data', function(chunk) {
+ data += chunk;
+ });
+
+ response.addListener('end', function() {
+ parser.parseString(data);
+ })
+
+ });
+
+ parser.addListener('end', feedSaveFunc);
+}
+
+exports.saveItem = function(item, db, uniqueKey) {
+
+ var _id = crypto.createHash('md5').update(item[uniqueKey]).digest("hex");
+
+ db.get(_id, function(err, doc) {
+ if (err) {
+ if (err.couchDbError == "not_found") {
+ db.put(_id, item, function(err, result) {
+ if (err) return sys.error(err.stack);
+ sys.log( "Created " + _id);
+ });
+ }
+ }
+ });
+}
6 fulltext/by_content/index.js
@@ -1,6 +0,0 @@
-function(doc) {
- var ret = new Document();
- ret.add(doc.object.content);
- ret.add(doc.object.summary);
- return ret;
-}
5 fulltext/by_feed/index.js
@@ -1,5 +0,0 @@
-function(doc) {
- var ret = new Document();
- ret.add(doc.feedMeta.link);
- return ret;
-}
1  language
@@ -1 +0,0 @@
-javascript
1  lib/node-couchdb-min
@@ -0,0 +1 @@
+Subproject commit a7b9f27426160c0eb20b3d44db698a9cad991519
21 lists/feed.js
@@ -1,21 +0,0 @@
-function(head, req){
- start({"headers": {"Content-Type" : "application/json;charset=utf-8"}});
- if ('callback' in req.query) send(req.query['callback'] + "(");
-  var started = false;
-  var row;
-  send("{\"items\": [\n");
-  while ((row = getRow())) {
- for(var item in row.value.feed){
- item = row.value.feed[item];
- if(started) send(",\n");
- send(JSON.stringify({
- postedTime: item.postedTime,
- object: item.object,
- actor: item.actor,
- verb: item.verb
- }));
- started = true;
- }
- }
- send("]}\n");
- if ('callback' in req.query) send(")");
-}
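The list function being deleted here wrapped its JSON payload in a JSONP callback only when the request carried a `?callback=` query parameter, which is what made the feed consumable from client-side widgets on other origins. A minimal sketch of that envelope logic (the `wrapJsonp` helper name is made up; CouchDB's real list functions stream via `send` rather than returning a string):

```javascript
// Sketch of the JSONP envelope from the deleted lists/feed.js: wrap the
// body in callback(...) only if the client asked for a callback.
function wrapJsonp(query, payload) {
  var body = JSON.stringify(payload);
  return 'callback' in query ? query.callback + '(' + body + ')' : body;
}

console.log(wrapJsonp({ callback: 'cb' }, { items: [] })); // cb({"items":[]})
console.log(wrapJsonp({}, { items: [] }));                 // {"items":[]}
```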
133 main.js
@@ -0,0 +1,133 @@
+// from https://github.com/szayat/node.couch.js
+
+var request = require('request')
+ , sys = require('sys')
+ , events = require('events')
+ , path = require('path')
+ , querystring = require('querystring')
+ , child = require('child_process')
+ , url = require('url')
+ ;
+
+var headers = {'content-type':'application/json', 'accept':'application/json'}
+
+function createDatabaseListener (uri, db) {
+ var parsedUri = url.parse(uri);
+ var didExist = !! db ;
+ if (!db) db = {
+ ddocs : {}
+ , ids : []
+ , onChange: function (change) {
+ db.seq = change.seq;
+ if (change.id && change.id.slice(0, '_design/'.length) === '_design/') {
+ db.onDesignDoc(change.doc);
+ }
+ db.ids.forEach(function (id) {
+ db.ddocs[id]._changes_process().stdin.write(JSON.stringify(["change", change, uri ])+'\n');
+ })
+ }
+ , onDesignDoc: function (doc) {
+ sys.puts(doc._id)
+ if (db.ddocs[doc._id] && db.ddocs[doc._id].changes) {
+ // take down the process
+ sys.puts("Stopping process for "+doc._id);
+ db.ddocs[doc._id]._changes_process().kill();
+ db.ids.splice(db.ids.indexOf(doc._id),1)
+ }
+
+ if (doc._deleted) {
+ delete db.ddocs[doc._id];
+ } else {
+ db.ddocs[doc._id] = doc;
+ if (doc.changes) {
+ // start up the process
+ sys.puts("Starting process for "+doc._id)
+ var p = child.spawn(process.execPath, [path.join(__dirname, 'child.js')]);
+ p.stderr.on("data", function (chunk) {sys.error(chunk.toString())})
+ p.stdin.write(JSON.stringify(["ddoc", doc])+'\n');
+ db.ddocs[doc._id]._changes_process = function(){return p};
+ db.ids.push(doc._id)
+ }
+ }
+ }
+ };
+
+ var changesStream = new events.EventEmitter();
+ changesStream.write = function (chunk) {
+ var line;
+ changesStream.buffer += chunk.toString();
+ while (changesStream.buffer.indexOf('\n') !== -1) {
+ line = changesStream.buffer.slice(0, changesStream.buffer.indexOf('\n'));
+ if (line.length > 1) db.onChange(JSON.parse(line));
+ changesStream.buffer = changesStream.buffer.slice(changesStream.buffer.indexOf('\n') + 1)
+ }
+ };
+ changesStream.end = function () {createDatabaseListener(uri, db)};
+ changesStream.buffer = '';
+ request({uri:uri, headers:headers}, function (error, resp, body) {
+
+ var qs;
+ if (error) throw error;
+ if (resp.statusCode > 299) {
+ // deal with deleted databases
+      var b = JSON.parse(body);
+      if ( didExist && b.error == "not_found" && b.reason == "no_db_file" ) {
+ sys.debug('database deleted: ' + uri );
+ return null;
+ }
+ else
+ throw new Error("Response error "+sys.inspect(resp)+'\n'+body);
+ }
+ if (!db.seq) db.seq = JSON.parse(body).update_seq
+ qs = querystring.stringify({include_docs: "true", feed: 'continuous', since: db.seq})
+ request({uri:uri+'/_changes?'+qs, responseBodyStream:changesStream}, function (err, resp, body) {
+ if ( err )
+ sys.debug(JSON.stringify(err));
+ });
+ request({uri:uri+'/_all_docs?startkey=%22_design%2F%22&endkey=%22_design0%22&include_docs=true'},
+ function (err, resp, body) {
+ if (err) throw err;
+ if (resp.statusCode > 299) throw new Error("Response error "+sys.inspect(resp)+'\n'+body);
+ JSON.parse(body).rows.forEach(function (row) {
+ if (!db.ddocs[row.id]) db.onDesignDoc(row.doc);
+ });
+ })
+ })
+
+ return db
+}
+
+function createService (uri, interval) {
+ if (uri[uri.length - 1] !== '/') uri += '/';
+ var dbs = {};
+ var service = {};
+
+ var setup = function () {
+ var starttime = new Date();
+ request({uri:uri+'_all_dbs', headers:headers}, function (error, resp, body) {
+ if (error) throw error;
+ if (resp.statusCode > 299) throw new Error("Response error "+sys.inspect(resp)+'\n'+body)
+ JSON.parse(body).forEach(function (db) {
+ if (!dbs[db]) {
+ dbs[db] = createDatabaseListener(uri+db);
+          // deal with a deleted database. TODO: am I leaking memory somewhere?
+          if ( ! dbs[db] )
+            delete dbs[db];
+          else
+            if (service.onDatabase) service.onDatabase(db, dbs[db]);
+ }
+ })
+ var endtime = new Date();
+ setTimeout(setup, interval ? interval : (((endtime - starttime) * 5) + 1000));
+ })
+ }
+ setup();
+
+ return service;
+}
+
+if (require.main == module) {
+ var uri = process.argv[process.argv.length - 1];
+ sys.puts('Finding changes listeners on '+uri)
+ createService(uri);
+}
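`createService` re-polls `_all_dbs` on a timer; when no explicit interval is given, the `setTimeout` call backs off in proportion to how long the last poll took (five times the elapsed time, with a one-second floor). Extracted as a pure function for illustration (the `nextPollDelay` name is made up):

```javascript
// Sketch of createService's adaptive re-poll delay: 5x the time the last
// _all_dbs request took, plus a 1s floor, unless a fixed interval is given.
function nextPollDelay(starttime, endtime, interval) {
  return interval ? interval : (((endtime - starttime) * 5) + 1000);
}

console.log(nextPollDelay(0, 200));      // 2000: a slow poll earns a longer wait
console.log(nextPollDelay(0, 0));        // 1000: the one-second floor
console.log(nextPollDelay(0, 200, 500)); // 500: an explicit interval wins
```

The effect is that a heavily loaded Couch gets polled less aggressively, while a fast one settles near the one-second floor.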
12 rewrites.json
@@ -1,12 +0,0 @@
-[
- {
- "from": "xml",
- "to": "_show/challenge",
- "method": "GET"
- },
- {
- "from":"xml",
- "to": "_update/save",
- "method": "POST"
- }
-]
3  shows/challenge.js
@@ -1,3 +0,0 @@
-function(head, req){
- return req.query["hub.challenge"];
-}
7 updates/save.js
@@ -1,7 +0,0 @@
-function(doc, req){
- var lib = require('vendor/xmlToActivityStreamJson');
-  var data = {};
-  data['feed'] = lib.xmlToActivityStreamJson(req.body);
-  data['_id'] = req.uuid;
-  return [data, "posted"];
-}
97 vendor/xmlToActivityStreamJson.js
@@ -1,97 +0,0 @@
-//this is adapted from @daleharvey's codez
-
-exports.xmlToActivityStreamJson = function(xml) {
- function zeroPad(n) {
- return n < 10 ? '0' + n : n;
- }
-
- function rfc3339(date) {
- return date.getUTCFullYear() + '-' +
- zeroPad(date.getUTCMonth() + 1) + '-' +
- zeroPad(date.getUTCDate()) + 'T' +
- zeroPad(date.getUTCHours()) + ':' +
- zeroPad(date.getUTCMinutes()) + ':' +
- zeroPad(date.getUTCSeconds()) + 'Z';
- };
-
- var i, item, body, date, data = [],
- re = /^<\?xml\s+version\s*=\s*(["'])[^\1]+\1[^?]*\?>/,
- str = xml.replace(re, ""),
- feed = new XML(str);
-
- // this is nasty, but its rss, its supposed to be nasty
- // duck type rss vs atom
- if (feed.channel.length() > 0) {
-
- for (i = 0; i < feed.channel.item.length(); i++) {
- item = feed.channel.item[i];
- body = item.description.toString();
- date = new Date(item.pubDate.toString());
-
- if (!date) {
- date = new Date();
- }
-
- var geo = new Namespace('http://www.georss.org/georss');
- var location = item..geo::point.toString().split(' ');
-
- var parsed = {
- "postedTime" : rfc3339(date),
- "object" : {
- "content" : body,
- "permalinkUrl" : item.link.toString(),
- "objectType" : "article",
- "summary" : item.title.toString(),
- "location" : location
- },
- "verb" : "post",
- "actor" : {
-      "permalinkUrl" : item.link.toString(),
- "objectType" : "service",
- "displayName" : feed.channel.title.toString()
- }
- }
-
-    data = data.concat(parsed);
- }
- } else {
- default xml namespace="http://www.w3.org/2005/Atom";
- for each (item in feed..entry) {
- body = item.content.toString();
- var dateString = item.updated.toString();
- if (dateString == "") dateString = item.published.toString();
- if (dateString == "") dateString = null;
-
- date = new Date(dateString);
-
- var link = "";
- if('link' in item) link = item.link[0].@href.toString();
-
- var geo = new Namespace('http://www.georss.org/georss');
- var location = item..geo::point.toString().split(' ');
-
- data = data.concat({
- "postedTime" : rfc3339(date),
- "object" : {
- "content" : body,
- "permalinkUrl" : link,
- "objectType" : "article",
- "summary" : item.title.toString(),
- "location" : location
- },
- "verb" : "post",
- "actor" : {
- "permalinkUrl" : link,
- "objectType" : "service",
- "displayName" : feed.title.toString()
- }
- });
- }
- }
- return data.concat({
- "feedMeta" : {
- "link" : feed.link[0].@href.toString(),
- "raw" : xml
- }
- });
-}
5 views/by_feed/map.js
@@ -1,5 +0,0 @@
-function(doc) {
- if (doc.feedMeta.link && doc.postedTime) {
- emit(doc.feedMeta.link, doc);
- }
-}
5 views/feeds/map.js
@@ -1,5 +0,0 @@
-function(doc) {
- if (doc.feedMeta.link) {
- emit(doc.feedMeta.link, doc);
- }
-}
7 views/feeds/reduce.js
@@ -1,7 +0,0 @@
-function(keys, values, rereduce) {
- if (rereduce) {
- return sum(values);
- } else {
- return values.length;
- }
-}
5 views/recent/map.js
@@ -1,5 +0,0 @@
-function(doc) {
- if (doc.feed[0].postedTime) {
- emit(doc.feed[0].postedTime, doc);
- }
-}