diff --git a/README.md b/README.md index 8cd064b..4ec0b3d 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ # redis-joy -WIP +A modern fast implementation of Redis client in TypeScript. + +- Supports Redis 7+. +- Supports Redis cluster mode. +- RESP3 protocol support. +- Very fast RESP3 message parser and serializer. +- Supports TLS. diff --git a/src/cluster/RedisCluster.ts b/src/cluster/RedisCluster.ts index 5eb3e0a..2f9cc60 100644 --- a/src/cluster/RedisCluster.ts +++ b/src/cluster/RedisCluster.ts @@ -8,6 +8,7 @@ import {RedisClusterNodeInfo} from './RedisClusterNodeInfo'; import {RedisClusterCall} from './RedisClusterCall'; import {isMovedError, parseMovedError} from './errors'; import {RedisClusterNodeClient, RedisClusterNodeClientOpts} from './RedisClusterNodeClient'; +import {RedirectType} from './constants'; import type {CmdOpts} from '../node'; const calculateSlot = require('cluster-key-slot'); @@ -35,24 +36,33 @@ export interface RedisClusterOpts extends RedisClientCodecOpts { * be redirected to a different node. This option controls how many times * the command will be redirected before giving up. */ - maxRedirects?: number; + // This probably is not something that user should control? + // maxRedirects?: number; } export class RedisCluster { protected readonly encoder: RespEncoder; protected readonly decoder: RespStreamingDecoder; protected readonly router = new RedisClusterRouter(); - protected stopped = false; + + constructor(protected readonly opts: PartialExcept) { + this.encoder = opts.encoder ?? new RespEncoder(); + this.decoder = opts.decoder ?? new RespStreamingDecoder(); + } + + + // ---------------------------------------------------- Events /** Emitted on unexpected and asynchronous errors. */ public readonly onError = new FanOut(); + /** Emitted each time router table is rebuilt. */ public readonly onRouter = new FanOut(); - constructor(protected readonly opts: PartialExcept) { - this.encoder = opts.encoder ?? new RespEncoder(); - this.decoder = opts.decoder ?? new RespStreamingDecoder(); - } + + // ---------------------------------------------------- Life cycle management + + protected stopped: boolean = false; public start(): void { this.stopped = false; @@ -60,21 +70,28 @@ export class RedisCluster { } public stop(): void { + clearTimeout(this.initialTableBuildTimer); + this.initialTableBuildAttempt = 0; + clearTimeout(this.rebuildTimer); + this.isRebuildingRouteTable = false; + this.routeTableRebuildRetry = 0; this.stopped = true; } + + // ----------------------------------------------- Build initial router table + private initialTableBuildAttempt = 0; + private initialTableBuildTimer: NodeJS.Timeout | undefined = undefined; + private buildInitialRouteTable(seed: number = 0): void { const attempt = this.initialTableBuildAttempt++; (async () => { if (this.stopped) return; if (!this.router.isEmpty()) return; - const {seeds, connectionConfig} = this.opts; + const {seeds} = this.opts; seed = seed % seeds.length; - const client = await this.createClient({ - ...connectionConfig, - ...seeds[seed], - }); + const client = await this.createClientFromSeed(seeds[seed]); if (this.stopped) return; await this.router.rebuild(client); if (this.stopped) return; @@ -82,7 +99,7 @@ export class RedisCluster { this.onRouter.emit(); })().catch((error) => { const delay = Math.max(Math.min(1000 * 2 ** attempt, 1000 * 60), 1000); - setTimeout(() => this.buildInitialRouteTable(seed + 1), delay); + this.initialTableBuildTimer = setTimeout(() => this.buildInitialRouteTable(seed + 1), delay); this.onError.emit(error); }); } @@ -97,12 +114,60 @@ export class RedisCluster { }); } + + // ----------------------------------------------------- Router table rebuild + + private isRebuildingRouteTable: boolean = false; + private routeTableRebuildRetry: number = 0; + private rebuildTimer: NodeJS.Timeout | undefined = undefined; + + protected scheduleRoutingTableRebuild(): void { + if (this.isRebuildingRouteTable) return; + this.isRebuildingRouteTable = true; + this.routeTableRebuildRetry = 0; + const delay = Math.max(Math.min(1000 * 2 ** this.routeTableRebuildRetry, 1000 * 60), 1000); + this.rebuildTimer = setTimeout(() => { + this.rebuildTimer = undefined; + this.rebuildRoutingTable() + .then(() => { + if (this.stopped) return; + this.isRebuildingRouteTable = false; + this.routeTableRebuildRetry = 0; + this.onRouter.emit(); + }) + .catch((error) => { + if (this.stopped) return; + this.isRebuildingRouteTable = false; + this.routeTableRebuildRetry++; + this.onError.emit(error); + this.scheduleRoutingTableRebuild(); + }); + }, delay); + } + + private async rebuildRoutingTable(): Promise { + const client = await this.getAnyClientOrSeed(); + if (this.stopped) return; + await this.router.rebuild(client); + } + + + // -------------------------------------------------------- Client management + protected getAnyClient(): RedisClusterNodeClient { const randomClient = this.router.getRandomClient(); if (!randomClient) throw new Error('NO_CLIENT'); return randomClient; } + protected async getAnyClientOrSeed(): Promise { + const randomClient = this.router.getRandomClient(); + if (randomClient) return randomClient; + const seeds = this.opts.seeds; + const seed = seeds[Math.floor(Math.random() * seeds.length)]; + return this.createClientFromSeed(seed); + } + protected async getBestAvailableWriteClient(slot: number): Promise { await this.whenRouterReady(); const router = this.router; @@ -110,6 +175,7 @@ export class RedisCluster { if (!info) return this.getAnyClient(); const client = router.getClient(info.id); if (client) return client; + // TODO: construct the right client, as we will be redirected here anyways. this.createClientFromInfo(info); return this.getAnyClient(); } @@ -121,6 +187,7 @@ export class RedisCluster { if (!info) return this.getAnyClient(); const client = router.getClient(info.id); if (client) return client; + // TODO: construct the right client, as we will be redirected here anyways. this.createClientFromInfo(info); return this.getAnyClient(); } @@ -137,6 +204,13 @@ export class RedisCluster { }); } + protected createClientFromSeed(seed: RedisClusterNodeClientOpts): Promise { + return this.createClient({ + ...this.opts.connectionConfig, + ...seed, + }); + } + protected async createClient(config: RedisClusterNodeClientOpts, id?: string): Promise { const client = this.createClientRaw(config); client.start(); @@ -168,20 +242,23 @@ export class RedisCluster { return master ? this.getBestAvailableWriteClient(slot) : this.getBestAvailableReadClient(slot); } + + // -------------------------------------------------------- Command execution + protected async callWithClient(call: RedisClusterCall): Promise { const client = call.client!; try { return await client.call(call); } catch (error) { if (isMovedError(error)) { - // TODO: Schedule routing table rebuild. + this.scheduleRoutingTableRebuild(); const redirect = parseMovedError((error as Error).message); let host = redirect[0] || client.host; if (!host) throw new Error('NO_HOST'); const port = redirect[1]; if (port === client.port && host === client.host) throw new Error('SELF_REDIRECT'); const nextClient = await this.createClientForHost(host, port); - const next = RedisClusterCall.redirect(call, nextClient); + const next = RedisClusterCall.redirect(call, nextClient, RedirectType.MOVED); return await this.callWithClient(next); } // TODO: Handle ASK redirection. @@ -212,4 +289,4 @@ export class RedisCluster { } } -export type ClusterCmdOpts = CmdOpts & Partial>; \ No newline at end of file +export type ClusterCmdOpts = CmdOpts & Partial>; diff --git a/src/cluster/RedisClusterCall.ts b/src/cluster/RedisClusterCall.ts index e041364..13c7324 100644 --- a/src/cluster/RedisClusterCall.ts +++ b/src/cluster/RedisClusterCall.ts @@ -1,14 +1,16 @@ import {RedisCall} from "../node"; +import {RedirectType} from "./constants"; import type {RedisClusterNodeClient} from "./RedisClusterNodeClient"; /** * Represents a single Redis request/response command call. */ export class RedisClusterCall extends RedisCall { - public static redirect(call: RedisClusterCall, client: RedisClusterNodeClient): RedisClusterCall { + public static redirect(call: RedisClusterCall, client: RedisClusterNodeClient, type: RedirectType): RedisClusterCall { const next = new RedisClusterCall(call.args); - next.prev = call; + // next.prev = call; next.client = client; + next.redirect = type; next.redirects = call.redirects + 1; next.maxRedirects = call.maxRedirects; if (next.redirects > next.maxRedirects) throw new Error('MAX_REDIRECTS'); @@ -23,6 +25,11 @@ export class RedisClusterCall extends RedisCall { */ public key: string = ''; + /** + * Type of redirect of this call. + */ + public redirect: RedirectType = RedirectType.NONE; + /** * Number of redirects that have been performed for this command. */ @@ -41,5 +48,5 @@ export class RedisClusterCall extends RedisCall { /** * Previous call in the chain, in case the command was redirected. */ - public prev: RedisClusterCall | null = null; + // public prev: RedisClusterCall | null = null; } diff --git a/src/cluster/RedisClusterRouter.ts b/src/cluster/RedisClusterRouter.ts index 96495cf..73c1c62 100644 --- a/src/cluster/RedisClusterRouter.ts +++ b/src/cluster/RedisClusterRouter.ts @@ -24,6 +24,7 @@ export class RedisClusterRouter { */ public async rebuild(client: RedisClusterNodeClient): Promise { const [id, slots] = await Promise.all([ + // TODO: Remove need for knowing own ID. client.clusterMyId(), client.clusterShards(), ]); diff --git a/src/cluster/constants.ts b/src/cluster/constants.ts new file mode 100644 index 0000000..f790169 --- /dev/null +++ b/src/cluster/constants.ts @@ -0,0 +1,5 @@ +export const enum RedirectType { + NONE, + MOVED, + ASK, +}