Permalink
Browse files

WIP; streaming changes

  • Loading branch information...
1 parent a3ab1b8 commit e2b80df5c65a2e3e574056f4abef236ee6f32577 @jedp committed May 9, 2012
Showing with 113 additions and 11 deletions.
  1. +2 −0 .gitignore
  2. +21 −0 lib/api.js
  3. +1 −1 lib/config.js
  4. +38 −6 lib/db.js
  5. +27 −4 lib/server.js
  6. +24 −0 test/store.js
View
@@ -13,3 +13,5 @@ env.sh
node_modules
.gitmodules
+# awsbox setup notes
+awsbox_notes.txt
View
@@ -1,5 +1,7 @@
var http = require('http');
var qs = require('querystring');
+var events = require('events');
+var util = require('util');
function makeQueryString(options) {
var values = [];
@@ -18,9 +20,14 @@ function makeQueryString(options) {
}
var KPIggyBank = function KPIggyBank(host, port) {
+ events.EventEmitter.call(this);
this.host = host;
this.port = port;
+ this._followingChanges = false;
+
+ return this;
};
+util.inherits(KPIggyBank, events.EventEmitter);
KPIggyBank.prototype.saveData = function saveData(interaction_data, callback) {
callback = callback || function() {};
@@ -57,6 +64,13 @@ KPIggyBank.prototype.saveData = function saveData(interaction_data, callback) {
req.end();
};
+KPIggyBank.prototype.followChanges = function() {
+ if (this._followingChanges) return (null);
+
+
+
+};
+
/*
* a utility GET function for api calls
*/
@@ -103,6 +117,13 @@ KPIggyBank.prototype.count = function(callback) {
});
};
+KPIggyBank.prototype.changes = function(callback) {
+ var self = this;
+ this._get('/wsapi/interaction_data/stream', {}, function(err, data) {
+ self.emit('change', data);
+ });
+};
+
/*
* fetchRange(options, callback) -> [err, list]
*/
View
@@ -13,7 +13,7 @@ module.exports = {
couchdb_pass: process.env['KPIG_COUCHDB_PASS'] || "kpiggybank",
server_host: process.env['KPIG_SERVER_HOST'] || "127.0.0.1",
- server_port: parseInt((process.env['KPIG_SERVER_PORT'] || "3000"), 10),
+ server_port: parseInt((process.env['KPIG_SERVER_PORT'] || "80"), 10),
server_mode: process.env['KPIG_SERVER_MODE'] || "dev"
}
View
@@ -1,26 +1,57 @@
var cradle = require('cradle');
+var events = require('events');
+var util = require('util');
function do_nothing() {};
var Database = function Database(config, callback) {
+ events.EventEmitter.call(this);
+ var self = this;
+
if (arguments.length < 2) {
callback = config || do_nothing;
config = require('./config');
}
this.conn = new (cradle.Connection)(config.couchdb_host, config.couchdb_port);
+ this.db = this.conn.database(config.couchdb_db);
+
+ this._maybeCreateDB(function(err, created) {
+ if (err) return callback(err);
+
+
+ var feed = self.db.changes();
+ feed.on('error', function(err) {
+ console.error(err);
+ // require clients to handle error?
+ // seems weird, since we're otherwise not an event emitter
+ //self.emit('error', err);
+ });
+
+ feed.on('change', function(change) {
+ self.emit('change', change);
+ });
+
+ return callback(null, created);
+ });
+
+};
+util.inherits(Database, events.EventEmitter);
+
+Database.prototype._maybeCreateDB = function(callback) {
+ var self = this;
// create the db if necessary
- var db = this.conn.database(config.couchdb_db);
- db.exists(function(err, exists) {
+ this.db.exists(function(err, exists) {
if (err) return callback(err);
if (! exists) {
- db.create(function(err, created) {
+ self.db.create(function(err, created) {
+ console.log("created -> " + created);
// data design document puts timestamp in key
// so you can do startkey/endkey queries
- db.save('_design/data', {
+ self.db.save('_design/data', {
// query all by timestamp
all: {
@@ -43,15 +74,16 @@ var Database = function Database(config, callback) {
}
}, function(err, result) {
+ console.log("saved " + result);
return callback(null, created);
});
});
} else {
+ console.log("already exists");
+
return callback(null, exists);
}
});
-
- this.db = db;
};
Database.prototype.save = function save(data, callback) {
View
@@ -68,10 +68,26 @@ app.get('/', function(req, res) {
*/
app.post('/wsapi/interaction_data', function(req, res) {
- // XXX for now just blindly store whatever
- // fire and forget
- db.save(req.body);
- res.writeHead(200);
+ if (req.body.length > 10000) {
+ // too large
+ res.writeHead(413);
+ return res.end();
+ }
+
+ try {
+ var data = req.body;
+ if (data.timestamp) {
+ db.save(data);
+ res.writeHead(200);
+ } else {
+ // bogus
+ res.writeHead(400);
+ }
+ } catch (e) {
+ console.error(e);
+ return res.end();
+ }
+
res.end();
});
@@ -110,6 +126,13 @@ app.get('/wsapi/interaction_data/count', function(req, res) {
}
});
+app.get('/wsapi/interaction_data/stream', function(req, res) {
+ res.writeHead(200);
+ db.on('change', function(change) {
+ res.write(JSON.stringify(change));
+ });
+});
+
if (!module.parent) {
start();
emitter.on('error', console.error);
View
@@ -135,6 +135,7 @@ vows.describe("Blob storage")
kpiggybankProcess = spawn('node', [server_exec]);
kpiggybankProcess.stdout.on('data', function(buf) {
buf.toString().split("\n").forEach(function(line) {
+ console.log("subprocess: " + line);
if (/kpiggybank listening/.test(line)) {
return cb(null, true);
}
@@ -232,6 +233,29 @@ vows.describe("Blob storage")
}
})
+/*
+ * in progress ...
+
+.addBatch({
+ "Streaming": {
+ topic: function() {
+ var api = new API(config.server_host, config.server_port);
+ console.log("setting up streaming");
+ api.followChanges();
+ console.log("followed changes");
+ api.on('change', this.callback);
+ console.log("registered handler");
+ api.saveData(makeBlob(1336575273), function(err,res) { console.log("saved: " + err + " " + res)});
+ },
+
+ "works": function(blob) {
+ console.log("got " + err + " " + blob);
+ assert(blob.timestamp === 1336575273);
+ }
+ }
+})
+*/
+
.addBatch({
"In the end": {
topic: function() {

0 comments on commit e2b80df

Please sign in to comment.