Skip to content

Commit

Permalink
move approximateArrivalTimestamp to the end
Browse files Browse the repository at this point in the history
  • Loading branch information
zfu-lifion committed Mar 15, 2022
1 parent 5cfc79c commit 281c4b7
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
4 changes: 2 additions & 2 deletions lib/polling-consumer.js
Expand Up @@ -327,11 +327,11 @@ class PollingConsumer {

privateProps.setCheckpoint = async (sequenceNumber, approxArrival) => {
await stateStore.storeShardCheckpoint(
approxArrival,
shardId,
sequenceNumber,
shardsPath,
shardsPathNames
shardsPathNames,
approxArrival
);
privateProps.checkpoint = sequenceNumber;
privateProps.approxArrival = approxArrival;
Expand Down
16 changes: 8 additions & 8 deletions lib/polling-consumer.test.js
Expand Up @@ -90,11 +90,11 @@ describe('lib/polling-consumer', () => {
});
expect(getRecords).toHaveBeenCalledWith({ Limit: 1000, ShardIterator: 'iterator' });
expect(storeShardCheckpoint).toHaveBeenCalledWith(
approximateArrivalTimestamp,
'shardId-0000',
1,
'',
{}
{},
approximateArrivalTimestamp
);
expect(pushToStream).toHaveBeenCalledWith(null, {
millisBehindLatest: 10,
Expand Down Expand Up @@ -142,11 +142,11 @@ describe('lib/polling-consumer', () => {
});
expect(getRecords).toHaveBeenCalledWith({ Limit: 1000, ShardIterator: 'iterator' });
expect(storeShardCheckpoint).toHaveBeenCalledWith(
approximateArrivalTimestamp,
'shardId-0000',
1,
'',
{}
{},
approximateArrivalTimestamp
);
expect(pushToStream).toHaveBeenCalledWith(null, {
millisBehindLatest: 10,
Expand Down Expand Up @@ -224,11 +224,11 @@ describe('lib/polling-consumer', () => {
pushToStream.mockImplementationOnce(() => {
try {
expect(storeShardCheckpoint).toHaveBeenCalledWith(
approximateArrivalTimestamp,
'shardId-0000',
1,
'',
{}
{},
approximateArrivalTimestamp
);
expect(setTimeout).not.toHaveBeenCalled();
expect(debug.mock.calls).toEqual([
Expand Down Expand Up @@ -501,11 +501,11 @@ describe('lib/polling-consumer', () => {
expect(storeShardCheckpoint).not.toHaveBeenCalled();
await setCheckpoint('123', approximateArrivalTimestamp);
expect(storeShardCheckpoint).toHaveBeenCalledWith(
approximateArrivalTimestamp,
'shardId-0000',
'123',
'',
{}
{},
approximateArrivalTimestamp
);
expect(debug.mock.calls).toEqual([
['Starting to read shard "shardId-0000" from the latest record.'],
Expand Down
6 changes: 3 additions & 3 deletions lib/state-store.js
Expand Up @@ -855,20 +855,20 @@ class StateStore {
/**
* Store a shard checkpoint.
*
* @param {Date} approximateArrivalTimestamp - The approximate arrival timestamp.
* @param {string} shardId - The ID of the shard to store a checkpoint for.
* @param {string} checkpoint - The sequence number to store as the recovery point.
* @param {string} shardsPath - The path pointing to where the shards state is stored.
* @param {Object} shardsPathNames - The values of the attribute names in the path.
* @param {Date} approximateArrivalTimestamp - The approximate arrival timestamp.
* @fulfil {undefined}
* @returns {Promise}
*/
async storeShardCheckpoint(
approximateArrivalTimestamp,
shardId,
checkpoint,
shardsPath,
shardsPathNames
shardsPathNames,
approximateArrivalTimestamp
) {
const privateProps = internal(this);
const { logger } = privateProps;
Expand Down
18 changes: 12 additions & 6 deletions lib/state-store.test.js
Expand Up @@ -1321,7 +1321,7 @@ describe('lib/state-store', () => {
const store = new StateStore(options);
await store.start();
await expect(
store.storeShardCheckpoint(new Date(), 'shard-0001', null, '#a', { '#a': 'a' })
store.storeShardCheckpoint('shard-0001', null, '#a', { '#a': 'a' }, new Date())
).rejects.toThrow('The sequence number is required.');
});

Expand All @@ -1330,9 +1330,15 @@ describe('lib/state-store', () => {
const approxArrivalTimestampString = new Date().toISOString();
await store.start();
await expect(
store.storeShardCheckpoint(approxArrivalTimestampString, 'shard-0001', '1', '#a', {
'#a': 'a'
})
store.storeShardCheckpoint(
'shard-0001',
'1',
'#a',
{
'#a': 'a'
},
approxArrivalTimestampString
)
).resolves.toBeUndefined;
const { update } = new DynamoDbClient();
expect(update).toHaveBeenNthCalledWith(1, {
Expand Down Expand Up @@ -1360,7 +1366,7 @@ describe('lib/state-store', () => {
test('storeShardCheckpoint should be called if timestamp is null, timestamp stored as null', async () => {
const store = new StateStore(options);
await store.start();
await expect(store.storeShardCheckpoint(null, 'shard-0001', '1', '#a', { '#a': 'a' })).resolves
await expect(store.storeShardCheckpoint('shard-0001', '1', '#a', { '#a': 'a' }, null)).resolves
.toBeUndefined;
const { update } = new DynamoDbClient();
expect(update).toHaveBeenNthCalledWith(1, {
Expand Down Expand Up @@ -1390,7 +1396,7 @@ describe('lib/state-store', () => {
const approxArrivalTimestamp = new Date();
await store.start();
await expect(
store.storeShardCheckpoint(approxArrivalTimestamp, 'shard-0001', '1', '#a', { '#a': 'a' })
store.storeShardCheckpoint('shard-0001', '1', '#a', { '#a': 'a' }, approxArrivalTimestamp)
).resolves.toBeUndefined();
const { update } = new DynamoDbClient();
expect(update).toHaveBeenNthCalledWith(1, {
Expand Down

0 comments on commit 281c4b7

Please sign in to comment.