Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sse): Implement Server-Sent Events #4842

Merged
merged 3 commits into from Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19,253 changes: 7,411 additions & 11,842 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -116,6 +116,7 @@
"eslint": "7.10.0",
"eslint-config-prettier": "6.11.0",
"eslint-plugin-import": "2.22.1",
"eventsource": "^1.0.7",
"fancy-log": "1.3.3",
"fastify": "3.5.1",
"fastify-cors": "4.1.0",
Expand Down
1 change: 1 addition & 0 deletions packages/common/constants.ts
Expand Up @@ -26,3 +26,4 @@ export const HTTP_CODE_METADATA = '__httpCode__';
export const MODULE_PATH = '__module_path__';
export const HEADERS_METADATA = '__headers__';
export const REDIRECT_METADATA = '__redirect__';
export const SSE_METADATA = '__sse__';
1 change: 1 addition & 0 deletions packages/common/decorators/http/index.ts
Expand Up @@ -5,3 +5,4 @@ export * from './create-route-param-metadata.decorator';
export * from './render.decorator';
export * from './header.decorator';
export * from './redirect.decorator';
export * from './sse.decorator';
28 changes: 28 additions & 0 deletions packages/common/decorators/http/sse.decorator.ts
@@ -0,0 +1,28 @@
import { SSE_METADATA, PATH_METADATA, METHOD_METADATA } from '../../constants';
import { RequestMethod } from '../../enums/request-method.enum';

/**
* Declares this route as a Server-Sent-Events endpoint
*
* @publicApi
*/
export function Sse(path?: string): MethodDecorator {
kamilmysliwiec marked this conversation as resolved.
Show resolved Hide resolved
return (
target: object,
key: string | symbol,
descriptor: TypedPropertyDescriptor<any>,
) => {
Reflect.defineMetadata(
PATH_METADATA,
path && path.length ? path : '/',
descriptor.value,
);
Reflect.defineMetadata(
METHOD_METADATA,
RequestMethod.GET,
descriptor.value,
);
Reflect.defineMetadata(SSE_METADATA, true, descriptor.value);
return descriptor;
};
}
1 change: 1 addition & 0 deletions packages/common/index.ts
Expand Up @@ -31,6 +31,7 @@ export {
INestApplicationContext,
INestMicroservice,
IntrospectionResult,
MessageEvent,
MiddlewareConsumer,
ModuleMetadata,
NestApplicationOptions,
Expand Down
7 changes: 7 additions & 0 deletions packages/common/interfaces/http/http-server.interface.ts
Expand Up @@ -69,3 +69,10 @@ export interface HttpServer<TRequest = any, TResponse = any> {
getType(): string;
init?(): Promise<void>;
}

export interface MessageEvent {
data: string | object;
id?: string;
type?: string;
retry?: number;
}
26 changes: 26 additions & 0 deletions packages/common/test/decorators/sse.decorator.spec.ts
@@ -0,0 +1,26 @@
import { expect } from 'chai';
import { Sse } from '../../decorators/http/sse.decorator';
import {
HTTP_CODE_METADATA,
SSE_METADATA,
PATH_METADATA,
METHOD_METADATA,
} from '../../constants';
import { RequestMethod } from '../../enums/request-method.enum';

describe('@Sse', () => {
const prefix = '/prefix';
class Test {
@Sse(prefix)
public static test() {}
}

it('should enhance method with expected http status code', () => {
const path = Reflect.getMetadata(PATH_METADATA, Test.test);
expect(path).to.be.eql('/prefix');
const method = Reflect.getMetadata(METHOD_METADATA, Test.test);
expect(method).to.be.eql(RequestMethod.GET);
const metadata = Reflect.getMetadata(SSE_METADATA, Test.test);
expect(metadata).to.be.eql(true);
});
});
11 changes: 7 additions & 4 deletions packages/core/helpers/handler-metadata-storage.ts
Expand Up @@ -6,6 +6,12 @@ import { ParamProperties } from './context-utils';

export const HANDLER_METADATA_SYMBOL = Symbol.for('handler_metadata:cache');

export type HandleResponseFn = <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req?: TRequest,
) => any;

export interface HandlerMetadata {
argsLength: number;
paramtypes: any[];
Expand All @@ -17,10 +23,7 @@ export interface HandlerMetadata {
contextId?: ContextId,
inquirerId?: string,
) => (ParamProperties & { metatype?: any })[];
fnHandleResponse: <TResult, TResponse>(
result: TResult,
res: TResponse,
) => any;
fnHandleResponse: HandleResponseFn;
}

export class HandlerMetadataStorage<
Expand Down
20 changes: 18 additions & 2 deletions packages/core/router/router-execution-context.ts
Expand Up @@ -13,6 +13,7 @@ import {
REDIRECT_METADATA,
RENDER_METADATA,
ROUTE_ARGS_METADATA,
SSE_METADATA,
} from '@nestjs/common/constants';
import { RouteParamMetadata } from '@nestjs/common/decorators';
import { RouteParamtypes } from '@nestjs/common/enums/route-paramtypes.enum';
Expand All @@ -26,6 +27,7 @@ import { ExecutionContextHost } from '../helpers/execution-context-host';
import {
HandlerMetadata,
HandlerMetadataStorage,
HandleResponseFn,
} from '../helpers/handler-metadata-storage';
import { STATIC_CONTEXT } from '../injector/constants';
import { InterceptorsConsumer } from '../interceptors/interceptors-consumer';
Expand Down Expand Up @@ -160,7 +162,7 @@ export class RouterExecutionContext {
handler(args, req, res, next),
contextType,
);
await fnHandleResponse(result, res);
await fnHandleResponse(result, res, req);
};
}

Expand Down Expand Up @@ -265,6 +267,10 @@ export class RouterExecutionContext {
return Reflect.getMetadata(HEADERS_METADATA, callback) || [];
}

public reflectSse(callback: (...args: unknown[]) => unknown): string {
return Reflect.getMetadata(SSE_METADATA, callback);
}

public exchangeKeysForValues(
keys: string[],
metadata: Record<number, RouteParamMetadata>,
Expand Down Expand Up @@ -398,7 +404,7 @@ export class RouterExecutionContext {
isResponseHandled: boolean,
redirectResponse?: RedirectResponse,
httpStatusCode?: number,
) {
): HandleResponseFn {
const renderTemplate = this.reflectRenderTemplate(callback);
if (renderTemplate) {
return async <TResult, TResponse>(result: TResult, res: TResponse) => {
Expand All @@ -410,6 +416,16 @@ export class RouterExecutionContext {
await this.responseController.redirect(result, res, redirectResponse);
};
}
const isSse = !!this.reflectSse(callback);
if (isSse) {
return async <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req?: TRequest,
) => {
await this.responseController.sse(result, res, req);
};
}
return async <TResult, TResponse>(result: TResult, res: TResponse) => {
result = await this.responseController.transformToResult(result);
!isResponseHandled &&
Expand Down
44 changes: 43 additions & 1 deletion packages/core/router/router-response-controller.ts
@@ -1,5 +1,14 @@
import { HttpServer, HttpStatus, RequestMethod } from '@nestjs/common';
import {
HttpServer,
HttpStatus,
RequestMethod,
MessageEvent,
} from '@nestjs/common';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { SseStream, HeaderStream } from '../services';
import { IncomingMessage, ServerResponse } from 'http';
import { debounce } from 'rxjs/operators';

export interface CustomHeader {
name: string;
Expand Down Expand Up @@ -78,4 +87,37 @@ export class RouterResponseController {
) {
this.applicationRef.status(response, statusCode);
}

public async sse<TInput>(result: TInput, response: any, request: any) {
const observable = this.assertObservable(result);

const stream = new SseStream(request);
stream.pipe(response);

const subscription = observable
.pipe(
debounce((message: any) => {
return new Promise(resolve => {
if (typeof message !== 'object') message = { data: message };
stream.writeMessage(message, resolve);
});
}),
)
.subscribe();

request.on('close', () => {
response.end();
subscription.unsubscribe();
});
}

private assertObservable(result: any): Observable<unknown> {
if (!isFunction(result.subscribe)) {
throw new ReferenceError(
'You should use an observable to use server-sent events.',
);
}

return result;
}
}
1 change: 1 addition & 0 deletions packages/core/services/index.ts
@@ -1 +1,2 @@
export * from './reflector.service';
export * from './sse-stream.service';
104 changes: 104 additions & 0 deletions packages/core/services/sse-stream.service.ts
@@ -0,0 +1,104 @@
import { Transform } from 'stream';
import { IncomingMessage, OutgoingHttpHeaders } from 'http';
import { MessageEvent } from '@nestjs/common/interfaces';

function toDataString(data: string | object): string {
if (typeof data === 'object') return toDataString(JSON.stringify(data));
return data
.split(/\r\n|\r|\n/)
.map(line => `data: ${line}\n`)
.join('');
}

interface WriteHeaders {
writeHead?(
statusCode: number,
reasonPhrase?: string,
headers?: OutgoingHttpHeaders,
): void;
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): void;
flushHeaders?(): void;
}

export type HeaderStream = NodeJS.WritableStream & WriteHeaders;

/**
* Adapted from https://raw.githubusercontent.com/EventSource/node-ssestream
* Transforms "messages" to W3C event stream content.
* See https://html.spec.whatwg.org/multipage/server-sent-events.html
* A message is an object with one or more of the following properties:
* - data (String or object, which gets turned into JSON)
* - type
* - id
* - retry
*
* If constructed with a HTTP Request, it will optimise the socket for streaming.
* If this stream is piped to an HTTP Response, it will set appropriate headers.
*/
export class SseStream extends Transform {
private lastEventId: number = null;

constructor(req?: IncomingMessage) {
super({ objectMode: true });
if (req && req.socket) {
req.socket.setKeepAlive(true);
req.socket.setNoDelay(true);
req.socket.setTimeout(0);
}
}

pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean }): T {
if (destination.writeHead) {
destination.writeHead(200, {
// See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
// Disable cache, even for old browsers and proxies
'Cache-Control':
'private, no-cache, no-store, must-revalidate, max-age=0',
'Transfer-Encoding': 'identity',
Pragma: 'no-cache',
Expire: '0',
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
'X-Accel-Buffering': 'no',
});
destination.flushHeaders();
}

destination.write(':\n');
return super.pipe(destination, options);
}

_transform(
message: MessageEvent,
encoding: string,
callback: (error?: Error | null, data?: any) => void,
) {
let data = message.type ? `event: ${message.type}\n` : '';
data += message.id ? `id: ${message.id}\n` : '';
data += message.retry ? `retry: ${message.retry}\n` : '';
data += message.data ? toDataString(message.data) : '';
data += '\n';
this.push(data);
callback();
}

/**
* Calls `.write` but handles the drain if needed
*/
writeMessage(
message: MessageEvent,
cb: (error: Error | null | undefined) => void,
) {
if (!message.id) {
this.lastEventId++;
message.id = this.lastEventId.toString();
}

if (!this.write(message, 'utf-8', cb)) {
this.once('drain', cb);
} else {
process.nextTick(cb);
}
}
}