Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,8 @@ debug = false
lto = "fat"
codegen-units = 1
opt-level = 3

# strip = true
# panic = "abort"
# overflow-checks = false
# debug-assertions = false
40 changes: 29 additions & 11 deletions engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2579,23 +2579,41 @@ impl Database for DatabaseKv {
keys::history::HistorySubspaceVariant::Forgotten,
));

let loop_events_subspace =
self.subspace
.subspace(&keys::history::EventHistorySubspaceKey::entire(
from_workflow_id,
location.clone(),
false,
));
// Start is {loop location, 0, ...}
let loop_events_subspace_start = self
.subspace
.subspace(&keys::history::EventHistorySubspaceKey::entire(
from_workflow_id,
location.clone(),
false,
))
.range()
.0;
// End is {loop location, iteration - 1, ...}
let loop_events_subspace_end = self
.subspace
.subspace(&keys::history::EventHistorySubspaceKey::new(
from_workflow_id,
location.clone(),
iteration.saturating_sub(1),
false,
))
.range()
.1;

let mut stream = tx.get_ranges_keyvalues(
universaldb::RangeOption {
mode: StreamingMode::WantAll,
..(&loop_events_subspace).into()
..(
loop_events_subspace_start.as_slice(),
loop_events_subspace_end.as_slice(),
)
.into()
},
Serializable,
);

// Move all current events under this loop to the forgotten history
// Move all events under this loop up to the current iteration to the forgotten history
loop {
let Some(entry) = stream.try_next().await? else {
break;
Expand All @@ -2605,15 +2623,15 @@ impl Database for DatabaseKv {
return Err(universaldb::tuple::PackError::BadPrefix.into());
}

// Truncate tuple up to ACTIVE and replace it with FORGOTTEN
// Truncate tuple up to ...ACTIVE and replace it with ...FORGOTTEN
let truncated_key = &entry.key()[active_history_subspace.bytes().len()..];
let forgotten_key =
[forgotten_history_subspace.bytes(), truncated_key].concat();

tx.set(&forgotten_key, entry.value());
}

tx.clear_subspace_range(&loop_events_subspace);
tx.clear_range(&loop_events_subspace_start, &loop_events_subspace_end);

// Only retain last 100 events in forgotten history
if iteration > 100 {
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-runner/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub async fn init_conn(

// Spawn a new runner workflow if one doesn't already exist
let workflow_id = ctx
.workflow(pegboard::workflows::runner::Input {
.workflow(pegboard::workflows::runner2::Input {
runner_id,
namespace_id: namespace.namespace_id,
name: name.clone(),
Expand Down
9 changes: 3 additions & 6 deletions engine/packages/pegboard/src/workflows/runner2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,8 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
Main::Forward(sig) => {
match sig.inner {
protocol::ToServer::ToServerInit(init_sig) => {
if init.is_none() {
init = Some(init_sig);
check_queue = true;
}
init = Some(init_sig);
check_queue = true;
}
protocol::ToServer::ToServerEvents(new_events) => {
// Ignore events that were already received
Expand Down Expand Up @@ -234,8 +232,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
let last_event_idx = events.last().map(|event| event.index);

// NOTE: This should not be parallelized because signals should be sent in order
// Forward to actor workflows
// Process events
// Forward events to actor workflows
for event in &events {
let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
let res = ctx
Expand Down
7 changes: 6 additions & 1 deletion scripts/docker/build-push.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ for tag in "${TAGS[@]}"; do
BUILD_TAG_ARGS+=("-t" "${IMAGE_REPO}:${tag}")
done

docker build -f "${DOCKERFILE}" --target "${TARGET}" --platform linux/x86_64 --build-arg BUILD_FRONTEND=true "${BUILD_TAG_ARGS[@]}" "${CONTEXT}"
docker build -f "${DOCKERFILE}" --target "${TARGET}" \
--platform linux/x86_64 \
--build-arg BUILD_FRONTEND=true \
# --build-arg CARGO_BUILD_MODE=release \
"${BUILD_TAG_ARGS[@]}" \
"${CONTEXT}"

echo "Pushing images..."
for tag in "${TAGS[@]}"; do
Expand Down
1 change: 0 additions & 1 deletion scripts/run/docker/engine-rocksdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \
RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \
RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \
cargo run --bin rivet-engine -- start "$@" | tee -i /tmp/rivet-engine.log

Loading