-
I'm working on a project where I want to sync server and client state. Here is my feedback on streamedQuery:
-
I wrote something very similar to
My feedback
-
@TkDodo I'm using this to stream AI messages for mobile, and I just wanted to say thank you! It was super easy to use and really straightforward. It simplified the code I was working on a lot and works perfectly.
-
Hi, great to hear react-query can support streaming! By the way,
-
@TkDodo Is it possible to add a configuration parameter that limits the size of the array, so that once the limit is exceeded the oldest entries are removed and data doesn't accumulate over a long time? Or maybe I just want to display the latest 5 items. Currently, you have to call setQueryData manually to keep the data from accumulating and taking up memory.
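For illustration, the windowing described above can be sketched as a small pure helper. This is only a sketch under assumptions: appendCapped and its signature are hypothetical and not part of TanStack Query.

```typescript
// Hypothetical helper (not a TanStack Query API): append one chunk and keep
// only the newest `limit` items, dropping the oldest ones first.
function appendCapped<T>(
  prev: readonly T[] | undefined,
  chunk: T,
  limit: number,
): T[] {
  if (limit <= 0) return [];
  const next = [...(prev ?? []), chunk];
  return next.length > limit ? next.slice(next.length - limit) : next;
}

// Simulate ten streamed chunks with a window of 5.
let latest: number[] = [];
for (let i = 1; i <= 10; i++) {
  latest = appendCapped(latest, i, 5);
}
console.log(latest); // [ 6, 7, 8, 9, 10 ]
```

Because the helper takes the previous value and returns the next one, it could be passed through the updater form of setQueryData, for example `queryClient.setQueryData(key, (prev) => appendCapped(prev, chunk, 5))`, where `key` and `chunk` stand in for whatever the surrounding code uses.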
-
A use case/mode that I would like to have is one that replaces the query data rather than appending to it. I want a single object instead of a list of them. I believe it should be built into

import type { QueryFunction, QueryFunctionContext, QueryKey } from "@tanstack/react-query";

export function replaceQuery<TData = unknown, TKey extends QueryKey = QueryKey>({
  queryFn,
}: {
  queryFn: (
    ctx: QueryFunctionContext<TKey>
  ) => AsyncIterable<TData> | Promise<AsyncIterable<TData>>;
}): QueryFunction<TData, TKey> {
  return async (ctx) => {
    const stream = await queryFn(ctx);
    let last: TData | undefined;
    for await (const chunk of stream) {
      if (ctx.signal.aborted) {
        break;
      }
      last = chunk;
      ctx.client.setQueryData<TData>(ctx.queryKey, chunk);
    }
    if (last !== undefined) return last;
    const cached = ctx.client.getQueryData<TData>(ctx.queryKey);
    if (cached !== undefined) return cached;
    throw new Error("No data emitted and no cached data available");
  };
}

My use case is with trpc-live and orpc. I could (and probably will) do
-
I need the ability to surgically replace specific chunks in the cache. I'm getting back a stream of events, and typically I just want to append them. However some of the events are updates to older entries, and when those come in I want to dump the old entry. It's similar to Docker build output, where there are multiple progress bars going at once:

Subsequent events for the same ID look like this:

[
  {
    "id": "sha256:1593650c75729f64218ae272e8ffff9da7bbba9599bd1815877da99a2651fd9b",
    "data": "22.40MB / 23.59MB"
  },
  {
    "id": "sha256:1593650c75729f64218ae272e8ffff9da7bbba9599bd1815877da99a2651fd9b",
    "data": "23.00MB / 23.59MB"
  },
  {
    "id": "sha256:1593650c75729f64218ae272e8ffff9da7bbba9599bd1815877da99a2651fd9b",
    "data": "23.59MB / 23.59MB"
  }
]

There can be a lot of events with the same ID, so I don't want to leave them in the cache. ATM I am using a fork of
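To make the "replace the old entry, otherwise append" behaviour concrete, here is a minimal sketch of it as a pure reducer. The names LayerEvent and upsertById are hypothetical, and the shortened IDs are made up for the example; only the `id`/`data` shape comes from the events quoted above.

```typescript
// Event shape taken from the quoted events; the interface name is an assumption.
interface LayerEvent {
  id: string;
  data: string;
}

// Hypothetical reducer: if an entry with the same id already exists, replace
// it in place (keeping its position); otherwise append the new event.
function upsertById(
  prev: readonly LayerEvent[],
  event: LayerEvent,
): LayerEvent[] {
  const index = prev.findIndex((entry) => entry.id === event.id);
  if (index === -1) return [...prev, event];
  const next = [...prev];
  next[index] = event;
  return next;
}

// Made-up, shortened ids standing in for real layer digests.
const incoming: LayerEvent[] = [
  { id: "sha256:aaa", data: "22.40MB / 23.59MB" },
  { id: "sha256:bbb", data: "1.00MB / 4.00MB" },
  { id: "sha256:aaa", data: "23.59MB / 23.59MB" },
];

// Folding the stream leaves one entry per id, holding its latest data.
const progress = incoming.reduce<LayerEvent[]>(upsertById, []);
console.log(progress);
```

Each call returns a new array instead of mutating the previous one, so the result can be written to the cache as a fresh value and only the newest progress line per ID is retained.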
-
I'm having some issues cancelling a running streaming query. Here's some example code. Hitting the stop button does not cancel the query (I still see the API call running in Network). Am I misunderstanding how to use Tanstack here? Sorry if I'm missing something super obvious.

import {
  QueryClient,
  QueryClientProvider,
  queryOptions,
  useQuery,
} from "@tanstack/react-query";
import { experimental_streamedQuery as streamedQuery } from "@tanstack/react-query";
import { ReactQueryDevtools } from "@tanstack/react-query-devtools";
import { useState } from "react";

const queryClient = new QueryClient();

function StreamingPageInner() {
  const [isStreaming, setIsStreaming] = useState(false);
  const query = queryOptions({
    queryKey: ["blah"],
    queryFn: streamedQuery({
      queryFn: () => {
        return {
          async *[Symbol.asyncIterator]() {
            const response = await fetch(
              "http://localhost:8000/v1/simulation/streaming-example",
              {
                headers: {
                  Accept: "text/event-stream",
                },
              },
            );
            const reader = response.body?.getReader();
            if (!reader) throw new Error("No reader available");
            try {
              while (true) {
                const { done, value } = await reader.read();
                if (done) break;
                // Convert the Uint8Array to text
                const text = new TextDecoder().decode(value);
                yield text;
              }
            } finally {
              reader.releaseLock();
            }
          },
        };
      },
    }),
    staleTime: Infinity,
    enabled: isStreaming,
  });
  const handleStreamingToggle = () => {
    if (isStreaming) {
      // Cancel the ongoing query
      console.log("Cancelling query");
      queryClient.cancelQueries({ queryKey: ["blah"] });
    }
    setIsStreaming(!isStreaming);
  };
  const { data } = useQuery(query);
  return (
    <QueryClientProvider client={queryClient}>
      <div className="p-8">
        <h1 className="text-2xl font-bold mb-4">Streaming Text</h1>
        <button
          onClick={handleStreamingToggle}
          className="mb-4 px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 transition-colors"
        >
          {isStreaming ? "Stop Stream" : "Start Stream"}
        </button>
        <div className="text-lg text-white">{data}</div>
      </div>
    </QueryClientProvider>
  );
}

function StreamingPagev2() {
  return (
    <QueryClientProvider client={queryClient}>
      <StreamingPageInner />
      <ReactQueryDevtools initialIsOpen={false} />
    </QueryClientProvider>
  );
}

export default StreamingPagev2;

Here's a working version that doesn't leverage Tanstack:

import { useEffect, useState } from "react";
function StreamingPageInner() {
  const [isStreaming, setIsStreaming] = useState(false);
  const [streamedData, setStreamedData] = useState("");
  const [reader, setReader] =
    useState<ReadableStreamDefaultReader<Uint8Array> | null>(null);
  useEffect(() => {
    if (!isStreaming) {
      // Clean up the reader when streaming is stopped
      if (reader) {
        reader.cancel();
        setReader(null);
      }
      return;
    }
    const startStreaming = async () => {
      try {
        const response = await fetch(
          "http://localhost:8000/v1/simulation/streaming-example",
          {
            headers: {
              Accept: "text/event-stream",
            },
          },
        );
        const newReader = response.body?.getReader();
        if (!newReader) throw new Error("No reader available");
        setReader(newReader);
        while (true) {
          const { done, value } = await newReader.read();
          if (done) break;
          const text = new TextDecoder().decode(value);
          setStreamedData((prev) => prev + text);
        }
      } catch (error) {
        console.error("Streaming error:", error);
        setIsStreaming(false);
      }
    };
    startStreaming();
    return () => {
      if (reader) {
        reader.cancel();
        setReader(null);
      }
    };
  }, [isStreaming]);
  const handleStreamingToggle = () => {
    setIsStreaming(!isStreaming);
    if (!isStreaming) {
      setStreamedData(""); // Clear previous data when starting new stream
    }
  };
  return (
    <div className="p-8">
      <h1 className="text-2xl font-bold mb-4">Streaming Text</h1>
      <button
        onClick={handleStreamingToggle}
        className="mb-4 px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 transition-colors"
      >
        {isStreaming ? "Stop Stream" : "Start Stream"}
      </button>
      <div className="text-lg text-white">{streamedData}</div>
    </div>
  );
}

export default function StreamingPage() {
  return <StreamingPageInner />;
}
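One difference between the two versions is that the TanStack one never hands an AbortSignal to fetch, so cancelling the query has nothing to abort on the network side. The sketch below shows a generator that forwards a signal to fetch and cancels the reader when iteration stops. The name streamText and the injectable fetchImpl parameter are assumptions made for this example (the latter only so the sketch can run without a server); it is not a confirmed fix for the code above.

```typescript
// Hypothetical helper: stream a response body as text chunks and stop when
// `signal` aborts. `fetchImpl` defaults to the global fetch.
async function* streamText(
  url: string,
  signal: AbortSignal,
  fetchImpl: typeof fetch = fetch,
): AsyncGenerator<string> {
  // Forwarding the signal lets an abort tear down the underlying request.
  const response = await fetchImpl(url, {
    headers: { Accept: "text/event-stream" },
    signal,
  });
  const reader = response.body?.getReader();
  if (!reader) throw new Error("No reader available");
  // One decoder for the whole stream, so multi-byte characters that are
  // split across chunks are decoded correctly.
  const decoder = new TextDecoder();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      yield decoder.decode(value, { stream: true });
    }
  } finally {
    // Runs on normal completion, on abort errors, and when the consumer
    // stops iterating early; cancelling releases the underlying stream.
    await reader.cancel().catch(() => {});
  }
}
```

With the option names used in this thread, it could be wired up as `streamedQuery({ queryFn: ({ signal }) => streamText(url, signal) })`, where `signal` is the one from the query function context and `url` is a placeholder. Calling cancelQueries should then abort that signal, and with it the request.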
-
Feature request
Description
Example 1 - Video views live

const subscribeToVideoViews = ({ videoId }) => {
  return {
    async *[Symbol.asyncIterator]() {
      let { resolve, promise } = Promise.withResolvers();
      const initialStateMessage = subscribe({
        route: `video.${videoId}.views`,
        onNext(videoViews) {
          resolve(videoViews);
        },
      });
      while (true) {
        const videoViews = await promise;
        const promiseWithResolves = Promise.withResolvers();
        resolve = promiseWithResolves.resolve;
        promise = promiseWithResolves.promise;
        yield videoViews;
      }
    },
  };
}

queryOptions({
  queryKey: ['video', videoId, 'views'],
  queryFn: streamedQuery({
    queryFn: ({ signal }) =>
      subscribeToVideoViews({ videoId, }),
  }),
});

Example 2 - Live user updates

const subscribeToUser = ({ userId, getData }) => {
  return {
    async *[Symbol.asyncIterator]() {
      let { resolve, promise } = Promise.withResolvers();
      const initialStateMessage = subscribe({
        route: `users.${userId}`,
        onNext(userMessage) {
          if (userMessage.type === 'initialState') {
            resolve(userMessage.payload) // User
          } else if (userMessage.type === 'patch') {
            resolve({
              ...getData(),
              ...userMessage.payload // Partial<User>
            })
          }
        },
      });
      while (true) {
        const user = await promise;
        const promiseWithResolves = Promise.withResolvers();
        resolve = promiseWithResolves.resolve;
        promise = promiseWithResolves.promise;
        yield user;
      }
    },
  };
}

queryOptions({
  queryKey: ['users', userId],
  queryFn: streamedQuery({
    queryFn: ({getData}) => // Note getData that helps with updates
      subscribeToUser({ userId, getData }),
  }),
});
-
We’ve recently released a new API to support streaming through AsyncIterables. You can read about that in the docs here, and we also have an example set-up here.

Since the API is currently marked as experimental, we would value your feedback if you can build the things you want to build with that API. Note that it’s not experimental because it “doesn’t work”, it’s merely marked as experimental so that we can adapt the API to your needs without releasing a new major version.

So please let us know if you’ve worked with the streamedQuery helper, if you like it, if there’s something missing etc. This will help us mark it as stable sooner. Thank you 🙏