diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5c23cac --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2011 Max Ogden + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md index e1e624f..a6b488b 100644 --- a/README.md +++ b/README.md @@ -1,97 +1 @@ -# PubSubHubbub Subscribing for CouchDB - -Make CouchDB accept and store PubSubHubbub feeds! Original concept by @rdfturtle (Tyler Gillies) - -# Why is this cool? - -So I reckon you have some old and busted RSS or ATOM feeds that you need to pull from whenever you want updates, right? - -Well, little buddy, it's time to enter the world of push! - -Hook your pull feeds into the magic that is PubSubHubbubb (superfeedr.com does this for you... _for free!_) and they will push to a specific PubSubHubbubb subscriber endpoint whenever they have new content. - -You might be asking "where am I going to get a valid PubSubHubbubb subscriber endpoint to store all of these awesome feed pushes"? 
Well, i'm glad you asked! - -With Couchpubtato, you can make any Couch database act like a valid PubSubHubbubb subscriber endpoint. - -# I still don't get it - -Ok, so lets say you want to embed a feed of upcoming calendar events on your _sweet blog_, but the calendar page only has a junky RSS feed! You can use Superfeedr to push new RSS updates in realtime to a CouchDB of your choice, which turns your previously junky RSS feed into a full JSONP enabled API for maximum client side widget goodness! - -Here's an example of this whole thingy working: I log all Portland, OR 911 calls and make them available as ActivityStreams here: http://pdxapi.com/pdx911/feed?descending=true&limit=5 - -# ActivityStreams - -By default this will convert any incoming XML RSS/ATOM feed data into JSON [ActivityStreams](http://activitystrea.ms) format. ActivityStreams are the new hotness, pure XML ATOM/RSS is old and busted. Here's an example of an ActivityStreams formatted feed item: - - { - "postedTime":"2010-10-14T00:58:32Z", - "object": { - "permalinkUrl": "http://rss.cnn.com/~r/rss/cnn_latest/~3/s52R1lImWu0/index.html", - "objectType": "article", - "summary": "O'Donnell, Coons stage feisty debate in Delaware" - }, - "verb": "post", - "actor": { - "permalinkUrl": "http://rss.cnn.com/~r/rss/cnn_latest/~3/s52R1lImWu0/index.html", - "objectType": "service", - "displayName": "CNN.com Recently Published/Updated" - } - } - -# Quick install - -* Get a free hosted Couch from [CouchOne](http://couchone.com/get) and a free Superfeedr subscriber account from [Superfeedr](http://superfeedr.com) -* Make a new database on your couch: curl -X PUT http://YOURCOUCH/DBNAME -* Copy Couchpubtato to the new db: curl -X POST http://YOURCOUCH/\_replicate -d '{"source":"http://max.couchone.com/apps","target":"apps", "doc\_ids":["_design/push"]}' -* Tell Superfeedr to store an XML feed in your Couch: curl -X POST http://superfeedr.com/hubbub -u'SUPERFEEDRUSERNAME:SUPERFEEDRPASSWORD' -d'hub.mode=subscribe' 
-d'hub.verify=sync' -d'hub.topic=YOURFEEDURL' -d'hub.callback=http://YOURCOUCH/DBNAME/\_design/push/_rewrite/xml' -D- - -# In-depth install - -You can use any CouchDB hosted pubicly (so that Superfeedr can post updates to it), but I'll assume you're working with a free Couch hosted by [CouchOne](http://couchone.com/get). You'll have to create a new database to store your data. I'll call mine awesome-events. You can do this from http://YOURCOUCH/_utils. - -You can either replicate the couchapp from my couch [max.couchone.com/apps/_design/push](http://max.couchone.com/apps/_design/push) (quickest option) or, if you want to hack on the Couchpubtato source code first, you'll need to install the [CouchApp command line utility](http://couchapp.org/page/installing) and check out this repo. - -If you want to hack on Couchpubtato/build it yourself, once you have the couchapp utility working, git clone this repo and go into this folder and execute couchapp init. To upload Couchpubtato into your couch just run couchapp push http://YOURCOUCH/DATABASENAME. Otherwise see the Quick install section above. - -When you push Couchpubtato into your Couch it will enhance your new database with the magical PubSubHubbubb sprinkles contained in Couchpubtato and teach your database how to store PubSubHubbubb data. - -Once your database is ready to store data, we need to use Superfeedr to hook up a feed to dump into your new PubSubHubbubb enabled Couch database. - -Now go get a free Subscriber account at [Superfeedr](http://superfeedr.com) - -To subscribe to a feed, use the following curl command: - -curl -X POST http://superfeedr.com/hubbub -u'SUPERFEEDRUSERNAME:SUPERFEEDRPASSWORD' -d'hub.mode=subscribe' -d'hub.verify=sync' -d'hub.topic=YOURFEEDURL' -d'hub.callback=http://YOURCOUCH/DATABASENAME/\_design/push/_rewrite/xml' -D- - -It should return a 204 if everything was fine, and if not, it will indicate what was wrong in the BODY. 
If you don't like curl, there also exist PubSubHubbub libraries in many languages. - -Now any feed updates will get sent to your Couch and will be converted and saved as JSON ActivityStreams objects. - -# Known issues - -Superfeedr will send multiple feed updates in a single update, so if 3 new items were posted to your feed, it will send a single XML update containing 3 items. Couch currently can only create a single document for each update, so you may see documents that contain multiple feed items. A more desirable situation would be a 1:1 ratio of documents in Couch to feed items. - -# License - -The MIT License - -Copyright (c) 2010 Max Ogden and Tyler Gillies - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. 
\ No newline at end of file +work in progress \ No newline at end of file diff --git a/_id b/_id deleted file mode 100644 index 975cacd..0000000 --- a/_id +++ /dev/null @@ -1 +0,0 @@ -_design/push \ No newline at end of file diff --git a/child.js b/child.js new file mode 100644 index 0000000..302e2e1 --- /dev/null +++ b/child.js @@ -0,0 +1,35 @@ +// from https://github.com/szayat/node.couch.js + +var Script = process.binding('evals').Script; +var stdin = process.openStdin(); + +stdin.setEncoding('utf8'); + +var buffer = ''; +var listener; + +var loadModule = function (doc) { + var wrapper = "(function (exports, require, module, __filename, __dirname) { " + + doc.changes + + "\n});"; + + var module = {exports:{},id:'changes'} + + var compiledWrapper = process.compile(wrapper, doc.changes); + var p = compiledWrapper.apply(doc, [module.exports, require, module]); + return module.exports; +} + +stdin.on('data', function (chunk) { + buffer += chunk.toString(); + while (buffer.indexOf('\n') !== -1) { + line = buffer.slice(0, buffer.indexOf('\n')); + buffer = buffer.slice(buffer.indexOf('\n') + 1); + var obj = JSON.parse(line); + if ((obj[0]) === "ddoc") { + listener = loadModule(obj[1]).listener; + } else if (obj[0] === "change") { + listener(obj[1],obj[2]); + } + } +}); diff --git a/couchapp.json b/couchapp.json deleted file mode 100644 index d0d643e..0000000 --- a/couchapp.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "name": "CouchApp PubSubHubbub Subscriptions", - "description": "A Simple CouchApp PubSub Subscriber" -} diff --git a/couchapp/.couchappignore b/couchapp/.couchappignore new file mode 100644 index 0000000..8b9f9dd --- /dev/null +++ b/couchapp/.couchappignore @@ -0,0 +1,7 @@ +[ + // filenames matching these regexps will not be pushed to the database + // uncomment to activate; separate entries with "," + // ".*~$" + // ".*\\.swp$" + // ".*\\.bak$" +] \ No newline at end of file diff --git a/couchapp/changes.js b/couchapp/changes.js new file mode 100644 index 
0000000..10b1e98 --- /dev/null +++ b/couchapp/changes.js @@ -0,0 +1,19 @@ +var sys = require('sys')
+, couchdb = require('./lib/node-couchdb-min/couchdb')
+, archiver = require('./feed-archiver');
+
+exports.listener = function(change) {
+  if (!change.doc) return;
+
+  if (change.doc.feed && change.doc.db) {
+
+    var feedSaveFunc = function (data) {
+      sys.debug(sys.inspect(data));
+      for (var item in data.channel.item) {
+        archiver.saveItem(data.channel.item[item], new couchdb.Db(change.doc.db), "description");
+      }
+    }
+
+    archiver.processFeed(change.doc.feed, feedSaveFunc);
+  }
+} diff --git a/feed-archiver.js b/feed-archiver.js new file mode 100644 index 0000000..4754cf0 --- /dev/null +++ b/feed-archiver.js @@ -0,0 +1,54 @@ +#!/usr/bin/env node
+
+var fs = require('fs')
+  , sys = require('sys')
+  , xml2js = require('xml2js')
+  , url = require('url')
+  , http = require('http')
+  , crypto = require('crypto')
+  , parser = new xml2js.Parser();
+
+var debug = false;
+
+exports.processFeed = function(feedUrl, feedSaveFunc) {
+
+  var feed = url.parse(feedUrl);
+
+  var feedClient = http.createClient(80, feed.host);
+
+  var request = feedClient.request("GET", feed.pathname, {'host' : feed.host});
+  request.end();
+
+  request.on('response', function(response) {
+    response.setEncoding('utf8');
+
+    var data = '';
+
+    response.on('data', function(chunk) {
+      data += chunk;
+    });
+
+    response.addListener('end', function() {
+      parser.parseString(data);
+    })
+
+  });
+
+  parser.addListener('end', feedSaveFunc);
+}
+
+exports.saveItem = function(item, db, uniqueKey) {
+
+  var _id = crypto.createHash('md5').update(item[uniqueKey]).digest("hex");
+
+  db.get(_id, function(err, doc) {
+    if (err) {
+      if (err.couchDbError == "not_found") {
+        db.put(_id, item, function(err, result) {
+          if (err) return sys.error(err.stack);
+          sys.log( "Created " + _id);
+        });
+      }
+    }
+  });
+} \ No newline at end of file diff --git a/fulltext/by_content/index.js b/fulltext/by_content/index.js deleted file mode 
100644 index 31f06e7..0000000 --- a/fulltext/by_content/index.js +++ /dev/null @@ -1,6 +0,0 @@ -function(doc) { - var ret = new Document(); - ret.add(doc.object.content); - ret.add(doc.object.summary); - return ret; -} \ No newline at end of file diff --git a/fulltext/by_feed/index.js b/fulltext/by_feed/index.js deleted file mode 100644 index b83b3ae..0000000 --- a/fulltext/by_feed/index.js +++ /dev/null @@ -1,5 +0,0 @@ -function(doc) { - var ret = new Document(); - ret.add(doc.feedMeta.link); - return ret; -} \ No newline at end of file diff --git a/language b/language deleted file mode 100644 index f504a95..0000000 --- a/language +++ /dev/null @@ -1 +0,0 @@ -javascript \ No newline at end of file diff --git a/lib/node-couchdb-min b/lib/node-couchdb-min new file mode 160000 index 0000000..a7b9f27 --- /dev/null +++ b/lib/node-couchdb-min @@ -0,0 +1 @@ +Subproject commit a7b9f27426160c0eb20b3d44db698a9cad991519 diff --git a/lists/feed.js b/lists/feed.js deleted file mode 100644 index 2a97281..0000000 --- a/lists/feed.js +++ /dev/null @@ -1,21 +0,0 @@ -function(head, req){ - start({"headers": {"Content-Type" : "application/json;charset=utf-8"}}); - if ('callback' in req.query) send(req.query['callback'] + "("); - var started = false; - send("{\"items\": [\n"); - while(row = getRow()){ - for(var item in row.value.feed){ - item = row.value.feed[item]; - if(started) send(",\n"); - send(JSON.stringify({ - postedTime: item.postedTime, - object: item.object, - actor: item.actor, - verb: item.verb - })); - started = true; - } - } - send("]}\n"); - if ('callback' in req.query) send(")"); -} \ No newline at end of file diff --git a/main.js b/main.js new file mode 100644 index 0000000..1f3a519 --- /dev/null +++ b/main.js @@ -0,0 +1,133 @@ +// from https://github.com/szayat/node.couch.js + +var request = require('request') + , sys = require('sys') + , events = require('events') + , path = require('path') + , querystring = require('querystring') + , child = 
require('child_process') + , url = require('url') + ; + +var headers = {'content-type':'application/json', 'accept':'application/json'} + +function createDatabaseListener (uri, db) { + var parsedUri = url.parse(uri); + var didExist = !! db ; + if (!db) db = { + ddocs : {} + , ids : [] + , onChange: function (change) { + db.seq = change.seq; + if (change.id && change.id.slice(0, '_design/'.length) === '_design/') { + db.onDesignDoc(change.doc); + } + db.ids.forEach(function (id) { + db.ddocs[id]._changes_process().stdin.write(JSON.stringify(["change", change, uri ])+'\n'); + }) + } + , onDesignDoc: function (doc) { + sys.puts(doc._id) + if (db.ddocs[doc._id] && db.ddocs[doc._id].changes) { + // take down the process + sys.puts("Stopping process for "+doc._id); + db.ddocs[doc._id]._changes_process().kill(); + db.ids.splice(db.ids.indexOf(doc._id),1) + } + + if (doc._deleted) { + delete db.ddocs[doc._id]; + } else { + db.ddocs[doc._id] = doc; + if (doc.changes) { + // start up the process + sys.puts("Starting process for "+doc._id) + var p = child.spawn(process.execPath, [path.join(__dirname, 'child.js')]); + p.stderr.on("data", function (chunk) {sys.error(chunk.toString())}) + p.stdin.write(JSON.stringify(["ddoc", doc])+'\n'); + db.ddocs[doc._id]._changes_process = function(){return p}; + db.ids.push(doc._id) + } + } + } + }; + + var changesStream = new events.EventEmitter(); + changesStream.write = function (chunk) { + var line; + changesStream.buffer += chunk.toString(); + while (changesStream.buffer.indexOf('\n') !== -1) { + line = changesStream.buffer.slice(0, changesStream.buffer.indexOf('\n')); + if (line.length > 1) db.onChange(JSON.parse(line)); + changesStream.buffer = changesStream.buffer.slice(changesStream.buffer.indexOf('\n') + 1) + } + }; + changesStream.end = function () {createDatabaseListener(uri, db)}; + changesStream.buffer = ''; + request({uri:uri, headers:headers}, function (error, resp, body) { + + var qs; + if (error) throw error; + if 
(resp.statusCode > 299) {
    // deal with deleted databases
    var b = JSON.parse(body);
    if ( didExist && b.error == "not_found" && b.reason == "no_db_file" ) {
      sys.debug('database deleted: ' + uri );
      return null;
    }
    else
      throw new Error("Response error "+sys.inspect(resp)+'\n'+body);
    }
    if (!db.seq) db.seq = JSON.parse(body).update_seq
    qs = querystring.stringify({include_docs: "true", feed: 'continuous', since: db.seq})
    request({uri:uri+'/_changes?'+qs, responseBodyStream:changesStream}, function (err, resp, body) {
      if ( err )
        sys.debug(JSON.stringify(err));
    });
    request({uri:uri+'/_all_docs?startkey=%22_design%2F%22&endkey=%22_design0%22&include_docs=true'},
      function (err, resp, body) {
        if (err) throw err;
        if (resp.statusCode > 299) throw new Error("Response error "+sys.inspect(resp)+'\n'+body);
        JSON.parse(body).rows.forEach(function (row) {
          if (!db.ddocs[row.id]) db.onDesignDoc(row.doc);
        });
    })
  })

  return db
}

function createService (uri, interval) {
  if (uri[uri.length - 1] !== '/') uri += '/';
  var dbs = {};
  var service = {};

  var setup = function () {
    var starttime = new Date();
    request({uri:uri+'_all_dbs', headers:headers}, function (error, resp, body) {
      if (error) throw error;
      if (resp.statusCode > 299) throw new Error("Response error "+sys.inspect(resp)+'\n'+body)
      JSON.parse(body).forEach(function (db) {
        if (!dbs[db]) {
          dbs[db] = createDatabaseListener(uri+db);
          // deal with a deleted database. TODO: am I leaking memory somewhere?
          if ( ! dbs[db] )
            delete dbs[db];
          else
            if (service.onDatabase) service.onDatabase(db, dbs[db]);
        }
      })
      var endtime = new Date();
      setTimeout(setup, interval ?
interval : (((endtime - starttime) * 5) + 1000)); + }) + } + setup(); + + return service; +} + +if (require.main == module) { + var uri = process.argv[process.argv.length - 1]; + sys.puts('Finding changes listeners on '+uri) + createService(uri); +} \ No newline at end of file diff --git a/rewrites.json b/rewrites.json deleted file mode 100644 index 612bdca..0000000 --- a/rewrites.json +++ /dev/null @@ -1,12 +0,0 @@ -[ - { - "from": "xml", - "to": "_show/challenge", - "method": "GET" - }, - { - "from":"xml", - "to": "_update/save", - "method": "POST" - } -] \ No newline at end of file diff --git a/shows/challenge.js b/shows/challenge.js deleted file mode 100644 index 458a227..0000000 --- a/shows/challenge.js +++ /dev/null @@ -1,3 +0,0 @@ -function(head, req){ - return req.query["hub.challenge"]; -} \ No newline at end of file diff --git a/updates/save.js b/updates/save.js deleted file mode 100644 index 642f22f..0000000 --- a/updates/save.js +++ /dev/null @@ -1,7 +0,0 @@ -function(doc, req){ - var lib = require('vendor/xmlToActivityStreamJson'); - data = {} - data['feed'] = lib.xmlToActivityStreamJson(req.body); - data['_id'] = req.uuid; - return [data, "posted"] -} \ No newline at end of file diff --git a/vendor/xmlToActivityStreamJson.js b/vendor/xmlToActivityStreamJson.js deleted file mode 100644 index b5ccea0..0000000 --- a/vendor/xmlToActivityStreamJson.js +++ /dev/null @@ -1,97 +0,0 @@ -//this is adapted from @daleharvey's codez - -exports.xmlToActivityStreamJson = function(xml) { - function zeroPad(n) { - return n < 10 ? 
'0' + n : n; - } - - function rfc3339(date) { - return date.getUTCFullYear() + '-' + - zeroPad(date.getUTCMonth() + 1) + '-' + - zeroPad(date.getUTCDate()) + 'T' + - zeroPad(date.getUTCHours()) + ':' + - zeroPad(date.getUTCMinutes()) + ':' + - zeroPad(date.getUTCSeconds()) + 'Z'; - }; - - var i, item, body, date, data = [], - re = /^<\?xml\s+version\s*=\s*(["'])[^\1]+\1[^?]*\?>/, - str = xml.replace(re, ""), - feed = new XML(str); - - // this is nasty, but its rss, its supposed to be nasty - // duck type rss vs atom - if (feed.channel.length() > 0) { - - for (i = 0; i < feed.channel.item.length(); i++) { - item = feed.channel.item[i]; - body = item.description.toString(); - date = new Date(item.pubDate.toString()); - - if (!date) { - date = new Date(); - } - - var geo = new Namespace('http://www.georss.org/georss'); - var location = item..geo::point.toString().split(' '); - - var parsed = { - "postedTime" : rfc3339(date), - "object" : { - "content" : body, - "permalinkUrl" : item.link.toString(), - "objectType" : "article", - "summary" : item.title.toString(), - "location" : location - }, - "verb" : "post", - "actor" : { - "permalinkUrl" : link, - "objectType" : "service", - "displayName" : feed.channel.title.toString() - } - } - - data = data.concat(); - } - } else { - default xml namespace="http://www.w3.org/2005/Atom"; - for each (item in feed..entry) { - body = item.content.toString(); - var dateString = item.updated.toString(); - if (dateString == "") dateString = item.published.toString(); - if (dateString == "") dateString = null; - - date = new Date(dateString); - - var link = ""; - if('link' in item) link = item.link[0].@href.toString(); - - var geo = new Namespace('http://www.georss.org/georss'); - var location = item..geo::point.toString().split(' '); - - data = data.concat({ - "postedTime" : rfc3339(date), - "object" : { - "content" : body, - "permalinkUrl" : link, - "objectType" : "article", - "summary" : item.title.toString(), - "location" : location 
- }, - "verb" : "post", - "actor" : { - "permalinkUrl" : link, - "objectType" : "service", - "displayName" : feed.title.toString() - } - }); - } - } - return data.concat({ - "feedMeta" : { - "link" : feed.link[0].@href.toString(), - "raw" : xml - } - }); -} \ No newline at end of file diff --git a/views/by_feed/map.js b/views/by_feed/map.js deleted file mode 100644 index 2598a36..0000000 --- a/views/by_feed/map.js +++ /dev/null @@ -1,5 +0,0 @@ -function(doc) { - if (doc.feedMeta.link && doc.postedTime) { - emit(doc.feedMeta.link, doc); - } -} \ No newline at end of file diff --git a/views/feeds/map.js b/views/feeds/map.js deleted file mode 100644 index 3cbc90b..0000000 --- a/views/feeds/map.js +++ /dev/null @@ -1,5 +0,0 @@ -function(doc) { - if (doc.feedMeta.link) { - emit(doc.feedMeta.link, doc); - } -} diff --git a/views/feeds/reduce.js b/views/feeds/reduce.js deleted file mode 100644 index ba01d16..0000000 --- a/views/feeds/reduce.js +++ /dev/null @@ -1,7 +0,0 @@ -function(keys, values, rereduce) { - if (rereduce) { - return sum(values); - } else { - return values.length; - } -} \ No newline at end of file diff --git a/views/recent/map.js b/views/recent/map.js deleted file mode 100644 index b0dd599..0000000 --- a/views/recent/map.js +++ /dev/null @@ -1,5 +0,0 @@ -function(doc) { - if (doc.feed[0].postedTime) { - emit(doc.feed[0].postedTime, doc); - } -}