Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/guard-core/src/custom_serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use async_trait::async_trait;
use bytes::Bytes;
use http_body_util::Full;
use hyper::{Request, Response};
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
use pegboard::tunnel::id::RequestId;
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;

use crate::WebSocketHandle;
use crate::proxy_service::ResponseBody;
Expand Down
29 changes: 15 additions & 14 deletions engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,19 +612,20 @@ impl ProxyState {
let cache_key = (actor_id, ip_addr);

// Get existing counter or create a new one
let counter_arc =
if let Some(existing_counter) = self.in_flight_counters.get(&cache_key).await {
existing_counter
} else {
let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
middleware_config.max_in_flight.amount,
)));
self.in_flight_counters
.insert(cache_key, new_counter.clone())
.await;
metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]);
new_counter
};
let counter_arc = if let Some(existing_counter) =
self.in_flight_counters.get(&cache_key).await
{
existing_counter
} else {
let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
middleware_config.max_in_flight.amount,
)));
self.in_flight_counters
.insert(cache_key, new_counter.clone())
.await;
metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]);
new_counter
};

// Try to acquire from the counter
let acquired = {
Expand All @@ -638,7 +639,7 @@ impl ProxyState {
}

// Generate unique request ID
let request_id = Some(self.generate_unique_request_id().await?);
let request_id = Some(self.generate_unique_request_id().await?);
Ok(request_id)
}

Expand Down
4 changes: 2 additions & 2 deletions engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ impl SharedState {

hs.pending_ws_msgs.push(pending_ws_msg);
tracing::debug!(
index=current_message_index,
new_count=hs.pending_ws_msgs.len(),
index = current_message_index,
new_count = hs.pending_ws_msgs.len(),
"pushed pending websocket message"
);
}
Expand Down
24 changes: 12 additions & 12 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,10 @@ async fn handle_stopped(
tracing::debug!(?variant, "actor stopped");

let force_reschedule = match &variant {
// Reset retry count on successful exit
StoppedVariant::Normal {
code: protocol::StopCode::Ok,
} => {
// Reset retry count on successful exit
state.reschedule_state = Default::default();

false
Expand All @@ -583,7 +583,6 @@ async fn handle_stopped(

// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
state.gc_timeout_ts = None;
state.going_away = false;
state.stopping = false;
state.runner_id = None;
let old_runner_workflow_id = state.runner_workflow_id.take();
Expand Down Expand Up @@ -658,16 +657,16 @@ async fn handle_stopped(
}
// Handle rescheduling if not marked as sleeping
else if !state.sleeping {
// Anything besides a StopCode::Ok is considered a failure
let failed = !matches!(
variant,
StoppedVariant::Normal {
code: protocol::StopCode::Ok
}
);
let graceful_exit = !state.going_away
&& matches!(
variant,
StoppedVariant::Normal {
code: protocol::StopCode::Ok
}
);

match (input.crash_policy, failed) {
(CrashPolicy::Restart, true) => {
match (input.crash_policy, graceful_exit) {
(CrashPolicy::Restart, false) => {
match runtime::reschedule_actor(ctx, &input, state, false).await? {
runtime::SpawnActorOutput::Allocated { .. } => {}
// NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
Expand All @@ -678,7 +677,7 @@ async fn handle_stopped(
}
}
}
(CrashPolicy::Sleep, true) => {
(CrashPolicy::Sleep, false) => {
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to crash");

state.sleeping = true;
Expand Down Expand Up @@ -711,6 +710,7 @@ async fn handle_stopped(

state.wake_for_alarm = false;
state.will_wake = false;
state.going_away = false;

ctx.msg(Stopped {})
.tag("actor_id", input.actor_id)
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures_util::{FutureExt, StreamExt, TryStreamExt};
use gas::prelude::*;
use rivet_data::converted::{ActorNameKeyData, MetadataKeyData, RunnerByKeyKeyData};
use rivet_metrics::KeyValue;
use rivet_runner_protocol::{self as protocol, versioned, PROTOCOL_VERSION};
use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
use universaldb::{
options::{ConflictRangeType, StreamingMode},
utils::{FormalChunkedKey, IsolationLevel::*},
Expand Down
Loading