Skip to content

feat (ai): restructure chat transports #6746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a13a36c
1
lgrammel Jun 14, 2025
666add4
2
lgrammel Jun 14, 2025
fcfebe5
3
lgrammel Jun 14, 2025
758ed47
4
lgrammel Jun 14, 2025
27b95da
5
lgrammel Jun 14, 2025
c2f6957
6
lgrammel Jun 14, 2025
a6581d2
7
lgrammel Jun 14, 2025
f310538
resume on switch
lgrammel Jun 14, 2025
228073a
x
lgrammel Jun 14, 2025
d8240b7
re
lgrammel Jun 14, 2025
ae4c2f2
fx
lgrammel Jun 14, 2025
ca56db9
1
lgrammel Jun 15, 2025
92b2ca4
123
lgrammel Jun 15, 2025
95bd191
re
lgrammel Jun 15, 2025
a6fc316
fcx
lgrammel Jun 15, 2025
27666d4
fx
lgrammel Jun 15, 2025
b5989c4
x
lgrammel Jun 16, 2025
7af4980
simpler
lgrammel Jun 16, 2025
4cfc07f
x
lgrammel Jun 16, 2025
3f26739
xx
lgrammel Jun 16, 2025
f94fc99
x
lgrammel Jun 16, 2025
4d59539
Merge branch 'v5' into lg/gZmVk3BO
lgrammel Jun 16, 2025
47b6588
Merge branch 'v5' into lg/gZmVk3BO
lgrammel Jun 17, 2025
634ae7b
Merge branch 'v5' into lg/gZmVk3BO
lgrammel Jun 17, 2025
c14c763
Merge branch 'lg/gZmVk3BO' of https://github.com/vercel/ai into lg/gZ…
lgrammel Jun 17, 2025
3397bf7
Merge branch 'v5' into lg/gZmVk3BO
lgrammel Jun 17, 2025
7d4c24f
Merge branch 'v5' into lg/gZmVk3BO
lgrammel Jun 17, 2025
c30c767
Merge branch 'v5' into lg/gZmVk3BO
lgrammel Jun 17, 2025
8ac4e88
Merge branch 'lg/gZmVk3BO' of https://github.com/vercel/ai into lg/gZ…
lgrammel Jun 17, 2025
5317818
1
lgrammel Jun 17, 2025
8d9869c
2
lgrammel Jun 17, 2025
6463fb2
3
lgrammel Jun 17, 2025
2b07d0a
4
lgrammel Jun 17, 2025
fa4e7c0
5
lgrammel Jun 17, 2025
987d7e9
6
lgrammel Jun 17, 2025
b09cc3d
7
lgrammel Jun 17, 2025
0a16db3
fx
lgrammel Jun 17, 2025
8d71ca1
e
lgrammel Jun 17, 2025
8e5c19f
cs
lgrammel Jun 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tricky-ravens-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': major
---

feat (ai): restructure chat transports
44 changes: 44 additions & 0 deletions examples/next-openai/app/api/use-chat-resume/[id]/stream/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { loadStreams } from '@/util/chat-store';
import { createUIMessageStream, JsonToSseTransformStream } from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

// Allow streaming responses up to 30 seconds
export const maxDuration = 30;

/**
 * Resumes the most recent active stream for the chat route param `id`.
 *
 * Responds 400 when the id is missing, 204 (no body) when there is
 * nothing to resume, and otherwise a resumable SSE response built from
 * the last stream id recorded for the chat.
 */
export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;

  if (!id) {
    return new Response('id is required', { status: 400 });
  }

  const streamIds = await loadStreams(id);

  // Only the newest stream can be resumed; `.at(-1)` is undefined for
  // an empty list, so this single check covers both "no streams" cases.
  const recentStreamId = streamIds.at(-1);

  if (recentStreamId == null) {
    // 204 is a null-body status per the Fetch spec — constructing a
    // Response with a body here throws at runtime, so send no body.
    return new Response(null, { status: 204 });
  }

  // Placeholder stream used only if the resumable stream must be
  // recreated; it emits no parts of its own.
  const emptyDataStream = createUIMessageStream({
    execute: () => {},
  });

  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () =>
      emptyDataStream.pipeThrough(new JsonToSseTransformStream()),
    ),
  );
}
36 changes: 0 additions & 36 deletions examples/next-openai/app/api/use-chat-resume/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {
appendMessageToChat,
appendStreamId,
loadStreams,
saveChat,
} from '@/util/chat-store';
import { openai } from '@ai-sdk/openai';
Expand Down Expand Up @@ -64,38 +63,3 @@ export async function POST(req: Request) {
),
);
}

// Legacy resume endpoint: resumes the most recent stream for a chat
// identified via the `chatId` query parameter (replaced by the
// /[id]/stream route in this PR).
export async function GET(request: Request) {
  // Context whose background work is kept alive via Next.js `after`.
  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  const { searchParams } = new URL(request.url);
  const chatId = searchParams.get('chatId');

  if (!chatId) {
    return new Response('id is required', { status: 400 });
  }

  // All stream ids recorded for this chat, in append order.
  const streamIds = await loadStreams(chatId);

  if (!streamIds.length) {
    return new Response('No streams found', { status: 404 });
  }

  // Only the newest stream is resumable.
  const recentStreamId = streamIds.at(-1);

  if (!recentStreamId) {
    return new Response('No recent stream found', { status: 404 });
  }

  // Placeholder stream used only if the resumable stream must be
  // recreated; it produces no parts of its own.
  const emptyDataStream = createUIMessageStream({
    execute: () => {},
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () =>
      emptyDataStream.pipeThrough(new JsonToSseTransformStream()),
    ),
  );
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export default function Chat({
transport: new DefaultChatTransport({
api: '/api/use-chat-persistence-single-message',
// only send the last message to the server:
prepareRequest({ messages, id }) {
prepareSubmitMessagesRequest({ messages, id }) {
return { body: { message: messages[messages.length - 1], id } };
},
}),
Expand Down
27 changes: 27 additions & 0 deletions examples/next/app/api/chat/[id]/stream/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { readChat } from '@util/chat-store';
import { UI_MESSAGE_STREAM_HEADERS } from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

/**
 * Reconnects a client to the chat's currently active stream.
 * Responds 204 (no body) when the chat has no stream in flight.
 */
export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  const { activeStreamId } = await readChat(id);

  // Nothing in flight — signal "no content to resume" with an empty 204.
  if (activeStreamId == null) {
    return new Response(null, { status: 204 });
  }

  const streamContext = createResumableStreamContext({ waitUntil: after });
  const resumedStream =
    await streamContext.resumeExistingStream(activeStreamId);

  return new Response(resumedStream, { headers: UI_MESSAGE_STREAM_HEADERS });
}
22 changes: 17 additions & 5 deletions examples/next/app/api/chat/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { MyUIMessage } from '@/util/chat-schema';
import { readChat, saveChat } from '@util/chat-store';
import { convertToModelMessages, streamText } from 'ai';
import { convertToModelMessages, generateId, streamText } from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

export async function POST(req: Request) {
const { message, id }: { message: MyUIMessage; id: string } =
Expand All @@ -9,13 +11,14 @@ export async function POST(req: Request) {
const chat = await readChat(id);
const messages = [...chat.messages, message];

// save the user message
saveChat({ id, messages, activeStreamId: null });

const result = streamText({
model: 'openai/gpt-4o-mini',
messages: convertToModelMessages(messages),
});

result.consumeStream(); // TODO always consume the stream even when the client disconnects

return result.toUIMessageStreamResponse({
originalMessages: messages,
messageMetadata: ({ part }) => {
Expand All @@ -24,8 +27,17 @@ export async function POST(req: Request) {
}
},
onFinish: ({ messages }) => {
// TODO fix type safety
saveChat({ id, messages: messages as MyUIMessage[] });
saveChat({ id, messages, activeStreamId: null });
},
async consumeSseStream({ stream }) {
const streamId = generateId();

// send the sse stream into a resumable stream sink as well:
const streamContext = createResumableStreamContext({ waitUntil: after });
await streamContext.createNewResumableStream(streamId, () => stream);

// update the chat with the streamId
saveChat({ id, activeStreamId: streamId });
},
});
}
5 changes: 4 additions & 1 deletion examples/next/app/chat/[chatId]/chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ import Message from './message';
export default function ChatComponent({
chatData,
isNewChat = false,
resume = false,
}: {
chatData: { id: string; messages: MyUIMessage[] };
isNewChat?: boolean;
resume?: boolean;
}) {
const inputRef = useRef<HTMLInputElement>(null);

const { status, sendMessage, messages } = useChat({
id: chatData.id,
messages: chatData.messages,
resume,
transport: new DefaultChatTransport({
// only send the last message to the server to limit the request size:
prepareRequest: ({ id, messages }) => ({
prepareSubmitMessagesRequest: ({ id, messages }) => ({
body: { id, message: messages[messages.length - 1] },
}),
}),
Expand Down
2 changes: 1 addition & 1 deletion examples/next/app/chat/[chatId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export default async function Page(props: {
</li>
))}
</ul>
<Chat chatData={chatData} />;
<Chat chatData={chatData} resume={chatData.activeStreamId !== null} />;
</div>
);
}
3 changes: 1 addition & 2 deletions examples/next/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
"react": "^18",
"react-dom": "^18",
"react-markdown": "9.0.1",
"redis": "^4.7.0",
"resumable-stream": "^2.0.0",
"resumable-stream": "^2.2.0",
"zod": "3.25.49"
},
"devDependencies": {
Expand Down
1 change: 1 addition & 0 deletions examples/next/util/chat-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ export type ChatData = {
id: string;
messages: MyUIMessage[];
createdAt: number;
activeStreamId: string | null;
};
40 changes: 13 additions & 27 deletions examples/next/util/chat-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,23 @@ export async function createChat(): Promise<string> {

/**
 * Partially updates a stored chat: only the fields that are provided
 * (`messages`, `activeStreamId`) are overwritten; omitted fields keep
 * their current value. Passing `activeStreamId: null` explicitly clears
 * the active stream, while leaving it undefined keeps it untouched.
 */
export async function saveChat({
  id,
  activeStreamId,
  messages,
}: {
  id: string;
  activeStreamId?: string | null;
  messages?: MyUIMessage[];
}): Promise<void> {
  const chat = await readChat(id);

  if (messages !== undefined) {
    chat.messages = messages;
  }

  if (activeStreamId !== undefined) {
    chat.activeStreamId = activeStreamId;
  }

  // Await the write so callers observe persistence (and any write
  // error) before this promise resolves; previously the write was
  // fired and forgotten, leaking rejections.
  await writeChat(chat);
}

Expand All @@ -42,6 +52,7 @@ async function writeChat(chat: ChatData) {
await writeFile(await getChatFile(chat.id), JSON.stringify(chat, null, 2));
}

// TODO return null if the chat does not exist
export async function readChat(id: string): Promise<ChatData> {
  const file = await getChatFile(id);
  const raw = await readFile(file, 'utf8');
  return JSON.parse(raw);
}
Expand Down Expand Up @@ -72,28 +83,3 @@ async function getChatFile(id: string): Promise<string> {

return chatFile;
}

// Records a new stream id for a chat by appending it to the chat's
// stream list on disk (one JSON file per chat under `.streams/`).
export async function appendStreamId({
  id,
  streamId,
}: {
  id: string;
  streamId: string;
}) {
  const file = getStreamsFile(id);
  // Read-modify-write of the whole list; NOTE(review): not safe against
  // concurrent appends for the same chat — assumes callers serialize
  // writes per chat, confirm at call sites.
  const streams = await loadStreams(id);
  streams.push(streamId);
  await writeFile(file, JSON.stringify(streams, null, 2));
}

/**
 * Loads all stream ids recorded for the given chat.
 * Returns an empty array when no stream file exists yet.
 */
export async function loadStreams(id: string): Promise<string[]> {
  const file = getStreamsFile(id);

  if (!existsSync(file)) {
    return [];
  }

  const contents = await readFile(file, 'utf8');
  return JSON.parse(contents);
}

function getStreamsFile(id: string): string {
const chatDir = path.join(process.cwd(), '.streams');
if (!existsSync(chatDir)) mkdirSync(chatDir, { recursive: true });
return path.join(chatDir, `${id}.json`);
}
7 changes: 6 additions & 1 deletion packages/ai/src/ui/chat-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ export interface ChatTransport<UI_MESSAGE extends UIMessage> {
chatId: string;
messages: UI_MESSAGE[];
abortSignal: AbortSignal | undefined;
requestType: 'generate' | 'resume'; // TODO have separate functions
} & ChatRequestOptions,
) => Promise<ReadableStream<UIMessageStreamPart>>;

reconnectToStream: (
options: {
chatId: string;
} & ChatRequestOptions,
) => Promise<ReadableStream<UIMessageStreamPart> | null>;
}
35 changes: 26 additions & 9 deletions packages/ai/src/ui/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
type UIDataTypes,
type UIMessage,
} from './ui-messages';
import { UIMessageStreamPart } from '../ui-message-stream/ui-message-stream-parts';

export type CreateUIMessage<UI_MESSAGE extends UIMessage> = Omit<
UI_MESSAGE,
Expand Down Expand Up @@ -384,15 +385,31 @@ export abstract class AbstractChat<UI_MESSAGE extends UIMessage> {

this.activeResponse = activeResponse;

const stream = await this.transport.submitMessages({
chatId: this.id,
messages: this.state.messages,
abortSignal: activeResponse.abortController.signal,
metadata,
headers,
body,
requestType,
});
let stream: ReadableStream<UIMessageStreamPart>;

if (requestType === 'resume') {
const reconnect = await this.transport.reconnectToStream({
chatId: this.id,
metadata,
headers,
body,
});

if (reconnect == null) {
return; // no active stream found, so we do not resume
}

stream = reconnect;
} else {
stream = await this.transport.submitMessages({
chatId: this.id,
messages: this.state.messages,
abortSignal: activeResponse.abortController.signal,
metadata,
headers,
body,
});
}

const runUpdateMessageJob = (
job: (options: {
Expand Down
Loading
Loading