Skip to content

Commit

Permalink
feat: 🎸 improve cluster calling setup
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 14, 2023
1 parent e385870 commit 250e808
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 126 deletions.
141 changes: 69 additions & 72 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {RedisClusterCall} from './RedisClusterCall';
import {isMovedError, parseMovedError} from './errors';
import {RedisClusterNodeClient, RedisClusterNodeClientOpts} from './RedisClusterNodeClient';
import {RedirectType} from './constants';
import {withTimeout} from '../util/timeout';
import type {CmdOpts} from '../node';

const calculateSlot = require('cluster-key-slot');
Expand Down Expand Up @@ -89,13 +90,11 @@ export class RedisCluster {
(async () => {
if (this.stopped) return;
if (!this.router.isEmpty()) return;
const {seeds, connectionConfig} = this.opts;
const {seeds} = this.opts;
seed = seed % seeds.length;
const seedConfig = seeds[seed]
const client = await this.createClientFromSeed(seedConfig);
const seedConfig = seeds[seed];
const node = await this.startClientFromConfig(seedConfig);
if (this.stopped) return;
const tls = !!seedConfig.tls || !!connectionConfig?.tls;
const node = new RedisClusterNode(client.id, client.port, [client.host], tls);
await this.router.rebuild(node);
if (this.stopped) return;
this.initialTableBuildAttempt = 0;
Expand Down Expand Up @@ -157,102 +156,100 @@ export class RedisCluster {

// ------------------------------------------------------ Client construction


private async createClientFromInfo(info: RedisClusterNode): Promise<RedisClusterNodeClient> {
return await this.createClientForHost(info.endpoint || info.ip, info.port);
protected async ensureNodeHasClient(node: RedisClusterNode): Promise<RedisClusterNodeClient> {
let client = node.client;
if (client) return client;
client = await this.startClientFromNode(node);
node.client = client;
return client;
}

private async createClientForHost(host: string, port: number): Promise<RedisClusterNodeClient> {
return await this.createNode({
...this.opts.connectionConfig,
host,
port,
});
/** When redirect points to a new host, which is not present in the route table */
private async startClientFromOrphanRedirect(host: string, port: number): Promise<RedisClusterNode> {
const config: RedisClusterNodeClientOpts = {host, port};
let node = await this.startClientFromConfig(config);
node = this.router.mergeNode(node);
return node;
}

protected createClientFromSeed(seed: RedisClusterNodeClientOpts): Promise<RedisClusterNodeClient> {
return this.createNode({
...this.opts.connectionConfig,
...seed,
});
/** When route table is created, it creates clients for each node. */
private async startClientFromNode(node: RedisClusterNode, hostIndex: number = 0): Promise<RedisClusterNodeClient> {
const host = node.hosts[hostIndex];
if (!host) throw new Error('NO_HOST');
const config: RedisClusterNodeClientOpts = {host, port: node.port};
if (node.tls) config.tls = true;
try {
const tmp = await withTimeout(5000, this.startClientFromConfig(config, node.id));
const client = tmp.client!;
return node.client = client;
} catch (error) {
if (error instanceof Error && error.message === 'TIMEOUT')
return await this.startClientFromNode(node, hostIndex + 1);
throw error;
}
}


/**
* 1. Create from host:port
* 2. Create from node info
* 1. Use preferred node info host
* 2. Iterate through all node info hosts
*/

protected async createNode(config: RedisClusterNodeClientOpts, id?: string): Promise<RedisClusterNode> {
const client = this.createClientRaw(config);
/** When cluster client boots it creates nodes from seed configs. */
protected async startClientFromConfig(config: RedisClusterNodeClientOpts, id?: string): Promise<RedisClusterNode> {
const conf = {
...this.opts.connectionConfig,
...config,
};
const client = this.createClient(conf);
client.start();
const {user, pwd} = config;
const {user, pwd} = conf;
const response = await Promise.all([
client.hello(3, pwd, user),
id ? Promise.resolve() : client.clusterMyId(),
]);
this.router.setNode(client);
return client;
id = id || response[1] as string;
const node = new RedisClusterNode(id, client.port, [client.host], !!conf.tls);
node.client = client;
return node;
}

protected createClientRaw(config: RedisClusterNodeClientOpts): RedisClusterNodeClient {
protected createClient(conf: RedisClusterNodeClientOpts): RedisClusterNodeClient {
const codec = {
encoder: this.encoder,
decoder: this.decoder,
};
const client = new RedisClusterNodeClient({
...this.opts.connectionConfig,
...config,
}, codec);
return client;
return new RedisClusterNodeClient(conf, codec);
}


// ----------------------------------------------------------- Client picking

public async getClientForKey(key: string, write: boolean): Promise<RedisClusterNodeClient> {
if (!key) return this.getAnyClient();
const slot = calculateSlot(key);
return write ? this.getBestAvailableWriteClient(slot) : this.getBestAvailableReadClient(slot);
protected async getRedirectNode(host: string, port: number): Promise<RedisClusterNode> {
const node = this.router.getNodeByEndpoint(host, port);
if (!node) return await this.startClientFromOrphanRedirect(host, port);
this.ensureNodeHasClient(node);
return node;
}

protected getAnyClient(): RedisClusterNodeClient {
const randomClient = this.router.getRandomClient();
if (!randomClient) throw new Error('NO_CLIENT');
return randomClient;
protected getAnyNode(): RedisClusterNode {
const node = this.router.getRandomNode();
if (node) return node;
throw new Error('NO_NODE');
}

protected async getAnyClientOrSeed(): Promise<RedisClusterNodeClient> {
const randomClient = this.router.getRandomClient();
if (randomClient) return randomClient;
protected async getAnyNodeOrSeed(): Promise<RedisClusterNode> {
const node = this.router.getRandomNode();
if (node) return node;
const seeds = this.opts.seeds;
const seed = seeds[Math.floor(Math.random() * seeds.length)];
return this.createClientFromSeed(seed);
return this.startClientFromConfig(seed);
}

protected async getBestAvailableWriteClient(slot: number): Promise<RedisClusterNodeClient> {
await this.whenRouterReady();
const router = this.router;
const info = router.getMasterNodeForSlot(slot);
if (!info) return this.getAnyClient();
const client = router.getClient(info.id);
if (client) return client;
return await this.createClientFromInfo(info);
}

protected async getBestAvailableReadClient(slot: number): Promise<RedisClusterNodeClient> {
public async getNodeForKey(key: string, write: boolean): Promise<RedisClusterNode> {
if (!key) return await this.getAnyNodeOrSeed();
const slot = calculateSlot(key);
await this.whenRouterReady();
const router = this.router;
const info = router.getRandomNodeForSlot(slot);
if (!info) return this.getAnyClient();
const client = router.getClient(info.id);
if (client) return client;
return await this.createClientFromInfo(info);
const node = write ? router.getMasterNodeForSlot(slot) : router.getRandomReplicaNodeForSlot(slot);
if (node) return node;
return await this.getAnyNodeOrSeed();
}


// -------------------------------------------------------- Command execution

protected async __call(call: RedisClusterCall): Promise<unknown> {
Expand All @@ -268,7 +265,7 @@ export class RedisCluster {
if (!host) throw new Error('NO_HOST');
const port = redirect[1];
if (port === client.port && host === client.host) throw new Error('SELF_REDIRECT');
const nextClient = await this.createClientForHost(host, port);
const nextClient = (await this.getRedirectNode(host, port)).client!;
const next = RedisClusterCall.redirect(call, nextClient, RedirectType.MOVED);
return await this.__call(next);
}
Expand All @@ -280,13 +277,13 @@ export class RedisCluster {
public async call(call: RedisClusterCall): Promise<unknown> {
const args = call.args;
let cmd: string = args[0] as string;
if (typeof cmd !== 'string') throw new Error('INVALID_COMMAND');
if (typeof cmd !== 'string' || !cmd) throw new Error('INVALID_COMMAND');
cmd = cmd.toUpperCase();
const isWrite = commands.write.has(cmd);
const key = call.key || (args[1] + '') || '';
console.log('key', key, 'isWrite', isWrite);
call.client = await this.getClientForKey(key, isWrite);
console.log('client', call.client.host, call.client.port);
const node = await this.getNodeForKey(key, isWrite);
if (!node.client) await this.ensureNodeHasClient(node);
call.client = node.client!;
return await this.__call(call);
}

Expand Down
2 changes: 1 addition & 1 deletion src/cluster/RedisClusterNode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {NodeHealth, NodeRole} from "./constants";
import type {RedisClusterShardsResponseNode} from "../node/types";
import type {RedisClusterShardsResponseNode} from "./types";
import type {RedisClusterNodeClient} from "./RedisClusterNodeClient";

const annotate = (node: RedisClusterNode, nodeInfo: RedisClusterShardsResponseNode): void => {
Expand Down
33 changes: 25 additions & 8 deletions src/cluster/RedisClusterNodeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as tls from 'tls';
import * as net from 'net';
import {ReconnectingSocket, RedisClient} from '../node';
import type {RedisClientCodecOpts} from '../types';
import type {RedisClusterShardsResponse} from './types';

export interface RedisClusterNodeClientOpts {
/** Hostname or IP address of the Redis node. Defaults to 'localhost'. */
Expand All @@ -19,12 +20,10 @@ export interface RedisClusterNodeClientOpts {
}

export class RedisClusterNodeClient extends RedisClient {
// /** Cluster node ID, randomly assigned when node boots, retrieved with `CLUSTER MYID`. */
// public id: string = '';
// /** Hostname of the Redis node. */
// public readonly host: string;
// /** Port of the Redis node. */
// public readonly port: number;
/** Hostname of the Redis node. */
public readonly host: string;
/** Port of the Redis node. */
public readonly port: number;

constructor({host = 'localhost', port = 6379, ...opts}: RedisClusterNodeClientOpts, codec: RedisClientCodecOpts) {
super({
Expand All @@ -43,7 +42,25 @@ export class RedisClusterNodeClient extends RedisClient {
encoder: codec.encoder,
decoder: codec.decoder,
});
// this.host = host;
// this.port = port;
this.host = host;
this.port = port;
}


// -------------------------------------------------------- Built-in commands

public async clusterMyId(): Promise<string> {
// `CLUSTER MYID` is not supported in a number of servers, for example,
// redis.com returns "ERR unknown subcommand 'myid'". Instead, we parse
// `CLUSTER NODES` output.
const reg = /^([^ ]+) .+myself/gm;
const nodes = await this.cmd(['CLUSTER', 'NODES']) as string;
const match = reg.exec(nodes);
if (!match) throw new Error('Failed to parse CLUSTER NODES output.');
return match[1];
}

public clusterShards(): Promise<RedisClusterShardsResponse> {
return this.cmd(['CLUSTER', 'SHARDS'], {utf8Res: true}) as Promise<RedisClusterShardsResponse>;
}
}
41 changes: 35 additions & 6 deletions src/cluster/RedisClusterRouter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {AvlMap} from 'json-joy/es2020/util/trees/avl/AvlMap';
import {RedisClusterSlotRange} from './RedisClusterSlotRange';
import {RedisClusterNode} from './RedisClusterNode';
import {NodeRole} from './constants';
import {NodeHealth, NodeRole} from './constants';

export class RedisClusterRouter {
/** Map of slots ordered by slot end (max) value. */
Expand Down Expand Up @@ -40,15 +40,44 @@ export class RedisClusterRouter {
}
}

public setNode(info: RedisClusterNode): void {
this.byId.set(info.id, info);
const port = info.port;
for (const host of info.hosts) {
/** Overwrite the node value. */
public setNode(node: RedisClusterNode): void {
this.byId.set(node.id, node);
const port = node.port;
for (const host of node.hosts) {
const hostAndPort = host + ':' + port;
this.byHostAndPort.set(hostAndPort, info);
this.byHostAndPort.set(hostAndPort, node);
}
}

/** Merge the node value into a potentially existing node. */
public mergeNode(node: RedisClusterNode): RedisClusterNode {
const existing = this.byId.get(node.id);
if (existing) {
if (existing.port !== node.port) throw new Error('INVALID_PORT');
for (const host of node.hosts) {
if (!existing.hosts.includes(host)) {
existing.hosts.push(host);
const endpoint = host + ':' + existing.port;
this.byHostAndPort.set(endpoint, existing);
}
}
if (existing.role === NodeRole.UNKNOWN) existing.role = node.role;
if (existing.replicationOffset === 0) existing.replicationOffset = node.replicationOffset;
if (existing.health === NodeHealth.UNKNOWN) existing.health = node.health;
if (!existing.client) existing.client = node.client;
return existing;
} else {
this.setNode(node);
return node;
}
}

public getNodeByEndpoint(host: string, port: number): RedisClusterNode | undefined {
const hostAndPort = host + ':' + port;
return this.byHostAndPort.get(hostAndPort);
}

public getNodesForSlot(slot: number): RedisClusterNode[] {
const range = this.ranges.getOrNextLower(slot);
if (!range) return [];
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/RedisClusterSlotRange.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {RedisClusterNode} from "./RedisClusterNodeInfo";
import {RedisClusterNode} from './RedisClusterNode';

export class RedisClusterSlotRange {
constructor(public readonly min: number, public readonly max: number, public readonly nodes: RedisClusterNode[]) {}
Expand Down
File renamed without changes.
8 changes: 4 additions & 4 deletions src/demo-cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ const main = async () => {
};

await exec(['SET', 'foo', 1], {key: 'foo'});
await exec(['SET', 'bar', 2], {key: 'bar'});
await exec(['SET', 'baz', 3], {key: 'baz'});
await exec(['SET', 'qux', 4], {key: 'qux'});
await exec(['SET', 'quux', 5], {key: 'quux'});
// await exec(['SET', 'bar', 2], {key: 'bar'});
// await exec(['SET', 'baz', 3], {key: 'baz'});
// await exec(['SET', 'qux', 4], {key: 'qux'});
// await exec(['SET', 'quux', 5], {key: 'quux'});
};

main().catch((err) => {
Expand Down
Loading

0 comments on commit 250e808

Please sign in to comment.