Skip to content

Commit

Permalink
test(sse): Test sse
Browse files Browse the repository at this point in the history
  • Loading branch information
soyuka committed Aug 25, 2020
1 parent 2ee6c88 commit 082a569
Show file tree
Hide file tree
Showing 14 changed files with 356 additions and 54 deletions.
18 changes: 18 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -116,6 +116,7 @@
"eslint": "7.6.0",
"eslint-config-prettier": "6.11.0",
"eslint-plugin-import": "2.22.0",
"eventsource": "^1.0.7",
"fancy-log": "1.3.3",
"fastify": "3.2.0",
"fastify-cors": "4.1.0",
Expand Down
1 change: 0 additions & 1 deletion packages/common/http/index.ts
@@ -1,4 +1,3 @@
export * from './http.module';
export * from './http.service';
export * from './interfaces';
export { SseStream } from './sse-stream.service';
14 changes: 12 additions & 2 deletions packages/common/test/decorators/sse.decorator.spec.ts
@@ -1,6 +1,12 @@
import { expect } from 'chai';
import { Sse } from '../../decorators/http/sse.decorator';
import { HTTP_CODE_METADATA, SSE_METADATA } from '../../constants';
import {
HTTP_CODE_METADATA,
SSE_METADATA,
PATH_METADATA,
METHOD_METADATA,
} from '../../constants';
import { RequestMethod } from '../../enums/request-method.enum';

describe('@Sse', () => {
const prefix = '/prefix';
Expand All @@ -10,7 +16,11 @@ describe('@Sse', () => {
}

it('should enhance method with expected http status code', () => {
const path = Reflect.getMetadata(PATH_METADATA, Test.test);
expect(path).to.be.eql('/prefix');
const method = Reflect.getMetadata(METHOD_METADATA, Test.test);
expect(method).to.be.eql(RequestMethod.GET);
const metadata = Reflect.getMetadata(SSE_METADATA, Test.test);
expect(metadata).to.be.eql(prefix);
expect(metadata).to.be.eql(true);
});
});
12 changes: 7 additions & 5 deletions packages/core/helpers/handler-metadata-storage.ts
Expand Up @@ -6,6 +6,12 @@ import { ParamProperties } from './context-utils';

export const HANDLER_METADATA_SYMBOL = Symbol.for('handler_metadata:cache');

export type HandleResponseFn = <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req?: TRequest,
) => any;

export interface HandlerMetadata {
argsLength: number;
paramtypes: any[];
Expand All @@ -17,11 +23,7 @@ export interface HandlerMetadata {
contextId?: ContextId,
inquirerId?: string,
) => (ParamProperties & { metatype?: any })[];
fnHandleResponse: <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req?: TRequest,
) => any;
fnHandleResponse: HandleResponseFn;
}

export class HandlerMetadataStorage<
Expand Down
5 changes: 3 additions & 2 deletions packages/core/router/router-execution-context.ts
Expand Up @@ -27,6 +27,7 @@ import { ExecutionContextHost } from '../helpers/execution-context-host';
import {
HandlerMetadata,
HandlerMetadataStorage,
HandleResponseFn,
} from '../helpers/handler-metadata-storage';
import { STATIC_CONTEXT } from '../injector/constants';
import { InterceptorsConsumer } from '../interceptors/interceptors-consumer';
Expand Down Expand Up @@ -403,7 +404,7 @@ export class RouterExecutionContext {
isResponseHandled: boolean,
redirectResponse?: RedirectResponse,
httpStatusCode?: number,
) {
): HandleResponseFn {
const renderTemplate = this.reflectRenderTemplate(callback);
if (renderTemplate) {
return async <TResult, TResponse>(result: TResult, res: TResponse) => {
Expand All @@ -420,7 +421,7 @@ export class RouterExecutionContext {
return async <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req: TRequest,
req?: TRequest,
) => {
await this.responseController.sse(result, res, req);
};
Expand Down
28 changes: 16 additions & 12 deletions packages/core/router/router-response-controller.ts
Expand Up @@ -2,11 +2,12 @@ import {
HttpServer,
HttpStatus,
RequestMethod,
SseStream,
MessageEvent,
} from '@nestjs/common';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { SseStream, HeaderStream } from '../services';
import { IncomingMessage, ServerResponse } from 'http';

export interface CustomHeader {
name: string;
Expand Down Expand Up @@ -86,21 +87,14 @@ export class RouterResponseController {
this.applicationRef.status(response, statusCode);
}

public async sse<TInput = unknown, TResponse = unknown, TRequest = unknown>(
result: any,
response: any,
request: any,
) {
if (!isFunction(result.subscribe)) {
throw new ReferenceError(
'You should use an observable to use server-sent events.',
);
}
public async sse<TInput>(result: TInput, response: any, request: any) {
const observable = this.assertObservable(result);

const stream = new SseStream(request);
stream.pipe(response);

const subscription = result.subscribe((message: MessageEvent) => {
const subscription = observable.subscribe((message: any) => {
if (typeof message !== 'object') message = { data: message };
stream.writeMessage(message);
});

Expand All @@ -109,4 +103,14 @@ export class RouterResponseController {
subscription.unsubscribe();
});
}

private assertObservable(result: any): Observable<unknown> {
if (!isFunction(result.subscribe)) {
throw new ReferenceError(
'You should use an observable to use server-sent events.',
);
}

return result;
}
}
1 change: 1 addition & 0 deletions packages/core/services/index.ts
@@ -1 +1,2 @@
export * from './reflector.service';
export * from './sse-stream.service';
@@ -1,6 +1,6 @@
import { Transform } from 'stream';
import { IncomingMessage, OutgoingHttpHeaders } from 'http';
import { MessageEvent } from '../interfaces';
import { MessageEvent } from '@nestjs/common/interfaces';

function toDataString(data: string | object): string {
if (typeof data === 'object') return toDataString(JSON.stringify(data));
Expand All @@ -11,7 +11,12 @@ function toDataString(data: string | object): string {
}

interface WriteHeaders {
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders;
writeHead?(
statusCode: number,
reasonPhrase?: string,
headers?: OutgoingHttpHeaders,
): void;
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): void;
flushHeaders?(): void;
}

Expand All @@ -35,7 +40,7 @@ export class SseStream extends Transform {

constructor(req?: IncomingMessage) {
super({ objectMode: true });
if (req) {
if (req && req.socket) {
req.socket.setKeepAlive(true);
req.socket.setNoDelay(true);
req.socket.setTimeout(0);
Expand All @@ -45,15 +50,22 @@ export class SseStream extends Transform {
pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean }): T {
if (destination.writeHead) {
destination.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Transfer-Encoding': 'identity',
'Cache-Control': 'no-cache',
// See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
// Disable cache, even for old browsers and proxies
'Cache-Control':
'private, no-cache, no-store, must-revalidate, max-age=0',
'Transfer-Encoding': 'identity',
Pragma: 'no-cache',
Expire: '0',
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
'X-Accel-Buffering': 'no',
});
destination.flushHeaders();
}

destination.write(':ok\n');
destination.write(':\n');
return super.pipe(destination, options);
}

Expand All @@ -76,12 +88,8 @@ export class SseStream extends Transform {
cb?: (error: Error | null | undefined) => void,
): boolean {
if (!message.id) {
this.lastEventId = this.lastEventId === null ? 0 : this.lastEventId + 1;
message.id = '' + this.lastEventId;
}

if (!message.type) {
message.type = 'message';
this.lastEventId++;
message.id = this.lastEventId.toString();
}

return this.write(message, encoding, cb);
Expand Down
53 changes: 53 additions & 0 deletions packages/core/test/router/router-execution-context.spec.ts
@@ -1,5 +1,6 @@
import { ForbiddenException } from '@nestjs/common/exceptions/forbidden.exception';
import { expect } from 'chai';
import { of } from 'rxjs';
import * as sinon from 'sinon';
import { HttpException, HttpStatus, RouteParamMetadata } from '../../../common';
import { CUSTOM_ROUTE_AGRS_METADATA } from '../../../common/constants';
Expand All @@ -17,6 +18,7 @@ import { PipesContextCreator } from '../../pipes/pipes-context-creator';
import { RouteParamsFactory } from '../../router/route-params-factory';
import { RouterExecutionContext } from '../../router/router-execution-context';
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
import { PassThrough } from 'stream';

describe('RouterExecutionContext', () => {
let contextCreator: RouterExecutionContext;
Expand Down Expand Up @@ -326,6 +328,7 @@ describe('RouterExecutionContext', () => {

sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]);
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns(undefined);

const handler = contextCreator.createHandleResponseFn(
null,
Expand Down Expand Up @@ -377,6 +380,7 @@ describe('RouterExecutionContext', () => {

sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]);
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns(undefined);

const handler = contextCreator.createHandleResponseFn(
null,
Expand All @@ -396,6 +400,7 @@ describe('RouterExecutionContext', () => {
const response = {};

sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns(undefined);

const handler = contextCreator.createHandleResponseFn(
null,
Expand All @@ -414,5 +419,53 @@ describe('RouterExecutionContext', () => {
).to.be.true;
});
});

describe('when "isSse" is enabled', () => {
it('should use sse-stream.service', async () => {
const result = of('test');
const response = new PassThrough();
response.write = sinon.spy();
const request = new PassThrough();
request.on = sinon.spy();

sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns('/');

const handler = contextCreator.createHandleResponseFn(
null,
true,
undefined,
200,
);
await handler(result, response, request);

expect((response.write as any).called).to.be.true;
expect((request.on as any).called).to.be.true;
});

it('should not allow a non-observable result', async () => {
const result = Promise.resolve('test');
const response = new PassThrough();
const request = new PassThrough();

sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns('/');

const handler = contextCreator.createHandleResponseFn(
null,
true,
undefined,
200,
);

try {
await handler(result, response, request);
} catch (e) {
expect(e.message).to.equal(
'You should use an observable to use server-sent events.',
);
}
});
});
});
});

0 comments on commit 082a569

Please sign in to comment.