Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrency (parallel runs) support #867

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ runner.run(collection, {
script: 5000
},

// Number of concurrent users
concurrency: 1,

// Number of iterations
iterationCount: 1,

Expand Down
49 changes: 34 additions & 15 deletions lib/runner/extensions/control.command.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ module.exports = {

// schedule the pause command as an interrupt and flag that the run is pausing
this.paused = true;
this.interrupt('pause', null, callback);
_.forOwn(this.state.cursors, function (cursor) {
this.interrupt('pause', {
coords: cursor.current()
}, callback);
}.bind(this));
},

/**
Expand All @@ -38,9 +42,9 @@ module.exports = {
// set flag that it is no longer paused and fire the stored callback for the command when it was paused
this.paused = false;
setTimeout(function () {
this.__resume();
_.over(this.__resume).apply(this);
delete this.__resume;
this.triggers.resume(null, this.state.cursor.current());
this.triggers.resume(null);
}.bind(this), 0);

callback && callback();
Expand All @@ -58,21 +62,31 @@ module.exports = {
summarise = true;
}

this.interrupt('abort', {
summarise: summarise
}, callback);
_.forOwn(this.state.cursors, function (cursor) {
this.interrupt('abort', {
summarise: summarise,
coords: cursor.current()
}, callback);
}.bind(this));

_.isFunction(this.__resume) && this.resume();
_.isArray(this.__resume) && _.over(this.__resume).apply(this);
}
},

process: /** @lends Run.commands */ {
pause: function (userback, payload, next) {
// trigger the secondary callbacks
this.triggers.pause(null, this.state.cursor.current());
if (!this.__resume) { this.__resume = []; }

// tuck away the command completion callback in the run object so that it can be used during resume
this.__resume = next;
this.__resume.push(next.bind(this, null));

// wait to trigger the callback until all cursors have been paused
if (this.__resume.length !== _.size(this.state.cursors)) {
return;
}

// trigger the secondary callbacks
this.triggers.pause(null);

// execute the userback sent as part of the command and do so in a try block to ensure it does not hamper
// the process tick
Expand All @@ -92,12 +106,17 @@ module.exports = {
*/
abort: function (userback, payload, next) {
// clear instruction pool and as such there will be nothing next to execute
this.pool.clear();
this.triggers.abort(null, this.state.cursor.current());
this.getPool(payload.coords.ref).clear();

// execute the userback sent as part of the command and do so in a try block to ensure it does not hamper
// the process tick
backpack.ensure(userback, this) && userback();
// wait until all pools have been cleared, then trigger abort
this.__aborted = (this.__aborted || 0) + 1;
if (this.__aborted === _.size(this.pools)) {
this.triggers.abort(null);

// execute the userback sent as part of the command and do so in a try block
// to ensure it does not hamper the process tick
backpack.ensure(userback, this) && userback();
}

next(null);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/runner/extensions/delay.command.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ module.exports = {
* @param {Function} next
*/
delay: function (payload, next) {
var cursor = payload.cursor || this.state.cursor.current();
var cursor = payload.cursor;

this.waiting = true; // set flag
// trigger the waiting stae change event
Expand Down
4 changes: 0 additions & 4 deletions lib/runner/extensions/item.command.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
var _ = require('lodash'),
uuid = require('uuid'),
visualizer = require('../../visualizer'),

/**
Expand Down Expand Up @@ -98,9 +97,6 @@ module.exports = {
return next(new Error('runtime: item execution is missing required parameters'));
}

// store a common uuid in the coords
coords.ref = uuid.v4();

// here we code to queue prerequest script, then make a request and then execute test script
this.triggers.beforeItem(null, coords, item);

Expand Down
46 changes: 16 additions & 30 deletions lib/runner/extensions/waterfall.command.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
var _ = require('lodash'),
Cursor = require('../cursor'),
VariableScope = require('postman-collection').VariableScope,

prepareLookupHash,
Expand Down Expand Up @@ -65,21 +64,6 @@ module.exports = {
!_.isArray(state.data) && (state.data = []);
!_.isObject(state.data[0]) && (state.data[0] = {});

// if the location in state is already normalised then go ahead and queue iteration, else normalise the
// location
state.cursor = Cursor.box(state.cursor, { // we pass bounds to ensure there is no stale state
cycles: state.data.length,
length: state.items.length
});
this.waterfall = state.cursor; // copy the location object to instance for quick access

// queue the iteration command on start
this.queue('waterfall', {
coords: this.waterfall.current(),
static: true,
start: true
});

// clear the variable that is supposed to store item name and id lookup hash for easy setNextRequest
this.snrHash = null; // we populate it in the first SNR call

Expand All @@ -99,7 +83,9 @@ module.exports = {
*/
waterfall: function (payload, next) {
// we procure the coordinates that we have to pick item and data from. the data is
var coords = payload.static ? payload.coords : this.waterfall.whatnext(payload.coords),
var coords = payload.static ?
payload.coords :
this.getCursor(payload.coords.ref).whatnext(payload.coords),
item = this.state.items[coords.position],
delay;

Expand Down Expand Up @@ -196,19 +182,19 @@ module.exports = {
nextCoords.position = 0;
seekingToStart = true;
}

this.waterfall.seek(nextCoords.position, nextCoords.iteration, function (err, chngd, coords) {
// this condition should never arise, so better throw error when this happens
if (err) {
throw err;
}

this.queue('waterfall', {
coords: coords,
static: seekingToStart,
stopRunNow: stopRunNow
});
}, this);
this.getCursor(coords.ref)
.seek(nextCoords.position, nextCoords.iteration, function (err, chngd, coords) {
// this condition should never arise, so better throw error when this happens
if (err) {
throw err;
}

this.queue('waterfall', {
coords: coords,
static: seekingToStart,
stopRunNow: stopRunNow
});
}, this);
});
}.bind(this), {
time: delay,
Expand Down
86 changes: 72 additions & 14 deletions lib/runner/run.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var _ = require('lodash'),
async = require('async'),
backpack = require('../backpack'),
Cursor = require('./cursor'),
Instruction = require('./instruction'),

Run; // constructor
Expand All @@ -22,13 +23,15 @@ Run = function PostmanCollectionRun (state, options) { // eslint-disable-line fu
* @type {Object}
* @todo: state also holds the host for now (if any).
*/
state: _.assign({}, state),
state: _.assign({
cursors: {}
}, state),

/**
* @private
* @type {InstructionPool}
* @type {Object}
*/
pool: Instruction.pool(Run.commands),
pools: {},

/**
* @private
Expand Down Expand Up @@ -81,7 +84,8 @@ _.assign(Run.prototype, {
*/
immediate: function (action, payload) {
var scope = this,
instruction = this.pool.create(action, payload, _.slice(arguments, 2));
instruction = this.getPool((payload.coords || payload.context.coords).ref)
.create(action, payload, _.slice(arguments, 2));

// we directly execute this instruction instead od queueing it.
setTimeout(function () {
Expand Down Expand Up @@ -109,7 +113,7 @@ _.assign(Run.prototype, {

if (_.isFinite(_.get(this.options, 'timeout.global'))) {
timeback = backpack.timeback(callback, this.options.timeout.global, this, function () {
this.pool.clear();
_.invokeMap(this.pools, 'clear');
});
}

Expand All @@ -121,8 +125,24 @@ _.assign(Run.prototype, {

// save the normalised callbacks as triggers
this.triggers = callback;
this.triggers.start(null, this.state.cursor.current()); // @todo may throw error if cursor absent
this._process(timeback);

// @todo set concurrency default in newman run options
for (var i = 0, cursor; i < _.get(this.options, 'concurrency', 1); i++) {
// get a cursor for this process
cursor = this.getCursor().current();

// trigger start only once for the entire run
if (i === 0) { this.triggers.start(null, cursor); }

// queue the iteration command on start
this.queue('waterfall', {
coords: cursor,
static: true,
start: true
});

this._process(cursor.ref, timeback);
}
}.bind(this));
},

Expand All @@ -137,6 +157,38 @@ _.assign(Run.prototype, {
return this.state.items[cursor.position];
},

/**
* @private
* @param {String} [pid]
* @return {Cursor}
*/
getCursor: function (pid) {
var cursor;

if (pid) {
// return the existing cursor for a specific process
cursor = this.state.cursors[pid];
}
else {
// create a new cursor for the process
cursor = Cursor.create(this.state.items.length, this.state.data.length);
this.state.cursors[cursor.ref] = cursor;
// create a new pool of instructions associated to the cursor
this.pools[cursor.ref] = Instruction.pool(Run.commands);
}

return cursor;
},

/**
* @private
* @param {String} pid
* @return {InstructionPool}
*/
getPool: function (pid) {
return this.pools[pid];
},

/**
* @private
*
Expand All @@ -146,27 +198,33 @@ _.assign(Run.prototype, {
* @param {Boolean} [immediate]
*/
_schedule: function (action, payload, args, immediate) {
var instruction = this.pool.create(action, payload, args);
var pool = this.getPool((payload.coords || payload.cursor).ref),
instruction = pool.create(action, payload, args);

// based on whether the immediate flag is set, add to the top or bottom of the instruction queue.
(immediate ? this.pool.unshift : this.pool.push)(instruction);
(immediate ? pool.unshift : pool.push)(instruction);

return instruction;
},

_process: function (callback) {
_process: function (pid, callback) {
// extract the command from the queue
var instruction = this.pool.shift();
var instruction = this.getPool(pid).shift(),
cursor = this.getCursor(pid).current();

// if there is nothing to process, exit
// if there is nothing to process, return
if (!instruction) {
callback(null, this.state.cursor.current());
delete this.pools[cursor.ref];
// if no pool has any more instructions, exit
if (!_.size(this.pools)) {
return callback(null, cursor);
}

return;
}

instruction.execute(function (err) {
return err ? callback(err, this.state.cursor.current()) : this._process(callback); // process recursively
return err ? callback(err, cursor) : this._process(pid, callback); // process recursively
}, this);
}
});
Expand Down