Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 36 additions & 42 deletions svc/api/traefik-provider/src/route/game_guard/dynamic_servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,49 +42,43 @@ pub async fn build_ds(
dc_id: Uuid,
config: &mut types::TraefikConfigResponse,
) -> GlobalResult<()> {
// TODO put in function, clean up
// TODO: remove cache for now
let dynamic_servers = ctx
.cache()
.ttl(60)
.fetch_one_json("servers_ports", dc_id, |mut cache, dc_id| async move {
let rows = sql_fetch_all!(
[ctx, DynamicServer]
"
SELECT
s.server_id,
s.datacenter_id,
ip.nomad_label AS label,
ip.nomad_ip,
ip.nomad_source,
gg.port_number,
gg.gg_port,
gg.port_name,
gg.protocol
FROM db_ds.internal_ports AS ip
JOIN db_ds.servers AS s
ON ip.server_id = s.server_id
JOIN db_ds.docker_ports_protocol_game_guard AS gg
ON
ip.server_id = gg.server_id AND
ip.nomad_label = CONCAT('ds_', gg.port_name)
WHERE
s.datacenter_id = $1 AND
s.destroy_ts IS NULL
",
dc_id
)
.await?;
cache.resolve(&dc_id, rows);

// let dynamic_servers: Option<Vec<DynamicServer>> = ctx
// .cache()
// // TODO: Set this for longer, this should mean that no caching happens
// .ttl(1)
// .fetch_one_json("servers_ports", dc_id, |mut cache, dc_id| {
// let ctx = ctx.clone();
// async move {
let dynamic_servers = sql_fetch_all!(
[ctx, DynamicServer]
"
SELECT
s.server_id,
s.datacenter_id,
ip.nomad_label AS label,
ip.nomad_ip,
ip.nomad_source,
gg.port_number,
gg.gg_port,
gg.port_name,
gg.protocol
FROM db_ds.internal_ports AS ip
JOIN db_ds.servers AS s
ON ip.server_id = s.server_id
JOIN db_ds.docker_ports_protocol_game_guard AS gg
ON
ip.server_id = gg.server_id AND
ip.nomad_label = CONCAT('ds_', gg.port_name)
WHERE
s.datacenter_id = $1 AND
s.stop_ts IS NULL
",
dc_id
)
.await?;
// cache.resolve(&dc_id, rows);

// Ok(cache)
// }
// })
// .await?;
Ok(cache)
})
.await?
.unwrap_or_default();

// Process proxied ports
for dynamic_server in &dynamic_servers {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@

CREATE INDEX ON servers (datacenter_id, stop_ts);

CREATE INDEX ON server_nomad (nomad_dispatched_job_id) STORING (nomad_alloc_plan_ts);
DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx;

Expand All @@ -16,3 +13,5 @@ STORING (
nomad_node_vlan_ipv4
);
DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx;

CREATE INDEX ON servers (datacenter_id, stop_ts) STORING (kill_timeout_ms);
18 changes: 9 additions & 9 deletions svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,11 @@ struct PlanResult {
#[derive(Debug, sqlx::FromRow)]
struct RunRow {
server_id: Uuid,
datacenter_id: Uuid,
connectable_ts: Option<i64>,
nomad_alloc_plan_ts: Option<i64>, // this was nomad_plan_ts
}

// #[derive(Debug, sqlx::FromRow)]
// struct ProxiedPort {
// target_nomad_port_label: Option<String>,
// ingress_port: i64,
// ingress_hostnames: Vec<String>,
// proxy_protocol: i64,
// ssl_domain_mode: i64,
// }

#[derive(Clone)]
struct RunData {
job_id: String,
Expand Down Expand Up @@ -160,6 +152,7 @@ async fn update_db(
"
SELECT
s.server_id,
s.datacenter_id,
s.connectable_ts,
s.stop_ts,
sn.nomad_alloc_plan_ts
Expand Down Expand Up @@ -248,6 +241,13 @@ async fn update_db(
)
.await?;
}

// Invalidate cache when ports are updated
if !ports.is_empty() {
ctx.cache()
.purge("servers_ports", [run_row.datacenter_id])
.await?;
}
}

Ok(Some(DbOutput { server_id }))
Expand Down
2 changes: 2 additions & 0 deletions svc/pkg/ds/src/workers/nomad_monitor_alloc_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ async fn worker(
}
};

tracing::info!("run pending");

crate::workers::webhook::call(ctx, alloc_id.to_string()).await?;

Ok(())
Expand Down
15 changes: 11 additions & 4 deletions svc/pkg/ds/src/workflows/server/destroy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ struct UpdateDbInput {

#[derive(Debug, Serialize, Deserialize, Hash, sqlx::FromRow)]
struct UpdateDbOutput {
server_id: Uuid,
datacenter_id: Uuid,
kill_timeout_ms: i64,
dispatched_job_id: Option<String>,
Expand All @@ -84,7 +83,7 @@ struct UpdateDbOutput {
#[activity(UpdateDb)]
async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<UpdateDbOutput> {
// Run in transaction for internal retryability
rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
let db_output = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
let ctx = ctx.clone();
let server_id = input.server_id;

Expand All @@ -102,7 +101,6 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
s1.server_id = s2.server_id AND
s2.destroy_ts IS NULL
RETURNING
s1.server_id,
s1.datacenter_id,
s1.kill_timeout_ms,
sn.nomad_dispatched_job_id AS dispatched_job_id,
Expand All @@ -115,7 +113,16 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
}
.boxed()
})
.await
.await?;

// NOTE: This call is infallible because redis is infallible. If it was not, it would be put in its own
// workflow step
// Invalidate cache when server is destroyed
ctx.cache()
.purge("servers_ports", [db_output.datacenter_id])
.await?;

Ok(db_output)
}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down
7 changes: 7 additions & 0 deletions svc/pkg/ds/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()>
})
.await?;

// NOTE: This call is infallible because redis is infallible. If it was not, it would be put in its own
// workflow step
// Invalidate cache when new server is created
ctx.cache()
.purge("servers_ports", [input.datacenter_id])
.await?;

Ok(())
}

Expand Down