API Job Methods

jnordberg edited this page Aug 10, 2012 · 3 revisions

Primary methods


input

Default: reads lines from STDIN (auto-detects newline as \n or \r\n)


input: [0,1,2]                  //Array input 
input: '/path/file.txt'         //Reads lines from a file (auto-detects newline)
input: '/path/to/dir/'          //Reads all files in a directory
input: false                    //Runs the job once
input: true                     //Runs the job indefinitely

To input from a stream

input: function() {
    this.input.apply(this, arguments);
}

To write your own input function, use

input: function(start, num, callback) {
    //Pass the next batch of input to callback
}

  • start = the offset into the input, starting at zero
  • num = the number of rows / lines to return
  • callback = called in the format callback([input, ..])
  • When there is no input left, call callback(false)
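
The contract above can be sketched without node.io itself. In this minimal sketch, `rows` and the `drain` helper are hypothetical stand-ins: `drain` only imitates the pattern of asking for batches until `callback(false)` is received and is not node.io's scheduler, and the callback is invoked synchronously to keep the example short.

```javascript
// Hypothetical in-memory data source standing in for a database or API.
var rows = ['a', 'b', 'c', 'd', 'e'];

// A custom input function following the documented contract.
function input(start, num, callback) {
    if (start >= rows.length) {
        return callback(false);               // No input left
    }
    callback(rows.slice(start, start + num)); // Up to num items from the offset
}

// Hypothetical driver: requests batches until input signals it is exhausted.
// It only imitates the calling pattern; it is not node.io's scheduler.
function drain(inputFn, batchSize) {
    var collected = [], offset = 0, done = false;
    while (!done) {
        inputFn(offset, batchSize, function (batch) {
            if (batch === false) {
                done = true;
            } else {
                collected = collected.concat(batch);
                offset += batch.length;
            }
        });
    }
    return collected;
}

var all = drain(input, 2);
console.log(all);
```

The last batch may be shorter than `num`; the sketch treats that as normal and only stops once `false` is passed.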


output

Default: writes lines to STDOUT

Note: output is called periodically rather than at the completion of a job so that very large or continuous IO can be handled

output: '/path/file.txt'    //Outputs lines to a file
output: false               //Ignores output

To output to a stream

output: function(out) {
    //Write out to the stream here
}

To write your own output function

output: function(output) {
    //Note: this method always receives an array of output
    output.forEach(function(line) {
        console.log(line);
    });
}
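
Because output is called periodically with an array, a custom output function should treat each call as one batch rather than as the complete result set. The sketch below illustrates this under stated assumptions: `written` is a hypothetical in-memory sink standing in for a file or socket, and the two direct calls only simulate how a job might deliver partial results.

```javascript
// Hypothetical sink: an in-memory buffer standing in for a file or socket.
var written = [];

// Custom output function: always receives an array (one batch of results).
function output(lines) {
    lines.forEach(function (line) {
        written.push(String(line) + '\n');
    });
}

// Simulate output being called periodically with partial results.
output(['first', 'second']);
output(['third']);

var text = written.join('');
console.log(text);
```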


run

Default: passes through input to the next stage of processing

Takes some input to use or transform

run: function(line) {
    this.emit(line); //Pass the line, or a transformed value, to the next stage
}

The following methods are available in run to control flow

this.emit(result)               //Emits a result to the next stage of processing
this.skip()                     //Cancels the thread and discards the input
this.exit(msg)                  //Exits the job with an optional error message
this.retry()                    //Retries the thread
this.add(input)                 //Dynamically add input to the queue
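
As a rough illustration of the flow-control calls listed above, the sketch below uses a run method that skips blank lines and emits the length of every other line. The `makeContext` helper is hypothetical: it is a stand-in for the job context that merely records calls to `emit` and `skip`, so the example can run without node.io.

```javascript
// A run method that skips blank lines and emits the length of the rest.
function run(line) {
    var trimmed = line.trim();
    if (trimmed === '') {
        return this.skip();       // Discard this input
    }
    this.emit(trimmed.length);    // Pass a result to the next stage
}

// Hypothetical stand-in for the job context, recording what run does.
function makeContext() {
    return {
        emitted: [],
        skipped: 0,
        emit: function (result) { this.emitted.push(result); },
        skip: function () { this.skipped += 1; }
    };
}

var ctx = makeContext();
['hello', '   ', 'node.io'].forEach(function (line) {
    run.call(ctx, line);
});
console.log(ctx.emitted, ctx.skipped);
```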


reduce

Default: passes through input to the next stage of processing

Called before output(). Use emit() as you would normally

reduce: function(lines) {
    //Note: this method always receives an array
    var emit = [];
    lines.forEach(function(line) {
        if (line.indexOf('foo') !== -1) emit.push(line); //Keep lines containing 'foo'
    });
    this.emit(emit);
}


fail

Called if a thread fails. A thread can fail if it times out, exceeds the maximum number of retries, makes a bad request, spawns a bad command, etc.

fail: function(input, status) {
    console.log(input + ' failed with status: ' + status);
}


complete

Called once the job is complete. Use it to define any cleanup code, etc.

complete: function(callback) {
    console.log('Job complete.');
    callback(); //Important!
}

IO Methods

To read or write to a file inside a job, use the following methods. Both methods are synchronous if no callback is provided

this.read(file, [callback]);
this.write(file, data, [callback]); 

Node.io also includes methods for working with separated values (such as CSV)

this.parseValues(line, delim, quote, quote_escape); //Default: delim = ','  quote = '"'  quote_escape = '"'
this.writeValues(values, delim, quote, quote_escape);


input: ['val1,val2,"val3 val4",val5'],
run: function (line) {
    var values = this.parseValues(line);
    console.log(values); //['val1','val2','val3 val4','val5']
}
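
To make the roles of delim, quote and quote_escape concrete, here is a small stand-alone parser written for illustration. It is not node.io's implementation of parseValues; the name `parseValuesSketch` is hypothetical, and the sketch assumes single-character delimiters and quotes.

```javascript
// Illustrative parser only; not node.io's implementation of parseValues.
function parseValuesSketch(line, delim, quote, quoteEscape) {
    delim = delim || ',';
    quote = quote || '"';
    quoteEscape = quoteEscape || '"';
    var values = [], current = '', inQuotes = false;
    for (var i = 0; i < line.length; i++) {
        var ch = line[i];
        if (inQuotes) {
            if (ch === quoteEscape && line[i + 1] === quote) {
                current += quote;   // Escaped quote inside a quoted value
                i++;
            } else if (ch === quote) {
                inQuotes = false;   // Closing quote
            } else {
                current += ch;
            }
        } else if (ch === quote) {
            inQuotes = true;        // Opening quote
        } else if (ch === delim) {
            values.push(current);   // End of value
            current = '';
        } else {
            current += ch;
        }
    }
    values.push(current);           // Last value on the line
    return values;
}

var parsed = parseValuesSketch('val1,val2,"val3 val4",val5');
var escaped = parseValuesSketch('a,"say ""hi"", ok",b');
console.log(parsed, escaped);
```

With the defaults, a doubled quote inside a quoted value stands for one literal quote, and a delimiter inside quotes does not split the value.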

Scraping methods

To make a request, use the following methods.

this.get(url, [headers], callback, [parse])

Makes a GET request to the URL and passes the result to callback, which takes err, data, headers. headers and parse are optional.

parse is an optional callback used to decode / pre-parse the returned data


this.get('http://www.google.com/', function(err, data, headers) {
    console.log(data); //Prints the returned data
});

this.getHtml(url, [headers], callback, [parse])

The same as above, except callback takes err, $, data, headers


this.getHtml('http://www.google.com/', function(err, $, data, headers) {
    $('a').each('href', function (href) {
        //Print all links on the page
        console.log(href);
    });
});

There are also helper methods for setting or adding headers. Call these methods before making a request

this.setHeader(key, value);
this.setUserAgent('Firefox ...');
this.setCookie('foo', 'bar');
this.addCookie('second', 'cookie');

There are also methods to make post requests. If body is an object, it is encoded using the built-in querystring module

this.post(url, body, [headers], callback, [parse])
this.postHtml(url, body, [headers], callback, [parse])
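
For reference, this is roughly what encoding an object body with Node's built-in querystring module looks like. The field names are hypothetical, and whether node.io calls `querystring.stringify` with its default separators is an assumption; the sketch only shows the module's standard behaviour.

```javascript
var querystring = require('querystring');

// Hypothetical form fields for a POST body.
var body = { user: 'alice', q: 'node io' };

// An object body would be serialized along these lines before sending.
var encoded = querystring.stringify(body);
console.log(encoded); // user=alice&q=node%20io
```

A string body needs no such step, so pre-encoding it yourself is an option when a different format is required.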

To make a custom request, use the lower level doRequest method

this.doRequest(method, url, body, [headers], callback, [parse])

Note: nested requests have the cookie and referer headers automatically set.

Executing commands

To execute a command, use the following methods. Callback takes the format of err, stdout, stderr

this.exec(cmd, callback);
this.spawn(cmd, stdin, callback);       //Same as exec, but can write to STDIN


this.exec('pwd', function (err, stdout) {
    console.log(stdout); //Prints the current working directory
});

Data validation and sanitization

node.io uses node-validator to provide data validation and sanitization methods. Validation methods are available through this.assert while sanitization / filtering methods are available through this.filter.

this.assert('abc').isInt();                         //Fails - job.fail() is called
this.assert('foo@bar.com').len(3,64).isEmail();     //Ok
this.assert('abcdef').is(/^[a-z]+$/);               //Ok
var num = this.filter('00000001').ltrim(0).toInt(); //num = 1
var str = this.filter('&lt;a&gt;').entityDecode();  //str = '<a>'

The full list of validation / sanitization methods is available in the node-validator documentation.