Skip to content
Merged
124 changes: 111 additions & 13 deletions apps/webapp/app/components/runs/v3/agent/AgentView.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { UIMessage } from "@ai-sdk/react";
import { SSEStreamSubscription } from "@trigger.dev/core/v3";
import { ChatSnapshotV1Schema, SSEStreamSubscription } from "@trigger.dev/core/v3";
import { useEffect, useMemo, useRef, useState } from "react";
import { Paragraph } from "~/components/primitives/Paragraph";
import { Spinner } from "~/components/primitives/Spinner";
Expand Down Expand Up @@ -27,6 +27,15 @@ export type AgentViewAuth = {
* channel and is merged in by the AgentView subscription.
*/
initialMessages: UIMessage[];
/**
* Presigned GET URL for the session's chat-snapshot S3 blob (written
* by the agent after each turn-complete; see `ChatSnapshotV1`).
* Optional — sessions that registered a `hydrateMessages` hook skip
* snapshot writes and the URL fetch will 404. In that case the
* dashboard falls back to seq=0 SSE (which, post-trim, shows only the
* most recent turn). Generated server-side by `SessionPresenter`.
*/
snapshotPresignedUrl?: string;
};

/**
Expand Down Expand Up @@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
projectSlug: project.slug,
envSlug: environment.slug,
initialMessages: agentView.initialMessages,
snapshotPresignedUrl: agentView.snapshotPresignedUrl,
});

// Sticky-bottom auto-scroll: walks up to find the inspector's scroll
Expand Down Expand Up @@ -120,13 +130,19 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
* - `kind: "stop"` is a stop signal — no messages, nothing to render
* here, so it's filtered.
*
* Wire payloads are slim-wire (one new UIMessage per record, on
* `payload.message`). The legacy `payload.messages` array shape is kept
* here as a fallback so any historical records on a long-lived session
* still render.
*
* The server wraps records in `{data, id}` and writes `data` as a JSON
* string; SSE v2 delivers the parsed string back. {@link parseChunkPayload}
* re-parses to recover the object.
*/
type InputStreamChunk = {
kind?: "message" | "stop";
payload?: {
message?: { id?: string; role?: string; parts?: unknown[] };
messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>;
trigger?: string;
};
Expand Down Expand Up @@ -217,13 +233,15 @@ function useAgentSessionMessages({
projectSlug,
envSlug,
initialMessages,
snapshotPresignedUrl,
}: {
sessionId: string;
apiOrigin: string;
orgSlug: string;
projectSlug: string;
envSlug: string;
initialMessages: UIMessage[];
snapshotPresignedUrl?: string;
}): UIMessage[] {
// Seed with the user messages from the run's task payload.
const seedMessages = useMemo(
Expand Down Expand Up @@ -285,27 +303,92 @@ function useAgentSessionMessages({
const outputUrl = `${sessionBase}/out`;
const inputUrl = `${sessionBase}/in`;

/**
* Try to seed `pendingRef` from the agent's S3 snapshot blob and return
* the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes
* just past the snapshot. Returns undefined for sessions that don't
* have a snapshot (e.g. `hydrateMessages` customers, or sessions that
* have never completed a turn).
*/
const loadSnapshot = async (): Promise<string | undefined> => {
if (!snapshotPresignedUrl) return undefined;
try {
const resp = await fetch(snapshotPresignedUrl, { signal: abort.signal });
if (!resp.ok) return undefined;
const json = (await resp.json()) as unknown;
const parsed = ChatSnapshotV1Schema.safeParse(json);
if (!parsed.success) return undefined;
const snapshot = parsed.data;
// Preserve the snapshot's array order in the final render by
// giving each message a unique, monotonically increasing
// timestamp from `(savedAt - count + index)`. Real chunk
// timestamps from the SSE path use S2 arrival ms (positive
// numbers in the present), so anything below `savedAt` sorts
// before live chunks while preserving snapshot order among
// themselves.
const count = snapshot.messages.length;
snapshot.messages.forEach((raw, i) => {
const message = raw as UIMessage;
if (!message?.id) return;
// The snapshot's seed wins over the task-payload seed for any
// overlapping ids (the snapshot represents the agent's
// canonical accumulator, post-turn).
pendingRef.current.set(message.id, message);
if (!timestampsRef.current.has(message.id)) {
timestampsRef.current.set(message.id, snapshot.savedAt - count + i);
}
});
scheduleFlush.current();
return snapshot.lastOutEventId;
} catch {
// 404 / network / parse / abort — fall back to seq=0 SSE
return undefined;
}
};

const outputSubOptions = (lastEventId: string | undefined) =>
({
signal: abort.signal,
timeoutInSeconds: 120,
...(lastEventId !== undefined ? { lastEventId } : {}),
}) as const;

const commonSubOptions = {
signal: abort.signal,
timeoutInSeconds: 120,
} as const;

// ---- Output stream: assistant messages ---------------------------------
//
// The output stream delivers UIMessageChunks interleaved with
// Trigger-specific control chunks (`trigger:turn-complete`, etc.). We
// filter the control chunks and fold everything else into an assistant
// `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's
// `readUIMessageStream` helper is only available in `ai@6`, and the
// webapp is pinned to `ai@4`, so we re-implement just the chunk types
// that `renderPart` actually displays.
// The output stream delivers data records (UIMessageChunks) interleaved
// with Trigger control records (`turn-complete`, `upgrade-required`) and
// S2 command records (`trim`). Control + command records ride on
// `record.headers` with empty bodies; the SSE parser strips S2 command
// records entirely, and control records arrive with `value.chunk ===
// undefined`, which `parseChunkPayload` drops below.
//
// We fold everything else into an assistant `UIMessage` via our own
// `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream`
// helper is only available in `ai@6`, and the webapp is pinned to
// `ai@4`, so we re-implement just the chunk types that `renderPart`
// actually displays.
//
// We capture the **server timestamp of each assistant message's first
// `start` chunk** so later sort-by-timestamp merges with the input
// stream correctly.
const runOutput = async () => {
try {
const sub = new SSEStreamSubscription(outputUrl, commonSubOptions);
// Seed messages from the snapshot first (if available), then
// resume the SSE from the snapshot's last event id so we don't
// re-stream chunks already represented in the snapshot. If no
// snapshot exists (no URL, 404, parse failure), the SSE opens
// at seq=0 — which, post-trim, contains roughly one turn of
// records (acceptable fallback for `hydrateMessages` sessions
// and fresh sessions).
const snapshotLastEventId = await loadSnapshot();
if (abort.signal.aborted) return;

const sub = new SSEStreamSubscription(outputUrl, outputSubOptions(snapshotLastEventId));
const raw = await sub.subscribe();
const reader = raw.getReader();

Expand All @@ -318,6 +401,12 @@ function useAgentSessionMessages({

const chunk = parseChunkPayload(value.chunk) as OutputChunk | null;
if (!chunk || typeof chunk.type !== "string") continue;
// Legacy belt-and-suspenders: prior versions of the SDK
// emitted `trigger:turn-complete` / `trigger:upgrade-required`
// as data records (`type` field). Current versions use
// header-form control records, which `parseChunkPayload`
// drops above. Keep this filter to handle any in-flight
// sessions whose `.out` was populated by the older SDK.
if (chunk.type.startsWith("trigger:")) continue;

if (chunk.type === "start") {
Expand Down Expand Up @@ -413,9 +502,18 @@ function useAgentSessionMessages({
const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null;
if (!chunk || chunk.kind !== "message") continue;
const payload = chunk.payload;
if (!payload || !Array.isArray(payload.messages)) continue;

const incomingUsers = payload.messages.filter(
if (!payload) continue;

// Slim-wire is one UIMessage on `payload.message`; legacy
// payloads carried an array on `payload.messages`. Accept
// either so historical records on a long-lived session still
// render.
const candidates = Array.isArray(payload.messages)
? payload.messages
: payload.message
? [payload.message]
: [];
const incomingUsers = candidates.filter(
(m): m is UIMessage =>
m != null && (m as { role?: string }).role === "user" && typeof m.id === "string"
);
Expand Down Expand Up @@ -454,7 +552,7 @@ function useAgentSessionMessages({
pendingTimerRef.current = null;
}
};
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug]);
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug, snapshotPresignedUrl]);

return useMemo(() => {
const timestamps = timestampsRef.current;
Expand Down
52 changes: 52 additions & 0 deletions apps/webapp/app/presenters/v3/SessionPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { type Span } from "@opentelemetry/api";
import { chatSnapshotKeySuffix } from "@trigger.dev/core/v3";
import { type PrismaClientOrTransaction } from "@trigger.dev/database";
import { env } from "~/env.server";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
import { logger } from "~/services/logger.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { startActiveSpan } from "~/v3/tracer.server";

Expand All @@ -15,6 +18,8 @@ export class SessionPresenter {
userId: string;
environmentId: string;
sessionParam: string;
projectExternalRef: string;
environmentSlug: string;
}) {
return startActiveSpan(
"SessionPresenter.call",
Expand All @@ -33,10 +38,14 @@ export class SessionPresenter {
userId,
environmentId,
sessionParam,
projectExternalRef,
environmentSlug,
}: {
userId: string;
environmentId: string;
sessionParam: string;
projectExternalRef: string;
environmentSlug: string;
},
rootSpan: Span
) {
Expand Down Expand Up @@ -112,6 +121,48 @@ export class SessionPresenter {
// unused — kept here to match the existing `AgentViewAuth` shape.
const addressingKey = session.externalId ?? session.friendlyId;

// Presign a GET URL for the agent's S3 snapshot blob. The browser
// fetches it directly, parses + validates, and seeds the
// TriggerChatTransport with the full history + lastEventId before
// opening the SSE. Presign succeeds regardless of whether the blob
// exists; the frontend handles 404 gracefully.
//
// Snapshots are only written when no `hydrateMessages` hook is
// registered — sessions that use `hydrateMessages` will 404 here
// and the dashboard falls back to seq=0 SSE (which, post-trim,
// shows only the most recent turn — accepted, those customers
// have their own DB-backed dashboards).
// The agent writes snapshots keyed on the session's friendlyId (the
// `session_*` form), which matches what the SDK's `chat.agent` payload
// carries as `sessionId`. Use the same key shape here so the dashboard
// hits the same S3 object.
let snapshotPresignedUrl: string | undefined;
try {
const signed = await startActiveSpan(
"SessionPresenter.presignSnapshot",
async () =>
generatePresignedUrl(
projectExternalRef,
environmentSlug,
chatSnapshotKeySuffix(session.friendlyId),
"GET"
)
);
if (signed.success) {
snapshotPresignedUrl = signed.url;
} else {
logger.warn("SessionPresenter: snapshot presign failed", {
sessionId: session.id,
error: signed.error,
});
}
} catch (error) {
logger.warn("SessionPresenter: snapshot presign threw", {
sessionId: session.id,
error: error instanceof Error ? error.message : String(error),
});
}

return {
id: session.id,
friendlyId: session.friendlyId,
Expand Down Expand Up @@ -147,6 +198,7 @@ export class SessionPresenter {
apiOrigin: env.API_ORIGIN || env.LOGIN_ORIGIN,
sessionId: addressingKey,
initialMessages: [],
snapshotPresignedUrl,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
userId,
environmentId: environment.id,
sessionParam,
projectExternalRef: project.externalRef,
environmentSlug: environment.slug,
});

if (!session) {
Expand Down
Loading
Loading