Skip to content

Commit 290227f

Browse files
committed
Commit
1 parent f487754 commit 290227f

File tree

5 files changed

+147
-12
lines changed

5 files changed

+147
-12
lines changed

async-queue.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export class AsyncQueue<T = any> implements AsyncIterableIterator<T> {
4949
this.#error = err;
5050
}
5151

52-
this.return();
52+
this.return();
5353
}
5454

5555
#abortListener = () => {
@@ -59,17 +59,17 @@ export class AsyncQueue<T = any> implements AsyncIterableIterator<T> {
5959
push(el: T) {
6060
const promise = this.#unconsumedPromises.shift();
6161
if (promise) {
62-
promise.resolve({ value: el as T, done: false }); // FIXME
62+
promise.resolve({ value: el, done: false });
6363
} else {
64-
this.#unconsumedValues.push(el as T); // FIXME
64+
this.#unconsumedValues.push(el);
6565
}
6666
}
6767

6868
// TODO: does it make sense/is it possible to add `shift` / `pop`??
6969

70-
/** Alias for `next` */
71-
shift(): Promise<IteratorResult<T, void>> {
72-
return this.next();
70+
async shift(): Promise<T | undefined> {
71+
const { done, value } = await this.next();
72+
return done ? undefined : value;
7373
}
7474

7575
next(): Promise<IteratorResult<T, void>> {
@@ -99,7 +99,7 @@ export class AsyncQueue<T = any> implements AsyncIterableIterator<T> {
9999
this.#unconsumedPromises.push({ resolve, reject });
100100
});
101101
}
102-
102+
103103
return(): Promise<IteratorResult<T, void>> {
104104
if (this.#signal) {
105105
this.#signal.removeEventListener('abort', this.#abortListener);

index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from './json-stringify.ts'
22
export * from './json-parse-stream.ts'
33
export * from './json-stringify-stream.ts'
4+
export * from './json-fetch.ts'
45
export * from './ndjson-parse-stream.ts'
56
export * from './ndjson-stringify-stream.ts'

json-fetch.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// deno-lint-ignore-file no-explicit-any
2+
import { StreamResponse, StreamRequest } from "../stream-response/index.ts";
3+
import { JSONStringifyReadable } from './json-stringify.ts';
4+
5+
export type JSONStreamBodyInit = any
6+
export type JSONStreamRequestInit = Omit<RequestInit, 'body'> & { body?: JSONStreamBodyInit }
7+
8+
export class JSONStreamRequest extends StreamRequest {
9+
static contentType = 'application/json;charset=UTF-8';
10+
static accept = 'application/json, text/plain, */*';
11+
12+
constructor(
13+
input: RequestInfo | URL,
14+
init?: JSONStreamRequestInit,
15+
// replacer?: Parameters<typeof JSON.stringify>[1],
16+
// space?: Parameters<typeof JSON.stringify>[2],
17+
) {
18+
const { headers: _headers, body: _body, ...rest } = init || {};
19+
20+
const body = new JSONStringifyReadable(_body)
21+
22+
const headers = new Headers(_headers);
23+
if (!headers.has('Content-Type') && _body != null)
24+
headers.set('Content-Type', JSONStreamRequest.contentType);
25+
26+
if (!headers.has('Accept'))
27+
headers.set('Accept', JSONStreamRequest.accept);
28+
29+
super(input instanceof URL ? input.href : input, { headers, body, ...rest });
30+
}
31+
}
32+
33+
export class JSONStreamResponse extends StreamResponse {
34+
static contentType = 'application/json;charset=UTF-8';
35+
36+
constructor(
37+
body?: JSONStreamBodyInit | null,
38+
init?: ResponseInit,
39+
// replacer?: Parameters<typeof JSON.stringify>[1],
40+
// space?: Parameters<typeof JSON.stringify>[2],
41+
) {
42+
const { headers: _headers, ...rest } = init || {};
43+
44+
const _body = new JSONStringifyReadable(body)
45+
46+
const headers = new Headers(_headers);
47+
48+
if (!headers.has('Content-Type') && body != null)
49+
headers.set('Content-Type', JSONStreamResponse.contentType);
50+
51+
super(_body, { headers, ...rest });
52+
}
53+
}
54+
55+
export type {
56+
JSONStreamRequest as JSONRequest,
57+
JSONStreamResponse as JSONResponse,
58+
}

json-stringify.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
// deno-lint-ignore-file no-explicit-any no-empty
2-
import { asyncIterableToStream } from 'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts'
2+
import { asyncIterToStream } from 'https://ghuc.cc/qwtel/whatwg-stream-to-async-iter/index.ts'
33

44
type SeenWeakSet = WeakSet<any>;
55

66
type Primitive = undefined | boolean | number | string | bigint | symbol;
77

88
export type ToJSON = { toJSON: (key?: any) => string }
99

10-
const _isIterable = <T>(x: unknown): x is Iterable<T> =>
10+
export const isIterable = <T>(x: unknown): x is Iterable<T> =>
1111
x != null && typeof x === 'object' && Symbol.iterator in x
1212

13-
const isAsyncIterable = <T>(x: unknown): x is AsyncIterable<T> =>
13+
export const isAsyncIterable = <T>(x: unknown): x is AsyncIterable<T> =>
1414
x != null && typeof x === 'object' && Symbol.asyncIterator in x
1515

1616
const isPromiseLike = <T>(x: unknown): x is PromiseLike<T> =>
@@ -103,7 +103,7 @@ export async function* jsonStringifyGenerator(
103103
export function jsonStringifyStream(
104104
value: null | Primitive | ToJSON | any[] | Record<string, any> | PromiseLike<any> | AsyncIterable<any> | ReadableStream,
105105
): ReadableStream<string> {
106-
return asyncIterableToStream(jsonStringifyGenerator(value))
106+
return asyncIterToStream(jsonStringifyGenerator(value))
107107
}
108108

109109
export class JSONStringifyReadable extends ReadableStream<string> {
@@ -118,7 +118,7 @@ export class JSONStringifyReadable extends ReadableStream<string> {
118118
if (!done) controller.enqueue(value); else controller.close();
119119
},
120120
async cancel(reason) {
121-
try { await iterator.throw?.(reason) } catch { }
121+
try { await iterator.throw?.(reason) } catch { }
122122
},
123123
})
124124
}

test/json-fetch.test.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// deno-lint-ignore-file no-unused-vars no-explicit-any
2+
import 'https://gist.githubusercontent.com/qwtel/b14f0f81e3a96189f7771f83ee113f64/raw/TestRequest.ts'
3+
import {
4+
assert,
5+
assertExists,
6+
assertEquals,
7+
assertStrictEquals,
8+
assertStringIncludes,
9+
assertThrows,
10+
assertRejects,
11+
assertArrayIncludes,
12+
} from 'https://deno.land/std@0.133.0/testing/asserts.ts'
13+
const { test } = Deno;
14+
15+
import { JSONStreamResponse, JSONStreamRequest } from '../json-fetch.ts'
16+
import { JSONParseStream } from '../index.ts'
17+
18+
test('exists', () =>{
19+
assertExists(JSONStreamRequest)
20+
assertExists(JSONStreamResponse)
21+
})
22+
23+
test('simple response', async () => {
24+
const actual = await new JSONStreamResponse({ a: 3, b: { nested: 4 }, c: [1, 2, 3], __x: undefined }).json()
25+
assertEquals(actual, { a: 3, b: { nested: 4 }, c: [1, 2, 3] })
26+
})
27+
28+
test('simple request', async () => {
29+
const actual = await new JSONStreamRequest('/', { method: 'PUT', body: { a: 3, b: { nested: 4 }, c: [1, 2, 3], __x: undefined } }).json()
30+
assertEquals(actual, { a: 3, b: { nested: 4 }, c: [1, 2, 3] })
31+
})
32+
33+
test('with promise response', async () => {
34+
const actual = await new JSONStreamResponse(({ a: 3, b: Promise.resolve(4) })).json()
35+
assertEquals(actual, { a: 3, b: 4 })
36+
})
37+
38+
test('with promise request', async () => {
39+
const actual = await new JSONStreamRequest('/', { method: 'PUT', body: { a: 3, b: Promise.resolve(4) } }).json()
40+
assertEquals(actual, { a: 3, b: 4 })
41+
})
42+
43+
const timeout = (n?: number) => new Promise(r => setTimeout(r, n))
44+
async function* asyncGen<T>(xs: T[]) {
45+
for (const x of xs) { await timeout(); yield x }
46+
}
47+
48+
test('with generator response', async () => {
49+
const actual = await new JSONStreamResponse(({ a: 3, b: Promise.resolve(4), c: asyncGen([1, 2, 3]) })).text()
50+
assertEquals(actual, JSON.stringify({ a: 3, b: 4, c: [1, 2, 3] }))
51+
})
52+
53+
test('with generator request', async () => {
54+
const actual = await new JSONStreamRequest('/', { method: 'PUT', body: { a: 3, b: Promise.resolve(4), c: asyncGen([1, 2, 3]) } }).json()
55+
assertEquals(actual, { a: 3, b: 4, c: [1, 2, 3] })
56+
})
57+
58+
test('circular throws', () => {
59+
const a: any = { a: 3, foo: { b: 4 } }
60+
a.foo.a = a;
61+
assertRejects(() => new JSONStreamResponse((a)).json(), TypeError)
62+
})
63+
64+
test('GET with body throws', () => {
65+
const a: any = { a: 3, foo: { b: 4 } }
66+
assertRejects(() => new JSONStreamRequest('/', { body: a }).json(), TypeError)
67+
})
68+
69+
test('stream', async () => {
70+
const actual = new JSONStreamResponse(({ a: 3, b: Promise.resolve(4), c: asyncGen([1, 2, 3]) }))
71+
const reader = actual.body!.pipeThrough(new JSONParseStream('$.c.*')).getReader()
72+
assertEquals((await reader.read()).value, 1)
73+
assertEquals((await reader.read()).value, 2)
74+
assertEquals((await reader.read()).value, 3)
75+
assertEquals((await reader.read()).done, true)
76+
})

0 commit comments

Comments
 (0)