From b84001926ef052f37cc3e1f59f50c953f9b8dfa9 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Tue, 18 Jun 2024 19:19:05 +0000 Subject: [PATCH] chore(workflows): clean up internals (#899) ## Changes --- lib/chirp-workflow/core/src/ctx/activity.rs | 4 - lib/chirp-workflow/core/src/ctx/api.rs | 3 +- lib/chirp-workflow/core/src/ctx/operation.rs | 4 - lib/chirp-workflow/core/src/ctx/test.rs | 119 +++++++++++++++++-- lib/chirp-workflow/macros/src/lib.rs | 36 +++--- 5 files changed, 130 insertions(+), 36 deletions(-) diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs b/lib/chirp-workflow/core/src/ctx/activity.rs index c925bb7ee..83ba83364 100644 --- a/lib/chirp-workflow/core/src/ctx/activity.rs +++ b/lib/chirp-workflow/core/src/ctx/activity.rs @@ -83,10 +83,6 @@ impl ActivityCtx { self.name } - // pub fn timeout(&self) -> Duration { - // self.timeout - // } - pub fn req_id(&self) -> Uuid { self.op_ctx.req_id() } diff --git a/lib/chirp-workflow/core/src/ctx/api.rs b/lib/chirp-workflow/core/src/ctx/api.rs index cb99d82d9..3c084eb4a 100644 --- a/lib/chirp-workflow/core/src/ctx/api.rs +++ b/lib/chirp-workflow/core/src/ctx/api.rs @@ -34,7 +34,7 @@ impl ApiCtx { ts: i64, name: &'static str, ) -> Self { - let mut op_ctx = rivet_operation::OperationContext::new( + let op_ctx = rivet_operation::OperationContext::new( name.to_string(), std::time::Duration::from_secs(60), conn.clone(), @@ -44,7 +44,6 @@ impl ApiCtx { ts, (), ); - op_ctx.from_workflow = true; ApiCtx { ray_id, diff --git a/lib/chirp-workflow/core/src/ctx/operation.rs b/lib/chirp-workflow/core/src/ctx/operation.rs index 702eba23b..1715ed16f 100644 --- a/lib/chirp-workflow/core/src/ctx/operation.rs +++ b/lib/chirp-workflow/core/src/ctx/operation.rs @@ -82,10 +82,6 @@ impl OperationCtx { self.name } - // pub fn timeout(&self) -> Duration { - // self.timeout - // } - pub fn req_id(&self) -> Uuid { self.op_ctx.req_id() } diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index eec38775f..a9e0425bf 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -1,4 +1,5 @@ use global_error::{GlobalError, GlobalResult}; +use rivet_pools::prelude::*; use serde::Serialize; use tokio::time::Duration; use uuid::Uuid; @@ -15,7 +16,10 @@ pub struct TestCtx { db: DatabaseHandle, - conn: Option, + conn: rivet_connection::Connection, + + // Backwards compatibility + op_ctx: rivet_operation::OperationContext<()>, } impl TestCtx { @@ -42,6 +46,18 @@ impl TestCtx { Uuid::new_v4(), &service_name, ); + let ts = rivet_util::timestamp::now(); + let req_id = Uuid::new_v4(); + let op_ctx = rivet_operation::OperationContext::new( + service_name.to_string(), + std::time::Duration::from_secs(60), + conn.clone(), + req_id, + ray_id, + ts, + ts, + (), + ); let db = DatabasePostgres::from_pool(pools.crdb().unwrap()); @@ -50,17 +66,12 @@ impl TestCtx { ray_id, ts: rivet_util::timestamp::now(), db, - conn: Some(conn), + conn, + op_ctx, } } } -impl TestCtx { - pub fn name(&self) -> &str { - &self.name - } -} - impl TestCtx { pub async fn dispatch_workflow(&self, input: I) -> GlobalResult where @@ -158,9 +169,7 @@ impl TestCtx { { let ctx = OperationCtx::new( self.db.clone(), - self.conn - .as_ref() - .expect("ops cannot be triggered from an internal test"), + &self.conn, self.ray_id, self.ts, false, @@ -173,3 +182,91 @@ impl TestCtx { .map_err(GlobalError::raw) } } + +impl TestCtx { + pub fn name(&self) -> &str { + &self.name + } + + pub fn req_id(&self) -> Uuid { + self.op_ctx.req_id() + } + + pub fn ray_id(&self) -> Uuid { + self.ray_id + } + + /// Timestamp at which the request started. + pub fn ts(&self) -> i64 { + self.ts + } + + /// Timestamp at which the request was published. + pub fn req_ts(&self) -> i64 { + self.op_ctx.req_ts() + } + + /// Time between when the timestamp was processed and when it was published. + pub fn req_dt(&self) -> i64 { + self.ts.saturating_sub(self.op_ctx.req_ts()) + } + + // pub fn perf(&self) -> &chirp_perf::PerfCtx { + // self.conn.perf() + // } + + pub fn trace(&self) -> &[chirp_client::TraceEntry] { + self.conn.trace() + } + + pub fn test(&self) -> bool { + self.trace() + .iter() + .any(|x| x.run_context == chirp_client::RunContext::Test as i32) + } + + pub fn chirp(&self) -> &chirp_client::Client { + self.conn.chirp() + } + + pub fn cache(&self) -> rivet_cache::RequestConfig { + self.conn.cache() + } + + pub fn cache_handle(&self) -> rivet_cache::Cache { + self.conn.cache_handle() + } + + pub async fn crdb(&self) -> Result { + self.conn.crdb().await + } + + pub async fn redis_cache(&self) -> Result { + self.conn.redis_cache().await + } + + pub async fn redis_cdn(&self) -> Result { + self.conn.redis_cdn().await + } + + pub async fn redis_job(&self) -> Result { + self.conn.redis_job().await + } + + pub async fn redis_mm(&self) -> Result { + self.conn.redis_mm().await + } + + pub async fn redis_user_presence(&self) -> Result { + self.conn.redis_user_presence().await + } + + pub async fn clickhouse(&self) -> GlobalResult { + self.conn.clickhouse().await + } + + // Backwards compatibility + pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> { + &self.op_ctx + } +} diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs index c07dd5908..c8ca0488f 100644 --- a/lib/chirp-workflow/macros/src/lib.rs +++ b/lib/chirp-workflow/macros/src/lib.rs @@ -2,8 +2,9 @@ use proc_macro::TokenStream; use quote::{quote, ToTokens}; use syn::{ parse::{Parse, ParseStream}, - parse_macro_input, spanned::Spanned, GenericArgument, Ident, ItemFn, ItemStruct, LitStr, - PathArguments, ReturnType, Type, + parse_macro_input, + spanned::Spanned, + GenericArgument, Ident, ItemFn, ItemStruct, LitStr, PathArguments, ReturnType, Type, }; struct Config { @@ -22,7 +23,10 @@ impl Default for Config { #[proc_macro_attribute] pub fn workflow(attr: TokenStream, item: TokenStream) -> TokenStream { - let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Workflow".to_string()); + let name = parse_macro_input!(attr as OptionalIdent) + .ident + .map(|x| x.to_string()) + .unwrap_or_else(|| "Workflow".to_string()); let item_fn = parse_macro_input!(item as ItemFn); if let Err(err) = parse_empty_config(&item_fn.attrs) { @@ -134,7 +138,10 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { #[proc_macro_attribute] pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { - let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Operation".to_string()); + let name = parse_macro_input!(attr as OptionalIdent) + .ident + .map(|x| x.to_string()) + .unwrap_or_else(|| "Operation".to_string()); let item_fn = parse_macro_input!(item as ItemFn); let config = match parse_config(&item_fn.attrs) { @@ -235,8 +242,7 @@ fn parse_trait_fn(ctx_ty: &syn::Type, trait_name: &str, item_fn: &syn::ItemFn) - if segment.ident == "GlobalResult" { match &segment.arguments { PathArguments::AngleBracketed(args) => { - if let Some(GenericArgument::Type(ty)) = args.args.first() - { + if let Some(GenericArgument::Type(ty)) = args.args.first() { ty.clone() } else { panic!("Unsupported Result type"); @@ -405,16 +411,16 @@ fn parse_empty_config(attrs: &[syn::Attribute]) -> syn::Result<()> { } struct OptionalIdent { - ident: Option, + ident: Option, } impl Parse for OptionalIdent { - fn parse(input: ParseStream) -> syn::Result { - if input.is_empty() { - Ok(OptionalIdent { ident: None }) - } else { - let ident: Ident = input.parse()?; - Ok(OptionalIdent { ident: Some(ident) }) - } - } + fn parse(input: ParseStream) -> syn::Result { + if input.is_empty() { + Ok(OptionalIdent { ident: None }) + } else { + let ident: Ident = input.parse()?; + Ok(OptionalIdent { ident: Some(ident) }) + } + } }