Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
kusor committed Mar 8, 2012
1 parent b783687 commit 37564c1
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 61 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ over [PostgreSQL](http://www.postgresql.org/).

# Installation

npm install workflow-pg-backend
npm install wf-pg-backend

# Usage

Add the following to the config file of your application using node-workflow:
Add the following to the config file of your application using wf:

{
"backend": {
"module": "workflow-pg-backend",
"module": "wf-pg-backend",
"opts": {
"port": 5432,
"host": "localhost",
Expand All @@ -30,7 +30,7 @@ above, you don't need to specify them since those are the default values.
Please, note that this module will not try to create the `database` especified,
it must exists in order to be used by this module.

And that should be it. `node-workflow` REST API and Runners should take care of
And that should be it. `wf` REST API and Runners should take care of
properly loading the module on init.

# Issues
Expand Down
110 changes: 61 additions & 49 deletions lib/workflow-pg-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
var util = require('util'),
async = require('async'),
pg = require('pg').native,
WorkflowBackend = require('node-workflow').WorkflowBackend;
wf = require('wf'),
WorkflowBackend = wf.WorkflowBackend;

var sprintf = util.format;

Expand Down Expand Up @@ -82,9 +83,10 @@ WorkflowPgBackend.prototype.createWorkflow = function (workflow, callback) {
val_places.join(', ') + ')', vals).
on('error', function (err) {
error = (err.code === '23505') ?
'Workflow.name must be unique. A workflow with name "' +
workflow.name + '" already exists' :
err.Error;
new wf.BackendInvalidArgumentError(
'Workflow.name must be unique. A workflow with name "' +
workflow.name + '" already exists') :
new wf.BackendInternalError(err.Error);
}).on('end', function () {
if (error) {
return callback(error);
Expand Down Expand Up @@ -115,10 +117,10 @@ WorkflowPgBackend.prototype.getWorkflow = function (uuid, callback) {
});
}).on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else if (workflow === null) {
return callback(sprintf(
'Workflow with uuid \'%s\' does not exist', uuid));
return callback(new wf.BackendResourceNotFoundError(sprintf(
'Workflow with uuid \'%s\' does not exist', uuid)));
} else {
return callback(null, workflow);
}
Expand Down Expand Up @@ -161,7 +163,7 @@ WorkflowPgBackend.prototype.updateWorkflow = function (workflow, callback) {
error = err.Error;
}).on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, workflow);
}
Expand All @@ -179,7 +181,7 @@ WorkflowPgBackend.prototype.deleteWorkflow = function (workflow, callback) {
'DELETE FROM wf_workflows WHERE uuid=$1 RETURNING *', [workflow.uuid],
function (err, result) {
if (err) {
return callback(err);
return callback(new wf.BackendInternalError(err));
} else {
return callback(null, (result.rows.length === 1));
}
Expand Down Expand Up @@ -211,7 +213,7 @@ WorkflowPgBackend.prototype.getWorkflows = function (callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, workflows);
}
Expand Down Expand Up @@ -247,7 +249,7 @@ WorkflowPgBackend.prototype.createJob = function (job, callback) {
error = err.Error;
}).on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, job);
}
Expand All @@ -269,10 +271,10 @@ WorkflowPgBackend.prototype.getJob = function (uuid, callback) {
job = self._decodeJob(row);
}).on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else if (job === null) {
return callback(sprintf(
'Workflow with uuid \'%s\' does not exist', uuid));
return callback(new wf.BackendResourceNotFoundError(sprintf(
'Workflow with uuid \'%s\' does not exist', uuid)));
} else {
return callback(null, job);
}
Expand Down Expand Up @@ -306,10 +308,10 @@ WorkflowPgBackend.prototype.getJobProperty = function (uuid, prop, cb) {
})
.on('end', function () {
if (error) {
return cb(error);
return cb(new wf.BackendInternalError(error));
} else if (value === null) {
return cb(sprintf(
'Job with uuid \'%s\' does not exist', uuid));
return cb(new wf.BackendResourceNotFoundError(sprintf(
'Job with uuid \'%s\' does not exist', uuid)));
} else {
return cb(null, value);
}
Expand All @@ -332,12 +334,13 @@ WorkflowPgBackend.prototype.validateJobTarget = function (job, callback) {
'execution=\'queued\' OR execution=\'running\'',
[job.workflow_uuid, job.target, JSON.stringify(job.params)]).
on('error', function (err) {
error = err.Error;
error = new wf.BackendInternalError(err.Error);
}).
on('row', function (row) {
if (row.count !== 0) {
error = 'Another job with the same target' +
' and params is already queued';
error = new wf.BackendInvalidArgumentError(
'Another job with the same target' +
' and params is already queued');
}
}).
on('end', function () {
Expand Down Expand Up @@ -375,7 +378,7 @@ WorkflowPgBackend.prototype.nextJob = function (index, callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, job);
}
Expand All @@ -402,7 +405,8 @@ WorkflowPgBackend.prototype.runJob = function (uuid, runner_id, callback) {
throw new Error(err.Error);
}
if (res.rows.length === 0) {
error = sprintf('Job with uuid \'%s\' is not queued', uuid);
error = new wf.BackendPreconditionFailedError(sprintf(
'Job with uuid \'%s\' is not queued', uuid));
}
});

Expand All @@ -414,7 +418,8 @@ WorkflowPgBackend.prototype.runJob = function (uuid, runner_id, callback) {
throw err;
}
if (res.rows.length === 0) {
error = sprintf('Unable to lock job \'%s\'', uuid);
error = new wf.BackendPreconditionFailedError(sprintf(
'Unable to lock job \'%s\'', uuid));
} else {
job = res.rows[0];
}
Expand All @@ -429,7 +434,7 @@ WorkflowPgBackend.prototype.runJob = function (uuid, runner_id, callback) {
});
} catch (e) {
error = e.message;
return callback(error);
return callback(new wf.BackendInternalError(error));
}
};

Expand All @@ -447,9 +452,10 @@ WorkflowPgBackend.prototype.finishJob = function (job, callback) {
'OR execution=\'canceled\'',
[job.uuid], function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
} else if (res.rows.length === 0) {
return callback('Only running jobs can be finished');
return new wf.BackendPreconditionFailedError(callback(
'Only running jobs can be finished'));
} else {
if (job.execution === 'running') {
job.execution = 'succeeded';
Expand Down Expand Up @@ -502,7 +508,7 @@ WorkflowPgBackend.prototype.updateJob = function (job, callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, theJob);
}
Expand Down Expand Up @@ -534,7 +540,7 @@ WorkflowPgBackend.prototype.updateJobProperty = function (
[val, uuid],
function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
} else {
return callback(null);
}
Expand All @@ -558,9 +564,10 @@ WorkflowPgBackend.prototype.queueJob = function (job, callback) {
'SELECT * FROM wf_jobs WHERE uuid=$1 AND execution=\'running\'',
[job.uuid], function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
} else if (res.rows.length === 0) {
return callback('Only running jobs can be queued');
return new wf.BackendPreconditionFailedError(callback(
'Only running jobs can be queued'));
} else {
job.execution = 'queued';
job.runner_id = null;
Expand Down Expand Up @@ -593,7 +600,7 @@ WorkflowPgBackend.prototype.nextJobs = function (start, stop, callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, jobs);
}
Expand Down Expand Up @@ -626,7 +633,7 @@ WorkflowPgBackend.prototype.registerRunner = function (
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null);
}
Expand All @@ -653,7 +660,7 @@ WorkflowPgBackend.prototype.runnerActive = function (
self.client.query('UPDATE wf_runners SET (active_at)=($1) ' +
'WHERE uuid=$2 RETURNING *', [active_at, runner_id], function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
}
if (res.rows.length !== 1) {
return self.registerRunner(runner_id, active_at, callback);
Expand Down Expand Up @@ -682,10 +689,10 @@ WorkflowPgBackend.prototype.getRunner = function (runner_id, callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else if (runner === null) {
return callback(sprintf(
'WorkflowRunner with uuid \'%s\' does not exist', runner_id));
return callback(new wf.BackendResourceNotFoundError(sprintf(
'WorkflowRunner with uuid \'%s\' does not exist', runner_id)));
} else {
return callback(null, runner.active_at);
}
Expand All @@ -711,7 +718,7 @@ WorkflowPgBackend.prototype.getRunners = function (callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, runners);
}
Expand All @@ -727,10 +734,11 @@ WorkflowPgBackend.prototype.idleRunner = function (runner_id, callback) {
self.client.query('UPDATE wf_runners SET (idle)=(TRUE) ' +
'WHERE uuid=$1 RETURNING *', [runner_id], function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
}
if (res.rows.length !== 1) {
return callback('Cannot idle unexisting runners');
return new wf.BackendResourceNotFoundError(callback(
'Cannot idle unexisting runners'));
} else {
return callback(null);
}
Expand Down Expand Up @@ -769,10 +777,11 @@ WorkflowPgBackend.prototype.wakeUpRunner = function (runner_id, callback) {
self.client.query('UPDATE wf_runners SET (idle)=(FALSE) ' +
'WHERE uuid=$1 RETURNING *', [runner_id], function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
}
if (res.rows.length !== 1) {
return callback('Cannot wake up unexisting runners');
return callback(new wf.BackendResourceNotFoundError(
'Cannot wake up unexisting runners'));
} else {
return callback(null);
}
Expand All @@ -798,7 +807,7 @@ WorkflowPgBackend.prototype.getRunnerJobs = function (runner_id, callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, jobs);
}
Expand All @@ -822,8 +831,9 @@ WorkflowPgBackend.prototype.getJobs = function (execution, callback) {
} else if (executions.indexOf(execution !== -1)) {
query += ' WHERE execution=\'' + execution + '\'';
} else {
return callback('excution is required and must be one of' +
'"queued", "failed", "succeeded", "canceled", "running"');
return callback(new wf.BackendInvalidArgumentError(
'excution is required and must be one of' +
'"queued", "failed", "succeeded", "canceled", "running"'));
}

return self.client.query(query).
Expand All @@ -835,7 +845,7 @@ WorkflowPgBackend.prototype.getJobs = function (execution, callback) {
}).
on('end', function () {
if (error) {
return callback(error);
return callback(new wf.BackendInternalError(error));
} else {
return callback(null, jobs);
}
Expand All @@ -856,9 +866,10 @@ WorkflowPgBackend.prototype.addInfo = function (uuid, info, callback) {
'SELECT * FROM wf_jobs WHERE uuid=$1',
[uuid], function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
} else if (res.rows.length === 0) {
return callback('Job does not exist. Cannot Update.');
return callback(new wf.BackendResourceNotFoundError(
'Job does not exist. Cannot Update.'));
} else {
existing_info = JSON.parse(res.rows[0].info) || [];
existing_info.push(info);
Expand All @@ -882,9 +893,10 @@ WorkflowPgBackend.prototype.getInfo = function (uuid, callback) {
'SELECT info FROM wf_jobs WHERE uuid=$1',
[uuid], function (err, res) {
if (err) {
return callback(err.Error);
return callback(new wf.BackendInternalError(err.Error));
} else if (res.rows.length === 0) {
return callback('Job does not exist. Cannot get info.');
return callback(new wf.BackendResourceNotFoundError(
'Job does not exist. Cannot get info.'));
} else {
return callback(null, (JSON.parse(res.rows[0].info) || []));
}
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"author": "Pedro Palazón Candel <kusorbox@gmail.com> (http://www.joyent.com)",
"name": "node-workflow-pg-backend",
"description": "A backend for node-workflow built over PostgreSQL",
"version": "0.1.0",
"name": "wf-pg-backend",
"description": "A backend for wf built over PostgreSQL",
"version": "0.2.0",
"homepage": "https://github.com/kusor/node-workflow-pg-backend",
"repository": {
"type": "git",
Expand All @@ -20,7 +20,7 @@
"bunyan": "0.6.8",
"async": "0.1.15",
"node-uuid": "1.3.3",
"node-workflow": "0.1.0"
"wf": "0.2.0"
},
"devDependencies": {
"tap": "0.2.0"
Expand Down
Loading

0 comments on commit 37564c1

Please sign in to comment.