-
Notifications
You must be signed in to change notification settings - Fork 0
/
mod.ts
172 lines (161 loc) · 4.27 KB
/
mod.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import { BufReader, writeAll } from "./deps.ts";
/**
 * A single Redis command expressed as its list of arguments,
 * e.g. `["SET", "key", 1]`.
 */
export type Command = Array<string | number>;

/**
 * A decoded reply: a string, a number, `null`, or an arbitrarily nested
 * array of those.
 */
export type Reply = null | number | string | Array<Reply>;

/** Line terminator used when building requests. */
const CRLF = "\r\n";

/** Shared UTF-8 encoder used to turn request strings into bytes. */
const encoder = new TextEncoder();

/** Shared UTF-8 decoder used to turn received bytes into strings. */
const decoder = new TextDecoder();
/**
 * Drops the first character of a line (the one-character type marker)
 * and returns the remainder.
 */
function removePrefix(line: string): string {
  const withoutMarker = line.substring(1);
  return withoutMarker;
}
/**
 * Reads one line from the buffered reader and decodes it as UTF-8.
 *
 * @param bufReader Buffered reader wrapping the connection.
 * @returns The decoded line, or `null` when the reader reports end of input
 * before any data for the line was read.
 */
async function readLine(bufReader: BufReader): Promise<string | null> {
  const first = await bufReader.readLine();
  // `decoder.decode(undefined)` yields "", so end of input has to be checked
  // explicitly; otherwise it would be indistinguishable from an empty line.
  if (first === null) {
    return null;
  }
  if (!first.more) {
    return decoder.decode(first.line);
  }

  // The line did not fit into the reader's buffer and arrives in pieces.
  // A dedicated streaming decoder keeps multi-byte characters that straddle
  // a piece boundary intact. Each piece is decoded before the next read
  // because the returned bytes are only valid until then.
  const streamDecoder = new TextDecoder();
  let text = streamDecoder.decode(first.line, { stream: true });
  let more = true;
  while (more) {
    const next = await bufReader.readLine();
    if (next === null) {
      break;
    }
    text += streamDecoder.decode(next.line, { stream: true });
    more = next.more;
  }
  return text + streamDecoder.decode();
}
/**
 * Transforms a command, which is an array of arguments, into an RESP request string.
 *
 * The request is an array header (`*<count>`) followed by one bulk string
 * (`$<byte length>` + payload) per argument. The declared length is the
 * number of bytes the argument occupies once UTF-8 encoded, not the number
 * of UTF-16 code units, so non-ASCII arguments are framed correctly.
 *
 * See {@link https://redis.io/docs/reference/protocol-spec/#send-commands-to-a-redis-server}
 *
 * @param command Arguments of the command; numbers are sent in their
 * string form.
 * @returns The request text, ready to be encoded and written.
 */
function createRequest(command: Command): string {
  let request = "*" + command.length + CRLF;
  for (const arg of command) {
    const text = String(arg);
    // Byte length, because the request is sent UTF-8 encoded.
    const byteLength = encoder.encode(text).byteLength;
    request += "$" + byteLength + CRLF;
    request += text + CRLF;
  }
  return request;
}
/**
 * Encodes an already-formatted request string and writes all of its bytes
 * to the connection.
 *
 * @param redisConn Connection to write to.
 * @param request Request text to send.
 */
async function writeRequest(
  redisConn: Deno.Conn,
  request: string,
): Promise<void> {
  const payload = encoder.encode(request);
  await writeAll(redisConn, payload);
}
/**
 * Writes a single command to the Redis server without reading a reply.
 *
 * Example:
 * ```ts
 * await writeCommand(redisConn, ["SHUTDOWN"]);
 * ```
 *
 * @param redisConn Connection to write to.
 * @param command Command arguments to send.
 */
export async function writeCommand(
  redisConn: Deno.Conn,
  command: Command,
): Promise<void> {
  const request = createRequest(command);
  await writeRequest(redisConn, request);
}
/**
 * Reads and parses one complete reply from the buffered reader.
 *
 * The first line of a reply carries a one-character type marker:
 * `+` simple string, `-` error, `:` integer, `$` bulk string, `*` array.
 * Bulk string payloads are read by their declared byte length rather than
 * line-by-line, so a payload that starts with a marker character or that
 * contains a line break is returned verbatim.
 *
 * See {@link https://redis.io/docs/reference/protocol-spec/#resp-protocol-description}
 *
 * @param bufReader Buffered reader wrapping the connection.
 * @returns The parsed reply. Null bulk strings and null arrays
 * (declared length `-1`) yield `null`.
 * @throws Rejects with a string: the server's error text for `-` replies, or
 * a description when input ends or a bulk string length is invalid.
 * NOTE(review): rejection values are plain strings rather than `Error`
 * objects; kept that way so existing callers are unaffected.
 */
async function readReply(bufReader: BufReader): Promise<Reply> {
  const line = await readLine(bufReader);
  if (line === null) {
    return await Promise.reject("No response received from Redis server");
  }
  const payload = removePrefix(line);
  switch (line.charAt(0)) {
    // Simple string
    case "+":
      return payload;
    // Error
    case "-":
      return await Promise.reject(payload);
    // Integer
    case ":":
      return Number(payload);
    // Bulk string
    case "$": {
      const length = Number(payload);
      if (length === -1) {
        return null;
      }
      if (!Number.isInteger(length) || length < 0) {
        return await Promise.reject("Invalid bulk string length: " + payload);
      }
      // Read exactly the declared number of bytes plus the trailing CRLF.
      // Parsing the payload as another reply would misinterpret data that
      // happens to begin with "+", "-", ":", "$" or "*".
      const buffer = new Uint8Array(length + 2);
      if (await bufReader.readFull(buffer) === null) {
        return await Promise.reject("No response received from Redis server");
      }
      return decoder.decode(buffer.subarray(0, length));
    }
    // Array
    case "*": {
      const length = Number(payload);
      if (length === -1) {
        return null;
      }
      const array: Reply[] = [];
      // Elements must be read one after another: each is a full reply that
      // consumes a variable amount of input.
      for (let i = 0; i < length; i++) {
        array.push(await readReply(bufReader));
      }
      return array;
    }
    // No recognised marker: hand the line back unchanged.
    default:
      return line;
  }
}
/**
 * Writes one command to the Redis server, then reads and returns its
 * parsed reply.
 *
 * A new buffered reader is created over the connection for every call.
 *
 * Example:
 * ```ts
 * const redisConn = await Deno.connect({ port: 6379 });
 *
 * // Returns "OK"
 * await sendCommand(redisConn, ["SET", "hello", "world"]);
 *
 * // Returns "world"
 * await sendCommand(redisConn, ["GET", "hello"]);
 * ```
 *
 * @param redisConn Open connection to the server.
 * @param command Command arguments to send.
 * @returns The parsed reply to the command.
 */
export async function sendCommand(
  redisConn: Deno.Conn,
  command: Command,
): Promise<Reply> {
  await writeCommand(redisConn, command);
  const replyReader = new BufReader(redisConn);
  const reply = await readReply(replyReader);
  return reply;
}
/**
 * Sends several commands in a single write and returns their parsed
 * replies, one per command, in the order the commands were given.
 *
 * Example:
 * ```ts
 * const redisConn = await Deno.connect({ port: 6379 });
 *
 * await pipelineCommands(redisConn, [
 *   ["INCR", "X"],
 *   ["INCR", "X"],
 *   ["INCR", "X"],
 *   ["INCR", "X"],
 * ]); // Returns [1, 2, 3, 4]
 * ```
 *
 * @param redisConn Open connection to the server.
 * @param commands Commands to send together.
 * @returns One parsed reply per command.
 */
export async function pipelineCommands(
  redisConn: Deno.Conn,
  commands: Command[],
): Promise<Reply[]> {
  // Concatenate every request so the whole batch goes out in one write.
  let batch = "";
  for (const command of commands) {
    batch += createRequest(command);
  }
  await writeRequest(redisConn, batch);

  // All replies are read through the same reader, strictly one at a time.
  const reader = new BufReader(redisConn);
  const replies: Reply[] = [];
  while (replies.length < commands.length) {
    const reply = await readReply(reader);
    replies.push(reply);
  }
  return replies;
}
/**
 * Used for pub/sub. Yields every reply read from the connection, one after
 * another, for as long as the consumer keeps iterating. The generator never
 * completes on its own; it ends only when reading a reply rejects or the
 * consumer stops iterating.
 *
 * Example:
 * ```ts
 * await writeCommand(redisConn, ["SUBSCRIBE", "mychannel"]);
 *
 * for await (const reply of listenReplies(redisConn)) {
 *   // Logs each reply as it arrives, e.g. ["subscribe", "mychannel", 1]
 *   console.log(reply);
 * }
 * ```
 *
 * @param redisConn Open connection to listen on.
 */
export async function* listenReplies(
  redisConn: Deno.Conn,
): AsyncIterable<Reply> {
  const reader = new BufReader(redisConn);
  for (;;) {
    const reply = await readReply(reader);
    yield reply;
  }
}