Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions lib/cache/build/src/req_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ impl RequestConfig {
//
// Drop `keys` bc this is not the same as the keys list in `ctx`, so it should not be used
// again.
let mut ctx = GetterCtx::new(base_key.clone().into(), keys.to_vec());
drop(keys);
let mut ctx = GetterCtx::new(base_key.clone().into(), keys);

// Build keys to look up values in Redis
let redis_keys = ctx
Expand Down
2 changes: 2 additions & 0 deletions proto/backend/ds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import "proto/common.proto";
import "proto/backend/captcha.proto";
import "proto/backend/region.proto";

// TODO: Delete file after converting ds-log pkg to native ops

message Server {
reserved 8;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn build_ds(
) -> GlobalResult<()> {
let dynamic_servers = ctx
.cache()
.ttl(60)
.ttl(60_000)
.fetch_one_json("servers_ports", dc_id, |mut cache, dc_id| async move {
let rows = sql_fetch_all!(
[ctx, DynamicServer]
Expand All @@ -65,7 +65,7 @@ pub async fn build_ds(
JOIN db_ds.docker_ports_protocol_game_guard AS gg
ON
ip.server_id = gg.server_id AND
ip.nomad_label = CONCAT('ds_', gg.port_name)
ip.nomad_label = CONCAT('ds_', REPLACE(gg.port_name, '-', '_'))
WHERE
s.datacenter_id = $1 AND
s.destroy_ts IS NULL
Expand Down Expand Up @@ -116,8 +116,6 @@ pub async fn build_ds(
},
);

tracing::info!(?config, "config timeeee");

// TODO: add middleware & services & ports
// TODO: same as jobs, watch out for namespaces
Ok(())
Expand Down
4 changes: 0 additions & 4 deletions svc/api/traefik-provider/src/route/game_guard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,10 @@ pub async fn config(
let mut config = types::TraefikConfigResponse::default();

// Fetch configs and catch any errors
tracing::info!(?config, "traefik config ds");
tracing::info!("asdgaerwvsdfvasdf");

build_ds(&ctx, datacenter, &mut config).await?;
build_job(&ctx, datacenter, &mut config).await?;

tracing::info!(?config, "traefik config ds");

// tracing::info!(
// http_services = ?config.http.services.len(),
// http_routers = config.http.routers.len(),
Expand Down
16 changes: 14 additions & 2 deletions svc/pkg/cluster/src/workflows/server/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn drain_node(ctx: &ActivityCtx, input: &DrainNodeInput) -> GlobalResult<b

if let Some(nomad_node_id) = nomad_node_id {
// Drain complete message is caught by `cluster-nomad-node-drain-complete`
nodes_api::update_node_drain(
let res = nodes_api::update_node_drain(
&NOMAD_CONFIG,
&nomad_node_id,
models::NodeUpdateDrainRequest {
Expand All @@ -102,7 +102,19 @@ async fn drain_node(ctx: &ActivityCtx, input: &DrainNodeInput) -> GlobalResult<b
None,
None,
)
.await?;
.await;

// Catch "node not found" error
if let Err(nomad_client::apis::Error::ResponseError(
nomad_client::apis::ResponseContent { content, .. },
)) = res
{
if content == "node not found" {
tracing::warn!("node does not exist, not draining");

return Ok(false);
}
}

// Prevent new matchmaker requests to the node running on this server
msg!([ctx] mm::msg::nomad_node_closed_set(&nomad_node_id) {
Expand Down
16 changes: 14 additions & 2 deletions svc/pkg/cluster/src/workflows/server/undrain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn undrain_node(ctx: &ActivityCtx, input: &UndrainNodeInput) -> GlobalResu
.await?;

if let Some(nomad_node_id) = nomad_node_id {
nodes_api::update_node_drain(
let res = nodes_api::update_node_drain(
&NOMAD_CONFIG,
&nomad_node_id,
models::NodeUpdateDrainRequest {
Expand All @@ -86,7 +86,19 @@ async fn undrain_node(ctx: &ActivityCtx, input: &UndrainNodeInput) -> GlobalResu
None,
None,
)
.await?;
.await;

// Catch "node not found" error
if let Err(nomad_client::apis::Error::ResponseError(
nomad_client::apis::ResponseContent { content, .. },
)) = res
{
if content == "node not found" {
tracing::warn!("node does not exist, not undraining");

return Ok(());
}
}

// Allow new matchmaker requests to the node running on this server
msg!([ctx] mm::msg::nomad_node_closed_set(&nomad_node_id) {
Expand Down
14 changes: 11 additions & 3 deletions svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use chirp_worker::prelude::*;
use rivet_operation::prelude::proto::backend::{self, pkg::*};
use rivet_operation::prelude::proto::backend::pkg::*;
use serde::Deserialize;

use crate::util::NEW_NOMAD_CONFIG;
Expand Down Expand Up @@ -31,7 +31,15 @@ struct RunData {
nomad_node_name: String,
nomad_node_public_ipv4: String,
nomad_node_vlan_ipv4: String,
ports: Vec<backend::job::Port>,
ports: Vec<Port>,
}

#[derive(Clone, Debug)]
struct Port {
label: String,
source: u32,
target: u32,
ip: String,
}

#[worker(name = "ds-nomad-monitor-alloc-plan")]
Expand Down Expand Up @@ -74,7 +82,7 @@ async fn worker(
for port in dynamic_ports {
// Don't share connect proxy ports
let label = unwrap_ref!(port.label);
ports.push(backend::job::Port {
ports.push(Port {
label: label.clone(),
source: *unwrap_ref!(port.value) as u32,
target: *unwrap_ref!(port.to) as u32,
Expand Down
6 changes: 3 additions & 3 deletions svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ async fn worker(
) -> GlobalResult<()> {
let payload_value = serde_json::from_str::<serde_json::Value>(&ctx.payload_json)?;

let job_id = unwrap!(unwrap!(payload_value.get("JobID"), "eval has no job id").as_str());
let eval_status_raw = unwrap!(unwrap!(payload_value.get("Status")).as_str());

// We can't decode this with serde, so manually deserialize the response
let eval_value = unwrap!(payload_value.get("Evaluation"));

let job_id = unwrap!(unwrap!(eval_value.get("JobID"), "eval has no job id").as_str());
let eval_status_raw = unwrap!(unwrap!(eval_value.get("Status")).as_str());

if !util_job::is_nomad_job_run(job_id) {
tracing::info!(%job_id, "disregarding event");
return Ok(());
Expand Down
Loading