Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 160 additions & 45 deletions ts/packages/agents/code/src/codeActionHandler.ts
Comment thread
TalZaccai marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,31 @@ import {
} from "@typeagent/agent-sdk/helpers/action";
import {
evaluateCodeReadiness,
resolveCodePort,
resolveCodePortOverride,
setupCode,
whichExists,
} from "./readiness.js";

const debug = registerDebug("typeagent:code");

// Shared WebSocket server that bridges this code agent to the Coda VS Code
// extension (ts/packages/coda) on port 8082. Created on first enable, closed
// when the last session disables the code agent. Storing it per-session caused
// "No websocket connection" errors when an action ran on a session different
// from the one that originally created the server (e.g. after schema enable on
// a different conversation), and also masked EADDRINUSE failures from a second
// bind attempt on port 8082.
// extension (ts/packages/coda). Created on first session-enable, closed when
// the last session disables. Storing it per-session caused "No websocket
// connection" errors when an action ran on a session different from the one
// that originally created the server (e.g. after schema enable on a different
// conversation), and also masked EADDRINUSE failures from a second bind
// attempt on the configured port.
//
// Port allocation: by default the OS picks a free ephemeral port (port=0).
// Each session that uses the shared server registers it under its own
// `sessionContextId`, so the PortRegistrar's `closeSessionContext` backstop
// auto-releases per-session entries and `lookup("code")` keeps returning the
// shared port as long as ≥1 session has it enabled. `CODE_WEBSOCKET_PORT`
// remains an explicit override (useful for pinning the port when debugging
// or when an external client expects a known address).
let sharedWebSocketServer: CodeAgentWebSocketServer | undefined;
let sharedStartingPromise: Promise<CodeAgentWebSocketServer> | undefined;
let sharedClosingPromise: Promise<void> | undefined;
let sharedWebSocketRefCount = 0;
const sharedPendingCalls: Map<
number,
Expand Down Expand Up @@ -66,7 +76,7 @@ export function instantiate(): AppAgent {
actionContext,
ctx.choiceManager,
() => ctx.webSocketServer?.isConnected() === true,
resolveCodePort(process.env),
getKnownCodePort(),
);
},
handleChoice: async (choiceId, response, context) => {
Expand All @@ -90,6 +100,11 @@ type CodeActionContext = {
// Manages yes/no choice callbacks (currently only the setup-flow card).
// Hooked up via the AppAgent.handleChoice in instantiate() above.
choiceManager: ChoiceManager;
// Handle returned by sessionContext.registerPort, kept so we can release
// exactly this session's registration on disable. The
// closeSessionContext backstop will also release it if the disable path
// is skipped.
portRegistration?: { release: () => void };
};

async function initializeCodeContext(): Promise<CodeActionContext> {
Expand Down Expand Up @@ -126,10 +141,101 @@ async function checkCodeReadiness(
return evaluateCodeReadiness({
clientConnected,
vsCodeCliInstalled,
port: resolveCodePort(process.env),
port: getKnownCodePort(),
});
}

// Returns the port the code agent's WS server is/will be reachable on,
// for display in readiness/setup messaging. Two phases:
// - After bind: `getSharedCodePort()` returns the actual bound port
// (OS-assigned by default, or `CODE_WEBSOCKET_PORT` if set; either way,
// this is the authoritative answer).
// - Before bind: no live port exists, so we fall back to the static
// prediction from `CODE_WEBSOCKET_PORT` if set, else `undefined`
// (the UI shows "port unknown until bind").
function getKnownCodePort(): number | undefined {
return getSharedCodePort() ?? resolveCodePortOverride(process.env);
}

// Bind hint for the shared server. Returns the explicit override if
// CODE_WEBSOCKET_PORT is set (useful for pinning the port when debugging);
// otherwise 0 so the OS picks a free port and the registrar/discovery
// channel publishes it.
//
// Note: we only validate the *shape* of the env var here (numeric, >= 0).
// If the caller asks for a specific port and the OS can't bind it
// (EADDRINUSE), `CodeAgentWebSocketServer.start()` rejects with that error
// and the schema-enable fails loudly — we deliberately do NOT silently
// fall back to an OS-assigned port, since the user explicitly asked for
// a specific one.
function getCodeBindPort(): number {
const raw = process.env["CODE_WEBSOCKET_PORT"];
if (raw === undefined) return 0;
const n = parseInt(raw, 10);
if (!Number.isFinite(n) || n < 0) {
debug(
`Ignoring malformed CODE_WEBSOCKET_PORT=${raw}; using OS-assigned port instead`,
);
return 0;
}
return n;
}

// Wire the shared server's onMessage handler. Module-scoped because the
// server itself is module-scoped — all sessions route their pending-call
// completions through the same handler.
function attachSharedOnMessage(server: CodeAgentWebSocketServer): void {
server.onMessage = (message: string) => {
try {
const data = JSON.parse(message) as WebSocketMessageV2;

if (data.id !== undefined && data.result !== undefined) {
const pendingCall = sharedPendingCalls.get(Number(data.id));

if (pendingCall) {
sharedPendingCalls.delete(Number(data.id));
const { resolve, context } = pendingCall;
if (context?.actionIO) {
context.actionIO.setDisplay(data.result);
}
resolve();
}
}
} catch (error) {
debug("Error parsing WebSocket message:", error);
}
};
}

// Start (or attach to an in-flight start of) the shared WebSocket server.
// Concurrent enables from different sessions can race; serialize via
// sharedStartingPromise so only one bind attempt is in flight.
async function ensureSharedServer(): Promise<CodeAgentWebSocketServer> {
// If a previous teardown is still releasing the port (matters under
// CODE_WEBSOCKET_PORT override), await it before binding again.
if (sharedClosingPromise !== undefined) {
await sharedClosingPromise;
}
if (sharedWebSocketServer !== undefined) {
return sharedWebSocketServer;
}
if (sharedStartingPromise !== undefined) {
return sharedStartingPromise;
}
sharedStartingPromise = (async () => {
try {
const server =
await CodeAgentWebSocketServer.start(getCodeBindPort());
attachSharedOnMessage(server);
sharedWebSocketServer = server;
return server;
} finally {
sharedStartingPromise = undefined;
}
})();
return sharedStartingPromise;
}

async function updateCodeContext(
enable: boolean,
context: SessionContext<CodeActionContext>,
Expand All @@ -140,60 +246,69 @@ async function updateCodeContext(
if (agentContext.enabled.has(schemaName)) {
return;
}
if (agentContext.enabled.size === 0) {
sharedWebSocketRefCount++;
}
const isFirstSchemaForSession = agentContext.enabled.size === 0;
agentContext.enabled.add(schemaName);

if (!sharedWebSocketServer) {
// TODO: stop hardcoding the port. The dispatcher should hand each
// agent a free port at initialize time so multiple TypeAgent
// installs / sessions on one host can't collide on 8082.
const port = parseInt(process.env["CODE_WEBSOCKET_PORT"] || "8082");
sharedWebSocketServer = new CodeAgentWebSocketServer(port);

sharedWebSocketServer.onMessage = (message: string) => {
try {
const data = JSON.parse(message) as WebSocketMessageV2;

if (data.id !== undefined && data.result !== undefined) {
const pendingCall = sharedPendingCalls.get(
Number(data.id),
);

if (pendingCall) {
sharedPendingCalls.delete(Number(data.id));
const { resolve, context } = pendingCall;
if (context?.actionIO) {
context.actionIO.setDisplay(data.result);
}
resolve();
}
}
} catch (error) {
debug("Error parsing WebSocket message:", error);
}
};
try {
const server = await ensureSharedServer();
agentContext.webSocketServer = server;
agentContext.pendingCall = sharedPendingCalls;
if (isFirstSchemaForSession) {
// Per-session registration: the registrar allows multiple
// entries for `(code, default)` across sessions and lookup
// returns the most recent, so each active session
// independently keeps the shared port discoverable. The
// backstop in closeSessionContext releases ours if disable
// is skipped.
agentContext.portRegistration = context.registerPort(
"default",
server.port,
);
sharedWebSocketRefCount++;
}
} catch (e) {
// Roll back the per-session schema bookkeeping so a subsequent
// retry sees a clean slate. Don't touch shared module state —
// the bind itself failed, so we never incremented the refcount
// or registered.
agentContext.enabled.delete(schemaName);
throw e;
}
agentContext.webSocketServer = sharedWebSocketServer;
agentContext.pendingCall = sharedPendingCalls;
} else {
if (!agentContext.enabled.has(schemaName)) {
return;
}
agentContext.enabled.delete(schemaName);
if (agentContext.enabled.size === 0) {
agentContext.webSocketServer = undefined;
// Release this session's registration before potentially closing
// the server. Release is idempotent and a no-op if already
// released by the backstop.
agentContext.portRegistration?.release();
delete agentContext.portRegistration;

sharedWebSocketRefCount = Math.max(0, sharedWebSocketRefCount - 1);
if (sharedWebSocketRefCount === 0 && sharedWebSocketServer) {
sharedWebSocketServer.close();
const server = sharedWebSocketServer;
sharedWebSocketServer = undefined;
sharedPendingCalls.clear();
// Track the in-flight close so a rapid re-enable awaits
// port release under a fixed-port override.
sharedClosingPromise = server.close().finally(() => {
sharedClosingPromise = undefined;
});
await sharedClosingPromise;
}
}
}
}

// Exposed for readiness/setup messaging — undefined when the shared server
// isn't bound yet, otherwise the actual bound port. Lets readiness messages
// always reflect the real listener.
export function getSharedCodePort(): number | undefined {
return sharedWebSocketServer?.port;
}

function getVSCodeStoragePath(): string {
const platform = os.platform();
if (platform === "darwin")
Expand Down
85 changes: 74 additions & 11 deletions ts/packages/agents/code/src/codeAgentWebSocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,81 @@
// Licensed under the MIT License.

import { WebSocketServer, WebSocket } from "ws";
import { AddressInfo } from "net";
import registerDebug from "debug";

const debug = registerDebug("typeagent:code:websocket");

export class CodeAgentWebSocketServer {
private server: WebSocketServer;
private clients: Map<string, WebSocket> = new Map();
private clientIdCounter = 0;
public onMessage?: (message: string) => void;

constructor(port: number = 8082) {
this.server = new WebSocketServer({ port });
/**
* @param server the underlying ws server, already bound and listening.
* @param port the actually bound port (OS-assigned when the caller
* passed 0).
*
* Construction is private — use {@link CodeAgentWebSocketServer.start}
* so callers always get a server that is guaranteed to be bound
* before they read {@link port} or pass it to the registrar.
*/
private constructor(
private readonly server: WebSocketServer,
public readonly port: number,
) {
this.setupHandlers();
debug(`CodeAgentWebSocketServer listening on port ${port}`);
}

this.server.on("error", (error) => {
debug("Server error:", error);
/**
* Bind a new server on `port`. Resolves only after the
* `listening` event so callers can synchronously read
* {@link port}; rejects on the first `error` event so bind
* failures (EADDRINUSE under fixed-port overrides) surface
* loudly instead of being swallowed by an attached error
* handler.
*
* Pass `0` to let the OS pick a free ephemeral port; the
* actual port is then available via {@link port}.
*/
public static start(port: number = 0): Promise<CodeAgentWebSocketServer> {
return new Promise((resolve, reject) => {
const server = new WebSocketServer({ port });
let settled = false;
const onError = (error: Error) => {
if (settled) {
debug("Server error after listening:", error);
return;
}
settled = true;
server.removeListener("listening", onListening);
debug("Server bind error:", error);
reject(error);
};
const onListening = () => {
if (settled) return;
settled = true;
server.removeListener("error", onError);
const address = server.address() as AddressInfo | null;
if (!address || typeof address === "string") {
server.close();
reject(
new Error(
"ws server.address() did not return an AddressInfo",
),
);
return;
}
// Re-attach a permanent error handler so post-listen errors
// are logged rather than crashing the process.
server.on("error", (error) => {
debug("Server error:", error);
});
resolve(new CodeAgentWebSocketServer(server, address.port));
};
server.once("error", onError);
server.once("listening", onListening);
});
}

Expand Down Expand Up @@ -48,10 +106,6 @@ export class CodeAgentWebSocketServer {
this.clients.delete(clientId);
});
});

this.server.on("error", (error) => {
debug("Server error:", error);
});
}

public broadcast(message: string): number {
Expand Down Expand Up @@ -117,14 +171,23 @@ export class CodeAgentWebSocketServer {
return states;
}

public close(): void {
/**
* Close all client connections and the underlying server.
* Resolves when the server has fully released its port — important
* for a rapid disable→enable cycle under a fixed-port override
* (`CODE_WEBSOCKET_PORT`), where a synchronous return would race
* the new bind into EADDRINUSE.
*/
public close(): Promise<void> {
debug("Closing CodeAgentWebSocketServer");
for (const [, client] of this.clients.entries()) {
if (client.readyState === WebSocket.OPEN) {
client.close();
}
}
this.clients.clear();
this.server.close();
return new Promise((resolve) => {
this.server.close(() => resolve());
});
}
}
Loading
Loading