From 3cf951c1ce3a1d98fef690980c37943a48dd14da Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Thu, 11 Sep 2025 11:28:35 -0400 Subject: [PATCH 1/2] Handle faults in toUpdateRequest in update flavor if defined function not async. --- src/flavors/update.js | 10 ++--- test/unit/flavors/update.test.js | 74 ++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/src/flavors/update.js b/src/flavors/update.js index 309039cc..3464d506 100644 --- a/src/flavors/update.js +++ b/src/flavors/update.js @@ -2,6 +2,7 @@ import { printStartPipeline, printEndPipeline, faulty, faultyAsyncStream, splitObject, compact, + faultify, } from '../utils'; import { @@ -63,8 +64,7 @@ const toGetRequest = (rule) => faulty((uow) => ({ : undefined, })); -const toUpdateRequest = (rule) => faultyAsyncStream((uow) => Promise.resolve(rule.toUpdateRequest(uow, rule)) - .then((updateRequest) => ({ - ...uow, - updateRequest, - }))); +const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({ + ...uow, + updateRequest: await faultify(rule.toUpdateRequest)(uow, rule), +})); diff --git a/test/unit/flavors/update.test.js b/test/unit/flavors/update.test.js index b1ba689c..b88c31b8 100644 --- a/test/unit/flavors/update.test.js +++ b/test/unit/flavors/update.test.js @@ -213,6 +213,80 @@ describe('flavors/update.js', () => { }) .done(done); }); + + it('should fault on error', (done) => { + sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]); + sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({ + Responses: { + undefined: [{ + pk: '2', + sk: 'thing', + discriminator: 'thing', + name: 'thing2', + }], + }, + UnprocessedKeys: {}, + }); + const ebStub = sinon.stub(EventBridgeConnector.prototype, 'putEvents').resolves({ FailedEntryCount: 0 }); + + sinon.stub(KmsConnector.prototype, 'generateDataKey').resolves(MOCK_GEN_DK_RESPONSE); + + const events = toDynamodbRecords([ + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'Thing One', + description: 'This is thing one', + otherThing: 'thing|2', + ttl: 1549053422, + timestamp: 1548967022000, + }, + }, + ]); + + const errorRule = { + id: 'error-rule', + flavor: update, + eventType: /thing-*/, + filters: [() => true], + toGetRequest, + fks: ['otherThing'], + toUpdateRequest: (uow) => { throw new Error('intentional fault'); }, + }; + + initialize({ + ...initializeFrom([errorRule]), + }, { ...defaultOptions, AES: false }) + .assemble(fromDynamodb(events), true) + .collect() + // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) + .tap((collected) => { + expect(collected.length).to.eq(1); + expect(collected[0].event.err.message).to.eq('intentional fault'); + expect(ebStub).to.have.been.calledOnceWith({ + Entries: [{ + EventBusName: 'undefined', + Source: 'custom', + DetailType: 'fault', + Detail: JSON.stringify(collected[0].event), + }], + }, { + batch: [{ + event: collected[0].event, + publishRequestEntry: collected[0].publishRequestEntry, + }], + publishRequest: collected[0].publishRequest, + }); + }) + .done(done); + }); }); const toUpdateRequest = (uow) => ({ From 68cb66f213ec74ee2adaaedc3eb190e720ab308b Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Thu, 11 Sep 2025 11:29:04 -0400 Subject: [PATCH 2/2] Bump version. --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index c02caf67..1a10acb0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.10", + "version": "1.1.11", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.10", + "version": "1.1.11", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index d5dda124..3e334c6f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.10", + "version": "1.1.11", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws",