/
pipeline_test.ts
143 lines (139 loc) · 3.28 KB
/
pipeline_test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import {
assertEquals,
assert,
} from "./vendor/https/deno.land/std/testing/asserts.ts";
import { connect } from "./redis.ts";
import { ErrorReplyError } from "./errors.ts";
// Address passed to `connect()` by every test in this file.
const addr = {
  hostname: "127.0.0.1",
  port: 6379,
};
// Local shorthand for Deno's test registration function.
const { test } = Deno;
/**
 * Queues seven commands on one pipeline and checks that `flush()` returns
 * one `[type, value]` reply per command, in the order they were queued.
 *
 * Connects to the server described by `addr`.
 */
test({
  name: "testPipeline",
  fn: async function testPipeline() {
    const redis = await connect(addr);
    try {
      const pl = redis.pipeline();
      // Queue the commands; the replies themselves are collected by flush().
      await Promise.all([
        pl.ping(),
        pl.ping(),
        pl.set("set1", "value1"),
        pl.set("set2", "value2"),
        pl.mget("set1", "set2"),
        pl.del("set1"),
        pl.del("set2"),
      ]);
      const ret = await pl.flush();
      assertEquals(ret, [
        ["status", "PONG"],
        ["status", "PONG"],
        ["status", "OK"],
        ["status", "OK"],
        ["array", ["value1", "value2"]],
        ["integer", 1],
        ["integer", 1],
      ]);
    } finally {
      // Close the connection even when an assertion above throws, so a
      // failing test does not leave the connection open.
      redis.close();
    }
  },
});
/**
 * Creates three `redis.tx()` handles on one connection, queues the same
 * get / incr x3 / get sequence against "key" on each, flushes them one after
 * another and checks that, within each flush result, the value at index 4
 * equals the value at index 0 plus 3.
 *
 * Connects to the server described by `addr`.
 */
test({
  name: "testTx",
  fn: async function testTx() {
    const redis = await connect(addr);
    try {
      const tx1 = redis.tx();
      const tx2 = redis.tx();
      const tx3 = redis.tx();
      await redis.del("key");
      await Promise.all<any>([
        tx1.get("key"),
        tx1.incr("key"),
        tx1.incr("key"),
        tx1.incr("key"),
        tx1.get("key"),
        //
        tx2.get("key"),
        tx2.incr("key"),
        tx2.incr("key"),
        tx2.incr("key"),
        tx2.get("key"),
        //
        tx3.get("key"),
        tx3.incr("key"),
        tx3.incr("key"),
        tx3.incr("key"),
        tx3.get("key"),
      ]);
      // Flush sequentially, in creation order.
      const rep1 = await tx1.flush();
      const rep2 = await tx2.flush();
      const rep3 = await tx3.flush();
      // NOTE(review): if both indexed values fail to parse as integers, both
      // sides are NaN and this equality check may pass vacuously — confirm
      // the reply layout returned by tx.flush().
      for (const rep of [rep1, rep2, rep3]) {
        assertEquals(
          parseInt(rep[4][1] as string, 10),
          parseInt(rep[0][1] as string, 10) + 3,
        );
      }
    } finally {
      // Close the connection even when an assertion above throws, so a
      // failing test does not leave the connection open.
      redis.close();
    }
  },
});
/**
 * Interleaves queued commands and `flush()` calls on one pipeline without
 * awaiting in between, then awaits everything together.
 *
 * Per the expected values below, each queued command's promise resolves to
 * "OK" (for both `set` and `get`), while each `flush()` resolves to the
 * replies of the commands queued before it.
 *
 * Connects to the server described by `addr`.
 */
test({
  name: "pipeline in concurrent",
  async fn() {
    const redis = await connect(addr);
    try {
      const keys = ["a", "b", "c"];
      const tx = redis.pipeline();
      const promises: Promise<unknown>[] = [];
      await redis.del("a", "b", "c");
      for (const key of keys) {
        promises.push(tx.set(key, key));
      }
      promises.push(tx.flush());
      for (const key of keys) {
        promises.push(tx.get(key));
      }
      promises.push(tx.flush());
      const res = await Promise.all(promises);
      assertEquals(res, [
        "OK", // set(a)
        "OK", // set(b)
        "OK", // set(c)
        [
          ["status", "OK"],
          ["status", "OK"],
          ["status", "OK"],
        ], // flush()
        "OK", // get(a)
        "OK", // get(b)
        "OK", // get(c)
        [
          ["bulk", "a"],
          ["bulk", "b"],
          ["bulk", "c"],
        ], // flush()
      ]);
    } finally {
      // Close the connection even when an assertion above throws, so a
      // failing test does not leave the connection open.
      redis.close();
    }
  },
});
/**
 * Checks that an error reply in the middle of a pipeline does not abort the
 * flush: all three replies come back, the second one is tagged "error" and
 * carries an `ErrorReplyError`, and the commands around it still succeed.
 *
 * Connects to the server described by `addr`.
 */
test({
  name: "error while pipeline",
  async fn() {
    const redis = await connect(addr);
    try {
      const tx = redis.pipeline();
      tx.set("a", "a");
      // Presumably "var" is not a valid script, which is what makes the
      // server answer this command with an error reply — TODO confirm.
      tx.eval("var", 1, "k", "v");
      tx.get("a");
      const resp = await tx.flush();
      assertEquals(resp.length, 3);
      assertEquals(resp[0], ["status", "OK"]);
      assertEquals(resp[1][0], "error");
      assert(resp[1][1] instanceof ErrorReplyError);
      assertEquals(resp[2], ["bulk", "a"]);
    } finally {
      // Close the connection even when an assertion above throws, so a
      // failing test does not leave the connection open.
      redis.close();
    }
  },
});