Skip to content

Commit

Permalink
chore(workflows): clean up internals (#899)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 18, 2024
1 parent 0a0d377 commit b840019
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 36 deletions.
4 changes: 0 additions & 4 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@ impl ActivityCtx {
self.name
}

// pub fn timeout(&self) -> Duration {
// self.timeout
// }

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}
Expand Down
3 changes: 1 addition & 2 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ApiCtx {
ts: i64,
name: &'static str,
) -> Self {
let mut op_ctx = rivet_operation::OperationContext::new(
let op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
Expand All @@ -44,7 +44,6 @@ impl ApiCtx {
ts,
(),
);
op_ctx.from_workflow = true;

ApiCtx {
ray_id,
Expand Down
4 changes: 0 additions & 4 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ impl OperationCtx {
self.name
}

// pub fn timeout(&self) -> Duration {
// self.timeout
// }

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}
Expand Down
119 changes: 108 additions & 11 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use global_error::{GlobalError, GlobalResult};
use rivet_pools::prelude::*;
use serde::Serialize;
use tokio::time::Duration;
use uuid::Uuid;
Expand All @@ -15,7 +16,10 @@ pub struct TestCtx {

db: DatabaseHandle,

conn: Option<rivet_connection::Connection>,
conn: rivet_connection::Connection,

// Backwards compatibility
op_ctx: rivet_operation::OperationContext<()>,
}

impl TestCtx {
Expand All @@ -42,6 +46,18 @@ impl TestCtx {
Uuid::new_v4(),
&service_name,
);
let ts = rivet_util::timestamp::now();
let req_id = Uuid::new_v4();
let op_ctx = rivet_operation::OperationContext::new(
service_name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
ts,
(),
);

let db = DatabasePostgres::from_pool(pools.crdb().unwrap());

Expand All @@ -50,17 +66,12 @@ impl TestCtx {
ray_id,
ts: rivet_util::timestamp::now(),
db,
conn: Some(conn),
conn,
op_ctx,
}
}
}

impl TestCtx {
pub fn name(&self) -> &str {
&self.name
}
}

impl TestCtx {
pub async fn dispatch_workflow<I>(&self, input: I) -> GlobalResult<Uuid>
where
Expand Down Expand Up @@ -158,9 +169,7 @@ impl TestCtx {
{
let ctx = OperationCtx::new(
self.db.clone(),
self.conn
.as_ref()
.expect("ops cannot be triggered from an internal test"),
&self.conn,
self.ray_id,
self.ts,
false,
Expand All @@ -173,3 +182,91 @@ impl TestCtx {
.map_err(GlobalError::raw)
}
}

impl TestCtx {
pub fn name(&self) -> &str {
&self.name
}

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}

pub fn ray_id(&self) -> Uuid {
self.ray_id
}

/// Timestamp at which the request started.
pub fn ts(&self) -> i64 {
self.ts
}

/// Timestamp at which the request was published.
pub fn req_ts(&self) -> i64 {
self.op_ctx.req_ts()
}

/// Time between when the timestamp was processed and when it was published.
pub fn req_dt(&self) -> i64 {
self.ts.saturating_sub(self.op_ctx.req_ts())
}

// pub fn perf(&self) -> &chirp_perf::PerfCtx {
// self.conn.perf()
// }

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
self.conn.trace()
}

pub fn test(&self) -> bool {
self.trace()
.iter()
.any(|x| x.run_context == chirp_client::RunContext::Test as i32)
}

pub fn chirp(&self) -> &chirp_client::Client {
self.conn.chirp()
}

pub fn cache(&self) -> rivet_cache::RequestConfig {
self.conn.cache()
}

pub fn cache_handle(&self) -> rivet_cache::Cache {
self.conn.cache_handle()
}

pub async fn crdb(&self) -> Result<CrdbPool, rivet_pools::Error> {
self.conn.crdb().await
}

pub async fn redis_cache(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_cache().await
}

pub async fn redis_cdn(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_cdn().await
}

pub async fn redis_job(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_job().await
}

pub async fn redis_mm(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_mm().await
}

pub async fn redis_user_presence(&self) -> Result<RedisPool, rivet_pools::Error> {
self.conn.redis_user_presence().await
}

pub async fn clickhouse(&self) -> GlobalResult<ClickHousePool> {
self.conn.clickhouse().await
}

// Backwards compatibility
pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> {
&self.op_ctx
}
}
36 changes: 21 additions & 15 deletions lib/chirp-workflow/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use proc_macro::TokenStream;
use quote::{quote, ToTokens};
use syn::{
parse::{Parse, ParseStream},
parse_macro_input, spanned::Spanned, GenericArgument, Ident, ItemFn, ItemStruct, LitStr,
PathArguments, ReturnType, Type,
parse_macro_input,
spanned::Spanned,
GenericArgument, Ident, ItemFn, ItemStruct, LitStr, PathArguments, ReturnType, Type,
};

struct Config {
Expand All @@ -22,7 +23,10 @@ impl Default for Config {

#[proc_macro_attribute]
pub fn workflow(attr: TokenStream, item: TokenStream) -> TokenStream {
let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Workflow".to_string());
let name = parse_macro_input!(attr as OptionalIdent)
.ident
.map(|x| x.to_string())
.unwrap_or_else(|| "Workflow".to_string());
let item_fn = parse_macro_input!(item as ItemFn);

if let Err(err) = parse_empty_config(&item_fn.attrs) {
Expand Down Expand Up @@ -134,7 +138,10 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream {

#[proc_macro_attribute]
pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream {
let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Operation".to_string());
let name = parse_macro_input!(attr as OptionalIdent)
.ident
.map(|x| x.to_string())
.unwrap_or_else(|| "Operation".to_string());
let item_fn = parse_macro_input!(item as ItemFn);

let config = match parse_config(&item_fn.attrs) {
Expand Down Expand Up @@ -235,8 +242,7 @@ fn parse_trait_fn(ctx_ty: &syn::Type, trait_name: &str, item_fn: &syn::ItemFn) -
if segment.ident == "GlobalResult" {
match &segment.arguments {
PathArguments::AngleBracketed(args) => {
if let Some(GenericArgument::Type(ty)) = args.args.first()
{
if let Some(GenericArgument::Type(ty)) = args.args.first() {
ty.clone()
} else {
panic!("Unsupported Result type");
Expand Down Expand Up @@ -405,16 +411,16 @@ fn parse_empty_config(attrs: &[syn::Attribute]) -> syn::Result<()> {
}

struct OptionalIdent {
ident: Option<Ident>,
ident: Option<Ident>,
}

impl Parse for OptionalIdent {
fn parse(input: ParseStream) -> syn::Result<Self> {
if input.is_empty() {
Ok(OptionalIdent { ident: None })
} else {
let ident: Ident = input.parse()?;
Ok(OptionalIdent { ident: Some(ident) })
}
}
fn parse(input: ParseStream) -> syn::Result<Self> {
if input.is_empty() {
Ok(OptionalIdent { ident: None })
} else {
let ident: Ident = input.parse()?;
Ok(OptionalIdent { ident: Some(ident) })
}
}
}

0 comments on commit b840019

Please sign in to comment.