Skip to content

Commit

Permalink
feat: 🎸 simplify auth and connection
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 17, 2023
1 parent 929f76e commit 9810441
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 57 deletions.
22 changes: 8 additions & 14 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import {RedirectType} from './constants';
import {withTimeout} from '../util/timeout';
import {printTree} from 'json-joy/es2020/util/print/printTree';
import {getSlotAny} from '../util/slots';
import {isMultiCmd} from '../util/commands';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {CmdOpts} from '../node';
import {isMultiCmd} from '../util/commands';

export interface RedisClusterOpts extends RedisClientCodecOpts {
/**
Expand Down Expand Up @@ -97,10 +97,10 @@ export class RedisCluster implements Printable {
const {seeds} = this.opts;
seed = seed % seeds.length;
const seedConfig = seeds[seed];
const [client, id] = await this.startClientFromConfig(seedConfig);
const client = await this.startClientFromConfig(seedConfig);
try {
if (this.stopped) return;
await this.router.rebuild(client, id);
await this.router.rebuild(client);
} finally {
// Discard the seed client as some clusters always return MOVED errors
// from seed clients.
Expand Down Expand Up @@ -180,7 +180,7 @@ export class RedisCluster implements Printable {
const config: RedisClusterNodeClientOpts = {host, port: node.port};
if (node.tls) config.tls = true;
try {
const [client] = await withTimeout(5000, this.startClientFromConfig(config));
const client = await withTimeout(5000, this.startClientFromConfig(config));
this.clients.set(node.id, client);
return client;
} catch (error) {
Expand All @@ -191,22 +191,16 @@ export class RedisCluster implements Printable {
}

/** When cluster client boots it creates nodes from seed configs. */
protected async startClientFromConfig(config: RedisClusterNodeClientOpts): Promise<[client: RedisClusterNodeClient, id: string]> {
protected async startClientFromConfig(config: RedisClusterNodeClientOpts): Promise<RedisClusterNodeClient> {
const conf = {
...this.opts.connectionConfig,
...config,
};
const client = this.createClient(conf);
try {
client.start();
const whenAuthenticated = new Promise<string>((resolve, reject) => {
const unsubscribe = client.onAuth.listen(([err, id]) => {
unsubscribe();
if (err) reject(err); else resolve(id!);
});
});
const id = await withTimeout(5000, whenAuthenticated);
return [client, id];
await withTimeout(5000, client.whenReady);
return client;
} catch (error) {
client.stop();
throw error;
Expand Down Expand Up @@ -252,7 +246,7 @@ export class RedisCluster implements Printable {
if (node) return await this.ensureNodeHasClient(node);
const seeds = this.opts.seeds;
const seed = seeds[Math.floor(Math.random() * seeds.length)];
const [client] = await this.startClientFromConfig(seed);
const client = await this.startClientFromConfig(seed);
return client;
}

Expand Down
2 changes: 0 additions & 2 deletions src/cluster/RedisClusterNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export class RedisClusterNode implements Printable {
public static fromNodeInfo = (
cluster: RedisCluster,
nodeInfo: RedisClusterShardsResponseNode,
extraHost: string = '',
): RedisClusterNode => {
const id = nodeInfo.id + '';
const port = Number(nodeInfo.port ? nodeInfo.port : nodeInfo['tls-port']);
Expand All @@ -25,7 +24,6 @@ export class RedisClusterNode implements Printable {
if (nodeInfo.endpoint && nodeInfo.endpoint !== '?') hosts.push(nodeInfo.endpoint + '');
if (nodeInfo.hostname && nodeInfo.hostname !== '?') hosts.push(nodeInfo.hostname + '');
if (nodeInfo.ip && nodeInfo.ip !== '?') hosts.push(nodeInfo.ip + '');
if (!hosts.length && extraHost) hosts.push(extraHost);
if (!hosts.length) throw new Error('NO_HOSTS');
const node = new RedisClusterNode(cluster, id, port, hosts, tls);
annotate(node, nodeInfo);
Expand Down
31 changes: 3 additions & 28 deletions src/cluster/RedisClusterNodeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ import * as tls from 'tls';
import * as net from 'net';
import {ReconnectingSocket, RedisClient} from '../node';
import {printTree} from 'json-joy/es2020/util/print/printTree';
import {FanOut} from 'thingies/es2020/fanout';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {RedisClientCodecOpts} from '../types';
import type {RedisClusterShardsResponse} from './types';
import type {RedisMode} from '../node/types';

export interface RedisClusterNodeClientOpts {
/** Hostname or IP address of the Redis node. Defaults to 'localhost'. */
Expand Down Expand Up @@ -39,41 +37,18 @@ export class RedisClusterNodeClient extends RedisClient implements Printable {
port,
...opts.secureContext,
})
: () =>
net.connect({
host,
port,
}),
: () => net.connect({host, port}),
}),
user: opts.user,
pwd: opts.pwd,
encoder: codec.encoder,
decoder: codec.decoder,
});
this.host = host;
this.port = port;
this.socket.onReady.listen(() => {
Promise.allSettled([
this.hello(3, opts.pwd, opts.user),
this.clusterMyId(),
]).then(([hello, id]) => {
const myId: string = id.status === 'fulfilled' ? id.value : '';
if (hello.status === 'rejected') {
this.onAuth.emit([hello.reason]);
return;
}
const mode: RedisMode = hello.value.mode;
this.onAuth.emit([null, myId, mode]);
}).catch(err => {
this.onAuth.emit([err]);
});
});
}


// ------------------------------------------------------------------- Events

public readonly onAuth = new FanOut<[error: Error | null, id?: string, mode?: RedisMode]>();


// -------------------------------------------------------- Built-in commands

public async clusterMyId(): Promise<string> {
Expand Down
7 changes: 2 additions & 5 deletions src/cluster/RedisClusterRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class RedisClusterRouter implements Printable {
* Rebuild the router hash slot mapping.
* @param client Redis client to use to query the cluster.
*/
public async rebuild(client: RedisClusterNodeClient, id?: string): Promise<void> {
public async rebuild(client: RedisClusterNodeClient): Promise<void> {
if (!client) throw new Error('NO_CLIENT');
const slots = await client.clusterShards();
this.ranges.clear();
Expand All @@ -38,10 +38,7 @@ export class RedisClusterRouter implements Printable {
for (const slot of slots) {
const range = new RedisClusterSlotRange(slot.slots[0], slot.slots[1], []);
for (const nodeInfo of slot.nodes) {
const node =
id && nodeInfo.id === id
? RedisClusterNode.fromNodeInfo(this.cluster, nodeInfo)
: RedisClusterNode.fromNodeInfo(this.cluster, nodeInfo, client.host);
const node = RedisClusterNode.fromNodeInfo(this.cluster, nodeInfo);
this.setNode(node);
range.nodes.push(node);
}
Expand Down
30 changes: 22 additions & 8 deletions src/node/RedisClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Defer} from 'thingies/es2020/Defer';
import {RespEncoder} from 'json-joy/es2020/json-pack/resp/RespEncoder';
import {RespStreamingDecoder} from 'json-joy/es2020/json-pack/resp/RespStreamingDecoder';
import {FanOut} from 'thingies/es2020/fanout';
import {Defer} from 'thingies/es2020/Defer';
import {ReconnectingSocket} from './ReconnectingSocket';
import {RedisCall, callNoRes} from './RedisCall';
import {isMultiCmd} from '../util/commands';
Expand All @@ -9,11 +10,12 @@ import type {RedisHelloResponse} from './types';

export interface RedisClientOpts extends RedisClientCodecOpts {
socket: ReconnectingSocket;
user?: string;
pwd?: string;
}

export class RedisClient {
protected readonly socket: ReconnectingSocket;
protected protocol: 2 | 3 = 2;

constructor(opts: RedisClientOpts) {
const socket = (this.socket = opts.socket);
Expand All @@ -23,12 +25,26 @@ export class RedisClient {
decoder.push(data);
this.scheduleRead();
});
socket.onReady.listen(() => {
this.hello(3, opts.pwd, opts.user)
.then(() => {
this.__whenReady.resolve();
this.onReady.emit();
})
.catch((error) => {
this.__whenReady.reject(error);
this.onError.emit(error);
});
});
}


// ------------------------------------------------------------------- Events

public readonly onProtocolError = new Defer<Error>();
private readonly __whenReady = new Defer<void>();
public readonly whenReady = this.__whenReady.promise;
public readonly onReady = new FanOut<void>();
public readonly onError = new FanOut<Error | unknown>();


// ------------------------------------------------------------ Socket writes
Expand Down Expand Up @@ -66,7 +82,7 @@ export class RedisClient {
this.socket.write(buf);
requests.splice(0, length);
} catch (error) {
this.onProtocolError.reject(error);
this.onError.emit(error as Error);
this.socket.reconnect();
}
};
Expand Down Expand Up @@ -107,7 +123,7 @@ export class RedisClient {
}
if (i > 0) responses.splice(0, i);
} catch (error) {
this.onProtocolError.reject(error);
this.onError.emit(error as Error);
this.socket.reconnect();
}
};
Expand Down Expand Up @@ -160,9 +176,7 @@ export class RedisClient {
/** Authenticate and negotiate protocol version. */
public async hello(protocol: 2 | 3, pwd?: string, usr: string = ''): Promise<RedisHelloResponse> {
const args: Cmd = pwd ? ['HELLO', protocol, 'AUTH', usr, pwd] : ['HELLO', protocol];
const result = await this.call(new RedisCall(args)) as RedisHelloResponse;
this.protocol = protocol;
return result;
return await this.call(new RedisCall(args)) as RedisHelloResponse;
}
}

Expand Down

0 comments on commit 9810441

Please sign in to comment.