Process Jobs

Thanasis Polychronakis edited this page Apr 25, 2013 · 5 revisions

kickq.process( jobNames, optOptions, callback )

The kickq.process() method starts a worker. It requires two arguments: jobNames, a string or an array of strings, and callback, a function. The optOptions argument is optional.

The Arguments

Arg0 :: jobNames

Type: string | Array Default: Required!

The job name can be a string or an array of strings. When an array is given, order matters: earlier names always take priority over later ones, following the rules of the Redis BLPOP command.
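The ordering rule can be illustrated without a Redis server. The sketch below is a simplified in-memory model of how BLPOP chooses a list when given several keys. The function popFirstAvailable and the queue contents are hypothetical names used for illustration only; they are not part of kickq or Redis.

```javascript
// Simplified model of Redis BLPOP key ordering: keys are checked in the
// order given, and the first non-empty list is popped from its head.
// Illustration only; this is not how kickq is implemented.
function popFirstAvailable(queues, jobNames) {
  for (var i = 0; i < jobNames.length; i++) {
    var list = queues[jobNames[i]];
    if (list && list.length > 0) {
      return { name: jobNames[i], item: list.shift() };
    }
  }
  return null; // a real BLPOP would block here until an item arrives
}

// Hypothetical queue contents.
var queues = {
  'job name': ['a1'],
  'another job': ['b1', 'b2']
};
var order = ['job name', 'another job'];

var first = popFirstAvailable(queues, order);  // taken from 'job name'
var second = popFirstAvailable(queues, order); // 'job name' is now empty
var third = popFirstAvailable(queues, order);
var fourth = popFirstAvailable(queues, order); // nothing left in any queue
```

In this model, 'another job' is only served once 'job name' has no pending items, which is why the most urgent job names should come first in the array.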

Arg1 :: optOptions

Type: Object Default: null

A key/value Object literal containing worker-specific options. Currently there is only one option available:

  • concurrentJobs {number} default: 1 Sets how many invocations of the worker's callback may run concurrently. High-latency, low-cost tasks are best suited to a large number of concurrent operations.

Arg2 :: callback

Type: Function(jobItem, data, callback) Default: Required!

The callback argument is a reference to a function that will get invoked when a job is ready to be processed. It gets invoked with three arguments:

  • jobItem {kickq.JobItem} The Job Item Object.
  • data {ANY} Any type of data passed to the job when it was created. Note: jobItem.data === data.
  • callback {Function(optErr, optCallback)} One of the two ways to complete a job is to invoke the provided callback function. If the job was processed successfully, invoke the callback with no arguments. If the job failed, invoke the callback with an error message string.
    • optErr {null | string} If processing failed, provide an error description.
    • optCallback {Function(err)} An optional callback function that gets invoked after Kickq finishes the Job Item post-process operations.

The second way to complete a job is to return a promise that will eventually be resolved. See the Promise example below.

Examples

A Plain Worker

function processJob(jobItem, data, cb) {
  jobItem.id; // the id
  jobItem.name; // the job's name
  jobItem.data === data; // same reference

  // Complete the job with one of the following calls:
  cb('error'); // <-- error
  cb(false); // <-- error
  cb(); // <-- no error

  // v-- no error and callback when post-process db ops are done
  cb(null, function(err){ /* ... */});
}

// options can be omitted
kickq.process('another job name', processJob);
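
For a slightly more concrete handler, the sketch below validates the job data and completes the job through the callback. The job data shape (a `to` field), the makeEmailWorker helper, and the 'send email' job name are hypothetical. Only the callback contract (no arguments on success, an error string on failure) comes from the description above.

```javascript
// Hypothetical worker: expects data of the form { to: 'user@example.com' }.
// The delivery function is injected so the handler can run without kickq.
function makeEmailWorker(deliver) {
  return function processJob(jobItem, data, cb) {
    if (!data || typeof data.to !== 'string') {
      cb('missing recipient'); // fail the job with an error string
      return;
    }
    deliver(data.to, function(err) {
      if (err) {
        cb(String(err)); // fail the job
      } else {
        cb(); // success: invoke the callback with no arguments
      }
    });
  };
}

// Stand-in delivery function for the illustration; it always succeeds.
var sent = [];
var worker = makeEmailWorker(function(to, done) {
  sent.push(to);
  done(null);
});

// With kickq this would be registered as (job name is hypothetical):
// kickq.process('send email', worker);
```

Returning right after the failing cb() call keeps the handler from completing the same job twice.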

Multiple Concurrent Workers

var options = {
  concurrentJobs: 10 // total number of concurrent jobs
};

kickq.process(['job name'], options, processJob);

Defining Multiple Jobs

kickq.process(['job name', 'another job', 'a third job'], options, processJob);

Beware: if you provide an array of job names, the sequence matters! Jobs are fetched in order, as per Redis' BLPOP ordering.

Process a Job Using a Promise

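var when = require('when');

function processJob(job, data, cb) {
  // Create a deferred object
  var deferred = when.defer();

  // anAsyncOperation is a placeholder for your own asynchronous work
  anAsyncOperation(function(err) {
    if (err) {
      deferred.reject('error message'); // fail the job
    } else {
      deferred.resolve(); // complete the job successfully
    }
  });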

  // return a promise
  return deferred.promise;
}

// start worker
kickq.process(['another job name'], processJob);
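
The same pattern can be sketched with a native Promise instead of a deferred object. This is an assumption rather than documented behavior: this page only shows promises created with when, so whether kickq accepts other Promise implementations should be verified. The anAsyncOperation function and the data.fail flag below are hypothetical stand-ins.

```javascript
// Hypothetical async work: reports an error when data.fail is set.
function anAsyncOperation(data, done) {
  setTimeout(function() {
    done(data && data.fail ? 'error message' : null);
  }, 0);
}

// Assumption: kickq treats any returned thenable as the job's outcome.
function processJob(job, data) {
  return new Promise(function(resolve, reject) {
    anAsyncOperation(data, function(err) {
      if (err) {
        reject(err); // fail the job
      } else {
        resolve(); // complete the job successfully
      }
    });
  });
}

// start worker (commented out so the sketch runs without kickq):
// kickq.process(['another job name'], processJob);
```

Resolving or rejecting in separate branches ensures the promise settles exactly once with the intended outcome.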