From c5a32a3805dfe4efab498709cda9f70e0bcf5ebf Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Wed, 5 Jun 2024 20:15:05 +0000 Subject: [PATCH] feat(workflows): add worker instance failover (#854) ## Changes --- docs/libraries/workflow/ERRORS.md | 3 + lib/chirp-workflow/core/src/ctx/test.rs | 4 - lib/chirp-workflow/core/src/db/mod.rs | 6 +- lib/chirp-workflow/core/src/db/postgres.rs | 87 +++++++++++-------- lib/chirp-workflow/core/src/registry.rs | 4 + lib/chirp-workflow/core/src/worker.rs | 19 +++- svc/Cargo.lock | 16 ++++ svc/Cargo.toml | 3 +- .../migrations/20240430191643_init.up.sql | 16 ++-- svc/pkg/workflow/standalone/gc/Cargo.toml | 20 +++++ svc/pkg/workflow/standalone/gc/Service.toml | 11 +++ svc/pkg/workflow/standalone/gc/src/lib.rs | 58 +++++++++++++ svc/pkg/workflow/standalone/gc/src/main.rs | 31 +++++++ .../standalone/gc/tests/integration.rs | 17 ++++ 14 files changed, 243 insertions(+), 52 deletions(-) create mode 100644 docs/libraries/workflow/ERRORS.md create mode 100644 svc/pkg/workflow/standalone/gc/Cargo.toml create mode 100644 svc/pkg/workflow/standalone/gc/Service.toml create mode 100644 svc/pkg/workflow/standalone/gc/src/lib.rs create mode 100644 svc/pkg/workflow/standalone/gc/src/main.rs create mode 100644 svc/pkg/workflow/standalone/gc/tests/integration.rs diff --git a/docs/libraries/workflow/ERRORS.md b/docs/libraries/workflow/ERRORS.md new file mode 100644 index 000000000..0cd1c0ed0 --- /dev/null +++ b/docs/libraries/workflow/ERRORS.md @@ -0,0 +1,3 @@ +# Errors + +Only errors from inside of activities will be retried. Errors thrown in the workflow body will not be retried because they will never succeed (the state is consistent up to the point of error). 
diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index 6e89183f8..87d6cd6e8 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use global_error::{GlobalError, GlobalResult}; use serde::Serialize; use tokio::time::Duration; @@ -10,8 +8,6 @@ use crate::{ Workflow, WorkflowError, WorkflowInput, }; -pub type TestCtxHandle = Arc; - pub struct TestCtx { name: String, ray_id: Uuid, diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index 102877cf9..dff2d6802 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -19,7 +19,11 @@ pub trait Database: Send { input: serde_json::Value, ) -> WorkflowResult<()>; async fn get_workflow(&self, id: Uuid) -> WorkflowResult>; - async fn pull_workflows(&self, filter: &[&str]) -> WorkflowResult>; + async fn pull_workflows( + &self, + worker_instance_id: Uuid, + filter: &[&str], + ) -> WorkflowResult>; // When a workflow is completed async fn commit_workflow( diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index f9d8c4719..b14968d94 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -10,8 +10,6 @@ use super::{ }; use crate::{schema::ActivityId, WorkflowError, WorkflowResult}; -const NODE_ID: Uuid = Uuid::nil(); - pub struct DatabasePostgres { pool: PgPool, } @@ -99,44 +97,59 @@ impl Database for DatabasePostgres { .map_err(WorkflowError::Sqlx) } - async fn pull_workflows(&self, filter: &[&str]) -> WorkflowResult> { + async fn pull_workflows( + &self, + worker_instance_id: Uuid, + filter: &[&str], + ) -> WorkflowResult> { // TODO(RVT-3753): include limit on query to allow better workflow spread between nodes? 
// Select all workflows that haven't started or that have a wake condition let rows = sqlx::query_as::<_, PulledWorkflowRow>(indoc!( " - UPDATE db_workflow.workflows as w - -- Assign this node to this workflow - SET node_id = $1 - WHERE - -- Filter - workflow_name = ANY($2) AND - -- Not already complete - output IS NULL AND - -- No assigned node (not running) - node_id IS NULL AND - -- Check for wake condition - ( - wake_immediate OR - wake_deadline_ts IS NOT NULL OR - ( - SELECT true - FROM db_workflow.signals AS s - WHERE s.signal_name = ANY(wake_signals) - LIMIT 1 - ) OR - ( - SELECT true - FROM db_workflow.workflows AS w2 - WHERE - w2.workflow_id = w.wake_sub_workflow_id AND - output IS NOT NULL - ) + WITH + pull_workflows AS ( + UPDATE db_workflow.workflows as w + -- Assign this node to this workflow + SET worker_instance_id = $1 + WHERE + -- Filter + workflow_name = ANY($2) AND + -- Not already complete + output IS NULL AND + -- No assigned node (not running) + worker_instance_id IS NULL AND + -- Check for wake condition + ( + wake_immediate OR + wake_deadline_ts IS NOT NULL OR + ( + SELECT true + FROM db_workflow.signals AS s + WHERE s.signal_name = ANY(wake_signals) + LIMIT 1 + ) OR + ( + SELECT true + FROM db_workflow.workflows AS w2 + WHERE + w2.workflow_id = w.wake_sub_workflow_id AND + output IS NOT NULL + ) + ) + RETURNING workflow_id, workflow_name, create_ts, ray_id, input, wake_deadline_ts + ), + -- Update last ping + worker_instance_update AS ( + UPSERT INTO db_workflow.worker_instances (worker_instance_id, last_ping_ts) + VALUES ($1, $3) + RETURNING 1 ) - RETURNING workflow_id, workflow_name, create_ts, ray_id, input, wake_deadline_ts + SELECT * FROM pull_workflows ", )) - .bind(NODE_ID) + .bind(worker_instance_id) .bind(filter) + .bind(rivet_util::timestamp::now()) .fetch_all(&mut *self.conn().await?) 
.await .map_err(WorkflowError::Sqlx)?; @@ -199,12 +212,12 @@ impl Database for DatabasePostgres { sqlx::query_as::<_, SubWorkflowEventRow>(indoc!( " SELECT - sw.workflow_id, sw.location, sw.sub_workflow_id, w.name as sub_workflow_name + sw.workflow_id, sw.location, sw.sub_workflow_id, w.workflow_name AS sub_workflow_name FROM db_workflow.workflow_sub_workflow_events AS sw JOIN db_workflow.workflows AS w ON sw.sub_workflow_id = w.workflow_id - WHERE workflow_id = ANY($1) - ORDER BY workflow_id, location ASC + WHERE sw.workflow_id = ANY($1) + ORDER BY sw.workflow_id, sw.location ASC ", )) .bind(&workflow_ids) @@ -274,7 +287,7 @@ impl Database for DatabasePostgres { " UPDATE db_workflow.workflows SET - node_id = NULL, + worker_instance_id = NULL, wake_immediate = $2, wake_deadline_ts = $3, wake_signals = $4, @@ -307,7 +320,7 @@ impl Database for DatabasePostgres { UPSERT INTO db_workflow.workflow_activity_events ( workflow_id, location, activity_name, input_hash, input, output ) - VALUES ($1, $2, $3, $4, $5) + VALUES ($1, $2, $3, $4, $5, $6) ", )) .bind(workflow_id) diff --git a/lib/chirp-workflow/core/src/registry.rs b/lib/chirp-workflow/core/src/registry.rs index 60750e40d..6d976ec53 100644 --- a/lib/chirp-workflow/core/src/registry.rs +++ b/lib/chirp-workflow/core/src/registry.rs @@ -75,6 +75,10 @@ impl Registry { .get(name) .ok_or(WorkflowError::WorkflowMissingFromRegistry(name.to_string())) } + + pub fn size(&self) -> usize { + self.workflows.len() + } } pub struct RegistryWorkflow { diff --git a/lib/chirp-workflow/core/src/worker.rs b/lib/chirp-workflow/core/src/worker.rs index f327bcc9e..4f9710f7d 100644 --- a/lib/chirp-workflow/core/src/worker.rs +++ b/lib/chirp-workflow/core/src/worker.rs @@ -1,6 +1,7 @@ use global_error::GlobalResult; use tokio::time::Duration; use tracing::Instrument; +use uuid::Uuid; use crate::{util, DatabaseHandle, RegistryHandle, WorkflowCtx}; @@ -10,16 +11,27 @@ const TICK_INTERVAL: Duration = Duration::from_millis(50); /// that are 
registered in its registry. After pulling, the workflows are ran and their state is written to /// the database. pub struct Worker { + worker_instance_id: Uuid, registry: RegistryHandle, db: DatabaseHandle, } impl Worker { pub fn new(registry: RegistryHandle, db: DatabaseHandle) -> Self { - Worker { registry, db } + Worker { + worker_instance_id: Uuid::new_v4(), + registry, + db, + } } pub async fn start(mut self, pools: rivet_pools::Pools) -> GlobalResult<()> { + tracing::info!( + worker_instance_id=?self.worker_instance_id, + "starting worker instance with {} registered workflows", + self.registry.size(), + ); + let mut interval = tokio::time::interval(TICK_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -49,7 +61,10 @@ impl Worker { .collect::>(); // Query awake workflows - let workflows = self.db.pull_workflows(®istered_workflows).await?; + let workflows = self + .db + .pull_workflows(self.worker_instance_id, ®istered_workflows) + .await?; for workflow in workflows { let conn = util::new_conn( &shared_client, diff --git a/svc/Cargo.lock b/svc/Cargo.lock index 992415e5a..c0ee8ba14 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -10451,6 +10451,22 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "workflow-gc" +version = "0.0.1" +dependencies = [ + "chirp-client", + "chirp-worker", + "rivet-connection", + "rivet-health-checks", + "rivet-metrics", + "rivet-operation", + "rivet-runtime", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/svc/Cargo.toml b/svc/Cargo.toml index 2a09ef4da..750404838 100644 --- a/svc/Cargo.toml +++ b/svc/Cargo.toml @@ -261,7 +261,8 @@ members = [ "pkg/user/ops/token-create", "pkg/user/standalone/delete-pending", "pkg/user/standalone/search-user-gc", - "pkg/user/worker" + "pkg/user/worker", + "pkg/workflow/standalone/gc" ] # Speed up compilation diff --git 
a/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql index 452d78dfb..1c4ddca29 100644 --- a/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql +++ b/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql @@ -1,17 +1,16 @@ -CREATE TABLE nodes ( - node_id UUID PRIMARY KEY, +CREATE TABLE worker_instances ( + worker_instance_id UUID PRIMARY KEY, last_ping_ts INT ); --- TODO: In the event of a node failure, clear all of the wake conditions and remove the node id. This can be --- done in a periodic GC service +-- NOTE: If a row has `worker_instance_id` set and `output` unset, it is currently running CREATE TABLE workflows ( workflow_id UUID PRIMARY KEY, workflow_name TEXT NOT NULL, create_ts INT NOT NULL, ray_id UUID NOT NULL, - -- The node that's running this workflow - node_id UUID, + -- The worker instance that's running this workflow + worker_instance_id UUID, input JSONB NOT NULL, -- Null if incomplete @@ -24,7 +23,10 @@ CREATE TABLE workflows ( INDEX (wake_immediate), INDEX (wake_deadline_ts), - INDEX (wake_sub_workflow_id) + INDEX (wake_sub_workflow_id), + + -- Query by worker_instance_id for failover + INDEX(worker_instance_id) ); CREATE INDEX gin_workflows_wake_signals diff --git a/svc/pkg/workflow/standalone/gc/Cargo.toml b/svc/pkg/workflow/standalone/gc/Cargo.toml new file mode 100644 index 000000000..cc5a7a739 --- /dev/null +++ b/svc/pkg/workflow/standalone/gc/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "workflow-gc" +version = "0.0.1" +edition = "2021" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" + +[dependencies] +chirp-client = { path = "../../../../../lib/chirp/client" } +rivet-connection = { path = "../../../../../lib/connection" } +rivet-health-checks = { path = "../../../../../lib/health-checks" } +rivet-metrics = { path = "../../../../../lib/metrics" } +rivet-operation = { path = "../../../../../lib/operation/core" } 
+rivet-runtime = { path = "../../../../../lib/runtime" } +tokio = { version = "1.29", features = ["full"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } + +[dev-dependencies] +chirp-worker = { path = "../../../../../lib/chirp/worker" } diff --git a/svc/pkg/workflow/standalone/gc/Service.toml b/svc/pkg/workflow/standalone/gc/Service.toml new file mode 100644 index 000000000..d2f367a8e --- /dev/null +++ b/svc/pkg/workflow/standalone/gc/Service.toml @@ -0,0 +1,11 @@ +[service] +name = "workflow-gc" + +[runtime] +kind = "rust" + +[headless] +singleton = true + +[databases] +db-workflow = {} diff --git a/svc/pkg/workflow/standalone/gc/src/lib.rs b/svc/pkg/workflow/standalone/gc/src/lib.rs new file mode 100644 index 000000000..cdc962d9e --- /dev/null +++ b/svc/pkg/workflow/standalone/gc/src/lib.rs @@ -0,0 +1,58 @@ +use std::{collections::HashSet, time::Duration}; + +use rivet_operation::prelude::*; + +const WORKER_INSTANCE_LOST_THRESHOLD: i64 = util::duration::seconds(30); + +#[tracing::instrument(skip_all)] +pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()> { + let client = chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("workflow-gc"); + let cache = rivet_cache::CacheInner::from_env(pools.clone())?; + let ctx = OperationContext::new( + "workflow-gc".into(), + Duration::from_secs(60), + rivet_connection::Connection::new(client, pools, cache), + Uuid::new_v4(), + Uuid::new_v4(), + util::timestamp::now(), + util::timestamp::now(), + (), + ); + + // Reset all workflows on worker instances that have not had a ping in the last 30 seconds + let rows = sql_fetch_all!( + [ctx, (Uuid, Uuid,)] + " + UPDATE db_workflow.workflows AS w + SET + worker_instance_id = NULL, + wake_immediate = true, + wake_deadline_ts = NULL, + wake_signals = ARRAY[], + wake_sub_workflow_id = NULL + FROM db_workflow.worker_instances AS wi + WHERE + wi.last_ping_ts < $1 AND + 
wi.worker_instance_id = w.worker_instance_id AND + w.output IS NULL + RETURNING w.workflow_id, wi.worker_instance_id + ", + ts - WORKER_INSTANCE_LOST_THRESHOLD, + ) + .await?; + + if !rows.is_empty() { + let unique_worker_instance_ids = rows + .iter() + .map(|(_, worker_instance_id)| worker_instance_id) + .collect::>(); + + tracing::info!( + worker_instance_ids=?unique_worker_instance_ids, + total_workflows=%rows.len(), + "handled failover", + ); + } + + Ok(()) +} diff --git a/svc/pkg/workflow/standalone/gc/src/main.rs b/svc/pkg/workflow/standalone/gc/src/main.rs new file mode 100644 index 000000000..573defb3d --- /dev/null +++ b/svc/pkg/workflow/standalone/gc/src/main.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use rivet_operation::prelude::*; + +fn main() -> GlobalResult<()> { + rivet_runtime::run(start()).unwrap() +} + +async fn start() -> GlobalResult<()> { + let pools = rivet_pools::from_env("workflow-gc").await?; + + tokio::task::Builder::new() + .name("workflow_gc::health_checks") + .spawn(rivet_health_checks::run_standalone( + rivet_health_checks::Config { + pools: Some(pools.clone()), + }, + ))?; + + tokio::task::Builder::new() + .name("workflow_gc::metrics") + .spawn(rivet_metrics::run_standalone())?; + + let mut interval = tokio::time::interval(Duration::from_secs(15)); + loop { + interval.tick().await; + + let ts = util::timestamp::now(); + workflow_gc::run_from_env(ts, pools.clone()).await?; + } +} diff --git a/svc/pkg/workflow/standalone/gc/tests/integration.rs b/svc/pkg/workflow/standalone/gc/tests/integration.rs new file mode 100644 index 000000000..a43226628 --- /dev/null +++ b/svc/pkg/workflow/standalone/gc/tests/integration.rs @@ -0,0 +1,17 @@ +use chirp_worker::prelude::*; + +use ::workflow_gc::run_from_env; + +#[tokio::test(flavor = "multi_thread")] +async fn basic() { + tracing_subscriber::fmt() + .json() + .with_max_level(tracing::Level::INFO) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NONE) + .init(); + + let pools = 
rivet_pools::from_env("workflow-gc-test").await.unwrap(); + + // TODO: + run_from_env(util::timestamp::now(), pools).await.unwrap(); +}