Skip to content

Commit

Permalink
Bring API in line with design in README.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Allen committed Jan 21, 2014
1 parent ab0bf28 commit 2b39bd4
Show file tree
Hide file tree
Showing 5 changed files with 508 additions and 0 deletions.
39 changes: 39 additions & 0 deletions Gruntfile.js
@@ -0,0 +1,39 @@
module.exports = function(grunt) {

grunt.initConfig({
env: {
dev: {
},
test: {
NODE_ENV: 'test'
},
// For running not on heroku with prod params - heroku does not use these.
prod: {
}
},
jshint: {
all: ['Gruntfile.js', 'lib/**/*.js', 'test/**/*.js']
},
mochacli: {
all: ['test/**/*.js'],
options: {
reporter: 'mocha-unfunk-reporter',
ui: 'tdd'
}
},
watch: {
files: ['lib/**/*.js', 'test/**/*.js'],
tasks: 'test'
}
});

grunt.loadNpmTasks('grunt-contrib-jshint');
grunt.loadNpmTasks('grunt-contrib-watch');
grunt.loadNpmTasks('grunt-mocha-cli');
grunt.loadNpmTasks('grunt-env');
grunt.registerTask('test', ['env:test', 'mochacli']);
grunt.registerTask('dev', ['env:dev', 'exec:dev']);
grunt.registerTask('prod', ['env:prod', 'exec:dev']);
};

// vim: set et sw=2 ts=2 colorcolumn=80:
5 changes: 5 additions & 0 deletions index.js
@@ -0,0 +1,5 @@
/**
* The entry point.
**/

module.exports = require("./lib/jobs");
148 changes: 148 additions & 0 deletions lib/jobs.js
@@ -0,0 +1,148 @@
var async = require('async'),
moment = require('moment'),
events = require('events'),
_ = require('lodash'),
// For now, this isn't DB backed - we just store the jobs in memory.
next_job_id = 0,
jobs = [],
shouldStillProcess;

// Provide the ability to set/clear internal job state in unit tests.
if (process.env.NODE_ENV == 'test') {
exports.setJobs = function(newJobs) {
jobs = newJobs;
};
exports.getJobs = function() {
return jobs;
};
exports.getScheduledJobs = function() {
return _.filter(jobs, function(jobContainer) {
return jobContainer.processNext !== null;
});
};
}

exports.eventEmitter = new events.EventEmitter();

exports.create = function(job, processIn, done) {
var jobContainer = {
id: next_job_id++,
processNext: moment().add(processIn).toDate(),
jobData: [job]
};

jobs.push(jobContainer);
done();
};

function updateJob(jobContainer, newJobData, processIn, cb) {
jobContainer.jobData.push(newJobData);

if(processIn === null) {
jobContainer.processNext = null;
} else {
jobContainer.processNext = moment().add('valueOf', processIn);
}
exports.eventEmitter.emit('jobUpdated');
return cb();
}

function maybeServiceJob(jobContainer, serviceJob, cb) {
exports.eventEmitter.emit('maybeServiceJob');

var processingComplete = function(err, newJobData, processIn) {
if (err !== null) {
console.log('not updating job due to err callback');
return cb();
} else {
updateJob(jobContainer, newJobData, processIn, cb);
}
};

// Is processNext time less than or equal to current time?
if (jobContainer.processNext.valueOf() <= moment().valueOf()) {
// Job is ready for processing.
var jobId = jobContainer.id;
var jobData = _.last(jobContainer.jobData);
serviceJob(jobId, jobData, processingComplete);
} else {
// Job is not to be processed yet. Push back on queue and callback.
console.log('not running job yet, it is still delayed.');
cb();
}
}

/**
* Examines and services jobs in the 'jobs' array, repeatedly, forever, unless
* stopProcessing() is called.
* Call this once to start processing jobs.
* @param {function} serviceJob Iterator function to be run on jobs requiring
* service.
*/
exports.process = function(serviceJob) {
shouldStillProcess = true;

async.whilst(
// test
function() {return shouldStillProcess;},
// iterator
function(cb) {
setImmediate(function() {
var jobsToProcess = _.filter(jobs, function(job) {
return job.processNext !== null;
});

if (jobsToProcess.length === 0) {
console.log('no jobs scheduled');
return cb();
}
// Find job with earliest time.
var jobToProcess = _.min(jobsToProcess, function(jobContainer) {
return jobContainer.processNext.valueOf();
});

console.log('Going to process this job:');
console.log(jobToProcess);

maybeServiceJob(jobToProcess, serviceJob, cb);
});
},
// called when test fails and execution ceases
function() {});
};

exports.stopProcessing = function() {
shouldStillProcess = false;
};

exports.processNow = function(id, callback, done) {
var jobContainer = _.find(jobs, function(jContainer) {
return jContainer.id === id;
});

var processingComplete = function(err, newJobData, processIn) {
if (err !== null) {
console.log('not updating job due to err callback');
return done();
} else {
updateJob(jobContainer, newJobData, processIn, done);
}
};
callback(null, {}, processingComplete);
};

function latestData(id) {
var jobContainer = _.find(jobs, function(jobCont) {return jobCont.id == id;});
return _.last(jobContainer.jobData);
}

exports.get = function(id, cb) {
return cb(null, latestData(id));
};

exports.getHistory = function(id, cb) {
var jobContainer = _.find(jobs, function(jobCont) {return jobCont.id == id;});
return cb(null, jobContainer.jobData);
};

// vim: set et sw=2 ts=2 colorcolumn=80:
41 changes: 41 additions & 0 deletions package.json
@@ -0,0 +1,41 @@
{
"name": "node-pg-jobs",
"version": "0.0.0",
"description": "A simple yet powerful postgres backed job queue for node.js with state machine like operation.",
"main": "index.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "grunt test"
},
"repository": {
"type": "git",
"url": "git://github.com/noblesamurai/node-pg-jobs.git"
},
"keywords": [
"job",
"queue",
"postgres"
],
"author": "Timothy Leslie Allen <allen.timothy.email@gmail.com> (https://github.com/timlesallen)",
"license": "MIT",
"bugs": {
"url": "https://github.com/noblesamurai/node-pg-jobs/issues"
},
"devDependencies": {
"grunt-env": "~0.4.1",
"grunt-contrib-watch": "~0.5.3",
"grunt-mocha-cli": "~1.5.0",
"grunt": "~0.4.2",
"grunt-contrib-jshint": "~0.8.0",
"mocha-unfunk-reporter": "~0.4.0",
"chai": "~1.8.1"
},
"dependencies": {
"async": "~0.2.9",
"moment": "~2.5.0",
"sinon": "~1.7.3",
"lodash": "~2.4.1"
}
}

0 comments on commit 2b39bd4

Please sign in to comment.