1
1
import {
2
2
convertToModelMessages ,
3
3
createUIMessageStream ,
4
- createUIMessageStreamResponse ,
4
+ JsonToSseTransformStream ,
5
5
smoothStream ,
6
6
stepCountIs ,
7
7
streamText ,
@@ -15,7 +15,6 @@ import {
15
15
getChatById ,
16
16
getMessageCountByUserId ,
17
17
getMessagesByChatId ,
18
- getStreamIdsByChatId ,
19
18
saveChat ,
20
19
saveMessages ,
21
20
} from '@/lib/db/queries' ;
@@ -24,45 +23,19 @@ import { isProductionEnvironment } from '@/lib/constants';
24
23
import { myProvider } from '@/lib/ai/providers' ;
25
24
import { postRequestBodySchema , type PostRequestBody } from './schema' ;
26
25
import { geolocation } from '@vercel/functions' ;
27
- import {
28
- createResumableStreamContext ,
29
- type ResumableStreamContext ,
30
- } from 'resumable-stream' ;
31
- import { after } from 'next/server' ;
32
- import type { Chat } from '@/lib/db/schema' ;
33
- import { differenceInSeconds } from 'date-fns' ;
34
26
import { ChatSDKError } from '@/lib/errors' ;
35
27
import { createDocument } from '@/lib/ai/tools/create-document' ;
36
28
import { updateDocument } from '@/lib/ai/tools/update-document' ;
37
29
import { requestSuggestions } from '@/lib/ai/tools/request-suggestions' ;
38
30
import { generateTitleFromUserMessage } from '../../actions' ;
39
- import { convertToStringStream , generateUUID } from '@/lib/utils' ;
31
+ import { generateUUID } from '@/lib/utils' ;
40
32
import { entitlementsByUserType } from '@/lib/ai/entitlements' ;
33
+ import type { ChatMessage } from '@/lib/types' ;
34
+ import { createResumableStreamContext } from 'resumable-stream' ;
35
+ import { after } from 'next/server' ;
41
36
42
37
export const maxDuration = 60 ;
43
38
44
- let globalStreamContext : ResumableStreamContext | null = null ;
45
-
46
- function getStreamContext ( ) {
47
- if ( ! globalStreamContext ) {
48
- try {
49
- globalStreamContext = createResumableStreamContext ( {
50
- waitUntil : after ,
51
- } ) ;
52
- } catch ( error : any ) {
53
- if ( error . message . includes ( 'REDIS_URL' ) ) {
54
- console . log (
55
- ' > Resumable streams are disabled due to missing REDIS_URL' ,
56
- ) ;
57
- } else {
58
- console . error ( error ) ;
59
- }
60
- }
61
- }
62
-
63
- return globalStreamContext ;
64
- }
65
-
66
39
export async function POST ( request : Request ) {
67
40
let requestBody : PostRequestBody ;
68
41
@@ -74,6 +47,10 @@ export async function POST(request: Request) {
74
47
}
75
48
76
49
try {
50
+ const streamContext = createResumableStreamContext ( {
51
+ waitUntil : after ,
52
+ } ) ;
53
+
77
54
const { id, message, selectedChatModel, selectedVisibilityType } =
78
55
requestBody ;
79
56
@@ -164,17 +141,23 @@ export async function POST(request: Request) {
164
141
isEnabled : isProductionEnvironment ,
165
142
functionId : 'stream-text' ,
166
143
} ,
144
+ _internal : {
145
+ generateId : generateUUID ,
146
+ } ,
167
147
} ) ;
168
148
149
+ result . consumeStream ( ) ;
150
+
169
151
streamWriter . merge (
170
- result . toUIMessageStream ( {
152
+ result . toUIMessageStream < ChatMessage > ( {
171
153
sendReasoning : true ,
172
- newMessageId : generateUUID ( ) ,
173
154
onFinish : async ( { responseMessage } ) => {
174
155
await saveMessages ( {
175
156
messages : [
176
157
{
177
- ...responseMessage ,
158
+ id : responseMessage . id ,
159
+ role : 'assistant' ,
160
+ parts : responseMessage . parts ,
178
161
createdAt : new Date ( ) ,
179
162
attachments : [ ] ,
180
163
chatId : id ,
@@ -184,126 +167,24 @@ export async function POST(request: Request) {
184
167
} ,
185
168
} ) ,
186
169
) ;
187
-
188
- result . consumeStream ( ) ;
189
170
} ,
190
171
onError : ( ) => {
191
172
return 'Oops! Something went wrong, please try again later.' ;
192
173
} ,
193
174
} ) ;
194
175
195
- const streamContext = getStreamContext ( ) ;
196
-
197
- if ( streamContext ) {
198
- return new Response (
199
- await streamContext . resumableStream ( streamId , ( ) =>
200
- convertToStringStream ( stream ) ,
201
- ) ,
202
- ) ;
203
- } else {
204
- return createUIMessageStreamResponse ( { stream } ) ;
205
- }
176
+ return new Response (
177
+ await streamContext . resumableStream ( streamId , ( ) =>
178
+ stream . pipeThrough ( new JsonToSseTransformStream ( ) ) ,
179
+ ) ,
180
+ ) ;
206
181
} catch ( error ) {
207
182
if ( error instanceof ChatSDKError ) {
208
183
return error . toResponse ( ) ;
209
184
}
210
185
}
211
186
}
212
187
213
- export async function GET ( request : Request ) {
214
- const streamContext = getStreamContext ( ) ;
215
- const resumeRequestedAt = new Date ( ) ;
216
-
217
- if ( ! streamContext ) {
218
- return new Response ( null , { status : 204 } ) ;
219
- }
220
-
221
- const { searchParams } = new URL ( request . url ) ;
222
- const chatId = searchParams . get ( 'chatId' ) ;
223
-
224
- if ( ! chatId ) {
225
- return new ChatSDKError ( 'bad_request:api' ) . toResponse ( ) ;
226
- }
227
-
228
- const session = await auth ( ) ;
229
-
230
- if ( ! session ?. user ) {
231
- return new ChatSDKError ( 'unauthorized:chat' ) . toResponse ( ) ;
232
- }
233
-
234
- let chat : Chat ;
235
-
236
- try {
237
- chat = await getChatById ( { id : chatId } ) ;
238
- } catch {
239
- return new ChatSDKError ( 'not_found:chat' ) . toResponse ( ) ;
240
- }
241
-
242
- if ( ! chat ) {
243
- return new ChatSDKError ( 'not_found:chat' ) . toResponse ( ) ;
244
- }
245
-
246
- if ( chat . visibility === 'private' && chat . userId !== session . user . id ) {
247
- return new ChatSDKError ( 'forbidden:chat' ) . toResponse ( ) ;
248
- }
249
-
250
- const streamIds = await getStreamIdsByChatId ( { chatId } ) ;
251
-
252
- if ( ! streamIds . length ) {
253
- return new ChatSDKError ( 'not_found:stream' ) . toResponse ( ) ;
254
- }
255
-
256
- const recentStreamId = streamIds . at ( - 1 ) ;
257
-
258
- if ( ! recentStreamId ) {
259
- return new ChatSDKError ( 'not_found:stream' ) . toResponse ( ) ;
260
- }
261
-
262
- const emptyDataStream = createUIMessageStream ( {
263
- execute : ( ) => { } ,
264
- } ) ;
265
-
266
- const stream = await streamContext . resumableStream ( recentStreamId , ( ) =>
267
- convertToStringStream ( emptyDataStream ) ,
268
- ) ;
269
-
270
- /*
271
- * For when the generation is streaming during SSR
272
- * but the resumable stream has concluded at this point.
273
- */
274
- if ( ! stream ) {
275
- const messages = await getMessagesByChatId ( { id : chatId } ) ;
276
- const mostRecentMessage = messages . at ( - 1 ) ;
277
-
278
- if ( ! mostRecentMessage ) {
279
- return new Response ( emptyDataStream , { status : 200 } ) ;
280
- }
281
-
282
- if ( mostRecentMessage . role !== 'assistant' ) {
283
- return new Response ( emptyDataStream , { status : 200 } ) ;
284
- }
285
-
286
- const messageCreatedAt = new Date ( mostRecentMessage . createdAt ) ;
287
-
288
- if ( differenceInSeconds ( resumeRequestedAt , messageCreatedAt ) > 15 ) {
289
- return new Response ( emptyDataStream , { status : 200 } ) ;
290
- }
291
-
292
- const restoredStream = createUIMessageStream ( {
293
- execute : ( { writer } ) => {
294
- writer . write ( {
295
- type : 'data-append-in-flight-message' ,
296
- data : mostRecentMessage ,
297
- } ) ;
298
- } ,
299
- } ) ;
300
-
301
- return new Response ( restoredStream , { status : 200 } ) ;
302
- }
303
-
304
- return new Response ( stream , { status : 200 } ) ;
305
- }
306
-
307
188
export async function DELETE ( request : Request ) {
308
189
const { searchParams } = new URL ( request . url ) ;
309
190
const id = searchParams . get ( 'id' ) ;
0 commit comments