Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 19 additions & 21 deletions packages/react/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,23 @@ function App() {

When sending data back to the stream, the active connection to the stream is canceled before sending the new data. All requests are sent as JSON `POST` requests.

The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior. The default values for this object are shown below:

```tsx
import { useStream } from "@laravel/stream-react";

function App() {
const { data } = useStream("chat", {
id: undefined,
initialInput: undefined,
headers: undefined,
csrfToken: undefined,
credentials: undefined,
onResponse: (response: Response) => void,
onData: (data: string) => void,
onCancel: () => void,
onFinish: () => void,
onError: (error: Error) => void,
});

return <div>{data}</div>;
}
The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior:

```ts
type StreamOptions = {
id?: string;
initialInput?: Record<string, any>;
headers?: Record<string, string>;
csrfToken?: string;
json?: boolean;
credentials?: RequestCredentials;
onResponse?: (response: Response) => void;
onData?: (data: string) => void;
onCancel?: () => void;
onFinish?: () => void;
onError?: (error: Error) => void;
onBeforeSend?: (request: RequestInit) => boolean | RequestInit | void;
};
```

`onResponse` is triggered after a successful initial response from the stream and the raw [Response](https://developer.mozilla.org/en-US/docs/Web/API/Response) is passed to the callback.
Expand All @@ -78,6 +74,8 @@ function App() {

`onFinish` is called when a stream has finished and when an error is thrown during the fetch/read cycle.

`onBeforeSend` is called right before sending the request to the server and receives the `RequestInit` object as an argument. Returning `false` from this callback cancels the request, returning a [`RequestInit`](https://developer.mozilla.org/en-US/docs/Web/API/RequestInit) object will override the existing `RequestInit` object.

By default, a request is not made the to stream on initialization. You may pass an initial payload to the stream by using the `initialInput` option:

```tsx
Expand Down
23 changes: 16 additions & 7 deletions packages/react/src/hooks/use-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { nanoid } from "nanoid";
import { useCallback, useEffect, useRef, useState } from "react";
import {
addCallbacks,
onBeforeSend,
onCancel,
onData,
onError,
Expand Down Expand Up @@ -81,12 +82,7 @@ export const useStream = <TJsonData = null>(
(body: Record<string, any> = {}) => {
const controller = new AbortController();

updateStream({
isFetching: true,
controller,
});

fetch(url, {
const request: RequestInit = {
method: "POST",
signal: controller.signal,
headers: {
Expand All @@ -95,7 +91,20 @@ export const useStream = <TJsonData = null>(
},
body: JSON.stringify(body),
credentials: options.credentials ?? "same-origin",
})
};

const modifiedRequest = onBeforeSend(id.current, request);

if (modifiedRequest === false) {
return;
}

updateStream({
isFetching: true,
controller,
});

fetch(url, modifiedRequest ?? request)
.then(async (response) => {
if (!response.ok) {
const error = await response.text();
Expand Down
46 changes: 38 additions & 8 deletions packages/react/src/streams/dispatch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RequiredCallbacks, StreamOptions } from "../types";
import { Callback, RequiredCallbacks, StreamOptions } from "../types";

const callbacks = new Map<
string,
Expand All @@ -8,6 +8,7 @@ const callbacks = new Map<
onFinish: RequiredCallbacks["onFinish"][];
onResponse: RequiredCallbacks["onResponse"][];
onCancel: RequiredCallbacks["onCancel"][];
onBeforeSend: RequiredCallbacks["onBeforeSend"][];
}
>();

Expand All @@ -19,6 +20,7 @@ export const addCallbacks = (id: string, options: StreamOptions) => {
onFinish: [],
onResponse: [],
onCancel: [],
onBeforeSend: [],
});
}

Expand All @@ -44,26 +46,28 @@ export const addCallbacks = (id: string, options: StreamOptions) => {
streamCallbacks.onCancel.push(options.onCancel);
}

if (options.onBeforeSend) {
streamCallbacks.onBeforeSend.push(options.onBeforeSend);
}

return () => {
removeCallbacks(id, options);
};
};

const dispatchCallbacks = (
id: string,
callback: "onData" | "onError" | "onFinish" | "onResponse" | "onCancel",
callback: Callback,
...args: unknown[]
) => {
): any[] => {
const streamCallbacks = callbacks.get(id);

if (!streamCallbacks) {
return;
return [];
}

streamCallbacks[callback].forEach((cb) => {
// @ts-expect-error Any args
cb(...args);
});
// @ts-expect-error Any args
return streamCallbacks[callback].map((cb) => cb(...args));
};

export const onFinish = (id: string) => {
Expand All @@ -86,6 +90,26 @@ export const onData = (id: string, data: string) => {
dispatchCallbacks(id, "onData", data);
};

export const onBeforeSend = (id: string, request: RequestInit) => {
const results = dispatchCallbacks(id, "onBeforeSend", request) as (
| boolean
| RequestInit
| void
)[];

for (const result of results) {
if (result === false) {
return false;
}

if (result !== null && typeof result === "object") {
return result;
}
}

return null;
};

export const removeCallbacks = (id: string, options: StreamOptions) => {
const streamCallbacks = callbacks.get(id);

Expand Down Expand Up @@ -122,4 +146,10 @@ export const removeCallbacks = (id: string, options: StreamOptions) => {
(cb) => cb !== options.onCancel,
);
}

if (options.onBeforeSend) {
streamCallbacks.onBeforeSend = streamCallbacks.onBeforeSend.filter(
(cb) => cb !== options.onBeforeSend,
);
}
};
16 changes: 10 additions & 6 deletions packages/react/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ export type StreamOptions = {
onCancel?: () => void;
onFinish?: () => void;
onError?: (error: Error) => void;
onBeforeSend?: (request: RequestInit) => RequestInit | boolean | void;
};

export type RequiredCallbacks = Required<
Pick<
StreamOptions,
"onData" | "onError" | "onFinish" | "onResponse" | "onCancel"
>
>;
export type Callback =
| "onData"
| "onError"
| "onFinish"
| "onResponse"
| "onCancel"
| "onBeforeSend";

export type RequiredCallbacks = Required<Pick<StreamOptions, Callback>>;

export type StreamMeta<TJsonData = null> = {
controller: AbortController;
Expand Down
78 changes: 78 additions & 0 deletions packages/react/tests/use-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,84 @@ describe("useStream", () => {
expect(onFinish).toHaveBeenCalled();
});

it("will trigger the onBeforeSend callback", async () => {
const payload = { test: "data" };
const onBeforeSend = vi.fn();

const { result } = renderHook(() =>
useStream(url, {
onBeforeSend,
}),
);

act(() => {
result.current.send(payload);
});

await waitFor(() => expect(result.current.isStreaming).toBe(true));
await waitFor(() => expect(result.current.isStreaming).toBe(false));

expect(onBeforeSend).toHaveBeenCalled();
});

it("can cancel via the onBeforeSend callback", async () => {
const payload = { test: "data" };
const onBeforeSend = vi.fn(() => false);
let requested = false;

server.use(
http.post(url, async () => {
requested = true;
return response();
}),
);

const { result } = renderHook(() =>
useStream(url, {
onBeforeSend,
}),
);

act(() => {
result.current.send(payload);
});

expect(onBeforeSend).toHaveBeenCalled();
expect(requested).toBe(false);
});

it("can modify the request via the onBeforeSend callback", async () => {
const payload = { test: "data" };
const onBeforeSend = vi.fn((request) => ({
...request,
body: JSON.stringify({ modified: true }),
}));
let capturedBody;

server.use(
http.post(url, async ({ request }) => {
capturedBody = await request.json();
return response();
}),
);

const { result } = renderHook(() =>
useStream(url, {
onBeforeSend,
}),
);

act(() => {
result.current.send(payload);
});

await waitFor(() => expect(result.current.isStreaming).toBe(true));
await waitFor(() => expect(result.current.isStreaming).toBe(false));

expect(onBeforeSend).toHaveBeenCalled();
expect(capturedBody).toEqual({ modified: true });
});

it("will trigger the onData callback", async () => {
const payload = { test: "data" };
const onData = vi.fn();
Expand Down
42 changes: 19 additions & 23 deletions packages/vue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,23 @@ const sendMessage = () => {

When sending data back to the stream, the active connection to the stream is canceled before sending the new data. All requests are sent as JSON `POST` requests.

The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior. The default values for this object are shown below:

```vue
<script setup lang="ts">
import { useStream } from "@laravel/stream-vue";

const { data } = useStream("chat", {
id: undefined,
initialInput: undefined,
headers: undefined,
csrfToken: undefined,
credentials: undefined,
onResponse: (response: Response) => void,
onData: (data: string) => void,
onCancel: () => void,
onFinish: () => void,
onError: (error: Error) => void,
});
</script>

<template>
<div>{{ data }}</div>
</template>
The second argument given to `useStream` is an options object that you may use to customize the stream consumption behavior:

```ts
type StreamOptions = {
id?: string;
initialInput?: Record<string, any>;
headers?: Record<string, string>;
csrfToken?: string;
json?: boolean;
credentials?: RequestCredentials;
onResponse?: (response: Response) => void;
onData?: (data: string) => void;
onCancel?: () => void;
onFinish?: () => void;
onError?: (error: Error) => void;
onBeforeSend?: (request: RequestInit) => boolean | RequestInit | void;
};
```

`onResponse` is triggered after a successful initial response from the stream and the raw [Response](https://developer.mozilla.org/en-US/docs/Web/API/Response) is passed to the callback.
Expand All @@ -80,6 +74,8 @@ const { data } = useStream("chat", {

`onFinish` is called when a stream has finished and when an error is thrown during the fetch/read cycle.

`onBeforeSend` is called right before sending the request to the server and receives the `RequestInit` object as an argument. Returning `false` from this callback cancels the request, returning a [`RequestInit`](https://developer.mozilla.org/en-US/docs/Web/API/RequestInit) object will override the existing `RequestInit` object.

By default, a request is not made the to stream on initialization. You may pass an initial payload to the stream by using the `initialInput` option:

```vue
Expand Down
Loading