Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fern/definition/servers/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ types:
lifecycle: Lifecycle
created_at: long
started_at: optional<long>
connectable_at: optional<long>
destroyed_at: optional<long>

Runtime:
Expand Down
2 changes: 1 addition & 1 deletion lib/convert/src/impls/ds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl ApiTryFrom<backend::ds::Server> for models::ServersServer {
datacenter: unwrap!(value.datacenter_id).as_uuid(),
cluster: unwrap!(value.cluster_id).as_uuid(),
created_at: value.create_ts,
started_at: value.start_ts,
started_at: value.connectable_ts,
destroyed_at: value.destroy_ts,
tags: Some(to_value(value.tags).unwrap()),
runtime: Box::new(models::ServersRuntime {
Expand Down
1 change: 1 addition & 0 deletions proto/backend/ds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message Server {
int64 kill_timeout_ms = 7;
int64 create_ts = 9;
optional int64 start_ts = 10;
optional int64 connectable_ts = 17;
optional int64 destroy_ts = 11;
rivet.common.Uuid image_id = 12;
repeated string args = 13;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ pub async fn build_ds(
docker_ports_protocol_game_guard.port_name,
docker_ports_protocol_game_guard.protocol
FROM
db_dynamic_servers.internal_ports
db_ds.internal_ports
JOIN
db_dynamic_servers.servers
db_ds.servers
ON
internal_ports.server_id = servers.server_id
JOIN
db_dynamic_servers.docker_ports_protocol_game_guard
db_ds.docker_ports_protocol_game_guard
ON
internal_ports.server_id = docker_ports_protocol_game_guard.server_id
AND
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/db/servers/Service.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[service]
name = "db-dynamic-servers"
name = "db-ds"

[runtime]
kind = "crdb"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CREATE TABLE servers (

create_ts INT NOT NULL,
start_ts INT,
connectable_ts INT,
stop_ts INT,
finish_ts INT,
cleanup_ts INT,
Expand All @@ -29,7 +30,6 @@ CREATE TABLE servers (
INDEX (env_id)
);


CREATE TABLE docker_ports_protocol_game_guard (
server_id UUID NOT NULL REFERENCES servers,
port_name STRING NOT NULL,
Expand Down
10 changes: 5 additions & 5 deletions svc/pkg/ds/db/servers/migrations/20240809224504_add_idx.up.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

CREATE INDEX ON db_dynamic_servers.servers (datacenter_id, stop_ts);
CREATE INDEX ON servers (datacenter_id, stop_ts);

CREATE INDEX ON db_dynamic_servers.server_nomad (nomad_dispatched_job_id) STORING (nomad_alloc_plan_ts);
DROP INDEX db_dynamic_servers.server_nomad@server_nomad_nomad_dispatched_job_id_idx;
CREATE INDEX ON server_nomad (nomad_dispatched_job_id) STORING (nomad_alloc_plan_ts);
DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx;

CREATE INDEX ON db_dynamic_servers.server_nomad (nomad_dispatched_job_id)
CREATE INDEX ON server_nomad (nomad_dispatched_job_id)
STORING (
nomad_alloc_id,
nomad_node_id,
Expand All @@ -15,4 +15,4 @@ STORING (
nomad_node_public_ipv4,
nomad_node_vlan_ipv4
);
DROP INDEX db_dynamic_servers.server_nomad@server_nomad_nomad_dispatched_job_id_idx;
DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx;
2 changes: 1 addition & 1 deletion svc/pkg/ds/ops/server-create/Service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ kind = "rust"
[operation]

[databases]
db-dynamic-servers = {}
db-ds = {}
15 changes: 8 additions & 7 deletions svc/pkg/ds/ops/server-create/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ async fn bind_with_retries(
"
SELECT EXISTS(
SELECT 1
FROM db_dynamic_servers.servers as r
JOIN db_dynamic_servers.docker_ports_protocol_game_guard as p
FROM db_ds.servers as r
JOIN db_ds.docker_ports_protocol_game_guard as p
ON r.server_id = p.server_id
WHERE
r.cleanup_ts IS NULL AND
Expand Down Expand Up @@ -183,7 +183,7 @@ pub async fn handle(
WITH
servers_cte AS (
INSERT INTO
db_dynamic_servers.servers (
db_ds.servers (
server_id,
env_id,
datacenter_id,
Expand All @@ -205,7 +205,7 @@ pub async fn handle(
),
docker_ports_host_cte AS (
INSERT INTO
db_dynamic_servers.docker_ports_host (
db_ds.docker_ports_host (
server_id,
port_name,
port_number
Expand All @@ -220,7 +220,7 @@ pub async fn handle(
),
docker_ports_protocol_game_guard_cte AS (
INSERT INTO
db_dynamic_servers.docker_ports_protocol_game_guard (
db_ds.docker_ports_protocol_game_guard (
server_id,
port_name,
port_number,
Expand Down Expand Up @@ -1292,7 +1292,7 @@ pub async fn handle(
[ctx]
"
INSERT INTO
db_dynamic_servers.server_nomad (server_id)
db_ds.server_nomad (server_id)
VALUES
($1)
",
Expand Down Expand Up @@ -1339,7 +1339,7 @@ pub async fn handle(
[ctx]
"
UPDATE
db_dynamic_servers.server_nomad
db_ds.server_nomad
SET
nomad_dispatched_job_id = $2
WHERE
Expand Down Expand Up @@ -1400,6 +1400,7 @@ pub async fn handle(
kill_timeout_ms: ctx.kill_timeout_ms,
create_ts,
start_ts: None,
connectable_ts: None,
destroy_ts: None,
args: ctx.args.clone(),
environment: ctx.environment.clone(),
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/ops/server-delete/Service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ kind = "rust"
[operation]

[databases]
db-dynamic-servers = {}
db-ds = {}
8 changes: 4 additions & 4 deletions svc/pkg/ds/ops/server-delete/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub async fn handle(
let dynamic_server = sql_fetch_one!(
[ctx, UpdatedServer, @tx tx]
"
UPDATE db_dynamic_servers.servers
UPDATE db_ds.servers
SET delete_ts = $2
WHERE
server_id = $1
Expand All @@ -35,11 +35,11 @@ pub async fn handle(
server_nomad.nomad_dispatched_job_id,
server_nomad.nomad_alloc_id,
FROM
db_dynamic_servers.servers
db_ds.servers
JOIN
db_dynamic_servers.server_nomad
db_ds.server_nomad
ON
db_dynamic_servers.servers.server_id = db_dynamic_servers.server_nomad.server_id
db_ds.servers.server_id = db_ds.server_nomad.server_id
",
server_id,
ctx.ts(),
Expand Down
13 changes: 8 additions & 5 deletions svc/pkg/ds/ops/server-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct Server {
kill_timeout_ms: i64,
create_ts: i64,
start_ts: Option<i64>,
connectable_ts: Option<i64>,
destroy_ts: Option<i64>,
image_id: Uuid,
args: Vec<String>,
Expand Down Expand Up @@ -84,13 +85,14 @@ pub async fn handle(
kill_timeout_ms,
create_ts,
start_ts,
connectable_ts,
destroy_ts,
image_id,
args,
network_mode,
environment
FROM
db_dynamic_servers.servers
db_ds.servers
WHERE
server_id = ANY($1)
",
Expand All @@ -106,7 +108,7 @@ pub async fn handle(
gg_port,
protocol
FROM
db_dynamic_servers.docker_ports_protocol_game_guard
db_ds.docker_ports_protocol_game_guard
WHERE
server_id = ANY($1)
",
Expand All @@ -121,7 +123,7 @@ pub async fn handle(
port_number,
protocol
FROM
db_dynamic_servers.docker_ports_host
db_ds.docker_ports_host
WHERE
server_id = ANY($1)
",
Expand All @@ -140,7 +142,7 @@ pub async fn handle(
nomad_node_vlan_ipv4,
nomad_alloc_plan_ts
FROM
db_dynamic_servers.server_nomad
db_ds.server_nomad
WHERE
server_id = ANY($1)
",
Expand All @@ -155,7 +157,7 @@ pub async fn handle(
nomad_ip,
nomad_source
FROM
db_dynamic_servers.internal_ports
db_ds.internal_ports
WHERE
server_id = ANY($1)
",
Expand Down Expand Up @@ -223,6 +225,7 @@ pub async fn handle(
network_ports: ports,
create_ts: server.create_ts,
start_ts: server.start_ts,
connectable_ts: server.connectable_ts,
destroy_ts: server.destroy_ts,
};

Expand Down
4 changes: 2 additions & 2 deletions svc/pkg/ds/ops/server-list-for-env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ async fn handle(
"
WITH after_server AS (
SELECT create_ts, server_id
FROM db_dynamic_servers.servers
FROM db_ds.servers
WHERE server_id = $4
)
SELECT server_id
FROM db_dynamic_servers.servers
FROM db_ds.servers
WHERE
env_id = $1
AND tags @> $2
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/worker/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn webhook_call(
// SELECT
// server_id
// FROM
// db_dynamic_servers.server_nomad
// db_ds.server_nomad
// WHERE
// nomad_alloc_id = $1
// ",
Expand Down
42 changes: 28 additions & 14 deletions svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ use chirp_worker::prelude::*;
use proto::backend::{self, pkg::*};
use redis::AsyncCommands;
use serde::Deserialize;
use std::time::Duration;

use crate::workers::NEW_NOMAD_CONFIG;

// TODO:
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(3);

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct PlanResult {
Expand All @@ -16,6 +20,7 @@ struct RunRow {
server_id: Uuid,
datacenter_id: Uuid,
stop_ts: Option<i64>,
connectable_ts: Option<i64>,
nomad_alloc_plan_ts: Option<i64>, // this was nomad_plan_ts
}

Expand Down Expand Up @@ -92,10 +97,8 @@ async fn worker(
tracing::info!("no network returned");
}

// This works
tracing::info!(?ports, "found protsadf");

// {"timestamp":"2024-06-28T01:43:24.930496Z","level":"INFO","fields":{"message":"found protsadf","ports":"[Port { label: \"game_testing2\", source: 20202, target: 0, ip: \"10.0.50.97\" }]"},"target":"ds_worker::workers::nomad_monitor_alloc_plan","spans":[{"ray_id":"1c8bfa81-3c80-4a2c-ab7c-2655f6c6a665","req_id":"a44227ad-4f1a-44b8-b4d0-7746dd8a622e","worker_name":"monolith-worker--ds-nomad-monitor-alloc-plan","name":"handle_req"},{"name":"ds-nomad-monitor-alloc-plan","tick_index":0,"name":"handle"}]}
// Wait for Traefik to be ready
tokio::time::sleep(TRAEFIK_GRACE_PERIOD).await;

// Fetch the run
//
Expand All @@ -110,7 +113,6 @@ async fn worker(
nomad_node_vlan_ipv4: unwrap!(meta.remove("network-vlan-ipv4")),
ports: ports.clone(),
};
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let db_output = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
let ctx = ctx.clone();
let now = ctx.ts();
Expand Down Expand Up @@ -171,11 +173,12 @@ async fn update_db(
servers.server_id,
servers.datacenter_id,
servers.stop_ts,
servers.connectable_ts,
server_nomad.nomad_alloc_plan_ts
FROM
db_dynamic_servers.server_nomad
db_ds.server_nomad
INNER JOIN
db_dynamic_servers.servers
db_ds.servers
ON
servers.server_id = server_nomad.server_id
WHERE
Expand All @@ -186,9 +189,6 @@ async fn update_db(
&job_id,
)
.await?;
tracing::info!(?job_id, "checking jobid");

tracing::info!(?run_row, "ayy event2a");

// Check if run found
let run_row = if let Some(run_row) = run_row {
Expand All @@ -199,7 +199,21 @@ async fn update_db(
};
let server_id = run_row.server_id;

tracing::info!("ayy event2b");
if run_row.connectable_ts.is_some() {
tracing::warn!("connectable ts already set");
} else {
sql_execute!(
[ctx, @tx tx]
"
UPDATE db_ds.servers
SET connectable_ts = $2
WHERE server_id = $1
",
server_id,
now,
)
.await?;
}

// Write run meta on first plan
if run_row.nomad_alloc_plan_ts.is_none() {
Expand All @@ -208,14 +222,14 @@ async fn update_db(
[ctx, @tx tx]
"
UPDATE
db_dynamic_servers.server_nomad
db_ds.server_nomad
SET
nomad_alloc_id = $2,
nomad_alloc_plan_ts = $3,
nomad_node_id = $4,
nomad_node_name = $5,
nomad_node_public_ipv4 = $6,
nomad_node_vlan_ipv4 = $7
nomad_node_vlan_ipv4 = $
WHERE
server_id = $1
",
Expand All @@ -238,7 +252,7 @@ async fn update_db(
[ctx, @tx tx]
"
INSERT INTO
db_dynamic_servers.internal_ports (
db_ds.internal_ports (
server_id,
nomad_label,
nomad_source,
Expand Down
Loading