Skip to content

Commit dcb9110

Browse files
committed
Commit
1 parent 925eb08 commit dcb9110

File tree

7 files changed

+665
-4
lines changed

7 files changed

+665
-4
lines changed

async-queue.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
// deno-lint-ignore-file
2+
3+
// TODO: Move to separate module
4+
25
type Resolver<T> = (value: T | PromiseLike<T>) => void;
36
type Rejecter = (reason?: any) => void;
47

json-parse-stream.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
// deno-lint-ignore-file no-explicit-any
2-
import { JSONParser } from 'https://ghuc.cc/qwtel/jsonparse/index.js';
32
import { ResolvablePromise } from 'https://ghuc.cc/worker-tools/resolvable-promise/index.ts';
43
import { asyncIterToStream } from 'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts'
4+
import { JSONParser } from './json-parser.js';
55
import { normalize, match } from './json-path.ts'
66
import { AsyncQueue } from './async-queue.ts';
7+
import { BinarySplitStream } from './split-stream.ts'
78

89
async function* identity<T>(iter: Iterable<T> | AsyncIterable<T>) {
910
for await (const x of iter) yield x;
@@ -12,7 +13,7 @@ async function* identity<T>(iter: Iterable<T> | AsyncIterable<T>) {
1213
/**
1314
*
1415
*/
15-
export class JSONParseStream<T = any> extends TransformStream<string | BufferSource, T> {
16+
export class JSONParseStream<T = any> extends TransformStream<string | Uint8Array, T> {
1617
#promises: Map<string, ResolvablePromise<any>> = new Map()
1718
#queues: Map<string, AsyncQueue<any>> = new Map()
1819

@@ -73,3 +74,35 @@ export class JSONParseStream<T = any> extends TransformStream<string | BufferSou
7374
return asyncIterToStream(this.iterable(jsonPath));
7475
}
7576
}
77+
78+
/** @deprecated Untested */
79+
export class ND_JSONParseStream<T = any> extends TransformStream<Uint8Array, T> {
80+
constructor() {
81+
let splitStream: BinarySplitStream;
82+
let writer: WritableStreamDefaultWriter;
83+
let decoder: TextDecoder;
84+
super({
85+
start(controller) {
86+
splitStream = new BinarySplitStream()
87+
writer = splitStream.writable.getWriter();
88+
decoder = new TextDecoder();
89+
(async () => {
90+
try {
91+
for await (const line of splitStream.readable) {
92+
const sLine = decoder.decode(line).trim()
93+
if (sLine) controller.enqueue(JSON.parse(sLine))
94+
}
95+
} catch (err) {
96+
writer.abort(err)
97+
}
98+
})()
99+
},
100+
transform(chunk) {
101+
writer.write(chunk)
102+
},
103+
flush() {
104+
writer.close()
105+
},
106+
})
107+
}
108+
}

0 commit comments

Comments
 (0)