Skip to content

Commit 68acac6

Browse files
authored
fix(stage-ui): restore streaming output; wrap streamText so finish/error truly gate completion (#714)
1 parent 5d2b7cd commit 68acac6

File tree

4 files changed

+132
-93
lines changed

4 files changed

+132
-93
lines changed

apps/stage-tamagotchi/src/renderer/components/ChatHistory.vue

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { MarkdownRenderer } from '@proj-airi/stage-ui/components'
33
import { useChatStore } from '@proj-airi/stage-ui/stores/chat'
44
import { useBroadcastChannel } from '@vueuse/core'
55
import { storeToRefs } from 'pinia'
6-
import { nextTick, ref, watch } from 'vue'
6+
import { ref, watch } from 'vue'
77
import { useI18n } from 'vue-i18n'
88
99
const chatHistoryRef = ref<HTMLDivElement>()
@@ -31,25 +31,30 @@ watch(presentEvent, (ev) => {
3131
}
3232
})
3333
34-
onBeforeMessageComposed(async () => {
35-
// Scroll down to the new sent message
36-
nextTick().then(() => {
37-
if (!chatHistoryRef.value)
38-
return
34+
function scrollToBottom() {
35+
requestAnimationFrame(() => {
36+
requestAnimationFrame(() => {
37+
if (!chatHistoryRef.value)
38+
return
3939
40-
chatHistoryRef.value.scrollTop = chatHistoryRef.value.scrollHeight
40+
chatHistoryRef.value.scrollTop = chatHistoryRef.value.scrollHeight
41+
})
4142
})
43+
}
44+
45+
onBeforeMessageComposed(async () => {
46+
// Scroll down to the new sent message
47+
await scrollToBottom()
4248
})
4349
4450
onTokenLiteral(async () => {
4551
// Scroll down to the new responding message
46-
nextTick().then(() => {
47-
if (!chatHistoryRef.value)
48-
return
49-
50-
chatHistoryRef.value.scrollTop = chatHistoryRef.value.scrollHeight
51-
})
52+
await scrollToBottom()
5253
})
54+
55+
watch(sending, () => {
56+
scrollToBottom()
57+
}, { flush: 'post' })
5358
</script>
5459

5560
<template>

apps/stage-web/src/components/Widgets/ChatHistory.vue

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import { MarkdownRenderer } from '@proj-airi/stage-ui/components'
33
import { useChatStore } from '@proj-airi/stage-ui/stores/chat'
44
import { storeToRefs } from 'pinia'
5-
import { nextTick, ref } from 'vue'
5+
import { ref, watch } from 'vue'
66
import { useI18n } from 'vue-i18n'
77
88
const chatHistoryRef = ref<HTMLDivElement>()
@@ -12,25 +12,30 @@ const { messages, sending, streamingMessage } = storeToRefs(useChatStore())
1212
1313
const { onBeforeMessageComposed, onTokenLiteral } = useChatStore()
1414
15-
onBeforeMessageComposed(async () => {
16-
// Scroll down to the new sent message
17-
nextTick().then(() => {
18-
if (!chatHistoryRef.value)
19-
return
15+
function scrollToBottom() {
16+
requestAnimationFrame(() => {
17+
requestAnimationFrame(() => {
18+
if (!chatHistoryRef.value)
19+
return
2020
21-
chatHistoryRef.value.scrollTop = chatHistoryRef.value.scrollHeight
21+
chatHistoryRef.value.scrollTop = chatHistoryRef.value.scrollHeight
22+
})
2223
})
24+
}
25+
26+
onBeforeMessageComposed(async () => {
27+
// Scroll down to the new sent message
28+
await scrollToBottom()
2329
})
2430
2531
onTokenLiteral(async () => {
2632
// Scroll down to the new responding message
27-
nextTick().then(() => {
28-
if (!chatHistoryRef.value)
29-
return
30-
31-
chatHistoryRef.value.scrollTop = chatHistoryRef.value.scrollHeight
32-
})
33+
await scrollToBottom()
3334
})
35+
36+
watch(sending, () => {
37+
scrollToBottom()
38+
}, { flush: 'post' })
3439
</script>
3540

3641
<template>

packages/stage-ui/src/stores/chat.ts

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,11 @@ export const useChatStore = defineStore('chat', () => {
114114
attachments?: { type: 'image', data: string, mimeType: string }[]
115115
},
116116
) {
117-
try {
118-
sending.value = true
119-
120-
if (!sendingMessage && !options.attachments?.length)
121-
return
117+
if (!sendingMessage && !options.attachments?.length)
118+
return
119+
sending.value = true
122120

121+
try {
123122
for (const hook of onBeforeMessageComposedHooks.value) {
124123
await hook(sendingMessage)
125124
}
@@ -209,53 +208,58 @@ export const useChatStore = defineStore('chat', () => {
209208

210209
await stream(options.model, options.chatProvider, newMessages as Message[], {
211210
headers,
212-
async onStreamEvent(event: StreamEvent) {
213-
if (event.type === 'tool-call') {
214-
toolCallQueue.enqueue({
215-
type: 'tool-call',
216-
toolCall: event,
217-
})
218-
}
219-
else if (event.type === 'tool-result') {
220-
toolCallQueue.enqueue({
221-
type: 'tool-call-result',
222-
id: event.toolCallId,
223-
result: event.result,
224-
})
225-
}
226-
else if (event.type === 'text-delta') {
227-
fullText += event.text
228-
await parser.consume(event.text)
211+
onStreamEvent: async (event: StreamEvent) => {
212+
switch (event.type) {
213+
case 'tool-call':
214+
toolCallQueue.enqueue({
215+
type: 'tool-call',
216+
toolCall: event,
217+
})
218+
break
219+
case 'tool-result':
220+
toolCallQueue.enqueue({
221+
type: 'tool-call-result',
222+
id: event.toolCallId,
223+
result: event.result,
224+
})
225+
break
226+
case 'text-delta':
227+
fullText += event.text
228+
await parser.consume(event.text)
229+
break
230+
case 'finish':
231+
// Do nothing, resolve
232+
break
233+
case 'error':
234+
throw event.error ?? new Error('Stream error')
229235
}
230-
else if (event.type === 'finish') {
231-
// Finalize the parsing of the actual message content
232-
await parser.end()
236+
},
237+
})
238+
// Finalize the parsing of the actual message content
239+
await parser.end()
233240

234-
// Add the completed message to the history only if it has content
235-
if (streamingMessage.value.slices.length > 0)
236-
messages.value.push(toRaw(streamingMessage.value))
241+
// Add the completed message to the history only if it has content
242+
if (streamingMessage.value.slices.length > 0)
243+
messages.value.push(toRaw(streamingMessage.value))
237244

238-
// Reset the streaming message for the next turn
239-
streamingMessage.value = { role: 'assistant', content: '', slices: [], tool_results: [] }
245+
// Reset the streaming message for the next turn
246+
streamingMessage.value = { role: 'assistant', content: '', slices: [], tool_results: [] }
240247

241-
// Instruct the TTS pipeline to flush by calling hooks directly
242-
const flushSignal = `${TTS_FLUSH_INSTRUCTION}${TTS_FLUSH_INSTRUCTION}`
243-
for (const hook of onTokenLiteralHooks.value)
244-
await hook(flushSignal)
248+
// Instruct the TTS pipeline to flush by calling hooks directly
249+
const flushSignal = `${TTS_FLUSH_INSTRUCTION}${TTS_FLUSH_INSTRUCTION}`
250+
for (const hook of onTokenLiteralHooks.value)
251+
await hook(flushSignal)
245252

246-
// Call the end-of-stream hooks
247-
for (const hook of onStreamEndHooks.value)
248-
await hook()
253+
// Call the end-of-stream hooks
254+
for (const hook of onStreamEndHooks.value)
255+
await hook()
249256

250-
// Call the end-of-response hooks with the full text
251-
for (const hook of onAssistantResponseEndHooks.value)
252-
await hook(fullText)
257+
// Call the end-of-response hooks with the full text
258+
for (const hook of onAssistantResponseEndHooks.value)
259+
await hook(fullText)
253260

254-
// eslint-disable-next-line no-console
255-
console.debug('LLM output:', fullText)
256-
}
257-
},
258-
})
261+
// eslint-disable-next-line no-console
262+
console.debug('LLM output:', fullText)
259263

260264
for (const hook of onAfterSendHooks.value) {
261265
await hook(sendingMessage)

packages/stage-ui/src/stores/llm.ts

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import type { ChatProvider } from '@xsai-ext/shared-providers'
22
import type { CommonContentPart, CompletionToolCall, Message } from '@xsai/shared-chat'
33

4-
import { readableStreamToAsyncIterator } from '@moeru/std'
54
import { listModels } from '@xsai/model'
65
import { XSAIError } from '@xsai/shared'
76
import { streamText } from '@xsai/stream-text'
@@ -24,40 +23,66 @@ export interface StreamOptions {
2423
supportsTools?: boolean
2524
}
2625

26+
// TODO: proper format for other error messages.
27+
function sanitizeMessages(messages: unknown[]): Message[] {
28+
return messages.map((m: any) => {
29+
if (m && m.role === 'error') {
30+
return {
31+
role: 'user',
32+
content: `User encountered error: ${String(m.content ?? '')}`,
33+
} as Message
34+
}
35+
return m as Message
36+
})
37+
}
38+
2739
function streamOptionsToolsCompatibilityOk(model: string, chatProvider: ChatProvider, _: Message[], options?: StreamOptions, toolsCompatibility: Map<string, boolean> = new Map()): boolean {
2840
return !!(options?.supportsTools || toolsCompatibility.get(`${chatProvider.chat(model).baseURL}-${model}`))
2941
}
3042

3143
async function streamFrom(model: string, chatProvider: ChatProvider, messages: Message[], options?: StreamOptions) {
3244
const headers = options?.headers
3345

34-
return await streamText({
35-
...chatProvider.chat(model),
36-
maxSteps: 10,
37-
// TODO: proper format for other error messages.
38-
messages: messages.map(msg => ({ ...msg, content: (msg.role as string === 'error' ? `User encountered error: ${msg.content}` : msg.content), role: (msg.role as string === 'error' ? 'user' : msg.role) } as Message)),
39-
headers,
40-
// TODO: we need Automatic tools discovery
41-
tools: streamOptionsToolsCompatibilityOk(model, chatProvider, messages, options)
42-
? [
43-
...await mcp(),
44-
...await debug(),
45-
]
46-
: undefined,
47-
onEvent(event) {
48-
options?.onStreamEvent?.(event as StreamEvent)
49-
},
46+
const sanitized = sanitizeMessages(messages as unknown[])
47+
48+
return new Promise<void>(async (resolve, reject) => {
49+
try {
50+
await streamText({
51+
...chatProvider.chat(model),
52+
maxSteps: 10,
53+
messages: sanitized,
54+
headers,
55+
// TODO: we need Automatic tools discovery
56+
tools: streamOptionsToolsCompatibilityOk(model, chatProvider, messages, options)
57+
? [
58+
...await mcp(),
59+
...await debug(),
60+
]
61+
: undefined,
62+
async onEvent(event) {
63+
try {
64+
await options?.onStreamEvent?.(event as StreamEvent)
65+
if (event.type === 'finish')
66+
resolve()
67+
else if (event.type === 'error')
68+
reject(event.error ?? new Error('Stream error'))
69+
}
70+
catch (err) {
71+
reject(err)
72+
}
73+
},
74+
})
75+
}
76+
catch (err) {
77+
reject(err)
78+
}
5079
})
5180
}
5281

5382
export async function attemptForToolsCompatibilityDiscovery(model: string, chatProvider: ChatProvider, _: Message[], options?: Omit<StreamOptions, 'supportsTools'>): Promise<boolean> {
5483
async function attempt(enable: boolean) {
5584
try {
56-
const res = await streamFrom(model, chatProvider, [{ role: 'user', content: 'Hello, world!' }], { ...options, supportsTools: enable })
57-
for await (const _ of readableStreamToAsyncIterator(res.textStream)) {
58-
// Drop
59-
}
60-
85+
await streamFrom(model, chatProvider, [{ role: 'user', content: 'Hello, world!' }], { ...options, supportsTools: enable })
6186
return true
6287
}
6388
catch (err) {

0 commit comments

Comments (0)