Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(workflows): clean up internals #899

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@ impl ActivityCtx {
self.name
}

// pub fn timeout(&self) -> Duration {
// self.timeout
// }

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}
Expand Down
3 changes: 1 addition & 2 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ApiCtx {
ts: i64,
name: &'static str,
) -> Self {
let mut op_ctx = rivet_operation::OperationContext::new(
let op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
Expand All @@ -44,7 +44,6 @@ impl ApiCtx {
ts,
(),
);
op_ctx.from_workflow = true;

ApiCtx {
ray_id,
Expand Down
4 changes: 0 additions & 4 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ impl OperationCtx {
self.name
}

// pub fn timeout(&self) -> Duration {
// self.timeout
// }

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}
Expand Down
119 changes: 108 additions & 11 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use global_error::{GlobalError, GlobalResult};
use rivet_pools::prelude::*;
use serde::Serialize;
use tokio::time::Duration;
use uuid::Uuid;
Expand All @@ -15,7 +16,10 @@ pub struct TestCtx {

db: DatabaseHandle,

conn: Option<rivet_connection::Connection>,
conn: rivet_connection::Connection,

// Backwards compatibility
op_ctx: rivet_operation::OperationContext<()>,
}

impl TestCtx {
Expand All @@ -42,6 +46,18 @@ impl TestCtx {
Uuid::new_v4(),
&service_name,
);
let ts = rivet_util::timestamp::now();
let req_id = Uuid::new_v4();
let op_ctx = rivet_operation::OperationContext::new(
service_name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
ts,
(),
);

let db = DatabasePostgres::from_pool(pools.crdb().unwrap());

Expand All @@ -50,17 +66,12 @@ impl TestCtx {
ray_id,
ts: rivet_util::timestamp::now(),
db,
conn: Some(conn),
conn,
op_ctx,
}
}
}

impl TestCtx {
pub fn name(&self) -> &str {
&self.name
}
}

impl TestCtx {
pub async fn dispatch_workflow<I>(&self, input: I) -> GlobalResult<Uuid>
where
Expand Down Expand Up @@ -158,9 +169,7 @@ impl TestCtx {
{
let ctx = OperationCtx::new(
self.db.clone(),
self.conn
.as_ref()
.expect("ops cannot be triggered from an internal test"),
&self.conn,
self.ray_id,
self.ts,
false,
Expand All @@ -173,3 +182,91 @@ impl TestCtx {
.map_err(GlobalError::raw)
}
}

impl TestCtx {
pub fn name(&self) -> &str {
&self.name
}

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}

pub fn ray_id(&self) -> Uuid {
self.ray_id
}

/// Timestamp at which the request started.
pub fn ts(&self) -> i64 {
self.ts
}

/// Timestamp at which the request was published.
pub fn req_ts(&self) -> i64 {
self.op_ctx.req_ts()
}

/// Time between when the timestamp was processed and when it was published.
pub fn req_dt(&self) -> i64 {
self.ts.saturating_sub(self.op_ctx.req_ts())
}

// pub fn perf(&self) -> &chirp_perf::PerfCtx {
// self.conn.perf()
// }

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
self.conn.trace()
}

pub fn test(&self) -> bool {
self.trace()
.iter()
.any(|x| x.run_context == chirp_client::RunContext::Test as i32)
}

pub fn chirp(&self) -> &chirp_client::Client {
self.conn.chirp()
}

pub fn cache(&self) -> rivet_cache::RequestConfig {
self.conn.cache()
}

pub fn cache_handle(&self) -> rivet_cache::Cache {
self.conn.cache_handle()
}

pub async fn crdb(&self) -> Result<CrdbPool, rivet_pools::Error> {
self.conn.crdb().await
}

pub async fn redis_cache(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_cache().await
}

pub async fn redis_cdn(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_cdn().await
}

pub async fn redis_job(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_job().await
}

pub async fn redis_mm(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_mm().await
}

pub async fn redis_user_presence(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_user_presence().await
}

pub async fn clickhouse(&self) -> GlobalResult<ClickHousePool> {
self.conn.clickhouse().await
}

// Backwards compatibility
pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> {
&self.op_ctx
}
}
36 changes: 21 additions & 15 deletions lib/chirp-workflow/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use proc_macro::TokenStream;
use quote::{quote, ToTokens};
use syn::{
parse::{Parse, ParseStream},
parse_macro_input, spanned::Spanned, GenericArgument, Ident, ItemFn, ItemStruct, LitStr,
PathArguments, ReturnType, Type,
parse_macro_input,
spanned::Spanned,
GenericArgument, Ident, ItemFn, ItemStruct, LitStr, PathArguments, ReturnType, Type,
};

struct Config {
Expand All @@ -22,7 +23,10 @@ impl Default for Config {

#[proc_macro_attribute]
pub fn workflow(attr: TokenStream, item: TokenStream) -> TokenStream {
let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Workflow".to_string());
let name = parse_macro_input!(attr as OptionalIdent)
.ident
.map(|x| x.to_string())
.unwrap_or_else(|| "Workflow".to_string());
let item_fn = parse_macro_input!(item as ItemFn);

if let Err(err) = parse_empty_config(&item_fn.attrs) {
Expand Down Expand Up @@ -134,7 +138,10 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream {

#[proc_macro_attribute]
pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream {
let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Operation".to_string());
let name = parse_macro_input!(attr as OptionalIdent)
.ident
.map(|x| x.to_string())
.unwrap_or_else(|| "Operation".to_string());
let item_fn = parse_macro_input!(item as ItemFn);

let config = match parse_config(&item_fn.attrs) {
Expand Down Expand Up @@ -235,8 +242,7 @@ fn parse_trait_fn(ctx_ty: &syn::Type, trait_name: &str, item_fn: &syn::ItemFn) -
if segment.ident == "GlobalResult" {
match &segment.arguments {
PathArguments::AngleBracketed(args) => {
if let Some(GenericArgument::Type(ty)) = args.args.first()
{
if let Some(GenericArgument::Type(ty)) = args.args.first() {
ty.clone()
} else {
panic!("Unsupported Result type");
Expand Down Expand Up @@ -405,16 +411,16 @@ fn parse_empty_config(attrs: &[syn::Attribute]) -> syn::Result<()> {
}

struct OptionalIdent {
ident: Option<Ident>,
ident: Option<Ident>,
}

impl Parse for OptionalIdent {
fn parse(input: ParseStream) -> syn::Result<Self> {
if input.is_empty() {
Ok(OptionalIdent { ident: None })
} else {
let ident: Ident = input.parse()?;
Ok(OptionalIdent { ident: Some(ident) })
}
}
fn parse(input: ParseStream) -> syn::Result<Self> {
if input.is_empty() {
Ok(OptionalIdent { ident: None })
} else {
let ident: Ident = input.parse()?;
Ok(OptionalIdent { ident: Some(ident) })
}
}
}
Loading