diff --git a/package-lock.json b/package-lock.json index c02caf6..1a10acb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.10", + "version": "1.1.11", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.10", + "version": "1.1.11", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index d5dda12..3e334c6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.10", + "version": "1.1.11", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/flavors/update.js b/src/flavors/update.js index 309039c..3464d50 100644 --- a/src/flavors/update.js +++ b/src/flavors/update.js @@ -2,6 +2,7 @@ import { printStartPipeline, printEndPipeline, faulty, faultyAsyncStream, splitObject, compact, + faultify, } from '../utils'; import { @@ -63,8 +64,7 @@ const toGetRequest = (rule) => faulty((uow) => ({ : undefined, })); -const toUpdateRequest = (rule) => faultyAsyncStream((uow) => Promise.resolve(rule.toUpdateRequest(uow, rule)) - .then((updateRequest) => ({ - ...uow, - updateRequest, - }))); +const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({ + ...uow, + updateRequest: await faultify(rule.toUpdateRequest)(uow, rule), +})); diff --git a/test/unit/flavors/update.test.js b/test/unit/flavors/update.test.js index b1ba689..b88c31b 100644 --- a/test/unit/flavors/update.test.js +++ b/test/unit/flavors/update.test.js @@ -213,6 +213,80 @@ describe('flavors/update.js', () => { }) .done(done); }); + + it('should fault on error', (done) => { + sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]); + sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({ + Responses: { + undefined: [{ + pk: '2', + sk: 'thing', + discriminator: 'thing', + name: 'thing2', + }], + }, + UnprocessedKeys: {}, + }); + const ebStub = sinon.stub(EventBridgeConnector.prototype, 'putEvents').resolves({ FailedEntryCount: 0 }); + + sinon.stub(KmsConnector.prototype, 'generateDataKey').resolves(MOCK_GEN_DK_RESPONSE); + + const events = toDynamodbRecords([ + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'Thing One', + description: 'This is thing one', + otherThing: 'thing|2', + ttl: 1549053422, + timestamp: 1548967022000, + }, + }, + ]); + + const errorRule = { + id: 'error-rule', + flavor: update, + eventType: /thing-*/, + filters: [() => true], + toGetRequest, + fks: ['otherThing'], + toUpdateRequest: (uow) => { throw new Error('intentional fault'); }, + }; + + initialize({ + ...initializeFrom([errorRule]), + }, { ...defaultOptions, AES: false }) + .assemble(fromDynamodb(events), true) + .collect() + // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) + .tap((collected) => { + expect(collected.length).to.eq(1); + expect(collected[0].event.err.message).to.eq('intentional fault'); + expect(ebStub).to.have.been.calledOnceWith({ + Entries: [{ + EventBusName: 'undefined', + Source: 'custom', + DetailType: 'fault', + Detail: JSON.stringify(collected[0].event), + }], + }, { + batch: [{ + event: collected[0].event, + publishRequestEntry: collected[0].publishRequestEntry, + }], + publishRequest: collected[0].publishRequest, + }); + }) + .done(done); + }); }); const toUpdateRequest = (uow) => ({