From 9fb2e64035ad10584d05707b739d76ce3190b464 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 19 Nov 2025 13:17:44 -0500 Subject: [PATCH 1/2] Adds set attribute type support to DDB sink updateExpression. --- src/sinks/dynamodb.js | 78 ++++++++++++++++++---------- test/unit/sinks/dynamodb.test.js | 89 ++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 28 deletions(-) diff --git a/src/sinks/dynamodb.js b/src/sinks/dynamodb.js index 2e9a5415..7ca71a09 100644 --- a/src/sinks/dynamodb.js +++ b/src/sinks/dynamodb.js @@ -1,5 +1,4 @@ import _ from 'highland'; -import merge from 'lodash/merge'; import Connector from '../connectors/dynamodb'; @@ -8,35 +7,58 @@ import { debug as d } from '../utils/print'; import { ratelimit } from '../utils/ratelimit'; export const updateExpression = (Item) => { - const keys = Object.keys(Item); - - const ExpressionAttributeNames = keys - .filter((attrName) => Item[attrName] !== undefined) - .map((attrName) => ({ [`#${attrName}`]: attrName })) - .reduce(merge, {}); - - const ExpressionAttributeValues = keys - .filter((attrName) => Item[attrName] !== undefined && Item[attrName] !== null) - .map((attrName) => ({ [`:${attrName}`]: Item[attrName] })) - .reduce(merge, {}); - - let UpdateExpression = `SET ${keys - .filter((attrName) => Item[attrName] !== undefined && Item[attrName] !== null) - .map((attrName) => `#${attrName} = :${attrName}`) - .join(', ')}`; - - const UpdateExpressionRemove = keys - .filter((attrName) => Item[attrName] === null) - .map((attrName) => `#${attrName}`) - .join(', '); - - if (UpdateExpressionRemove.length) { - UpdateExpression = `${UpdateExpression} REMOVE ${UpdateExpressionRemove}`; - } + const exprAttributes = Object.entries(Item) + .filter(([key, value]) => value !== undefined) + .reduce((acc, [key, value]) => { + // If this attribute ends with '_delete'...assume we're deleting values from a set. + const isDeleteSet = key.endsWith('_delete'); + const baseKey = isDeleteSet ? key.replace(/_delete$/, '') : key; + acc.ExpressionAttributeNames[`#${baseKey}`] = baseKey; + + if (value === null) { + acc.removeClauses.push(`#${baseKey}`); + return acc; + } + + if (isDeleteSet) { + let setValue = value; + if (!(setValue instanceof Set)) { + setValue = new Set([setValue]); + } + acc.ExpressionAttributeValues[`:${key}`] = setValue; + acc.deleteClauses.push(`#${baseKey} :${key}`); + return acc; + } + + if (value instanceof Set) { + acc.ExpressionAttributeValues[`:${key}`] = value; + acc.addClauses.push(`#${key} :${key}`); + return acc; + } + + acc.ExpressionAttributeValues[`:${key}`] = value; + acc.setClauses.push(`#${key} = :${key}`); + return acc; + }, { + ExpressionAttributeNames: {}, + ExpressionAttributeValues: {}, + setClauses: [], + addClauses: [], + deleteClauses: [], + removeClauses: [], + }); + + // Construct UpdateExpression + const updateExpressionParts = []; + if (exprAttributes.setClauses.length) updateExpressionParts.push(`SET ${exprAttributes.setClauses.join(', ')}`); + if (exprAttributes.removeClauses.length) updateExpressionParts.push(`REMOVE ${exprAttributes.removeClauses.join(', ')}`); + if (exprAttributes.addClauses.length) updateExpressionParts.push(`ADD ${exprAttributes.addClauses.join(', ')}`); + if (exprAttributes.deleteClauses.length) updateExpressionParts.push(`DELETE ${exprAttributes.deleteClauses.join(', ')}`); + const UpdateExpression = updateExpressionParts.join(' '); return { - ExpressionAttributeNames, - ExpressionAttributeValues, + ExpressionAttributeNames: exprAttributes.ExpressionAttributeNames, + ExpressionAttributeValues: exprAttributes.ExpressionAttributeValues, UpdateExpression, ReturnValues: 'ALL_NEW', }; diff --git a/test/unit/sinks/dynamodb.test.js b/test/unit/sinks/dynamodb.test.js index cae3c7f7..d47a3935 100644 --- a/test/unit/sinks/dynamodb.test.js +++ b/test/unit/sinks/dynamodb.test.js @@ -58,6 +58,90 @@ describe('sinks/dynamodb.js', () => { }); }); + it('should calculate updateExpression adding values to a set', () => { + const result = updateExpression({ + tags: new Set(['a', 'b']), + }); + + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#tags': 'tags', + }, + ExpressionAttributeValues: { + ':tags': ['a', 'b'], + }, + UpdateExpression: 'ADD #tags :tags', + ReturnValues: 'ALL_NEW', + }); + }); + + it('should calculate updateExpression removing values from a set', () => { + const result = updateExpression({ + tags_delete: new Set(['x', 'y']), + }); + + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#tags': 'tags', + }, + ExpressionAttributeValues: { + ':tags_delete': ['x', 'y'], + }, + UpdateExpression: 'DELETE #tags :tags_delete', + ReturnValues: 'ALL_NEW', + }); + }); + + it('should wrap calculate updateExpression wrapping a delete set value in a set', () => { + const result = updateExpression({ + tags_delete: 'x', + }); + + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#tags': 'tags', + }, + ExpressionAttributeValues: { + ':tags_delete': ['x'], + }, + UpdateExpression: 'DELETE #tags :tags_delete', + ReturnValues: 'ALL_NEW', + }); + }); + + it('should calculate complex updateExpression using SET, REMOVE, ADD, and DELETE', () => { + const result = updateExpression({ + id: '123', + name: 'Complex Thing', + description: null, + tags: new Set(['blue', 'green']), + categories: new Set(['a', 'b']), + tags_delete: 'red', + categories_delete: new Set(['x', 'y']), + ignoredField: undefined, + }); + + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#id': 'id', + '#name': 'name', + '#description': 'description', + '#tags': 'tags', + '#categories': 'categories', + }, + ExpressionAttributeValues: { + ':id': '123', + ':name': 'Complex Thing', + ':tags': ['blue', 'green'], + ':categories': ['a', 'b'], + ':tags_delete': ['red'], + ':categories_delete': ['x', 'y'], + }, + UpdateExpression: 'SET #id = :id, #name = :name REMOVE #description ADD #tags :tags, #categories :categories DELETE #tags :tags_delete, #categories :categories_delete', + ReturnValues: 'ALL_NEW', + }); + }); + it('should calculate timestampCondition', () => { expect(timestampCondition()).to.deep.equal({ ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp', @@ -134,3 +218,8 @@ describe('sinks/dynamodb.js', () => { .done(done); }); }); + +// Chai doesn't like sets...we can convert them to arrays to help it out. +const normalizeObj = (obj) => + JSON.parse(JSON.stringify(obj, (thisArg, value) => + (value instanceof Set ? [...value] : value))); From 4612cceffa77b31dae5844bdc2a2534d9d06452a Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 19 Nov 2025 13:18:29 -0500 Subject: [PATCH 2/2] Increase package version. --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 64a19091..06b49fc0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.15", + "version": "1.1.16", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.15", + "version": "1.1.16", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index 57b8ef06..3c66d9a6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.15", + "version": "1.1.16", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws",