Refactor terminal manager onto Effect runtime #1525
Conversation
- Move terminal lifecycle and PTY callbacks onto Effect-managed layers
- Update contracts and WebSocket paths for terminalId-aware operations
- Expand tests for streaming, shutdown, and session retention behavior
- Run kill escalation inline during shutdown cleanup
- Treat subprocess checks as optional and preserve state updates
- Add coverage for SIGKILL escalation after grace period
Review skipped: auto reviews are disabled on this repository (CodeRabbit; configuration used: Repository UI; review profile: CHILL; plan: Pro).
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: Default test helper path ignores historyLineLimit parameter
- Eliminated the dual-path approach in makeManager so it always uses makeTerminalManagerWithOptions with the provided historyLineLimit parameter instead of a fast path via TerminalManagerLive that silently defaulted to 5000.
- ✅ Fixed: Falsy zero-value options prevent custom runtime creation
- Replaced truthy checks with !== undefined checks for all numeric options (subprocessPollIntervalMs, processKillGraceMs, maxRetainedInactiveSessions) so explicit zero values are correctly propagated.
- ✅ Fixed: Eviction test wait condition is always true
- Changed the wait condition from the tautological ptyAdapter.processes.length === 2 to waiting for 2 'exited' events in the event stream, ensuring exit handlers and eviction complete before assertions.
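The falsy-zero fix above is an instance of a general TypeScript pitfall. A minimal, self-contained illustration (not the project's code; the option name and default value are borrowed from the description above for flavor):

```typescript
// A truthy check treats an explicit 0 like an omitted option; an
// `!== undefined` check preserves it.
interface KillOptions {
  processKillGraceMs?: number;
}

const DEFAULT_GRACE_MS = 5000;

// Buggy: `0` is falsy, so an explicit zero silently falls back to the default.
function resolveTruthy(opts: KillOptions): number {
  return opts.processKillGraceMs ? opts.processKillGraceMs : DEFAULT_GRACE_MS;
}

// Fixed: only a genuinely omitted value falls back to the default.
function resolveDefined(opts: KillOptions): number {
  return opts.processKillGraceMs !== undefined
    ? opts.processKillGraceMs
    : DEFAULT_GRACE_MS;
}
```

The same reasoning applies to `subprocessPollIntervalMs` and `maxRetainedInactiveSessions` in the diff below: any numeric option where `0` is meaningful must be gated on `undefined`, not truthiness.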
Or push these changes by commenting:
@cursor push 3b9d72eea8
Preview (3b9d72eea8)
diff --git a/apps/server/src/terminal/Layers/Manager.test.ts b/apps/server/src/terminal/Layers/Manager.test.ts
--- a/apps/server/src/terminal/Layers/Manager.test.ts
+++ b/apps/server/src/terminal/Layers/Manager.test.ts
@@ -12,17 +12,15 @@
import { Effect, Encoding, Exit, Layer, ManagedRuntime, Ref, Scope, Stream } from "effect";
import { afterEach, describe, expect, it } from "vitest";
-import { ServerConfig } from "../../config";
import { TerminalManager } from "../Services/Manager";
import {
- PtyAdapter,
type PtyAdapterShape,
type PtyExitEvent,
type PtyProcess,
type PtySpawnInput,
PtySpawnError,
} from "../Services/PTY";
-import { makeTerminalManagerWithOptions, TerminalManagerLive } from "./Manager";
+import { makeTerminalManagerWithOptions } from "./Manager";
class FakePtyProcess implements PtyProcess {
readonly writes: string[] = [];
@@ -194,13 +192,29 @@
const logsDir = path.join(baseDir, "userdata", "logs", "terminals");
const ptyAdapter = options.ptyAdapter ?? new FakePtyAdapter();
- const terminalLayer = TerminalManagerLive.pipe(
- Layer.provideMerge(Layer.succeed(PtyAdapter, ptyAdapter)),
- Layer.provideMerge(ServerConfig.layerTest(process.cwd(), baseDir)),
- Layer.provideMerge(NodeServices.layer),
- );
+ const layer = Layer.effect(
+ TerminalManager,
+ makeTerminalManagerWithOptions({
+ logsDir,
+ historyLineLimit,
+ ptyAdapter,
+ ...(options.shellResolver !== undefined ? { shellResolver: options.shellResolver } : {}),
+ ...(options.subprocessChecker !== undefined
+ ? { subprocessChecker: options.subprocessChecker }
+ : {}),
+ ...(options.subprocessPollIntervalMs !== undefined
+ ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs }
+ : {}),
+ ...(options.processKillGraceMs !== undefined
+ ? { processKillGraceMs: options.processKillGraceMs }
+ : {}),
+ ...(options.maxRetainedInactiveSessions !== undefined
+ ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions }
+ : {}),
+ }),
+ ).pipe(Layer.provideMerge(NodeServices.layer));
- const runtime = ManagedRuntime.make(terminalLayer);
+ const runtime = ManagedRuntime.make(layer);
const manager = await runtime.runPromise(Effect.service(TerminalManager));
const eventsRef = await Effect.runPromise(Ref.make<TerminalEvent[]>([]));
const eventScope = await Effect.runPromise(Scope.make("sequential"));
@@ -210,60 +224,6 @@
).pipe(Effect.forkIn(eventScope)),
);
- if (
- historyLineLimit !== 5 ||
- options.shellResolver ||
- options.subprocessChecker ||
- options.subprocessPollIntervalMs ||
- options.processKillGraceMs ||
- options.maxRetainedInactiveSessions
- ) {
- await runtime.dispose();
-
- const customLayer = Layer.effect(
- TerminalManager,
- makeTerminalManagerWithOptions({
- logsDir,
- historyLineLimit,
- ptyAdapter,
- ...(options.shellResolver ? { shellResolver: options.shellResolver } : {}),
- ...(options.subprocessChecker ? { subprocessChecker: options.subprocessChecker } : {}),
- ...(options.subprocessPollIntervalMs
- ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs }
- : {}),
- ...(options.processKillGraceMs ? { processKillGraceMs: options.processKillGraceMs } : {}),
- ...(options.maxRetainedInactiveSessions
- ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions }
- : {}),
- }),
- ).pipe(Layer.provideMerge(NodeServices.layer));
-
- const customRuntime = ManagedRuntime.make(customLayer);
- const customManager = await customRuntime.runPromise(Effect.service(TerminalManager));
- const customEventsRef = await Effect.runPromise(Ref.make<TerminalEvent[]>([]));
- const customEventScope = await Effect.runPromise(Scope.make("sequential"));
- await customRuntime.runPromise(
- Stream.runForEach(customManager.streamEvents, (event) =>
- Ref.update(customEventsRef, (events) => [...events, event]),
- ).pipe(Effect.forkIn(customEventScope)),
- );
-
- return {
- baseDir,
- logsDir,
- ptyAdapter,
- runtime: customRuntime,
- manager: customManager,
- eventsRef: customEventsRef,
- run: <A, E>(effect: Effect.Effect<A, E>) => customRuntime.runPromise(effect),
- getEvents: () => Effect.runPromise(Ref.get(customEventsRef)),
- dispose: async () => {
- await Effect.runPromise(Scope.close(customEventScope, Exit.void));
- await customRuntime.dispose();
- },
- };
- }
-
return {
baseDir,
logsDir,
@@ -636,7 +596,7 @@
});
it("evicts oldest inactive terminal sessions when retention limit is exceeded", async () => {
- const { manager, ptyAdapter, run, logsDir } = await createManager(5, {
+ const { manager, ptyAdapter, run, logsDir, getEvents } = await createManager(5, {
maxRetainedInactiveSessions: 1,
});
@@ -656,7 +616,10 @@
await new Promise((resolve) => setTimeout(resolve, 5));
second.emitExit({ exitCode: 0, signal: 0 });
- await waitFor(() => ptyAdapter.processes.length === 2);
+ await waitFor(async () => {
+ const events = await getEvents();
+ return events.filter((e) => e.type === "exited").length === 2;
+ });
const reopenedSecond = await run(manager.open(openInput({ threadId: "thread-2" })));
const reopenedFirst = await run(manager.open(openInput({ threadId: "thread-1" })));

- Treat omitted terminal IDs as the default session ID in manager operations
- Simplify terminal manager tests around retained session eviction
- Update terminal contract input types to use encoded schema types
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Persist worker exits without processing newly arrived data
- Changed the cleanup block to return a boolean indicating whether new pendingHistory arrived during the race window, and continue the loop instead of returning when it did, ensuring the worker processes newly queued data.
Or push these changes by commenting:
@cursor push 414a2e4f49
Preview (414a2e4f49)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -837,15 +837,21 @@
});
if (!startState) {
- yield* modifyManagerState((state) => {
+ const hasPending = yield* modifyManagerState((state) => {
const existing = state.persistStates.get(sessionKey);
- if (!existing || existing.pendingHistory !== null) {
- return [undefined, state] as const;
+ if (!existing) {
+ return [false, state] as const;
}
+ if (existing.pendingHistory !== null) {
+ return [true, state] as const;
+ }
const persistStates = new Map(state.persistStates);
persistStates.delete(sessionKey);
- return [undefined, { ...state, persistStates }] as const;
+ return [false, { ...state, persistStates }] as const;
});
+ if (hasPending) {
+ continue;
+ }
return;
}

- Add shared CoalescingDrainableWorker utility and tests
- Simplify terminal history persistence to drain per key
- Persist session history on close before removing session state
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Worker error leaves key permanently stuck in activeKeys
- Added Effect.onError handler around processKey to remove the key from activeKeys when the process effect fails, preventing drainKey/drain from hanging forever.
- ✅ Fixed: Subprocess polling runs forever without active sessions
- Added a hasRunningSessions gate and early-return in pollSubprocessActivity so subprocess checker invocations are skipped entirely when no running sessions exist.
Or push these changes by commenting:
@cursor push 92c0e406e4
Preview (92c0e406e4)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -1377,6 +1377,10 @@
session.status === "running" && Number.isInteger(session.pid),
);
+ if (runningSessions.length === 0) {
+ return;
+ }
+
yield* Effect.forEach(
runningSessions,
(session) =>
@@ -1434,8 +1438,20 @@
);
});
+ const hasRunningSessions = readManagerState.pipe(
+ Effect.map((state) => [...state.sessions.values()].some((s) => s.status === "running")),
+ );
+
yield* Effect.forever(
- Effect.sleep(subprocessPollIntervalMs).pipe(Effect.flatMap(() => pollSubprocessActivity())),
+ hasRunningSessions.pipe(
+ Effect.flatMap((active) =>
+ active
+ ? pollSubprocessActivity().pipe(
+ Effect.flatMap(() => Effect.sleep(subprocessPollIntervalMs)),
+ )
+ : Effect.sleep(subprocessPollIntervalMs),
+ ),
+ ),
).pipe(Effect.forkIn(workerScope));
yield* Effect.addFinalizer(() =>
diff --git a/packages/shared/src/CoalescingDrainableWorker.ts b/packages/shared/src/CoalescingDrainableWorker.ts
--- a/packages/shared/src/CoalescingDrainableWorker.ts
+++ b/packages/shared/src/CoalescingDrainableWorker.ts
@@ -77,7 +77,17 @@
] as const;
}).pipe(Effect.tx),
),
- Effect.flatMap((item) => (item === null ? Effect.void : processKey(item.key, item.value))),
+ Effect.flatMap((item) =>
+ item === null
+ ? Effect.void
+ : Effect.onError(processKey(item.key, item.value), () =>
+ TxRef.update(stateRef, (state) => {
+ const activeKeys = new Set(state.activeKeys);
+ activeKeys.delete(item.key);
+ return { ...state, activeKeys };
+ }),
+ ),
+ ),
Effect.forever,
Effect.forkScoped,
);

- Skip subprocess polling until a terminal session is active
- Requeue a key after processor failure so draining continues
- Add regression coverage for both behaviors
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Double disposal of runtime in shutdown test
- Added a `disposed` guard flag to make the `dispose()` function idempotent, preventing the afterEach hook from double-closing the scope and runtime.
- ✅ Fixed: Unused `drain` export in coalescing worker
- Removed the unused `drain` property from both the `CoalescingDrainableWorker` interface and its implementation, as no consumer of this worker type uses it.
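The idempotent-dispose guard is a small, reusable pattern. A hedged sketch (not the actual test harness; `makeDisposable` is a hypothetical name):

```typescript
// A boolean flag makes repeated dispose() calls no-ops, so an explicit
// dispose() in a test and a blanket afterEach cleanup can both run safely.
function makeDisposable(cleanup: () => Promise<void>) {
  let disposed = false;
  return {
    dispose: async (): Promise<void> => {
      if (disposed) return; // second and later calls are no-ops
      disposed = true;
      await cleanup();
    },
  };
}
```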
Or push these changes by commenting:
@cursor push 60a613cd93
Preview (60a613cd93)
diff --git a/apps/server/src/terminal/Layers/Manager.test.ts b/apps/server/src/terminal/Layers/Manager.test.ts
--- a/apps/server/src/terminal/Layers/Manager.test.ts
+++ b/apps/server/src/terminal/Layers/Manager.test.ts
@@ -211,6 +211,7 @@
).pipe(Effect.forkIn(eventScope)),
);
+ let disposed = false;
return {
baseDir,
logsDir,
@@ -221,6 +222,8 @@
run: <A, E>(effect: Effect.Effect<A, E>) => runtime.runPromise(effect),
getEvents: () => Effect.runPromise(Ref.get(eventsRef)),
dispose: async () => {
+ if (disposed) return;
+ disposed = true;
await Effect.runPromise(Scope.close(eventScope, Exit.void));
await runtime.dispose();
},
diff --git a/packages/shared/src/CoalescingDrainableWorker.ts b/packages/shared/src/CoalescingDrainableWorker.ts
--- a/packages/shared/src/CoalescingDrainableWorker.ts
+++ b/packages/shared/src/CoalescingDrainableWorker.ts
@@ -13,7 +13,6 @@
export interface CoalescingDrainableWorker<K, V> {
readonly enqueue: (key: K, value: V) => Effect.Effect<void>;
readonly drainKey: (key: K) => Effect.Effect<void>;
- readonly drain: Effect.Effect<void>;
}
interface CoalescingWorkerState<K, V> {
@@ -126,16 +125,6 @@
Effect.asVoid,
);
- const drain: CoalescingDrainableWorker<K, V>["drain"] = TxRef.get(stateRef).pipe(
- Effect.tap((state) =>
- state.latestByKey.size > 0 || state.queuedKeys.size > 0 || state.activeKeys.size > 0
- ? Effect.txRetry
- : Effect.void,
- ),
- Effect.asVoid,
- Effect.tx,
- );
-
const drainKey: CoalescingDrainableWorker<K, V>["drainKey"] = (key) =>
TxRef.get(stateRef).pipe(
Effect.tap((state) =>
@@ -147,5 +136,5 @@
Effect.tx,
);
- return { enqueue, drainKey, drain } satisfies CoalescingDrainableWorker<K, V>;
+ return { enqueue, drainKey } satisfies CoalescingDrainableWorker<K, V>;
});

- Rename the shared coalescing worker API and exports
- Switch terminal persistence to the keyed worker
- Drop the unused global drain helper
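The per-key coalescing idea behind the shared worker can be shown in a synchronous sketch (the real worker is Effect/transaction-based and asynchronous; `KeyedCoalescingSketch` is a hypothetical name, not the exported API): while a key is pending, later values overwrite earlier ones, so a burst of history updates collapses into a single write.

```typescript
// Minimal keyed-coalescing sketch: enqueue keeps only the latest value per
// key; drainKey performs at most one write per key with that latest payload.
class KeyedCoalescingSketch<K, V> {
  private readonly latestByKey = new Map<K, V>();

  constructor(private readonly process: (key: K, value: V) => void) {}

  enqueue(key: K, value: V): void {
    this.latestByKey.set(key, value); // coalesce: overwrite any pending value
  }

  drainKey(key: K): void {
    const value = this.latestByKey.get(key);
    if (value === undefined) return;
    this.latestByKey.delete(key);
    this.process(key, value); // one write, carrying the latest payload
  }
}
```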
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: All stat errors incorrectly mapped to "notFound" reason
- Changed assertValidCwd to only map PlatformError with reason._tag "NotFound" to TerminalCwdError, while re-raising all other stat errors (permission-denied, IO errors, etc.) as defects via Effect.die, matching the original pre-refactor ENOENT-only handling.
Or push these changes by commenting:
@cursor push 02e642acce
Preview (02e642acce)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -977,13 +977,16 @@
const assertValidCwd = Effect.fn("terminal.assertValidCwd")(function* (cwd: string) {
const stats = yield* fileSystem.stat(cwd).pipe(
- Effect.mapError(
- (cause) =>
- new TerminalCwdError({
- cwd,
- reason: "notFound",
- cause,
- }),
+ Effect.catch((error) =>
+ error.reason._tag === "NotFound"
+ ? Effect.fail(
+ new TerminalCwdError({
+ cwd,
+ reason: "notFound",
+ cause: error,
+ }),
+ )
+ : Effect.die(error),
),
);
if (stats.type !== "Directory") {

- Preserve non-NotFound cwd stat errors as statFailed
- Add coverage for permission-denied cwd stat failures
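The error-discrimination pattern in the fix above can be sketched in plain Node terms (the real code uses Effect's `FileSystem` and tagged errors; `CwdNotFoundError` and `assertValidCwd` here are hypothetical stand-ins): only "not found" becomes the recoverable domain error, while every other stat failure propagates unchanged.

```typescript
import { statSync } from "node:fs";

// Domain error for the one failure callers are expected to handle.
class CwdNotFoundError extends Error {
  constructor(readonly cwd: string) {
    super(`cwd not found: ${cwd}`);
  }
}

function assertValidCwd(cwd: string): void {
  let stats;
  try {
    stats = statSync(cwd);
  } catch (error) {
    if ((error as { code?: string }).code === "ENOENT") {
      throw new CwdNotFoundError(cwd); // expected, recoverable failure
    }
    throw error; // permission-denied, IO errors, etc. stay as defects
  }
  if (!stats.isDirectory()) {
    throw new CwdNotFoundError(cwd);
  }
}
```

Mapping every stat error to "notFound" would have hidden genuine defects (EACCES, EIO) behind a misleading recoverable error, which is what the autofix reverts.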
- Skip repeated cleanup calls in the test harness
- Prevent double-closing the event scope and runtime
- Migrate test setup to Effect/Vitest layers
- Add coverage for async timing and lifecycle cases
- Replace PubSub-backed terminal event streaming with explicit listeners
- Preserve PTY output/exit ordering and cover the new flow in tests
Integrates upstream changes including:

- Refactor terminal manager onto Effect runtime (pingdotgg#1525)
- Refactor web orchestration sync to incremental events and isolated recovery (pingdotgg#1560)
- Remove redundant add-project cancel button (pingdotgg#1302)
- README documentation updates (pingdotgg#1406, pingdotgg#1564, pingdotgg#1565)

Conflict resolution across 8 files: adopted upstream's incremental event store architecture, terminal Effect runtime, and batched orchestration effects while preserving fork's multi-provider state.


Summary
`SIGKILL` after the grace period.

Testing

- `apps/server/src/terminal/Layers/Manager.test.ts` for lifecycle, persistence, subprocess activity, and shutdown behavior.
- `apps/server/src/wsServer.test.ts` for websocket protocol changes.

Note
High Risk
Large refactor of terminal session lifecycle/eventing and history persistence, including new concurrency/cleanup semantics and error types; regressions could break terminal stability, subprocess polling, or websocket event delivery.
Overview
Refactors the server terminal manager from an imperative `EventEmitter` class into an Effect-managed implementation (`makeTerminalManagerWithOptions`) with scoped lifecycle cleanup, per-thread locking via `Semaphore`, and Fiber-based subprocess polling.

History persistence is reworked to use a new `@t3tools/shared` `KeyedCoalescingWorker` for debounced, per-session coalesced writes, with explicit legacy history migration and stronger typed errors (`TerminalCwdError`, `TerminalHistoryError`, `TerminalSessionLookupError`, `TerminalNotRunningError`).

Terminal event delivery now uses Effect callbacks (`subscribe: (event) => Effect`), and websocket wiring is updated to subscribe/unsubscribe via scope finalizers; tests are rewritten to `@effect/vitest`/`TestClock` and expanded to cover shutdown escalation (SIGTERM→SIGKILL), ordering guarantees, and scoped shutdown. Contracts are adjusted to use encoded schema types for IPC terminal inputs and tweak some exported terminal input types.

Written by Cursor Bugbot for commit 1e7690f. This will update automatically on new commits.
Note
Refactor terminal manager from EventEmitter class to Effect runtime
- Replaces the `TerminalManagerRuntime` class with `makeTerminalManagerWithOptions`, a fully Effect-native factory. Session state uses `SynchronizedRef`, concurrency is controlled via per-thread `Semaphore`, and event delivery uses registered Effect listeners instead of Node `EventEmitter`.
- Adds `makeKeyedCoalescingWorker` in `packages/shared` to debounce and coalesce history writes per session, replacing ad-hoc timers.
- Runs subprocess polling on `Fiber`s with structured cleanup via `Scope`.
- Replaces the generic `TerminalError` with a union of specific tagged error types: `TerminalCwdError`, `TerminalHistoryError`, `TerminalSessionLookupError`, and `TerminalNotRunningError`.
- Rewrites the `Manager.test.ts` suite to use `@effect/vitest` with `it.layer` and `TestClock`, replacing all `node:fs`/`node:os` calls with the Effect `FileSystem` service.
- `TerminalManagerShape.subscribe` now requires an effectful listener; callers providing a plain callback must be updated. The `dispose` field is removed from the interface.

Macroscope summarized 1e7690f.