Skip to content

Commit

Permalink
test(sse): Test all the things
Browse files Browse the repository at this point in the history
  • Loading branch information
soyuka committed May 29, 2020
1 parent aa36e8a commit 7bc3e00
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 22 deletions.
20 changes: 19 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Expand Up @@ -66,8 +66,8 @@
"reflect-metadata": "0.1.13",
"rxjs": "6.5.5",
"socket.io": "2.3.0",
"uuid": "8.0.0",
"tslib": "1.11.2"
"tslib": "1.11.2",
"uuid": "8.0.0"
},
"devDependencies": {
"@codechecks/client": "0.1.10",
Expand Down Expand Up @@ -117,6 +117,7 @@
"eslint": "7.0.0",
"eslint-config-prettier": "6.11.0",
"eslint-plugin-import": "2.20.2",
"eventsource": "^1.0.7",
"fancy-log": "1.3.3",
"fastify": "2.14.1",
"fastify-cors": "3.0.3",
Expand Down
1 change: 0 additions & 1 deletion packages/common/http/index.ts
@@ -1,4 +1,3 @@
export * from './http.module';
export * from './http.service';
export * from './interfaces';
export { SseStream } from './sse-stream.service';
14 changes: 12 additions & 2 deletions packages/common/test/decorators/sse.decorator.spec.ts
@@ -1,6 +1,12 @@
import { expect } from 'chai';
import { Sse } from '../../decorators/http/sse.decorator';
import { HTTP_CODE_METADATA, SSE_METADATA } from '../../constants';
import {
HTTP_CODE_METADATA,
SSE_METADATA,
PATH_METADATA,
METHOD_METADATA,
} from '../../constants';
import { RequestMethod } from '../../enums/request-method.enum';

describe('@Sse', () => {
const prefix = '/prefix';
Expand All @@ -10,7 +16,11 @@ describe('@Sse', () => {
}

it('should enhance method with expected http status code', () => {
const path = Reflect.getMetadata(PATH_METADATA, Test.test);
expect(path).to.be.eql('/prefix');
const method = Reflect.getMetadata(METHOD_METADATA, Test.test);
expect(method).to.be.eql(RequestMethod.GET);
const metadata = Reflect.getMetadata(SSE_METADATA, Test.test);
expect(metadata).to.be.eql(prefix);
expect(metadata).to.be.eql(true);
});
});
12 changes: 7 additions & 5 deletions packages/core/helpers/handler-metadata-storage.ts
Expand Up @@ -6,6 +6,12 @@ import { ParamProperties } from './context-utils';

export const HANDLER_METADATA_SYMBOL = Symbol.for('handler_metadata:cache');

export type HandleResponseFn = <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req?: TRequest,
) => any;

export interface HandlerMetadata {
argsLength: number;
paramtypes: any[];
Expand All @@ -17,11 +23,7 @@ export interface HandlerMetadata {
contextId?: ContextId,
inquirerId?: string,
) => (ParamProperties & { metatype?: any })[];
fnHandleResponse: <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req?: TRequest,
) => any;
fnHandleResponse: HandleResponseFn;
}

export class HandlerMetadataStorage<
Expand Down
5 changes: 3 additions & 2 deletions packages/core/router/router-execution-context.ts
Expand Up @@ -27,6 +27,7 @@ import { ExecutionContextHost } from '../helpers/execution-context-host';
import {
HandlerMetadata,
HandlerMetadataStorage,
HandleResponseFn,
} from '../helpers/handler-metadata-storage';
import { STATIC_CONTEXT } from '../injector/constants';
import { InterceptorsConsumer } from '../interceptors/interceptors-consumer';
Expand Down Expand Up @@ -403,7 +404,7 @@ export class RouterExecutionContext {
isResponseHandled: boolean,
redirectResponse?: RedirectResponse,
httpStatusCode?: number,
) {
): HandleResponseFn {
const renderTemplate = this.reflectRenderTemplate(callback);
if (renderTemplate) {
return async <TResult, TResponse>(result: TResult, res: TResponse) => {
Expand All @@ -420,7 +421,7 @@ export class RouterExecutionContext {
return async <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req: TRequest,
req?: TRequest,
) => {
await this.responseController.sse(result, res, req);
};
Expand Down
14 changes: 8 additions & 6 deletions packages/core/router/router-response-controller.ts
Expand Up @@ -2,11 +2,12 @@ import {
HttpServer,
HttpStatus,
RequestMethod,
SseStream,
MessageEvent,
} from '@nestjs/common';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { SseStream, HeaderStream } from '../services';
import { IncomingMessage, ServerResponse } from 'http';

export interface CustomHeader {
name: string;
Expand Down Expand Up @@ -86,10 +87,10 @@ export class RouterResponseController {
this.applicationRef.status(response, statusCode);
}

public async sse<TInput = unknown, TResponse = unknown, TRequest = unknown>(
result: any,
response: any,
request: any,
public async sse<TInput = unknown>(
result: Observable<TInput>,
response: ServerResponse,
request: IncomingMessage,
) {
if (!isFunction(result.subscribe)) {
throw new ReferenceError(
Expand All @@ -100,7 +101,8 @@ export class RouterResponseController {
const stream = new SseStream(request);
stream.pipe(response);

const subscription = result.subscribe((message: MessageEvent) => {
const subscription = result.subscribe((message: any) => {
if (typeof message !== 'object') message = { data: message };
stream.writeMessage(message);
});

Expand Down
1 change: 1 addition & 0 deletions packages/core/services/index.ts
@@ -1 +1,2 @@
export * from './reflector.service';
export * from './sse-stream.service';
@@ -1,6 +1,6 @@
import { Transform } from 'stream';
import { IncomingMessage, OutgoingHttpHeaders } from 'http';
import { MessageEvent } from '../interfaces';
import { MessageEvent } from '@nestjs/common/interfaces';

function toDataString(data: string | object): string {
if (typeof data === 'object') return toDataString(JSON.stringify(data));
Expand All @@ -11,7 +11,12 @@ function toDataString(data: string | object): string {
}

interface WriteHeaders {
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders;
writeHead?(
statusCode: number,
reasonPhrase?: string,
headers?: OutgoingHttpHeaders,
): void;
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): void;
flushHeaders?(): void;
}

Expand All @@ -35,7 +40,7 @@ export class SseStream extends Transform {

constructor(req?: IncomingMessage) {
super({ objectMode: true });
if (req) {
if (req && req.socket) {
req.socket.setKeepAlive(true);
req.socket.setNoDelay(true);
req.socket.setTimeout(0);
Expand Down
53 changes: 53 additions & 0 deletions packages/core/test/router/router-execution-context.spec.ts
@@ -1,5 +1,6 @@
import { ForbiddenException } from '@nestjs/common/exceptions/forbidden.exception';
import { expect } from 'chai';
import { of } from 'rxjs';
import * as sinon from 'sinon';
import { HttpException, HttpStatus, RouteParamMetadata } from '../../../common';
import { CUSTOM_ROUTE_AGRS_METADATA } from '../../../common/constants';
Expand All @@ -17,6 +18,7 @@ import { PipesContextCreator } from '../../pipes/pipes-context-creator';
import { RouteParamsFactory } from '../../router/route-params-factory';
import { RouterExecutionContext } from '../../router/router-execution-context';
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
import { PassThrough } from 'stream';

describe('RouterExecutionContext', () => {
let contextCreator: RouterExecutionContext;
Expand Down Expand Up @@ -326,6 +328,7 @@ describe('RouterExecutionContext', () => {

sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]);
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns(undefined);

const handler = contextCreator.createHandleResponseFn(
null,
Expand Down Expand Up @@ -377,6 +380,7 @@ describe('RouterExecutionContext', () => {

sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]);
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns(undefined);

const handler = contextCreator.createHandleResponseFn(
null,
Expand All @@ -396,6 +400,7 @@ describe('RouterExecutionContext', () => {
const response = {};

sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns(undefined);

const handler = contextCreator.createHandleResponseFn(
null,
Expand All @@ -414,5 +419,53 @@ describe('RouterExecutionContext', () => {
).to.be.true;
});
});

describe('when "isSse" is enabled', () => {
it('should use sse-stream.service', async () => {
const result = of('test');
const response = new PassThrough();
response.write = sinon.spy();
const request = new PassThrough();
request.on = sinon.spy();

sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns('/');

const handler = contextCreator.createHandleResponseFn(
null,
true,
undefined,
200,
);
await handler(result, response, request);

expect((response.write as any).called).to.be.true;
expect((request.on as any).called).to.be.true;
});

it('should not allow a non-observable result', async () => {
const result = Promise.resolve('test');
const response = new PassThrough();
const request = new PassThrough();

sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns('/');

const handler = contextCreator.createHandleResponseFn(
null,
true,
undefined,
200,
);

try {
await handler(result, response, request);
} catch (e) {
expect(e.message).to.equal(
'You should use an observable to use server-sent events.',
);
}
});
});
});
});
77 changes: 77 additions & 0 deletions packages/core/test/router/router-response-controller.spec.ts
Expand Up @@ -5,6 +5,8 @@ import * as sinon from 'sinon';
import { RequestMethod, HttpStatus } from '../../../common';
import { RouterResponseController } from '../../router/router-response-controller';
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
import { ServerResponse, IncomingMessage } from 'http';
import { PassThrough, Writable, Readable } from 'stream';

describe('RouterResponseController', () => {
let adapter: NoopHttpAdapter;
Expand Down Expand Up @@ -247,4 +249,79 @@ describe('RouterResponseController', () => {
});
});
});
describe('Server-Sent-Events', () => {
it('should accept only observables', async () => {
const result = Promise.resolve('test');
try {
await routerResponseController.sse(
(result as unknown) as any,
({} as unknown) as ServerResponse,
({} as unknown) as IncomingMessage,
);
} catch (e) {
expect(e.message).to.eql(
'You should use an observable to use server-sent events.',
);
}
});

it('should write string', async () => {
class Sink extends Writable {
private readonly chunks: string[] = [];

_write(
chunk: any,
encoding: string,
callback: (error?: Error | null) => void,
): void {
this.chunks.push(chunk);
callback();
}

get content() {
return this.chunks.join('');
}
}

const written = (stream: Writable) =>
new Promise((resolve, reject) =>
stream.on('error', reject).on('finish', resolve),
);

const result = of('test');
const response = new Sink();
const request = new PassThrough();
routerResponseController.sse(
result,
(response as unknown) as ServerResponse,
(request as unknown) as IncomingMessage,
);
request.destroy();
await written(response);
expect(response.content).to.eql(
`:ok
event: message
id: 0
data: test
`,
);
});

it('should close on request close', done => {
const result = of('test');
const response = new Writable();
response.end = () => done();
response._write = () => {};
const request = new Writable();
request._write = () => {};
routerResponseController.sse(
result,
(response as unknown) as ServerResponse,
(request as unknown) as IncomingMessage,
);
request.destroy();
});
});
});

0 comments on commit 7bc3e00

Please sign in to comment.