From 6ce34efd75d532f380f51042d937c649713bf4db Mon Sep 17 00:00:00 2001
From: MasterPtato
Date: Sat, 7 Jun 2025 02:04:09 +0000
Subject: [PATCH] feat: db sh for workflows

---
 .../core/src/db/fdb_sqlite_nats/debug.rs      |  12 +-
 .../core/src/db/fdb_sqlite_nats/mod.rs        |  34 +--
 .../core/src/db/fdb_sqlite_nats/sqlite/mod.rs |   6 -
 .../common/chirp-workflow/core/src/db/mod.rs  |   5 +
 packages/common/pools/src/db/sqlite/mod.rs    |  40 +--
 packages/common/server-cli/Cargo.toml         |   4 +-
 .../common/server-cli/src/commands/db/mod.rs  |  10 +
 packages/common/server-cli/src/util/db.rs     | 285 +++++++++++++++++-
 .../install/install_scripts/components/mod.rs |  32 ++
 .../server/install/install_scripts/mod.rs     |   2 +
 .../pegboard/src/workflows/actor/mod.rs       |   1 -
 .../pegboard/src/workflows/actor/runtime.rs   |  21 +-
 12 files changed, 400 insertions(+), 52 deletions(-)

diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
index 6e1adcd6e6..ac18ab4a37 100644
--- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
+++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
@@ -17,7 +17,7 @@ use uuid::Uuid;
 
 use super::{
     keys,
-    sqlite::{db_name_internal, SqlStub},
+    sqlite::SqlStub,
     DatabaseFdbSqliteNats,
 };
 use crate::{
@@ -602,10 +602,10 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
 
         for key in sub_workflow_wake_keys {
             tracing::warn!(
-                "workflow {} is being waited on by sub workflow {}, silencing anyway",
-                key.workflow_id,
-                key.sub_workflow_id
-            );
+                "workflow {} is being waited on by sub workflow {}, silencing anyway",
+                key.workflow_id,
+                key.sub_workflow_id
+            );
         }
 
         for key in tag_keys {
@@ -741,7 +741,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
     ) -> Result> {
         let pool = &self
             .pools
-            .sqlite(db_name_internal(workflow_id), true)
+            .sqlite(crate::db::sqlite_db_name_internal(workflow_id), true)
             .await?;
 
         let (wf_data, event_rows, error_rows) = tokio::try_join!(
diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
index 544fc03532..2ca270169a 100644
--- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
+++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
@@ -142,7 +142,7 @@ impl DatabaseFdbSqliteNats {
         self.pools
             .sqlite_manager()
             .evict(vec![
-                sqlite::db_name_internal(workflow_id),
+                crate::db::sqlite_db_name_internal(workflow_id),
                 crate::db::sqlite_db_name_data(workflow_id),
             ])
             .await?;
@@ -1277,7 +1277,7 @@ impl Database for DatabaseFdbSqliteNats {
         async move {
             let pool = &self
                 .pools
-                .sqlite(sqlite::db_name_internal(partial.workflow_id), false)
+                .sqlite(crate::db::sqlite_db_name_internal(partial.workflow_id), false)
                 .await?;
 
             // Handle error during sqlite init
@@ -1876,7 +1876,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
             .await?;
 
         let owned_filter = filter
@@ -2323,7 +2323,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
             .await?;
 
         // Insert history event
@@ -2351,7 +2351,7 @@ impl Database for DatabaseFdbSqliteNats {
             .sqlite_manager()
             .flush(
                 vec![
-                    sqlite::db_name_internal(from_workflow_id),
+                    crate::db::sqlite_db_name_internal(from_workflow_id),
                     crate::db::sqlite_db_name_data(from_workflow_id),
                 ],
                 false,
@@ -2413,7 +2413,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
             .await?;
 
         // Insert history event
@@ -2448,7 +2448,7 @@ impl Database for DatabaseFdbSqliteNats {
             .sqlite_manager()
             .flush(
                 vec![
-                    sqlite::db_name_internal(workflow_id),
+                    crate::db::sqlite_db_name_internal(workflow_id),
                     crate::db::sqlite_db_name_data(workflow_id),
                 ],
                 false,
@@ -2596,7 +2596,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
             .await?;
 
         let input_hash = event_id.input_hash.to_be_bytes();
@@ -2702,7 +2702,7 @@ impl Database for DatabaseFdbSqliteNats {
             .sqlite_manager()
             .flush(
                 vec![
-                    sqlite::db_name_internal(from_workflow_id),
+                    crate::db::sqlite_db_name_internal(from_workflow_id),
                     crate::db::sqlite_db_name_data(from_workflow_id),
                 ],
                 false,
@@ -2711,7 +2711,7 @@ impl Database for DatabaseFdbSqliteNats {
 
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
             .await?;
 
         sql_execute!(
@@ -2751,7 +2751,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
             .await?;
 
         self.txn(|| async {
@@ -3040,7 +3040,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
             .await?;
 
         sql_execute!(
@@ -3072,7 +3072,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
             .await?;
 
         sql_execute!(
@@ -3100,7 +3100,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
             .await?;
 
         sql_execute!(
@@ -3132,7 +3132,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
            .await?;
 
         sql_execute!(
@@ -3164,7 +3164,7 @@ impl Database for DatabaseFdbSqliteNats {
     ) -> WorkflowResult<()> {
         let pool = &self
             .pools
-            .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+            .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
             .await?;
 
         sql_execute!(
@@ -3220,7 +3220,7 @@ async fn flush_handler(pools: rivet_pools::Pools, mut flush_rx: mpsc::UnboundedR
                 .sqlite_manager()
                 .flush(
                     vec![
-                        sqlite::db_name_internal(workflow_id),
+                        crate::db::sqlite_db_name_internal(workflow_id),
                         crate::db::sqlite_db_name_data(workflow_id),
                     ],
                     true,
diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs
index bd76c08e1a..6eb0e80cb7 100644
--- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs
+++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs
@@ -1,6 +1,5 @@
 use std::collections::HashMap;
 
-use fdb_util::keys::*;
 use include_dir::{include_dir, Dir, File};
 use indoc::indoc;
 use rivet_pools::prelude::*;
@@ -407,8 +406,3 @@ pub fn build_history(
 
     Ok(events_by_location)
 }
-
-/// Database name for the workflow internal state.
-pub fn db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
-    (WORKFLOW, workflow_id, INTERNAL)
-}
diff --git a/packages/common/chirp-workflow/core/src/db/mod.rs b/packages/common/chirp-workflow/core/src/db/mod.rs
index 682c400f37..467ff11f5e 100644
--- a/packages/common/chirp-workflow/core/src/db/mod.rs
+++ b/packages/common/chirp-workflow/core/src/db/mod.rs
@@ -343,3 +343,8 @@ pub struct SignalData {
 pub fn sqlite_db_name_data(workflow_id: Uuid) -> (usize, Uuid, usize) {
     (WORKFLOW, workflow_id, DATA)
 }
+
+/// Database name for the workflow internal state.
+pub fn sqlite_db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
+    (WORKFLOW, workflow_id, INTERNAL)
+}
diff --git a/packages/common/pools/src/db/sqlite/mod.rs b/packages/common/pools/src/db/sqlite/mod.rs
index 688e88929a..3ef6d2d461 100644
--- a/packages/common/pools/src/db/sqlite/mod.rs
+++ b/packages/common/pools/src/db/sqlite/mod.rs
@@ -28,7 +28,7 @@ use uuid::Uuid;
 
 use crate::{metrics, Error, FdbPool};
 
-mod keys;
+pub mod keys;
 
 #[cfg(test)]
 mod tests;
@@ -146,7 +146,7 @@ impl SqliteWriterEntry {
 
 /// DB key in packed form. This is not the full FDB key, this is the DB name segment in DbDataKey.
 ///
 /// Stored in an `Arc` since this is frequently copied around.
-type KeyPacked = Arc>;
+pub type KeyPacked = Arc>;
 pub type SqlitePoolManagerHandle = Arc;
 pub type SqlitePoolManagerHandleWeak = Weak;
@@ -293,22 +293,15 @@ impl SqlitePoolManager {
 
 // MARK: Private helpers
 impl SqlitePoolManager {
-    fn db_info(&self, key_packed: &KeyPacked) -> (PathBuf, String) {
+    fn db_path(&self, key_packed: &KeyPacked) -> PathBuf {
         let hex_key_str = hex::encode(&**key_packed);
 
         match &self.storage {
-            SqliteStorage::Local { path } => {
-                // Determine the persistent location of this database
-                let db_path = path.join(format!("{hex_key_str}.db"));
-                let db_url = format!("sqlite://{}", db_path.display());
-                (db_path, db_url)
-            }
+            // Determine the persistent location of this database
+            SqliteStorage::Local { path } => path.join(format!("{hex_key_str}.db")),
+            // Generate temporary file location so multiple readers don't clobber each other
             SqliteStorage::FoundationDb { path } => {
-                // Generate temporary file location so multiple readers don't clobber each other
-                let db_path =
-                    path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()));
-                let db_url = format!("sqlite://{}", db_path.display());
-                (db_path, db_url)
+                path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()))
             }
         }
     }
@@ -344,7 +337,7 @@ impl SqlitePoolManager {
                 }
             }
             },
-            clear_db_files(&self.storage, self.db_info(&key_packed).0),
+            clear_db_files(&self.storage, self.db_path(&key_packed)),
         );
     }
 }
@@ -746,13 +739,13 @@ pub struct SqlitePoolInner {
 }
 
 impl SqlitePoolInner {
-    #[tracing::instrument(name = "sqlite_pool_new", skip_all)]
     async fn new(
         key_packed: KeyPacked,
         conn_type: SqliteConnType,
         manager: SqlitePoolManagerHandle,
     ) -> Result {
-        let (db_path, db_url) = manager.db_info(&key_packed);
+        let db_path = manager.db_path(&key_packed);
+        let db_url = format!("sqlite://{}", db_path.display());
 
         // Load database
         match &manager.storage {
@@ -893,6 +886,7 @@ impl SqlitePoolInner {
 }
 
 impl SqlitePoolInner {
+    // TODO: Doesn't need a result type
     #[tracing::instrument(name = "sqlite_pool_snapshot", skip_all)]
     pub async fn snapshot(&self, vacuum: bool) -> GlobalResult {
         match self
@@ -910,9 +904,21 @@ impl SqlitePoolInner {
             }
         }
     }
+
+    #[tracing::instrument(name = "sqlite_pool_evict", skip_all)]
+    pub async fn evict(&self) -> GlobalResult<()> {
+        self
+            .manager
+            .evict_with_key(&[self.key_packed.clone()])
+            .await
+    }
 }
 
 impl SqlitePoolInner {
+    pub fn db_path(&self) -> &Path {
+        &self.db_path
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn conn(&self) -> Result, sqlx::Error> {
         // Attempt to use an existing connection
diff --git a/packages/common/server-cli/Cargo.toml b/packages/common/server-cli/Cargo.toml
index 035714cc9c..3a65391ef1 100644
--- a/packages/common/server-cli/Cargo.toml
+++ b/packages/common/server-cli/Cargo.toml
@@ -11,13 +11,15 @@ chirp-workflow.workspace = true
 chrono = "0.4.38"
 clap = { version = "4.3", features = ["derive"] }
 colored_json = "5.0.0"
+fdb-util.workspace = true
+foundationdb.workspace = true
 futures-util = "0.3"
 global-error.workspace = true
 hex.workspace = true
 include_dir = "0.7.4"
 indoc = "2.0.5"
+lz4_flex = "0.11.3"
 reqwest = "0.12.9"
-foundationdb.workspace = true
 rivet-api.workspace = true
 rivet-config.workspace = true
 rivet-logs.workspace = true
diff --git a/packages/common/server-cli/src/commands/db/mod.rs b/packages/common/server-cli/src/commands/db/mod.rs
index dcbd2a2f52..79e118445c 100644
--- a/packages/common/server-cli/src/commands/db/mod.rs
+++ b/packages/common/server-cli/src/commands/db/mod.rs
@@ -29,6 +29,10 @@ pub enum DatabaseType {
     Redis,
     #[clap(alias = "ch")]
     Clickhouse,
+    #[clap(alias = "wfd")]
+    WorkflowData,
+    #[clap(alias = "wfi")]
+    WorkflowInternal,
 }
 
 impl SubCommand {
@@ -56,6 +60,12 @@ impl SubCommand {
             DatabaseType::Clickhouse => {
                 crate::util::db::clickhouse_shell(config, shell_ctx).await?
             }
+            DatabaseType::WorkflowData => {
+                crate::util::db::wf_sqlite_shell(config, shell_ctx, false).await?
+            }
+            DatabaseType::WorkflowInternal => {
+                crate::util::db::wf_sqlite_shell(config, shell_ctx, true).await?
+            }
         }
 
         Ok(())
diff --git a/packages/common/server-cli/src/util/db.rs b/packages/common/server-cli/src/util/db.rs
index 82aa999d40..c881af5283 100644
--- a/packages/common/server-cli/src/util/db.rs
+++ b/packages/common/server-cli/src/util/db.rs
@@ -1,6 +1,25 @@
+use std::{
+    io::{Read, Write},
+    path::Path,
+    result::Result::Ok,
+    str::FromStr,
+    sync::Arc,
+    time::Duration,
+};
+
 use anyhow::*;
+use fdb_util::{prelude::*, SERIALIZABLE};
+use foundationdb::{self as fdb, options::StreamingMode, FdbBindingError};
+use futures_util::TryStreamExt;
+use rivet_pools::db::sqlite::{keys, KeyPacked};
 use serde_json::json;
-use std::path::Path;
+use sqlx::sqlite::{
+    SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteLockingMode, SqliteSynchronous,
+};
+use tokio::io::AsyncReadExt;
+use uuid::Uuid;
+
+const CHUNK_SIZE: usize = 10_000; // 10 KB, not KiB, see https://apple.github.io/foundationdb/blob.html
 
 pub struct ShellQuery {
     pub svc: String,
@@ -176,3 +195,267 @@ pub async fn clickhouse_shell(
 
     Ok(())
 }
+
+pub async fn wf_sqlite_shell(
+    config: rivet_config::Config,
+    shell_ctx: ShellContext<'_>,
+    internal: bool,
+) -> Result<()> {
+    let ShellContext { queries, .. } = shell_ctx;
+
+    let pools = rivet_pools::Pools::new(config.clone()).await?;
+
+    // Combine all queries into one command
+    for ShellQuery {
+        svc: workflow_id,
+        query,
+    } in queries
+    {
+        let workflow_id = Uuid::from_str(workflow_id).context("could not parse input as UUID")?;
+
+        rivet_term::status::warn(
+            "WARNING",
+            "Database will open in WRITE mode. Modifications made will automatically be committed after the shell closes. This may cause changes made outside of this shell to be overwritten."
+        );
+
+        let term = rivet_term::terminal();
+        let response = rivet_term::prompt::PromptBuilder::default()
+            .message("Are you sure?")
+            .build()
+            .expect("failed to build prompt")
+            .bool(&term)
+            .await
+            .expect("failed to show prompt");
+
+        if !response {
+            return Ok(());
+        }
+
+        println!();
+
+        let key = if internal {
+            chirp_workflow::db::sqlite_db_name_internal(workflow_id)
+        } else {
+            chirp_workflow::db::sqlite_db_name_data(workflow_id)
+        };
+        let key_packed = Arc::new(key.pack_to_vec());
+
+        let db_file = tempfile::NamedTempFile::new()?;
+        let db_path = db_file.path();
+
+        read_from_fdb(&pools, &key_packed, &db_path).await?;
+
+        let mut cmd = std::process::Command::new("/root/go/bin/usql");
+        cmd.arg(format!("sqlite:{}", db_path.display()));
+
+        if let Some(query) = query {
+            cmd.args(["-c", query]);
+        }
+
+        cmd.status().context("failed running usql")?;
+
+        rivet_term::status::progress("Evicting database", "");
+        write_to_fdb(&pools, &key_packed, &db_path).await?;
+        rivet_term::status::success("Evicted", "");
+    }
+
+    Ok(())
+}
+
+async fn read_from_fdb(
+    pools: &rivet_pools::Pools,
+    key_packed: &KeyPacked,
+    db_path: &Path,
+) -> Result<()> {
+    let (data, chunks) = pools
+        .fdb()?
+        .run(|tx, _mc| {
+            let key_packed = key_packed.clone();
+            async move {
+                let compressed_db_data_subspace =
+                    subspace().subspace(&keys::CompressedDbDataKey::new(key_packed.clone()));
+
+                // Fetch all chunks
+                let mut compressed_data_stream = tx.get_ranges_keyvalues(
+                    fdb::RangeOption {
+                        mode: StreamingMode::WantAll,
+                        ..(&compressed_db_data_subspace).into()
+                    },
+                    SERIALIZABLE,
+                );
+
+                // Aggregate data
+                let mut buf = Vec::new();
+                let mut chunk_count = 0;
+
+                let mut compressed_data_buf = Vec::new();
+                while let Some(entry) = compressed_data_stream.try_next().await? {
+                    // Parse key
+                    let key = subspace()
+                        .unpack::(entry.key())
+                        .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
+                    // Validate chunk
+                    if chunk_count != key.chunk {
+                        return Err(FdbBindingError::CustomError("mismatched chunk".into()));
+                    }
+                    chunk_count += 1;
+
+                    // Write to buffer
+                    compressed_data_buf.extend(entry.value());
+                }
+
+                // Decompress the data
+                let mut decoder = lz4_flex::frame::FrameDecoder::new(&compressed_data_buf[..]);
+                decoder
+                    .read_to_end(&mut buf)
+                    .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
+                // If there is no compressed data, read from the uncompressed data (backwards compatibility)
+                if chunk_count == 0 {
+                    let db_data_subspace =
+                        subspace().subspace(&keys::DbDataKey::new(key_packed.clone()));
+                    let mut data_stream = tx.get_ranges_keyvalues(
+                        fdb::RangeOption {
+                            mode: StreamingMode::WantAll,
+                            ..(&db_data_subspace).into()
+                        },
+                        SERIALIZABLE,
+                    );
+
+                    while let Some(entry) = data_stream.try_next().await? {
+                        // Parse key
+                        let key = subspace()
+                            .unpack::(entry.key())
+                            .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
+                        // Validate chunk
+                        if chunk_count != key.chunk {
+                            return Err(FdbBindingError::CustomError("mismatched chunk".into()));
+                        }
+                        chunk_count += 1;
+
+                        // Write to buffer
+                        buf.extend(entry.value());
+                    }
+                }
+
+                Ok((buf, chunk_count))
+            }
+        })
+        .await?;
+
+    ensure!(chunks > 0, "db not found in fdb");
+
+    tokio::fs::write(db_path, data).await?;
+
+    Ok(())
+}
+
+async fn write_to_fdb(
+    pools: &rivet_pools::Pools,
+    key_packed: &KeyPacked,
+    db_path: &Path,
+) -> Result<()> {
+    let db_url = format!("sqlite://{}", db_path.display());
+
+    let opts = db_url
+        .parse::()?
+        .create_if_missing(false)
+        // Enable foreign key constraint enforcement
+        .foreign_keys(true)
+        // Enable auto vacuuming and set it to incremental mode for gradual space reclaiming
+        .auto_vacuum(SqliteAutoVacuum::Incremental)
+        // Set synchronous mode to NORMAL for performance and data safety balance
+        .synchronous(SqliteSynchronous::Normal)
+        // Increases write performance
+        .journal_mode(SqliteJournalMode::Wal)
+        // Reduces file system operations
+        .locking_mode(SqliteLockingMode::Exclusive);
+
+    let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
+        // The default connection timeout is too high
+        .acquire_timeout(Duration::from_secs(60))
+        .max_lifetime(Duration::from_secs(15 * 60))
+        .max_lifetime_jitter(Duration::from_secs(90))
+        // Remove connections after a while in order to reduce load after bursts
+        .idle_timeout(Some(Duration::from_secs(10 * 60)))
+        // Sqlite doesnt support more than 1 concurrent writer, will get "database is locked"
+        .min_connections(1)
+        .max_connections(1);
+
+    // Create pool
+    let pool = pool_opts.connect_with(opts).await?;
+
+    // Attempt to use an existing connection
+    let mut conn = if let Some(conn) = pool.try_acquire() {
+        conn
+    } else {
+        // Create a new connection
+        pool.acquire().await?
+    };
+
+    // Flush WAL journal
+    sqlx::query("PRAGMA wal_checkpoint(TRUNCATE);")
+        .execute(&mut *conn)
+        .await?;
+
+    // Stream the database file and compress it
+    let mut compressed_data = Vec::new();
+    let file = tokio::fs::File::open(db_path).await?;
+    let mut reader = tokio::io::BufReader::new(file);
+    let mut encoder = lz4_flex::frame::FrameEncoder::new(&mut compressed_data);
+
+    async {
+        let mut buffer = [0u8; 16 * 1024]; // 16 KiB
+        loop {
+            let bytes_read = reader.read(&mut buffer).await?;
+            if bytes_read == 0 {
+                break;
+            }
+            encoder.write_all(&buffer[..bytes_read])?;
+        }
+        encoder.finish()?;
+
+        Result::<_, Error>::Ok(())
+    }
+    .await?;
+
+    let data = Arc::new(compressed_data);
+
+    // Write to FDB
+    pools
+        .fdb()?
+        .run(|tx, _mc| {
+            let key_packed = key_packed.clone();
+            let data = data.clone();
+            async move {
+                // Clear previous data
+                let db_data_subspace =
+                    subspace().subspace(&keys::DbDataKey::new(key_packed.clone()));
+                tx.clear_subspace_range(&db_data_subspace);
+                let compressed_db_data_subspace =
+                    subspace().subspace(&keys::CompressedDbDataKey::new(key_packed.clone()));
+                tx.clear_subspace_range(&compressed_db_data_subspace);
+
+                // Write chunks
+                for (idx, chunk) in data.chunks(CHUNK_SIZE).enumerate() {
+                    let chunk_key = keys::CompressedDbDataChunkKey {
+                        db_name_segment: key_packed.clone(),
+                        chunk: idx,
+                    };
+
+                    tx.set(&subspace().pack(&chunk_key), chunk);
+                }
+
+                Ok(())
+            }
+        })
+        .await?;
+
+    Ok(())
+}
+
+fn subspace() -> fdb_util::Subspace {
+    fdb_util::Subspace::new(&(RIVET, SQLITE))
+}
diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs
index 26bb86c57a..c75338adf8 100644
--- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs
+++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs
@@ -217,6 +217,38 @@ pub mod skopeo {
     }
 }
 
+pub mod go {
+    use indoc::indoc;
+
+    pub fn install() -> String {
+        indoc!(
+            r#"
+            wget https://go.dev/dl/go1.24.4.linux-amd64.tar.gz
+            sudo tar -C /usr/local -xzf go1.24.4.linux-amd64.tar.gz
+            export PATH=$PATH:/usr/local/go/bin
+            export PATH="$PATH:$(go env GOPATH)/bin"
+            "#
+        )
+        .to_string()
+    }
+}
+
+pub mod usql {
+    use indoc::indoc;
+
+    /// Requires go.
+    pub fn install() -> String {
+        indoc!(
+            r#"
+            apt install -y gcc
+
+            go install github.com/xo/usql@latest
+            "#
+        )
+        .to_string()
+    }
+}
+
 pub mod umoci {
     use indoc::indoc;
 
diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs
index e95e6e1617..4a373d06e4 100644
--- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs
+++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs
@@ -72,6 +72,8 @@ pub async fn gen_install(
             script.push(components::fdb::install(initialize_immediately));
         }
         PoolType::Worker => {
+            script.push(components::go::install());
+            script.push(components::usql::install());
             script.push(components::otel_collector::install(pool_type)?);
             script.push(components::rivet::worker::install(config)?);
         }
diff --git a/packages/edge/services/pegboard/src/workflows/actor/mod.rs b/packages/edge/services/pegboard/src/workflows/actor/mod.rs
index f3b0e99ccd..2c82f7ec9c 100644
--- a/packages/edge/services/pegboard/src/workflows/actor/mod.rs
+++ b/packages/edge/services/pegboard/src/workflows/actor/mod.rs
@@ -165,7 +165,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
         runtime::State::new(res.client_id, res.client_workflow_id, input.image_id),
         |ctx, state| {
             let input = input.clone();
-            let meta = initial_actor_setup.meta.clone();
 
             async move {
                 let sig = if let Some(drain_timeout_ts) = state.drain_timeout_ts {
diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs
index b2915be492..da9481b9f2 100644
--- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs
+++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs
@@ -617,6 +617,19 @@ pub async fn insert_ports_fdb(ctx: &ActivityCtx, input: &InsertPortsFdbInput) ->
     Ok(())
 }
 
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct CompareRetryInput {
+    last_retry_ts: i64,
+}
+
+#[activity(CompareRetry)]
+async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> GlobalResult<(i64, bool)> {
+    let now = util::timestamp::now();
+
+    // If the last retry ts is more than RETRY_RESET_DURATION_MS, reset retry count
+    Ok((now, input.last_retry_ts < now - RETRY_RESET_DURATION_MS))
+}
+
 /// Returns whether or not there was availability to spawn the actor.
 pub async fn spawn_actor(
     ctx: &mut WorkflowCtx,
@@ -790,9 +803,11 @@ pub async fn reschedule_actor(
     let mut backoff =
         util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count);
 
-    // If the last retry ts is more than RETRY_RESET_DURATION_MS, reset retry count to 0
-    let now = util::timestamp::now();
-    state.retry_count = if state.last_retry_ts < now - RETRY_RESET_DURATION_MS {
+    let (now, reset) = ctx.v(2).activity(CompareRetryInput {
+        last_retry_ts: state.last_retry_ts,
+    }).await?;
+
+    state.retry_count = if reset {
         0
     } else {
         state.retry_count + 1