Skip to content

Commit

Permalink
run project on node in two steps - reserve, run
Browse files Browse the repository at this point in the history
  • Loading branch information
okv committed Dec 7, 2016
1 parent ef16c5c commit 7e06b1b
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 23 deletions.
28 changes: 22 additions & 6 deletions lib/distributor.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,15 @@ Distributor.prototype._getNodeForScmChangesCheck = function(project, params) {
};

Distributor.prototype._runNext = function(callback) {
var self = this;
var self = this,
node,
executorId;

Steppy(
function() {
// update wait reasons for all queue items before run
self._updateWaitReasons();

var node;
var queueItemIndex = _(self.queue).findIndex(function(item) {
node = _(self.nodes).find(function(node) {
return !self._getExecutorWaitReason(
Expand Down Expand Up @@ -170,15 +171,23 @@ Distributor.prototype._runNext = function(callback) {
this.slot()
);

executorId = node.reserveExecutor(
build.project,
{env: build.env}
);
},
function(err, build) {
this.pass(build);

var stepCallback = this.slot();
// run project on the first step two prevent parallel run next calls
var executor = node.run(build.project, {env: build.env}, function(err) {

var executor = node.runExecutor(executorId, function(err) {
var changes = {
endDate: Date.now(),
completed: true
};

// executor may not exist on error
// executor may not exist on error
if (executor && executor.canceled) {
changes.status = 'canceled';
changes.canceledBy = self.inprogressBuildsHash[build.id].canceledBy;
Expand Down Expand Up @@ -245,7 +254,14 @@ Distributor.prototype._runNext = function(callback) {
// update wait reasons for all queue items after run
self._updateWaitReasons();
},
callback
function(err, build) {
// ensure release executor (e.g. on error) by releasing it manually
if (node && executorId) {
node.releaseExecutor(executorId);
}

callback(err, build);
}
);
};

Expand Down
78 changes: 61 additions & 17 deletions lib/node/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ function Node(params) {
throw new Error('Unknown usage strategy: ' + this.usageStrategy);
}

this.reservedExecutorsSequence = 1;
this.reservedExecutorsHash = {};

this.executors = [];
}

Expand Down Expand Up @@ -113,40 +116,81 @@ Node.prototype.hasScmChanges = function(project, params, callback) {
);
};

Node.prototype.run = function(project, params, callback) {
var self = this;

var waitReason = self.getExecutorWaitReason(project, params);
Node.prototype.reserveExecutor = function(project, params) {
var waitReason = this.getExecutorWaitReason(project, params);
if (waitReason) {
return callback(new Error(
throw new Error(
'Project "' + project.name + '" should wait because: ' + waitReason
));
);
}

var executor, createExecutorError;
var executor = this._createExecutor(
_({project: project}).defaults(params)
);
this.executors.push(executor);

try {
executor = self._createExecutor(_({project: project}).defaults(params));
self.executors.push(executor);
} catch(err) {
createExecutorError = err;
}
var executorId = this.reservedExecutorsSequence++;
this.reservedExecutorsHash[executorId] = executor;

return executorId;
};

Node.prototype._releaseExecutor = function(executor) {
var executorIndex = _(this.executors).findIndex(executor);
this.executors.splice(executorIndex, 1);
};

Node.prototype.releaseExecutor = function(executorId) {
var executor = this.reservedExecutorsHash[executorId];

if (createExecutorError) {
callback(createExecutorError);
if (!executor) {
return null;
}

delete this.reservedExecutorsHash[executorId];

this._releaseExecutor(executor);
};

Node.prototype.runExecutor = function(executorId, callback) {
var self = this,
executor = self.reservedExecutorsHash[executorId];

if (!executor) {
return callback(new Error(
'Can`t fine reserved executor with id: ' + executorId
));
}

// drop from hash early to prevent possible duplicate run
delete self.reservedExecutorsHash[executorId];

// run executor on next tick to return it asap, needed to been able
// to listen all executor run events
process.nextTick(function() {
executor.run(function(err) {
var executorIndex = _(self.executors).findIndex(executor);
self.executors.splice(executorIndex, 1);
self._releaseExecutor(executor);

callback(err);
});
});

return executor;
};

Node.prototype.run = function(project, params, callback) {

var executorId, reserveExecutorError;

try {
executorId = this.reserveExecutor(project, params);
} catch(err) {
reserveExecutorError = err;
}

if (reserveExecutorError) {
return callback(reserveExecutorError);
}

return this.runExecutor(executorId, callback);
};

0 comments on commit 7e06b1b

Please sign in to comment.