diff --git a/packages/edge/infra/client/container-runner/src/log_shipper.rs b/packages/edge/infra/client/container-runner/src/log_shipper.rs index 8dabafbd39..d0c10100be 100644 --- a/packages/edge/infra/client/container-runner/src/log_shipper.rs +++ b/packages/edge/infra/client/container-runner/src/log_shipper.rs @@ -38,6 +38,7 @@ pub struct LogShipper { pub vector_socket_addr: String, pub runner_id: String, + pub actor_id: Option, pub env_id: Uuid, } @@ -94,6 +95,7 @@ impl LogShipper { while let Result::Ok(message) = self.msg_rx.recv() { let vector_message = VectorMessage::Runners { runner_id: self.runner_id.as_str(), + actor_id: self.actor_id.as_ref().map(|x| x.as_str()), env_id: self.env_id, stream_type: message.stream_type as u8, ts: message.ts, @@ -117,6 +119,7 @@ enum VectorMessage<'a> { #[serde(rename = "runners")] Runners { runner_id: &'a str, + actor_id: Option<&'a str>, env_id: Uuid, stream_type: u8, ts: u64, diff --git a/packages/edge/infra/client/container-runner/src/main.rs b/packages/edge/infra/client/container-runner/src/main.rs index 7018694d5a..20a66a5d7c 100644 --- a/packages/edge/infra/client/container-runner/src/main.rs +++ b/packages/edge/infra/client/container-runner/src/main.rs @@ -37,6 +37,8 @@ fn main() -> Result<()> { .transpose() .context("failed to parse vector socket addr")?; let runner_id = var("RUNNER_ID")?; + // Only set if this is a single allocation runner (one actor running on it) + let actor_id = var("ACTOR_ID").ok(); let env_id = Uuid::parse_str(&var("ENVIRONMENT_ID")?)?; println!("Starting runner_id={runner_id} env_id={env_id} vector_socket_addr={} root_user_enabled={root_user_enabled}", vector_socket_addr.as_ref().map(|x| x.as_str()).unwrap_or("?")); @@ -51,6 +53,7 @@ fn main() -> Result<()> { msg_rx, vector_socket_addr, runner_id, + actor_id, env_id, }; let log_shipper_thread = log_shipper.spawn(); diff --git a/packages/edge/infra/client/manager/src/actor/mod.rs b/packages/edge/infra/client/manager/src/actor/mod.rs index 2e08046510..452b2be6a2 100644 --- a/packages/edge/infra/client/manager/src/actor/mod.rs +++ b/packages/edge/infra/client/manager/src/actor/mod.rs @@ -116,9 +116,15 @@ impl Actor { .context("should have runner config")? { protocol::ActorRunner::New { .. } => { + let actor_id = matches!( + self.runner.config().image.allocation_type, + protocol::ImageAllocationType::Single + ) + .then_some(self.actor_id); + // Because the runner is not already started we can get the ports here instead of reading from // sqlite - let ports = self.runner.start(ctx).await?; + let ports = self.runner.start(ctx, actor_id).await?; let pid = self.runner.pid().await?; diff --git a/packages/edge/infra/client/manager/src/runner/mod.rs b/packages/edge/infra/client/manager/src/runner/mod.rs index 75392e2942..a482f44838 100644 --- a/packages/edge/infra/client/manager/src/runner/mod.rs +++ b/packages/edge/infra/client/manager/src/runner/mod.rs @@ -250,9 +250,12 @@ impl Runner { Ok(()) } + // `actor_id` is set if this runner has a single allocation type which means there is only one actor + // runner on it pub async fn start( self: &Arc, ctx: &Arc, + actor_id: Option, ) -> Result> { tracing::info!(runner_id=?self.runner_id, "starting"); @@ -305,7 +308,7 @@ impl Runner { let self2 = self.clone(); let ctx2 = ctx.clone(); tokio::spawn(async move { - match self2.run(&ctx2).await { + match self2.run(&ctx2, actor_id).await { Ok(_) => { if let Err(err) = self2.observe(&ctx2, false).await { tracing::error!(runner_id=?self2.runner_id, ?err, "observe failed"); @@ -339,7 +342,12 @@ impl Runner { .to_string(), ), ("RUNNER_ID", self.runner_id.to_string()), + ( + "ENVIRONMENT_ID", + self.metadata.environment.env_id.to_string(), + ), ]; + if let Some(vector) = &ctx.config().vector { runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string())); } diff --git a/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.down.sql b/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.up.sql b/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.up.sql new file mode 100644 index 0000000000..287855eb37 --- /dev/null +++ b/packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.up.sql @@ -0,0 +1,17 @@ + +CREATE TABLE IF NOT EXISTS runner_logs ( + runner_id UUID, + actor_id UUID, -- When not set will be the NIL UUID (all zeros) + stream_type UInt8, -- pegboard::types::LogsStreamType + ts DateTime64 (9), + message String +) ENGINE = ReplicatedMergeTree () +PARTITION BY + toStartOfHour (ts) +ORDER BY ( + runner_id, + toUnixTimestamp (ts), + stream_type +) +TTL toDate (ts + toIntervalDay (3)) +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;