Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/bolt/core/src/tasks/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub enum PlanStepKind {
},
Migrate,
Up,
ProvisionDefaultCluster,
}

impl PlanStepKind {
Expand All @@ -60,6 +61,9 @@ impl PlanStepKind {
tasks::migrate::up_all(&ctx).await?;
}
PlanStepKind::Up => tasks::up::up_all(&ctx, false, false, false, false).await?,
PlanStepKind::ProvisionDefaultCluster => {
tasks::provision::default_cluster_create(&ctx).await?
}
}

Ok(())
Expand Down Expand Up @@ -88,7 +92,7 @@ impl PlanStepKind {

terraform::output::clear_cache(&ctx, &plan_id).await;
}
PlanStepKind::Migrate | PlanStepKind::Up => {
PlanStepKind::Migrate | PlanStepKind::Up | PlanStepKind::ProvisionDefaultCluster => {
// Do nothing
}
}
Expand Down Expand Up @@ -327,6 +331,11 @@ pub fn build_plan(
kind: PlanStepKind::Up,
});

plan.push(PlanStep {
name_id: "provision-default-cluster",
kind: PlanStepKind::ProvisionDefaultCluster,
});

// Start at the specified step
if let Some(start_at) = start_at {
let idx = plan
Expand Down
1 change: 1 addition & 0 deletions lib/bolt/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod db;
pub mod gen;
pub mod infra;
pub mod migrate;
pub mod provision;
pub mod ssh;
pub mod template;
pub mod test;
Expand Down
223 changes: 223 additions & 0 deletions lib/bolt/core/src/tasks/provision.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};

use anyhow::*;
use futures_util::stream::StreamExt;
use rivet_api::{
apis::{admin_clusters_api, admin_clusters_datacenters_api, configuration},
models,
};
// use proto::backend::{self, pkg::*};
// use rivet_operation::prelude::*;
use serde::Deserialize;
use tokio::{
fs,
process::Command,
sync::{Mutex, Semaphore},
task::JoinSet,
};
use uuid::Uuid;

use crate::{
config::{
self,
service::{ComponentClass, RuntimeKind},
},
context::{
BuildContext, BuildOptimization, ProjectContext, RunContext, ServiceBuildPlan,
ServiceContext,
},
dep::{
self, cargo,
k8s::gen::{ExecServiceContext, ExecServiceDriver},
},
tasks,
utils::{self},
};

#[derive(Deserialize)]
struct Cluster {
name_id: String,
datacenters: HashMap<String, Datacenter>,
}

#[derive(Deserialize)]
struct Datacenter {
datacenter_id: Uuid,
display_name: String,
provider: Provider,
provider_datacenter_name: String,
pools: HashMap<PoolType, Pool>,
build_delivery_method: BuildDeliveryMethod,
drain_timeout: u64,
}

#[derive(Deserialize)]
enum Provider {
#[serde(rename = "linode")]
Linode,
}

#[derive(Deserialize)]
struct Pool {
hardware: Vec<Hardware>,
desired_count: u32,
max_count: u32,
}

#[derive(Deserialize, PartialEq, Eq, Hash)]
enum PoolType {
#[serde(rename = "job")]
Job,
#[serde(rename = "gg")]
Gg,
#[serde(rename = "ats")]
Ats,
}

#[derive(Deserialize)]
struct Hardware {
name: String,
}

#[derive(Deserialize)]
enum BuildDeliveryMethod {
#[serde(rename = "traffic_server")]
TrafficServer,
#[serde(rename = "s3_direct")]
S3Direct,
}

pub async fn default_cluster_create(ctx: &ProjectContext) -> Result<()> {
// Read config from env
let Some(config_json) = util::env::var("RIVET_DEFAULT_CLUSTER_CONFIG").ok() else {
tracing::warn!("no cluster config set in namespace config");
return Ok(());
};
let config = serde_json::from_str::<Cluster>(&config_json)?;

let api_admin_token = ctx.read_secret(&["rivet", "api_admin", "token"]).await?;

let req_config = configuration::Configuration {
base_path: ctx.origin_api(),
bearer_access_token: Some(api_admin_token),
..Default::default()
};

// Get the default cluster
let cluster_res =
admin_clusters_api::admin_clusters_get(&req_config, &Uuid::nil().to_string()).await;

let datacenter_list_res =
admin_clusters_datacenters_api::admin_clusters_datacenters_get_datacenters(
&req_config,
&Uuid::nil().to_string(),
)
.await?;

let datacenters = datacenter_list_res.datacenters;

// If the default cluster doesn't exist, create it
if cluster_res.is_err() {
rivet_term::status::progress("Creating default cluster", "");

admin_clusters_api::admin_clusters_create(
&req_config,
models::AdminClustersCreateRequest {
name_id: "default".to_string(),
owner_team_id: None,
},
)
.await?;
}

// Delete any datacenters that aren't in the config

// Go through each datacenter in the config and update it if needed
for (name_id, datacenter) in config.datacenters {
let existing_datacenter = datacenters
.iter()
.any(|dc| dc.datacenter_id == datacenter.datacenter_id);

// Update existing datacenter
if existing_datacenter {
let new_pools = datacenter
.pools
.into_iter()
.map(|(pool_type, pool)| {
let desired_count = match pool_type {
PoolType::Ats => Some(pool.desired_count),
PoolType::Job | PoolType::Gg => {
// TODO: Add autoscaler to namespace
let use_autoscaler = false;
if use_autoscaler {
None
} else {
Some(pool.desired_count)
}
}
};

clusters::msg::datacenter_update::PoolUpdate {
pool_type: Into::<backend::cluster::PoolType>::into(pool_type) as i32,
hardware: pool
.hardware
.into_iter()
.map(Into::into)
.collect::<Vec<_>>(),
desired_count,
max_count: Some(pool.max_count),
}
})
.collect::<Vec<_>>();

msg!([ctx] @wait cluster::msg::datacenter_update(datacenter.datacenter_id) {
datacenter_id: datacenter_id_proto,
pools: new_pools,
// Convert from seconds to ms
drain_timeout: Some(datacenter.drain_timeout * 1000),
})
.await?;
}
// Create new datacenter
else {
msg!([ctx] @wait cluster::msg::datacenter_create(datacenter.datacenter_id) {
config: Some(backend::cluster::Datacenter {
datacenter_id: datacenter_id_proto,
cluster_id: Some(cluster_id.into()),
name_id,
display_name: datacenter.display_name,

provider: Into::<backend::cluster::Provider>::into(datacenter.provider) as i32,
provider_datacenter_id: datacenter.provider_datacenter_name,
provider_api_token: None,

pools: datacenter.pools.into_iter().map(|(pool_type, pool)| {
backend::cluster::Pool {
pool_type: Into::<backend::cluster::PoolType>::into(pool_type) as i32,
hardware: pool.hardware.into_iter().map(Into::into).collect::<Vec<_>>(),
desired_count: pool.desired_count,
max_count: pool.max_count,
}
}).collect::<Vec<_>>(),

build_delivery_method: Into::<backend::cluster::BuildDeliveryMethod>::into(datacenter.build_delivery_method) as i32,
drain_timeout: datacenter.drain_timeout,
}),
})
.await?;
}

// TODO: Both this message and datacenter-create/datacenter-update (above) publish datacenter-scale.
// This results in double provisioning until datacenter-scale is published again, cleaning up the
// excess.
// Taint datacenter
if taint {
msg!([ctx] @wait cluster::msg::datacenter_taint(datacenter.datacenter_id) {
datacenter_id: datacenter_id_proto,
})
.await?;
}
}

Ok(())
}