-
Notifications
You must be signed in to change notification settings - Fork 5
/
event-source.ts
92 lines (87 loc) · 2.54 KB
/
event-source.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import { fromEvent, merge, Observable, of } from 'rxjs';
import { map, finalize, takeUntil, switchMap } from 'rxjs/operators';
export interface EventSourceOptions<TData = unknown> {
/**
* Event types to subscribe to.
* @default ['message']
*/
eventTypes: Array<string>;
/**
* Function to parse messages.
* @default (_, message) => JSON.parse(message)
*/
parseFn: (eventType: string, message: string) => TData;
}
/**
* Takes EventSource and creates an observable from it.
*
* @deprecated Use `fromEventSource` from `rx-event-source` package instead.
*
* @example
* ```ts
* const sse = new EventSource(url, configuration);
* return fromEventSource(sse).pipe(
* finalize(() => {
* // Make sure the EventSource is closed once not needed.
* sse.close();
* }),
* );
* ```
*/
export function fromEventSource<TData = unknown>(
sse: EventSource,
options?: EventSourceOptions<TData>
): Observable<TData> {
const defaultOptions: EventSourceOptions<TData> = {
eventTypes: ['message'],
parseFn: (_, message) => JSON.parse(message),
};
const { eventTypes, parseFn } = { ...defaultOptions, ...(options || {}) };
const events$ = eventTypes.map((eventType) =>
fromEvent<{ data: string }>(sse, eventType).pipe(
map((event: MessageEvent<string>) => parseFn(eventType, event.data))
)
);
const error$ = fromEvent<{ data?: string }>(sse, 'error').pipe(
map((message) => JSON.parse(message?.data || 'null')),
map((data) => {
throw new Error(data?.errorMessage || 'Event Source Error');
})
);
const complete$ = fromEvent(sse, 'complete');
return merge(...events$, error$).pipe(takeUntil(complete$));
}
/**
* Creates event source from url (and config) and returns an observable with
* parsed event source data.
* Opens the event source once subscribed.
* Closes the event source, once unsubscribed.
*
* @deprecated Use `eventSource$` from `rx-event-source` package instead.
*
* @example
* ```ts
* const sse$ = eventSource$('https://example.com/sse', {
* withCredentials: true,
* });
* sse$.subscribe((data) => {
* console.log(data);
* });
* ```
*/
export function eventSource$<TData = unknown>(
url: string,
configuration?: EventSourceInit,
options?: EventSourceOptions<TData>
): Observable<TData> {
return of({ url, configuration }).pipe(
switchMap(({ url, configuration }) => {
const sse = new EventSource(url, configuration);
return fromEventSource<TData>(sse, options).pipe(
finalize(() => {
sse.close();
})
);
})
);
}