Skip to content

Commit

Permalink
JobQueue: If job data does not parse, delete it from database
Browse files Browse the repository at this point in the history
  • Loading branch information
scottnonnenberg-signal committed May 5, 2022
1 parent 3f35e8c commit 300cee2
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
5 changes: 3 additions & 2 deletions ts/jobs/JobQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export abstract class JobQueue<T> {
* takes a single number, `parseData` should throw if `data` is a number and should
* return the number otherwise.
*
* If it throws, the job will be deleted from the store and the job will not be run.
* If it throws, the job will be deleted from the database and the job will not be run.
*
* Will only be called once per job, even if `maxAttempts > 1`.
*/
Expand Down Expand Up @@ -205,9 +205,10 @@ export abstract class JobQueue<T> {
parsedData = this.parseData(storedJob.data);
} catch (err) {
log.error(
`${this.logPrefix} failed to parse data for job ${storedJob.id}`,
`${this.logPrefix} failed to parse data for job ${storedJob.id}, created ${storedJob.timestamp}. Deleting job. Parse error:`,
Errors.toLogFormat(err)
);
await this.store.delete(storedJob.id);
reject(
new Error(
'Failed to parse job data. Was unexpected data loaded from the database?'
Expand Down
5 changes: 3 additions & 2 deletions ts/test-node/jobs/JobQueue_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ describe('JobQueue', () => {
sinon.assert.calledWithMatch(run, { data: 'valid' });
});

it('keeps jobs in the storage if parseData throws', async () => {
it('deletes jobs from storage if parseData throws', async () => {
const store = new TestJobQueueStore();

class TestQueue extends JobQueue<string> {
Expand All @@ -539,9 +539,10 @@ describe('JobQueue', () => {

await (await queue.add('invalid 1')).completion.catch(noop);
await (await queue.add('invalid 2')).completion.catch(noop);
await queue.add('valid');

const datas = store.storedJobs.map(job => job.data);
assert.sameMembers(datas, ['invalid 1', 'invalid 2']);
assert.sameMembers(datas, ['valid']);
});

it('adding the job resolves AFTER inserting the job into the database', async () => {
Expand Down

0 comments on commit 300cee2

Please sign in to comment.