Skip to content

Commit

Permalink
feat: 🎸 shotre clients in a separate map
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 15, 2023
1 parent fbec3b3 commit 6faf3ab
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 54 deletions.
80 changes: 36 additions & 44 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ export interface RedisClusterOpts extends RedisClientCodecOpts {
export class RedisCluster implements Printable {
protected readonly encoder: RespEncoder;
protected readonly decoder: RespStreamingDecoder;
protected readonly router = new RedisClusterRouter();
protected readonly router = new RedisClusterRouter(this);
protected readonly clients = new Map<string, RedisClusterNodeClient>();

constructor(protected readonly opts: PartialExcept<RedisClusterOpts, 'seeds'>) {
this.encoder = opts.encoder ?? new RespEncoder();
Expand Down Expand Up @@ -95,9 +96,9 @@ export class RedisCluster implements Printable {
const {seeds} = this.opts;
seed = seed % seeds.length;
const seedConfig = seeds[seed];
const node = await this.startClientFromConfig(seedConfig);
const [client, id] = await this.startClientFromConfig(seedConfig);
if (this.stopped) return;
await this.router.rebuild(node);
await this.router.rebuild(client, id);
if (this.stopped) return;
this.initialTableBuildAttempt = 0;
this.onRouter.emit();
Expand Down Expand Up @@ -159,19 +160,10 @@ export class RedisCluster implements Printable {
// ------------------------------------------------------ Client construction

protected async ensureNodeHasClient(node: RedisClusterNode): Promise<RedisClusterNodeClient> {
let client = node.client;
const id = node.id;
const client = this.clients.get(id);
if (client) return client;
client = await this.startClientFromNode(node);
node.client = client;
return client;
}

/** When redirect points to a new host, which is not present in the route table */
private async startClientFromOrphanRedirect(host: string, port: number): Promise<RedisClusterNode> {
const config: RedisClusterNodeClientOpts = {host, port};
let node = await this.startClientFromConfig(config);
node = this.router.mergeNode(node);
return node;
return await this.startClientFromNode(node);
}

/** When route table is created, it creates clients for each node. */
Expand All @@ -181,9 +173,9 @@ export class RedisCluster implements Printable {
const config: RedisClusterNodeClientOpts = {host, port: node.port};
if (node.tls) config.tls = true;
try {
const tmp = await withTimeout(5000, this.startClientFromConfig(config, node.id));
const client = tmp.client!;
return node.client = client;
const [client] = await withTimeout(5000, this.startClientFromConfig(config));
this.clients.set(node.id, client);
return client;
} catch (error) {
if (error instanceof Error && error.message === 'TIMEOUT')
return await this.startClientFromNode(node, hostIndex + 1);
Expand All @@ -192,22 +184,19 @@ export class RedisCluster implements Printable {
}

/** When cluster client boots it creates nodes from seed configs. */
protected async startClientFromConfig(config: RedisClusterNodeClientOpts, id?: string): Promise<RedisClusterNode> {
protected async startClientFromConfig(config: RedisClusterNodeClientOpts, loadId?: boolean): Promise<[client: RedisClusterNodeClient, id: string]> {
const conf = {
...this.opts.connectionConfig,
...config,
};
const client = this.createClient(conf);
client.start();
const {user, pwd} = conf;
const response = await Promise.all([
client.hello(3, pwd, user),
id ? Promise.resolve() : client.clusterMyId(),
const [, id = ''] = await Promise.all([
await client.hello(3, pwd, user),
loadId ? client.clusterMyId() : Promise.resolve(),
]);
id = id || response[1] as string;
const node = new RedisClusterNode(id, client.port, [client.host], !!conf.tls);
node.client = client;
return node;
return [client, id];
}

protected createClient(conf: RedisClusterNodeClientOpts): RedisClusterNodeClient {
Expand All @@ -221,11 +210,17 @@ export class RedisCluster implements Printable {

// ----------------------------------------------------------- Client picking

protected async getRedirectNode(host: string, port: number): Promise<RedisClusterNode> {
public getClient(nodeId: string): RedisClusterNodeClient | undefined {
return this.clients.get(nodeId);
}

protected async getRedirectClient(host: string, port: number, oldClient: RedisClusterNodeClient): Promise<RedisClusterNodeClient> {
const node = this.router.getNodeByEndpoint(host, port);
if (!node) return await this.startClientFromOrphanRedirect(host, port);
this.ensureNodeHasClient(node);
return node;
if (node) return await this.ensureNodeHasClient(node);
await this.router.rebuild(oldClient);
const node2 = this.router.getNodeByEndpoint(host, port);
if (!node2) throw new Error('NO_NODE');
return await this.ensureNodeHasClient(node2);
}

protected getAnyNode(): RedisClusterNode {
Expand All @@ -234,22 +229,23 @@ export class RedisCluster implements Printable {
throw new Error('NO_NODE');
}

protected async getAnyNodeOrSeed(): Promise<RedisClusterNode> {
protected async getAnyClientOrSeedClient(): Promise<RedisClusterNodeClient> {
const node = this.router.getRandomNode();
if (node) return node;
if (node) return await this.ensureNodeHasClient(node);
const seeds = this.opts.seeds;
const seed = seeds[Math.floor(Math.random() * seeds.length)];
return this.startClientFromConfig(seed);
const [client] = await this.startClientFromConfig(seed);
return client;
}

public async getNodeForKey(key: string, write: boolean): Promise<RedisClusterNode> {
if (!key) return await this.getAnyNodeOrSeed();
public async getClientForKey(key: string, write: boolean): Promise<RedisClusterNodeClient> {
if (!key) return await this.getAnyClientOrSeedClient();
const slot = calculateSlot(key);
await this.whenRouterReady();
const router = this.router;
const node = write ? router.getMasterNodeForSlot(slot) : router.getRandomReplicaNodeForSlot(slot);
if (node) return node;
return await this.getAnyNodeOrSeed();
if (node) return await this.ensureNodeHasClient(node);
return await this.getAnyClientOrSeedClient();
}


Expand All @@ -268,8 +264,7 @@ export class RedisCluster implements Printable {
if (!host) throw new Error('NO_HOST');
const port = redirect[1];
if (port === client.port && host === client.host) throw new Error('SELF_REDIRECT');
const redirectNode = await this.getRedirectNode(host, port);
const nextClient = redirectNode.client ?? await this.ensureNodeHasClient(redirectNode);
const nextClient = await this.getRedirectClient(host, port, client);
const next = RedisClusterCall.redirect(call, nextClient, RedirectType.MOVED);
return await this.__call(next);
}
Expand All @@ -285,11 +280,8 @@ export class RedisCluster implements Printable {
cmd = cmd.toUpperCase();
const isWrite = commands.write.has(cmd);
const key = call.key || (args[1] + '') || '';
const node = await this.getNodeForKey(key, isWrite);
if (!node.client) await this.ensureNodeHasClient(node);
call.client = node.client!;
console.log(this + '');
console.log(node.client + '');
const client = await this.getClientForKey(key, isWrite);
call.client = client;
return await this.__call(call);
}

Expand Down
12 changes: 6 additions & 6 deletions src/cluster/RedisClusterNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {NodeHealth, NodeRole} from "./constants";
import {printTree} from 'json-joy/es2020/util/print/printTree';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {RedisClusterShardsResponseNode} from "./types";
import type {RedisClusterNodeClient} from "./RedisClusterNodeClient";
import type {RedisCluster} from "./RedisCluster";

const annotate = (node: RedisClusterNode, nodeInfo: RedisClusterShardsResponseNode): void => {
node.role = nodeInfo.role === 'master' ? NodeRole.MASTER : NodeRole.REPLICA;
Expand All @@ -12,7 +12,7 @@ const annotate = (node: RedisClusterNode, nodeInfo: RedisClusterShardsResponseNo
};

export class RedisClusterNode implements Printable {
public static fromNodeInfo = (nodeInfo: RedisClusterShardsResponseNode, extraHost: string = ''): RedisClusterNode => {
public static fromNodeInfo = (cluster: RedisCluster, nodeInfo: RedisClusterShardsResponseNode, extraHost: string = ''): RedisClusterNode => {
const id = nodeInfo.id + '';
const port = Number(nodeInfo.port ? nodeInfo.port : nodeInfo['tls-port']);
if (!port) throw new Error('NO_PORT');
Expand All @@ -23,7 +23,7 @@ export class RedisClusterNode implements Printable {
if (nodeInfo.ip && nodeInfo.ip !== '?') hosts.push(nodeInfo.ip + '');
if (!hosts.length && extraHost) hosts.push(extraHost);
if (!hosts.length) throw new Error('NO_HOSTS');
const node = new RedisClusterNode(id, port, hosts, tls);
const node = new RedisClusterNode(cluster, id, port, hosts, tls);
annotate(node, nodeInfo);
return node;
};
Expand All @@ -35,9 +35,8 @@ export class RedisClusterNode implements Printable {
public role: NodeRole = NodeRole.UNKNOWN;
public replicationOffset: number = 0;
public health: NodeHealth = NodeHealth.UNKNOWN;
public client: RedisClusterNodeClient | undefined = undefined;

constructor(id: string, port: number, hosts: string[], tls: boolean) {
constructor(protected readonly cluster: RedisCluster, id: string, port: number, hosts: string[], tls: boolean) {
this.id = id;
this.port = port;
this.hosts = [...new Set(hosts)];
Expand All @@ -50,8 +49,9 @@ export class RedisClusterNode implements Printable {
public toString(tab?: string): string {
const role = this.role === NodeRole.MASTER ? 'master' : 'replica';
const health = this.health === NodeHealth.ONLINE ? 'online' : this.health === NodeHealth.FAILED ? 'failed' : 'loading';
const client = this.cluster.getClient(this.id);
return `node (${this.id})${this.tls ? ' TLS' : ''} [${this.hosts.join(', ')}]:${this.port} ${role} ${this.replicationOffset} ${health}` + printTree(tab, [
this.client ? (tab => `${this.client?.toString(tab)}`) : null,
tab => client ? `${client.toString(tab)}` : 'on client'
]);
}
}
13 changes: 9 additions & 4 deletions src/cluster/RedisClusterRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {RedisClusterNode} from './RedisClusterNode';
import {NodeHealth, NodeRole} from './constants';
import {printTree} from 'json-joy/es2020/util/print/printTree';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {RedisCluster} from './RedisCluster';
import type {RedisClusterNodeClient} from './RedisClusterNodeClient';

export class RedisClusterRouter implements Printable {
/** Map of slots ordered by slot end (max) value. */
Expand All @@ -15,6 +17,8 @@ export class RedisClusterRouter implements Printable {
/** Mapping of "host:port" to node info instance. */
protected readonly byHostAndPort = new Map<string, RedisClusterNode>();

constructor(protected readonly cluster: RedisCluster) {}

/** Whether the route table is empty. */
public isEmpty(): boolean {
return this.ranges.isEmpty();
Expand All @@ -24,8 +28,7 @@ export class RedisClusterRouter implements Printable {
* Rebuild the router hash slot mapping.
* @param client Redis client to use to query the cluster.
*/
public async rebuild(info: RedisClusterNode): Promise<void> {
const client = info.client;
public async rebuild(client: RedisClusterNodeClient, id?: string): Promise<void> {
if (!client) throw new Error('NO_CLIENT');
const slots = await client.clusterShards();
this.ranges.clear();
Expand All @@ -34,12 +37,15 @@ export class RedisClusterRouter implements Printable {
for (const slot of slots) {
const range = new RedisClusterSlotRange(slot.slots[0], slot.slots[1], []);
for (const nodeInfo of slot.nodes) {
const node = nodeInfo.id === info.id ? RedisClusterNode.fromNodeInfo(nodeInfo) : RedisClusterNode.fromNodeInfo(nodeInfo, client.host);
const node = id && (nodeInfo.id === id)
? RedisClusterNode.fromNodeInfo(this.cluster, nodeInfo)
: RedisClusterNode.fromNodeInfo(this.cluster, nodeInfo, client.host);
this.setNode(node);
range.nodes.push(node);
}
this.ranges.insert(range.max, range);
}
// TODO: remove orphan clients
}

/** Overwrite the node value. */
Expand Down Expand Up @@ -67,7 +73,6 @@ export class RedisClusterRouter implements Printable {
if (existing.role === NodeRole.UNKNOWN) existing.role = node.role;
if (existing.replicationOffset === 0) existing.replicationOffset = node.replicationOffset;
if (existing.health === NodeHealth.UNKNOWN) existing.health = node.health;
if (!existing.client) existing.client = node.client;
return existing;
} else {
this.setNode(node);
Expand Down

0 comments on commit 6faf3ab

Please sign in to comment.