Skip to content

Commit c20b8cd

Browse files
MasterPtato authored and NathanFlurry committed
fix(pb): stop actors from destroying on goingaway
1 parent 986289f commit c20b8cd

File tree

5 files changed

+31
-30
lines changed

5 files changed

+31
-30
lines changed

engine/packages/guard-core/src/custom_serve.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,8 @@ use async_trait::async_trait;
33
use bytes::Bytes;
44
use http_body_util::Full;
55
use hyper::{Request, Response};
6-
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
76
use pegboard::tunnel::id::RequestId;
7+
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
88

99
use crate::WebSocketHandle;
1010
use crate::proxy_service::ResponseBody;

engine/packages/guard-core/src/proxy_service.rs

Lines changed: 15 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -612,19 +612,20 @@ impl ProxyState {
612612
let cache_key = (actor_id, ip_addr);
613613

614614
// Get existing counter or create a new one
615-
let counter_arc =
616-
if let Some(existing_counter) = self.in_flight_counters.get(&cache_key).await {
617-
existing_counter
618-
} else {
619-
let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
620-
middleware_config.max_in_flight.amount,
621-
)));
622-
self.in_flight_counters
623-
.insert(cache_key, new_counter.clone())
624-
.await;
625-
metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]);
626-
new_counter
627-
};
615+
let counter_arc = if let Some(existing_counter) =
616+
self.in_flight_counters.get(&cache_key).await
617+
{
618+
existing_counter
619+
} else {
620+
let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
621+
middleware_config.max_in_flight.amount,
622+
)));
623+
self.in_flight_counters
624+
.insert(cache_key, new_counter.clone())
625+
.await;
626+
metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]);
627+
new_counter
628+
};
628629

629630
// Try to acquire from the counter
630631
let acquired = {
@@ -638,7 +639,7 @@ impl ProxyState {
638639
}
639640

640641
// Generate unique request ID
641-
let request_id = Some(self.generate_unique_request_id().await?);
642+
let request_id = Some(self.generate_unique_request_id().await?);
642643
Ok(request_id)
643644
}
644645

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -194,8 +194,8 @@ impl SharedState {
194194

195195
hs.pending_ws_msgs.push(pending_ws_msg);
196196
tracing::debug!(
197-
index=current_message_index,
198-
new_count=hs.pending_ws_msgs.len(),
197+
index = current_message_index,
198+
new_count = hs.pending_ws_msgs.len(),
199199
"pushed pending websocket message"
200200
);
201201
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -569,10 +569,10 @@ async fn handle_stopped(
569569
tracing::debug!(?variant, "actor stopped");
570570

571571
let force_reschedule = match &variant {
572-
// Reset retry count on successful exit
573572
StoppedVariant::Normal {
574573
code: protocol::StopCode::Ok,
575574
} => {
575+
// Reset retry count on successful exit
576576
state.reschedule_state = Default::default();
577577

578578
false
@@ -583,7 +583,6 @@ async fn handle_stopped(
583583

584584
// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
585585
state.gc_timeout_ts = None;
586-
state.going_away = false;
587586
state.stopping = false;
588587
state.runner_id = None;
589588
let old_runner_workflow_id = state.runner_workflow_id.take();
@@ -658,16 +657,16 @@ async fn handle_stopped(
658657
}
659658
// Handle rescheduling if not marked as sleeping
660659
else if !state.sleeping {
661-
// Anything besides a StopCode::Ok is considered a failure
662-
let failed = !matches!(
663-
variant,
664-
StoppedVariant::Normal {
665-
code: protocol::StopCode::Ok
666-
}
667-
);
660+
let graceful_exit = !state.going_away
661+
&& matches!(
662+
variant,
663+
StoppedVariant::Normal {
664+
code: protocol::StopCode::Ok
665+
}
666+
);
668667

669-
match (input.crash_policy, failed) {
670-
(CrashPolicy::Restart, true) => {
668+
match (input.crash_policy, graceful_exit) {
669+
(CrashPolicy::Restart, false) => {
671670
match runtime::reschedule_actor(ctx, &input, state, false).await? {
672671
runtime::SpawnActorOutput::Allocated { .. } => {}
673672
// NOTE: It's not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
@@ -678,7 +677,7 @@ async fn handle_stopped(
678677
}
679678
}
680679
}
681-
(CrashPolicy::Sleep, true) => {
680+
(CrashPolicy::Sleep, false) => {
682681
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to crash");
683682

684683
state.sleeping = true;
@@ -711,6 +710,7 @@ async fn handle_stopped(
711710

712711
state.wake_for_alarm = false;
713712
state.will_wake = false;
713+
state.going_away = false;
714714

715715
ctx.msg(Stopped {})
716716
.tag("actor_id", input.actor_id)

engine/packages/pegboard/src/workflows/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@ use futures_util::{FutureExt, StreamExt, TryStreamExt};
22
use gas::prelude::*;
33
use rivet_data::converted::{ActorNameKeyData, MetadataKeyData, RunnerByKeyKeyData};
44
use rivet_metrics::KeyValue;
5-
use rivet_runner_protocol::{self as protocol, versioned, PROTOCOL_VERSION};
5+
use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
66
use universaldb::{
77
options::{ConflictRangeType, StreamingMode},
88
utils::{FormalChunkedKey, IsolationLevel::*},

0 commit comments

Comments
 (0)