diff --git a/Cargo.lock b/Cargo.lock index 06bec1aaf2b..bd89ced5688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -578,6 +578,20 @@ dependencies = [ "xts-mode", ] +[[package]] +name = "crucible-agent-client" +version = "0.0.1" +source = "git+https://github.com/oxidecomputer/crucible?rev=078d364e14d57d5faa3a44001c65709935419779#078d364e14d57d5faa3a44001c65709935419779" +dependencies = [ + "anyhow", + "percent-encoding", + "progenitor", + "reqwest", + "schemars", + "serde", + "serde_json", +] + [[package]] name = "crucible-common" version = "0.0.0" @@ -1969,6 +1983,7 @@ dependencies = [ "chrono", "cookie", "criterion", + "crucible-agent-client", "db-macros", "diesel", "diesel-dtrace", @@ -2057,6 +2072,7 @@ dependencies = [ "bytes", "cfg-if", "chrono", + "crucible-agent-client", "dropshot", "expectorate", "futures", diff --git a/common/src/api/external/mod.rs b/common/src/api/external/mod.rs index 97494bdd677..994fe6dd5c0 100644 --- a/common/src/api/external/mod.rs +++ b/common/src/api/external/mod.rs @@ -399,7 +399,7 @@ impl JsonSchema for RoleName { * the database as an i64. Constraining it here ensures that we can't fail to * serialize the value. */ -#[derive(Copy, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[derive(Copy, Clone, Debug, Deserialize, Serialize, JsonSchema, PartialEq)] pub struct ByteCount(u64); impl ByteCount { @@ -468,8 +468,8 @@ impl From for ByteCount { } } -impl From<&ByteCount> for i64 { - fn from(b: &ByteCount) -> Self { +impl From for i64 { + fn from(b: ByteCount) -> Self { /* We have already validated that this value is in range. */ i64::try_from(b.0).unwrap() } @@ -2072,12 +2072,12 @@ mod test { /* Largest supported value: both constructors that support it. */ let max = ByteCount::try_from(i64::MAX).unwrap(); assert_eq!(i64::MAX, max.to_bytes() as i64); - assert_eq!(i64::MAX, i64::from(&max)); + assert_eq!(i64::MAX, i64::from(max)); let maxu64 = u64::try_from(i64::MAX).unwrap(); let max = ByteCount::try_from(maxu64).unwrap(); assert_eq!(i64::MAX, max.to_bytes() as i64); - assert_eq!(i64::MAX, i64::from(&max)); + assert_eq!(i64::MAX, i64::from(max)); assert_eq!( (i64::MAX / 1024 / 1024 / 1024 / 1024) as u64, max.to_whole_tebibytes() diff --git a/common/src/sql/dbinit.sql b/common/src/sql/dbinit.sql index 2fe6a42afb0..6780574a181 100644 --- a/common/src/sql/dbinit.sql +++ b/common/src/sql/dbinit.sql @@ -105,13 +105,21 @@ CREATE TABLE omicron.public.Dataset ( /* FK into the Pool table */ pool_id UUID NOT NULL, - /* Contact information for the downstairs region */ + /* Contact information for the dataset */ ip INET NOT NULL, port INT4 NOT NULL, - kind omicron.public.dataset_kind NOT NULL + kind omicron.public.dataset_kind NOT NULL, + + /* An upper bound on the amount of space that might be in-use */ + size_used INT ); +/* Create an index on the size usage for Crucible's allocation */ +CREATE INDEX on omicron.public.Dataset ( + size_used +) WHERE size_used IS NOT NULL AND time_deleted IS NULL AND kind = 'crucible'; + /* * A region of space allocated to Crucible Downstairs, within a dataset. */ @@ -129,10 +137,17 @@ CREATE TABLE omicron.public.Region ( /* Metadata describing the region */ block_size INT NOT NULL, - extent_size INT NOT NULL, + blocks_per_extent INT NOT NULL, extent_count INT NOT NULL ); +/* + * Allow all regions belonging to a disk to be accessed quickly. 
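+ * The datastore both looks up a disk's regions (get_allocated_regions)
+ * and deletes them (regions_hard_delete) by filtering on disk_id, so
+ * without this index those queries would need a full table scan.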
+ */ +CREATE INDEX on omicron.public.Region ( + disk_id +); + /* * Organizations */ diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 8275ed45e22..d0b72ea8799 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -9,10 +9,11 @@ path = "../rpaths" [dependencies] anyhow = "1.0" +async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "c849b717be" } async-trait = "0.1.51" bb8 = "0.7.1" -async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "c849b717be" } cookie = "0.16" +crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "078d364e14d57d5faa3a44001c65709935419779" } # Tracking pending 2.0 version. diesel = { git = "https://github.com/diesel-rs/diesel", rev = "ce77c382", features = ["postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] } futures = "0.3.18" diff --git a/nexus/src/db/datastore.rs b/nexus/src/db/datastore.rs index 479713a7aa8..1ae13a9944d 100644 --- a/nexus/src/db/datastore.rs +++ b/nexus/src/db/datastore.rs @@ -40,7 +40,10 @@ use async_bb8_diesel::{ PoolError, }; use chrono::Utc; +use diesel::pg::Pg; use diesel::prelude::*; +use diesel::query_builder::{QueryFragment, QueryId}; +use diesel::query_dsl::methods::LoadQuery; use diesel::upsert::excluded; use diesel::{ExpressionMethods, QueryDsl, SelectableHelper}; use omicron_common::api; @@ -56,7 +59,7 @@ use omicron_common::api::external::{ CreateResult, IdentityMetadataCreateParams, }; use omicron_common::bail_unless; -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use std::sync::Arc; use uuid::Uuid; @@ -67,13 +70,13 @@ use crate::db::{ public_error_from_diesel_pool_shouldnt_fail, TransactionError, }, model::{ - ConsoleSession, Dataset, Disk, DiskRuntimeState, Generation, - IncompleteNetworkInterface, Instance, InstanceRuntimeState, Name, - NetworkInterface, Organization, OrganizationUpdate, OximeterInfo, - ProducerEndpoint, Project, ProjectUpdate, RoleAssignmentBuiltin, - RoleBuiltin, RouterRoute, RouterRouteUpdate, Sled, UserBuiltin, Vpc, - VpcFirewallRule, VpcRouter, VpcRouterUpdate, VpcSubnet, - VpcSubnetUpdate, VpcUpdate, Zpool, + ConsoleSession, Dataset, DatasetKind, Disk, DiskRuntimeState, + Generation, IncompleteNetworkInterface, Instance, InstanceRuntimeState, + Name, NetworkInterface, Organization, OrganizationUpdate, OximeterInfo, + ProducerEndpoint, Project, ProjectUpdate, Region, + RoleAssignmentBuiltin, RoleBuiltin, RouterRoute, RouterRouteUpdate, + Sled, UserBuiltin, Vpc, VpcFirewallRule, VpcRouter, VpcRouterUpdate, + VpcSubnet, VpcSubnetUpdate, VpcUpdate, Zpool, }, pagination::paginated, pagination::paginated_multicolumn, @@ -81,6 +84,31 @@ use crate::db::{ update_and_check::{UpdateAndCheck, UpdateStatus}, }; +// Number of unique datasets required to back a region. +// TODO: This should likely turn into a configuration option. +const REGION_REDUNDANCY_THRESHOLD: usize = 3; + +// Represents a query that is ready to be executed. +// +// This helper trait lets the statement either be executed or explained. +// +// U: The output type of executing the statement. 
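+//
+// Roughly, the Diesel bounds below mean: `RunQueryDsl` lets the statement be
+// executed against a connection, `QueryFragment` lets it be rendered to SQL
+// (which is what explaining the query relies on), `LoadQuery` ties execution
+// to the output type `U`, and `QueryId` allows the prepared statement to be
+// cached.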
+trait RunnableQuery: + RunQueryDsl + + QueryFragment + + LoadQuery + + QueryId +{ +} + +impl RunnableQuery for T where + T: RunQueryDsl + + QueryFragment + + LoadQuery + + QueryId +{ +} + pub struct DataStore { pool: Arc, } @@ -238,6 +266,212 @@ impl DataStore { }) } + fn get_allocated_regions_query( + disk_id: Uuid, + ) -> impl RunnableQuery<(Dataset, Region)> { + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::region::dsl as region_dsl; + region_dsl::region + .filter(region_dsl::disk_id.eq(disk_id)) + .inner_join( + dataset_dsl::dataset + .on(region_dsl::dataset_id.eq(dataset_dsl::id)), + ) + .select((Dataset::as_select(), Region::as_select())) + } + + /// Gets allocated regions for a disk, and the datasets to which those + /// regions belong. + /// + /// Note that this function does not validate liveness of the Disk, so it + /// may be used in a context where the disk is being deleted. + pub async fn get_allocated_regions( + &self, + disk_id: Uuid, + ) -> Result, Error> { + Self::get_allocated_regions_query(disk_id) + .get_results_async::<(Dataset, Region)>(self.pool()) + .await + .map_err(|e| public_error_from_diesel_pool_shouldnt_fail(e)) + } + + fn get_allocatable_datasets_query() -> impl RunnableQuery { + use db::schema::dataset::dsl; + + dsl::dataset + // We look for valid datasets (non-deleted crucible datasets). + .filter(dsl::size_used.is_not_null()) + .filter(dsl::time_deleted.is_null()) + .filter(dsl::kind.eq(DatasetKind( + crate::internal_api::params::DatasetKind::Crucible, + ))) + .order(dsl::size_used.asc()) + // TODO: We admittedly don't actually *fail* any request for + // running out of space - we try to send the request down to + // crucible agents, and expect them to fail on our behalf in + // out-of-storage conditions. This should undoubtedly be + // handled more explicitly. + .select(Dataset::as_select()) + .limit(REGION_REDUNDANCY_THRESHOLD.try_into().unwrap()) + } + + /// Idempotently allocates enough regions to back a disk. + /// + /// Returns the allocated regions, as well as the datasets to which they + /// belong. + pub async fn region_allocate( + &self, + disk_id: Uuid, + params: ¶ms::DiskCreate, + ) -> Result, Error> { + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::region::dsl as region_dsl; + + // ALLOCATION POLICY + // + // NOTE: This policy can - and should! - be changed. + // + // See https://rfd.shared.oxide.computer/rfd/0205 for a more + // complete discussion. + // + // It is currently acting as a placeholder, showing a feasible + // interaction between datasets and regions. + // + // This policy allocates regions to distinct Crucible datasets, + // favoring datasets with the smallest existing (summed) region + // sizes. Basically, "pick the datasets with the smallest load first". + // + // Longer-term, we should consider: + // - Storage size + remaining free space + // - Sled placement of datasets + // - What sort of loads we'd like to create (even split across all disks + // may not be preferable, especially if maintenance is expected) + #[derive(Debug, thiserror::Error)] + enum RegionAllocateError { + #[error("Not enough datasets for replicated allocation: {0}")] + NotEnoughDatasets(usize), + } + type TxnError = TransactionError; + let params: params::DiskCreate = params.clone(); + self.pool() + .transaction(move |conn| { + // First, for idempotency, check if regions are already + // allocated to this disk. + // + // If they are, return those regions and the associated + // datasets. 
+ let datasets_and_regions = Self::get_allocated_regions_query( + disk_id, + ) + .get_results::<(Dataset, Region)>(conn)?; + if !datasets_and_regions.is_empty() { + return Ok(datasets_and_regions); + } + + let mut datasets: Vec = + Self::get_allocatable_datasets_query() + .get_results::(conn)?; + + if datasets.len() < REGION_REDUNDANCY_THRESHOLD { + return Err(TxnError::CustomError( + RegionAllocateError::NotEnoughDatasets(datasets.len()), + )); + } + + // Create identical regions on each of the following datasets. + let source_datasets = + &mut datasets[0..REGION_REDUNDANCY_THRESHOLD]; + let regions: Vec = source_datasets + .iter() + .map(|dataset| { + Region::new( + dataset.id(), + disk_id, + params.block_size().into(), + params.blocks_per_extent(), + params.extent_count(), + ) + }) + .collect(); + let regions = diesel::insert_into(region_dsl::region) + .values(regions) + .returning(Region::as_returning()) + .get_results(conn)?; + + // Update the tallied sizes in the source datasets containing + // those regions. + let region_size = i64::from(params.block_size()) + * params.blocks_per_extent() + * params.extent_count(); + for dataset in source_datasets.iter_mut() { + dataset.size_used = + dataset.size_used.map(|v| v + region_size); + } + + let dataset_ids: Vec = + source_datasets.iter().map(|ds| ds.id()).collect(); + diesel::update(dataset_dsl::dataset) + .filter(dataset_dsl::id.eq_any(dataset_ids)) + .set( + dataset_dsl::size_used + .eq(dataset_dsl::size_used + region_size), + ) + .execute(conn)?; + + // Return the regions with the datasets to which they were allocated. + Ok(source_datasets + .into_iter() + .map(|d| d.clone()) + .zip(regions) + .collect()) + }) + .await + .map_err(|e| { + Error::internal_error(&format!("Transaction error: {}", e)) + }) + } + + /// Deletes all regions backing a disk. + /// + /// Also updates the storage usage on their corresponding datasets. + pub async fn regions_hard_delete(&self, disk_id: Uuid) -> DeleteResult { + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::region::dsl as region_dsl; + + // Remove the regions, collecting datasets they're from. + let (dataset_id, size) = diesel::delete(region_dsl::region) + .filter(region_dsl::disk_id.eq(disk_id)) + .returning(( + region_dsl::dataset_id, + region_dsl::block_size + * region_dsl::blocks_per_extent + * region_dsl::extent_count, + )) + .get_result_async::<(Uuid, i64)>(self.pool()) + .await + .map_err(|e| { + Error::internal_error(&format!( + "error deleting regions: {:?}", + e + )) + })?; + + // Update those datasets to which the regions belonged. + diesel::update(dataset_dsl::dataset) + .filter(dataset_dsl::id.eq(dataset_id)) + .set(dataset_dsl::size_used.eq(dataset_dsl::size_used - size)) + .execute_async(self.pool()) + .await + .map_err(|e| { + Error::internal_error(&format!( + "error updating dataset space: {:?}", + e + )) + })?; + + Ok(()) + } + /// Create a organization pub async fn organization_create( &self, @@ -1063,28 +1297,53 @@ impl DataStore { }) } - pub async fn project_delete_disk( + /// Updates a disk record to indicate it has been deleted. + /// + /// Does not attempt to modify any resources (e.g. regions) which may + /// belong to the disk. + // TODO: Delete me (this function, not the disk!), ensure all datastore + // access is auth-checked. 
+ // + // Here's the deal: We have auth checks on access to the database - at the + // time of writing this comment, only a subset of access is protected, and + // "Delete Disk" is actually one of the first targets of this auth check. + // + // However, there are contexts where we want to delete disks *outside* of + // calling the HTTP API-layer "delete disk" endpoint. As one example, during + // the "undo" part of the disk creation saga, we want to allow users to + // delete the disk they (partially) created. + // + // This gets a little tricky mapping back to user permissions - a user + // SHOULD be able to create a disk with the "create" permission, without the + // "delete" permission. To still make the call internally, we'd basically + // need to manufacture a token that identifies the ability to "create a + // disk, or delete a very specific disk with ID = ...". + pub async fn project_delete_disk_no_auth( &self, - opctx: &OpContext, - disk_authz: &authz::ProjectChild, - ) -> DeleteResult { + disk_id: &Uuid, + ) -> Result<(), Error> { use db::schema::disk::dsl; + let pool = self.pool(); let now = Utc::now(); - let disk_id = disk_authz.id(); - opctx.authorize(authz::Action::Delete, disk_authz).await?; + let ok_to_delete_states = vec![ + api::external::DiskState::Detached, + api::external::DiskState::Faulted, + api::external::DiskState::Creating, + ]; + let ok_to_delete_state_labels: Vec<_> = + ok_to_delete_states.iter().map(|s| s.label()).collect(); let destroyed = api::external::DiskState::Destroyed.label(); - let detached = api::external::DiskState::Detached.label(); - let faulted = api::external::DiskState::Faulted.label(); let result = diesel::update(dsl::disk) .filter(dsl::time_deleted.is_null()) .filter(dsl::id.eq(*disk_id)) - .filter(dsl::disk_state.eq_any(vec![detached, faulted])) + .filter(dsl::disk_state.eq_any(ok_to_delete_state_labels)) + .filter(dsl::attach_instance_id.is_null()) .set((dsl::disk_state.eq(destroyed), dsl::time_deleted.eq(now))) .check_if_exists::(*disk_id) - .execute_and_check(self.pool_authorized(opctx).await?) + .execute_and_check(pool) .await .map_err(|e| { public_error_from_diesel_pool( @@ -1096,12 +1355,37 @@ impl DataStore { match result.status { UpdateStatus::Updated => Ok(()), - UpdateStatus::NotUpdatedButExists => Err(Error::InvalidRequest { - message: format!( - "disk cannot be deleted in state \"{}\"", - result.found.runtime_state.disk_state - ), - }), + UpdateStatus::NotUpdatedButExists => { + let disk = result.found; + let disk_state = disk.state(); + if disk.time_deleted().is_some() + && disk_state.state() + == &api::external::DiskState::Destroyed + { + // To maintain idempotency, if the disk has already been + // destroyed, don't throw an error. + return Ok(()); + } else if !ok_to_delete_states.contains(disk_state.state()) { + return Err(Error::InvalidRequest { + message: format!( + "disk cannot be deleted in state \"{}\"", + disk.runtime_state.disk_state + ), + }); + } else if disk_state.is_attached() { + return Err(Error::InvalidRequest { + message: String::from("disk is attached"), + }); + } else { + // NOTE: This is a "catch-all" error case, more specific + // errors should be preferred as they're more actionable. 
+ return Err(Error::InternalError { + internal_message: String::from( + "disk exists, but cannot be deleted", + ), + }); + } + } } } @@ -2413,17 +2697,21 @@ pub async fn datastore_test( #[cfg(test)] mod test { - use super::datastore_test; + use super::*; use crate::authz; + use crate::db::explain::ExplainableAsync; use crate::db::identity::Resource; use crate::db::model::{ConsoleSession, Organization, Project}; use crate::external_api::params; use chrono::{Duration, Utc}; use nexus_test_utils::db::test_setup_database; use omicron_common::api::external::{ - Error, IdentityMetadataCreateParams, LookupType, + ByteCount, Error, IdentityMetadataCreateParams, LookupType, Name, }; use omicron_test_utils::dev; + use std::collections::HashSet; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; use uuid::Uuid; #[tokio::test] @@ -2517,4 +2805,304 @@ mod test { db.cleanup().await.unwrap(); logctx.cleanup_successful(); } + + // Creates a test sled, returns its UUID. + async fn create_test_sled(datastore: &DataStore) -> Uuid { + let bogus_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let sled_id = Uuid::new_v4(); + let sled = Sled::new(sled_id, bogus_addr.clone()); + datastore.sled_upsert(sled).await.unwrap(); + sled_id + } + + fn test_zpool_size() -> ByteCount { + ByteCount::from_gibibytes_u32(100) + } + + // Creates a test zpool, returns its UUID. + async fn create_test_zpool(datastore: &DataStore, sled_id: Uuid) -> Uuid { + let zpool_id = Uuid::new_v4(); + let zpool = Zpool::new( + zpool_id, + sled_id, + &crate::internal_api::params::ZpoolPutRequest { + size: test_zpool_size(), + }, + ); + datastore.zpool_upsert(zpool).await.unwrap(); + zpool_id + } + + fn create_test_disk_create_params( + name: &str, + size: ByteCount, + ) -> params::DiskCreate { + params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: Name::try_from(name.to_string()).unwrap(), + description: name.to_string(), + }, + snapshot_id: None, + size, + } + } + + #[tokio::test] + async fn test_region_allocation() { + let logctx = dev::test_setup_log("test_region_allocation"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + let datastore = DataStore::new(Arc::new(pool)); + + // Create a sled... + let sled_id = create_test_sled(&datastore).await; + + // ... and a zpool within that sled... + let zpool_id = create_test_zpool(&datastore, sled_id).await; + + // ... and datasets within that zpool. + let dataset_count = REGION_REDUNDANCY_THRESHOLD * 2; + let bogus_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let kind = + DatasetKind(crate::internal_api::params::DatasetKind::Crucible); + let dataset_ids: Vec = + (0..dataset_count).map(|_| Uuid::new_v4()).collect(); + for id in &dataset_ids { + let dataset = Dataset::new(*id, zpool_id, bogus_addr, kind.clone()); + datastore.dataset_upsert(dataset).await.unwrap(); + } + + // Allocate regions from the datasets for this disk. + let params = create_test_disk_create_params( + "disk1", + ByteCount::from_mebibytes_u32(500), + ); + let disk1_id = Uuid::new_v4(); + // Currently, we only allocate one Region Set per disk. + let expected_region_count = REGION_REDUNDANCY_THRESHOLD; + let dataset_and_regions = + datastore.region_allocate(disk1_id, ¶ms).await.unwrap(); + + // Verify the allocation. 
+ assert_eq!(expected_region_count, dataset_and_regions.len()); + let mut disk1_datasets = HashSet::new(); + for (dataset, region) in dataset_and_regions { + assert!(disk1_datasets.insert(dataset.id())); + assert_eq!(disk1_id, region.disk_id()); + assert_eq!(params.block_size(), region.block_size()); + assert_eq!(params.blocks_per_extent(), region.blocks_per_extent()); + assert_eq!(params.extent_count(), region.extent_count()); + } + + // Allocate regions for a second disk. Observe that we allocate from + // the three previously unused datasets. + let params = create_test_disk_create_params( + "disk2", + ByteCount::from_mebibytes_u32(500), + ); + let disk2_id = Uuid::new_v4(); + let dataset_and_regions = + datastore.region_allocate(disk2_id, ¶ms).await.unwrap(); + assert_eq!(expected_region_count, dataset_and_regions.len()); + let mut disk2_datasets = HashSet::new(); + for (dataset, region) in dataset_and_regions { + assert!(disk2_datasets.insert(dataset.id())); + assert_eq!(disk2_id, region.disk_id()); + assert_eq!(params.block_size(), region.block_size()); + assert_eq!(params.blocks_per_extent(), region.blocks_per_extent()); + assert_eq!(params.extent_count(), region.extent_count()); + } + + // Double-check that the datasets used for the first disk weren't + // used when allocating the second disk. + assert_eq!(0, disk1_datasets.intersection(&disk2_datasets).count()); + + let _ = db.cleanup().await; + } + + #[tokio::test] + async fn test_region_allocation_is_idempotent() { + let logctx = + dev::test_setup_log("test_region_allocation_is_idempotent"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + let datastore = DataStore::new(Arc::new(pool)); + + // Create a sled... + let sled_id = create_test_sled(&datastore).await; + + // ... and a zpool within that sled... + let zpool_id = create_test_zpool(&datastore, sled_id).await; + + // ... and datasets within that zpool. + let dataset_count = REGION_REDUNDANCY_THRESHOLD; + let bogus_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let kind = + DatasetKind(crate::internal_api::params::DatasetKind::Crucible); + let dataset_ids: Vec = + (0..dataset_count).map(|_| Uuid::new_v4()).collect(); + for id in &dataset_ids { + let dataset = Dataset::new(*id, zpool_id, bogus_addr, kind.clone()); + datastore.dataset_upsert(dataset).await.unwrap(); + } + + // Allocate regions from the datasets for this disk. + let params = create_test_disk_create_params( + "disk", + ByteCount::from_mebibytes_u32(500), + ); + let disk_id = Uuid::new_v4(); + let mut dataset_and_regions1 = + datastore.region_allocate(disk_id, ¶ms).await.unwrap(); + let mut dataset_and_regions2 = + datastore.region_allocate(disk_id, ¶ms).await.unwrap(); + + // Give them a consistent order so we can easily compare them. + let sort_vec = |v: &mut Vec<(Dataset, Region)>| { + v.sort_by(|(d1, r1), (d2, r2)| { + let order = d1.id().cmp(&d2.id()); + match order { + std::cmp::Ordering::Equal => r1.id().cmp(&r2.id()), + _ => order, + } + }); + }; + sort_vec(&mut dataset_and_regions1); + sort_vec(&mut dataset_and_regions2); + + // Validate that the two calls to allocate return the same data. 
+ assert_eq!(dataset_and_regions1.len(), dataset_and_regions2.len()); + for i in 0..dataset_and_regions1.len() { + assert_eq!(dataset_and_regions1[i], dataset_and_regions2[i],); + } + + let _ = db.cleanup().await; + } + + #[tokio::test] + async fn test_region_allocation_not_enough_datasets() { + let logctx = + dev::test_setup_log("test_region_allocation_not_enough_datasets"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + let datastore = DataStore::new(Arc::new(pool)); + + // Create a sled... + let sled_id = create_test_sled(&datastore).await; + + // ... and a zpool within that sled... + let zpool_id = create_test_zpool(&datastore, sled_id).await; + + // ... and datasets within that zpool. + let dataset_count = REGION_REDUNDANCY_THRESHOLD - 1; + let bogus_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let kind = + DatasetKind(crate::internal_api::params::DatasetKind::Crucible); + let dataset_ids: Vec = + (0..dataset_count).map(|_| Uuid::new_v4()).collect(); + for id in &dataset_ids { + let dataset = Dataset::new(*id, zpool_id, bogus_addr, kind.clone()); + datastore.dataset_upsert(dataset).await.unwrap(); + } + + // Allocate regions from the datasets for this disk. + let params = create_test_disk_create_params( + "disk1", + ByteCount::from_mebibytes_u32(500), + ); + let disk1_id = Uuid::new_v4(); + let err = + datastore.region_allocate(disk1_id, ¶ms).await.unwrap_err(); + assert!(err + .to_string() + .contains("Not enough datasets for replicated allocation")); + + let _ = db.cleanup().await; + } + + // TODO: This test should be updated when the correct handling + // of this out-of-space case is implemented. + #[tokio::test] + async fn test_region_allocation_out_of_space_does_not_fail_yet() { + let logctx = dev::test_setup_log( + "test_region_allocation_out_of_space_does_not_fail_yet", + ); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + let datastore = DataStore::new(Arc::new(pool)); + + // Create a sled... + let sled_id = create_test_sled(&datastore).await; + + // ... and a zpool within that sled... + let zpool_id = create_test_zpool(&datastore, sled_id).await; + + // ... and datasets within that zpool. + let dataset_count = REGION_REDUNDANCY_THRESHOLD; + let bogus_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let kind = + DatasetKind(crate::internal_api::params::DatasetKind::Crucible); + let dataset_ids: Vec = + (0..dataset_count).map(|_| Uuid::new_v4()).collect(); + for id in &dataset_ids { + let dataset = Dataset::new(*id, zpool_id, bogus_addr, kind.clone()); + datastore.dataset_upsert(dataset).await.unwrap(); + } + + // Allocate regions from the datasets for this disk. + // + // Note that we ask for a disk which is as large as the zpool, + // so we shouldn't have space for redundancy. + let disk_size = test_zpool_size(); + let params = create_test_disk_create_params("disk1", disk_size); + let disk1_id = Uuid::new_v4(); + + // NOTE: This *should* be an error, rather than succeeding. + datastore.region_allocate(disk1_id, ¶ms).await.unwrap(); + + let _ = db.cleanup().await; + } + + // Validate that queries which should be executable without a full table + // scan are, in fact, runnable without a FULL SCAN. 
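+ //
+ // get_allocated_regions_query is expected to be served by the new index on
+ // region (disk_id); get_allocatable_datasets_query by the partial index on
+ // dataset (size_used) added in dbinit.sql.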
+ #[tokio::test] + async fn test_queries_do_not_require_full_table_scan() { + let logctx = + dev::test_setup_log("test_queries_do_not_require_full_table_scan"); + let mut db = test_setup_database(&logctx.log).await; + let cfg = db::Config { url: db.pg_config().clone() }; + let pool = db::Pool::new(&cfg); + let datastore = DataStore::new(Arc::new(pool)); + + let explanation = DataStore::get_allocated_regions_query(Uuid::nil()) + .explain_async(datastore.pool()) + .await + .unwrap(); + assert!( + !explanation.contains("FULL SCAN"), + "Found an unexpected FULL SCAN: {}", + explanation + ); + + let explanation = DataStore::get_allocatable_datasets_query() + .explain_async(datastore.pool()) + .await + .unwrap(); + assert!( + !explanation.contains("FULL SCAN"), + "Found an unexpected FULL SCAN: {}", + explanation + ); + + let _ = db.cleanup().await; + } } diff --git a/nexus/src/db/db-macros/src/lib.rs b/nexus/src/db/db-macros/src/lib.rs index 1bd49481468..3b29099c5dc 100644 --- a/nexus/src/db/db-macros/src/lib.rs +++ b/nexus/src/db/db-macros/src/lib.rs @@ -186,7 +186,7 @@ fn build_resource_identity( let identity_name = format_ident!("{}Identity", struct_name); quote! { #[doc = #identity_doc] - #[derive(Clone, Debug, Selectable, Queryable, Insertable)] + #[derive(Clone, Debug, PartialEq, Selectable, Queryable, Insertable, serde::Serialize, serde::Deserialize)] #[table_name = #table_name ] pub struct #identity_name { pub id: ::uuid::Uuid, @@ -225,7 +225,7 @@ fn build_asset_identity(struct_name: &Ident, table_name: &Lit) -> TokenStream { let identity_name = format_ident!("{}Identity", struct_name); quote! { #[doc = #identity_doc] - #[derive(Clone, Debug, Selectable, Queryable, Insertable)] + #[derive(Clone, Debug, PartialEq, Selectable, Queryable, Insertable, serde::Serialize, serde::Deserialize)] #[table_name = #table_name ] pub struct #identity_name { pub id: ::uuid::Uuid, diff --git a/nexus/src/db/model.rs b/nexus/src/db/model.rs index 87c5587e1c3..1fb308610d9 100644 --- a/nexus/src/db/model.rs +++ b/nexus/src/db/model.rs @@ -27,7 +27,7 @@ use parse_display::Display; use rand::{rngs::StdRng, SeedableRng}; use ref_cast::RefCast; use schemars::JsonSchema; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::net::SocketAddr; use uuid::Uuid; @@ -106,8 +106,9 @@ macro_rules! 
impl_enum_type { Ord, PartialOrd, RefCast, - Deserialize, JsonSchema, + Serialize, + Deserialize, )] #[sql_type = "sql_types::Text"] #[serde(transparent)] @@ -142,7 +143,16 @@ where } } -#[derive(Copy, Clone, Debug, AsExpression, FromSqlRow)] +#[derive( + Copy, + Clone, + Debug, + AsExpression, + FromSqlRow, + Serialize, + Deserialize, + PartialEq, +)] #[sql_type = "sql_types::BigInt"] pub struct ByteCount(pub external::ByteCount); @@ -158,7 +168,7 @@ where &self, out: &mut serialize::Output, ) -> serialize::Result { - i64::from(&self.0).to_sql(out) + i64::from(self.0).to_sql(out) } } @@ -175,7 +185,17 @@ where } #[derive( - Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd, AsExpression, FromSqlRow, + Copy, + Clone, + Debug, + Eq, + Ord, + PartialEq, + PartialOrd, + AsExpression, + FromSqlRow, + Serialize, + Deserialize, )] #[sql_type = "sql_types::BigInt"] #[repr(transparent)] @@ -504,11 +524,11 @@ impl DatastoreCollection for Zpool { } impl_enum_type!( - #[derive(SqlType, Debug)] + #[derive(SqlType, Debug, QueryId)] #[postgres(type_name = "dataset_kind", type_schema = "public")] pub struct DatasetKindEnum; - #[derive(Clone, Debug, AsExpression, FromSqlRow)] + #[derive(Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] #[sql_type = "DatasetKindEnum"] pub struct DatasetKind(pub internal_api::params::DatasetKind); @@ -528,7 +548,17 @@ impl From for DatasetKind { /// /// A dataset represents a portion of a Zpool, which is then made /// available to a service on the Sled. -#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)] +#[derive( + Queryable, + Insertable, + Debug, + Clone, + Selectable, + Asset, + Deserialize, + Serialize, + PartialEq, +)] #[table_name = "dataset"] pub struct Dataset { #[diesel(embed)] @@ -542,6 +572,7 @@ pub struct Dataset { port: i32, kind: DatasetKind, + pub size_used: Option, } impl Dataset { @@ -551,6 +582,10 @@ impl Dataset { addr: SocketAddr, kind: DatasetKind, ) -> Self { + let size_used = match kind { + DatasetKind(internal_api::params::DatasetKind::Crucible) => Some(0), + _ => None, + }; Self { identity: DatasetIdentity::new(id), time_deleted: None, @@ -559,6 +594,7 @@ impl Dataset { ip: addr.ip().into(), port: addr.port().into(), kind, + size_used, } } @@ -588,7 +624,17 @@ impl DatastoreCollection for Disk { /// /// A region represents a portion of a Crucible Downstairs dataset /// allocated within a volume. -#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)] +#[derive( + Queryable, + Insertable, + Debug, + Clone, + Selectable, + Asset, + Serialize, + Deserialize, + PartialEq, +)] #[table_name = "region"] pub struct Region { #[diesel(embed)] @@ -597,11 +643,46 @@ pub struct Region { dataset_id: Uuid, disk_id: Uuid, - block_size: i64, - extent_size: i64, + block_size: ByteCount, + blocks_per_extent: i64, extent_count: i64, } +impl Region { + pub fn new( + dataset_id: Uuid, + disk_id: Uuid, + block_size: ByteCount, + blocks_per_extent: i64, + extent_count: i64, + ) -> Self { + Self { + identity: RegionIdentity::new(Uuid::new_v4()), + dataset_id, + disk_id, + block_size, + blocks_per_extent, + extent_count, + } + } + + pub fn disk_id(&self) -> Uuid { + self.disk_id + } + pub fn dataset_id(&self) -> Uuid { + self.dataset_id + } + pub fn block_size(&self) -> external::ByteCount { + self.block_size.0 + } + pub fn blocks_per_extent(&self) -> i64 { + self.blocks_per_extent + } + pub fn extent_count(&self) -> i64 { + self.extent_count + } +} + /// Describes an organization within the database. 
#[derive(Queryable, Insertable, Debug, Resource, Selectable)] #[table_name = "organization"] @@ -665,7 +746,7 @@ impl Project { pub fn new(organization_id: Uuid, params: params::ProjectCreate) -> Self { Self { identity: ProjectIdentity::new(Uuid::new_v4(), params.identity), - organization_id: organization_id, + organization_id, } } } @@ -851,7 +932,16 @@ where } /// A Disk (network block device). -#[derive(Queryable, Insertable, Clone, Debug, Selectable, Resource)] +#[derive( + Queryable, + Insertable, + Clone, + Debug, + Selectable, + Resource, + Serialize, + Deserialize, +)] #[table_name = "disk"] pub struct Disk { #[diesel(embed)] @@ -918,7 +1008,16 @@ impl Into for Disk { } } -#[derive(AsChangeset, Clone, Debug, Queryable, Insertable, Selectable)] +#[derive( + AsChangeset, + Clone, + Debug, + Queryable, + Insertable, + Selectable, + Serialize, + Deserialize, +)] #[table_name = "disk"] // When "attach_instance_id" is set to None, we'd like to // clear it from the DB, rather than ignore the update. diff --git a/nexus/src/db/schema.rs b/nexus/src/db/schema.rs index 504543d4bb0..fc7424f4883 100644 --- a/nexus/src/db/schema.rs +++ b/nexus/src/db/schema.rs @@ -191,6 +191,8 @@ table! { port -> Int4, kind -> crate::db::model::DatasetKindEnum, + + size_used -> Nullable, } } @@ -204,7 +206,7 @@ table! { disk_id -> Uuid, block_size -> Int8, - extent_size -> Int8, + blocks_per_extent -> Int8, extent_count -> Int8, } } @@ -324,6 +326,7 @@ table! { } allow_tables_to_appear_in_same_query!( + dataset, disk, instance, metric_producer, @@ -331,6 +334,7 @@ allow_tables_to_appear_in_same_query!( organization, oximeter, project, + region, saga, saga_node_event, console_session, @@ -343,4 +347,5 @@ allow_tables_to_appear_in_same_query!( user_builtin, role_builtin, role_assignment_builtin, + zpool, ); diff --git a/nexus/src/external_api/params.rs b/nexus/src/external_api/params.rs index 8371cd31b4f..7291843fbfe 100644 --- a/nexus/src/external_api/params.rs +++ b/nexus/src/external_api/params.rs @@ -169,6 +169,26 @@ pub struct DiskCreate { pub size: ByteCount, } +const BLOCK_SIZE: u32 = 1_u32 << 12; +const EXTENT_SIZE: u32 = 1_u32 << 20; + +impl DiskCreate { + pub fn block_size(&self) -> ByteCount { + ByteCount::from(BLOCK_SIZE) + } + + pub fn blocks_per_extent(&self) -> i64 { + EXTENT_SIZE as i64 / BLOCK_SIZE as i64 + } + + pub fn extent_count(&self) -> i64 { + let extent_size = EXTENT_SIZE as i64; + let size = self.size.to_bytes() as i64; + size / extent_size + + ((size % extent_size) + extent_size - 1) / extent_size + } +} + /** * Parameters for the [`Disk`] to be attached or detached to an instance */ @@ -193,3 +213,51 @@ pub struct UserBuiltinCreate { #[serde(flatten)] pub identity: IdentityMetadataCreateParams, } + +#[cfg(test)] +mod test { + use super::*; + use std::convert::TryFrom; + + fn new_disk_create_params(size: ByteCount) -> DiskCreate { + DiskCreate { + identity: IdentityMetadataCreateParams { + name: Name::try_from("myobject".to_string()).unwrap(), + description: "desc".to_string(), + }, + snapshot_id: None, + size, + } + } + + #[test] + fn test_extent_count() { + let params = new_disk_create_params(ByteCount::try_from(0u64).unwrap()); + assert_eq!(0, params.extent_count()); + + let params = new_disk_create_params(ByteCount::try_from(1u64).unwrap()); + assert_eq!(1, params.extent_count()); + let params = new_disk_create_params( + ByteCount::try_from(EXTENT_SIZE - 1).unwrap(), + ); + assert_eq!(1, params.extent_count()); + let params = + 
new_disk_create_params(ByteCount::try_from(EXTENT_SIZE).unwrap()); + assert_eq!(1, params.extent_count()); + + let params = new_disk_create_params( + ByteCount::try_from(EXTENT_SIZE + 1).unwrap(), + ); + assert_eq!(2, params.extent_count()); + + // Mostly just checking we don't blow up on an unwrap here. + let params = + new_disk_create_params(ByteCount::try_from(i64::MAX).unwrap()); + assert!( + params.size.to_bytes() + <= (params.extent_count() as u64) + * (params.blocks_per_extent() as u64) + * params.block_size().to_bytes() + ); + } +} diff --git a/nexus/src/internal_api/params.rs b/nexus/src/internal_api/params.rs index 5d073cce5b2..1e4f420d676 100644 --- a/nexus/src/internal_api/params.rs +++ b/nexus/src/internal_api/params.rs @@ -33,7 +33,7 @@ pub struct ZpoolPutRequest { pub struct ZpoolPutResponse {} /// Describes the purpose of the dataset. -#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, Copy, PartialEq)] pub enum DatasetKind { Crucible, Cockroach, diff --git a/nexus/src/nexus.rs b/nexus/src/nexus.rs index 440495a6547..d10aec2ed6b 100644 --- a/nexus/src/nexus.rs +++ b/nexus/src/nexus.rs @@ -209,24 +209,28 @@ impl Nexus { }; /* TODO-cleanup all the extra Arcs here seems wrong */ - let nexus_arc = Arc::new(nexus); + let nexus = Arc::new(nexus); let opctx = OpContext::for_background( log.new(o!("component" => "SagaRecoverer")), authz, authn::Context::internal_saga_recovery(), Arc::clone(&db_datastore), ); + let saga_logger = nexus.log.new(o!("saga_type" => "recovery")); let recovery_task = db::recover( opctx, my_sec_id, - Arc::new(Arc::new(SagaContext::new(Arc::clone(&nexus_arc)))), + Arc::new(Arc::new(SagaContext::new( + Arc::clone(&nexus), + saga_logger, + ))), db_datastore, Arc::clone(&sec_client), &sagas::ALL_TEMPLATES, ); - *nexus_arc.recovery_task.lock().unwrap() = Some(recovery_task); - nexus_arc + *nexus.recovery_task.lock().unwrap() = Some(recovery_task); + nexus } pub async fn wait_for_populate(&self) -> Result<(), anyhow::Error> { @@ -438,8 +442,10 @@ impl Nexus { P: serde::Serialize, { let saga_id = SagaId(Uuid::new_v4()); + let saga_logger = + self.log.new(o!("template_name" => template_name.to_owned())); let saga_context = - Arc::new(Arc::new(SagaContext::new(Arc::clone(self)))); + Arc::new(Arc::new(SagaContext::new(Arc::clone(self), saga_logger))); let future = self .sec_client .saga_create( @@ -676,7 +682,7 @@ impl Nexus { } pub async fn project_create_disk( - &self, + self: &Arc, organization_name: &Name, project_name: &Name, params: ¶ms::DiskCreate, @@ -697,37 +703,22 @@ impl Nexus { }); } - let disk_id = Uuid::new_v4(); - let disk = db::model::Disk::new( - disk_id, - authz_project.id(), - params.clone(), - db::model::DiskRuntimeState::new(), - ); - let disk_created = self.db_datastore.project_create_disk(disk).await?; - - // TODO: Here, we should ensure the disk is backed by appropriate - // regions. This is blocked behind actually having Crucible agents - // running in zones for dedicated zpools. - // - // TODO: Performing this operation, alongside "create" and "update - // state from create to detach", should be executed in a Saga. - - /* - * This is a little hokey. We'd like to simulate an asynchronous - * transition from "Creating" to "Detached". For instances, the - * simulation lives in a simulated sled agent. Here, the analog might - * be a simulated storage control plane. But that doesn't exist yet, - * and we don't even know what APIs it would provide yet. 
So we just - * carry out the simplest possible "simulation" here: we'll return to - * the client a structure describing a disk in state "Creating", but by - * the time we do so, we've already updated the internal representation - * to "Created". - */ - self.db_datastore - .disk_update_runtime(&disk_id, &disk_created.runtime().detach()) + let saga_params = Arc::new(sagas::ParamsDiskCreate { + project_id: authz_project.id(), + create_params: params.clone(), + }); + let saga_outputs = self + .execute_saga( + Arc::clone(&sagas::SAGA_DISK_CREATE_TEMPLATE), + sagas::SAGA_DISK_CREATE_NAME, + saga_params, + ) .await?; - + let disk_created = saga_outputs + .lookup_output::("created_disk") + .map_err(|e| Error::InternalError { + internal_message: e.to_string(), + })?; Ok(disk_created) } @@ -757,7 +748,7 @@ impl Nexus { } pub async fn project_delete_disk( - &self, + self: &Arc, opctx: &OpContext, organization_name: &Name, project_name: &Name, @@ -766,34 +757,25 @@ impl Nexus { let (disk, authz_disk) = self .project_lookup_disk(organization_name, project_name, disk_name) .await?; - let runtime = disk.runtime(); - bail_unless!(runtime.state().state() != &DiskState::Destroyed); - if runtime.state().is_attached() { - return Err(Error::InvalidRequest { - message: String::from("disk is attached"), - }); - } + // TODO: We need to sort out the authorization checks. + // + // Normally, this would be coupled alongside access to the + // datastore, but now that disk deletion exists within a Saga, + // this would require OpContext to be serialized (which is + // not trivial). + opctx.authorize(authz::Action::Delete, &authz_disk).await?; + + let saga_params = + Arc::new(sagas::ParamsDiskDelete { disk_id: disk.id() }); + self.execute_saga( + Arc::clone(&sagas::SAGA_DISK_DELETE_TEMPLATE), + sagas::SAGA_DISK_DELETE_NAME, + saga_params, + ) + .await?; - /* - * TODO-correctness It's not clear how this handles the case where we - * begin this delete operation while some other request is ongoing to - * attach the disk. We won't be able to see that in the state here. We - * might be able to detect this when we go update the disk's state to - * Attaching (because a SQL UPDATE will update 0 rows), but we'd sort of - * already be in a bad state because the destroyed disk will be - * attaching (and eventually attached) on some sled, and if the wrong - * combination of components crash at this point, we could wind up not - * fixing that state. - * - * This is a consequence of the choice _not_ to record the Attaching - * state in the database before beginning the attach process. If we did - * that, we wouldn't have this problem, but we'd have a similar problem - * of dealing with the case of a crash after recording this state and - * before actually beginning the attach process. Sagas can maybe - * address that. 
- */ - self.db_datastore.project_delete_disk(opctx, &authz_disk).await + Ok(()) } /* diff --git a/nexus/src/saga_interface.rs b/nexus/src/saga_interface.rs index 6f20243eb44..a3d54961797 100644 --- a/nexus/src/saga_interface.rs +++ b/nexus/src/saga_interface.rs @@ -11,6 +11,7 @@ use crate::external_api::params; use crate::Nexus; use omicron_common::api::external::Error; use sled_agent_client::Client as SledAgentClient; +use slog::Logger; use std::fmt; use std::sync::Arc; use uuid::Uuid; @@ -22,6 +23,7 @@ use uuid::Uuid; */ pub struct SagaContext { nexus: Arc, + log: Logger, } impl fmt::Debug for SagaContext { @@ -31,8 +33,12 @@ impl fmt::Debug for SagaContext { } impl SagaContext { - pub fn new(nexus: Arc) -> SagaContext { - SagaContext { nexus } + pub fn new(nexus: Arc, log: Logger) -> SagaContext { + SagaContext { nexus, log } + } + + pub fn log(&self) -> &Logger { + &self.log } /* diff --git a/nexus/src/sagas.rs b/nexus/src/sagas.rs index a1f284f5495..34c52b124ba 100644 --- a/nexus/src/sagas.rs +++ b/nexus/src/sagas.rs @@ -14,11 +14,18 @@ */ use crate::db; -use crate::db::identity::Resource; +use crate::db::identity::{Asset, Resource}; use crate::external_api::params; use crate::saga_interface::SagaContext; +use anyhow::anyhow; use chrono::Utc; +use crucible_agent_client::{ + types::{CreateRegion, RegionId, State as RegionState}, + Client as CrucibleAgentClient, +}; +use futures::StreamExt; use lazy_static::lazy_static; +use omicron_common::api::external::Error; use omicron_common::api::external::Generation; use omicron_common::api::external::IdentityMetadataCreateParams; use omicron_common::api::external::InstanceState; @@ -26,10 +33,12 @@ use omicron_common::api::external::Name; use omicron_common::api::external::NetworkInterface; use omicron_common::api::internal::nexus::InstanceRuntimeState; use omicron_common::api::internal::sled_agent::InstanceHardware; +use omicron_common::backoff::{self, BackoffError}; use serde::Deserialize; use serde::Serialize; +use slog::Logger; use std::collections::BTreeMap; -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use std::sync::Arc; use steno::new_action_noop_undo; use steno::ActionContext; @@ -45,9 +54,15 @@ use uuid::Uuid; * We'll need a richer mechanism for registering sagas, but this works for now. */ pub const SAGA_INSTANCE_CREATE_NAME: &'static str = "instance-create"; +pub const SAGA_DISK_CREATE_NAME: &'static str = "disk-create"; +pub const SAGA_DISK_DELETE_NAME: &'static str = "disk-delete"; lazy_static! { pub static ref SAGA_INSTANCE_CREATE_TEMPLATE: Arc> = Arc::new(saga_instance_create()); + pub static ref SAGA_DISK_CREATE_TEMPLATE: Arc> = + Arc::new(saga_disk_create()); + pub static ref SAGA_DISK_DELETE_TEMPLATE: Arc> = + Arc::new(saga_disk_delete()); } lazy_static! { @@ -57,11 +72,23 @@ lazy_static! 
{ fn all_templates( ) -> BTreeMap<&'static str, Arc>>> { - vec![( - SAGA_INSTANCE_CREATE_NAME, - Arc::clone(&SAGA_INSTANCE_CREATE_TEMPLATE) - as Arc>>, - )] + vec![ + ( + SAGA_INSTANCE_CREATE_NAME, + Arc::clone(&SAGA_INSTANCE_CREATE_TEMPLATE) + as Arc>>, + ), + ( + SAGA_DISK_CREATE_NAME, + Arc::clone(&SAGA_DISK_CREATE_TEMPLATE) + as Arc>>, + ), + ( + SAGA_DISK_DELETE_NAME, + Arc::clone(&SAGA_DISK_DELETE_TEMPLATE) + as Arc>>, + ), + ] .into_iter() .collect() } @@ -308,3 +335,353 @@ async fn sic_instance_ensure( .map(|_| ()) .map_err(ActionError::action_failed) } + +#[derive(Debug, Deserialize, Serialize)] +pub struct ParamsDiskCreate { + pub project_id: Uuid, + pub create_params: params::DiskCreate, +} + +#[derive(Debug)] +pub struct SagaDiskCreate; +impl SagaType for SagaDiskCreate { + type SagaParamsType = Arc; + type ExecContextType = Arc; +} + +fn saga_disk_create() -> SagaTemplate { + let mut template_builder = SagaTemplateBuilder::new(); + + template_builder.append( + "disk_id", + "GenerateDiskId", + new_action_noop_undo(sdc_generate_uuid), + ); + + template_builder.append( + "created_disk", + "CreateDiskRecord", + ActionFunc::new_action( + sdc_create_disk_record, + sdc_create_disk_record_undo, + ), + ); + + template_builder.append( + "datasets_and_regions", + "AllocRegions", + ActionFunc::new_action(sdc_alloc_regions, sdc_alloc_regions_undo), + ); + + template_builder.append( + "regions_ensure", + "RegionsEnsure", + ActionFunc::new_action(sdc_regions_ensure, sdc_regions_ensure_undo), + ); + + template_builder.append( + "disk_runtime", + "FinalizeDiskRecord", + new_action_noop_undo(sdc_finalize_disk_record), + ); + + template_builder.build() +} + +async fn sdc_generate_uuid( + _: ActionContext, +) -> Result { + Ok(Uuid::new_v4()) +} + +async fn sdc_create_disk_record( + sagactx: ActionContext, +) -> Result { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + + let disk_id = sagactx.lookup::("disk_id")?; + let disk = db::model::Disk::new( + disk_id, + params.project_id, + params.create_params.clone(), + db::model::DiskRuntimeState::new(), + ); + let disk_created = osagactx + .datastore() + .project_create_disk(disk) + .await + .map_err(ActionError::action_failed)?; + Ok(disk_created) +} + +async fn sdc_create_disk_record_undo( + sagactx: ActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + + let disk_id = sagactx.lookup::("disk_id")?; + osagactx.datastore().project_delete_disk_no_auth(&disk_id).await?; + Ok(()) +} + +async fn sdc_alloc_regions( + sagactx: ActionContext, +) -> Result, ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + let disk_id = sagactx.lookup::("disk_id")?; + // Ensure the disk is backed by appropriate regions. + // + // This allocates regions in the database, but the disk state is still + // "creating" - the respective Crucible Agents must be instructed to + // allocate the necessary regions before we can mark the disk as "ready to + // be used". + // + // TODO: Depending on the result of + // https://github.com/oxidecomputer/omicron/issues/613 , we + // should consider using a paginated API to access regions, rather than + // returning all of them at once. 
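+ //
+ // region_allocate is idempotent (see the datastore), so if this saga node
+ // is replayed it should return the previously allocated regions rather
+ // than allocating a second set.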
+ let datasets_and_regions = osagactx + .datastore() + .region_allocate(disk_id, ¶ms.create_params) + .await + .map_err(ActionError::action_failed)?; + Ok(datasets_and_regions) +} + +async fn sdc_alloc_regions_undo( + sagactx: ActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + + let disk_id = sagactx.lookup::("disk_id")?; + osagactx.datastore().regions_hard_delete(disk_id).await?; + Ok(()) +} + +async fn ensure_region_in_dataset( + log: &Logger, + dataset: &db::model::Dataset, + region: &db::model::Region, +) -> Result { + let url = format!("http://{}", dataset.address()); + let client = CrucibleAgentClient::new(&url); + + let region_request = CreateRegion { + block_size: region.block_size().to_bytes(), + extent_count: region.extent_count().try_into().unwrap(), + extent_size: region.blocks_per_extent().try_into().unwrap(), + // TODO: Can we avoid casting from UUID to string? + // NOTE: This'll require updating the crucible agent client. + id: RegionId(region.id().to_string()), + volume_id: region.disk_id().to_string(), + }; + + let create_region = || async { + let region = client + .region_create(®ion_request) + .await + .map_err(|e| BackoffError::Permanent(e))?; + match region.state { + RegionState::Requested => Err(BackoffError::Transient(anyhow!( + "Region creation in progress" + ))), + RegionState::Created => Ok(region), + _ => Err(BackoffError::Permanent(anyhow!( + "Failed to create region, unexpected state: {:?}", + region.state + ))), + } + }; + + let log_create_failure = |_, delay| { + warn!( + log, + "Region requested, not yet created. Retrying in {:?}", delay + ); + }; + + let region = backoff::retry_notify( + backoff::internal_service_policy(), + create_region, + log_create_failure, + ) + .await?; + + Ok(region) +} + +// Arbitrary limit on concurrency, for operations issued +// on multiple regions within a disk at the same time. +const MAX_CONCURRENT_REGION_REQUESTS: usize = 3; + +async fn sdc_regions_ensure( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); + let datasets_and_regions = sagactx + .lookup::>( + "datasets_and_regions", + )?; + let request_count = datasets_and_regions.len(); + futures::stream::iter(datasets_and_regions) + .map(|(dataset, region)| async move { + ensure_region_in_dataset(log, &dataset, ®ion).await + }) + // Execute the allocation requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>() + .map_err(ActionError::action_failed)?; + + // TODO: Region has a port value, we could store this in the DB? + Ok(()) +} + +async fn delete_regions( + datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, +) -> Result<(), Error> { + let request_count = datasets_and_regions.len(); + futures::stream::iter(datasets_and_regions) + .map(|(dataset, region)| async move { + let url = format!("http://{}", dataset.address()); + let client = CrucibleAgentClient::new(&url); + let id = RegionId(region.id().to_string()); + client.region_delete(&id).await + }) + // Execute the allocation requests concurrently. 
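+ // (Here the requests are region deletions; the same concurrency cap,
+ // MAX_CONCURRENT_REGION_REQUESTS, is reused.)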
+ .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + Ok(()) +} + +async fn sdc_regions_ensure_undo( + sagactx: ActionContext, +) -> Result<(), anyhow::Error> { + let datasets_and_regions = sagactx + .lookup::>( + "datasets_and_regions", + )?; + delete_regions(datasets_and_regions).await?; + Ok(()) +} + +async fn sdc_finalize_disk_record( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let _params = sagactx.saga_params(); + + let disk_id = sagactx.lookup::("disk_id")?; + let disk_created = sagactx.lookup::("created_disk")?; + osagactx + .datastore() + .disk_update_runtime(&disk_id, &disk_created.runtime().detach()) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ParamsDiskDelete { + pub disk_id: Uuid, +} + +#[derive(Debug)] +pub struct SagaDiskDelete; +impl SagaType for SagaDiskDelete { + type SagaParamsType = Arc; + type ExecContextType = Arc; +} + +fn saga_disk_delete() -> SagaTemplate { + let mut template_builder = SagaTemplateBuilder::new(); + + template_builder.append( + "no_result", + "DeleteDiskRecord", + // TODO: See the comment on the "DeleteRegions" step, + // we may want to un-delete the disk if we cannot remove + // underlying regions. + new_action_noop_undo(sdd_delete_disk_record), + ); + + template_builder.append( + "no_result", + "DeleteRegions", + // TODO(https://github.com/oxidecomputer/omicron/issues/612): + // We need a way to deal with this operation failing, aside from + // propagating the error to the user. + // + // What if the Sled goes offline? Nexus must ultimately be + // responsible for reconciling this scenario. + // + // The current behavior causes the disk deletion saga to + // fail, but still marks the disk as destroyed. + new_action_noop_undo(sdd_delete_regions), + ); + + template_builder.append( + "no_result", + "DeleteRegionRecords", + new_action_noop_undo(sdd_delete_region_records), + ); + + template_builder.build() +} + +async fn sdd_delete_disk_record( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + + osagactx + .datastore() + .project_delete_disk_no_auth(¶ms.disk_id) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} + +async fn sdd_delete_regions( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + + let datasets_and_regions = osagactx + .datastore() + .get_allocated_regions(params.disk_id) + .await + .map_err(ActionError::action_failed)?; + delete_regions(datasets_and_regions) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} + +async fn sdd_delete_region_records( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + osagactx + .datastore() + .regions_hard_delete(params.disk_id) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 0788fab1054..fb7bc555e82 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -124,7 +124,7 @@ pub async fn test_setup_with_config( /* Set up a single sled agent. 
*/ let sa_id = Uuid::parse_str(SLED_AGENT_UUID).unwrap(); - let sa = start_sled_agent( + let sled_agent = start_sled_agent( logctx.log.new(o!( "component" => "omicron_sled_agent::sim::Server", "sled_id" => sa_id.to_string(), @@ -160,7 +160,7 @@ pub async fn test_setup_with_config( internal_client: testctx_internal, database, clickhouse, - sled_agent: sa, + sled_agent, oximeter, producer, logctx, diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 6a3ce16be5a..05935ac1953 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -2,10 +2,9 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -/*! - * Tests basic disk support in the API - */ +//! Tests basic disk support in the API +use crucible_agent_client::types::State as RegionState; use http::method::Method; use http::StatusCode; use omicron_common::api::external::ByteCount; @@ -16,6 +15,7 @@ use omicron_common::api::external::Instance; use omicron_common::api::external::InstanceCpuCount; use omicron_nexus::TestInterfaces as _; use omicron_nexus::{external_api::params, Nexus}; +use omicron_sled_agent::sim::SledAgent; use sled_agent_client::TestInterfaces as _; use std::sync::Arc; use uuid::Uuid; @@ -35,36 +35,103 @@ use nexus_test_utils::resource_helpers::create_project; use nexus_test_utils::ControlPlaneTestContext; use nexus_test_utils_macros::nexus_test; -/* - * TODO-cleanup the mess of URLs used here and in test_instances.rs ought to - * come from common code. - */ +const ORG_NAME: &str = "test-org"; +const PROJECT_NAME: &str = "springfield-squidport-disks"; +const DISK_NAME: &str = "just-rainsticks"; + +fn get_project_url() -> String { + format!("/organizations/{}/projects/{}", ORG_NAME, PROJECT_NAME) +} + +fn get_disks_url() -> String { + format!("{}/disks", get_project_url()) +} + +fn get_instances_url() -> String { + format!("{}/instances", get_project_url()) +} + +fn get_instance_disks_url(instance_name: &str) -> String { + format!("{}/{}/disks", get_instances_url(), instance_name) +} + +fn get_disk_attach_url(instance_name: &str) -> String { + format!("{}/attach", get_instance_disks_url(instance_name)) +} + +fn get_disk_detach_url(instance_name: &str) -> String { + format!("{}/detach", get_instance_disks_url(instance_name)) +} + +async fn create_org_and_project(client: &ClientTestContext) -> Uuid { + create_organization(&client, ORG_NAME).await; + let project = create_project(client, ORG_NAME, PROJECT_NAME).await; + project.identity.id +} + +struct DiskTest { + sled_agent: Arc, + zpool_id: Uuid, + zpool_size: ByteCount, + dataset_ids: Vec, + project_id: Uuid, +} + +impl DiskTest { + // Creates fake physical storage, an organization, and a project. + async fn new(cptestctx: &ControlPlaneTestContext) -> Self { + let client = &cptestctx.external_client; + let sled_agent = cptestctx.sled_agent.sled_agent.clone(); + + // Create a Zpool. + let zpool_id = Uuid::new_v4(); + let zpool_size = ByteCount::from_gibibytes_u32(10); + sled_agent.create_zpool(zpool_id, zpool_size.to_bytes()).await; + + // Create multiple Datasets within that Zpool. + let dataset_count = 3; + let dataset_ids: Vec<_> = + (0..dataset_count).map(|_| Uuid::new_v4()).collect(); + for id in &dataset_ids { + sled_agent.create_crucible_dataset(zpool_id, *id).await; + + // By default, regions are created immediately. 
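+ // set_create_callback lets a test choose the RegionState the simulated
+ // Crucible agent reports for a newly requested region; returning Created
+ // makes region creation appear to complete immediately.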
+ let crucible = sled_agent.get_crucible_dataset(zpool_id, *id).await; + crucible + .set_create_callback(Box::new(|_| RegionState::Created)) + .await; + } + + // Create a project for testing. + let project_id = create_org_and_project(&client).await; + + Self { sled_agent, zpool_id, zpool_size, dataset_ids, project_id } + } +} + #[nexus_test] -async fn test_disks(cptestctx: &ControlPlaneTestContext) { +async fn test_disk_not_found_before_creation( + cptestctx: &ControlPlaneTestContext, +) { let client = &cptestctx.external_client; - let apictx = &cptestctx.server.apictx; - let nexus = &apictx.nexus; - - /* Create a project for testing. */ - let org_name = "test-org"; - create_organization(&client, &org_name).await; - let project_name = "springfield-squidport-disks"; - let url_disks = - format!("/organizations/{}/projects/{}/disks", org_name, project_name); - let project = create_project(client, &org_name, &project_name).await; - - /* List disks. There aren't any yet. */ - let disks = disks_list(&client, &url_disks).await; + DiskTest::new(&cptestctx).await; + let disks_url = get_disks_url(); + + // List disks. There aren't any yet. + let disks = disks_list(&client, &disks_url).await; assert_eq!(disks.len(), 0); - /* Make sure we get a 404 if we fetch one. */ - let disk_url = format!("{}/just-rainsticks", url_disks); + // Make sure we get a 404 if we fetch one. + let disk_url = format!("{}/{}", disks_url, DISK_NAME); let error = client .make_request_error(Method::GET, &disk_url, StatusCode::NOT_FOUND) .await; - assert_eq!(error.message, "not found: disk with name \"just-rainsticks\""); + assert_eq!( + error.message, + format!("not found: disk with name \"{}\"", DISK_NAME) + ); - /* We should also get a 404 if we delete one. */ + // We should also get a 404 if we delete one. let error = NexusRequest::new( RequestBuilder::new(client, Method::DELETE, &disk_url) .expect_status(Some(StatusCode::NOT_FOUND)), @@ -75,65 +142,239 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { .expect("unexpected success") .parsed_body::() .unwrap(); - assert_eq!(error.message, "not found: disk with name \"just-rainsticks\""); + assert_eq!( + error.message, + format!("not found: disk with name \"{}\"", DISK_NAME) + ); +} + +#[nexus_test] +async fn test_disk_create_attach_detach_delete( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + let test = DiskTest::new(&cptestctx).await; + let nexus = &cptestctx.server.apictx.nexus; + let disks_url = get_disks_url(); - /* Create a disk. */ + // Create a disk. 
+ let disk_url = format!("{}/{}", disks_url, DISK_NAME); let new_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { - name: "just-rainsticks".parse().unwrap(), + name: DISK_NAME.parse().unwrap(), description: String::from("sells rainsticks"), }, snapshot_id: None, size: ByteCount::from_gibibytes_u32(1), }; - let disk: Disk = objects_post(&client, &url_disks, new_disk.clone()).await; - assert_eq!(disk.identity.name, "just-rainsticks"); + let disk: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; + assert_eq!(disk.identity.name, DISK_NAME); assert_eq!(disk.identity.description, "sells rainsticks"); - assert_eq!(disk.project_id, project.identity.id); + assert_eq!(disk.project_id, test.project_id); assert_eq!(disk.snapshot_id, None); assert_eq!(disk.size.to_whole_mebibytes(), 1024); assert_eq!(disk.state, DiskState::Creating); - /* - * Fetch the disk and expect it to match what we just created except that - * the state will now be "Detached", as the server has simulated the create - * process. - */ + // Fetch the disk and expect it to match what we just created except that + // the state will now be "Detached", as the server has simulated the create + // process. let disk = disk_get(&client, &disk_url).await; - assert_eq!(disk.identity.name, "just-rainsticks"); + assert_eq!(disk.identity.name, DISK_NAME); assert_eq!(disk.identity.description, "sells rainsticks"); - assert_eq!(disk.project_id, project.identity.id); + assert_eq!(disk.project_id, test.project_id); assert_eq!(disk.snapshot_id, None); assert_eq!(disk.size.to_whole_mebibytes(), 1024); assert_eq!(disk.state, DiskState::Detached); - /* Attempt to create a second disk with a conflicting name. */ + // List disks again and expect to find the one we just created. + let disks = disks_list(&client, &disks_url).await; + assert_eq!(disks.len(), 1); + disks_eq(&disks[0], &disk); + + // Create an instance to attach the disk. + let url_instances = get_instances_url(); + let instance: Instance = objects_post( + &client, + &url_instances, + params::InstanceCreate { + identity: IdentityMetadataCreateParams { + name: "instance1".parse().unwrap(), + description: "instance1".to_string(), + }, + ncpus: InstanceCpuCount(4), + memory: ByteCount::from_mebibytes_u32(256), + hostname: "instance1".to_string(), + }, + ) + .await; + + // Verify that there are no disks attached to the instance, and specifically + // that our disk is not attached to this instance. + let url_instance_disks = + get_instance_disks_url(instance.identity.name.as_str()); + let disks = objects_list_page::(&client, &url_instance_disks).await; + assert_eq!(disks.items.len(), 0); + + let url_instance_attach_disk = + get_disk_attach_url(instance.identity.name.as_str()); + let url_instance_detach_disk = + get_disk_detach_url(instance.identity.name.as_str()); + + // Start attaching the disk to the instance. + let mut response = client + .make_request( + Method::POST, + &url_instance_attach_disk, + Some(params::DiskIdentifier { disk: disk.identity.name.clone() }), + StatusCode::ACCEPTED, + ) + .await + .unwrap(); + let attached_disk: Disk = read_json(&mut response).await; + let instance_id = &instance.identity.id; + assert_eq!(attached_disk.identity.name, disk.identity.name); + assert_eq!(attached_disk.identity.id, disk.identity.id); + assert_eq!(attached_disk.state, DiskState::Attaching(instance_id.clone())); + + // Finish simulation of the attachment and verify the new state, both on the + // attachment and the disk itself. 
+ disk_simulate(nexus, &disk.identity.id).await; + let attached_disk: Disk = disk_get(&client, &disk_url).await; + assert_eq!(attached_disk.identity.name, disk.identity.name); + assert_eq!(attached_disk.identity.id, disk.identity.id); + assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone())); + + // Attach the disk to the same instance. This should complete immediately + // with no state change. + client + .make_request( + Method::POST, + &url_instance_attach_disk, + Some(params::DiskIdentifier { disk: disk.identity.name }), + StatusCode::ACCEPTED, + ) + .await + .unwrap(); + let disk = disk_get(&client, &disk_url).await; + assert_eq!(disk.state, DiskState::Attached(instance_id.clone())); + + // Begin detaching the disk. + client + .make_request( + Method::POST, + &url_instance_detach_disk, + Some(params::DiskIdentifier { disk: disk.identity.name.clone() }), + StatusCode::ACCEPTED, + ) + .await + .unwrap(); + let disk: Disk = disk_get(&client, &disk_url).await; + assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); + + // Finish the detachment. + disk_simulate(nexus, &disk.identity.id).await; + let disk = disk_get(&client, &disk_url).await; + assert_eq!(disk.state, DiskState::Detached); + + // Since detach is idempotent, we can detach it again. + client + .make_request( + Method::POST, + &url_instance_detach_disk, + Some(params::DiskIdentifier { disk: disk.identity.name.clone() }), + StatusCode::ACCEPTED, + ) + .await + .unwrap(); + + // A priveleged user should be able to delete the disk. + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + + // It should no longer be present in our list of disks. + assert_eq!(disks_list(&client, &disks_url).await.len(), 0); + + // We shouldn't find it if we request it explicitly. + let error = client + .make_request_error(Method::GET, &disk_url, StatusCode::NOT_FOUND) + .await; + assert_eq!( + error.message, + format!("not found: disk with name \"{}\"", DISK_NAME) + ); +} + +#[nexus_test] +async fn test_disk_create_disk_that_already_exists_fails( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + let disks_url = get_disks_url(); + + // Create a disk. + let new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + snapshot_id: None, + size: ByteCount::from_gibibytes_u32(1), + }; + let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; + let disk_url = format!("{}/{}", disks_url, DISK_NAME); + let disk = disk_get(&client, &disk_url).await; + + // Attempt to create a second disk with a conflicting name. let error = client .make_request_error_body( Method::POST, - &url_disks, + &disks_url, new_disk, StatusCode::BAD_REQUEST, ) .await; - assert_eq!(error.message, "already exists: disk \"just-rainsticks\""); + assert_eq!( + error.message, + format!("already exists: disk \"{}\"", DISK_NAME) + ); - /* List disks again and expect to find the one we just created. */ - let disks = disks_list(&client, &url_disks).await; + // List disks again and expect to find the one we just created. + let disks = disks_list(&client, &disks_url).await; assert_eq!(disks.len(), 1); disks_eq(&disks[0], &disk); +} - /* Create an instance to attach the disk. 
*/ - let url_instances = format!( - "/organizations/{}/projects/{}/instances", - org_name, project_name - ); +#[nexus_test] +async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) { + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.apictx.nexus; + DiskTest::new(&cptestctx).await; + let disks_url = get_disks_url(); + + // Create a disk. + let disk_url = format!("{}/{}", disks_url, DISK_NAME); + let new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + snapshot_id: None, + size: ByteCount::from_gibibytes_u32(1), + }; + let disk: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; + + // Create an instance to attach the disk. + let url_instances = get_instances_url(); let instance: Instance = objects_post( &client, &url_instances, params::InstanceCreate { identity: IdentityMetadataCreateParams { - name: "just-rainsticks".parse().unwrap(), + name: DISK_NAME.parse().unwrap(), description: String::from("sells rainsticks"), }, ncpus: InstanceCpuCount(4), @@ -143,33 +384,19 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { ) .await; - /* - * Verify that there are no disks attached to the instance, and specifically - * that our disk is not attached to this instance. - */ - let url_instance_disks = format!( - "/organizations/{}/projects/{}/instances/{}/disks", - org_name, - project_name, - instance.identity.name.as_str() - ); + // Verify that there are no disks attached to the instance, and specifically + // that our disk is not attached to this instance. + let url_instance_disks = + get_instance_disks_url(instance.identity.name.as_str()); let disks = objects_list_page::(&client, &url_instance_disks).await; assert_eq!(disks.items.len(), 0); - let url_instance_attach_disk = format!( - "/organizations/{}/projects/{}/instances/{}/disks/attach", - org_name, - project_name, - instance.identity.name.as_str(), - ); - let url_instance_detach_disk = format!( - "/organizations/{}/projects/{}/instances/{}/disks/detach", - org_name, - project_name, - instance.identity.name.as_str(), - ); + let url_instance_attach_disk = + get_disk_attach_url(instance.identity.name.as_str()); + let url_instance_detach_disk = + get_disk_detach_url(instance.identity.name.as_str()); - /* Start attaching the disk to the instance. */ + // Start attaching the disk to the instance. let mut response = client .make_request( Method::POST, @@ -185,20 +412,16 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { assert_eq!(attached_disk.identity.id, disk.identity.id); assert_eq!(attached_disk.state, DiskState::Attaching(instance_id.clone())); - /* - * Finish simulation of the attachment and verify the new state, both on the - * attachment and the disk itself. - */ + // Finish simulation of the attachment and verify the new state, both on the + // attachment and the disk itself. disk_simulate(nexus, &disk.identity.id).await; let attached_disk: Disk = disk_get(&client, &disk_url).await; assert_eq!(attached_disk.identity.name, disk.identity.name); assert_eq!(attached_disk.identity.id, disk.identity.id); assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone())); - /* - * Attach the disk to the same instance. This should complete immediately - * with no state change. - */ + // Attach the disk to the same instance. This should complete immediately + // with no state change. 
client .make_request( Method::POST, @@ -211,10 +434,8 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Attached(instance_id.clone())); - /* - * Create a second instance and try to attach the disk to that. This should - * fail and the disk should remain attached to the first instance. - */ + // Create a second instance and try to attach the disk to that. This should + // fail and the disk should remain attached to the first instance. let instance2: Instance = objects_post( &client, &url_instances, @@ -229,18 +450,10 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { }, ) .await; - let url_instance2_attach_disk = format!( - "/organizations/{}/projects/{}/instances/{}/disks/attach", - org_name, - project_name, - instance2.identity.name.as_str(), - ); - let url_instance2_detach_disk = format!( - "/organizations/{}/projects/{}/instances/{}/disks/detach", - org_name, - project_name, - instance2.identity.name.as_str(), - ); + let url_instance2_attach_disk = + get_disk_attach_url(instance2.identity.name.as_str()); + let url_instance2_detach_disk = + get_disk_detach_url(instance2.identity.name.as_str()); let error = client .make_request_error_body( Method::POST, @@ -251,16 +464,17 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { .await; assert_eq!( error.message, - "cannot attach disk \"just-rainsticks\": disk is attached to another \ - instance" + format!( + "cannot attach disk \"{}\": disk is attached to another \ + instance", + DISK_NAME + ) ); let attached_disk = disk_get(&client, &disk_url).await; assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone())); - /* - * Begin detaching the disk. - */ + // Begin detaching the disk. client .make_request( Method::POST, @@ -273,7 +487,7 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { let disk: Disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - /* It's still illegal to attach this disk elsewhere. */ + // It's still illegal to attach this disk elsewhere. let error = client .make_request_error_body( Method::POST, @@ -284,11 +498,14 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { .await; assert_eq!( error.message, - "cannot attach disk \"just-rainsticks\": disk is attached to another \ - instance" + format!( + "cannot attach disk \"{}\": disk is attached to another \ + instance", + DISK_NAME + ) ); - /* It's even illegal to attach this disk back to the same instance. */ + // It's even illegal to attach this disk back to the same instance. let error = client .make_request_error_body( Method::POST, @@ -297,14 +514,17 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { StatusCode::BAD_REQUEST, ) .await; - /* TODO-debug the error message here is misleading. */ + // TODO-debug the error message here is misleading. assert_eq!( error.message, - "cannot attach disk \"just-rainsticks\": disk is attached to another \ - instance" + format!( + "cannot attach disk \"{}\": disk is attached to another \ + instance", + DISK_NAME + ) ); - /* However, there's no problem attempting to detach it again. */ + // However, there's no problem attempting to detach it again. client .make_request( Method::POST, @@ -317,12 +537,12 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detaching(instance_id.clone())); - /* Finish the detachment. 
*/ + // Finish the detachment. disk_simulate(nexus, &disk.identity.id).await; let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detached); - /* Since delete is idempotent, we can detach it again -- from either one. */ + // Since delete is idempotent, we can detach it again -- from either one. client .make_request( Method::POST, @@ -342,7 +562,7 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { .await .unwrap(); - /* Now, start attaching it again to the second instance. */ + // Now, start attaching it again to the second instance. let mut response = client .make_request( Method::POST, @@ -361,10 +581,8 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Attaching(instance2_id.clone())); - /* - * At this point, it's not legal to attempt to attach it to a different - * instance (the first one). - */ + // At this point, it's not legal to attempt to attach it to a different + // instance (the first one). let error = client .make_request_error_body( Method::POST, @@ -375,11 +593,14 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { .await; assert_eq!( error.message, - "cannot attach disk \"just-rainsticks\": disk is attached to another \ - instance" + format!( + "cannot attach disk \"{}\": disk is attached to another \ + instance", + DISK_NAME + ) ); - /* It's fine to attempt another attachment to the same instance. */ + // It's fine to attempt another attachment to the same instance. client .make_request( Method::POST, @@ -392,13 +613,20 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Attaching(instance2_id.clone())); - /* It's not allowed to delete a disk that's attaching. */ - let error = client - .make_request_error(Method::DELETE, &disk_url, StatusCode::BAD_REQUEST) - .await; - assert_eq!(error.message, "disk is attached"); + // It's not allowed to delete a disk that's attaching. + let error = NexusRequest::new( + RequestBuilder::new(client, Method::DELETE, &disk_url) + .expect_status(Some(StatusCode::BAD_REQUEST)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("expected request to fail") + .parsed_body::() + .expect("cannot parse"); + assert_eq!(error.message, "disk cannot be deleted in state \"attaching\""); - /* Now, begin a detach while the disk is still being attached. */ + // Now, begin a detach while the disk is still being attached. client .make_request( Method::POST, @@ -411,54 +639,220 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { let disk: Disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detaching(instance2_id.clone())); - /* It's not allowed to delete a disk that's detaching, either. */ - let error = client - .make_request_error(Method::DELETE, &disk_url, StatusCode::BAD_REQUEST) - .await; - assert_eq!(error.message, "disk is attached"); + // It's not allowed to delete a disk that's detaching, either. + let error = NexusRequest::new( + RequestBuilder::new(client, Method::DELETE, &disk_url) + .expect_status(Some(StatusCode::BAD_REQUEST)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("expected request to fail") + .parsed_body::() + .expect("cannot parse"); + assert_eq!(error.message, "disk cannot be deleted in state \"detaching\""); - /* Finish detachment. */ + // Finish detachment. 
disk_simulate(nexus, &disk.identity.id).await; let disk = disk_get(&client, &disk_url).await; assert_eq!(disk.state, DiskState::Detached); - /* - * If we're not authenticated, or authenticated as an unprivileged user, we - * shouldn't be able to delete this disk. (We shouldn't be able to even see - * it if we try.) - */ - NexusRequest::expect_failure( - client, - StatusCode::NOT_FOUND, - Method::DELETE, - &disk_url, - ) - .execute() - .await - .expect("expected request to fail"); - NexusRequest::expect_failure( - client, - StatusCode::NOT_FOUND, - Method::DELETE, - &disk_url, - ) - .authn_as(AuthnMode::UnprivilegedUser) - .execute() - .await - .expect("expected request to fail"); NexusRequest::object_delete(client, &disk_url) .authn_as(AuthnMode::PrivilegedUser) .execute() .await .expect("failed to delete disk"); - /* It should no longer be present in our list of disks. */ - assert_eq!(disks_list(&client, &url_disks).await.len(), 0); - /* We shouldn't find it if we request it explicitly. */ + // It should no longer be present in our list of disks. + assert_eq!(disks_list(&client, &disks_url).await.len(), 0); + + // We shouldn't find it if we request it explicitly. let error = client .make_request_error(Method::GET, &disk_url, StatusCode::NOT_FOUND) .await; - assert_eq!(error.message, "not found: disk with name \"just-rainsticks\""); + assert_eq!( + error.message, + format!("not found: disk with name \"{}\"", DISK_NAME) + ); +} + +#[nexus_test] +async fn test_disk_deletion_requires_authentication( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + let disks_url = get_disks_url(); + + // Create a disk. + let disk_url = format!("{}/{}", disks_url, DISK_NAME); + let new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + snapshot_id: None, + size: ByteCount::from_gibibytes_u32(1), + }; + let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; + + const BAD_DISK_NAME: &str = "wonderful-knife"; + let bad_disk_url = format!("{}/{}", disks_url, BAD_DISK_NAME); + + // If we are not authenticated, we should not be able to delete the disk. + // + // We should see the same error regardless of the existence of the disk. + let urls = [&disk_url, &bad_disk_url]; + for url in &urls { + NexusRequest::expect_failure( + client, + StatusCode::NOT_FOUND, + Method::DELETE, + &url, + ) + .execute() + .await + .expect("expected request to fail"); + } + + // If we are unprivileged, we should not be able to delete the disk. + // + // We should see the same error regardless of the existence of the disk. + for url in &urls { + NexusRequest::expect_failure( + client, + StatusCode::NOT_FOUND, + Method::DELETE, + &url, + ) + .authn_as(AuthnMode::UnprivilegedUser) + .execute() + .await + .expect("expected request to fail"); + } + + // Privileged users can delete disks. 
+ NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); +} + +#[nexus_test] +async fn test_disk_creation_region_requested_then_started( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + let test = DiskTest::new(&cptestctx).await; + let disks_url = get_disks_url(); + + // Before we create a disk, set the response from the Crucible Agent: + // no matter what regions get requested, they'll always *start* as + // "Requested", and transition to "Created" on the second call. + for id in &test.dataset_ids { + let crucible = + test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await; + let called = std::sync::atomic::AtomicBool::new(false); + crucible + .set_create_callback(Box::new(move |_| { + if !called.load(std::sync::atomic::Ordering::SeqCst) { + called.store(true, std::sync::atomic::Ordering::SeqCst); + RegionState::Requested + } else { + RegionState::Created + } + })) + .await; + } + + // The disk is created successfully, even when this "requested" -> "started" + // transition occurs. + let new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + snapshot_id: None, + size: ByteCount::from_gibibytes_u32(1), + }; + let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; +} + +// Tests that region allocation failure causes disk allocation to fail. +#[nexus_test] +async fn test_disk_region_creation_failure( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + let test = DiskTest::new(&cptestctx).await; + + // Before we create a disk, set the response from the Crucible Agent: + // no matter what regions get requested, they'll always fail. + for id in &test.dataset_ids { + let crucible = + test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await; + crucible.set_create_callback(Box::new(|_| RegionState::Failed)).await; + } + + let disk_size = ByteCount::from_gibibytes_u32(3); + let dataset_count = test.dataset_ids.len() as u64; + assert!( + disk_size.to_bytes() * dataset_count < test.zpool_size.to_bytes(), + "Disk size too big for Zpool size" + ); + assert!( + 2 * disk_size.to_bytes() * dataset_count > test.zpool_size.to_bytes(), + "(test constraint) Zpool needs to be smaller (to store only one disk)", + ); + + // Attempt to allocate the disk, observe a server error. + let disks_url = get_disks_url(); + let new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + snapshot_id: None, + size: disk_size, + }; + + // Unfortunately, the error message is only posted internally to the + // logs, and it not returned to the client. + // + // TODO: Maybe consider making this a more informative error? + // How should we propagate this to the client? + client + .make_request_error_body( + Method::POST, + &disks_url, + new_disk.clone(), + StatusCode::INTERNAL_SERVER_ERROR, + ) + .await; + + // After the failed allocation, the disk should not exist. + let disks = disks_list(&client, &disks_url).await; + assert_eq!(disks.len(), 0); + + // After the failed allocation, regions will exist, but be "Failed". 
+ for id in &test.dataset_ids { + let crucible = + test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await; + let regions = crucible.list().await; + assert_eq!(regions.len(), 1); + assert_eq!(regions[0].state, RegionState::Failed); + } + + // Validate that the underlying regions were released as a part of + // unwinding the failed disk allocation, by performing another disk + // allocation that should succeed. + for id in &test.dataset_ids { + let crucible = + test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await; + crucible.set_create_callback(Box::new(|_| RegionState::Created)).await; + } + let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; } async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk { diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index 8b894f63e33..8875c22802f 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -11,6 +11,8 @@ bincode = "1.3.3" bytes = "1.1" cfg-if = "1.0" chrono = { version = "0.4", features = [ "serde" ] } +# Only used by the simulated sled agent. +crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "078d364e14d57d5faa3a44001c65709935419779" } dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] } futures = "0.3.18" ipnetwork = "0.18" diff --git a/sled-agent/src/sim/http_entrypoints_storage.rs b/sled-agent/src/sim/http_entrypoints_storage.rs new file mode 100644 index 00000000000..6ef08a9a373 --- /dev/null +++ b/sled-agent/src/sim/http_entrypoints_storage.rs @@ -0,0 +1,107 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! HTTP entrypoint functions for simulating the storage agent API. + +use crucible_agent_client::types::{CreateRegion, Region, RegionId}; +use dropshot::{ + endpoint, ApiDescription, HttpError, HttpResponseDeleted, HttpResponseOk, + Path as TypedPath, RequestContext, TypedBody, +}; +use schemars::JsonSchema; +use serde::Deserialize; +use std::sync::Arc; + +use super::storage::CrucibleData; + +type CrucibleAgentApiDescription = ApiDescription>; + +/// Returns a description of the Crucible Agent API. +pub fn api() -> CrucibleAgentApiDescription { + fn register_endpoints( + api: &mut CrucibleAgentApiDescription, + ) -> Result<(), String> { + api.register(region_list)?; + api.register(region_create)?; + api.register(region_get)?; + api.register(region_delete)?; + Ok(()) + } + + let mut api = CrucibleAgentApiDescription::new(); + if let Err(err) = register_endpoints(&mut api) { + panic!("failed to register entrypoints: {}", err); + } + api +} + +// TODO: We'd like to de-duplicate as much as possible with the +// real crucible agent here, to avoid skew. 
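The "requested then started" test above drives the simulated Crucible agent by installing a create callback that returns a different region state on its second invocation. A stripped-down sketch of that closure pattern, using local stand-ins for `crucible_agent_client::types::{CreateRegion, State}` so it runs on its own:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Local stand-ins for crucible_agent_client::types::{CreateRegion, State}.
struct CreateRegion;
#[derive(Debug, PartialEq)]
enum State {
    Requested,
    Created,
}

// Presumed shape of the simulator's boxed callback alias.
type CreateCallback = Box<dyn Fn(&CreateRegion) -> State + Send + 'static>;

fn two_phase_callback() -> CreateCallback {
    // The flag is moved into the closure, so each installed callback tracks
    // whether it has been invoked before.
    let called = AtomicBool::new(false);
    Box::new(move |_| {
        if !called.load(Ordering::SeqCst) {
            called.store(true, Ordering::SeqCst);
            State::Requested
        } else {
            State::Created
        }
    })
}

fn main() {
    let cb = two_phase_callback();
    assert_eq!(cb(&CreateRegion), State::Requested);
    assert_eq!(cb(&CreateRegion), State::Created);
}
```

Because the closure only needs shared access to the `AtomicBool`, it remains a plain `Fn` and can be boxed and handed to the simulated dataset without any extra locking.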
+ +#[derive(Deserialize, JsonSchema)] +struct RegionPath { + id: RegionId, +} + +#[endpoint { + method = GET, + path = "/crucible/0/regions", +}] +async fn region_list( + rc: Arc>>, +) -> Result>, HttpError> { + let crucible = rc.context(); + Ok(HttpResponseOk(crucible.list().await)) +} + +#[endpoint { + method = POST, + path = "/crucible/0/regions", +}] +async fn region_create( + rc: Arc>>, + body: TypedBody, +) -> Result, HttpError> { + let params = body.into_inner(); + let crucible = rc.context(); + + Ok(HttpResponseOk(crucible.create(params).await)) +} + +#[endpoint { + method = GET, + path = "/crucible/0/regions/{id}", +}] +async fn region_get( + rc: Arc>>, + path: TypedPath, +) -> Result, HttpError> { + let id = path.into_inner().id; + let crucible = rc.context(); + match crucible.get(id).await { + Some(region) => Ok(HttpResponseOk(region)), + None => { + Err(HttpError::for_not_found(None, "Region not found".to_string())) + } + } +} + +#[endpoint { + method = DELETE, + path = "/crucible/0/regions/{id}", +}] +async fn region_delete( + rc: Arc>>, + path: TypedPath, +) -> Result { + let id = path.into_inner().id; + let crucible = rc.context(); + + match crucible.delete(id).await { + Some(_) => Ok(HttpResponseDeleted()), + None => { + Err(HttpError::for_not_found(None, "Region not found".to_string())) + } + } +} diff --git a/sled-agent/src/sim/mod.rs b/sled-agent/src/sim/mod.rs index 0cad7673f32..3824951d854 100644 --- a/sled-agent/src/sim/mod.rs +++ b/sled-agent/src/sim/mod.rs @@ -10,10 +10,13 @@ mod collection; mod config; mod disk; mod http_entrypoints; +mod http_entrypoints_storage; mod instance; mod server; mod simulatable; mod sled_agent; +mod storage; pub use config::{Config, SimMode}; pub use server::{run_server, Server}; +pub use sled_agent::SledAgent; diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index 664d4aef5ea..71e0c692152 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -7,6 +7,7 @@ */ use crate::params::DiskStateRequested; +use futures::lock::Mutex; use nexus_client::Client as NexusClient; use omicron_common::api::external::Error; use omicron_common::api::internal::nexus::DiskRuntimeState; @@ -21,6 +22,7 @@ use super::collection::SimCollection; use super::config::SimMode; use super::disk::SimDisk; use super::instance::SimInstance; +use super::storage::{CrucibleData, Storage}; /** * Simulates management of the control plane on a sled @@ -32,13 +34,11 @@ use super::instance::SimInstance; * move later. 
*/ pub struct SledAgent { - /** unique id for this server */ - pub id: Uuid, - /** collection of simulated instances, indexed by instance uuid */ instances: Arc>, /** collection of simulated disks, indexed by disk uuid */ disks: Arc>, + storage: Mutex, } impl SledAgent { @@ -51,25 +51,30 @@ impl SledAgent { id: &Uuid, sim_mode: SimMode, log: Logger, - ctlsc: Arc, + nexus_client: Arc, ) -> SledAgent { info!(&log, "created simulated sled agent"; "sim_mode" => ?sim_mode); let instance_log = log.new(o!("kind" => "instances")); let disk_log = log.new(o!("kind" => "disks")); + let storage_log = log.new(o!("kind" => "storage")); SledAgent { - id: *id, instances: Arc::new(SimCollection::new( - Arc::clone(&ctlsc), + Arc::clone(&nexus_client), instance_log, sim_mode, )), disks: Arc::new(SimCollection::new( - Arc::clone(&ctlsc), + Arc::clone(&nexus_client), disk_log, sim_mode, )), + storage: Mutex::new(Storage::new( + *id, + Arc::clone(&nexus_client), + storage_log, + )), } } @@ -111,4 +116,27 @@ impl SledAgent { pub async fn disk_poke(&self, id: Uuid) { self.disks.sim_poke(id).await; } + + /// Adds a Zpool to the simulated sled agent. + pub async fn create_zpool(&self, id: Uuid, size: u64) { + self.storage.lock().await.insert_zpool(id, size).await; + } + + /// Adds a Crucible Dataset within a zpool. + pub async fn create_crucible_dataset( + &self, + zpool_id: Uuid, + dataset_id: Uuid, + ) { + self.storage.lock().await.insert_dataset(zpool_id, dataset_id).await; + } + + /// Returns a crucible dataset within a particular zpool. + pub async fn get_crucible_dataset( + &self, + zpool_id: Uuid, + dataset_id: Uuid, + ) -> Arc { + self.storage.lock().await.get_dataset(zpool_id, dataset_id).await + } } diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs new file mode 100644 index 00000000000..4e178d410ac --- /dev/null +++ b/sled-agent/src/sim/storage.rs @@ -0,0 +1,250 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Simulated sled agent storage implementation +//! +//! Note, this refers to the "storage which exists on the Sled", rather +//! than the representation of "virtual disks" which would be presented +//! through Nexus' external API. 
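The simulated `SledAgent` now exposes zpool and Crucible-dataset management, which is how `DiskTest::new` provisions fake storage for the disk tests. A small usage sketch against those methods (the 10 GiB size is an arbitrary example value):

```rust
use uuid::Uuid;

// Sketch: provision simulated storage for a test, mirroring DiskTest::new.
async fn setup_fake_storage(sled_agent: &omicron_sled_agent::sim::SledAgent) {
    let zpool_id = Uuid::new_v4();
    let dataset_id = Uuid::new_v4();

    // Create a 10 GiB zpool and a Crucible dataset inside it.
    sled_agent.create_zpool(zpool_id, 10_u64 * 1024 * 1024 * 1024).await;
    sled_agent.create_crucible_dataset(zpool_id, dataset_id).await;

    // Fetch the simulated dataset and make region creation succeed
    // immediately, as the disk tests do by default.
    let crucible = sled_agent.get_crucible_dataset(zpool_id, dataset_id).await;
    crucible
        .set_create_callback(Box::new(|_| {
            crucible_agent_client::types::State::Created
        }))
        .await;
}
```

Note that the zpool and dataset insertions also notify Nexus (via `zpool_put` and `dataset_put`), so the control plane learns about the fake storage the same way it would about real hardware.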
+ +use crucible_agent_client::types::{CreateRegion, Region, RegionId, State}; +use futures::lock::Mutex; +use nexus_client::types::{ + ByteCount, DatasetKind, DatasetPutRequest, ZpoolPutRequest, +}; +use nexus_client::Client as NexusClient; +use slog::Logger; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; +use uuid::Uuid; + +type CreateCallback = Box State + Send + 'static>; + +struct CrucibleDataInner { + regions: HashMap, + on_create: Option, +} + +impl CrucibleDataInner { + fn new() -> Self { + Self { regions: HashMap::new(), on_create: None } + } + + fn set_create_callback(&mut self, callback: CreateCallback) { + self.on_create = Some(callback); + } + + fn list(&self) -> Vec { + self.regions.values().cloned().collect() + } + + fn create(&mut self, params: CreateRegion) -> Region { + let id = Uuid::from_str(¶ms.id.0).unwrap(); + + let state = if let Some(on_create) = &self.on_create { + on_create(¶ms) + } else { + State::Requested + }; + + let region = Region { + id: params.id, + volume_id: params.volume_id, + block_size: params.block_size, + extent_size: params.extent_size, + extent_count: params.extent_count, + // NOTE: This is a lie - no server is running. + port_number: 0, + state, + }; + let old = self.regions.insert(id, region.clone()); + if let Some(old) = old { + assert_eq!( + old.id.0, region.id.0, + "Region already exists, but with a different ID" + ); + } + region + } + + fn get(&self, id: RegionId) -> Option { + let id = Uuid::from_str(&id.0).unwrap(); + self.regions.get(&id).cloned() + } + + fn get_mut(&mut self, id: &RegionId) -> Option<&mut Region> { + let id = Uuid::from_str(&id.0).unwrap(); + self.regions.get_mut(&id) + } + + fn delete(&mut self, id: RegionId) -> Option { + let id = Uuid::from_str(&id.0).unwrap(); + let mut region = self.regions.get_mut(&id)?; + region.state = State::Destroyed; + Some(region.clone()) + } +} + +/// Represents a running Crucible Agent. Contains regions. +pub struct CrucibleData { + inner: Mutex, +} + +impl CrucibleData { + fn new() -> Self { + Self { inner: Mutex::new(CrucibleDataInner::new()) } + } + + pub async fn set_create_callback(&self, callback: CreateCallback) { + self.inner.lock().await.set_create_callback(callback); + } + + pub async fn list(&self) -> Vec { + self.inner.lock().await.list() + } + + pub async fn create(&self, params: CreateRegion) -> Region { + self.inner.lock().await.create(params) + } + + pub async fn get(&self, id: RegionId) -> Option { + self.inner.lock().await.get(id) + } + + pub async fn delete(&self, id: RegionId) -> Option { + self.inner.lock().await.delete(id) + } + + pub async fn set_state(&self, id: &RegionId, state: State) { + self.inner + .lock() + .await + .get_mut(id) + .expect("region does not exist") + .state = state; + } +} + +/// A simulated Crucible Dataset. +/// +/// Contains both the data and the HTTP server. 
+pub struct CrucibleServer { + server: dropshot::HttpServer>, + data: Arc, +} + +impl CrucibleServer { + fn new(log: &Logger) -> Self { + let data = Arc::new(CrucibleData::new()); + let config = dropshot::ConfigDropshot { + bind_address: SocketAddr::new("127.0.0.1".parse().unwrap(), 0), + ..Default::default() + }; + let dropshot_log = log + .new(o!("component" => "Simulated CrucibleAgent Dropshot Server")); + let server = dropshot::HttpServerStarter::new( + &config, + super::http_entrypoints_storage::api(), + data.clone(), + &dropshot_log, + ) + .expect("Could not initialize server") + .start(); + + CrucibleServer { server, data } + } + + fn address(&self) -> SocketAddr { + self.server.local_addr() + } +} + +pub struct Zpool { + datasets: HashMap, +} + +impl Zpool { + pub fn new() -> Self { + Zpool { datasets: HashMap::new() } + } + + pub fn insert_dataset( + &mut self, + log: &Logger, + id: Uuid, + ) -> &CrucibleServer { + self.datasets.insert(id, CrucibleServer::new(log)); + self.datasets + .get(&id) + .expect("Failed to get the dataset we just inserted") + } +} + +/// Simulated representation of all storage on a sled. +pub struct Storage { + sled_id: Uuid, + nexus_client: Arc, + log: Logger, + zpools: HashMap, +} + +impl Storage { + pub fn new( + sled_id: Uuid, + nexus_client: Arc, + log: Logger, + ) -> Self { + Self { sled_id, nexus_client, log, zpools: HashMap::new() } + } + + /// Adds a Zpool to the sled's simulated storage and notifies Nexus. + pub async fn insert_zpool(&mut self, zpool_id: Uuid, size: u64) { + // Update our local data + self.zpools.insert(zpool_id, Zpool::new()); + + // Notify Nexus + let request = ZpoolPutRequest { size: ByteCount(size) }; + self.nexus_client + .zpool_put(&self.sled_id, &zpool_id, &request) + .await + .expect("Failed to notify Nexus about new Zpool"); + } + + /// Adds a Dataset to the sled's simulated storage and notifies Nexus. + pub async fn insert_dataset(&mut self, zpool_id: Uuid, dataset_id: Uuid) { + // Update our local data + let dataset = self + .zpools + .get_mut(&zpool_id) + .expect("Zpool does not exist") + .insert_dataset(&self.log, dataset_id); + + // Notify Nexus + let request = DatasetPutRequest { + address: dataset.address().to_string(), + kind: DatasetKind::Crucible, + }; + self.nexus_client + .dataset_put(&zpool_id, &dataset_id, &request) + .await + .expect("Failed to notify Nexus about new Dataset"); + } + + pub async fn get_dataset( + &self, + zpool_id: Uuid, + dataset_id: Uuid, + ) -> Arc { + self.zpools + .get(&zpool_id) + .expect("Zpool does not exist") + .datasets + .get(&dataset_id) + .expect("Dataset does not exist") + .data + .clone() + } +}
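Since each simulated dataset runs a real Dropshot server speaking the Crucible agent API, it can also be exercised over HTTP. A rough smoke-test sketch, assuming the dataset's bound socket address is known (it is the same address reported to Nexus in `DatasetPutRequest`) and that `reqwest` is built with its `json` feature:

```rust
use std::net::SocketAddr;

// Sketch: list regions from a running simulated Crucible agent over HTTP.
async fn list_simulated_regions(addr: SocketAddr) -> anyhow::Result<()> {
    let url = format!("http://{}/crucible/0/regions", addr);
    // Deserialize loosely into JSON values here; real callers go through the
    // crucible-agent-client types instead.
    let regions: Vec<serde_json::Value> =
        reqwest::get(&url).await?.json().await?;
    println!("simulated agent reports {} region(s)", regions.len());
    Ok(())
}
```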