-
-
Notifications
You must be signed in to change notification settings - Fork 150
/
object.ts
170 lines (166 loc) · 4.68 KB
/
object.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import type { Keys, Predicate2 } from "@thi.ng/api";
import { dedupe } from "@thi.ng/transducers";
import type { CommonOpts, SubscriptionOpts } from "../api";
import { Subscription, subscription } from "../subscription";
import { nextID } from "../utils/idgen";
/**
 * Mapped type associating each selected key `P` of `T` with a
 * {@link Subscription} whose input and output value types are both
 * `T[P]`. The `-?` modifier strips optionality, i.e. a stream entry is
 * present for every selected key, even if that key is optional in `T`.
 */
export type KeyStreams<T, K extends Keys<T>> = {
    [P in K]-?: Subscription<T[P], T[P]>;
};
/**
 * Result object type for {@link fromObject}.
 *
 * @remarks
 * Bundles the per-key streams together with `next()` / `done()`
 * methods, so that the object itself can be attached as subscriber to
 * an upstream source emitting values of type `T`.
 */
export interface StreamObj<T, K extends Keys<T>> {
    /**
     * Object of managed & typed streams for registered keys (one
     * stream per key).
     */
    streams: KeyStreams<T, K>;
    /**
     * Feeds new values from `x` to each registered key's stream.
     * Satisfies {@link ISubscriber.next} interface.
     *
     * @remarks
     * As implemented by {@link fromObject}, every registered key is
     * visited on each call. If `x` has no value for a key, that key's
     * stream receives the configured default (if any), else
     * `undefined`.
     *
     * @param x - object providing the new key values
     */
    next(x: T): void;
    /**
     * Calls {@link ISubscriber.done} for all streams created. Satisfies
     * {@link ISubscriber.done} interface.
     */
    done(): void;
}
/**
 * Configuration options for {@link fromObject}.
 *
 * @remarks
 * All fields are declared as required here, but `fromObject` accepts a
 * `Partial<StreamObjOpts>`, so in practice every option can be
 * omitted. Note that `fromObject` spreads the given options object
 * into the options passed to each created stream.
 */
export interface StreamObjOpts<T, K extends Keys<T>> extends CommonOpts {
    /**
     * Array of selected `keys` for which to create streams. If
     * omitted, `fromObject` uses `Object.keys()` of the source object,
     * i.e. all of its own enumerable string keys.
     */
    keys: K[];
    /**
     * If true (default), all created streams will be seeded with key
     * values from the source object. Seeding is only skipped if this
     * option is explicitly set to `false`.
     *
     * @defaultValue true
     */
    initial: boolean;
    /**
     * Default values to use for `undefined` values of registered keys.
     * Only `undefined` triggers the replacement; other values
     * (including `null`) are passed through unchanged.
     */
    defaults: Partial<T>;
    /**
     * If true, attaches {@link @thi.ng/transducers#dedupe} transducer
     * to each key's value stream to avoid obsolete downstream
     * propagation when a key's value hasn't actually changed. Only
     * disabled if explicitly set to `false`.
     *
     * @defaultValue true
     */
    dedupe: boolean;
    /**
     * Generic equality predicate to be used for `dedupe` (`===` by
     * default). Ignored if `dedupe` option is false.
     */
    equiv: Predicate2<any>;
}
/**
 * Takes an arbitrary object `src` and object of options (see
 * {@link StreamObjOpts}). Creates a new object and for each selected
 * key creates a new stream, optionally seeded with the key's value in
 * `src`. Returns new object of streams.
 *
 * @remarks
 * The structure of the returned object is
 * {@link StreamObj | as follows}:
 *
 * ```ts
 * {
 *   streams: { ... },
 *   next(x): void;
 *   done(): void;
 * }
 * ```
 *
 * All streams will be stored under `streams`. The `next()` and `done()`
 * functions/methods allow the object itself to be used as subscriber
 * for an upstream subscribable (see 2nd example below):
 *
 * - `next()` - takes an object of same type as `src` and feeds each
 *   key's new value into its respective stream. If the `defaults`
 *   option is given, `undefined` key values are replaced with their
 *   specified default. If `dedupe` is enabled (default) only changed
 *   values (as per `equiv` predicate option) will be propagated
 *   downstream.
 * - `done()` - calls {@link ISubscriber.done} on all streams
 *
 * The optional `opts` arg is used to customize overall behavior of
 * `fromObject` and specify shared options for *all* created streams.
 *
 * Each stream's ID is formed as `<id>-<key>`, where `<id>` is the `id`
 * option (or an auto-generated `objNNN` ID if none is given). Keys are
 * stringified explicitly via `String()`, so symbol keys passed via the
 * `keys` option are supported too.
 *
 * @example
 * ```ts
 * type Foo = { a?: number; b: string; };
 *
 * const obj = fromObject(<Foo>{ a: 1, b: "foo" })
 *
 * obj.streams.a.subscribe(trace("a"))
 * // a 1
 * obj.streams.b.subscribe(trace("b"))
 * // b foo
 *
 * obj.next({ b: "bar" })
 * // a undefined
 * // b bar
 * ```
 *
 * @example
 * ```ts
 * const obj = fromObject(<Foo>{}, { keys: ["a", "b"], initial: false });
 * obj.streams.a.subscribe(trace("a"));
 * obj.streams.b.subscribe(trace("b"));
 *
 * const src = subscription<Foo, Foo>();
 * // use as subscriber
 * src.subscribe(obj);
 *
 * src.next({ a: 1, b: "foo" });
 * // a 1
 * // b foo
 * ```
 *
 * @param src - source object, used to determine the default key
 * selection and (unless `initial` is false) to seed the streams
 * @param opts - optional configuration, see {@link StreamObjOpts}
 * @returns object of per-key streams plus `next()` / `done()` methods
 */
export const fromObject = <T, K extends Keys<T>>(
    src: T,
    opts: Partial<StreamObjOpts<T, K>> = {}
): StreamObj<T, K> => {
    const id = opts.id || `obj${nextID()}`;
    // without explicit key selection, use all own enumerable string keys
    const keys = opts.keys || <K[]>Object.keys(src);
    // Shared options for all streams. The user options are spread last,
    // so they take precedence over the dedupe transducer set up here.
    const _opts: Partial<SubscriptionOpts<any, any>> =
        opts.dedupe !== false
            ? {
                  xform: dedupe<any>(opts.equiv || ((a, b) => a === b)),
                  ...opts,
              }
            : opts;
    const streams: any = {};
    for (const k of keys) {
        streams[k] = subscription(undefined, {
            ..._opts,
            // String() is required for symbol keys: implicit conversion
            // of a symbol inside a template literal throws a TypeError
            id: `${id}-${String(k)}`,
        });
    }
    const res: StreamObj<T, K> = {
        streams,
        next(state) {
            for (const k of keys) {
                const val = state[k];
                // only `undefined` is replaced by the key's default value
                streams[k].next(
                    opts.defaults && val === undefined ? opts.defaults[k] : val
                );
            }
        },
        done() {
            for (const k of keys) {
                streams[k].done();
            }
        },
    };
    // seed all streams with the source object's values (default behavior)
    if (opts.initial !== false) {
        res.next(src);
    }
    return res;
};