diff --git a/lib/bolt/core/src/tasks/db.rs b/lib/bolt/core/src/tasks/db.rs index 6fc2248d88..777d909823 100644 --- a/lib/bolt/core/src/tasks/db.rs +++ b/lib/bolt/core/src/tasks/db.rs @@ -8,7 +8,7 @@ use tokio::{io::AsyncWriteExt, task::block_in_place}; use crate::{ config::{self, service::RuntimeKind}, context::{ProjectContext, ServiceContext}, - utils::db_conn::DatabaseConnections, + utils::{self, db_conn::DatabaseConnections}, }; const REDIS_IMAGE: &str = "ghcr.io/rivet-gg/redis:cc3241e"; @@ -31,7 +31,7 @@ pub struct ShellContext<'a> { } pub async fn shell(ctx: &ProjectContext, svc: &ServiceContext, query: Option<&str>) -> Result<()> { - let conn = DatabaseConnections::create(ctx, &[svc.clone()]).await?; + let conn = DatabaseConnections::create(ctx, &[svc.clone()], true).await?; let shell_ctx = ShellContext { ctx, conn: &conn, @@ -256,6 +256,14 @@ pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> { query_cmd.push_str(&cmd); } + let _crdb_port_forward = utils::kubectl_port_forward(ctx, "cockroachdb", "cockroachdb", (26257, 26257))?; + + println!("{query_cmd}"); + + block_in_place(|| cmd!(query_cmd).run())?; + + return Ok(()); + let overrides = json!({ "apiVersion": "v1", "metadata": { diff --git a/lib/bolt/core/src/tasks/migrate.rs b/lib/bolt/core/src/tasks/migrate.rs index f430628a56..4f68140957 100644 --- a/lib/bolt/core/src/tasks/migrate.rs +++ b/lib/bolt/core/src/tasks/migrate.rs @@ -281,7 +281,7 @@ pub async fn up_all(ctx: &ProjectContext) -> Result<()> { } pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()> { - let conn = DatabaseConnections::create(ctx, services).await?; + let conn = DatabaseConnections::create(ctx, services, false).await?; let mut crdb_pre_queries = Vec::new(); let mut crdb_post_queries = Vec::new(); let mut clickhouse_pre_queries = Vec::new(); @@ -489,7 +489,7 @@ pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()> } pub async fn down(ctx: &ProjectContext, service: &ServiceContext, num: 
usize) -> Result<()> { - let conn = DatabaseConnections::create(ctx, &[service.clone()]).await?; + let conn = DatabaseConnections::create(ctx, &[service.clone()], false).await?; let database_url = conn.migrate_db_url(service).await?; migration( @@ -504,7 +504,7 @@ pub async fn down(ctx: &ProjectContext, service: &ServiceContext, num: usize) -> } pub async fn force(ctx: &ProjectContext, service: &ServiceContext, num: usize) -> Result<()> { - let conn = DatabaseConnections::create(ctx, &[service.clone()]).await?; + let conn = DatabaseConnections::create(ctx, &[service.clone()], false).await?; let database_url = conn.migrate_db_url(service).await?; migration( @@ -519,7 +519,7 @@ pub async fn force(ctx: &ProjectContext, service: &ServiceContext, num: usize) - } pub async fn drop(ctx: &ProjectContext, service: &ServiceContext) -> Result<()> { - let conn = DatabaseConnections::create(ctx, &[service.clone()]).await?; + let conn = DatabaseConnections::create(ctx, &[service.clone()], false).await?; let database_url = conn.migrate_db_url(service).await?; migration( diff --git a/lib/bolt/core/src/utils/db_conn.rs b/lib/bolt/core/src/utils/db_conn.rs index 8c265b9ea4..3c6b0df6ff 100644 --- a/lib/bolt/core/src/utils/db_conn.rs +++ b/lib/bolt/core/src/utils/db_conn.rs @@ -21,10 +21,15 @@ impl DatabaseConnections { pub async fn create( ctx: &ProjectContext, services: &[ServiceContext], + forward: bool, ) -> Result> { match &ctx.ns().cluster.kind { config::ns::ClusterKind::SingleNode { .. } => { - DatabaseConnections::create_local(ctx, services).await + if forward { + DatabaseConnections::create_local_forwarded(ctx, services).await + } else { + DatabaseConnections::create_local(ctx, services).await + } } config::ns::ClusterKind::Distributed { .. 
} => { DatabaseConnections::create_distributed(ctx, services).await @@ -91,6 +96,46 @@ impl DatabaseConnections { })) } + async fn create_local_forwarded( + _ctx: &ProjectContext, + services: &[ServiceContext], + ) -> Result> { + let mut redis_hosts = HashMap::new(); + let mut cockroach_host = None; + let mut clickhouse_host = None; + + for svc in services { + match &svc.config().runtime { + RuntimeKind::Redis { .. } => { + let name = svc.name(); + + if !redis_hosts.contains_key(&name) { + let host = "localhost:6379".to_string(); + redis_hosts.insert(name, host); + } + } + RuntimeKind::CRDB { .. } => { + if cockroach_host.is_none() { + cockroach_host = Some("localhost:26257".to_string()); + } + } + RuntimeKind::ClickHouse { .. } => { + if clickhouse_host.is_none() { + clickhouse_host = Some("localhost:9440".to_string()); + } + } + x => bail!("cannot connect to this type of service: {x:?}"), + } + } + + Ok(Arc::new(DatabaseConnections { + redis_hosts, + cockroach_host, + clickhouse_host, + clickhouse_config: None, + })) + } + async fn create_distributed( ctx: &ProjectContext, services: &[ServiceContext], diff --git a/lib/bolt/core/src/utils/mod.rs b/lib/bolt/core/src/utils/mod.rs index 0582575afa..8198bfcf67 100644 --- a/lib/bolt/core/src/utils/mod.rs +++ b/lib/bolt/core/src/utils/mod.rs @@ -207,13 +207,6 @@ pub fn pick_port() -> u16 { portpicker::pick_unused_port().expect("no free ports") } -pub struct PortForwardConfig { - pub service_name: &'static str, - pub namespace: &'static str, - pub local_port: u16, - pub remote_port: u16, -} - pub struct DroppablePort { local_port: u16, handle: duct::Handle, diff --git a/lib/chirp-workflow/core/src/db/pg_nats.rs b/lib/chirp-workflow/core/src/db/pg_nats.rs index 5d2a3268c6..ec8c73351a 100644 --- a/lib/chirp-workflow/core/src/db/pg_nats.rs +++ b/lib/chirp-workflow/core/src/db/pg_nats.rs @@ -187,6 +187,8 @@ impl Database for DatabasePgNats { output IS NULL AND -- No assigned node (not running) worker_instance_id IS NULL AND 
+ -- Not silenced + silence_ts IS NULL AND -- Check for wake condition ( -- Immediate @@ -207,6 +209,8 @@ impl Database for DatabasePgNats { output IS NULL AND -- No assigned node (not running) worker_instance_id IS NULL AND + -- Not silenced + silence_ts IS NULL AND -- Signal exists ( SELECT true @@ -214,7 +218,8 @@ impl Database for DatabasePgNats { WHERE s.workflow_id = w.workflow_id AND s.signal_name = ANY(w.wake_signals) AND - s.ack_ts IS NULL + s.ack_ts IS NULL AND + s.silence_ts IS NULL LIMIT 1 ) UNION @@ -227,6 +232,8 @@ impl Database for DatabasePgNats { output IS NULL AND -- No assigned node (not running) worker_instance_id IS NULL AND + -- Not silenced + silence_ts IS NULL AND -- Tagged signal exists ( SELECT true @@ -234,7 +241,8 @@ impl Database for DatabasePgNats { WHERE s.signal_name = ANY(w.wake_signals) AND s.tags <@ w.tags AND - s.ack_ts IS NULL + s.ack_ts IS NULL AND + s.silence_ts IS NULL LIMIT 1 ) UNION @@ -247,6 +255,8 @@ impl Database for DatabasePgNats { output IS NULL AND -- No assigned node (not running) worker_instance_id IS NULL AND + -- Not silenced + silence_ts IS NULL AND -- Sub workflow completed ( SELECT true @@ -656,14 +666,16 @@ impl Database for DatabasePgNats { WHERE workflow_id = $1 AND signal_name = ANY($2) AND - ack_ts IS NULL + ack_ts IS NULL AND + silence_ts IS NULL UNION ALL SELECT true AS tagged, signal_id, create_ts, signal_name, body FROM db_workflow.tagged_signals WHERE signal_name = ANY($2) AND tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1) AND - ack_ts IS NULL + ack_ts IS NULL AND + silence_ts IS NULL ORDER BY create_ts ASC LIMIT 1 ), diff --git a/scripts/forward/crdb.sh b/scripts/forward/crdb.sh new file mode 100755 index 0000000000..c978ff6e56 --- /dev/null +++ b/scripts/forward/crdb.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -euf + +FORWARD_NS=cockroachdb FORWARD_NAME=service/cockroachdb PORT=5432 FORWARD_PORT=26257 ./scripts/forward/service.sh + diff --git a/shell.nix b/shell.nix index 
d7214d2094..28fb50823a 100644 --- a/shell.nix +++ b/shell.nix @@ -44,6 +44,7 @@ in go-migrate jq openssh # ssh-keygen + postgresql # Runtimes nodejs_20 # Required for Fern diff --git a/svc/pkg/workflow/db/workflow/migrations/20240905192157_manual_ack.down.sql b/svc/pkg/workflow/db/workflow/migrations/20240905192157_manual_ack.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/svc/pkg/workflow/db/workflow/migrations/20240905192157_manual_ack.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240905192157_manual_ack.up.sql new file mode 100644 index 0000000000..122f61266e --- /dev/null +++ b/svc/pkg/workflow/db/workflow/migrations/20240905192157_manual_ack.up.sql @@ -0,0 +1,11 @@ +ALTER TABLE workflows +ADD COLUMN silence_ts INT; + +ALTER TABLE signals +ADD COLUMN silence_ts INT; + +ALTER TABLE tagged_signals +ADD COLUMN silence_ts INT; + +CREATE INDEX ON signals (ack_ts, silence_ts) STORING (workflow_id, signal_name); +CREATE INDEX ON tagged_signals (ack_ts, silence_ts) STORING (tags, signal_name); diff --git a/svc/pkg/workflow/standalone/gc/src/lib.rs b/svc/pkg/workflow/standalone/gc/src/lib.rs index d97007fcd0..5b7d7551cb 100644 --- a/svc/pkg/workflow/standalone/gc/src/lib.rs +++ b/svc/pkg/workflow/standalone/gc/src/lib.rs @@ -35,6 +35,7 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<() wi.last_ping_ts < $1 AND wi.worker_instance_id = w.worker_instance_id AND w.output IS NULL AND + w.silence_ts IS NULL AND -- Check for any wake condition so we don't restart a permanently dead workflow ( w.wake_immediate OR diff --git a/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs b/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs index 67de568a2f..08bc5d6bbd 100644 --- a/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs +++ b/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs @@ -44,7 +44,8 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { FROM db_workflow.workflows AS OF 
SYSTEM TIME '-1s' WHERE output IS NULL AND - worker_instance_id IS NOT NULL + worker_instance_id IS NOT NULL AND + silence_ts IS NULL GROUP BY workflow_name ", ), @@ -55,7 +56,8 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' WHERE error IS NOT NULL AND - output IS NULL AND + output IS NULL AND + silence_ts IS NULL AND wake_immediate = FALSE AND wake_deadline_ts IS NULL AND cardinality(wake_signals) = 0 AND @@ -71,6 +73,7 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { WHERE worker_instance_id IS NULL AND output IS NULL AND + silence_ts IS NULL AND ( wake_immediate OR wake_deadline_ts IS NOT NULL OR @@ -87,18 +90,22 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { FROM ( SELECT signal_name FROM db_workflow.signals - WHERE ack_ts IS NULL + WHERE + ack_ts IS NULL AND + silence_ts IS NULL UNION ALL SELECT signal_name FROM db_workflow.tagged_signals - WHERE ack_ts IS NULL + WHERE + ack_ts IS NULL AND + silence_ts IS NULL ) AS OF SYSTEM TIME '-1s' GROUP BY signal_name ", ), )?; - // Get rid of metrics that don't exist in the db anymore (stateful) + // Get rid of metrics that don't exist in the db anymore (declarative) chirp_workflow::metrics::WORKFLOW_TOTAL.reset(); chirp_workflow::metrics::WORKFLOW_ACTIVE.reset(); chirp_workflow::metrics::WORKFLOW_DEAD.reset();