-
Notifications
You must be signed in to change notification settings - Fork 71
/
websockets.server.ts
45 lines (40 loc) · 1.18 KB
/
websockets.server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import {
WsEffect,
WsMiddlewareEffect,
WsConnectionEffect,
WebSocketConnectionError,
webSocketListener,
} from '@marblejs/websockets';
import { iif, throwError, of } from 'rxjs';
import { tap, mergeMap, buffer, map } from 'rxjs/operators';
import { matchEvent, use } from '@marblejs/core';
import { eventValidator$, t } from '../../../middleware-io/dist';
const sum$: WsEffect = event$ =>
event$.pipe(
matchEvent('SUM')
);
const add$: WsEffect = (event$, ...args) =>
event$.pipe(
matchEvent('ADD'),
use(eventValidator$(t.number)),
buffer(sum$(event$, ...args)),
map(addEvents => addEvents.reduce((a, e) => e.payload + a, 0)),
map(payload => ({ type: 'SUM_RESULT', payload })),
);
const logger$: WsMiddlewareEffect = event$ =>
event$.pipe(
tap(e => console.log(`type: ${e.type}, payload: ${e.payload}`)),
);
const connection$: WsConnectionEffect = req$ =>
req$.pipe(
mergeMap(req => iif(
() => req.headers.upgrade !== 'websocket',
throwError(new WebSocketConnectionError('Unauthorized', 4000)),
of(req),
)),
);
export const websocketsServer = webSocketListener({
middlewares: [logger$],
effects: [add$],
connection$,
})();