From 22b3fecf0ffa8f1cf10661ebf930ff78fc83f269 Mon Sep 17 00:00:00 2001 From: NathanFlurry Date: Fri, 16 Aug 2024 05:35:05 +0000 Subject: [PATCH] chore: update start_ts to be set when networking is ready (#1062) ## Changes --- fern/definition/servers/common.yml | 1 + lib/convert/src/impls/ds.rs | 2 +- proto/backend/ds.proto | 1 + .../src/route/game_guard/dynamic_servers.rs | 6 +-- svc/pkg/ds/db/servers/Service.toml | 2 +- .../migrations/20240501133910_init.up.sql | 2 +- .../migrations/20240809224504_add_idx.up.sql | 10 ++--- svc/pkg/ds/ops/server-create/Service.toml | 2 +- svc/pkg/ds/ops/server-create/src/lib.rs | 15 +++---- svc/pkg/ds/ops/server-delete/Service.toml | 2 +- svc/pkg/ds/ops/server-delete/src/lib.rs | 8 ++-- svc/pkg/ds/ops/server-get/src/lib.rs | 13 +++--- svc/pkg/ds/ops/server-list-for-env/src/lib.rs | 4 +- svc/pkg/ds/worker/src/workers/mod.rs | 2 +- .../src/workers/nomad_monitor_alloc_plan.rs | 42 ++++++++++++------- .../src/workers/nomad_monitor_alloc_update.rs | 18 ++++---- 16 files changed, 75 insertions(+), 55 deletions(-) diff --git a/fern/definition/servers/common.yml b/fern/definition/servers/common.yml index 2e10861ea1..f02d0fa847 100644 --- a/fern/definition/servers/common.yml +++ b/fern/definition/servers/common.yml @@ -17,6 +17,7 @@ types: lifecycle: Lifecycle created_at: long started_at: optional<long> + connectable_at: optional<long> destroyed_at: optional<long> Runtime: diff --git a/lib/convert/src/impls/ds.rs b/lib/convert/src/impls/ds.rs index fbf71788b0..fe15e38c06 100644 --- a/lib/convert/src/impls/ds.rs +++ b/lib/convert/src/impls/ds.rs @@ -17,7 +17,7 @@ impl ApiTryFrom for models::ServersServer { datacenter: unwrap!(value.datacenter_id).as_uuid(), cluster: unwrap!(value.cluster_id).as_uuid(), created_at: value.create_ts, - started_at: value.start_ts, + started_at: value.connectable_ts, destroyed_at: value.destroy_ts, tags: Some(to_value(value.tags).unwrap()), runtime: Box::new(models::ServersRuntime { diff --git a/proto/backend/ds.proto
b/proto/backend/ds.proto index e72790b705..90ac05c06f 100644 --- a/proto/backend/ds.proto +++ b/proto/backend/ds.proto @@ -18,6 +18,7 @@ message Server { int64 kill_timeout_ms = 7; int64 create_ts = 9; optional int64 start_ts = 10; + optional int64 connectable_ts = 17; optional int64 destroy_ts = 11; rivet.common.Uuid image_id = 12; repeated string args = 13; diff --git a/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs b/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs index 26e4b6c672..f54937aeac 100644 --- a/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs +++ b/svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs @@ -71,13 +71,13 @@ pub async fn build_ds( docker_ports_protocol_game_guard.port_name, docker_ports_protocol_game_guard.protocol FROM - db_dynamic_servers.internal_ports + db_ds.internal_ports JOIN - db_dynamic_servers.servers + db_ds.servers ON internal_ports.server_id = servers.server_id JOIN - db_dynamic_servers.docker_ports_protocol_game_guard + db_ds.docker_ports_protocol_game_guard ON internal_ports.server_id = docker_ports_protocol_game_guard.server_id AND diff --git a/svc/pkg/ds/db/servers/Service.toml b/svc/pkg/ds/db/servers/Service.toml index 352986b38d..6926ef1f9b 100644 --- a/svc/pkg/ds/db/servers/Service.toml +++ b/svc/pkg/ds/db/servers/Service.toml @@ -1,5 +1,5 @@ [service] -name = "db-dynamic-servers" +name = "db-ds" [runtime] kind = "crdb" diff --git a/svc/pkg/ds/db/servers/migrations/20240501133910_init.up.sql b/svc/pkg/ds/db/servers/migrations/20240501133910_init.up.sql index 33ad23aadd..1fb9ebbfb8 100644 --- a/svc/pkg/ds/db/servers/migrations/20240501133910_init.up.sql +++ b/svc/pkg/ds/db/servers/migrations/20240501133910_init.up.sql @@ -13,6 +13,7 @@ CREATE TABLE servers ( create_ts INT NOT NULL, start_ts INT, + connectable_ts INT, stop_ts INT, finish_ts INT, cleanup_ts INT, @@ -29,7 +30,6 @@ CREATE TABLE servers ( INDEX (env_id) ); - CREATE TABLE 
docker_ports_protocol_game_guard ( server_id UUID NOT NULL REFERENCES servers, port_name STRING NOT NULL, diff --git a/svc/pkg/ds/db/servers/migrations/20240809224504_add_idx.up.sql b/svc/pkg/ds/db/servers/migrations/20240809224504_add_idx.up.sql index dfde0d99c6..ab82f2aba8 100644 --- a/svc/pkg/ds/db/servers/migrations/20240809224504_add_idx.up.sql +++ b/svc/pkg/ds/db/servers/migrations/20240809224504_add_idx.up.sql @@ -1,10 +1,10 @@ -CREATE INDEX ON db_dynamic_servers.servers (datacenter_id, stop_ts); +CREATE INDEX ON servers (datacenter_id, stop_ts); -CREATE INDEX ON db_dynamic_servers.server_nomad (nomad_dispatched_job_id) STORING (nomad_alloc_plan_ts); -DROP INDEX db_dynamic_servers.server_nomad@server_nomad_nomad_dispatched_job_id_idx; +CREATE INDEX ON server_nomad (nomad_dispatched_job_id) STORING (nomad_alloc_plan_ts); +DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx; -CREATE INDEX ON db_dynamic_servers.server_nomad (nomad_dispatched_job_id) +CREATE INDEX ON server_nomad (nomad_dispatched_job_id) STORING ( nomad_alloc_id, nomad_node_id, @@ -15,4 +15,4 @@ STORING ( nomad_node_public_ipv4, nomad_node_vlan_ipv4 ); -DROP INDEX db_dynamic_servers.server_nomad@server_nomad_nomad_dispatched_job_id_idx; +DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx; diff --git a/svc/pkg/ds/ops/server-create/Service.toml b/svc/pkg/ds/ops/server-create/Service.toml index a0260ec28c..4c22e9bb5e 100644 --- a/svc/pkg/ds/ops/server-create/Service.toml +++ b/svc/pkg/ds/ops/server-create/Service.toml @@ -7,4 +7,4 @@ kind = "rust" [operation] [databases] -db-dynamic-servers = {} +db-ds = {} diff --git a/svc/pkg/ds/ops/server-create/src/lib.rs b/svc/pkg/ds/ops/server-create/src/lib.rs index ca869961d2..409b9ac9b7 100644 --- a/svc/pkg/ds/ops/server-create/src/lib.rs +++ b/svc/pkg/ds/ops/server-create/src/lib.rs @@ -95,8 +95,8 @@ async fn bind_with_retries( " SELECT EXISTS( SELECT 1 - FROM db_dynamic_servers.servers as r - JOIN 
db_dynamic_servers.docker_ports_protocol_game_guard as p + FROM db_ds.servers as r + JOIN db_ds.docker_ports_protocol_game_guard as p ON r.server_id = p.server_id WHERE r.cleanup_ts IS NULL AND @@ -183,7 +183,7 @@ pub async fn handle( WITH servers_cte AS ( INSERT INTO - db_dynamic_servers.servers ( + db_ds.servers ( server_id, env_id, datacenter_id, @@ -205,7 +205,7 @@ pub async fn handle( ), docker_ports_host_cte AS ( INSERT INTO - db_dynamic_servers.docker_ports_host ( + db_ds.docker_ports_host ( server_id, port_name, port_number @@ -220,7 +220,7 @@ pub async fn handle( ), docker_ports_protocol_game_guard_cte AS ( INSERT INTO - db_dynamic_servers.docker_ports_protocol_game_guard ( + db_ds.docker_ports_protocol_game_guard ( server_id, port_name, port_number, @@ -1292,7 +1292,7 @@ pub async fn handle( [ctx] " INSERT INTO - db_dynamic_servers.server_nomad (server_id) + db_ds.server_nomad (server_id) VALUES ($1) ", @@ -1339,7 +1339,7 @@ pub async fn handle( [ctx] " UPDATE - db_dynamic_servers.server_nomad + db_ds.server_nomad SET nomad_dispatched_job_id = $2 WHERE @@ -1400,6 +1400,7 @@ pub async fn handle( kill_timeout_ms: ctx.kill_timeout_ms, create_ts, start_ts: None, + connectable_ts: None, destroy_ts: None, args: ctx.args.clone(), environment: ctx.environment.clone(), diff --git a/svc/pkg/ds/ops/server-delete/Service.toml b/svc/pkg/ds/ops/server-delete/Service.toml index 5262383369..d701bee942 100644 --- a/svc/pkg/ds/ops/server-delete/Service.toml +++ b/svc/pkg/ds/ops/server-delete/Service.toml @@ -7,4 +7,4 @@ kind = "rust" [operation] [databases] -db-dynamic-servers = {} +db-ds = {} diff --git a/svc/pkg/ds/ops/server-delete/src/lib.rs b/svc/pkg/ds/ops/server-delete/src/lib.rs index 0a8175680e..1306d13c43 100644 --- a/svc/pkg/ds/ops/server-delete/src/lib.rs +++ b/svc/pkg/ds/ops/server-delete/src/lib.rs @@ -24,7 +24,7 @@ pub async fn handle( let dynamic_server = sql_fetch_one!( [ctx, UpdatedServer, @tx tx] " - UPDATE db_dynamic_servers.servers + UPDATE 
db_ds.servers SET delete_ts = $2 WHERE server_id = $1 @@ -35,11 +35,11 @@ pub async fn handle( server_nomad.nomad_dispatched_job_id, server_nomad.nomad_alloc_id, FROM - db_dynamic_servers.servers + db_ds.servers JOIN - db_dynamic_servers.server_nomad + db_ds.server_nomad ON - db_dynamic_servers.servers.server_id = db_dynamic_servers.server_nomad.server_id + db_ds.servers.server_id = db_ds.server_nomad.server_id ", server_id, ctx.ts(), diff --git a/svc/pkg/ds/ops/server-get/src/lib.rs b/svc/pkg/ds/ops/server-get/src/lib.rs index 336d3b48ce..2b4f4dfc14 100644 --- a/svc/pkg/ds/ops/server-get/src/lib.rs +++ b/svc/pkg/ds/ops/server-get/src/lib.rs @@ -15,6 +15,7 @@ struct Server { kill_timeout_ms: i64, create_ts: i64, start_ts: Option<i64>, + connectable_ts: Option<i64>, destroy_ts: Option<i64>, image_id: Uuid, args: Vec<String>, @@ -84,13 +85,14 @@ pub async fn handle( kill_timeout_ms, create_ts, start_ts, + connectable_ts, destroy_ts, image_id, args, network_mode, environment FROM - db_dynamic_servers.servers + db_ds.servers WHERE server_id = ANY($1) ", @@ -106,7 +108,7 @@ pub async fn handle( gg_port, protocol FROM - db_dynamic_servers.docker_ports_protocol_game_guard + db_ds.docker_ports_protocol_game_guard WHERE server_id = ANY($1) ", @@ -121,7 +123,7 @@ pub async fn handle( port_number, protocol FROM - db_dynamic_servers.docker_ports_host + db_ds.docker_ports_host WHERE server_id = ANY($1) ", @@ -140,7 +142,7 @@ pub async fn handle( nomad_node_vlan_ipv4, nomad_alloc_plan_ts FROM - db_dynamic_servers.server_nomad + db_ds.server_nomad WHERE server_id = ANY($1) ", @@ -155,7 +157,7 @@ pub async fn handle( nomad_ip, nomad_source FROM - db_dynamic_servers.internal_ports + db_ds.internal_ports WHERE server_id = ANY($1) ", @@ -223,6 +225,7 @@ pub async fn handle( network_ports: ports, create_ts: server.create_ts, start_ts: server.start_ts, + connectable_ts: server.connectable_ts, destroy_ts: server.destroy_ts, }; diff --git a/svc/pkg/ds/ops/server-list-for-env/src/lib.rs
b/svc/pkg/ds/ops/server-list-for-env/src/lib.rs index dab785a891..396e0e84f9 100644 --- a/svc/pkg/ds/ops/server-list-for-env/src/lib.rs +++ b/svc/pkg/ds/ops/server-list-for-env/src/lib.rs @@ -13,11 +13,11 @@ async fn handle( " WITH after_server AS ( SELECT create_ts, server_id - FROM db_dynamic_servers.servers + FROM db_ds.servers WHERE server_id = $4 ) SELECT server_id - FROM db_dynamic_servers.servers + FROM db_ds.servers WHERE env_id = $1 AND tags @> $2 diff --git a/svc/pkg/ds/worker/src/workers/mod.rs b/svc/pkg/ds/worker/src/workers/mod.rs index afb0a7df41..7dfe02b161 100644 --- a/svc/pkg/ds/worker/src/workers/mod.rs +++ b/svc/pkg/ds/worker/src/workers/mod.rs @@ -33,7 +33,7 @@ pub async fn webhook_call( // SELECT // server_id // FROM - // db_dynamic_servers.server_nomad + // db_ds.server_nomad // WHERE // nomad_alloc_id = $1 // ", diff --git a/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_plan.rs index dc06ab9c26..f82c7ec86c 100644 --- a/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_plan.rs +++ b/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_plan.rs @@ -2,9 +2,13 @@ use chirp_worker::prelude::*; use proto::backend::{self, pkg::*}; use redis::AsyncCommands; use serde::Deserialize; +use std::time::Duration; use crate::workers::NEW_NOMAD_CONFIG; +// TODO: +const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(3); + #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] struct PlanResult { @@ -16,6 +20,7 @@ struct RunRow { server_id: Uuid, datacenter_id: Uuid, stop_ts: Option<i64>, + connectable_ts: Option<i64>, nomad_alloc_plan_ts: Option<i64>, // this was nomad_plan_ts } @@ -92,10 +97,8 @@ async fn worker( tracing::info!("no network returned"); } - // This works - tracing::info!(?ports, "found protsadf"); - - // {"timestamp":"2024-06-28T01:43:24.930496Z","level":"INFO","fields":{"message":"found protsadf","ports":"[Port { label: \"game_testing2\", source: 20202, target: 0, ip: \"10.0.50.97\"
}]"},"target":"ds_worker::workers::nomad_monitor_alloc_plan","spans":[{"ray_id":"1c8bfa81-3c80-4a2c-ab7c-2655f6c6a665","req_id":"a44227ad-4f1a-44b8-b4d0-7746dd8a622e","worker_name":"monolith-worker--ds-nomad-monitor-alloc-plan","name":"handle_req"},{"name":"ds-nomad-monitor-alloc-plan","tick_index":0,"name":"handle"}]} + // Wait for Traefik to be ready + tokio::time::sleep(TRAEFIK_GRACE_PERIOD).await; // Fetch the run // @@ -110,7 +113,6 @@ async fn worker( nomad_node_vlan_ipv4: unwrap!(meta.remove("network-vlan-ipv4")), ports: ports.clone(), }; - tokio::time::sleep(std::time::Duration::from_secs(3)).await; let db_output = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| { let ctx = ctx.clone(); let now = ctx.ts(); @@ -171,11 +173,12 @@ async fn update_db( servers.server_id, servers.datacenter_id, servers.stop_ts, + servers.connectable_ts, server_nomad.nomad_alloc_plan_ts FROM - db_dynamic_servers.server_nomad + db_ds.server_nomad INNER JOIN - db_dynamic_servers.servers + db_ds.servers ON servers.server_id = server_nomad.server_id WHERE @@ -186,9 +189,6 @@ async fn update_db( &job_id, ) .await?; - tracing::info!(?job_id, "checking jobid"); - - tracing::info!(?run_row, "ayy event2a"); // Check if run found let run_row = if let Some(run_row) = run_row { @@ -199,7 +199,21 @@ async fn update_db( }; let server_id = run_row.server_id; - tracing::info!("ayy event2b"); + if run_row.connectable_ts.is_some() { + tracing::warn!("connectable ts already set"); + } else { + sql_execute!( + [ctx, @tx tx] + " + UPDATE db_ds.servers + SET connectable_ts = $2 + WHERE server_id = $1 + ", + server_id, + now, + ) + .await?; + } // Write run meta on first plan if run_row.nomad_alloc_plan_ts.is_none() { @@ -208,14 +222,14 @@ async fn update_db( [ctx, @tx tx] " UPDATE - db_dynamic_servers.server_nomad + db_ds.server_nomad SET nomad_alloc_id = $2, nomad_alloc_plan_ts = $3, nomad_node_id = $4, nomad_node_name = $5, nomad_node_public_ipv4 = $6, - nomad_node_vlan_ipv4 = $7 + 
nomad_node_vlan_ipv4 = $7 WHERE server_id = $1 ", @@ -238,7 +252,7 @@ async fn update_db( [ctx, @tx tx] " INSERT INTO - db_dynamic_servers.internal_ports ( + db_ds.internal_ports ( server_id, nomad_label, nomad_source, diff --git a/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_update.rs b/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_update.rs index fa183b90e3..9ce99067a8 100644 --- a/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_update.rs +++ b/svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_update.rs @@ -71,7 +71,7 @@ async fn worker( [ctx, (Uuid,)] " UPDATE - db_dynamic_servers.server_nomad + db_ds.server_nomad SET nomad_alloc_state = $2 WHERE @@ -104,14 +104,14 @@ async fn worker( servers.server_id, servers.start_ts FROM - db_dynamic_servers.server_nomad - INNER JOIN db_dynamic_servers.servers ON servers.server_id = server_nomad.server_id + db_ds.server_nomad + INNER JOIN db_ds.servers ON servers.server_id = server_nomad.server_id WHERE nomad_dispatched_job_id = $1 ), _update_servers AS ( UPDATE - db_dynamic_servers.servers + db_ds.servers SET start_ts = $2 FROM @@ -122,7 +122,7 @@ async fn worker( ), _update_server_nomad AS ( UPDATE - db_dynamic_servers.server_nomad + db_ds.server_nomad SET nomad_alloc_state = $3 FROM @@ -176,14 +176,14 @@ async fn worker( servers.server_id, servers.finish_ts FROM - db_dynamic_servers.server_nomad - INNER JOIN db_dynamic_servers.servers ON servers.server_id = server_nomad.server_id + db_ds.server_nomad + INNER JOIN db_ds.servers ON servers.server_id = server_nomad.server_id WHERE nomad_dispatched_job_id = $1 ), _update_servers AS ( UPDATE - db_dynamic_servers.servers + db_ds.servers SET -- If the job stops immediately, the task state will never be "running" so we need to -- make sure start_ts is set here as well @@ -197,7 +197,7 @@ async fn worker( ), _update_server_nomad AS ( UPDATE - db_dynamic_servers.server_nomad + db_ds.server_nomad SET nomad_alloc_state = $3 FROM