Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 73 additions & 57 deletions apps/server/src/git/Layers/OpenCodeTextGeneration.test.ts
Original file line number Diff line number Diff line change
@@ -1,88 +1,100 @@
import type { ChildProcess } from "node:child_process";

import * as NodeServices from "@effect/platform-node/NodeServices";
import { it } from "@effect/vitest";
import { Duration, Effect, Layer } from "effect";
import { TestClock } from "effect/testing";
import { beforeEach, expect, vi } from "vitest";
import { NetService } from "@t3tools/shared/Net";
import { beforeEach, expect } from "vitest";

import { ServerConfig } from "../../config.ts";
import { ServerSettingsService } from "../../serverSettings.ts";
import {
OpenCodeRuntime,
OpenCodeRuntimeError,
type OpenCodeRuntimeShape,
} from "../../provider/opencodeRuntime.ts";
import { TextGeneration } from "../Services/TextGeneration.ts";
import { OpenCodeTextGenerationLive } from "./OpenCodeTextGeneration.ts";

const runtimeMock = vi.hoisted(() => {
const state = {
const runtimeMock = {
state: {
startCalls: [] as string[],
promptUrls: [] as string[],
authHeaders: [] as Array<string | null>,
closeCalls: [] as string[],
promptResult: undefined as
| { data?: { info?: { error?: unknown }; parts?: Array<{ type: string; text?: string }> } }
| undefined,
};

return {
state,
reset() {
state.startCalls.length = 0;
state.promptUrls.length = 0;
state.authHeaders.length = 0;
state.closeCalls.length = 0;
state.promptResult = undefined;
},
};
});

vi.mock("../../provider/opencodeRuntime.ts", async () => {
const actual = await vi.importActual<typeof import("../../provider/opencodeRuntime.ts")>(
"../../provider/opencodeRuntime.ts",
);
},
reset() {
this.state.startCalls.length = 0;
this.state.promptUrls.length = 0;
this.state.authHeaders.length = 0;
this.state.closeCalls.length = 0;
this.state.promptResult = undefined;
},
};

return {
...actual,
startOpenCodeServerProcess: vi.fn(async ({ binaryPath }: { binaryPath: string }) => {
const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = {
startOpenCodeServerProcess: ({ binaryPath }) =>
Effect.gen(function* () {
const index = runtimeMock.state.startCalls.length + 1;
const url = `http://127.0.0.1:${4_300 + index}`;
runtimeMock.state.startCalls.push(binaryPath);
// The production runtime binds server lifetime to the caller's scope.
// Mirror that here so the closeCalls probe observes scope close.
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
runtimeMock.state.closeCalls.push(url);
}),
);
return {
url,
process: {} as ChildProcess,
close: () => {
runtimeMock.state.closeCalls.push(url);
},
exitCode: Effect.never,
};
}),
createOpenCodeSdkClient: vi.fn(
({ baseUrl, serverPassword }: { baseUrl: string; serverPassword?: string }) => ({
session: {
create: vi.fn(async () => ({ data: { id: `${baseUrl}/session` } })),
prompt: vi.fn(async () => {
runtimeMock.state.promptUrls.push(baseUrl);
runtimeMock.state.authHeaders.push(
serverPassword ? `Basic ${btoa(`opencode:${serverPassword}`)}` : null,
);
return (
runtimeMock.state.promptResult ?? {
data: {
parts: [
{
type: "text",
text: JSON.stringify({
subject: "Improve OpenCode reuse",
body: "Reuse one server for the full action.",
}),
},
],
},
}
);
}),
connectToOpenCodeServer: ({ serverUrl }) =>
Effect.succeed({
url: serverUrl ?? "http://127.0.0.1:4301",
exitCode: null,
external: Boolean(serverUrl),
}),
runOpenCodeCommand: () => Effect.succeed({ stdout: "", stderr: "", code: 0 }),
createOpenCodeSdkClient: ({ baseUrl, serverPassword }) =>
({
session: {
create: async () => ({ data: { id: `${baseUrl}/session` } }),
prompt: async () => {
runtimeMock.state.promptUrls.push(baseUrl);
runtimeMock.state.authHeaders.push(
serverPassword ? `Basic ${btoa(`opencode:${serverPassword}`)}` : null,
);
return (
runtimeMock.state.promptResult ?? {
data: {
parts: [
{
type: "text",
text: JSON.stringify({
subject: "Improve OpenCode reuse",
body: "Reuse one server for the full action.",
}),
},
],
},
}
);
},
},
}) as unknown as ReturnType<OpenCodeRuntimeShape["createOpenCodeSdkClient"]>,
loadOpenCodeInventory: () =>
Effect.fail(
new OpenCodeRuntimeError({
operation: "loadOpenCodeInventory",
detail: "OpenCodeRuntimeTestDouble.loadOpenCodeInventory not used in this test",
cause: null,
}),
),
};
});
};

const DEFAULT_TEST_MODEL_SELECTION = {
provider: "opencode" as const,
Expand All @@ -92,6 +104,7 @@ const DEFAULT_TEST_MODEL_SELECTION = {
const OPENCODE_TEXT_GENERATION_IDLE_TTL_MS = 30_000;

const OpenCodeTextGenerationTestLayer = OpenCodeTextGenerationLive.pipe(
Layer.provideMerge(Layer.succeed(OpenCodeRuntime, OpenCodeRuntimeTestDouble)),
Layer.provideMerge(
ServerSettingsService.layerTest({
providers: {
Expand All @@ -106,10 +119,12 @@ const OpenCodeTextGenerationTestLayer = OpenCodeTextGenerationLive.pipe(
prefix: "t3code-opencode-text-generation-test-",
}),
),
Layer.provideMerge(NetService.layer),
Layer.provideMerge(NodeServices.layer),
);

const OpenCodeTextGenerationExistingServerTestLayer = OpenCodeTextGenerationLive.pipe(
Layer.provideMerge(Layer.succeed(OpenCodeRuntime, OpenCodeRuntimeTestDouble)),
Layer.provideMerge(
ServerSettingsService.layerTest({
providers: {
Expand All @@ -126,6 +141,7 @@ const OpenCodeTextGenerationExistingServerTestLayer = OpenCodeTextGenerationLive
prefix: "t3code-opencode-text-generation-existing-server-test-",
}),
),
Layer.provideMerge(NetService.layer),
Layer.provideMerge(NodeServices.layer),
);

Expand Down
113 changes: 77 additions & 36 deletions apps/server/src/git/Layers/OpenCodeTextGeneration.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Duration, Effect, Exit, Fiber, Layer, Schema, Scope } from "effect";
import { Effect, Exit, Fiber, Layer, Schema, Scope } from "effect";
import * as Semaphore from "effect/Semaphore";

import {
Expand All @@ -25,15 +25,15 @@ import {
sanitizeThreadTitle,
} from "../Utils.ts";
import {
createOpenCodeSdkClient,
OpenCodeRuntime,
type OpenCodeServerConnection,
type OpenCodeServerProcess,
openCodeRuntimeErrorDetail,
parseOpenCodeModelSlug,
startOpenCodeServerProcess,
toOpenCodeFileParts,
} from "../../provider/opencodeRuntime.ts";

const OPENCODE_TEXT_GENERATION_IDLE_TTL_MS = 30_000;
const OPENCODE_TEXT_GENERATION_IDLE_TTL = "30 seconds";

function getOpenCodePromptErrorMessage(error: unknown): string | null {
if (!error || typeof error !== "object") {
Expand Down Expand Up @@ -80,6 +80,13 @@ function getOpenCodeTextResponse(parts: ReadonlyArray<unknown> | undefined): str

interface SharedOpenCodeTextGenerationServerState {
server: OpenCodeServerProcess | null;
/**
* The scope that owns the shared server's lifetime. Closing this scope
* terminates the OpenCode child process and interrupts any fibers the
* runtime forked during startup. We don't hold a `close()` function on
* the server handle anymore — the scope is the only lifecycle handle.
*/
serverScope: Scope.Closeable | null;
binaryPath: string | null;
activeRequests: number;
idleCloseFiber: Fiber.Fiber<void, never> | null;
Expand All @@ -88,24 +95,28 @@ interface SharedOpenCodeTextGenerationServerState {
const makeOpenCodeTextGeneration = Effect.gen(function* () {
const serverConfig = yield* ServerConfig;
const serverSettingsService = yield* ServerSettingsService;
const openCodeRuntime = yield* OpenCodeRuntime;
const idleFiberScope = yield* Effect.acquireRelease(Scope.make(), (scope) =>
Scope.close(scope, Exit.void),
);
const sharedServerMutex = yield* Semaphore.make(1);
const sharedServerState: SharedOpenCodeTextGenerationServerState = {
server: null,
serverScope: null,
binaryPath: null,
activeRequests: 0,
idleCloseFiber: null,
};

const closeSharedServer = (server: OpenCodeServerProcess) => {
if (sharedServerState.server === server) {
sharedServerState.server = null;
sharedServerState.binaryPath = null;
const closeSharedServer = Effect.fn("closeSharedServer")(function* () {
const scope = sharedServerState.serverScope;
sharedServerState.server = null;
sharedServerState.serverScope = null;
sharedServerState.binaryPath = null;
if (scope !== null) {
yield* Scope.close(scope, Exit.void).pipe(Effect.ignore);
}
server.close();
};
});

const cancelIdleCloseFiber = Effect.fn("cancelIdleCloseFiber")(function* () {
const idleCloseFiber = sharedServerState.idleCloseFiber;
Expand All @@ -119,15 +130,15 @@ const makeOpenCodeTextGeneration = Effect.gen(function* () {
server: OpenCodeServerProcess,
) {
yield* cancelIdleCloseFiber();
const fiber = yield* Effect.sleep(Duration.millis(OPENCODE_TEXT_GENERATION_IDLE_TTL_MS)).pipe(
const fiber = yield* Effect.sleep(OPENCODE_TEXT_GENERATION_IDLE_TTL).pipe(
Effect.andThen(
sharedServerMutex.withPermit(
Effect.sync(() => {
Effect.gen(function* () {
if (sharedServerState.server !== server || sharedServerState.activeRequests > 0) {
return;
}
sharedServerState.idleCloseFiber = null;
closeSharedServer(server);
yield* closeSharedServer();
}),
),
),
Expand All @@ -154,7 +165,7 @@ const makeOpenCodeTextGeneration = Effect.gen(function* () {
sharedServerState.binaryPath !== input.binaryPath &&
sharedServerState.activeRequests === 0
) {
closeSharedServer(existingServer);
yield* closeSharedServer();
} else {
if (sharedServerState.binaryPath !== input.binaryPath) {
yield* Effect.logWarning(
Expand All @@ -170,20 +181,53 @@ const makeOpenCodeTextGeneration = Effect.gen(function* () {
}
}

const server = yield* Effect.tryPromise({
try: () => startOpenCodeServerProcess({ binaryPath: input.binaryPath }),
catch: (cause) =>
new TextGenerationError({
operation: input.operation,
detail: cause instanceof Error ? cause.message : "Failed to start OpenCode server.",
cause,
}),
});
// Create a fresh scope that owns this shared server. The runtime
// will attach its child-process and fiber finalizers to this scope;
// closing it kills the server and interrupts those fibers.
//
// The `Scope.make` / spawn / record-or-close transitions run inside
// `uninterruptibleMask` so an interrupt arriving between any two
// steps can't orphan the scope (and the child process attached to
// it) before we either close it on failure or hand ownership to
// `sharedServerState`. `restore` keeps the actual spawn
// interruptible; an interrupt during the spawn is captured by
// `Effect.exit` and drives us through the failure branch that
// closes the fresh scope.
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const serverScope = yield* Scope.make();
const startedExit = yield* Effect.exit(
restore(
openCodeRuntime
.startOpenCodeServerProcess({
binaryPath: input.binaryPath,
})
.pipe(
Effect.provideService(Scope.Scope, serverScope),
Effect.mapError(
(cause) =>
new TextGenerationError({
operation: input.operation,
detail: openCodeRuntimeErrorDetail(cause),
cause,
}),
),
),
),
);
if (startedExit._tag === "Failure") {
yield* Scope.close(serverScope, Exit.void).pipe(Effect.ignore);
return yield* Effect.failCause(startedExit.cause);
}

sharedServerState.server = server;
sharedServerState.binaryPath = input.binaryPath;
sharedServerState.activeRequests = 1;
return server;
const server = startedExit.value;
sharedServerState.server = server;
sharedServerState.serverScope = serverScope;
sharedServerState.binaryPath = input.binaryPath;
sharedServerState.activeRequests = 1;
return server;
}),
);
}),
);

Expand All @@ -200,17 +244,15 @@ const makeOpenCodeTextGeneration = Effect.gen(function* () {
}),
);

// Module-level finalizer: on layer shutdown, cancel the idle close fiber
// and close the shared server scope. Consumers therefore cannot leak
// the shared OpenCode server by forgetting to call anything.
yield* Effect.addFinalizer(() =>
sharedServerMutex.withPermit(
Effect.gen(function* () {
yield* cancelIdleCloseFiber();
const server = sharedServerState.server;
sharedServerState.server = null;
sharedServerState.binaryPath = null;
sharedServerState.activeRequests = 0;
if (server !== null) {
server.close();
}
yield* closeSharedServer();
}),
),
);
Expand Down Expand Up @@ -264,7 +306,7 @@ const makeOpenCodeTextGeneration = Effect.gen(function* () {
const runAgainstServer = (server: Pick<OpenCodeServerConnection, "url">) =>
Effect.tryPromise({
try: async () => {
const client = createOpenCodeSdkClient({
const client = openCodeRuntime.createOpenCodeSdkClient({
baseUrl: server.url,
directory: input.cwd,
...(settings.serverUrl.length > 0 && settings.serverPassword
Expand Down Expand Up @@ -304,8 +346,7 @@ const makeOpenCodeTextGeneration = Effect.gen(function* () {
catch: (cause) =>
new TextGenerationError({
operation: input.operation,
detail:
cause instanceof Error ? cause.message : "OpenCode text generation request failed.",
detail: openCodeRuntimeErrorDetail(cause),
cause,
}),
});
Expand Down
Loading
Loading