From 7850c9ab70bce9f3eb46c61913268bf9ccafc5e6 Mon Sep 17 00:00:00 2001 From: soyuka Date: Fri, 11 Sep 2020 18:49:01 +0200 Subject: [PATCH] fix(sse): Drain stream as recommended --- .../core/router/router-response-controller.ts | 15 ++++-- packages/core/services/sse-stream.service.ts | 27 +++++++---- .../test/services/sse-stream.service.spec.ts | 47 ++++++++++++------- 3 files changed, 59 insertions(+), 30 deletions(-) diff --git a/packages/core/router/router-response-controller.ts b/packages/core/router/router-response-controller.ts index fd35c49294c..61ac9e7f25a 100644 --- a/packages/core/router/router-response-controller.ts +++ b/packages/core/router/router-response-controller.ts @@ -8,6 +8,7 @@ import { isFunction } from '@nestjs/common/utils/shared.utils'; import { Observable } from 'rxjs'; import { SseStream, HeaderStream } from '../services'; import { IncomingMessage, ServerResponse } from 'http'; +import { debounce } from 'rxjs/operators'; export interface CustomHeader { name: string; @@ -93,10 +94,16 @@ export class RouterResponseController { const stream = new SseStream(request); stream.pipe(response); - const subscription = observable.subscribe((message: any) => { - if (typeof message !== 'object') message = { data: message }; - stream.writeMessage(message); - }); + const subscription = observable + .pipe( + debounce((message: any) => { + return new Promise(resolve => { + if (typeof message !== 'object') message = { data: message }; + stream.writeMessage(message, resolve); + }); + }), + ) + .subscribe(); request.on('close', () => { response.end(); diff --git a/packages/core/services/sse-stream.service.ts b/packages/core/services/sse-stream.service.ts index fbb93701c23..061b10cf65e 100644 --- a/packages/core/services/sse-stream.service.ts +++ b/packages/core/services/sse-stream.service.ts @@ -50,7 +50,7 @@ export class SseStream extends Transform { pipe(destination: T, options?: { end?: boolean }): T { if (destination.writeHead) { destination.writeHead(200, { - // See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130 + // See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130 'Content-Type': 'text/event-stream', Connection: 'keep-alive', // Disable cache, even for old browsers and proxies @@ -74,24 +74,31 @@ export class SseStream extends Transform { encoding: string, callback: (error?: Error | null, data?: any) => void, ) { - if (message.type) this.push(`event: ${message.type}\n`); - if (message.id) this.push(`id: ${message.id}\n`); - if (message.retry) this.push(`retry: ${message.retry}\n`); - if (message.data) this.push(toDataString(message.data)); - this.push('\n'); + let data = message.type ? `event: ${message.type}\n` : ''; + data += message.id ? `id: ${message.id}\n` : ''; + data += message.retry ? `retry: ${message.retry}\n` : ''; + data += message.data ? toDataString(message.data) : ''; + data += '\n'; + this.push(data); callback(); } + /** + * Calls `.write` but handles the drain if needed + */ writeMessage( message: MessageEvent, - encoding?: string, - cb?: (error: Error | null | undefined) => void, - ): boolean { + cb: (error: Error | null | undefined) => void, + ) { if (!message.id) { this.lastEventId++; message.id = this.lastEventId.toString(); } - return this.write(message, encoding, cb); + if (!this.write(message, 'utf-8', cb)) { + this.once('drain', cb); + } else { + process.nextTick(cb); + } } } diff --git a/packages/core/test/services/sse-stream.service.spec.ts b/packages/core/test/services/sse-stream.service.spec.ts index 826b91f87c8..6b99a65edad 100644 --- a/packages/core/test/services/sse-stream.service.spec.ts +++ b/packages/core/test/services/sse-stream.service.spec.ts @@ -5,6 +5,8 @@ import * as EventSource from 'eventsource'; import { SseStream, HeaderStream } from '../../services/sse-stream.service'; import { AddressInfo } from 'net'; +const noop = () => {}; + const written = (stream: Writable) => new Promise((resolve, reject) => stream.on('error', reject).on('finish', resolve), @@ -41,12 +43,18 @@ describe('SseStream', () => { const sse = new SseStream(); const sink = new Sink(); sse.pipe(sink); - sse.writeMessage({ - data: 'hello\nworld', - }); - sse.write({ - data: 'bonjour\nmonde', - }); + sse.writeMessage( + { + data: 'hello\nworld', + }, + noop, + ); + sse.writeMessage( + { + data: 'bonjour\nmonde', + }, + noop, + ); sse.end(); await written(sink); expect(sink.content).to.equal( @@ -55,6 +63,7 @@ id: 1 data: hello data: world +id: 2 data: bonjour data: monde @@ -66,9 +75,12 @@ data: monde const sse = new SseStream(); const sink = new Sink(); sse.pipe(sink); - sse.writeMessage({ - data: { hello: 'world' }, - }); + sse.writeMessage( + { + data: { hello: 'world' }, + }, + noop, + ); sse.end(); await written(sink); expect(sink.content).to.equal( @@ -84,12 +96,15 @@ data: {"hello":"world"} const sse = new SseStream(); const sink = new Sink(); sse.pipe(sink); - sse.writeMessage({ - type: 'tea-time', - id: 'the-id', - retry: 222, - data: 'hello', - }); + sse.writeMessage( + { + type: 'tea-time', + id: 'the-id', + retry: 222, + data: 'hello', + }, + noop, + ); sse.end(); await written(sink); expect(sink.content).to.equal( @@ -137,7 +152,7 @@ data: hello es.close(); server.close(callback); }; - es.onopen = () => sse.writeMessage({ data: 'hello' }); + es.onopen = () => sse.writeMessage({ data: 'hello' }, noop); es.onerror = e => callback(new Error(`Error from EventSource: ${JSON.stringify(e)}`)); });