Skip to content

Commit

Permalink
feat: 🎸 authenticate on every client reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 17, 2023
1 parent b7d6378 commit d030a02
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 23 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"tslib": "2"
},
"dependencies": {
"json-joy": "^11.22.0",
"json-joy": "^11.24.0",
"thingies": "^1.15.0"
},
"devDependencies": {
Expand Down
33 changes: 22 additions & 11 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class RedisCluster implements Printable {
this.decoder = opts.decoder ?? new RespStreamingDecoder();
}


// ------------------------------------------------------------------- Events

/** Emitted on unexpected and asynchronous errors. */
Expand All @@ -62,6 +63,7 @@ export class RedisCluster implements Printable {
/** Emitted each time router table is rebuilt. */
public readonly onRouter = new FanOut<void>();


// ---------------------------------------------------- Life cycle management

protected stopped: boolean = false;
Expand All @@ -81,6 +83,7 @@ export class RedisCluster implements Printable {
this.clients.forEach((client) => client.stop());
}


// ---------------------------------------------- Build initial routing table

private initialTableBuildAttempt = 0;
Expand Down Expand Up @@ -123,6 +126,7 @@ export class RedisCluster implements Printable {
});
}


// ----------------------------------------------------- Router table rebuild

private isRebuildingRouteTable: boolean = false;
Expand Down Expand Up @@ -159,6 +163,7 @@ export class RedisCluster implements Printable {
// await this.router.rebuild(client);
}


// ------------------------------------------------------ Client construction

protected async ensureNodeHasClient(node: RedisClusterNode): Promise<RedisClusterNodeClient> {
Expand Down Expand Up @@ -186,22 +191,26 @@ export class RedisCluster implements Printable {
}

/** When cluster client boots it creates nodes from seed configs. */
protected async startClientFromConfig(
config: RedisClusterNodeClientOpts,
loadId?: boolean,
): Promise<[client: RedisClusterNodeClient, id: string]> {
protected async startClientFromConfig(config: RedisClusterNodeClientOpts): Promise<[client: RedisClusterNodeClient, id: string]> {
const conf = {
...this.opts.connectionConfig,
...config,
};
const client = this.createClient(conf);
client.start();
const {user, pwd} = conf;
const [, id = ''] = await Promise.all([
await client.hello(3, pwd, user),
loadId ? client.clusterMyId() : Promise.resolve(),
]);
return [client, id];
try {
client.start();
const whenAuthenticated = new Promise<string>((resolve, reject) => {
const unsubscribe = client.onAuth.listen(([err, id]) => {
unsubscribe();
if (err) reject(err); else resolve(id!);
});
});
const id = await withTimeout(5000, whenAuthenticated);
return [client, id];
} catch (error) {
client.stop();
throw error;
}
}

protected createClient(conf: RedisClusterNodeClientOpts): RedisClusterNodeClient {
Expand All @@ -212,6 +221,7 @@ export class RedisCluster implements Printable {
return new RedisClusterNodeClient(conf, codec);
}


// ----------------------------------------------------------- Client picking

public getClient(nodeId: string): RedisClusterNodeClient | undefined {
Expand Down Expand Up @@ -314,6 +324,7 @@ export class RedisCluster implements Printable {
return await this.call(call);
}


// ---------------------------------------------------------------- Printable

public toString(tab?: string): string {
Expand Down
18 changes: 18 additions & 0 deletions src/cluster/RedisClusterNodeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as tls from 'tls';
import * as net from 'net';
import {ReconnectingSocket, RedisClient} from '../node';
import {printTree} from 'json-joy/es2020/util/print/printTree';
import {FanOut} from 'thingies/es2020/fanout';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {RedisClientCodecOpts} from '../types';
import type {RedisClusterShardsResponse} from './types';
Expand Down Expand Up @@ -48,8 +49,24 @@ export class RedisClusterNodeClient extends RedisClient implements Printable {
});
this.host = host;
this.port = port;
this.socket.onReady.listen(() => {
Promise.all([
this.hello(3, opts.pwd, opts.user),
this.clusterMyId(),
]).then(([, id]) => {
this.onAuth.emit([null, id]);
}).catch(err => {
this.onAuth.emit([err]);
});
});
}


// ------------------------------------------------------------------- Events

public readonly onAuth = new FanOut<[error: Error | null, id?: string]>();


// -------------------------------------------------------- Built-in commands

public async clusterMyId(): Promise<string> {
Expand All @@ -67,6 +84,7 @@ export class RedisClusterNodeClient extends RedisClient implements Printable {
return this.cmd(['CLUSTER', 'SHARDS'], {utf8Res: true}) as Promise<RedisClusterShardsResponse>;
}


// ---------------------------------------------------------------- Printable

public toString(tab?: string): string {
Expand Down
1 change: 1 addition & 0 deletions src/cluster/RedisClusterRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export class RedisClusterRouter implements Printable {
return;
}


// ---------------------------------------------------------------- Printable

public toString(tab?: string): string {
Expand Down
4 changes: 4 additions & 0 deletions src/node/ReconnectingSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ export class ReconnectingSocket {
this.getSocket().destroySoon();
}

public reconnect(): void {
this.getSocket().destroy();
}

protected retry(): void {
if (this.retryTimer) return;
const retryTimeout = this.getRetryTimeout();
Expand Down
13 changes: 6 additions & 7 deletions src/node/RedisClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import {RespEncoder} from 'json-joy/es2020/json-pack/resp/RespEncoder';
import {RespStreamingDecoder} from 'json-joy/es2020/json-pack/resp/RespStreamingDecoder';
import {ReconnectingSocket} from './ReconnectingSocket';
import {RedisCall, callNoRes} from './RedisCall';
import type {Cmd, MultiCmd, RedisClientCodecOpts} from '../types';
import {isMultiCmd} from '../util/commands';
import type {Cmd, MultiCmd, RedisClientCodecOpts} from '../types';

export interface RedisClientOpts extends RedisClientCodecOpts {
socket: ReconnectingSocket;
Expand Down Expand Up @@ -59,8 +59,8 @@ export class RedisClient {
this.socket.write(buf);
requests.splice(0, length);
} catch (error) {
// this.onProtocolError.reject(error);
// TODO: Re-establish socket ...
this.onProtocolError.reject(error);
this.socket.reconnect();
}
};

Expand Down Expand Up @@ -92,16 +92,15 @@ export class RedisClient {
if (msg instanceof Error) res.reject(msg);
else res.resolve(msg);
} else {
// TODO: Use skipping here...
decoder.tryUtf8 = false;
const msg = decoder.read();
const msg = decoder.skip();
if (msg === undefined) break;
}
}
if (i > 0) responses.splice(0, i);
} catch (error) {
// this.onProtocolError.reject(error);
// TODO: Re-establish socket ...
this.onProtocolError.reject(error);
this.socket.reconnect();
}
};

Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1860,10 +1860,10 @@ jsesc@^2.5.1:
resolved "https://registry.yarnpkg.com/jsesc/-/jsesc-2.5.2.tgz#80564d2e483dacf6e8ef209650a67df3f0c283a4"
integrity sha512-OYu7XEzjkCQ3C5Ps3QIZsQfNpqoJyZZA99wd9aWd05NCtC5pWOkShK2mkL6HXQR6/Cy2lbNdPlZBpuQHXE63gA==

json-joy@^11.22.0:
version "11.22.0"
resolved "https://registry.yarnpkg.com/json-joy/-/json-joy-11.22.0.tgz#c81975c0ca0faa51dc67d775bafcae5540c2f293"
integrity sha512-D/wssJX6aLqIf2CbY8KNYPgbdCPDH+QoPWgmYBOnWhork89fDPDGTDKTw9hwCb3qpaTi4vdsrGArXTZYUHWeVQ==
json-joy@^11.24.0:
version "11.24.0"
resolved "https://registry.yarnpkg.com/json-joy/-/json-joy-11.24.0.tgz#ad6bc46a3e0ef7cecefbaab5c5bc5a55827dca22"
integrity sha512-FhhtHGHwyviIqSPT1XddauVod0psjtN8AWJzJlTyhwA/t/I+ssiusFsS9C947Wm82EC9BbDwZ7zY6+ASihZF4A==
dependencies:
arg "^5.0.2"
hyperdyperid "^1.2.0"
Expand Down

0 comments on commit d030a02

Please sign in to comment.