Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make logs query consistent with nanoseconds #862

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions proto/backend/job/log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ enum StreamType {
}

message LogEntry {
// Timestamp the log was received.
int64 ts = 1;
// Timestamp the log was received (in nanoseconds).
int64 nts = 1;

// Message that was logged.
bytes message = 3;
Expand Down
14 changes: 7 additions & 7 deletions svc/api/cloud/src/route/games/matchmaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub async fn get_lobby_logs(
};

// Timestamp to start the query at
let before_ts = util::timestamp::now() * 1_000_000;
let before_nts = util::timestamp::now() * 1_000_000;

// Get run ID
let run_id = if let Some(x) = get_run_id(&ctx, game_id, lobby_id).await? {
Expand All @@ -170,7 +170,7 @@ pub async fn get_lobby_logs(
return Ok(models::CloudGamesGetLobbyLogsResponse {
lines: Vec::new(),
timestamps: Vec::new(),
watch: WatchResponse::new_as_model(before_ts),
watch: WatchResponse::new_as_model(before_nts),
});
};

Expand All @@ -194,7 +194,7 @@ pub async fn get_lobby_logs(
stream_type: stream_type as i32,
count: 64,
order_asc: false,
query: Some(job_log::read::request::Query::AfterTs(anchor))
query: Some(job_log::read::request::Query::AfterNts(anchor))

})
.await?;
Expand Down Expand Up @@ -226,7 +226,7 @@ pub async fn get_lobby_logs(
stream_type: stream_type as i32,
count: 256,
order_asc: false,
query: Some(job_log::read::request::Query::BeforeTs(before_ts)),
query: Some(job_log::read::request::Query::BeforeNts(before_nts)),
})
.await?
};
Expand All @@ -240,19 +240,19 @@ pub async fn get_lobby_logs(
let mut timestamps = logs_res
.entries
.iter()
.map(|x| x.ts / 1_000_000)
.map(|x| x.nts / 1_000_000)
.map(util::timestamp::to_string)
.collect::<Result<Vec<_>, _>>()?;

// Order desc
lines.reverse();
timestamps.reverse();

let watch_ts = logs_res.entries.first().map_or(before_ts, |x| x.ts);
let watch_nts = logs_res.entries.first().map_or(before_nts, |x| x.nts);
Ok(models::CloudGamesGetLobbyLogsResponse {
lines,
timestamps,
watch: WatchResponse::new_as_model(watch_ts),
watch: WatchResponse::new_as_model(watch_nts),
})
}

Expand Down
41 changes: 21 additions & 20 deletions svc/pkg/job-log/ops/read/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use rivet_operation::prelude::*;

#[derive(clickhouse::Row, serde::Deserialize)]
struct LogEntry {
// In nanoseconds
ts: i64,
AngelOnFira marked this conversation as resolved.
Show resolved Hide resolved
AngelOnFira marked this conversation as resolved.
Show resolved Hide resolved
message: Vec<u8>,
}
Expand All @@ -22,19 +23,19 @@ async fn handle(
job_log::read::request::Query::All(_) => {
query_all(ctx.body(), &clickhouse, run_id, order_by).await?
}
job_log::read::request::Query::BeforeTs(ts) => {
query_before_ts(ctx.body(), &clickhouse, run_id, *ts, order_by).await?
job_log::read::request::Query::BeforeNts(nts) => {
query_before_nts(ctx.body(), &clickhouse, run_id, *nts, order_by).await?
}
job_log::read::request::Query::AfterTs(ts) => {
query_after_ts(ctx.body(), &clickhouse, run_id, *ts, order_by).await?
job_log::read::request::Query::AfterNts(nts) => {
query_after_nts(ctx.body(), &clickhouse, run_id, *nts, order_by).await?
}
job_log::read::request::Query::TsRange(query) => {
query_ts_range(
job_log::read::request::Query::NtsRange(query) => {
query_nts_range(
ctx.body(),
&clickhouse,
run_id,
query.after_ts,
query.before_ts,
query.after_nts,
query.before_nts,
order_by,
)
.await?
Expand Down Expand Up @@ -74,11 +75,11 @@ async fn query_all(
Ok(entries)
}

async fn query_before_ts(
async fn query_before_nts(
req: &job_log::read::Request,
clickhouse: &ClickHousePool,
run_id: Uuid,
ts: i64,
nts: i64,
order_by: &str,
) -> GlobalResult<Vec<backend::job::log::LogEntry>> {
let mut entries_cursor = clickhouse
Expand All @@ -94,7 +95,7 @@ async fn query_before_ts(
.bind(run_id)
.bind(&req.task)
.bind(req.stream_type as u8)
.bind(ts)
.bind(nts)
.bind(req.count)
.fetch::<LogEntry>()?;

Expand All @@ -106,11 +107,11 @@ async fn query_before_ts(
Ok(entries)
}

async fn query_after_ts(
async fn query_after_nts(
req: &job_log::read::Request,
clickhouse: &ClickHousePool,
run_id: Uuid,
ts: i64,
nts: i64,
order_by: &str,
) -> GlobalResult<Vec<backend::job::log::LogEntry>> {
let mut entries_cursor = clickhouse
Expand All @@ -126,7 +127,7 @@ async fn query_after_ts(
.bind(run_id)
.bind(&req.task)
.bind(req.stream_type as u8)
.bind(ts)
.bind(nts)
.bind(req.count)
.fetch::<LogEntry>()?;

Expand All @@ -138,12 +139,12 @@ async fn query_after_ts(
Ok(entries)
}

async fn query_ts_range(
async fn query_nts_range(
req: &job_log::read::Request,
clickhouse: &ClickHousePool,
run_id: Uuid,
after_ts: i64,
before_ts: i64,
after_nts: i64,
before_nts: i64,
order_by: &str,
) -> GlobalResult<Vec<backend::job::log::LogEntry>> {
let mut entries_cursor = clickhouse
Expand All @@ -159,8 +160,8 @@ async fn query_ts_range(
.bind(run_id)
.bind(&req.task)
.bind(req.stream_type as u8)
.bind(after_ts)
.bind(before_ts)
.bind(after_nts)
.bind(before_nts)
.bind(req.count)
.fetch::<LogEntry>()?;

Expand All @@ -174,7 +175,7 @@ async fn query_ts_range(

fn convert_entry(entry: LogEntry) -> backend::job::log::LogEntry {
backend::job::log::LogEntry {
ts: entry.ts,
nts: entry.ts,
message: entry.message,
}
}
12 changes: 6 additions & 6 deletions svc/pkg/job-log/types/read.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import "proto/backend/job/log.proto";
import "proto/common.proto";

message Request {
message TsRangeQuery {
message NtsRangeQuery {
/// Timestamp in nanoseconds
int64 after_ts = 1;
int64 after_nts = 1;
/// Timestamp in nanoseconds
int64 before_ts = 2;
int64 before_nts = 2;
}

rivet.common.Uuid run_id = 1;
Expand All @@ -23,10 +23,10 @@ message Request {
oneof query {
google.protobuf.Empty all = 101;
/// Timestamp in nanoseconds
int64 before_ts = 102;
int64 before_nts = 102;
/// Timestamp in nanoseconds
int64 after_ts = 103;
TsRangeQuery ts_range = 104;
int64 after_nts = 103;
NtsRangeQuery nts_range = 104;
}
}

Expand Down
Loading