From 538d9b811ecb76ad95d201754745ed0c82224063 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Tue, 18 Jun 2024 19:19:13 +0000 Subject: [PATCH] chore: cleanup runtime aggregate op (#902) ## Changes --- lib/convert/src/impls/cloud/mod.rs | 21 --- lib/convert/src/impls/portal.rs | 2 +- proto/backend/billing.proto | 18 -- .../mm/ops/lobby-runtime-aggregate/src/lib.rs | 173 +++--------------- .../tests/integration.rs | 14 +- .../mm/types/lobby-runtime-aggregate.proto | 10 +- svc/pkg/team/types/billing-aggregate.proto | 25 --- 7 files changed, 41 insertions(+), 222 deletions(-) delete mode 100644 proto/backend/billing.proto delete mode 100644 svc/pkg/team/types/billing-aggregate.proto diff --git a/lib/convert/src/impls/cloud/mod.rs b/lib/convert/src/impls/cloud/mod.rs index 6f0bef36ab..48e0d58868 100644 --- a/lib/convert/src/impls/cloud/mod.rs +++ b/lib/convert/src/impls/cloud/mod.rs @@ -35,27 +35,6 @@ pub fn analytics_lobby_summary_from_lobby( }) } -// TODO: Remove -// impl ApiTryFrom -// for models::CloudRegionTierExpenses -// { -// type Error = GlobalError; - -// fn api_try_from( -// value: mm::lobby_runtime_aggregate::response::RegionTierTime, -// ) -> GlobalResult { -// let uptime_in_seconds = util::div_up!(value.total_time, 1_000); - -// Ok(models::CloudRegionTierExpenses { -// namespace_id: unwrap_ref!(value.namespace_id).as_uuid(), -// region_id: unwrap_ref!(value.region_id).as_uuid(), -// tier_name_id: value.tier_name_id, -// lobby_group_name_id: value.lobby_group_name_id, -// uptime: uptime_in_seconds, -// }) -// } -// } - impl ApiTryFrom for models::GameHandle { type Error = GlobalError; diff --git a/lib/convert/src/impls/portal.rs b/lib/convert/src/impls/portal.rs index 128fb26edb..c0c85dd5cd 100644 --- a/lib/convert/src/impls/portal.rs +++ b/lib/convert/src/impls/portal.rs @@ -1,4 +1,4 @@ -use proto::backend::{self, pkg::*}; +use proto::backend; use rivet_operation::prelude::*; use rivet_portal_server::models; diff --git a/proto/backend/billing.proto b/proto/backend/billing.proto deleted file mode 100644 index 2cde3c0cf0..0000000000 --- a/proto/backend/billing.proto +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; - -package rivet.backend.billing; - -import "proto/common.proto"; - -message GameLobbyMetrics { - rivet.common.Uuid game_id = 1; - repeated RegionTierMetrics metrics = 2; -} - -message RegionTierMetrics { - rivet.common.Uuid namespace_id = 6; - rivet.common.Uuid region_id = 1; - string tier_name_id = 2; - string lobby_group_name_id = 5; - int64 uptime = 4; // in seconds -} diff --git a/svc/pkg/mm/ops/lobby-runtime-aggregate/src/lib.rs b/svc/pkg/mm/ops/lobby-runtime-aggregate/src/lib.rs index f84c55d5bd..e898fd77f0 100644 --- a/svc/pkg/mm/ops/lobby-runtime-aggregate/src/lib.rs +++ b/svc/pkg/mm/ops/lobby-runtime-aggregate/src/lib.rs @@ -10,38 +10,33 @@ struct LobbyRow { namespace_id: Uuid, lobby_id: Option, region_id: Option, - lobby_group_id: Option, create_ts: Option, stop_ts: Option, } #[derive(Default)] -struct LobbyAggregate { +struct RegionAggregate { query_start: i64, query_end: i64, - /// Total time in milliseconds for each (namespace_id, region_id, lobby_group_id) - total_time: HashMap<(Uuid, Uuid, Uuid), i64>, + /// Total time in milliseconds for each (namespace_id, region_id) + total_time: HashMap<(Uuid, Uuid), i64>, /// Lobbies that are included in the aggregation. processed_lobby_ids: HashSet, } -impl LobbyAggregate { +impl RegionAggregate { fn process_lobby(&mut self, lobby_row: &LobbyRow) { // Unwrap values or ignore row - let (lobby_id, region_id, lobby_group_id, create_ts) = - if let (Some(a), Some(b), Some(c), Some(d)) = ( - lobby_row.lobby_id, - lobby_row.region_id, - lobby_row.lobby_group_id, - lobby_row.create_ts, - ) { - (a, b, c, d) - } else { - tracing::warn!(?lobby_row, "missing data in lobby row history"); - return; - }; + let (lobby_id, region_id, create_ts) = if let (Some(a), Some(b), Some(c)) = + (lobby_row.lobby_id, lobby_row.region_id, lobby_row.create_ts) + { + (a, b, c) + } else { + tracing::warn!(?lobby_row, "missing data in lobby row history"); + return; + }; // Check it's not already registered if self.processed_lobby_ids.contains(&lobby_id) { @@ -62,17 +57,10 @@ impl LobbyAggregate { let duration = i64::min(stop_ts, self.query_end) - i64::max(start_ts, self.query_start); *self .total_time - .entry((lobby_row.namespace_id, region_id, lobby_group_id)) + .entry((lobby_row.namespace_id, region_id)) .or_insert(0) += duration; self.processed_lobby_ids.insert(lobby_id); } - - fn lobby_group_ids(&self) -> HashSet { - self.total_time - .iter() - .map(|((_, _, x), _)| *x) - .collect::>() - } } #[operation(name = "mm-lobby-runtime-aggregate")] @@ -86,10 +74,10 @@ async fn handle( .collect::>(); tracing::info!(?namespace_ids, "namespaces"); - let mut lobby_aggregate = LobbyAggregate { + let mut region_aggregate = RegionAggregate { query_start: ctx.query_start, query_end: ctx.query_end, - ..LobbyAggregate::default() + ..Default::default() }; // Aggregate all lobbies that finished during the given query span. @@ -107,7 +95,7 @@ async fn handle( let mut lobby_rows = sql_fetch!( [ctx, LobbyRow, &crdb] " - SELECT namespace_id, lobby_id, region_id, lobby_group_id, create_ts, stop_ts + SELECT namespace_id, lobby_id, region_id, create_ts, stop_ts FROM db_mm_state.lobbies AS OF SYSTEM TIME '-5s' WHERE namespace_id = ANY($1) AND ( -- Lobbies stopped during the query window @@ -124,129 +112,26 @@ async fn handle( ); while let Some(lobby_row) = lobby_rows.next().await { let lobby_row = lobby_row?; - lobby_aggregate.process_lobby(&lobby_row); + region_aggregate.process_lobby(&lobby_row); } tracing::info!( - total_time = ?lobby_aggregate.total_time, - processed_len = ?lobby_aggregate.processed_lobby_ids.len(), + total_time = ?region_aggregate.total_time, + processed_len = ?region_aggregate.processed_lobby_ids.len(), "aggregated all lobbies" ); - // Look up region tiers for all lobby groups - let lobby_group_ids = lobby_aggregate - .lobby_group_ids() - .into_iter() - .map(Into::::into) - .collect::>(); - let lg_resolve_res = op!([ctx] mm_config_lobby_group_resolve_version { - lobby_group_ids: lobby_group_ids.clone(), - }) - .await?; - tracing::info!( - lobby_group_ids_len = ?lobby_group_ids.len(), - versions_len = ?lg_resolve_res.versions.len(), - "resolved lobby group versions" - ); - - let version_ids = lg_resolve_res - .versions - .iter() - .filter_map(|x| x.version_id.as_ref()) - .map(common::Uuid::as_uuid) - .collect::>() - .into_iter() - .map(Into::::into) - .collect::>(); - let version_res = op!([ctx] mm_config_version_get { - version_ids: version_ids.clone(), - }) - .await?; - ensure_eq!( - version_ids.len(), - version_res.versions.len(), - "missing version ids" - ); - tracing::info!(versions_len = ?version_res.versions.len(), "fetched mm versions"); - - // Convert responses - let mut tier_aggregates = HashMap::<(Uuid, Uuid, String, &str), i64>::new(); // (namespace_id, region_id, lobby_group_name_id, tier_name_id) -> time (ms) - for ((namespace_id, region_id, lobby_group_id), total_time) in lobby_aggregate.total_time { - let region_id_proto = Some(common::Uuid::from(region_id)); - let lgi_proto = Some(common::Uuid::from(lobby_group_id)); - - // Find the version ID for the lobby group - let version_id_proto = if let Some(version) = lg_resolve_res - .versions - .iter() - .find(|x| x.lobby_group_id == lgi_proto) - { - &version.version_id - } else { - tracing::warn!(%lobby_group_id, "could not find matching version for lobby group"); - continue; - }; - let version_id = unwrap_ref!(version_id_proto).as_uuid(); - - // Find the matching version config - let version_res = if let Some(x) = version_res - .versions - .iter() - .find(|x| x.version_id == *version_id_proto) - { - x - } else { - tracing::warn!(%lobby_group_id, %version_id, "could not find matching version config for version id"); - continue; - }; - let version_config = unwrap_ref!(version_res.config); - let version_meta = unwrap_ref!(version_res.config_meta); - - // Resolve the configured tier name ID - let lobby_group_idx = unwrap!( - version_meta - .lobby_groups - .iter() - .enumerate() - .find(|(_, x)| x.lobby_group_id == lgi_proto), - "could not find matching tier" - ) - .0; - let lobby_group_config = unwrap!(version_config.lobby_groups.get(lobby_group_idx)); - let lobby_group_region = unwrap!( - lobby_group_config - .regions - .iter() - .find(|x| x.region_id == region_id_proto), - "could not find matching region id config" - ); - let tier_name_id = lobby_group_region.tier_name_id.as_str(); - - // Append to region + tier aggregate - *tier_aggregates - .entry(( - namespace_id, - region_id, - lobby_group_config.name_id.clone(), - tier_name_id, - )) - .or_insert(0) += total_time; - } - // Build response - let region_tier_times = tier_aggregates + let usage = region_aggregate + .total_time .into_iter() - .map( - |((namespace_id, region_id, lobby_group_name_id, tier_name_id), total_time)| { - mm::lobby_runtime_aggregate::response::RegionTierTime { - namespace_id: Some(namespace_id.into()), - region_id: Some(region_id.into()), - lobby_group_name_id, - tier_name_id: tier_name_id.to_string(), - total_time, - } - }, - ) + .map(|((namespace_id, region_id), total_time)| { + mm::lobby_runtime_aggregate::response::NamespaceUsage { + namespace_id: Some(namespace_id.into()), + region_id: Some(region_id.into()), + total_time, + } + }) .collect::>(); - Ok(mm::lobby_runtime_aggregate::Response { region_tier_times }) + Ok(mm::lobby_runtime_aggregate::Response { usage }) } diff --git a/svc/pkg/mm/ops/lobby-runtime-aggregate/tests/integration.rs b/svc/pkg/mm/ops/lobby-runtime-aggregate/tests/integration.rs index 7431943f8e..49f4d0bf06 100644 --- a/svc/pkg/mm/ops/lobby-runtime-aggregate/tests/integration.rs +++ b/svc/pkg/mm/ops/lobby-runtime-aggregate/tests/integration.rs @@ -28,8 +28,8 @@ async fn default(ctx: TestCtx) { .await .unwrap(); - assert_eq!(res.region_tier_times.len(), 2, "ns not found"); - for times in &res.region_tier_times { + assert_eq!(res.usage.len(), 2, "ns not found"); + for times in &res.usage { assert!(times.total_time > 0, "should have time"); } } @@ -82,8 +82,8 @@ async fn missing_columns(ctx: TestCtx) { .await .unwrap(); - assert_eq!(2, res.region_tier_times.len(), "logs not found"); - for times in &res.region_tier_times { + assert_eq!(2, res.usage.len(), "logs not found"); + for times in &res.usage { let times_ns_id = times.namespace_id.as_ref().unwrap().as_uuid(); if times_ns_id == fake_namespace_id { // TODO: Calculate what the total time should be and that it doesn't @@ -122,7 +122,7 @@ async fn out_of_range(ctx: TestCtx) { .await .unwrap(); - assert!(res.region_tier_times.is_empty(), "range check failed"); + assert!(res.usage.is_empty(), "range check failed"); } #[worker_test] @@ -171,7 +171,7 @@ async fn min(ctx: TestCtx) { .unwrap(); assert_eq!( - res.region_tier_times.first().unwrap().total_time, + res.usage.first().unwrap().total_time, 5, "minimum check failed" ); @@ -223,7 +223,7 @@ async fn max(ctx: TestCtx) { .unwrap(); assert_eq!( - res.region_tier_times.first().unwrap().total_time, + res.usage.first().unwrap().total_time, 5, "minimum check failed" ); diff --git a/svc/pkg/mm/types/lobby-runtime-aggregate.proto b/svc/pkg/mm/types/lobby-runtime-aggregate.proto index c1651a2e56..01c1deb202 100644 --- a/svc/pkg/mm/types/lobby-runtime-aggregate.proto +++ b/svc/pkg/mm/types/lobby-runtime-aggregate.proto @@ -11,15 +11,13 @@ message Request { } message Response { - message RegionTierTime { + message NamespaceUsage { + reserved 3, 5; + rivet.common.Uuid namespace_id = 1; rivet.common.Uuid region_id = 2; - string tier_name_id = 3; - // Use the name ID instead of the UUID since we want to combine the same - // expenses across multiple lobby groups - string lobby_group_name_id = 5; int64 total_time = 4; // in milliseconds } - repeated RegionTierTime region_tier_times = 1; + repeated NamespaceUsage usage = 1; } diff --git a/svc/pkg/team/types/billing-aggregate.proto b/svc/pkg/team/types/billing-aggregate.proto deleted file mode 100644 index 1e9e5f69e4..0000000000 --- a/svc/pkg/team/types/billing-aggregate.proto +++ /dev/null @@ -1,25 +0,0 @@ -syntax = "proto3"; - -package rivet.backend.pkg.team.billing_aggregate; - -import "proto/common.proto"; -import "proto/backend/billing.proto"; - -message Request { - message TeamBillingRequest { - rivet.common.Uuid team_id = 1; - int64 query_start = 2; - int64 query_end = 3; - } - - repeated TeamBillingRequest teams = 1; -} - -message Response { - message TeamBillingMetrics { - rivet.common.Uuid team_id = 1; - repeated rivet.backend.billing.GameLobbyMetrics games = 2; - } - - repeated TeamBillingMetrics teams = 1; -}