Skip to content

Commit

Permalink
create manager->processor pool
Browse files Browse the repository at this point in the history
major point of this is to create a pool of processors and a connection
pool. The connection pool holds concurrency + 2 connections to the
faktory server and each processor has it's own work loop and fetch
cycle. The connection pool is managed by the manager and all lifecycle
hooks are syndicated to the pool of workers. A processor is an object
that polls for jobs--checking out a connection for each command it
executes. It's working loop is an infinite for loop with awaiting on the
fetch and job execution.

                  ,_______________,
                  |    faktory    |
                  -----------------
                          |
                  ,________________
                 /        |        \
     ,___________,  ,___________,  ,___________,
     | processor |  | processor |  | processor |
     -------------  -------------  -------------
                  \       |       /
                   ---------------
                          |
                   ,-------------,
                   |   manager   |
                   ---------------

Where each processor has its own connection to the faktory server and
processor pools are managed by a manager. A manager has n concurrency
and that's the number of processors and connections that will be started
to serve that pool. All of this lives within one node process.

Benchmarks:

pool:

› bench/push

pushed: 30000 duration: 1.35946858s jobs/s: 22067

› bench/push

pushed: 30000 duration: 1.430357713s jobs/s: 20974

› bench/push

pushed: 30000 duration: 1.3823066609999999s jobs/s: 21703

› bench/work

Jobs processed: 30000
Concurrency: 20

Duration: 6.7563684649999995s
Jobs/s: 4440

› bench/work

Jobs processed: 30000
Concurrency: 20

Duration: 6.653886883s
Jobs/s: 4509

› bench/work

Jobs processed: 30000
Concurrency: 20

Duration: 6.515825339s
Jobs/s: 4604

===============================================================

pool without await this.execute and sleeping in the loop

Jobs processed: 30000
Concurrency: NaN

Duration: 5.220679224s
Jobs/s: 5746

================================================================

pool without log

› bench/work

Jobs processed: 30000
Concurrency: 20

Duration: 3.288809578s
Jobs/s: 9122

› bench/work

Jobs processed: 30000
Concurrency: 20

Duration: 3.31144746s
Jobs/s: 9059

› bench/work

Jobs processed: 30000
Concurrency: 20

Duration: 3.2611014s
Jobs/s: 9199

================================================================

master:

› bench/push

pushed: 30000 duration: 1.382443728s jobs/s: 21701

› bench/push

pushed: 30000 duration: 1.378380277s jobs/s: 21765

› bench/push

pushed: 30000 duration: 1.376629829s jobs/s: 21792

› bench/work

Jobs processed: 30000
Concurrency: 20

Duration: 4.639131743s
Jobs/s: 6467

› bench/push

Jobs processed: 30000
Concurrency: NaN

Duration: 4.458317899s
Jobs/s: 6729

› bench/push

Jobs processed: 30000
Concurrency: NaN

Duration: 4.361462364s
Jobs/s: 6878

================================================

master w/ connection pool

› bench/work

Jobs processed: 30000
Concurrency: NaN

Duration: 4.595406734s
Jobs/s: 6528

› bench/work

Jobs processed: 30000
Concurrency: NaN

Duration: 4.602304733s
Jobs/s: 6518
  • Loading branch information
jbielick committed Nov 12, 2017
1 parent 228684c commit 1932c89
Show file tree
Hide file tree
Showing 12 changed files with 550 additions and 198 deletions.
8 changes: 6 additions & 2 deletions bench/work
Expand Up @@ -8,7 +8,7 @@ const concurrency = Number(process.argv[2]);
let completed = 0;
const time = process.hrtime();

faktory.register('MyJob', async () => {
const finish = () => {
completed += 1;
if (completed === 30000) {
const diff = process.hrtime(time);
Expand All @@ -25,8 +25,12 @@ Jobs/s: ${Math.round(completed / duration, 2)}
`);
process.exit(0);
}
};

faktory.register('MyJob', async () => {
finish();
});

await faktory.work({ concurrency });
const manager = await faktory.work({ concurrency });

})();
18 changes: 18 additions & 0 deletions bin/faktory-cluster
@@ -0,0 +1,18 @@
#!/usr/bin/env node

const cluster = require('cluster');
const workers = Number(process.argv[2]) || require('os').cpus().length;
const faktory = require('../');
const program = require('../lib/cli');

if (cluster.isMaster) {
for (let i = 0; i < workers; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
faktory.work(program);
}
15 changes: 1 addition & 14 deletions bin/faktory-worker
@@ -1,19 +1,6 @@
#!/usr/bin/env node

const program = require('../cli');
const faktory = require('../');

// (async () => {
// const client = await faktory.connect();
// const job = await client.fetch('default');
// console.log(job);
// })();

faktory.register('MyJob', async () => {
await new Promise((resolve, reject) => {
process.stdout.write('.');
resolve(true);
});
});
const program = require('../lib/cli');

faktory.work(program);
30 changes: 26 additions & 4 deletions lib/cli.js
Expand Up @@ -2,12 +2,34 @@

const program = require('commander');

function list(val) {
return val.split(',');
function collectSplit(val, memo) {
memo.push(val.split(','));
return memo;
}

function collect(val, memo) {
memo.push(val);
return memo;
}

module.exports = program
.version('0.1.0')
.version(`faktory-worker ${require('../package.json').version}`)
.usage('[options]')
.option('-q, --queues <items>', 'Comma-separated queues to work on', list)
.description(`
___ __ __ __
/ _/__ _/ /__/ /____ ______ __ _ _____ ____/ /_____ ____
/ _/ _ \`/ '_/ __/ _ \\/ __/ // / | |/|/ / _ \\/ __/ '_/ -_) __/
/_/ \\_,_/_/\\_\\\\__/\\___/_/ \\_, / |__,__/\\___/_/ /_/\\_\\\\__/_/
/___/
`)
.option('-q, --queue <queue[,weight]>', 'queues to process with optional weights', collectSplit, [])
.option('-c, --concurrency <n>', 'number of concurrent workers', parseInt)
.option('-t, --timeout <n>', 'shutdown timeout', parseInt)
.option('-e, --environment <env>', 'application environment')
.option('-l, --label <label>', 'worker label', collect, [])
.option('-r, --require <path>', 'worker directory to require')
.option('-v, --verbose', 'print verbose output')
.option('-v, --version', 'print version and exit')
.parse(process.argv);

program.labels = program.label;
7 changes: 3 additions & 4 deletions lib/index.js
@@ -1,5 +1,5 @@
const Client = require('faktory-client');
const Processor = require('./processor');
const Manager = require('./manager');

const registry = {};

Expand All @@ -14,8 +14,7 @@ module.exports = {
return new Client(...args).connect();
},
async work(options = {}) {
const processor = new Processor(Object.assign({}, options, { registry }));
await processor.run();
return processor;
const manager = new Manager(Object.assign({}, options, { registry }));
return manager.run();
}
};
105 changes: 105 additions & 0 deletions lib/manager.js
@@ -0,0 +1,105 @@
const debug = require('debug')('faktory-worker:manager');
const Processor = require('./processor');
const Client = require('faktory-client');
const blocked = require('blocked');
const pool = require('generic-pool');

blocked((ms) => {
debug(`Event loop blocked ${ms}`);
}, { threshold: 10 });

module.exports = class Manager {
constructor(options = {}) {
const { concurrency, timeout } = options;

this.concurrency = concurrency || 20;
this.timeout = timeout || 8000;

this.pool = this.constructor.createPool(options, this.concurrency + 2);
options.withConnection = this.withConnection.bind(this);

this.processors = [];
for (let i = this.concurrency; i > 0; i--) {
this.processors.push(new Processor(options));
}
}

static createPool(options, size) {
return pool.createPool({
create() {
return new Client(options).connect();
},
destroy(client) {
return client.close();
}
}, {
min: 1,
max: size
});
}

async withConnection(fn, priority) {
const client = await this.pool.acquire();
try {
return fn(client);
} finally {
await this.pool.release(client);
}
}

trapSignals() {
process
.on('SIGTERM', () => this.stop())
.on('SIGTSTP', () => this.quiet())
.on('SIGINT', () => this.stop());
}

/**
* stop accepting new jobs and continue working on what's currently in progress
* @return {void}
*/
quiet() {
this.log('Quieting');
this.processors.map(p => p.quiet());
}

/**
* stop accepting new jobs, fail those that are in progress and shutdown
* @return {[type]} [description]
*/
async stop() {
this.log('Stopping');
const start = Date.now();

this.processors.map(p => p.stop());

return new Promise((resolve) => {
const shutdown = async () => {
let working = this.busy;
if (working.length === 0 || Date.now() - start > this.timeout) {
this.log(`Shutting down. In progress: ${working.length}`);
await this.pool.drain();
this.pool.clear();
resolve();
} else {
setTimeout(shutdown, 10);
}
};

shutdown();
});
}

get busy() {
return this.processors.filter(p => p.working);
}

run() {
this.trapSignals();
this.processors.map(p => p.start())
}

log(msg) {
console.log(`${new Date().toJSON()} faktory-manager ${msg}`);
}
};

0 comments on commit 1932c89

Please sign in to comment.