Skip to content

Commit

Permalink
feat: 🎸 progress on call execution
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 13, 2023
1 parent d2009e3 commit ae273c7
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 13 deletions.
41 changes: 31 additions & 10 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {RedisClient, ReconnectingSocket, CmdOpts} from '../node';
import {RedisClusterRouter} from './RedisClusterRouter';
import {RedisClusterNodeInfo} from './RedisClusterNodeInfo';
import {RedisClusterCall} from './RedisClusterCall';
import {endpointByClient} from './endpoints';
import {isMovedError, parseMovedError} from './errors';

const calculateSlot = require('cluster-key-slot');

Expand Down Expand Up @@ -107,7 +109,7 @@ export class RedisCluster {
if (!info) return this.getAnyClient();
const client = router.getClient(info.id);
if (client) return client;
this.createClientForSlot(info);
this.createClientFromInfo(info);
return this.getAnyClient();
}

Expand All @@ -118,26 +120,29 @@ export class RedisCluster {
if (!info) return this.getAnyClient();
const client = router.getClient(info.id);
if (client) return client;
this.createClientForSlot(info);
this.createClientFromInfo(info);
return this.getAnyClient();
}

private async createClientForSlot(info: RedisClusterNodeInfo): Promise<RedisClient> {
const client = await this.createClient({
private async createClientFromInfo(info: RedisClusterNodeInfo): Promise<RedisClient> {
const [client] = await this.createClient({
...this.opts.connectionConfig,
host: info.endpoint,
host: info.endpoint || info.ip,
port: info.port,
});
this.router.setClient(info, client);
return client;
}

protected async createClient(config: RedisClusterNodeConfig): Promise<RedisClient> {
protected async createClient(config: RedisClusterNodeConfig): Promise<[client: RedisClient, id: string]> {
const client = this.createClientRaw(config);
client.start();
const {user, pwd} = config;
await client.hello(3, pwd, user);
return client;
const [, id] = await Promise.all([
client.hello(3, pwd, user),
client.clusterMyId(),
]);
this.router.setClient(id, client);
return [client, id];
}

protected createClientRaw(config: RedisClusterNodeConfig): RedisClient {
Expand All @@ -158,6 +163,7 @@ export class RedisCluster {
encoder: this.encoder,
decoder: this.decoder,
});
endpointByClient.set(client, host);
return client;
}

Expand All @@ -167,6 +173,21 @@ export class RedisCluster {
return master ? this.getBestAvailableWriteClient(slot) : this.getBestAvailableReadClient(slot);
}

protected async callWithClient(call: RedisClusterCall, client: RedisClient): Promise<unknown> {
try {
return await client.call(call);
} catch (error) {
if (isMovedError(error)) {
const redirect = parseMovedError((error as Error).message);
let host = redirect[0];
if (!host) host = endpointByClient.get(client) || '';
if (!host) throw new Error('NO_HOST');
// TODO: Start router table rebuild.
}
throw error;
}
}

public async call(call: RedisClusterCall): Promise<unknown> {
const args = call.args;
let cmd: string = args[0] as string;
Expand All @@ -175,7 +196,7 @@ export class RedisCluster {
const isWrite = commands.write.has(cmd);
const key = call.key || (args[1] + '') || '';
const client = await this.getClientForKey(key, isWrite);
return await client.call(call);
return await this.callWithClient(call, client);
}

public async cmd(args: unknown[], opts?: CmdOpts): Promise<unknown> {
Expand Down
5 changes: 2 additions & 3 deletions src/cluster/RedisClusterRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ export class RedisClusterRouter {
if (this.infos.has(id)) this.clients.set(id, client);
}

public setClient(info: RedisClusterNodeInfo, client: RedisClient): void {
if (!this.infos.has(info.id)) throw new Error('NO_SUCH_NODE');
this.clients.set(info.id, client);
public setClient(id: string, client: RedisClient): void {
this.clients.set(id, client);
}

public getNodesForSlot(slot: number): RedisClusterNodeInfo[] {
Expand Down
3 changes: 3 additions & 0 deletions src/cluster/endpoints.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import type {RedisClient} from "../node";

export const endpointByClient = new WeakMap<RedisClient, string>();
18 changes: 18 additions & 0 deletions src/cluster/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export const isMovedError = (error: unknown): boolean => {
if (error instanceof Error) {
const msg = error.message;
return msg.charCodeAt(0) === 77 && // M
msg.charCodeAt(1) === 79 && // O
msg.charCodeAt(2) === 86 && // V
msg.charCodeAt(3) === 69 && // E
msg.charCodeAt(4) === 68; // D
}
return false;
};

const movedErrorRegex = /^MOVED [0-9]+ ([^:]*):([0-9]+)\s*$/;
export const parseMovedError = (message: string): [endpoint: string, port: number] => {
const match = movedErrorRegex.exec(message);
if (!match) throw new Error('NOT_MOVED_ERROR');
return [match[1], +match[2]];
};
6 changes: 6 additions & 0 deletions src/node/ReconnectingSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ export class ReconnectingSocket {
return socket;
}

public getEndpoint(): string {
const socket = this.socket;
if (!socket) throw new Error('NOT_CONNECTED');
return `${socket.remoteAddress}:${socket.remotePort}`;
}

private readonly handleConnect = () => {
// Clear the connection timeout timer.
clearTimeout(this.connectionTimeoutTimer);
Expand Down

0 comments on commit ae273c7

Please sign in to comment.