From 5458e50a10e2ce2ab37ecaf7e3bfaeff019f141f Mon Sep 17 00:00:00 2001
From: MasterPtato
Date: Tue, 18 Nov 2025 13:21:47 -0800
Subject: [PATCH] fix(gas): fix loop forgotten bug due to concurrency

---
 Cargo.toml                                    |  5 +++
 engine/packages/gasoline/src/db/kv/mod.rs     | 40 ++++++++++++++-----
 engine/packages/pegboard-runner/src/conn.rs   |  2 +-
 .../pegboard/src/workflows/runner2.rs         |  9 ++---
 scripts/docker/build-push.sh                  |  7 +++-
 scripts/run/docker/engine-rocksdb.sh          |  1 -
 6 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0f353940a3..6441b5d736 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -433,3 +433,8 @@ debug = false
 lto = "fat"
 codegen-units = 1
 opt-level = 3
+
+# strip = true
+# panic = "abort"
+# overflow-checks = false
+# debug-assertions = false
diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs
index 27dad968a9..f0480276a0 100644
--- a/engine/packages/gasoline/src/db/kv/mod.rs
+++ b/engine/packages/gasoline/src/db/kv/mod.rs
@@ -2579,23 +2579,41 @@ impl Database for DatabaseKv {
 						keys::history::HistorySubspaceVariant::Forgotten,
 					));
 
-					let loop_events_subspace =
-						self.subspace
-							.subspace(&keys::history::EventHistorySubspaceKey::entire(
-								from_workflow_id,
-								location.clone(),
-								false,
-							));
+					// Start is {loop location, 0, ...}
+					let loop_events_subspace_start = self
+						.subspace
+						.subspace(&keys::history::EventHistorySubspaceKey::entire(
+							from_workflow_id,
+							location.clone(),
+							false,
+						))
+						.range()
+						.0;
+					// End is {loop location, iteration - 1, ...}
+					let loop_events_subspace_end = self
+						.subspace
+						.subspace(&keys::history::EventHistorySubspaceKey::new(
+							from_workflow_id,
+							location.clone(),
+							iteration.saturating_sub(1),
+							false,
+						))
+						.range()
+						.1;
 
 					let mut stream = tx.get_ranges_keyvalues(
 						universaldb::RangeOption {
 							mode: StreamingMode::WantAll,
-							..(&loop_events_subspace).into()
+							..(
+								loop_events_subspace_start.as_slice(),
+								loop_events_subspace_end.as_slice(),
+							)
+								.into()
 						},
 						Serializable,
 					);
 
-					// Move all current events under this loop to the forgotten history
+					// Move all events under this loop up to the current iteration to the forgotten history
 					loop {
 						let Some(entry) = stream.try_next().await? else {
 							break;
@@ -2605,7 +2623,7 @@ impl Database for DatabaseKv {
 							return Err(universaldb::tuple::PackError::BadPrefix.into());
 						}
 
-						// Truncate tuple up to ACTIVE and replace it with FORGOTTEN
+						// Truncate tuple up to ...ACTIVE and replace it with ...FORGOTTEN
 						let truncated_key = &entry.key()[active_history_subspace.bytes().len()..];
 						let forgotten_key =
 							[forgotten_history_subspace.bytes(), truncated_key].concat();
@@ -2613,7 +2631,7 @@ impl Database for DatabaseKv {
 						tx.set(&forgotten_key, entry.value());
 					}
 
-					tx.clear_subspace_range(&loop_events_subspace);
+					tx.clear_range(&loop_events_subspace_start, &loop_events_subspace_end);
 
 					// Only retain last 100 events in forgotten history
 					if iteration > 100 {
diff --git a/engine/packages/pegboard-runner/src/conn.rs b/engine/packages/pegboard-runner/src/conn.rs
index 2551348542..9715cdd1d5 100644
--- a/engine/packages/pegboard-runner/src/conn.rs
+++ b/engine/packages/pegboard-runner/src/conn.rs
@@ -128,7 +128,7 @@ pub async fn init_conn(
 
 	// Spawn a new runner workflow if one doesn't already exist
 	let workflow_id = ctx
-		.workflow(pegboard::workflows::runner::Input {
+		.workflow(pegboard::workflows::runner2::Input {
 			runner_id,
 			namespace_id: namespace.namespace_id,
 			name: name.clone(),
diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs
index 51dca7bef0..bea5097432 100644
--- a/engine/packages/pegboard/src/workflows/runner2.rs
+++ b/engine/packages/pegboard/src/workflows/runner2.rs
@@ -103,10 +103,8 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
 					Main::Forward(sig) => {
 						match sig.inner {
 							protocol::ToServer::ToServerInit(init_sig) => {
-								if init.is_none() {
-									init = Some(init_sig);
-									check_queue = true;
-								}
+								init = Some(init_sig);
+								check_queue = true;
 							}
 							protocol::ToServer::ToServerEvents(new_events) => {
 								// Ignore events that were already received
@@ -234,8 +232,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
 				let last_event_idx = events.last().map(|event| event.index);
 
 				// NOTE: This should not be parallelized because signals should be sent in order
-				// Forward to actor workflows
-				// Process events
+				// Forward events to actor workflows
 				for event in &events {
 					let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
 					let res = ctx
diff --git a/scripts/docker/build-push.sh b/scripts/docker/build-push.sh
index d327e4e76e..4ba19a53ec 100755
--- a/scripts/docker/build-push.sh
+++ b/scripts/docker/build-push.sh
@@ -23,7 +23,12 @@ for tag in "${TAGS[@]}"; do
 	BUILD_TAG_ARGS+=("-t" "${IMAGE_REPO}:${tag}")
 done
 
-docker build -f "${DOCKERFILE}" --target "${TARGET}" --platform linux/x86_64 --build-arg BUILD_FRONTEND=true "${BUILD_TAG_ARGS[@]}" "${CONTEXT}"
+# Optional flag, kept out of the command because a comment line would end the continuation: --build-arg CARGO_BUILD_MODE=release
+docker build -f "${DOCKERFILE}" --target "${TARGET}" \
+	--platform linux/x86_64 \
+	--build-arg BUILD_FRONTEND=true \
+	"${BUILD_TAG_ARGS[@]}" \
+	"${CONTEXT}"
 
 echo "Pushing images..."
 for tag in "${TAGS[@]}"; do
diff --git a/scripts/run/docker/engine-rocksdb.sh b/scripts/run/docker/engine-rocksdb.sh
index 42690862ba..93779c83a5 100755
--- a/scripts/run/docker/engine-rocksdb.sh
+++ b/scripts/run/docker/engine-rocksdb.sh
@@ -13,4 +13,3 @@ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \
 RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \
 RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \
 cargo run --bin rivet-engine -- start "$@" | tee -i /tmp/rivet-engine.log
-