Permalink
Browse files

Added mbox queue, to better schedule queries under load

* the Broker lets you defer tasks into the queue under a chosen mailbox name.
* the Broker has Workers that handle deferred jobs. Workers each handle a non-overlapping set of mailboxes.
* when a worker is ready, it fires off the task with lowest mbox name, attaching its own callback. The task is responsible for pinging back the supplied proxy callback
* The proxy callback will re-issue the call to all the registered on_complete callbacks.

Unfortunately, this doesn't yet solve the problem -- but it's a spike in the ground
  • Loading branch information...
1 parent b51881a commit 84c47e4b29a93a805395d31071fd037bbb82608c Philip (flip) Kromer committed Sep 5, 2012
Showing with 313 additions and 13 deletions.
  1. +1 −0 .gitignore
  2. +159 −0 lib/cube/broker.js
  3. +6 −3 lib/cube/metalog.js
  4. +136 −0 test/broker-test.js
  5. +11 −10 test/test_helper.js
View
1 .gitignore
@@ -1 +1,2 @@
node_modules
+away/*
View
159 lib/cube/broker.js
@@ -0,0 +1,159 @@
+'use strict';
+
+var _ = require('underscore'),
+ util = require('util'),
+ metalog = require('./metalog');
+
+// milliseconds to sleep, if all jobs are satisfied, until checking again
+var worker_sleep_ms = 100;
+
+// ---- Broker ----
+
+function Broker(name, interval){
+ var workers = [];
+ add_worker();
+ this.name = name;
+
+ // `deferProxy(name, perform, *args, on_complete)` -- Issue a request with
+ // controlled concurrency; send its results to all interested listeners. We
+ // use it so that when multiple clients are interested in a metric, we only
+ // issue the query once, yet share the good news immediately.
+ //
+ // Worker will at some time invoke perform with the supplied args, tacking on
+ // its own callback (`perform(*args, worker_cb)`, essentially). The task must,
+ // success or failure, eventually invoke the proxy callback.
+ //
+ // When the task triggers the proxy callback with the result, every
+ // `on_complete` handler waiting on that name is invoked with that result.
+ // For the second and further tasks `defer`ing to a given mbox, the `perform`
+ // function is ignored -- tasks with the same name must be interchangeable.
+ //
+ // @param [String] name -- handle for the query
+ // @param [Function] perform(*args) -- function for worker to dispatch
+ // @param [Array<Object>] *args -- args sent to `perform` when dispatched
+ // @param [Function] complete_cb -- called when the task completes
+ //
+ this.deferProxy = function deferProxy(){ //
+ var args = _.toArray(arguments),
+ name = args.shift(), perform = args.shift(), on_complete = args.pop();
+ if (! (name && perform && on_complete)) throw new TypeError('you must supply a name, perform callback and on-complete handler: got ' + [name, perform, on_complete]);
+ worker_for(name).add(name, perform, args, on_complete);
+ return this;
+ };
+
+ function worker_for(name){
+ return workers[0];
+ }
+
+ function add_worker(){
+ var worker = new Worker((name+'-'+workers.length), interval);
+ workers.push(worker);
+ worker.start();
+ return worker;
+ }
+
+ function stop(){ _.each(workers, function(worker){ worker.stop() }) };
+
+ function report(){ return { workers: _.map(workers, function(worker){ return worker.report() }) }; }
+ function toString(){ return util.inspect(this.report()); };
+
+ Object.defineProperties(this, {
+ stop: {value: stop},
+ report: {value: report}, toString: {value: toString},
+ });
+}
+
+// ---- Worker ----
+
+// A worker executes a set of tasks with parallelism 1
+function Worker(qname, interval){
+ var queue = Object.create(null),
+ active = null,
+ self = this,
+ clock;
+ this.qname = qname;
+
+ function add(mbox, perform, args, on_complete) {
+ var job = ((active && (active.mbox === mbox)) ? active : queue[mbox]);
+ if (! job){ job = queue[mbox] = new Job(mbox, perform, args); }
+ //
+ job.listen(on_complete);
+ metalog.minor('q_worker', {is: 'add', mbox: job.mbox, am: self.report(), job: job.report() });
+ return job;
+ }
+
+ function invoke(job) {
+ // move this job to be active
+ active = job;
+ delete queue[job.mbox];
+ // add our callback as last arg. when triggered, it takes the arguments it
+ // was triggered with and has job fire that at all its `on_complete`s, and
+ // clears the active job (letting the next task start).
+ job.args.push(function _completed(){
+ var result = arguments;
+ metalog.warn('q_worker', {is: '<-!', mbox: job.mbox, am: self.report(), result: util.inspect(result).slice(0,80) });
+ if (job !== active) metalog.warn('q_worker', {is: 'ERR', am: qname, error: 'job was missing when callback triggered', self: self.report() });
+ active = null;
+ job.complete(result);
+ });
+ // start the task
+ metalog.warn('q_worker', {is: '?->', mbox: job.mbox, am: self.report(), perform_args: util.inspect(job.args).slice(0,80) });
+ try{ job.perform.apply(null, job.args); } catch(err){ metalog.warn('q_worker', {is: 'ERR', am: qname, as: 'performing job '+job.mbox, error: err.message }) };
+ }
+
+ function start(){
+ if (clock){ return metalog.warn('q_worker', {is: 'ERR', am: self.report(), error: 'tried to start an already-running worker' }); }
+ clock = setInterval(self.work, interval);
+ }
+ function stop(){
+ metalog.info('q_worker', {is: 'stp'});
+ if (! clock){ return metalog.warn('q_worker', {is: 'ERR', am: self.report(), error: 'tried to stop an already-stopped worker' }); }
+ clearInterval(clock);
+ clock = null;
+ }
+
+ function work (){
+ var job;
+ if (active) { self.onWait(); }
+ else if (job = next()){ self.invoke(job); }
+ else { self.onIdle(); }
+ };
+
+ function size(){ return _.size(queue); }
+ function next(){ return queue[ _.keys(queue).sort()[0] ]; }
+
+ // function onIdle(){ util.print(' '+self.qname+'!'); };
+ // function onWait(){ util.print(' '+self.qname+'@'); };
+ function onIdle(){ };
+ function onWait(){ };
+
+ function report(){ return { qname: this.qname, size: this.size, queue: _.keys(this.queue) }; };
+ function toString(){ return util.inspect(this.report()); };
+
+ Object.defineProperties(this, {
+ add: {value: add}, size: {get: size},
+ start: {value: start}, stop: {value: stop},
+ report: {value: report}, toString: {value: toString},
+ onWait: {value: onWait}, onIdle: {value: onIdle}
+ });
+}
+
+// ---- Job ----
+
+function Job(mbox, perform, args){
+ _.extend(this, { mbox: mbox, perform: perform, args: args, on_completes: [] });
+}
+Job.prototype.complete = function(result){
+ for (var ii in this.on_completes){
+ process.nextTick(function(){ this.on_completes[ii].apply(null, result); });
+ };
+}
+Job.prototype.listen = function(on_complete){
+ this.on_completes.push(on_complete);
+};
+Job.prototype.toString = function (){ return util.inspect(this.report()); };
+Job.prototype.report = function (){ return this; }
+
+// ----
+
+module.exports = { Broker: Broker, Job: Job, Worker: Worker };
View
9 lib/cube/metalog.js
@@ -1,6 +1,7 @@
'use strict';
-var util = require("util");
+var util = require("util"),
+ _ = require("underscore");
var metalog = {
putter: null,
@@ -31,7 +32,7 @@ metalog.event = function(name, hsh, logger){
// Always goes thru to metalog.log
metalog.warn = function(name, hsh){
- metalog.log(name + "\t" + JSON.stringify(hsh));
+ metalog.log(name + "\t" + (+Date.now()) + "\t" + JSON.stringify(hsh));
};
// Events important enough for the production log file. Does not cubify.
@@ -48,7 +49,9 @@ metalog.minor = function(name, hsh){
metalog.inspectify = function inspectify(args){
for (var idx in arguments) {
util.print(idx + ": ");
- util.print(util.inspect(arguments[idx])+"\n");
+ var val = arguments[idx];
+ if (_.isFunction(val)) val = val.toString().slice(0, 80);
+ util.print(util.inspect(val, false, 2, true)+"\n"); // , null, true
}
util.print('----\n');
};
View
136 test/broker-test.js
@@ -0,0 +1,136 @@
+'use strict';
+
+var _ = require('underscore'),
+ util = require("util"),
+ vows = require("vows"),
+ assert = require("assert"),
+ test_helper = require("./test_helper"),
+ broker = require("../lib/cube/broker"),
+ Job = broker.Job, Broker = broker.Broker, Worker = broker.Worker,
+ metalog = require('../lib/cube/metalog');
+
+var suite = vows.describe("broker");
+
+var squarer = function(ii, cb){ cb(null, ii*ii, 'squarer'); };
+
+assert.isCalledTimes = function(ctxt, reps){
+ var results = [];
+ setTimeout(function(){ ctxt.callback(new Error('timeout: need '+reps+' results only have '+util.inspect(results))); }, 2000);
+ return function _is_called_checker(){
+ results.push(_.toArray(arguments));
+ if (results.length >= reps) ctxt.callback(null, results);
+ };
+}
+assert.isNotCalled = function(name){
+ return function(){ throw new Error(name + ' should not have been called, but was'); };
+};
+
+function example_worker(paused){
+ var worker = new Worker('worker', 50);
+ // worker.idle = _.identity;
+ if (! paused) worker.start();
+ return worker;
+}
+
+
+function example_job(){ return (new Job('smurf', squarer, [7])); };
+
+suite.addBatch({
+ // 'Worker': {
+ // topic: example_worker,
+ // // '.new': {
+ // // '': function(worker){
+ // // test_helper.inspectify('new worker', worker, worker);
+ // // worker.add('smurfette', squarer, [3], metalog.inspectify);
+ // // },
+ // // },
+ // '.invoke': {
+ // topic: function(worker){
+ // var ctxt = this;
+ // ctxt.checker = assert.isCalledTimes(ctxt, 3);
+ // ctxt.performances = 0;
+ // // shortly, worker will invoke perfom (once). 200 ms later, `perform`
+ // // will call `worker`'s proxy callback, which invokes all 3 callbacks.
+ // var perform = function(cb){ ctxt.performances++; setTimeout(function(){cb('hi')}, 200); };
+ // worker.add('thrice', perform, [], ctxt.checker);
+ // worker.add('thrice', assert.isNotCalled('perform'), [], ctxt.checker);
+ // worker.add('thrice', assert.isNotCalled('perform'), [], ctxt.checker);
+ // this.worker = worker;
+ // },
+ // 'calls perform exactly once': function(){ assert.equal(this.performances, 1); },
+ // 'calls all registered callbacks': function(results){ assert.deepEqual(results, [['hi'], ['hi'], ['hi']]) },
+ // teardown: function(){
+ // this.worker.stop();
+ // }
+ // }
+ // },
+
+ // 'Job': {
+ // '.new': {
+ // topic: example_job,
+ // '': function(job){
+ // assert.deepEqual(job, {name: 'smurf', perform: squarer, args: [7], on_completes: []});
+ // },
+ // },
+ // '.listen': {
+ // topic: function(){
+ // var job = example_job();
+ // job.listen(squarer);
+ // return job;
+ // },
+ // '': function(job){
+ // test_helper.inspectify(job, job.toString());
+ // }
+ // },
+ // // '.add': {
+ // // topic: example_job,
+ // // '': function(job){
+ // // }
+ // // },
+ // },
+
+ 'Broker': {
+ 'handles interleaved jobs': {
+ topic: function(){
+ var ctxt = this,
+ broker = this.broker = new Broker('test', 10),
+ ignored = assert.isNotCalled('perform');
+ ctxt.perfs = {a: 0, b: 0, c:0};
+ ctxt.checker = assert.isCalledTimes(ctxt, 8);
+ var task_a = function(ii, a2, cb){ ctxt.perfs.a++; setTimeout(function(){cb('result_a', ii*ii, a2)}, 10); };
+ var task_b = function(ii, cb){ ctxt.perfs.b++; setTimeout(function(){cb('result_b', ii*ii )}, 20); };
+ var task_c = function(ii, cb){ ctxt.perfs.c++; setTimeout(function(){cb('result_c', ii*ii )}, 300); };
+ // will go second: jobs are sorted
+ broker.deferProxy('task_b', task_b, 1, ctxt.checker);
+ broker.deferProxy('task_b', ignored, '<>', ctxt.checker);
+ // will go first
+ broker.deferProxy('task_a', task_a, 0, '?', ctxt.checker);
+ broker.deferProxy('task_a', ignored, '<>', ctxt.checker);
+ // will go third
+ broker.deferProxy('task_c', task_c, 2, ctxt.checker);
+ // a & b will be done; c (takes 300ms) will still be running.
+ setTimeout(function(){
+ broker.deferProxy('task_a', task_a, 3, '!', ctxt.checker);
+ broker.deferProxy('task_c', ignored, '<>', ctxt.checker);
+ broker.deferProxy('task_a', task_a, 3, '!', ctxt.checker);
+ }, 200);
+ },
+ 'calls perform exactly once': function(){ assert.deepEqual(this.perfs, {a: 2, b: 1, c: 1}); },
+ 'calls all registered callbacks': function(results){
+ assert.deepEqual(results, [
+ ['result_a', 0, '?'], ['result_a', 0, '?'],
+ ['result_b', 1], ['result_b', 1],
+ ['result_c', 4], ['result_c', 4],
+ ['result_a', 9, '!'], ['result_a', 9, '!']
+ ])
+ },
+ teardown: function(){
+ this.broker.stop();
+ }
+ }
+ },
+});
+
+
+
+suite['export'](module);
View
21 test/test_helper.js
@@ -73,16 +73,16 @@ test_helper.udp_request = function (data){
return function(){
var udp_client = dgram.createSocket('udp4');
var buffer = new Buffer(JSON.stringify(data));
- var context = this;
+ var ctxt = this;
metalog.info('sending_udp', { data: data });
- udp_client.send(buffer, 0, buffer.length, context.udp_port, 'localhost',
- function(err, val){ delayed_callback(context)(err, val); udp_client.close(); } );
+ udp_client.send(buffer, 0, buffer.length, ctxt.udp_port, 'localhost',
+ function(err, val){ delay(ctxt.callback, ctxt)(err, val); udp_client.close(); } );
};
};
// proxies to the test context's callback after a short delay.
//
-// @example as a test topic; will get the same data the cb otherwise would have:
+// @example the test topic introduces a delay; the 'is party time' vow gets the same data the cb otherwise would have:
// { topic: send_some_data,
// 'a short time later': {
// topic: test_helper.delaying_topic,
@@ -91,7 +91,7 @@ test_helper.udp_request = function (data){
function delaying_topic(){
var args = Array.prototype.slice.apply(arguments);
args.unshift(null);
- delayed_callback(this).apply(this, args);
+ delay(this.callback, this).apply(this, args);
}
test_helper.delaying_topic = delaying_topic;
@@ -100,21 +100,22 @@ test_helper.delaying_topic = delaying_topic;
//
// @example
// // you
-// dcb = delayed_callback(this)
+// dcb = delay(this)
// foo.do_something('...', dcb);
// // foo, after do_something'ing, invokes the delayed callback
// dcb(null, 1, 2);
// // 50ms later, dcb does the equivalent of
// this.callback(null, 1, 2);
//
-function delayed_callback(context){
+function delay(orig_cb, ctxt, ms){
+ ctxt = ctxt || null;
+ ms = ms || 100;
return function(){
- var callback_delay = 100;
var args = arguments;
- setTimeout(function(){ context.callback.apply(context, args); }, callback_delay);
+ setTimeout(function(){ orig_cb.apply(ctxt, args); }, ms);
};
}
-test_helper.delayed_callback = delayed_callback;
+test_helper.delay = delay;
// test_helper.with_server --
// start server, run tests once server starts, stop server when tests are done

0 comments on commit 84c47e4

Please sign in to comment.