/
ClusterSubscriber.ts
152 lines (138 loc) · 4.5 KB
/
ClusterSubscriber.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import { EventEmitter } from "events";
import ConnectionPool from "./ConnectionPool";
import { getConnectionName, getNodeKey } from "./util";
import { sample, noop, Debug } from "../utils";
import Redis from "../Redis";
// Module-level debug logger, created by `Debug` with the name "cluster:subscriber".
const debug = Debug("cluster:subscriber");
/** Commands used to restore subscriptions on a newly selected subscriber. */
type SubscribeCommand = "subscribe" | "psubscribe";

/** Channels (or patterns) to restore, grouped by the command that restores them. */
type ChannelsByCommand = Record<SubscribeCommand, string[]>;

/**
 * Maintains the single dedicated connection that is used for pub/sub.
 *
 * A node is sampled from the connection pool and a separate `Redis`
 * connection is opened to it. `message`/`pmessage` events (and their buffer
 * variants) received on that connection are re-emitted on the supplied
 * emitter. When the node backing the subscriber leaves the pool, or a node
 * appears while there is no subscriber, a new subscriber is selected and the
 * channels known to the last active subscriber are subscribed again.
 */
export default class ClusterSubscriber {
  /** True between `start()` and `stop()`; pool events are ignored otherwise. */
  private started = false;
  /** The connection currently used for subscriptions, or null when there is none. */
  private subscriber: any = null;
  /**
   * The most recent subscriber that either had nothing to restore or finished
   * restoring every previous channel. Its channel set is the source used for
   * the next re-subscription.
   */
  private lastActiveSubscriber: any;

  /**
   * @param connectionPool Pool whose "+node" / "-node" events trigger re-selection.
   * @param emitter Emitter on which received messages are re-emitted.
   */
  constructor(
    private connectionPool: ConnectionPool,
    private emitter: EventEmitter
  ) {
    this.connectionPool.on("-node", (_, key: string) => {
      if (!this.started || !this.subscriber) {
        return;
      }
      // Only react when the removed node is the one backing the subscriber.
      if (getNodeKey(this.subscriber.options) === key) {
        debug("subscriber has left, selecting a new one...");
        this.selectSubscriber();
      }
    });
    this.connectionPool.on("+node", () => {
      if (!this.started || this.subscriber) {
        return;
      }
      debug(
        "a new node is discovered and there is no subscriber, selecting a new one..."
      );
      this.selectSubscriber();
    });
  }

  /** Returns the current subscriber connection, or null when there is none. */
  getInstance(): any {
    return this.subscriber;
  }

  /** Marks the subscriber as started and selects a subscriber connection. */
  start(): void {
    this.started = true;
    this.selectSubscriber();
    debug("started");
  }

  /** Marks the subscriber as stopped and disconnects the current connection, if any. */
  stop(): void {
    this.started = false;
    if (this.subscriber) {
      this.subscriber.disconnect();
      this.subscriber = null;
    }
    debug("stopped");
  }

  /**
   * Disconnects the existing subscriber(s), samples a node from the pool,
   * opens a new subscriber connection to it, restores the previous
   * subscriptions and wires up message forwarding.
   *
   * When the pool has no nodes, `subscriber` is set to null and nothing else
   * happens.
   */
  private selectSubscriber(): void {
    const lastActiveSubscriber = this.lastActiveSubscriber;

    // Disconnect the previous subscriber even if there
    // will not be a new one.
    if (lastActiveSubscriber) {
      lastActiveSubscriber.disconnect();
    }
    if (this.subscriber) {
      this.subscriber.disconnect();
    }

    const sampleNode = sample(this.connectionPool.getNodes());
    if (!sampleNode) {
      debug(
        "selecting subscriber failed since there is no node discovered in the cluster yet"
      );
      this.subscriber = null;
      return;
    }

    const { options } = sampleNode;
    debug("selected a subscriber %s:%s", options.host, options.port);

    /*
     * Create a specialized Redis connection for the subscription.
     * Note that auto reconnection is enabled here.
     *
     * `enableReadyCheck` is also enabled because although subscription is allowed
     * while redis is loading data from the disk, we can check if the password
     * provided for the subscriber is correct, and if not, the current subscriber
     * will be disconnected and a new subscriber will be selected.
     */
    const subscriber: any = new Redis({
      port: options.port,
      host: options.host,
      username: options.username,
      password: options.password,
      enableReadyCheck: true,
      connectionName: getConnectionName("subscriber", options.connectionName),
      lazyConnect: true,
      tls: options.tls,
    });
    this.subscriber = subscriber;

    // Ignore the errors since they're handled in the connection pool.
    subscriber.on("error", noop);

    this.resubscribe(subscriber, this.getPreviousChannels(lastActiveSubscriber));
    this.forwardMessages(subscriber);
  }

  /**
   * Reads the channels and patterns tracked by the given subscriber.
   *
   * @param lastActiveSubscriber Subscriber to read from; may be null/undefined.
   * @returns The tracked channels per command; both lists are empty when
   *   nothing is available.
   */
  private getPreviousChannels(lastActiveSubscriber: any): ChannelsByCommand {
    const channels: ChannelsByCommand = { subscribe: [], psubscribe: [] };
    if (!lastActiveSubscriber) {
      return channels;
    }
    const condition =
      lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition;
    if (condition && condition.subscriber) {
      channels.subscribe = condition.subscriber.channels("subscribe");
      channels.psubscribe = condition.subscriber.channels("psubscribe");
    }
    return channels;
  }

  /**
   * Re-issues the previous subscriptions on `subscriber` and promotes it to
   * `lastActiveSubscriber` once all of them have succeeded (immediately when
   * there is nothing to restore).
   *
   * @param subscriber The connection created by the current selection.
   * @param channels Channels/patterns to restore, grouped by command.
   */
  private resubscribe(subscriber: any, channels: ChannelsByCommand): void {
    const commands: SubscribeCommand[] = ["subscribe", "psubscribe"];
    const types = commands.filter((type) => channels[type].length > 0);
    if (types.length === 0) {
      this.lastActiveSubscriber = subscriber;
      return;
    }

    // Counted up front so an early completion cannot reach zero before every
    // command has been issued.
    let pending = types.length;
    for (const type of types) {
      const list = channels[type];
      debug("%s %d channels", type, list.length);
      subscriber[type](list)
        .then(() => {
          pending -= 1;
          // Promote the connection captured by this call, and only while it is
          // still the current one. Reading `this.subscriber` here instead would
          // record null (after stop()) or an unrelated connection (after a
          // newer selection) and drop the remembered channels.
          if (pending === 0 && this.subscriber === subscriber) {
            this.lastActiveSubscriber = subscriber;
          }
        })
        .catch(() => {
          // TODO: should probably disconnect the subscriber and try again.
          debug("failed to %s %d channels", type, list.length);
        });
    }
  }

  /**
   * Re-emits message events received on `subscriber` on the emitter.
   *
   * @param subscriber The connection whose events are forwarded.
   */
  private forwardMessages(subscriber: any): void {
    for (const event of ["message", "messageBuffer"]) {
      subscriber.on(event, (arg1: unknown, arg2: unknown) => {
        this.emitter.emit(event, arg1, arg2);
      });
    }
    for (const event of ["pmessage", "pmessageBuffer"]) {
      subscriber.on(event, (arg1: unknown, arg2: unknown, arg3: unknown) => {
        this.emitter.emit(event, arg1, arg2, arg3);
      });
    }
  }
}