From e00041e0eb80840585cf7806a4d360a083c76cc7 Mon Sep 17 00:00:00 2001 From: Charley DAVID Date: Tue, 19 Sep 2023 23:37:16 +0200 Subject: [PATCH] Fix: XAUTOCLAIM after a TRIM with pending messages returns nil (#2565) * fix(client): XCLAIM & XAUTOCLAIM after a TRIM might return nils * fix(client): Fix race condition in specs * revert test utils changes * make tests faster --------- Co-authored-by: Leibale Eidelman --- .../client/lib/commands/XAUTOCLAIM.spec.ts | 80 ++++++++++++++++--- packages/client/lib/commands/XAUTOCLAIM.ts | 6 +- packages/client/lib/commands/XCLAIM.spec.ts | 32 +++++++- packages/client/lib/commands/XCLAIM.ts | 2 +- .../lib/commands/generic-transformers.spec.ts | 33 ++++++++ .../lib/commands/generic-transformers.ts | 28 ++++--- 6 files changed, 154 insertions(+), 27 deletions(-) diff --git a/packages/client/lib/commands/XAUTOCLAIM.spec.ts b/packages/client/lib/commands/XAUTOCLAIM.spec.ts index 4447a06d773..bae914bda05 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.spec.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.spec.ts @@ -23,20 +23,76 @@ describe('XAUTOCLAIM', () => { }); }); - testUtils.testWithClient('client.xAutoClaim', async client => { - await Promise.all([ - client.xGroupCreate('key', 'group', '$', { - MKSTREAM: true - }), + testUtils.testWithClient('client.xAutoClaim without messages', async client => { + const [,, reply] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAutoClaim('key', 'group', 'consumer', 1, '0-0') ]); - assert.deepEqual( - await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'), - { - nextId: '0-0', - messages: [] - } - ); + assert.deepEqual(reply, { + nextId: '0-0', + messages: [] + }); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xAutoClaim with messages', async client => { + const [,, id,, reply] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xAutoClaim('key', 'group', 'consumer', 0, '0-0') + ]); + + assert.deepEqual(reply, { + nextId: '0-0', + messages: [{ + id, + message: Object.create(null, { + foo: { + value: 'bar', + configurable: true, + enumerable: true + } + }) + }] + }); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xAutoClaim with trimmed messages', async client => { + const [,,,,, id,, reply] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { foo: 'bar' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xTrim('key', 'MAXLEN', 0), + client.xAdd('key', '*', { bar: 'baz' }), + client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }), + client.xAutoClaim('key', 'group', 'consumer', 0, '0-0') + ]); + + assert.deepEqual(reply, { + nextId: '0-0', + messages: testUtils.isVersionGreaterThan([7, 0]) ? [{ + id, + message: Object.create(null, { + bar: { + value: 'baz', + configurable: true, + enumerable: true + } + }) + }] : [null, { + id, + message: Object.create(null, { + bar: { + value: 'baz', + configurable: true, + enumerable: true + } + }) + }] + }); }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/client/lib/commands/XAUTOCLAIM.ts b/packages/client/lib/commands/XAUTOCLAIM.ts index 4bf46057bac..831563981a6 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.ts @@ -1,5 +1,5 @@ import { RedisCommandArgument, RedisCommandArguments } from '.'; -import { StreamMessagesReply, transformStreamMessagesReply } from './generic-transformers'; +import { StreamMessagesNullReply, transformStreamMessagesNullReply } from './generic-transformers'; export const FIRST_KEY_INDEX = 1; @@ -28,12 +28,12 @@ type XAutoClaimRawReply = [RedisCommandArgument, Array]; interface XAutoClaimReply { nextId: RedisCommandArgument; - messages: StreamMessagesReply; + messages: StreamMessagesNullReply; } export function transformReply(reply: XAutoClaimRawReply): XAutoClaimReply { return { nextId: reply[0], - messages: transformStreamMessagesReply(reply[1]) + messages: transformStreamMessagesNullReply(reply[1]) }; } diff --git a/packages/client/lib/commands/XCLAIM.spec.ts b/packages/client/lib/commands/XCLAIM.spec.ts index 141a62ab77a..6626e84c731 100644 --- a/packages/client/lib/commands/XCLAIM.spec.ts +++ b/packages/client/lib/commands/XCLAIM.spec.ts @@ -83,8 +83,38 @@ describe('XCLAIM', () => { }); assert.deepEqual( - await client.xClaim('key', 'group', 'consumer', 1, '0-0'), + await client.xClaim('key', 'group', 'consumer', 0, '0-0'), [] ); }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xClaim with a message', async client => { + await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); + const id = await client.xAdd('key', '*', { foo: 'bar' }); + await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); + + assert.deepEqual( + await client.xClaim('key', 'group', 'consumer', 0, id), + [{ + id, + message: Object.create(null, { 'foo': { + value: 'bar', + configurable: true, + enumerable: true + } }) + }] + ); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('client.xClaim with a trimmed message', async client => { + await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }); + const id = await client.xAdd('key', '*', { foo: 'bar' }); + await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }); + await client.xTrim('key', 'MAXLEN', 0); + + assert.deepEqual( + await client.xClaim('key', 'group', 'consumer', 0, id), + testUtils.isVersionGreaterThan([7, 0]) ? []: [null] + ); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/client/lib/commands/XCLAIM.ts b/packages/client/lib/commands/XCLAIM.ts index bc38f9b9e95..e7b458e2376 100644 --- a/packages/client/lib/commands/XCLAIM.ts +++ b/packages/client/lib/commands/XCLAIM.ts @@ -45,4 +45,4 @@ export function transformArguments( return args; } -export { transformStreamMessagesReply as transformReply } from './generic-transformers'; +export { transformStreamMessagesNullReply as transformReply } from './generic-transformers'; diff --git a/packages/client/lib/commands/generic-transformers.spec.ts b/packages/client/lib/commands/generic-transformers.spec.ts index 301cab0a75c..60caf26eaad 100644 --- a/packages/client/lib/commands/generic-transformers.spec.ts +++ b/packages/client/lib/commands/generic-transformers.spec.ts @@ -9,6 +9,7 @@ import { transformStringNumberInfinityArgument, transformTuplesReply, transformStreamMessagesReply, + transformStreamMessagesNullReply, transformStreamsMessagesReply, transformSortedSetWithScoresReply, pushGeoCountArgument, @@ -219,6 +220,38 @@ describe('Generic Transformers', () => { ); }); + it('transformStreamMessagesNullReply', () => { + assert.deepEqual( + transformStreamMessagesNullReply([null, ['0-0', ['0key', '0value']]]), + [null, { + id: '0-0', + message: Object.create(null, { + '0key': { + value: '0value', + configurable: true, + enumerable: true + } + }) + }] + ); + }); + + it('transformStreamMessagesNullReply', () => { + assert.deepEqual( + transformStreamMessagesNullReply([null, ['0-1', ['11key', '11value']]]), + [null, { + id: '0-1', + message: Object.create(null, { + '11key': { + value: '11value', + configurable: true, + enumerable: true + } + }) + }] + ); + }); + describe('transformStreamsMessagesReply', () => { it('null', () => { assert.equal( diff --git a/packages/client/lib/commands/generic-transformers.ts b/packages/client/lib/commands/generic-transformers.ts index 5048de9399a..4cf610a036e 100644 --- a/packages/client/lib/commands/generic-transformers.ts +++ b/packages/client/lib/commands/generic-transformers.ts @@ -92,19 +92,27 @@ export interface StreamMessageReply { message: Record; } -export type StreamMessagesReply = Array; +export function transformStreamMessageReply([id, message]: Array): StreamMessageReply { + return { + id, + message: transformTuplesReply(message) + }; +} -export function transformStreamMessagesReply(reply: Array): StreamMessagesReply { - const messages = []; +export function transformStreamMessageNullReply(reply: Array): StreamMessageReply | null { + if (reply === null) return null; + return transformStreamMessageReply(reply); +} - for (const [id, message] of reply) { - messages.push({ - id, - message: transformTuplesReply(message) - }); - } - return messages; +export type StreamMessagesReply = Array; +export function transformStreamMessagesReply(reply: Array): StreamMessagesReply { + return reply.map(transformStreamMessageReply); +} + +export type StreamMessagesNullReply = Array; +export function transformStreamMessagesNullReply(reply: Array): StreamMessagesNullReply { + return reply.map(transformStreamMessageNullReply); } export type StreamsMessagesReply = Array<{