/
ConnectionPool.ts
150 lines (131 loc) 路 4.05 KB
/
ConnectionPool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import { EventEmitter } from "events";
import { sample, Debug, noop, defaults } from "../utils";
import { RedisOptions, getNodeKey, NodeKey, NodeRole } from "./util";
import Redis from "../Redis";
const debug = Debug("cluster:connectionPool");
type NODE_TYPE = "all" | "master" | "slave";
export default class ConnectionPool extends EventEmitter {
// master + slave = all
private nodes: { [key in NODE_TYPE]: { [key: string]: Redis } } = {
all: {},
master: {},
slave: {},
};
private specifiedOptions: { [key: string]: any } = {};
constructor(private redisOptions) {
super();
}
getNodes(role: NodeRole = "all"): Redis[] {
const nodes = this.nodes[role];
return Object.keys(nodes).map((key) => nodes[key]);
}
getInstanceByKey(key: NodeKey): Redis {
return this.nodes.all[key];
}
getSampleInstance(role: NodeRole): Redis {
const keys = Object.keys(this.nodes[role]);
const sampleKey = sample(keys);
return this.nodes[role][sampleKey];
}
/**
* Find or create a connection to the node
*/
findOrCreate(node: RedisOptions, readOnly = false): Redis {
const key = getNodeKey(node);
readOnly = Boolean(readOnly);
if (this.specifiedOptions[key]) {
Object.assign(node, this.specifiedOptions[key]);
} else {
this.specifiedOptions[key] = node;
}
let redis: Redis;
if (this.nodes.all[key]) {
redis = this.nodes.all[key];
if (redis.options.readOnly !== readOnly) {
redis.options.readOnly = readOnly;
debug("Change role of %s to %s", key, readOnly ? "slave" : "master");
redis[readOnly ? "readonly" : "readwrite"]().catch(noop);
if (readOnly) {
delete this.nodes.master[key];
this.nodes.slave[key] = redis;
} else {
delete this.nodes.slave[key];
this.nodes.master[key] = redis;
}
}
} else {
debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
redis = new Redis(
defaults(
{
// Never try to reconnect when a node is lose,
// instead, waiting for a `MOVED` error and
// fetch the slots again.
retryStrategy: null,
// Offline queue should be enabled so that
// we don't need to wait for the `ready` event
// before sending commands to the node.
enableOfflineQueue: true,
readOnly: readOnly,
},
node,
this.redisOptions,
{ lazyConnect: true }
)
);
this.nodes.all[key] = redis;
this.nodes[readOnly ? "slave" : "master"][key] = redis;
redis.once("end", () => {
this.removeNode(key);
this.emit("-node", redis, key);
if (!Object.keys(this.nodes.all).length) {
this.emit("drain");
}
});
this.emit("+node", redis, key);
redis.on("error", function (error) {
this.emit("nodeError", error, key);
});
}
return redis;
}
/**
* Reset the pool with a set of nodes.
* The old node will be removed.
*/
reset(nodes: RedisOptions[]): void {
debug("Reset with %O", nodes);
const newNodes = {};
nodes.forEach((node) => {
const key = getNodeKey(node);
// Don't override the existing (master) node
// when the current one is slave.
if (!(node.readOnly && newNodes[key])) {
newNodes[key] = node;
}
});
Object.keys(this.nodes.all).forEach((key) => {
if (!newNodes[key]) {
debug("Disconnect %s because the node does not hold any slot", key);
this.nodes.all[key].disconnect();
this.removeNode(key);
}
});
Object.keys(newNodes).forEach((key) => {
const node = newNodes[key];
this.findOrCreate(node, node.readOnly);
});
}
/**
* Remove a node from the pool.
*/
private removeNode(key: string): void {
const { nodes } = this;
if (nodes.all[key]) {
debug("Remove %s from the pool", key);
delete nodes.all[key];
}
delete nodes.master[key];
delete nodes.slave[key];
}
}