diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index 16f60e37d8..0a6be8d793 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -15,6 +15,9 @@ use crate::{ event::combine_events, }; +/// Max amount of workflows pulled from the database with each call to `pull_workflows`. +const MAX_PULLED_WORKFLOWS: i64 = 10; +/// Maximum times a query ran bu this database adapter is retried. const MAX_QUERY_RETRIES: usize = 16; pub struct DatabasePostgres { @@ -143,7 +146,6 @@ impl Database for DatabasePostgres { worker_instance_id: Uuid, filter: &[&str], ) -> WorkflowResult> { - // TODO(RVT-3753): include limit on query to allow better workflow spread between nodes? // Select all workflows that haven't started or that have a wake condition let workflow_rows = sqlx::query_as::<_, PulledWorkflowRow>(indoc!( " @@ -194,6 +196,7 @@ impl Database for DatabasePostgres { output IS NOT NULL ) ) + LIMIT $4 RETURNING workflow_id, workflow_name, create_ts, ray_id, input, wake_deadline_ts ), -- Update last ping @@ -208,6 +211,7 @@ impl Database for DatabasePostgres { .bind(worker_instance_id) .bind(filter) .bind(rivet_util::timestamp::now()) + .bind(MAX_PULLED_WORKFLOWS) .fetch_all(&mut *self.conn().await?) .await .map_err(WorkflowError::Sqlx)?;