Skip to content

Commit

Permalink
feat(core): add support for logging stats when consumed capacity is t…
Browse files Browse the repository at this point in the history
…urned on
  • Loading branch information
whimzyLive committed May 20, 2021
1 parent d3dc72b commit b179c48
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 18 deletions.
6 changes: 6 additions & 0 deletions packages/common/src/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ export enum AUTO_GENERATE_ATTRIBUTE_STRATEGY {
export enum INTERNAL_ENTITY_ATTRIBUTE {
ENTITY_NAME = '__en',
}

export enum CONSUMED_CAPACITY_TYPE {
INDEXES = 'INDEXES',
TOTAL = 'TOTAL',
NONE = 'NONE',
}
8 changes: 7 additions & 1 deletion packages/common/src/logger/debug-logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,22 @@ export class DebugLogger {

logStats({
requestId,
requestSegment,
statsType,
consumedCapacityData,
}: {
requestId?: string;
requestSegment?: number; // if for any reason, request was segmented into multiple request, this indicates segment index
statsType: STATS_TYPE;
consumedCapacityData: any;
}) {
if (this.debugStatsLog.enabled) {
this.debugStatsLog(
`${chalk.bold.bgCyanBright(requestId)} ${chalk.green(statsType)}:`,
`${
chalk.bold.bgCyanBright(requestId) + requestSegment
? ':' + chalk.bold.bgCyanBright(requestId)
: ''
} ${chalk.green(statsType)}:`,
chalk.white(this.ensurePrintable(consumedCapacityData))
);
}
Expand Down
10 changes: 8 additions & 2 deletions packages/core/src/classes/manager/__test__/batch-manager.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Replace} from '@typedorm/common';
import {CONSUMED_CAPACITY_TYPE, Replace} from '@typedorm/common';
import {User} from '@typedorm/core/__mocks__/user';
import {UserUniqueEmail} from '@typedorm/core/__mocks__/user-unique-email';
import {createTestConnection, resetTestConnection} from '@typedorm/testing';
Expand Down Expand Up @@ -371,7 +371,13 @@ test('processes simple batch read request', async () => {
},
},
]);
const response = await manager.read(readTestBatch);
const response = await manager.read(
readTestBatch,
{},
{
returnConsumedCapacity: CONSUMED_CAPACITY_TYPE.TOTAL,
}
);

expect(originalPromiseAll).toHaveBeenCalledTimes(1);
expect(originalPromiseAll).toHaveBeenCalledWith([expect.any(Promise)]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
UserAutoGenerateAttributes,
} from '../../../../__mocks__/user-auto-generate-attributes';
import {Connection} from '../../connection/connection';
import {CONSUMED_CAPACITY_TYPE} from '@typedorm/common';

let manager: EntityManager;
let connection: Connection;
Expand Down Expand Up @@ -49,7 +50,9 @@ test('creates entity', async () => {
user.name = 'Test User';
user.status = 'active';

const userEntity = await manager.create(user);
const userEntity = await manager.create(user, undefined, {
returnConsumedCapacity: CONSUMED_CAPACITY_TYPE.TOTAL,
});
expect(dcMock.put).toHaveBeenCalledTimes(1);
expect(dcMock.put).toHaveBeenCalledWith({
Item: {
Expand All @@ -63,6 +66,7 @@ test('creates entity', async () => {
status: 'active',
},
TableName: 'test-table',
ReturnConsumedCapacity: 'TOTAL',
ConditionExpression:
'(attribute_not_exists(#CE_PK)) AND (attribute_not_exists(#CE_SK))',
ExpressionAttributeNames: {
Expand Down Expand Up @@ -285,6 +289,9 @@ test('checks if given item exists', async () => {
User,
{
id: '1',
},
{
returnConsumedCapacity: CONSUMED_CAPACITY_TYPE.INDEXES,
}
);

Expand All @@ -294,6 +301,7 @@ test('checks if given item exists', async () => {
SK: 'USER#1',
},
TableName: 'test-table',
ReturnConsumedCapacity: 'INDEXES',
});
expect(userEntity).toEqual(true);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from '@typedorm/core/__mocks__/user-unique-email';
import {Organisation} from '@typedorm/core/__mocks__/organisation';
import {ReadTransaction} from '../../transaction/read-transaction';
import {CONSUMED_CAPACITY_TYPE} from '@typedorm/common';

let manager: TransactionManager;
const dcMock = {
Expand Down Expand Up @@ -79,10 +80,13 @@ test('performs write transactions for simple writes', async () => {
},
]);

const response = await manager.write(transaction);
const response = await manager.write(transaction, {
returnConsumedCapacity: CONSUMED_CAPACITY_TYPE.TOTAL,
});

expect(dcMock.transactWrite).toHaveBeenCalledTimes(1);
expect(dcMock.transactWrite).toHaveBeenCalledWith({
ReturnConsumedCapacity: 'TOTAL',
TransactItems: [
{
Put: {
Expand Down
60 changes: 60 additions & 0 deletions packages/core/src/classes/manager/batch-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
BATCH_WRITE_MAX_ALLOWED_ATTEMPTS,
INTERNAL_ENTITY_ATTRIBUTE,
MANAGER_NAME,
STATS_TYPE,
} from '@typedorm/common';
import {
BatchGetResponseMap,
Expand Down Expand Up @@ -110,6 +111,7 @@ export class BatchManager {
() =>
this.connection.transactionManger.writeRaw(transformedInput, {
requestId,
returnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
}),
// return original item when failed to process
rawInput,
Expand All @@ -130,6 +132,7 @@ export class BatchManager {
undefined,
{
requestId,
returnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
}
);

Expand All @@ -151,6 +154,7 @@ export class BatchManager {
deleteTransactionItemList,
{
requestId,
returnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
}
);
},
Expand All @@ -169,6 +173,7 @@ export class BatchManager {
this.connection.documentClient
.batchWrite({
RequestItems: {...batchRequestMap},
ReturnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
})
.promise(),
// for batch requests this returning item will be transformed to
Expand All @@ -189,6 +194,18 @@ export class BatchManager {
// 2. wait for all promises to finish
const responses = await Promise.all(allRequests);

// log stats
responses.forEach((response, index) => {
if (response.ConsumedCapacity) {
this.connection.logger.logStats({
requestId,
requestSegment: index,
statsType: STATS_TYPE.CONSUMED_CAPACITY,
consumedCapacityData: response.ConsumedCapacity,
});
}
});

// 3. run retry attempts
// process all unprocessed items recursively until all are either done
// or reached the retry limit
Expand All @@ -198,6 +215,7 @@ export class BatchManager {
options,
{
requestId,
returnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
}
);

Expand Down Expand Up @@ -272,6 +290,7 @@ export class BatchManager {
this.connection.documentClient
.batchGet({
RequestItems: {...batchRequestItems},
ReturnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
})
.promise(),
batchRequestItems,
Expand All @@ -282,6 +301,18 @@ export class BatchManager {
// 2. wait for all promises to finish, either failed or hit the limit
const initialResponses = await Promise.all(batchRequests);

// log stats
initialResponses.forEach((response, index) => {
if (response.ConsumedCapacity) {
this.connection.logger.logStats({
requestId,
requestSegment: index,
statsType: STATS_TYPE.CONSUMED_CAPACITY,
consumedCapacityData: response.ConsumedCapacity,
});
}
});

// 3. run retries
const {
items,
Expand All @@ -293,6 +324,7 @@ export class BatchManager {
[],
{
requestId,
returnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
}
);

Expand Down Expand Up @@ -416,6 +448,8 @@ export class BatchManager {
this.connection.documentClient
.batchWrite({
RequestItems: {...batchRequestMap},
ReturnItemCollectionMetrics:
metadataOptions?.returnConsumedCapacity,
})
.promise(),
batchRequestMap,
Expand All @@ -426,6 +460,19 @@ export class BatchManager {
const batchRequestsResponses = (await Promise.all(
batchRequests
)) as DocumentClient.BatchWriteItemOutput[];

// log stats
batchRequestsResponses.forEach((response, index) => {
if (response.ConsumedCapacity) {
this.connection.logger.logStats({
requestId: metadataOptions?.requestId,
requestSegment: index,
statsType: STATS_TYPE.CONSUMED_CAPACITY,
consumedCapacityData: response.ConsumedCapacity,
});
}
});

return this.recursiveHandleBatchWriteItemsResponse(
batchRequestsResponses,
++totalAttemptsSoFar,
Expand Down Expand Up @@ -529,6 +576,7 @@ export class BatchManager {
this.connection.documentClient
.batchGet({
RequestItems: {...batchRequestMap},
ReturnConsumedCapacity: metadataOptions?.returnConsumedCapacity,
})
.promise(),
batchRequestMap,
Expand All @@ -540,6 +588,18 @@ export class BatchManager {
batchRequests
)) as DocumentClient.BatchGetItemOutput[];

// log stats
batchRequestsResponses.forEach((response, index) => {
if (response.ConsumedCapacity) {
this.connection.logger.logStats({
requestId: metadataOptions?.requestId,
requestSegment: index,
statsType: STATS_TYPE.CONSUMED_CAPACITY,
consumedCapacityData: response.ConsumedCapacity,
});
}
});

return this.recursiveHandleBatchReadItemsResponse(
batchRequestsResponses,
++totalAttemptsSoFar,
Expand Down

0 comments on commit b179c48

Please sign in to comment.