Skip to content

Commit

Permalink
reafactor(core, websockets): context (#106)
Browse files Browse the repository at this point in the history
* feat(@integration): added example usage of WebSocket middleware (logger$)

* imp(core): naming correction: injector -> context

* feat(core): new context implementation

* style(@integration): WebSocketServerToken -> WsServerToken

* style(core, websockets): naming coorection: Injectable -> ContextReader

* style(core, websockets): naming correction: askContext -> reader

* fix(core): corrected the way how the dependencies inside the context are resolved

* fix(core, @integration): context: bind eagerly readers

* fix(core, websockets): improved the way how the server is bootsrapped + rermoved http server from the context container

* feat(core): createServer.run with predicate prameter
  • Loading branch information
JozefFlakus committed Feb 24, 2019
1 parent df5abff commit 1963b61
Show file tree
Hide file tree
Showing 41 changed files with 472 additions and 380 deletions.
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -12,7 +12,7 @@
"pretest": "yarn lint",
"test": "yarn test:unit && yarn test:integration",
"test:unit": "SCOPE=unit jest --expand --coverage --detectOpenHandles",
"test:integration": "SCOPE=integration jest --expand",
"test:integration": "SCOPE=integration jest --expand --detectOpenHandles",
"test:watch": "jest --expand --onlyChanged --watch",
"clean": "lerna run clean",
"purge": "yarn clean && rimraf node_modules",
Expand Down
8 changes: 5 additions & 3 deletions packages/@integration/src/effects/api.effects.ts
Expand Up @@ -4,7 +4,7 @@ import { throwError } from 'rxjs';
import { map, mergeMap, tap } from 'rxjs/operators';
import { user$ } from './user.effects';
import { static$ } from './static.effects';
import { WebSocketServerToken } from '../tokens';
import { WsServerToken } from '../tokens';

const rootValiadtor$ = requestValidator$({
params: t.type({
Expand All @@ -18,11 +18,13 @@ const rootValiadtor$ = requestValidator$({
const root$ = r.pipe(
r.matchPath('/'),
r.matchType('GET'),
r.useEffect((req$, _, { inject }) => req$.pipe(
r.useEffect((req$, _, { ask }) => req$.pipe(
use(rootValiadtor$),
map(req => req.params.version),
map(version => `API version: ${version}`),
tap(message => inject(WebSocketServerToken).sendBroadcastResponse({ type: 'ROOT', payload: message })),
tap(message => ask(WsServerToken).map(server =>
server.sendBroadcastResponse({ type: 'ROOT', payload: message })),
),
map(message => ({ body: message })),
)));

Expand Down
2 changes: 1 addition & 1 deletion packages/@integration/src/effects/calculator.ws-effects.ts
Expand Up @@ -13,6 +13,6 @@ export const add$: WsEffect = (event$, ...args) =>
matchEvent('ADD'),
use(eventValidator$(t.number)),
buffer(sum$(event$, ...args)),
map(events => events.reduce((a, e) => e.payload! + a, 0)),
map(addEvents => addEvents.reduce((a, e) => e.payload + a, 0)),
map(payload => ({ type: 'SUM_RESULT', payload })),
);
2 changes: 1 addition & 1 deletion packages/@integration/src/http.listener.ts
Expand Up @@ -3,7 +3,7 @@ import { bodyParser$ } from '@marblejs/middleware-body';
import { loggerDev$, loggerFile$ } from './middlewares/logger.middleware';
import { api$ } from './effects/api.effects';

export const httpServer = httpListener({
export default httpListener({
middlewares: [
loggerDev$,
loggerFile$,
Expand Down
27 changes: 15 additions & 12 deletions packages/@integration/src/index.ts
@@ -1,36 +1,39 @@
import { createServer, matchEvent, ServerEvent, HttpServerEffect, bind } from '@marblejs/core';
import { createServer, matchEvent, ServerEvent, HttpServerEffect, bindTo } from '@marblejs/core';
import { mapToServer } from '@marblejs/websockets';
import { merge } from 'rxjs';
import { tap, map } from 'rxjs/operators';
import { httpServer } from './http.listener';
import { webSocketServer } from './ws.listener';
import { WebSocketServerToken } from './tokens';
import { WsServerToken } from './tokens';
import httpListener from './http.listener';
import webSocketListener from './ws.listener';

const upgrade$: HttpServerEffect = (event$, _, { inject }) =>
const upgrade$: HttpServerEffect = (event$, _, { ask }) =>
event$.pipe(
matchEvent(ServerEvent.upgrade),
mapToServer({
path: '/api/:version/ws',
server: inject(WebSocketServerToken),
server: ask(WsServerToken),
}),
);

const listen$: HttpServerEffect = event$ =>
const listening$: HttpServerEffect = event$ =>
event$.pipe(
matchEvent(ServerEvent.listen),
matchEvent(ServerEvent.listening),
map(event => event.payload),
tap(({ port, host }) => console.log(`Server running @ http://${host}:${port}/ 🚀`)),
);

export const server = createServer({
hostname: '127.0.0.1',
port: 1337,
httpListener: httpServer,
httpListener,
dependencies: [
bind(WebSocketServerToken).to(webSocketServer({ noServer: true })),
bindTo(WsServerToken)(webSocketListener().run),
],
event$: (...args) => merge(
listen$(...args),
listening$(...args),
upgrade$(...args),
),
});

server.run(
process.env.NODE_ENV !== 'test'
);
7 changes: 7 additions & 0 deletions packages/@integration/src/middlewares/logger.ws-middleware.ts
@@ -0,0 +1,7 @@
import { WsMiddlewareEffect } from '@marblejs/websockets';
import { tap } from 'rxjs/operators';

export const logger$: WsMiddlewareEffect = event$ =>
event$.pipe(
tap(e => console.log(`type: ${e.type}, payload: ${e.payload}`)),
);
4 changes: 2 additions & 2 deletions packages/@integration/src/tokens.ts
@@ -1,4 +1,4 @@
import { createInjectionToken } from '@marblejs/core';
import { createContextToken } from '@marblejs/core';
import { MarbleWebSocketServer } from '@marblejs/websockets';

export const WebSocketServerToken = createInjectionToken<MarbleWebSocketServer>();
export const WsServerToken = createContextToken<MarbleWebSocketServer>();
5 changes: 3 additions & 2 deletions packages/@integration/src/ws.listener.ts
Expand Up @@ -2,6 +2,7 @@ import { webSocketListener, WebSocketConnectionError, WsConnectionEffect } from
import { iif, throwError, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { add$ } from './effects/calculator.ws-effects';
import { logger$ } from './middlewares/logger.ws-middleware';

const connection$: WsConnectionEffect = req$ =>
req$.pipe(
Expand All @@ -12,8 +13,8 @@ const connection$: WsConnectionEffect = req$ =>
)),
);

export const webSocketServer = webSocketListener({
middlewares: [],
export default webSocketListener({
middlewares: [logger$],
effects: [add$],
connection$,
});
6 changes: 4 additions & 2 deletions packages/core/package.json
Expand Up @@ -36,12 +36,14 @@
},
"dependencies": {
"@types/qs": "^6.5.1",
"@types/uuid": "^3.4.4",
"chalk": "~2.4.1",
"file-type": "^8.0.0",
"fp-ts": "1.12.2",
"fp-ts": "1.14.1",
"mime": "^2.3.1",
"path-to-regexp": "^2.4.0",
"qs": "^6.6.0"
"qs": "^6.6.0",
"uuid": "^3.3.2"
},
"devDependencies": {
"@types/file-type": "^5.2.1",
Expand Down
57 changes: 57 additions & 0 deletions packages/core/src/context/context.factory.ts
@@ -0,0 +1,57 @@
import * as R from 'fp-ts/lib/Reader';
import * as M from 'fp-ts/lib/Map';
import { Setoid } from 'fp-ts/lib/Setoid';
import { Option } from 'fp-ts/lib/Option';
import { contramap, ordString, Ord } from 'fp-ts/lib/Ord';
import { ContextToken } from './context.token.factory';

const ordContextToken: Ord<ContextToken<any>> = contramap((t: ContextToken) => t._id, ordString);
const setoidContextToken: Setoid<ContextToken> = { equals: ordContextToken.equals };

export interface Context extends Map<ContextToken, R.Reader<any, any> | any> {}

export interface ContextProvider { <T>(token: ContextToken<T>): Option<T>; }

export interface ContextReader extends R.Reader<Context, any> {}

export interface ContextEagerReader { (ctx: Context): any; }

export type ContextDependency = ContextReader | ContextEagerReader;

export interface BoundDependency<T, U extends ContextDependency = ContextDependency> {
token: ContextToken<T>;
dependency: U;
}

const isReader = (x: any): x is R.Reader<any, any> => !!x.run;

export const createContext = () => M.empty;

export const register = <T>(boundDependency: BoundDependency<T, any>) => (context: Context) =>
M.insert(setoidContextToken)(
boundDependency.token,
isReader(boundDependency.dependency)
? boundDependency.dependency
: boundDependency.dependency(context),
context
);

export const registerAll = (boundDependencies: BoundDependency<any, any>[]) => (context: Context) =>
boundDependencies.reduce(
(ctx, dependency) => register(dependency)(ctx),
context,
);

export const lookup = (context: Context) => <T>(token: ContextToken<T>): Option<T> =>
M.lookup(ordContextToken)(token, context).map(dependency =>
isReader(dependency)
? dependency.run(context)
: dependency
);

export const bindTo =
<T>(token: ContextToken<T>) =>
<U extends ContextDependency>(dependency: U): BoundDependency<T, U> =>
({ token, dependency });

export const reader = R.ask<Context>().map(lookup);
9 changes: 9 additions & 0 deletions packages/core/src/context/context.token.factory.ts
@@ -0,0 +1,9 @@
import * as uuid from 'uuid';

export class ContextToken<T = any> {
_id = uuid();
_T!: T;
}

export const createContextToken = <T>() =>
new class extends ContextToken<T> {};
138 changes: 138 additions & 0 deletions packages/core/src/context/specs/context.factory.spec.ts
@@ -0,0 +1,138 @@
import { isEmpty, size } from 'fp-ts/lib/Map';
import { some } from 'fp-ts/lib/Option';
import { ask } from 'fp-ts/lib/Reader';
import {
Context,
ContextEagerReader,
createContext,
bindTo,
register,
registerAll,
reader,
lookup,
} from '../context.factory';
import { createContextToken } from '../context.token.factory';

describe('#bindTo', () => {
test('binds reader to token', () => {
// given
const context = createContext();
const reader = ask<Context>().map(_ => 'test');
const Token = createContextToken<typeof reader>();

// when
const boundDependency = bindTo(Token)(reader);

// then
expect(boundDependency.token).toBe(Token);
expect((boundDependency.dependency).run(context)).toEqual('test');
});

test('binds singleton to token', () => {
// given
const singleton: ContextEagerReader = _ => 'test';
const Token = createContextToken<typeof reader>();

// when
const boundDependency = bindTo(Token)(singleton);

// then
expect(boundDependency.token).toBe(Token);
expect(boundDependency.dependency).toEqual(singleton);
});
});

describe('#createContext', () => {
test('creates empty context', () => {
const context = createContext();
expect(isEmpty(context)).toBe(true);
});
});

describe('#register', () => {
test('registers bound readers', () => {
// given
const token = createContextToken();
const dependency = ask<Context>().map(_ => 'test');
const boundDependency = bindTo(token)(dependency);

// when
const context = register(boundDependency)(createContext());

// then
expect(lookup(context)(token)).toEqual(some('test'));
});
});

describe('#registerAll', () => {
test('registers set of bound readers', () => {
// given
const token1 = createContextToken<string>();
const token2 = createContextToken<string>();
const token3 = createContextToken<string>();
const dependency1 = ask<Context>().map(_ => 'test_1');
const dependency2 = ask<Context>().map(_ => 'test_2');
const dependency3 = ask<Context>().map(_ => 'test_3');

// when
const context = registerAll([
bindTo(token1)(dependency1),
bindTo(token2)(dependency2),
bindTo(token3)(dependency3),
])(createContext());

// then
expect(size(context)).toEqual(3);
expect(lookup(context)(token2)).toEqual(some('test_2'));
});
});

describe('#reader', () => {
test('asks context for a reader dependency', () => {
// given
const token1 = createContextToken<string>();
const token2 = createContextToken<string>();
const dependency1 = reader.map(() => 'test_1');
const dependency2 = reader
.map(ask => ask(token1)
.map(v => v + '_2')
.getOrElse(''));

// when ordered
const context1 = registerAll([
bindTo(token1)(dependency1),
bindTo(token2)(dependency2),
])(createContext());

// when reordered
const context2 = registerAll([
bindTo(token2)(dependency2),
bindTo(token1)(dependency1),
])(createContext());

// then
expect(lookup(context1)(token2)).toEqual(some('test_1_2'));
expect(lookup(context2)(token2)).toEqual(some('test_1_2'));
});

test('asks context for a eager reader dependency', () => {
// given
const token1 = createContextToken<string>();
const token2 = createContextToken<string>();
const token3 = createContextToken<string>();
const dependency1 = reader.map(() => 'test');
const dependency2 = reader.map(ask => ask(token1).map(v => v + '_1').getOrElse(''));
const dependency3 = reader.map(ask => ask(token2).map(v => v + '_2').getOrElse(''));

const context = registerAll([
bindTo(token1)(dependency1),
bindTo(token2)(ctx => dependency2.run(ctx)),
bindTo(token3)(dependency3),
])(createContext());

// then
expect(lookup(context)(token1)).toEqual(some('test'));
expect(lookup(context)(token2)).toEqual(some('test_1'));
expect(lookup(context)(token3)).toEqual(some('test_1_2'));
});
});
4 changes: 2 additions & 2 deletions packages/core/src/effects/effects.interface.ts
@@ -1,5 +1,5 @@
import { Observable, SchedulerLike } from 'rxjs';
import { InjectorGetter } from '../server/server.injector';
import { ContextProvider } from '../context/context.factory';

export interface EffectLike {
(input$: Observable<any>, ...args: any[]): Observable<any>;
Expand All @@ -10,7 +10,7 @@ export interface Effect<I, O, C, E extends Error = Error> {
}

export interface EffectMetadata<T extends Error = Error> {
inject: InjectorGetter;
ask: ContextProvider;
scheduler: SchedulerLike;
error?: T;
[key: string]: any;
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/effects/effectsMetadata.factory.ts
@@ -1,12 +1,12 @@
import { EffectMetadata } from './effects.interface';
import { AsyncScheduler } from 'rxjs/internal/scheduler/AsyncScheduler';
import { SchedulerLike } from 'rxjs';
import { InjectorGetter } from '../server/server.injector';
import { ContextProvider } from '../context/context.factory';

export const createEffectMetadata = <T extends SchedulerLike, U extends Error>(
metadata: { inject: InjectorGetter, scheduler?: T, error?: U; }
metadata: { ask: ContextProvider, scheduler?: T, error?: U; }
): EffectMetadata<U> => ({
inject: metadata.inject,
ask: metadata.ask,
scheduler: metadata.scheduler || AsyncScheduler as any,
error: metadata.error,
});

0 comments on commit 1963b61

Please sign in to comment.