Skip to content

Commit

Permalink
#223 progress with result caching.
Browse files Browse the repository at this point in the history
  • Loading branch information
itayw committed Feb 13, 2014
1 parent 5ba9e17 commit 9b086a9
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 59 deletions.
135 changes: 83 additions & 52 deletions lib/dispatch/query.js
Expand Up @@ -332,6 +332,11 @@ manager.parse = function (context, options, callback) {
if (!query.timeframe.start || !query.timeframe.end)
return callback(new Error('Failed to translate timeframe provided into proper timeframe object'));
}
else
query.timframe = {
start: new Date(1970, 1, 1),
end: new Date(2099, 1, 1)
};

query.interval = manager.translateInterval(query.interval);

Expand Down Expand Up @@ -729,10 +734,9 @@ manager.buildQueryPlan = function (query, callback) {
});
}
colQuery.key = joola.common.hash(colQuery.type + '_' + metric.collection.id + '_' + JSON.stringify(_$match));

if (colQuery.type == 'plain') {
if (plan.colQueries[colQuery.key]) {
_$group = joola.common.extend({}, plan.colQueries[colQuery.key].query[1].$group);
_$group = joola.common.extend({}, plan.colQueries[colQuery.key].query[2].$group);
}

_$group[metric.key] = {};
Expand All @@ -747,8 +751,6 @@ manager.buildQueryPlan = function (query, callback) {
{$group: _$group}

];
console.log('$match', _$match);
console.log('g', _$group);
}
else {
var _$unwind, _$group2;
Expand All @@ -766,17 +768,31 @@ manager.buildQueryPlan = function (query, callback) {
{$group: _$group2}

];

//console.log('$match', _$match);
//console.log('g', _$group);
//console.log('u', _$unwind);
//console.log('g2', _$group2);
}

plan.colQueries[colQuery.key] = colQuery;
}
});

if (plan.colQueries && Object.keys(plan.colQueries).length > 0) {
Object.keys(plan.colQueries).forEach(function (key) {
var colQuery = plan.colQueries[key];
if (colQuery.query.length > 0) {
if (colQuery.query.length === 3) {
console.log('$match', colQuery.query[0]);
console.log('$sort', colQuery.query[1]);
console.log('$group', colQuery.query[2]);
}
else {
console.log('$match', colQuery.query[0]);
console.log('$sort', colQuery.query[1]);
console.log('$group', colQuery.query[2]);
console.log('$unwind', colQuery.query[3]);
console.log('$group2', colQuery.query[4]);
}
}
});
}

plan.dimensions = query.dimensions;
plan.metrics = query.metrics;

Expand Down Expand Up @@ -805,7 +821,6 @@ manager.executePlan = function (context, queryplan, callback) {

return async.map(arrCols, function iterator(_query, next) {
mongo.aggregate('cache', context.user.organization + '_' + _query.collection, _query.query, {}, function (err, results) {
//console.log(results);
if (err)
return next(err);

Expand Down Expand Up @@ -987,10 +1002,8 @@ manager.formatResults = function (results, callback) {
}

if (metric.hasOwnProperty('decimals')) {
if (document.values[metric.key]) {
document.values[metric.key] = parseFloat(document.values[metric.key].toFixed(metric.decimals));
document.fvalues[metric.key] = document.values[metric.key].toString();
}
if (document.values[metric.key])
document.fvalues[metric.key] = document.values[metric.key].toFixed(metric.decimals);
}
else {
try {
Expand Down Expand Up @@ -1166,6 +1179,8 @@ manager.formatResults = function (results, callback) {
manager.loadFromCache = function (context, query, callback) {
var results;

return callback(null);

var _query = ce.clone(query);
delete _query.timeframe;
delete _query.realtime;
Expand Down Expand Up @@ -1229,36 +1244,51 @@ manager.loadFromCache = function (context, query, callback) {
}
};

manager.saveToCache = function (context, cachedResults, cachedDetails, results, callback) {
var _query = ce.clone(results.queryplan.query);
delete _query.ts;
delete _query.uid;
delete _query.realtime;
delete _query.timeframe;
var cachedKey;
manager.saveToCache = function (context, cachedMeta, cachedResults, results, timeframe, callback) {
try {
cachedKey = joola.common.hash(JSON.stringify(_query));
_query.hash = cachedKey;
_query._key = joola.common.uuid();
_query.timestamp = new Date();
_query.timeframe = results.queryplan.query.timeframe;
var meta = ce.clone(results.queryplan.query);
delete meta.ts;
delete meta.uid;
delete meta.realtime;
delete meta.timeframe;
var cachedKey;

cachedKey = joola.common.hash(JSON.stringify(meta));
joola.logger.trace('Storing cached result with cachedKey [' + cachedKey + ']');

_query.results = ce.clone(results);
/*
Object.keys(_query.results.queryplan).forEach(function (key) {
var elem = _query.results.queryplan[key];
if (key != 'query')
delete elem[key];
_query.results.queryplan[key] = elem;
});*/
_query.results.queryplan = {};
meta.hash = cachedKey;
meta._key = joola.common.uuid();
meta.timestamp = new Date();
meta.timeframe = timeframe;

joola.logger.trace('Storing cached result with cachedKey [' + cachedKey + ']');
joola.mongo.insert('results', context.user.organization, _query, {}, function (err) {
if (cachedMeta) {
meta.extension = true;
meta.resultCount = cachedMeta.resultCount + results.documents.length;
}
else
meta.resultCount = results.documents.length;

joola.mongo.insert('results', 'exchange', meta, {}, function (err) {
if (err)
return callback(err);
return callback(null, cachedKey);
if (results.documents.length > 0) {
//if (meta.dimensions.length == 0) {
results.documents.forEach(function (document) {
if (document._id)
document._id = {fakeid: 1};
});
//}
joola.mongo.insert('results', meta._key, results.documents, {}, function (err) {
if (err)
return callback(err);

return callback(null, cachedKey);
});
}
else
return callback(null, cachedKey);
});

}
catch (ex) {
joola.logger.warn('Failed to save result into cache: ' + ex);
Expand All @@ -1268,7 +1298,7 @@ manager.saveToCache = function (context, cachedResults, cachedDetails, results,

manager.sendResults = function (results, callback) {
return callback(null, results);
}
};

exports.fetch = {
name: "/api/query/fetch",
Expand Down Expand Up @@ -1377,25 +1407,20 @@ exports.fetch = {
if (err)
return callback(err);

console.log('tf', query.timeframe.end);

manager.applyFilters(query, function (err, query) {
if (err)
return callback(err);

manager.loadFromCache(context, query, function (err, cachedResults, cachedDetails) {
//if (err)
// return callback(err);

manager.loadFromCache(context, query, function (err, cachedResults, cachedMeta) {
if (cachedResults) {
if (cacheDetails.timeframe.end < query.timeframe.end) {
joola.logger.trace('Partial cached result found, completing the diff [' + cachedDetails.timeframe.end - query.timeframe.start + '].');
if (cachedMeta.timeframe.end < query.timeframe.end) {
joola.logger.trace('Partial cached result found, completing the diff [' + cachedMeta.timeframe.end - query.timeframe.start + '].');
query.timeframe.start = cachedDetails.timeframe.end;
query.timeframe.start.setMilliseconds(query.timeframe.start.getMilliseconds() + 1);
}
else if (cachedDetails.timeframe.end = query.timeframe.end) {
else if (cachedMeta.timeframe.end = query.timeframe.end) {
joola.logger.trace('Cached result found, no need for further process.');
return manager.formatResults(cachedDetails, callback);
return manager.formatResults(cachedMeta, callback);
//return callback(null, cachedResults);
}
else {
Expand All @@ -1413,7 +1438,11 @@ exports.fetch = {

if (cachedResults && results) {
//we have both cachedResults and results, we need to squash together and re-crunch the aggregations.
manager.saveToCache(context, cachedResults, cachedDetails, results, function (err) {
var timeframe = {
start: cachedMeta.timeframe.start,
end: results.queryplan.query.timeframe.end
};
manager.saveToCache(context, cachedMeta, cachedResults, results, timeframe, function (err) {
return manager.formatResults(results, callback);
});
}
Expand All @@ -1423,7 +1452,9 @@ exports.fetch = {
}
else {
//we've got only fresh results.
return manager.formatResults(results, callback);
manager.saveToCache(context, null, null, results, results.queryplan.query.timeframe, function (err) {
return manager.formatResults(results, callback);
});
/*manager.formatResults(results, function (err, results) {
if (err)
return callback(err);
Expand Down
10 changes: 7 additions & 3 deletions test/unit/1_runtime/webserver.spec.js
Expand Up @@ -68,10 +68,14 @@ describe("webserver", function () {
});

it("should have WebSocket", function (done) {
var called = false;
var io = require('socket.io-client');
var socket = io.connect('http://' + joola.config.interfaces.webserver.host + ':' + joola.config.interfaces.webserver.port);
socket.on('connect', function () {
done();
if (!called) {
called = true;
done();
}
});
});

Expand All @@ -92,12 +96,12 @@ describe("webserver", function () {
io.socket.once('/organizations/list:done', function (_message) {
done();
});
var options =
var options =
{
APIToken: '12345',
_path: '/organizations/list'
}
;
;
io.socket.emit('/organizations/list', options);
});

Expand Down
8 changes: 4 additions & 4 deletions test/unit/6_query/query-basic.spec.js
Expand Up @@ -22,7 +22,7 @@ describe("query-basic", function () {
process.on("uncaughtException", uncaughtExceptionHandler);
});

xit("should not fail performing a query with no arguments", function (done) {
it("should not fail performing a query with no arguments", function (done) {
var query = {};
var expected = 0;

Expand Down Expand Up @@ -60,7 +60,7 @@ describe("query-basic", function () {
return done();
});
});

it("should perform a basic query", function (done) {
var query = {
timeframe: 'this_day',
Expand Down Expand Up @@ -483,6 +483,6 @@ describe("query-basic", function () {
return done();
});
});


});

0 comments on commit 9b086a9

Please sign in to comment.