Skip to content

Commit 7db0bda

Browse files
committed
Commit
1 parent fec717c commit 7db0bda

File tree

6 files changed

+202
-81
lines changed

6 files changed

+202
-81
lines changed

json-fetch.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
// deno-lint-ignore-file no-explicit-any
22
import { StreamResponse, StreamRequest } from "https://ghuc.cc/worker-tools/stream-response/index.ts";
3-
import { JSONStringifyReadable } from './json-stringify.ts';
3+
import { asyncIterToStream } from 'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts';
4+
import { JSONStringifyReadable, isAsyncIterable } from './json-stringify.ts';
45

5-
export type JSONStreamBodyInit = any
6+
export type JSONStreamBodyInit = ReadableStream<string> | AsyncIterable<string> | any;
67
export type JSONStreamRequestInit = Omit<RequestInit, 'body'> & { body?: JSONStreamBodyInit }
78

9+
const toBody = (x: any) => x instanceof ReadableStream
10+
? x
11+
: isAsyncIterable(x)
12+
? asyncIterToStream(x)
13+
: new JSONStringifyReadable(x)
14+
815
export class JSONStreamRequest extends StreamRequest {
916
static contentType = 'application/json;charset=UTF-8';
1017
static accept = 'application/json, text/plain, */*';
@@ -17,7 +24,7 @@ export class JSONStreamRequest extends StreamRequest {
1724
) {
1825
const { headers: _headers, body: _body, ...rest } = init || {};
1926

20-
const body = new JSONStringifyReadable(_body)
27+
const body = toBody(_body);
2128

2229
const headers = new Headers(_headers);
2330
if (!headers.has('Content-Type') && _body != null)
@@ -41,7 +48,7 @@ export class JSONStreamResponse extends StreamResponse {
4148
) {
4249
const { headers: _headers, ...rest } = init || {};
4350

44-
const _body = new JSONStringifyReadable(body)
51+
const _body = toBody(body)
4552

4653
const headers = new Headers(_headers);
4754

json-parse-lazy-promise.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// deno-lint-ignore-file no-explicit-any
2+
import { ResolvablePromise } from 'https://ghuc.cc/worker-tools/resolvable-promise/index.ts';
3+
import { pipe } from 'https://cdn.skypack.dev/ts-functional-pipe@3.1.2';
4+
5+
const id = (_: any) => _;
6+
7+
type Awaitable<T> = T | PromiseLike<T>;
8+
9+
// TODO: Make own module?
10+
// TODO: Add abort signal?
11+
export class JSONParseLazyPromise<T, TTask = T> implements Promise<T> {
12+
#promise;
13+
#task;
14+
#mapFn;
15+
#thisArg;
16+
17+
// static create<U>(task: () => Awaitable<U>): JSONParseLazyPromise<U> {
18+
// this.#promise = new ResolvablePromise<T>()
19+
// }
20+
21+
constructor(
22+
task: () => Awaitable<TTask>,
23+
mapFn?: ((value: TTask, i?: 0) => Awaitable<T>) | undefined | null,
24+
thisArg?: any,
25+
) {
26+
// FIXME: Can avoid repeated creation?
27+
this.#promise = new ResolvablePromise<T>();
28+
this.#task = task;
29+
this.#mapFn = mapFn;
30+
this.#thisArg = thisArg;
31+
}
32+
33+
#pull() {
34+
Promise.resolve(this.#task())
35+
.then(this.#mapFn && (x => this.#mapFn!.call(this.#thisArg, x, 0)))
36+
.then(x => this.#promise.resolve(x), err => this.#promise.reject(err));
37+
}
38+
39+
/**
40+
* Starts the execution of the task associated with the lazy promise.
41+
* If you don't want to start the task at this moment, use `.map` instead.
42+
*/
43+
then<U = T, V = never>(
44+
onfulfilled?: ((value: T) => Awaitable<U>) | undefined | null,
45+
onrejected?: ((reason: any) => Awaitable<V>) | undefined | null
46+
): Promise<U | V> {
47+
this.#pull();
48+
return this.#promise.then(onfulfilled, onrejected)
49+
}
50+
51+
/**
52+
* Applies transformations to the resolved value without triggering execution.
53+
* Returns another lazy promise that triggers execution via `.then`
54+
*/
55+
map<U = T>(
56+
mapFn?: ((value: T, i?: 0) => Awaitable<U>) | undefined | null,
57+
thisArg?: any
58+
): JSONParseLazyPromise<U, TTask> {
59+
return new JSONParseLazyPromise(this.#task, pipe(this.#mapFn ?? id, mapFn ?? id), thisArg)
60+
}
61+
62+
catch<V = never>(onrejected?: ((reason: any) => V | PromiseLike<V>) | null): Promise<T | V> {
63+
return this.#promise.catch(onrejected)
64+
}
65+
66+
finally(onfinally?: (() => void) | null): Promise<T> {
67+
return this.#promise.finally(onfinally)
68+
}
69+
70+
[Symbol.toStringTag] = 'JSONParseLazyPromise'
71+
}

json-parse-stream.ts

Lines changed: 12 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
// deno-lint-ignore-file no-explicit-any no-cond-assign ban-unused-ignore no-unused-vars
22
import { streamToAsyncIter } from 'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts'
3-
import { ResolvablePromise } from 'https://ghuc.cc/worker-tools/resolvable-promise/index.ts';
43
import { JSONParser } from './json-parser.js';
4+
import { JSONParseLazyPromise } from './json-parse-lazy-promise.ts';
55
import { normalize, match } from './json-path.ts'
66

7-
async function* identity<T>(iter: Iterable<T> | AsyncIterable<T>) {
8-
for await (const x of iter) yield x;
9-
}
10-
117
// FIXME: avoid string concatenation/joining
128
const mkPath = (parser: any) => {
139
const path = [...parser.stack.map((_: any) => _.key), parser.key]; // TODO: modify parser to provide key efficiently
@@ -34,7 +30,7 @@ export class JSONParseStream<T = any> extends TransformStream<string | Uint8Arra
3430
}
3531
};
3632
},
37-
transform: (chunk, controller) => {
33+
transform: (chunk) => {
3834
parser.write(chunk);
3935
},
4036
});
@@ -46,10 +42,10 @@ export class JSONParseStream<T = any> extends TransformStream<string | Uint8Arra
4642

4743
const remove = <K, V>(m: Map<K, V>, k: K) => { const v = m.get(k); m.delete(k); return v; }
4844

45+
4946
/** @deprecated Rename!!! */
5047
export class JSONParseNexus<T = any> extends TransformStream<string | Uint8Array, [string, T]> {
5148
#queues = new Map<string, ReadableStreamDefaultController<any>>();
52-
#lazies = new Map<string, ResolvablePromise<any>>();
5349
#reader: ReadableStreamDefaultReader<[string, T]>
5450

5551
constructor() {
@@ -68,13 +64,6 @@ export class JSONParseNexus<T = any> extends TransformStream<string | Uint8Array
6864
remove(this.#queues, expr)!.close()
6965
}
7066
}
71-
for (const expr of this.#lazies.keys()) {
72-
if (match(expr, path)) {
73-
remove(this.#lazies, expr)!.resolve(value)
74-
} else if (expr.startsWith(path)) {
75-
remove(this.#lazies, expr)!.resolve(undefined)
76-
}
77-
}
7867

7968
controller.enqueue([path, value]);
8069
};
@@ -87,34 +76,12 @@ export class JSONParseNexus<T = any> extends TransformStream<string | Uint8Array
8776
this.#reader = this.readable.getReader();
8877
}
8978

90-
/**
91-
* Returns a promise that resolves with the value found at the provided `jsonPath` or `undefined` otherwise.
92-
*
93-
* __Starts to pull values form the underlying sink immediately!__
94-
* If the value is located after a large array in the JSON, the entire array will be parsed and kept in a queue!
95-
* Consider using `lazy` instead if pulling form a stream elsewhere.
96-
*/
97-
async eager<U = any>(jsonPath: string): Promise<U | undefined> {
98-
const x = await this.stream(jsonPath).getReader().read();
99-
return x.done ? undefined : x.value;
100-
}
101-
102-
/**
103-
* Returns a promise that resolves with the value found at the provided `jsonPath` or `undefined` otherwise.
104-
*
105-
* __Does not pull from the underlying sink on its own!__
106-
* If there isn't another consumer pulling past the point where the value if found, it will never resolve!
107-
* Consider using `eager` instead when running into deadlocks.
108-
*/
109-
lazy<U = any>(jsonPath: string): Promise<U | undefined> & { pull: () => Promise<U | undefined> } {
110-
const p = new ResolvablePromise<U | undefined>();
111-
this.#lazies.set(normalize(jsonPath), p)
112-
return Object.assign(p, { pull: () => this.eager(jsonPath) })
113-
}
114-
115-
/** @deprecated Use lazy/eager instead to meet your use case */
116-
promise<T = any>(jsonPath: string): Promise<T | undefined> {
117-
return this.eager(jsonPath);
79+
promise<T = any>(jsonPath: string): JSONParseLazyPromise<T | undefined> {
80+
const stream = this.stream(jsonPath);
81+
return new JSONParseLazyPromise(async () => {
82+
const x = await stream.getReader().read();
83+
return x.done ? undefined : x.value;
84+
})
11885
}
11986

12087
stream<U = any>(jsonPath: string): ReadableStream<U> {
@@ -123,17 +90,16 @@ export class JSONParseNexus<T = any> extends TransformStream<string | Uint8Array
12390
start: (queue) => {
12491
this.#queues.set(path, queue)
12592
},
126-
pull: async (queue) => {
127-
// console.log('pull', jsonPath, queue.desiredSize)
93+
pull: async () => {
12894
while (true) {
12995
const { done, value } = await this.#reader.read();
13096
// FIXME: avoid duplicate match
13197
if (done || match(value[0], path)) break;
13298
}
133-
// console.log('pull result', jsonPath, queue.desiredSize)
13499
},
135100
cancel: (err) => {
136-
// If one of the child streams errors, error the whole pipeline. // TODO: or should it?
101+
// If one of the child streams errors, error the whole pipeline.
102+
// TODO: Or should it?
137103
this.#reader.cancel(err)
138104
},
139105
}, { highWaterMark: 0 }) // does not pull on its own

json-stringify.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const check = (v: any) => {
3232

3333
// TODO: Add replacer
3434
// TODO: add formatting/spaces
35-
// TODO: concurrent rendering
35+
// TODO: concurrent objects/arrays
3636
/**
3737
* @deprecated Change name to something more descriptive!?
3838
*/

0 commit comments

Comments
 (0)