From 5efb04db316c1e10d642f9f9ea4350d51cce1d39 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Wed, 29 May 2024 00:26:52 +0000 Subject: [PATCH] feat: add cf tail event op (#809) ## Changes --- sdks/full/typescript/archive.tgz | 4 +- sdks/runtime/typescript/archive.tgz | 4 +- svc/Cargo.lock | 13 ++ svc/Cargo.toml | 1 + svc/pkg/cf/ops/tail-event-read/Cargo.toml | 17 ++ svc/pkg/cf/ops/tail-event-read/Service.toml | 13 ++ svc/pkg/cf/ops/tail-event-read/src/lib.rs | 166 ++++++++++++++++++ .../ops/tail-event-read/tests/integration.rs | 7 + svc/pkg/cf/types/tail-event-read.proto | 34 ++++ 9 files changed, 255 insertions(+), 4 deletions(-) create mode 100644 svc/pkg/cf/ops/tail-event-read/Cargo.toml create mode 100644 svc/pkg/cf/ops/tail-event-read/Service.toml create mode 100644 svc/pkg/cf/ops/tail-event-read/src/lib.rs create mode 100644 svc/pkg/cf/ops/tail-event-read/tests/integration.rs create mode 100644 svc/pkg/cf/types/tail-event-read.proto diff --git a/sdks/full/typescript/archive.tgz b/sdks/full/typescript/archive.tgz index f35260476c..6c97278c09 100644 --- a/sdks/full/typescript/archive.tgz +++ b/sdks/full/typescript/archive.tgz @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ddc8719d23439938e4e8e5ebb89c58b394996f24150954c187f6b3b01d88b697 -size 638614 +oid sha256:ae4a163c17e475b42a1fad61e32001d9bcd1c5f63922ecb50b204ac3f07bcdaa +size 638662 diff --git a/sdks/runtime/typescript/archive.tgz b/sdks/runtime/typescript/archive.tgz index e6c7a7a1cb..3083d18c49 100644 --- a/sdks/runtime/typescript/archive.tgz +++ b/sdks/runtime/typescript/archive.tgz @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:22f20214cc64fc04525bd931e4307d48b1c89428ef08944266a4591eea589fe1 -size 371476 +oid sha256:1625363256f6eeb6eff2a15ec4fba51ebdbe713a486b42c6f7ab1a9369e28bf5 +size 371559 diff --git a/svc/Cargo.lock b/svc/Cargo.lock index ee6f2690b9..3194a79654 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -1938,6 +1938,19 @@ dependencies = [ "serde", ] +[[package]] +name = "cf-tail-event-read" +version = "0.0.1" +dependencies = [ + "chirp-client", + "chirp-worker", + "chrono", + "clickhouse", + "prost 0.10.4", + "rivet-operation", + "serde", +] + [[package]] name = "cfg-if" version = "1.0.0" diff --git a/svc/Cargo.toml b/svc/Cargo.toml index 8cf0d285fa..787fc13086 100644 --- a/svc/Cargo.toml +++ b/svc/Cargo.toml @@ -48,6 +48,7 @@ members = [ "pkg/cf-custom-hostname/ops/list-for-namespace-id", "pkg/cf-custom-hostname/ops/resolve-hostname", "pkg/cf-custom-hostname/worker", + "pkg/cf/ops/tail-event-read", "pkg/cloud/ops/device-link-create", "pkg/cloud/ops/game-config-create", "pkg/cloud/ops/game-config-get", diff --git a/svc/pkg/cf/ops/tail-event-read/Cargo.toml b/svc/pkg/cf/ops/tail-event-read/Cargo.toml new file mode 100644 index 0000000000..b419110638 --- /dev/null +++ b/svc/pkg/cf/ops/tail-event-read/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "cf-tail-event-read" +version = "0.0.1" +edition = "2021" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" + +[dependencies] +chirp-client = { path = "../../../../../lib/chirp/client" } +chrono = "0.4" +clickhouse = { version = "0.11.2", features = ["wa-37420", "uuid"] } +prost = "0.10" +rivet-operation = { path = "../../../../../lib/operation/core" } +serde = { version = "1.0", features = ["derive"] } + +[dev-dependencies] +chirp-worker = { path = "../../../../../lib/chirp/worker" } diff --git a/svc/pkg/cf/ops/tail-event-read/Service.toml b/svc/pkg/cf/ops/tail-event-read/Service.toml new file mode 100644 index 0000000000..05d78e84b2 --- /dev/null +++ b/svc/pkg/cf/ops/tail-event-read/Service.toml @@ -0,0 +1,13 @@ +[service] +name = "cf-tail-event-read" + +[runtime] +kind = "rust" + +[operation] + +[secrets] +"clickhouse/users/chirp/password" = {} + +[databases] +db-job-log = {} diff --git a/svc/pkg/cf/ops/tail-event-read/src/lib.rs b/svc/pkg/cf/ops/tail-event-read/src/lib.rs new file mode 100644 index 0000000000..4f78498c42 --- /dev/null +++ b/svc/pkg/cf/ops/tail-event-read/src/lib.rs @@ -0,0 +1,166 @@ +use proto::backend::pkg::*; +use rivet_operation::prelude::*; + +#[derive(clickhouse::Row, serde::Deserialize)] +struct TailEvent { + ts: i64, + tail_event: String, +} + +#[operation(name = "cf-tail-event-read")] +async fn handle( + ctx: OperationContext, +) -> GlobalResult { + let clickhouse = ctx.clickhouse().await?; + + let req_query = unwrap_ref!(ctx.query); + + let order_by = if ctx.order_asc { "ASC" } else { "DESC" }; + + let events = match req_query { + cf::tail_event_read::request::Query::All(_) => { + query_all(ctx.body(), &clickhouse, order_by).await? + } + cf::tail_event_read::request::Query::BeforeTs(ts) => { + query_before_ts(ctx.body(), &clickhouse, *ts, order_by).await? + } + cf::tail_event_read::request::Query::AfterTs(ts) => { + query_after_ts(ctx.body(), &clickhouse, *ts, order_by).await? + } + cf::tail_event_read::request::Query::TsRange(query) => { + query_ts_range( + ctx.body(), + &clickhouse, + query.after_ts, + query.before_ts, + order_by, + ) + .await? + } + }; + + Ok(cf::tail_event_read::Response { events }) +} + +async fn query_all( + req: &cf::tail_event_read::Request, + clickhouse: &ClickHousePool, + order_by: &str, +) -> GlobalResult> { + let mut events_cursor = clickhouse + .query(&formatdoc!( + " + SELECT ts, toJSONString(tail_event) + FROM db_cf_log.cf_tail_events + WHERE script_name = ? + ORDER BY ts {order_by} + LIMIT ? + " + )) + .bind(&req.script_name) + .bind(req.count) + .fetch::()?; + + let mut events = Vec::new(); + while let Some(entry) = events_cursor.next().await? { + events.push(convert_entry(entry)); + } + + Ok(events) +} + +async fn query_before_ts( + req: &cf::tail_event_read::Request, + clickhouse: &ClickHousePool, + ts: i64, + order_by: &str, +) -> GlobalResult> { + let mut events_cursor = clickhouse + .query(&formatdoc!( + " + SELECT ts, toJSONString(tail_event) + FROM db_cf_log.cf_tail_events + WHERE script_name = ? AND ts < fromUnixTimestamp64Milli(?) + ORDER BY ts {order_by} + LIMIT ? + " + )) + .bind(&req.script_name) + .bind(ts) + .bind(req.count) + .fetch::()?; + + let mut events = Vec::new(); + while let Some(entry) = events_cursor.next().await? { + events.push(convert_entry(entry)); + } + + Ok(events) +} + +async fn query_after_ts( + req: &cf::tail_event_read::Request, + clickhouse: &ClickHousePool, + ts: i64, + order_by: &str, +) -> GlobalResult> { + let mut events_cursor = clickhouse + .query(&formatdoc!( + " + SELECT ts, toJSONString(tail_event) + FROM db_cf_log.cf_tail_events + WHERE script_name = ? AND ts > fromUnixTimestamp64Milli(?) + ORDER BY ts {order_by} + LIMIT ? + " + )) + .bind(&req.script_name) + .bind(ts) + .bind(req.count) + .fetch::()?; + + let mut events = Vec::new(); + while let Some(entry) = events_cursor.next().await? { + events.push(convert_entry(entry)); + } + + Ok(events) +} + +async fn query_ts_range( + req: &cf::tail_event_read::Request, + clickhouse: &ClickHousePool, + after_ts: i64, + before_ts: i64, + order_by: &str, +) -> GlobalResult> { + let mut events_cursor = clickhouse + .query(&formatdoc!( + " + SELECT ts, toJSONString(tail_event) + FROM db_cf_log.cf_tail_events + WHERE script_name = ? AND ts > fromUnixTimestamp64Milli(?) AND ts < fromUnixTimestamp64Nano(?) + ORDER BY ts {order_by} + LIMIT ? + " + )) + .bind(&req.script_name) + .bind(after_ts) + .bind(before_ts) + .bind(req.count) + .fetch::()?; + + let mut events = Vec::new(); + while let Some(entry) = events_cursor.next().await? { + events.push(convert_entry(entry)); + } + + Ok(events) +} + +fn convert_entry(entry: TailEvent) -> cf::tail_event_read::TailEvent { + cf::tail_event_read::TailEvent { + ts: entry.ts, + json: entry.tail_event, + } +} diff --git a/svc/pkg/cf/ops/tail-event-read/tests/integration.rs b/svc/pkg/cf/ops/tail-event-read/tests/integration.rs new file mode 100644 index 0000000000..78fdb0e8c2 --- /dev/null +++ b/svc/pkg/cf/ops/tail-event-read/tests/integration.rs @@ -0,0 +1,7 @@ +use chirp_worker::prelude::*; +use proto::backend::{self, pkg::*}; + +#[worker_test] +async fn basic(ctx: TestCtx) { + // TODO: Create opengb env, send request to it, read its tail event, delete env +} diff --git a/svc/pkg/cf/types/tail-event-read.proto b/svc/pkg/cf/types/tail-event-read.proto new file mode 100644 index 0000000000..140fc70974 --- /dev/null +++ b/svc/pkg/cf/types/tail-event-read.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package rivet.backend.pkg.cf.tail_event_read; + +import "google/protobuf/empty.proto"; +import "proto/common.proto"; + +message Request { + message TsRangeQuery { + int64 after_ts = 1; + int64 before_ts = 2; + } + + string script_name = 1; + int64 count = 2; + bool order_asc = 3; + + oneof query { + google.protobuf.Empty all = 101; + int64 before_ts = 102; + int64 after_ts = 103; + TsRangeQuery ts_range = 104; + } +} + +message Response { + repeated TailEvent events = 1; +} + +message TailEvent { + int64 ts = 1; + string json = 2; +} +