Skip to content

Commit

Permalink
let _claimPendingJobs have a valid job when updating leads to version…
Browse files Browse the repository at this point in the history
… conflict (elastic#21980)

* let _claimPendingJobs have a valid job when updating leads to version conflict

* change _claimJob to reject the promise instead of resolve to true/false

* add _claimPendingJobs tests

* fix tests
  • Loading branch information
tsullivan committed Aug 24, 2018
1 parent 3fc85f7 commit 7339c66
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 39 deletions.
71 changes: 63 additions & 8 deletions x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import expect from 'expect.js';
import sinon from 'sinon';
import moment from 'moment';
import { noop, random, get, find } from 'lodash';
import { noop, random, get, find, identity } from 'lodash';
import { ClientMock } from './fixtures/elasticsearch';
import { QueueMock } from './fixtures/queue';
import { Worker } from '../worker';
Expand Down Expand Up @@ -494,25 +494,57 @@ describe('Worker class', function () {
expect(msg).to.equal(false);
});

it('should return true on version errors', function () {
it('should reject the promise on version errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
return worker._claimJob(job)
.then((res) => expect(res).to.equal(true));
.catch(err => {
expect(err).to.eql({ statusCode: 409 });
});
});

it('should return false on other errors', function () {
it('should reject the promise on other errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
return worker._claimJob(job)
.then((res) => expect(res).to.equal(false));
.catch(err => {
expect(err).to.eql({ statusCode: 401 });
});
});
});

describe('find a pending job to claim', function () {
const getMockJobs = (status = 'pending') => ([{
_index: 'myIndex',
_type: 'test',
_id: 12345,
_version: 3,
found: true,
_source: {
jobtype: 'jobtype',
created_by: false,
payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' },
priority: 10,
timeout: 10000,
created_at: '2016-04-25T21:13:04.738Z',
attempts: 0,
max_attempts: 3,
status
},
}]);

it('should emit on other errors', function (done) {
beforeEach(function () {
worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions);
});

afterEach(() => {
mockQueue.client.update.restore();
});

it('should emit for errors from claiming job', function (done) {
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));

worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
worker.once(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
Expand All @@ -523,7 +555,30 @@ describe('Worker class', function () {
done(e);
}
});
worker._claimJob(job);

worker._claimPendingJobs(getMockJobs());
});

it('should reject the promise if an error claiming the job', function () {
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
return worker._claimPendingJobs(getMockJobs())
.catch(err => {
expect(err).to.eql({ statusCode: 409 });
});
});

it('should get the pending job', function () {
sinon.stub(mockQueue.client, 'update').returns(Promise.resolve({ test: 'cool' }));
sinon.stub(worker, '_performJob').callsFake(identity);
return worker._claimPendingJobs(getMockJobs())
.then(claimedJob => {
expect(claimedJob._index).to.be('myIndex');
expect(claimedJob._type).to.be('test');
expect(claimedJob._source.jobtype).to.be('jobtype');
expect(claimedJob._source.status).to.be('processing');
expect(claimedJob.test).to.be('cool');
worker._performJob.restore();
});
});
});

Expand Down
68 changes: 37 additions & 31 deletions x-pack/plugins/reporting/server/lib/esqueue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ function formatJobObject(job) {
};
}

function getLogger(opts, id, logLevel) {
return (msg, err) => {
const logger = opts.logger || function () {};

const message = `${id} - ${msg}`;
const tags = ['worker', logLevel];

if (err) {
logger(`${message}: ${err.stack ? err.stack : err }`, tags);
return;
}

logger(message, tags);
};
}

export class Worker extends events.EventEmitter {
constructor(queue, type, workerFn, opts) {
if (typeof type !== 'string') throw new Error('type must be a string');
Expand All @@ -40,19 +56,8 @@ export class Worker extends events.EventEmitter {
this.checkSize = opts.size || 10;
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;

this.debug = (msg, err) => {
const logger = opts.logger || function () {};

const message = `${this.id} - ${msg}`;
const tags = ['worker', 'debug'];

if (err) {
logger(`${message}: ${err.stack ? err.stack : err }`, tags);
return;
}

logger(message, tags);
};
this.debug = getLogger(opts, this.id, 'debug');
this.warn = getLogger(opts, this.id, 'warn');

this._running = true;
this.debug(`Created worker for job type ${this.jobtype}`);
Expand Down Expand Up @@ -134,17 +139,11 @@ export class Worker extends events.EventEmitter {
...doc
};
return updatedJob;
})
.catch((err) => {
if (err.statusCode === 409) return true;
this.debug(`_claimJob failed on job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
return false;
});
}

_failJob(job, output = false) {
this.debug(`Failing job ${job._id}`);
this.warn(`Failing job ${job._id}`);

const completedTime = moment().toISOString();
const docOutput = this._formatOutput(output);
Expand All @@ -170,7 +169,7 @@ export class Worker extends events.EventEmitter {
.then(() => true)
.catch((err) => {
if (err.statusCode === 409) return true;
this.debug(`_failJob failed to update job ${job._id}`, err);
this.warn(`_failJob failed to update job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
return false;
});
Expand Down Expand Up @@ -215,7 +214,7 @@ export class Worker extends events.EventEmitter {
if (isResolved) return;

cancellationToken.cancel();
this.debug(`Timeout processing job ${job._id}`);
this.warn(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
timeout: job._source.timeout,
jobId: job._id,
Expand Down Expand Up @@ -253,7 +252,7 @@ export class Worker extends events.EventEmitter {
})
.catch((err) => {
if (err.statusCode === 409) return false;
this.debug(`Failure saving job output ${job._id}`, err);
this.warn(`Failure saving job output ${job._id}`, err);
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
});
}, (jobErr) => {
Expand All @@ -265,7 +264,7 @@ export class Worker extends events.EventEmitter {

// job execution failed
if (jobErr.name === 'WorkerTimeoutError') {
this.debug(`Timeout on job ${job._id}`);
this.warn(`Timeout on job ${job._id}`);
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job));
return;

Expand All @@ -278,7 +277,7 @@ export class Worker extends events.EventEmitter {
}
}

this.debug(`Failure occurred on job ${job._id}`, jobErr);
this.warn(`Failure occurred on job ${job._id}`, jobErr);
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
});
Expand Down Expand Up @@ -316,23 +315,30 @@ export class Worker extends events.EventEmitter {

return this._claimJob(job)
.then((claimResult) => {
if (claimResult !== false) {
claimed = true;
return claimResult;
claimed = true;
return claimResult;
})
.catch((err) => {
if (err.statusCode === 409) {
this.warn(`_claimPendingJobs encountered a version conflict on updating pending job ${job._id}`, err);
return; // continue reducing and looking for a different job to claim
}
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
return Promise.reject(err);
});
});
}, Promise.resolve())
.then((claimedJob) => {
if (!claimedJob) {
this.debug(`All ${jobs.length} jobs already claimed`);
this.debug(`Found no claimable jobs out of ${jobs.length} total`);
return;
}
this.debug(`Claimed job ${claimedJob._id}`);
return this._performJob(claimedJob);
})
.catch((err) => {
this.debug('Error claiming jobs', err);
this.warn('Error claiming jobs', err);
return Promise.reject(err);
});
}

Expand Down Expand Up @@ -384,7 +390,7 @@ export class Worker extends events.EventEmitter {
// ignore missing indices errors
if (err && err.status === 404) return [];

this.debug('job querying failed', err);
this.warn('job querying failed', err);
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
throw err;
});
Expand Down

0 comments on commit 7339c66

Please sign in to comment.