Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use crate::{
event::combine_events,
};

/// Max amount of workflows pulled from the database with each call to `pull_workflows`.
const MAX_PULLED_WORKFLOWS: i64 = 10;
/// Maximum times a query ran bu this database adapter is retried.
const MAX_QUERY_RETRIES: usize = 16;

pub struct DatabasePostgres {
Expand Down Expand Up @@ -143,7 +146,6 @@ impl Database for DatabasePostgres {
worker_instance_id: Uuid,
filter: &[&str],
) -> WorkflowResult<Vec<PulledWorkflow>> {
// TODO(RVT-3753): include limit on query to allow better workflow spread between nodes?
// Select all workflows that haven't started or that have a wake condition
let workflow_rows = sqlx::query_as::<_, PulledWorkflowRow>(indoc!(
"
Expand Down Expand Up @@ -194,6 +196,7 @@ impl Database for DatabasePostgres {
output IS NOT NULL
)
)
LIMIT $4
RETURNING workflow_id, workflow_name, create_ts, ray_id, input, wake_deadline_ts
),
-- Update last ping
Expand All @@ -208,6 +211,7 @@ impl Database for DatabasePostgres {
.bind(worker_instance_id)
.bind(filter)
.bind(rivet_util::timestamp::now())
.bind(MAX_PULLED_WORKFLOWS)
.fetch_all(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand Down