Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions packages/common/gasoline/core/src/db/kv/keys/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ impl FormalChunkedKey for InputKey {
.flatten()
.collect(),
)?)
.map_err(Into::into)
.context("failed to combine `InputKey`")
}

fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
Expand Down Expand Up @@ -693,7 +693,7 @@ impl FormalChunkedKey for OutputKey {
.flatten()
.collect(),
)?)
.map_err(Into::into)
.context("failed to combine `OutputKey`")
}

fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
Expand Down Expand Up @@ -1621,8 +1621,12 @@ pub mod insert {
);

let state_key = super::InputKey::new(workflow_id, location.clone());
let state_subspace = subspace.subspace(&state_key);

// Write state
// Clear old state
tx.clear_subspace_range(&state_subspace);

// Write new state
for (i, chunk) in state_key.split_ref(&state)?.into_iter().enumerate() {
let chunk_key = state_key.chunk(i);

Expand Down
2 changes: 1 addition & 1 deletion packages/common/gasoline/core/src/db/kv/keys/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl FormalChunkedKey for BodyKey {
.flatten()
.collect(),
)?)
.map_err(Into::into)
.context("failed to combine `BodyKey`")
}

fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
Expand Down
6 changes: 3 additions & 3 deletions packages/common/gasoline/core/src/db/kv/keys/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl FormalChunkedKey for InputKey {
.flatten()
.collect(),
)?)
.map_err(Into::into)
.context("failed to combine `InputKey`")
}

fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
Expand Down Expand Up @@ -271,7 +271,7 @@ impl FormalChunkedKey for OutputKey {
.flatten()
.collect(),
)?)
.map_err(Into::into)
.context("failed to combine `OutputKey`")
}

fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
Expand Down Expand Up @@ -358,7 +358,7 @@ impl FormalChunkedKey for StateKey {
.flatten()
.collect(),
)?)
.map_err(Into::into)
.context("failed to combine `StateKey`")
}

fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
Expand Down
6 changes: 5 additions & 1 deletion packages/common/gasoline/core/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2305,8 +2305,12 @@ impl Database for DatabaseKv {
.run(|tx| {
async move {
let state_key = keys::workflow::StateKey::new(workflow_id);
let state_subspace = self.subspace.subspace(&state_key);

// Clear old state
tx.clear_subspace_range(&state_subspace);

// Write state
// Write new state
for (i, chunk) in state_key.split_ref(&state)?.into_iter().enumerate() {
let chunk_key = state_key.chunk(i);

Expand Down
3 changes: 2 additions & 1 deletion packages/common/universaldb/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::future::Future;

use anyhow::{Result, anyhow};
use anyhow::{Context, Result, anyhow};
use futures_util::FutureExt;

use crate::{
Expand Down Expand Up @@ -37,6 +37,7 @@ impl Database {
.map(|x| *x)
.map_err(|_| anyhow!("failed to downcast `run` return type"))
})
.context("transaction failed")
}

/// Creates a new txn instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,7 @@ impl TransactionTask {
} => {
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
{
tracing::error!("cannot set in read only txn");
let _ =
response.send(Err(anyhow!("postgres transaction connection failed")));
let _ = response.send(Err(anyhow!("cannot set in read only txn")));
continue;
};

Expand All @@ -418,9 +416,7 @@ impl TransactionTask {
TransactionCommand::Clear { key, response } => {
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
{
tracing::error!("cannot set in read only txn");
let _ =
response.send(Err(anyhow!("postgres transaction connection failed")));
let _ = response.send(Err(anyhow!("cannot set in read only txn")));
continue;
};

Expand All @@ -443,9 +439,7 @@ impl TransactionTask {
} => {
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
{
tracing::error!("cannot clear range in read only txn");
let _ =
response.send(Err(anyhow!("postgres transaction connection failed")));
let _ = response.send(Err(anyhow!("cannot clear range in read only txn")));
continue;
};

Expand Down Expand Up @@ -478,9 +472,8 @@ impl TransactionTask {
} => {
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
{
tracing::error!("cannot apply atomic op in read only txn");
let _ =
response.send(Err(anyhow!("postgres transaction connection failed")));
response.send(Err(anyhow!("cannot apply atomic op in read only txn")));
continue;
};

Expand Down Expand Up @@ -539,9 +532,9 @@ impl TransactionTask {
if let TransactionIsolationLevel::RepeatableReadReadOnly =
self.isolation_level
{
tracing::error!("cannot release conflict ranges in read only txn");
let _ = response
.send(Err(anyhow!("postgres transaction connection failed")));
let _ = response.send(Err(anyhow!(
"cannot release conflict ranges in read only txn"
)));
continue;
};

Expand All @@ -567,9 +560,8 @@ impl TransactionTask {
} => {
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
{
tracing::error!("cannot add conflict range in read only txn");
let _ =
response.send(Err(anyhow!("postgres transaction connection failed")));
let _ = response
.send(Err(anyhow!("cannot add conflict range in read only txn")));
continue;
};

Expand Down
6 changes: 1 addition & 5 deletions packages/common/universaldb/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl Transaction {
self.driver.clear_range(begin, end)
}

pub fn clear_subspace_range(&self, subspace: &Subspace) {
pub fn clear_subspace_range(&self, subspace: &tuple::Subspace) {
let (begin, end) = subspace.range();
self.driver.clear_range(&begin, &end);
}
Expand Down Expand Up @@ -321,10 +321,6 @@ impl<'t> InformalTransaction<'t> {
self.inner.driver.clear_range(&begin, &end);
}

// pub fn commit(self: Box<Self>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
// self.inner.driver.commit()
// }

pub fn cancel(&self) {
self.inner.driver.cancel()
}
Expand Down
1 change: 0 additions & 1 deletion packages/core/pegboard-runner/src/ping_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>) -> Result<()> {
loop {
tokio::time::sleep(UPDATE_PING_INTERVAL).await;

// Check that workflow is not dead
let Some(wf) = ctx
.workflow::<pegboard::workflows::runner::Input>(conn.workflow_id)
.get()
Expand Down
4 changes: 2 additions & 2 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ async fn outbound_handler(
let mut req = client.get(url).headers(headers);

// Add admin token if configured
if let Some(auth) = ctx.config().auth {
req = req.header(X_RIVET_TOKEN, &auth.admin_token);
if let Some(auth) = &ctx.config().auth {
req = req.header(X_RIVET_TOKEN, auth.admin_token.read());
}

let mut source = sse::EventSource::new(req)?;
Expand Down
3 changes: 2 additions & 1 deletion packages/services/pegboard/src/keys/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,8 @@ impl FormalChunkedKey for MetadataKey {
.map(|x| x.value().iter().map(|x| *x))
.flatten()
.collect::<Vec<_>>(),
)?
)
.context("failed to combine `MetadataKey`")?
.try_into()
}

Expand Down
5 changes: 4 additions & 1 deletion packages/services/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,9 @@ async fn process_init(ctx: &ActivityCtx, input: &ProcessInitInput) -> Result<Pro

let metadata_key = keys::runner::MetadataKey::new(input.runner_id);

// Clear old metadata
tx.delete_key_subspace(&metadata_key);

// Write metadata
for (i, chunk) in metadata_key.split(metadata)?.into_iter().enumerate() {
let chunk_key = metadata_key.chunk(i);
Expand All @@ -782,7 +785,7 @@ async fn process_init(ctx: &ActivityCtx, input: &ProcessInitInput) -> Result<Pro

Ok(())
})
.custom_instrument(tracing::info_span!("runner_populate_actor_names_tx"))
.custom_instrument(tracing::info_span!("runner_process_init_tx"))
.await?;

Ok(ProcessInitOutput {
Expand Down
Loading