;
+```
+
+### Using Defined Streams in Tasks
+
+Once defined, you can use all stream methods on your defined stream:
+
+```ts
+import { task } from "@trigger.dev/sdk";
+import { aiStream } from "./streams";
+
+export const streamTask = task({
+ id: "stream-task",
+ run: async (payload: { prompt: string }) => {
+ // Get a stream from an AI service, database, etc.
+ const stream = await getAIStream(payload.prompt);
+
+ // Pipe the stream using your defined stream
+ const { stream: readableStream, waitUntilComplete } = aiStream.pipe(stream);
+
+ // Option A: Iterate over the stream locally
+ for await (const chunk of readableStream) {
+ console.log("Received chunk:", chunk);
+ }
+
+ // Option B: Wait for the stream to complete
+ await waitUntilComplete();
+
+ return { message: "Stream completed" };
+ },
+});
+```
+
+#### Reading from a Stream
+
+Use the defined stream's `read()` method to consume data from anywhere (frontend, backend, or another task):
+
+```ts
+import { aiStream } from "./streams";
+
+const stream = await aiStream.read(runId);
+
+for await (const chunk of stream) {
+ console.log(chunk); // chunk is typed as the stream's chunk type
+}
+```
+
+With options:
+
+```ts
+const stream = await aiStream.read(runId, {
+ timeoutInSeconds: 60, // Stop if no data for 60 seconds
+ startIndex: 10, // Start from the 10th chunk
+});
+```
+
+#### Appending to a Stream
+
+Use the defined stream's `append()` method to add a single chunk:
+
+```ts
+import { task } from "@trigger.dev/sdk";
+import { aiStream, progressStream, logStream } from "./streams";
+
+export const appendTask = task({
+ id: "append-task",
+ run: async (payload) => {
+ // Append to different streams with full type safety
+ await logStream.append("Processing started");
+ await progressStream.append({ step: "Initialization", percent: 0 });
+
+ // Do some work...
+
+ await progressStream.append({ step: "Processing", percent: 50 });
+ await logStream.append("Step 1 complete");
+
+ // Do more work...
+
+ await progressStream.append({ step: "Complete", percent: 100 });
+ await logStream.append("All steps complete");
+ },
+});
+```
+
+#### Writing Multiple Chunks
+
+Use the defined stream's `writer()` method for more complex stream writing:
+
+```ts
+import { task } from "@trigger.dev/sdk";
+import { logStream } from "./streams";
+
+export const writerTask = task({
+ id: "writer-task",
+ run: async (payload) => {
+ const { waitUntilComplete } = logStream.writer({
+ execute: ({ write, merge }) => {
+ // Write individual chunks
+ write("Chunk 1");
+ write("Chunk 2");
+
+ // Merge another stream
+ const additionalStream = ReadableStream.from(["Chunk 3", "Chunk 4", "Chunk 5"]);
+ merge(additionalStream);
+ },
+ });
+
+ await waitUntilComplete();
+ },
+});
+```
+
+### Using Defined Streams in React
+
+Defined streams work seamlessly with the `useRealtimeStream` hook:
+
+```tsx
+"use client";
+
+import { useRealtimeStream } from "@trigger.dev/react-hooks";
+import { aiStream } from "@/app/streams";
+
+export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
+ // Pass the defined stream directly - full type safety!
+ const { parts, error } = useRealtimeStream(aiStream, runId, {
+ accessToken,
+ timeoutInSeconds: 600,
+ });
+
+  if (error) return <div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <div key={i}>{part}</div>
+      ))}
+    </div>
+  );
+}
+```
+
+## Direct Stream Methods (Without Defining)
+
+<Note>
+  We strongly recommend using `streams.define()` instead of direct methods. Defined streams provide
+  better organization, full type safety, and make it easier to maintain your codebase as it grows.
+</Note>
+
+If you have a specific reason to avoid defined streams, you can use stream methods directly by specifying the stream key each time.
+
+### Direct Piping
+
+```ts
+import { streams, task } from "@trigger.dev/sdk";
+
+export const directStreamTask = task({
+ id: "direct-stream",
+ run: async (payload: { prompt: string }) => {
+ const stream = await getAIStream(payload.prompt);
+
+ // Specify the stream key directly
+ const { stream: readableStream, waitUntilComplete } = streams.pipe("ai-output", stream);
+
+ await waitUntilComplete();
+ },
+});
+```
+
+### Direct Reading
+
+```ts
+import { streams } from "@trigger.dev/sdk";
+
+// Specify the stream key when reading
+const stream = await streams.read(runId, "ai-output");
+
+for await (const chunk of stream) {
+ console.log(chunk);
+}
+```
+
+### Direct Appending
+
+```ts
+import { streams, task } from "@trigger.dev/sdk";
+
+export const directAppendTask = task({
+ id: "direct-append",
+ run: async (payload) => {
+ // Specify the stream key each time
+ await streams.append("logs", "Processing started");
+ await streams.append("progress", "50%");
+ await streams.append("logs", "Complete");
+ },
+});
+```
+
+### Direct Writing
+
+```ts
+import { streams, task } from "@trigger.dev/sdk";
+
+export const directWriterTask = task({
+ id: "direct-writer",
+ run: async (payload) => {
+ const { waitUntilComplete } = streams.writer("output", {
+ execute: ({ write, merge }) => {
+ write("Chunk 1");
+ write("Chunk 2");
+ },
+ });
+
+ await waitUntilComplete();
+ },
+});
+```
+
+## Default Stream
+
+Every run has a "default" stream, allowing you to skip the stream key entirely. This is useful for simple cases where you only need one stream per run.
+
+Using direct methods:
+
+```ts
+import { streams, task } from "@trigger.dev/sdk";
+
+export const defaultStreamTask = task({
+ id: "default-stream",
+ run: async (payload) => {
+ const stream = getDataStream();
+
+ // No stream key needed - uses "default"
+ const { waitUntilComplete } = streams.pipe(stream);
+
+ await waitUntilComplete();
+ },
+});
+
+// Reading from the default stream
+const readStream = await streams.read(runId);
+```
+
+## Targeting Different Runs
+
+You can pipe streams to parent, root, or any other run using the `target` option. This works with both defined streams and direct methods.
+
+### With Defined Streams
+
+```ts
+import { task } from "@trigger.dev/sdk";
+import { logStream } from "./streams";
+
+export const childTask = task({
+ id: "child-task",
+ run: async (payload, { ctx }) => {
+ const stream = getDataStream();
+
+ // Pipe to parent run
+ logStream.pipe(stream, { target: "parent" });
+
+ // Pipe to root run
+ logStream.pipe(stream, { target: "root" });
+
+ // Pipe to self (default behavior)
+ logStream.pipe(stream, { target: "self" });
+
+ // Pipe to a specific run ID
+ logStream.pipe(stream, { target: payload.otherRunId });
+ },
+});
+```
+
+### With Direct Methods
+
+```ts
+import { streams, task } from "@trigger.dev/sdk";
+
+export const childTask = task({
+ id: "child-task",
+ run: async (payload, { ctx }) => {
+ const stream = getDataStream();
+
+ // Pipe to parent run
+ streams.pipe("output", stream, { target: "parent" });
+
+ // Pipe to root run
+ streams.pipe("output", stream, { target: "root" });
+
+ // Pipe to a specific run ID
+ streams.pipe("output", stream, { target: payload.otherRunId });
+ },
+});
+```
+
+## Streaming from Outside a Task
+
+If you specify a `target` run ID, you can pipe streams from anywhere (like a Next.js API route):
+
+```ts
+import { streams } from "@trigger.dev/sdk";
+import { openai } from "@ai-sdk/openai";
+import { streamText } from "ai";
+
+export async function POST(req: Request) {
+ const { messages, runId } = await req.json();
+
+ const result = streamText({
+ model: openai("gpt-4o"),
+ messages,
+ });
+
+ // Pipe AI stream to a Trigger.dev run
+ const { stream } = streams.pipe("ai-stream", result.toUIMessageStream(), {
+ target: runId,
+ });
+
+ return new Response(stream as any, {
+ headers: { "Content-Type": "text/event-stream" },
+ });
+}
+```
+
+## React Hook
+
+Use the `useRealtimeStream` hook to subscribe to streams in your React components.
+
+### With Defined Streams (Recommended)
+
+```tsx
+"use client";
+
+import { useRealtimeStream } from "@trigger.dev/react-hooks";
+import { aiStream } from "@/app/streams";
+
+export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
+ // Pass the defined stream directly for full type safety
+ const { parts, error } = useRealtimeStream(aiStream, runId, {
+ accessToken,
+ timeoutInSeconds: 600,
+ onData: (chunk) => {
+ console.log("New chunk:", chunk); // chunk is typed!
+ },
+ });
+
+  if (error) return <div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <div key={i}>{part}</div>
+      ))}
+    </div>
+  );
+}
+```
+
+### With Direct Stream Keys
+
+If you prefer not to use defined streams, you can specify the stream key directly:
+
+```tsx
+"use client";
+
+import { useRealtimeStream } from "@trigger.dev/react-hooks";
+
+export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
+ const { parts, error } = useRealtimeStream(runId, "ai-output", {
+ accessToken,
+ timeoutInSeconds: 600,
+ });
+
+  if (error) return <div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <div key={i}>{part}</div>
+      ))}
+    </div>
+  );
+}
+```
+
+### Using Default Stream
+
+```tsx
+// Omit stream key to use the default stream
+const { parts, error } = useRealtimeStream(runId, {
+ accessToken,
+});
+```
+
+### Hook Options
+
+```tsx
+const { parts, error } = useRealtimeStream(streamDef, runId, {
+ accessToken: "pk_...", // Required: Public access token
+ baseURL: "https://api.trigger.dev", // Optional: Custom API URL
+ timeoutInSeconds: 60, // Optional: Timeout (default: 60)
+ startIndex: 0, // Optional: Start from specific chunk
+ throttleInMs: 16, // Optional: Throttle updates (default: 16ms)
+ onData: (chunk) => {}, // Optional: Callback for each chunk
+});
+```
+
+## Complete Example: AI Streaming
+
+### Define the stream
+
+```ts
+// app/streams.ts
+import { streams, InferStreamType } from "@trigger.dev/sdk";
+import { UIMessageChunk } from "ai";
+
+export const aiStream = streams.define<UIMessageChunk>({
+  id: "ai",
+});
+
+export type AIStreamPart = InferStreamType<typeof aiStream>;
+```
+
+### Create the task
+
+```ts
+// trigger/ai-task.ts
+import { task } from "@trigger.dev/sdk";
+import { openai } from "@ai-sdk/openai";
+import { streamText } from "ai";
+import { aiStream } from "@/app/streams";
+
+export const generateAI = task({
+ id: "generate-ai",
+ run: async (payload: { prompt: string }) => {
+ const result = streamText({
+ model: openai("gpt-4o"),
+ prompt: payload.prompt,
+ });
+
+ const { waitUntilComplete } = aiStream.pipe(result.toUIMessageStream());
+
+ await waitUntilComplete();
+
+ return { success: true };
+ },
+});
+```
+
+### Frontend component
+
+```tsx
+// components/ai-stream.tsx
+"use client";
+
+import { useRealtimeStream } from "@trigger.dev/react-hooks";
+import { aiStream } from "@/app/streams";
+
+export function AIStream({ accessToken, runId }: { accessToken: string; runId: string }) {
+ const { parts, error } = useRealtimeStream(aiStream, runId, {
+ accessToken,
+ timeoutInSeconds: 300,
+ });
+
+  if (error) return <div>Error: {error.message}</div>;
+  if (!parts) return <div>Loading...</div>;
+
+  return (
+    <div>
+      {parts.map((part, i) => (
+        <div key={i}>{part}</div>
+      ))}
+    </div>
+  );
+}
+```
+
+## Migration from v1
+
+If you're using the old `metadata.stream()` API, here's how to migrate to the recommended v2 approach:
+
+### Step 1: Define Your Streams
+
+Create a shared streams definition file:
+
+```ts
+// app/streams.ts or trigger/streams.ts
+import { streams, InferStreamType } from "@trigger.dev/sdk";
+
+export const myStream = streams.define({
+ id: "my-stream",
+});
+
+export type MyStreamPart = InferStreamType<typeof myStream>;
+```
+
+### Step 2: Update Your Tasks
+
+Replace `metadata.stream()` with the defined stream's `pipe()` method:
+
+```ts
+// Before (v1)
+import { metadata, task } from "@trigger.dev/sdk";
+
+export const myTask = task({
+ id: "my-task",
+ run: async (payload) => {
+ const stream = getDataStream();
+ await metadata.stream("my-stream", stream);
+ },
+});
+```
+
+```ts
+// After (v2 - Recommended)
+import { task } from "@trigger.dev/sdk";
+import { myStream } from "./streams";
+
+export const myTask = task({
+ id: "my-task",
+ run: async (payload) => {
+ const stream = getDataStream();
+
+ // Don't await - returns immediately
+ const { waitUntilComplete } = myStream.pipe(stream);
+
+ // Optionally wait for completion
+ await waitUntilComplete();
+ },
+});
+```
+
+### Step 3: Update Your Frontend
+
+Use the defined stream with `useRealtimeStream`:
+
+```tsx
+// Before
+const { parts, error } = useRealtimeStream(runId, "my-stream", {
+ accessToken,
+});
+```
+
+```tsx
+// After
+import { myStream } from "@/app/streams";
+
+const { parts, error } = useRealtimeStream(myStream, runId, {
+ accessToken,
+});
+```
+
+### Alternative: Direct Methods (Not Recommended)
+
+If you prefer not to use defined streams, you can use direct methods:
+
+```ts
+import { streams, task } from "@trigger.dev/sdk";
+
+export const myTask = task({
+ id: "my-task",
+ run: async (payload) => {
+ const stream = getDataStream();
+ const { waitUntilComplete } = streams.pipe("my-stream", stream);
+ await waitUntilComplete();
+ },
+});
+```
+
+## Reliability Features
+
+Streams v2 includes automatic reliability improvements:
+
+- **Automatic resumption**: If a connection is lost, both appending and reading will automatically resume from the last successful chunk
+- **No data loss**: Network issues won't cause stream data to be lost
+- **Idempotent operations**: Duplicate chunks are automatically handled
+
+These improvements happen automatically - no code changes needed.
+
+## Dashboard Integration
+
+Streams are now visible in the Trigger.dev dashboard, allowing you to:
+
+- View stream data in real-time as it's generated
+- Inspect historical stream data for completed runs
+- Debug streaming issues with full visibility into chunk delivery
+
+
+
+## Best Practices
+
+1. **Always use `streams.define()`**: Define your streams in a shared location for better organization, type safety, and code reusability. This is the recommended approach for all streams.
+2. **Export stream types**: Use `InferStreamType` to export types for your frontend components
+3. **Handle errors gracefully**: Always check for errors when reading streams in your UI
+4. **Set appropriate timeouts**: Adjust `timeoutInSeconds` based on your use case (AI completions may need longer timeouts)
+5. **Target parent runs**: When orchestrating with child tasks, pipe to parent runs for easier consumption
+6. **Throttle frontend updates**: Use `throttleInMs` in `useRealtimeStream` to prevent excessive re-renders
+7. **Use descriptive stream IDs**: Choose clear, descriptive IDs like `"ai-output"` or `"progress"` instead of generic names
+
+## Troubleshooting
+
+### Stream not appearing in dashboard
+
+- Ensure you've enabled Streams v2 via the future flag or environment variable
+- Verify your task is actually writing to the stream
+- Check that the stream key matches between writing and reading
+
+### Stream timeout errors
+
+- Increase `timeoutInSeconds` in your `read()` or `useRealtimeStream()` calls
+- Ensure your stream source is actively producing data
+- Check network connectivity between your application and Trigger.dev
+
+### Missing chunks
+
+- With v2, chunks should never be lost due to automatic resumption
+- Verify you're reading from the correct stream key
+- Check the `startIndex` option if you're not seeing expected chunks