diff --git a/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs index 406c19b24d..ebc62e8680 100644 --- a/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs @@ -168,7 +168,7 @@ impl Database for DatabaseCrdbNats { metrics::WORKER_LAST_PING .with_label_values(&[&worker_instance_id.to_string()]) .set(rivet_util::timestamp::now()); - + sql_execute!( [self] " diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs index 8ce570002c..95d7cdd633 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs @@ -1275,7 +1275,22 @@ impl Database for DatabaseFdbSqliteNats { .pools .sqlite(sqlite::db_name_internal(partial.workflow_id), false) .await?; - sqlite::init(partial.workflow_id, pool).await?; + + // Handle error during sqlite init + if let Err(err) = sqlite::init(partial.workflow_id, pool).await { + self.commit_workflow( + partial.workflow_id, + &partial.workflow_name, + false, + None, + &[], + None, + &err.to_string(), + ) + .await?; + + return Ok(None); + } // Fetch all events let events = sql_fetch_all!( @@ -1479,7 +1494,7 @@ impl Database for DatabaseFdbSqliteNats { ) .await?; - WorkflowResult::Ok(PulledWorkflowData { + WorkflowResult::Ok(Some(PulledWorkflowData { workflow_id: partial.workflow_id, workflow_name: partial.workflow_name, create_ts: partial.create_ts, @@ -1487,10 +1502,11 @@ impl Database for DatabaseFdbSqliteNats { input: partial.input, wake_deadline_ts: partial.wake_deadline_ts, events: sqlite::build_history(events)?, - }) + })) } }) .buffer_unordered(512) + .try_filter_map(|x| std::future::ready(Ok(x))) .try_collect() .instrument(tracing::trace_span!("map_to_pulled_workflows")) .await?; diff --git a/packages/common/server-cli/src/commands/fdb/cli.rs b/packages/common/server-cli/src/commands/fdb/cli.rs index 4eb0c925ee..6a182d7b81 100644 --- a/packages/common/server-cli/src/commands/fdb/cli.rs +++ b/packages/common/server-cli/src/commands/fdb/cli.rs @@ -56,11 +56,11 @@ pub enum SubCommand { /// Set value at current path. #[command(name = "set")] Set { - /// Key path to list. Supports relative key paths. - key: Option, + /// Key path to set. Supports relative key paths. Used as value if value is not set. + key_or_value: String, /// Value to set, with optional type prefix (e.g. "u64:123", overrides type hint). - value: String, + value: Option, /// Optional type hint. #[arg(short = 't', long)] type_hint: Option, @@ -302,10 +302,16 @@ impl SubCommand { } } SubCommand::Set { - key, + key_or_value, value, type_hint, } => { + let (key, value) = if let Some(value) = value { + (Some(key_or_value), value) + } else { + (None, key_or_value) + }; + let mut current_tuple = current_tuple.clone(); if !update_current_tuple(&mut current_tuple, key) { return CommandResult::Error; diff --git a/packages/edge/services/pegboard/standalone/ws/src/lib.rs b/packages/edge/services/pegboard/standalone/ws/src/lib.rs index 52c30268e7..3390449fbc 100644 --- a/packages/edge/services/pegboard/standalone/ws/src/lib.rs +++ b/packages/edge/services/pegboard/standalone/ws/src/lib.rs @@ -118,7 +118,7 @@ async fn handle_connection( tracing::error!(?addr, ?err, "handle connection inner failed"); } - tracing::info!(?client_id, "client connection closed"); + tracing::info!(client_id=?url_data.client_id, "client connection closed"); // Clean up let conn = conns.write().await.remove(&url_data.client_id);