Skip to content

Commit

Permalink
Merge pull request #7 from streamich/server
Browse files Browse the repository at this point in the history
Server
  • Loading branch information
streamich committed Dec 18, 2023
2 parents 08491e5 + ba8fc81 commit a75bb34
Show file tree
Hide file tree
Showing 30 changed files with 607 additions and 77 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,16 @@ An implementation of Redis client in TypeScript.
- Very fast RESP3 message encoder and streaming decoder.
- Supports TLS connections.
- Supports all subscription types: `SUBSCRIBE`, `PSUBSCRIBE`, `SSUBSCRIBE`.


## Benchmarks

`redis-joy` performs substantially faster than `ioredis` and `redis` packages:

```
npx ts-node src/__bench__/GET.bench.ts
redis-joy x 320,967 ops/sec ±5.26% (79 runs sampled)
ioredis x 152,971 ops/sec ±6.76% (76 runs sampled)
redis x 221,573 ops/sec ±50.06% (53 runs sampled)
Fastest is redis-joy
```
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@
"thingies": "^1.15.0"
},
"devDependencies": {
"@types/benchmark": "^2.1.5",
"@types/jest": "^29.5.11",
"benchmark": "^2.1.4",
"cluster-key-slot": "^1.1.2",
"commands": "https://github.com/streamich/commands#4321d5d40473c48fadf49fd99662032eac9b855b",
"ioredis": "^5.3.2",
"jest": "^29.7.0",
"prettier": "^3.1.1",
"pretty-quick": "^3.1.3",
"redis": "^4.6.11",
"rimraf": "^5.0.5",
"ts-jest": "^29.1.1",
"ts-node": "^10.9.2",
Expand Down
72 changes: 72 additions & 0 deletions src/__bench__/GET.bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// npx ts-node src/__bench__/GET.bench.ts

/* tslint:disable no-console */

import {Suite} from 'benchmark';
import * as net from 'net';
import {StandaloneClient} from '../standalone';
import * as config from '../__tests__/config';
import {RespEncoder} from 'json-joy/es2020/json-pack/resp';
import {RespStreamingDecoder} from 'json-joy/es2020/json-pack/resp/RespStreamingDecoder';
import {Redis} from 'ioredis';
import {createClient} from 'redis';
import {ReconnectingSocket} from '../util/ReconnectingSocket';

const host = config.standalone.host;
const port = config.standalone.port;
const client = new StandaloneClient({
socket: new ReconnectingSocket({
createSocket: () => net.connect({host, port}),
}),
encoder: new RespEncoder(),
decoder: new RespStreamingDecoder(),
});
client.start();

const ioredis = new Redis({
host,
port,
});

const key = '{partition}:scope:this_is_a_key';
const value = 'b45165ed3cd437b9ffad02a2aad22a4ddc69162470e2622982889ce5826f6e3d';

const main = async () => {
const redis = await createClient()
.on('error', (err) => console.log('Redis Client Error', err))
.connect();

await client.cmd(['SET', 'a', 'b']);
await ioredis.set('a', 'b');
await redis.set('a', 'b');

const suite = new Suite();
suite
.add('redis-joy', async () => {
await client.cmd(['SET', key, value]);
const res = await client.cmd(['GET', key], {utf8Res: true});
if (res !== value) throw new Error('Unexpected response');
})
.add('ioredis', async () => {
await ioredis.set(key, value);
const res = await ioredis.get(key);
if (res !== value) throw new Error('Unexpected response');
})
.add('redis', async () => {
await redis.set(key, value);
const res = await redis.get(key);
if (res !== value) throw new Error('Unexpected response');
})
.on('cycle', (event: any) => {
console.log(String(event.target));
})
.on('complete', () => {
console.log('Fastest is ' + suite.filter('fastest').map('name'));
})
.run({async: true});
};

main().catch((err) => {
console.error(err);
process.exit(1);
});
4 changes: 4 additions & 0 deletions src/__tests__/commands/pubsub/PSUBSCRIBE.standalone.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import {setupStandalone} from '../../setupStandalone';
import * as PSUBSCRIBE from './PSUBSCRIBE';

PSUBSCRIBE.standalone(setupStandalone);
20 changes: 20 additions & 0 deletions src/__tests__/commands/pubsub/PSUBSCRIBE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,25 @@ export const standalone = (setup: StandaloneTestSetup) => {
await client.publish(channel, new Uint8Array([1]));
await until(() => msgs.length === 2);
});

test('re-subscribes when socket re-connects', async () => {
const {client} = await setup();
const pattern = 'p?ttern_reconnect_' + Date.now();
const msgs: unknown[] = [];
client.psub(pattern, (recv) => {
msgs.push(recv);
});
client.publish(pattern, new Uint8Array([1, 2, 3]));
await until(() => msgs.length === 1);
expect(msgs[0]).toEqual([ascii(pattern), new Uint8Array([1, 2, 3])]);
(client as any).socket.socket.destroy();
await new Promise((resolve) => setTimeout(resolve, 10));
await until(() => client.isConnected());
await until(() => msgs.length === 1);
await new Promise((resolve) => setTimeout(resolve, 25));
client.publish(pattern, new Uint8Array([4]));
await until(() => msgs.length === 2);
expect(msgs[1]).toEqual([ascii(pattern), new Uint8Array([4])]);
});
});
};
4 changes: 4 additions & 0 deletions src/__tests__/commands/pubsub/SSUBSCRIBE.standalone.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import {setupStandalone} from '../../setupStandalone';
import * as SSUBSCRIBE from './SSUBSCRIBE';

SSUBSCRIBE.standalone(setupStandalone);
20 changes: 20 additions & 0 deletions src/__tests__/commands/pubsub/SSUBSCRIBE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,25 @@ export const standalone = (setup: StandaloneTestSetup) => {
await until(() => msgs.length === 2);
expect(msgs).toEqual([new Uint8Array([1]), new Uint8Array([1])]);
});

test('re-subscribes when socket re-connects', async () => {
const {client} = await setup();
const channel = 'shard_reconnect_' + Date.now();
const msgs: unknown[] = [];
client.ssub(channel, (recv) => {
msgs.push(recv);
});
client.spublish(channel, new Uint8Array([1, 2, 3]));
await until(() => msgs.length === 1);
expect(msgs[0]).toEqual(new Uint8Array([1, 2, 3]));
(client as any).socket.socket.destroy();
await new Promise((resolve) => setTimeout(resolve, 10));
await until(() => client.isConnected());
await until(() => msgs.length === 1);
await new Promise((resolve) => setTimeout(resolve, 25));
client.spublish(channel, new Uint8Array([4]));
await until(() => msgs.length === 2);
expect(msgs[1]).toEqual(new Uint8Array([4]));
});
});
};
4 changes: 4 additions & 0 deletions src/__tests__/commands/pubsub/SUBSCRIBE.standalone.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import {setupStandalone} from '../../setupStandalone';
import * as SUBSCRIBE from './SUBSCRIBE';

SUBSCRIBE.standalone(setupStandalone);
20 changes: 20 additions & 0 deletions src/__tests__/commands/pubsub/SUBSCRIBE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,25 @@ export const standalone = (setup: StandaloneTestSetup) => {
await until(() => msgs.length === 1);
expect(msgs).toEqual([utf8`a😱b`]);
});

test('re-subscribes when socket re-connects', async () => {
const {client} = await setup();
const channel = 'channel_reconnect_' + Date.now();
const msgs: unknown[] = [];
client.sub(channel, (recv) => {
msgs.push(recv);
});
client.publish(channel, new Uint8Array([1, 2, 3]));
await until(() => msgs.length === 1);
expect(msgs[0]).toEqual(new Uint8Array([1, 2, 3]));
(client as any).socket.socket.destroy();
await new Promise((resolve) => setTimeout(resolve, 10));
await until(() => client.isConnected());
await until(() => msgs.length === 1);
await new Promise((resolve) => setTimeout(resolve, 25));
client.publish(channel, new Uint8Array([4]));
await until(() => msgs.length === 2);
expect(msgs[1]).toEqual(new Uint8Array([4]));
});
});
};
5 changes: 5 additions & 0 deletions src/__tests__/commands/string/GET.standalone.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {setupStandalone} from '../../setupStandalone';
import * as GET from './GET';

GET.run(setupStandalone);
GET.standalone(setupStandalone);
13 changes: 13 additions & 0 deletions src/__tests__/commands/string/GET.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,16 @@ export const run = (setup: TestSetup) => {
});
});
};

export const standalone = (setup: TestSetup) => {
describe('GET', () => {
test('can get a key after disconnect', async () => {
const {client} = await setup();
const key = 'fetch_existing_key_disconnect_' + Date.now();
await client.cmd(['SET', key, '42']);
(client as any).socket.socket.destroy();
const res = await client.cmd(['GET', key], {utf8Res: true});
expect(res).toBe('42');
});
});
};
1 change: 1 addition & 0 deletions src/__tests__/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ export const cluster = {
export const standalone = {
host: '127.0.0.1',
port: 6379,
// port: 9999,
};
35 changes: 35 additions & 0 deletions src/__tests__/setupStandalone.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import * as net from 'net';
import {StandaloneClient} from '../standalone/StandaloneClient';
import {StandaloneTestSetup} from './types';
import {RespEncoder} from 'json-joy/es2020/json-pack/resp/RespEncoder';
import {RespStreamingDecoder} from 'json-joy/es2020/json-pack/resp/RespStreamingDecoder';
import {ReconnectingSocket} from '../util/ReconnectingSocket';
import * as config from './config';

const host = config.standalone.host;
const port = config.standalone.port;
const clients: StandaloneClient[] = [];

export const setupStandalone: StandaloneTestSetup = async () => {
const client = new StandaloneClient({
socket: new ReconnectingSocket({
createSocket: () => net.connect({host, port}),
}),
encoder: new RespEncoder(),
decoder: new RespStreamingDecoder(),
});
// client.onError.listen((err) => {
// console.error('onError', err);
// });
// client.onPush.listen((push) => {
// console.log(push);
// });
client.start();
clients.push(client);
await client.whenReady;
return {client};
};

afterAll(() => {
for (const client of clients) client.stop();
});
6 changes: 3 additions & 3 deletions src/__tests__/standalone-singleton.spec.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import * as net from 'net';
import {RedisClient} from '../node/RedisClient';
import {StandaloneClient} from '../standalone/StandaloneClient';
import {ClusterTestSetup} from './types';
import * as commands from './commands';
import {ReconnectingSocket} from '../node';
import {ReconnectingSocket} from '../util/ReconnectingSocket';
import {RespEncoder} from 'json-joy/es2020/json-pack/resp/RespEncoder';
import {RespStreamingDecoder} from 'json-joy/es2020/json-pack/resp/RespStreamingDecoder';
import * as config from './config';

const host = config.standalone.host;
const port = config.standalone.port;
const client = new RedisClient({
const client = new StandaloneClient({
socket: new ReconnectingSocket({
createSocket: () => net.connect({host, port}),
}),
Expand Down
38 changes: 2 additions & 36 deletions src/__tests__/standalone.spec.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,6 @@
import * as net from 'net';
import {RedisClient} from '../node/RedisClient';
import {StandaloneTestSetup} from './types';
import * as commands from './commands';
import {ReconnectingSocket} from '../node';
import {RespEncoder} from 'json-joy/es2020/json-pack/resp/RespEncoder';
import {RespStreamingDecoder} from 'json-joy/es2020/json-pack/resp/RespStreamingDecoder';
import * as config from './config';

const host = config.standalone.host;
const port = config.standalone.port;
const clients: RedisClient[] = [];

const setupCluster: StandaloneTestSetup = async () => {
const client = new RedisClient({
socket: new ReconnectingSocket({
createSocket: () => net.connect({host, port}),
}),
encoder: new RespEncoder(),
decoder: new RespStreamingDecoder(),
});
// client.onError.listen((err) => {
// console.error('onError', err);
// });
// client.onPush.listen((push) => {
// console.log(push);
// });
client.start();
clients.push(client);
await client.whenReady;
return {client};
};
import {setupStandalone} from './setupStandalone';

describe('standalone (client per test)', () => {
commands.standalone(setupCluster);
});

afterAll(() => {
for (const client of clients) client.stop();
commands.standalone(setupStandalone);
});
17 changes: 14 additions & 3 deletions src/__tests__/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {RedisCluster} from '../cluster/RedisCluster';
import {RedisClient} from '../node';
import {StandaloneClient} from '../standalone';

export type TestSetup = ClusterTestSetup;
export type ClusterTestSetup = () => Promise<{
Expand All @@ -13,6 +13,17 @@ export type StandaloneTestSetup = () => Promise<{
client: StandaloneTestClient;
}>;
export type StandaloneTestClient = Pick<
RedisClient,
'cmd' | 'subscribe' | 'sub' | 'publish' | 'pub' | 'psubscribe' | 'psub' | 'ssubscribe' | 'ssub' | 'spub' | 'spublish'
StandaloneClient,
| 'cmd'
| 'subscribe'
| 'sub'
| 'publish'
| 'pub'
| 'psubscribe'
| 'psub'
| 'ssubscribe'
| 'ssub'
| 'spub'
| 'spublish'
| 'isConnected'
>;
2 changes: 1 addition & 1 deletion src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {printTree} from 'json-joy/es2020/util/print/printTree';
import {getSlotAny} from '../util/slots';
import {isMultiCmd} from '../util/commands';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {CmdOpts} from '../node';
import type {CmdOpts} from '../standalone';

export interface RedisClusterOpts extends RedisClientCodecOpts {
/**
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/RedisClusterCall.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {RedisCall} from '../node';
import {StandaloneCall} from '../standalone';
import {RedirectType} from './constants';
import type {RedisClusterNodeClient} from './RedisClusterNodeClient';

/**
* Represents a single Redis request/response command call.
*/
export class RedisClusterCall extends RedisCall {
export class RedisClusterCall extends StandaloneCall {
public static chain(call: RedisClusterCall, client: RedisClusterNodeClient): RedisClusterCall {
const next = new RedisClusterCall(call.args);
// next.prev = call;
Expand Down
Loading

0 comments on commit a75bb34

Please sign in to comment.