Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 112 additions & 78 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
},
};

use anyhow::Result;
use anyhow::{Context, Result, bail};
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use futures_util::{StreamExt, TryStreamExt};
Expand All @@ -15,7 +15,7 @@ use pegboard::keys;
use reqwest::header::{HeaderName, HeaderValue};
use reqwest_eventsource as sse;
use rivet_runner_protocol as protocol;
use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind};
use rivet_types::runner_configs::RunnerConfigKind;
use tokio::{sync::oneshot, task::JoinHandle, time::Duration};
use universaldb::options::StreamingMode;
use universaldb::utils::IsolationLevel::*;
Expand Down Expand Up @@ -104,95 +104,38 @@ async fn tick(
})
.await?;

// Process each runner config with error handling
for (ns_id, runner_name, desired_slots) in &serverless_data {
let runner_config = runner_configs
.iter()
.find(|rc| rc.namespace_id == *ns_id)
.context("runner config not found")?;

let namespace = ctx
.op(namespace::ops::get_global::Input {
namespace_ids: vec![ns_id.clone()],
})
.await
.context("runner namespace not found")?;
let namespace = namespace.first().context("runner namespace not found")?;
let namespace_name = &namespace.name;
let runner_config = runner_configs.iter().find(|rc| rc.namespace_id == *ns_id);

let RunnerConfigKind::Serverless {
url,
headers,
request_lifespan,
slots_per_runner,
min_runners,
max_runners,
runners_margin,
} = &runner_config.config.kind
else {
let Some(runner_config) = runner_config else {
tracing::warn!(
?ns_id,
"this runner config should not be in the serverless subspace (wrong config kind)"
?runner_name,
"runner config not found, likely deleted"
);
continue;
};

let curr = outbound_connections
.entry((*ns_id, runner_name.clone()))
.or_insert_with(Vec::new);

// Remove finished and draining connections from list
curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst));

// Log warning and reset to 0 if negative
let adjusted_desired_slots = if *desired_slots < 0 {
if let Err(err) = tick_runner_config(
ctx,
*ns_id,
runner_name.clone(),
*desired_slots,
runner_config,
outbound_connections,
)
.await
{
tracing::error!(
?ns_id,
?runner_name,
?desired_slots,
"negative desired slots, scaling to 0"
?err,
"failed to process runner config, continuing with others"
);
0
} else {
*desired_slots
};

let desired_count =
(rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64)
.max(*min_runners as i64)
+ *runners_margin as i64)
.min(*max_runners as i64)
.try_into()?;

// Calculate diff
let drain_count = curr.len().saturating_sub(desired_count);
let start_count = desired_count.saturating_sub(curr.len());

if drain_count != 0 {
// TODO: Implement smart logic of draining runners with the lowest allocated actors
let draining_connections = curr.split_off(desired_count);

for conn in draining_connections {
if conn.shutdown_tx.send(()).is_err() {
tracing::warn!(
"serverless connection shutdown channel dropped, likely already stopped"
);
}
}
// Continue processing other runner configs even if this one failed
continue;
}

let starting_connections = std::iter::repeat_with(|| {
spawn_connection(
ctx.clone(),
url.clone(),
headers.clone(),
Duration::from_secs(*request_lifespan as u64),
*slots_per_runner,
runner_name.clone(),
namespace_name.clone(),
)
})
.take(start_count);
curr.extend(starting_connections);
}

// Remove entries that aren't returned from udb
Expand All @@ -209,6 +152,97 @@ async fn tick(
Ok(())
}

async fn tick_runner_config(
ctx: &StandaloneCtx,
ns_id: Id,
runner_name: String,
desired_slots: i64,
runner_config: &namespace::ops::runner_config::get::RunnerConfig,
outbound_connections: &mut HashMap<(Id, String), Vec<OutboundConnection>>,
) -> Result<()> {
let namespace = ctx
.op(namespace::ops::get_global::Input {
namespace_ids: vec![ns_id.clone()],
})
.await
.context("runner namespace not found")?;
let namespace = namespace.first().context("runner namespace not found")?;
let namespace_name = &namespace.name;

let RunnerConfigKind::Serverless {
url,
headers,
request_lifespan,
slots_per_runner,
min_runners,
max_runners,
runners_margin,
} = &runner_config.config.kind
else {
bail!("runner config should not be in the serverless subspace (wrong config kind)");
};

let curr = outbound_connections
.entry((ns_id, runner_name.clone()))
.or_insert_with(Vec::new);

// Remove finished and draining connections from list
curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst));

// Log warning and reset to 0 if negative
let adjusted_desired_slots = if desired_slots < 0 {
tracing::error!(
?ns_id,
?runner_name,
?desired_slots,
"negative desired slots, scaling to 0"
);
0
} else {
desired_slots
};

let desired_count =
(rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64)
.max(*min_runners as i64)
+ *runners_margin as i64)
.min(*max_runners as i64)
.try_into()?;

// Calculate diff
let drain_count = curr.len().saturating_sub(desired_count);
let start_count = desired_count.saturating_sub(curr.len());

if drain_count != 0 {
// TODO: Implement smart logic of draining runners with the lowest allocated actors
let draining_connections = curr.split_off(desired_count);

for conn in draining_connections {
if conn.shutdown_tx.send(()).is_err() {
tracing::warn!(
"serverless connection shutdown channel dropped, likely already stopped"
);
}
}
}

let starting_connections = std::iter::repeat_with(|| {
spawn_connection(
ctx.clone(),
url.clone(),
headers.clone(),
Duration::from_secs(*request_lifespan as u64),
*slots_per_runner,
runner_name.clone(),
namespace_name.clone(),
)
})
.take(start_count);
curr.extend(starting_connections);

Ok(())
}

fn spawn_connection(
ctx: StandaloneCtx,
url: String,
Expand Down
Loading