Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/bolt/config/src/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ pub struct ImageResizing {}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RivetBilling {
pub dynamic_servers_capacity_price_id: String,
pub indie_price_id: String,
pub studio_price_id: String,
}

#[derive(Serialize, Deserialize, Clone, Debug, Default)]
Expand Down
2 changes: 2 additions & 0 deletions lib/bolt/core/src/context/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,15 @@ impl ServiceContextData {
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::ApiRoutes { .. }
| ServiceKind::Consumer { .. }
)
} else {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::Consumer { .. }
)
};

Expand Down
48 changes: 48 additions & 0 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ impl TestCtx {
Ok(signal_id)
}

/// Waits for a workflow to be triggered with a superset of given input. Strictly for tests.
pub fn observe<W: Workflow>(&self, input: serde_json::Value) -> GlobalResult<ObserveHandle> {
// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeWorkflowOutput)
.map_err(GlobalError::raw)?;

Ok(ObserveHandle {
db: self.db.clone(),
name: W::NAME,
input: input_val,
ts: rivet_util::timestamp::now(),
})
}

pub async fn op<I>(
&self,
input: I,
Expand Down Expand Up @@ -270,3 +285,36 @@ impl TestCtx {
&self.op_ctx
}
}

pub struct ObserveHandle {
db: DatabaseHandle,
name: &'static str,
input: serde_json::Value,
ts: i64,
}

impl ObserveHandle {
pub async fn next(&mut self) -> GlobalResult<Uuid> {
tracing::info!(name=%self.name, input=?self.input, "observing workflow");
tracing::info!(ts=%self.ts);

let (workflow_id, create_ts) = loop {
if let Some((workflow_id, create_ts)) = self
.db
.poll_workflow(self.name, &self.input, self.ts)
.await
.map_err(GlobalError::raw)?
{
break (workflow_id, create_ts);
}

tokio::time::sleep(Duration::from_millis(200)).await;
};

tracing::info!(name=%self.name, id=?workflow_id, "workflow found");

self.ts = create_ts + 1;

Ok(workflow_id)
}
}
11 changes: 9 additions & 2 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub trait Database: Send {
async fn dispatch_workflow(
&self,
ray_id: Uuid,
id: Uuid,
name: &str,
workflow_id: Uuid,
workflow_name: &str,
input: serde_json::Value,
) -> WorkflowResult<()>;
async fn get_workflow(&self, id: Uuid) -> WorkflowResult<Option<WorkflowRow>>;
Expand Down Expand Up @@ -75,6 +75,13 @@ pub trait Database: Send {
sub_workflow_name: &str,
input: serde_json::Value,
) -> WorkflowResult<()>;

async fn poll_workflow(
&self,
name: &str,
input: &serde_json::Value,
after_ts: i64,
) -> WorkflowResult<Option<(Uuid, i64)>>;
}

#[derive(sqlx::FromRow)]
Expand Down
25 changes: 25 additions & 0 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,29 @@ impl Database for DatabasePostgres {

Ok(())
}

async fn poll_workflow(
&self,
workflow_name: &str,
input: &serde_json::Value,
after_ts: i64,
) -> WorkflowResult<Option<(Uuid, i64)>> {
sqlx::query_as::<_, (Uuid, i64)>(indoc!(
"
SELECT workflow_id, create_ts
FROM db_workflow.workflows
WHERE
workflow_name = $1 AND
-- Subset
input @> $2 AND
create_ts >= $3
",
))
.bind(workflow_name)
.bind(input)
.bind(after_ts)
.fetch_optional(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)
}
}
16 changes: 0 additions & 16 deletions proto/backend/billing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ syntax = "proto3";
package rivet.backend.billing;

import "proto/common.proto";
import "proto/backend/billing/game_plan.proto";

message GameLobbyMetrics {
rivet.common.Uuid game_id = 1;
Expand All @@ -17,18 +16,3 @@ message RegionTierMetrics {
string lobby_group_name_id = 5;
int64 uptime = 4; // in seconds
}

message Team {
rivet.common.Uuid team_id = 1;
string stripe_customer_id = 2;
optional int64 payment_method_attached_ts = 3;
optional int64 payment_method_valid_ts = 4;
optional int64 payment_failed_ts = 5;
optional int64 payment_succeeded_ts = 6;
}

message Game {
rivet.common.Uuid game_id = 1;
rivet.backend.billing.game_plan.GamePlan plan = 2;
}

15 changes: 0 additions & 15 deletions proto/backend/billing/game_plan.proto

This file was deleted.

7 changes: 0 additions & 7 deletions svc/pkg/foo/worker/src/workflows/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,5 @@ async fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult<FooOutput> {
.map(|(id,)| id)
.collect();

let user_id = util::uuid::parse("000b3124-91d9-472e-8104-3dcc41e1a74d")?;
let user_get_res = op!([ctx] user_get {
user_ids: vec![user_id.into()],
})
.await?;
let user = unwrap!(user_get_res.users.first());

Ok(FooOutput { ids })
}
8 changes: 6 additions & 2 deletions svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ async fn handle(
.collect::<Vec<common::Uuid>>();

if query_ids.len() as isize == MAX_COUNT {
tracing::warn!("too many find queries, short circuiting to prevent bad things from happening");
return Ok(mm::lobby_find_lobby_query_list::Response { query_ids: Vec::new() })
tracing::warn!(
"too many find queries, short circuiting to prevent bad things from happening"
);
return Ok(mm::lobby_find_lobby_query_list::Response {
query_ids: Vec::new(),
});
}

Ok(mm::lobby_find_lobby_query_list::Response { query_ids })
Expand Down