diff --git a/packages/react/README.md b/packages/react/README.md index d672e16..a7b866f 100644 --- a/packages/react/README.md +++ b/packages/react/README.md @@ -49,27 +49,23 @@ function App() { When sending data back to the stream, the active connection to the stream is canceled before sending the new data. All requests are sent as JSON `POST` requests. -The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior. The default values for this object are shown below: - -```tsx -import { useStream } from "@laravel/stream-react"; - -function App() { - const { data } = useStream("chat", { - id: undefined, - initialInput: undefined, - headers: undefined, - csrfToken: undefined, - credentials: undefined, - onResponse: (response: Response) => void, - onData: (data: string) => void, - onCancel: () => void, - onFinish: () => void, - onError: (error: Error) => void, - }); - - return
{data}
; -} +The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior: + +```ts +type StreamOptions = { + id?: string; + initialInput?: Record; + headers?: Record; + csrfToken?: string; + json?: boolean; + credentials?: RequestCredentials; + onResponse?: (response: Response) => void; + onData?: (data: string) => void; + onCancel?: () => void; + onFinish?: () => void; + onError?: (error: Error) => void; + onBeforeSend?: (request: RequestInit) => boolean | RequestInit | void; +}; ``` `onResponse` is triggered after a successful initial response from the stream and the raw [Response](https://developer.mozilla.org/en-US/docs/Web/API/Response) is passed to the callback. @@ -78,6 +74,8 @@ function App() { `onFinish` is called when a stream has finished and when an error is thrown during the fetch/read cycle. +`onBeforeSend` is called right before sending the request to the server and receives the `RequestInit` object as an argument. Returning `false` from this callback cancels the request, returning a [`RequestInit`](https://developer.mozilla.org/en-US/docs/Web/API/RequestInit) object will override the existing `RequestInit` object. + By default, a request is not made the to stream on initialization. You may pass an initial payload to the stream by using the `initialInput` option: ```tsx diff --git a/packages/react/src/hooks/use-stream.ts b/packages/react/src/hooks/use-stream.ts index f2527d0..b21214a 100644 --- a/packages/react/src/hooks/use-stream.ts +++ b/packages/react/src/hooks/use-stream.ts @@ -2,6 +2,7 @@ import { nanoid } from "nanoid"; import { useCallback, useEffect, useRef, useState } from "react"; import { addCallbacks, + onBeforeSend, onCancel, onData, onError, @@ -81,12 +82,7 @@ export const useStream = ( (body: Record = {}) => { const controller = new AbortController(); - updateStream({ - isFetching: true, - controller, - }); - - fetch(url, { + const request: RequestInit = { method: "POST", signal: controller.signal, headers: { @@ -95,7 +91,20 @@ export const useStream = ( }, body: JSON.stringify(body), credentials: options.credentials ?? "same-origin", - }) + }; + + const modifiedRequest = onBeforeSend(id.current, request); + + if (modifiedRequest === false) { + return; + } + + updateStream({ + isFetching: true, + controller, + }); + + fetch(url, modifiedRequest ?? request) .then(async (response) => { if (!response.ok) { const error = await response.text(); diff --git a/packages/react/src/streams/dispatch.ts b/packages/react/src/streams/dispatch.ts index 255d99e..ef460f4 100644 --- a/packages/react/src/streams/dispatch.ts +++ b/packages/react/src/streams/dispatch.ts @@ -1,4 +1,4 @@ -import { RequiredCallbacks, StreamOptions } from "../types"; +import { Callback, RequiredCallbacks, StreamOptions } from "../types"; const callbacks = new Map< string, @@ -8,6 +8,7 @@ const callbacks = new Map< onFinish: RequiredCallbacks["onFinish"][]; onResponse: RequiredCallbacks["onResponse"][]; onCancel: RequiredCallbacks["onCancel"][]; + onBeforeSend: RequiredCallbacks["onBeforeSend"][]; } >(); @@ -19,6 +20,7 @@ export const addCallbacks = (id: string, options: StreamOptions) => { onFinish: [], onResponse: [], onCancel: [], + onBeforeSend: [], }); } @@ -44,6 +46,10 @@ export const addCallbacks = (id: string, options: StreamOptions) => { streamCallbacks.onCancel.push(options.onCancel); } + if (options.onBeforeSend) { + streamCallbacks.onBeforeSend.push(options.onBeforeSend); + } + return () => { removeCallbacks(id, options); }; @@ -51,19 +57,17 @@ export const addCallbacks = (id: string, options: StreamOptions) => { const dispatchCallbacks = ( id: string, - callback: "onData" | "onError" | "onFinish" | "onResponse" | "onCancel", + callback: Callback, ...args: unknown[] -) => { +): any[] => { const streamCallbacks = callbacks.get(id); if (!streamCallbacks) { - return; + return []; } - streamCallbacks[callback].forEach((cb) => { - // @ts-expect-error Any args - cb(...args); - }); + // @ts-expect-error Any args + return streamCallbacks[callback].map((cb) => cb(...args)); }; export const onFinish = (id: string) => { @@ -86,6 +90,26 @@ export const onData = (id: string, data: string) => { dispatchCallbacks(id, "onData", data); }; +export const onBeforeSend = (id: string, request: RequestInit) => { + const results = dispatchCallbacks(id, "onBeforeSend", request) as ( + | boolean + | RequestInit + | void + )[]; + + for (const result of results) { + if (result === false) { + return false; + } + + if (result !== null && typeof result === "object") { + return result; + } + } + + return null; +}; + export const removeCallbacks = (id: string, options: StreamOptions) => { const streamCallbacks = callbacks.get(id); @@ -122,4 +146,10 @@ export const removeCallbacks = (id: string, options: StreamOptions) => { (cb) => cb !== options.onCancel, ); } + + if (options.onBeforeSend) { + streamCallbacks.onBeforeSend = streamCallbacks.onBeforeSend.filter( + (cb) => cb !== options.onBeforeSend, + ); + } }; diff --git a/packages/react/src/types.ts b/packages/react/src/types.ts index 1cb2473..2f66513 100644 --- a/packages/react/src/types.ts +++ b/packages/react/src/types.ts @@ -27,14 +27,18 @@ export type StreamOptions = { onCancel?: () => void; onFinish?: () => void; onError?: (error: Error) => void; + onBeforeSend?: (request: RequestInit) => RequestInit | boolean | void; }; -export type RequiredCallbacks = Required< - Pick< - StreamOptions, - "onData" | "onError" | "onFinish" | "onResponse" | "onCancel" - > ->; +export type Callback = + | "onData" + | "onError" + | "onFinish" + | "onResponse" + | "onCancel" + | "onBeforeSend"; + +export type RequiredCallbacks = Required>; export type StreamMeta = { controller: AbortController; diff --git a/packages/react/tests/use-stream.test.ts b/packages/react/tests/use-stream.test.ts index 80b7d92..26ea574 100644 --- a/packages/react/tests/use-stream.test.ts +++ b/packages/react/tests/use-stream.test.ts @@ -150,6 +150,84 @@ describe("useStream", () => { expect(onFinish).toHaveBeenCalled(); }); + it("will trigger the onBeforeSend callback", async () => { + const payload = { test: "data" }; + const onBeforeSend = vi.fn(); + + const { result } = renderHook(() => + useStream(url, { + onBeforeSend, + }), + ); + + act(() => { + result.current.send(payload); + }); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(onBeforeSend).toHaveBeenCalled(); + }); + + it("can cancel via the onBeforeSend callback", async () => { + const payload = { test: "data" }; + const onBeforeSend = vi.fn(() => false); + let requested = false; + + server.use( + http.post(url, async () => { + requested = true; + return response(); + }), + ); + + const { result } = renderHook(() => + useStream(url, { + onBeforeSend, + }), + ); + + act(() => { + result.current.send(payload); + }); + + expect(onBeforeSend).toHaveBeenCalled(); + expect(requested).toBe(false); + }); + + it("can modify the request via the onBeforeSend callback", async () => { + const payload = { test: "data" }; + const onBeforeSend = vi.fn((request) => ({ + ...request, + body: JSON.stringify({ modified: true }), + })); + let capturedBody; + + server.use( + http.post(url, async ({ request }) => { + capturedBody = await request.json(); + return response(); + }), + ); + + const { result } = renderHook(() => + useStream(url, { + onBeforeSend, + }), + ); + + act(() => { + result.current.send(payload); + }); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(onBeforeSend).toHaveBeenCalled(); + expect(capturedBody).toEqual({ modified: true }); + }); + it("will trigger the onData callback", async () => { const payload = { test: "data" }; const onData = vi.fn(); diff --git a/packages/vue/README.md b/packages/vue/README.md index 780e38c..6a4c0e4 100644 --- a/packages/vue/README.md +++ b/packages/vue/README.md @@ -49,29 +49,23 @@ const sendMessage = () => { When sending data back to the stream, the active connection to the stream is canceled before sending the new data. All requests are sent as JSON `POST` requests. -The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior. The default values for this object are shown below: - -```vue - - - +The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior: + +```ts +type StreamOptions = { + id?: string; + initialInput?: Record; + headers?: Record; + csrfToken?: string; + json?: boolean; + credentials?: RequestCredentials; + onResponse?: (response: Response) => void; + onData?: (data: string) => void; + onCancel?: () => void; + onFinish?: () => void; + onError?: (error: Error) => void; + onBeforeSend?: (request: RequestInit) => boolean | RequestInit | void; +}; ``` `onResponse` is triggered after a successful initial response from the stream and the raw [Response](https://developer.mozilla.org/en-US/docs/Web/API/Response) is passed to the callback. @@ -80,6 +74,8 @@ const { data } = useStream("chat", { `onFinish` is called when a stream has finished and when an error is thrown during the fetch/read cycle. +`onBeforeSend` is called right before sending the request to the server and receives the `RequestInit` object as an argument. Returning `false` from this callback cancels the request, returning a [`RequestInit`](https://developer.mozilla.org/en-US/docs/Web/API/RequestInit) object will override the existing `RequestInit` object. + By default, a request is not made the to stream on initialization. You may pass an initial payload to the stream by using the `initialInput` option: ```vue diff --git a/packages/vue/src/composables/useStream.ts b/packages/vue/src/composables/useStream.ts index 3bb3fab..54b6e5e 100644 --- a/packages/vue/src/composables/useStream.ts +++ b/packages/vue/src/composables/useStream.ts @@ -2,6 +2,7 @@ import { nanoid } from "nanoid"; import { onMounted, onUnmounted, readonly, Ref, ref } from "vue"; import { addCallbacks, + onBeforeSend, onCancel, onData, onError, @@ -78,12 +79,7 @@ export const useStream = ( const makeRequest = (body: Record = {}) => { const controller = new AbortController(); - updateStream({ - isFetching: true, - controller, - }); - - fetch(url, { + const request: RequestInit = { method: "POST", signal: controller.signal, headers: { @@ -92,7 +88,20 @@ export const useStream = ( }, body: JSON.stringify(body), credentials: options.credentials ?? "same-origin", - }) + }; + + const modifiedRequest = onBeforeSend(id, request); + + if (modifiedRequest === false) { + return; + } + + updateStream({ + isFetching: true, + controller, + }); + + fetch(url, modifiedRequest ?? request) .then(async (response) => { if (!response.ok) { const error = await response.text(); diff --git a/packages/vue/src/streams/dispatch.ts b/packages/vue/src/streams/dispatch.ts index 255d99e..ef460f4 100644 --- a/packages/vue/src/streams/dispatch.ts +++ b/packages/vue/src/streams/dispatch.ts @@ -1,4 +1,4 @@ -import { RequiredCallbacks, StreamOptions } from "../types"; +import { Callback, RequiredCallbacks, StreamOptions } from "../types"; const callbacks = new Map< string, @@ -8,6 +8,7 @@ const callbacks = new Map< onFinish: RequiredCallbacks["onFinish"][]; onResponse: RequiredCallbacks["onResponse"][]; onCancel: RequiredCallbacks["onCancel"][]; + onBeforeSend: RequiredCallbacks["onBeforeSend"][]; } >(); @@ -19,6 +20,7 @@ export const addCallbacks = (id: string, options: StreamOptions) => { onFinish: [], onResponse: [], onCancel: [], + onBeforeSend: [], }); } @@ -44,6 +46,10 @@ export const addCallbacks = (id: string, options: StreamOptions) => { streamCallbacks.onCancel.push(options.onCancel); } + if (options.onBeforeSend) { + streamCallbacks.onBeforeSend.push(options.onBeforeSend); + } + return () => { removeCallbacks(id, options); }; @@ -51,19 +57,17 @@ export const addCallbacks = (id: string, options: StreamOptions) => { const dispatchCallbacks = ( id: string, - callback: "onData" | "onError" | "onFinish" | "onResponse" | "onCancel", + callback: Callback, ...args: unknown[] -) => { +): any[] => { const streamCallbacks = callbacks.get(id); if (!streamCallbacks) { - return; + return []; } - streamCallbacks[callback].forEach((cb) => { - // @ts-expect-error Any args - cb(...args); - }); + // @ts-expect-error Any args + return streamCallbacks[callback].map((cb) => cb(...args)); }; export const onFinish = (id: string) => { @@ -86,6 +90,26 @@ export const onData = (id: string, data: string) => { dispatchCallbacks(id, "onData", data); }; +export const onBeforeSend = (id: string, request: RequestInit) => { + const results = dispatchCallbacks(id, "onBeforeSend", request) as ( + | boolean + | RequestInit + | void + )[]; + + for (const result of results) { + if (result === false) { + return false; + } + + if (result !== null && typeof result === "object") { + return result; + } + } + + return null; +}; + export const removeCallbacks = (id: string, options: StreamOptions) => { const streamCallbacks = callbacks.get(id); @@ -122,4 +146,10 @@ export const removeCallbacks = (id: string, options: StreamOptions) => { (cb) => cb !== options.onCancel, ); } + + if (options.onBeforeSend) { + streamCallbacks.onBeforeSend = streamCallbacks.onBeforeSend.filter( + (cb) => cb !== options.onBeforeSend, + ); + } }; diff --git a/packages/vue/src/types.ts b/packages/vue/src/types.ts index 61e460f..d34d9f8 100644 --- a/packages/vue/src/types.ts +++ b/packages/vue/src/types.ts @@ -29,14 +29,18 @@ export type StreamOptions = { onCancel?: () => void; onFinish?: () => void; onError?: (error: Error) => void; + onBeforeSend?: (request: RequestInit) => boolean | RequestInit | void; }; -export type RequiredCallbacks = Required< - Pick< - StreamOptions, - "onData" | "onError" | "onFinish" | "onResponse" | "onCancel" - > ->; +export type Callback = + | "onData" + | "onError" + | "onFinish" + | "onResponse" + | "onCancel" + | "onBeforeSend"; + +export type RequiredCallbacks = Required>; export type StreamMeta = { controller: AbortController; diff --git a/packages/vue/tests/useStream.test.ts b/packages/vue/tests/useStream.test.ts index a95f31e..08ad2cb 100644 --- a/packages/vue/tests/useStream.test.ts +++ b/packages/vue/tests/useStream.test.ts @@ -141,6 +141,64 @@ describe("useStream", () => { expect(onFinish).toHaveBeenCalled(); }); + it("triggers onBeforeSend callback", async () => { + const onBeforeSend = vi.fn(); + + const [result] = withSetup(() => useStream(url, { onBeforeSend })); + + result.send({ test: "data" }); + + await vi.waitFor(() => expect(result.isStreaming.value).toBe(true)); + await vi.waitFor(() => expect(result.isStreaming.value).toBe(false)); + + expect(onBeforeSend).toHaveBeenCalled(); + }); + + it("can cancel a call via onBeforeSend callback", async () => { + const onBeforeSend = vi.fn(() => false); + let requested = false; + + server.use( + http.post(url, async () => { + requested = true; + return response(); + }), + ); + + const [result] = withSetup(() => useStream(url, { onBeforeSend })); + + result.send({ test: "data" }); + + expect(onBeforeSend).toHaveBeenCalled(); + expect(requested).toBe(false); + }); + + it("can modify a request via onBeforeSend callback", async () => { + const onBeforeSend = vi.fn((request) => ({ + ...request, + body: JSON.stringify({ modified: true }), + })); + + let capturedBody; + + server.use( + http.post(url, async ({ request }) => { + capturedBody = await request.json(); + return response(); + }), + ); + + const [result] = withSetup(() => useStream(url, { onBeforeSend })); + + result.send({ test: "data" }); + + await vi.waitFor(() => expect(result.isStreaming.value).toBe(true)); + await vi.waitFor(() => expect(result.isStreaming.value).toBe(false)); + + expect(onBeforeSend).toHaveBeenCalled(); + expect(capturedBody).toEqual({ modified: true }); + }); + it("triggers onData callback with chunks", async () => { const onData = vi.fn();