diff --git a/Cargo.lock b/Cargo.lock index 9ad589f078b..1af92d65336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2790,6 +2790,7 @@ dependencies = [ "db-macros", "diesel", "hex", + "internal-dns-client", "ipnetwork 0.20.0", "macaddr", "newtype_derive", diff --git a/common/src/address.rs b/common/src/address.rs index 5fd69543456..5f9fcd0af01 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -20,12 +20,12 @@ pub const SLED_PREFIX: u8 = 64; /// The amount of redundancy for DNS servers. /// /// Must be less than MAX_DNS_REDUNDANCY. -pub const DNS_REDUNDANCY: usize = 1; +pub const DNS_REDUNDANCY: u32 = 1; /// The maximum amount of redundancy for DNS servers. /// /// This determines the number of addresses which are /// reserved for DNS servers. -pub const MAX_DNS_REDUNDANCY: usize = 5; +pub const MAX_DNS_REDUNDANCY: u32 = 5; pub const DNS_PORT: u16 = 53; pub const DNS_SERVER_PORT: u16 = 5353; @@ -34,10 +34,12 @@ pub const SLED_AGENT_PORT: u16 = 12345; /// The port propolis-server listens on inside the propolis zone. pub const PROPOLIS_PORT: u16 = 12400; pub const COCKROACH_PORT: u16 = 32221; +pub const CRUCIBLE_PORT: u16 = 32345; pub const CLICKHOUSE_PORT: u16 = 8123; pub const OXIMETER_PORT: u16 = 12223; pub const DENDRITE_PORT: u16 = 12224; +pub const NEXUS_EXTERNAL_PORT: u16 = 12220; pub const NEXUS_INTERNAL_PORT: u16 = 12221; // Anycast is a mechanism in which a single IP address is shared by multiple @@ -180,7 +182,7 @@ mod test { // Observe the first DNS subnet within this reserved rack subnet. let dns_subnets = rack_subnet.get_dns_subnets(); - assert_eq!(DNS_REDUNDANCY, dns_subnets.len()); + assert_eq!(DNS_REDUNDANCY, dns_subnets.len() as u32); // The DNS address and GZ address should be only differing by one. assert_eq!( diff --git a/common/src/sql/dbinit.sql b/common/src/sql/dbinit.sql index b86c8c790f7..807d9abf0ec 100644 --- a/common/src/sql/dbinit.sql +++ b/common/src/sql/dbinit.sql @@ -86,10 +86,11 @@ CREATE TABLE omicron.public.sled ( last_used_address INET NOT NULL ); -/* Add an index which lets us look up sleds on a rack */ +/* Add an index which lets us look up the sleds on a rack */ CREATE INDEX ON omicron.public.sled ( rack_id -) WHERE time_deleted IS NULL; +) WHERE + time_deleted IS NULL; /* * Services @@ -117,7 +118,13 @@ CREATE TABLE omicron.public.service ( /* Add an index which lets us look up the services on a sled */ CREATE INDEX ON omicron.public.service ( - sled_id + sled_id, + kind +); + +/* Add an index which lets us look up services of a particular kind on a sled */ +CREATE INDEX ON omicron.public.service ( + kind ); /* @@ -140,6 +147,11 @@ CREATE TABLE omicron.public.Zpool ( total_size INT NOT NULL ); +/* Create an index which allows looking up all zpools on a sled */ +CREATE INDEX on omicron.public.Zpool ( + sled_id +) WHERE time_deleted IS NULL; + CREATE TYPE omicron.public.dataset_kind AS ENUM ( 'crucible', 'cockroach', @@ -170,6 +182,11 @@ CREATE TABLE omicron.public.Dataset ( size_used INT ); +/* Create an index which allows looking up all datasets in a pool */ +CREATE INDEX on omicron.public.Dataset ( + pool_id +) WHERE time_deleted IS NULL; + /* Create an index on the size usage for Crucible's allocation */ CREATE INDEX on omicron.public.Dataset ( size_used diff --git a/nexus/db-model/Cargo.toml b/nexus/db-model/Cargo.toml index 9a188f38cc9..1e513bbadb0 100644 --- a/nexus/db-model/Cargo.toml +++ b/nexus/db-model/Cargo.toml @@ -9,6 +9,7 @@ anyhow = "1.0" chrono = { version = "0.4", features = ["serde"] } diesel = { version = "2.0.0-rc.1", 
features = ["postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] } hex = "0.4.3" +internal-dns-client = { path = "../../internal-dns-client" } ipnetwork = "0.20" macaddr = { version = "1.0.1", features = [ "serde_std" ]} newtype_derive = "0.1.6" diff --git a/nexus/db-model/src/dataset.rs b/nexus/db-model/src/dataset.rs index d68097af274..d58cde4fb4b 100644 --- a/nexus/db-model/src/dataset.rs +++ b/nexus/db-model/src/dataset.rs @@ -4,11 +4,14 @@ use super::{DatasetKind, Generation, Region, SqlU16}; use crate::collection::DatastoreCollectionConfig; +use crate::ipv6; use crate::schema::{dataset, region}; use chrono::{DateTime, Utc}; use db_macros::Asset; +use internal_dns_client::names::{BackendName, ServiceName, AAAA, SRV}; +use nexus_types::identity::Asset; use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; +use std::net::{Ipv6Addr, SocketAddrV6}; use uuid::Uuid; /// Database representation of a Dataset. @@ -35,10 +38,10 @@ pub struct Dataset { pub pool_id: Uuid, - ip: ipnetwork::IpNetwork, + ip: ipv6::Ipv6Addr, port: SqlU16, - kind: DatasetKind, + pub kind: DatasetKind, pub size_used: Option, } @@ -46,7 +49,7 @@ impl Dataset { pub fn new( id: Uuid, pool_id: Uuid, - addr: SocketAddr, + addr: SocketAddrV6, kind: DatasetKind, ) -> Self { let size_used = match kind { @@ -65,12 +68,26 @@ impl Dataset { } } - pub fn address(&self) -> SocketAddr { + pub fn address(&self) -> SocketAddrV6 { self.address_with_port(self.port.into()) } - pub fn address_with_port(&self, port: u16) -> SocketAddr { - SocketAddr::new(self.ip.ip(), port) + pub fn address_with_port(&self, port: u16) -> SocketAddrV6 { + SocketAddrV6::new(Ipv6Addr::from(self.ip), port, 0, 0) + } + + pub fn aaaa(&self) -> AAAA { + AAAA::Zone(self.id()) + } + + pub fn srv(&self) -> SRV { + match self.kind { + DatasetKind::Crucible => { + SRV::Backend(BackendName::Crucible, self.id()) + } + DatasetKind::Clickhouse => SRV::Service(ServiceName::Clickhouse), + DatasetKind::Cockroach => SRV::Service(ServiceName::Cockroach), + } } } diff --git a/nexus/db-model/src/dataset_kind.rs b/nexus/db-model/src/dataset_kind.rs index e2c0510ab3d..b131e6b7a82 100644 --- a/nexus/db-model/src/dataset_kind.rs +++ b/nexus/db-model/src/dataset_kind.rs @@ -4,14 +4,15 @@ use super::impl_enum_type; use nexus_types::internal_api; +use omicron_common::address::{CLICKHOUSE_PORT, COCKROACH_PORT, CRUCIBLE_PORT}; use serde::{Deserialize, Serialize}; impl_enum_type!( - #[derive(SqlType, Debug, QueryId)] + #[derive(Clone, SqlType, Debug, QueryId)] #[diesel(postgres_type(name = "dataset_kind"))] pub struct DatasetKindEnum; - #[derive(Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] #[diesel(sql_type = DatasetKindEnum)] pub enum DatasetKind; @@ -21,6 +22,16 @@ impl_enum_type!( Clickhouse => b"clickhouse" ); +impl DatasetKind { + pub fn port(&self) -> u16 { + match self { + DatasetKind::Crucible => CRUCIBLE_PORT, + DatasetKind::Cockroach => COCKROACH_PORT, + DatasetKind::Clickhouse => CLICKHOUSE_PORT, + } + } +} + impl From for DatasetKind { fn from(k: internal_api::params::DatasetKind) -> Self { match k { diff --git a/nexus/db-model/src/ipv6.rs b/nexus/db-model/src/ipv6.rs index 2b494100825..60f7c0558c6 100644 --- a/nexus/db-model/src/ipv6.rs +++ b/nexus/db-model/src/ipv6.rs @@ -16,9 +16,19 @@ use diesel::sql_types::Inet; use ipnetwork::IpNetwork; use ipnetwork::Ipv6Network; use omicron_common::api::external::Error; +use 
serde::{Deserialize, Serialize}; #[derive( - Clone, Copy, AsExpression, FromSqlRow, PartialEq, Ord, PartialOrd, Eq, + Clone, + Copy, + AsExpression, + FromSqlRow, + PartialEq, + Ord, + PartialOrd, + Eq, + Deserialize, + Serialize, )] #[diesel(sql_type = Inet)] pub struct Ipv6Addr(std::net::Ipv6Addr); diff --git a/nexus/db-model/src/service.rs b/nexus/db-model/src/service.rs index 3b9e57cfc62..9deaeef2c37 100644 --- a/nexus/db-model/src/service.rs +++ b/nexus/db-model/src/service.rs @@ -6,11 +6,16 @@ use super::ServiceKind; use crate::ipv6; use crate::schema::service; use db_macros::Asset; -use std::net::Ipv6Addr; +use internal_dns_client::names::{ServiceName, AAAA, SRV}; +use nexus_types::identity::Asset; +use omicron_common::address::{ + DENDRITE_PORT, DNS_SERVER_PORT, NEXUS_INTERNAL_PORT, OXIMETER_PORT, +}; +use std::net::{Ipv6Addr, SocketAddrV6}; use uuid::Uuid; /// Representation of services which may run on Sleds. -#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)] +#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset, PartialEq)] #[diesel(table_name = service)] pub struct Service { #[diesel(embed)] @@ -35,4 +40,27 @@ impl Service { kind, } } + + pub fn aaaa(&self) -> AAAA { + AAAA::Zone(self.id()) + } + + pub fn srv(&self) -> SRV { + match self.kind { + ServiceKind::InternalDNS => SRV::Service(ServiceName::InternalDNS), + ServiceKind::Nexus => SRV::Service(ServiceName::Nexus), + ServiceKind::Oximeter => SRV::Service(ServiceName::Oximeter), + ServiceKind::Dendrite => SRV::Service(ServiceName::Dendrite), + } + } + + pub fn address(&self) -> SocketAddrV6 { + let port = match self.kind { + ServiceKind::InternalDNS => DNS_SERVER_PORT, + ServiceKind::Nexus => NEXUS_INTERNAL_PORT, + ServiceKind::Oximeter => OXIMETER_PORT, + ServiceKind::Dendrite => DENDRITE_PORT, + }; + SocketAddrV6::new(Ipv6Addr::from(self.ip), port, 0, 0) + } } diff --git a/nexus/db-model/src/service_kind.rs b/nexus/db-model/src/service_kind.rs index d7230367d45..661bde449a6 100644 --- a/nexus/db-model/src/service_kind.rs +++ b/nexus/db-model/src/service_kind.rs @@ -7,11 +7,11 @@ use nexus_types::internal_api; use serde::{Deserialize, Serialize}; impl_enum_type!( - #[derive(SqlType, Debug, QueryId)] + #[derive(Clone, SqlType, Debug, QueryId)] #[diesel(postgres_type(name = "service_kind"))] pub struct ServiceKindEnum; - #[derive(Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] #[diesel(sql_type = ServiceKindEnum)] pub enum ServiceKind; diff --git a/nexus/db-model/src/sled.rs b/nexus/db-model/src/sled.rs index 784bb1b56f6..faa96b9af1a 100644 --- a/nexus/db-model/src/sled.rs +++ b/nexus/db-model/src/sled.rs @@ -14,7 +14,7 @@ use std::net::SocketAddrV6; use uuid::Uuid; /// Database representation of a Sled. -#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)] +#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset, PartialEq)] #[diesel(table_name = sled)] pub struct Sled { #[diesel(embed)] diff --git a/nexus/db-model/src/zpool.rs b/nexus/db-model/src/zpool.rs index bad66359131..42489b4e09f 100644 --- a/nexus/db-model/src/zpool.rs +++ b/nexus/db-model/src/zpool.rs @@ -14,7 +14,7 @@ use uuid::Uuid; /// /// A zpool represents a ZFS storage pool, allocated on a single /// physical sled. 
-#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)] +#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset, PartialEq)] #[diesel(table_name = zpool)] pub struct Zpool { #[diesel(embed)] diff --git a/nexus/src/app/background/fakes.rs b/nexus/src/app/background/fakes.rs new file mode 100644 index 00000000000..2096c9235ac --- /dev/null +++ b/nexus/src/app/background/fakes.rs @@ -0,0 +1,179 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Test-only implementations of interfaces used by background tasks. + +use super::interfaces::{ + DnsUpdaterInterface, NexusInterface, SledClientInterface, +}; + +use crate::db::datastore::DataStore; +use async_trait::async_trait; +use internal_dns_client::{ + multiclient::{AAAARecord, DnsError}, + names::SRV, +}; +use omicron_common::address::{Ipv6Subnet, RACK_PREFIX}; +use omicron_common::api::external::Error; +use sled_agent_client::types as SledAgentTypes; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use uuid::Uuid; + +/// A fake implementation of a Sled Agent client. +/// +/// In lieu of any networked requests, stores onto the requested services and +/// datasets for later inspection. +pub struct FakeSledClientInner { + service_request: Option, + dataset_requests: Vec, +} + +#[derive(Clone)] +pub struct FakeSledClient { + inner: Arc>, +} + +impl FakeSledClient { + fn new() -> Arc { + Arc::new(Self { + inner: Arc::new(Mutex::new(FakeSledClientInner { + service_request: None, + dataset_requests: vec![], + })), + }) + } + + /// Returns the requests to create services on the sled. + pub fn service_requests(&self) -> Vec { + self.inner + .lock() + .unwrap() + .service_request + .as_ref() + .map(|request| request.services.clone()) + .unwrap_or(vec![]) + } + + /// Returns the requests to create datasets on the sled. + pub fn dataset_requests(&self) -> Vec { + self.inner.lock().unwrap().dataset_requests.clone() + } +} + +#[async_trait] +impl SledClientInterface for FakeSledClient { + async fn services_put( + &self, + body: &SledAgentTypes::ServiceEnsureBody, + ) -> Result<(), Error> { + let old = + self.inner.lock().unwrap().service_request.replace(body.clone()); + assert!( + old.is_none(), + "Should only set services once (was {old:?}, inserted {body:?})" + ); + Ok(()) + } + + async fn filesystem_put( + &self, + body: &SledAgentTypes::DatasetEnsureBody, + ) -> Result<(), Error> { + self.inner.lock().unwrap().dataset_requests.push(body.clone()); + Ok(()) + } +} + +/// Provides an abstraction of Nexus which can be used by tests. +/// +/// Wraps a real datastore, but fakes out all networked requests. 
+#[derive(Clone)] +pub struct FakeNexus { + datastore: Arc, + rack_id: Uuid, + rack_subnet: Ipv6Subnet, + sleds: Arc>>>, +} + +impl FakeNexus { + pub fn new( + datastore: Arc, + rack_subnet: Ipv6Subnet, + ) -> Arc { + Arc::new(Self { + datastore, + rack_id: Uuid::new_v4(), + rack_subnet, + sleds: Arc::new(Mutex::new(HashMap::new())), + }) + } +} + +#[async_trait] +impl NexusInterface for FakeNexus { + fn rack_id(&self) -> Uuid { + self.rack_id + } + + fn rack_subnet(&self) -> Ipv6Subnet { + self.rack_subnet + } + + fn datastore(&self) -> &Arc { + &self.datastore + } + + async fn sled_client( + &self, + id: &Uuid, + ) -> Result, Error> { + let sled = self + .sleds + .lock() + .unwrap() + .entry(*id) + .or_insert_with(|| FakeSledClient::new()) + .clone(); + Ok(sled) + } +} + +/// A fake implementation of the DNS updater. +/// +/// Avoids all networking, instead storing all outgoing requests for later +/// inspection. +#[derive(Clone)] +pub struct FakeDnsUpdater { + records: Arc>>>, +} + +impl FakeDnsUpdater { + pub fn new() -> Self { + Self { records: Arc::new(Mutex::new(HashMap::new())) } + } + + // Get a copy of all records. + pub fn records(&self) -> HashMap> { + self.records.lock().unwrap().clone() + } +} + +#[async_trait] +impl DnsUpdaterInterface for FakeDnsUpdater { + async fn insert_dns_records( + &self, + records: &HashMap>, + ) -> Result<(), DnsError> { + let mut our_records = self.records.lock().unwrap(); + for (k, v) in records { + let old = our_records.insert(k.clone(), v.clone()); + assert!( + old.is_none(), + "Inserted key {k}, but found old value: {old:?}" + ); + } + Ok(()) + } +} diff --git a/nexus/src/app/background/interfaces.rs b/nexus/src/app/background/interfaces.rs new file mode 100644 index 00000000000..daf0e849e44 --- /dev/null +++ b/nexus/src/app/background/interfaces.rs @@ -0,0 +1,107 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Interfaces which can be faked out for testing. + +use crate::db::datastore::DataStore; +use crate::Nexus; +use async_trait::async_trait; +use internal_dns_client::{ + multiclient::{AAAARecord, DnsError, Updater as DnsUpdater}, + names::SRV, +}; +use omicron_common::address::{Ipv6Subnet, RACK_PREFIX}; +use omicron_common::api::external::Error; +use sled_agent_client::types as SledAgentTypes; +use std::collections::HashMap; +use std::sync::Arc; +use uuid::Uuid; + +// A trait intended to aid testing. +// +// The non-test implementation should be as simple as possible. +#[async_trait] +pub trait SledClientInterface { + async fn services_put( + &self, + body: &SledAgentTypes::ServiceEnsureBody, + ) -> Result<(), Error>; + async fn filesystem_put( + &self, + body: &SledAgentTypes::DatasetEnsureBody, + ) -> Result<(), Error>; +} + +#[async_trait] +impl SledClientInterface for sled_agent_client::Client { + async fn services_put( + &self, + body: &SledAgentTypes::ServiceEnsureBody, + ) -> Result<(), Error> { + self.services_put(body).await?; + Ok(()) + } + + async fn filesystem_put( + &self, + body: &SledAgentTypes::DatasetEnsureBody, + ) -> Result<(), Error> { + self.filesystem_put(body).await?; + Ok(()) + } +} + +// A trait intended to aid testing. +// +// The non-test implementation should be as simple as possible. 
+#[async_trait] +pub trait NexusInterface { + fn rack_id(&self) -> Uuid; + fn rack_subnet(&self) -> Ipv6Subnet; + fn datastore(&self) -> &Arc; + async fn sled_client(&self, id: &Uuid) -> Result, Error>; +} + +#[async_trait] +impl NexusInterface for Nexus { + fn rack_id(&self) -> Uuid { + self.rack_id + } + + fn rack_subnet(&self) -> Ipv6Subnet { + self.rack_subnet + } + + fn datastore(&self) -> &Arc { + self.datastore() + } + + async fn sled_client( + &self, + id: &Uuid, + ) -> Result, Error> { + self.sled_client(id).await + } +} + +// A trait intended to aid testing. +// +// The non-test implementation should be as simple as possible. +#[async_trait] +pub trait DnsUpdaterInterface { + async fn insert_dns_records( + &self, + records: &HashMap>, + ) -> Result<(), DnsError>; +} + +#[async_trait] +impl DnsUpdaterInterface for DnsUpdater { + async fn insert_dns_records( + &self, + records: &HashMap>, + ) -> Result<(), DnsError> { + self.insert_dns_records(records).await + } +} diff --git a/nexus/src/app/background/mod.rs b/nexus/src/app/background/mod.rs new file mode 100644 index 00000000000..d7604f849d7 --- /dev/null +++ b/nexus/src/app/background/mod.rs @@ -0,0 +1,52 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background tasks managed by Nexus. + +#[cfg(test)] +mod fakes; +mod interfaces; +mod services; + +use crate::app::Nexus; +use internal_dns_client::multiclient::Updater as DnsUpdater; +use std::sync::Arc; +use tokio::task::{spawn, JoinHandle}; + +/// Management structure which encapsulates periodically-executing background +/// tasks. +pub struct TaskRunner { + _handle: JoinHandle<()>, +} + +impl TaskRunner { + pub fn new(nexus: Arc) -> Self { + let handle = spawn(async move { + let log = nexus.log.new(o!("component" => "BackgroundTaskRunner")); + + let dns_updater = DnsUpdater::new( + &nexus.az_subnet(), + log.new(o!("component" => "DNS Updater")), + ); + let service_balancer = services::ServiceBalancer::new( + log.clone(), + nexus.clone(), + dns_updater, + ); + + loop { + // TODO: We may want triggers to exist here, to invoke this task + // more frequently (e.g., on Sled failure). + let opctx = nexus.opctx_for_service_balancer(); + if let Err(e) = service_balancer.balance_services(&opctx).await + { + warn!(log, "Failed to balance services: {:?}", e); + } + + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + } + }); + Self { _handle: handle } + } +} diff --git a/nexus/src/app/background/services.rs b/nexus/src/app/background/services.rs new file mode 100644 index 00000000000..a1d84f59320 --- /dev/null +++ b/nexus/src/app/background/services.rs @@ -0,0 +1,759 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Task which ensures that expected Nexus services exist. 
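For context, the interfaces above exist so that the service balancer defined below can be driven either by the real Nexus, sled-agent client, and DNS updater, or by the fakes in fakes.rs. A minimal standalone sketch of this dependency-injection pattern, using hypothetical names that are not part of this change, might look like:

use async_trait::async_trait;
use std::sync::{Arc, Mutex};

#[async_trait]
trait PingInterface {
    // The production implementation would wrap a real networked client.
    async fn ping(&self, msg: &str) -> Result<(), String>;
}

// Test-only fake: records calls instead of making network requests.
#[derive(Clone, Default)]
struct FakePing {
    calls: Arc<Mutex<Vec<String>>>,
}

#[async_trait]
impl PingInterface for FakePing {
    async fn ping(&self, msg: &str) -> Result<(), String> {
        self.calls.lock().unwrap().push(msg.to_string());
        Ok(())
    }
}

// A consumer generic over the interface, mirroring ServiceBalancer<D, N, S>.
struct Balancer<P: PingInterface> {
    client: P,
}

impl<P: PingInterface> Balancer<P> {
    async fn run(&self) -> Result<(), String> {
        self.client.ping("balance").await
    }
}

#[tokio::test]
async fn balancer_uses_fake() {
    let fake = FakePing::default();
    let balancer = Balancer { client: fake.clone() };
    balancer.run().await.unwrap();
    assert_eq!(*fake.calls.lock().unwrap(), vec!["balance".to_string()]);
}

The same shape appears below: ServiceBalancer is generic over DnsUpdaterInterface, NexusInterface, and SledClientInterface, so the unit tests construct it with FakeNexus and FakeDnsUpdater while the background task runner passes the real types.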
+ +use super::interfaces::{ + DnsUpdaterInterface, NexusInterface, SledClientInterface, +}; +use crate::context::OpContext; +use crate::db::datastore::DatasetRedundancy; +use crate::db::identity::Asset; +use crate::db::model::Dataset; +use crate::db::model::DatasetKind; +use crate::db::model::Service; +use crate::db::model::ServiceKind; +use crate::db::model::Sled; +use crate::db::model::Zpool; +use futures::stream::{self, StreamExt, TryStreamExt}; +use omicron_common::address::{ + DNS_PORT, DNS_REDUNDANCY, DNS_SERVER_PORT, NEXUS_EXTERNAL_PORT, + NEXUS_INTERNAL_PORT, +}; +use omicron_common::api::external::Error; +use sled_agent_client::types as SledAgentTypes; +use slog::Logger; +use std::collections::{HashMap, HashSet}; +use std::marker::PhantomData; +use std::net::{Ipv6Addr, SocketAddrV6}; +use std::sync::Arc; + +// Policy for the number of services to be provisioned. +#[derive(Debug)] +enum ServiceRedundancy { + // This service must exist on at least this many sleds + // within the rack. + PerRack(u32), + + // This service must exist on at least this many sleds + // within the availability zone. + DnsPerAz(u32), +} + +#[derive(Debug)] +struct ExpectedService { + kind: ServiceKind, + redundancy: ServiceRedundancy, +} + +// NOTE: longer-term, when we integrate multi-rack support, +// it is expected that Nexus will manage multiple racks +// within the fleet, rather than simply per-rack services. +// +// When that happens, it is likely that many of the "per-rack" +// services will become "per-fleet", such as Nexus and CRDB. +const EXPECTED_SERVICES: [ExpectedService; 3] = [ + ExpectedService { + kind: ServiceKind::InternalDNS, + redundancy: ServiceRedundancy::DnsPerAz(DNS_REDUNDANCY), + }, + ExpectedService { + kind: ServiceKind::Nexus, + redundancy: ServiceRedundancy::PerRack(1), + }, + ExpectedService { + kind: ServiceKind::Oximeter, + redundancy: ServiceRedundancy::PerRack(1), + }, +]; + +#[derive(Debug)] +struct ExpectedDataset { + kind: DatasetKind, + redundancy: DatasetRedundancy, +} + +const EXPECTED_DATASETS: [ExpectedDataset; 3] = [ + ExpectedDataset { + kind: DatasetKind::Crucible, + redundancy: DatasetRedundancy::OnAll, + }, + ExpectedDataset { + kind: DatasetKind::Cockroach, + redundancy: DatasetRedundancy::PerRack(1), + }, + ExpectedDataset { + kind: DatasetKind::Clickhouse, + redundancy: DatasetRedundancy::PerRack(1), + }, +]; + +/// Contains logic for balancing services within a fleet. +/// +/// This struct operates on generic parameters to easily permit +/// dependency injection via testing, but in production, practically +/// operates on the same concrete types. +pub struct ServiceBalancer +where + D: DnsUpdaterInterface, + N: NexusInterface, + S: SledClientInterface, +{ + log: Logger, + nexus: Arc, + dns_updater: D, + phantom: PhantomData, +} + +impl ServiceBalancer +where + D: DnsUpdaterInterface, + N: NexusInterface, + S: SledClientInterface, +{ + pub fn new(log: Logger, nexus: Arc, dns_updater: D) -> Self { + Self { log, nexus, dns_updater, phantom: PhantomData } + } + + // Reaches out to all sled agents implied in "services", and + // requests that the desired services are executing. + async fn instantiate_services( + &self, + opctx: &OpContext, + services: Vec, + ) -> Result<(), Error> { + let mut sled_ids = HashSet::new(); + for svc in &services { + sled_ids.insert(svc.sled_id); + } + + // For all sleds requiring an update, request all services be + // instantiated. 
+ stream::iter(&sled_ids) + .map(Ok::<_, Error>) + .try_for_each_concurrent(None, |sled_id| async { + // TODO: This interface kinda sucks; ideally we would + // only insert the *new* services. + // + // Inserting the old ones too is costing us an extra query. + let services = self + .nexus + .datastore() + .service_list(opctx, *sled_id) + .await?; + let sled_client = self.nexus.sled_client(sled_id).await?; + + info!(self.log, "instantiate_services: {:?}", services); + + sled_client + .services_put(&SledAgentTypes::ServiceEnsureBody { + services: services + .iter() + .map(|s| { + let address = Ipv6Addr::from(s.ip); + let (name, service_type) = + Self::get_service_name_and_type( + address, s.kind, + ); + + // TODO: This is hacky, specifically to inject + // global zone addresses in the DNS service. + let gz_addresses = match &s.kind { + ServiceKind::InternalDNS => { + let mut octets = address.octets(); + octets[15] = octets[15] + 1; + vec![Ipv6Addr::from(octets)] + } + _ => vec![], + }; + + SledAgentTypes::ServiceRequest { + id: s.id(), + name, + addresses: vec![address], + gz_addresses, + service_type, + } + }) + .collect(), + }) + .await?; + Ok(()) + }) + .await?; + + let mut records = HashMap::new(); + for service in &services { + records + .entry(service.srv()) + .or_insert_with(Vec::new) + .push((service.aaaa(), service.address())); + } + self.dns_updater + .insert_dns_records(&records) + .await + .map_err(|e| Error::internal_error(&e.to_string()))?; + + Ok(()) + } + + // Translates (address, db kind) to Sled Agent client types. + fn get_service_name_and_type( + address: Ipv6Addr, + kind: ServiceKind, + ) -> (String, SledAgentTypes::ServiceType) { + match kind { + ServiceKind::Nexus => ( + "nexus".to_string(), + SledAgentTypes::ServiceType::Nexus { + internal_address: SocketAddrV6::new( + address, + NEXUS_INTERNAL_PORT, + 0, + 0, + ) + .to_string(), + external_address: SocketAddrV6::new( + address, + NEXUS_EXTERNAL_PORT, + 0, + 0, + ) + .to_string(), + }, + ), + ServiceKind::InternalDNS => ( + "internal-dns".to_string(), + SledAgentTypes::ServiceType::InternalDns { + server_address: SocketAddrV6::new( + address, + DNS_SERVER_PORT, + 0, + 0, + ) + .to_string(), + dns_address: SocketAddrV6::new(address, DNS_PORT, 0, 0) + .to_string(), + }, + ), + ServiceKind::Oximeter => { + ("oximeter".to_string(), SledAgentTypes::ServiceType::Oximeter) + } + ServiceKind::Dendrite => ( + "dendrite".to_string(), + SledAgentTypes::ServiceType::Dendrite { + asic: SledAgentTypes::DendriteAsic::TofinoStub, + }, + ), + } + } + + // Provision the services within the database. + async fn provision_rack_service( + &self, + opctx: &OpContext, + kind: ServiceKind, + desired_count: u32, + ) -> Result, Error> { + self.nexus + .datastore() + .ensure_rack_service( + opctx, + self.nexus.rack_id(), + kind, + desired_count, + ) + .await + } + + // Provision the services within the database. + async fn provision_dns_service( + &self, + opctx: &OpContext, + desired_count: u32, + ) -> Result, Error> { + self.nexus + .datastore() + .ensure_dns_service(opctx, self.nexus.rack_subnet(), desired_count) + .await + } + + async fn ensure_services_provisioned( + &self, + opctx: &OpContext, + expected_services: &[ExpectedService], + ) -> Result<(), Error> { + // Provision services within the database. 
+ let mut svcs = vec![]; + for expected_svc in expected_services { + info!(self.log, "Ensuring service {:?} exists", expected_svc); + match expected_svc.redundancy { + ServiceRedundancy::PerRack(desired_count) => { + svcs.extend_from_slice( + &self + .provision_rack_service( + opctx, + expected_svc.kind, + desired_count, + ) + .await?, + ); + } + ServiceRedundancy::DnsPerAz(desired_count) => { + svcs.extend_from_slice( + &self + .provision_dns_service(opctx, desired_count) + .await?, + ); + } + } + } + + // Ensure services exist on the target sleds. + self.instantiate_services(opctx, svcs).await?; + Ok(()) + } + + async fn ensure_rack_dataset( + &self, + opctx: &OpContext, + kind: DatasetKind, + redundancy: DatasetRedundancy, + ) -> Result<(), Error> { + // Provision the datasets within the database. + let new_datasets = self + .nexus + .datastore() + .ensure_rack_dataset(opctx, self.nexus.rack_id(), kind, redundancy) + .await?; + + // Actually instantiate those datasets. + self.instantiate_datasets(new_datasets, kind).await + } + + // Reaches out to all sled agents implied in "services", and + // requests that the desired services are executing. + async fn instantiate_datasets( + &self, + datasets: Vec<(Sled, Zpool, Dataset)>, + kind: DatasetKind, + ) -> Result<(), Error> { + if datasets.is_empty() { + return Ok(()); + } + + // Ensure that there is one connection per sled. + let mut sled_clients = HashMap::new(); + for (sled, _, _) in &datasets { + if sled_clients.get(&sled.id()).is_none() { + let sled_client = self.nexus.sled_client(&sled.id()).await?; + sled_clients.insert(sled.id(), sled_client); + } + } + + // Issue all dataset instantiation requests concurrently. + stream::iter(&datasets) + .map(Ok::<_, Error>) + .try_for_each_concurrent(None, |(sled, zpool, dataset)| async { + let sled_client = sled_clients.get(&sled.id()).unwrap(); + + let dataset_kind = match kind { + // TODO: This set of "all addresses" isn't right. + // TODO: ... should we even be using "all addresses" to contact CRDB? + // Can it just rely on DNS, somehow? + DatasetKind::Cockroach => { + SledAgentTypes::DatasetKind::CockroachDb(vec![]) + } + DatasetKind::Crucible => { + SledAgentTypes::DatasetKind::Crucible + } + DatasetKind::Clickhouse => { + SledAgentTypes::DatasetKind::Clickhouse + } + }; + + // Instantiate each dataset. + sled_client + .filesystem_put(&SledAgentTypes::DatasetEnsureBody { + id: dataset.id(), + zpool_id: zpool.id(), + dataset_kind, + address: dataset.address().to_string(), + }) + .await?; + Ok(()) + }) + .await?; + + // Ensure all DNS records are updated for the created datasets. + let mut records = HashMap::new(); + for (_, _, dataset) in &datasets { + records + .entry(dataset.srv()) + .or_insert_with(Vec::new) + .push((dataset.aaaa(), dataset.address())); + } + self.dns_updater + .insert_dns_records(&records) + .await + .map_err(|e| Error::internal_error(&e.to_string()))?; + + Ok(()) + } + + async fn ensure_datasets_provisioned( + &self, + opctx: &OpContext, + expected_datasets: &[ExpectedDataset], + ) -> Result<(), Error> { + // Provision all dataset types concurrently. 
+ stream::iter(expected_datasets) + .map(Ok::<_, Error>) + .try_for_each_concurrent(None, |expected_dataset| async move { + info!( + self.log, + "Ensuring dataset {:?} exists", expected_dataset + ); + self.ensure_rack_dataset( + opctx, + expected_dataset.kind, + expected_dataset.redundancy, + ) + .await?; + Ok(()) + }) + .await + } + + /// Provides a single point-in-time evaluation and adjustment of + /// the services provisioned within the rack. + /// + /// May adjust the provisioned services to meet the redundancy of the + /// rack, if necessary. + // TODO: Consider using sagas to ensure the rollout of services. + // + // Not using sagas *happens* to be fine because these operations are + // re-tried periodically, but that's kind forcing a dependency on the + // caller. + pub async fn balance_services( + &self, + opctx: &OpContext, + ) -> Result<(), Error> { + self.ensure_datasets_provisioned(opctx, &EXPECTED_DATASETS).await?; + self.ensure_services_provisioned(opctx, &EXPECTED_SERVICES).await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::app::background::fakes::{FakeDnsUpdater, FakeNexus}; + use crate::db::datastore::DataStore; + use crate::{authn, authz}; + use dropshot::test_util::LogContext; + use internal_dns_client::names::{BackendName, AAAA, SRV}; + use nexus_test_utils::db::test_setup_database; + use omicron_common::address::Ipv6Subnet; + use omicron_common::api::external::ByteCount; + use omicron_test_utils::dev; + use std::sync::Arc; + use uuid::Uuid; + + // TODO: maybe figure out what you *want* to test? + // I suspect we'll need to refactor this API for testability. + // + // - Dataset init: + // - Call to DB + // - For each new dataset... + // - Call to Sled (filesystem put) + // - Update DNS record + // + // - Service init: + // - Call to DB + // - For each sled... + // - List svcs + // - Put svcs + // - For each new service... + // - Update DNS record + // + // TODO: Also, idempotency check + + struct ProvisionTest { + logctx: LogContext, + opctx: OpContext, + db: dev::db::CockroachInstance, + datastore: Arc, + } + + impl ProvisionTest { + // Create the logger and setup the database. 
+ async fn new(name: &str) -> Self { + let logctx = dev::test_setup_log(name); + let db = test_setup_database(&logctx.log).await; + let (_, datastore) = + crate::db::datastore::datastore_test(&logctx, &db).await; + let opctx = OpContext::for_background( + logctx.log.new(o!()), + Arc::new(authz::Authz::new(&logctx.log)), + authn::Context::internal_service_balancer(), + datastore.clone(), + ); + Self { logctx, opctx, db, datastore } + } + + async fn cleanup(mut self) { + self.db.cleanup().await.unwrap(); + self.logctx.cleanup_successful(); + } + } + + async fn create_test_sled(rack_id: Uuid, datastore: &DataStore) -> Uuid { + let bogus_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, bogus_addr.clone(), rack_id); + datastore.sled_upsert(sled).await.unwrap(); + sled_id + } + + async fn create_test_zpool(datastore: &DataStore, sled_id: Uuid) -> Uuid { + let zpool_id = Uuid::new_v4(); + let zpool = Zpool::new( + zpool_id, + sled_id, + &crate::internal_api::params::ZpoolPutRequest { + size: ByteCount::from_gibibytes_u32(10), + }, + ); + datastore.zpool_upsert(zpool).await.unwrap(); + zpool_id + } + + #[tokio::test] + async fn test_provision_dataset_on_all_no_zpools() { + let test = + ProvisionTest::new("test_provision_dataset_on_all_no_zpools").await; + + let rack_subnet = Ipv6Subnet::new(Ipv6Addr::LOCALHOST); + let nexus = FakeNexus::new(test.datastore.clone(), rack_subnet); + let dns_updater = FakeDnsUpdater::new(); + let service_balancer = ServiceBalancer::new( + test.logctx.log.clone(), + nexus.clone(), + dns_updater.clone(), + ); + + // Setup: One sled, no zpools. + let sled_id = create_test_sled(nexus.rack_id(), &test.datastore).await; + + // Make the request to the service balancer for Crucibles on all Zpools. + // + // However, with no zpools, this is a no-op. + let expected_datasets = [ExpectedDataset { + kind: DatasetKind::Crucible, + redundancy: DatasetRedundancy::OnAll, + }]; + service_balancer + .ensure_datasets_provisioned(&test.opctx, &expected_datasets) + .await + .unwrap(); + + // Observe that nothing was requested at the sled. + let sled = nexus.sled_client(&sled_id).await.unwrap(); + assert!(sled.service_requests().is_empty()); + assert!(sled.dataset_requests().is_empty()); + + // Observe that no DNS records were updated. + let records = dns_updater.records(); + assert!(records.is_empty()); + + test.cleanup().await; + } + + #[tokio::test] + async fn test_provision_dataset_on_all_zpools() { + let test = + ProvisionTest::new("test_provision_dataset_on_all_zpools").await; + + let rack_subnet = Ipv6Subnet::new(Ipv6Addr::LOCALHOST); + let nexus = FakeNexus::new(test.datastore.clone(), rack_subnet); + let dns_updater = FakeDnsUpdater::new(); + let service_balancer = ServiceBalancer::new( + test.logctx.log.clone(), + nexus.clone(), + dns_updater.clone(), + ); + + // Setup: One sled, multiple zpools + let sled_id = create_test_sled(nexus.rack_id(), &test.datastore).await; + const ZPOOL_COUNT: usize = 3; + let mut zpools = vec![]; + for _ in 0..ZPOOL_COUNT { + zpools.push(create_test_zpool(&test.datastore, sled_id).await); + } + + // Make the request to the service balancer for Crucibles on all Zpools. 
+ let expected_datasets = [ExpectedDataset { + kind: DatasetKind::Crucible, + redundancy: DatasetRedundancy::OnAll, + }]; + service_balancer + .ensure_datasets_provisioned(&test.opctx, &expected_datasets) + .await + .unwrap(); + + // Observe that datasets were requested on each zpool. + let sled = nexus.sled_client(&sled_id).await.unwrap(); + assert!(sled.service_requests().is_empty()); + let dataset_requests = sled.dataset_requests(); + assert_eq!(ZPOOL_COUNT, dataset_requests.len()); + for request in &dataset_requests { + assert!( + zpools.contains(&request.zpool_id), + "Dataset request for unexpected zpool" + ); + assert!(matches!( + request.dataset_kind, + SledAgentTypes::DatasetKind::Crucible + )); + } + + // Observe that DNS records for each Crucible exist. + let records = dns_updater.records(); + assert_eq!(ZPOOL_COUNT, records.len()); + for (srv, aaaas) in records { + match srv { + SRV::Backend(BackendName::Crucible, dataset_id) => { + let expected_address = dataset_requests + .iter() + .find_map(|request| { + if request.id == dataset_id { + Some(request.address.clone()) + } else { + None + } + }) + .unwrap(); + + assert_eq!(1, aaaas.len()); + let (aaaa_name, dns_addr) = &aaaas[0]; + assert_eq!(dns_addr.to_string(), expected_address); + if let AAAA::Zone(zone_id) = aaaa_name { + assert_eq!( + *zone_id, dataset_id, + "Expected AAAA UUID to match SRV record", + ); + } else { + panic!( + "Expected AAAA record for Zone from {aaaa_name}" + ); + } + } + _ => panic!("Unexpected SRV record"), + } + } + + test.cleanup().await; + } + + // Observe that "per-rack" dataset provisions can be completed. + // + // This test uses multiple racks, and verifies that a provision occurs + // on each one. + #[tokio::test] + async fn test_provision_dataset_per_rack() { + let test = ProvisionTest::new("test_provision_dataset_per_rack").await; + + let rack_subnet = Ipv6Subnet::new(Ipv6Addr::LOCALHOST); + let nexus = FakeNexus::new(test.datastore.clone(), rack_subnet); + let dns_updater = FakeDnsUpdater::new(); + let service_balancer = ServiceBalancer::new( + test.logctx.log.clone(), + nexus.clone(), + dns_updater.clone(), + ); + + // Setup: Create a couple sleds on the first rack, and create a third + // sled on a "different rack". + // + // Each sled gets a single zpool. + let mut zpools = vec![]; + + let sled1_id = create_test_sled(nexus.rack_id(), &test.datastore).await; + zpools.push(create_test_zpool(&test.datastore, sled1_id).await); + + let sled2_id = create_test_sled(nexus.rack_id(), &test.datastore).await; + zpools.push(create_test_zpool(&test.datastore, sled2_id).await); + + let other_rack_id = Uuid::new_v4(); + let other_rack_sled_id = + create_test_sled(other_rack_id, &test.datastore).await; + zpools + .push(create_test_zpool(&test.datastore, other_rack_sled_id).await); + + // Ask for one dataset per rack. + let expected_datasets = [ExpectedDataset { + kind: DatasetKind::Cockroach, + redundancy: DatasetRedundancy::PerRack(1), + }]; + service_balancer + .ensure_datasets_provisioned(&test.opctx, &expected_datasets) + .await + .unwrap(); + + // Observe that the datasets were requested on each rack. + + // Rack 1: One of the two sleds should have a dataset. 
+ let sled = nexus.sled_client(&sled1_id).await.unwrap(); + let requests1 = sled.dataset_requests(); + if !requests1.is_empty() { + assert_eq!(1, requests1.len()); + assert_eq!(zpools[0], requests1[0].zpool_id); + } + let sled = nexus.sled_client(&sled2_id).await.unwrap(); + let requests2 = sled.dataset_requests(); + if !requests2.is_empty() { + assert_eq!(1, requests2.len()); + assert_eq!(zpools[1], requests2[0].zpool_id); + } + assert!( + requests1.is_empty() ^ requests2.is_empty(), + "One of the sleds should have a dataset, the other should not" + ); + + // Rack 2: The sled should have a dataset. + let sled = nexus.sled_client(&other_rack_sled_id).await.unwrap(); + let requests = sled.dataset_requests(); + // TODO(https://github.com/oxidecomputer/omicron/issues/1276): + // We should see a request to the "other rack" when multi-rack + // is supported. + // + // At the moment, however, all requests for service-balancing are + // "rack-local". + assert_eq!(0, requests.len()); + + // We should be able to assert this when multi-rack is supported. + // assert_eq!(zpools[2], requests[0].zpool_id); + + test.cleanup().await; + } + + /* + #[tokio::test] + async fn test_provision_service_per_rack() { + todo!(); + } + + #[tokio::test] + async fn test_provision_service_dns_per_az() { + todo!(); + } + */ +} diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 559ea15ffb3..0a8469203b0 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -15,9 +15,11 @@ use crate::populate::PopulateArgs; use crate::populate::PopulateStatus; use crate::saga_interface::SagaContext; use anyhow::anyhow; +use omicron_common::address::{Ipv6Subnet, AZ_PREFIX, RACK_PREFIX}; use omicron_common::api::external::Error; use slog::Logger; use std::sync::Arc; +use tokio::sync::OnceCell; use uuid::Uuid; // The implementation of Nexus is large, and split into a number of submodules @@ -43,6 +45,9 @@ mod vpc; mod vpc_router; mod vpc_subnet; +// Background tasks exist in the "background" module. +mod background; + // Sagas are not part of the "Nexus" implementation, but they are // application logic. mod sagas; @@ -65,6 +70,9 @@ pub struct Nexus { /// uuid for this rack rack_id: Uuid, + /// subnet of this rack + rack_subnet: Ipv6Subnet, + /// general server log log: Logger, @@ -83,6 +91,9 @@ pub struct Nexus { /// Status of background task to populate database populate_status: tokio::sync::watch::Receiver, + /// Background task for Nexus. + background_task_runner: OnceCell, + /// Client to the timeseries database. timeseries_client: LazyTimeseriesClient, @@ -141,7 +152,7 @@ impl Nexus { sec_store, )); - // Connect to clickhouse - but do so lazily. + // Connect to Clickhouse - but do so lazily. // Clickhouse may not be executing when Nexus starts. let timeseries_client = if let Some(address) = &config.pkg.timeseries_db.address { @@ -172,12 +183,14 @@ impl Nexus { let nexus = Nexus { id: config.deployment.id, rack_id, + rack_subnet: config.deployment.subnet, log: log.new(o!()), db_datastore: Arc::clone(&db_datastore), authz: Arc::clone(&authz), sec_client: Arc::clone(&sec_client), recovery_task: std::sync::Mutex::new(None), populate_status, + background_task_runner: OnceCell::new(), timeseries_client, updates_config: config.pkg.updates.clone(), tunables: config.pkg.tunables.clone(), @@ -222,6 +235,10 @@ impl Nexus { nexus } + pub fn az_subnet(&self) -> Ipv6Subnet { + Ipv6Subnet::::new(self.rack_subnet.net().ip()) + } + /// Return the tunable configuration parameters, e.g. for use in tests. 
pub fn tunables(&self) -> &config::Tunables { &self.tunables @@ -244,6 +261,15 @@ impl Nexus { } } + pub fn start_background_tasks( + self: &Arc, + ) -> Result<(), anyhow::Error> { + let nexus = self.clone(); + self.background_task_runner + .set(background::TaskRunner::new(nexus)) + .map_err(|error| anyhow!(error.to_string())) + } + /// Returns an [`OpContext`] used for authenticating external requests pub fn opctx_external_authn(&self) -> &OpContext { &self.opctx_external_authn diff --git a/nexus/src/app/rack.rs b/nexus/src/app/rack.rs index dcc7ce92dbc..8b81dc61fdb 100644 --- a/nexus/src/app/rack.rs +++ b/nexus/src/app/rack.rs @@ -74,8 +74,12 @@ impl super::Nexus { }) .collect(); + // TODO(https://github.com/oxidecomputer/omicron/pull/1216): + // Actually supply datasets provided from the sled agent. + // + // This requires corresponding changes on the RSS side. self.db_datastore - .rack_set_initialized(opctx, rack_id, services) + .rack_set_initialized(opctx, rack_id, services, vec![]) .await?; Ok(()) diff --git a/nexus/src/app/sled.rs b/nexus/src/app/sled.rs index e4fc616f095..0e01112c532 100644 --- a/nexus/src/app/sled.rs +++ b/nexus/src/app/sled.rs @@ -16,7 +16,7 @@ use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupResult; use sled_agent_client::Client as SledAgentClient; -use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::net::{Ipv6Addr, SocketAddrV6}; use std::sync::Arc; use uuid::Uuid; @@ -135,7 +135,7 @@ impl super::Nexus { &self, id: Uuid, zpool_id: Uuid, - address: SocketAddr, + address: SocketAddrV6, kind: DatasetKind, ) -> Result<(), Error> { info!(self.log, "upserting dataset"; "zpool_id" => zpool_id.to_string(), "dataset_id" => id.to_string(), "address" => address.to_string()); diff --git a/nexus/src/config.rs b/nexus/src/config.rs index 7266a3abd10..5323a918836 100644 --- a/nexus/src/config.rs +++ b/nexus/src/config.rs @@ -52,34 +52,30 @@ pub struct TimeseriesDbConfig { pub address: Option, } -// A deserializable type that does no validation on the tunable parameters. -#[derive(Clone, Debug, Deserialize, PartialEq)] -struct UnvalidatedTunables { - max_vpc_ipv4_subnet_prefix: u8, +fn deserialize_ipv4_subnet<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let prefix = u8::deserialize(deserializer)?; + Tunables::validate_ipv4_prefix(prefix) + .map_err(|e| serde::de::Error::custom(e))?; + Ok(prefix) } /// Tunable configuration parameters, intended for use in test environments or /// other situations in which experimentation / tuning is valuable. #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] -#[serde(try_from = "UnvalidatedTunables")] pub struct Tunables { /// The maximum prefix size supported for VPC Subnet IPv4 subnetworks. /// /// Note that this is the maximum _prefix_ size, which sets the minimum size /// of the subnet. + #[serde(default, deserialize_with = "deserialize_ipv4_subnet")] pub max_vpc_ipv4_subnet_prefix: u8, -} - -// Convert from the unvalidated tunables, verifying each parameter as needed. -impl TryFrom for Tunables { - type Error = InvalidTunable; - fn try_from(unvalidated: UnvalidatedTunables) -> Result { - Tunables::validate_ipv4_prefix(unvalidated.max_vpc_ipv4_subnet_prefix)?; - Ok(Tunables { - max_vpc_ipv4_subnet_prefix: unvalidated.max_vpc_ipv4_subnet_prefix, - }) - } + /// Identifies whether or not background tasks will be enabled. 
+ #[serde(default)] + pub enable_background_tasks: bool, } impl Tunables { @@ -121,7 +117,10 @@ pub const MAX_VPC_IPV4_SUBNET_PREFIX: u8 = 26; impl Default for Tunables { fn default() -> Self { - Tunables { max_vpc_ipv4_subnet_prefix: MAX_VPC_IPV4_SUBNET_PREFIX } + Tunables { + max_vpc_ipv4_subnet_prefix: MAX_VPC_IPV4_SUBNET_PREFIX, + enable_background_tasks: true, + } } } @@ -393,7 +392,10 @@ mod test { trusted_root: PathBuf::from("/path/to/root.json"), default_base_url: "http://example.invalid/".into(), }), - tunables: Tunables { max_vpc_ipv4_subnet_prefix: 27 }, + tunables: Tunables { + max_vpc_ipv4_subnet_prefix: 27, + enable_background_tasks: false, + }, }, } ); diff --git a/nexus/src/db/datastore/dataset.rs b/nexus/src/db/datastore/dataset.rs index 0023cbb972e..4ca593676f8 100644 --- a/nexus/src/db/datastore/dataset.rs +++ b/nexus/src/db/datastore/dataset.rs @@ -7,15 +7,25 @@ use super::DataStore; use super::RunnableQuery; use super::REGION_REDUNDANCY_THRESHOLD; +use crate::authz; use crate::db; use crate::db::collection_insert::AsyncInsertError; use crate::db::collection_insert::DatastoreCollection; +use crate::db::collection_insert::SyncInsertError; +use crate::db::datastore::DatasetRedundancy; +use crate::db::datastore::OpContext; +use crate::db::error::public_error_from_diesel_create; use crate::db::error::public_error_from_diesel_pool; use crate::db::error::ErrorHandler; +use crate::db::error::TransactionError; use crate::db::identity::Asset; use crate::db::model::Dataset; use crate::db::model::DatasetKind; +use crate::db::model::Sled; use crate::db::model::Zpool; +use crate::db::pool::DbConnection; +use async_bb8_diesel::AsyncConnection; +use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; use diesel::upsert::excluded; @@ -23,6 +33,8 @@ use omicron_common::api::external::CreateResult; use omicron_common::api::external::Error; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; +use std::net::SocketAddrV6; +use uuid::Uuid; impl DataStore { /// Stores a new dataset in the database. @@ -66,6 +78,60 @@ impl DataStore { }) } + /// Stores a new dataset in the database. + fn dataset_upsert_sync( + conn: &mut DbConnection, + dataset: Dataset, + ) -> CreateResult { + use db::schema::dataset::dsl; + + let zpool_id = dataset.pool_id; + Zpool::insert_resource( + zpool_id, + diesel::insert_into(dsl::dataset) + .values(dataset.clone()) + .on_conflict(dsl::id) + .do_update() + .set(( + dsl::time_modified.eq(Utc::now()), + dsl::pool_id.eq(excluded(dsl::pool_id)), + dsl::ip.eq(excluded(dsl::ip)), + dsl::port.eq(excluded(dsl::port)), + dsl::kind.eq(excluded(dsl::kind)), + )), + ) + .insert_and_get_result(conn) + .map_err(|e| match e { + SyncInsertError::CollectionNotFound => Error::ObjectNotFound { + type_name: ResourceType::Zpool, + lookup_type: LookupType::ById(zpool_id), + }, + SyncInsertError::DatabaseError(e) => { + public_error_from_diesel_create( + e, + ResourceType::Dataset, + &dataset.id().to_string(), + ) + } + }) + } + + pub async fn dataset_list( + &self, + opctx: &OpContext, + zpool_id: Uuid, + ) -> Result, Error> { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + use db::schema::dataset::dsl; + dsl::dataset + .filter(dsl::time_deleted.is_null()) + .filter(dsl::pool_id.eq(zpool_id)) + .select(Dataset::as_select()) + .load_async(self.pool_authorized(opctx).await?) 
+ .await + .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server)) + } + pub(super) fn get_allocatable_datasets_query() -> impl RunnableQuery { use db::schema::dataset::dsl; @@ -84,4 +150,137 @@ impl DataStore { .select(Dataset::as_select()) .limit(REGION_REDUNDANCY_THRESHOLD.try_into().unwrap()) } + + fn sled_zpool_and_dataset_list_sync( + conn: &mut DbConnection, + rack_id: Uuid, + kind: DatasetKind, + ) -> Result)>, diesel::result::Error> + { + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::sled::dsl as sled_dsl; + use db::schema::zpool::dsl as zpool_dsl; + + db::schema::sled::table + .filter(sled_dsl::time_deleted.is_null()) + .filter(sled_dsl::rack_id.eq(rack_id)) + .inner_join( + db::schema::zpool::table.on(zpool_dsl::sled_id + .eq(sled_dsl::id) + .and(zpool_dsl::time_deleted.is_null())), + ) + .left_outer_join( + db::schema::dataset::table.on(dataset_dsl::pool_id + .eq(zpool_dsl::id) + .and(dataset_dsl::kind.eq(kind)) + .and(dataset_dsl::time_deleted.is_null())), + ) + .select(<(Sled, Zpool, Option)>::as_select()) + .get_results(conn) + } + + pub async fn ensure_rack_dataset( + &self, + opctx: &OpContext, + rack_id: Uuid, + kind: DatasetKind, + redundancy: DatasetRedundancy, + ) -> Result, Error> { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + + #[derive(Debug)] + enum DatasetError { + NotEnoughZpools, + Other(Error), + } + type TxnError = TransactionError; + + self.pool() + .transaction(move |conn| { + let sleds_zpools_and_maybe_datasets = + Self::sled_zpool_and_dataset_list_sync( + conn, rack_id, kind, + )?; + + // Split the set of returned zpools into "those with" and "those + // without" the requested dataset. + let (zpools_with_dataset, zpools_without_dataset): ( + Vec<_>, + Vec<_>, + ) = sleds_zpools_and_maybe_datasets + .into_iter() + .partition(|(_, _, maybe_dataset)| maybe_dataset.is_some()); + let mut zpools_without_dataset = zpools_without_dataset + .into_iter() + .map(|(sled, zpool, _)| (sled, zpool)) + .peekable(); + + let mut datasets: Vec<_> = zpools_with_dataset + .into_iter() + .map(|(sled, zpool, maybe_dataset)| { + ( + sled, + zpool, + maybe_dataset.expect("Dataset should exist"), + ) + }) + .collect(); + + // Add datasets to zpools, in-order, until we've met a + // number sufficient for our redundancy. + // + // The selection of "which zpools contain this dataset" is completely + // arbitrary. 
+ loop { + match redundancy { + DatasetRedundancy::OnAll => { + if zpools_without_dataset.peek().is_none() { + break; + } + } + DatasetRedundancy::PerRack(desired) => { + if datasets.len() >= (desired as usize) { + break; + } + } + }; + + let (sled, zpool) = + zpools_without_dataset.next().ok_or_else(|| { + TxnError::CustomError(DatasetError::NotEnoughZpools) + })?; + let dataset_id = Uuid::new_v4(); + let address = Self::next_ipv6_address_sync(conn, sled.id()) + .map_err(|e| { + TxnError::CustomError(DatasetError::Other(e)) + }) + .map(|ip| SocketAddrV6::new(ip, kind.port(), 0, 0))?; + + let dataset = db::model::Dataset::new( + dataset_id, + zpool.id(), + address, + kind, + ); + + let dataset = Self::dataset_upsert_sync(conn, dataset) + .map_err(|e| { + TxnError::CustomError(DatasetError::Other(e)) + })?; + datasets.push((sled, zpool, dataset)); + } + + return Ok(datasets); + }) + .await + .map_err(|e| match e { + TxnError::CustomError(DatasetError::NotEnoughZpools) => { + Error::unavail("Not enough zpools for dataset allocation") + } + TxnError::CustomError(DatasetError::Other(e)) => e, + TxnError::Pool(e) => { + public_error_from_diesel_pool(e, ErrorHandler::Server) + } + }) + } } diff --git a/nexus/src/db/datastore/mod.rs b/nexus/src/db/datastore/mod.rs index 85e45b42f5d..0cc1a985e2a 100644 --- a/nexus/src/db/datastore/mod.rs +++ b/nexus/src/db/datastore/mod.rs @@ -24,7 +24,10 @@ use crate::authz; use crate::context::OpContext; use crate::db::{ self, - error::{public_error_from_diesel_pool, ErrorHandler}, + error::{ + public_error_from_diesel_lookup, public_error_from_diesel_pool, + ErrorHandler, + }, }; use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager}; use diesel::pg::Pg; @@ -92,6 +95,15 @@ impl RunnableQuery for T where { } +// Redundancy for the number of datasets to be provisioned. +#[derive(Clone, Copy, Debug)] +pub enum DatasetRedundancy { + // The dataset should exist on all zpools. + OnAll, + // The dataset should exist on at least this many zpools. + PerRack(u32), +} + pub struct DataStore { pool: Arc, } @@ -121,30 +133,61 @@ impl DataStore { Ok(self.pool.pool()) } - /// Return the next available IPv6 address for an Oxide service running on - /// the provided sled. - pub async fn next_ipv6_address( - &self, - opctx: &OpContext, + fn next_ipv6_address_query( sled_id: Uuid, - ) -> Result { + ) -> impl RunnableQuery { use db::schema::sled::dsl; - let net = diesel::update( + diesel::update( dsl::sled.find(sled_id).filter(dsl::time_deleted.is_null()), ) .set(dsl::last_used_address.eq(dsl::last_used_address + 1)) .returning(dsl::last_used_address) - .get_result_async(self.pool_authorized(opctx).await?) - .await - .map_err(|e| { - public_error_from_diesel_pool( - e, - ErrorHandler::NotFoundByLookup( + } + + pub fn next_ipv6_address_sync( + conn: &mut DbConnection, + sled_id: Uuid, + ) -> Result { + let net = Self::next_ipv6_address_query(sled_id) + .get_result(conn) + .map_err(|e| { + public_error_from_diesel_lookup( + e, ResourceType::Sled, - LookupType::ById(sled_id), - ), - ) - })?; + &LookupType::ById(sled_id), + ) + })?; + + // TODO-correctness: We could ensure that this address is actually + // within the sled's underlay prefix, once that's included in the + // database record. + match net { + ipnetwork::IpNetwork::V6(net) => Ok(net.ip()), + _ => Err(Error::InternalError { + internal_message: String::from("Sled IP address must be IPv6"), + }), + } + } + + /// Return the next available IPv6 address for an Oxide service running on + /// the provided sled. 
+ pub async fn next_ipv6_address( + &self, + opctx: &OpContext, + sled_id: Uuid, + ) -> Result { + let net = Self::next_ipv6_address_query(sled_id) + .get_result_async(self.pool_authorized(opctx).await?) + .await + .map_err(|e| { + public_error_from_diesel_pool( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::Sled, + LookupType::ById(sled_id), + ), + ) + })?; // TODO-correctness: We need to ensure that this address is actually // within the sled's underlay prefix, once that's included in the @@ -241,15 +284,14 @@ mod test { use crate::external_api::params; use chrono::{Duration, Utc}; use nexus_test_utils::db::test_setup_database; + use omicron_common::address::{Ipv6Subnet, DNS_REDUNDANCY, RACK_PREFIX}; use omicron_common::api::external::{ ByteCount, Error, IdentityMetadataCreateParams, LookupType, Name, }; use omicron_test_utils::dev; use ref_cast::RefCast; - use std::collections::HashSet; - use std::net::Ipv6Addr; - use std::net::SocketAddrV6; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::collections::{HashMap, HashSet}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV6}; use std::sync::Arc; use uuid::Uuid; @@ -488,8 +530,7 @@ mod test { // ... and datasets within that zpool. let dataset_count = REGION_REDUNDANCY_THRESHOLD * 2; - let bogus_addr = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); let dataset_ids: Vec = (0..dataset_count).map(|_| Uuid::new_v4()).collect(); for id in &dataset_ids { @@ -570,8 +611,7 @@ mod test { // ... and datasets within that zpool. let dataset_count = REGION_REDUNDANCY_THRESHOLD; - let bogus_addr = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); let dataset_ids: Vec = (0..dataset_count).map(|_| Uuid::new_v4()).collect(); for id in &dataset_ids { @@ -637,8 +677,7 @@ mod test { // ... and datasets within that zpool. let dataset_count = REGION_REDUNDANCY_THRESHOLD - 1; - let bogus_addr = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); let dataset_ids: Vec = (0..dataset_count).map(|_| Uuid::new_v4()).collect(); for id in &dataset_ids { @@ -689,8 +728,7 @@ mod test { // ... and datasets within that zpool. let dataset_count = REGION_REDUNDANCY_THRESHOLD; - let bogus_addr = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); let dataset_ids: Vec = (0..dataset_count).map(|_| Uuid::new_v4()).collect(); for id in &dataset_ids { @@ -941,6 +979,497 @@ mod test { logctx.cleanup_successful(); } + #[tokio::test] + async fn test_ensure_rack_service() { + let logctx = dev::test_setup_log("test_ensure_rack_service"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + // Create a sled on which the service should exist. + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let rack_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled) + .await + .expect("Should be able to upsert sled"); + + // Ensure a service exists on the rack. 
+ let services = datastore + .ensure_rack_service(&opctx, rack_id, ServiceKind::Nexus, 1) + .await + .expect("Should have allocated service"); + + // Only a single service was allocated, with the type / address we + // expect. + assert_eq!(1, services.len()); + assert_eq!(ServiceKind::Nexus, services[0].kind); + assert_eq!(sled_id, services[0].sled_id); + + // Listing services only shows this one. + let observed_services = datastore + .service_list(&opctx, sled_id) + .await + .expect("Should be able to list services"); + assert_eq!(1, observed_services.len()); + assert_eq!(services[0].id(), observed_services[0].id()); + + // Test that ensuring services is idempotent. + let services_again = datastore + .ensure_rack_service(&opctx, rack_id, ServiceKind::Nexus, 1) + .await + .expect("Should have allocated service"); + assert_eq!(services_again, services); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_rack_service_multi_sled() { + let logctx = dev::test_setup_log("test_ensure_rack_service_multi_sled"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let rack_id = Uuid::new_v4(); + + // Create sleds with distinct underlay subnets. + const SLED_COUNT: usize = 3; + let mut sleds = HashMap::new(); + for i in 0..SLED_COUNT { + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, i.try_into().unwrap(), 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled.clone()) + .await + .expect("Should be able to upsert sled"); + sleds.insert(sled.id(), sled); + } + + // Ensure a service exists on the rack, with some redundancy. + const NEXUS_COUNT: u32 = 3; + let mut services = datastore + .ensure_rack_service( + &opctx, + rack_id, + ServiceKind::Nexus, + NEXUS_COUNT, + ) + .await + .expect("Should have allocated service"); + services.sort_by(|a, b| a.id().cmp(&b.id())); + + assert_eq!(NEXUS_COUNT, services.len() as u32); + for svc in &services { + assert_eq!(ServiceKind::Nexus, svc.kind); + + // Each service should have been provisioned to a distinct sled. + let observed_services = datastore + .service_list(&opctx, svc.sled_id) + .await + .expect("Should be able to list services"); + assert_eq!(1, observed_services.len()); + assert_eq!(svc.id(), observed_services[0].id()); + } + + // Test that ensuring services is idempotent. + let mut services_again = datastore + .ensure_rack_service( + &opctx, + rack_id, + ServiceKind::Nexus, + NEXUS_COUNT, + ) + .await + .expect("Should have allocated service"); + services_again.sort_by(|a, b| a.id().cmp(&b.id())); + assert_eq!(services_again, services); + + // Ask for a different service type on the rack. 
+ let oximeter_services = datastore + .ensure_rack_service(&opctx, rack_id, ServiceKind::Oximeter, 1) + .await + .expect("Should have allocated service"); + + // This should only return a single service + assert_eq!(1, oximeter_services.len()); + + // The target sled should contain both the nexus and oximeter services + let observed_services = datastore + .service_list(&opctx, oximeter_services[0].sled_id) + .await + .expect("Should be able to list services"); + assert_eq!(2, observed_services.len()); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_rack_service_not_enough_sleds() { + let logctx = + dev::test_setup_log("test_ensure_rack_service_not_enough_sleds"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + // Create a sled on which the service should exist. + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let rack_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled) + .await + .expect("Should be able to upsert sled"); + + // Try to request a redundancy which is larger than the number of sleds. + let err = datastore + .ensure_rack_service(&opctx, rack_id, ServiceKind::Nexus, 2) + .await + .expect_err("Should have failed to allocate service"); + + assert!( + matches!(err, Error::ServiceUnavailable { .. }), + "Error should have been ServiceUnavailable: {:?}", + err + ); + assert!( + err.to_string().contains("Not enough sleds"), + "Error should have identified 'Not enough sleds' as the cause: {:?}", err + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_dns_service() { + let logctx = dev::test_setup_log("test_ensure_dns_service"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + // Create a sled on which the service should exist. + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let rack_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled) + .await + .expect("Should be able to upsert sled"); + + let rack_subnet = Ipv6Subnet::<RACK_PREFIX>::new(*sled_addr.ip()); + + // Ensure a service exists on the rack. + let services = datastore + .ensure_dns_service(&opctx, rack_subnet, 1) + .await + .expect("Should have allocated service"); + + // Only a single service was allocated, with the type / address we + // expect. + assert_eq!(1, services.len()); + assert_eq!(ServiceKind::InternalDNS, services[0].kind); + assert_eq!(sled_id, services[0].sled_id); + + // Listing services only shows this one. + let observed_services = datastore + .service_list(&opctx, sled_id) + .await + .expect("Should be able to list services"); + assert_eq!(1, observed_services.len()); + assert_eq!(services[0].id(), observed_services[0].id()); + + // Test that ensuring services is idempotent.
+ let services_again = datastore + .ensure_dns_service(&opctx, rack_subnet, 1) + .await + .expect("Should have allocated service"); + assert_eq!(services_again, services); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_dns_service_multi_sled() { + let logctx = dev::test_setup_log("test_ensure_dns_service_multi_sled"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let rack_id = Uuid::new_v4(); + + // Create sleds with distinct underlay subnets. + const SLED_COUNT: u32 = DNS_REDUNDANCY; + let mut sleds = HashMap::new(); + for i in 0..SLED_COUNT { + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, i.try_into().unwrap(), 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled.clone()) + .await + .expect("Should be able to upsert sled"); + sleds.insert(sled.id(), sled); + } + let rack_subnet = Ipv6Subnet::<RACK_PREFIX>::new(Ipv6Addr::from( + sleds.values().next().unwrap().ip, + )); + + for sled in sleds.values() { + assert_eq!( + rack_subnet, + Ipv6Subnet::<RACK_PREFIX>::new(Ipv6Addr::from(sled.ip)), + "Test pre-condition violated: All sleds must belong to the same rack" + ); + } + + // Ensure a service exists on the rack. + const DNS_COUNT: u32 = DNS_REDUNDANCY; + let mut services = datastore + .ensure_dns_service(&opctx, rack_subnet, DNS_COUNT) + .await + .expect("Should have allocated service"); + services.sort_by(|a, b| a.id().cmp(&b.id())); + + assert_eq!(DNS_COUNT, services.len() as u32); + for svc in &services { + assert_eq!(ServiceKind::InternalDNS, svc.kind); + + // Each service should have been provisioned to a distinct sled. + let observed_services = datastore + .service_list(&opctx, svc.sled_id) + .await + .expect("Should be able to list services"); + assert_eq!(1, observed_services.len()); + assert_eq!(svc.id(), observed_services[0].id()); + } + + // Test for idempotency + let mut services_again = datastore + .ensure_dns_service(&opctx, rack_subnet, DNS_COUNT) + .await + .expect("Should have allocated service"); + services_again.sort_by(|a, b| a.id().cmp(&b.id())); + assert_eq!(services_again, services); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_rack_dataset() { + let logctx = dev::test_setup_log("test_ensure_rack_dataset"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + // Create a sled on which the dataset should exist. + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let rack_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled) + .await + .expect("Should be able to upsert sled"); + let zpool_id = create_test_zpool(&datastore, sled_id).await; + + // Ensure a dataset exists on the rack. + let output = datastore + .ensure_rack_dataset( + &opctx, + rack_id, + DatasetKind::Crucible, + DatasetRedundancy::PerRack(1), + ) + .await + .expect("Should have allocated dataset"); + + // Observe that only a single dataset was allocated + assert_eq!(1, output.len()); + let (_, _, output_dataset) = &output[0]; + assert_eq!(DatasetKind::Crucible, output_dataset.kind); + assert_eq!(zpool_id, output_dataset.pool_id); + + // Listing datasets only shows this one.
+ let observed_datasets = datastore + .dataset_list(&opctx, zpool_id) + .await + .expect("Should be able to list datasets"); + assert_eq!(1, observed_datasets.len()); + assert_eq!(output_dataset.id(), observed_datasets[0].id()); + + // Test that ensuring datasets is idempotent. + let output_again = datastore + .ensure_rack_dataset( + &opctx, + rack_id, + DatasetKind::Crucible, + DatasetRedundancy::PerRack(1), + ) + .await + .expect("Should have allocated dataset"); + let (_, _, output_dataset_again) = &output_again[0]; + assert_eq!(output_dataset_again, output_dataset); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_rack_dataset_not_enough_zpools() { + let logctx = + dev::test_setup_log("test_ensure_rack_dataset_not_enough_zpools"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + // Create a sled on which the dataset should exist. + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let rack_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled) + .await + .expect("Should be able to upsert sled"); + + // Attempt to allocate a dataset on a rack without zpools. + let err = datastore + .ensure_rack_dataset( + &opctx, + rack_id, + DatasetKind::Crucible, + DatasetRedundancy::PerRack(1), + ) + .await + .expect_err("Should not have allocated dataset"); + + assert!( + matches!(err, Error::ServiceUnavailable { .. }), + "Error should have been ServiceUnavailable: {:?}", + err + ); + assert!( + err.to_string().contains("Not enough zpools"), + "Error should have identified 'Not enough zpools' as the cause: {:?}", err + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_rack_dataset_multi_sled() { + let logctx = dev::test_setup_log("test_ensure_rack_dataset_multi_sled"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let rack_id = Uuid::new_v4(); + + // Create sleds with distinct underlay subnets. + const SLED_COUNT: usize = 3; + let mut sleds = HashMap::new(); + for i in 0..SLED_COUNT { + let sled_addr = SocketAddrV6::new( + Ipv6Addr::new(0xfd00, 0, 0, i.try_into().unwrap(), 0, 0, 0, 1), + 8080, + 0, + 0, + ); + let sled_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, sled_addr.clone(), rack_id); + datastore + .sled_upsert(sled.clone()) + .await + .expect("Should be able to upsert sled"); + sleds.insert(sled.id(), sled); + create_test_zpool(&datastore, sled_id).await; + } + + // Ensure datasets exist on the rack. + let output = datastore + .ensure_rack_dataset( + &opctx, + rack_id, + DatasetKind::Crucible, + DatasetRedundancy::OnAll, + ) + .await + .expect("Should have allocated dataset"); + assert_eq!(SLED_COUNT, output.len()); + for (sled, zpool, dataset) in &output { + assert_eq!(DatasetKind::Crucible, dataset.kind); + assert_eq!(zpool.id(), dataset.pool_id); + assert_eq!(sled.id(), zpool.sled_id); + + let observed_datasets = datastore + .dataset_list(&opctx, zpool.id()) + .await + .expect("Should be able to list datasets"); + assert_eq!(1, observed_datasets.len()); + assert_eq!(dataset.id(), observed_datasets[0].id()) + } + + // Test that ensuring datasets is idempotent. 
+ let output_again = datastore + .ensure_rack_dataset( + &opctx, + rack_id, + DatasetKind::Crucible, + DatasetRedundancy::OnAll, + ) + .await + .expect("Should have allocated dataset"); + + let mut output: Vec<_> = + output.into_iter().map(|(_, _, dataset)| dataset).collect(); + output.sort_by(|a, b| a.id().cmp(&b.id())); + let mut output_again: Vec<_> = + output_again.into_iter().map(|(_, _, dataset)| dataset).collect(); + output_again.sort_by(|a, b| a.id().cmp(&b.id())); + assert_eq!(output, output_again); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + #[tokio::test] async fn test_rack_initialize_is_idempotent() { let logctx = dev::test_setup_log("test_rack_initialize_is_idempotent"); @@ -960,14 +1489,14 @@ mod test { // Initialize the Rack. let result = datastore - .rack_set_initialized(&opctx, rack.id(), vec![]) + .rack_set_initialized(&opctx, rack.id(), vec![], vec![]) .await .unwrap(); assert!(result.initialized); // Re-initialize the rack (check for idempotency) let result = datastore - .rack_set_initialized(&opctx, rack.id(), vec![]) + .rack_set_initialized(&opctx, rack.id(), vec![], vec![]) .await .unwrap(); assert!(result.initialized); diff --git a/nexus/src/db/datastore/rack.rs b/nexus/src/db/datastore/rack.rs index 06d298a4261..0e2a0dc9af0 100644 --- a/nexus/src/db/datastore/rack.rs +++ b/nexus/src/db/datastore/rack.rs @@ -16,9 +16,11 @@ use crate::db::error::public_error_from_diesel_pool; use crate::db::error::ErrorHandler; use crate::db::error::TransactionError; use crate::db::identity::Asset; +use crate::db::model::Dataset; use crate::db::model::Rack; use crate::db::model::Service; use crate::db::model::Sled; +use crate::db::model::Zpool; use crate::db::pagination::paginated; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; @@ -84,19 +86,29 @@ impl DataStore { opctx: &OpContext, rack_id: Uuid, services: Vec<Service>, + datasets: Vec<Dataset>, ) -> UpdateResult<Rack> { use db::schema::rack::dsl as rack_dsl; - use db::schema::service::dsl as service_dsl; #[derive(Debug)] enum RackInitError { - ServiceInsert { err: SyncInsertError, sled_id: Uuid, svc_id: Uuid }, + ServiceInsert { + err: SyncInsertError, + sled_id: Uuid, + svc_id: Uuid, + }, + DatasetInsert { + err: SyncInsertError, + zpool_id: Uuid, + dataset_id: Uuid, + }, RackUpdate(diesel::result::Error), } type TxnError = TransactionError<RackInitError>; // NOTE: This operation could likely be optimized with a CTE, but given // the low-frequency of calls, this optimization has been deferred. + let log = opctx.log.clone(); self.pool_authorized(opctx) .await? .transaction(move |conn| { @@ -109,25 +121,25 @@ impl DataStore { TxnError::CustomError(RackInitError::RackUpdate(e)) })?; if rack.initialized { + info!(log, "Early exit: Rack already initialized"); return Ok(rack); } - // Otherwise, insert services and set rack.initialized = true.
+ // Otherwise, insert services and datasets for svc in services { + use db::schema::service::dsl; let sled_id = svc.sled_id; <Sled as DatastoreCollection<Service>>::insert_resource( sled_id, - diesel::insert_into(service_dsl::service) + diesel::insert_into(dsl::service) .values(svc.clone()) - .on_conflict(service_dsl::id) + .on_conflict(dsl::id) .do_update() .set(( - service_dsl::time_modified.eq(Utc::now()), - service_dsl::sled_id - .eq(excluded(service_dsl::sled_id)), - service_dsl::ip.eq(excluded(service_dsl::ip)), - service_dsl::kind - .eq(excluded(service_dsl::kind)), + dsl::time_modified.eq(Utc::now()), + dsl::sled_id.eq(excluded(dsl::sled_id)), + dsl::ip.eq(excluded(dsl::ip)), + dsl::kind.eq(excluded(dsl::kind)), )), ) .insert_and_get_result(conn) .map_err(|err| { @@ -139,7 +151,37 @@ impl DataStore { }) })?; } - diesel::update(rack_dsl::rack) + info!(log, "Inserted services"); + for dataset in datasets { + use db::schema::dataset::dsl; + let zpool_id = dataset.pool_id; + <Zpool as DatastoreCollection<Dataset>>::insert_resource( + zpool_id, + diesel::insert_into(dsl::dataset) + .values(dataset.clone()) + .on_conflict(dsl::id) + .do_update() + .set(( + dsl::time_modified.eq(Utc::now()), + dsl::pool_id.eq(excluded(dsl::pool_id)), + dsl::ip.eq(excluded(dsl::ip)), + dsl::port.eq(excluded(dsl::port)), + dsl::kind.eq(excluded(dsl::kind)), + )), + ) + .insert_and_get_result(conn) + .map_err(|err| { + TxnError::CustomError(RackInitError::DatasetInsert { + err, + zpool_id, + dataset_id: dataset.id(), + }) + })?; + } + info!(log, "Inserted datasets"); + + // Set the rack to "initialized" once the handoff is complete + let rack = diesel::update(rack_dsl::rack) .filter(rack_dsl::id.eq(rack_id)) .set(( rack_dsl::initialized.eq(true), @@ -149,10 +191,31 @@ .get_result::<Rack>(conn) .map_err(|e| { TxnError::CustomError(RackInitError::RackUpdate(e)) - }) + })?; + info!(log, "Updated rack (set initialized to true)"); + Ok(rack) }) .await .map_err(|e| match e { + TxnError::CustomError(RackInitError::DatasetInsert { + err, + zpool_id, + dataset_id, + }) => match err { + SyncInsertError::CollectionNotFound => { + Error::ObjectNotFound { + type_name: ResourceType::Zpool, + lookup_type: LookupType::ById(zpool_id), + } + } + SyncInsertError::DatabaseError(e) => { + public_error_from_diesel_create( + e, + ResourceType::Dataset, + &dataset_id.to_string(), + ) + } + }, TxnError::CustomError(RackInitError::ServiceInsert { err, sled_id, diff --git a/nexus/src/db/datastore/service.rs b/nexus/src/db/datastore/service.rs index b2665b48cdd..a9fd19137a8 100644 --- a/nexus/src/db/datastore/service.rs +++ b/nexus/src/db/datastore/service.rs @@ -5,22 +5,35 @@ //! [`DataStore`] methods on [`Service`]s.
use super::DataStore; +use crate::authz; use crate::context::OpContext; use crate::db; use crate::db::collection_insert::AsyncInsertError; use crate::db::collection_insert::DatastoreCollection; +use crate::db::collection_insert::SyncInsertError; +use crate::db::error::public_error_from_diesel_create; use crate::db::error::public_error_from_diesel_pool; use crate::db::error::ErrorHandler; +use crate::db::error::TransactionError; use crate::db::identity::Asset; use crate::db::model::Service; +use crate::db::model::ServiceKind; use crate::db::model::Sled; +use crate::db::pool::DbConnection; +use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use chrono::Utc; use diesel::prelude::*; use diesel::upsert::excluded; +use omicron_common::address::Ipv6Subnet; +use omicron_common::address::ReservedRackSubnet; +use omicron_common::address::DNS_REDUNDANCY; +use omicron_common::address::RACK_PREFIX; use omicron_common::api::external::CreateResult; use omicron_common::api::external::Error; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; +use std::net::Ipv6Addr; +use uuid::Uuid; impl DataStore { /// Stores a new service in the database. @@ -63,4 +76,276 @@ impl DataStore { } }) } + + fn service_upsert_sync( + conn: &mut DbConnection, + service: Service, + ) -> CreateResult<Service> { + use db::schema::service::dsl; + + let sled_id = service.sled_id; + Sled::insert_resource( + sled_id, + diesel::insert_into(dsl::service) + .values(service.clone()) + .on_conflict(dsl::id) + .do_update() + .set(( + dsl::time_modified.eq(Utc::now()), + dsl::sled_id.eq(excluded(dsl::sled_id)), + dsl::ip.eq(excluded(dsl::ip)), + dsl::kind.eq(excluded(dsl::kind)), + )), + ) + .insert_and_get_result(conn) + .map_err(|e| match e { + SyncInsertError::CollectionNotFound => Error::ObjectNotFound { + type_name: ResourceType::Sled, + lookup_type: LookupType::ById(sled_id), + }, + SyncInsertError::DatabaseError(e) => { + public_error_from_diesel_create( + e, + ResourceType::Service, + &service.id().to_string(), + ) + } + }) + } + + fn sled_list_with_limit_sync( + conn: &mut DbConnection, + limit: u32, + ) -> Result<Vec<Sled>, diesel::result::Error> { + use db::schema::sled::dsl; + dsl::sled + .filter(dsl::time_deleted.is_null()) + .limit(limit as i64) + .select(Sled::as_select()) + .load(conn) + } + + pub async fn service_list( + &self, + opctx: &OpContext, + sled_id: Uuid, + ) -> Result<Vec<Service>, Error> { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + use db::schema::service::dsl; + dsl::service + .filter(dsl::sled_id.eq(sled_id)) + .select(Service::as_select()) + .load_async(self.pool_authorized(opctx).await?)
+ .await + .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server)) + } + + fn sled_and_service_list_sync( + conn: &mut DbConnection, + rack_id: Uuid, + kind: ServiceKind, + ) -> Result<Vec<(Sled, Option<Service>)>, diesel::result::Error> { + use db::schema::service::dsl as svc_dsl; + use db::schema::sled::dsl as sled_dsl; + + db::schema::sled::table + .filter(sled_dsl::time_deleted.is_null()) + .filter(sled_dsl::rack_id.eq(rack_id)) + .left_outer_join(db::schema::service::table.on( + svc_dsl::sled_id.eq(sled_dsl::id).and(svc_dsl::kind.eq(kind)), + )) + .select(<(Sled, Option<Service>)>::as_select()) + .get_results(conn) + } + + pub async fn ensure_rack_service( + &self, + opctx: &OpContext, + rack_id: Uuid, + kind: ServiceKind, + redundancy: u32, + ) -> Result<Vec<Service>, Error> { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + + #[derive(Debug)] + enum ServiceError { + NotEnoughSleds, + Other(Error), + } + type TxnError = TransactionError<ServiceError>; + + self.pool() + .transaction(move |conn| { + let sleds_and_maybe_svcs = + Self::sled_and_service_list_sync(conn, rack_id, kind)?; + + // Split the set of returned sleds into "those with" and "those + // without" the requested service. + let (sleds_with_svc, sleds_without_svc): (Vec<_>, Vec<_>) = + sleds_and_maybe_svcs + .into_iter() + .partition(|(_, maybe_svc)| maybe_svc.is_some()); + // Identify sleds without services (targets for future + // allocation). + let mut sleds_without_svc = + sleds_without_svc.into_iter().map(|(sled, _)| sled); + + // Identify sleds with services (part of output). + let mut svcs: Vec<_> = sleds_with_svc + .into_iter() + .map(|(_, maybe_svc)| { + maybe_svc.expect( + "Should have filtered by sleds with the service", + ) + }) + .collect(); + + // Add services to sleds, in-order, until we've met a + // number sufficient for our redundancy. + // + // The selection of "which sleds run this service" is completely + // arbitrary. + while svcs.len() < (redundancy as usize) { + let sled = sleds_without_svc.next().ok_or_else(|| { + TxnError::CustomError(ServiceError::NotEnoughSleds) + })?; + let svc_id = Uuid::new_v4(); + let address = Self::next_ipv6_address_sync(conn, sled.id()) + .map_err(|e| { + TxnError::CustomError(ServiceError::Other(e)) + })?; + + let service = db::model::Service::new( + svc_id, + sled.id(), + address, + kind, + ); + + let svc = Self::service_upsert_sync(conn, service) + .map_err(|e| { + TxnError::CustomError(ServiceError::Other(e)) + })?; + svcs.push(svc); + } + + return Ok(svcs); + }) + .await + .map_err(|e| match e { + TxnError::CustomError(ServiceError::NotEnoughSleds) => { + Error::unavail("Not enough sleds for service allocation") + } + TxnError::CustomError(ServiceError::Other(e)) => e, + TxnError::Pool(e) => { + public_error_from_diesel_pool(e, ErrorHandler::Server) + } + }) + } + + pub async fn ensure_dns_service( + &self, + opctx: &OpContext, + rack_subnet: Ipv6Subnet<RACK_PREFIX>, + redundancy: u32, + ) -> Result<Vec<Service>, Error> { + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + + #[derive(Debug)] + enum ServiceError { + NotEnoughSleds, + NotEnoughIps, + Other(Error), + } + type TxnError = TransactionError<ServiceError>; + + self.pool() + .transaction(move |conn| { + let mut svcs = Self::dns_service_list_sync(conn)?; + + // Get all subnets not allocated to existing services. + let mut usable_dns_subnets = ReservedRackSubnet(rack_subnet) + .get_dns_subnets() + .into_iter() + .filter(|subnet| { + // If any existing services are using this address, + // skip it.
+ !svcs.iter().any(|svc| { + Ipv6Addr::from(svc.ip) == subnet.dns_address().ip() + }) + }) + .collect::<Vec<_>>() + .into_iter(); + + // Get all sleds which aren't already running DNS services. + let mut target_sleds = + Self::sled_list_with_limit_sync(conn, redundancy)? + .into_iter() + .filter(|sled| { + // The target sleds are only considered if they aren't already + // running a DNS service. + svcs.iter().all(|svc| svc.sled_id != sled.id()) + }) + .collect::<Vec<_>>() + .into_iter(); + + while svcs.len() < (redundancy as usize) { + let sled = target_sleds.next().ok_or_else(|| { + TxnError::CustomError(ServiceError::NotEnoughSleds) + })?; + let svc_id = Uuid::new_v4(); + let dns_subnet = + usable_dns_subnets.next().ok_or_else(|| { + TxnError::CustomError(ServiceError::NotEnoughIps) + })?; + let address = dns_subnet.dns_address().ip(); + + // TODO: How are we tracking the GZ address that must be + // allocated? They're tracked by the "DnsSubnet" object + // in address.rs, but I don't think they're getting + // propagated out of here. + let service = db::model::Service::new( + svc_id, + sled.id(), + address, + ServiceKind::InternalDNS, + ); + + let svc = Self::service_upsert_sync(conn, service) + .map_err(|e| { + TxnError::CustomError(ServiceError::Other(e)) + })?; + + svcs.push(svc); + } + return Ok(svcs); + }) + .await + .map_err(|e| match e { + TxnError::CustomError(ServiceError::NotEnoughSleds) => { + Error::unavail("Not enough sleds for service allocation") + } + TxnError::CustomError(ServiceError::NotEnoughIps) => { + Error::unavail( + "Not enough IP addresses for service allocation", + ) + } + TxnError::CustomError(ServiceError::Other(e)) => e, + TxnError::Pool(e) => { + public_error_from_diesel_pool(e, ErrorHandler::Server) + } + }) + } + + fn dns_service_list_sync( + conn: &mut DbConnection, + ) -> Result<Vec<Service>, diesel::result::Error> { + use db::schema::service::dsl as svc; + + svc::service + .filter(svc::kind.eq(ServiceKind::InternalDNS)) + .limit(DNS_REDUNDANCY.into()) + .select(Service::as_select()) + .get_results(conn) + } } diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index fdfeb5effb4..1462ae11814 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -34,6 +34,7 @@ address = "[::1]:0" [tunables] # Allow small subnets, so we can test IP address exhaustion easily / quickly max_vpc_ipv4_subnet_prefix = 29 +enable_background_tasks = false [deployment] # Identifier for this instance of Nexus.
diff --git a/nexus/tests/integration_tests/datasets.rs b/nexus/tests/integration_tests/datasets.rs index 42b0d48a847..721e9f6f382 100644 --- a/nexus/tests/integration_tests/datasets.rs +++ b/nexus/tests/integration_tests/datasets.rs @@ -8,7 +8,7 @@ use omicron_common::api::external::ByteCount; use omicron_nexus::internal_api::params::{ DatasetKind, DatasetPutRequest, ZpoolPutRequest, }; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{Ipv6Addr, SocketAddrV6}; use uuid::Uuid; use nexus_test_utils::{ControlPlaneTestContext, SLED_AGENT_UUID}; @@ -36,8 +36,7 @@ async fn test_dataset_put_success(cptestctx: &ControlPlaneTestContext) { .await .unwrap(); - let address = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let address = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); let kind = DatasetKind::Crucible; let request = DatasetPutRequest { address, kind }; let dataset_id = Uuid::new_v4(); @@ -69,8 +68,7 @@ async fn test_dataset_put_bad_zpool_returns_not_found( let dataset_put_url = format!("/zpools/{}/dataset/{}", zpool_id, dataset_id); - let address = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let address = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); let kind = DatasetKind::Crucible; let request = DatasetPutRequest { address, kind }; diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs index 4e111cd06dd..038e74afc72 100644 --- a/nexus/types/src/internal_api/params.rs +++ b/nexus/types/src/internal_api/params.rs @@ -74,7 +74,7 @@ impl FromStr for DatasetKind { pub struct DatasetPutRequest { /// Address on which a service is responding to requests for the /// dataset. - pub address: SocketAddr, + pub address: SocketAddrV6, /// Type of dataset being inserted. pub kind: DatasetKind, diff --git a/smf/nexus/config-partial.toml b/smf/nexus/config-partial.toml index c2ba69e5384..66cb2bd24eb 100644 --- a/smf/nexus/config-partial.toml +++ b/smf/nexus/config-partial.toml @@ -23,3 +23,7 @@ if_exists = "append" # Configuration for interacting with the timeseries database [timeseries_db] address = "[fd00:1122:3344:0101::5]:8123" + +[tunables] +# TODO: Remove when RSS transfer to Nexus is fully fleshed out +enable_background_tasks = false
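Taken together, the changes above give the Nexus datastore primitives for allocating services and datasets on a rack. The following rough sketch is not part of the diff: it only illustrates, under stated assumptions, how a caller inside the nexus crate might chain the new ensure_rack_service, ensure_dns_service, and ensure_rack_dataset methods and then hand the results to rack_set_initialized. The function name provision_rack and the Nexus redundancy count are hypothetical; DataStore, OpContext, ServiceKind, DatasetKind, DatasetRedundancy, and Dataset are assumed to be in scope as defined by this change.

    // Hypothetical driver, illustrative only; not part of this change.
    use omicron_common::address::{Ipv6Subnet, DNS_REDUNDANCY, RACK_PREFIX};
    use omicron_common::api::external::Error;
    use uuid::Uuid;

    async fn provision_rack(
        datastore: &DataStore,
        opctx: &OpContext,
        rack_id: Uuid,
        rack_subnet: Ipv6Subnet<RACK_PREFIX>,
    ) -> Result<(), Error> {
        // Spread Nexus instances across distinct sleds; the count of three is
        // illustrative, not taken from this diff.
        let mut services = datastore
            .ensure_rack_service(opctx, rack_id, ServiceKind::Nexus, 3)
            .await?;

        // DNS servers draw their addresses from the reserved rack subnet rather
        // than from a sled's underlay range, so they use a dedicated method.
        let mut dns_services = datastore
            .ensure_dns_service(opctx, rack_subnet, DNS_REDUNDANCY)
            .await?;
        services.append(&mut dns_services);

        // Put a Crucible dataset on every zpool in the rack; keep only the
        // Dataset records for the handoff below.
        let datasets: Vec<Dataset> = datastore
            .ensure_rack_dataset(
                opctx,
                rack_id,
                DatasetKind::Crucible,
                DatasetRedundancy::OnAll,
            )
            .await?
            .into_iter()
            .map(|(_sled, _zpool, dataset)| dataset)
            .collect();

        // The "ensure" calls are idempotent, so this function can simply be
        // retried until the rack is marked initialized.
        datastore
            .rack_set_initialized(opctx, rack_id, services, datasets)
            .await?;
        Ok(())
    }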