Skip to content

Commit

Permalink
feat(core): added AWS SDK v3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
whimzyLive committed Mar 14, 2022
1 parent 2c770c0 commit 1a8b61c
Show file tree
Hide file tree
Showing 22 changed files with 601 additions and 170 deletions.
1 change: 1 addition & 0 deletions examples/demo-document-client-v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ setup_ts_build(
"//packages/core:library",
"//packages/document-client:library",
"@npm//@aws-sdk/client-dynamodb",
"@npm//@aws-sdk/lib-dynamodb",
"@npm//reflect-metadata",
"@npm//rollup-plugin-commonjs",
"@npm//rollup-plugin-node-resolve",
Expand Down
11 changes: 7 additions & 4 deletions packages/core/src/classes/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import {
NoSuchEntityExistsError,
} from '@typedorm/common';
import {loadPackage} from '@typedorm/common/src/helpers/load-package';
import {DocumentClientV2, DocumentClientV3} from '@typedorm/document-client';
import {
DocumentClient,
DocumentClientV2,
DocumentClientV3,
} from '@typedorm/document-client';
import {isUsedForPrimaryKey} from '../../helpers/is-used-for-primary-key';
import {BatchManager} from '../manager/batch-manager';
import {EntityManager} from '../manager/entity-manager';
Expand All @@ -31,8 +35,7 @@ export class Connection {
readonly batchManager: BatchManager;
readonly scanManager: ScanManager;
readonly defaultConfig: {queryItemsImplicitLimit: number};
// TODO: Add support for DocumentClientV3 type
readonly documentClient: DocumentClientV2;
readonly documentClient: DocumentClient;
readonly logger: DebugLogger;

private _entityMetadatas: Map<string, EntityMetadata>;
Expand All @@ -59,7 +62,7 @@ export class Connection {

this.documentClient = this.loadOrInitiateDocumentClient(
options.documentClient
) as DocumentClientV2;
);

/**
* This makes sure that we only ever build entity metadatas once per connection
Expand Down
52 changes: 26 additions & 26 deletions packages/core/src/classes/manager/batch-manager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {DocumentClientTypes} from '@typedorm/document-client';
import {WriteBatch} from '../batch/write-batch';
import {Connection} from '../connection/connection';
import {DocumentClientBatchTransformer} from '../transformer/document-client-batch-transformer';
Expand All @@ -10,11 +11,6 @@ import {
MANAGER_NAME,
STATS_TYPE,
} from '@typedorm/common';
import {
BatchGetResponseMap,
BatchWriteItemRequestMap,
DocumentClient,
} from 'aws-sdk/clients/dynamodb';
import {isEmptyObject} from '../../helpers/is-empty-object';
import {ReadBatch} from '../batch/read-batch';
import {MetadataOptions} from '../transformer/base-transformer';
Expand Down Expand Up @@ -176,8 +172,8 @@ export class BatchManager {
...lazyTransactionRequests,
...batchRequests,
] as
| Promise<DocumentClient.TransactWriteItemsOutput>[]
| Promise<DocumentClient.BatchWriteItemOutput>[];
| Promise<DocumentClientTypes.TransactWriteItemOutput>[]
| Promise<DocumentClientTypes.BatchWriteItemOutput>[];

// 2. wait for all promises to finish
const responses = await Promise.all(allRequests);
Expand Down Expand Up @@ -211,7 +207,7 @@ export class BatchManager {
// 4.1. reverse parse all failed inputs to original user inputs
// filter or drop any empty values
const transformedUnprocessedItems = unprocessedItems.flatMap(
(unprocessedItemInput: DocumentClient.BatchWriteItemRequestMap) =>
(unprocessedItemInput: DocumentClientTypes.BatchWriteItemRequestMap) =>
this._dcBatchTransformer.toWriteBatchInputList(
unprocessedItemInput,
metadata
Expand Down Expand Up @@ -340,7 +336,7 @@ export class BatchManager {

// 4.2 transform unprocessed items
const unprocessedTransformedItems = unprocessedItemsList?.flatMap(
(item: DocumentClient.BatchGetRequestMap) =>
(item: DocumentClientTypes.BatchGetRequestMap) =>
this._dcBatchTransformer.toReadBatchInputList(item, metadata)
);

Expand Down Expand Up @@ -370,14 +366,14 @@ export class BatchManager {
* @param batchWriteItemOutputItems
*/
private async recursiveHandleBatchWriteItemsResponse(
batchWriteItemOutputItems: DocumentClient.BatchWriteItemOutput[],
batchWriteItemOutputItems: DocumentClientTypes.BatchWriteItemOutputList,
totalAttemptsSoFar: number,
options?: BatchManagerWriteOptions,
metadataOptions?: MetadataOptions
): Promise<DocumentClient.BatchWriteItemRequestMap[]> {
): Promise<DocumentClientTypes.BatchWriteItemRequestMapList> {
const unProcessedListItems = batchWriteItemOutputItems
.filter(
(response: DocumentClient.BatchWriteItemOutput) =>
(response: DocumentClientTypes.BatchWriteItemOutput) =>
response.UnprocessedItems && !isEmptyObject(response.UnprocessedItems)
)
.map(item => item.UnprocessedItems!);
Expand Down Expand Up @@ -422,7 +418,7 @@ export class BatchManager {
);
return acc;
},
{} as BatchWriteItemRequestMap
{} as DocumentClientTypes.BatchWriteItemRequestMap
);

const batchRequestsItems = this._dcBatchTransformer.mapTableWriteItemsToBatchWriteItems(
Expand All @@ -445,7 +441,7 @@ export class BatchManager {

const batchRequestsResponses = (await Promise.all(
batchRequests
)) as DocumentClient.BatchWriteItemOutput[];
)) as DocumentClientTypes.BatchWriteItemOutputList;

// log stats
batchRequestsResponses.forEach((response, index) => {
Expand All @@ -469,23 +465,24 @@ export class BatchManager {
}

private async recursiveHandleBatchReadItemsResponse(
batchReadItemOutputList: DocumentClient.BatchGetItemOutput[],
batchReadItemOutputList: DocumentClientTypes.BatchGetItemOutputList,
totalAttemptsSoFar: number,
options?: BatchManagerReadOptions,
responsesStore: DocumentClient.ItemList = [],
responsesStore: DocumentClientTypes.ItemList = [],
metadataOptions?: MetadataOptions
): Promise<{
items: DocumentClient.ItemList;
unprocessedItemsList?: DocumentClient.BatchGetRequestMap[];
items: DocumentClientTypes.ItemList;
unprocessedItemsList?: DocumentClientTypes.BatchGetRequestMapList;
}> {
// save all responses from api to responses store
const batchReadResponses = batchReadItemOutputList
.filter(
(response: DocumentClient.BatchGetItemOutput) =>
(response: DocumentClientTypes.BatchGetItemOutput) =>
response.Responses && !isEmptyObject(response.Responses)
)
.map(
(response: DocumentClient.BatchGetItemOutput) => response.Responses!
(response: DocumentClientTypes.BatchGetItemOutput) =>
response.Responses!
);
if (batchReadResponses.length) {
const mappedResponsesItemList = batchReadResponses.flatMap(
Expand All @@ -496,7 +493,7 @@ export class BatchManager {

// recursively process all unprocessed items
const unprocessedItemsList = batchReadItemOutputList.filter(
(response: DocumentClient.BatchGetItemOutput) =>
(response: DocumentClientTypes.BatchGetItemOutput) =>
response.UnprocessedKeys && !isEmptyObject(response.UnprocessedKeys)
);

Expand Down Expand Up @@ -536,20 +533,21 @@ export class BatchManager {

// aggregate all requests by table name
const sortedUnprocessedItems = unprocessedItemsList.reduce(
(acc, {UnprocessedKeys}: DocumentClient.BatchGetItemOutput) => {
(acc, {UnprocessedKeys}: DocumentClientTypes.BatchGetItemOutput) => {
Object.entries(UnprocessedKeys!).forEach(
([tableName, unprocessedRequests]) => {
if (!acc[tableName]) {
acc[tableName] = {
Keys: [],
};
}
acc[tableName].Keys.push(...unprocessedRequests.Keys);

acc[tableName].Keys?.push(...unprocessedRequests.Keys);
}
);
return acc;
},
{} as DocumentClient.BatchGetRequestMap
{} as DocumentClientTypes.BatchGetRequestMap
);

const batchRequestsItemsList = this._dcBatchTransformer.mapTableReadItemsToBatchReadItems(
Expand All @@ -571,7 +569,7 @@ export class BatchManager {

const batchRequestsResponses = (await Promise.all(
batchRequests
)) as DocumentClient.BatchGetItemOutput[];
)) as DocumentClientTypes.BatchGetItemOutputList;

// log stats
batchRequestsResponses.forEach((response, index) => {
Expand Down Expand Up @@ -600,7 +598,9 @@ export class BatchManager {
this._errorQueue = [];
}

private mapBatchGetResponseToItemList(batchGetResponse: BatchGetResponseMap) {
private mapBatchGetResponseToItemList(
batchGetResponse: DocumentClientTypes.BatchGetResponseMap
) {
return Object.entries(batchGetResponse).flatMap(
([, batchResponse]) => batchResponse
);
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/classes/manager/entity-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ export class EntityManager {
returnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
}
);

const entityClass = getConstructorForInstance(entity);

if (!isWriteTransactionItemList(dynamoPutItemInput)) {
Expand All @@ -173,7 +174,7 @@ export class EntityManager {
// by default dynamodb does not return attributes on create operation, so return one
const itemToReturn = this._entityTransformer.fromDynamoEntity<Entity>(
entityClass,
dynamoPutItemInput.Item,
dynamoPutItemInput.Item as DocumentClientTypes.AttributeMap,
{
requestId,
}
Expand Down
40 changes: 20 additions & 20 deletions packages/core/src/classes/manager/scan-manager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {DocumentClientTypes} from '@typedorm/document-client';
import {
EntityTarget,
INTERNAL_ENTITY_ATTRIBUTE,
Expand All @@ -6,7 +7,6 @@ import {
PARALLEL_SCAN_CONCURRENCY_LIMIT,
STATS_TYPE,
} from '@typedorm/common';
import {DynamoDB} from 'aws-sdk';
import pLimit from 'p-limit';
import {getUniqueRequestId} from '../../helpers/get-unique-request-id';
import {Connection} from '../connection/connection';
Expand All @@ -32,7 +32,7 @@ interface ScanManageBaseOptions<Entity, PartitionKey> {
* Cursor to traverse from
* @default none
*/
cursor?: DynamoDB.DocumentClient.Key;
cursor?: DocumentClientTypes.Key;

/**
* Specify filter to apply
Expand Down Expand Up @@ -203,10 +203,10 @@ export class ScanManager {

let response: {
items?: Entity[];
unknownItems?: DynamoDB.DocumentClient.AttributeMap[];
unknownItems?: DocumentClientTypes.AttributeMap[];
cursor?:
| DynamoDB.DocumentClient.Key
| Record<number, DynamoDB.DocumentClient.Key>;
| DocumentClientTypes.Key
| Record<number, DocumentClientTypes.Key>;
};

if (findOptions?.totalSegments) {
Expand Down Expand Up @@ -286,8 +286,8 @@ export class ScanManager {
metadataOptions?: MetadataOptions
): Promise<{
items: Entity[] | undefined;
unknownItems: DynamoDB.DocumentClient.AttributeMap[] | undefined;
cursor: Record<number, DynamoDB.DocumentClient.Key | undefined>;
unknownItems: DocumentClientTypes.AttributeMap[] | undefined;
cursor: Record<number, DocumentClientTypes.Key | undefined>;
}> {
// start with 0
this.itemsFetchedSoFarTotalParallelCount = 0;
Expand Down Expand Up @@ -352,8 +352,8 @@ export class ScanManager {
(
acc: {
items: Entity[];
unknownItems: DynamoDB.DocumentClient.AttributeMap[];
cursor: Record<number, DynamoDB.DocumentClient.Key>;
unknownItems: DocumentClientTypes.AttributeMap[];
cursor: Record<number, DocumentClientTypes.Key>;
},
current,
index
Expand Down Expand Up @@ -385,8 +385,8 @@ export class ScanManager {
},
{} as {
items: Entity[];
unknownItems: DynamoDB.DocumentClient.AttributeMap[];
cursor: Record<number, DynamoDB.DocumentClient.Key>;
unknownItems: DocumentClientTypes.AttributeMap[];
cursor: Record<number, DocumentClientTypes.Key>;
}
);

Expand All @@ -405,8 +405,8 @@ export class ScanManager {
metadataOptions?: MetadataOptions
): Promise<{
items: Entity[] | undefined;
unknownItems: DynamoDB.DocumentClient.AttributeMap[] | undefined;
cursor: DynamoDB.DocumentClient.Key | undefined;
unknownItems: DocumentClientTypes.AttributeMap[] | undefined;
cursor: DocumentClientTypes.Key | undefined;
}> {
const requestId = getUniqueRequestId(metadataOptions?.requestId);

Expand Down Expand Up @@ -468,14 +468,14 @@ export class ScanManager {
itemsFetched = [],
metadataOptions,
}: {
scanInput: DynamoDB.DocumentClient.ScanInput;
scanInput: DocumentClientTypes.ScanInput;
limit?: number;
cursor?: DynamoDB.DocumentClient.Key;
itemsFetched?: DynamoDB.DocumentClient.ItemList;
cursor?: DocumentClientTypes.Key;
itemsFetched?: DocumentClientTypes.ItemList;
metadataOptions?: MetadataOptions;
}): Promise<{
items: DynamoDB.DocumentClient.ItemList;
cursor?: DynamoDB.DocumentClient.Key;
items: DocumentClientTypes.ItemList;
cursor?: DocumentClientTypes.Key;
}> {
// return if the count is already met
if (limit && this.itemsFetchedSoFarTotalParallelCount >= limit) {
Expand Down Expand Up @@ -540,8 +540,8 @@ export class ScanManager {
currentCount = 0,
metadataOptions,
}: {
scanInput: DynamoDB.DocumentClient.ScanInput;
cursor?: DynamoDB.DocumentClient.Key;
scanInput: DocumentClientTypes.ScanInput;
cursor?: DocumentClientTypes.Key;
currentCount?: number;
metadataOptions?: MetadataOptions;
}): Promise<number> {
Expand Down

0 comments on commit 1a8b61c

Please sign in to comment.