diff --git a/lib/bolt/config/src/ns.rs b/lib/bolt/config/src/ns.rs index 7fb54d2e29..aa9005a667 100644 --- a/lib/bolt/config/src/ns.rs +++ b/lib/bolt/config/src/ns.rs @@ -715,7 +715,8 @@ pub struct ImageResizing {} #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(deny_unknown_fields)] pub struct RivetBilling { - pub dynamic_servers_capacity_price_id: String, + pub indie_price_id: String, + pub studio_price_id: String, } #[derive(Serialize, Deserialize, Clone, Debug, Default)] diff --git a/lib/bolt/core/src/context/service.rs b/lib/bolt/core/src/context/service.rs index a5439f2258..66e2b3aa61 100644 --- a/lib/bolt/core/src/context/service.rs +++ b/lib/bolt/core/src/context/service.rs @@ -522,6 +522,7 @@ impl ServiceContextData { | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } | ServiceKind::Operations { .. } | ServiceKind::ApiRoutes { .. } + | ServiceKind::Consumer { .. } ) } else { matches!( @@ -529,6 +530,7 @@ impl ServiceContextData { ServiceKind::Database { .. } | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } | ServiceKind::Operations { .. } + | ServiceKind::Consumer { .. } ) }; diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index a9e0425bf6..a61dedab1e 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -159,6 +159,21 @@ impl TestCtx { Ok(signal_id) } + /// Waits for a workflow to be triggered with a superset of given input. Strictly for tests. + pub fn observe(&self, input: serde_json::Value) -> GlobalResult { + // Serialize input + let input_val = serde_json::to_value(input) + .map_err(WorkflowError::SerializeWorkflowOutput) + .map_err(GlobalError::raw)?; + + Ok(ObserveHandle { + db: self.db.clone(), + name: W::NAME, + input: input_val, + ts: rivet_util::timestamp::now(), + }) + } + pub async fn op( &self, input: I, @@ -270,3 +285,36 @@ impl TestCtx { &self.op_ctx } } + +pub struct ObserveHandle { + db: DatabaseHandle, + name: &'static str, + input: serde_json::Value, + ts: i64, +} + +impl ObserveHandle { + pub async fn next(&mut self) -> GlobalResult { + tracing::info!(name=%self.name, input=?self.input, "observing workflow"); + tracing::info!(ts=%self.ts); + + let (workflow_id, create_ts) = loop { + if let Some((workflow_id, create_ts)) = self + .db + .poll_workflow(self.name, &self.input, self.ts) + .await + .map_err(GlobalError::raw)? + { + break (workflow_id, create_ts); + } + + tokio::time::sleep(Duration::from_millis(200)).await; + }; + + tracing::info!(name=%self.name, id=?workflow_id, "workflow found"); + + self.ts = create_ts + 1; + + Ok(workflow_id) + } +} diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index 16b37c1f7e..56f028ea50 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -14,8 +14,8 @@ pub trait Database: Send { async fn dispatch_workflow( &self, ray_id: Uuid, - id: Uuid, - name: &str, + workflow_id: Uuid, + workflow_name: &str, input: serde_json::Value, ) -> WorkflowResult<()>; async fn get_workflow(&self, id: Uuid) -> WorkflowResult>; @@ -75,6 +75,13 @@ pub trait Database: Send { sub_workflow_name: &str, input: serde_json::Value, ) -> WorkflowResult<()>; + + async fn poll_workflow( + &self, + name: &str, + input: &serde_json::Value, + after_ts: i64, + ) -> WorkflowResult>; } #[derive(sqlx::FromRow)] diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index 3c5345f756..48a66da9f6 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -505,4 +505,29 @@ impl Database for DatabasePostgres { Ok(()) } + + async fn poll_workflow( + &self, + workflow_name: &str, + input: &serde_json::Value, + after_ts: i64, + ) -> WorkflowResult> { + sqlx::query_as::<_, (Uuid, i64)>(indoc!( + " + SELECT workflow_id, create_ts + FROM db_workflow.workflows + WHERE + workflow_name = $1 AND + -- Subset + input @> $2 AND + create_ts >= $3 + ", + )) + .bind(workflow_name) + .bind(input) + .bind(after_ts) + .fetch_optional(&mut *self.conn().await?) + .await + .map_err(WorkflowError::Sqlx) + } } diff --git a/proto/backend/billing.proto b/proto/backend/billing.proto index 2956f41eb1..2cde3c0cf0 100644 --- a/proto/backend/billing.proto +++ b/proto/backend/billing.proto @@ -3,7 +3,6 @@ syntax = "proto3"; package rivet.backend.billing; import "proto/common.proto"; -import "proto/backend/billing/game_plan.proto"; message GameLobbyMetrics { rivet.common.Uuid game_id = 1; @@ -17,18 +16,3 @@ message RegionTierMetrics { string lobby_group_name_id = 5; int64 uptime = 4; // in seconds } - -message Team { - rivet.common.Uuid team_id = 1; - string stripe_customer_id = 2; - optional int64 payment_method_attached_ts = 3; - optional int64 payment_method_valid_ts = 4; - optional int64 payment_failed_ts = 5; - optional int64 payment_succeeded_ts = 6; -} - -message Game { - rivet.common.Uuid game_id = 1; - rivet.backend.billing.game_plan.GamePlan plan = 2; -} - diff --git a/proto/backend/billing/game_plan.proto b/proto/backend/billing/game_plan.proto deleted file mode 100644 index c8f4f95286..0000000000 --- a/proto/backend/billing/game_plan.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; - -package rivet.backend.billing.game_plan; - -import "proto/common.proto"; - -message GamePlan { - repeated DynamicServersCapacityRegion dynamic_servers_capacity = 1; -} - -message DynamicServersCapacityRegion { - rivet.common.Uuid region_id = 1; - uint64 cores = 2; -} - diff --git a/svc/pkg/foo/worker/src/workflows/test.rs b/svc/pkg/foo/worker/src/workflows/test.rs index 6039859a20..07f56ac690 100644 --- a/svc/pkg/foo/worker/src/workflows/test.rs +++ b/svc/pkg/foo/worker/src/workflows/test.rs @@ -45,12 +45,5 @@ async fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult { .map(|(id,)| id) .collect(); - let user_id = util::uuid::parse("000b3124-91d9-472e-8104-3dcc41e1a74d")?; - let user_get_res = op!([ctx] user_get { - user_ids: vec![user_id.into()], - }) - .await?; - let user = unwrap!(user_get_res.users.first()); - Ok(FooOutput { ids }) } diff --git a/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs b/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs index 429f76a3ce..d162f1d314 100644 --- a/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs +++ b/svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs @@ -23,8 +23,12 @@ async fn handle( .collect::>(); if query_ids.len() as isize == MAX_COUNT { - tracing::warn!("too many find queries, short circuiting to prevent bad things from happening"); - return Ok(mm::lobby_find_lobby_query_list::Response { query_ids: Vec::new() }) + tracing::warn!( + "too many find queries, short circuiting to prevent bad things from happening" + ); + return Ok(mm::lobby_find_lobby_query_list::Response { + query_ids: Vec::new(), + }); } Ok(mm::lobby_find_lobby_query_list::Response { query_ids })