diff --git a/infra/tf/vector/vector.tf b/infra/tf/vector/vector.tf
index 18ce83abe9..1e3a9ea400 100644
--- a/infra/tf/vector/vector.tf
+++ b/infra/tf/vector/vector.tf
@@ -68,38 +68,30 @@ resource "helm_release" "vector" {
 			vector_metrics = {
 				type = "internal_metrics"
 			}
+			vector_logs = {
+				type = "internal_logs"
+			}
 		}
 
 		transforms = {
-			job_run = {
+			dynamic_servers = {
 				type = "filter"
 				inputs = ["vector", "tcp_json"]
 				condition = {
 					type = "vrl"
-					source = ".source == \"job_run\""
+					source = ".source == \"dynamic_servers\""
 				}
 			}
-			dynamic_servers = {
+			job_run = {
 				type = "filter"
 				inputs = ["vector", "tcp_json"]
 				condition = {
 					type = "vrl"
-					source = ".source == \"dynamic_servers\""
+					source = ".source == \"job_run\""
 				}
 			}
-			ds_fix_id = {
-				type = "remap"
-				inputs = ["dynamic_servers"]
-				source = <<-EOF
-					.server_id = .run_id
-					del(.run_id)
-				EOF
-			}
-
 			backend_worker = {
 				type = "filter"
 				inputs = ["http_json"]
@@ -116,13 +108,13 @@ resource "helm_release" "vector" {
 				address = "0.0.0.0:9598"
 			}
 
-			clickhouse_job_run_logs = {
+			clickhouse_ds_logs = {
 				type = "clickhouse"
-				inputs = ["job_run"]
+				inputs = ["dynamic_servers"]
 				compression = "gzip"
-				database = "db_job_log"
+				database = "db_ds_log"
 				endpoint = "https://${var.clickhouse_host}:${var.clickhouse_port_https}"
-				table = "run_logs"
+				table = "server_logs"
 				auth = {
 					strategy = "basic"
 					user = "vector"
@@ -138,13 +130,13 @@ resource "helm_release" "vector" {
 				}
 			}
 
-			clickhouse_ds_logs = {
+			clickhouse_job_run_logs = {
 				type = "clickhouse"
-				inputs = ["ds_fix_id"]
+				inputs = ["job_run"]
 				compression = "gzip"
-				database = "db_ds_log"
+				database = "db_job_log"
 				endpoint = "https://${var.clickhouse_host}:${var.clickhouse_port_https}"
-				table = "server_logs"
+				table = "run_logs"
 				auth = {
 					strategy = "basic"
 					user = "vector"
diff --git a/lib/job-runner/src/lib.rs b/lib/job-runner/src/lib.rs
index 0172b8e2f9..fd52aa7e0f 100644
--- a/lib/job-runner/src/lib.rs
+++ b/lib/job-runner/src/lib.rs
@@ -1,2 +1,7 @@
 pub mod log_shipper;
 pub mod throttle;
+
+pub enum Manager {
+	DynamicServers { server_id: String },
+	JobRun { run_id: String },
+}
diff --git a/lib/job-runner/src/log_shipper.rs b/lib/job-runner/src/log_shipper.rs
index 3ed285d2db..153b22d956 100644
--- a/lib/job-runner/src/log_shipper.rs
+++ b/lib/job-runner/src/log_shipper.rs
@@ -34,9 +34,8 @@ pub struct LogShipper {
 	/// trying to send to this channel.
 	pub msg_rx: mpsc::Receiver,
 
-	pub job_run_id: String,
 	pub nomad_task_name: String,
-	pub runner: String,
+	pub manager: crate::Manager,
 }
 
 impl LogShipper {
@@ -89,13 +88,21 @@ impl LogShipper {
 		println!("Log shipper connected");
 
 		while let Result::Ok(message) = self.msg_rx.recv() {
-			let vector_message = VectorMessage {
-				source: self.runner.as_str(),
-				run_id: self.job_run_id.as_str(),
-				task: self.nomad_task_name.as_str(),
-				stream_type: message.stream_type as u8,
-				ts: message.ts,
-				message: message.message.as_str(),
+			let vector_message = match &self.manager {
+				crate::Manager::DynamicServers { server_id } => VectorMessage::DynamicServers {
+					server_id: server_id.as_str(),
+					task: self.nomad_task_name.as_str(),
+					stream_type: message.stream_type as u8,
+					ts: message.ts,
+					message: message.message.as_str(),
+				},
+				crate::Manager::JobRun { run_id } => VectorMessage::JobRun {
+					run_id: run_id.as_str(),
+					task: self.nomad_task_name.as_str(),
+					stream_type: message.stream_type as u8,
+					ts: message.ts,
+					message: message.message.as_str(),
+				},
 			};
 
 			serde_json::to_writer(&mut stream, &vector_message)?;
@@ -110,11 +117,22 @@ impl LogShipper {
 
 /// Vector-compatible message format
 #[derive(Serialize)]
-struct VectorMessage<'a> {
-	source: &'a str,
-	run_id: &'a str,
-	task: &'a str,
-	stream_type: u8,
-	ts: u64,
-	message: &'a str,
+#[serde(tag = "source")]
+enum VectorMessage<'a> {
+	#[serde(rename = "dynamic_servers")]
+	DynamicServers {
+		server_id: &'a str,
+		task: &'a str,
+		stream_type: u8,
+		ts: u64,
+		message: &'a str,
+	},
+	#[serde(rename = "job_run")]
+	JobRun {
+		run_id: &'a str,
+		task: &'a str,
+		stream_type: u8,
+		ts: u64,
+		message: &'a str,
+	},
 }
diff --git a/lib/job-runner/src/main.rs b/lib/job-runner/src/main.rs
index f6ba0bb7fa..1fbf426d68 100644
--- a/lib/job-runner/src/main.rs
+++ b/lib/job-runner/src/main.rs
@@ -23,12 +23,19 @@ const MAX_PREVIEW_LINES: usize = 128;
 
 fn main() -> anyhow::Result<()> {
 	let nomad_alloc_dir = std::env::var("NOMAD_ALLOC_DIR").context("NOMAD_ALLOC_DIR")?;
-	let job_run_id = std::env::var("NOMAD_META_job_run_id").context("NOMAD_META_job_run_id")?;
 	let nomad_task_name = std::env::var("NOMAD_TASK_NAME").context("NOMAD_TASK_NAME")?;
 	let root_user_enabled = std::env::var("NOMAD_META_root_user_enabled")
 		.context("NOMAD_META_root_user_enabled")?
== "1"; - let runner = std::env::var("NOMAD_META_runner").unwrap_or("job_run".to_string()); + + let manager = match std::env::var("NOMAD_META_manager").ok() { + Some(x) if x == "dynamic_servers" => job_runner::Manager::DynamicServers { + server_id: std::env::var("NOMAD_META_server_id").context("NOMAD_META_server_id")?, + }, + _ => job_runner::Manager::JobRun { + run_id: std::env::var("NOMAD_META_job_run_id").context("NOMAD_META_job_run_id")?, + }, + }; let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1); @@ -38,9 +45,8 @@ fn main() -> anyhow::Result<()> { let log_shipper = log_shipper::LogShipper { shutdown_rx, msg_rx, - job_run_id, nomad_task_name, - runner, + manager, }; let log_shipper_thread = log_shipper.spawn(); diff --git a/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs index 7fd79ec9ce..d1c273a498 100644 --- a/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs +++ b/svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs @@ -7,7 +7,7 @@ use serde::Deserialize; use crate::util::NEW_NOMAD_CONFIG; // TODO: -const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(3); +const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2); #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] diff --git a/svc/pkg/ds/src/workflows/server/mod.rs b/svc/pkg/ds/src/workflows/server/mod.rs index 5eaae3eeb9..3245de0344 100644 --- a/svc/pkg/ds/src/workflows/server/mod.rs +++ b/svc/pkg/ds/src/workflows/server/mod.rs @@ -600,9 +600,9 @@ async fn submit_job(ctx: &ActivityCtx, input: &SubmitJobInput) -> GlobalResult GlobalResu value: "0".into(), }, backend::job::Parameter { - key: "runner".into(), + key: "manager".into(), value: "dynamic_servers".into(), }, backend::job::Parameter { @@ -1040,7 +1040,7 @@ async fn dispatch_job(ctx: &ActivityCtx, input: &DispatchJobInput) -> GlobalResu .into_iter() .collect::>(); - let job_params = vec![("job_run_id".to_string(), input.server_id.to_string())]; + let job_params = vec![("server_id".to_string(), input.server_id.to_string())]; // MARK: Dispatch job let dispatch_res = nomad_client::apis::jobs_api::post_job_dispatch( diff --git a/svc/pkg/mm/worker/src/workers/lobby_create/mod.rs b/svc/pkg/mm/worker/src/workers/lobby_create/mod.rs index a594349e4b..e34edc2cb2 100644 --- a/svc/pkg/mm/worker/src/workers/lobby_create/mod.rs +++ b/svc/pkg/mm/worker/src/workers/lobby_create/mod.rs @@ -736,7 +736,7 @@ async fn create_docker_job( value: if mm_game_config.root_user_enabled { "1" } else { "0" }.into() }, backend::job::Parameter { - key: "runner".into(), + key: "manager".into(), value: "job_run".into() }, ].into_iter() diff --git a/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs b/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs index e96d6aa7d8..55cdc47fb9 100644 --- a/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs +++ b/svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs @@ -405,7 +405,7 @@ pub fn gen_lobby_docker_job( "max_players_direct".into(), "max_players_party".into(), "root_user_enabled".into(), - "runner".into(), + "manager".into(), ]), meta_optional: Some(vec!["rivet_test_id".into()]), })),