Skip to content

Commit

Permalink
chore: make logs query consistent with nanoseconds (#862)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->
Fixes RVT-3757
## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 10, 2024
1 parent 1a468d3 commit 4ffab51
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 35 deletions.
4 changes: 2 additions & 2 deletions proto/backend/job/log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ enum StreamType {
}

message LogEntry {
// Timestamp the log was received.
int64 ts = 1;
// Timestamp the log was received (in nanoseconds).
int64 nts = 1;

// Message that was logged.
bytes message = 3;
Expand Down
14 changes: 7 additions & 7 deletions svc/api/cloud/src/route/games/matchmaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub async fn get_lobby_logs(
};

// Timestamp to start the query at
let before_ts = util::timestamp::now() * 1_000_000;
let before_nts = util::timestamp::now() * 1_000_000;

// Get run ID
let run_id = if let Some(x) = get_run_id(&ctx, game_id, lobby_id).await? {
Expand All @@ -170,7 +170,7 @@ pub async fn get_lobby_logs(
return Ok(models::CloudGamesGetLobbyLogsResponse {
lines: Vec::new(),
timestamps: Vec::new(),
watch: WatchResponse::new_as_model(before_ts),
watch: WatchResponse::new_as_model(before_nts),
});
};

Expand All @@ -194,7 +194,7 @@ pub async fn get_lobby_logs(
stream_type: stream_type as i32,
count: 64,
order_asc: false,
query: Some(job_log::read::request::Query::AfterTs(anchor))
query: Some(job_log::read::request::Query::AfterNts(anchor))

})
.await?;
Expand Down Expand Up @@ -226,7 +226,7 @@ pub async fn get_lobby_logs(
stream_type: stream_type as i32,
count: 256,
order_asc: false,
query: Some(job_log::read::request::Query::BeforeTs(before_ts)),
query: Some(job_log::read::request::Query::BeforeNts(before_nts)),
})
.await?
};
Expand All @@ -240,19 +240,19 @@ pub async fn get_lobby_logs(
let mut timestamps = logs_res
.entries
.iter()
.map(|x| x.ts / 1_000_000)
.map(|x| x.nts / 1_000_000)
.map(util::timestamp::to_string)
.collect::<Result<Vec<_>, _>>()?;

// Order desc
lines.reverse();
timestamps.reverse();

let watch_ts = logs_res.entries.first().map_or(before_ts, |x| x.ts);
let watch_nts = logs_res.entries.first().map_or(before_nts, |x| x.nts);
Ok(models::CloudGamesGetLobbyLogsResponse {
lines,
timestamps,
watch: WatchResponse::new_as_model(watch_ts),
watch: WatchResponse::new_as_model(watch_nts),
})
}

Expand Down
41 changes: 21 additions & 20 deletions svc/pkg/job-log/ops/read/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use rivet_operation::prelude::*;

#[derive(clickhouse::Row, serde::Deserialize)]
struct LogEntry {
// In nanoseconds
ts: i64,
message: Vec<u8>,
}
Expand All @@ -22,19 +23,19 @@ async fn handle(
job_log::read::request::Query::All(_) => {
query_all(ctx.body(), &clickhouse, run_id, order_by).await?
}
job_log::read::request::Query::BeforeTs(ts) => {
query_before_ts(ctx.body(), &clickhouse, run_id, *ts, order_by).await?
job_log::read::request::Query::BeforeNts(nts) => {
query_before_nts(ctx.body(), &clickhouse, run_id, *nts, order_by).await?
}
job_log::read::request::Query::AfterTs(ts) => {
query_after_ts(ctx.body(), &clickhouse, run_id, *ts, order_by).await?
job_log::read::request::Query::AfterNts(nts) => {
query_after_nts(ctx.body(), &clickhouse, run_id, *nts, order_by).await?
}
job_log::read::request::Query::TsRange(query) => {
query_ts_range(
job_log::read::request::Query::NtsRange(query) => {
query_nts_range(
ctx.body(),
&clickhouse,
run_id,
query.after_ts,
query.before_ts,
query.after_nts,
query.before_nts,
order_by,
)
.await?
Expand Down Expand Up @@ -74,11 +75,11 @@ async fn query_all(
Ok(entries)
}

async fn query_before_ts(
async fn query_before_nts(
req: &job_log::read::Request,
clickhouse: &ClickHousePool,
run_id: Uuid,
ts: i64,
nts: i64,
order_by: &str,
) -> GlobalResult<Vec<backend::job::log::LogEntry>> {
let mut entries_cursor = clickhouse
Expand All @@ -94,7 +95,7 @@ async fn query_before_ts(
.bind(run_id)
.bind(&req.task)
.bind(req.stream_type as u8)
.bind(ts)
.bind(nts)
.bind(req.count)
.fetch::<LogEntry>()?;

Expand All @@ -106,11 +107,11 @@ async fn query_before_ts(
Ok(entries)
}

async fn query_after_ts(
async fn query_after_nts(
req: &job_log::read::Request,
clickhouse: &ClickHousePool,
run_id: Uuid,
ts: i64,
nts: i64,
order_by: &str,
) -> GlobalResult<Vec<backend::job::log::LogEntry>> {
let mut entries_cursor = clickhouse
Expand All @@ -126,7 +127,7 @@ async fn query_after_ts(
.bind(run_id)
.bind(&req.task)
.bind(req.stream_type as u8)
.bind(ts)
.bind(nts)
.bind(req.count)
.fetch::<LogEntry>()?;

Expand All @@ -138,12 +139,12 @@ async fn query_after_ts(
Ok(entries)
}

async fn query_ts_range(
async fn query_nts_range(
req: &job_log::read::Request,
clickhouse: &ClickHousePool,
run_id: Uuid,
after_ts: i64,
before_ts: i64,
after_nts: i64,
before_nts: i64,
order_by: &str,
) -> GlobalResult<Vec<backend::job::log::LogEntry>> {
let mut entries_cursor = clickhouse
Expand All @@ -159,8 +160,8 @@ async fn query_ts_range(
.bind(run_id)
.bind(&req.task)
.bind(req.stream_type as u8)
.bind(after_ts)
.bind(before_ts)
.bind(after_nts)
.bind(before_nts)
.bind(req.count)
.fetch::<LogEntry>()?;

Expand All @@ -174,7 +175,7 @@ async fn query_ts_range(

fn convert_entry(entry: LogEntry) -> backend::job::log::LogEntry {
backend::job::log::LogEntry {
ts: entry.ts,
nts: entry.ts,
message: entry.message,
}
}
12 changes: 6 additions & 6 deletions svc/pkg/job-log/types/read.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import "proto/backend/job/log.proto";
import "proto/common.proto";

message Request {
message TsRangeQuery {
message NtsRangeQuery {
/// Timestamp in nanoseconds
int64 after_ts = 1;
int64 after_nts = 1;
/// Timestamp in nanoseconds
int64 before_ts = 2;
int64 before_nts = 2;
}

rivet.common.Uuid run_id = 1;
Expand All @@ -23,10 +23,10 @@ message Request {
oneof query {
google.protobuf.Empty all = 101;
/// Timestamp in nanoseconds
int64 before_ts = 102;
int64 before_nts = 102;
/// Timestamp in nanoseconds
int64 after_ts = 103;
TsRangeQuery ts_range = 104;
int64 after_nts = 103;
NtsRangeQuery nts_range = 104;
}
}

Expand Down

0 comments on commit 4ffab51

Please sign in to comment.