Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 10 additions & 34 deletions tests/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { PassThrough } from 'stream';
import assert from 'assert';
import { Stream, _iterSSEMessages } from 'writer-sdk/streaming';
import { APIConnectionError } from 'writer-sdk/error';
import { ReadableStreamFrom } from 'writer-sdk/internal/shims';

describe('streaming decoding', () => {
test('basic', async () => {
Expand All @@ -11,7 +11,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand All @@ -29,7 +29,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand All @@ -48,7 +48,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand All @@ -69,7 +69,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand Down Expand Up @@ -97,7 +97,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand Down Expand Up @@ -126,7 +126,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand All @@ -147,7 +147,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand All @@ -172,7 +172,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand Down Expand Up @@ -205,7 +205,7 @@ describe('streaming decoding', () => {
yield Buffer.from('\n');
}

const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[
const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[
Symbol.asyncIterator
]();

Expand Down Expand Up @@ -240,27 +240,3 @@ test('error handling', async () => {
);
await err.toBeInstanceOf(APIConnectionError);
});

async function iteratorToStream(iterator: AsyncGenerator<any>): Promise<PassThrough> {
const parts: unknown[] = [];

for await (const chunk of iterator) {
parts.push(chunk);
}

let index = 0;

const stream = new PassThrough({
read() {
const value = parts[index];
if (value === undefined) {
stream.end();
} else {
index += 1;
stream.write(value);
}
},
});

return stream;
}