Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions lib/bolt/core/src/tasks/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ async fn generate_root(path: &Path) {
let _ = fs::remove_file(pkg.path().join("Cargo.lock")).await;

set_license(&pkg.path().join("Cargo.toml")).await;

let types_path = pkg.path().join("types");
if fs::metadata(&types_path).await.is_ok() {
set_license(&types_path.join("Cargo.toml")).await;
}
} else {
// Check worker
let worker_path = pkg.path().join("worker");
Expand Down
12 changes: 12 additions & 0 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, Workfl

#[derive(Clone)]
pub struct ActivityCtx {
workflow_id: Uuid,
ray_id: Uuid,
name: &'static str,
ts: i64,
Expand All @@ -20,6 +21,7 @@ pub struct ActivityCtx {

impl ActivityCtx {
pub fn new(
workflow_id: Uuid,
db: DatabaseHandle,
conn: &rivet_connection::Connection,
activity_create_ts: i64,
Expand All @@ -42,6 +44,7 @@ impl ActivityCtx {
op_ctx.from_workflow = true;

ActivityCtx {
workflow_id,
ray_id,
name,
ts,
Expand Down Expand Up @@ -76,6 +79,15 @@ impl ActivityCtx {
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
}

pub async fn update_workflow_tags(&self, tags: &serde_json::Value) -> GlobalResult<()> {
self.db
.update_workflow_tags(
self.workflow_id,
tags,
)
.await.map_err(GlobalError::raw)
}
}

impl ActivityCtx {
Expand Down
1 change: 1 addition & 0 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl WorkflowCtx {
create_ts: i64,
) -> WorkflowResult<A::Output> {
let ctx = ActivityCtx::new(
self.workflow_id,
self.db.clone(),
&self.conn,
self.create_ts,
Expand Down
5 changes: 5 additions & 0 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub trait Database: Send {
wake_sub_workflow: Option<Uuid>,
error: &str,
) -> WorkflowResult<()>;
async fn update_workflow_tags(
&self,
workflow_id: Uuid,
tags: &serde_json::Value,
) -> WorkflowResult<()>;

async fn commit_workflow_activity_event(
&self,
Expand Down
23 changes: 23 additions & 0 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,29 @@ impl Database for DatabasePostgres {
Ok(())
}

// TODO: Theres nothing preventing this from being able to be called from the workflow ctx also, but for
// now its only in the activity ctx so it isn't called again during workflow retries
async fn update_workflow_tags(
&self,
workflow_id: Uuid,
tags: &serde_json::Value,
) -> WorkflowResult<()> {
sqlx::query(indoc!(
"
UPDATE db_workflow.workflows
SET tags = $2
WHERE workflow_id = $1
",
))
.bind(workflow_id)
.bind(tags)
.execute(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;

Ok(())
}

async fn commit_workflow_activity_event(
&self,
workflow_id: Uuid,
Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where
// Implements `Executable` for any tuple size
macro_rules! impl_tuple {
($($args:ident),*) => {
#[::async_trait::async_trait]
#[async_trait::async_trait]
impl<$($args : Executable),*> Executable for ($($args),*) {
type Output = ($(<$args as Executable>::Output),*);

Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod util {

pub use crate::{
activity::Activity,
signal::{Listen, Signal},
signal::{Listen, Signal, join_signal},
ctx::*,
db,
error::{WorkflowError, WorkflowResult},
Expand Down
15 changes: 13 additions & 2 deletions lib/chirp-workflow/core/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,22 @@ pub trait Listen: Sized {
/// ````
#[macro_export]
macro_rules! join_signal {
(pub $join:ident, [$($signals:ident),*]) => {
pub enum $join {
$($signals($signals)),*
}

join_signal!(@ $join, [$($signals),*]);
};
($join:ident, [$($signals:ident),*]) => {
enum $join {
$($signals($signals)),*
}

#[::async_trait::async_trait]
join_signal!(@ $join, [$($signals),*]);
};
(@ $join:ident, [$($signals:ident),*]) => {
#[async_trait::async_trait]
impl Listen for $join {
async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[$($signals::NAME),*]).await?;
Expand All @@ -73,5 +83,6 @@ macro_rules! join_signal {
}
}
}
}
};
}
pub use join_signal;
2 changes: 1 addition & 1 deletion svc/pkg/mm/util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ uuid = { version = "1", features = ["v4", "serde"] }
ip-info = { path = "../../ip/ops/info" }
mm-lobby-list-for-user-id = { path = "../ops/lobby-list-for-user-id" }
region-get = { path = "../../region/ops/get" }
user-identity-get = { path = "../../user-identity/ops/get" }
user-identity-get = { path = "../../user-identity/ops/get" }