/
autoPipelining.ts
159 lines (137 loc) 路 4.09 KB
/
autoPipelining.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import * as PromiseContainer from "./promiseContainer";
import * as calculateSlot from "cluster-key-slot";
import asCallback from "standard-as-callback";
export const kExec = Symbol("exec");
export const kCallbacks = Symbol("callbacks");
export const notAllowedAutoPipelineCommands = [
"auth",
"info",
"script",
"quit",
"cluster",
"pipeline",
"multi",
"subscribe",
"psubscribe",
"unsubscribe",
"unpsubscribe",
];
function findAutoPipeline(
client,
_commandName,
...args: Array<string>
): string {
if (!client.isCluster) {
return "main";
}
// We have slot information, we can improve routing by grouping slots served by the same subset of nodes
return client.slots[calculateSlot(args[0])].join(",");
}
function executeAutoPipeline(client, slotKey: string) {
/*
If a pipeline is already executing, keep queueing up commands
since ioredis won't serve two pipelines at the same time
*/
if (client._runningAutoPipelines.has(slotKey)) {
return;
}
client._runningAutoPipelines.add(slotKey);
// Get the pipeline and immediately delete it so that new commands are queued on a new pipeline
const pipeline = client._autoPipelines.get(slotKey);
client._autoPipelines.delete(slotKey);
const callbacks = pipeline[kCallbacks];
// Perform the call
pipeline.exec(function (err, results) {
client._runningAutoPipelines.delete(slotKey);
/*
Invoke all callback in nextTick so the stack is cleared
and callbacks can throw errors without affecting other callbacks.
*/
if (err) {
for (let i = 0; i < callbacks.length; i++) {
process.nextTick(callbacks[i], err);
}
} else {
for (let i = 0; i < callbacks.length; i++) {
process.nextTick(callbacks[i], ...results[i]);
}
}
// If there is another pipeline on the same node, immediately execute it without waiting for nextTick
if (client._autoPipelines.has(slotKey)) {
executeAutoPipeline(client, slotKey);
}
});
}
export function shouldUseAutoPipelining(
client,
functionName: string,
commandName: string
): boolean {
return (
functionName &&
client.options.enableAutoPipelining &&
!client.isPipeline &&
!notAllowedAutoPipelineCommands.includes(commandName) &&
!client.options.autoPipeliningIgnoredCommands.includes(commandName)
);
}
export function executeWithAutoPipelining(
client,
functionName: string,
commandName: string,
args: string[],
callback
) {
const CustomPromise = PromiseContainer.get();
// On cluster mode let's wait for slots to be available
if (client.isCluster && !client.slots.length) {
return new CustomPromise(function (resolve, reject) {
client.delayUntilReady((err) => {
if (err) {
reject(err);
return;
}
executeWithAutoPipelining(
client,
functionName,
commandName,
args,
callback
).then(resolve, reject);
});
});
}
const slotKey = findAutoPipeline(client, commandName, ...args);
if (!client._autoPipelines.has(slotKey)) {
const pipeline = client.pipeline();
pipeline[kExec] = false;
pipeline[kCallbacks] = [];
client._autoPipelines.set(slotKey, pipeline);
}
const pipeline = client._autoPipelines.get(slotKey);
/*
Mark the pipeline as scheduled.
The symbol will make sure that the pipeline is only scheduled once per tick.
New commands are appended to an already scheduled pipeline.
*/
if (!pipeline[kExec]) {
pipeline[kExec] = true;
/*
Deferring with setImmediate so we have a chance to capture multiple
commands that can be scheduled by I/O events already in the event loop queue.
*/
setImmediate(executeAutoPipeline, client, slotKey);
}
// Create the promise which will execute the
const autoPipelinePromise = new CustomPromise(function (resolve, reject) {
pipeline[kCallbacks].push(function (err, value) {
if (err) {
reject(err);
return;
}
resolve(value);
});
pipeline[functionName](...args);
});
return asCallback(autoPipelinePromise, callback);
}