Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions lib/convert/src/impls/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,6 @@ pub fn analytics_lobby_summary_from_lobby(
})
}

// TODO: Remove
// impl ApiTryFrom<mm::lobby_runtime_aggregate::response::RegionTierTime>
// for models::CloudRegionTierExpenses
// {
// type Error = GlobalError;

// fn api_try_from(
// value: mm::lobby_runtime_aggregate::response::RegionTierTime,
// ) -> GlobalResult<Self> {
// let uptime_in_seconds = util::div_up!(value.total_time, 1_000);

// Ok(models::CloudRegionTierExpenses {
// namespace_id: unwrap_ref!(value.namespace_id).as_uuid(),
// region_id: unwrap_ref!(value.region_id).as_uuid(),
// tier_name_id: value.tier_name_id,
// lobby_group_name_id: value.lobby_group_name_id,
// uptime: uptime_in_seconds,
// })
// }
// }

impl ApiTryFrom<backend::game::Game> for models::GameHandle {
type Error = GlobalError;

Expand Down
2 changes: 1 addition & 1 deletion lib/convert/src/impls/portal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use proto::backend::{self, pkg::*};
use proto::backend;
use rivet_operation::prelude::*;
use rivet_portal_server::models;

Expand Down
18 changes: 0 additions & 18 deletions proto/backend/billing.proto

This file was deleted.

173 changes: 29 additions & 144 deletions svc/pkg/mm/ops/lobby-runtime-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,33 @@ struct LobbyRow {
namespace_id: Uuid,
lobby_id: Option<Uuid>,
region_id: Option<Uuid>,
lobby_group_id: Option<Uuid>,
create_ts: Option<i64>,
stop_ts: Option<i64>,
}

#[derive(Default)]
struct LobbyAggregate {
struct RegionAggregate {
query_start: i64,
query_end: i64,

/// Total time in milliseconds for each (namespace_id, region_id, lobby_group_id)
total_time: HashMap<(Uuid, Uuid, Uuid), i64>,
/// Total time in milliseconds for each (namespace_id, region_id)
total_time: HashMap<(Uuid, Uuid), i64>,

/// Lobbies that are included in the aggregation.
processed_lobby_ids: HashSet<Uuid>,
}

impl LobbyAggregate {
impl RegionAggregate {
fn process_lobby(&mut self, lobby_row: &LobbyRow) {
// Unwrap values or ignore row
let (lobby_id, region_id, lobby_group_id, create_ts) =
if let (Some(a), Some(b), Some(c), Some(d)) = (
lobby_row.lobby_id,
lobby_row.region_id,
lobby_row.lobby_group_id,
lobby_row.create_ts,
) {
(a, b, c, d)
} else {
tracing::warn!(?lobby_row, "missing data in lobby row history");
return;
};
let (lobby_id, region_id, create_ts) = if let (Some(a), Some(b), Some(c)) =
(lobby_row.lobby_id, lobby_row.region_id, lobby_row.create_ts)
{
(a, b, c)
} else {
tracing::warn!(?lobby_row, "missing data in lobby row history");
return;
};

// Check it's not already registered
if self.processed_lobby_ids.contains(&lobby_id) {
Expand All @@ -62,17 +57,10 @@ impl LobbyAggregate {
let duration = i64::min(stop_ts, self.query_end) - i64::max(start_ts, self.query_start);
*self
.total_time
.entry((lobby_row.namespace_id, region_id, lobby_group_id))
.entry((lobby_row.namespace_id, region_id))
.or_insert(0) += duration;
self.processed_lobby_ids.insert(lobby_id);
}

fn lobby_group_ids(&self) -> HashSet<Uuid> {
self.total_time
.iter()
.map(|((_, _, x), _)| *x)
.collect::<HashSet<Uuid>>()
}
}

#[operation(name = "mm-lobby-runtime-aggregate")]
Expand All @@ -86,10 +74,10 @@ async fn handle(
.collect::<Vec<_>>();
tracing::info!(?namespace_ids, "namespaces");

let mut lobby_aggregate = LobbyAggregate {
let mut region_aggregate = RegionAggregate {
query_start: ctx.query_start,
query_end: ctx.query_end,
..LobbyAggregate::default()
..Default::default()
};

// Aggregate all lobbies that finished during the given query span.
Expand All @@ -107,7 +95,7 @@ async fn handle(
let mut lobby_rows = sql_fetch!(
[ctx, LobbyRow, &crdb]
"
SELECT namespace_id, lobby_id, region_id, lobby_group_id, create_ts, stop_ts
SELECT namespace_id, lobby_id, region_id, create_ts, stop_ts
FROM db_mm_state.lobbies AS OF SYSTEM TIME '-5s'
WHERE namespace_id = ANY($1) AND (
-- Lobbies stopped during the query window
Expand All @@ -124,129 +112,26 @@ async fn handle(
);
while let Some(lobby_row) = lobby_rows.next().await {
let lobby_row = lobby_row?;
lobby_aggregate.process_lobby(&lobby_row);
region_aggregate.process_lobby(&lobby_row);
}
tracing::info!(
total_time = ?lobby_aggregate.total_time,
processed_len = ?lobby_aggregate.processed_lobby_ids.len(),
total_time = ?region_aggregate.total_time,
processed_len = ?region_aggregate.processed_lobby_ids.len(),
"aggregated all lobbies"
);

// Look up region tiers for all lobby groups
let lobby_group_ids = lobby_aggregate
.lobby_group_ids()
.into_iter()
.map(Into::<common::Uuid>::into)
.collect::<Vec<_>>();
let lg_resolve_res = op!([ctx] mm_config_lobby_group_resolve_version {
lobby_group_ids: lobby_group_ids.clone(),
})
.await?;
tracing::info!(
lobby_group_ids_len = ?lobby_group_ids.len(),
versions_len = ?lg_resolve_res.versions.len(),
"resolved lobby group versions"
);

let version_ids = lg_resolve_res
.versions
.iter()
.filter_map(|x| x.version_id.as_ref())
.map(common::Uuid::as_uuid)
.collect::<HashSet<_>>()
.into_iter()
.map(Into::<common::Uuid>::into)
.collect::<Vec<_>>();
let version_res = op!([ctx] mm_config_version_get {
version_ids: version_ids.clone(),
})
.await?;
ensure_eq!(
version_ids.len(),
version_res.versions.len(),
"missing version ids"
);
tracing::info!(versions_len = ?version_res.versions.len(), "fetched mm versions");

// Convert responses
let mut tier_aggregates = HashMap::<(Uuid, Uuid, String, &str), i64>::new(); // (namespace_id, region_id, lobby_group_name_id, tier_name_id) -> time (ms)
for ((namespace_id, region_id, lobby_group_id), total_time) in lobby_aggregate.total_time {
let region_id_proto = Some(common::Uuid::from(region_id));
let lgi_proto = Some(common::Uuid::from(lobby_group_id));

// Find the version ID for the lobby group
let version_id_proto = if let Some(version) = lg_resolve_res
.versions
.iter()
.find(|x| x.lobby_group_id == lgi_proto)
{
&version.version_id
} else {
tracing::warn!(%lobby_group_id, "could not find matching version for lobby group");
continue;
};
let version_id = unwrap_ref!(version_id_proto).as_uuid();

// Find the matching version config
let version_res = if let Some(x) = version_res
.versions
.iter()
.find(|x| x.version_id == *version_id_proto)
{
x
} else {
tracing::warn!(%lobby_group_id, %version_id, "could not find matching version config for version id");
continue;
};
let version_config = unwrap_ref!(version_res.config);
let version_meta = unwrap_ref!(version_res.config_meta);

// Resolve the configured tier name ID
let lobby_group_idx = unwrap!(
version_meta
.lobby_groups
.iter()
.enumerate()
.find(|(_, x)| x.lobby_group_id == lgi_proto),
"could not find matching tier"
)
.0;
let lobby_group_config = unwrap!(version_config.lobby_groups.get(lobby_group_idx));
let lobby_group_region = unwrap!(
lobby_group_config
.regions
.iter()
.find(|x| x.region_id == region_id_proto),
"could not find matching region id config"
);
let tier_name_id = lobby_group_region.tier_name_id.as_str();

// Append to region + tier aggregate
*tier_aggregates
.entry((
namespace_id,
region_id,
lobby_group_config.name_id.clone(),
tier_name_id,
))
.or_insert(0) += total_time;
}

// Build response
let region_tier_times = tier_aggregates
let usage = region_aggregate
.total_time
.into_iter()
.map(
|((namespace_id, region_id, lobby_group_name_id, tier_name_id), total_time)| {
mm::lobby_runtime_aggregate::response::RegionTierTime {
namespace_id: Some(namespace_id.into()),
region_id: Some(region_id.into()),
lobby_group_name_id,
tier_name_id: tier_name_id.to_string(),
total_time,
}
},
)
.map(|((namespace_id, region_id), total_time)| {
mm::lobby_runtime_aggregate::response::NamespaceUsage {
namespace_id: Some(namespace_id.into()),
region_id: Some(region_id.into()),
total_time,
}
})
.collect::<Vec<_>>();

Ok(mm::lobby_runtime_aggregate::Response { region_tier_times })
Ok(mm::lobby_runtime_aggregate::Response { usage })
}
14 changes: 7 additions & 7 deletions svc/pkg/mm/ops/lobby-runtime-aggregate/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ async fn default(ctx: TestCtx) {
.await
.unwrap();

assert_eq!(res.region_tier_times.len(), 2, "ns not found");
for times in &res.region_tier_times {
assert_eq!(res.usage.len(), 2, "ns not found");
for times in &res.usage {
assert!(times.total_time > 0, "should have time");
}
}
Expand Down Expand Up @@ -82,8 +82,8 @@ async fn missing_columns(ctx: TestCtx) {
.await
.unwrap();

assert_eq!(2, res.region_tier_times.len(), "logs not found");
for times in &res.region_tier_times {
assert_eq!(2, res.usage.len(), "logs not found");
for times in &res.usage {
let times_ns_id = times.namespace_id.as_ref().unwrap().as_uuid();
if times_ns_id == fake_namespace_id {
// TODO: Calculate what the total time should be and that it doesn't
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn out_of_range(ctx: TestCtx) {
.await
.unwrap();

assert!(res.region_tier_times.is_empty(), "range check failed");
assert!(res.usage.is_empty(), "range check failed");
}

#[worker_test]
Expand Down Expand Up @@ -171,7 +171,7 @@ async fn min(ctx: TestCtx) {
.unwrap();

assert_eq!(
res.region_tier_times.first().unwrap().total_time,
res.usage.first().unwrap().total_time,
5,
"minimum check failed"
);
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn max(ctx: TestCtx) {
.unwrap();

assert_eq!(
res.region_tier_times.first().unwrap().total_time,
res.usage.first().unwrap().total_time,
5,
"minimum check failed"
);
Expand Down
10 changes: 4 additions & 6 deletions svc/pkg/mm/types/lobby-runtime-aggregate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ message Request {
}

message Response {
message RegionTierTime {
message NamespaceUsage {
reserved 3, 5;

rivet.common.Uuid namespace_id = 1;
rivet.common.Uuid region_id = 2;
string tier_name_id = 3;
// Use the name ID instead of the UUID since we want to combine the same
// expenses across multiple lobby groups
string lobby_group_name_id = 5;
int64 total_time = 4; // in milliseconds
}

repeated RegionTierTime region_tier_times = 1;
repeated NamespaceUsage usage = 1;
}
25 changes: 0 additions & 25 deletions svc/pkg/team/types/billing-aggregate.proto

This file was deleted.