-
Notifications
You must be signed in to change notification settings - Fork 73
/
messaging.server.redis.spec.ts
111 lines (93 loc) · 3.94 KB
/
messaging.server.redis.spec.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import { matchEvent, use } from '@marblejs/core';
import { eventValidator$, t } from '@marblejs/middleware-io';
import { map, tap } from 'rxjs/operators';
import { Transport } from '../../transport/transport.interface';
import { MsgEffect, MsgOutputEffect } from '../../effects/messaging.effects.interface';
import { RedisStrategyOptions } from '../../transport/strategies/redis.strategy.interface';
import { runClient, runServer, createMessage, wait } from '../../util/messaging.test.util';
/**
 * Builds the Redis strategy options used by the tests in this file.
 *
 * @param config - optional overrides; a truthy `channel` replaces the default channel name
 * @returns options with the fixed local Redis host and the resolved channel
 */
const createOptions = (config: { channel?: string } = {}): RedisStrategyOptions => {
  const { channel } = config;

  return {
    host: 'redis://127.0.0.1:6379',
    // any falsy channel (undefined or empty string) falls back to the default
    channel: channel ? channel : 'test_channel_server',
  };
};
describe('messagingServer::Redis', () => {
// Before every test, replace console.error with a mock implementation so that
// calls made to it during a test do not reach the real console.
beforeEach(() => {
  const consoleErrorSpy = jest.spyOn(console, 'error');

  consoleErrorSpy.mockImplementation(() => jest.fn());
});
// Smoke test: opens a client and a server on the same Redis options and closes
// both right away. It has no assertions; it passes when none of the calls reject.
test('starts a server and closes connection immediately', async () => {
  const redisOptions = createOptions();

  const connectedClient = await runClient(Transport.REDIS, redisOptions);
  // runServer(...) returns a function, invoked here without any effects
  const runningServer = await runServer(Transport.REDIS, redisOptions)();

  // the client is closed before the server
  await connectedClient.close();
  await runningServer.close();
});
// Request/response round trip: the client sends an RPC_TEST message with
// payload 1 and expects the reply produced by the server effect
// (RPC_TEST_RESULT with the payload incremented by one).
test('handles RPC event', async () => {
  // Effect under test: matches RPC_TEST events, runs them through
  // eventValidator$(t.number) and answers with payload + 1.
  const rpc$: MsgEffect = event$ =>
    event$.pipe(
      matchEvent('RPC_TEST'),
      use(eventValidator$(t.number)),
      map(event => event.payload),
      map(payload => ({ type: 'RPC_TEST_RESULT', payload: payload + 1 })),
    );

  const options = createOptions();
  const client = await runClient(Transport.REDIS, options);
  const server = await runServer(Transport.REDIS, options)(rpc$);

  try {
    const message = createMessage({ type: 'RPC_TEST', payload: 1 });
    const result = await client.sendMessage(options.channel, message);
    // the reply body is parsed from result.data; typed as unknown because it is only compared
    const parsedResult: unknown = JSON.parse(result.data.toString());

    expect(parsedResult).toEqual({ type: 'RPC_TEST_RESULT', payload: 2 });
  } finally {
    // Close both connections even when the request or the assertion fails,
    // so a failing test does not leave open connections behind.
    await server.close();
    await client.close();
  }
});
// Fire-and-forget flow: the client emits EVENT_TEST with payload 1 and the test
// checks that the server effect produced EVENT_TEST_RESPONSE with payload 2.
//
// The effect only hands the produced event over to the test body through a
// promise. Assertions and cleanup run in the test function itself, so a failed
// expectation fails the test directly instead of being lost inside an async
// `tap` callback, and the test no longer combines `async` with a `done` callback.
test('handles published event', async () => {
  // The promise executor runs synchronously, so notifyHandled is the resolver
  // before any effect can call it.
  let notifyHandled: (event: unknown) => void = () => undefined;
  const handledEvent = new Promise<unknown>(resolve => {
    notifyHandled = resolve;
  });

  const event$: MsgEffect = event$ =>
    event$.pipe(
      matchEvent('EVENT_TEST'),
      use(eventValidator$(t.number)),
      map(event => event.payload),
      map(payload => ({ type: 'EVENT_TEST_RESPONSE', payload: payload + 1 })),
      tap(event => notifyHandled(event)),
    );

  const options = createOptions();
  const client = await runClient(Transport.REDIS, options);
  const server = await runServer(Transport.REDIS, options)(event$);

  try {
    const message = createMessage({ type: 'EVENT_TEST', payload: 1 });
    const emitResult = await client.emitMessage(options.channel, message);

    expect(emitResult).toEqual(true);
    expect(await handledEvent).toEqual({ type: 'EVENT_TEST_RESPONSE', payload: 2 });

    // keep the original pause between handling the event and closing the connections
    await wait();
  } finally {
    // always release both connections, also when an expectation above fails
    await server.close();
    await client.close();
  }
});
// Calls ctx.client.ackMessage from an effect and checks that the output effect
// receives an UNHANDLED_ERROR event carrying the expected UnsupportedError.
//
// The output effect only forwards the first observed event to the test body
// through a promise. Assertions and cleanup run in the test function itself, so
// a failed expectation fails the test directly instead of being lost inside an
// async `tap` callback, and the test no longer combines `async` with `done`.
test('throws an UnsupportedError for unsupported "ackMessage/nackMessage"', async () => {
  const expectedEventType = 'UNHANDLED_ERROR';
  const expectedEventError = {
    name: 'UnsupportedError',
    message: 'Unsupported operation. Method "ackMessage" is unsupported for Redis transport layer.',
  };

  // The promise executor runs synchronously, so captureOutput is the resolver
  // before any effect can call it. Only the first output event is kept.
  let captureOutput: (output: { type: unknown; error: unknown }) => void = () => undefined;
  const firstOutput = new Promise<{ type: unknown; error: unknown }>(resolve => {
    captureOutput = resolve;
  });

  // Effect that triggers the operation under test for every EVENT_TEST event.
  const event$: MsgEffect = (event$, ctx) =>
    event$.pipe(
      matchEvent('EVENT_TEST'),
      tap(event => ctx.client.ackMessage(event.raw)),
    );

  // Output effect: records the event's type and error, then passes the event on.
  const output$: MsgOutputEffect = event$ =>
    event$.pipe(
      tap(({ event }) => captureOutput({ type: event.type, error: event.error })),
      map(({ event }) => event),
    );

  const options = createOptions();
  const client = await runClient(Transport.REDIS, options);
  const server = await runServer(Transport.REDIS, options)(event$, output$);

  try {
    const message = createMessage({ type: 'EVENT_TEST' });
    const emitResult = await client.emitMessage(options.channel, message);

    expect(emitResult).toEqual(true);

    const output = await firstOutput;
    expect(output.type).toEqual(expectedEventType);
    expect(output.error).toEqual(expectedEventError);

    // keep the original pause between observing the output and closing the connections
    await wait();
  } finally {
    // always release both connections, also when an expectation above fails
    await server.close();
    await client.close();
  }
});
});