Skip to content

Commit

Permalink
✨ if the dynamodb collections exist, skip the setup
Browse files Browse the repository at this point in the history
  • Loading branch information
drieshooghe committed Nov 20, 2022
1 parent aeaee14 commit 8cda3b2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 50 deletions.
49 changes: 31 additions & 18 deletions lib/integration/event-store/dynamodb.event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import {
BatchWriteItemCommand,
BatchWriteItemInput,
CreateTableCommand,
DescribeTableCommand,
DynamoDBClient,
GetItemCommand,
QueryCommand,
ResourceNotFoundException,
} from '@aws-sdk/client-dynamodb';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
import { DEFAULT_BATCH_SIZE, StreamReadingDirection } from '../../constants';
Expand Down Expand Up @@ -34,24 +36,35 @@ export class DynamoDBEventStore extends EventStore {

async setup(pool?: IEventPool): Promise<EventCollection> {
const collection = EventCollection.get(pool);
await this.client.send(
new CreateTableCommand({
TableName: collection,
KeySchema: [
{ AttributeName: 'streamId', KeyType: 'HASH' },
{ AttributeName: 'version', KeyType: 'RANGE' },
],
AttributeDefinitions: [
{ AttributeName: 'streamId', AttributeType: 'S' },
{ AttributeName: 'version', AttributeType: 'N' },
],
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1,
},
BillingMode: 'PAY_PER_REQUEST',
}),
);

try {
await this.client.send(new DescribeTableCommand({ TableName: collection }));
} catch (err) {
switch (err.constructor) {
case ResourceNotFoundException:
await this.client.send(
new CreateTableCommand({
TableName: collection,
KeySchema: [
{ AttributeName: 'streamId', KeyType: 'HASH' },
{ AttributeName: 'version', KeyType: 'RANGE' },
],
AttributeDefinitions: [
{ AttributeName: 'streamId', AttributeType: 'S' },
{ AttributeName: 'version', AttributeType: 'N' },
],
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1,
},
BillingMode: 'PAY_PER_REQUEST',
}),
);
break;
default:
throw err;
}
}

return collection;
}
Expand Down
77 changes: 45 additions & 32 deletions lib/integration/snapshot-store/dynamodb.snapshot-store.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {
AttributeValue,
CreateTableCommand,
DescribeTableCommand,
DynamoDBClient,
GetItemCommand,
QueryCommand,
ResourceNotFoundException,
TransactWriteItemsCommand,
} from '@aws-sdk/client-dynamodb';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
Expand Down Expand Up @@ -31,38 +33,49 @@ export class DynamoDBSnapshotStore extends SnapshotStore {

async setup(pool?: ISnapshotPool): Promise<SnapshotCollection> {
const collection = SnapshotCollection.get(pool);
await this.client.send(
new CreateTableCommand({
TableName: collection,
KeySchema: [
{ AttributeName: 'streamId', KeyType: 'HASH' },
{ AttributeName: 'version', KeyType: 'RANGE' },
],
AttributeDefinitions: [
{ AttributeName: 'streamId', AttributeType: 'S' },
{ AttributeName: 'version', AttributeType: 'N' },
{ AttributeName: 'aggregateName', AttributeType: 'S' },
{ AttributeName: 'latest', AttributeType: 'S' },
],
GlobalSecondaryIndexes: [
{
IndexName: 'aggregate_index',
KeySchema: [
{ AttributeName: 'aggregateName', KeyType: 'HASH' },
{ AttributeName: 'latest', KeyType: 'RANGE' },
],
Projection: {
ProjectionType: 'ALL',
},
},
],
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1,
},
BillingMode: 'PAY_PER_REQUEST',
}),
);

try {
await this.client.send(new DescribeTableCommand({ TableName: collection }));
} catch (err) {
switch (err.constructor) {
case ResourceNotFoundException:
await this.client.send(
new CreateTableCommand({
TableName: collection,
KeySchema: [
{ AttributeName: 'streamId', KeyType: 'HASH' },
{ AttributeName: 'version', KeyType: 'RANGE' },
],
AttributeDefinitions: [
{ AttributeName: 'streamId', AttributeType: 'S' },
{ AttributeName: 'version', AttributeType: 'N' },
{ AttributeName: 'aggregateName', AttributeType: 'S' },
{ AttributeName: 'latest', AttributeType: 'S' },
],
GlobalSecondaryIndexes: [
{
IndexName: 'aggregate_index',
KeySchema: [
{ AttributeName: 'aggregateName', KeyType: 'HASH' },
{ AttributeName: 'latest', KeyType: 'RANGE' },
],
Projection: {
ProjectionType: 'ALL',
},
},
],
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1,
},
BillingMode: 'PAY_PER_REQUEST',
}),
);
break;
default:
throw err;
}
}

return collection;
}
Expand Down

0 comments on commit 8cda3b2

Please sign in to comment.