Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

GH-81: add function which can be run within a task to add to the job'…

…s info array
  • Loading branch information...
commit 1d91df9ebcb0329130edec6e3fcd24495c994d92 1 parent 886346e
Rob Gulewich rgulewich authored
22 lib/job-runner.js
@@ -247,14 +247,32 @@ WorkflowJobRunner.prototype.runTask = function (task, chain, cb) {
247 247 try {
248 248
249 249 self.onChildUp();
250   - // Message may contain either only 'error' member, or also 'cmd',
251   - // 'result' and 'trace'.
  250 + // Message may contain one of the 'error', 'cmd', or 'info' members,
  251 + // plus 'result' and 'trace' (optional).
252 252 self.child.on('message', function (msg) {
253 253 if (self.log.debug()) {
254 254 self.log.debug({
255 255 msg: msg
256 256 }, 'child process message');
257 257 }
  258 +
  259 + if (msg.info) {
  260 + var info = {
  261 + data: msg.info,
  262 + date: new Date().toISOString()
  263 + };
  264 + if (msg.trace) {
  265 + info.trace = msg.trace;
  266 + }
  267 +
  268 + return self.backend.addInfo(self.job.uuid, info,
  269 + function (err) {
  270 + if (err) {
  271 + self.log.error({err: err}, 'Error adding info');
  272 + }
  273 + });
  274 + }
  275 +
258 276 // Save the results into the result chain + update on the backend.
259 277 var res = {
260 278 result: msg.result,
10 lib/task-runner.js
@@ -180,6 +180,12 @@ WorkflowTaskRunner.prototype.retryTask = function (cb) {
180 180 }, self.timeout);
181 181 }
182 182
  183 + self.job.info = function (info) {
  184 + return cb(self.formatResults({
  185 + info: info
  186 + }));
  187 + };
  188 +
183 189 self.body(self.job, function (err, res) {
184 190 if (self.retryTimedOut) {
185 191 self.retryTimedOut = false;
@@ -303,6 +309,10 @@ WorkflowTaskRunner.prototype.formatResults = function (msg) {
303 309 msg.error = '';
304 310 }
305 311
  312 + if (msg.info) {
  313 + msg.cmd = 'info';
  314 + }
  315 +
306 316 if (!msg.cmd) {
307 317 if (msg.error === '') {
308 318 msg.cmd = 'run';
4 lib/workflow-in-memory-backend.js
@@ -286,8 +286,12 @@ Backend.prototype.finishJob = function (job, callback) {
286 286 if (job.execution === 'running') {
287 287 job.execution = 'succeeded';
288 288 }
  289 + var info = self.jobs[job.uuid].info;
289 290 job.runner_id = null;
290 291 self.jobs[job.uuid] = deepCopy(job);
  292 + if (info) {
  293 + self.jobs[job.uuid].info = info;
  294 + }
291 295 return callback(null, job);
292 296 }
293 297 };
62 test/job-runner.test.js
@@ -23,7 +23,7 @@ var Backend = require(helper.config().backend.module),
23 23 backend = new Backend(helper.config().backend.opts),
24 24 factory, wf_job_runner;
25 25
26   -var okWf, failWf, timeoutWf, reQueueWf, reQueuedJob, elapsed;
  26 +var okWf, failWf, timeoutWf, reQueueWf, infoWf, reQueuedJob, elapsed;
27 27
28 28 var FakeRunner = function () {
29 29 this.child_processes = {};
@@ -144,7 +144,26 @@ test('setup', function (t) {
144 144 t.ifError(err, 'ReQueue wf error');
145 145 t.ok(wf, 'ReQueue wf ok');
146 146 reQueueWf = wf;
147   - t.end();
  147 +
  148 + // infoWf
  149 + factory.workflow({
  150 + name: 'Info wf',
  151 + chain: [ {
  152 + name: 'Info Task',
  153 + retry: 1,
  154 + body: function (job, cb) {
  155 + job.info('recording some info');
  156 + return cb(null);
  157 + }
  158 + }],
  159 + timeout: 60
  160 + }, function (err, wf) {
  161 + t.ifError(err, 'Info wf error');
  162 + t.ok(wf, 'Info wf ok');
  163 + infoWf = wf;
  164 +
  165 + t.end();
  166 + });
148 167 });
149 168 });
150 169 });
@@ -647,6 +666,45 @@ test('a canceled job', function (t) {
647 666 });
648 667
649 668
  669 +test('a job can call job.info()', function (t) {
  670 + factory.job({
  671 + workflow: infoWf.uuid,
  672 + exec_after: '2012-01-03T12:54:05.788Z'
  673 + }, function (err, job) {
  674 + t.ifError(err, 'job error');
  675 + t.ok(job, 'run job ok');
  676 + wf_job_runner = new WorkflowJobRunner({
  677 + runner: runner,
  678 + backend: backend,
  679 + job: job,
  680 + trace: false
  681 + });
  682 + t.ok(wf_job_runner, 'wf_job_runner ok');
  683 + backend.runJob(job.uuid, runner.uuid, function (err, job) {
  684 + t.ifError(err, 'backend.runJob error');
  685 + wf_job_runner.run(function (err) {
  686 + var uuid = job.uuid;
  687 + t.ifError(err, 'wf_job_runner run error');
  688 + backend.getJob(job.uuid, function (err, j) {
  689 + t.ifError(err, 'backend.getJob error');
  690 + t.equal(j.execution, 'succeeded');
  691 + t.equal(j.chain_results.length, 1);
  692 + t.equal(j.chain_results[0].result, 'OK');
  693 +
  694 + backend.getInfo(uuid, function (err, info) {
  695 + t.ifError(err, 'backend.getInfo error');
  696 + t.equal(info.length, 1);
  697 + t.equal(info[0].data, 'recording some info');
  698 + t.end();
  699 + });
  700 + });
  701 + });
  702 +
  703 + });
  704 + });
  705 +});
  706 +
  707 +
650 708 test('teardown', function (t) {
651 709 backend.quit(function () {
652 710 t.end();
44 test/task-runner.test.js
@@ -525,3 +525,47 @@ test('a task which fails and is canceled', function (t) {
525 525 });
526 526
527 527 });
  528 +
  529 +
  530 +test('a task which calls job.info', function (t) {
  531 + task.body = function (job, cb) {
  532 + job.info('an info string');
  533 + return cb(null);
  534 + }.toString();
  535 +
  536 + job.chain.push(task);
  537 +
  538 + var wf_task_runner = new WorkflowTaskRunner({
  539 + job: job,
  540 + task: task
  541 + });
  542 +
  543 + t.ok(wf_task_runner.uuid);
  544 + t.equal(typeof (wf_task_runner.body), 'function');
  545 +
  546 + // The callback will be called twice: the first time for info(),
  547 + // the second time to finish the task
  548 +
  549 + var firstTime = true;
  550 +
  551 + wf_task_runner.runTask(function (msg) {
  552 + t.ifError(msg.error, 'task error');
  553 + t.ok(msg.job);
  554 + t.equal(msg.task_name, task.name);
  555 +
  556 + if (firstTime) {
  557 + t.ok(msg.info, 'info present');
  558 + t.notOk(msg.result, 'result not present');
  559 + t.equal(msg.cmd, 'info', 'info cmd');
  560 + t.equal(msg.info, 'an info string', 'info string');
  561 + firstTime = false;
  562 + return;
  563 + }
  564 +
  565 + t.ok(msg.result, 'result present');
  566 + t.notOk(msg.info, 'info not present');
  567 + t.equal(msg.cmd, 'run', 'run cmd');
  568 + t.end();
  569 + });
  570 +
  571 +});

0 comments on commit 1d91df9

Please sign in to comment.
Something went wrong with that request. Please try again.