Skip to content

Commit

Permalink
feat: 🎸 add client code
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 13, 2023
1 parent b5bed11 commit 2a0819b
Show file tree
Hide file tree
Showing 13 changed files with 688 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# redis-joy

WIP
152 changes: 152 additions & 0 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import * as net from 'net';
import * as tls from 'tls';
import {FanOut} from 'thingies/es2020/fanout';
import {RespEncoder} from 'json-joy/es2020/json-pack/resp';
import {RespStreamingDecoder} from 'json-joy/es2020/json-pack/resp/RespStreamingDecoder';
import {PartialExcept, RedisClientCodecOpts} from '../types';
import {RedisClient, ReconnectingSocket} from '../node';
import {RedisClusterRouter} from './RedisClusterRouter';
import {RedisClusterNodeInfo} from './RedisClusterNodeInfo';

export interface RedisClusterOpts extends RedisClientCodecOpts {
/** Nodes to connect to to retrieve cluster configuration. */
seeds: RedisClusterNodeConfig[];
/** Shared config applied to all nodes. */
connectionConfig?: RedisClusterNodeConfig;
}

export interface RedisClusterNodeConfig {
/** Hostname or IP address of the Redis node. Defaults to 'localhost'. */
host?: string;
/** Port of the Redis node. Defaults to 6379. */
port?: number;
/** Username to use for authentication. */
user?: string;
/** Password to use for authentication. Auth is skipped if omitted. */
pwd?: string;
/** Whether to use TLS. Defaults to false. */
tls?: boolean;
/** TLS options. */
secureContext?: tls.SecureContextOptions;
}

export class RedisCluster {
protected readonly encoder: RespEncoder;
protected readonly decoder: RespStreamingDecoder;
protected readonly router = new RedisClusterRouter();
protected stopped = false;

/** Emitted on unexpected and asynchronous errors. */
public readonly onError = new FanOut<Error>();
/** Emitted each time router table is rebuilt. */
public readonly onRouter = new FanOut<void>();

constructor(protected readonly opts: PartialExcept<RedisClusterOpts, 'seeds'>) {
this.encoder = opts.encoder ?? new RespEncoder();
this.decoder = opts.decoder ?? new RespStreamingDecoder();
}

public start(): void {
this.stopped = false;
this.buildInitialRouteTable();
}

public stop(): void {
this.stopped = true;
}

private initialTableBuildAttempt = 0;
private buildInitialRouteTable(seed: number = 0): void {
const attempt = this.initialTableBuildAttempt++;
(async () => {
if (this.stopped) return;
if (!this.router.isEmpty()) return;
const {seeds, connectionConfig} = this.opts;
seed = seed % seeds.length;
const client = await this.createClient({
...connectionConfig,
...seeds[seed],
});
if (this.stopped) return;
await this.router.rebuild(client);
if (this.stopped) return;
this.initialTableBuildAttempt = 0;
console.log(this.router);
this.onRouter.emit();
})().catch((error) => {
const delay = Math.max(Math.min(1000 * 2 ** attempt, 1000 * 60), 1000);
setTimeout(() => this.buildInitialRouteTable(seed + 1), delay);
this.onError.emit(error);
});
}

public async whenRouterReady(): Promise<void> {
if (!this.router.isEmpty()) return;
return new Promise((resolve) => {
const unsubscribe = this.onRouter.listen(() => {
unsubscribe();
resolve();
});
});
}

protected getAnyClient(): RedisClient {
const randomClient = this.router.getRandomClient();
if (!randomClient) throw new Error('NO_CLIENT');
return randomClient;
}

protected async getBestAvailableReadClient(slot: number): Promise<RedisClient> {
await this.whenRouterReady();
const router = this.router;
const info = router.getRandomNodeForSlot(slot);
if (!info) {
const client = router.getRandomClient();
if (!client) throw new Error('NO_CLIENT');
return client;
}
const client = router.getClient(info.id);
if (client) return client;
this.createReadClientForSlot(info);
return this.getAnyClient();
}

private async createReadClientForSlot(info: RedisClusterNodeInfo): Promise<RedisClient> {
const client = await this.createClient({
...this.opts.connectionConfig,
host: info.endpoint,
port: info.port,
});
this.router.setClient(info, client);
return client;
}

protected async createClient(config: RedisClusterNodeConfig): Promise<RedisClient> {
const client = this.createClientRaw(config);
client.start();
const {user, pwd} = config;
await client.hello(3, pwd, user);
return client;
}

protected createClientRaw(config: RedisClusterNodeConfig): RedisClient {
const {host = 'localhost', port = 6379} = config;
const client = new RedisClient({
socket: new ReconnectingSocket({
createSocket: config.tls
? () => tls.connect({
host,
port,
...config.secureContext,
})
: () => net.connect({
host,
port,
}),
}),
encoder: this.encoder,
decoder: this.decoder,
});
return client;
}
}
6 changes: 6 additions & 0 deletions src/cluster/RedisClusterNode.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import type {RedisClient} from '../node';
import type {RedisClusterNodeInfo} from './RedisClusterNodeInfo';

export class RedisClusterNode {
constructor(public info: RedisClusterNodeInfo, public client: RedisClient) {}
}
25 changes: 25 additions & 0 deletions src/cluster/RedisClusterNodeInfo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type {RedisClusterShardsResponseNode} from "../node/types";

export class RedisClusterNodeInfo {
public static from = (response: RedisClusterShardsResponseNode): RedisClusterNodeInfo => {
return new RedisClusterNodeInfo(
response.id + '',
Number(response.port),
response.ip + '',
response.endpoint + '',
(response.role + '') as 'master' | 'replica',
Number(response['replication-offset']),
(response.health + '') as 'online' | 'failed' | 'loading',
);
};

constructor(
public readonly id: string,
public readonly port: number,
public readonly ip: string,
public readonly endpoint: string,
public readonly role: 'master' | 'replica',
public readonly replicationOffset: number,
public readonly health: 'online' | 'failed' | 'loading',
) {}
}
100 changes: 100 additions & 0 deletions src/cluster/RedisClusterRouter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import {AvlMap} from 'json-joy/es2020/util/trees/avl/AvlMap';
import {RedisClusterSlotRange} from './RedisClusterSlotRange';
import {RedisClusterNodeInfo} from './RedisClusterNodeInfo';
import type {RedisClient} from '../node';

export class RedisClusterRouter {
/** Map of slots ordered by slot end (max) value. */
protected readonly ranges = new AvlMap<number, RedisClusterSlotRange>();

/** Information about each node in the cluster, by node ID. */
protected readonly infos = new Map<string, RedisClusterNodeInfo>();

/** A sparse list of clients, by node ID. */
protected readonly clients = new Map<string, RedisClient>();

/** Whether the route table is empty. */
public isEmpty(): boolean {
return this.ranges.isEmpty();
}

/**
* Rebuild the router hash slot mapping.
* @param client Redis client to use to query the cluster.
*/
public async rebuild(client: RedisClient): Promise<void> {
const [id, slots] = await Promise.all([
client.clusterMyId(),
client.clusterShards(),
]);
this.ranges.clear();
this.infos.clear();
for (const slot of slots) {
const range = new RedisClusterSlotRange(slot.slots[0], slot.slots[1], []);
for (const node of slot.nodes) {
const info = RedisClusterNodeInfo.from(node);
this.infos.set(info.id, info);
range.nodes.push(info);
}
this.ranges.insert(range.max, range);
}
this.clients.forEach((client, id) => {
if (!this.infos.has(id)) {
client.stop();
this.clients.delete(id);
}
});
if (this.infos.has(id)) this.clients.set(id, client);
}

public setClient(info: RedisClusterNodeInfo, client: RedisClient): void {
if (!this.infos.has(info.id)) throw new Error('NO_SUCH_NODE');
this.clients.set(info.id, client);
}

public getNodesForSlot(slot: number): RedisClusterNodeInfo[] {
const range = this.ranges.getOrNextLower(slot);
if (!range) return [];
return range.v.nodes;
}

public getMasterForSlot(slot: number): RedisClusterNodeInfo | undefined {
const nodes = this.getNodesForSlot(slot);
if (!nodes) return undefined;
for (const node of nodes) if (node.role === 'master') return node;
return;
}

public getReplicasForSlot(slot: number): RedisClusterNodeInfo[] {
const nodes = this.getNodesForSlot(slot);
const replicas: RedisClusterNodeInfo[] = [];
for (const node of nodes) if (node.role === 'replica') replicas.push(node);
return replicas;
}

public getRandomReplicaForSlot(slot: number): RedisClusterNodeInfo | undefined {
const replicas = this.getReplicasForSlot(slot);
if (!replicas.length) return undefined;
return replicas[Math.floor(Math.random() * replicas.length)];
}

public getRandomNodeForSlot(slot: number): RedisClusterNodeInfo | undefined {
const nodes = this.getNodesForSlot(slot);
if (!nodes.length) return undefined;
return nodes[Math.floor(Math.random() * nodes.length)];
}

public getClient(id: string): RedisClient | undefined {
return this.clients.get(id);
}

public getRandomClient(): RedisClient | undefined {
const size = this.clients.size;
if (!size) return undefined;
const index = Math.floor(Math.random() * size);
let i = 0;
for (const client of this.clients.values())
if (i++ === index) return client;
return;
}
}
5 changes: 5 additions & 0 deletions src/cluster/RedisClusterSlotRange.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {RedisClusterNodeInfo} from "./RedisClusterNodeInfo";

export class RedisClusterSlotRange {
constructor(public readonly min: number, public readonly max: number, public readonly nodes: RedisClusterNodeInfo[]) {}
}
32 changes: 32 additions & 0 deletions src/demo-cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// npx ts-node src/demo-cluster.ts

import {RedisCluster} from "./cluster/RedisCluster";

const main = async () => {
const host = 'localhost';
const port = 7000;
const user = 'default';
const pwd = 'AoQhB7bNYljT8IiZ7nbgvSQSXiGHRwQX';

const client = new RedisCluster({
seeds: [
{
host,
port,
user,
pwd,
}
],
});

client.onError.listen((err) => {
console.error('onError', err);
});

client.start();
};

main().catch((err) => {
console.error(err);
process.exit(1);
});
Loading

0 comments on commit 2a0819b

Please sign in to comment.