From 2b76d1ee46925e0f2e39dd80b22c48afd65e6777 Mon Sep 17 00:00:00 2001 From: Joe Tannenbaum Date: Tue, 20 May 2025 11:55:00 -0400 Subject: [PATCH 1/3] json react --- packages/react/src/hooks/use-stream.ts | 61 ++++++++++++------ packages/react/src/types.ts | 4 +- packages/react/tests/use-stream.test.ts | 86 +++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 19 deletions(-) diff --git a/packages/react/src/hooks/use-stream.ts b/packages/react/src/hooks/use-stream.ts index 4051d05..ecf7373 100644 --- a/packages/react/src/hooks/use-stream.ts +++ b/packages/react/src/hooks/use-stream.ts @@ -2,24 +2,27 @@ import { nanoid } from "nanoid"; import { useCallback, useEffect, useRef, useState } from "react"; import { StreamListenerCallback, StreamMeta, StreamOptions } from "../types"; -const streams = new Map(); +const streams = new Map>(); const listeners = new Map(); -const resolveStream = (id: string): StreamMeta => { - const stream = streams.get(id); +const resolveStream = (id: string): StreamMeta => { + const stream = streams.get(id) as StreamMeta | undefined; if (stream) { return stream; } - streams.set(id, { + const newStream: StreamMeta = { controller: new AbortController(), data: "", isFetching: false, isStreaming: false, - }); + jsonData: null as TJsonData, + }; + + streams.set(id, newStream); - return streams.get(id)!; + return newStream; }; const resolveListener = (id: string) => { @@ -50,9 +53,12 @@ const addListener = (id: string, listener: StreamListenerCallback) => { }; }; -export const useStream = (url: string, options: StreamOptions = {}) => { +export const useStream = ( + url: string, + options: StreamOptions = {}, +) => { const id = useRef(options.id ?? nanoid()); - const stream = useRef(resolveStream(id.current)); + const stream = useRef(resolveStream(id.current)); const headers = useRef( (() => { const headers: HeadersInit = { @@ -75,21 +81,27 @@ export const useStream = (url: string, options: StreamOptions = {}) => { ); const [data, setData] = useState(stream.current.data); + const [jsonData, setJsonData] = useState( + stream.current.jsonData, + ); const [isFetching, setIsFetching] = useState(stream.current.isFetching); const [isStreaming, setIsStreaming] = useState(stream.current.isStreaming); - const updateStream = useCallback((params: Partial) => { - streams.set(id.current, { - ...resolveStream(id.current), - ...params, - }); + const updateStream = useCallback( + (params: Partial>) => { + streams.set(id.current, { + ...resolveStream(id.current), + ...params, + }); - const updatedStream = resolveStream(id.current); + const updatedStream = resolveStream(id.current); - listeners - .get(id.current) - ?.forEach((listener) => listener(updatedStream)); - }, []); + listeners + .get(id.current) + ?.forEach((listener) => listener(updatedStream)); + }, + [], + ); const cancel = useCallback(() => { stream.current.controller.abort(); @@ -107,6 +119,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => { const clearData = useCallback(() => { updateStream({ data: "", + jsonData: null, }); }, []); @@ -187,6 +200,16 @@ export const useStream = (url: string, options: StreamOptions = {}) => { options.onFinish?.(); + if (options.json) { + try { + updateStream({ + jsonData: JSON.parse(newData) as TJsonData, + }); + } catch (error) { + options.onError?.(error as Error); + } + } + return ""; } @@ -208,6 +231,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => { setIsFetching(streamUpdate.isFetching); setIsStreaming(streamUpdate.isStreaming); setData(streamUpdate.data); + setJsonData(streamUpdate.jsonData); }, ); @@ -236,6 +260,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => { return { data, + jsonData, isFetching, isStreaming, id: id.current, diff --git a/packages/react/src/types.ts b/packages/react/src/types.ts index c225161..0b46edd 100644 --- a/packages/react/src/types.ts +++ b/packages/react/src/types.ts @@ -20,6 +20,7 @@ export type StreamOptions = { initialInput?: Record; headers?: Record; csrfToken?: string; + json?: boolean; onResponse?: (response: Response) => void; onData?: (data: string) => void; onCancel?: () => void; @@ -27,11 +28,12 @@ export type StreamOptions = { onError?: (error: Error) => void; }; -export type StreamMeta = { +export type StreamMeta = { controller: AbortController; data: string; isFetching: boolean; isStreaming: boolean; + jsonData: TJsonData | null; }; export type StreamListenerCallback = (stream: StreamMeta) => void; diff --git a/packages/react/tests/use-stream.test.ts b/packages/react/tests/use-stream.test.ts index c5cb3d2..c1dfbf5 100644 --- a/packages/react/tests/use-stream.test.ts +++ b/packages/react/tests/use-stream.test.ts @@ -391,4 +391,90 @@ describe("useStream", () => { expect(onCancel).toHaveBeenCalled(); }); + + it("should parse JSON data when json option is true", async () => { + const jsonData = { test: "data", value: 123 }; + server.use( + http.post(url, async () => { + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(20); + controller.enqueue( + new TextEncoder().encode('{"test":"data",'), + ); + + await delay(20); + controller.enqueue( + new TextEncoder().encode('"value":123}'), + ); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const { result } = renderHook(() => useStream(url, { json: true })); + + await act(() => { + result.current.send({}); + }); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(result.current.data).toEqual(JSON.stringify(jsonData)); + expect(result.current.jsonData).toEqual(jsonData); + }); + + it("should handle JSON parsing errors", async () => { + const invalidJson = "{invalid json}"; + const onError = vi.fn(); + + server.use( + http.post(url, async () => { + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(20); + controller.enqueue( + new TextEncoder().encode(invalidJson), + ); + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const { result } = renderHook(() => + useStream(url, { json: true, onError }), + ); + + await act(() => { + result.current.send({}); + }); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(onError).toHaveBeenCalled(); + + expect(result.current.data).toBe(invalidJson); + expect(result.current.jsonData).toBeNull(); + }); }); From e519f0029e274920bcce084b8759800edf63ade5 Mon Sep 17 00:00:00 2001 From: Joe Tannenbaum Date: Tue, 20 May 2025 13:44:57 -0400 Subject: [PATCH 2/3] useJsonStream --- packages/react/src/hooks/use-stream.ts | 53 ++++--- packages/react/src/index.ts | 2 +- packages/react/tests/use-stream.test.ts | 88 ++++++++++- packages/vue/src/composables/useStream.ts | 86 ++++++++--- packages/vue/src/index.ts | 2 +- packages/vue/src/types.ts | 8 +- packages/vue/tests/useStream.test.ts | 172 +++++++++++++++++++++- 7 files changed, 362 insertions(+), 49 deletions(-) diff --git a/packages/react/src/hooks/use-stream.ts b/packages/react/src/hooks/use-stream.ts index ecf7373..3bc8f2d 100644 --- a/packages/react/src/hooks/use-stream.ts +++ b/packages/react/src/hooks/use-stream.ts @@ -192,32 +192,33 @@ export const useStream = ( options.onData?.(incomingStr); - if (done) { - updateStream({ - data: newData, - isStreaming: false, - }); + const streamParams: Partial> = { + data: newData, + }; - options.onFinish?.(); + if (!done) { + updateStream(streamParams); - if (options.json) { - try { - updateStream({ - jsonData: JSON.parse(newData) as TJsonData, - }); - } catch (error) { - options.onError?.(error as Error); - } - } + return read(reader, newData); + } - return ""; + streamParams.isStreaming = false; + + if (options.json) { + try { + streamParams.jsonData = JSON.parse( + newData, + ) as TJsonData; + } catch (error) { + options.onError?.(error as Error); + } } - updateStream({ - data: newData, - }); + updateStream(streamParams); - return read(reader, newData); + options.onFinish?.(); + + return ""; }); }, [], @@ -269,3 +270,15 @@ export const useStream = ( clearData, }; }; + +export const useJsonStream = ( + url: string, + options: Omit = {}, +) => { + const { jsonData, data, ...rest } = useStream(url, { + ...options, + json: true, + }); + + return { data: jsonData, rawData: data, ...rest }; +}; diff --git a/packages/react/src/index.ts b/packages/react/src/index.ts index 5443974..44726cc 100644 --- a/packages/react/src/index.ts +++ b/packages/react/src/index.ts @@ -1,2 +1,2 @@ export { useEventStream } from "./hooks/use-event-stream"; -export { useStream } from "./hooks/use-stream"; +export { useJsonStream, useStream } from "./hooks/use-stream"; diff --git a/packages/react/tests/use-stream.test.ts b/packages/react/tests/use-stream.test.ts index c1dfbf5..5261deb 100644 --- a/packages/react/tests/use-stream.test.ts +++ b/packages/react/tests/use-stream.test.ts @@ -10,7 +10,7 @@ import { it, vi, } from "vitest"; -import { useStream } from "../src/hooks/use-stream"; +import { useJsonStream, useStream } from "../src/hooks/use-stream"; describe("useStream", () => { const url = "/chat"; @@ -394,6 +394,7 @@ describe("useStream", () => { it("should parse JSON data when json option is true", async () => { const jsonData = { test: "data", value: 123 }; + server.use( http.post(url, async () => { return new HttpResponse( @@ -477,4 +478,89 @@ describe("useStream", () => { expect(result.current.data).toBe(invalidJson); expect(result.current.jsonData).toBeNull(); }); + + it("should parse JSON data when json option is true (useJsonStream)", async () => { + const jsonData = { test: "data", value: 123 }; + + server.use( + http.post(url, async () => { + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(20); + controller.enqueue( + new TextEncoder().encode('{"test":"data",'), + ); + + await delay(20); + controller.enqueue( + new TextEncoder().encode('"value":123}'), + ); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const { result } = renderHook(() => useJsonStream(url)); + + await act(() => { + result.current.send({}); + }); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(result.current.data).toEqual(jsonData); + expect(result.current.rawData).toEqual(JSON.stringify(jsonData)); + }); + + it("should handle JSON parsing errors (useJsonStream)", async () => { + const invalidJson = "{invalid json}"; + const onError = vi.fn(); + + server.use( + http.post(url, async () => { + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(20); + controller.enqueue( + new TextEncoder().encode(invalidJson), + ); + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const { result } = renderHook(() => useJsonStream(url, { onError })); + + await act(() => { + result.current.send({}); + }); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(onError).toHaveBeenCalled(); + + expect(result.current.data).toBeNull(); + expect(result.current.rawData).toBe(invalidJson); + }); }); diff --git a/packages/vue/src/composables/useStream.ts b/packages/vue/src/composables/useStream.ts index 7c91ca8..bb737a8 100644 --- a/packages/vue/src/composables/useStream.ts +++ b/packages/vue/src/composables/useStream.ts @@ -2,24 +2,27 @@ import { nanoid } from "nanoid"; import { onMounted, onUnmounted, readonly, ref } from "vue"; import { StreamListenerCallback, StreamMeta, StreamOptions } from "../types"; -const streams = new Map(); +const streams = new Map>(); const listeners = new Map(); -const resolveStream = (id: string): StreamMeta => { - const stream = streams.get(id); +const resolveStream = (id: string): StreamMeta => { + const stream = streams.get(id) as StreamMeta | undefined; if (stream) { return stream; } - streams.set(id, { + const newStream = { controller: new AbortController(), data: "", isFetching: false, isStreaming: false, - }); + jsonData: null as TJsonData, + }; + + streams.set(id, newStream); - return streams.get(id)!; + return newStream; }; const resolveListener = (id: string) => { @@ -50,9 +53,21 @@ const addListener = (id: string, listener: StreamListenerCallback) => { }; }; -export const useStream = (url: string, options: StreamOptions = {}) => { +export const useStream = ( + url: string, + options: StreamOptions = {}, +): { + data: ReturnType; + jsonData: ReturnType; + isFetching: ReturnType; + isStreaming: ReturnType; + id: string; + send: (body: Record) => void; + cancel: () => void; + clearData: () => void; +} => { const id = options.id ?? nanoid(); - const stream = ref(resolveStream(id)); + const stream = ref>(resolveStream(id)); const headers = (() => { const headers: HeadersInit = { "Content-Type": "application/json", @@ -72,13 +87,14 @@ export const useStream = (url: string, options: StreamOptions = {}) => { return headers; })(); - const data = ref(stream.value.data); + const data = ref(stream.value.data); + const jsonData = ref(stream.value.jsonData); const isFetching = ref(stream.value.isFetching); const isStreaming = ref(stream.value.isStreaming); let stopListening: () => void; - const updateStream = (params: Partial) => { + const updateStream = (params: Partial>) => { streams.set(id, { ...resolveStream(id), ...params, @@ -160,6 +176,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => { const clearData = () => { updateStream({ data: "", + jsonData: null, }); }; @@ -173,31 +190,41 @@ export const useStream = (url: string, options: StreamOptions = {}) => { options.onData?.(incomingStr); - if (done) { - updateStream({ - data: newData, - isStreaming: false, - }); + const streamParams: Partial> = { + data: newData, + }; - options.onFinish?.(); + if (!done) { + updateStream(streamParams); - return ""; + return read(reader, newData); } - updateStream({ - data: newData, - }); + streamParams.isStreaming = false; - return read(reader, newData); + if (options.json) { + try { + streamParams.jsonData = JSON.parse(newData) as TJsonData; + } catch (error) { + options.onError?.(error as Error); + } + } + + updateStream(streamParams); + + options.onFinish?.(); + + return ""; }); }; onMounted(() => { - stopListening = addListener(id, (streamUpdate: StreamMeta) => { - stream.value = resolveStream(id); + stopListening = addListener(id, (streamUpdate) => { + stream.value = resolveStream(id); isFetching.value = streamUpdate.isFetching; isStreaming.value = streamUpdate.isStreaming; data.value = streamUpdate.data; + jsonData.value = streamUpdate.jsonData; }); window.addEventListener("beforeunload", cancel); @@ -218,6 +245,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => { return { data: readonly(data), + jsonData: readonly(jsonData), isFetching: readonly(isFetching), isStreaming: readonly(isStreaming), id, @@ -226,3 +254,15 @@ export const useStream = (url: string, options: StreamOptions = {}) => { clearData, }; }; + +export const useJsonStream = ( + url: string, + options: Omit = {}, +) => { + const { jsonData, data, ...rest } = useStream(url, { + ...options, + json: true, + }); + + return { data: jsonData, rawData: data, ...rest }; +}; diff --git a/packages/vue/src/index.ts b/packages/vue/src/index.ts index c738d59..6eeba08 100644 --- a/packages/vue/src/index.ts +++ b/packages/vue/src/index.ts @@ -1,2 +1,2 @@ export { useEventStream } from "./composables/useEventStream"; -export { useStream } from "./composables/useStream"; +export { useJsonStream, useStream } from "./composables/useStream"; diff --git a/packages/vue/src/types.ts b/packages/vue/src/types.ts index bd0ed5c..fb3f55a 100644 --- a/packages/vue/src/types.ts +++ b/packages/vue/src/types.ts @@ -22,6 +22,7 @@ export type StreamOptions = { initialInput?: Record; headers?: Record; csrfToken?: string; + json?: boolean; onResponse?: (response: Response) => void; onData?: (data: string) => void; onCancel?: () => void; @@ -29,11 +30,14 @@ export type StreamOptions = { onError?: (error: Error) => void; }; -export type StreamMeta = { +export type StreamMeta = { controller: AbortController; data: string; isFetching: boolean; isStreaming: boolean; + jsonData: TJsonData | null; }; -export type StreamListenerCallback = (stream: StreamMeta) => void; +export type StreamListenerCallback = ( + stream: StreamMeta, +) => void; diff --git a/packages/vue/tests/useStream.test.ts b/packages/vue/tests/useStream.test.ts index 53e4049..487909b 100644 --- a/packages/vue/tests/useStream.test.ts +++ b/packages/vue/tests/useStream.test.ts @@ -10,7 +10,7 @@ import { vi, } from "vitest"; import { createApp } from "vue"; -import { useStream } from "../src/composables/useStream"; +import { useJsonStream, useStream } from "../src/composables/useStream"; function withSetup(composable) { let result; @@ -285,4 +285,174 @@ describe("useStream", () => { expect(capturedHeaders.get("X-STREAM-ID")).toBe(id); }); + + it("parses JSON data when json option is true", async () => { + const jsonData = { test: "data", value: 123 }; + + server.use( + http.post(url, async () => { + await delay(20); + + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(20); + controller.enqueue( + new TextEncoder().encode('{"test":"data",'), + ); + + await delay(20); + controller.enqueue( + new TextEncoder().encode('"value":123}'), + ); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const [result] = withSetup(() => useStream(url, { json: true })); + + result.send({}); + + await vi.waitFor(() => expect(result.isStreaming.value).toBe(true)); + await vi.waitFor(() => expect(result.isStreaming.value).toBe(false)); + + expect(result.data.value).toBe(JSON.stringify(jsonData)); + expect(result.jsonData.value).toEqual(jsonData); + }); + + it("handles JSON parsing errors", async () => { + const invalidJson = "{invalid json}"; + const onError = vi.fn(); + + server.use( + http.post(url, async () => { + await delay(50); + + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(50); + controller.enqueue( + new TextEncoder().encode(invalidJson), + ); + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const [result] = withSetup(() => + useStream(url, { json: true, onError }), + ); + + result.send({}); + + await vi.waitFor(() => expect(result.isStreaming.value).toBe(true)); + await vi.waitFor(() => expect(result.isStreaming.value).toBe(false)); + + expect(onError).toHaveBeenCalled(); + expect(result.data.value).toBe(invalidJson); + expect(result.jsonData.value).toBeNull(); + }); + + it("parses JSON data when json option is true (useJsonStream)", async () => { + const jsonData = { test: "data", value: 123 }; + + server.use( + http.post(url, async () => { + await delay(20); + + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(20); + controller.enqueue( + new TextEncoder().encode('{"test":"data",'), + ); + + await delay(20); + controller.enqueue( + new TextEncoder().encode('"value":123}'), + ); + + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const [result] = withSetup(() => useJsonStream(url)); + + result.send({}); + + await vi.waitFor(() => expect(result.isStreaming.value).toBe(true)); + await vi.waitFor(() => expect(result.isStreaming.value).toBe(false)); + + expect(result.data.value).toEqual(jsonData); + expect(result.rawData.value).toBe(JSON.stringify(jsonData)); + }); + + it("handles JSON parsing errors (useJsonStream)", async () => { + const invalidJson = "{invalid json}"; + const onError = vi.fn(); + + server.use( + http.post(url, async () => { + await delay(50); + + return new HttpResponse( + new ReadableStream({ + async start(controller) { + await delay(50); + controller.enqueue( + new TextEncoder().encode(invalidJson), + ); + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }), + ); + + const [result] = withSetup(() => useJsonStream(url, { onError })); + + result.send({}); + + await vi.waitFor(() => expect(result.isStreaming.value).toBe(true)); + await vi.waitFor(() => expect(result.isStreaming.value).toBe(false)); + + expect(onError).toHaveBeenCalled(); + expect(result.data.value).toBeNull(); + expect(result.rawData.value).toBe(invalidJson); + }); }); From 08d898ccc24e63cfe2c54d118341faa9d2058389 Mon Sep 17 00:00:00 2001 From: Joe Tannenbaum Date: Tue, 20 May 2025 13:53:31 -0400 Subject: [PATCH 3/3] useJsonStream docs --- packages/react/README.md | 35 +++++++++++++++++++++++++++++++++++ packages/vue/README.md | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/packages/react/README.md b/packages/react/README.md index aa156ab..93d1c4b 100644 --- a/packages/react/README.md +++ b/packages/react/README.md @@ -144,6 +144,41 @@ function StreamStatus({ id }) { } ``` +The `useJsonStream` hook is identical to the `useStream` hook except that it will attempt to parse the data as JSON once it has finished streaming: + +```tsx +import { useJsonStream } from "@laravel/stream-react"; + +type User = { + id: number; + name: string; + email: string; +}; + +function App() { + const { data, send } = useJsonStream<{ users: User[] }>("users"); + + const loadUsers = () => { + send({ + query: "taylor", + }); + }; + + return ( +
+
    + {data?.users.map((user) => ( +
  • + {user.id}: {user.name} +
  • + ))} +
+ +
+ ); +} +``` + ## Event Streams (SSE) The `useEventStream` hook allows you to seamlessly consume [Server-Sent Events (SSE)](https://laravel.com/docs/responses#event-streams) in your React application. diff --git a/packages/vue/README.md b/packages/vue/README.md index 8980691..4a1304c 100644 --- a/packages/vue/README.md +++ b/packages/vue/README.md @@ -155,6 +155,39 @@ const { isFetching, isStreaming } = useStream("chat", { id: props.id }); ``` +The `useJsonStream` hook is identical to the `useStream` hook except that it will attempt to parse the data as JSON once it has finished streaming: + +```vue + + + +``` + ## Event Streams (SSE) The `useEventStream` hook allows you to seamlessly consume [Server-Sent Events (SSE)](https://laravel.com/docs/responses#event-streams) in your Vue application.