Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
fail-fast: false
matrix:
node-version: ["18", "20", "22"]
redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2", "8.4-M01-pre", "8.4-RC1-pre"]
redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2", "8.4-RC1-pre"]
steps:
- uses: actions/checkout@v4
with:
Expand Down
100 changes: 73 additions & 27 deletions packages/client/lib/commands/XREADGROUP.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,33 @@ describe('XREADGROUP', () => {
['XREADGROUP', 'GROUP', 'group', 'consumer', 'COUNT', '1', 'BLOCK', '0', 'NOACK', 'STREAMS', 'key', '0-0']
);
});

it('with CLAIM', () => {
assert.deepEqual(
parseArgs(XREADGROUP, 'group', 'consumer', {
key: 'key',
id: '0-0'
}, {
CLAIM: 100
}),
['XREADGROUP', 'GROUP', 'group', 'consumer', 'CLAIM', '100', 'STREAMS', 'key', '0-0']
);
});

it('with COUNT, BLOCK, NOACK, CLAIM', () => {
assert.deepEqual(
parseArgs(XREADGROUP, 'group', 'consumer', {
key: 'key',
id: '0-0'
}, {
COUNT: 1,
BLOCK: 0,
NOACK: true,
CLAIM: 100
}),
['XREADGROUP', 'GROUP', 'group', 'consumer', 'COUNT', '1', 'BLOCK', '0', 'NOACK', 'CLAIM', '100', 'STREAMS', 'key', '0-0']
);
});
});

testUtils.testAll('xReadGroup - null', async client => {
Expand Down Expand Up @@ -156,35 +183,54 @@ describe('XREADGROUP', () => {
cluster: GLOBAL.CLUSTERS.OPEN
});

testUtils.testWithClient('client.xReadGroup should throw with resp3 and unstableResp3: false', async client => {
assert.throws(
() => client.xReadGroup('group', 'consumer', {
key: 'key',
id: '>'
testUtils.testAll('xReadGroup - without CLAIM should not include delivery fields', async client => {
const [, id] = await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
{
message: 'Some RESP3 results for Redis Query Engine responses may change. Refer to the readme for guidance'
}
);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3
}
});
client.xAdd('key', '*', { field: 'value' })
]);

testUtils.testWithClient('client.xReadGroup should not throw with resp3 and unstableResp3: true', async client => {
assert.doesNotThrow(
() => client.xReadGroup('group', 'consumer', {
key: 'key',
id: '>'
})
);
const readGroupReply = await client.xReadGroup('group', 'consumer', {
key: 'key',
id: '>'
});

assert.ok(readGroupReply);
assert.equal(readGroupReply[0].messages[0].millisElapsedFromDelivery, undefined);
assert.equal(readGroupReply[0].messages[0].deliveriesCounter, undefined);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
unstableResp3: true
}
client: GLOBAL.SERVERS.OPEN,
cluster: GLOBAL.CLUSTERS.OPEN
});

testUtils.testWithClientIfVersionWithinRange([[8,4], 'LATEST'],'xReadGroup - with CLAIM should include delivery fields', async client => {
const [, id] = await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
client.xAdd('key', '*', { field: 'value' })
]);

// First read to add message to PEL
await client.xReadGroup('group', 'consumer', {
key: 'key',
id: '>'
});

// Read with CLAIM to get delivery fields
const readGroupReply = await client.xReadGroup('group', 'consumer2', {
key: 'key',
id: '>'
}, {
CLAIM: 0
});

assert.ok(readGroupReply);
assert.equal(readGroupReply[0].messages[0].id, id);
assert.ok(readGroupReply[0].messages[0].millisElapsedFromDelivery !== undefined);
assert.ok(readGroupReply[0].messages[0].deliveriesCounter !== undefined);
assert.equal(typeof readGroupReply[0].messages[0].millisElapsedFromDelivery, 'number');
assert.equal(typeof readGroupReply[0].messages[0].deliveriesCounter, 'number');
}, GLOBAL.SERVERS.OPEN);
});
9 changes: 7 additions & 2 deletions packages/client/lib/commands/XREADGROUP.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import { transformStreamsMessagesReplyResp2 } from './generic-transformers';

/**
* Options for the XREADGROUP command
*
*
* @property COUNT - Limit the number of entries returned per stream
* @property BLOCK - Milliseconds to block waiting for new entries (0 for indefinite)
* @property NOACK - Skip adding the message to the PEL (Pending Entries List)
* @property CLAIM - Prepend PEL entries that are at least this many milliseconds old
*/
export interface XReadGroupOptions {
COUNT?: number;
BLOCK?: number;
NOACK?: boolean;
CLAIM?: number;
}

export default {
Expand Down Expand Up @@ -50,6 +52,10 @@ export default {
parser.push('NOACK');
}

if (options?.CLAIM !== undefined) {
parser.push('CLAIM', options.CLAIM.toString());
}

pushXReadStreams(parser, streams);
},
/**
Expand All @@ -59,5 +65,4 @@ export default {
2: transformStreamsMessagesReplyResp2,
3: undefined as unknown as () => ReplyUnion
},
unstableResp3: true,
} as const satisfies Command;
44 changes: 25 additions & 19 deletions packages/client/lib/commands/generic-transformers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function transformStringDoubleArgument(num: RedisArgument | number): Redi
export const transformDoubleReply = {
2: (reply: BlobStringReply, preserve?: any, typeMapping?: TypeMapping): DoubleReply => {
const double = typeMapping ? typeMapping[RESP_TYPES.DOUBLE] : undefined;

switch (double) {
case String: {
return reply as unknown as DoubleReply;
Expand All @@ -58,13 +58,13 @@ export const transformDoubleReply = {
case 'inf':
case '+inf':
ret = Infinity;

case '-inf':
ret = -Infinity;

case 'nan':
ret = NaN;

default:
ret = Number(reply);
}
Expand Down Expand Up @@ -98,7 +98,7 @@ export function createTransformNullableDoubleReplyResp2Func(preserve?: any, type
export const transformNullableDoubleReply = {
2: (reply: BlobStringReply | NullReply, preserve?: any, typeMapping?: TypeMapping) => {
if (reply === null) return null;

return transformDoubleReply[2](reply as BlobStringReply, preserve, typeMapping);
},
3: undefined as unknown as () => DoubleReply | NullReply
Expand Down Expand Up @@ -514,19 +514,25 @@ export function parseArgs(command: Command, ...args: Array<any>): CommandArgumen

export type StreamMessageRawReply = TuplesReply<[
id: BlobStringReply,
message: ArrayReply<BlobStringReply>
message: ArrayReply<BlobStringReply>,
millisElapsedFromDelivery?: NumberReply,
deliveriesCounter?: NumberReply
]>;

export type StreamMessageReply = {
id: BlobStringReply,
message: MapReply<BlobStringReply | string, BlobStringReply>,
millisElapsedFromDelivery?: number
deliveriesCounter?: number
};

export function transformStreamMessageReply(typeMapping: TypeMapping | undefined, reply: StreamMessageRawReply): StreamMessageReply {
const [ id, message ] = reply as unknown as UnwrapReply<typeof reply>;
const [ id, message, millisElapsedFromDelivery, deliveriesCounter ] = reply as unknown as UnwrapReply<typeof reply>;
return {
id: id,
message: transformTuplesReply(message, undefined, typeMapping)
message: transformTuplesReply(message, undefined, typeMapping),
...(millisElapsedFromDelivery !== undefined ? { millisElapsedFromDelivery: Number(millisElapsedFromDelivery) } : {}),
...(deliveriesCounter !== undefined ? { deliveriesCounter: Number(deliveriesCounter) } : {})
};
}

Expand Down Expand Up @@ -557,7 +563,7 @@ export function transformStreamsMessagesReplyResp2(
reply: UnwrapReply<StreamsMessagesRawReply2 | NullReply>,
preserve?: any,
typeMapping?: TypeMapping
): StreamsMessagesReply | NullReply {
): StreamsMessagesReply | NullReply {
// FUTURE: resposne type if resp3 was working, reverting to old v4 for now
//: MapReply<BlobStringReply | string, StreamMessagesReply> | NullReply {
if (reply === null) return null as unknown as NullReply;
Expand All @@ -569,25 +575,25 @@ export function transformStreamsMessagesReplyResp2(

for (let i=0; i < reply.length; i++) {
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;

const name = stream[0];
const rawMessages = stream[1];

ret.set(name.toString(), transformStreamMessagesReply(rawMessages, typeMapping));
}

return ret as unknown as MapReply<string, StreamMessagesReply>;
}
case Array: {
const ret: Array<BlobStringReply | StreamMessagesReply> = [];

for (let i=0; i < reply.length; i++) {
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;

const name = stream[0];
const rawMessages = stream[1];
ret.push(name);

ret.push(name);
ret.push(transformStreamMessagesReply(rawMessages, typeMapping));
}

Expand All @@ -598,13 +604,13 @@ export function transformStreamsMessagesReplyResp2(

for (let i=0; i < reply.length; i++) {
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;

const name = stream[0] as unknown as UnwrapReply<BlobStringReply>;
const rawMessages = stream[1];

ret[name.toString()] = transformStreamMessagesReply(rawMessages);
}

return ret as unknown as MapReply<string, StreamMessagesReply>;
}
*/
Expand All @@ -630,7 +636,7 @@ type StreamsMessagesRawReply3 = MapReply<BlobStringReply, ArrayReply<StreamMessa

export function transformStreamsMessagesReplyResp3(reply: UnwrapReply<StreamsMessagesRawReply3 | NullReply>): MapReply<BlobStringReply, StreamMessagesReply> | NullReply {
if (reply === null) return null as unknown as NullReply;

if (reply instanceof Map) {
const ret = new Map<string, StreamMessagesReply>();

Expand Down
Loading