Skip to content

Commit

Permalink
Merge e469c09 into 80624a9
Browse files Browse the repository at this point in the history
  • Loading branch information
itayw committed Apr 16, 2014
2 parents 80624a9 + e469c09 commit ef771f1
Show file tree
Hide file tree
Showing 15 changed files with 1,077 additions and 355 deletions.
28 changes: 18 additions & 10 deletions benchmark/base.bench.js
@@ -1,29 +1,37 @@
var common = require('../lib/common/index');
var benchrest = require('bench-rest');
var
util = require('util'),
benchrest = require('bench-rest'),

common = require('../lib/common/index');

var flows = [
'./flows/beacon.spec.js',
//'./flows/beacon.large.spec.js',
'./flows/metadata.spec.js'
];

global.JOOLA_ADDRESS = 'http://127.0.0.1:8080';
const VERSION = '0.0.1';

var results = {};
results.benchmarkID = common.uuid();
results.timestamp = new Date();
results.flows = {};
results.flows =[];
results.flowCount = flows.length;

var actual = 0;

var joolaio = require('joola.io.sdk');
joolaio.init({host: 'http://localhost:8080', APIToken: 'apitoken-root', ajax: true}, function (err) {
joolaio.init({host: JOOLA_ADDRESS, APIToken: 'apitoken-root', ajax: true}, function (err) {
if (err)
throw err;

joolaio.system.nodeDetails(function (err, details) {
if (err)
throw err;

results.nodeDetails = details;

results.version = VERSION;
flows.forEach(function (flow) {
console.log('Running Flow', flow);
var flowModule = require(flow);
Expand All @@ -36,12 +44,12 @@ joolaio.init({host: 'http://localhost:8080', APIToken: 'apitoken-root', ajax: tr
console.error('Failed in %s with err: ', ctxName, err);
})
.on('progress', function (stats, percent, concurrent, ips) {
console.log('Progress: %s complete', percent);
console.log('Progress: %s complete', percent, concurrent, ips);
})
.on('end', function (stats, errorCount) {
console.log(stats);
//console.log(stats);
stats.name = flowModule.name;
results.flows[flowModule.name] = stats;
results.flows.push(stats);

if (++actual === results.flowCount)
return alldone();
Expand All @@ -51,11 +59,11 @@ joolaio.init({host: 'http://localhost:8080', APIToken: 'apitoken-root', ajax: tr
});

function alldone() {
console.log(results);
console.log(util.inspect(results, {depth: null, colors: true}));
joolaio.beacon.insert('benchmark', results, function (err) {
if (err)
throw err;
process.exit(0);
});
}
});
});
4 changes: 2 additions & 2 deletions benchmark/flows/beacon.large.spec.js
@@ -1,9 +1,9 @@
module.exports = {
name: 'beacon.large',
name: 'beacon_large',
runOptions: {
limit: 1, // concurrent connections
iterations: 1, // number of iterations to perform
progress: 1000
progress: 5000
//prealloc: 10
},
flow: {
Expand Down
8 changes: 4 additions & 4 deletions benchmark/flows/beacon.spec.js
@@ -1,10 +1,10 @@
module.exports = {
name: 'beacon',
runOptions: {
limit: 1, // concurrent connections
iterations: 10, // number of iterations to perform
progress: 1000
//prealloc: 10
limit: 10, // concurrent connections
iterations: 100, // number of iterations to perform
progress: 5000
//prealloc: 10
},
flow: {
before: [], // operations to do before anything
Expand Down
8 changes: 4 additions & 4 deletions benchmark/flows/metadata.spec.js
@@ -1,10 +1,10 @@
module.exports = {
name: 'metadata',
runOptions: {
limit: 1, // concurrent connections
iterations: 10, // number of iterations to perform
progress: 1000
//prealloc: 10
limit: 10, // concurrent connections
iterations: 100, // number of iterations to perform
progress: 5000
//prealloc: 10
},
flow: {
before: [], // operations to do before anything
Expand Down
12 changes: 8 additions & 4 deletions lib/dispatch/index.js
Expand Up @@ -338,8 +338,7 @@ dispatch.processRequest = function (message, headers) {

var d = domain.create();
d.on('error', function (err) {
joola.logger.debug('Failed to process dispatch request: ' + err);
console.log('err', err);
joola.logger.warn(err, 'Failed to process dispatch request: ' + err);
_result = {
err: err,
message: null
Expand Down Expand Up @@ -377,8 +376,13 @@ dispatch.processResponse = function (message, headers) {
args.push(result.message[key]);
});
}
else
else if (joola.common.typeof(result.err) === 'object') {
joola.logger.debug(result.err, 'Failed to process dispatch response: ' + result.err);
args.push(result.err);
}
else {
console.log('Weird result', message);
}

delete message.result;
delete message.payload;
Expand All @@ -387,7 +391,7 @@ dispatch.processResponse = function (message, headers) {
if (listener[message.id]) {
var d = domain.create();
d.on('error', function (err) {
joola.logger.debug('Failed to process dispatch response: ' + err);
joola.logger.warn(err, 'Failed to process dispatch response: ' + err);
//TODO: The assumption here is that the same function that was called and failed will handle the error properly. this is a risky assumption, but it allows for proper error messages to be handled.
try {
listener[message.id].apply(listener, [err]);
Expand Down
158 changes: 89 additions & 69 deletions lib/dispatch/query.js
Expand Up @@ -273,31 +273,35 @@ manager.parse = function (context, options, callback) {
if (typeof query.timeframe === 'string')
query.timeframe = manager.translateTimeframe(query.timeframe, query.interval);

if (query.timeframe_force_end)
query.timeframe.end = new Date(query.timeframe_force_end);
if (query.timeframe && !query.timeframe.hasOwnProperty('last_n_items')) {
if (query.timeframe_force_end)
query.timeframe.end = new Date(query.timeframe_force_end);

if (query.timeframe_force_start) {
query.timeframe.start = new Date(query.timeframe_force_start);
}
if (query.timeframe_force_start) {
query.timeframe.start = new Date(query.timeframe_force_start);
}

if (!query.timeframe.start || !query.timeframe.end)
return setImmediate(function () {
return callback(new Error('Failed to translate timeframe provided into proper timeframe object'));
});
if (!query.timeframe.start || !query.timeframe.end)
return setImmediate(function () {
return callback(new Error('Failed to translate timeframe provided into proper timeframe object'));
});

if (typeof query.timeframe.start === 'string')
query.timeframe.start = new Date(query.timeframe.start);
if (typeof query.timeframe.end === 'string')
query.timeframe.end = new Date(query.timeframe.end);
}
else
query.timframe = {
start: new Date(1970, 1, 1),
end: new Date(2099, 1, 1)
};
if (typeof query.timeframe.start === 'string')
query.timeframe.start = new Date(query.timeframe.start);
if (typeof query.timeframe.end === 'string')
query.timeframe.end = new Date(query.timeframe.end);

query.timeframe.start.setMilliseconds(0);
query.timeframe.end.setMilliseconds(999);
query.timeframe.start.setMilliseconds(0);
query.timeframe.end.setMilliseconds(999);
}
}
/*
else
query.timframe = {
start: new Date(1970, 1, 1),
end: new Date(2099, 1, 1)
};
*/

query._interval = query.interval;
query.interval = manager.translateInterval(query.interval);
Expand Down Expand Up @@ -562,6 +566,13 @@ manager.translateTimeframe = function (timeframe, interval) {
end: new Date(_enddate)
};
}

m = /last_(\d+)_items/.exec(timeframe);
if (m && m.length > 0) {
return {
last_n_items: parseInt(m[1], 10)
};
}
};

manager.translateInterval = function (interval) {
Expand Down Expand Up @@ -593,19 +604,23 @@ manager.buildQueryPlan = function (query, callback) {
var $match = {};
var $project = {};
var $group = {};
var $limit;

if (!query.dimensions)
query.dimensions = [];
if (!query.metrics)
query.metrics = [];

if (query.timeframe) {
if (query.timeframe && !query.timeframe.hasOwnProperty('last_n_items')) {
if (typeof query.timeframe.start === 'string')
query.timeframe.start = new Date(query.timeframe.start);
if (typeof query.timeframe.end === 'string')
query.timeframe.end = new Date(query.timeframe.end);
$match.timestamp = {$gte: query.timeframe.start, $lt: query.timeframe.end};
}
else if (query.timeframe && query.timeframe.hasOwnProperty('last_n_items')) {
$limit = {$limit: query.timeframe.last_n_items};
}

if (query.filter) {
query.filter.forEach(function (f) {
Expand Down Expand Up @@ -710,6 +725,10 @@ manager.buildQueryPlan = function (query, callback) {
{$group: _$group}
];
}

if ($limit) {
colQuery.query.push($limit);
}
}
else {
var _$group2;
Expand All @@ -732,30 +751,8 @@ manager.buildQueryPlan = function (query, callback) {
}
});

if (false && plan.colQueries && Object.keys(plan.colQueries).length > 0) {
Object.keys(plan.colQueries).forEach(function (key) {
var colQuery = plan.colQueries[key];
if (colQuery.query.length > 0) {
if (colQuery.query.length === 4) {
var _$match = ce.clone(colQuery.query[0].$match);
if (_$match.timestamp) {
_$match.timestamp.$gte = _$match.timestamp.$gte.toISOString();
_$match.timestamp.$lt = _$match.timestamp.$lt.toISOString();
}
console.log(new Date().toISOString(), '$match', _$match);
console.log('$unwind', colQuery.query[1]);
console.log('$sort', colQuery.query[2]);
console.log('$group', colQuery.query[3]);
}
else {
console.log('$match', colQuery.query[0]);
console.log('$sort', colQuery.query[1]);
console.log('$group', colQuery.query[2]);
console.log('$unwind', colQuery.query[3]);
console.log('$group2', colQuery.query[4]);
}
}
});
if (true) {
console.log('plan', require('util').inspect(plan.colQueries, {depth: null, colors: true}));
}

plan.dimensions = query.dimensions;
Expand Down Expand Up @@ -1088,18 +1085,35 @@ manager.formatResults = function (results, callback) {
}
results.documents = _documents;
}
else if (!timestampDimension && query.timeframe.hasOwnProperty('last_n_items')) {
//console.log('qq', query);
_documents = results.documents;
if (!itr && query.timeframe && query.timeframe.hasOwnProperty('last_n_items')) {
//we need to fill a simple integer range.
/*
for (var i = results.documents.length; i < query.timeframe.last_n_items; i++) {
_templateItem = joola.common.extend({}, templateItem);
if (query.dimensions && query.dimensions.length > 0) {
_templateItem.values[query.dimensions[0].key] = null;
_templateItem.fvalues[query.dimensions[0].key] = null;
}
_documents.unshift(ce.clone(_templateItem));
}*/
}
}
}
else if (results.documents.length === 0 && timestampDimension) {
interval = query.interval.replace('timebucket.', '');
if (interval === 'ddate')
interval = 'day';
if (!query.timeframe) {
query.timeframe = {};
query.timeframe.start = results.documents[results.documents.length - 1].values.timestamp;
query.timeframe.end = results.documents[0].values.timestamp;
if (results.documents.length > 0) {
query.timeframe.start = results.documents[results.documents.length - 1].values.timestamp;
query.timeframe.end = results.documents[0].values.timestamp;
}
}

itr = moment.twix(query.timeframe.start, query.timeframe.end).iterate(interval);
_documents = [];
if (results.documents.length > 0) {
Object.keys(results.documents[0].values).forEach(function (key) {
Expand All @@ -1120,24 +1134,28 @@ manager.formatResults = function (results, callback) {
});
}

while (itr.hasNext()) {
_d = new Date(itr.next()._d.getTime());
if (interval === 'day')
exists = checkExists(timestampDimension, results.documents, _d, true);
else
exists = checkExists(timestampDimension, results.documents, _d);
/*
exists = _.find(results.documents, function (document) {
return document.values[timestampDimension.key].getTime() === _d.getTime();
});*/

if (exists)
_documents.push(exists);
else {
_templateItem = joola.common.extend({}, templateItem);
_templateItem.values[timestampDimension.key] = new Date(_d);
_templateItem.fvalues[timestampDimension.key] = new Date(_d.getTime());
_documents.push(ce.clone(_templateItem));
if (query.timeframe && query.timeframe.start && query.timeframe.end) {
itr = moment.twix(query.timeframe.start, query.timeframe.end).iterate(interval);

while (itr.hasNext()) {
_d = new Date(itr.next()._d.getTime());
if (interval === 'day')
exists = checkExists(timestampDimension, results.documents, _d, true);
else
exists = checkExists(timestampDimension, results.documents, _d);
/*
exists = _.find(results.documents, function (document) {
return document.values[timestampDimension.key].getTime() === _d.getTime();
});*/

if (exists)
_documents.push(exists);
else {
_templateItem = joola.common.extend({}, templateItem);
_templateItem.values[timestampDimension.key] = new Date(_d);
_templateItem.fvalues[timestampDimension.key] = new Date(_d.getTime());
_documents.push(ce.clone(_templateItem));
}
}
}
results.documents = _documents;
Expand Down Expand Up @@ -1175,7 +1193,7 @@ manager.formatResults = function (results, callback) {
results.cost = results.queryplan.query.cost;
results.resultCount = results.documents.length;

if (results.queryplan.query.timeframe) {
if (results.queryplan.query.timeframe && results.queryplan.query.timeframe.start && results.queryplan.query.timeframe.end) {
results.queryplan.query.timeframe.start = new Date(results.queryplan.query.timeframe.start).toISOString();
results.queryplan.query.timeframe.end = new Date(results.queryplan.query.timeframe.end).toISOString();
}
Expand Down Expand Up @@ -1311,7 +1329,9 @@ exports.fetch = {
else
return router.responseError(new router.ErrorTemplate('Failed to route action [' + 'fetch' + ']: ' + (typeof(err) === 'object' ? err.message : err)), req, res);

lastQueryEndDate = new Date(result.query.timeframe.end);
if (result.query && result.query.timeframe && result.query.timeframe.end) {
lastQueryEndDate = new Date(result.query.timeframe.end);
}
timestampDimension = _.find(result.dimensions, function (d) {
return d.datatype == 'date';
});
Expand Down

0 comments on commit ef771f1

Please sign in to comment.