Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ async fn tick(
})
.await?;

tracing::info!(?runner_configs, "------------------");

for (ns_id, runner_name, desired_slots) in &serverless_data {
let runner_config = runner_configs
.iter()
Expand Down
4 changes: 0 additions & 4 deletions packages/services/namespace/src/ops/runner_config/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ pub struct Input {

#[operation]
pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> {
if !ctx.config().is_leader() {
return Err(errors::Namespace::NotLeader.build());
}

ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
Expand Down
4 changes: 0 additions & 4 deletions packages/services/namespace/src/ops/runner_config/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ pub async fn namespace_runner_config_get(
ctx: &OperationCtx,
input: &Input,
) -> Result<Vec<RunnerConfig>> {
if !ctx.config().is_leader() {
return Err(errors::Namespace::NotLeader.build());
}

let runner_configs = ctx
.udb()?
.run(|tx| async move {
Expand Down
4 changes: 0 additions & 4 deletions packages/services/namespace/src/ops/runner_config/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ pub async fn namespace_runner_config_list(
ctx: &OperationCtx,
input: &Input,
) -> Result<Vec<(String, RunnerConfig)>> {
if !ctx.config().is_leader() {
return Err(errors::Namespace::NotLeader.build());
}

let runner_configs = ctx
.udb()?
.run(|tx| async move {
Expand Down
39 changes: 30 additions & 9 deletions packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,27 @@ pub async fn find_dc_with_runner(ctx: &OperationCtx, input: &Input) -> Result<Ou
}

async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result<Option<u16>> {
// Check if this DC has any runners
// Check if this DC has any non-draining runners
let res = ctx
.op(super::list_for_ns::Input {
namespace_id: input.namespace_id,
name: Some(input.runner_name.clone()),
include_stopped: false,
created_before: None,
limit: 1,
limit: 16,
})
.await?;
if !res.runners.is_empty() {
if res
.runners
.iter()
.filter(|runner| runner.drain_ts.is_none())
.count()
!= 0
{
return Ok(Some(ctx.config().dc_label()));
}

// Check if serverless runner config exists
// Check if a serverless runner config with a max runners > 0 exists
let res = ctx
.op(namespace::ops::runner_config::get::Input {
runners: vec![(input.namespace_id, input.runner_name.clone())],
Expand All @@ -74,7 +80,6 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result<
if let Some(runner) = res.first() {
match &runner.config {
RunnerConfig::Serverless { max_runners, .. } => {
// Check if runner config does not have a max runner count of 0
if *max_runners != 0 {
return Ok(Some(ctx.config().dc_label()));
}
Expand Down Expand Up @@ -103,10 +108,16 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result<
name: Some(input.runner_name.clone()),
runner_ids: None,
include_stopped: Some(false),
limit: Some(1),
limit: Some(16),
cursor: None,
},
|res| !res.runners.is_empty(),
// Check for non draining runners
|res| {
res.runners
.iter()
.filter(|runner| runner.drain_ts.is_none())
.count() != 0
},
)
.map(|res| res.map(|x| x.map(|x| x.0)))
.boxed();
Expand All @@ -123,10 +134,20 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result<
namespace: namespace.name.clone(),
variant: None,
runner_names: Some(input.runner_name.clone()),
limit: Some(1),
limit: None,
cursor: None,
},
|res| !res.runner_configs.is_empty(),
// Check for configs with a max runners > 0
|res| {
res.runner_configs
.iter()
.filter(|(_, rc)| match rc {
rivet_types::runner_configs::RunnerConfig::Serverless {
max_runners, ..
} => *max_runners != 0,
})
.count() != 0
},
)
.map(|res| res.map(|x| x.map(|x| x.0)))
.boxed();
Expand Down
Loading