From 4ffab516e794ff2d53dcfe58bacce8199efe8b78 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Mon, 10 Jun 2024 16:54:36 +0000 Subject: [PATCH] chore: make logs query consistent with nanoseconds (#862) Fixes RVT-3757 ## Changes --- proto/backend/job/log.proto | 4 +- svc/api/cloud/src/route/games/matchmaker.rs | 14 +++---- svc/pkg/job-log/ops/read/src/lib.rs | 41 +++++++++++---------- svc/pkg/job-log/types/read.proto | 12 +++--- 4 files changed, 36 insertions(+), 35 deletions(-) diff --git a/proto/backend/job/log.proto b/proto/backend/job/log.proto index a0af2fc96..3ae58f849 100644 --- a/proto/backend/job/log.proto +++ b/proto/backend/job/log.proto @@ -10,8 +10,8 @@ enum StreamType { } message LogEntry { - // Timestamp the log was received. - int64 ts = 1; + // Timestamp the log was received (in nanoseconds). + int64 nts = 1; // Message that was logged. bytes message = 3; diff --git a/svc/api/cloud/src/route/games/matchmaker.rs b/svc/api/cloud/src/route/games/matchmaker.rs index c72ae00a6..737716d2b 100644 --- a/svc/api/cloud/src/route/games/matchmaker.rs +++ b/svc/api/cloud/src/route/games/matchmaker.rs @@ -155,7 +155,7 @@ pub async fn get_lobby_logs( }; // Timestamp to start the query at - let before_ts = util::timestamp::now() * 1_000_000; + let before_nts = util::timestamp::now() * 1_000_000; // Get run ID let run_id = if let Some(x) = get_run_id(&ctx, game_id, lobby_id).await? { @@ -170,7 +170,7 @@ pub async fn get_lobby_logs( return Ok(models::CloudGamesGetLobbyLogsResponse { lines: Vec::new(), timestamps: Vec::new(), - watch: WatchResponse::new_as_model(before_ts), + watch: WatchResponse::new_as_model(before_nts), }); }; @@ -194,7 +194,7 @@ pub async fn get_lobby_logs( stream_type: stream_type as i32, count: 64, order_asc: false, - query: Some(job_log::read::request::Query::AfterTs(anchor)) + query: Some(job_log::read::request::Query::AfterNts(anchor)) }) .await?; @@ -226,7 +226,7 @@ pub async fn get_lobby_logs( stream_type: stream_type as i32, count: 256, order_asc: false, - query: Some(job_log::read::request::Query::BeforeTs(before_ts)), + query: Some(job_log::read::request::Query::BeforeNts(before_nts)), }) .await? }; @@ -240,7 +240,7 @@ pub async fn get_lobby_logs( let mut timestamps = logs_res .entries .iter() - .map(|x| x.ts / 1_000_000) + .map(|x| x.nts / 1_000_000) .map(util::timestamp::to_string) .collect::, _>>()?; @@ -248,11 +248,11 @@ pub async fn get_lobby_logs( lines.reverse(); timestamps.reverse(); - let watch_ts = logs_res.entries.first().map_or(before_ts, |x| x.ts); + let watch_nts = logs_res.entries.first().map_or(before_nts, |x| x.nts); Ok(models::CloudGamesGetLobbyLogsResponse { lines, timestamps, - watch: WatchResponse::new_as_model(watch_ts), + watch: WatchResponse::new_as_model(watch_nts), }) } diff --git a/svc/pkg/job-log/ops/read/src/lib.rs b/svc/pkg/job-log/ops/read/src/lib.rs index a18905314..78e65cd08 100644 --- a/svc/pkg/job-log/ops/read/src/lib.rs +++ b/svc/pkg/job-log/ops/read/src/lib.rs @@ -3,6 +3,7 @@ use rivet_operation::prelude::*; #[derive(clickhouse::Row, serde::Deserialize)] struct LogEntry { + // In nanoseconds ts: i64, message: Vec, } @@ -22,19 +23,19 @@ async fn handle( job_log::read::request::Query::All(_) => { query_all(ctx.body(), &clickhouse, run_id, order_by).await? } - job_log::read::request::Query::BeforeTs(ts) => { - query_before_ts(ctx.body(), &clickhouse, run_id, *ts, order_by).await? + job_log::read::request::Query::BeforeNts(nts) => { + query_before_nts(ctx.body(), &clickhouse, run_id, *nts, order_by).await? } - job_log::read::request::Query::AfterTs(ts) => { - query_after_ts(ctx.body(), &clickhouse, run_id, *ts, order_by).await? + job_log::read::request::Query::AfterNts(nts) => { + query_after_nts(ctx.body(), &clickhouse, run_id, *nts, order_by).await? } - job_log::read::request::Query::TsRange(query) => { - query_ts_range( + job_log::read::request::Query::NtsRange(query) => { + query_nts_range( ctx.body(), &clickhouse, run_id, - query.after_ts, - query.before_ts, + query.after_nts, + query.before_nts, order_by, ) .await? @@ -74,11 +75,11 @@ async fn query_all( Ok(entries) } -async fn query_before_ts( +async fn query_before_nts( req: &job_log::read::Request, clickhouse: &ClickHousePool, run_id: Uuid, - ts: i64, + nts: i64, order_by: &str, ) -> GlobalResult> { let mut entries_cursor = clickhouse @@ -94,7 +95,7 @@ async fn query_before_ts( .bind(run_id) .bind(&req.task) .bind(req.stream_type as u8) - .bind(ts) + .bind(nts) .bind(req.count) .fetch::()?; @@ -106,11 +107,11 @@ async fn query_before_ts( Ok(entries) } -async fn query_after_ts( +async fn query_after_nts( req: &job_log::read::Request, clickhouse: &ClickHousePool, run_id: Uuid, - ts: i64, + nts: i64, order_by: &str, ) -> GlobalResult> { let mut entries_cursor = clickhouse @@ -126,7 +127,7 @@ async fn query_after_ts( .bind(run_id) .bind(&req.task) .bind(req.stream_type as u8) - .bind(ts) + .bind(nts) .bind(req.count) .fetch::()?; @@ -138,12 +139,12 @@ async fn query_after_ts( Ok(entries) } -async fn query_ts_range( +async fn query_nts_range( req: &job_log::read::Request, clickhouse: &ClickHousePool, run_id: Uuid, - after_ts: i64, - before_ts: i64, + after_nts: i64, + before_nts: i64, order_by: &str, ) -> GlobalResult> { let mut entries_cursor = clickhouse @@ -159,8 +160,8 @@ async fn query_ts_range( .bind(run_id) .bind(&req.task) .bind(req.stream_type as u8) - .bind(after_ts) - .bind(before_ts) + .bind(after_nts) + .bind(before_nts) .bind(req.count) .fetch::()?; @@ -174,7 +175,7 @@ async fn query_ts_range( fn convert_entry(entry: LogEntry) -> backend::job::log::LogEntry { backend::job::log::LogEntry { - ts: entry.ts, + nts: entry.ts, message: entry.message, } } diff --git a/svc/pkg/job-log/types/read.proto b/svc/pkg/job-log/types/read.proto index bba80376a..42f4201bc 100644 --- a/svc/pkg/job-log/types/read.proto +++ b/svc/pkg/job-log/types/read.proto @@ -7,11 +7,11 @@ import "proto/backend/job/log.proto"; import "proto/common.proto"; message Request { - message TsRangeQuery { + message NtsRangeQuery { /// Timestamp in nanoseconds - int64 after_ts = 1; + int64 after_nts = 1; /// Timestamp in nanoseconds - int64 before_ts = 2; + int64 before_nts = 2; } rivet.common.Uuid run_id = 1; @@ -23,10 +23,10 @@ message Request { oneof query { google.protobuf.Empty all = 101; /// Timestamp in nanoseconds - int64 before_ts = 102; + int64 before_nts = 102; /// Timestamp in nanoseconds - int64 after_ts = 103; - TsRangeQuery ts_range = 104; + int64 after_nts = 103; + NtsRangeQuery nts_range = 104; } }