This repository has been archived by the owner on Nov 13, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.cs
385 lines (329 loc) · 11.2 KB
/
client.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
// Public: Create a new RedisClient instance which connects to the specified
// server.
//
// port - The integer port to connect to. (default: 6379)
// host - The string address to connect to. (default: "127.0.0.1")
//
// Returns the new instance.
function RedisClient(%port, %host)
{
// Fill in default values for empty arguments
if (%port $= "")
%port = 6379;
if (%host $= "")
%host = "127.0.0.1";
return new ScriptObject()
{
class = "RedisClient";
port = %port;
host = %host;
};
}
// Internal: Initialize a new RedisClient.
//
// Returns nothing.
function RedisClient::onAdd(%this)
{
// Create classes mapped to the client
%this.parser = RedisReplyParser(%this);
%this.socket = RedisClientSocket(%this);
%this.callbacks = Queue().ref();
%this.subscriptions = Map().ref();
// Start connecting on the next frame, so the user has a chance to replace
// member fields and possibly change the port/host using the attributes.
%this.socket.schedule(0, "connect");
}
// Internal: Deconstruct a RedisClient.
//
// Returns nothing.
function RedisClient::onRemove(%this)
{
// Don't delete, just unref. Something else might have a reference to them.
%this.subscriptions.unref();
%this.callbacks.unref();
// Should perhaps force a emergency disconnect here?
%this.socket.delete();
}
// Internal: Handle an incoming parsed reply from the RedisReplyParser
// instance.
//
// Returns nothing.
function RedisClient::onReply(%this, %reply)
{
%reply.ref();
// If there are no callbacks, either something is wrong or we're receiving
// subscription replies ...
if (%this.callbacks.empty())
{
// ... if that is the case, make sure we check them
if (%reply.item[0] $= "message" &&
%this.subscriptions.exists(%reply.item[1]))
{
%entries = %this.subscriptions.get(%reply.item[1]);
// Call every mapped callback
for (%i = 0; %i < %entries.size; %i++)
{
%entry = %entries.item[%i];
%func = %entries.item[%i].item[0];
%data = %entries.item[%i].item[1];
// It might just be an empty callback
if (isFunction(%func))
call(%func, %reply.item[2], %data, %this);
}
}
%reply.unref();
return;
}
// Pop it, don't drop it. If it's a multi we'll keep it there until the
// required number of replies is satisfied.
%entry = %this.callbacks.peek();
// Check for multi callbacks. These are special in that the third element is
// a positive number of replies required. Normal callbacks only have two
// elements.
if (%entry.item[2])
{
// For these, the final element is an Array used for accumulating the
// replies.
%entry.item[3].append(%reply);
// If the Array has enough elements, just pop it and run the callback code
// for normal callbacks! Simple ;)
if (%entry.item[3].size >= %entry.item[2])
{
%this.callbacks.pop();
}
else
{
// If we don't have enough replies yet, keep it in the Queue and do
// nothing else in particular.
%reply.unref();
return;
}
}
else
{
// Normal callback, just pop it anyway.
%this.callbacks.pop();
}
%func = %entry.item[0];
%data = %entry.item[1];
// Callbacks can refer to empty functions. This is to keep the order in the
// Queue in respect to the Redis server's replies.
if (isFunction(%func))
{
// If it's a multi callback, use the fourth element. Otherwise, just pass
// the reply data like normal.
call(%func, %entry.item[2] ? %entry.item[3] : %reply, %data, %this);
}
%reply.unref();
// Did the user request a disconnect? If there's no more callbacks, try now.
if (%this.socket.pendingDisconnect && %this.callbacks.empty())
%this.socket.disconnect();
}
// Public: Issue a new command to the Redis server using an Array of values.
// This is the best for commands that employ arbitrary input of some degree, as
// simply using `::commandString()` could open up for vulnerabilities similar
// to SQL injection.
//
// argv - An Array of strings to send to the server.
// func - An optional function to call once the command reply is
// received.
// data - A value which will be passed to *func*, in addition to
// the reply of the command.
// noAttachCallback - Don't add a callback to the queue. Use only for commands
// that do not trigger a straight-forward reply, like EXEC.
// Defaults to false.
//
// Returns 0 on success, 1 otherwise.
function RedisClient::command(%this, %argv, %func, %data, %noAttachCallback)
{
if (!isCollection(%argv))
return 1;
%argv.ref();
// Don't allow new commands if the user requested a disconnect.
if (%this.socket.pendingDisconnect)
{
%argv.unref();
return 1;
}
// Build the text payload to send to the Redis server. *argv* is serialized
// as a single RESP array, containing RESP bulk strings.
%payload = "*" @ %argv.size @ "\r\n";
for (%i = 0; %i < %argv.size; %i++)
{
%item = %argv.item[%i];
%payload = %payload @ "$" @ strLen(%item) @ "\r\n";
%payload = %payload @ %item @ "\r\n";
}
%argv.unref();
// The RedisClientSocket implementation will automatically queue sent data
// for when the socket connects and is ready to do so, so there's no need to
// check it here. The callback entry is appended regardless of state.
%this.socket.send(%payload);
if (!%noAttachCallback)
%this.callbacks.append(buildArray(%func, %data));
return 0;
}
// Public: Issue a new command to the Redis server using a plain string
// representation. The given string will be split into an Array by each word.
//
// argv - A string which will be split into individual words.
// func - An optional function to call once the command reply is
// received.
// data - A value which will be passed to *func*, in addition to
// the reply of the command.
// noAttachCallback - Don't add a callback to the queue. Use only for commands
// that do not trigger a straight-forward reply, like EXEC.
// Defaults to false.
//
// Returns 0 on success, 1 otherwise.
function RedisClient::commandString(%this, %str, %func, %data, %noAttachCallback)
{
return %this.command(split(%str, " "), %func, %data, %noAttachCallback);
}
// Public: Tell the Redis server to start sending messages published on the
// specified channels. If a valid callback function is passed, it will be
// called with each message received. No normal commands can be issued while
// subscribed.
//
// channels - Either an Array or a space-separated list of channel names to
// subscribe the callback function to.
// func - A function to call for each message received from the channels.
// data - A value which will be passed to *func* for each message, in
// addition to the value of the message.
//
// Returns nothing.
function RedisClient::subscribe(%this, %channels, %func, %data)
{
if (!isCollection(%channels))
%channels = split(%channels, " ");
// Just use `::command()` like normal to indicate the intent of subscribing.
%argv = buildArray("SUBSCRIBE");
%argv.concat(%channels.ref().unref());
%this.command(%argv);
for (%i = 0; %i < %channels.size; %i++)
{
// This assigns an empty Array to the value of the channel in the map if it
// was not already set. Finally sets *channel* to the resulting Array.
%channel = %this.subscriptions.setDefault(
%channels.item[%i],
Array().ref().unref()
);
// If there's no callback function, simply don't bother with it. The
// presence of the channel key as an Array is enough for the subscription
// to work in `::onReply()` regardless.
if (%func $= "")
continue;
// Remove all callback entries with the same function. This is both to
// allow changing the data used with the function, as well as to avoid
// duplicate callbacks.
for (%j = 0; %j < %channel.size; %i++)
{
%entry = %channel.item[%j];
if (%entry.item[0] $= %func)
{
%entry.pop(%j);
%j--;
}
}
// As all instances of the function have been removed, add the entry to the
// callback map unconditionally.
%channel.append(buildArray(%func, %data));
}
}
// Public: Remove an active subscription from the given set of channels. The
// Redis server will be told to stop sending messages from them, and all
// callback functions mapped to the channels will be removed.
//
// channels - Either an Array or a space-separated list of channel names to
// unsubscribe from.
//
// Returns nothing.
function RedisClient::unsubscribe(%this, %channels)
{
if (!isCollection(%channels))
%channels = split(%channels, " ");
for (%i = %this.channels.size - 1; %i >= 0; %i--)
{
// `::remove()` works even if the key doesn't exist.
%this.subscriptions.remove(%channels.item[%i]);
}
// TODO: The UNSUBSCRIBE command may actually send a reply for every channel
// that the client unsubscribed from. Will have to test further, as this only
// consumes a single reply.
%argv = buildArray("UNSUBSCRIBE");
%argv.concat(%channels.ref().unref());
%this.command(%argv);
}
// Public: Create a new MULTI command execution context.
//
// Returns the new RedisClientMulti instance.
function RedisClient::multi(%this)
{
return new ScriptObject()
{
class = "RedisClientMulti";
client = %this;
};
}
// Internal: Initialize a new RedisClientMulti instance.
//
// Returns nothing.
function RedisClientMulti::onAdd(%this)
{
%this.commands = Array().ref();
}
// Internal: Deconstruct a RedisClientMulti instance.
//
// Returns nothing.
function RedisClientMulti::onRemove(%this)
{
%this.commands.unref();
}
// Public: Send the queued up sequence of commands to the Redis server. The
// callback will be given an Array with the reply from each command, in order.
//
// func - An optional function to call with the results of the MULTI.
// data - A value which will be passed to *func* in addition to the results.
//
// Returns nothing.
function RedisClientMulti::exec(%this, %func, %data)
{
// This is where the magic happens
%this.client.command(buildArray("MULTI"));
// Issue each command that was queued
for (%i = 0; %i < %this.commands.size; %i++)
%this.client.command(%this.commands.item[%i]);
// Attach a multi callback for the exec
%this.client.callbacks.append(buildArray(
%func, %data, %this.commands.size, Array()
));
// Exec it without a callback
%this.client.command(buildArray("EXEC"), "", "", 1);
// That's it, folks — see you next time
%this.delete();
}
// Public: Add a command to the MULTI queue. See `RedisClient::command` for
// more detail.
//
// argv - See `RedisClient::command`.
//
// Returns 0 on success, 1 otherwise.
function RedisClientMulti::command(%this, %argv)
{
if (!isCollection(%argv))
return 1;
%argv.ref();
%this.commands.append(%argv);
%argv.unref();
return 0;
}
// Public: Add a command to the queue using a space-separated string. See
// `RedisClient::commandString` for more detail.
//
// argv - See `RedisClient::commandString`.
//
// Returns 0 on success, 1 otherwise.
function RedisClientMulti::commandString(%this, %str)
{
return %this.command(split(%str, " "));
}