Skip to content

Commit

Permalink
fix: ensure CSR works with the ioredis package
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Mar 1, 2024
1 parent d2d635d commit 78075ec
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 27 deletions.
27 changes: 12 additions & 15 deletions lib/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {
} from "socket.io-adapter";
import { decode, encode } from "@msgpack/msgpack";
import debugModule from "debug";
import { hasBinary, XADD, XREAD } from "./util";
import { hasBinary, GETDEL, SET, XADD, XRANGE, XREAD } from "./util";

const debug = debugModule("socket.io-redis-streams-adapter");

Expand Down Expand Up @@ -235,9 +235,12 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
const sessionKey = this.#opts.sessionKeyPrefix + session.pid;
const encodedSession = Buffer.from(encode(session)).toString("base64");

this.#redisClient.set(sessionKey, encodedSession, {
PX: this.nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration,
});
SET(
this.#redisClient,
sessionKey,
encodedSession,
this.nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration
);
}

override async restoreSession(
Expand All @@ -253,12 +256,8 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
const sessionKey = this.#opts.sessionKeyPrefix + pid;

const results = await Promise.all([
this.#redisClient
.multi()
.get(sessionKey)
.del(sessionKey) // GETDEL was added in Redis version 6.2
.exec(),
this.#redisClient.xRange(this.#opts.streamName, offset, offset),
GETDEL(this.#redisClient, sessionKey),
XRANGE(this.#redisClient, this.#opts.streamName, offset, offset),
]);

const rawSession = results[0][0];
Expand All @@ -277,13 +276,11 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
// FIXME we need to add an arbitrary limit here, because if entries are added faster than what we can consume, then
// we will loop endlessly. But if we stop before reaching the end of the stream, we might lose messages.
for (let i = 0; i < RESTORE_SESSION_MAX_XRANGE_CALLS; i++) {
const entries = await this.#redisClient.xRange(
const entries = await XRANGE(
this.#redisClient,
this.#opts.streamName,
RedisStreamsAdapter.nextOffset(offset),
"+",
{
COUNT: this.#opts.readCount,
}
"+"
);

if (entries.length === 0) {
Expand Down
85 changes: 73 additions & 12 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ function isRedisV4Client(redisClient: any) {
return typeof redisClient.sSubscribe === "function";
}

/**
* Map the output of the XREAD/XRANGE command with the ioredis package to the format of the redis package
* @param result
*/
function mapResult(result) {
const id = result[0];
const inlineValues = result[1];
const message = {};
for (let i = 0; i < inlineValues.length; i += 2) {
message[inlineValues[i]] = inlineValues[i + 1];
}
return {
id,
message,
};
}

/**
* @see https://redis.io/commands/xread/
*/
Expand Down Expand Up @@ -81,18 +98,7 @@ export function XREAD(
}
return [
{
messages: results[0][1].map((result) => {
const id = result[0];
const inlineValues = result[1];
const message = {};
for (let i = 0; i < inlineValues.length; i += 2) {
message[inlineValues[i]] = inlineValues[i + 1];
}
return {
id,
message,
};
}),
messages: results[0][1].map(mapResult),
},
];
});
Expand Down Expand Up @@ -125,3 +131,58 @@ export function XADD(
return redisClient.xadd.call(redisClient, args);
}
}

/**
* @see https://redis.io/commands/xrange/
*/
export function XRANGE(
redisClient: any,
streamName: string,
start: string,
end: string
) {
if (isRedisV4Client(redisClient)) {
return redisClient.xRange(streamName, start, end);
} else {
return redisClient.xrange(streamName, start, end).then((res) => {
return res.map(mapResult);
});
}
}

/**
* @see https://redis.io/commands/set/
*/
export function SET(
redisClient: any,
key: string,
value: string,
expiryInSeconds: number
) {
if (isRedisV4Client(redisClient)) {
return redisClient.set(key, value, {
PX: expiryInSeconds,
});
} else {
return redisClient.set(key, value, "PX", expiryInSeconds);
}
}

/**
* @see https://redis.io/commands/getdel/
*/
export function GETDEL(redisClient: any, key: string) {
if (isRedisV4Client(redisClient)) {
// note: GETDEL was added in Redis version 6.2
return redisClient.multi().get(key).del(key).exec();
} else {
return redisClient
.multi()
.get(key)
.del(key)
.exec()
.then((res) => {
return [res[0][1]];
});
}
}

0 comments on commit 78075ec

Please sign in to comment.