Skip to content

Commit f7c9cea

Browse files
committed
WIP
1 parent 4430f50 commit f7c9cea

File tree

4 files changed

+147
-61
lines changed

4 files changed

+147
-61
lines changed

json-parse-stream.ts

Lines changed: 63 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// deno-lint-ignore-file no-explicit-any no-cond-assign ban-unused-ignore no-unused-vars
22
import { streamToAsyncIter } from 'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts'
3+
import { ResolvablePromise } from 'https://ghuc.cc/worker-tools/resolvable-promise/index.ts';
34
import { JSONParser } from './json-parser.js';
45
import { normalize, match } from './json-path.ts'
56

@@ -44,67 +45,104 @@ export class JSONParseStream<T = any> extends TransformStream<string | Uint8Arra
4445
get path() { return this.#jsonPath }
4546
}
4647

47-
export class JSONParseWritable<T = any> extends WritableStream<string | Uint8Array> {
48-
#readable: ReadableStream<[string, T]>;
49-
// #streams = new Map<string, ReadableStream<unknown>>();
48+
const extract = <K, V>(m: Map<K, V>, k: K) => { const v = m.get(k); m.delete(k); return v; }
49+
50+
/** @deprecated Rename!!! */
51+
export class JSONParseNexus<T = any> extends TransformStream<string | Uint8Array, [string, T]> {
52+
#promises = new Map<string, ResolvablePromise<any>>();
5053

5154
constructor() {
5255
let parser: JSONParser;
53-
let readable: ReadableStream<[string, T]>
5456
super({
55-
start: (writeCtrl) => {
57+
start: (controller) => {
5658
parser = new JSONParser();
57-
readable = new ReadableStream({
58-
start: (readCtrl) => {
59-
parser.onValue = (value: T) => {
60-
const path = mkPath(parser)
61-
readCtrl.enqueue([path, value]);
62-
};
63-
},
64-
})
59+
parser.onValue = (value: T) => {
60+
const path = mkPath(parser)
61+
62+
controller.enqueue([path, value]);
63+
64+
for (const expr of this.#promises.keys()) {
65+
if (match(expr, path)) {
66+
extract(this.#promises, expr)!.resolve(value)
67+
} else if (expr.startsWith(path)) {
68+
extract(this.#promises, expr)!.resolve(undefined)
69+
}
70+
}
71+
};
6572
},
66-
write: (chunk) => {
67-
parser.write(chunk);
73+
transform(buffer) {
74+
console.log('starting to pull')
75+
// console.log('write', buffer)
76+
parser.write(buffer)
77+
},
78+
flush() {
79+
// TODO: close all open promises?
6880
},
6981
});
70-
this.#readable = readable!; // sus
7182
}
7283

7384
#filterStream(expr: string) {
7485
return new TransformStream({
7586
transform: ([path, value], controller) => {
7687
if (match(expr, path)) {
7788
controller.enqueue(value as any);
78-
}
79-
else if (expr.startsWith(path)) {
80-
// Closing the stream early when the selected path can no longer yield values.
89+
} else if (expr.startsWith(path)) {
8190
controller.terminate()
8291
// this.#streams.delete(expr) // no longer need to track the stream
8392
}
8493
}
8594
})
8695
}
8796

88-
/** @deprecated should this be exposed? */
89-
get readable(): ReadableStream<[string, T]> {
90-
return this.#readable
97+
#a?: ReadableStream<[string, T]>
98+
get #readable(): ReadableStream<[string, T]> {
99+
return this.#a ?? this.readable;
91100
}
92101

93-
#clone() {
102+
#clone(last?: boolean) {
103+
if (last) return this.#readable;
94104
const [a, b] = this.#readable.tee()
95-
this.#readable = a;
105+
this.#a = a;
96106
return b;
97107
}
98108

99-
async promise<T = any>(jsonPath: string): Promise<T | undefined> {
109+
/**
110+
* Returns a promise that resolves with the value found at the provided `jsonPath` or `undefined` otherwise.
111+
*
112+
* __Starts to pull values form the underlying sink immediately!__
113+
* If the value is located after a large array in the JSON, the entire array will be parsed and kept in a queue!
114+
* Use `lazy` instead if pulling form the stream elsewhere.
115+
*/
116+
async eager<T = any>(jsonPath: string): Promise<T | undefined> {
117+
console.log('eager', jsonPath, this.writable.locked)
100118
const expr = normalize(jsonPath)
101119
const stream = this.#clone().pipeThrough(this.#filterStream(expr))
102120
// this.#streams.set(expr, stream)
103121
const { done, value } = await stream.getReader().read();
122+
// console.log('eager', value)
104123
return done ? undefined : value;
105124
}
106125

126+
/**
127+
* Returns a promise that resolves with the value found at the provided `jsonPath` or `undefined` otherwise.
128+
*
129+
* __Does not pull from the underlying sink on its own!__
130+
* If there isn't another consumer pulling past the point where the value if found, it will never resolve!
131+
* Use with care!
132+
*/
133+
lazy<T = any>(jsonPath: string): Promise<T | undefined> & { pull: () => Promise<T | undefined> } {
134+
console.log('lazy', jsonPath, this.writable.locked)
135+
const p = new ResolvablePromise<T | undefined>();
136+
this.#promises.set(normalize(jsonPath), p)
137+
return Object.assign(p, { pull: () => this.eager(jsonPath) })
138+
}
139+
140+
promise<T = any>(jsonPath: string): Promise<T | undefined> & { pull: () => Promise<T | undefined> } {
141+
return this.lazy(jsonPath);
142+
}
143+
107144
stream<T = any>(jsonPath: string): ReadableStream<T> {
145+
console.log('stream', jsonPath, this.writable.locked)
108146
const expr = normalize(jsonPath)
109147
const stream = this.#clone().pipeThrough(this.#filterStream(expr))
110148
// this.#streams.set(expr, stream)

scripts/build_npm.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ await build({
4242
name: '@worker-tools/resolvable-promise',
4343
version: 'latest',
4444
},
45-
'https://ghuc.cc/worker-tools/extendable-promise/index.ts': {
46-
name: '@worker-tools/extendable-promise',
47-
version: 'latest',
48-
},
45+
// 'https://ghuc.cc/worker-tools/extendable-promise/index.ts': {
46+
// name: '@worker-tools/extendable-promise',
47+
// version: 'latest',
48+
// },
4949
'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts': {
5050
name: 'whatwg-stream-to-async-iter',
5151
version: 'latest',

test/json-parse-stream.test.ts

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import {
1212
} from 'https://deno.land/std@0.133.0/testing/asserts.ts'
1313
const { test } = Deno;
1414

15-
import { jsonStringifyGenerator } from '../json-stringify.ts'
16-
import { JSONParseStream, JSONParseWritable } from '../json-parse-stream.ts'
15+
import { jsonStringifyGenerator, jsonStringifyStream } from '../json-stringify.ts'
16+
import { JSONParseStream, JSONParseNexus } from '../json-parse-stream.ts'
1717

1818
async function consume(stream: ReadableStream) {
1919
const reader = stream.getReader();
@@ -75,65 +75,102 @@ const aJoin = async (iter: AsyncIterable<string>, separator = '') => {
7575
}
7676

7777
test('promise value', async () => {
78-
const parseStream = new JSONParseWritable()
78+
const parseStream = new JSONParseNexus()
7979
const actual = {
80-
type: parseStream.promise('$.type'),
81-
data: parseStream.iterable('$.data.*')
80+
type: parseStream.lazy('$.type'),
81+
items: parseStream.iterable('$.items.*')
8282
}
83-
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
84-
const done = new Response(expected).body!
85-
.pipeTo(parseStream)
83+
const expected = JSON.stringify({ type: 'foo', items: [{ a: 1 }, { a: 2 }, { a: 3 }] })
84+
new Response(expected).body!
85+
.pipeThrough(parseStream)
8686

8787
const actualString = await aJoin(jsonStringifyGenerator(actual))
8888
assertEquals(actualString, expected)
89-
await done;
9089
})
9190

9291
test('promise value II', async () => {
93-
const parseStream = new JSONParseWritable()
92+
const parseStream = new JSONParseNexus()
9493
const actual = {
95-
type: parseStream.promise('$.type'),
96-
data: parseStream.stream('$.data.*')
94+
type: parseStream.lazy('$.type'),
95+
items: parseStream.stream('$.items.*')
9796
}
98-
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
99-
const done = new Response(expected).body!
100-
.pipeTo(parseStream)
97+
const expected = JSON.stringify({ type: 'foo', items: [{ a: 1 }, { a: 2 }, { a: 3 }] })
98+
new Response(expected).body!
99+
.pipeThrough(parseStream)
101100

102101
const actualString = await aJoin(jsonStringifyGenerator(actual))
103102
assertEquals(actualString, expected)
104-
await done;
105103
})
106104

107105
test('promise value III', async () => {
108-
const parseStream = new JSONParseWritable()
106+
const parseStream = new JSONParseNexus()
109107
const actual = {
110-
type: parseStream.promise('$.type'),
111-
data: parseStream.iterable('$.data.*')
108+
type: parseStream.lazy('$.type'),
109+
items: parseStream.iterable('$.items.*')
112110
}
113-
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
114-
new Response(expected).body!.pipeTo(parseStream)
111+
const expected = JSON.stringify({ type: 'foo', items: [{ a: 1 }, { a: 2 }, { a: 3 }] })
112+
new Response(expected).body!.pipeThrough(parseStream)
115113

116114
const actualString = await aJoin(jsonStringifyGenerator(actual))
117115
assertEquals(actualString, expected)
118116
})
119117

120118
test('promise value IV', async () => {
121-
const parseStream = new JSONParseWritable()
119+
const parseStream = new JSONParseNexus()
122120
const actual = {
123-
type: parseStream.promise('$.type'),
124-
data: parseStream.stream('$.data.*')
121+
type: parseStream.lazy('$.type'),
122+
items: parseStream.stream('$.items.*')
125123
}
126-
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
127-
new Response(expected).body!.pipeTo(parseStream)
124+
const expected = JSON.stringify({ type: 'foo', items: [{ a: 1 }, { a: 2 }, { a: 3 }] })
125+
new Response(expected).body!.pipeThrough(parseStream)
128126

129127
const actualString = await aJoin(jsonStringifyGenerator(actual))
130128
assertEquals(actualString, expected)
131129
})
132130

133-
test('read only until first value', async () => {
134-
const parseStream = new JSONParseWritable()
135-
const type = parseStream.promise('$.type');
136-
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
137-
new Response(expected).body!.pipeTo(parseStream)
131+
async function* asyncGen<T>(xs: T[]) {
132+
for (const x of xs) yield x
133+
}
134+
135+
const json1 = { filler: asyncGen(['__', '__', '__']), type: 'foo', items: asyncGen([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }]) }
136+
test('read only until first value eager', async () => {
137+
const parseStream = new JSONParseNexus()
138+
const type = parseStream.eager<string>('$.type');
139+
jsonStringifyStream(json1).pipeThrough(parseStream)
140+
138141
assertEquals(await type, 'foo')
142+
})
143+
const timeout = (n?: number) => new Promise(r => setTimeout(r, n))
144+
145+
test('read only until first value lazy', async () => {
146+
const parseStream = new JSONParseNexus()
147+
const type = parseStream.promise<string>('$.type');
148+
jsonStringifyStream(json1).pipeThrough(parseStream)
149+
150+
assertEquals(await Promise.race([type, timeout(10).then(() => 'x')]), 'x')
151+
})
152+
153+
test('read only until first value lazy II', async () => {
154+
const parseStream = new JSONParseNexus()
155+
const type = parseStream.promise<string>('$.type');
156+
const _items = parseStream.stream('$.items.*')
157+
jsonStringifyStream(json1).pipeThrough(parseStream)
158+
159+
assertEquals(await Promise.race([type, timeout(10).then(() => 'x')]), 'x')
160+
})
161+
162+
test('read only until first value lazy+pull', async () => {
163+
const parseStream = new JSONParseNexus()
164+
const type = parseStream.promise<string>('$.type');
165+
jsonStringifyStream(json1).pipeThrough(parseStream)
166+
167+
assertEquals(await type.pull(), 'foo')
168+
})
169+
170+
test('writable locked?', async () => {
171+
const parseStream = new JSONParseNexus()
172+
const filler = parseStream.stream<string>('$.filler.*');
173+
const items = parseStream.stream<string>('$.items.*');
174+
// jsonStringifyStream(json1).pipeThrough(parseStream)
175+
// assertEquals(await type.pull(), 'foo')
139176
})

test/json-stringify.test.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,15 @@ test('undefined values in generators become null, same as arrays', async () => {
7373
test('undefined toJSON result', async () => {
7474
const a = { toJSON() { return undefined } }
7575
assertEquals(await aJoin(jsonStringifyGenerator({ a, b: 4 })), JSON.stringify({ a, b: 4 }))
76-
})
76+
})
77+
78+
// test('with type', async () => {
79+
// const json1 = { type: 'foo', items: asyncGen([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }]) }
80+
// assertEquals(await aJoin(jsonStringifyGenerator(json1)), JSON.stringify(json1))
81+
// })
82+
83+
// test('two generators', async () => {
84+
// const json1 = { filler: asyncGen(['__', '__', '__']), type: 'foo', items: asyncGen([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }]) }
85+
// assertEquals(await aJoin(jsonStringifyGenerator(json1)), JSON.stringify(json1))
86+
// })
87+

0 commit comments

Comments
 (0)