Skip to content

Commit

Permalink
feat: 🎸 implement routing table rebuild
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 14, 2023
1 parent cf99e59 commit 8321c33
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 19 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# redis-joy

WIP
A modern fast implementation of Redis client in TypeScript.

- Supports Redis 7+.
- Supports Redis cluster mode.
- RESP3 protocol support.
- Very fast RESP3 message parser and serializer.
- Supports TLS.
107 changes: 92 additions & 15 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {RedisClusterNodeInfo} from './RedisClusterNodeInfo';
import {RedisClusterCall} from './RedisClusterCall';
import {isMovedError, parseMovedError} from './errors';
import {RedisClusterNodeClient, RedisClusterNodeClientOpts} from './RedisClusterNodeClient';
import {RedirectType} from './constants';
import type {CmdOpts} from '../node';

const calculateSlot = require('cluster-key-slot');
Expand Down Expand Up @@ -35,54 +36,70 @@ export interface RedisClusterOpts extends RedisClientCodecOpts {
* be redirected to a different node. This option controls how many times
* the command will be redirected before giving up.
*/
maxRedirects?: number;
// This probably is not something that user should control?
// maxRedirects?: number;
}

export class RedisCluster {
protected readonly encoder: RespEncoder;
protected readonly decoder: RespStreamingDecoder;
protected readonly router = new RedisClusterRouter();
protected stopped = false;

constructor(protected readonly opts: PartialExcept<RedisClusterOpts, 'seeds'>) {
this.encoder = opts.encoder ?? new RespEncoder();
this.decoder = opts.decoder ?? new RespStreamingDecoder();
}


// ---------------------------------------------------- Events

/** Emitted on unexpected and asynchronous errors. */
public readonly onError = new FanOut<Error>();

/** Emitted each time router table is rebuilt. */
public readonly onRouter = new FanOut<void>();

constructor(protected readonly opts: PartialExcept<RedisClusterOpts, 'seeds'>) {
this.encoder = opts.encoder ?? new RespEncoder();
this.decoder = opts.decoder ?? new RespStreamingDecoder();
}

// ---------------------------------------------------- Life cycle management

protected stopped: boolean = false;

public start(): void {
this.stopped = false;
this.buildInitialRouteTable();
}

public stop(): void {
clearTimeout(this.initialTableBuildTimer);
this.initialTableBuildAttempt = 0;
clearTimeout(this.rebuildTimer);
this.isRebuildingRouteTable = false;
this.routeTableRebuildRetry = 0;
this.stopped = true;
}


// ----------------------------------------------- Build initial router table

private initialTableBuildAttempt = 0;
private initialTableBuildTimer: NodeJS.Timeout | undefined = undefined;

private buildInitialRouteTable(seed: number = 0): void {
const attempt = this.initialTableBuildAttempt++;
(async () => {
if (this.stopped) return;
if (!this.router.isEmpty()) return;
const {seeds, connectionConfig} = this.opts;
const {seeds} = this.opts;
seed = seed % seeds.length;
const client = await this.createClient({
...connectionConfig,
...seeds[seed],
});
const client = await this.createClientFromSeed(seeds[seed]);
if (this.stopped) return;
await this.router.rebuild(client);
if (this.stopped) return;
this.initialTableBuildAttempt = 0;
this.onRouter.emit();
})().catch((error) => {
const delay = Math.max(Math.min(1000 * 2 ** attempt, 1000 * 60), 1000);
setTimeout(() => this.buildInitialRouteTable(seed + 1), delay);
this.initialTableBuildTimer = setTimeout(() => this.buildInitialRouteTable(seed + 1), delay);
this.onError.emit(error);
});
}
Expand All @@ -97,19 +114,68 @@ export class RedisCluster {
});
}


// ----------------------------------------------------- Router table rebuild

private isRebuildingRouteTable: boolean = false;
private routeTableRebuildRetry: number = 0;
private rebuildTimer: NodeJS.Timeout | undefined = undefined;

protected scheduleRoutingTableRebuild(): void {
if (this.isRebuildingRouteTable) return;
this.isRebuildingRouteTable = true;
this.routeTableRebuildRetry = 0;
const delay = Math.max(Math.min(1000 * 2 ** this.routeTableRebuildRetry, 1000 * 60), 1000);
this.rebuildTimer = setTimeout(() => {
this.rebuildTimer = undefined;
this.rebuildRoutingTable()
.then(() => {
if (this.stopped) return;
this.isRebuildingRouteTable = false;
this.routeTableRebuildRetry = 0;
this.onRouter.emit();
})
.catch((error) => {
if (this.stopped) return;
this.isRebuildingRouteTable = false;
this.routeTableRebuildRetry++;
this.onError.emit(error);
this.scheduleRoutingTableRebuild();
});
}, delay);
}

private async rebuildRoutingTable(): Promise<void> {
const client = await this.getAnyClientOrSeed();
if (this.stopped) return;
await this.router.rebuild(client);
}


// -------------------------------------------------------- Client management

protected getAnyClient(): RedisClusterNodeClient {
const randomClient = this.router.getRandomClient();
if (!randomClient) throw new Error('NO_CLIENT');
return randomClient;
}

protected async getAnyClientOrSeed(): Promise<RedisClusterNodeClient> {
const randomClient = this.router.getRandomClient();
if (randomClient) return randomClient;
const seeds = this.opts.seeds;
const seed = seeds[Math.floor(Math.random() * seeds.length)];
return this.createClientFromSeed(seed);
}

protected async getBestAvailableWriteClient(slot: number): Promise<RedisClusterNodeClient> {
await this.whenRouterReady();
const router = this.router;
const info = router.getMasterForSlot(slot);
if (!info) return this.getAnyClient();
const client = router.getClient(info.id);
if (client) return client;
// TODO: construct the right client, as we will be redirected here anyways.
this.createClientFromInfo(info);
return this.getAnyClient();
}
Expand All @@ -121,6 +187,7 @@ export class RedisCluster {
if (!info) return this.getAnyClient();
const client = router.getClient(info.id);
if (client) return client;
// TODO: construct the right client, as we will be redirected here anyways.
this.createClientFromInfo(info);
return this.getAnyClient();
}
Expand All @@ -137,6 +204,13 @@ export class RedisCluster {
});
}

protected createClientFromSeed(seed: RedisClusterNodeClientOpts): Promise<RedisClusterNodeClient> {
return this.createClient({
...this.opts.connectionConfig,
...seed,
});
}

protected async createClient(config: RedisClusterNodeClientOpts, id?: string): Promise<RedisClusterNodeClient> {
const client = this.createClientRaw(config);
client.start();
Expand Down Expand Up @@ -168,20 +242,23 @@ export class RedisCluster {
return master ? this.getBestAvailableWriteClient(slot) : this.getBestAvailableReadClient(slot);
}


// -------------------------------------------------------- Command execution

protected async callWithClient(call: RedisClusterCall): Promise<unknown> {
const client = call.client!;
try {
return await client.call(call);
} catch (error) {
if (isMovedError(error)) {
// TODO: Schedule routing table rebuild.
this.scheduleRoutingTableRebuild();
const redirect = parseMovedError((error as Error).message);
let host = redirect[0] || client.host;
if (!host) throw new Error('NO_HOST');
const port = redirect[1];
if (port === client.port && host === client.host) throw new Error('SELF_REDIRECT');
const nextClient = await this.createClientForHost(host, port);
const next = RedisClusterCall.redirect(call, nextClient);
const next = RedisClusterCall.redirect(call, nextClient, RedirectType.MOVED);
return await this.callWithClient(next);
}
// TODO: Handle ASK redirection.
Expand Down Expand Up @@ -212,4 +289,4 @@ export class RedisCluster {
}
}

export type ClusterCmdOpts = CmdOpts & Partial<Pick<RedisClusterCall, 'key' | 'maxRedirects'>>;
export type ClusterCmdOpts = CmdOpts & Partial<Pick<RedisClusterCall, 'key' | 'maxRedirects'>>;
13 changes: 10 additions & 3 deletions src/cluster/RedisClusterCall.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import {RedisCall} from "../node";
import {RedirectType} from "./constants";
import type {RedisClusterNodeClient} from "./RedisClusterNodeClient";

/**
* Represents a single Redis request/response command call.
*/
export class RedisClusterCall extends RedisCall {
public static redirect(call: RedisClusterCall, client: RedisClusterNodeClient): RedisClusterCall {
public static redirect(call: RedisClusterCall, client: RedisClusterNodeClient, type: RedirectType): RedisClusterCall {
const next = new RedisClusterCall(call.args);
next.prev = call;
// next.prev = call;
next.client = client;
next.redirect = type;
next.redirects = call.redirects + 1;
next.maxRedirects = call.maxRedirects;
if (next.redirects > next.maxRedirects) throw new Error('MAX_REDIRECTS');
Expand All @@ -23,6 +25,11 @@ export class RedisClusterCall extends RedisCall {
*/
public key: string = '';

/**
* Type of redirect of this call.
*/
public redirect: RedirectType = RedirectType.NONE;

/**
* Number of redirects that have been performed for this command.
*/
Expand All @@ -41,5 +48,5 @@ export class RedisClusterCall extends RedisCall {
/**
* Previous call in the chain, in case the command was redirected.
*/
public prev: RedisClusterCall | null = null;
// public prev: RedisClusterCall | null = null;
}
1 change: 1 addition & 0 deletions src/cluster/RedisClusterRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export class RedisClusterRouter {
*/
public async rebuild(client: RedisClusterNodeClient): Promise<void> {
const [id, slots] = await Promise.all([
// TODO: Remove need for knowing own ID.
client.clusterMyId(),
client.clusterShards(),
]);
Expand Down
5 changes: 5 additions & 0 deletions src/cluster/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const enum RedirectType {
NONE,
MOVED,
ASK,
}

0 comments on commit 8321c33

Please sign in to comment.