diff --git a/packages/common/gasoline/core/src/db/kv/keys/history.rs b/packages/common/gasoline/core/src/db/kv/keys/history.rs index 9787355f96..a415c5f669 100644 --- a/packages/common/gasoline/core/src/db/kv/keys/history.rs +++ b/packages/common/gasoline/core/src/db/kv/keys/history.rs @@ -577,7 +577,7 @@ impl FormalChunkedKey for InputKey { .flatten() .collect(), )?) - .map_err(Into::into) + .context("failed to combine `InputKey`") } fn split(&self, value: Self::Value) -> Result>> { @@ -693,7 +693,7 @@ impl FormalChunkedKey for OutputKey { .flatten() .collect(), )?) - .map_err(Into::into) + .context("failed to combine `OutputKey`") } fn split(&self, value: Self::Value) -> Result>> { @@ -1621,8 +1621,12 @@ pub mod insert { ); let state_key = super::InputKey::new(workflow_id, location.clone()); + let state_subspace = subspace.subspace(&state_key); - // Write state + // Clear old state + tx.clear_subspace_range(&state_subspace); + + // Write new state for (i, chunk) in state_key.split_ref(&state)?.into_iter().enumerate() { let chunk_key = state_key.chunk(i); diff --git a/packages/common/gasoline/core/src/db/kv/keys/signal.rs b/packages/common/gasoline/core/src/db/kv/keys/signal.rs index 04cb4437c5..25587d48ad 100644 --- a/packages/common/gasoline/core/src/db/kv/keys/signal.rs +++ b/packages/common/gasoline/core/src/db/kv/keys/signal.rs @@ -42,7 +42,7 @@ impl FormalChunkedKey for BodyKey { .flatten() .collect(), )?) - .map_err(Into::into) + .context("failed to combine `BodyKey`") } fn split(&self, value: Self::Value) -> Result>> { diff --git a/packages/common/gasoline/core/src/db/kv/keys/workflow.rs b/packages/common/gasoline/core/src/db/kv/keys/workflow.rs index 63b166c42e..5ef01bb389 100644 --- a/packages/common/gasoline/core/src/db/kv/keys/workflow.rs +++ b/packages/common/gasoline/core/src/db/kv/keys/workflow.rs @@ -184,7 +184,7 @@ impl FormalChunkedKey for InputKey { .flatten() .collect(), )?) - .map_err(Into::into) + .context("failed to combine `InputKey`") } fn split(&self, value: Self::Value) -> Result>> { @@ -271,7 +271,7 @@ impl FormalChunkedKey for OutputKey { .flatten() .collect(), )?) - .map_err(Into::into) + .context("failed to combine `OutputKey`") } fn split(&self, value: Self::Value) -> Result>> { @@ -358,7 +358,7 @@ impl FormalChunkedKey for StateKey { .flatten() .collect(), )?) - .map_err(Into::into) + .context("failed to combine `StateKey`") } fn split(&self, value: Self::Value) -> Result>> { diff --git a/packages/common/gasoline/core/src/db/kv/mod.rs b/packages/common/gasoline/core/src/db/kv/mod.rs index bb8e5f8b5f..ffa3ba10d7 100644 --- a/packages/common/gasoline/core/src/db/kv/mod.rs +++ b/packages/common/gasoline/core/src/db/kv/mod.rs @@ -2305,8 +2305,12 @@ impl Database for DatabaseKv { .run(|tx| { async move { let state_key = keys::workflow::StateKey::new(workflow_id); + let state_subspace = self.subspace.subspace(&state_key); + + // Clear old state + tx.clear_subspace_range(&state_subspace); - // Write state + // Write new state for (i, chunk) in state_key.split_ref(&state)?.into_iter().enumerate() { let chunk_key = state_key.chunk(i); diff --git a/packages/common/universaldb/src/database.rs b/packages/common/universaldb/src/database.rs index 508a638e6b..a8f356f548 100644 --- a/packages/common/universaldb/src/database.rs +++ b/packages/common/universaldb/src/database.rs @@ -1,6 +1,6 @@ use std::future::Future; -use anyhow::{Result, anyhow}; +use anyhow::{Context, Result, anyhow}; use futures_util::FutureExt; use crate::{ @@ -37,6 +37,7 @@ impl Database { .map(|x| *x) .map_err(|_| anyhow!("failed to downcast `run` return type")) }) + .context("transaction failed") } /// Creates a new txn instance. diff --git a/packages/common/universaldb/src/driver/postgres/transaction_task.rs b/packages/common/universaldb/src/driver/postgres/transaction_task.rs index 3587b791db..1e49f9e8b7 100644 --- a/packages/common/universaldb/src/driver/postgres/transaction_task.rs +++ b/packages/common/universaldb/src/driver/postgres/transaction_task.rs @@ -394,9 +394,7 @@ impl TransactionTask { } => { if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level { - tracing::error!("cannot set in read only txn"); - let _ = - response.send(Err(anyhow!("postgres transaction connection failed"))); + let _ = response.send(Err(anyhow!("cannot set in read only txn"))); continue; }; @@ -418,9 +416,7 @@ impl TransactionTask { TransactionCommand::Clear { key, response } => { if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level { - tracing::error!("cannot set in read only txn"); - let _ = - response.send(Err(anyhow!("postgres transaction connection failed"))); + let _ = response.send(Err(anyhow!("cannot set in read only txn"))); continue; }; @@ -443,9 +439,7 @@ impl TransactionTask { } => { if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level { - tracing::error!("cannot clear range in read only txn"); - let _ = - response.send(Err(anyhow!("postgres transaction connection failed"))); + let _ = response.send(Err(anyhow!("cannot clear range in read only txn"))); continue; }; @@ -478,9 +472,8 @@ impl TransactionTask { } => { if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level { - tracing::error!("cannot apply atomic op in read only txn"); let _ = - response.send(Err(anyhow!("postgres transaction connection failed"))); + response.send(Err(anyhow!("cannot apply atomic op in read only txn"))); continue; }; @@ -539,9 +532,9 @@ impl TransactionTask { if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level { - tracing::error!("cannot release conflict ranges in read only txn"); - let _ = response - .send(Err(anyhow!("postgres transaction connection failed"))); + let _ = response.send(Err(anyhow!( + "cannot release conflict ranges in read only txn" + ))); continue; }; @@ -567,9 +560,8 @@ impl TransactionTask { } => { if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level { - tracing::error!("cannot add conflict range in read only txn"); - let _ = - response.send(Err(anyhow!("postgres transaction connection failed"))); + let _ = response + .send(Err(anyhow!("cannot add conflict range in read only txn"))); continue; }; diff --git a/packages/common/universaldb/src/transaction.rs b/packages/common/universaldb/src/transaction.rs index 991c069db5..3808afff9c 100644 --- a/packages/common/universaldb/src/transaction.rs +++ b/packages/common/universaldb/src/transaction.rs @@ -232,7 +232,7 @@ impl Transaction { self.driver.clear_range(begin, end) } - pub fn clear_subspace_range(&self, subspace: &Subspace) { + pub fn clear_subspace_range(&self, subspace: &tuple::Subspace) { let (begin, end) = subspace.range(); self.driver.clear_range(&begin, &end); } @@ -321,10 +321,6 @@ impl<'t> InformalTransaction<'t> { self.inner.driver.clear_range(&begin, &end); } - // pub fn commit(self: Box) -> Pin> + Send>> { - // self.inner.driver.commit() - // } - pub fn cancel(&self) { self.inner.driver.cancel() } diff --git a/packages/core/pegboard-runner/src/ping_task.rs b/packages/core/pegboard-runner/src/ping_task.rs index 4e1a5ebeb0..aab494ae35 100644 --- a/packages/core/pegboard-runner/src/ping_task.rs +++ b/packages/core/pegboard-runner/src/ping_task.rs @@ -19,7 +19,6 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc) -> Result<()> { loop { tokio::time::sleep(UPDATE_PING_INTERVAL).await; - // Check that workflow is not dead let Some(wf) = ctx .workflow::(conn.workflow_id) .get() diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 2fa1d11636..29a69f544a 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -232,8 +232,8 @@ async fn outbound_handler( let mut req = client.get(url).headers(headers); // Add admin token if configured - if let Some(auth) = ctx.config().auth { - req = req.header(X_RIVET_TOKEN, &auth.admin_token); + if let Some(auth) = &ctx.config().auth { + req = req.header(X_RIVET_TOKEN, auth.admin_token.read()); } let mut source = sse::EventSource::new(req)?; diff --git a/packages/services/pegboard/src/keys/runner.rs b/packages/services/pegboard/src/keys/runner.rs index d7c6774115..fdc726f239 100644 --- a/packages/services/pegboard/src/keys/runner.rs +++ b/packages/services/pegboard/src/keys/runner.rs @@ -760,7 +760,8 @@ impl FormalChunkedKey for MetadataKey { .map(|x| x.value().iter().map(|x| *x)) .flatten() .collect::>(), - )? + ) + .context("failed to combine `MetadataKey`")? .try_into() } diff --git a/packages/services/pegboard/src/workflows/runner.rs b/packages/services/pegboard/src/workflows/runner.rs index 4dc38ee257..08467bfdd9 100644 --- a/packages/services/pegboard/src/workflows/runner.rs +++ b/packages/services/pegboard/src/workflows/runner.rs @@ -772,6 +772,9 @@ async fn process_init(ctx: &ActivityCtx, input: &ProcessInitInput) -> Result Result