diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js index 188bdf92ed651d..57698e81ff4370 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js @@ -7,7 +7,7 @@ import expect from 'expect.js'; import sinon from 'sinon'; import moment from 'moment'; -import { noop, random, get, find } from 'lodash'; +import { noop, random, get, find, identity } from 'lodash'; import { ClientMock } from './fixtures/elasticsearch'; import { QueueMock } from './fixtures/queue'; import { Worker } from '../worker'; @@ -494,25 +494,57 @@ describe('Worker class', function () { expect(msg).to.equal(false); }); - it('should return true on version errors', function () { + it('should reject the promise on version errors', function () { mockQueue.client.update.restore(); sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 })); return worker._claimJob(job) - .then((res) => expect(res).to.equal(true)); + .catch(err => { + expect(err).to.eql({ statusCode: 409 }); + }); }); - it('should return false on other errors', function () { + it('should reject the promise on other errors', function () { mockQueue.client.update.restore(); sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 })); return worker._claimJob(job) - .then((res) => expect(res).to.equal(false)); + .catch(err => { + expect(err).to.eql({ statusCode: 401 }); + }); }); + }); + + describe('find a pending job to claim', function () { + const getMockJobs = (status = 'pending') => ([{ + _index: 'myIndex', + _type: 'test', + _id: 12345, + _version: 3, + found: true, + _source: { + jobtype: 'jobtype', + created_by: false, + payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' }, + priority: 10, + timeout: 10000, + created_at: '2016-04-25T21:13:04.738Z', + attempts: 0, + max_attempts: 3, + status + }, + }]); - it('should emit on other errors', function (done) { + beforeEach(function () { + worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); + }); + + afterEach(() => { mockQueue.client.update.restore(); + }); + + it('should emit for errors from claiming job', function (done) { sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 })); - worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) { + worker.once(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) { try { expect(err).to.have.property('error'); expect(err).to.have.property('job'); @@ -523,7 +555,30 @@ describe('Worker class', function () { done(e); } }); - worker._claimJob(job); + + worker._claimPendingJobs(getMockJobs()); + }); + + it('should reject the promise if an error claiming the job', function () { + sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 })); + return worker._claimPendingJobs(getMockJobs()) + .catch(err => { + expect(err).to.eql({ statusCode: 409 }); + }); + }); + + it('should get the pending job', function () { + sinon.stub(mockQueue.client, 'update').returns(Promise.resolve({ test: 'cool' })); + sinon.stub(worker, '_performJob').callsFake(identity); + return worker._claimPendingJobs(getMockJobs()) + .then(claimedJob => { + expect(claimedJob._index).to.be('myIndex'); + expect(claimedJob._type).to.be('test'); + expect(claimedJob._source.jobtype).to.be('jobtype'); + expect(claimedJob._source.status).to.be('processing'); + expect(claimedJob.test).to.be('cool'); + worker._performJob.restore(); + }); }); }); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index 20d487c9128c75..02f9c054c2e3bd 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -22,6 +22,22 @@ function formatJobObject(job) { }; } +function getLogger(opts, id, logLevel) { + return (msg, err) => { + const logger = opts.logger || function () {}; + + const message = `${id} - ${msg}`; + const tags = ['worker', logLevel]; + + if (err) { + logger(`${message}: ${err.stack ? err.stack : err }`, tags); + return; + } + + logger(message, tags); + }; +} + export class Worker extends events.EventEmitter { constructor(queue, type, workerFn, opts) { if (typeof type !== 'string') throw new Error('type must be a string'); @@ -40,19 +56,8 @@ export class Worker extends events.EventEmitter { this.checkSize = opts.size || 10; this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE; - this.debug = (msg, err) => { - const logger = opts.logger || function () {}; - - const message = `${this.id} - ${msg}`; - const tags = ['worker', 'debug']; - - if (err) { - logger(`${message}: ${err.stack ? err.stack : err }`, tags); - return; - } - - logger(message, tags); - }; + this.debug = getLogger(opts, this.id, 'debug'); + this.warn = getLogger(opts, this.id, 'warn'); this._running = true; this.debug(`Created worker for job type ${this.jobtype}`); @@ -134,17 +139,11 @@ export class Worker extends events.EventEmitter { ...doc }; return updatedJob; - }) - .catch((err) => { - if (err.statusCode === 409) return true; - this.debug(`_claimJob failed on job ${job._id}`, err); - this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); - return false; }); } _failJob(job, output = false) { - this.debug(`Failing job ${job._id}`); + this.warn(`Failing job ${job._id}`); const completedTime = moment().toISOString(); const docOutput = this._formatOutput(output); @@ -170,7 +169,7 @@ export class Worker extends events.EventEmitter { .then(() => true) .catch((err) => { if (err.statusCode === 409) return true; - this.debug(`_failJob failed to update job ${job._id}`, err); + this.warn(`_failJob failed to update job ${job._id}`, err); this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job)); return false; }); @@ -215,7 +214,7 @@ export class Worker extends events.EventEmitter { if (isResolved) return; cancellationToken.cancel(); - this.debug(`Timeout processing job ${job._id}`); + this.warn(`Timeout processing job ${job._id}`); reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, { timeout: job._source.timeout, jobId: job._id, @@ -253,7 +252,7 @@ export class Worker extends events.EventEmitter { }) .catch((err) => { if (err.statusCode === 409) return false; - this.debug(`Failure saving job output ${job._id}`, err); + this.warn(`Failure saving job output ${job._id}`, err); this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job)); }); }, (jobErr) => { @@ -265,7 +264,7 @@ export class Worker extends events.EventEmitter { // job execution failed if (jobErr.name === 'WorkerTimeoutError') { - this.debug(`Timeout on job ${job._id}`); + this.warn(`Timeout on job ${job._id}`); this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job)); return; @@ -278,7 +277,7 @@ export class Worker extends events.EventEmitter { } } - this.debug(`Failure occurred on job ${job._id}`, jobErr); + this.warn(`Failure occurred on job ${job._id}`, jobErr); this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job)); return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false); }); @@ -316,23 +315,30 @@ export class Worker extends events.EventEmitter { return this._claimJob(job) .then((claimResult) => { - if (claimResult !== false) { - claimed = true; - return claimResult; + claimed = true; + return claimResult; + }) + .catch((err) => { + if (err.statusCode === 409) { + this.warn(`_claimPendingJobs encountered a version conflict on updating pending job ${job._id}`, err); + return; // continue reducing and looking for a different job to claim } + this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); + return Promise.reject(err); }); }); }, Promise.resolve()) .then((claimedJob) => { if (!claimedJob) { - this.debug(`All ${jobs.length} jobs already claimed`); + this.debug(`Found no claimable jobs out of ${jobs.length} total`); return; } this.debug(`Claimed job ${claimedJob._id}`); return this._performJob(claimedJob); }) .catch((err) => { - this.debug('Error claiming jobs', err); + this.warn('Error claiming jobs', err); + return Promise.reject(err); }); } @@ -384,7 +390,7 @@ export class Worker extends events.EventEmitter { // ignore missing indices errors if (err && err.status === 404) return []; - this.debug('job querying failed', err); + this.warn('job querying failed', err); this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err)); throw err; });