From b2a01796d502fb36e447c12bfc1185fe5ce4e989 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Sat, 21 Jun 2025 01:29:18 +0000 Subject: [PATCH] feat: get multi actors working e2e on docker compose --- docker/dev-full/docker-compose.yml | 2 + docker/dev-full/vector-client/vector.yaml | 32 +---- docker/dev-full/vector-server/vector.yaml | 24 ++++ docker/http-debug/Dockerfile | 15 ++ docker/http-debug/debug_server.py | 122 ++++++++++++++++ .../system-test-actor/src/managerClient.ts | 134 ++++++++++-------- examples/system-test-actor/tests/client.ts | 8 +- .../install_scripts/components/vector.rs | 4 +- .../migrations/20200101000000_init.up.sql | 2 +- .../pegboard/src/workflows/actor2/destroy.rs | 52 ++++++- .../pegboard/src/workflows/actor2/mod.rs | 18 +-- .../pegboard/src/workflows/actor2/runtime.rs | 15 +- .../pegboard/src/workflows/actor2/setup.rs | 5 +- .../toolchain/src/util/docker/push.rs | 14 +- 14 files changed, 331 insertions(+), 116 deletions(-) create mode 100644 docker/http-debug/Dockerfile create mode 100644 docker/http-debug/debug_server.py diff --git a/docker/dev-full/docker-compose.yml b/docker/dev-full/docker-compose.yml index 489dc5c9d3..f5e73a3c96 100644 --- a/docker/dev-full/docker-compose.yml +++ b/docker/dev-full/docker-compose.yml @@ -357,6 +357,8 @@ services: volumes: - vector-server-data:/var/lib/vector - ./vector-server:/etc/vector + # environment: + # - VECTOR_LOG=debug networks: - rivet-network diff --git a/docker/dev-full/vector-client/vector.yaml b/docker/dev-full/vector-client/vector.yaml index 945d0711c2..37a05d8662 100644 --- a/docker/dev-full/vector-client/vector.yaml +++ b/docker/dev-full/vector-client/vector.yaml @@ -21,15 +21,10 @@ sources: include: - /var/lib/rivet-client/log - pegboard_v8_isolate_runner: - type: file - include: - - /var/lib/rivet-client/runner/log - pegboard_container_runners: type: file include: - - /var/lib/rivet-client/actors/*/log + - /var/lib/rivet-client/runners/*/log transforms: filter_metrics: @@ -46,7 +41,7 @@ transforms: .tags.server_id = "fc67e54e-5d6a-4726-ab23-77b0e54f068f" .tags.datacenter_id = "f288913c-735d-4188-bf9b-2fcf6eac7b9c" .tags.cluster_id = "unknown" - .tags.pool_type = "pegboard_isolate" + .tags.pool_type = "pegboard" .tags.public_ip = "127.0.0.1" pegboard_manager_add_meta: @@ -60,36 +55,22 @@ transforms: .server_id = "fc67e54e-5d6a-4726-ab23-77b0e54f068f" .datacenter_id = "f288913c-735d-4188-bf9b-2fcf6eac7b9c" .tags.cluster_id = "unknown" - .pool_type = "pegboard_isolate" - .public_ip = "127.0.0.1" - - pegboard_v8_isolate_runner_add_meta: - type: remap - inputs: - - pegboard_v8_isolate_runner - source: | - .source = "pegboard_v8_isolate_runner" - - .client_id = "fc67e54e-5d6a-4726-ab23-77b0e54f068f" - .server_id = "fc67e54e-5d6a-4726-ab23-77b0e54f068f" - .datacenter_id = "f288913c-735d-4188-bf9b-2fcf6eac7b9c" - .tags.cluster_id = "unknown" - .pool_type = "pegboard_isolate" + .pool_type = "pegboard" .public_ip = "127.0.0.1" - + pegboard_container_runner_add_meta: type: remap inputs: - pegboard_container_runners source: | .source = "pegboard_container_runner" - .actor_id = parse_regex!(.file, r'/etc/pegboard/actors/(?P[0-9a-fA-F-]+)/log').actor_id + .runner_id = parse_regex!(.file, r'/etc/pegboard/runners/(?P[0-9a-fA-F-]+)/log').runner_id .client_id = "fc67e54e-5d6a-4726-ab23-77b0e54f068f" .server_id = "fc67e54e-5d6a-4726-ab23-77b0e54f068f" .datacenter_id = "f288913c-735d-4188-bf9b-2fcf6eac7b9c" .cluster_id = "unknown" - .pool_type = "pegboard_isolate" + .pool_type = "pegboard" .public_ip = "127.0.0.1" sinks: @@ -99,7 +80,6 @@ sinks: - metrics_add_meta - dynamic_events_http - pegboard_manager_add_meta - - pegboard_v8_isolate_runner_add_meta - pegboard_container_runner_add_meta address: vector-server:6000 healthcheck: diff --git a/docker/dev-full/vector-server/vector.yaml b/docker/dev-full/vector-server/vector.yaml index 9fa0090fcd..5eeb4b12fb 100644 --- a/docker/dev-full/vector-server/vector.yaml +++ b/docker/dev-full/vector-server/vector.yaml @@ -34,6 +34,15 @@ transforms: condition: type: vrl source: .source == "actors" + + runners: + type: filter + inputs: + - vector + - tcp_json + condition: + type: vrl + source: .source == "runners" job_run: type: filter @@ -126,6 +135,21 @@ sinks: password: vector batch: timeout_secs: 1.0 + + clickhouse_runner_logs: + type: clickhouse + inputs: + - runners + compression: gzip + database: db_pegboard_runner_log + endpoint: http://clickhouse:8123 + table: runner_logs + auth: + strategy: basic + user: vector + password: vector + batch: + timeout_secs: 1.0 clickhouse_job_run_logs: type: clickhouse diff --git a/docker/http-debug/Dockerfile b/docker/http-debug/Dockerfile new file mode 100644 index 0000000000..949bf5c826 --- /dev/null +++ b/docker/http-debug/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install required packages +RUN pip install flask + +# Create the debug server +COPY debug_server.py . + +# Expose port 8080 +EXPOSE 8080 + +# Run the server +CMD ["python", "debug_server.py"] \ No newline at end of file diff --git a/docker/http-debug/debug_server.py b/docker/http-debug/debug_server.py new file mode 100644 index 0000000000..fdf98478f5 --- /dev/null +++ b/docker/http-debug/debug_server.py @@ -0,0 +1,122 @@ +from flask import Flask, request, jsonify +import gzip +import zlib +from datetime import datetime + +app = Flask(__name__) + +def log_request_details(): + """Log all request details in a formatted way""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + print("=" * 80) + print(f"[{timestamp}] NEW REQUEST") + print("=" * 80) + + # Method and URL + print(f"Method: {request.method}") + print(f"URL: {request.url}") + print(f"Path: {request.path}") + print(f"Query String: {request.query_string.decode('utf-8')}") + + # Headers + print("\n--- HEADERS ---") + for header_name, header_value in request.headers: + print(f"{header_name}: {header_value}") + + # Query Parameters + if request.args: + print("\n--- QUERY PARAMETERS ---") + for key, value in request.args.items(): + print(f"{key}: {value}") + + # Form Data + if request.form: + print("\n--- FORM DATA ---") + for key, value in request.form.items(): + print(f"{key}: {value}") + + # Files + if request.files: + print("\n--- FILES ---") + for key, file in request.files.items(): + print(f"{key}: {file.filename} (Content-Type: {file.content_type})") + + # Raw Body with decompression support + try: + raw_body = request.get_data() + content_encoding = request.headers.get('Content-Encoding', '').lower() + + print("\n--- REQUEST BODY ---") + + if not raw_body: + print("(empty)") + else: + # Handle compressed content + decompressed_body = None + + if content_encoding == 'gzip': + try: + decompressed_body = gzip.decompress(raw_body).decode('utf-8') + print("(Content was gzip-compressed, showing decompressed version)") + except Exception as e: + print(f"Failed to decompress gzip content: {e}") + elif content_encoding == 'deflate': + try: + decompressed_body = zlib.decompress(raw_body).decode('utf-8') + print("(Content was deflate-compressed, showing decompressed version)") + except Exception as e: + print(f"Failed to decompress deflate content: {e}") + elif content_encoding in ['br', 'brotli']: + print("(Content is brotli-compressed - brotli decompression not available)") + print("Raw compressed data (first 200 bytes):") + print(repr(raw_body[:200])) + elif content_encoding == 'zstd': + print("(Content is zstd-compressed - zstd decompression not available)") + print("Raw compressed data (first 200 bytes):") + print(repr(raw_body[:200])) + else: + # No compression or unknown compression + try: + decompressed_body = raw_body.decode('utf-8') + except UnicodeDecodeError: + print("(Binary content - showing first 200 bytes as hex)") + print(raw_body[:200].hex()) + + # Display the decompressed content + if decompressed_body: + print(decompressed_body) + + # Also show raw length info + print(f"\nRaw body length: {len(raw_body)} bytes") + print(f"Decompressed length: {len(decompressed_body)} bytes") + + except Exception as e: + print(f"\n--- REQUEST BODY ---") + print(f"Error reading body: {e}") + + print("=" * 80) + print() + +# Catch all routes for any HTTP method +@app.route('/', defaults={'path': ''}, methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS']) +@app.route('/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS']) +def debug_endpoint(path): + log_request_details() + + # Return a simple response + response_data = { + "message": "Request received and logged", + "method": request.method, + "path": f"/{path}" if path else "/", + "timestamp": datetime.now().isoformat() + } + + return jsonify(response_data), 200 + +if __name__ == '__main__': + print("Starting HTTP Debug Server...") + print("All incoming requests will be logged to console") + print("Server listening on port 8080") + print("=" * 80) + app.run(host='0.0.0.0', port=8080, debug=False) \ No newline at end of file diff --git a/examples/system-test-actor/src/managerClient.ts b/examples/system-test-actor/src/managerClient.ts index 352a950895..cc86b1b0c8 100644 --- a/examples/system-test-actor/src/managerClient.ts +++ b/examples/system-test-actor/src/managerClient.ts @@ -1,81 +1,73 @@ -import WebSocket from "ws"; +import * as net from "net"; +import * as fs from "fs"; +import { setInterval, clearInterval } from "timers"; export function connectToManager() { - let managerIp = process.env.RIVET_MANAGER_IP; - let managerPort = process.env.RIVET_MANAGER_PORT; + const socketPath = process.env.RIVET_MANAGER_SOCKET_PATH; let pingInterval: NodeJS.Timeout; - if (!managerIp || !managerPort) { - console.error("Missing RIVET_MANAGER_IP or RIVET_MANAGER_PORT environment variables"); + if (!socketPath) { + console.error("Missing RIVET_MANAGER_SOCKET_PATH environment variable"); return; } - let wsUrl = `ws://${managerIp}:${managerPort}`; - console.log(`Connecting to manager WebSocket at ${wsUrl}`); + console.log(`Connecting to Unix socket at ${socketPath}`); - let ws = new WebSocket(wsUrl); + // Ensure the socket path exists + if (!fs.existsSync(socketPath)) { + console.error(`Socket path does not exist: ${socketPath}`); + return; + } - ws.on("open", () => { - console.log("Connected to manager WebSocket"); - - let message = { - init: { - access_token: process.env.RIVET_ACCESS_TOKEN - } - }; - let buffer = Buffer.from(JSON.stringify(message)); - ws.send(buffer); + const client = net.createConnection(socketPath, () => { + console.log("Socket connection established"); // Start ping loop to keep connection alive pingInterval = setInterval(() => { - if (ws.readyState === WebSocket.OPEN) { - ws.ping(); - } + const pingMessage = { ping: null }; + client.write(encodeFrame(pingMessage)); }, 2000); }); - ws.on("message", (data) => { - let json = data.toString(); + client.on("data", (data) => { + const packets = decodeFrames(data); + packets.forEach((packet) => { + console.log("Received packet from manager:", packet); - console.log("Received message from manager:", json); - - let packet = JSON.parse(json); - - if (packet.start_actor) { - let message = { - actor_state_update: { - actor_id: packet.start_actor.actor_id, - generation: packet.start_actor.generation, - state: { - running: null, + if (packet.start_actor) { + const response = { + actor_state_update: { + actor_id: packet.start_actor.actor_id, + generation: packet.start_actor.generation, + state: { + running: null, + }, }, - } - }; - let buffer = Buffer.from(JSON.stringify(message)); - ws.send(buffer); - } else if (packet.signal_actor) { - let message = { - actor_state_update: { - actor_id: packet.start_actor.actor_id, - generation: packet.start_actor.generation, - state: { - exited: { - exit_code: 0, - } + }; + client.write(encodeFrame(response)); + } else if (packet.signal_actor) { + const response = { + actor_state_update: { + actor_id: packet.signal_actor.actor_id, + generation: packet.signal_actor.generation, + state: { + exited: { + exit_code: 0, + }, + }, }, - } - }; - let buffer = Buffer.from(JSON.stringify(message)); - ws.send(buffer); - } + }; + client.write(encodeFrame(response)); + } + }); }); - ws.on("error", (error) => { - console.error("WebSocket error:", error); + client.on("error", (error) => { + console.error("Socket error:", error); }); - ws.on("close", code => { - console.log("WebSocket connection closed, attempting to reconnect...", code); + client.on("close", () => { + console.log("Socket connection closed, attempting to reconnect..."); // Clear ping interval when connection closes if (pingInterval) clearInterval(pingInterval); @@ -83,3 +75,33 @@ export function connectToManager() { setTimeout(connectToManager, 5000); }); } + +function encodeFrame(payload: any): Buffer { + const json = JSON.stringify(payload); + const payloadLength = Buffer.alloc(4); + payloadLength.writeUInt32BE(json.length, 0); + + const header = Buffer.alloc(4); // All zeros for now + return Buffer.concat([payloadLength, header, Buffer.from(json)]); +} + +function decodeFrames(buffer: Buffer): any[] { + const packets = []; + let offset = 0; + + while (offset < buffer.length) { + if (buffer.length - offset < 8) break; // Incomplete frame length + header + const payloadLength = buffer.readUInt32BE(offset); + offset += 4; + + // Skip the header (4 bytes) + offset += 4; + + if (buffer.length - offset < payloadLength) break; // Incomplete frame data + const json = buffer.slice(offset, offset + payloadLength).toString(); + packets.push(JSON.parse(json)); + offset += payloadLength; + } + + return packets; +} diff --git a/examples/system-test-actor/tests/client.ts b/examples/system-test-actor/tests/client.ts index 68773ed22e..46e9be06bf 100644 --- a/examples/system-test-actor/tests/client.ts +++ b/examples/system-test-actor/tests/client.ts @@ -61,10 +61,10 @@ async function run() { lifecycle: { durable: false, }, - resources: { - cpu: 100, - memory: 100, - }, + // resources: { + // cpu: 100, + // memory: 100, + // }, }, }); actorId = actor.id; diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs index d3de063b3c..dc7753ce6a 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs @@ -132,7 +132,7 @@ pub fn configure(namespace: &str, config: &Config, pool_type: PoolType) -> Globa config_json["sources"]["pegboard_container_runners"] = json!({ "type": "file", - "include": ["/var/lib/rivet-client/runners/*/logs/*"] + "include": ["/var/lib/rivet-client/actors/*/logs/*"] }); config_json["transforms"]["pegboard_container_runner_add_meta"] = json!({ @@ -141,7 +141,7 @@ pub fn configure(namespace: &str, config: &Config, pool_type: PoolType) -> Globa "source": formatdoc!( r#" .source = "pegboard_container_runner" - .runner_id = parse_regex!(.file, r'/var/lib/rivet-client/runners/(?P[0-9a-fA-F-]+)/logs/').runner_id + .actor_id = parse_regex!(.file, r'/var/lib/rivet-client/actors/(?P[0-9a-fA-F-]+)/logs/').actor_id .namespace = "{namespace}" .client_id = "___SERVER_ID___" diff --git a/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql b/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql index 342ed6c2a0..580e7beaea 100644 --- a/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql +++ b/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql @@ -1,5 +1,5 @@ CREATE TABLE IF NOT EXISTS actor_runners ( - actor_id UUID, + actor_id String, generation UInt32, runner_id UUID, started_at DateTime64 (9), diff --git a/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs b/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs index 3a5d3c9cbe..aa573d591a 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs @@ -4,18 +4,21 @@ use fdb_util::{end_of_key_range, FormalKey, SERIALIZABLE}; use foundationdb::{self as fdb, options::ConflictRangeType}; use nix::sys::signal::Signal; -use super::{analytics::InsertClickHouseInput, DestroyComplete, DestroyStarted}; +use super::{ + analytics::InsertClickHouseInput, runtime::ActorRunnerClickhouseRow, DestroyComplete, + DestroyStarted, +}; use crate::{keys, protocol, types::GameGuardProtocol}; #[derive(Debug, Serialize, Deserialize)] pub struct KillCtx { - pub generation: u32, pub kill_timeout_ms: i64, } #[derive(Debug, Serialize, Deserialize)] pub(crate) struct Input { pub actor_id: util::Id, + pub generation: u32, pub image_id: Uuid, pub build_allocation_type: Option, /// Whether or not to send signals to the pb actor. In the case that the actor was already stopped @@ -36,6 +39,16 @@ pub(crate) async fn pegboard_actor_destroy( let actor = ctx.activity(UpdateDbInput {}).await?; if let Some(actor) = actor { + if let (Some(start_ts), Some(runner_id)) = (actor.start_ts, actor.runner_id) { + ctx.activity(FinishRunnerClickhouseInput { + actor_id: input.actor_id, + generation: input.generation, + start_ts, + runner_id, + }) + .await?; + } + let client_workflow_id = actor.client_workflow_id; let runner_id = actor.runner_id; @@ -53,7 +66,7 @@ pub(crate) async fn pegboard_actor_destroy( kill( ctx, input.actor_id, - kill_data.generation, + input.generation, client_workflow_id, kill_data.kill_timeout_ms, false, @@ -100,6 +113,7 @@ struct UpdateDbOutput { selected_resources_cpu_millicores: Option, tags: sqlx::types::Json>, create_ts: i64, + start_ts: Option, runner_id: Option, client_id: Option, client_workflow_id: Option, @@ -124,6 +138,7 @@ async fn update_db( selected_resources_cpu_millicores, json(tags) AS tags, create_ts, + start_ts, runner_id, client_id, client_workflow_id @@ -133,6 +148,37 @@ async fn update_db( .await } +#[derive(Debug, Serialize, Deserialize, Hash)] +struct FinishRunnerClickhouseInput { + actor_id: util::Id, + generation: u32, + start_ts: i64, + runner_id: Uuid, +} + +#[activity(FinishRunnerClickhouse)] +async fn finish_runner_clickhouse( + ctx: &ActivityCtx, + input: &FinishRunnerClickhouseInput, +) -> GlobalResult<()> { + let inserter = ctx.clickhouse_inserter().await?; + + // Set alloc as finished + inserter.insert( + "db_pegboard_runner", + "actor_runners", + ActorRunnerClickhouseRow { + actor_id: input.actor_id.to_string(), + generation: input.generation, + runner_id: input.runner_id, + started_at: input.start_ts * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + finished_at: util::timestamp::now() * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + }, + )?; + + Ok(()) +} + #[derive(Debug, Serialize, Deserialize, Hash)] pub struct UpdateFdbInput { actor_id: util::Id, diff --git a/packages/edge/services/pegboard/src/workflows/actor2/mod.rs b/packages/edge/services/pegboard/src/workflows/actor2/mod.rs index 1eb00861bd..b043da1902 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/mod.rs @@ -111,6 +111,7 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu ctx.workflow(destroy::Input { actor_id: input.actor_id, + generation: 0, image_id: input.image_id, build_allocation_type: None, kill: None, @@ -145,6 +146,7 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu ctx.workflow(destroy::Input { actor_id: input.actor_id, + generation: 0, image_id: input.image_id, build_allocation_type: Some(initial_actor_setup.meta.build_allocation_type), kill: None, @@ -202,9 +204,9 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu { // Destroyed early return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, image_id: state.image_id, kill: Some(KillCtx { - generation: state.generation, kill_timeout_ms: sig .override_kill_timeout_ms .unwrap_or(input.lifecycle.kill_timeout_ms), @@ -216,9 +218,9 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu } } else { return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, image_id: state.image_id, kill: Some(KillCtx { - generation: state.generation, kill_timeout_ms: input.lifecycle.kill_timeout_ms, }), })); @@ -351,6 +353,7 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu { // Destroyed early return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, image_id: state.image_id, // None here because if we received the destroy signal, it is // guaranteed that we did not allocate another actor. @@ -372,13 +375,11 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu } return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, image_id: state.image_id, // No need to kill if already exited kill: matches!(sig.state, protocol::ActorState::Lost) - .then_some(KillCtx { - generation: state.generation, - kill_timeout_ms: 0, - }), + .then_some(KillCtx { kill_timeout_ms: 0 }), })); } } @@ -416,9 +417,9 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu { // Destroyed early return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, image_id: input.image_id, kill: Some(KillCtx { - generation: state.generation, kill_timeout_ms: sig .override_kill_timeout_ms .unwrap_or(input.lifecycle.kill_timeout_ms), @@ -442,9 +443,9 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu } Main::Destroy(sig) => { return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, image_id: input.image_id, kill: Some(KillCtx { - generation: state.generation, kill_timeout_ms: sig .override_kill_timeout_ms .unwrap_or(input.lifecycle.kill_timeout_ms), @@ -462,6 +463,7 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu ctx.workflow(destroy::Input { actor_id: input.actor_id, + generation: lifecycle_res.generation, image_id: lifecycle_res.image_id, build_allocation_type: Some(initial_actor_setup.meta.build_allocation_type), kill: lifecycle_res.kill, diff --git a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs index f295065192..85e5c4e564 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs @@ -57,6 +57,7 @@ impl State { #[derive(Serialize, Deserialize)] pub struct LifecycleRes { + pub generation: u32, pub image_id: Uuid, pub kill: Option, } @@ -583,12 +584,12 @@ pub struct SetStartedInput { } #[derive(Serialize)] -pub struct ActorRunnerClickhouseRow { - actor_id: String, - generation: u32, - runner_id: Uuid, - started_at: i64, - finished_at: i64, +pub(crate) struct ActorRunnerClickhouseRow { + pub actor_id: String, + pub generation: u32, + pub runner_id: Uuid, + pub started_at: i64, + pub finished_at: i64, } #[activity(SetStarted)] @@ -627,7 +628,7 @@ pub async fn set_started(ctx: &ActivityCtx, input: &SetStartedInput) -> GlobalRe generation: input.generation, runner_id: old_runner_id, started_at: old_start_ts * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) - finished_at: start_ts * 1_000_000, + finished_at: start_ts * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) }, )?; } diff --git a/packages/edge/services/pegboard/src/workflows/actor2/setup.rs b/packages/edge/services/pegboard/src/workflows/actor2/setup.rs index b915aade1a..946f943751 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/setup.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/setup.rs @@ -551,10 +551,7 @@ struct InsertPortsInput { } #[activity(InsertPorts)] -async fn insert_ports( - ctx: &ActivityCtx, - input: &InsertPortsInput, -) -> GlobalResult<()> { +async fn insert_ports(ctx: &ActivityCtx, input: &InsertPortsInput) -> GlobalResult<()> { let pool = ctx.sqlite().await?; let mut conn = pool.conn().await?; let mut tx = conn.begin().await?; diff --git a/packages/toolchain/toolchain/src/util/docker/push.rs b/packages/toolchain/toolchain/src/util/docker/push.rs index 176a0e4955..2ec7342a16 100644 --- a/packages/toolchain/toolchain/src/util/docker/push.rs +++ b/packages/toolchain/toolchain/src/util/docker/push.rs @@ -75,12 +75,16 @@ pub async fn push_tar( compression: Some(build_compression), // TODO: Expose to CLI and config allocation: Some(Box::new(models::BuildsAllocation { - single: Some(serde_json::json!({})), - multi: None, - // single: None, - // multi: Some(Box::new(models::BuildsAllocationMulti { slots: 4 })), + // single: Some(serde_json::json!({})), + // multi: None, + single: None, + multi: Some(Box::new(models::BuildsAllocationMulti { slots: 4 })), + })), + // resources: None, + resources: Some(Box::new(models::BuildsResources { + cpu: 128, + memory: 128, })), - resources: None, }, Some(&ctx.project.name_id), Some(&push_opts.env.slug),