Skip to content

Commit

Permalink
fix(sse): Drain stream as recommended
Browse files Browse the repository at this point in the history
  • Loading branch information
soyuka committed Oct 13, 2020
1 parent e0e1feb commit 7850c9a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 30 deletions.
15 changes: 11 additions & 4 deletions packages/core/router/router-response-controller.ts
Expand Up @@ -8,6 +8,7 @@ import { isFunction } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { SseStream, HeaderStream } from '../services';
import { IncomingMessage, ServerResponse } from 'http';
import { debounce } from 'rxjs/operators';

export interface CustomHeader {
name: string;
Expand Down Expand Up @@ -93,10 +94,16 @@ export class RouterResponseController {
const stream = new SseStream(request);
stream.pipe(response);

const subscription = observable.subscribe((message: any) => {
if (typeof message !== 'object') message = { data: message };
stream.writeMessage(message);
});
const subscription = observable
.pipe(
debounce((message: any) => {
return new Promise(resolve => {
if (typeof message !== 'object') message = { data: message };
stream.writeMessage(message, resolve);
});
}),
)
.subscribe();

request.on('close', () => {
response.end();
Expand Down
27 changes: 17 additions & 10 deletions packages/core/services/sse-stream.service.ts
Expand Up @@ -50,7 +50,7 @@ export class SseStream extends Transform {
pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean }): T {
if (destination.writeHead) {
destination.writeHead(200, {
// See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
// See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
// Disable cache, even for old browsers and proxies
Expand All @@ -74,24 +74,31 @@ export class SseStream extends Transform {
encoding: string,
callback: (error?: Error | null, data?: any) => void,
) {
if (message.type) this.push(`event: ${message.type}\n`);
if (message.id) this.push(`id: ${message.id}\n`);
if (message.retry) this.push(`retry: ${message.retry}\n`);
if (message.data) this.push(toDataString(message.data));
this.push('\n');
let data = message.type ? `event: ${message.type}\n` : '';
data += message.id ? `id: ${message.id}\n` : '';
data += message.retry ? `retry: ${message.retry}\n` : '';
data += message.data ? toDataString(message.data) : '';
data += '\n';
this.push(data);
callback();
}

/**
* Calls `.write` but handles the drain if needed
*/
writeMessage(
message: MessageEvent,
encoding?: string,
cb?: (error: Error | null | undefined) => void,
): boolean {
cb: (error: Error | null | undefined) => void,
) {
if (!message.id) {
this.lastEventId++;
message.id = this.lastEventId.toString();
}

return this.write(message, encoding, cb);
if (!this.write(message, 'utf-8', cb)) {
this.once('drain', cb);
} else {
process.nextTick(cb);
}
}
}
47 changes: 31 additions & 16 deletions packages/core/test/services/sse-stream.service.spec.ts
Expand Up @@ -5,6 +5,8 @@ import * as EventSource from 'eventsource';
import { SseStream, HeaderStream } from '../../services/sse-stream.service';
import { AddressInfo } from 'net';

const noop = () => {};

const written = (stream: Writable) =>
new Promise((resolve, reject) =>
stream.on('error', reject).on('finish', resolve),
Expand Down Expand Up @@ -41,12 +43,18 @@ describe('SseStream', () => {
const sse = new SseStream();
const sink = new Sink();
sse.pipe(sink);
sse.writeMessage({
data: 'hello\nworld',
});
sse.write({
data: 'bonjour\nmonde',
});
sse.writeMessage(
{
data: 'hello\nworld',
},
noop,
);
sse.writeMessage(
{
data: 'bonjour\nmonde',
},
noop,
);
sse.end();
await written(sink);
expect(sink.content).to.equal(
Expand All @@ -55,6 +63,7 @@ id: 1
data: hello
data: world
id: 2
data: bonjour
data: monde
Expand All @@ -66,9 +75,12 @@ data: monde
const sse = new SseStream();
const sink = new Sink();
sse.pipe(sink);
sse.writeMessage({
data: { hello: 'world' },
});
sse.writeMessage(
{
data: { hello: 'world' },
},
noop,
);
sse.end();
await written(sink);
expect(sink.content).to.equal(
Expand All @@ -84,12 +96,15 @@ data: {"hello":"world"}
const sse = new SseStream();
const sink = new Sink();
sse.pipe(sink);
sse.writeMessage({
type: 'tea-time',
id: 'the-id',
retry: 222,
data: 'hello',
});
sse.writeMessage(
{
type: 'tea-time',
id: 'the-id',
retry: 222,
data: 'hello',
},
noop,
);
sse.end();
await written(sink);
expect(sink.content).to.equal(
Expand Down Expand Up @@ -137,7 +152,7 @@ data: hello
es.close();
server.close(callback);
};
es.onopen = () => sse.writeMessage({ data: 'hello' });
es.onopen = () => sse.writeMessage({ data: 'hello' }, noop);
es.onerror = e =>
callback(new Error(`Error from EventSource: ${JSON.stringify(e)}`));
});
Expand Down

0 comments on commit 7850c9a

Please sign in to comment.