Skip to content

Commit

Permalink
add nodeAddressMap config for cluster (#1827)
Browse files Browse the repository at this point in the history
* add `nodeAddressMap` config for cluster

* Update cluster-slots.ts

* Update cluster-slots.ts

* update docs

Co-authored-by: Guy Royse <guy@guyroyse.com>

Co-authored-by: Guy Royse <guy@guyroyse.com>
  • Loading branch information
leibale and guyroyse committed Feb 14, 2022
1 parent 6dd15d9 commit 0803f4e
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 54 deletions.
23 changes: 23 additions & 0 deletions docs/clustering.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,32 @@ import { createCluster } from 'redis';
| defaults | | The default configuration values for every client in the cluster. Use this for example when specifying an ACL user to connect with |
| useReplicas | `false` | When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes |
| maxCommandRedirections | `16` | The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors |
| nodeAddressMap | | Object defining the [node address mapping](#node-address-map) |
| modules | | Object defining which [Redis Modules](../README.md#modules) to include |
| scripts | | Object defining Lua Scripts to use with this client (see [Lua Scripts](../README.md#lua-scripts)) |

## Node Address Map

Your cluster might be configured to work within an internal network that your local environment doesn't have access to. For example, your development machine could only have access to external addresses, but the cluster returns its internal addresses. In this scenario, it's useful to provide a map from those internal addresses to the external ones.

The configuration for this is a simple mapping. Just provide a `nodeAddressMap` property mapping the internal addresses and ports to the external addresses and ports. Then, any address provided to `rootNodes` or returned from the cluster will be mapped accordingly:

```javascript
createCluster({
rootNodes: [{
url: '10.0.0.1:30001'
}, {
url: '10.0.0.2:30002'
}],
nodeAddressMap: {
'10.0.0.1:30001': 'external-host-1.io:30001',
'10.0.0.2:30002': 'external-host-2.io:30002'
}
});
```

> This is a common problem when using ElastiCache. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) for more information on that.
## Command Routing

### Commands that operate on Redis Keys
Expand Down
80 changes: 51 additions & 29 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ export interface ClusterNode<M extends RedisModules, S extends RedisScripts> {
client: RedisClientType<M, S>;
}

interface NodeAddress {
host: string;
port: number;
}

export type NodeAddressMap = {
[address: string]: NodeAddress;
} | ((address: string) => NodeAddress | undefined);

interface SlotNodes<M extends RedisModules, S extends RedisScripts> {
master: ClusterNode<M, S>;
replicas: Array<ClusterNode<M, S>>;
Expand All @@ -26,7 +35,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
readonly #options: RedisClusterOptions<M, S>;
readonly #Client: InstantiableRedisClient<M, S>;
readonly #onError: OnError;
readonly #nodeByUrl = new Map<string, ClusterNode<M, S>>();
readonly #nodeByAddress = new Map<string, ClusterNode<M, S>>();
readonly #slots: Array<SlotNodes<M, S>> = [];

constructor(options: RedisClusterOptions<M, S>, onError: OnError) {
Expand All @@ -37,7 +46,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc

async connect(): Promise<void> {
for (const rootNode of this.#options.rootNodes) {
if (await this.#discoverNodes(this.#clientOptionsDefaults(rootNode))) return;
if (await this.#discoverNodes(rootNode)) return;
}

throw new RootNodesUnavailableError();
Expand Down Expand Up @@ -75,7 +84,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
async #rediscover(startWith: RedisClientType<M, S>): Promise<void> {
if (await this.#discoverNodes(startWith.options)) return;

for (const { client } of this.#nodeByUrl.values()) {
for (const { client } of this.#nodeByAddress.values()) {
if (client === startWith) continue;

if (await this.#discoverNodes(client.options)) return;
Expand All @@ -85,7 +94,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
}

async #reset(masters: Array<RedisClusterMasterNode>): Promise<void> {
// Override this.#slots and add not existing clients to this.#nodeByUrl
// Override this.#slots and add not existing clients to this.#nodeByAddress
const promises: Array<Promise<void>> = [],
clientsInUse = new Set<string>();
for (const master of masters) {
Expand All @@ -94,7 +103,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
replicas: this.#options.useReplicas ?
master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) :
[],
clientIterator: undefined // will be initiated in use
clientIterator: undefined // will be initiated in use
};

for (const { from, to } of master.slots) {
Expand All @@ -104,12 +113,12 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
}
}

// Remove unused clients from this.#nodeByUrl using clientsInUse
for (const [url, { client }] of this.#nodeByUrl.entries()) {
if (clientsInUse.has(url)) continue;
// Remove unused clients from this.#nodeByAddress using clientsInUse
for (const [address, { client }] of this.#nodeByAddress.entries()) {
if (clientsInUse.has(address)) continue;

promises.push(client.disconnect());
this.#nodeByUrl.delete(url);
this.#nodeByAddress.delete(address);
}

await Promise.all(promises);
Expand All @@ -118,38 +127,49 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
#clientOptionsDefaults(options?: RedisClusterClientOptions): RedisClusterClientOptions | undefined {
if (!this.#options.defaults) return options;

const merged = Object.assign({}, this.#options.defaults, options);

if (options?.socket && this.#options.defaults.socket) {
Object.assign({}, this.#options.defaults.socket, options.socket);
}

return merged;
return {
...this.#options.defaults,
...options,
socket: this.#options.defaults.socket && options?.socket ? {
...this.#options.defaults.socket,
...options.socket
} : this.#options.defaults.socket ?? options?.socket
};
}

#initiateClient(options?: RedisClusterClientOptions): RedisClientType<M, S> {
return new this.#Client(this.#clientOptionsDefaults(options))
.on('error', this.#onError);
}

#getNodeAddress(address: string): NodeAddress | undefined {
switch (typeof this.#options.nodeAddressMap) {
case 'object':
return this.#options.nodeAddressMap[address];

case 'function':
return this.#options.nodeAddressMap(address);
}
}

#initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set<string>, promises: Array<Promise<void>>): ClusterNode<M, S> {
const url = `${nodeData.host}:${nodeData.port}`;
clientsInUse.add(url);
const address = `${nodeData.host}:${nodeData.port}`;
clientsInUse.add(address);

let node = this.#nodeByUrl.get(url);
let node = this.#nodeByAddress.get(address);
if (!node) {
node = {
id: nodeData.id,
client: this.#initiateClient({
socket: {
socket: this.#getNodeAddress(address) ?? {
host: nodeData.host,
port: nodeData.port
},
readonly
})
};
promises.push(node.client.connect());
this.#nodeByUrl.set(url, node);
this.#nodeByAddress.set(address, node);
}

return node;
Expand Down Expand Up @@ -186,12 +206,12 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
#randomClientIterator?: IterableIterator<ClusterNode<M, S>>;

#getRandomClient(): RedisClientType<M, S> {
if (!this.#nodeByUrl.size) {
if (!this.#nodeByAddress.size) {
throw new Error('Cluster is not connected');
}

if (!this.#randomClientIterator) {
this.#randomClientIterator = this.#nodeByUrl.values();
this.#randomClientIterator = this.#nodeByAddress.values();
}

const {done, value} = this.#randomClientIterator.next();
Expand All @@ -218,8 +238,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc

getMasters(): Array<ClusterNode<M, S>> {
const masters = [];

for (const node of this.#nodeByUrl.values()) {
for (const node of this.#nodeByAddress.values()) {
if (node.client.options?.readonly) continue;

masters.push(node);
Expand All @@ -228,8 +247,11 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
return masters;
}

getNodeByUrl(url: string): ClusterNode<M, S> | undefined {
return this.#nodeByUrl.get(url);
getNodeByAddress(address: string): ClusterNode<M, S> | undefined {
const mappedAddress = this.#getNodeAddress(address);
return this.#nodeByAddress.get(
mappedAddress ? `${mappedAddress.host}:${mappedAddress.port}` : address
);
}

quit(): Promise<void> {
Expand All @@ -242,13 +264,13 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc

async #destroy(fn: (client: RedisClientType<M, S>) => Promise<unknown>): Promise<void> {
const promises = [];
for (const { client } of this.#nodeByUrl.values()) {
for (const { client } of this.#nodeByAddress.values()) {
promises.push(fn(client));
}

await Promise.all(promises);

this.#nodeByUrl.clear();
this.#nodeByAddress.clear();
this.#slots.splice(0);
}
}
11 changes: 6 additions & 5 deletions packages/client/lib/cluster/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import COMMANDS from './commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
import { ClientCommandOptions, RedisClientCommandSignature, RedisClientOptions, RedisClientType, WithModules, WithScripts } from '../client';
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
import RedisClusterSlots, { ClusterNode, NodeAddressMap } from './cluster-slots';
import { extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, extendWithCommands } from '../commander';
import { EventEmitter } from 'events';
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
Expand All @@ -17,6 +17,7 @@ export interface RedisClusterOptions<
defaults?: Partial<RedisClusterClientOptions>;
useReplicas?: boolean;
maxCommandRedirections?: number;
nodeAddressMap?: NodeAddressMap;
}

type WithCommands = {
Expand Down Expand Up @@ -144,16 +145,16 @@ export default class RedisCluster<M extends RedisModules, S extends RedisScripts
}

if (err.message.startsWith('ASK')) {
const url = err.message.substring(err.message.lastIndexOf(' ') + 1);
if (this.#slots.getNodeByUrl(url)?.client === client) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
if (this.#slots.getNodeByAddress(address)?.client === client) {
await client.asking();
continue;
}

await this.#slots.rediscover(client);
const redirectTo = this.#slots.getNodeByUrl(url);
const redirectTo = this.#slots.getNodeByAddress(address);
if (!redirectTo) {
throw new Error(`Cannot find node ${url}`);
throw new Error(`Cannot find node ${address}`);
}

await redirectTo.client.asking();
Expand Down
12 changes: 6 additions & 6 deletions packages/client/lib/commands/CLUSTER_NODES.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('CLUSTER NODES', () => {
].join('\n')),
[{
id: 'master',
url: '127.0.0.1:30001@31001',
address: '127.0.0.1:30001@31001',
host: '127.0.0.1',
port: 30001,
cport: 31001,
Expand All @@ -34,7 +34,7 @@ describe('CLUSTER NODES', () => {
}],
replicas: [{
id: 'slave',
url: '127.0.0.1:30002@31002',
address: '127.0.0.1:30002@31002',
host: '127.0.0.1',
port: 30002,
cport: 31002,
Expand All @@ -48,14 +48,14 @@ describe('CLUSTER NODES', () => {
);
});

it('should support urls without cport', () => {
it('should support addresses without cport', () => {
assert.deepEqual(
transformReply(
'id 127.0.0.1:30001 master - 0 0 0 connected 0-16384\n'
),
[{
id: 'id',
url: '127.0.0.1:30001',
address: '127.0.0.1:30001',
host: '127.0.0.1',
port: 30001,
cport: null,
Expand All @@ -80,7 +80,7 @@ describe('CLUSTER NODES', () => {
),
[{
id: 'id',
url: '127.0.0.1:30001@31001',
address: '127.0.0.1:30001@31001',
host: '127.0.0.1',
port: 30001,
cport: 31001,
Expand All @@ -102,7 +102,7 @@ describe('CLUSTER NODES', () => {
),
[{
id: 'id',
url: '127.0.0.1:30001@31001',
address: '127.0.0.1:30001@31001',
host: '127.0.0.1',
port: 30001,
cport: 31001,
Expand Down
28 changes: 14 additions & 14 deletions packages/client/lib/commands/CLUSTER_NODES.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ export enum RedisClusterNodeLinkStates {
DISCONNECTED = 'disconnected'
}

interface RedisClusterNodeTransformedUrl {
interface RedisClusterNodeAddress {
host: string;
port: number;
cport: number | null;
}

export interface RedisClusterReplicaNode extends RedisClusterNodeTransformedUrl {
export interface RedisClusterReplicaNode extends RedisClusterNodeAddress {
id: string;
url: string;
address: string;
flags: Array<string>;
pingSent: number;
pongRecv: number;
Expand All @@ -39,11 +39,11 @@ export function transformReply(reply: string): Array<RedisClusterMasterNode> {
replicasMap = new Map<string, Array<RedisClusterReplicaNode>>();

for (const line of lines) {
const [id, url, flags, masterId, pingSent, pongRecv, configEpoch, linkState, ...slots] = line.split(' '),
const [id, address, flags, masterId, pingSent, pongRecv, configEpoch, linkState, ...slots] = line.split(' '),
node = {
id,
url,
...transformNodeUrl(url),
address,
...transformNodeAddress(address),
flags: flags.split(','),
pingSent: Number(pingSent),
pongRecv: Number(pongRecv),
Expand Down Expand Up @@ -84,22 +84,22 @@ export function transformReply(reply: string): Array<RedisClusterMasterNode> {
return [...mastersMap.values()];
}

function transformNodeUrl(url: string): RedisClusterNodeTransformedUrl {
const indexOfColon = url.indexOf(':'),
indexOfAt = url.indexOf('@', indexOfColon),
host = url.substring(0, indexOfColon);
function transformNodeAddress(address: string): RedisClusterNodeAddress {
const indexOfColon = address.indexOf(':'),
indexOfAt = address.indexOf('@', indexOfColon),
host = address.substring(0, indexOfColon);

if (indexOfAt === -1) {
return {
host,
port: Number(url.substring(indexOfColon + 1)),
port: Number(address.substring(indexOfColon + 1)),
cport: null
};
}

return {
host: url.substring(0, indexOfColon),
port: Number(url.substring(indexOfColon + 1, indexOfAt)),
cport: Number(url.substring(indexOfAt + 1))
host: address.substring(0, indexOfColon),
port: Number(address.substring(indexOfColon + 1, indexOfAt)),
cport: Number(address.substring(indexOfAt + 1))
};
}

0 comments on commit 0803f4e

Please sign in to comment.