Skip to content

Commit

Permalink
TritonDataCenter/node-workflow#59: Use Bunyan for logging + log errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
kusor committed Apr 16, 2012
1 parent b740086 commit 04144e7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
57 changes: 56 additions & 1 deletion lib/workflow-redis-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// very similar
var util = require('util'),
async = require('async'),
Logger = require('bunyan'),
wf = require('wf'),
WorkflowBackend = wf.WorkflowBackend;

Expand All @@ -15,6 +16,21 @@ var WorkflowRedisBackend = module.exports = function (config) {
WorkflowBackend.call(this);
this.config = config;
this.client = null;
if (!config.logger) {
config.logger = {};
}

config.logger.name = 'wf-redis-backend';
config.logger.serializers = {
err: Logger.stdSerializers.err
};

config.logger.streams = config.logger.streams || [ {
level: 'debug',
stream: process.stdout
}];

this.log = new Logger(config.logger);
};

util.inherits(WorkflowRedisBackend, WorkflowBackend);
Expand All @@ -38,7 +54,7 @@ WorkflowRedisBackend.prototype.init = function (callback) {
}

self.client.on('error', function (err) {
console.error('Redis error => ' + err.name + ':' + err.message);
self.log.error({error: err});
});

self.client.on('connect', function () {
Expand Down Expand Up @@ -91,6 +107,7 @@ WorkflowRedisBackend.prototype.createWorkflow = function (workflow, callback) {
workflow.name,
function (err, result) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -104,6 +121,7 @@ WorkflowRedisBackend.prototype.createWorkflow = function (workflow, callback) {
return multi.exec(function (err, replies) {
// console.log(replies); => [ 'OK', 0 ]
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
if (workflow.chain) {
Expand All @@ -126,6 +144,7 @@ WorkflowRedisBackend.prototype.getWorkflow = function (uuid, callback) {

self.client.hgetall('workflow:' + uuid, function (err, workflow) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else if (Object.keys(workflow).length === 0) {
return callback(new wf.BackendResourceNotFoundError(sprintf(
Expand Down Expand Up @@ -157,6 +176,7 @@ WorkflowRedisBackend.prototype.deleteWorkflow = function (workflow, callback) {
multi.exec(function (err, replies) {
// console.log(replies); => [ 1, 1 ]
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
return callback(null, true);
Expand Down Expand Up @@ -189,6 +209,7 @@ WorkflowRedisBackend.prototype.updateWorkflow = function (workflow, callback) {
'workflow:' + workflow.uuid,
function (err, result) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -199,6 +220,7 @@ WorkflowRedisBackend.prototype.updateWorkflow = function (workflow, callback) {

return self.getWorkflow(workflow.uuid, function (err, result) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
aWorkflow = result;
Expand All @@ -207,6 +229,7 @@ WorkflowRedisBackend.prototype.updateWorkflow = function (workflow, callback) {
workflow.name,
function (err, result) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -226,6 +249,7 @@ WorkflowRedisBackend.prototype.updateWorkflow = function (workflow, callback) {
return multi.exec(function (err, replies) {
// console.log(replies); => [ 'OK', 0 ]
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
if (workflow.chain) {
Expand Down Expand Up @@ -271,6 +295,7 @@ WorkflowRedisBackend.prototype.createJob = function (job, callback) {
return multi.exec(function (err, replies) {
// console.log(replies, false, 8); => [ 'OK', 1, 1 ]
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
return self._decodeJob(job, function (job) {
Expand All @@ -288,6 +313,7 @@ WorkflowRedisBackend.prototype.getJob = function (uuid, callback) {

return self.client.hgetall('job:' + uuid, function (err, job) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
if (Object.keys(job).length === 0) {
Expand All @@ -313,6 +339,7 @@ WorkflowRedisBackend.prototype.getJobProperty = function (uuid, prop, cb) {
'params'];
self.client.hget('job:' + uuid, prop, function (err, val) {
if (err) {
self.log.error({error: err});
return cb(new wf.BackendInternalError(err));
} else {
if (encoded_props.indexOf(prop) !== -1) {
Expand All @@ -338,6 +365,7 @@ WorkflowRedisBackend.prototype.validateJobTarget = function (job, callback) {
'wf_target:' + job.target,
function (err, members) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
if (members.length === 0) {
Expand Down Expand Up @@ -367,6 +395,7 @@ WorkflowRedisBackend.prototype.validateJobTarget = function (job, callback) {
});
}, function (err) {
if (err) {
self.log.error({error: err});
return callback(err);
}
return callback(null);
Expand All @@ -388,6 +417,7 @@ WorkflowRedisBackend.prototype.nextJob = function (index, callback) {

self.client.lrange('wf_queued_jobs', index, index, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -411,6 +441,7 @@ WorkflowRedisBackend.prototype.runJob = function (uuid, runner_id, callback) {

return self.client.lrem('wf_queued_jobs', 0, uuid, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -426,6 +457,7 @@ WorkflowRedisBackend.prototype.runJob = function (uuid, runner_id, callback) {
multi.hset('job:' + uuid, 'runner_id', runner_id);
return multi.exec(function (err, replies) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
return self.getJob(uuid, callback);
Expand Down Expand Up @@ -453,6 +485,7 @@ WorkflowRedisBackend.prototype.finishJob = function (job, callback) {

return self.client.lrem('wf_running_jobs', 0, job.uuid, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -477,6 +510,7 @@ WorkflowRedisBackend.prototype.finishJob = function (job, callback) {
multi.hdel('job:' + job.uuid, 'runner_id');
return multi.exec(function (err, replies) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
return self.getJob(job.uuid, callback);
Expand All @@ -501,6 +535,7 @@ WorkflowRedisBackend.prototype.updateJob = function (job, callback) {
}
self.client.hmset('job:' + job.uuid, job, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return self.getJob(job.uuid, callback);
Expand Down Expand Up @@ -528,6 +563,7 @@ WorkflowRedisBackend.prototype.updateJobProperty = function (

self.client.hset('job:' + uuid, prop, val, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return callback();
Expand All @@ -554,6 +590,7 @@ WorkflowRedisBackend.prototype.queueJob = function (job, callback) {

return self.client.lrem('wf_running_jobs', 0, job.uuid, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -568,6 +605,7 @@ WorkflowRedisBackend.prototype.queueJob = function (job, callback) {
multi.hdel('job:' + job.uuid, 'runner_id');
return multi.exec(function (err, replies) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
} else {
return self.getJob(job.uuid, callback);
Expand All @@ -587,6 +625,7 @@ WorkflowRedisBackend.prototype.nextJobs = function (start, stop, callback) {

self.client.lrange('wf_queued_jobs', start, stop, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand Down Expand Up @@ -618,6 +657,7 @@ WorkflowRedisBackend.prototype.registerRunner = function (
active_at,
function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
// Actually, we don't care at all about the 0/1 possible return values.
Expand Down Expand Up @@ -645,6 +685,7 @@ WorkflowRedisBackend.prototype.getRunner = function (runner_id, callback) {
var self = this;
self.client.hget('wf_runners', runner_id, function (err, runner) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return callback(null, new Date(runner));
Expand All @@ -659,6 +700,7 @@ WorkflowRedisBackend.prototype.getRunners = function (callback) {
return self.client.hgetall('wf_runners', function (err, runners) {
var theRunners = {};
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
Object.keys(runners).forEach(function (uuid) {
Expand All @@ -675,6 +717,7 @@ WorkflowRedisBackend.prototype.idleRunner = function (runner_id, callback) {
var self = this;
self.client.sadd('wf_idle_runners', runner_id, function (err) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return callback(null);
Expand Down Expand Up @@ -702,6 +745,7 @@ WorkflowRedisBackend.prototype.wakeUpRunner = function (runner_id, callback) {
var self = this;
self.client.srem('wf_idle_runners', runner_id, function (err) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return callback(null);
Expand All @@ -716,6 +760,7 @@ WorkflowRedisBackend.prototype.getRunnerJobs = function (runner_id, callback) {
var self = this;
self.client.smembers('wf_runner:' + runner_id, function (err, jobs) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return callback(null, jobs);
Expand All @@ -731,13 +776,15 @@ WorkflowRedisBackend.prototype.getWorkflows = function (callback) {

return self.client.smembers('wf_workflows', function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
res.forEach(function (uuid) {
multi.hgetall('workflow:' + uuid);
});
return multi.exec(function (err, replies) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
replies.forEach(function (workflow, i, arr) {
Expand Down Expand Up @@ -774,6 +821,7 @@ WorkflowRedisBackend.prototype.getJobs = function (execution, callback) {

multi.exec(function (err, replies) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
replies.forEach(function (job, i, arr) {
Expand All @@ -788,6 +836,7 @@ WorkflowRedisBackend.prototype.getJobs = function (execution, callback) {
list_name = 'wf_' + execution + '_jobs';
return self.client.llen(list_name, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return self.client.lrange(
Expand All @@ -803,6 +852,7 @@ WorkflowRedisBackend.prototype.getJobs = function (execution, callback) {
});
return multi.exec(function (err, replies) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
replies.forEach(function (job, i, arr) {
Expand Down Expand Up @@ -834,6 +884,7 @@ WorkflowRedisBackend.prototype.addInfo = function (uuid, info, callback) {

return self.client.exists('job:' + uuid, function (err, result) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -844,6 +895,7 @@ WorkflowRedisBackend.prototype.addInfo = function (uuid, info, callback) {

return self.client.rpush('jobinfo:' + uuid, info, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
return callback();
Expand All @@ -862,6 +914,7 @@ WorkflowRedisBackend.prototype.getInfo = function (uuid, callback) {

return self.client.exists('job:' + uuid, function (err, result) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}

Expand All @@ -872,6 +925,7 @@ WorkflowRedisBackend.prototype.getInfo = function (uuid, callback) {

return self.client.llen('jobinfo:' + uuid, function (err, res) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
llen = res;
Expand All @@ -881,6 +935,7 @@ WorkflowRedisBackend.prototype.getInfo = function (uuid, callback) {
llen,
function (err, items) {
if (err) {
self.log.error({error: err});
return callback(new wf.BackendInternalError(err));
}
if (items.length) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"author": "Pedro Palazón Candel <kusorbox@gmail.com> (http://www.joyent.com)",
"name": "wf-redis-backend",
"description": "A backend for wf built over Redis",
"version": "0.2.1",
"version": "0.2.2",
"homepage": "https://github.com/kusor/node-workflow-redis-backend",
"repository": {
"type": "git",
Expand All @@ -20,7 +20,7 @@
"redis": "0.7.1",
"async": "0.1.18",
"node-uuid": "1.3.3",
"wf": "0.2.1"
"wf": "0.2.3"
},
"devDependencies": {
"tap": "0.2.0"
Expand Down

0 comments on commit 04144e7

Please sign in to comment.