Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/dynamo/batchget/batch-get-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ export function batchGetItemsFetchAll(
)
}

export type ResponseWithUnprocessedKeys = DynamoDB.BatchGetItemOutput & { UnprocessedKeys: BatchGetRequestMap }
export type BatchGetItemOutputWithUnprocessedKeys =
DynamoDB.BatchGetItemOutput
& { UnprocessedKeys: BatchGetRequestMap }

export function hasUnprocessedKeys(response: DynamoDB.BatchGetItemOutput): response is ResponseWithUnprocessedKeys {
export function hasUnprocessedKeys(response: DynamoDB.BatchGetItemOutput): response is BatchGetItemOutputWithUnprocessedKeys {
if (!response.UnprocessedKeys) {
return false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// tslint:disable:no-unnecessary-class

import * as DynamoDB from 'aws-sdk/clients/dynamodb'
import { of } from 'rxjs'
import { Organization } from '../../../../test/models'
Expand All @@ -7,38 +9,53 @@ import { BatchWriteSingleTableRequest } from './batch-write-single-table.request

describe('batch write single table request', () => {
const tableName = getTableName(Organization)
const item: Organization = <Organization>{
id: 'myId',
createdAtDate: new Date(),
name: 'myOrg',
}

let item: Organization
let dynamoRx: DynamoRx
let request: BatchWriteSingleTableRequest<Organization>

let nextSpyFn: () => { value: number }
const generatorMock = () => <any>{ next: nextSpyFn }
describe('constructor', () => {
it('should throw when no class was given', () => {
expect(() => new BatchWriteSingleTableRequest(<any>null, <any>null)).toThrow()
})
it('should throw when class given is not @Model decorated', () => {
class NoModel {}
expect(() => new BatchWriteSingleTableRequest(<any>null, NoModel)).toThrow()
})

beforeEach(() => {
item = <any>{
id: 'myId',
createdAtDate: new Date(),
name: 'myOrg',
}
nextSpyFn = jest.fn().mockImplementation(() => ({ value: 0 }))
it('should initialize params', () => {
request = new BatchWriteSingleTableRequest(<any>null, Organization)
expect(request.params).toEqual({
RequestItems: {
[tableName]: [],
},
})
})
})

describe('correct params', () => {
beforeEach(() => {
dynamoRx = new DynamoRx()
request = new BatchWriteSingleTableRequest(dynamoRx, Organization)
})

it('returnConsumedCapacity', () => {
request.returnConsumedCapacity('TOTAL')
expect(request.params.ReturnConsumedCapacity).toBe('TOTAL')
})

const output: DynamoDB.BatchWriteItemOutput = {}
spyOn(dynamoRx, 'batchWriteItem').and.returnValue(of(output))
it('returnItemCollectionMetrics', () => {
request.returnItemCollectionMetrics('SIZE')
expect(request.params.ReturnItemCollectionMetrics).toBe('SIZE')
})

it('delete with complex primary key', async () => {
it('delete with composite key', () => {
request.delete([item])
await request.exec(generatorMock).toPromise()

expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(1)
expect(dynamoRx.batchWriteItem).toHaveBeenCalledWith({
expect(request.params).toEqual({
RequestItems: {
[tableName]: [
{
Expand All @@ -52,15 +69,12 @@ describe('batch write single table request', () => {
],
},
})
expect(nextSpyFn).toHaveBeenCalledTimes(0)
})

it('put object', async () => {
request.put([item])
await request.exec(generatorMock).toPromise()

expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(1)
expect(dynamoRx.batchWriteItem).toHaveBeenCalledWith({
expect(request.params).toEqual({
RequestItems: {
[tableName]: [
{
Expand All @@ -75,50 +89,103 @@ describe('batch write single table request', () => {
],
},
})
expect(nextSpyFn).toHaveBeenCalledTimes(0)
})

it('delete >25 items in two requests', async () => {
const twentyFiveItems = []
for (let i = 0; i < 25; i++) {
twentyFiveItems.push(item)
}
request.delete(twentyFiveItems)
it('adding >25 items in first delete call throws', () => {
const twentyFiveItems = new Array(30).map(() => item)
expect(() => request.delete(twentyFiveItems)).toThrow()
})

it('adding >25 items in second delete call throws', () => {
const twentyFiveItems = new Array(25).map(() => item)
request.delete(twentyFiveItems)
await request.exec(generatorMock).toPromise()
expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(2)
expect(nextSpyFn).toHaveBeenCalledTimes(0)
expect(() => request.delete(twentyFiveItems)).toThrow()
})

it('adding >25 items in first put call throws', () => {
const twentyFiveItems = new Array(30).map(() => item)
expect(() => request.put(twentyFiveItems)).toThrow()
})

it('adding >25 items in second put call throws', () => {
const twentyFiveItems = new Array(25).map(() => item)
request.put(twentyFiveItems)
expect(() => request.put(twentyFiveItems)).toThrow()
})
})

describe('correct backoff', () => {
describe('Unprocessed items', () => {
const output: DynamoDB.BatchWriteItemOutput = {
UnprocessedItems: {
[tableName]: [
{
PutRequest: {
Item: {
id: { S: 'myId' },
createdAtDate: { S: item.createdAtDate.toISOString() },
name: { S: 'myOrg' },
},
},
},
],
},
}

let generatorSpy: jasmine.Spy
let nextFnSpy: jasmine.Spy
let batchWriteItemSpy: jasmine.Spy

beforeEach(() => {
dynamoRx = new DynamoRx()
batchWriteItemSpy = jasmine.createSpy().and.returnValues(of(output), of(output), of({ MyResult: true }))
nextFnSpy = jasmine.createSpy().and.returnValue({ value: 0 })
dynamoRx = <DynamoRx>(<any>{ batchWriteItem: batchWriteItemSpy })
generatorSpy = jasmine.createSpy().and.returnValue({ next: nextFnSpy })

request = new BatchWriteSingleTableRequest(dynamoRx, Organization)
})

const output: DynamoDB.BatchWriteItemOutput = {
UnprocessedItems: {
[tableName]: [
{
PutRequest: {
Item: {
id: { S: 'myId' },
createdAtDate: { S: item.createdAtDate.toISOString() },
name: { S: 'myOrg' },
},
},
},
],
},
}
spyOn(dynamoRx, 'batchWriteItem').and.returnValues(of(output), of({}))
it('should retry when unprocessed items are returned', async () => {
request.put([item])
await request.exec(<any>generatorSpy).toPromise()

// only one instance of the generator should be created
expect(generatorSpy).toHaveBeenCalledTimes(1)

expect(nextFnSpy).toHaveBeenCalledTimes(2)

expect(batchWriteItemSpy).toHaveBeenCalledTimes(3)
})

it('should retry when capacity is exceeded', async () => {
it('should keep other params in additional calls', async () => {
request.put([item])
await request.exec(generatorMock).toPromise()
expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(2)
expect(nextSpyFn).toHaveBeenCalledTimes(1)
request.returnConsumedCapacity('TOTAL')
request.returnItemCollectionMetrics('SIZE')
await request.exec(<any>generatorSpy).toPromise()

expect(batchWriteItemSpy).toHaveBeenCalledTimes(3)
const paramsThirdCall = <DynamoDB.BatchWriteItemInput>batchWriteItemSpy.calls.all()[2].args[0]

expect(paramsThirdCall).toBeDefined()
expect(paramsThirdCall.ReturnConsumedCapacity).toBe('TOTAL')
expect(paramsThirdCall.ReturnItemCollectionMetrics).toBe('SIZE')
})
})

describe('exec / execFullResponse', () => {
beforeEach(() => {
dynamoRx = <DynamoRx>(<any>{ batchWriteItem: () => of({ myResponse: true }) })
request = new BatchWriteSingleTableRequest(dynamoRx, Organization)
request.delete([item])
})

it('exec should return nothing', async () => {
const response = await request.exec().toPromise()
expect(response).toBeUndefined()
})

it('execFullResponse should return BatchWriteItemOutput', async () => {
const response = await request.execFullResponse().toPromise()
expect(response).toEqual({ myResponse: true })
})
})
})
Loading