From bd1275fca33190514b00892373f625938f4ecb9d Mon Sep 17 00:00:00 2001 From: Joe Tannenbaum Date: Tue, 20 May 2025 10:58:32 -0400 Subject: [PATCH 1/2] cancel stream on unmount --- packages/react/src/hooks/use-stream.ts | 13 +++++++++++++ packages/react/tests/use-stream.test.ts | 21 +++++++++++++++++++++ packages/vue/src/composables/useStream.ts | 13 +++++++++++++ 3 files changed, 47 insertions(+) diff --git a/packages/react/src/hooks/use-stream.ts b/packages/react/src/hooks/use-stream.ts index de848b6..d449a3a 100644 --- a/packages/react/src/hooks/use-stream.ts +++ b/packages/react/src/hooks/use-stream.ts @@ -30,6 +30,10 @@ const resolveListener = (id: string) => { return listeners.get(id)!; }; +const hasListeners = (id: string) => { + return listeners.has(id) && listeners.get(id)?.length; +}; + const addListener = (id: string, listener: StreamListenerCallback) => { resolveListener(id).push(listener); @@ -38,6 +42,11 @@ const addListener = (id: string, listener: StreamListenerCallback) => { id, resolveListener(id).filter((l) => l !== listener), ); + + if (!hasListeners(id)) { + streams.delete(id); + listeners.delete(id); + } }; }; @@ -200,6 +209,10 @@ export const useStream = (url: string, options: StreamOptions = {}) => { return () => { stopListening(); + + if (!hasListeners(id.current)) { + cancel(); + } }; }, []); diff --git a/packages/react/tests/use-stream.test.ts b/packages/react/tests/use-stream.test.ts index 77402d4..d24daf4 100644 --- a/packages/react/tests/use-stream.test.ts +++ b/packages/react/tests/use-stream.test.ts @@ -364,4 +364,25 @@ describe("useStream", () => { expect(capturedHeaders.get("X-STREAM-ID")).toBe(id); }); + + it.skip("should cancel stream when component unmounts", async () => { + const onCancel = vi.fn(); + const { unmount, result } = renderHook(() => + useStream(url, { onCancel }), + ); + + await act(() => { + result.current.send({ + test: "ok", + }); + }); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + + unmount(); + + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(onCancel).toHaveBeenCalled(); + }); }); diff --git a/packages/vue/src/composables/useStream.ts b/packages/vue/src/composables/useStream.ts index 9ab2d9b..eae7b26 100644 --- a/packages/vue/src/composables/useStream.ts +++ b/packages/vue/src/composables/useStream.ts @@ -30,6 +30,10 @@ const resolveListener = (id: string) => { return listeners.get(id)!; }; +const hasListeners = (id: string) => { + return listeners.has(id) && listeners.get(id)?.length; +}; + const addListener = (id: string, listener: StreamListenerCallback) => { resolveListener(id).push(listener); @@ -38,6 +42,11 @@ const addListener = (id: string, listener: StreamListenerCallback) => { id, resolveListener(id).filter((l) => l !== listener), ); + + if (!hasListeners(id)) { + streams.delete(id); + listeners.delete(id); + } }; }; @@ -197,6 +206,10 @@ export const useStream = (url: string, options: StreamOptions = {}) => { onUnmounted(() => { stopListening(); window.removeEventListener("beforeunload", cancel); + + if (!hasListeners(id)) { + cancel(); + } }); return { From 38dcfef4df5d97e151601daddee4908c81964de4 Mon Sep 17 00:00:00 2001 From: Joe Tannenbaum Date: Tue, 20 May 2025 11:04:33 -0400 Subject: [PATCH 2/2] clear data function --- packages/react/src/hooks/use-stream.ts | 11 ++++++++--- packages/react/tests/use-stream.test.ts | 6 ++++++ packages/vue/src/composables/useStream.ts | 5 +++++ packages/vue/tests/useStream.test.ts | 4 ++++ 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/packages/react/src/hooks/use-stream.ts b/packages/react/src/hooks/use-stream.ts index d449a3a..4051d05 100644 --- a/packages/react/src/hooks/use-stream.ts +++ b/packages/react/src/hooks/use-stream.ts @@ -104,6 +104,12 @@ export const useStream = (url: string, options: StreamOptions = {}) => { }); }, [isFetching, isStreaming]); + const clearData = useCallback(() => { + updateStream({ + data: "", + }); + }, []); + const makeRequest = useCallback( (body: Record = {}) => { const controller = new AbortController(); @@ -159,9 +165,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => { const send = useCallback((body: Record) => { cancel(); makeRequest(body); - updateStream({ - data: "", - }); + clearData(); }, []); const read = useCallback( @@ -237,5 +241,6 @@ export const useStream = (url: string, options: StreamOptions = {}) => { id: id.current, send, cancel, + clearData, }; }; diff --git a/packages/react/tests/use-stream.test.ts b/packages/react/tests/use-stream.test.ts index d24daf4..c5cb3d2 100644 --- a/packages/react/tests/use-stream.test.ts +++ b/packages/react/tests/use-stream.test.ts @@ -75,6 +75,12 @@ describe("useStream", () => { expect(result.current.isStreaming).toBe(false); expect(result.current.data).toBe("chunk1chunk2"); + + await act(() => { + result.current.clearData(); + }); + + expect(result.current.data).toBe(""); }); it("can send data back to the endpoint", async () => { diff --git a/packages/vue/src/composables/useStream.ts b/packages/vue/src/composables/useStream.ts index eae7b26..7c91ca8 100644 --- a/packages/vue/src/composables/useStream.ts +++ b/packages/vue/src/composables/useStream.ts @@ -154,6 +154,10 @@ export const useStream = (url: string, options: StreamOptions = {}) => { const send = (body: Record) => { cancel(); makeRequest(body); + clearData(); + }; + + const clearData = () => { updateStream({ data: "", }); @@ -219,5 +223,6 @@ export const useStream = (url: string, options: StreamOptions = {}) => { id, send, cancel, + clearData, }; }; diff --git a/packages/vue/tests/useStream.test.ts b/packages/vue/tests/useStream.test.ts index 9f6f8fb..53e4049 100644 --- a/packages/vue/tests/useStream.test.ts +++ b/packages/vue/tests/useStream.test.ts @@ -87,6 +87,10 @@ describe("useStream", () => { await vi.waitFor(() => expect(result.isStreaming.value).toBe(false)); expect(result.data.value).toBe("chunk1chunk2"); + + result.clearData(); + + expect(result.data.value).toBe(""); }); it("can send data to the endpoint", async () => {