Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdks/full/typescript/archive.tgz

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions sdks/runtime/typescript/archive.tgz

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions svc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ members = [
"pkg/cf-custom-hostname/ops/list-for-namespace-id",
"pkg/cf-custom-hostname/ops/resolve-hostname",
"pkg/cf-custom-hostname/worker",
"pkg/cf/ops/tail-event-read",
"pkg/cloud/ops/device-link-create",
"pkg/cloud/ops/game-config-create",
"pkg/cloud/ops/game-config-get",
Expand Down
17 changes: 17 additions & 0 deletions svc/pkg/cf/ops/tail-event-read/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "cf-tail-event-read"
version = "0.0.1"
edition = "2021"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
license = "Apache-2.0"

[dependencies]
chirp-client = { path = "../../../../../lib/chirp/client" }
chrono = "0.4"
clickhouse = { version = "0.11.2", features = ["wa-37420", "uuid"] }
prost = "0.10"
rivet-operation = { path = "../../../../../lib/operation/core" }
serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
chirp-worker = { path = "../../../../../lib/chirp/worker" }
13 changes: 13 additions & 0 deletions svc/pkg/cf/ops/tail-event-read/Service.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[service]
name = "cf-tail-event-read"

[runtime]
kind = "rust"

[operation]

[secrets]
"clickhouse/users/chirp/password" = {}

[databases]
db-job-log = {}
166 changes: 166 additions & 0 deletions svc/pkg/cf/ops/tail-event-read/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use proto::backend::pkg::*;
use rivet_operation::prelude::*;

#[derive(clickhouse::Row, serde::Deserialize)]
struct TailEvent {
ts: i64,
tail_event: String,
}

#[operation(name = "cf-tail-event-read")]
async fn handle(
ctx: OperationContext<cf::tail_event_read::Request>,
) -> GlobalResult<cf::tail_event_read::Response> {
let clickhouse = ctx.clickhouse().await?;

let req_query = unwrap_ref!(ctx.query);

let order_by = if ctx.order_asc { "ASC" } else { "DESC" };

let events = match req_query {
cf::tail_event_read::request::Query::All(_) => {
query_all(ctx.body(), &clickhouse, order_by).await?
}
cf::tail_event_read::request::Query::BeforeTs(ts) => {
query_before_ts(ctx.body(), &clickhouse, *ts, order_by).await?
}
cf::tail_event_read::request::Query::AfterTs(ts) => {
query_after_ts(ctx.body(), &clickhouse, *ts, order_by).await?
}
cf::tail_event_read::request::Query::TsRange(query) => {
query_ts_range(
ctx.body(),
&clickhouse,
query.after_ts,
query.before_ts,
order_by,
)
.await?
}
};

Ok(cf::tail_event_read::Response { events })
}

async fn query_all(
req: &cf::tail_event_read::Request,
clickhouse: &ClickHousePool,
order_by: &str,
) -> GlobalResult<Vec<cf::tail_event_read::TailEvent>> {
let mut events_cursor = clickhouse
.query(&formatdoc!(
"
SELECT ts, toJSONString(tail_event)
FROM db_cf_log.cf_tail_events
WHERE script_name = ?
ORDER BY ts {order_by}
LIMIT ?
"
))
.bind(&req.script_name)
.bind(req.count)
.fetch::<TailEvent>()?;

let mut events = Vec::new();
while let Some(entry) = events_cursor.next().await? {
events.push(convert_entry(entry));
}

Ok(events)
}

async fn query_before_ts(
req: &cf::tail_event_read::Request,
clickhouse: &ClickHousePool,
ts: i64,
order_by: &str,
) -> GlobalResult<Vec<cf::tail_event_read::TailEvent>> {
let mut events_cursor = clickhouse
.query(&formatdoc!(
"
SELECT ts, toJSONString(tail_event)
FROM db_cf_log.cf_tail_events
WHERE script_name = ? AND ts < fromUnixTimestamp64Milli(?)
ORDER BY ts {order_by}
LIMIT ?
"
))
.bind(&req.script_name)
.bind(ts)
.bind(req.count)
.fetch::<TailEvent>()?;

let mut events = Vec::new();
while let Some(entry) = events_cursor.next().await? {
events.push(convert_entry(entry));
}

Ok(events)
}

async fn query_after_ts(
req: &cf::tail_event_read::Request,
clickhouse: &ClickHousePool,
ts: i64,
order_by: &str,
) -> GlobalResult<Vec<cf::tail_event_read::TailEvent>> {
let mut events_cursor = clickhouse
.query(&formatdoc!(
"
SELECT ts, toJSONString(tail_event)
FROM db_cf_log.cf_tail_events
WHERE script_name = ? AND ts > fromUnixTimestamp64Milli(?)
ORDER BY ts {order_by}
LIMIT ?
"
))
.bind(&req.script_name)
.bind(ts)
.bind(req.count)
.fetch::<TailEvent>()?;

let mut events = Vec::new();
while let Some(entry) = events_cursor.next().await? {
events.push(convert_entry(entry));
}

Ok(events)
}

async fn query_ts_range(
req: &cf::tail_event_read::Request,
clickhouse: &ClickHousePool,
after_ts: i64,
before_ts: i64,
order_by: &str,
) -> GlobalResult<Vec<cf::tail_event_read::TailEvent>> {
let mut events_cursor = clickhouse
.query(&formatdoc!(
"
SELECT ts, toJSONString(tail_event)
FROM db_cf_log.cf_tail_events
WHERE script_name = ? AND ts > fromUnixTimestamp64Milli(?) AND ts < fromUnixTimestamp64Nano(?)
ORDER BY ts {order_by}
LIMIT ?
"
))
.bind(&req.script_name)
.bind(after_ts)
.bind(before_ts)
.bind(req.count)
.fetch::<TailEvent>()?;

let mut events = Vec::new();
while let Some(entry) = events_cursor.next().await? {
events.push(convert_entry(entry));
}

Ok(events)
}

fn convert_entry(entry: TailEvent) -> cf::tail_event_read::TailEvent {
cf::tail_event_read::TailEvent {
ts: entry.ts,
json: entry.tail_event,
}
}
7 changes: 7 additions & 0 deletions svc/pkg/cf/ops/tail-event-read/tests/integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use chirp_worker::prelude::*;
use proto::backend::{self, pkg::*};

#[worker_test]
async fn basic(ctx: TestCtx) {
// TODO: Create opengb env, send request to it, read its tail event, delete env
}
34 changes: 34 additions & 0 deletions svc/pkg/cf/types/tail-event-read.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
syntax = "proto3";

package rivet.backend.pkg.cf.tail_event_read;

import "google/protobuf/empty.proto";
import "proto/common.proto";

message Request {
message TsRangeQuery {
int64 after_ts = 1;
int64 before_ts = 2;
}

string script_name = 1;
int64 count = 2;
bool order_asc = 3;

oneof query {
google.protobuf.Empty all = 101;
int64 before_ts = 102;
int64 after_ts = 103;
TsRangeQuery ts_range = 104;
}
}

message Response {
repeated TailEvent events = 1;
}

message TailEvent {
int64 ts = 1;
string json = 2;
}