Skip to content

Commit

Permalink
Merge pull request #2114 from nightscout/wip/openaps-storage
Browse files Browse the repository at this point in the history
Support static OpenAPS report files in addition of mongo so NS can run on tiny rigs
  • Loading branch information
jasoncalabrese committed Oct 23, 2016
2 parents 831ed02 + c802172 commit c9b5e13
Show file tree
Hide file tree
Showing 23 changed files with 969 additions and 128 deletions.
10 changes: 5 additions & 5 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ var compression = require('compression');
var bodyParser = require('body-parser');

function create (env, ctx) {
///////////////////////////////////////////////////
// api and json object variables
///////////////////////////////////////////////////
var api = require('./lib/api/')(env, ctx);

var app = express();
var appInfo = env.name + ' ' + env.version;
app.set('title', appInfo);
Expand All @@ -20,6 +15,11 @@ function create (env, ctx) {
return app;
}

///////////////////////////////////////////////////
// api and json object variables
///////////////////////////////////////////////////
var api = require('./lib/api/')(env, ctx);

app.use(compression({filter: function shouldCompress(req, res) {
//TODO: return false here if we find a condition where we don't want to compress
// fallback to standard filter function
Expand Down
18 changes: 9 additions & 9 deletions env.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function config ( ) {
setSSL();
setAPISecret();
setVersion();
setMongo();
setStorage();
updateSettings();

return env;
Expand Down Expand Up @@ -90,12 +90,12 @@ function setVersion() {
env.name = software.name;
}

function setMongo() {
env.mongo = readENV('MONGO_CONNECTION') || readENV('MONGO') || readENV('MONGOLAB_URI') || readENV('MONGODB_URI');
env.mongo_collection = readENV('MONGO_COLLECTION', 'entries');
function setStorage() {
env.storageURI = readENV('STORAGE_URI') || readENV('MONGO_CONNECTION') || readENV('MONGO') || readENV('MONGOLAB_URI') || readENV('MONGODB_URI');
env.entries_collection = readENV('ENTRIES_COLLECTION') || readENV('MONGO_COLLECTION', 'entries');
env.MQTT_MONITOR = readENV('MQTT_MONITOR', null);
if (env.MQTT_MONITOR) {
var hostDbCollection = [env.mongo.split('mongodb://').pop().split('@').pop(), env.mongo_collection].join('/');
var hostDbCollection = [env.storageURI.split('mongodb://').pop().split('@').pop(), env.entries_collection].join('/');
var mongoHash = crypto.createHash('sha1');
mongoHash.update(hostDbCollection);
//some MQTT servers only allow the client id to be 23 chars
Expand All @@ -117,10 +117,10 @@ function setMongo() {
// Some people prefer to use a json configuration file instead.
// This allows a provided json config to override environment variables
var DB = require('./database_configuration.json'),
DB_URL = DB.url ? DB.url : env.mongo,
DB_COLLECTION = DB.collection ? DB.collection : env.mongo_collection;
env.mongo = DB_URL;
env.mongo_collection = DB_COLLECTION;
DB_URL = DB.url ? DB.url : env.storageURI,
DB_COLLECTION = DB.collection ? DB.collection : env.entries_collection;
env.storageURI = DB_URL;
env.entries_collection = DB_COLLECTION;
}

function updateSettings() {
Expand Down
31 changes: 22 additions & 9 deletions lib/bootevent.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,33 @@ function boot (env, language) {
return ctx.bootErrors && ctx.bootErrors.length > 0;
}

function setupMongo (ctx, next) {
function setupStorage (ctx, next) {

if (hasBootErrors(ctx)) {
return next();
}

try {
require('./storage')(env, function ready ( err, store ) {
// FIXME, error is always null, if there is an error, the storage.js will throw an exception
console.log('Storage system ready');
ctx.store = store;

next();
});
if (_.startsWith(env.storageURI, 'openaps://')) {
require('./storage/openaps-storage')(env, function ready (err, store) {
if (err) {
throw err;
}

ctx.store = store;
console.log('OpenAPS Storage system ready');
next();
});
} else {
//TODO assume mongo for now, when there are more storage options add a lookup
require('./storage/mongo-storage')(env, function ready(err, store) {
// FIXME, error is always null, if there is an error, the storage.js will throw an exception
console.log('Mongo Storage system ready');
ctx.store = store;

next();
});
}
} catch (err) {
console.info('mongo err', err);
ctx.bootErrors = ctx.bootErrors || [ ];
Expand Down Expand Up @@ -174,7 +187,7 @@ function boot (env, language) {

return require('bootevent')( )
.acquire(checkEnv)
.acquire(setupMongo)
.acquire(setupStorage)
.acquire(setupAuthorization)
.acquire(setupInternals)
.acquire(ensureIndexes)
Expand Down
2 changes: 1 addition & 1 deletion lib/data/dataloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ function loadEntries (ddata, ctx, callback) {

function mergeToTreatments (ddata, results) {
var filtered = _.filter(results, function hasId (treatment) {
return _.isObject(treatment._id);
return !_.isEmpty(treatment._id);
});

var treatments = _.map(filtered, function update (treatment) {
Expand Down
50 changes: 23 additions & 27 deletions lib/devicestatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,42 +25,38 @@ function storage (collection, ctx) {
});
}

var with_collection = ctx.store.with_collection(collection);

function query_for (opts) {
return find_options(opts, storage.queryOpts);
}

function list(opts, fn) {
with_collection(function (err, collection) {
// these functions, find, sort, and limit, are used to
// dynamically configure the request, based on the options we've
// been given
// these functions, find, sort, and limit, are used to
// dynamically configure the request, based on the options we've
// been given

// determine sort options
function sort ( ) {
return opts && opts.sort || {created_at: -1};
}
// determine sort options
function sort ( ) {
return opts && opts.sort || {created_at: -1};
}

// configure the limit portion of the current query
function limit ( ) {
if (opts && opts.count) {
return this.limit(parseInt(opts.count));
}
return this;
// configure the limit portion of the current query
function limit ( ) {
if (opts && opts.count) {
return this.limit(parseInt(opts.count));
}
return this;
}

// handle all the results
function toArray (err, entries) {
fn(err, entries);
}
// handle all the results
function toArray (err, entries) {
fn(err, entries);
}

// now just stitch them all together
limit.call(collection
.find(query_for(opts))
.sort(sort( ))
).toArray(toArray);
});
// now just stitch them all together
limit.call(api( )
.find(query_for(opts))
.sort(sort( ))
).toArray(toArray);
}

function remove (_id, fn) {
Expand All @@ -74,7 +70,7 @@ function storage (collection, ctx) {
}

function api() {
return ctx.store.db.collection(collection);
return ctx.store.collection(collection);
}

api.list = list;
Expand Down
53 changes: 19 additions & 34 deletions lib/entries.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ function storage(env, ctx) {

// TODO: Code is a little redundant.

var with_collection = ctx.store.with_collection(env.mongo_collection);

// query for entries from storage
function list (opts, fn) {
with_collection(function (err, collection) {
// these functions, find, sort, and limit, are used to
// dynamically configure the request, based on the options we've
// been given
Expand All @@ -42,17 +39,14 @@ function storage(env, ctx) {
}

// now just stitch them all together
limit.call(collection
limit.call(api( )
.find(query_for(opts))
.sort(sort( ))
).toArray(toArray);
});
}

function remove (opts, fn) {
with_collection(function (err, collection) {
collection.remove(query_for(opts), fn);
});
api( ).remove(query_for(opts), fn);
}

// return writable stream to lint each sgv record passing through it
Expand Down Expand Up @@ -86,39 +80,30 @@ function storage(env, ctx) {

// store new documents using the storage mechanism
function create (docs, fn) {
with_collection(function(err, collection) {
if (err) { fn(err); return; }
// potentially a batch insert
var firstErr = null,
numDocs = docs.length,
totalCreated = 0;

docs.forEach(function(doc) {
var query = (doc.sysTime && doc.type) ? {sysTime: doc.sysTime, type: doc.type} : doc;
collection.update(query, doc, {upsert: true}, function (err) {
firstErr = firstErr || err;
if (++totalCreated === numDocs) {
//TODO: this is triggering a read from Mongo, we can do better
ctx.bus.emit('data-received');
fn(firstErr, docs);
}
});
// potentially a batch insert
var firstErr = null,
numDocs = docs.length,
totalCreated = 0;

docs.forEach(function(doc) {
var query = (doc.sysTime && doc.type) ? {sysTime: doc.sysTime, type: doc.type} : doc;
api( ).update(query, doc, {upsert: true}, function (err) {
firstErr = firstErr || err;
if (++totalCreated === numDocs) {
//TODO: this is triggering a read from Mongo, we can do better
ctx.bus.emit('data-received');
fn(firstErr, docs);
}
});
});
}

function getEntry(id, fn) {
with_collection(function(err, collection) {
api( ).findOne({_id: ObjectID(id)}, function (err, entry) {
if (err) {
fn(err);
} else {
collection.findOne({_id: ObjectID(id)}, function (err, entry) {
if (err) {
fn(err);
} else {
fn(null, entry);
}
});
fn(null, entry);
}
});
}
Expand All @@ -131,7 +116,7 @@ function storage(env, ctx) {
function api ( ) {
// obtain handle usable for querying the collection associated
// with these records
return ctx.store.db.collection(env.mongo_collection);
return ctx.store.collection(env.entries_collection);
}

// Expose all the useful functions
Expand Down
2 changes: 1 addition & 1 deletion lib/food.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ function storage (env, ctx) {


function api ( ) {
return ctx.store.db.collection(env.food_collection);
return ctx.store.collection(env.food_collection);
}

api.list = list;
Expand Down
2 changes: 1 addition & 1 deletion lib/profile.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function storage (collection, ctx) {
}

function api () {
return ctx.store.db.collection(collection);
return ctx.store.collection(collection);
}

api.list = list;
Expand Down
17 changes: 2 additions & 15 deletions lib/storage.js → lib/storage/mongo-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function init (env, cb, forceNewConnection) {
cb(null, mongo);
}
} else {
if (!env.mongo) {
if (!env.storageURI) {
throw new Error('MongoDB connection string is missing');
}

Expand All @@ -28,7 +28,7 @@ function init (env, cb, forceNewConnection) {
var options = { replset: { socketOptions: { connectTimeoutMS : timeout, socketTimeoutMS : timeout }}};

var connect_with_retry = function(i) {
return MongoClient.connect(env.mongo, options, function connected(err, db) {
return MongoClient.connect(env.storageURI, options, function connected(err, db) {
if (err) {
if (i>20) {
// Abort after retrying for more than 10 minutes
Expand Down Expand Up @@ -56,19 +56,6 @@ function init (env, cb, forceNewConnection) {
return connection.collection(name);
};

mongo.with_collection = function with_collection (name) {
return function use_collection(fn) {
fn(null, connection.collection(name));
};
};

mongo.limit = function limit (opts) {
if (opts && opts.count) {
return this.limit(parseInt(opts.count));
}
return this;
};

mongo.ensureIndexes = function ensureIndexes (collection, fields) {
fields.forEach(function (field) {
console.info('ensuring index for: ' + field);
Expand Down

0 comments on commit c9b5e13

Please sign in to comment.