-
Notifications
You must be signed in to change notification settings - Fork 1
/
EventSource.ts
62 lines (54 loc) · 1.89 KB
/
EventSource.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import * as http from 'node:http';
import * as https from 'node:https';
import { createEventStreamTransform } from '../eventStreamParser.js';
import { BaseEventSource, EVENT_STREAM_HEADERS } from '../BaseEventSource.js';
/**
* polyfill of browser EventSource relying on Node EventTarget,
* Node http/https/http2 api,
* and reader-parser of this package
*/
export class EventSource extends BaseEventSource {
constructor(url: string, eventSourceInitDict: https.RequestOptions = {}) {
super(url);
this.#eventSourceInitDict = eventSourceInitDict;
this.init(url);
}
protected init(url: string): void {
if (this.#request !== undefined) {
this.#request.destroy();
this.#request = undefined;
}
if (this.#eventSourceInitDict.headers === undefined)
this.#eventSourceInitDict.headers = {};
Object.assign(this.#eventSourceInitDict.headers, EVENT_STREAM_HEADERS);
const wgUrl = new URL(url);
this.#request = (wgUrl.protocol === 'https:' ? https : http)
.request(wgUrl, this.#eventSourceInitDict, response => {
this.#response = response;
if (!this.isValidResponse(response)) {
this.handleInvalidResponse();
this.init(url);
return;
}
this.signalOpen();
const stream = response.pipe(createEventStreamTransform());
this.initStreamAdaptor(stream, this.cleaning);
})
.on('close', () => {
this.signalError(new Error('Connection closed'));
setTimeout(() => this.init(url), this.#reconnectionTime);
})
.end();
}
// implementation
#reconnectionTime: number = 3000;
#eventSourceInitDict: https.RequestOptions;
#request?: http.ClientRequest;
#response?: http.IncomingMessage;
protected readonly cleaning = (): void => {
this.#response?.destroy();
this.#request?.destroy();
this.#response = undefined;
this.#request = undefined;
};
}