/
pubsub.ts
180 lines (168 loc) · 5.36 KB
/
pubsub.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import type { Fn, Predicate2 } from "@thi.ng/api";
import { EquivMap } from "@thi.ng/associative";
import { unsupported } from "@thi.ng/errors";
import type { Transducer } from "@thi.ng/transducers";
import {
CloseMode,
ISubscriber,
LOGGER,
SubscriptionOpts,
TransformableOpts,
WithErrorHandlerOpts,
} from "./api";
import { Subscription, subscription } from "./subscription";
import { optsWithID } from "./utils/idgen";
/**
* Configuration options for {@link PubSub} / {@link pubsub}.
*
* @typeParam A - type of the values received by the PubSub
* @typeParam B - type of the values after the optional `xform` transducer
* has been applied (i.e. the type seen by the `topic` function)
* @typeParam T - type of the topic values returned by `topic`
*/
export interface PubSubOpts<A, B, T> {
/**
* Topic function. Incoming values will be routed to topic
* subscriptions using this function's return value. If it returns
* `undefined`, the value is not processed any further.
*/
topic: Fn<B, T>;
/**
* Optional transformer for incoming values. If given, `xform` will
* be applied first and the transformed value passed to the
* `topic` fn.
*/
xform?: Transducer<A, B>;
/**
* Equivalence check for topic values. Should return a truthy result
* if the given topics are considered equal. Passed as the `equiv`
* option to the `EquivMap` used to store the topic subscriptions; if
* omitted, that map's own default applies.
*/
equiv?: Predicate2<T>;
/**
* Optional subscription ID for the PubSub instance.
*/
id?: string;
}
/**
 * Factory for {@link PubSub}, a stream splitter which routes values by topic.
 * For every value received, the configured `topic` function is called and the
 * value is forwarded only to the child subscriptions registered for the topic
 * it returns.
 *
 * @remarks
 * A topic (i.e. the result of the `topic` fn) may be a value of any type `T`
 * or `undefined`. Values for which `undefined` is returned are dropped without
 * further processing. Non-primitive topics (e.g. objects / arrays) are
 * supported too: they are compared with the registered topics via
 * {@link @thi.ng/equiv#equiv} unless a custom `equiv` option is provided. Any
 * number of subscribers can be attached to the same topic.
 *
 * When an `xform` transducer is configured, it always runs before the topic
 * function, so the topic function only ever sees transformed inputs.
 *
 * Topic subscriptions can be added and removed dynamically via
 * {@link PubSub.(subscribeTopic:1)} and {@link PubSub.unsubscribeTopic}. The
 * standard {@link ISubscribable.(subscribe:1)} /
 * {@link ISubscribable.unsubscribe} methods are NOT supported (they would be
 * meaningless here) and throw an error! The only valid use of `unsubscribe()`
 * is calling it WITHOUT an argument, which detaches the entire `PubSub`
 * instance (incl. all of its topic subscriptions) from the parent stream.
 *
 * @param opts - topic function and optional transducer / equivalence check / ID
 */
export const pubsub = <A, B = A, T = any>(opts: PubSubOpts<A, B, T>) => {
    return new PubSub<A, B, T>(opts);
};
/**
 * Subscription which routes each received (and optionally transformed) value
 * to the child subscriptions registered for the value's topic.
 *
 * @see {@link pubsub} for reference & examples.
 */
export class PubSub<A, B = A, T = any> extends Subscription<A, B> {
    /** Function computing the topic of a value (taken from `opts.topic`). */
    topicfn: Fn<B, T>;
    /**
     * Registered topics, each mapped to the intermediate subscription which
     * all subscribers of that topic are attached to.
     */
    topics: EquivMap<T, Subscription<B, B>>;

    /**
     * @param opts - topic function plus optional transducer, topic
     * equivalence predicate and subscription ID
     */
    constructor(opts: PubSubOpts<A, B, T>) {
        super(
            undefined,
            optsWithID("pubsub", <Partial<SubscriptionOpts<A, B>>>{
                xform: opts.xform,
                // Forward a user supplied ID only when one is given, so that
                // `optsWithID` keeps handling the "no ID" case on its own
                // (presumably by generating a "pubsub"-prefixed one; the
                // helper is not defined in this file).
                ...(opts.id ? { id: opts.id } : {}),
            })
        );
        this.topicfn = opts.topic;
        this.topics = new EquivMap<T, Subscription<B, B>>(undefined, {
            equiv: opts.equiv,
        });
    }

    /**
     * Unsupported. Use {@link PubSub.(subscribeTopic:1)} instead.
     *
     * @throws always
     */
    subscribe(): Subscription<B, any> {
        return unsupported(`use subscribeTopic() instead`);
    }

    /**
     * Unsupported. Use {@link PubSub.(subscribeTopic:1)} instead.
     *
     * @throws always
     */
    transform(): Subscription<B, any> {
        return unsupported(`use subscribeTopic() instead`);
    }

    /**
     * Subscribes to the given topic. If the topic isn't registered yet, an
     * intermediate subscription for it is created first (configured with
     * `CloseMode.NEVER` as its `closeOut` option) and stored in
     * {@link PubSub.topics}. The given subscriber and/or options are then
     * handed to that topic subscription's `subscribe()` and its result is
     * returned.
     *
     * @param topicID - topic to subscribe to
     * @param opts - optional subscription options
     */
    subscribeTopic<C>(
        topicID: T,
        opts?: Partial<TransformableOpts<B, C>>
    ): Subscription<B, C>;
    subscribeTopic<C>(
        topicID: T,
        sub: ISubscriber<C>,
        opts?: Partial<TransformableOpts<B, C>>
    ): Subscription<B, C>;
    subscribeTopic(
        topicID: T,
        sub: any,
        opts?: Partial<TransformableOpts<any, any>>
    ): Subscription<any, any> {
        let t = this.topics.get(topicID);
        if (!t) {
            // lazily register the topic on first subscription
            this.topics.set(
                topicID,
                (t = subscription(undefined, {
                    closeOut: CloseMode.NEVER,
                }))
            );
        }
        return t.subscribe(sub, opts);
    }

    /**
     * Like {@link PubSub.(subscribeTopic:1)}, but attaches a transducer to the
     * given topic: subscribes with a subscriber consisting only of the
     * `error` handler taken from `opts` and with `xform` merged into the
     * given options.
     *
     * @param topicID - topic to subscribe to
     * @param xform - transducer applied to the topic's values
     * @param opts - optional options (incl. `error` handler)
     * @returns the subscription returned by `subscribeTopic()`
     */
    transformTopic<C>(
        topicID: T,
        xform: Transducer<B, C>,
        opts: Partial<WithErrorHandlerOpts> = {}
    ) {
        return this.subscribeTopic(
            topicID,
            <ISubscriber<B>>{ error: opts.error },
            <any>{
                ...opts,
                xform,
            }
        );
    }

    /**
     * Removes `sub` from the given topic.
     *
     * @param topicID - topic the subscription was registered for
     * @param sub - subscription to remove
     * @returns result of the topic subscription's `unsubscribe(sub)`, or
     * `false` if the topic is unknown
     */
    unsubscribeTopic(topicID: T, sub: Subscription<B, any>) {
        const t = this.topics.get(topicID);
        return t ? t.unsubscribe(sub) : false;
    }

    /**
     * Unsubscribes the entire PubSub instance: calls `unsubscribe()` on every
     * topic subscription, clears the topic registry and then delegates to
     * the parent class implementation.
     *
     * @param sub - MUST be omitted. Individual child subscriptions can only
     * be removed via {@link PubSub.unsubscribeTopic}.
     * @returns result of the parent class' `unsubscribe()`
     * @throws if called with a subscription
     */
    unsubscribe(sub?: Subscription<B, any>) {
        if (sub) {
            // only the PubSub itself can be unsubscribed
            return unsupported(
                `only the PubSub itself can be unsubscribed, use unsubscribeTopic() instead`
            );
        }
        for (const t of this.topics.values()) {
            t.unsubscribe();
        }
        this.topics.clear();
        return super.unsubscribe();
    }

    /**
     * Calls `done()` on all topic subscriptions first and then on this
     * instance (via the parent class implementation).
     */
    done() {
        for (const t of this.topics.values()) {
            t.done();
        }
        super.done();
    }

    /**
     * Routes a single value: computes its topic via {@link PubSub.topicfn}
     * and, unless the topic is `undefined` or has no registered
     * subscription, passes the value to that topic subscription's `next()`.
     * An error thrown while doing so is given to the topic subscription's
     * `error()` handler if it has one, else to this instance's `error()`.
     *
     * @param x - value to dispatch
     */
    protected dispatch(x: B) {
        LOGGER.debug(this.id, "dispatch", x);
        const t = this.topicfn(x);
        if (t === undefined) {
            // no topic => value is dropped
            return;
        }
        const sub = this.topics.get(t);
        if (!sub) {
            // nobody ever subscribed to this topic
            return;
        }
        try {
            sub.next && sub.next(x);
        } catch (e) {
            sub.error ? sub.error(e) : this.error(e);
        }
    }
}