Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions packages/react/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,41 @@ function StreamStatus({ id }) {
}
```

The `useJsonStream` hook is identical to the `useStream` hook except that it will attempt to parse the data as JSON once it has finished streaming:

```tsx
import { useJsonStream } from "@laravel/stream-react";

type User = {
id: number;
name: string;
email: string;
};

function App() {
const { data, send } = useJsonStream<{ users: User[] }>("users");

const loadUsers = () => {
send({
query: "taylor",
});
};

return (
<div>
<ul>
{data?.users.map((user) => (
<li>
{user.id}: {user.name}
</li>
))}
</ul>
<button onClick={loadUsers}>Load Users</button>
</div>
);
}
```

## Event Streams (SSE)

The `useEventStream` hook allows you to seamlessly consume [Server-Sent Events (SSE)](https://laravel.com/docs/responses#event-streams) in your React application.
Expand Down
96 changes: 67 additions & 29 deletions packages/react/src/hooks/use-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,27 @@ import { nanoid } from "nanoid";
import { useCallback, useEffect, useRef, useState } from "react";
import { StreamListenerCallback, StreamMeta, StreamOptions } from "../types";

const streams = new Map<string, StreamMeta>();
const streams = new Map<string, StreamMeta<unknown>>();
const listeners = new Map<string, StreamListenerCallback[]>();

const resolveStream = (id: string): StreamMeta => {
const stream = streams.get(id);
const resolveStream = <TJsonData = null>(id: string): StreamMeta<TJsonData> => {
const stream = streams.get(id) as StreamMeta<TJsonData> | undefined;

if (stream) {
return stream;
}

streams.set(id, {
const newStream: StreamMeta<TJsonData> = {
controller: new AbortController(),
data: "",
isFetching: false,
isStreaming: false,
});
jsonData: null as TJsonData,
};

return streams.get(id)!;
streams.set(id, newStream);

return newStream;
};

const resolveListener = (id: string) => {
Expand Down Expand Up @@ -50,9 +53,12 @@ const addListener = (id: string, listener: StreamListenerCallback) => {
};
};

export const useStream = (url: string, options: StreamOptions = {}) => {
export const useStream = <TJsonData = null>(
url: string,
options: StreamOptions = {},
) => {
const id = useRef<string>(options.id ?? nanoid());
const stream = useRef(resolveStream(id.current));
const stream = useRef(resolveStream<TJsonData>(id.current));
const headers = useRef<HeadersInit>(
(() => {
const headers: HeadersInit = {
Expand All @@ -75,21 +81,27 @@ export const useStream = (url: string, options: StreamOptions = {}) => {
);

const [data, setData] = useState<string>(stream.current.data);
const [jsonData, setJsonData] = useState<TJsonData | null>(
stream.current.jsonData,
);
const [isFetching, setIsFetching] = useState(stream.current.isFetching);
const [isStreaming, setIsStreaming] = useState(stream.current.isStreaming);

const updateStream = useCallback((params: Partial<StreamMeta>) => {
streams.set(id.current, {
...resolveStream(id.current),
...params,
});
const updateStream = useCallback(
(params: Partial<StreamMeta<TJsonData>>) => {
streams.set(id.current, {
...resolveStream(id.current),
...params,
});

const updatedStream = resolveStream(id.current);
const updatedStream = resolveStream(id.current);

listeners
.get(id.current)
?.forEach((listener) => listener(updatedStream));
}, []);
listeners
.get(id.current)
?.forEach((listener) => listener(updatedStream));
},
[],
);

const cancel = useCallback(() => {
stream.current.controller.abort();
Expand All @@ -107,6 +119,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => {
const clearData = useCallback(() => {
updateStream({
data: "",
jsonData: null,
});
}, []);

Expand Down Expand Up @@ -179,22 +192,33 @@ export const useStream = (url: string, options: StreamOptions = {}) => {

options.onData?.(incomingStr);

if (done) {
updateStream({
data: newData,
isStreaming: false,
});
const streamParams: Partial<StreamMeta<TJsonData>> = {
data: newData,
};

options.onFinish?.();
if (!done) {
updateStream(streamParams);

return "";
return read(reader, newData);
}

updateStream({
data: newData,
});
streamParams.isStreaming = false;

if (options.json) {
try {
streamParams.jsonData = JSON.parse(
newData,
) as TJsonData;
} catch (error) {
options.onError?.(error as Error);
}
}

return read(reader, newData);
updateStream(streamParams);

options.onFinish?.();

return "";
});
},
[],
Expand All @@ -208,6 +232,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => {
setIsFetching(streamUpdate.isFetching);
setIsStreaming(streamUpdate.isStreaming);
setData(streamUpdate.data);
setJsonData(streamUpdate.jsonData);
},
);

Expand Down Expand Up @@ -236,6 +261,7 @@ export const useStream = (url: string, options: StreamOptions = {}) => {

return {
data,
jsonData,
isFetching,
isStreaming,
id: id.current,
Expand All @@ -244,3 +270,15 @@ export const useStream = (url: string, options: StreamOptions = {}) => {
clearData,
};
};

export const useJsonStream = <TJsonData = null>(
url: string,
options: Omit<StreamOptions, "json"> = {},
) => {
const { jsonData, data, ...rest } = useStream<TJsonData>(url, {
...options,
json: true,
});

return { data: jsonData, rawData: data, ...rest };
};
2 changes: 1 addition & 1 deletion packages/react/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { useEventStream } from "./hooks/use-event-stream";
export { useStream } from "./hooks/use-stream";
export { useJsonStream, useStream } from "./hooks/use-stream";
4 changes: 3 additions & 1 deletion packages/react/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ export type StreamOptions = {
initialInput?: Record<string, any>;
headers?: Record<string, string>;
csrfToken?: string;
json?: boolean;
onResponse?: (response: Response) => void;
onData?: (data: string) => void;
onCancel?: () => void;
onFinish?: () => void;
onError?: (error: Error) => void;
};

export type StreamMeta = {
export type StreamMeta<TJsonData = null> = {
controller: AbortController;
data: string;
isFetching: boolean;
isStreaming: boolean;
jsonData: TJsonData | null;
};

export type StreamListenerCallback = (stream: StreamMeta) => void;
Loading