Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions infra/tf/vector/vector.tf
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,30 @@ resource "helm_release" "vector" {
vector_metrics = {
type = "internal_metrics"
}

vector_logs = {
type = "internal_logs"
}
}
transforms = {
job_run = {
dynamic_servers = {
type = "filter"
inputs = ["vector", "tcp_json"]
condition = {
type = "vrl"
source = ".source == \"job_run\""
source = ".source == \"dynamic_servers\""
}
}

dynamic_servers = {
job_run = {
type = "filter"
inputs = ["vector", "tcp_json"]
condition = {
type = "vrl"
source = ".source == \"dynamic_servers\""
source = ".source == \"job_run\""
}
}

ds_fix_id = {
type = "remap"
inputs = ["dynamic_servers"]
source = <<-EOF
.server_id = .run_id
del(.run_id)
EOF
}

backend_worker = {
type = "filter"
inputs = ["http_json"]
Expand All @@ -116,13 +108,13 @@ resource "helm_release" "vector" {
address = "0.0.0.0:9598"
}

clickhouse_job_run_logs = {
clickhouse_ds_logs = {
type = "clickhouse"
inputs = ["job_run"]
inputs = ["dynamic_servers"]
compression = "gzip"
database = "db_job_log"
database = "db_ds_log"
endpoint = "https://${var.clickhouse_host}:${var.clickhouse_port_https}"
table = "run_logs"
table = "server_logs"
auth = {
strategy = "basic"
user = "vector"
Expand All @@ -138,13 +130,13 @@ resource "helm_release" "vector" {
}
}

clickhouse_ds_logs = {
clickhouse_job_run_logs = {
type = "clickhouse"
inputs = ["ds_fix_id"]
inputs = ["job_run"]
compression = "gzip"
database = "db_ds_log"
database = "db_job_log"
endpoint = "https://${var.clickhouse_host}:${var.clickhouse_port_https}"
table = "server_logs"
table = "run_logs"
auth = {
strategy = "basic"
user = "vector"
Expand Down
5 changes: 5 additions & 0 deletions lib/job-runner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
pub mod log_shipper;
pub mod throttle;

pub enum Manager {
DynamicServers { server_id: String },
JobRun { run_id: String },
}
50 changes: 34 additions & 16 deletions lib/job-runner/src/log_shipper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ pub struct LogShipper {
/// trying to send to this channel.
pub msg_rx: mpsc::Receiver<ReceivedMessage>,

pub job_run_id: String,
pub nomad_task_name: String,
pub runner: String,
pub manager: crate::Manager,
}

impl LogShipper {
Expand Down Expand Up @@ -89,13 +88,21 @@ impl LogShipper {
println!("Log shipper connected");

while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = VectorMessage {
source: self.runner.as_str(),
run_id: self.job_run_id.as_str(),
task: self.nomad_task_name.as_str(),
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
let vector_message = match &self.manager {
crate::Manager::DynamicServers { server_id } => VectorMessage::DynamicServers {
server_id: server_id.as_str(),
task: self.nomad_task_name.as_str(),
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
},
crate::Manager::JobRun { run_id } => VectorMessage::JobRun {
run_id: run_id.as_str(),
task: self.nomad_task_name.as_str(),
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
},
};

serde_json::to_writer(&mut stream, &vector_message)?;
Expand All @@ -110,11 +117,22 @@ impl LogShipper {

/// Vector-compatible message format
#[derive(Serialize)]
struct VectorMessage<'a> {
source: &'a str,
run_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
#[serde(tag = "source")]
enum VectorMessage<'a> {
#[serde(rename = "dynamic_servers")]
DynamicServers {
server_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
},
#[serde(rename = "job_run")]
JobRun {
run_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
},
}
14 changes: 10 additions & 4 deletions lib/job-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ const MAX_PREVIEW_LINES: usize = 128;

fn main() -> anyhow::Result<()> {
let nomad_alloc_dir = std::env::var("NOMAD_ALLOC_DIR").context("NOMAD_ALLOC_DIR")?;
let job_run_id = std::env::var("NOMAD_META_job_run_id").context("NOMAD_META_job_run_id")?;
let nomad_task_name = std::env::var("NOMAD_TASK_NAME").context("NOMAD_TASK_NAME")?;
let root_user_enabled = std::env::var("NOMAD_META_root_user_enabled")
.context("NOMAD_META_root_user_enabled")?
== "1";
let runner = std::env::var("NOMAD_META_runner").unwrap_or("job_run".to_string());

let manager = match std::env::var("NOMAD_META_manager").ok() {
Some(x) if x == "dynamic_servers" => job_runner::Manager::DynamicServers {
server_id: std::env::var("NOMAD_META_server_id").context("NOMAD_META_server_id")?,
},
_ => job_runner::Manager::JobRun {
run_id: std::env::var("NOMAD_META_job_run_id").context("NOMAD_META_job_run_id")?,
},
};

let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);

Expand All @@ -38,9 +45,8 @@ fn main() -> anyhow::Result<()> {
let log_shipper = log_shipper::LogShipper {
shutdown_rx,
msg_rx,
job_run_id,
nomad_task_name,
runner,
manager,
};
let log_shipper_thread = log_shipper.spawn();

Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::Deserialize;
use crate::util::NEW_NOMAD_CONFIG;

// TODO:
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(3);
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2);

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down
8 changes: 4 additions & 4 deletions svc/pkg/ds/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,9 @@ async fn submit_job(ctx: &ActivityCtx, input: &SubmitJobInput) -> GlobalResult<S
"vector_socket_addr".into(),
"image_artifact_url".into(),
"root_user_enabled".into(),
"runner".into(),
"manager".into(),
"user_env".into(),
"job_run_id".into(),
"server_id".into(),
]),
meta_optional: Some(vec!["rivet_test_id".into()]),
})),
Expand Down Expand Up @@ -1022,7 +1022,7 @@ async fn dispatch_job(ctx: &ActivityCtx, input: &DispatchJobInput) -> GlobalResu
value: "0".into(),
},
backend::job::Parameter {
key: "runner".into(),
key: "manager".into(),
value: "dynamic_servers".into(),
},
backend::job::Parameter {
Expand All @@ -1040,7 +1040,7 @@ async fn dispatch_job(ctx: &ActivityCtx, input: &DispatchJobInput) -> GlobalResu
.into_iter()
.collect::<Vec<_>>();

let job_params = vec![("job_run_id".to_string(), input.server_id.to_string())];
let job_params = vec![("server_id".to_string(), input.server_id.to_string())];

// MARK: Dispatch job
let dispatch_res = nomad_client::apis::jobs_api::post_job_dispatch(
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/mm/worker/src/workers/lobby_create/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ async fn create_docker_job(
value: if mm_game_config.root_user_enabled { "1" } else { "0" }.into()
},
backend::job::Parameter {
key: "runner".into(),
key: "manager".into(),
value: "job_run".into()
},
].into_iter()
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ pub fn gen_lobby_docker_job(
"max_players_direct".into(),
"max_players_party".into(),
"root_user_enabled".into(),
"runner".into(),
"manager".into(),
]),
meta_optional: Some(vec!["rivet_test_id".into()]),
})),
Expand Down