Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(1610): Add cleanup method to executor-q #75

Merged
merged 2 commits into from Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 38 additions & 30 deletions index.js
Expand Up @@ -72,7 +72,7 @@ class ExecutorQueue extends Executor {
!!err || (response.statusCode !== 201 && response.statusCode !== 200);
this.requestRetryStrategyPostEvent = (err, response) =>
!!err || (response.statusCode !== 201 && response.statusCode !== 200
&& response.statusCode !== 404); // postEvent can return 404 if no job to start
&& response.statusCode !== 404); // postEvent can return 404 if no job to start
this.fuseBox = new FuseBox();
this.fuseBox.addFuse(this.queueBreaker);
this.fuseBox.addFuse(this.redisBreaker);
Expand All @@ -88,29 +88,33 @@ class ExecutorQueue extends Executor {
};
// Jobs object to register the worker with
const jobs = {
startDelayed: Object.assign({ perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.periodicBuildTable, jobConfig.jobId);

return await this.startPeriodic(
Object.assign(JSON.parse(fullConfig), { triggerBuild: true }));
} catch (err) {
winston.error('err in startDelayed job: ', err);
throw err;
startDelayed: Object.assign({
perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.periodicBuildTable, jobConfig.jobId);

return await this.startPeriodic(
Object.assign(JSON.parse(fullConfig), { triggerBuild: true }));
} catch (err) {
winston.error('err in startDelayed job: ', err);
throw err;
}
}
} }, retryOptions),
startFrozen: Object.assign({ perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.frozenBuildTable, jobConfig.jobId);

return await this.startFrozen(JSON.parse(fullConfig));
} catch (err) {
winston.error('err in startFrozen job: ', err);
throw err;
}, retryOptions),
startFrozen: Object.assign({
perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.frozenBuildTable, jobConfig.jobId);

return await this.startFrozen(JSON.parse(fullConfig));
} catch (err) {
winston.error('err in startFrozen job: ', err);
throw err;
}
}
} }, retryOptions)
}, retryOptions)
};

// eslint-disable-next-line new-cap
Expand Down Expand Up @@ -165,15 +169,19 @@ class ExecutorQueue extends Executor {

this.multiWorker.start();
this.scheduler.connect().then(() => this.scheduler.start());
}

process.on('SIGTERM', () => {
this.multiWorker.end().catch((err) => {
winston.error(`failed to end the worker: ${err}`);
}).then(() => this.scheduler.end()).catch((err) => {
winston.error(`failed to end the scheduler: ${err}`);
process.exit(128);
});
});
/**
* Cleanup any reladed processing
*/
async cleanUp() {
try {
await this.multiWorker.end();
await this.scheduler.end();
await this.queue.end();
} catch (err) {
winston.error(`failed to end executor queue: ${err}`);
}
}

/**
Expand Down
29 changes: 22 additions & 7 deletions test/index.test.js
Expand Up @@ -18,7 +18,8 @@ const partialTestConfig = {
blockedBy
};
const partialTestConfigToString = Object.assign({}, partialTestConfig, {
blockedBy: blockedBy.toString() });
blockedBy: blockedBy.toString()
});
const testAdmin = {
username: 'admin'
};
Expand Down Expand Up @@ -64,7 +65,7 @@ describe('index test', () => {
tokenGen: userTokenGen
};
multiWorker = function () {
this.start = () => {};
this.start = () => { };
this.end = sinon.stub().resolves();
};
scheduler = function () {
Expand All @@ -82,7 +83,8 @@ describe('index test', () => {
delDelayed: sinon.stub().resolves(1),
connection: {
connected: false
}
},
end: sinon.stub().resolves()
};
resqueMock = {
Queue: sinon.stub().returns(queueMock),
Expand Down Expand Up @@ -581,7 +583,8 @@ describe('index test', () => {
it('adds a stop event to the queue if it has no blocked job', () => {
queueMock.del.resolves(0);
const partialTestConfigUndefined = Object.assign({}, partialTestConfig, {
blockedBy: undefined });
blockedBy: undefined
});
const stopConfig = Object.assign({ started: true }, partialTestConfigUndefined);

return executor.stop(partialTestConfigUndefined).then(() => {
Expand All @@ -594,15 +597,27 @@ describe('index test', () => {
it('doesn\'t call connect if there\'s already a connection', () => {
queueMock.connection.connected = true;

return executor.stop(Object.assign({}, partialTestConfig, { annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
} })).then(() => {
return executor.stop(Object.assign({}, partialTestConfig, {
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
}
})).then(() => {
assert.notCalled(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfigToString]);
});
});
});

describe('cleanUp', () => {
it('worker.end() is called', () => {
executor.cleanUp().then(() => {
assert.called(spyMultiWorker);
assert.called(spyScheduler);
assert.called(queueMock.end);
});
});
});

describe('stats', () => {
it('returns the correct stats', () => {
assert.deepEqual(executor.stats(), {
Expand Down