Skip to content

Commit

Permalink
fix: #23 Added mux system when sending commands (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
keroxp committed Oct 18, 2019
1 parent b18a527 commit 855270b
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 49 deletions.
3 changes: 2 additions & 1 deletion modules-lock.json
Expand Up @@ -2,10 +2,11 @@
"https://deno.land/std": {
"version": "@v0.20.0",
"modules": [
"/util/async.ts",
"/testing/mod.ts",
"/testing/asserts.ts",
"/io/bufio.ts",
"/fmt/colors.ts"
]
}
}
}
1 change: 1 addition & 0 deletions modules.json
Expand Up @@ -2,6 +2,7 @@
"https://deno.land/std": {
"version": "@v0.20.0",
"modules": [
"/util/async.ts",
"/testing/mod.ts",
"/testing/asserts.ts",
"/io/bufio.ts",
Expand Down
71 changes: 49 additions & 22 deletions pipeline.ts
@@ -1,7 +1,8 @@
import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts";
import { createRequest, readReply, RedisRawReply } from "./io.ts";
import { ErrorReplyError } from "./errors.ts";
import { create, Redis } from "./redis.ts";
import { create, muxExecutor, Redis } from "./redis.ts";
import { deferred, Deferred } from "./vendor/https/deno.land/std/util/async.ts";

const encoder = new TextEncoder();
export type RedisPipeline = {
Expand All @@ -14,36 +15,62 @@ export function createRedisPipeline(
reader: BufReader,
opts?: { tx: true }
): RedisPipeline {
let queue: string[] = [];
let commands: string[] = [];
let queue: {
commands: string[];
d: Deferred<RedisRawReply[]>;
}[] = [];

function dequeue() {
const [e] = queue;
if (!e) return;
exec(e.commands)
.then(e.d.resolve)
.catch(e.d.reject)
.finally(() => {
queue.shift();
dequeue();
});
}

async function exec(cmds: string[]) {
const msg = cmds.join("");
await writer.write(encoder.encode(msg));
await writer.flush();
const ret: RedisRawReply[] = [];
for (let i = 0; i < cmds.length; i++) {
try {
const rep = await readReply(reader);
ret.push(rep);
} catch (e) {
if (e.constructor === ErrorReplyError) {
ret.push(e);
} else {
throw e;
}
}
}
return ret;
}

const executor = {
enqueue(command: string, ...args) {
const msg = createRequest(command, ...args);
queue.push(msg);
commands.push(msg);
},
async flush() {
// wrap pipelined commands with MULTI/EXEC
if (opts && opts.tx) {
queue.splice(0, 0, createRequest("MULTI"));
queue.push(createRequest("EXEC"));
commands.splice(0, 0, createRequest("MULTI"));
commands.push(createRequest("EXEC"));
}
const msg = queue.join("");
await writer.write(encoder.encode(msg));
await writer.flush();
const ret: RedisRawReply[] = [];
for (let i = 0; i < queue.length; i++) {
try {
const rep = await readReply(reader);
ret.push(rep);
} catch (e) {
if (e.constructor === ErrorReplyError) {
ret.push(e);
} else {
throw e;
}
}
const d = deferred<RedisRawReply[]>();
queue.push({ commands, d });
if (queue.length === 1) {
dequeue();
}
queue = [];
return ret;
commands = [];
return d;
},
async execRawReply(
command: string,
Expand Down
30 changes: 29 additions & 1 deletion pipeline_test.ts
@@ -1,4 +1,4 @@
import { test } from "./vendor/https/deno.land/std/testing/mod.ts";
import { runIfMain, test } from "./vendor/https/deno.land/std/testing/mod.ts";
import { assertEquals } from "./vendor/https/deno.land/std/testing/asserts.ts";
import { connect } from "./redis.ts";

Expand Down Expand Up @@ -72,3 +72,31 @@ test(async function testTx() {
parseInt(rep3[0][1] as string) + 3
);
});

test("pipeline in concurrent", async () => {
const redis = await connect(addr);
const tx = redis.pipeline();
let promises: Promise<any>[] = [];
await redis.del("a", "b", "c");
for (const key of ["a", "b", "c"]) {
promises.push(tx.set(key, key));
}
promises.push(tx.flush());
for (const key of ["a", "b", "c"]) {
promises.push(tx.get(key));
}
promises.push(tx.flush());
const res = await Promise.all(promises);
assertEquals(res, [
"OK", //set(a)
"OK", //set(b)
"OK", //set(c)
[["status", "OK"], ["status", "OK"], ["status", "OK"]], //flush()
"OK", // get(a)
"OK", // get(b)
"OK", //get(c)
[["bulk", "a"], ["bulk", "b"], ["bulk", "c"]] //flush()
]);
});

runIfMain(import.meta);
40 changes: 38 additions & 2 deletions redis.ts
Expand Up @@ -9,6 +9,7 @@ import { ConnectionClosedError } from "./errors.ts";
import { psubscribe, RedisSubscription, subscribe } from "./pubsub.ts";
import { RedisRawReply, sendCommand } from "./io.ts";
import { createRedisPipeline, RedisPipeline } from "./pipeline.ts";
import { deferred, Deferred } from "./vendor/https/deno.land/std/util/async.ts";

export type Redis = {
// Connection
Expand Down Expand Up @@ -443,19 +444,54 @@ export interface CommandExecutor {
): Promise<RedisRawReply>;
}

class RedisImpl implements Redis, CommandExecutor {
export function muxExecutor(r: BufReader, w: BufWriter): CommandExecutor {
let queue: {
command: string;
args: (string | number)[];
d: Deferred<RedisRawReply>;
}[] = [];

function dequeue(): void {
const [e] = queue;
if (!e) return;
sendCommand(w, r, e.command, ...e.args)
.then(v => e.d.resolve(v))
.catch(err => e.d.reject(err))
.finally(() => {
queue.shift();
dequeue();
});
}

return {
async execRawReply(
command: string,
...args: (string | number)[]
): Promise<RedisRawReply> {
const d = deferred<RedisRawReply>();
queue.push({ command, args, d });
if (queue.length === 1) {
dequeue();
}
return d;
}
};
}

class RedisImpl implements Redis {
_isClosed = false;
get isClosed() {
return this._isClosed;
}

private executor: CommandExecutor;
constructor(
private closer: Closer,
private writer: BufWriter,
private reader: BufReader,
executor?: CommandExecutor
) {
this.executor = executor || this;
this.executor = executor || muxExecutor(reader, writer);
}

async execRawReply(
Expand Down
51 changes: 28 additions & 23 deletions redis_test.ts
@@ -1,5 +1,9 @@
import { connect } from "./redis.ts";
import { runIfMain, test } from "./vendor/https/deno.land/std/testing/mod.ts";
import { connect, Redis } from "./redis.ts";
import {
runIfMain,
setFilter,
test
} from "./vendor/https/deno.land/std/testing/mod.ts";
import {
assertEquals,
assertThrowsAsync
Expand All @@ -10,8 +14,9 @@ const addr = {
port: 6379
};

let redis:Redis;
test(async function beforeAll() {
const redis = await connect(addr);
redis = await connect(addr);
await redis.del(
"incr",
"incrby",
Expand All @@ -25,87 +30,87 @@ test(async function beforeAll() {
});

test(async function testExists() {
const redis = await connect(addr);
const none = await redis.exists("none", "none2");
assertEquals(none, 0);
await redis.set("exists", "aaa");
const exists = await redis.exists("exists", "none");
assertEquals(exists, 1);
redis.close();
});

test(async function testGetWhenNil() {
const redis = await connect(addr);
const hoge = await redis.get("none");
assertEquals(hoge, void 0);
redis.close();
});

test(async function testSet() {
const redis = await connect(addr);
const s = await redis.set("get", "fuga你好こんにちは");
assertEquals(s, "OK");
const fuga = await redis.get("get");
assertEquals(fuga, "fuga你好こんにちは");
redis.close();
});

test(async function testGetSet() {
const redis = await connect(addr);
await redis.set("getset", "val");
const v = await redis.getset("getset", "lav");
assertEquals(v, "val");
assertEquals(await redis.get("getset"), "lav");
redis.close();
});

test(async function testMget() {
const redis = await connect(addr);
await redis.set("mget1", "val1");
await redis.set("mget2", "val2");
await redis.set("mget3", "val3");
const v = await redis.mget("mget1", "mget2", "mget3");
assertEquals(v, ["val1", "val2", "val3"]);
redis.close();
});

test(async function testDel() {
const redis = await connect(addr);
let s = await redis.set("del1", "fuga");
assertEquals(s, "OK");
s = await redis.set("del2", "fugaaa");
assertEquals(s, "OK");
const deleted = await redis.del("del1", "del2");
assertEquals(deleted, 2);
redis.close();
});

test(async function testIncr() {
const redis = await connect(addr);
const rep = await redis.incr("incr");
assertEquals(rep, 1);
assertEquals(await redis.get("incr"), "1");
redis.close();
});

test(async function testIncrby() {
const redis = await connect(addr);
const rep = await redis.incrby("incrby", 101);
assertEquals(rep, 101);
assertEquals(await redis.get("incrby"), "101");
redis.close();
});

test(async function testDecr() {
const redis = await connect(addr);
const rep = await redis.decr("decr");
assertEquals(rep, -1);
assertEquals(await redis.get("decr"), "-1");
redis.close();
});

test(async function testDecrby() {
const redis = await connect(addr);
const rep = await redis.decrby("decryby", 101);
assertEquals(rep, -101);
assertEquals(await redis.get("decryby"), "-101");
redis.close();
});

test(async function testConcurrent() {
let promises: Promise<any>[] = [];
for (const key of ["a", "b", "c"]) {
promises.push(redis.set(key, key));
}
await Promise.all(promises);
promises = [];
for (const key of ["a", "b", "c"]) {
promises.push(redis.get(key));
}
const [a, b, c] = await Promise.all(promises);
assertEquals(a, "a");
assertEquals(b, "b");
assertEquals(c, "c");
});

[Infinity, NaN, "", "port"].forEach(v => {
Expand Down
1 change: 1 addition & 0 deletions vendor/https/deno.land/std/util/async.ts
@@ -0,0 +1 @@
export * from "https://deno.land/std@v0.20.0/util/async.ts";

0 comments on commit 855270b

Please sign in to comment.