Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 166 additions & 93 deletions .github/CONTRIBUTING.md

Large diffs are not rendered by default.

144 changes: 143 additions & 1 deletion apps/sim/lib/copilot/request/go/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,24 @@ import {
MothershipStreamV1ToolOutcome,
MothershipStreamV1ToolPhase,
} from '@/lib/copilot/generated/mothership-stream-v1'

vi.mock('@/lib/copilot/request/session', async () => {
const actual = await vi.importActual<typeof import('@/lib/copilot/request/session')>(
'@/lib/copilot/request/session'
)
return {
...actual,
hasAbortMarker: vi.fn().mockResolvedValue(false),
}
})

import {
buildPreviewContentUpdate,
decodeJsonStringPrefix,
extractEditContent,
runStreamLoop,
} from '@/lib/copilot/request/go/stream'
import { createEvent } from '@/lib/copilot/request/session'
import { AbortReason, createEvent, hasAbortMarker } from '@/lib/copilot/request/session'
import { RequestTraceV1Outcome, TraceCollector } from '@/lib/copilot/request/trace'
import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/types'

Expand Down Expand Up @@ -285,6 +296,137 @@ describe('copilot go stream helpers', () => {
).toBe(true)
})

it('reclassifies as aborted when the body closes without terminal but the abort marker is set', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})

vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)

const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}

await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
})

expect(hasAbortMarker).toHaveBeenCalledWith(context.messageId)
expect(context.wasAborted).toBe(true)
expect(
context.errors.some((message) =>
message.includes('Copilot backend stream ended before a terminal event')
)
).toBe(false)
})

it('invokes onAbortObserved with MarkerObservedAtBodyClose when reclassifying via the abort marker', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})

vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)

const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}
const onAbortObserved = vi.fn()

await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
onAbortObserved,
})

expect(onAbortObserved).toHaveBeenCalledTimes(1)
expect(onAbortObserved).toHaveBeenCalledWith(AbortReason.MarkerObservedAtBodyClose)
expect(context.wasAborted).toBe(true)
})

it('does not invoke onAbortObserved when no abort marker is present at body close', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})

vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockResolvedValueOnce(false)

const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}
const onAbortObserved = vi.fn()

await expect(
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
onAbortObserved,
})
).rejects.toThrow('Copilot backend stream ended before a terminal event')

expect(onAbortObserved).not.toHaveBeenCalled()
})

it('still fails closed when the body closes without terminal and the abort marker check throws', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})

vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockRejectedValueOnce(new Error('redis unavailable'))

const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}

await expect(
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
})
).rejects.toThrow('Copilot backend stream ended before a terminal event')
expect(context.wasAborted).toBe(false)
})

it('fails closed when the shared stream receives an invalid event', async () => {
vi.mocked(fetch).mockResolvedValueOnce(
createSseResponse([
Expand Down
38 changes: 28 additions & 10 deletions apps/sim/lib/copilot/request/go/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import {
} from '@/lib/copilot/request/handlers/types'
import { getCopilotTracer } from '@/lib/copilot/request/otel'
import {
AbortReason,
eventToStreamEvent,
hasAbortMarker,
isSubagentSpanStreamEvent,
parsePersistedStreamEventEnvelope,
} from '@/lib/copilot/request/session'
Expand Down Expand Up @@ -436,16 +438,32 @@ export async function runStreamLoop(
})

if (!context.streamComplete && !abortSignal?.aborted && !context.wasAborted) {
const streamPath = new URL(fetchUrl).pathname
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
context.errors.push(message)
logger.error('Copilot backend stream ended before a terminal event', {
path: streamPath,
requestId: context.requestId,
messageId: context.messageId,
})
endedOn = CopilotSseCloseReason.ClosedNoTerminal
throw new CopilotBackendError(message, { status: 503 })
let abortRequested = false
try {
abortRequested = await hasAbortMarker(context.messageId)
} catch (error) {
logger.warn('Failed to read abort marker at body close', {
streamId: context.messageId,
error: error instanceof Error ? error.message : String(error),
})
}

if (abortRequested) {
options.onAbortObserved?.(AbortReason.MarkerObservedAtBodyClose)
context.wasAborted = true
endedOn = CopilotSseCloseReason.Aborted
} else {
const streamPath = new URL(fetchUrl).pathname
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
context.errors.push(message)
logger.error('Copilot backend stream ended before a terminal event', {
path: streamPath,
requestId: context.requestId,
messageId: context.messageId,
})
endedOn = CopilotSseCloseReason.ClosedNoTerminal
throw new CopilotBackendError(message, { status: 503 })
}
}
} catch (error) {
if (error instanceof FatalSseEventError && !context.errors.includes(error.message)) {
Expand Down
8 changes: 4 additions & 4 deletions apps/sim/lib/copilot/request/lifecycle/headless.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ export async function runHeadlessCopilotLifecycle(
simRequestId,
otelContext,
})
outcome = options.abortSignal?.aborted
? RequestTraceV1Outcome.cancelled
: result.success
? RequestTraceV1Outcome.success
outcome = result.success
? RequestTraceV1Outcome.success
: options.abortSignal?.aborted || result.cancelled
? RequestTraceV1Outcome.cancelled
: RequestTraceV1Outcome.error
return result
} catch (error) {
Expand Down
46 changes: 45 additions & 1 deletion apps/sim/lib/copilot/request/lifecycle/start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import { propagation, trace } from '@opentelemetry/api'
import { W3CTraceContextPropagator } from '@opentelemetry/core'
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { MothershipStreamV1EventType } from '@/lib/copilot/generated/mothership-stream-v1'
import {
MothershipStreamV1CompletionStatus,
MothershipStreamV1EventType,
} from '@/lib/copilot/generated/mothership-stream-v1'

const {
runCopilotLifecycle,
Expand Down Expand Up @@ -60,6 +63,7 @@ vi.mock('@/lib/copilot/request/session', () => ({
registerActiveStream: vi.fn(),
unregisterActiveStream: vi.fn(),
startAbortPoller: vi.fn().mockReturnValue(setInterval(() => {}, 999999)),
isExplicitStopReason: vi.fn().mockReturnValue(false),
SSE_RESPONSE_HEADERS: {},
StreamWriter: vi.fn().mockImplementation(() => ({
attach: vi.fn().mockImplementation((ctrl: ReadableStreamDefaultController) => {
Expand Down Expand Up @@ -211,6 +215,46 @@ describe('createSSEStream terminal error handling', () => {
expect(scheduleBufferCleanup).toHaveBeenCalledWith('stream-1')
})

it('publishes a cancelled completion (not an error) when the orchestrator reports cancelled without abortSignal aborted', async () => {
runCopilotLifecycle.mockResolvedValue({
success: false,
cancelled: true,
content: '',
contentBlocks: [],
toolCalls: [],
})

const stream = createSSEStream({
requestPayload: { message: 'hello' },
userId: 'user-1',
streamId: 'stream-1',
executionId: 'exec-1',
runId: 'run-1',
currentChat: null,
isNewChat: false,
message: 'hello',
titleModel: 'gpt-5.4',
requestId: 'req-cancelled',
orchestrateOptions: {},
})

await drainStream(stream)

expect(appendEvent).not.toHaveBeenCalledWith(
expect.objectContaining({
type: MothershipStreamV1EventType.error,
})
)
expect(appendEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: MothershipStreamV1EventType.complete,
payload: expect.objectContaining({
status: MothershipStreamV1CompletionStatus.cancelled,
}),
})
)
})

it('passes an OTel context into the streaming lifecycle', async () => {
let lifecycleTraceparent = ''
runCopilotLifecycle.mockImplementation(async (_payload, options) => {
Expand Down
7 changes: 6 additions & 1 deletion apps/sim/lib/copilot/request/lifecycle/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
onEvent: async (event) => {
await publisher.publish(event)
},
onAbortObserved: (reason) => {
if (!abortController.signal.aborted) {
abortController.abort(reason)
}
},
})

lifecycleResult = result
Expand All @@ -266,7 +271,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
// 3. Otherwise → error.
outcome = result.success
? RequestTraceV1Outcome.success
: abortController.signal.aborted || publisher.clientDisconnected
: result.cancelled || abortController.signal.aborted || publisher.clientDisconnected
? RequestTraceV1Outcome.cancelled
: RequestTraceV1Outcome.error
if (outcome === RequestTraceV1Outcome.cancelled) {
Expand Down
12 changes: 11 additions & 1 deletion apps/sim/lib/copilot/request/session/abort-reason.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export const AbortReason = {
* that the node that DID receive it wrote, and aborts on the poll.
*/
RedisPoller: 'redis_abort_marker:poller',
/**
* Cross-process stop: same root cause as `RedisPoller`, but observed
* by `runStreamLoop` at body close (the Go body ended before the
* 250ms poller's next tick) rather than by the polling timer.
*/
MarkerObservedAtBodyClose: 'redis_abort_marker:body_close',
/** Internal timeout on the outbound explicit-abort fetch to Go. */
ExplicitAbortFetchTimeout: 'timeout:go_explicit_abort_fetch',
} as const
Expand All @@ -38,5 +44,9 @@ export type AbortReasonValue = (typeof AbortReason)[keyof typeof AbortReason]
* stops, mirroring `requestctx.IsExplicitUserStop` on the Go side.
*/
export function isExplicitStopReason(reason: unknown): boolean {
return reason === AbortReason.UserStop || reason === AbortReason.RedisPoller
return (
reason === AbortReason.UserStop ||
reason === AbortReason.RedisPoller ||
reason === AbortReason.MarkerObservedAtBodyClose
)
}
41 changes: 41 additions & 0 deletions apps/sim/lib/copilot/request/session/abort.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,47 @@ describe('startAbortPoller heartbeat', () => {
}
})

it('aborts the controller before clearing the marker so the marker is never observable as cleared while the signal is still unaborted', async () => {
const controller = new AbortController()
const streamId = 'stream-order-1'

let signalAbortedWhenMarkerCleared: boolean | null = null
mockClearAbortMarker.mockImplementationOnce(async () => {
signalAbortedWhenMarkerCleared = controller.signal.aborted
})
mockHasAbortMarker.mockResolvedValueOnce(true)

const interval = startAbortPoller(streamId, controller, {})

try {
await vi.advanceTimersByTimeAsync(300)

expect(mockClearAbortMarker).toHaveBeenCalledWith(streamId)
expect(signalAbortedWhenMarkerCleared).toBe(true)
expect(controller.signal.aborted).toBe(true)
} finally {
clearInterval(interval)
}
})

it('does not clear the marker when the signal is already aborted (no double abort)', async () => {
const controller = new AbortController()
controller.abort('preexisting')
const streamId = 'stream-order-2'

mockHasAbortMarker.mockResolvedValueOnce(true)

const interval = startAbortPoller(streamId, controller, {})

try {
await vi.advanceTimersByTimeAsync(300)

expect(mockClearAbortMarker).not.toHaveBeenCalled()
} finally {
clearInterval(interval)
}
})

it('stops heartbeating after ownership is lost', async () => {
const controller = new AbortController()
const streamId = 'stream-lost'
Expand Down
Loading
Loading