diff --git a/nexus/db-model/src/queries/region_allocation.rs b/nexus/db-model/src/queries/region_allocation.rs index b150b05377b..43fac3c9a6c 100644 --- a/nexus/db-model/src/queries/region_allocation.rs +++ b/nexus/db-model/src/queries/region_allocation.rs @@ -47,13 +47,6 @@ table! { } } -table! { - candidate_zpools { - id -> Uuid, - total_size -> Int8, - } -} - table! { candidate_regions { id -> Uuid, @@ -69,13 +62,6 @@ table! { } } -table! { - zpool_size_delta (pool_id) { - pool_id -> Uuid, - size_used_delta -> Numeric, - } -} - table! { proposed_dataset_changes { id -> Uuid, @@ -92,8 +78,8 @@ table! { } table! { - proposed_datasets_fit (fits) { - fits -> Bool, + candidate_zpools (pool_id) { + pool_id -> Uuid } } @@ -136,8 +122,6 @@ table! { } } -diesel::allow_tables_to_appear_in_same_query!(candidate_datasets, zpool,); - diesel::allow_tables_to_appear_in_same_query!( proposed_dataset_changes, dataset, @@ -150,12 +134,9 @@ diesel::allow_tables_to_appear_in_same_query!( zpool, ); -diesel::allow_tables_to_appear_in_same_query!(candidate_zpools, dataset,); - diesel::allow_tables_to_appear_in_same_query!( old_zpool_usage, zpool, - zpool_size_delta, proposed_dataset_changes, ); @@ -165,3 +146,26 @@ diesel::allow_tables_to_appear_in_same_query!( inserted_regions, updated_datasets, ); + +diesel::allow_tables_to_appear_in_same_query!(candidate_zpools, dataset,); +diesel::allow_tables_to_appear_in_same_query!(candidate_zpools, zpool,); + +// == Needed for random region allocation == + +pub mod cockroach_md5 { + pub mod functions { + use diesel::sql_types::*; + diesel::sql_function!(fn md5(x: Bytea) -> Bytea); + } + + pub mod helper_types { + pub type Md5 = super::functions::md5::HelperType; + } + + pub mod dsl { + pub use super::functions::*; + pub use super::helper_types::*; + } +} + +// == End random region allocation dependencies == diff --git a/nexus/db-model/src/region.rs b/nexus/db-model/src/region.rs index 5fcbaddb4a9..fefc4f4fce4 100644 --- a/nexus/db-model/src/region.rs +++ b/nexus/db-model/src/region.rs @@ -33,6 +33,9 @@ pub struct Region { volume_id: Uuid, block_size: ByteCount, + + // These are i64 only so that we can derive a diesel table from them. We + // never expect them to be negative. blocks_per_extent: i64, extent_count: i64, } @@ -42,16 +45,16 @@ impl Region { dataset_id: Uuid, volume_id: Uuid, block_size: ByteCount, - blocks_per_extent: i64, - extent_count: i64, + blocks_per_extent: u64, + extent_count: u64, ) -> Self { Self { identity: RegionIdentity::new(Uuid::new_v4()), dataset_id, volume_id, block_size, - blocks_per_extent, - extent_count, + blocks_per_extent: blocks_per_extent as i64, + extent_count: extent_count as i64, } } @@ -64,11 +67,11 @@ impl Region { pub fn block_size(&self) -> external::ByteCount { self.block_size.0 } - pub fn blocks_per_extent(&self) -> i64 { - self.blocks_per_extent + pub fn blocks_per_extent(&self) -> u64 { + self.blocks_per_extent as u64 } - pub fn extent_count(&self) -> i64 { - self.extent_count + pub fn extent_count(&self) -> u64 { + self.extent_count as u64 } pub fn encrypted(&self) -> bool { // Per RFD 29, data is always encrypted at rest, and support for diff --git a/nexus/db-queries/src/db/cast_uuid_as_bytea.rs b/nexus/db-queries/src/db/cast_uuid_as_bytea.rs new file mode 100644 index 00000000000..c50c88971fb --- /dev/null +++ b/nexus/db-queries/src/db/cast_uuid_as_bytea.rs @@ -0,0 +1,62 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. 
If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Cast UUID to BYTES + +use diesel::expression::ValidGrouping; +use diesel::pg::Pg; +use diesel::query_builder::AstPass; +use diesel::query_builder::QueryFragment; +use diesel::query_builder::QueryId; +use diesel::Expression; +use diesel::SelectableExpression; + +/// Cast an expression which evaluates to a Uuid and cast it to a Bytea. It's +/// that simple! +#[derive(ValidGrouping, QueryId)] +pub struct CastUuidToBytea<E> { + expression: E, +} + +impl<E> CastUuidToBytea<E> +where + E: Expression, +{ + pub const fn new(expression: E) -> Self { + Self { expression } + } +} + +impl<E> Expression for CastUuidToBytea<E> +where + E: Expression, +{ + type SqlType = diesel::sql_types::Bytea; +} + +impl<E, QS> diesel::AppearsOnTable<QS> for CastUuidToBytea<E> where + E: diesel::AppearsOnTable<QS> +{ +} + +impl<E, QS> SelectableExpression<QS> for CastUuidToBytea<E> where + E: SelectableExpression<QS> +{ +} + +impl<E> QueryFragment<Pg> for CastUuidToBytea<E> +where + E: QueryFragment<Pg>, +{ + fn walk_ast<'a>( + &'a self, + mut out: AstPass<'_, 'a, Pg>, + ) -> diesel::QueryResult<()> { + out.push_sql("CAST("); + self.expression.walk_ast(out.reborrow())?; + out.push_sql(" as BYTEA)"); + + Ok(()) + } +} diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 7d000ad8bd3..06373fdb587 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -270,6 +270,43 @@ pub enum UpdatePrecondition<T> { Value(T), } +/// Defines a strategy for choosing what physical disks to use when allocating +/// new crucible regions. +/// +/// NOTE: More strategies can - and should! - be added. +/// +/// See <https://rfd.shared.oxide.computer/rfd/0205> for a more +/// complete discussion. +/// +/// Longer-term, we should consider: +/// - Storage size + remaining free space +/// - Sled placement of datasets +/// - What sort of loads we'd like to create (even split across all disks +/// may not be preferable, especially if maintenance is expected) +#[derive(Debug, Clone)] +pub enum RegionAllocationStrategy { + /// Choose disks that have the least data usage in the rack. This strategy + /// can lead to bad failure states wherein the disks with the least usage + /// have the least usage because regions on them are actually failing in + /// some way. Further retried allocations will then continue to try to + /// allocate onto the disk, perpetuating the problem. Currently this + /// strategy only exists so we can test that using different allocation + /// strategies actually results in different allocation patterns, hence the + /// `#[cfg(test)]`. + /// + /// See https://github.com/oxidecomputer/omicron/issues/3416 for more on the + /// failure states associated with this strategy. + #[cfg(test)] + LeastUsedDisk, + + /// Choose disks pseudo-randomly. An optional seed may be provided to make + /// the ordering deterministic, otherwise the current time in nanoseconds + /// will be used. Ordering is based on sorting the output of `md5(UUID of + /// candidate dataset + seed)`. The seed does not need to come from a + /// cryptographically secure source.
+ Random(Option<u128>), +} + /// Constructs a DataStore for use in test suites that has preloaded the /// built-in users, roles, and role assignments that are needed for basic /// operation @@ -338,6 +375,8 @@ mod test { use crate::db::queries::vpc_subnet::FilterConflictingVpcSubnetRangesQuery; use assert_matches::assert_matches; use chrono::{Duration, Utc}; + use futures::stream; + use futures::StreamExt; use nexus_test_utils::db::test_setup_database; use nexus_types::external_api::params; use omicron_common::api::external::DataPageParams; @@ -627,105 +666,133 @@ mod test { } } - #[tokio::test] - async fn test_region_allocation() { - let logctx = dev::test_setup_log("test_region_allocation"); - let mut db = test_setup_database(&logctx.log).await; - let (opctx, datastore) = datastore_test(&logctx, &db).await; + async fn create_test_datasets_for_region_allocation( + opctx: &OpContext, + datastore: Arc<DataStore>, + ) -> Vec<Uuid> { + // Create sleds... + let sled_ids: Vec<Uuid> = stream::iter(0..REGION_REDUNDANCY_THRESHOLD) + .then(|_| create_test_sled(&datastore)) + .collect() + .await; - // Create a sled... - let sled_id = create_test_sled(&datastore).await; + struct PhysicalDisk { + sled_id: Uuid, + disk_id: Uuid, + } - // ... and a disk on that sled... - let physical_disk_id = create_test_physical_disk( - &datastore, - &opctx, - sled_id, - PhysicalDiskKind::U2, - ) - .await; + // Create 9 disks on each sled + let physical_disks: Vec<PhysicalDisk> = stream::iter(sled_ids) + .map(|sled_id| { + let sled_id_iter: Vec<Uuid> = (0..9).map(|_| sled_id).collect(); + stream::iter(sled_id_iter).then(|sled_id| { + let disk_id_future = create_test_physical_disk( + &datastore, + opctx, + sled_id, + PhysicalDiskKind::U2, + ); + async move { + let disk_id = disk_id_future.await; + PhysicalDisk { sled_id, disk_id } + } + }) + }) + .flatten() + .collect() + .await; - // ... and a zpool within that disk... - let zpool_id = - create_test_zpool(&datastore, sled_id, physical_disk_id).await; + // 1 pool per disk + let zpool_ids: Vec<Uuid> = stream::iter(physical_disks) + .then(|disk| { + create_test_zpool(&datastore, disk.sled_id, disk.disk_id) + }) + .collect() + .await; - // ... and datasets within that zpool. - let dataset_count = REGION_REDUNDANCY_THRESHOLD * 2; let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); - let dataset_ids: Vec<Uuid> = - (0..dataset_count).map(|_| Uuid::new_v4()).collect(); - for id in &dataset_ids { - let dataset = - Dataset::new(*id, zpool_id, bogus_addr, DatasetKind::Crucible); - datastore.dataset_upsert(dataset).await.unwrap(); - } - // Allocate regions from the datasets for this disk. - let params = create_test_disk_create_params( - "disk1", - ByteCount::from_mebibytes_u32(500), - ); - let volume1_id = Uuid::new_v4(); - - // Currently, we only allocate one Region Set per volume. - let expected_region_count = REGION_REDUNDANCY_THRESHOLD; - let dataset_and_regions = datastore - .region_allocate( - &opctx, - volume1_id, - &params.disk_source, - params.size, - ) - .await - .unwrap(); + // 1 dataset per zpool + let dataset_ids: Vec<Uuid> = stream::iter(zpool_ids) + .then(|zpool_id| { + let id = Uuid::new_v4(); + let dataset = Dataset::new( + id, + zpool_id, + bogus_addr, + DatasetKind::Crucible, + ); + let datastore = datastore.clone(); + async move { + datastore.dataset_upsert(dataset).await.unwrap(); + id + } + }) + .collect() + .await; - // Verify the allocation.
- assert_eq!(expected_region_count, dataset_and_regions.len()); - let mut disk1_datasets = HashSet::new(); - for (dataset, region) in dataset_and_regions { - assert!(disk1_datasets.insert(dataset.id())); - assert_eq!(volume1_id, region.volume_id()); - assert_eq!(ByteCount::from(4096), region.block_size()); - let (_, extent_count) = DataStore::get_crucible_allocation( - &BlockSize::AdvancedFormat, - params.size, - ); - assert_eq!(extent_count, region.extent_count()); - } + dataset_ids + } - // Allocate regions for a second disk. Observe that we allocate from - // the three previously unused datasets. - let params = create_test_disk_create_params( - "disk2", - ByteCount::from_mebibytes_u32(500), - ); - let volume2_id = Uuid::new_v4(); - let dataset_and_regions = datastore - .region_allocate( - &opctx, - volume2_id, - ¶ms.disk_source, - params.size, - ) - .await - .unwrap(); + #[tokio::test] + /// Note that this test is currently non-deterministic. It can be made + /// deterministic by generating deterministic *dataset* Uuids. The sled and + /// pool IDs should not matter. + async fn test_region_allocation() { + let logctx = dev::test_setup_log("test_region_allocation"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + create_test_datasets_for_region_allocation(&opctx, datastore.clone()) + .await; - assert_eq!(expected_region_count, dataset_and_regions.len()); - let mut disk2_datasets = HashSet::new(); - for (dataset, region) in dataset_and_regions { - assert!(disk2_datasets.insert(dataset.id())); - assert_eq!(volume2_id, region.volume_id()); - assert_eq!(ByteCount::from(4096), region.block_size()); - let (_, extent_count) = DataStore::get_crucible_allocation( - &BlockSize::AdvancedFormat, - params.size, + // Allocate regions from the datasets for this disk. Do it a few times + // for good measure. + for alloc_seed in 0..10 { + let params = create_test_disk_create_params( + &format!("disk{}", alloc_seed), + ByteCount::from_mebibytes_u32(1), ); - assert_eq!(extent_count, region.extent_count()); - } + let volume_id = Uuid::new_v4(); + + let expected_region_count = REGION_REDUNDANCY_THRESHOLD; + let dataset_and_regions = datastore + .region_allocate( + &opctx, + volume_id, + ¶ms.disk_source, + params.size, + &RegionAllocationStrategy::Random(Some(alloc_seed as u128)), + ) + .await + .unwrap(); - // Double-check that the datasets used for the first disk weren't - // used when allocating the second disk. - assert_eq!(0, disk1_datasets.intersection(&disk2_datasets).count()); + // Verify the allocation. + assert_eq!(expected_region_count, dataset_and_regions.len()); + let mut disk_datasets = HashSet::new(); + let mut disk_zpools = HashSet::new(); + + // TODO: When allocation chooses 3 distinct sleds, uncomment this. + // let mut disk1_sleds = HashSet::new(); + for (dataset, region) in dataset_and_regions { + // Must be 3 unique datasets + assert!(disk_datasets.insert(dataset.id())); + + // Must be 3 unique zpools + assert!(disk_zpools.insert(dataset.pool_id)); + + // Must be 3 unique sleds + // TODO: When allocation chooses 3 distinct sleds, uncomment this. 
+ // assert!(disk1_sleds.insert(Err(dataset))); + + assert_eq!(volume_id, region.volume_id()); + assert_eq!(ByteCount::from(4096), region.block_size()); + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::AdvancedFormat, + params.size, + ); + assert_eq!(extent_count, region.extent_count()); + } + } let _ = db.cleanup().await; logctx.cleanup_successful(); @@ -737,33 +804,8 @@ mod test { dev::test_setup_log("test_region_allocation_is_idempotent"); let mut db = test_setup_database(&logctx.log).await; let (opctx, datastore) = datastore_test(&logctx, &db).await; - - // Create a sled... - let sled_id = create_test_sled(&datastore).await; - - // ... and a disk on that sled... - let physical_disk_id = create_test_physical_disk( - &datastore, - &opctx, - sled_id, - PhysicalDiskKind::U2, - ) - .await; - - // ... and a zpool within that disk... - let zpool_id = - create_test_zpool(&datastore, sled_id, physical_disk_id).await; - - // ... and datasets within that zpool. - let dataset_count = REGION_REDUNDANCY_THRESHOLD; - let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); - let dataset_ids: Vec = - (0..dataset_count).map(|_| Uuid::new_v4()).collect(); - for id in &dataset_ids { - let dataset = - Dataset::new(*id, zpool_id, bogus_addr, DatasetKind::Crucible); - datastore.dataset_upsert(dataset).await.unwrap(); - } + create_test_datasets_for_region_allocation(&opctx, datastore.clone()) + .await; // Allocate regions from the datasets for this volume. let params = create_test_disk_create_params( @@ -777,15 +819,20 @@ mod test { volume_id, ¶ms.disk_source, params.size, + &RegionAllocationStrategy::Random(Some(0)), ) .await .unwrap(); + + // Use a different allocation ordering to ensure we're idempotent even + // if the shuffle changes. let mut dataset_and_regions2 = datastore .region_allocate( &opctx, volume_id, ¶ms.disk_source, params.size, + &RegionAllocationStrategy::Random(Some(1)), ) .await .unwrap(); @@ -814,9 +861,9 @@ mod test { } #[tokio::test] - async fn test_region_allocation_not_enough_datasets() { + async fn test_region_allocation_not_enough_zpools() { let logctx = - dev::test_setup_log("test_region_allocation_not_enough_datasets"); + dev::test_setup_log("test_region_allocation_not_enough_zpools"); let mut db = test_setup_database(&logctx.log).await; let (opctx, datastore) = datastore_test(&logctx, &db).await; @@ -832,20 +879,35 @@ mod test { ) .await; - // ... and a zpool within that disk... - let zpool_id = - create_test_zpool(&datastore, sled_id, physical_disk_id).await; + // 1 less than REDUNDANCY level of zpools + let zpool_ids: Vec = + stream::iter(0..REGION_REDUNDANCY_THRESHOLD - 1) + .then(|_| { + create_test_zpool(&datastore, sled_id, physical_disk_id) + }) + .collect() + .await; - // ... and datasets within that zpool. 
- let dataset_count = REGION_REDUNDANCY_THRESHOLD - 1; let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); - let dataset_ids: Vec<Uuid> = - (0..dataset_count).map(|_| Uuid::new_v4()).collect(); - for id in &dataset_ids { - let dataset = - Dataset::new(*id, zpool_id, bogus_addr, DatasetKind::Crucible); - datastore.dataset_upsert(dataset).await.unwrap(); - } + + // 1 dataset per zpool + stream::iter(zpool_ids) + .then(|zpool_id| { + let id = Uuid::new_v4(); + let dataset = Dataset::new( + id, + zpool_id, + bogus_addr, + DatasetKind::Crucible, + ); + let datastore = datastore.clone(); + async move { + datastore.dataset_upsert(dataset).await.unwrap(); + id + } + }) + .collect::<Vec<_>>() + .await; // Allocate regions from the datasets for this volume. let params = create_test_disk_create_params( @@ -859,11 +921,12 @@ mod test { volume1_id, &params.disk_source, params.size, + &RegionAllocationStrategy::Random(Some(0)), ) .await .unwrap_err(); - let expected = "Not enough datasets to allocate disks"; + let expected = "Not enough zpool space to allocate disks"; assert!( err.to_string().contains(expected), "Saw error: \'{err}\', but expected \'{expected}\'" @@ -882,39 +945,12 @@ mod test { let mut db = test_setup_database(&logctx.log).await; let (opctx, datastore) = datastore_test(&logctx, &db).await; - // Create a sled... - let sled_id = create_test_sled(&datastore).await; - - // ... and a disk on that sled... - let physical_disk_id = create_test_physical_disk( - &datastore, - &opctx, - sled_id, - PhysicalDiskKind::U2, - ) - .await; - - // ... and a zpool within that disk... - let zpool_id = - create_test_zpool(&datastore, sled_id, physical_disk_id).await; - - // ... and datasets within that zpool. - let dataset_count = REGION_REDUNDANCY_THRESHOLD; - let bogus_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); - let dataset_ids: Vec<Uuid> = - (0..dataset_count).map(|_| Uuid::new_v4()).collect(); - for id in &dataset_ids { - let dataset = - Dataset::new(*id, zpool_id, bogus_addr, DatasetKind::Crucible); - datastore.dataset_upsert(dataset).await.unwrap(); - } + create_test_datasets_for_region_allocation(&opctx, datastore.clone()) + .await; - // Allocate regions from the datasets for this disk. - // - // Note that we ask for a disk which is as large as the zpool, - // so we shouldn't have space for redundancy. let disk_size = test_zpool_size(); - let params = create_test_disk_create_params("disk1", disk_size); + let alloc_size = ByteCount::try_from(disk_size.to_bytes() * 2).unwrap(); + let params = create_test_disk_create_params("disk1", alloc_size); let volume1_id = Uuid::new_v4(); assert!(datastore @@ -922,7 +958,8 @@ mod test { &opctx, volume1_id, &params.disk_source, - params.size + params.size, + &RegionAllocationStrategy::Random(Some(0)), ) .await .is_err()); diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index fc0e3ce0682..c39f7ceb409 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -5,6 +5,7 @@ //! [`DataStore`] methods on [`Region`]s. use super::DataStore; +use super::RegionAllocationStrategy; use super::RunnableQuery; use crate::context::OpContext; use crate::db; @@ -88,17 +89,17 @@ impl DataStore { // TODO for now, extent size is fixed at 64 MiB. In the future, this may be // tunable at runtime.
- pub const EXTENT_SIZE: i64 = 64_i64 << 20; + pub const EXTENT_SIZE: u64 = 64_u64 << 20; /// Given a block size and total disk size, get Crucible allocation values pub fn get_crucible_allocation( block_size: &db::model::BlockSize, size: external::ByteCount, - ) -> (i64, i64) { + ) -> (u64, u64) { let blocks_per_extent = - Self::EXTENT_SIZE / block_size.to_bytes() as i64; + Self::EXTENT_SIZE / block_size.to_bytes() as u64; - let size = size.to_bytes() as i64; + let size = size.to_bytes(); // allocate enough extents to fit all the disk blocks, rounding up. let extent_count = size / Self::EXTENT_SIZE @@ -118,27 +119,8 @@ impl DataStore { volume_id: Uuid, disk_source: ¶ms::DiskSource, size: external::ByteCount, + allocation_strategy: &RegionAllocationStrategy, ) -> Result, Error> { - // ALLOCATION POLICY - // - // NOTE: This policy can - and should! - be changed. - // - // See https://rfd.shared.oxide.computer/rfd/0205 for a more - // complete discussion. - // - // It is currently acting as a placeholder, showing a feasible - // interaction between datasets and regions. - // - // This policy allocates regions to distinct Crucible datasets, - // favoring datasets with the smallest existing (summed) region - // sizes. Basically, "pick the datasets with the smallest load first". - // - // Longer-term, we should consider: - // - Storage size + remaining free space - // - Sled placement of datasets - // - What sort of loads we'd like to create (even split across all disks - // may not be preferable, especially if maintenance is expected) - let block_size = self.get_block_size_from_disk_source(opctx, &disk_source).await?; let (blocks_per_extent, extent_count) = @@ -147,9 +129,10 @@ impl DataStore { let dataset_and_regions: Vec<(Dataset, Region)> = crate::db::queries::region_allocation::RegionAllocate::new( volume_id, - block_size.into(), + block_size.to_bytes() as u64, blocks_per_extent, extent_count, + allocation_strategy, ) .get_results_async(self.pool()) .await diff --git a/nexus/db-queries/src/db/mod.rs b/nexus/db-queries/src/db/mod.rs index 1e4e8afbe89..8b7424a0565 100644 --- a/nexus/db-queries/src/db/mod.rs +++ b/nexus/db-queries/src/db/mod.rs @@ -5,6 +5,7 @@ //! Facilities for working with the Omicron database pub(crate) mod alias; +pub(crate) mod cast_uuid_as_bytea; // This is not intended to be public, but this is necessary to use it from // doctests pub mod collection_attach; diff --git a/nexus/db-queries/src/db/queries/region_allocation.rs b/nexus/db-queries/src/db/queries/region_allocation.rs index 4c76689cffc..674a525c5c8 100644 --- a/nexus/db-queries/src/db/queries/region_allocation.rs +++ b/nexus/db-queries/src/db/queries/region_allocation.rs @@ -5,6 +5,8 @@ //! Implementation of queries for provisioning regions. 
use crate::db::alias::ExpressionAlias; +use crate::db::cast_uuid_as_bytea::CastUuidToBytea; +use crate::db::datastore::RegionAllocationStrategy; use crate::db::datastore::REGION_REDUNDANCY_THRESHOLD; use crate::db::model::{Dataset, DatasetKind, Region}; use crate::db::pool::DbConnection; @@ -13,30 +15,35 @@ use crate::db::true_or_cast_error::{matches_sentinel, TrueOrCastError}; use db_macros::Subquery; use diesel::pg::Pg; use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId}; +use diesel::PgBinaryExpressionMethods; use diesel::{ sql_types, BoolExpressionMethods, Column, CombineDsl, ExpressionMethods, Insertable, IntoSql, JoinOnDsl, NullableExpressionMethods, QueryDsl, RunQueryDsl, }; use nexus_db_model::queries::region_allocation::{ - candidate_datasets, candidate_regions, candidate_zpools, do_insert, - inserted_regions, old_regions, old_zpool_usage, proposed_dataset_changes, - proposed_datasets_fit, updated_datasets, zpool_size_delta, + candidate_datasets, candidate_regions, candidate_zpools, cockroach_md5, + do_insert, inserted_regions, old_regions, old_zpool_usage, + proposed_dataset_changes, updated_datasets, }; use nexus_db_model::schema; -use nexus_db_model::ByteCount; use omicron_common::api::external; const NOT_ENOUGH_DATASETS_SENTINEL: &'static str = "Not enough datasets"; const NOT_ENOUGH_ZPOOL_SPACE_SENTINEL: &'static str = "Not enough space"; +const NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL: &'static str = + "Not enough unique zpools selected"; /// Translates a generic pool error to an external error based /// on messages which may be emitted during region provisioning. pub fn from_pool(e: async_bb8_diesel::PoolError) -> external::Error { use crate::db::error; - let sentinels = - [NOT_ENOUGH_DATASETS_SENTINEL, NOT_ENOUGH_ZPOOL_SPACE_SENTINEL]; + let sentinels = [ + NOT_ENOUGH_DATASETS_SENTINEL, + NOT_ENOUGH_ZPOOL_SPACE_SENTINEL, + NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL, + ]; if let Some(sentinel) = matches_sentinel(&e, &sentinels) { match sentinel { NOT_ENOUGH_DATASETS_SENTINEL => { @@ -49,6 +56,11 @@ pub fn from_pool(e: async_bb8_diesel::PoolError) -> external::Error { "Not enough zpool space to allocate disks", ); } + NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL => { + return external::Error::unavail( + "Not enough unique zpools selected while allocating disks", + ); + } // Fall-through to the generic error conversion. 
_ => {} } @@ -86,47 +98,72 @@ struct CandidateDatasets { } impl CandidateDatasets { - fn new() -> Self { + fn new( + allocation_strategy: &RegionAllocationStrategy, + candidate_zpools: &CandidateZpools, + ) -> Self { use crate::db::schema::dataset::dsl as dataset_dsl; + use candidate_zpools::dsl as candidate_zpool_dsl; + + let query = match allocation_strategy { + #[cfg(test)] + RegionAllocationStrategy::LeastUsedDisk => { + let query: Box< + dyn CteQuery<SqlType = candidate_datasets::SqlType>, + > = Box::new( + dataset_dsl::dataset + .inner_join( + candidate_zpools + .query_source() + .on(dataset_dsl::pool_id + .eq(candidate_zpool_dsl::pool_id)), + ) + .filter(dataset_dsl::time_deleted.is_null()) + .filter(dataset_dsl::size_used.is_not_null()) + .filter(dataset_dsl::kind.eq(DatasetKind::Crucible)) + .order(dataset_dsl::size_used.asc()) + .limit(REGION_REDUNDANCY_THRESHOLD.try_into().unwrap()) + .select((dataset_dsl::id, dataset_dsl::pool_id)), + ); + query + } + RegionAllocationStrategy::Random(seed) => { + let seed = seed.unwrap_or_else(|| { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + }); + + let seed_bytes = seed.to_le_bytes(); + + let query: Box< + dyn CteQuery<SqlType = candidate_datasets::SqlType>, + > = Box::new( + dataset_dsl::dataset + .inner_join( + candidate_zpools + .query_source() + .on(dataset_dsl::pool_id + .eq(candidate_zpool_dsl::pool_id)), + ) + .filter(dataset_dsl::time_deleted.is_null()) + .filter(dataset_dsl::size_used.is_not_null()) + .filter(dataset_dsl::kind.eq(DatasetKind::Crucible)) + // We order by md5 to shuffle the ordering of the datasets. + // md5 has a uniform output distribution so it does the job. + .order(cockroach_md5::dsl::md5( + CastUuidToBytea::new(dataset_dsl::id) + .concat(seed_bytes.to_vec()), + )) + .select((dataset_dsl::id, dataset_dsl::pool_id)) + .limit(REGION_REDUNDANCY_THRESHOLD.try_into().unwrap()), + ); + query + } + }; - Self { - query: Box::new( - dataset_dsl::dataset - .filter(dataset_dsl::time_deleted.is_null()) - .filter(dataset_dsl::size_used.is_not_null()) - .filter(dataset_dsl::kind.eq(DatasetKind::Crucible)) - .order(dataset_dsl::size_used.asc()) - .limit(REGION_REDUNDANCY_THRESHOLD.try_into().unwrap()) - .select((dataset_dsl::id, dataset_dsl::pool_id)), - ), - } - } -} - -/// A subquery to find the total size of zpools containing candidate datasets.
-#[derive(Subquery, QueryId)] -#[subquery(name = candidate_zpools)] -struct CandidateZpools { - query: Box>, -} - -impl CandidateZpools { - fn new(candidate_datasets: &CandidateDatasets) -> Self { - use crate::db::schema::zpool::dsl as zpool_dsl; - use candidate_datasets::dsl as candidate_datasets_dsl; - - Self { - query: Box::new( - zpool_dsl::zpool - .inner_join( - candidate_datasets - .query_source() - .on(candidate_datasets_dsl::pool_id - .eq(zpool_dsl::id)), - ) - .select((zpool_dsl::id, zpool_dsl::total_size)), - ), - } + Self { query } } } @@ -144,18 +181,19 @@ impl CandidateRegions { fn new( candidate_datasets: &CandidateDatasets, volume_id: uuid::Uuid, - block_size: ByteCount, - blocks_per_extent: i64, - extent_count: i64, + block_size: u64, + blocks_per_extent: u64, + extent_count: u64, ) -> Self { use candidate_datasets::dsl as candidate_datasets_dsl; use schema::region; let volume_id = volume_id.into_sql::(); - let block_size = block_size.into_sql::(); + let block_size = (block_size as i64).into_sql::(); let blocks_per_extent = - blocks_per_extent.into_sql::(); - let extent_count = extent_count.into_sql::(); + (blocks_per_extent as i64).into_sql::(); + let extent_count = + (extent_count as i64).into_sql::(); Self { query: Box::new(candidate_datasets.query_source().select(( ExpressionAlias::new::(gen_random_uuid()), @@ -219,17 +257,11 @@ struct OldPoolUsage { } impl OldPoolUsage { - fn new(candidate_zpools: &CandidateZpools) -> Self { + fn new() -> Self { use crate::db::schema::dataset::dsl as dataset_dsl; Self { query: Box::new( dataset_dsl::dataset - .inner_join( - candidate_zpools - .query_source() - .on(dataset_dsl::pool_id - .eq(candidate_zpools::dsl::id)), - ) .group_by(dataset_dsl::pool_id) .filter(dataset_dsl::size_used.is_not_null()) .filter(dataset_dsl::time_deleted.is_null()) @@ -245,49 +277,19 @@ impl OldPoolUsage { } } -/// A subquery which calculates the sum of all dataset size changes -/// on the appropriate zpools. -#[derive(Subquery, QueryId)] -#[subquery(name = zpool_size_delta)] -struct ZpoolSizeDelta { - query: Box>, -} - -impl ZpoolSizeDelta { - fn new(proposed_changes: &ProposedChanges) -> Self { - Self { - query: Box::new( - proposed_changes.query_source() - .group_by(proposed_dataset_changes::dsl::pool_id) - .select(( - ExpressionAlias::new::(proposed_dataset_changes::dsl::pool_id), - ExpressionAlias::new::(diesel::dsl::sum(proposed_dataset_changes::dsl::size_used_delta).assume_not_null()), - )) - ) - } - } -} - -/// A subquery which identifies if the proposed size changes will -/// fit on the corresponding zpools. +/// A subquery which identifies zpools with enough space for a region allocation. #[derive(Subquery, QueryId)] -#[subquery(name = proposed_datasets_fit)] -struct ProposedDatasetsFit { - query: Box>, +#[subquery(name = candidate_zpools)] +struct CandidateZpools { + query: Box>, } -impl ProposedDatasetsFit { - fn new( - old_zpool_usage: &OldPoolUsage, - zpool_size_delta: &ZpoolSizeDelta, - ) -> Self { +impl CandidateZpools { + fn new(old_zpool_usage: &OldPoolUsage, zpool_size_delta: u64) -> Self { use schema::zpool::dsl as zpool_dsl; let with_zpool = zpool_dsl::zpool .on(zpool_dsl::id.eq(old_zpool_usage::dsl::pool_id)); - let with_zpool_size_delta = - zpool_size_delta.query_source().on(zpool_size_delta::dsl::pool_id - .eq(old_zpool_usage::dsl::pool_id)); // Why are we using raw `diesel::dsl::sql` here? 
// @@ -299,19 +301,21 @@ impl ProposedDatasetsFit { // is safe (after all, we basically want to promote "total_size" to a // Numeric too) but Diesel demands that the input and output SQL types // of expression methods like ".le" match exactly. + // + // For similar reasons, we use `diesel::dsl::sql` with zpool_size_delta. + // We would like to add it, but diesel only permits us to `to_sql()` it + // into a BigInt, not a Numeric. I welcome a better solution. let it_will_fit = (old_zpool_usage::dsl::size_used - + zpool_size_delta::dsl::size_used_delta) - .le(diesel::dsl::sql(zpool_dsl::total_size::NAME)); + + diesel::dsl::sql(&zpool_size_delta.to_string())) + .le(diesel::dsl::sql(zpool_dsl::total_size::NAME)); Self { query: Box::new( old_zpool_usage .query_source() .inner_join(with_zpool) - .inner_join(with_zpool_size_delta) - .select((ExpressionAlias::new::< - proposed_datasets_fit::dsl::fits, - >(it_will_fit),)), + .filter(it_will_fit) + .select((old_zpool_usage::dsl::pool_id,)), ), } } @@ -339,7 +343,7 @@ impl DoInsert { fn new( old_regions: &OldRegions, candidate_regions: &CandidateRegions, - proposed_datasets_fit: &ProposedDatasetsFit, + candidate_zpools: &CandidateZpools, ) -> Self { let redundancy = REGION_REDUNDANCY_THRESHOLD as i64; let not_allocated_yet = old_regions @@ -348,17 +352,48 @@ impl DoInsert { .single_value() .assume_not_null() .lt(redundancy); - let enough_candidates = candidate_regions + + let enough_candidate_zpools = candidate_zpools .query_source() .count() .single_value() .assume_not_null() .ge(redundancy); - let proposals_fit = proposed_datasets_fit + + let enough_candidate_regions = candidate_regions .query_source() - .select(bool_and(proposed_datasets_fit::dsl::fits)) + .count() .single_value() - .assume_not_null(); + .assume_not_null() + .ge(redundancy); + + // We want to ensure that we do not allocate on two datasets in the same + // zpool, for two reasons + // - Data redundancy: If a drive fails it should only take one of the 3 + // regions with it + // - Risk of overallocation: We only check that each zpool as enough + // room for one region, so we should not allocate more than one region + // to it. + // + // Selecting two datasets on the same zpool will not initially be + // possible, as at the time of writing each zpool only has one dataset. + // Additionally, we intend to modify the allocation strategy to select + // from 3 distinct sleds, removing the possibility entirely. But, if we + // introduce a change that adds another crucible dataset to zpools + // before we improve the allocation strategy, this check will make sure + // we don't violate drive redundancy, and generate an error instead. 
+ use crate::db::schema::dataset::dsl as dataset_dsl; + use candidate_regions::dsl as candidate_dsl; + let enough_unique_candidate_zpools = candidate_regions + .query_source() + .inner_join( + dataset_dsl::dataset + .on(candidate_dsl::dataset_id.eq(dataset_dsl::id)), + ) + .select(diesel::dsl::count_distinct(dataset_dsl::pool_id)) + .single_value() + .assume_not_null() + .ge(redundancy); Self { query: Box::new(diesel::select((ExpressionAlias::new::< @@ -366,12 +401,16 @@ impl DoInsert { >( not_allocated_yet .and(TrueOrCastError::new( - enough_candidates, + enough_candidate_zpools, + NOT_ENOUGH_ZPOOL_SPACE_SENTINEL, + )) + .and(TrueOrCastError::new( + enough_candidate_regions, NOT_ENOUGH_DATASETS_SENTINEL, )) .and(TrueOrCastError::new( - proposals_fit, - NOT_ENOUGH_ZPOOL_SPACE_SENTINEL, + enough_unique_candidate_zpools, + NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL, )), ),))), } @@ -464,13 +503,22 @@ pub struct RegionAllocate { impl RegionAllocate { pub fn new( volume_id: uuid::Uuid, - block_size: ByteCount, - blocks_per_extent: i64, - extent_count: i64, + block_size: u64, + blocks_per_extent: u64, + extent_count: u64, + allocation_strategy: &RegionAllocationStrategy, ) -> Self { + let size_delta = block_size * blocks_per_extent * extent_count; + let old_regions = OldRegions::new(volume_id); - let candidate_datasets = CandidateDatasets::new(); - let candidate_zpools = CandidateZpools::new(&candidate_datasets); + + let old_pool_usage = OldPoolUsage::new(); + let candidate_zpools = + CandidateZpools::new(&old_pool_usage, size_delta); + + let candidate_datasets = + CandidateDatasets::new(&allocation_strategy, &candidate_zpools); + let candidate_regions = CandidateRegions::new( &candidate_datasets, volume_id, @@ -479,15 +527,8 @@ impl RegionAllocate { extent_count, ); let proposed_changes = ProposedChanges::new(&candidate_regions); - let old_pool_usage = OldPoolUsage::new(&candidate_zpools); - let zpool_size_delta = ZpoolSizeDelta::new(&proposed_changes); - let proposed_datasets_fit = - ProposedDatasetsFit::new(&old_pool_usage, &zpool_size_delta); - let do_insert = DoInsert::new( - &old_regions, - &candidate_regions, - &proposed_datasets_fit, - ); + let do_insert = + DoInsert::new(&old_regions, &candidate_regions, &candidate_zpools); let insert_regions = InsertRegions::new(&do_insert, &candidate_regions); let updated_datasets = UpdateDatasets::new(&do_insert, &proposed_changes); @@ -533,13 +574,11 @@ impl RegionAllocate { let cte = CteBuilder::new() .add_subquery(old_regions) - .add_subquery(candidate_datasets) + .add_subquery(old_pool_usage) .add_subquery(candidate_zpools) + .add_subquery(candidate_datasets) .add_subquery(candidate_regions) .add_subquery(proposed_changes) - .add_subquery(old_pool_usage) - .add_subquery(zpool_size_delta) - .add_subquery(proposed_datasets_fit) .add_subquery(do_insert) .add_subquery(insert_regions) .add_subquery(updated_datasets) diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index a9c5a0f2651..3379b4f3360 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -40,8 +40,8 @@ pub async fn ensure_region_in_dataset( let region_request = CreateRegion { block_size: region.block_size().to_bytes(), - extent_count: region.extent_count().try_into().unwrap(), - extent_size: region.blocks_per_extent().try_into().unwrap(), + extent_count: region.extent_count(), + extent_size: region.blocks_per_extent(), // TODO: Can we avoid casting from UUID to string? 
// NOTE: This'll require updating the crucible agent client. id: RegionId(region.id().to_string()), diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs index b32cccadc05..d41345c9444 100644 --- a/nexus/src/app/sagas/disk_create.rs +++ b/nexus/src/app/sagas/disk_create.rs @@ -16,6 +16,7 @@ use crate::db::identity::{Asset, Resource}; use crate::db::lookup::LookupPath; use crate::external_api::params; use crate::{authn, authz, db}; +use nexus_db_queries::db::datastore::RegionAllocationStrategy; use omicron_common::api::external::Error; use rand::{rngs::StdRng, RngCore, SeedableRng}; use serde::Deserialize; @@ -250,6 +251,7 @@ async fn sdc_alloc_regions( volume_id, ¶ms.create_params.disk_source, params.create_params.size, + &RegionAllocationStrategy::Random(None), ) .await .map_err(ActionError::action_failed)?; diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index 180f1fe795c..8e4851f350d 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -106,6 +106,7 @@ use crate::{authn, authz, db}; use anyhow::anyhow; use crucible_agent_client::{types::RegionId, Client as CrucibleAgentClient}; use nexus_db_model::Generation; +use nexus_db_queries::db::datastore::RegionAllocationStrategy; use omicron_common::api::external; use omicron_common::api::external::Error; use rand::{rngs::StdRng, RngCore, SeedableRng}; @@ -338,6 +339,7 @@ async fn ssc_alloc_regions( .map_err(|e| ActionError::action_failed(e.to_string()))?, }, external::ByteCount::from(disk.size), + &RegionAllocationStrategy::Random(None), ) .await .map_err(ActionError::action_failed)?;
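The new `RegionAllocationStrategy::Random` variant orders candidate datasets by `md5(CAST(dataset id AS BYTEA) || seed_le_bytes)`, which is why passing a fixed seed (as the tests above do with `Random(Some(alloc_seed as u128))`) produces a repeatable selection, while omitting the seed falls back to the current time in nanoseconds. A minimal Rust sketch of that ordering, assuming the `uuid` and `md5` crates; the helper name is illustrative only, not part of this change:

use uuid::Uuid;

// Mirror of the CTE's ORDER BY: md5(uuid_bytes || seed_le_bytes).
fn shuffle_candidates(mut dataset_ids: Vec<Uuid>, seed: u128) -> Vec<Uuid> {
    let seed_bytes = seed.to_le_bytes();
    dataset_ids.sort_by_key(|id| {
        // Concatenate the dataset UUID bytes with the seed, then hash.
        let mut input = id.as_bytes().to_vec();
        input.extend_from_slice(&seed_bytes);
        md5::compute(&input).0
    });
    dataset_ids
}

fn main() {
    let ids: Vec<Uuid> = (0..5).map(|_| Uuid::new_v4()).collect();
    // The same seed always produces the same ordering...
    assert_eq!(shuffle_candidates(ids.clone(), 0), shuffle_candidates(ids.clone(), 0));
    // ...and different seeds generally produce different orderings.
    println!("{:?}", shuffle_candidates(ids, 1));
}

The exact byte layout CockroachDB uses when casting a UUID to BYTES may differ from this sketch, but the property the tests rely on is the same: the permutation of candidate datasets is a pure function of the dataset IDs and the seed.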