diff --git a/src/dynamo/batchget/batch-get-utils.ts b/src/dynamo/batchget/batch-get-utils.ts index 5a6fc2fca..bfa22d336 100644 --- a/src/dynamo/batchget/batch-get-utils.ts +++ b/src/dynamo/batchget/batch-get-utils.ts @@ -29,9 +29,11 @@ export function batchGetItemsFetchAll( ) } -export type ResponseWithUnprocessedKeys = DynamoDB.BatchGetItemOutput & { UnprocessedKeys: BatchGetRequestMap } +export type BatchGetItemOutputWithUnprocessedKeys = + DynamoDB.BatchGetItemOutput + & { UnprocessedKeys: BatchGetRequestMap } -export function hasUnprocessedKeys(response: DynamoDB.BatchGetItemOutput): response is ResponseWithUnprocessedKeys { +export function hasUnprocessedKeys(response: DynamoDB.BatchGetItemOutput): response is BatchGetItemOutputWithUnprocessedKeys { if (!response.UnprocessedKeys) { return false } diff --git a/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.spec.ts b/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.spec.ts index 3f4eecc7c..ee3e9d43b 100644 --- a/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.spec.ts +++ b/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.spec.ts @@ -1,3 +1,5 @@ +// tslint:disable:no-unnecessary-class + import * as DynamoDB from 'aws-sdk/clients/dynamodb' import { of } from 'rxjs' import { Organization } from '../../../../test/models' @@ -7,38 +9,53 @@ import { BatchWriteSingleTableRequest } from './batch-write-single-table.request describe('batch write single table request', () => { const tableName = getTableName(Organization) + const item: Organization = { + id: 'myId', + createdAtDate: new Date(), + name: 'myOrg', + } - let item: Organization let dynamoRx: DynamoRx let request: BatchWriteSingleTableRequest - let nextSpyFn: () => { value: number } - const generatorMock = () => { next: nextSpyFn } + describe('constructor', () => { + it('should throw when no class was given', () => { + expect(() => new BatchWriteSingleTableRequest(null, null)).toThrow() + }) + it('should throw when class given is not @Model decorated', () => { + class NoModel {} + expect(() => new BatchWriteSingleTableRequest(null, NoModel)).toThrow() + }) - beforeEach(() => { - item = { - id: 'myId', - createdAtDate: new Date(), - name: 'myOrg', - } - nextSpyFn = jest.fn().mockImplementation(() => ({ value: 0 })) + it('should initialize params', () => { + request = new BatchWriteSingleTableRequest(null, Organization) + expect(request.params).toEqual({ + RequestItems: { + [tableName]: [], + }, + }) + }) }) describe('correct params', () => { beforeEach(() => { - dynamoRx = new DynamoRx() request = new BatchWriteSingleTableRequest(dynamoRx, Organization) + }) + + it('returnConsumedCapacity', () => { + request.returnConsumedCapacity('TOTAL') + expect(request.params.ReturnConsumedCapacity).toBe('TOTAL') + }) - const output: DynamoDB.BatchWriteItemOutput = {} - spyOn(dynamoRx, 'batchWriteItem').and.returnValue(of(output)) + it('returnItemCollectionMetrics', () => { + request.returnItemCollectionMetrics('SIZE') + expect(request.params.ReturnItemCollectionMetrics).toBe('SIZE') }) - it('delete with complex primary key', async () => { + it('delete with composite key', () => { request.delete([item]) - await request.exec(generatorMock).toPromise() - expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(1) - expect(dynamoRx.batchWriteItem).toHaveBeenCalledWith({ + expect(request.params).toEqual({ RequestItems: { [tableName]: [ { @@ -52,15 +69,12 @@ describe('batch write single table request', () => { ], }, }) - expect(nextSpyFn).toHaveBeenCalledTimes(0) }) it('put object', async () => { request.put([item]) - await request.exec(generatorMock).toPromise() - expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(1) - expect(dynamoRx.batchWriteItem).toHaveBeenCalledWith({ + expect(request.params).toEqual({ RequestItems: { [tableName]: [ { @@ -75,50 +89,103 @@ describe('batch write single table request', () => { ], }, }) - expect(nextSpyFn).toHaveBeenCalledTimes(0) }) - it('delete >25 items in two requests', async () => { - const twentyFiveItems = [] - for (let i = 0; i < 25; i++) { - twentyFiveItems.push(item) - } - request.delete(twentyFiveItems) + it('adding >25 items in first delete call throws', () => { + const twentyFiveItems = new Array(30).map(() => item) + expect(() => request.delete(twentyFiveItems)).toThrow() + }) + + it('adding >25 items in second delete call throws', () => { + const twentyFiveItems = new Array(25).map(() => item) request.delete(twentyFiveItems) - await request.exec(generatorMock).toPromise() - expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(2) - expect(nextSpyFn).toHaveBeenCalledTimes(0) + expect(() => request.delete(twentyFiveItems)).toThrow() + }) + + it('adding >25 items in first put call throws', () => { + const twentyFiveItems = new Array(30).map(() => item) + expect(() => request.put(twentyFiveItems)).toThrow() + }) + + it('adding >25 items in second put call throws', () => { + const twentyFiveItems = new Array(25).map(() => item) + request.put(twentyFiveItems) + expect(() => request.put(twentyFiveItems)).toThrow() }) }) - describe('correct backoff', () => { + describe('Unprocessed items', () => { + const output: DynamoDB.BatchWriteItemOutput = { + UnprocessedItems: { + [tableName]: [ + { + PutRequest: { + Item: { + id: { S: 'myId' }, + createdAtDate: { S: item.createdAtDate.toISOString() }, + name: { S: 'myOrg' }, + }, + }, + }, + ], + }, + } + + let generatorSpy: jasmine.Spy + let nextFnSpy: jasmine.Spy + let batchWriteItemSpy: jasmine.Spy + beforeEach(() => { - dynamoRx = new DynamoRx() + batchWriteItemSpy = jasmine.createSpy().and.returnValues(of(output), of(output), of({ MyResult: true })) + nextFnSpy = jasmine.createSpy().and.returnValue({ value: 0 }) + dynamoRx = ({ batchWriteItem: batchWriteItemSpy }) + generatorSpy = jasmine.createSpy().and.returnValue({ next: nextFnSpy }) + request = new BatchWriteSingleTableRequest(dynamoRx, Organization) + }) - const output: DynamoDB.BatchWriteItemOutput = { - UnprocessedItems: { - [tableName]: [ - { - PutRequest: { - Item: { - id: { S: 'myId' }, - createdAtDate: { S: item.createdAtDate.toISOString() }, - name: { S: 'myOrg' }, - }, - }, - }, - ], - }, - } - spyOn(dynamoRx, 'batchWriteItem').and.returnValues(of(output), of({})) + it('should retry when unprocessed items are returned', async () => { + request.put([item]) + await request.exec(generatorSpy).toPromise() + + // only one instance of the generator should be created + expect(generatorSpy).toHaveBeenCalledTimes(1) + + expect(nextFnSpy).toHaveBeenCalledTimes(2) + + expect(batchWriteItemSpy).toHaveBeenCalledTimes(3) }) - it('should retry when capacity is exceeded', async () => { + it('should keep other params in additional calls', async () => { request.put([item]) - await request.exec(generatorMock).toPromise() - expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(2) - expect(nextSpyFn).toHaveBeenCalledTimes(1) + request.returnConsumedCapacity('TOTAL') + request.returnItemCollectionMetrics('SIZE') + await request.exec(generatorSpy).toPromise() + + expect(batchWriteItemSpy).toHaveBeenCalledTimes(3) + const paramsThirdCall = batchWriteItemSpy.calls.all()[2].args[0] + + expect(paramsThirdCall).toBeDefined() + expect(paramsThirdCall.ReturnConsumedCapacity).toBe('TOTAL') + expect(paramsThirdCall.ReturnItemCollectionMetrics).toBe('SIZE') + }) + }) + + describe('exec / execFullResponse', () => { + beforeEach(() => { + dynamoRx = ({ batchWriteItem: () => of({ myResponse: true }) }) + request = new BatchWriteSingleTableRequest(dynamoRx, Organization) + request.delete([item]) + }) + + it('exec should return nothing', async () => { + const response = await request.exec().toPromise() + expect(response).toBeUndefined() + }) + + it('execFullResponse should return BatchWriteItemOutput', async () => { + const response = await request.execFullResponse().toPromise() + expect(response).toEqual({ myResponse: true }) }) }) }) diff --git a/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.ts b/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.ts index 57a7d3fe7..316870797 100644 --- a/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.ts +++ b/src/dynamo/request/batchwritesingletable/batch-write-single-table.request.ts @@ -1,16 +1,17 @@ -import { BatchWriteItemInput, BatchWriteItemOutput, WriteRequest, WriteRequests } from 'aws-sdk/clients/dynamodb' -import { Observable, of } from 'rxjs' -import { delay, map, mergeMap, tap } from 'rxjs/operators' -import { PutRequest } from '../../../../node_modules/aws-sdk/clients/dynamodb' +import * as DynamoDB from 'aws-sdk/clients/dynamodb' +import { Observable } from 'rxjs' +import { map } from 'rxjs/operators' +import { Metadata, metadataForClass } from '../../../decorator/metadata' import { randomExponentialBackoffTimer } from '../../../helper' import { createLogger, Logger } from '../../../logger/logger' import { Attributes, createToKeyFn, toDb } from '../../../mapper' import { ModelConstructor } from '../../../model' import { DynamoRx } from '../../dynamo-rx' import { getTableName } from '../../get-table-name.function' -import { BatchWriteSingleTableResponse } from './batch-write-single-table.response' +import { batchWriteItemsWriteAll } from './batch-write-utils' const MAX_BATCH_WRITE_ITEMS = 25 +const DEFAULT_TIME_SLOT = 1000 export class BatchWriteSingleTableRequest { @@ -23,8 +24,10 @@ export class BatchWriteSingleTableRequest { readonly dynamoRx: DynamoRx readonly modelClazz: ModelConstructor + readonly metadata:Metadata readonly tableName: string - readonly itemsToProcess: WriteRequests + readonly params: DynamoDB.BatchWriteItemInput + private readonly logger: Logger private keyFn: any @@ -37,81 +40,67 @@ export class BatchWriteSingleTableRequest { throw new Error("please provide the model clazz for the request, won't work otherwise") } this.modelClazz = modelClazz - this.tableName = getTableName(modelClazz) - this.itemsToProcess = [] + this.metadata = metadataForClass(this.modelClazz) + if (!this.metadata.modelOptions) { + throw new Error('given ModelConstructor has no @Model decorator') + } + + this.tableName = getTableName(this.metadata) + + this.params = { + RequestItems: { + [this.tableName]: [], + }, + } + } + + returnConsumedCapacity(value: DynamoDB.ReturnConsumedCapacity) { + this.params.ReturnConsumedCapacity = value + } + + returnItemCollectionMetrics(value: DynamoDB.ReturnItemCollectionMetrics) { + this.params.ReturnItemCollectionMetrics = value } delete(items: T[]): BatchWriteSingleTableRequest { - this.itemsToProcess.push(...items.map(item => ({ DeleteRequest: { Key: this.toKey(item) } }))) - this.logger.debug(`${items.length} items added for DeleteRequest`) + if (this.params.RequestItems[this.tableName].length + items.length > MAX_BATCH_WRITE_ITEMS) { + throw new Error(`batch write takes at max ${MAX_BATCH_WRITE_ITEMS} items`) + } + this.params.RequestItems[this.tableName].push(...items.map(this.createDeleteRequest)) return this } put(items: T[]): BatchWriteSingleTableRequest { - this.itemsToProcess.push( - ...items.map(item => ({ PutRequest: { Item: toDb(item, this.modelClazz) } })), - ) - this.logger.debug(`${items.length} items added for PutRequest`) + if (this.params.RequestItems[this.tableName].length + items.length > MAX_BATCH_WRITE_ITEMS) { + throw new Error(`batch write takes at max ${MAX_BATCH_WRITE_ITEMS} items`) + } + this.params.RequestItems[this.tableName].push(...items.map(this.createPutRequest)) return this } - // fixme backoff time is resetted for every request.. :/ /** * * @param backoffTimer generator for how much timeSlots should be waited before requesting next batch. only used when capacity was exceeded. default randomExponentialBackoffTimer * @param throttleTimeSlot defines how long one timeSlot is for throttling, default 1 second */ - exec(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = 1000): Observable { + exec(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable { this.logger.debug('starting batchWriteItem') - const rBoT = backoffTimer() - return this.execNextBatch().pipe( - mergeMap((r: BatchWriteSingleTableResponse) => { - if (r.capacityExceeded) { - const backoffTime = rBoT.next().value * throttleTimeSlot - this.logger.info(`wait ${backoffTime} ms until next request`, { backoffTime }) - return of(r).pipe(delay(backoffTime)) - } - return of(r) - }), - mergeMap((r: BatchWriteSingleTableResponse) => { - if (r.remainingItems > 0) { - return this.exec() - } else { - return of() - } + return this.write(backoffTimer, throttleTimeSlot).pipe( + map(() => { + return }), ) } - private execNextBatch(): Observable { - const batch = this.itemsToProcess.splice(0, MAX_BATCH_WRITE_ITEMS) - const batchWriteItemInput: BatchWriteItemInput = { - RequestItems: { - [this.tableName]: batch, - }, - } - this.logger.debug('request', batchWriteItemInput) - - return this.dynamoRx.batchWriteItem(batchWriteItemInput).pipe( - tap(response => this.logger.debug('response', response)), - tap((batchWriteManyResponse: BatchWriteItemOutput) => { - if (batchWriteManyResponse.UnprocessedItems && batchWriteManyResponse.UnprocessedItems[this.tableName]) { - this.itemsToProcess.unshift(...batchWriteManyResponse.UnprocessedItems[this.tableName]) - } - }), - map((batchWriteManyResponse: BatchWriteItemOutput) => ({ - remainingItems: this.itemsToProcess.length, - capacityExceeded: !!( - batchWriteManyResponse.UnprocessedItems && batchWriteManyResponse.UnprocessedItems[this.tableName] - ), - consumedCapacity: batchWriteManyResponse.ConsumedCapacity, - })), - tap(response => { - if (response.capacityExceeded) { - this.logger.info('capacity exceeded', response.consumedCapacity) - } - }), - ) + execFullResponse(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable { + return this.write(backoffTimer, throttleTimeSlot) } + + private write(backoffTimer: () => IterableIterator, throttleTimeSlot: number) { + return batchWriteItemsWriteAll(this.dynamoRx, { ...this.params }, backoffTimer(), throttleTimeSlot) + } + + private createDeleteRequest = (item: T): DynamoDB.WriteRequest => ({ DeleteRequest: { Key: this.toKey(item) } }) + private createPutRequest = (item: T): DynamoDB.WriteRequest => ({ PutRequest: { Item: toDb(item, this.modelClazz) } }) } diff --git a/src/dynamo/request/batchwritesingletable/batch-write-single-table.response.ts b/src/dynamo/request/batchwritesingletable/batch-write-single-table.response.ts deleted file mode 100644 index 1c69dbac8..000000000 --- a/src/dynamo/request/batchwritesingletable/batch-write-single-table.response.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { ConsumedCapacityMultiple } from 'aws-sdk/clients/dynamodb' - -export interface BatchWriteSingleTableResponse { - remainingItems: number - capacityExceeded: boolean - consumedCapacity?: ConsumedCapacityMultiple -} diff --git a/src/dynamo/request/batchwritesingletable/batch-write-utils.ts b/src/dynamo/request/batchwritesingletable/batch-write-utils.ts new file mode 100644 index 000000000..f4c75dcff --- /dev/null +++ b/src/dynamo/request/batchwritesingletable/batch-write-utils.ts @@ -0,0 +1,41 @@ +import * as DynamoDB from 'aws-sdk/clients/dynamodb' +import { Observable, of } from 'rxjs' +import { delay, mergeMap } from 'rxjs/operators' +import { DynamoRx } from '../../dynamo-rx' + + +export function batchWriteItemsWriteAll( + dynamoRx: DynamoRx, + params: DynamoDB.BatchWriteItemInput, + backoffTimer: IterableIterator, + throttleTimeSlot: number, +): Observable { + return dynamoRx.batchWriteItem(params) + .pipe( + mergeMap(response => { + if (hasUnprocessedItems(response)) { + return of(response.UnprocessedItems) + .pipe( + delay(backoffTimer.next().value * throttleTimeSlot), + mergeMap((unprocessedKeys: DynamoDB.BatchWriteItemRequestMap) => { + const nextParams: DynamoDB.BatchWriteItemInput = { ...params, RequestItems: unprocessedKeys } + return batchWriteItemsWriteAll(dynamoRx, nextParams, backoffTimer, throttleTimeSlot) + }), + ) + } + return of(response) + }), + ) +} + +export type BatchWriteItemOutputWithUnprocessedItems = + DynamoDB.BatchWriteItemOutput + & { UnprocessedItems: DynamoDB.BatchWriteItemRequestMap } + +export function hasUnprocessedItems(response: DynamoDB.BatchWriteItemOutput): response is BatchWriteItemOutputWithUnprocessedItems { + if (!response.UnprocessedItems) { + return false + } + return Object.values(response.UnprocessedItems) + .some(t => !!t && t.length > 0) +}