diff --git a/tests/streaming.test.ts b/tests/streaming.test.ts index 839d8199..4a3ff99e 100644 --- a/tests/streaming.test.ts +++ b/tests/streaming.test.ts @@ -1,7 +1,7 @@ -import { PassThrough } from 'stream'; import assert from 'assert'; import { Stream, _iterSSEMessages } from 'writer-sdk/streaming'; import { APIConnectionError } from 'writer-sdk/error'; +import { ReadableStreamFrom } from 'writer-sdk/internal/shims'; describe('streaming decoding', () => { test('basic', async () => { @@ -11,7 +11,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -29,7 +29,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -48,7 +48,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -69,7 +69,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -97,7 +97,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -126,7 +126,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -147,7 +147,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -172,7 +172,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -205,7 +205,7 @@ describe('streaming decoding', () => { yield Buffer.from('\n'); } - const stream = _iterSSEMessages(new Response(await iteratorToStream(body())), new AbortController())[ + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ Symbol.asyncIterator ](); @@ -240,27 +240,3 @@ test('error handling', async () => { ); await err.toBeInstanceOf(APIConnectionError); }); - -async function iteratorToStream(iterator: AsyncGenerator): Promise { - const parts: unknown[] = []; - - for await (const chunk of iterator) { - parts.push(chunk); - } - - let index = 0; - - const stream = new PassThrough({ - read() { - const value = parts[index]; - if (value === undefined) { - stream.end(); - } else { - index += 1; - stream.write(value); - } - }, - }); - - return stream; -}