Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions scripts/task-launch-e2e-config.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
const DEFAULT_API_BASE_URL_BY_PROTOCOL = {
anthropic: "https://api.anthropic.com",
openai: "https://api.openai.com/v1",
azure: "",
google: "https://generativelanguage.googleapis.com",
ollama: "https://ollama.com",
senseaudio: "https://api.senseaudio.cn",
};

const DEFAULT_API_MODEL_BY_PROTOCOL = {
anthropic: "claude-sonnet-4-5",
openai: "gpt-4o",
azure: "gpt-4o",
google: "gemini-2.0-flash",
ollama: "gemma3:4b",
senseaudio: "senseaudio-s2",
};

function nonEmpty(value) {
return typeof value === "string" && value.trim() ? value.trim() : null;
}

function parseMaxTokens(value) {
const parsed = Number(nonEmpty(value));
return Number.isInteger(parsed) && parsed > 0 ? parsed : 8192;
}

function parseJsonObject(value) {
const raw = nonEmpty(value);
if (!raw) return null;
try {
const parsed = JSON.parse(raw);
return parsed && typeof parsed === "object" && !Array.isArray(parsed)
? parsed
: null;
} catch {
return null;
}
}

function readByokApiKey(env) {
const direct = nonEmpty(env.DEVLOG_E2E_API_KEY);
if (direct) return direct;

const envVarName =
nonEmpty(env.DEVLOG_E2E_API_KEY_ENV_VAR) ??
nonEmpty(env.DEVLOG_E2E_AGENT_API_KEY_ENV_VAR) ??
"ANTHROPIC_API_KEY";
return nonEmpty(env[envVarName]);
}

export function buildTaskLaunchRuntimePayload(env = process.env) {
const mode = nonEmpty(env.DEVLOG_E2E_AUTH_MODE) ?? "local-cli";
const model = nonEmpty(env.DEVLOG_E2E_AGENT_MODEL);

if (mode === "agent-api-key") {
return {
session_auth_mode: "agent-api-key",
agent_api_key_env_var:
nonEmpty(env.DEVLOG_E2E_AGENT_API_KEY_ENV_VAR) ?? "ANTHROPIC_API_KEY",
agent_model: model ?? "claude-sonnet-4-6",
};
}

if (mode === "anthropic-api-key") {
const protocol = nonEmpty(env.DEVLOG_E2E_API_PROTOCOL) ?? "anthropic";
const apiKey = readByokApiKey(env);
return {
session_auth_mode: "anthropic-api-key",
agent_api_protocol: protocol,
agent_model:
model ??
DEFAULT_API_MODEL_BY_PROTOCOL[protocol] ??
"claude-sonnet-4-5",
agent_base_url:
nonEmpty(env.DEVLOG_E2E_AGENT_BASE_URL) ??
DEFAULT_API_BASE_URL_BY_PROTOCOL[protocol] ??
"",
agent_api_version: nonEmpty(env.DEVLOG_E2E_AGENT_API_VERSION) ?? "",
agent_max_tokens: parseMaxTokens(env.DEVLOG_E2E_AGENT_MAX_TOKENS),
...(apiKey ? { anthropic_api_key: apiKey } : {}),
};
}

const localCliEnv = parseJsonObject(env.DEVLOG_E2E_LOCAL_CLI_ENV_JSON);
return {
session_auth_mode: "local-cli",
local_cli_agent_id:
nonEmpty(env.DEVLOG_E2E_LOCAL_CLI_AGENT_ID) ??
nonEmpty(env.DEVLOG_E2E_LOCAL_CLI_AGENT) ??
"claude",
agent_model: model ?? "default",
agent_reasoning: nonEmpty(env.DEVLOG_E2E_AGENT_REASONING) ?? "medium",
...(localCliEnv ? { local_cli_agent_env: localCliEnv } : {}),
};
}

export function describeTaskLaunchRuntimePayload(payload) {
if (payload.session_auth_mode === "anthropic-api-key") {
return [
"mode=anthropic-api-key",
`protocol=${payload.agent_api_protocol}`,
`model=${payload.agent_model}`,
`baseUrl=${payload.agent_base_url}`,
`key=${payload.anthropic_api_key ? "provided" : "missing"}`,
].join(" ");
}

if (payload.session_auth_mode === "agent-api-key") {
return [
"mode=agent-api-key",
`keyEnv=${payload.agent_api_key_env_var ? "configured" : "missing"}`,
`model=${payload.agent_model}`,
].join(" ");
}

return [
"mode=local-cli",
`agent=${payload.local_cli_agent_id}`,
`model=${payload.agent_model}`,
`reasoning=${payload.agent_reasoning}`,
].join(" ");
}
17 changes: 10 additions & 7 deletions scripts/task-launch-e2e.mjs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#!/usr/bin/env node

import {
buildTaskLaunchRuntimePayload,
describeTaskLaunchRuntimePayload,
} from "./task-launch-e2e-config.mjs";

const baseUrl = (process.env.DEVLOG_E2E_BASE_URL ?? "http://localhost:3000")
.replace(/\/$/, "");
const projectId = process.env.DEVLOG_E2E_PROJECT_ID ?? "devlog";
Expand All @@ -8,10 +13,7 @@ const codingAgentId =
process.env.DEVLOG_E2E_CODING_AGENT_ID ?? "general-coding-agent";
const agentTeamId =
process.env.DEVLOG_E2E_AGENT_TEAM_ID ?? "implementation-review-team";
const sessionAuthMode =
process.env.DEVLOG_E2E_AUTH_MODE ?? "backend-oauth";
const agentApiKeyEnvVar =
process.env.DEVLOG_E2E_AGENT_API_KEY_ENV_VAR ?? "ANTHROPIC_API_KEY";
const runtimePayload = buildTaskLaunchRuntimePayload(process.env);

const startedAt = Date.now();
const marker = `DEVLOG_E2E_${Date.now()}`;
Expand Down Expand Up @@ -92,6 +94,7 @@ async function watchSse(sessionId, evidence, signal) {

async function main() {
console.log(`DevLog task launch E2E against ${baseUrl} project=${projectId}`);
console.log(`runtime ${describeTaskLaunchRuntimePayload(runtimePayload)}`);
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed

const task = await request("/api/tasks", {
method: "POST",
Expand All @@ -114,8 +117,7 @@ async function main() {
body: JSON.stringify({
coding_agent_id: codingAgentId,
agent_team_id: agentTeamId,
session_auth_mode: sessionAuthMode,
agent_api_key_env_var: agentApiKeyEnvVar,
...runtimePayload,
}),
});
const session = launched.session;
Expand Down Expand Up @@ -147,6 +149,7 @@ async function main() {
"Human follow-up E2E instruction.",
`Reply with exactly ${followupAck}.`,
].join(" "),
...runtimePayload,
}),
});
if (!("queue_length" in patched) || !("is_processing" in patched)) {
Expand All @@ -164,7 +167,7 @@ async function main() {
.join("\n");
if (/Failed to authenticate|API Error/i.test(assistantText)) {
throw new Error(
"Claude session authentication failed. Use backend OAuth on a logged-in Claude Code runtime or set DEVLOG_E2E_AUTH_MODE=agent-api-key with a backend env var such as ANTHROPIC_API_KEY.",
"Session authentication failed. Use DEVLOG_E2E_AUTH_MODE=local-cli with a usable local coding agent, DEVLOG_E2E_AUTH_MODE=anthropic-api-key with DEVLOG_E2E_API_KEY, or explicitly use DEVLOG_E2E_AUTH_MODE=agent-api-key with a backend env var such as ANTHROPIC_API_KEY.",
);
}
if (assistantText.includes(followupAck)) return rows;
Expand Down
151 changes: 108 additions & 43 deletions src/app/api/sessions/[id]/stream/route.ts
Original file line number Diff line number Diff line change
@@ -1,72 +1,137 @@
import { NextRequest } from "next/server";
import { getDb } from "@/core/db";
import { filterBufferedReplayDuplicates } from "@/core/session-stream-dedupe";
import { buildSessionStreamReplayEvents } from "@/core/session-stream-replay";
import { streamManager } from "@/core/stream-manager";
import type { ChatMessage } from "@/core/types-dashboard";
import { resolveProjectId } from "@/lib/api-utils";
import type { ChatStreamEvent } from "@/core/stream-manager";

export async function GET(
_req: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
const projectId = resolveProjectId(_req);
const db = getDb();
const ownedSession = db
.prepare("SELECT 1 FROM sessions WHERE id = ? AND project_id = ? LIMIT 1")
.get(id, projectId);

if (!ownedSession) {
return new Response("Forbidden", { status: 403 });
}

const encoder = new TextEncoder();
let cleanupStream = () => {};
const stream = new ReadableStream({
start(controller) {
// Replay persisted messages so the frontend catches up
try {
const db = getDb();
const messages = db
.prepare(
"SELECT * FROM session_messages WHERE session_id = ? ORDER BY id ASC"
)
.all(id) as ChatMessage[];

for (const msg of messages) {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: "message", role: msg.role, content: msg.content })}\n\n`
)
);
}
} catch {
// DB might not be ready
}
const liveEventBuffer: ChatStreamEvent[] = [];
let bufferingLiveEvents = true;
let heartbeat: ReturnType<typeof setInterval> | undefined;
let unsubscribe = () => {};
let cleanedUp = false;
let abortHandler: (() => void) | undefined;

// Signal replay complete
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: "sync" })}\n\n`)
);

// Subscribe to live events
const unsubscribe = streamManager.subscribe(id, (event) => {
try {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify(event)}\n\n`)
);
} catch {
unsubscribe();
const cleanup = () => {
if (cleanedUp) {
return;
}
});
cleanedUp = true;
if (heartbeat) {
clearInterval(heartbeat);
}
if (abortHandler) {
_req.signal.removeEventListener("abort", abortHandler);
}
unsubscribe();
};
cleanupStream = cleanup;

// Heartbeat every 15s
const heartbeat = setInterval(() => {
const safeEnqueue = (chunk: string) => {
try {
controller.enqueue(encoder.encode(`: heartbeat\n\n`));
controller.enqueue(encoder.encode(chunk));
return true;
} catch {
clearInterval(heartbeat);
unsubscribe();
cleanup();
return false;
}
}, 15000);
};

_req.signal.addEventListener("abort", () => {
clearInterval(heartbeat);
unsubscribe();
const enqueueEvent = (event: ChatStreamEvent) => {
return safeEnqueue(`data: ${JSON.stringify(event)}\n\n`);
};

abortHandler = () => {
cleanup();
try {
controller.close();
} catch {
// already closed
}
};
_req.signal.addEventListener("abort", abortHandler);

if (_req.signal.aborted) {
cleanup();
return;
}

// Subscribe before replay so live events are not lost during catch-up.
unsubscribe = streamManager.subscribe(id, (event) => {
if (bufferingLiveEvents) {
liveEventBuffer.push(event);
return;
}

enqueueEvent(event);
});

if (_req.signal.aborted) {
cleanup();
return;
}

// Replay persisted messages so the frontend catches up
let replayEvents: ChatStreamEvent[] = [];
try {
replayEvents = buildSessionStreamReplayEvents(db, id);

for (const event of replayEvents) {
if (!enqueueEvent(event)) {
return;
}
}
} catch (error) {
// DB might not be ready
console.warn("Failed to replay session stream events", {
sessionId: id,
error,
});
}

// Signal replay complete
if (!safeEnqueue(`data: ${JSON.stringify({ type: "sync" })}\n\n`)) {
return;
}

const eventsToDrain = filterBufferedReplayDuplicates(
replayEvents,
liveEventBuffer,
);
for (const event of eventsToDrain) {
if (!enqueueEvent(event)) {
return;
}
}
bufferingLiveEvents = false;

// Heartbeat every 15s
heartbeat = setInterval(() => {
safeEnqueue(": heartbeat\n\n");
}, 15000);
},
cancel() {
cleanupStream();
},
});

Expand Down
Loading
Loading