Skip to content

Commit b7d8622

Browse files
committed
Commit
1 parent 093e557 commit b7d8622

File tree

4 files changed

+58
-37
lines changed

4 files changed

+58
-37
lines changed

json-parse-stream.ts

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
import { streamToAsyncIter } from 'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts'
33
import { JSONParser } from './json-parser.js';
44
import { normalize, match } from './json-path.ts'
5-
// import { AsyncQueue } from './async-queue.ts';
6-
// import { BinarySplitStream } from './split-stream.ts'
75

86
async function* _identity<T>(iter: Iterable<T> | AsyncIterable<T>) {
97
for await (const x of iter) yield x;
@@ -16,64 +14,89 @@ const mkPath = (parser: any) => {
1614
return normalize(path.join('.')); // FIXME: avoid string concatenation/joining
1715
}
1816

19-
/**
20-
*
21-
*/
2217
export class JSONParseStream<T = any> extends TransformStream<string | Uint8Array, T> {
23-
#pathMap = new Map<any, string>(); // FIXME: clear when processing is done!?
2418
#jsonPath;
25-
// #streams = new Map<string, ReadableStream<unknown>>();
2619

2720
constructor(jsonPath = '$.*') {
2821
let parser!: JSONParser;
22+
const expr = normalize(jsonPath)
2923
super({
3024
start: (controller) => {
3125
parser = new JSONParser();
3226
parser.onValue = (value: T) => {
3327
const path = mkPath(parser)
3428

35-
// FIXME: better solution?
36-
this.#pathMap.set(value, path);
37-
controller.enqueue(value);
29+
if (match(expr, path)) {
30+
controller.enqueue(value as any);
31+
} else if (expr.startsWith(path)) {
32+
// Closing the stream early when the selected path can no longer yield values.
33+
controller.terminate()
34+
}
3835
};
3936
},
4037
transform: (chunk) => {
4138
parser.write(chunk);
4239
},
4340
});
44-
const expr = normalize(jsonPath)
4541
this.#jsonPath = expr;
4642
}
4743

44+
get path() { return this.#jsonPath }
45+
}
46+
47+
export class JSONParseWritable<T = any> extends WritableStream<string | Uint8Array> {
48+
#pathMap = new Map<any, string>(); // FIXME: clear when processing is done!?
49+
#readable: ReadableStream<T>;
50+
// #streams = new Map<string, ReadableStream<unknown>>();
51+
52+
constructor() {
53+
let parser: JSONParser;
54+
let readable: ReadableStream<T>
55+
super({
56+
start: (writeCtrl) => {
57+
parser = new JSONParser();
58+
readable = new ReadableStream({
59+
start: (readCtrl) => {
60+
parser.onValue = (value: T) => {
61+
const path = mkPath(parser)
62+
63+
// FIXME: better solution?
64+
this.#pathMap.set(value, path);
65+
readCtrl.enqueue(value);
66+
};
67+
},
68+
})
69+
},
70+
write: (chunk) => {
71+
parser.write(chunk);
72+
},
73+
});
74+
this.#readable = readable!; // sus
75+
}
76+
4877
#filterStream(expr: string) {
4978
return new TransformStream({
5079
transform: (value, controller) => {
5180
const path = this.#pathMap.get(value)!
5281
if (match(expr, path)) {
5382
controller.enqueue(value as any);
5483
}
55-
// Closing the stream early when the selected path can no longer yield values.
5684
else if (expr.startsWith(path)) {
85+
// Closing the stream early when the selected path can no longer yield values.
5786
controller.terminate()
5887
// this.#streams.delete(expr) // no longer need to track the stream
5988
}
6089
}
6190
})
6291
}
6392

64-
// FIXME: Just acquiring this property will lock the internal stream. Different from regular transform stream.
6593
get readable(): ReadableStream<T> {
66-
return this.#readable.pipeThrough(this.#filterStream(this.#jsonPath))
67-
}
68-
69-
#a?: ReadableStream<T>;
70-
get #readable() {
71-
return this.#a ?? super.readable
94+
return this.#readable
7295
}
7396

7497
#clone() {
7598
const [a, b] = this.#readable.tee()
76-
this.#a = a;
99+
this.#readable = a;
77100
return b;
78101
}
79102

test/index.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import * as json from '../index.ts';
1616

1717
test('transform streams', () => {
1818
assertExists(json.JSONParseStream)
19+
assertExists(json.JSONParseWritable)
1920
assertExists(json.JSONStringifyStream)
2021
})
2122

test/json-parse-stream.test.ts

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import {
1313
const { test } = Deno;
1414

1515
import { jsonStringifyGenerator } from '../json-stringify.ts'
16-
import { JSONParseStream } from '../json-parse-stream.ts'
16+
import { JSONParseStream, JSONParseWritable } from '../json-parse-stream.ts'
1717

1818
async function consume(stream: ReadableStream) {
1919
const reader = stream.getReader();
@@ -75,67 +75,65 @@ const aJoin = async (iter: AsyncIterable<string>, separator = '') => {
7575
}
7676

7777
test('promise value', async () => {
78-
const parseStream = new JSONParseStream()
78+
const parseStream = new JSONParseWritable()
7979
const actual = {
8080
type: parseStream.promise('$.type'),
8181
data: parseStream.iterable('$.data.*')
8282
}
8383
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
8484
const done = new Response(expected).body!
85-
.pipeThrough(parseStream)
86-
.pipeTo(new WritableStream())
85+
.pipeTo(parseStream)
8786

8887
const actualString = await aJoin(jsonStringifyGenerator(actual))
8988
assertEquals(actualString, expected)
9089
await done;
9190
})
9291

9392
test('promise value II', async () => {
94-
const parseStream = new JSONParseStream()
93+
const parseStream = new JSONParseWritable()
9594
const actual = {
9695
type: parseStream.promise('$.type'),
9796
data: parseStream.stream('$.data.*')
9897
}
9998
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
10099
const done = new Response(expected).body!
101-
.pipeThrough(parseStream)
102-
.pipeTo(new WritableStream())
100+
.pipeTo(parseStream)
103101

104102
const actualString = await aJoin(jsonStringifyGenerator(actual))
105103
assertEquals(actualString, expected)
106104
await done;
107105
})
108106

109107
test('promise value III', async () => {
110-
const parseStream = new JSONParseStream('$..*')
108+
const parseStream = new JSONParseWritable()
111109
const actual = {
112110
type: parseStream.promise('$.type'),
113111
data: parseStream.iterable('$.data.*')
114112
}
115113
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
116-
new Response(expected).body!.pipeThrough(parseStream) // no pipeTo necessary
114+
new Response(expected).body!.pipeTo(parseStream)
117115

118116
const actualString = await aJoin(jsonStringifyGenerator(actual))
119117
assertEquals(actualString, expected)
120118
})
121119

122120
test('promise value IV', async () => {
123-
const parseStream = new JSONParseStream('$..*')
121+
const parseStream = new JSONParseWritable()
124122
const actual = {
125123
type: parseStream.promise('$.type'),
126124
data: parseStream.stream('$.data.*')
127125
}
128126
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
129-
new Response(expected).body!.pipeThrough(parseStream) // no pipeTo necessary
127+
new Response(expected).body!.pipeTo(parseStream)
130128

131129
const actualString = await aJoin(jsonStringifyGenerator(actual))
132130
assertEquals(actualString, expected)
133131
})
134132

135133
test('read only until first value', async () => {
136-
const parseStream = new JSONParseStream('$..*')
134+
const parseStream = new JSONParseWritable()
137135
const type = parseStream.promise('$.type');
138136
const expected = JSON.stringify({ type: 'foo', data: [{ a: 1 }, { b: 2 }, { c: 3 }] })
139-
new Response(expected).body!.pipeThrough(parseStream) // no pipeTo necessary
137+
new Response(expected).body!.pipeTo(parseStream)
140138
assertEquals(await type, 'foo')
141139
})
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import {
1313
const { test } = Deno;
1414

1515
import { JSONStringifyStream } from '../json-stringify-stream.ts'
16-
import { JSONParseStream } from '../json-parse-stream.ts'
16+
import { JSONParseStream, JSONParseWritable } from '../json-parse-stream.ts'
1717
import { jsonStringifyStream } from '../json-stringify.ts'
1818

1919
const collect = async <T>(stream: ReadableStream<T>) => {
@@ -66,7 +66,7 @@ test('roundtrip', async () => {
6666
})
6767

6868
test('Retrieving multiple values and collections', async () => {
69-
const jsonStream = new JSONParseStream();
69+
const jsonStream = new JSONParseWritable();
7070
const asyncData = {
7171
type: jsonStream.promise('$.type'),
7272
items: jsonStream.stream('$.items.*'),
@@ -82,8 +82,7 @@ test('Retrieving multiple values and collections', async () => {
8282
]
8383
};
8484

85-
new Response(JSON.stringify(nested)).body!
86-
.pipeThrough(jsonStream)
85+
new Response(JSON.stringify(nested)).body!.pipeTo(jsonStream)
8786

8887
assertEquals(await asyncData.type, 'foo')
8988

0 commit comments

Comments
 (0)