Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions engine/sdks/typescript/runner/src/tunnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import type { MessageId, RequestId } from "@rivetkit/engine-runner-protocol";
import type { Logger } from "pino";
import { stringify as uuidstringify, v4 as uuidv4 } from "uuid";
import { logger } from "./log";

Check warning on line 5 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedImports

This import is unused.
import type { ActorInstance, Runner } from "./mod";
import { unreachable } from "./utils";
import { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter";

const GC_INTERVAL = 60000; // 60 seconds
const MESSAGE_ACK_TIMEOUT = 5000; // 5 seconds
const WEBSOCKET_STATE_PERSIST_TIMEOUT = 30000; // 30 seconds

Check warning on line 12 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedVariables

This variable WEBSOCKET_STATE_PERSIST_TIMEOUT is unused.

interface PendingRequest {
resolve: (response: Response) => void;
Expand All @@ -23,6 +23,12 @@
requestIdStr: string;
}

class RunnerShutdownError extends Error {
constructor() {
super("Runner shut down");
}
}

export class Tunnel {
#runner: Runner;

Expand All @@ -49,20 +55,31 @@
}

shutdown() {
// NOTE: Pegboard WS already closed at this point, cannot send
// anything. All teardown logic is handled by pegboard-runner.

if (this.#gcInterval) {
clearInterval(this.#gcInterval);
this.#gcInterval = undefined;
}

// Reject all pending requests
//
// RunnerShutdownError will be explicitly ignored
for (const [_, request] of this.#actorPendingRequests) {
request.reject(new Error("Tunnel shutting down"));
request.reject(new RunnerShutdownError());
}
this.#actorPendingRequests.clear();

// Close all WebSockets
//
// The WebSocket close event with retry is automatically sent when the
// runner WS closes, so we only need to notify the client that the WS
// closed:
// https://github.com/rivet-dev/rivet/blob/00d4f6a22da178a6f8115e5db50d96c6f8387c2e/engine/packages/pegboard-runner/src/lib.rs#L157
for (const [_, ws] of this.#actorWebSockets) {
ws.__closeWithRetry();
// TODO: Trigger close event, but do not send anything over the tunnel
ws.__closeWithoutCallback(1000, "ws.tunnel_shutdown");
}
this.#actorWebSockets.clear();
}
Expand Down Expand Up @@ -407,8 +424,16 @@
await this.#sendResponse(requestId, response);
}
} catch (error) {
this.log?.error({ msg: "error handling request", error });
this.#sendResponseError(requestId, 500, "Internal Server Error");
if (error instanceof RunnerShutdownError) {
this.log?.debug({ msg: "catught runner shutdown error" });
} else {
this.log?.error({ msg: "error handling request", error });
this.#sendResponseError(
requestId,
500,
"Internal Server Error",
);
}
} finally {
// Clean up request tracking
const actor = this.#runner.getActor(req.actorId);
Expand Down Expand Up @@ -604,8 +629,8 @@
headerInit[k] = v;
}
}
headerInit["Upgrade"] = "websocket";

Check notice on line 632 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/complexity/useLiteralKeys

The computed expression can be simplified without the use of a string literal.
headerInit["Connection"] = "Upgrade";

Check notice on line 633 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/complexity/useLiteralKeys

The computed expression can be simplified without the use of a string literal.

const request = new Request(`http://localhost${open.path}`, {
method: "GET",
Expand Down
19 changes: 15 additions & 4 deletions engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { logger } from "./log";

export class WebSocketTunnelAdapter {
#webSocketId: string;

Check warning on line 7 in engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.
#readyState: number = 0; // CONNECTING
#eventListeners: Map<string, Set<(event: any) => void>> = new Map();
#onopen: ((this: any, ev: any) => any) | null = null;
Expand Down Expand Up @@ -190,14 +190,23 @@
}

close(code?: number, reason?: string): void {
this.closeInner(code, reason);
this.closeInner(code, reason, false, true);
}

__closeWithRetry(code?: number, reason?: string): void {
this.closeInner(code, reason, true);
this.closeInner(code, reason, true, true);
}

closeInner(code?: number, reason?: string, retry: boolean = false): void {
__closeWithoutCallback(code?: number, reason?: string): void {
this.closeInner(code, reason, false, false);
}

closeInner(
code: number | undefined,
reason: string | undefined,
retry: boolean,
callback: boolean,
): void {
if (
this.#readyState === 2 || // CLOSING
this.#readyState === 3 // CLOSED
Expand All @@ -208,7 +217,9 @@
this.#readyState = 2; // CLOSING

// Send close through tunnel
this.#closeCallback(code, reason, retry);
if (callback) {
this.#closeCallback(code, reason, retry);
}

// Update state and fire event
this.#readyState = 3; // CLOSED
Expand All @@ -227,7 +238,7 @@
addEventListener(
type: string,
listener: (event: any) => void,
options?: boolean | any,

Check warning on line 241 in engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedFunctionParameters

This parameter is unused.
): void {
if (typeof listener === "function") {
let listeners = this.#eventListeners.get(type);
Expand All @@ -245,7 +256,7 @@
removeEventListener(
type: string,
listener: (event: any) => void,
options?: boolean | any,

Check warning on line 259 in engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedFunctionParameters

This parameter is unused.
): void {
if (typeof listener === "function") {
const listeners = this.#eventListeners.get(type);
Expand All @@ -255,7 +266,7 @@
}
}

dispatchEvent(event: any): boolean {

Check warning on line 269 in engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedFunctionParameters

This parameter is unused.
// Simple implementation
return true;
}
Expand Down
Loading