Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions lib/bolt/core/src/tasks/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::{io::AsyncWriteExt, task::block_in_place};
use crate::{
config::{self, service::RuntimeKind},
context::{ProjectContext, ServiceContext},
utils::db_conn::DatabaseConnections,
utils::{self, db_conn::DatabaseConnections},
};

const REDIS_IMAGE: &str = "ghcr.io/rivet-gg/redis:cc3241e";
Expand All @@ -31,7 +31,7 @@ pub struct ShellContext<'a> {
}

pub async fn shell(ctx: &ProjectContext, svc: &ServiceContext, query: Option<&str>) -> Result<()> {
let conn = DatabaseConnections::create(ctx, &[svc.clone()]).await?;
let conn = DatabaseConnections::create(ctx, &[svc.clone()], true).await?;
let shell_ctx = ShellContext {
ctx,
conn: &conn,
Expand Down Expand Up @@ -256,6 +256,14 @@ pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> {
query_cmd.push_str(&cmd);
}

utils::kubectl_port_forward(ctx, "cockroachdb", "cockroachdb", (26257, 26257))?;

println!("{query_cmd}");

block_in_place(|| cmd!(query_cmd).run())?;

return Ok(());

let overrides = json!({
"apiVersion": "v1",
"metadata": {
Expand Down
8 changes: 4 additions & 4 deletions lib/bolt/core/src/tasks/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ pub async fn up_all(ctx: &ProjectContext) -> Result<()> {
}

pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()> {
let conn = DatabaseConnections::create(ctx, services).await?;
let conn = DatabaseConnections::create(ctx, services, false).await?;
let mut crdb_pre_queries = Vec::new();
let mut crdb_post_queries = Vec::new();
let mut clickhouse_pre_queries = Vec::new();
Expand Down Expand Up @@ -489,7 +489,7 @@ pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()>
}

pub async fn down(ctx: &ProjectContext, service: &ServiceContext, num: usize) -> Result<()> {
let conn = DatabaseConnections::create(ctx, &[service.clone()]).await?;
let conn = DatabaseConnections::create(ctx, &[service.clone()], false).await?;
let database_url = conn.migrate_db_url(service).await?;

migration(
Expand All @@ -504,7 +504,7 @@ pub async fn down(ctx: &ProjectContext, service: &ServiceContext, num: usize) ->
}

pub async fn force(ctx: &ProjectContext, service: &ServiceContext, num: usize) -> Result<()> {
let conn = DatabaseConnections::create(ctx, &[service.clone()]).await?;
let conn = DatabaseConnections::create(ctx, &[service.clone()], false).await?;
let database_url = conn.migrate_db_url(service).await?;

migration(
Expand All @@ -519,7 +519,7 @@ pub async fn force(ctx: &ProjectContext, service: &ServiceContext, num: usize) -
}

pub async fn drop(ctx: &ProjectContext, service: &ServiceContext) -> Result<()> {
let conn = DatabaseConnections::create(ctx, &[service.clone()]).await?;
let conn = DatabaseConnections::create(ctx, &[service.clone()], false).await?;
let database_url = conn.migrate_db_url(service).await?;

migration(
Expand Down
47 changes: 46 additions & 1 deletion lib/bolt/core/src/utils/db_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ impl DatabaseConnections {
pub async fn create(
ctx: &ProjectContext,
services: &[ServiceContext],
forward: bool,
) -> Result<Arc<DatabaseConnections>> {
match &ctx.ns().cluster.kind {
config::ns::ClusterKind::SingleNode { .. } => {
DatabaseConnections::create_local(ctx, services).await
if forward {
DatabaseConnections::create_local_forwarded(ctx, services).await
} else {
DatabaseConnections::create_local(ctx, services).await
}
}
config::ns::ClusterKind::Distributed { .. } => {
DatabaseConnections::create_distributed(ctx, services).await
Expand Down Expand Up @@ -91,6 +96,46 @@ impl DatabaseConnections {
}))
}

/// Builds a `DatabaseConnections` whose hosts all point at localhost,
/// for use when the matching Kubernetes port-forwards are active
/// (Redis on 6379, CockroachDB on 26257, ClickHouse on 9440).
///
/// Returns an error for any service whose runtime is not a database.
async fn create_local_forwarded(
	_ctx: &ProjectContext,
	services: &[ServiceContext],
) -> Result<Arc<DatabaseConnections>> {
	let mut redis_hosts = HashMap::new();
	let mut cockroach_host = None;
	let mut clickhouse_host = None;

	for service in services {
		match &service.config().runtime {
			RuntimeKind::Redis { .. } => {
				// One forwarded host per Redis service name; first one wins.
				redis_hosts
					.entry(service.name())
					.or_insert_with(|| "localhost:6379".to_string());
			}
			RuntimeKind::CRDB { .. } => {
				cockroach_host.get_or_insert_with(|| "localhost:26257".to_string());
			}
			RuntimeKind::ClickHouse { .. } => {
				clickhouse_host.get_or_insert_with(|| "localhost:9440".to_string());
			}
			x => bail!("cannot connect to this type of service: {x:?}"),
		}
	}

	Ok(Arc::new(DatabaseConnections {
		redis_hosts,
		cockroach_host,
		clickhouse_host,
		// No ClickHouse connection config is required for a local forward.
		clickhouse_config: None,
	}))
}

async fn create_distributed(
ctx: &ProjectContext,
services: &[ServiceContext],
Expand Down
7 changes: 0 additions & 7 deletions lib/bolt/core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,6 @@ pub fn pick_port() -> u16 {
portpicker::pick_unused_port().expect("no free ports")
}

pub struct PortForwardConfig {
pub service_name: &'static str,
pub namespace: &'static str,
pub local_port: u16,
pub remote_port: u16,
}

pub struct DroppablePort {
local_port: u16,
handle: duct::Handle,
Expand Down
20 changes: 16 additions & 4 deletions lib/chirp-workflow/core/src/db/pg_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ impl Database for DatabasePgNats {
output IS NULL AND
-- No assigned node (not running)
worker_instance_id IS NULL AND
-- Not silenced
silence_ts IS NULL AND
-- Check for wake condition
(
-- Immediate
Expand All @@ -207,14 +209,17 @@ impl Database for DatabasePgNats {
output IS NULL AND
-- No assigned node (not running)
worker_instance_id IS NULL AND
-- Not silenced
silence_ts IS NULL AND
-- Signal exists
(
SELECT true
FROM db_workflow.signals AS s
WHERE
s.workflow_id = w.workflow_id AND
s.signal_name = ANY(w.wake_signals) AND
s.ack_ts IS NULL
s.ack_ts IS NULL AND
s.silence_ts IS NULL
LIMIT 1
)
UNION
Expand All @@ -227,14 +232,17 @@ impl Database for DatabasePgNats {
output IS NULL AND
-- No assigned node (not running)
worker_instance_id IS NULL AND
-- Not silenced
silence_ts IS NULL AND
-- Tagged signal exists
(
SELECT true
FROM db_workflow.tagged_signals AS s
WHERE
s.signal_name = ANY(w.wake_signals) AND
s.tags <@ w.tags AND
s.ack_ts IS NULL
s.ack_ts IS NULL AND
s.silence_ts IS NULL
LIMIT 1
)
UNION
Expand All @@ -247,6 +255,8 @@ impl Database for DatabasePgNats {
output IS NULL AND
-- No assigned node (not running)
worker_instance_id IS NULL AND
-- Not silenced
silence_ts IS NULL AND
-- Sub workflow completed
(
SELECT true
Expand Down Expand Up @@ -656,14 +666,16 @@ impl Database for DatabasePgNats {
WHERE
workflow_id = $1 AND
signal_name = ANY($2) AND
ack_ts IS NULL
ack_ts IS NULL AND
silence_ts IS NULL
UNION ALL
SELECT true AS tagged, signal_id, create_ts, signal_name, body
FROM db_workflow.tagged_signals
WHERE
signal_name = ANY($2) AND
tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1) AND
ack_ts IS NULL
ack_ts IS NULL AND
silence_ts IS NULL
ORDER BY create_ts ASC
LIMIT 1
),
Expand Down
5 changes: 5 additions & 0 deletions scripts/forward/crdb.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh
set -euf

# Port-forward the CockroachDB k8s service (namespace `cockroachdb`) to
# localhost via the generic forward helper.
# NOTE(review): local PORT=5432 here, but bolt's forwarded connections use
# localhost:26257 — confirm service.sh's PORT vs FORWARD_PORT semantics.
FORWARD_NS=cockroachdb FORWARD_NAME=service/cockroachdb PORT=5432 FORWARD_PORT=26257 ./scripts/forward/service.sh

1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ in
go-migrate
jq
openssh # ssh-keygen
postgresql

# Runtimes
nodejs_20 # Required for Fern
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Allow workflows and signals to be "silenced": rows with a non-NULL
-- silence_ts are excluded by the queries that filter on
-- `silence_ts IS NULL` (worker wake/pull, metrics, GC).
ALTER TABLE workflows
ADD COLUMN silence_ts INT;

ALTER TABLE signals
ADD COLUMN silence_ts INT;

ALTER TABLE tagged_signals
ADD COLUMN silence_ts INT;

-- Secondary indexes covering the pending-signal scans; STORING lets those
-- scans read the listed columns without a join back to the primary index.
CREATE INDEX ON signals (ack_ts, silence_ts) STORING (workflow_id, signal_name);
CREATE INDEX ON tagged_signals (ack_ts, silence_ts) STORING (tags, signal_name);
1 change: 1 addition & 0 deletions svc/pkg/workflow/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
wi.last_ping_ts < $1 AND
wi.worker_instance_id = w.worker_instance_id AND
w.output IS NULL AND
w.silence_ts IS NULL AND
-- Check for any wake condition so we don't restart a permanently dead workflow
(
w.wake_immediate OR
Expand Down
17 changes: 12 additions & 5 deletions svc/pkg/workflow/standalone/metrics-publish/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
FROM db_workflow.workflows AS OF SYSTEM TIME '-1s'
WHERE
output IS NULL AND
worker_instance_id IS NOT NULL
worker_instance_id IS NOT NULL AND
silence_ts IS NULL
GROUP BY workflow_name
",
),
Expand All @@ -55,7 +56,8 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
FROM db_workflow.workflows AS OF SYSTEM TIME '-1s'
WHERE
error IS NOT NULL AND
output IS NULL AND
output IS NULL AND
silence_ts IS NULL AND
wake_immediate = FALSE AND
wake_deadline_ts IS NULL AND
cardinality(wake_signals) = 0 AND
Expand All @@ -71,6 +73,7 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
WHERE
worker_instance_id IS NULL AND
output IS NULL AND
silence_ts IS NULL AND
(
wake_immediate OR
wake_deadline_ts IS NOT NULL OR
Expand All @@ -87,18 +90,22 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
FROM (
SELECT signal_name
FROM db_workflow.signals
WHERE ack_ts IS NULL
WHERE
ack_ts IS NULL AND
silence_ts IS NULL
UNION ALL
SELECT signal_name
FROM db_workflow.tagged_signals
WHERE ack_ts IS NULL
WHERE
ack_ts IS NULL AND
silence_ts IS NULL
) AS OF SYSTEM TIME '-1s'
GROUP BY signal_name
",
),
)?;

// Get rid of metrics that don't exist in the db anymore (stateful)
// Get rid of metrics that don't exist in the db anymore (declarative)
chirp_workflow::metrics::WORKFLOW_TOTAL.reset();
chirp_workflow::metrics::WORKFLOW_ACTIVE.reset();
chirp_workflow::metrics::WORKFLOW_DEAD.reset();
Expand Down