Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit fe94b1110bd0e5264dc37a4421756333cf7c4d08 0 parents
@scttnlsn authored
1  .gitignore
@@ -0,0 +1 @@
+node_modules/
4 .npmignore
@@ -0,0 +1,4 @@
+.DS_Store
+.git*
+node_modules/
+test/
19 LICENSE
@@ -0,0 +1,19 @@
+Copyright (C) 2012 Scott Nelson
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
4 Makefile
@@ -0,0 +1,4 @@
+test:
+ ./node_modules/.bin/mocha --reporter list --timeout 10000
+
+.PHONY: test
66 README.md
@@ -0,0 +1,66 @@
+Nettle
+======
+
+On-the-fly processing framework for Node.js and MongoDB.
+
+Install
+-------
+
+ npm install nettle
+
+Stores
+------
+
+A store provides a simple interface to the GridFS entities in a MongoDB database. It
+provides basic `put` and `get` operations and allows the data to be arbitrarily processed.
+
+ var nettle = require('nettle');
+ var store = nettle.store({ db: 'example' });
+
+Optionally specify `host`, `port` and collection `prefix` (defaults to `'fs'`) when
+creating a store.
+
+Simple example of inserting a buffer into the store:
+
+ store.put(new Buffer('foo'), function(err, doc) {
+ console.log(doc._id);
+ });
+
+And later retrieving it:
+
+ store.get(id, function(err, buffer) {
+ console.log(buffer.toString());
+ });
+
+ // -> 'foo'
+
+Processing
+----------
+
+A processor is an object with two properties: `key` and `process`. `key` must be a string
+(or a function that returns a string) and `process` must be a function accepting a buffer and
+callback. An example string processor might look something like this:
+
+ var Concat = function(str) {
+ this.key = function() {
+ return 'concat_' + str;
+ },
+
+ this.process = function(buffer, callback) {
+ callback(null, new Buffer(buffer.toString() + str));
+ }
+ };
+
+ var processor = new Concat('bar');
+
+One can then retrieve processed entities by passing the processor to `get`:
+
+ store.get(id, processor, function(err, buffer) {
+ console.log(buffer.toString());
+ });
+
+ // -> 'foobar'
+
+Note that processed entities are stored and later retrieved by the given `id`
+and processor `key`. The `process` function will thus only be called on a cache
+miss.
32 lib/collection.js
@@ -0,0 +1,32 @@
+var mongo = require('mongodb');
+var Promise = require('./promise');
+
+module.exports = Collection;
+
+function Collection(name, db) {
+ this.name = name;
+ this.db = db;
+ this.collection = new Promise();
+
+ var self = this;
+ this.db.add(function(err, db) {
+ if (err) return self.collection.resolve(err);
+
+ var collection = new mongo.Collection(db, self.name);
+ self.collection.resolve(null, collection);
+ });
+};
+
+Collection.prototype.read = function(id, callback) {
+ this.collection.add(function(err, collection) {
+ if (err) return callback(err);
+ collection.findOne({ _id: id }, callback);
+ });
+};
+
+Collection.prototype.save = function(id, data, callback) {
+ this.collection.add(function(err, collection) {
+ if (err) return callback(err);
+ collection.update({ _id: id }, { '$set': data }, { upsert: true }, callback);
+ });
+};
47 lib/connection.js
@@ -0,0 +1,47 @@
+var mongo = require('mongodb');
+var Collection = require('./collection');
+var Grid = require('./grid');
+var MetaCache = require('./metacache');
+var Promise = require('./promise');
+var Store = require('./store');
+
+module.exports = Connection;
+
+function Connection(options) {
+ this.options = options || {};
+ this.options.host || (this.options.host = 'localhost');
+ this.options.port || (this.options.port = 27017);
+
+ this.db = new Promise();
+ this.collections = {};
+ this.grids = {};
+};
+
+Connection.prototype.collection = function(name) {
+ this.collections[name] || (this.collections[name] = new Collection(name, this.db));
+ return this.collections[name];
+};
+
+Connection.prototype.grid = function(name) {
+ this.grids[name] || (this.grids[name] = new Grid(name, this.db));
+ return this.grids[name];
+};
+
+Connection.prototype.store = function(name) {
+ var entities = this.grid(name);
+ var cache = this.grid(name + '.cache');
+ var metacache = new MetaCache(this.collection(name + '.metacache'));
+
+ return new Store(entities, cache, metacache);
+};
+
+Connection.prototype.open = function(callback) {
+ var self = this;
+ var server = new mongo.Server(this.options.host, this.options.port, {});
+ var db = new mongo.Db(this.options.db, server);
+
+ db.open(function(err, db) {
+ self.db.resolve(err, db);
+ if (callback) callback(err, db);
+ });
+};
32 lib/grid.js
@@ -0,0 +1,32 @@
+var mongo = require('mongodb');
+var Promise = require('./promise');
+
+module.exports = Grid;
+
+function Grid(name, db) {
+ this.name = name;
+ this.db = db;
+ this.grid = new Promise();
+
+ var self = this;
+ this.db.add(function(err, db) {
+ if (err) return self.grid.resolve(err);
+
+ var grid = new mongo.Grid(db, self.name);
+ self.grid.resolve(null, grid);
+ });
+};
+
+Grid.prototype.get = function(id, callback) {
+ this.grid.add(function(err, grid) {
+ if (err) return callback(err);
+ grid.get(id, callback);
+ });
+};
+
+Grid.prototype.put = function(buffer, callback) {
+ this.grid.add(function(err, grid) {
+ if (err) return callback(err);
+ grid.put(buffer, callback);
+ });
+};
16 lib/index.js
@@ -0,0 +1,16 @@
+var Connection = require('./connection');
+
+exports.Connection = Connection;
+
+exports.connect = function(options, callback) {
+ var connection = new Connection(options);
+ connection.open(callback);
+ return connection;
+};
+
+exports.store = function(options, callback) {
+ if (options.db === undefined) throw new Error('No `db` specified');
+
+ var connection = exports.connect(options, callback);
+ return connection.store(options.prefix || 'fs');
+};
19 lib/metacache.js
@@ -0,0 +1,19 @@
+module.exports = MetaCache;
+
+function MetaCache(collection) {
+ this.collection = collection;
+};
+
+MetaCache.prototype.store = function(id, key, cached, callback) {
+ var cache = {};
+ cache[key] = cached;
+ this.collection.save(id, cache, callback);
+};
+
+MetaCache.prototype.lookup = function(id, key, callback) {
+ this.collection.read(id, function(err, doc) {
+ if (err) return callback(err);
+ if (!doc) return callback(null, null);
+ callback(null, doc[key]);
+ });
+};
30 lib/promise.js
@@ -0,0 +1,30 @@
+module.exports = Promise;
+
+function Promise(context) {
+ this.context = context;
+ this.args = null;
+ this.callbacks = [];
+ this.resolved = false;
+};
+
+Promise.prototype.add = function(callback) {
+ if (this.resolved) {
+ callback.apply(this.context, this.args);
+ } else {
+ this.callbacks.push(callback);
+ }
+};
+
+Promise.prototype.resolve = function() {
+ if (this.resolved) throw new Error('Promise already resolved');
+
+ this.args = arguments;
+ this.resolved = true;
+
+ var callback;
+ while (callback = this.callbacks.shift()) {
+ callback.apply(this.context, this.args);
+ }
+
+ this.callbacks = null;
+};
88 lib/store.js
@@ -0,0 +1,88 @@
+var async = require('async');
+var mongo = require('mongodb');
+
+module.exports = Store;
+
+function Store(entities, cache, metacache) {
+ this.entities = entities;
+ this.cache = cache;
+ this.metacache = metacache;
+};
+
+Store.prototype.put = function(buffer, callback) {
+ this.entities.put(buffer, callback);
+};
+
+Store.prototype.raw = function(id, callback) {
+ this.entities.get(objectId(id), callback);
+};
+
+Store.prototype.get = function(id, processor, callback) {
+ var self = this;
+ var id = objectId(id);
+
+ // Optional `processor` argument
+ if (callback === undefined) {
+ callback = processor;
+ return this.raw(id, callback);
+ }
+
+ this.metacache.lookup(id, key(processor), function(err, cached) {
+ if (err) return callback(err);
+ if (cached) return self.cache.get(cached, callback);
+ self.process(id, processor, callback);
+ });
+};
+
+Store.prototype.process = function(id, processor, callback) {
+ var self = this;
+ var id = objectId(id);
+
+ var funcs = [
+ // Get raw entity
+ function(callback) {
+ self.raw(id, callback);
+ },
+
+ // Process the entity
+ function(buffer, callback) {
+ processor.process(buffer, callback);
+ },
+
+ // Cache the result
+ function(result, callback) {
+ self.cache.put(result, function(err, doc) {
+ if (err) return callback(err);
+ callback(null, result, doc._id);
+ });
+ },
+
+ // Update the metacache
+ function(result, cached, callback) {
+ self.metacache.store(id, key(processor), cached, function(err) {
+ if (err) return callback(err);
+ callback(null, result);
+ });
+ }
+ ];
+
+ async.waterfall(funcs, callback);
+};
+
+Store.prototype.clean = function(id, processor, callback) {
+ this.metacache.lookup(id, key(processor), function(err, cached) {
+
+ })
+};
+
+// Helpers
+// ---------------
+
+function objectId(id) {
+ if (typeof id === 'string') id = new mongo.ObjectID(id);
+ return id;
+};
+
+function key(processor) {
+ return (typeof processor.key === 'function') ? processor.key() : processor.key;
+};
22 package.json
@@ -0,0 +1,22 @@
+{
+ "name": "nettle",
+ "version": "0.0.0",
+ "description": "On-the-fly processing framework for Node.js and MongoDB",
+ "homepage": "http://github.com/scttnlsn/nettle",
+ "author": "Scott Nelson <scottbnel@gmail.com>",
+ "main": "./lib/index",
+
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/scttnlsn/nettle.git"
+ },
+
+ "dependencies": {
+ "async": ">= 0.1.15",
+ "mongodb": ">= 0.9.7"
+ },
+
+ "devDependencies": {
+ "mocha": "0.8.1"
+ }
+}
72 test/store_test.js
@@ -0,0 +1,72 @@
+var assert = require('assert');
+var async = require('async');
+var mongo = require('mongodb');
+var nettle = require('../lib/index');
+
+var store = nettle.store({
+ host: process.env.NETTLE_TEST_HOST || 'localhost',
+ port: process.env.NETTLE_TEST_PORT || 27017,
+ db: process.env.NETTLE_TEST_DB || 'nettle_tests'
+});
+
+describe('Store', function() {
+ var id;
+
+ beforeEach(function(done) {
+ store.put(new Buffer('foo'), function(err, entity) {
+ if (err) throw err;
+ id = entity._id;
+ done();
+ });
+ });
+
+ it('stores unprocessed entities', function(done) {
+ store.get(id, function(err, result) {
+ if (err) throw err;
+ assert.equal(result.toString(), 'foo');
+ done();
+ });
+ });
+
+ describe('when processing an entity', function() {
+ var count;
+ var processor = {
+ key: 'reversed',
+ process: function(buffer, callback) {
+ var result = buffer.toString().split('').reverse().join('');
+ count++;
+ callback(null, new Buffer(result));
+ }
+ };
+
+ beforeEach(function() {
+ count = 0;
+ });
+
+ it('stores the processed entity', function(done) {
+ store.get(id, processor, function(err, result) {
+ if (err) throw err;
+ assert.equal(result.toString(), 'oof');
+ done();
+ });
+ });
+
+ it('caches the processed entity', function(done) {
+ var get = function(callback) {
+ store.get(id, processor, callback);
+ };
+
+ count = 0;
+ async.series([get, get, get], function(err, results) {
+ if (err) throw err;
+
+ assert.equal(count, 1);
+ results.forEach(function(result) {
+ assert.equal(result.toString(), 'oof');
+ });
+
+ done();
+ });
+ });
+ });
+});
Please sign in to comment.
Something went wrong with that request. Please try again.