diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index dc1a2e8f158..091b5dbefd6 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -43,6 +43,8 @@ jobs: steps: # actions/checkout@v2 - uses: actions/checkout@28c7f3d2b5162b5ddd3dfd9a45aa55eaf396478b + - uses: Swatinem/rust-cache@v1 + if: ${{ github.ref != 'refs/heads/main' }} - name: Report cargo version run: cargo --version - name: Check build of deployed Omicron packages @@ -55,6 +57,8 @@ jobs: steps: # actions/checkout@v2 - uses: actions/checkout@28c7f3d2b5162b5ddd3dfd9a45aa55eaf396478b + - uses: Swatinem/rust-cache@v1 + if: ${{ github.ref != 'refs/heads/main' }} - name: Report cargo version run: cargo --version - name: Report Clippy version @@ -76,6 +80,8 @@ jobs: steps: # actions/checkout@v2 - uses: actions/checkout@28c7f3d2b5162b5ddd3dfd9a45aa55eaf396478b + - uses: Swatinem/rust-cache@v1 + if: ${{ github.ref != 'refs/heads/main' }} - name: Report cargo version run: cargo --version - name: Test build documentation @@ -94,6 +100,8 @@ jobs: steps: # actions/checkout@v2 - uses: actions/checkout@28c7f3d2b5162b5ddd3dfd9a45aa55eaf396478b + - uses: Swatinem/rust-cache@v1 + if: ${{ github.ref != 'refs/heads/main' }} - name: Report cargo version run: cargo --version - name: Configure GitHub cache for CockroachDB binaries diff --git a/Cargo.lock b/Cargo.lock index e13ceb4041e..9fbf71fb944 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.52" @@ -453,6 +474,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.1" @@ -487,6 +522,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -527,6 +572,20 @@ dependencies = [ "xts-mode", ] +[[package]] +name = "crucible-agent-client" +version = "0.0.1" +source = "git+https://github.com/oxidecomputer/crucible?rev=7e8d0c31#7e8d0c314adb43698e8a65d1c55abf2225d35c2c" +dependencies = [ + "anyhow", + "percent-encoding", + "progenitor", + "reqwest", + "schemars", + "serde", + "serde_json", +] + [[package]] name = "crucible-common" version = "0.0.0" @@ -851,8 +910,9 @@ checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" [[package]] name = "dropshot" version = "0.6.1-dev" -source = "git+https://github.com/oxidecomputer/dropshot?branch=main#3ced9b099d72e9b975e9a6551989b027415b849b" 
+source = "git+https://github.com/oxidecomputer/dropshot?branch=main#8e4af93207fb79998eea90bd094ff7a5475673e5" dependencies = [ + "async-stream", "async-trait", "base64", "bytes", @@ -867,6 +927,8 @@ dependencies = [ "paste", "percent-encoding", "proc-macro2", + "rustls", + "rustls-pemfile", "schemars", "serde", "serde_json", @@ -878,6 +940,7 @@ dependencies = [ "slog-term", "syn", "tokio", + "tokio-rustls", "toml", "usdt 0.3.1", "uuid", @@ -886,7 +949,7 @@ dependencies = [ [[package]] name = "dropshot_endpoint" version = "0.6.1-dev" -source = "git+https://github.com/oxidecomputer/dropshot?branch=main#3ced9b099d72e9b975e9a6551989b027415b849b" +source = "git+https://github.com/oxidecomputer/dropshot?branch=main#8e4af93207fb79998eea90bd094ff7a5475673e5" dependencies = [ "proc-macro2", "quote", @@ -1239,9 +1302,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd" +checksum = "0c9de88456263e249e241fcd211d3954e2c9b0ef7ccfc235a444eb367cae3689" dependencies = [ "bytes", "fnv", @@ -1445,9 +1508,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ "autocfg", "hashbrown", @@ -1761,6 +1824,7 @@ dependencies = [ "percent-encoding", "progenitor", "reqwest", + "schemars", "serde", "serde_json", "slog", @@ -1914,6 +1978,7 @@ dependencies = [ "chrono", "cookie", "criterion", + "crucible-agent-client", "db-macros", "diesel", "diesel-dtrace", @@ -1972,6 +2037,7 @@ name = "omicron-package" version = "0.1.0" dependencies = [ "anyhow", + "crossbeam", "omicron-common", "propolis-server", "rayon", @@ -2001,6 +2067,7 @@ dependencies = [ "bytes", "cfg-if", "chrono", + "crucible-agent-client", "dropshot", "expectorate", "futures", @@ -2191,6 +2258,7 @@ dependencies = [ "percent-encoding", "progenitor", "reqwest", + "schemars", "serde", "slog", "uuid", @@ -2686,7 +2754,7 @@ dependencies = [ [[package]] name = "progenitor" version = "0.0.0" -source = "git+https://github.com/oxidecomputer/progenitor#66b41ba301793b8d720770b2210bee8884446d3f" +source = "git+https://github.com/oxidecomputer/progenitor#f1f9e2e93850713908f4e6494808a07f3b253108" dependencies = [ "anyhow", "getopts", @@ -2700,7 +2768,7 @@ dependencies = [ [[package]] name = "progenitor-impl" version = "0.0.0" -source = "git+https://github.com/oxidecomputer/progenitor#66b41ba301793b8d720770b2210bee8884446d3f" +source = "git+https://github.com/oxidecomputer/progenitor#f1f9e2e93850713908f4e6494808a07f3b253108" dependencies = [ "anyhow", "convert_case", @@ -2714,20 +2782,24 @@ dependencies = [ "schemars", "serde", "serde_json", + "syn", "thiserror", "typify", + "unicode-xid", ] [[package]] name = "progenitor-macro" version = "0.0.0" -source = "git+https://github.com/oxidecomputer/progenitor#66b41ba301793b8d720770b2210bee8884446d3f" +source = "git+https://github.com/oxidecomputer/progenitor#f1f9e2e93850713908f4e6494808a07f3b253108" dependencies = [ "openapiv3", "proc-macro2", "progenitor-impl", "quote", + "serde", "serde_json", + "serde_tokenstream", "syn", ] @@ -2959,15 +3031,16 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.8" +version = "0.11.9" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258" +checksum = "87f242f1488a539a79bac6dbe7c8609ae43b7914b7736210f239a37cccb32525" dependencies = [ "base64", "bytes", "encoding_rs", "futures-core", "futures-util", + "h2", "http", "http-body", "hyper", @@ -3269,9 +3342,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.74" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2bb9cd061c5865d345bb02ca49fcef1391741b672b54a0bf7b679badec3142" +checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79" dependencies = [ "itoa 1.0.1", "ryu", @@ -3280,9 +3353,9 @@ dependencies = [ [[package]] name = "serde_tokenstream" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c3ce95257fba42a656f558db28d56a9fac5aa6e4f29c5ef607f32f524fab0ab" +checksum = "d6deb15c3a535e81438110111d90168d91721652f502abb147f31cde129f683d" dependencies = [ "proc-macro2", "serde", @@ -3468,6 +3541,7 @@ dependencies = [ "percent-encoding", "progenitor", "reqwest", + "schemars", "serde", "slog", "uuid", @@ -3972,9 +4046,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.23.1" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4baa378e417d780beff82bf54ceb0d195193ea6a00c14e22359e7f39456b5689" +checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b" dependencies = [ "rustls", "tokio", @@ -4129,7 +4203,7 @@ checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec" [[package]] name = "typify" version = "0.0.6-dev" -source = "git+https://github.com/oxidecomputer/typify#80b510b02b1db22de463efcf6e7762243bcea67a" +source = "git+https://github.com/oxidecomputer/typify#9afa917671b29fc231bc9ce304e041bdd685af09" dependencies = [ "typify-impl", "typify-macro", @@ -4138,9 +4212,10 @@ dependencies = [ [[package]] name = "typify-impl" version = "0.0.6-dev" -source = "git+https://github.com/oxidecomputer/typify#80b510b02b1db22de463efcf6e7762243bcea67a" +source = "git+https://github.com/oxidecomputer/typify#9afa917671b29fc231bc9ce304e041bdd685af09" dependencies = [ "convert_case", + "log", "proc-macro2", "quote", "rustfmt-wrapper", @@ -4148,17 +4223,20 @@ dependencies = [ "serde_json", "syn", "thiserror", + "unicode-xid", ] [[package]] name = "typify-macro" version = "0.0.6-dev" -source = "git+https://github.com/oxidecomputer/typify#80b510b02b1db22de463efcf6e7762243bcea67a" +source = "git+https://github.com/oxidecomputer/typify#9afa917671b29fc231bc9ce304e041bdd685af09" dependencies = [ "proc-macro2", "quote", "schemars", + "serde", "serde_json", + "serde_tokenstream", "syn", "typify-impl", ] diff --git a/Cargo.toml b/Cargo.toml index 5853d11a0b3..040a8505a63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,4 +69,3 @@ panic = "abort" [patch.crates-io.pq-sys] git = 'https://github.com/oxidecomputer/pq-sys' branch = "oxide/omicron" - diff --git a/README.adoc b/README.adoc index 85bbb73caa6..47ee24a6f10 100644 --- a/README.adoc +++ b/README.adoc @@ -34,7 +34,7 @@ The code here is still rough, but the following are implemented: * actual API: ** projects: CRUD -** instances: CRUD + boot/halt/reboot (simulated) +** instances: CRUD + boot/halt/reboot. 
 ** disks: CRD + attach/detach (simulated)
 ** racks: list + get
 ** sleds: list + get
diff --git a/common/src/lib.rs b/common/src/lib.rs
index 8222f7f2c75..3f734d20159 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -34,18 +34,19 @@ pub mod packaging;
 macro_rules! generate_logging_api {
     ($path:literal) => {
         progenitor::generate_api!(
-            $path,
-            slog::Logger,
-            |log: &slog::Logger, request: &reqwest::Request| {
+            spec = $path,
+            inner_type = slog::Logger,
+            pre_hook = (|log: &slog::Logger, request: &reqwest::Request| {
                 slog::debug!(log, "client request";
                     "method" => %request.method(),
                     "uri" => %request.url(),
                     "body" => ?&request.body(),
                 );
-            },
-            |log: &slog::Logger, result: &Result<_, _>| {
+            }),
+            post_hook = (|log: &slog::Logger, result: &Result<_, _>| {
                 slog::debug!(log, "client response"; "result" => ?result);
-            },
+            }),
+            derives = [schemars::JsonSchema],
         );
     };
 }
diff --git a/nexus-client/Cargo.toml b/nexus-client/Cargo.toml
index 0ef31114868..2ceb8bbb54f 100644
--- a/nexus-client/Cargo.toml
+++ b/nexus-client/Cargo.toml
@@ -9,6 +9,7 @@ anyhow = "1.0"
 progenitor = { git = "https://github.com/oxidecomputer/progenitor" }
 reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
 percent-encoding = "2.1.0"
+schemars = { version = "0.8" }
 serde_json = "1.0"
 
 [dependencies.chrono]
diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml
index b24132958b3..1d5f105f643 100644
--- a/nexus/Cargo.toml
+++ b/nexus/Cargo.toml
@@ -9,10 +9,11 @@ path = "../rpaths"
 
 [dependencies]
 anyhow = "1.0"
+async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "c849b717be" }
 async-trait = "0.1.51"
 bb8 = "0.7.1"
-async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "c849b717be" }
 cookie = "0.16"
+crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "7e8d0c31" }
 # Tracking pending 2.0 version.
 diesel = { git = "https://github.com/diesel-rs/diesel", rev = "ce77c382", features = ["postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] }
 futures = "0.3.18"
diff --git a/nexus/src/db/datastore.rs b/nexus/src/db/datastore.rs
index f42f47970b9..b2e4c47bf11 100644
--- a/nexus/src/db/datastore.rs
+++ b/nexus/src/db/datastore.rs
@@ -56,7 +56,7 @@ use omicron_common::api::external::{
     CreateResult, IdentityMetadataCreateParams,
 };
 use omicron_common::bail_unless;
-use std::convert::TryFrom;
+use std::convert::{TryFrom, TryInto};
 use std::sync::Arc;
 use uuid::Uuid;
 
@@ -67,13 +67,13 @@ use crate::db::{
         public_error_from_diesel_pool_shouldnt_fail, TransactionError,
     },
     model::{
-        ConsoleSession, Dataset, Disk, DiskRuntimeState, Generation,
-        IncompleteNetworkInterface, Instance, InstanceRuntimeState, Name,
-        NetworkInterface, Organization, OrganizationUpdate, OximeterInfo,
-        ProducerEndpoint, Project, ProjectUpdate, RoleAssignmentBuiltin,
-        RoleBuiltin, RouterRoute, RouterRouteUpdate, Sled, UserBuiltin, Vpc,
-        VpcFirewallRule, VpcRouter, VpcRouterUpdate, VpcSubnet,
-        VpcSubnetUpdate, VpcUpdate, Zpool,
+        ConsoleSession, Dataset, DatasetKind, Disk, DiskRuntimeState,
+        Generation, IncompleteNetworkInterface, Instance, InstanceRuntimeState,
+        Name, NetworkInterface, Organization, OrganizationUpdate, OximeterInfo,
+        ProducerEndpoint, Project, ProjectUpdate, Region,
+        RoleAssignmentBuiltin, RoleBuiltin, RouterRoute, RouterRouteUpdate,
+        Sled, UserBuiltin, Vpc, VpcFirewallRule, VpcRouter, VpcRouterUpdate,
+        VpcSubnet, VpcSubnetUpdate, VpcUpdate, Zpool,
     },
     pagination::paginated,
     pagination::paginated_multicolumn,
@@ -81,6 +81,10 @@ use crate::db::{
     update_and_check::{UpdateAndCheck, UpdateStatus},
 };
 
+// Number of unique datasets required to back a region.
+// TODO: This should likely turn into a configuration option.
+const REGION_REDUNDANCY_THRESHOLD: usize = 3;
+
 pub struct DataStore {
     pool: Arc<Pool>,
 }
@@ -238,6 +242,173 @@ impl DataStore {
         })
     }
 
+    /// Gets allocated regions for a disk, and the datasets to which those
+    /// regions belong.
+    ///
+    /// Note that this function does not validate liveness of the Disk, so it
+    /// may be used in a context where the disk is being deleted.
+    pub async fn get_allocated_regions(
+        &self,
+        disk_id: Uuid,
+    ) -> Result<Vec<(Dataset, Region)>, Error> {
+        use db::schema::dataset::dsl as dataset_dsl;
+        use db::schema::region::dsl as region_dsl;
+        region_dsl::region
+            .filter(region_dsl::disk_id.eq(disk_id))
+            .inner_join(
+                dataset_dsl::dataset
+                    .on(region_dsl::dataset_id.eq(dataset_dsl::id)),
+            )
+            .select((Dataset::as_select(), Region::as_select()))
+            .get_results_async::<(Dataset, Region)>(self.pool())
+            .await
+            .map_err(|e| public_error_from_diesel_pool_shouldnt_fail(e))
+    }
+
+    /// Idempotently allocates enough regions to back a disk.
+    ///
+    /// Returns the allocated regions, as well as the datasets to which they
+    /// belong.
+    pub async fn region_allocate(
+        &self,
+        disk_id: Uuid,
+        params: &params::DiskCreate,
+    ) -> Result<Vec<(Dataset, Region)>, Error> {
+        use db::schema::dataset::dsl as dataset_dsl;
+        use db::schema::region::dsl as region_dsl;
+
+        // ALLOCATION POLICY
+        //
+        // NOTE: This policy can - and should! - be changed.
+        //
+        // See https://rfd.shared.oxide.computer/rfd/0205 for a more
+        // complete discussion.
+        //
+        // It is currently acting as a placeholder, showing a feasible
+        // interaction between datasets and regions.
+        //
+        // This policy allocates regions to distinct Crucible datasets,
+        // favoring datasets with the smallest existing (summed) region
+        // sizes. Basically, "pick the datasets with the smallest load first".
+        //
+        // Longer-term, we should consider:
+        // - Storage size + remaining free space
+        // - Sled placement of datasets
+        // - What sort of loads we'd like to create (an even split across all
+        //   disks may not be preferable, especially if maintenance is expected)
+        #[derive(Debug, thiserror::Error)]
+        enum RegionAllocateError {
+            #[error("Not enough datasets for replicated allocation: {0}")]
+            NotEnoughDatasets(usize),
+        }
+        type TxnError = TransactionError<RegionAllocateError>;
+        let params: params::DiskCreate = params.clone();
+        self.pool()
+            .transaction(move |conn| {
+                // First, for idempotency, check if regions are already
+                // allocated to this disk.
+                //
+                // If they are, return those regions and the associated
+                // datasets.
+                let datasets_and_regions = region_dsl::region
+                    .filter(region_dsl::disk_id.eq(disk_id))
+                    .inner_join(
+                        dataset_dsl::dataset
+                            .on(region_dsl::dataset_id.eq(dataset_dsl::id)),
+                    )
+                    .select((Dataset::as_select(), Region::as_select()))
+                    .get_results::<(Dataset, Region)>(conn)?;
+                if !datasets_and_regions.is_empty() {
+                    return Ok(datasets_and_regions);
+                }
+
+                let datasets: Vec<Dataset> = dataset_dsl::dataset
+                    // We look for valid datasets (non-deleted crucible datasets).
+                    .filter(dataset_dsl::time_deleted.is_null())
+                    .filter(dataset_dsl::kind.eq(DatasetKind(
+                        crate::internal_api::params::DatasetKind::Crucible,
+                    )))
+                    // Next, observe all the regions allocated to each dataset, and
+                    // determine how much space they're using.
+                    //
+                    // TODO: We could store "free/allocated" space per-dataset,
+                    // and keep them up-to-date, rather than trying to recompute
+                    // this.
+                    //
+                    // TODO: We admittedly don't actually *fail* any request for
+                    // running out of space - we try to send the request down to
+                    // crucible agents, and expect them to fail on our behalf in
+                    // out-of-storage conditions. This should undoubtedly be
+                    // handled more explicitly.
+                    .left_outer_join(
+                        region_dsl::region
+                            .on(dataset_dsl::id.eq(region_dsl::dataset_id)),
+                    )
+                    .group_by(dataset_dsl::id)
+                    .select(Dataset::as_select())
+                    .order(
+                        diesel::dsl::sum(
+                            region_dsl::extent_size * region_dsl::extent_count,
+                        )
+                        .asc(),
+                    )
+                    .get_results::<Dataset>(conn)?;
+
+                if datasets.len() < REGION_REDUNDANCY_THRESHOLD {
+                    return Err(TxnError::CustomError(
+                        RegionAllocateError::NotEnoughDatasets(datasets.len()),
+                    ));
+                }
+
+                // Create identical regions on each of the following datasets.
+                let source_datasets = &datasets[0..REGION_REDUNDANCY_THRESHOLD];
+                let regions: Vec<Region> = source_datasets
+                    .iter()
+                    .map(|dataset| {
+                        Region::new(
+                            dataset.id(),
+                            disk_id,
+                            params.block_size().try_into().unwrap(),
+                            params.extent_size().try_into().unwrap(),
+                            params.extent_count().try_into().unwrap(),
+                        )
+                    })
+                    .collect();
+                let regions = diesel::insert_into(region_dsl::region)
+                    .values(regions)
+                    .returning(Region::as_returning())
+                    .get_results(conn)?;
+
+                // Return the regions with the datasets to which they were allocated.
+                Ok(source_datasets
+                    .into_iter()
+                    .map(|d| d.clone())
+                    .zip(regions)
+                    .collect())
+            })
+            .await
+            .map_err(|e| {
+                Error::internal_error(&format!("Transaction error: {}", e))
+            })
+    }
+
+    /// Deletes all regions backing a disk.
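+    ///
+    /// This is a hard delete: the region records are removed from the
+    /// database outright rather than soft-deleted with a `time_deleted`
+    /// timestamp, the way disks are.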
+    pub async fn regions_hard_delete(&self, disk_id: Uuid) -> DeleteResult {
+        use db::schema::region::dsl;
+
+        diesel::delete(dsl::region)
+            .filter(dsl::disk_id.eq(disk_id))
+            .execute_async(self.pool())
+            .await
+            .map(|_| ())
+            .map_err(|e| {
+                Error::internal_error(&format!(
+                    "error deleting regions: {:?}",
+                    e
+                ))
+            })
+    }
+
     /// Create an organization
     pub async fn organization_create(
         &self,
@@ -1047,24 +1218,66 @@ impl DataStore {
         &self,
         opctx: &OpContext,
         disk_authz: authz::ProjectChild,
+    ) -> DeleteResult {
+        let disk_id = disk_authz.id();
+        opctx.authorize(authz::Action::Delete, disk_authz).await?;
+        self.project_delete_disk_internal(
+            disk_id,
+            self.pool_authorized(opctx).await?,
+        )
+        .await
+    }
+
+    // TODO: Delete me (this function, not the disk!), ensure all datastore
+    // access is auth-checked.
+    //
+    // Here's the deal: We have auth checks on access to the database - at the
+    // time of writing this comment, only a subset of access is protected, and
+    // "Delete Disk" is actually one of the first targets of this auth check.
+    //
+    // However, there are contexts where we want to delete disks *outside* of
+    // calling the HTTP API-layer "delete disk" endpoint. As one example, during
+    // the "undo" part of the disk creation saga, we want to allow users to
+    // delete the disk they (partially) created.
+    //
+    // This gets a little tricky mapping back to user permissions - a user
+    // SHOULD be able to create a disk with the "create" permission, without the
+    // "delete" permission. To still make the call internally, we'd basically
+    // need to manufacture a token that identifies the ability to "create a
+    // disk, or delete a very specific disk with ID = ...".
+    pub async fn project_delete_disk_no_auth(
+        &self,
+        disk_id: &Uuid,
+    ) -> DeleteResult {
+        self.project_delete_disk_internal(disk_id, self.pool()).await
+    }
+
+    async fn project_delete_disk_internal(
+        &self,
+        disk_id: &Uuid,
+        pool: &bb8::Pool<async_bb8_diesel::DieselConnectionManager<diesel::PgConnection>>,
     ) -> DeleteResult {
         use db::schema::disk::dsl;
         let now = Utc::now();
-        let disk_id = disk_authz.id();
-        opctx.authorize(authz::Action::Delete, disk_authz).await?;
+        let ok_to_delete_states = vec![
+            api::external::DiskState::Detached,
+            api::external::DiskState::Faulted,
+            api::external::DiskState::Creating,
+        ];
+        let ok_to_delete_state_labels: Vec<_> =
+            ok_to_delete_states.iter().map(|s| s.label()).collect();
         let destroyed = api::external::DiskState::Destroyed.label();
-        let detached = api::external::DiskState::Detached.label();
-        let faulted = api::external::DiskState::Faulted.label();
 
         let result = diesel::update(dsl::disk)
             .filter(dsl::time_deleted.is_null())
             .filter(dsl::id.eq(*disk_id))
-            .filter(dsl::disk_state.eq_any(vec![detached, faulted]))
+            .filter(dsl::disk_state.eq_any(ok_to_delete_state_labels))
+            .filter(dsl::attach_instance_id.is_null())
             .set((dsl::disk_state.eq(destroyed), dsl::time_deleted.eq(now)))
             .check_if_exists::<Disk>(*disk_id)
-            .execute_and_check(self.pool_authorized(opctx).await?)
+            .execute_and_check(pool)
             .await
             .map_err(|e| {
                 public_error_from_diesel_pool(
@@ -1076,12 +1289,29 @@ impl DataStore {
         match result.status {
             UpdateStatus::Updated => Ok(()),
-            UpdateStatus::NotUpdatedButExists => Err(Error::InvalidRequest {
-                message: format!(
-                    "disk cannot be deleted in state \"{}\"",
-                    result.found.runtime_state.disk_state
-                ),
-            }),
+            UpdateStatus::NotUpdatedButExists => {
+                let disk_state = result.found.state();
+                if !ok_to_delete_states.contains(disk_state.state()) {
+                    return Err(Error::InvalidRequest {
+                        message: format!(
+                            "disk cannot be deleted in state \"{}\"",
+                            result.found.runtime_state.disk_state
+                        ),
+                    });
+                } else if disk_state.is_attached() {
+                    return Err(Error::InvalidRequest {
+                        message: String::from("disk is attached"),
+                    });
+                } else {
+                    // NOTE: This is a "catch-all" error case, more specific
+                    // errors should be preferred as they're more actionable.
+                    return Err(Error::InvalidRequest {
+                        message: String::from(
+                            "disk exists, but cannot be deleted",
+                        ),
+                    });
+                }
+            }
         }
     }
 
@@ -2383,15 +2613,20 @@ pub async fn datastore_test(
 
 #[cfg(test)]
 mod test {
-    use super::datastore_test;
+    use super::*;
     use crate::authz;
     use crate::db::identity::Resource;
     use crate::db::model::{ConsoleSession, Organization, Project};
     use crate::external_api::params;
     use chrono::{Duration, Utc};
     use nexus_test_utils::db::test_setup_database;
-    use omicron_common::api::external::{Error, IdentityMetadataCreateParams};
+    use omicron_common::api::external::{
+        ByteCount, Error, IdentityMetadataCreateParams, Name,
+    };
     use omicron_test_utils::dev;
+    use std::collections::HashSet;
+    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+    use std::sync::Arc;
     use uuid::Uuid;
 
     #[tokio::test]
@@ -2482,4 +2717,219 @@ mod test {
         db.cleanup().await.unwrap();
         logctx.cleanup_successful();
     }
+
+    // Creates a test sled, returns its UUID.
+    async fn create_test_sled(datastore: &DataStore) -> Uuid {
+        let bogus_addr =
+            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let sled_id = Uuid::new_v4();
+        let sled = Sled::new(sled_id, bogus_addr.clone());
+        datastore.sled_upsert(sled).await.unwrap();
+        sled_id
+    }
+
+    // Creates a test zpool, returns its UUID.
+    async fn create_test_zpool(datastore: &DataStore, sled_id: Uuid) -> Uuid {
+        let zpool_id = Uuid::new_v4();
+        let zpool = Zpool::new(
+            zpool_id,
+            sled_id,
+            &crate::internal_api::params::ZpoolPutRequest {
+                size: ByteCount::from_gibibytes_u32(100),
+            },
+        );
+        datastore.zpool_upsert(zpool).await.unwrap();
+        zpool_id
+    }
+
+    fn create_test_disk_create_params(
+        name: &str,
+        size: ByteCount,
+    ) -> params::DiskCreate {
+        params::DiskCreate {
+            identity: IdentityMetadataCreateParams {
+                name: Name::try_from(name.to_string()).unwrap(),
+                description: name.to_string(),
+            },
+            snapshot_id: None,
+            size,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_region_allocation() {
+        let logctx = dev::test_setup_log("test_region_allocation");
+        let mut db = test_setup_database(&logctx.log).await;
+        let cfg = db::Config { url: db.pg_config().clone() };
+        let pool = db::Pool::new(&cfg);
+        let datastore = DataStore::new(Arc::new(pool));
+
+        // Create a sled...
+        let sled_id = create_test_sled(&datastore).await;
+
+        // ... and a zpool within that sled...
+        let zpool_id = create_test_zpool(&datastore, sled_id).await;
+
+        // ... and datasets within that zpool.
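+        // Create twice as many datasets as the redundancy threshold, so the
+        // second disk allocated below can land on entirely unused datasets.
+        // (See the intersection check at the end of this test.)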
+        let dataset_count = REGION_REDUNDANCY_THRESHOLD * 2;
+        let bogus_addr =
+            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let kind =
+            DatasetKind(crate::internal_api::params::DatasetKind::Crucible);
+        let dataset_ids: Vec<Uuid> =
+            (0..dataset_count).map(|_| Uuid::new_v4()).collect();
+        for id in &dataset_ids {
+            let dataset = Dataset::new(*id, zpool_id, bogus_addr, kind.clone());
+            datastore.dataset_upsert(dataset).await.unwrap();
+        }
+
+        // Allocate regions from the datasets for this disk.
+        let params = create_test_disk_create_params(
+            "disk1",
+            ByteCount::from_mebibytes_u32(500),
+        );
+        let disk1_id = Uuid::new_v4();
+        let dataset_and_regions =
+            datastore.region_allocate(disk1_id, &params).await.unwrap();
+
+        // Verify the allocation.
+        assert_eq!(REGION_REDUNDANCY_THRESHOLD, dataset_and_regions.len());
+        let mut disk1_datasets = HashSet::new();
+        for (dataset, region) in dataset_and_regions {
+            assert!(disk1_datasets.insert(dataset.id()));
+            assert_eq!(disk1_id, region.disk_id());
+            assert_eq!(params.block_size(), region.block_size());
+            assert_eq!(params.extent_size(), region.extent_size());
+            assert_eq!(params.extent_count(), region.extent_count());
+        }
+
+        // Allocate regions for a second disk. Observe that we allocate from
+        // the three previously unused datasets.
+        let params = create_test_disk_create_params(
+            "disk2",
+            ByteCount::from_mebibytes_u32(500),
+        );
+        let disk2_id = Uuid::new_v4();
+        let dataset_and_regions =
+            datastore.region_allocate(disk2_id, &params).await.unwrap();
+        assert_eq!(REGION_REDUNDANCY_THRESHOLD, dataset_and_regions.len());
+        let mut disk2_datasets = HashSet::new();
+        for (dataset, region) in dataset_and_regions {
+            assert!(disk2_datasets.insert(dataset.id()));
+            assert_eq!(disk2_id, region.disk_id());
+            assert_eq!(params.block_size(), region.block_size());
+            assert_eq!(params.extent_size(), region.extent_size());
+            assert_eq!(params.extent_count(), region.extent_count());
+        }
+
+        // Double-check that the datasets used for the first disk weren't
+        // used when allocating the second disk.
+        assert_eq!(0, disk1_datasets.intersection(&disk2_datasets).count());
+
+        let _ = db.cleanup().await;
+    }
+
+    #[tokio::test]
+    async fn test_region_allocation_is_idempotent() {
+        let logctx =
+            dev::test_setup_log("test_region_allocation_is_idempotent");
+        let mut db = test_setup_database(&logctx.log).await;
+        let cfg = db::Config { url: db.pg_config().clone() };
+        let pool = db::Pool::new(&cfg);
+        let datastore = DataStore::new(Arc::new(pool));
+
+        // Create a sled...
+        let sled_id = create_test_sled(&datastore).await;
+
+        // ... and a zpool within that sled...
+        let zpool_id = create_test_zpool(&datastore, sled_id).await;
+
+        // ... and datasets within that zpool.
+        let dataset_count = REGION_REDUNDANCY_THRESHOLD;
+        let bogus_addr =
+            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let kind =
+            DatasetKind(crate::internal_api::params::DatasetKind::Crucible);
+        let dataset_ids: Vec<Uuid> =
+            (0..dataset_count).map(|_| Uuid::new_v4()).collect();
+        for id in &dataset_ids {
+            let dataset = Dataset::new(*id, zpool_id, bogus_addr, kind.clone());
+            datastore.dataset_upsert(dataset).await.unwrap();
+        }
+
+        // Allocate regions from the datasets for this disk.
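+        // Calling region_allocate a second time with the same disk ID must
+        // return the same allocation rather than a new one: the transaction
+        // first checks for existing regions with this disk ID and returns
+        // them if present.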
+        let params = create_test_disk_create_params(
+            "disk",
+            ByteCount::from_mebibytes_u32(500),
+        );
+        let disk_id = Uuid::new_v4();
+        let mut dataset_and_regions1 =
+            datastore.region_allocate(disk_id, &params).await.unwrap();
+        let mut dataset_and_regions2 =
+            datastore.region_allocate(disk_id, &params).await.unwrap();
+
+        // Give them a consistent order so we can easily compare them.
+        let sort_vec = |v: &mut Vec<(Dataset, Region)>| {
+            v.sort_by(|(d1, r1), (d2, r2)| {
+                let order = d1.id().cmp(&d2.id());
+                match order {
+                    std::cmp::Ordering::Equal => r1.id().cmp(&r2.id()),
+                    _ => order,
+                }
+            });
+        };
+        sort_vec(&mut dataset_and_regions1);
+        sort_vec(&mut dataset_and_regions2);
+
+        // Validate that the two calls to allocate return the same data.
+        assert_eq!(dataset_and_regions1.len(), dataset_and_regions2.len());
+        for i in 0..dataset_and_regions1.len() {
+            assert_eq!(dataset_and_regions1[i], dataset_and_regions2[i],);
+        }
+
+        let _ = db.cleanup().await;
+    }
+
+    #[tokio::test]
+    async fn test_region_allocation_not_enough_datasets() {
+        let logctx =
+            dev::test_setup_log("test_region_allocation_not_enough_datasets");
+        let mut db = test_setup_database(&logctx.log).await;
+        let cfg = db::Config { url: db.pg_config().clone() };
+        let pool = db::Pool::new(&cfg);
+        let datastore = DataStore::new(Arc::new(pool));
+
+        // Create a sled...
+        let sled_id = create_test_sled(&datastore).await;
+
+        // ... and a zpool within that sled...
+        let zpool_id = create_test_zpool(&datastore, sled_id).await;
+
+        // ... and datasets within that zpool.
+        let dataset_count = REGION_REDUNDANCY_THRESHOLD - 1;
+        let bogus_addr =
+            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let kind =
+            DatasetKind(crate::internal_api::params::DatasetKind::Crucible);
+        let dataset_ids: Vec<Uuid> =
+            (0..dataset_count).map(|_| Uuid::new_v4()).collect();
+        for id in &dataset_ids {
+            let dataset = Dataset::new(*id, zpool_id, bogus_addr, kind.clone());
+            datastore.dataset_upsert(dataset).await.unwrap();
+        }
+
+        // Allocate regions from the datasets for this disk.
+        let params = create_test_disk_create_params(
+            "disk1",
+            ByteCount::from_mebibytes_u32(500),
+        );
+        let disk1_id = Uuid::new_v4();
+        let err =
+            datastore.region_allocate(disk1_id, &params).await.unwrap_err();
+        assert!(err
+            .to_string()
+            .contains("Not enough datasets for replicated allocation"));
+
+        let _ = db.cleanup().await;
+    }
+}
diff --git a/nexus/src/db/db-macros/src/lib.rs b/nexus/src/db/db-macros/src/lib.rs
index 1bd49481468..3b29099c5dc 100644
--- a/nexus/src/db/db-macros/src/lib.rs
+++ b/nexus/src/db/db-macros/src/lib.rs
@@ -186,7 +186,7 @@ fn build_resource_identity(
     let identity_name = format_ident!("{}Identity", struct_name);
     quote! {
         #[doc = #identity_doc]
-        #[derive(Clone, Debug, Selectable, Queryable, Insertable)]
+        #[derive(Clone, Debug, PartialEq, Selectable, Queryable, Insertable, serde::Serialize, serde::Deserialize)]
         #[table_name = #table_name ]
         pub struct #identity_name {
             pub id: ::uuid::Uuid,
@@ -225,7 +225,7 @@ fn build_asset_identity(struct_name: &Ident, table_name: &Lit) -> TokenStream {
     let identity_name = format_ident!("{}Identity", struct_name);
     quote! {
         #[doc = #identity_doc]
-        #[derive(Clone, Debug, Selectable, Queryable, Insertable)]
+        #[derive(Clone, Debug, PartialEq, Selectable, Queryable, Insertable, serde::Serialize, serde::Deserialize)]
         #[table_name = #table_name ]
         pub struct #identity_name {
             pub id: ::uuid::Uuid,
diff --git a/nexus/src/db/model.rs b/nexus/src/db/model.rs
index 87c5587e1c3..5b375cc37de 100644
--- a/nexus/src/db/model.rs
+++ b/nexus/src/db/model.rs
@@ -27,7 +27,7 @@ use parse_display::Display;
 use rand::{rngs::StdRng, SeedableRng};
 use ref_cast::RefCast;
 use schemars::JsonSchema;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use std::convert::TryFrom;
 use std::net::SocketAddr;
 use uuid::Uuid;
@@ -106,8 +106,9 @@ macro_rules! impl_enum_type {
             Ord,
             PartialOrd,
             RefCast,
-            Deserialize,
             JsonSchema,
+            Serialize,
+            Deserialize,
         )]
         #[sql_type = "sql_types::Text"]
         #[serde(transparent)]
@@ -142,7 +143,9 @@ where
     }
 }
 
-#[derive(Copy, Clone, Debug, AsExpression, FromSqlRow)]
+#[derive(
+    Copy, Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize,
+)]
 #[sql_type = "sql_types::BigInt"]
 pub struct ByteCount(pub external::ByteCount);
 
@@ -175,7 +178,17 @@ where
 }
 
 #[derive(
-    Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd, AsExpression, FromSqlRow,
+    Copy,
+    Clone,
+    Debug,
+    Eq,
+    Ord,
+    PartialEq,
+    PartialOrd,
+    AsExpression,
+    FromSqlRow,
+    Serialize,
+    Deserialize,
 )]
 #[sql_type = "sql_types::BigInt"]
 #[repr(transparent)]
@@ -504,11 +517,11 @@ impl DatastoreCollection<Dataset> for Zpool {
 }
 
 impl_enum_type!(
-    #[derive(SqlType, Debug)]
+    #[derive(SqlType, Debug, QueryId)]
     #[postgres(type_name = "dataset_kind", type_schema = "public")]
     pub struct DatasetKindEnum;
 
-    #[derive(Clone, Debug, AsExpression, FromSqlRow)]
+    #[derive(Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)]
     #[sql_type = "DatasetKindEnum"]
     pub struct DatasetKind(pub internal_api::params::DatasetKind);
 
@@ -528,7 +541,17 @@ impl From<internal_api::params::DatasetKind> for DatasetKind {
 ///
 /// A dataset represents a portion of a Zpool, which is then made
 /// available to a service on the Sled.
-#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)]
+#[derive(
+    Queryable,
+    Insertable,
+    Debug,
+    Clone,
+    Selectable,
+    Asset,
+    Deserialize,
+    Serialize,
+    PartialEq,
+)]
 #[table_name = "dataset"]
 pub struct Dataset {
     #[diesel(embed)]
@@ -588,7 +611,17 @@ impl DatastoreCollection<Region> for Disk {
 ///
 /// A region represents a portion of a Crucible Downstairs dataset
 /// allocated within a volume.
-#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)]
+#[derive(
+    Queryable,
+    Insertable,
+    Debug,
+    Clone,
+    Selectable,
+    Asset,
+    Serialize,
+    Deserialize,
+    PartialEq,
+)]
 #[table_name = "region"]
 pub struct Region {
     #[diesel(embed)]
@@ -602,6 +635,41 @@ pub struct Region {
     extent_count: i64,
 }
 
+impl Region {
+    pub fn new(
+        dataset_id: Uuid,
+        disk_id: Uuid,
+        block_size: i64,
+        extent_size: i64,
+        extent_count: i64,
+    ) -> Self {
+        Self {
+            identity: RegionIdentity::new(Uuid::new_v4()),
+            dataset_id,
+            disk_id,
+            block_size,
+            extent_size,
+            extent_count,
+        }
+    }
+
+    pub fn disk_id(&self) -> Uuid {
+        self.disk_id
+    }
+    pub fn dataset_id(&self) -> Uuid {
+        self.dataset_id
+    }
+    pub fn block_size(&self) -> u64 {
+        self.block_size as u64
+    }
+    pub fn extent_size(&self) -> u64 {
+        self.extent_size as u64
+    }
+    pub fn extent_count(&self) -> u64 {
+        self.extent_count as u64
+    }
+}
+
 /// Describes an organization within the database.
 #[derive(Queryable, Insertable, Debug, Resource, Selectable)]
 #[table_name = "organization"]
@@ -665,7 +733,7 @@ impl Project {
     pub fn new(organization_id: Uuid, params: params::ProjectCreate) -> Self {
         Self {
             identity: ProjectIdentity::new(Uuid::new_v4(), params.identity),
-            organization_id: organization_id,
+            organization_id,
         }
     }
 }
@@ -851,7 +919,16 @@ where
 }
 
 /// A Disk (network block device).
-#[derive(Queryable, Insertable, Clone, Debug, Selectable, Resource)]
+#[derive(
+    Queryable,
+    Insertable,
+    Clone,
+    Debug,
+    Selectable,
+    Resource,
+    Serialize,
+    Deserialize,
+)]
 #[table_name = "disk"]
 pub struct Disk {
     #[diesel(embed)]
@@ -918,7 +995,16 @@ impl Into<external::Disk> for Disk {
     }
 }
 
-#[derive(AsChangeset, Clone, Debug, Queryable, Insertable, Selectable)]
+#[derive(
+    AsChangeset,
+    Clone,
+    Debug,
+    Queryable,
+    Insertable,
+    Selectable,
+    Serialize,
+    Deserialize,
+)]
 #[table_name = "disk"]
 // When "attach_instance_id" is set to None, we'd like to
 // clear it from the DB, rather than ignore the update.
diff --git a/nexus/src/db/schema.rs b/nexus/src/db/schema.rs
index 504543d4bb0..0067899e2be 100644
--- a/nexus/src/db/schema.rs
+++ b/nexus/src/db/schema.rs
@@ -324,6 +324,7 @@ table! {
 }
 
 allow_tables_to_appear_in_same_query!(
+    dataset,
     disk,
     instance,
     metric_producer,
@@ -331,6 +332,7 @@ allow_tables_to_appear_in_same_query!(
     organization,
     oximeter,
     project,
+    region,
     saga,
     saga_node_event,
     console_session,
@@ -343,4 +345,5 @@ allow_tables_to_appear_in_same_query!(
     user_builtin,
     role_builtin,
     role_assignment_builtin,
+    zpool,
 );
diff --git a/nexus/src/external_api/params.rs b/nexus/src/external_api/params.rs
index bc3d6796a56..bbcb5acf56d 100644
--- a/nexus/src/external_api/params.rs
+++ b/nexus/src/external_api/params.rs
@@ -182,6 +182,20 @@ pub struct DiskCreate {
     pub size: ByteCount,
 }
 
+impl DiskCreate {
+    pub fn block_size(&self) -> u64 {
+        512
+    }
+
+    pub fn extent_size(&self) -> u64 {
+        1 << 20
+    }
+
+    pub fn extent_count(&self) -> u64 {
+        (self.size.to_bytes() + self.extent_size() - 1) / self.extent_size()
+    }
+}
+
 /**
  * Parameters for the [`Disk`] to be attached or detached to an instance
 */
diff --git a/nexus/src/internal_api/params.rs b/nexus/src/internal_api/params.rs
index 5d073cce5b2..1e4f420d676 100644
--- a/nexus/src/internal_api/params.rs
+++ b/nexus/src/internal_api/params.rs
@@ -33,7 +33,7 @@ pub struct ZpoolPutRequest {
 pub struct ZpoolPutResponse {}
 
 /// Describes the purpose of the dataset.
-#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, Copy)]
+#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, Copy, PartialEq)]
 pub enum DatasetKind {
     Crucible,
     Cockroach,
diff --git a/nexus/src/nexus.rs b/nexus/src/nexus.rs
index 7f35dd9178a..fec16feaf8a 100644
--- a/nexus/src/nexus.rs
+++ b/nexus/src/nexus.rs
@@ -228,6 +228,10 @@ impl Nexus {
         nexus_arc
     }
 
+    pub fn log(&self) -> &Logger {
+        &self.log
+    }
+
     pub async fn wait_for_populate(&self) -> Result<(), anyhow::Error> {
         let mut my_rx = self.populate_status.clone();
         loop {
@@ -671,7 +675,7 @@ impl Nexus {
     }
 
     pub async fn project_create_disk(
-        &self,
+        self: &Arc<Self>,
         organization_name: &Name,
         project_name: &Name,
         params: &params::DiskCreate,
@@ -690,37 +694,23 @@ impl Nexus {
             });
         }
 
-        let disk_id = Uuid::new_v4();
-        let disk = db::model::Disk::new(
-            disk_id,
-            project.id(),
-            params.clone(),
-            db::model::DiskRuntimeState::new(),
-        );
-        let disk_created = self.db_datastore.project_create_disk(disk).await?;
-
-        // TODO: Here, we should ensure the disk is backed by appropriate
-        // regions.  This is blocked behind actually having Crucible agents
-        // running in zones for dedicated zpools.
-        //
-        // TODO: Performing this operation, alongside "create" and "update
-        // state from create to detach", should be executed in a Saga.
+        let saga_params = Arc::new(sagas::ParamsDiskCreate {
+            project_id: project.id(),
+            create_params: params.clone(),
+        });
 
-        /*
-         * This is a little hokey. We'd like to simulate an asynchronous
-         * transition from "Creating" to "Detached". For instances, the
-         * simulation lives in a simulated sled agent. Here, the analog might
-         * be a simulated storage control plane. But that doesn't exist yet,
-         * and we don't even know what APIs it would provide yet. So we just
-         * carry out the simplest possible "simulation" here: we'll return to
-         * the client a structure describing a disk in state "Creating", but by
-         * the time we do so, we've already updated the internal representation
-         * to "Created".
-         */
-        self.db_datastore
-            .disk_update_runtime(&disk_id, &disk_created.runtime().detach())
+        let saga_outputs = self
+            .execute_saga(
+                Arc::clone(&sagas::SAGA_DISK_CREATE_TEMPLATE),
+                sagas::SAGA_DISK_CREATE_NAME,
+                saga_params,
+            )
             .await?;
-
+        let disk_created = saga_outputs
+            .lookup_output::<db::model::Disk>("created_disk")
+            .map_err(|e| Error::InternalError {
+                internal_message: e.to_string(),
+            })?;
         Ok(disk_created)
     }
 
@@ -754,7 +744,7 @@ impl Nexus {
     }
 
     pub async fn project_delete_disk(
-        &self,
+        self: &Arc<Self>,
         opctx: &OpContext,
         organization_name: &Name,
         project_name: &Name,
@@ -763,34 +753,26 @@ impl Nexus {
         let (disk, authz_disk) = self
             .project_lookup_disk(organization_name, project_name, disk_name)
             .await?;
-        let runtime = disk.runtime();
-        bail_unless!(runtime.state().state() != &DiskState::Destroyed);
-        if runtime.state().is_attached() {
-            return Err(Error::InvalidRequest {
-                message: String::from("disk is attached"),
-            });
-        }
+        // TODO: We need to sort out the authorization checks.
+        //
+        // Normally, this would be coupled alongside access to the
+        // datastore, but now that disk deletion exists within a Saga,
+        // this would require OpContext to be serialized (which is
+        // not trivial).
+        opctx.authorize(authz::Action::Query, authz::DATABASE).await?;
+        opctx.authorize(authz::Action::Delete, authz_disk).await?;
+
+        let saga_params =
+            Arc::new(sagas::ParamsDiskDelete { disk_id: disk.id() });
+        self.execute_saga(
+            Arc::clone(&sagas::SAGA_DISK_DELETE_TEMPLATE),
+            sagas::SAGA_DISK_DELETE_NAME,
+            saga_params,
+        )
+        .await?;
 
-        /*
-         * TODO-correctness It's not clear how this handles the case where we
-         * begin this delete operation while some other request is ongoing to
-         * attach the disk. We won't be able to see that in the state here. We
-         * might be able to detect this when we go update the disk's state to
-         * Attaching (because a SQL UPDATE will update 0 rows), but we'd sort of
-         * already be in a bad state because the destroyed disk will be
-         * attaching (and eventually attached) on some sled, and if the wrong
-         * combination of components crash at this point, we could wind up not
-         * fixing that state.
-         *
-         * This is a consequence of the choice _not_ to record the Attaching
-         * state in the database before beginning the attach process. If we did
-         * that, we wouldn't have this problem, but we'd have a similar problem
-         * of dealing with the case of a crash after recording this state and
-         * before actually beginning the attach process. Sagas can maybe
-         * address that.
-         */
-        self.db_datastore.project_delete_disk(opctx, authz_disk).await
+        Ok(())
     }
 
     /*
diff --git a/nexus/src/saga_interface.rs b/nexus/src/saga_interface.rs
index 6f20243eb44..6793cb7656b 100644
--- a/nexus/src/saga_interface.rs
+++ b/nexus/src/saga_interface.rs
@@ -11,6 +11,7 @@ use crate::external_api::params;
 use crate::Nexus;
 use omicron_common::api::external::Error;
 use sled_agent_client::Client as SledAgentClient;
+use slog::Logger;
 use std::fmt;
 use std::sync::Arc;
 use uuid::Uuid;
@@ -35,6 +36,10 @@ impl SagaContext {
         SagaContext { nexus }
     }
 
+    pub fn log(&self) -> &Logger {
+        self.nexus.log()
+    }
+
     /*
      * TODO-design This interface should not exist. Instead, sleds should be
      * represented in the database. Reservations will wind up writing to the
diff --git a/nexus/src/sagas.rs b/nexus/src/sagas.rs
index a1f284f5495..f4ce09904e1 100644
--- a/nexus/src/sagas.rs
+++ b/nexus/src/sagas.rs
@@ -14,11 +14,18 @@
  */
 
 use crate::db;
-use crate::db::identity::Resource;
+use crate::db::identity::{Asset, Resource};
 use crate::external_api::params;
 use crate::saga_interface::SagaContext;
+use anyhow::anyhow;
 use chrono::Utc;
+use crucible_agent_client::{
+    types::{CreateRegion, RegionId, State as RegionState},
+    Client as CrucibleAgentClient,
+};
+use futures::StreamExt;
 use lazy_static::lazy_static;
+use omicron_common::api::external::Error;
 use omicron_common::api::external::Generation;
 use omicron_common::api::external::IdentityMetadataCreateParams;
 use omicron_common::api::external::InstanceState;
@@ -26,8 +33,10 @@ use omicron_common::api::external::Name;
 use omicron_common::api::external::NetworkInterface;
 use omicron_common::api::internal::nexus::InstanceRuntimeState;
 use omicron_common::api::internal::sled_agent::InstanceHardware;
+use omicron_common::backoff::{self, BackoffError};
 use serde::Deserialize;
 use serde::Serialize;
+use slog::Logger;
 use std::collections::BTreeMap;
 use std::convert::TryFrom;
 use std::sync::Arc;
@@ -45,9 +54,15 @@ use uuid::Uuid;
 * We'll need a richer mechanism for registering sagas, but this works for now.
 */
 pub const SAGA_INSTANCE_CREATE_NAME: &'static str = "instance-create";
+pub const SAGA_DISK_CREATE_NAME: &'static str = "disk-create";
+pub const SAGA_DISK_DELETE_NAME: &'static str = "disk-delete";
 lazy_static! {
     pub static ref SAGA_INSTANCE_CREATE_TEMPLATE: Arc<SagaTemplate<SagaInstanceCreate>> =
         Arc::new(saga_instance_create());
+    pub static ref SAGA_DISK_CREATE_TEMPLATE: Arc<SagaTemplate<SagaDiskCreate>> =
+        Arc::new(saga_disk_create());
+    pub static ref SAGA_DISK_DELETE_TEMPLATE: Arc<SagaTemplate<SagaDiskDelete>> =
+        Arc::new(saga_disk_delete());
 }
 
 lazy_static! {
@@ -57,11 +72,23 @@ lazy_static! {
 
 fn all_templates(
 ) -> BTreeMap<&'static str, Arc<dyn SagaTemplateGeneric<Arc<SagaContext>>>> {
-    vec![(
-        SAGA_INSTANCE_CREATE_NAME,
-        Arc::clone(&SAGA_INSTANCE_CREATE_TEMPLATE)
-            as Arc<dyn SagaTemplateGeneric<Arc<SagaContext>>>,
-    )]
+    vec![
+        (
+            SAGA_INSTANCE_CREATE_NAME,
+            Arc::clone(&SAGA_INSTANCE_CREATE_TEMPLATE)
+                as Arc<dyn SagaTemplateGeneric<Arc<SagaContext>>>,
+        ),
+        (
+            SAGA_DISK_CREATE_NAME,
+            Arc::clone(&SAGA_DISK_CREATE_TEMPLATE)
+                as Arc<dyn SagaTemplateGeneric<Arc<SagaContext>>>,
+        ),
+        (
+            SAGA_DISK_DELETE_NAME,
+            Arc::clone(&SAGA_DISK_DELETE_TEMPLATE)
+                as Arc<dyn SagaTemplateGeneric<Arc<SagaContext>>>,
+        ),
+    ]
     .into_iter()
     .collect()
 }
@@ -308,3 +335,332 @@ async fn sic_instance_ensure(
         .map(|_| ())
         .map_err(ActionError::action_failed)
 }
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct ParamsDiskCreate {
+    pub project_id: Uuid,
+    pub create_params: params::DiskCreate,
+}
+
+#[derive(Debug)]
+pub struct SagaDiskCreate;
+impl SagaType for SagaDiskCreate {
+    type SagaParamsType = Arc<ParamsDiskCreate>;
+    type ExecContextType = Arc<SagaContext>;
+}
+
+fn saga_disk_create() -> SagaTemplate<SagaDiskCreate> {
+    let mut template_builder = SagaTemplateBuilder::new();
+
+    template_builder.append(
+        "disk_id",
+        "GenerateDiskId",
+        new_action_noop_undo(sdc_generate_uuid),
+    );
+
+    template_builder.append(
+        "created_disk",
+        "CreateDiskRecord",
+        ActionFunc::new_action(
+            sdc_create_disk_record,
+            sdc_create_disk_record_undo,
+        ),
+    );
+
+    template_builder.append(
+        "datasets_and_regions",
+        "AllocRegions",
+        ActionFunc::new_action(sdc_alloc_regions, sdc_alloc_regions_undo),
+    );
+
+    template_builder.append(
+        "regions_ensure",
+        "RegionsEnsure",
+        ActionFunc::new_action(sdc_regions_ensure, sdc_regions_ensure_undo),
+    );
+
+    template_builder.append(
+        "disk_runtime",
+        "FinalizeDiskRecord",
+        new_action_noop_undo(sdc_finalize_disk_record),
+    );
+
+    template_builder.build()
+}
+
+async fn sdc_generate_uuid(
+    _: ActionContext<SagaDiskCreate>,
+) -> Result<Uuid, ActionError> {
+    Ok(Uuid::new_v4())
+}
+
+async fn sdc_create_disk_record(
+    sagactx: ActionContext<SagaDiskCreate>,
+) -> Result<db::model::Disk, ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params();
+
+    let disk_id = sagactx.lookup::<Uuid>("disk_id")?;
+
+    // NOTE: This could be done in a transaction alongside region allocation?
+    //
+    // Unclear if it's a problem to let this disk exist without any backing
+    // regions for a brief period of time, or if that's under the valid
+    // jurisdiction of "Creating".
+    let disk = db::model::Disk::new(
+        disk_id,
+        params.project_id,
+        params.create_params.clone(),
+        db::model::DiskRuntimeState::new(),
+    );
+    let disk_created = osagactx
+        .datastore()
+        .project_create_disk(disk)
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(disk_created)
+}
+
+async fn sdc_create_disk_record_undo(
+    sagactx: ActionContext<SagaDiskCreate>,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+
+    let disk_id = sagactx.lookup::<Uuid>("disk_id")?;
+    osagactx.datastore().project_delete_disk_no_auth(&disk_id).await?;
+    Ok(())
+}
+
+async fn sdc_alloc_regions(
+    sagactx: ActionContext<SagaDiskCreate>,
+) -> Result<Vec<(db::model::Dataset, db::model::Region)>, ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params();
+    let disk_id = sagactx.lookup::<Uuid>("disk_id")?;
+    // Ensure the disk is backed by appropriate regions.
+    //
+    // This allocates regions in the database, but the disk state is still
+    // "creating" - the respective Crucible Agents must be instructed to
+    // allocate the necessary regions before we can mark the disk as "ready to
+    // be used".
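+    //
+    // If a later node in the saga fails, `sdc_alloc_regions_undo` unwinds
+    // this step by removing the allocated region records again via
+    // `regions_hard_delete`.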
+    let datasets_and_regions = osagactx
+        .datastore()
+        .region_allocate(disk_id, &params.create_params)
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(datasets_and_regions)
+}
+
+async fn sdc_alloc_regions_undo(
+    sagactx: ActionContext<SagaDiskCreate>,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+
+    let disk_id = sagactx.lookup::<Uuid>("disk_id")?;
+    osagactx.datastore().regions_hard_delete(disk_id).await?;
+    Ok(())
+}
+
+async fn ensure_region_in_dataset(
+    log: &Logger,
+    dataset: &db::model::Dataset,
+    region: &db::model::Region,
+) -> Result<crucible_agent_client::types::Region, anyhow::Error> {
+    let url = format!("http://{}", dataset.address());
+    let client = CrucibleAgentClient::new(&url);
+
+    let region_request = CreateRegion {
+        block_size: region.block_size(),
+        extent_count: region.extent_count(),
+        extent_size: region.extent_size(),
+        // TODO: Can we avoid casting from UUID to string?
+        // NOTE: This'll require updating the crucible agent client.
+        id: RegionId(region.id().to_string()),
+        volume_id: region.disk_id().to_string(),
+    };
+
+    let create_region = || async {
+        let region = client
+            .region_create(&region_request)
+            .await
+            .map_err(|e| BackoffError::Permanent(e))?;
+        match region.state {
+            RegionState::Requested => Err(BackoffError::Transient(anyhow!(
+                "Region creation in progress"
+            ))),
+            RegionState::Created => Ok(region),
+            _ => Err(BackoffError::Permanent(anyhow!(
+                "Failed to create region, unexpected state: {:?}",
+                region.state
+            ))),
+        }
+    };
+
+    let log_create_failure = |_, delay| {
+        warn!(
+            log,
+            "Region requested, not yet created. Retrying in {:?}", delay
+        );
+    };
+
+    let region = backoff::retry_notify(
+        backoff::internal_service_policy(),
+        create_region,
+        log_create_failure,
+    )
+    .await?;
+
+    Ok(region)
+}
+
+async fn sdc_regions_ensure(
+    sagactx: ActionContext<SagaDiskCreate>,
+) -> Result<(), ActionError> {
+    let log = sagactx.user_data().log();
+    let datasets_and_regions = sagactx
+        .lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
+            "datasets_and_regions",
+        )?;
+    let request_count = datasets_and_regions.len();
+    futures::stream::iter(datasets_and_regions)
+        .map(|(dataset, region)| async move {
+            ensure_region_in_dataset(log, &dataset, &region).await
+        })
+        // Execute the allocation requests concurrently.
+        .buffer_unordered(request_count)
+        .collect::<Vec<Result<_, _>>>()
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(ActionError::action_failed)?;
+
+    // TODO: Region has a port value, we could store this in the DB?
+    Ok(())
+}
+
+async fn delete_regions(
+    datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
+) -> Result<(), Error> {
+    let request_count = datasets_and_regions.len();
+    futures::stream::iter(datasets_and_regions)
+        .map(|(dataset, region)| async move {
+            let url = format!("http://{}", dataset.address());
+            let client = CrucibleAgentClient::new(&url);
+            let id = RegionId(region.id().to_string());
+            client.region_delete(&id).await
+        })
+        // Execute the deletion requests concurrently.
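+        // `buffer_unordered(n)` keeps up to `n` of these futures in flight
+        // at once and yields results in completion order; with
+        // `request_count` as the limit, every request runs concurrently.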
+ .buffer_unordered(request_count) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + Ok(()) +} + +async fn sdc_regions_ensure_undo( + sagactx: ActionContext, +) -> Result<(), anyhow::Error> { + let datasets_and_regions = sagactx + .lookup::>( + "datasets_and_regions", + )?; + delete_regions(datasets_and_regions).await?; + Ok(()) +} + +async fn sdc_finalize_disk_record( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let _params = sagactx.saga_params(); + + let disk_id = sagactx.lookup::("disk_id")?; + let disk_created = sagactx.lookup::("created_disk")?; + osagactx + .datastore() + .disk_update_runtime(&disk_id, &disk_created.runtime().detach()) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ParamsDiskDelete { + pub disk_id: Uuid, +} + +#[derive(Debug)] +pub struct SagaDiskDelete; +impl SagaType for SagaDiskDelete { + type SagaParamsType = Arc; + type ExecContextType = Arc; +} + +fn saga_disk_delete() -> SagaTemplate { + let mut template_builder = SagaTemplateBuilder::new(); + + template_builder.append( + "no_result", + "DeleteDiskRecord", + new_action_noop_undo(sdd_delete_disk_record), + ); + + template_builder.append( + "no_result", + "DeleteRegions", + new_action_noop_undo(sdd_delete_regions), + ); + + template_builder.append( + "no_result", + "DeleteRegionRecords", + new_action_noop_undo(sdd_delete_region_records), + ); + + template_builder.build() +} + +async fn sdd_delete_disk_record( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + + osagactx + .datastore() + .project_delete_disk_no_auth(¶ms.disk_id) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} + +async fn sdd_delete_regions( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + + let datasets_and_regions = osagactx + .datastore() + .get_allocated_regions(params.disk_id) + .await + .map_err(ActionError::action_failed)?; + delete_regions(datasets_and_regions) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} + +async fn sdd_delete_region_records( + sagactx: ActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params(); + osagactx + .datastore() + .regions_hard_delete(params.disk_id) + .await + .map_err(ActionError::action_failed)?; + Ok(()) +} diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 0788fab1054..fb7bc555e82 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -124,7 +124,7 @@ pub async fn test_setup_with_config( /* Set up a single sled agent. */ let sa_id = Uuid::parse_str(SLED_AGENT_UUID).unwrap(); - let sa = start_sled_agent( + let sled_agent = start_sled_agent( logctx.log.new(o!( "component" => "omicron_sled_agent::sim::Server", "sled_id" => sa_id.to_string(), @@ -160,7 +160,7 @@ pub async fn test_setup_with_config( internal_client: testctx_internal, database, clickhouse, - sled_agent: sa, + sled_agent, oximeter, producer, logctx, diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index a6f2d118d88..e2602359989 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -2,10 +2,9 @@ // License, v. 2.0. 
If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -/*! - * Tests basic disk support in the API - */ +//! Tests basic disk support in the API +use crucible_agent_client::types::State as RegionState; use http::method::Method; use http::StatusCode; use omicron_common::api::external::ByteCount; @@ -16,6 +15,7 @@ use omicron_common::api::external::Instance; use omicron_common::api::external::InstanceCpuCount; use omicron_nexus::TestInterfaces as _; use omicron_nexus::{external_api::params, Nexus}; +use omicron_sled_agent::sim::SledAgent; use sled_agent_client::TestInterfaces as _; use std::sync::Arc; use uuid::Uuid; @@ -35,36 +35,103 @@ use nexus_test_utils::resource_helpers::create_project; use nexus_test_utils::ControlPlaneTestContext; use nexus_test_utils_macros::nexus_test; -/* - * TODO-cleanup the mess of URLs used here and in test_instances.rs ought to - * come from common code. - */ +const ORG_NAME: &str = "test-org"; +const PROJECT_NAME: &str = "springfield-squidport-disks"; +const DISK_NAME: &str = "just-rainsticks"; + +fn get_project_url() -> String { + format!("/organizations/{}/projects/{}", ORG_NAME, PROJECT_NAME) +} + +fn get_disks_url() -> String { + format!("{}/disks", get_project_url()) +} + +fn get_instances_url() -> String { + format!("{}/instances", get_project_url()) +} + +fn get_instance_disks_url(instance_name: &str) -> String { + format!("{}/{}/disks", get_instances_url(), instance_name) +} + +fn get_disk_attach_url(instance_name: &str) -> String { + format!("{}/attach", get_instance_disks_url(instance_name)) +} + +fn get_disk_detach_url(instance_name: &str) -> String { + format!("{}/detach", get_instance_disks_url(instance_name)) +} + +async fn create_org_and_project(client: &ClientTestContext) -> Uuid { + create_organization(&client, ORG_NAME).await; + let project = create_project(client, ORG_NAME, PROJECT_NAME).await; + project.identity.id +} + +struct DiskTest { + sled_agent: Arc, + zpool_id: Uuid, + zpool_size: ByteCount, + dataset_ids: Vec, + project_id: Uuid, +} + +impl DiskTest { + // Creates fake physical storage, an organization, and a project. + async fn new(cptestctx: &ControlPlaneTestContext) -> Self { + let client = &cptestctx.external_client; + let sled_agent = cptestctx.sled_agent.sled_agent.clone(); + + // Create a Zpool. + let zpool_id = Uuid::new_v4(); + let zpool_size = ByteCount::from_gibibytes_u32(10); + sled_agent.create_zpool(zpool_id, zpool_size.to_bytes()).await; + + // Create multiple Datasets within that Zpool. + let dataset_count = 3; + let dataset_ids: Vec<_> = + (0..dataset_count).map(|_| Uuid::new_v4()).collect(); + for id in &dataset_ids { + sled_agent.create_crucible_dataset(zpool_id, *id).await; + + // By default, regions are created immediately. + let crucible = sled_agent.get_crucible_dataset(zpool_id, *id).await; + crucible + .set_create_callback(Box::new(|_| RegionState::Created)) + .await; + } + + // Create a project for testing. + let project_id = create_org_and_project(&client).await; + + Self { sled_agent, zpool_id, zpool_size, dataset_ids, project_id } + } +} + #[nexus_test] -async fn test_disks(cptestctx: &ControlPlaneTestContext) { +async fn test_disk_not_found_before_creation( + cptestctx: &ControlPlaneTestContext, +) { let client = &cptestctx.external_client; - let apictx = &cptestctx.server.apictx; - let nexus = &apictx.nexus; - - /* Create a project for testing. 
-    let org_name = "test-org";
-    create_organization(&client, &org_name).await;
-    let project_name = "springfield-squidport-disks";
-    let url_disks =
-        format!("/organizations/{}/projects/{}/disks", org_name, project_name);
-    let project = create_project(client, &org_name, &project_name).await;
-
-    /* List disks. There aren't any yet. */
-    let disks = disks_list(&client, &url_disks).await;
+    DiskTest::new(&cptestctx).await;
+    let disks_url = get_disks_url();
+
+    // List disks. There aren't any yet.
+    let disks = disks_list(&client, &disks_url).await;
     assert_eq!(disks.len(), 0);
 
-    /* Make sure we get a 404 if we fetch one. */
-    let disk_url = format!("{}/just-rainsticks", url_disks);
+    // Make sure we get a 404 if we fetch one.
+    let disk_url = format!("{}/{}", disks_url, DISK_NAME);
     let error = client
         .make_request_error(Method::GET, &disk_url, StatusCode::NOT_FOUND)
         .await;
-    assert_eq!(error.message, "not found: disk with name \"just-rainsticks\"");
+    assert_eq!(
+        error.message,
+        format!("not found: disk with name \"{}\"", DISK_NAME)
+    );
 
-    /* We should also get a 404 if we delete one. */
+    // We should also get a 404 if we delete one.
     let error = NexusRequest::new(
         RequestBuilder::new(client, Method::DELETE, &disk_url)
             .expect_status(Some(StatusCode::NOT_FOUND)),
@@ -75,65 +142,239 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     .expect("unexpected success")
     .parsed_body::<dropshot::HttpErrorResponseBody>()
     .unwrap();
-    assert_eq!(error.message, "not found: disk with name \"just-rainsticks\"");
+    assert_eq!(
+        error.message,
+        format!("not found: disk with name \"{}\"", DISK_NAME)
+    );
+}
+
+#[nexus_test]
+async fn test_disk_create_attach_detach_delete(
+    cptestctx: &ControlPlaneTestContext,
+) {
+    let client = &cptestctx.external_client;
+    let test = DiskTest::new(&cptestctx).await;
+    let nexus = &cptestctx.server.apictx.nexus;
+    let disks_url = get_disks_url();
 
-    /* Create a disk. */
+    // Create a disk.
+    let disk_url = format!("{}/{}", disks_url, DISK_NAME);
     let new_disk = params::DiskCreate {
         identity: IdentityMetadataCreateParams {
-            name: "just-rainsticks".parse().unwrap(),
+            name: DISK_NAME.parse().unwrap(),
            description: String::from("sells rainsticks"),
        },
        snapshot_id: None,
        size: ByteCount::from_gibibytes_u32(1),
    };
-    let disk: Disk = objects_post(&client, &url_disks, new_disk.clone()).await;
-    assert_eq!(disk.identity.name, "just-rainsticks");
+    let disk: Disk = objects_post(&client, &disks_url, new_disk.clone()).await;
+    assert_eq!(disk.identity.name, DISK_NAME);
     assert_eq!(disk.identity.description, "sells rainsticks");
-    assert_eq!(disk.project_id, project.identity.id);
+    assert_eq!(disk.project_id, test.project_id);
     assert_eq!(disk.snapshot_id, None);
     assert_eq!(disk.size.to_whole_mebibytes(), 1024);
     assert_eq!(disk.state, DiskState::Creating);
 
-    /*
-     * Fetch the disk and expect it to match what we just created except that
-     * the state will now be "Detached", as the server has simulated the create
-     * process.
-     */
+    // Fetch the disk and expect it to match what we just created except that
+    // the state will now be "Detached", as the server has simulated the create
+    // process.
     let disk = disk_get(&client, &disk_url).await;
-    assert_eq!(disk.identity.name, "just-rainsticks");
+    assert_eq!(disk.identity.name, DISK_NAME);
     assert_eq!(disk.identity.description, "sells rainsticks");
-    assert_eq!(disk.project_id, project.identity.id);
+    assert_eq!(disk.project_id, test.project_id);
     assert_eq!(disk.snapshot_id, None);
     assert_eq!(disk.size.to_whole_mebibytes(), 1024);
     assert_eq!(disk.state, DiskState::Detached);
 
-    /* Attempt to create a second disk with a conflicting name. */
+    // List disks again and expect to find the one we just created.
+    let disks = disks_list(&client, &disks_url).await;
+    assert_eq!(disks.len(), 1);
+    disks_eq(&disks[0], &disk);
+
+    // Create an instance to attach the disk.
+    let url_instances = get_instances_url();
+    let instance: Instance = objects_post(
+        &client,
+        &url_instances,
+        params::InstanceCreate {
+            identity: IdentityMetadataCreateParams {
+                name: "instance1".parse().unwrap(),
+                description: "instance1".to_string(),
+            },
+            ncpus: InstanceCpuCount(4),
+            memory: ByteCount::from_mebibytes_u32(256),
+            hostname: "instance1".to_string(),
+        },
+    )
+    .await;
+
+    // Verify that there are no disks attached to the instance, and specifically
+    // that our disk is not attached to this instance.
+    let url_instance_disks =
+        get_instance_disks_url(instance.identity.name.as_str());
+    let disks = objects_list_page::<Disk>(&client, &url_instance_disks).await;
+    assert_eq!(disks.items.len(), 0);
+
+    let url_instance_attach_disk =
+        get_disk_attach_url(instance.identity.name.as_str());
+    let url_instance_detach_disk =
+        get_disk_detach_url(instance.identity.name.as_str());
+
+    // Start attaching the disk to the instance.
+    let mut response = client
+        .make_request(
+            Method::POST,
+            &url_instance_attach_disk,
+            Some(params::DiskIdentifier { disk: disk.identity.name.clone() }),
+            StatusCode::ACCEPTED,
+        )
+        .await
+        .unwrap();
+    let attached_disk: Disk = read_json(&mut response).await;
+    let instance_id = &instance.identity.id;
+    assert_eq!(attached_disk.identity.name, disk.identity.name);
+    assert_eq!(attached_disk.identity.id, disk.identity.id);
+    assert_eq!(attached_disk.state, DiskState::Attaching(instance_id.clone()));
+
+    // Finish simulation of the attachment and verify the new state, both on the
+    // attachment and the disk itself.
+    disk_simulate(nexus, &disk.identity.id).await;
+    let attached_disk: Disk = disk_get(&client, &disk_url).await;
+    assert_eq!(attached_disk.identity.name, disk.identity.name);
+    assert_eq!(attached_disk.identity.id, disk.identity.id);
+    assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone()));
+
+    // Attach the disk to the same instance. This should complete immediately
+    // with no state change.
+    client
+        .make_request(
+            Method::POST,
+            &url_instance_attach_disk,
+            Some(params::DiskIdentifier { disk: disk.identity.name }),
+            StatusCode::ACCEPTED,
+        )
+        .await
+        .unwrap();
+    let disk = disk_get(&client, &disk_url).await;
+    assert_eq!(disk.state, DiskState::Attached(instance_id.clone()));
+
+    // Begin detaching the disk.
+    client
+        .make_request(
+            Method::POST,
+            &url_instance_detach_disk,
+            Some(params::DiskIdentifier { disk: disk.identity.name.clone() }),
+            StatusCode::ACCEPTED,
+        )
+        .await
+        .unwrap();
+    let disk: Disk = disk_get(&client, &disk_url).await;
+    assert_eq!(disk.state, DiskState::Detaching(instance_id.clone()));
+
+    // Finish the detachment.
+    disk_simulate(nexus, &disk.identity.id).await;
+    let disk = disk_get(&client, &disk_url).await;
+    assert_eq!(disk.state, DiskState::Detached);
+
+    // Since detach is idempotent, we can detach it again.
+    client
+        .make_request(
+            Method::POST,
+            &url_instance_detach_disk,
+            Some(params::DiskIdentifier { disk: disk.identity.name.clone() }),
+            StatusCode::ACCEPTED,
+        )
+        .await
+        .unwrap();
+
+    // A privileged user should be able to delete the disk.
+    NexusRequest::object_delete(client, &disk_url)
+        .authn_as(AuthnMode::PrivilegedUser)
+        .execute()
+        .await
+        .expect("failed to delete disk");
+
+    // It should no longer be present in our list of disks.
+    assert_eq!(disks_list(&client, &disks_url).await.len(), 0);
+
+    // We shouldn't find it if we request it explicitly.
+    let error = client
+        .make_request_error(Method::GET, &disk_url, StatusCode::NOT_FOUND)
+        .await;
+    assert_eq!(
+        error.message,
+        format!("not found: disk with name \"{}\"", DISK_NAME)
+    );
+}
+
+#[nexus_test]
+async fn test_disk_create_disk_that_already_exists_fails(
+    cptestctx: &ControlPlaneTestContext,
+) {
+    let client = &cptestctx.external_client;
+    DiskTest::new(&cptestctx).await;
+    let disks_url = get_disks_url();
+
+    // Create a disk.
+    let new_disk = params::DiskCreate {
+        identity: IdentityMetadataCreateParams {
+            name: DISK_NAME.parse().unwrap(),
+            description: String::from("sells rainsticks"),
+        },
+        snapshot_id: None,
+        size: ByteCount::from_gibibytes_u32(1),
+    };
+    let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await;
+    let disk_url = format!("{}/{}", disks_url, DISK_NAME);
+    let disk = disk_get(&client, &disk_url).await;
+
+    // Attempt to create a second disk with a conflicting name.
     let error = client
         .make_request_error_body(
             Method::POST,
-            &url_disks,
+            &disks_url,
             new_disk,
             StatusCode::BAD_REQUEST,
         )
         .await;
-    assert_eq!(error.message, "already exists: disk \"just-rainsticks\"");
+    assert_eq!(
+        error.message,
+        format!("already exists: disk \"{}\"", DISK_NAME)
+    );
 
-    /* List disks again and expect to find the one we just created. */
-    let disks = disks_list(&client, &url_disks).await;
+    // List disks again and expect to find the one we just created.
+    let disks = disks_list(&client, &disks_url).await;
     assert_eq!(disks.len(), 1);
     disks_eq(&disks[0], &disk);
+}
 
-    /* Create an instance to attach the disk. */
-    let url_instances = format!(
-        "/organizations/{}/projects/{}/instances",
-        org_name, project_name
-    );
+#[nexus_test]
+async fn test_disk_move_between_instances(cptestctx: &ControlPlaneTestContext) {
+    let client = &cptestctx.external_client;
+    let nexus = &cptestctx.server.apictx.nexus;
+    DiskTest::new(&cptestctx).await;
+    let disks_url = get_disks_url();
+
+    // Create a disk.
+    let disk_url = format!("{}/{}", disks_url, DISK_NAME);
+    let new_disk = params::DiskCreate {
+        identity: IdentityMetadataCreateParams {
+            name: DISK_NAME.parse().unwrap(),
+            description: String::from("sells rainsticks"),
+        },
+        snapshot_id: None,
+        size: ByteCount::from_gibibytes_u32(1),
+    };
+    let disk: Disk = objects_post(&client, &disks_url, new_disk.clone()).await;
+
+    // Create an instance to attach the disk.
+    let url_instances = get_instances_url();
     let instance: Instance = objects_post(
         &client,
         &url_instances,
         params::InstanceCreate {
             identity: IdentityMetadataCreateParams {
-                name: "just-rainsticks".parse().unwrap(),
+                name: DISK_NAME.parse().unwrap(),
                 description: String::from("sells rainsticks"),
             },
             ncpus: InstanceCpuCount(4),
@@ -143,33 +384,19 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     )
     .await;
 
-    /*
-     * Verify that there are no disks attached to the instance, and specifically
-     * that our disk is not attached to this instance.
-     */
-    let url_instance_disks = format!(
-        "/organizations/{}/projects/{}/instances/{}/disks",
-        org_name,
-        project_name,
-        instance.identity.name.as_str()
-    );
+    // Verify that there are no disks attached to the instance, and specifically
+    // that our disk is not attached to this instance.
+    let url_instance_disks =
+        get_instance_disks_url(instance.identity.name.as_str());
     let disks = objects_list_page::<Disk>(&client, &url_instance_disks).await;
     assert_eq!(disks.items.len(), 0);
 
-    let url_instance_attach_disk = format!(
-        "/organizations/{}/projects/{}/instances/{}/disks/attach",
-        org_name,
-        project_name,
-        instance.identity.name.as_str(),
-    );
-    let url_instance_detach_disk = format!(
-        "/organizations/{}/projects/{}/instances/{}/disks/detach",
-        org_name,
-        project_name,
-        instance.identity.name.as_str(),
-    );
+    let url_instance_attach_disk =
+        get_disk_attach_url(instance.identity.name.as_str());
+    let url_instance_detach_disk =
+        get_disk_detach_url(instance.identity.name.as_str());
 
-    /* Start attaching the disk to the instance. */
+    // Start attaching the disk to the instance.
     let mut response = client
         .make_request(
             Method::POST,
@@ -185,20 +412,16 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     assert_eq!(attached_disk.identity.id, disk.identity.id);
     assert_eq!(attached_disk.state, DiskState::Attaching(instance_id.clone()));
 
-    /*
-     * Finish simulation of the attachment and verify the new state, both on the
-     * attachment and the disk itself.
-     */
+    // Finish simulation of the attachment and verify the new state, both on the
+    // attachment and the disk itself.
     disk_simulate(nexus, &disk.identity.id).await;
     let attached_disk: Disk = disk_get(&client, &disk_url).await;
     assert_eq!(attached_disk.identity.name, disk.identity.name);
     assert_eq!(attached_disk.identity.id, disk.identity.id);
     assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone()));
 
-    /*
-     * Attach the disk to the same instance. This should complete immediately
-     * with no state change.
-     */
+    // Attach the disk to the same instance. This should complete immediately
+    // with no state change.
     client
         .make_request(
             Method::POST,
@@ -211,10 +434,8 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     let disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Attached(instance_id.clone()));
 
-    /*
-     * Create a second instance and try to attach the disk to that. This should
-     * fail and the disk should remain attached to the first instance.
-     */
+    // Create a second instance and try to attach the disk to that. This should
+    // fail and the disk should remain attached to the first instance.
     let instance2: Instance = objects_post(
         &client,
         &url_instances,
@@ -229,18 +450,10 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
         },
     )
     .await;
-    let url_instance2_attach_disk = format!(
-        "/organizations/{}/projects/{}/instances/{}/disks/attach",
-        org_name,
-        project_name,
-        instance2.identity.name.as_str(),
-    );
-    let url_instance2_detach_disk = format!(
-        "/organizations/{}/projects/{}/instances/{}/disks/detach",
-        org_name,
-        project_name,
-        instance2.identity.name.as_str(),
-    );
+    let url_instance2_attach_disk =
+        get_disk_attach_url(instance2.identity.name.as_str());
+    let url_instance2_detach_disk =
+        get_disk_detach_url(instance2.identity.name.as_str());
     let error = client
         .make_request_error_body(
             Method::POST,
@@ -251,16 +464,17 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
         .await;
     assert_eq!(
         error.message,
-        "cannot attach disk \"just-rainsticks\": disk is attached to another \
-         instance"
+        format!(
+            "cannot attach disk \"{}\": disk is attached to another \
+             instance",
+            DISK_NAME
+        )
     );
 
     let attached_disk = disk_get(&client, &disk_url).await;
     assert_eq!(attached_disk.state, DiskState::Attached(instance_id.clone()));
 
-    /*
-     * Begin detaching the disk.
-     */
+    // Begin detaching the disk.
     client
         .make_request(
             Method::POST,
@@ -273,7 +487,7 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     let disk: Disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Detaching(instance_id.clone()));
 
-    /* It's still illegal to attach this disk elsewhere. */
+    // It's still illegal to attach this disk elsewhere.
     let error = client
         .make_request_error_body(
             Method::POST,
@@ -284,11 +498,14 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
         .await;
     assert_eq!(
         error.message,
-        "cannot attach disk \"just-rainsticks\": disk is attached to another \
-         instance"
+        format!(
+            "cannot attach disk \"{}\": disk is attached to another \
+             instance",
+            DISK_NAME
+        )
     );
 
-    /* It's even illegal to attach this disk back to the same instance. */
+    // It's even illegal to attach this disk back to the same instance.
     let error = client
         .make_request_error_body(
             Method::POST,
@@ -297,14 +514,17 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
             StatusCode::BAD_REQUEST,
         )
         .await;
-    /* TODO-debug the error message here is misleading. */
+    // TODO-debug the error message here is misleading.
     assert_eq!(
         error.message,
-        "cannot attach disk \"just-rainsticks\": disk is attached to another \
-         instance"
+        format!(
+            "cannot attach disk \"{}\": disk is attached to another \
+             instance",
+            DISK_NAME
+        )
     );
 
-    /* However, there's no problem attempting to detach it again. */
+    // However, there's no problem attempting to detach it again.
     client
         .make_request(
             Method::POST,
@@ -317,12 +537,12 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     let disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Detaching(instance_id.clone()));
 
-    /* Finish the detachment. */
+    // Finish the detachment.
     disk_simulate(nexus, &disk.identity.id).await;
     let disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Detached);
 
-    /* Since delete is idempotent, we can detach it again -- from either one. */
+    // Since detach is idempotent, we can detach it again -- from either one.
     client
         .make_request(
             Method::POST,
@@ -342,7 +562,7 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     .await
     .unwrap();
 
-    /* Now, start attaching it again to the second instance. */
+    // Now, start attaching it again to the second instance.
     let mut response = client
         .make_request(
             Method::POST,
@@ -361,10 +581,8 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     let disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Attaching(instance2_id.clone()));
 
-    /*
-     * At this point, it's not legal to attempt to attach it to a different
-     * instance (the first one).
-     */
+    // At this point, it's not legal to attempt to attach it to a different
+    // instance (the first one).
     let error = client
         .make_request_error_body(
             Method::POST,
@@ -375,11 +593,14 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
         .await;
     assert_eq!(
         error.message,
-        "cannot attach disk \"just-rainsticks\": disk is attached to another \
-         instance"
+        format!(
+            "cannot attach disk \"{}\": disk is attached to another \
+             instance",
+            DISK_NAME
+        )
     );
 
-    /* It's fine to attempt another attachment to the same instance. */
+    // It's fine to attempt another attachment to the same instance.
     client
         .make_request(
             Method::POST,
@@ -392,13 +613,20 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     let disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Attaching(instance2_id.clone()));
 
-    /* It's not allowed to delete a disk that's attaching. */
-    let error = client
-        .make_request_error(Method::DELETE, &disk_url, StatusCode::BAD_REQUEST)
-        .await;
-    assert_eq!(error.message, "disk is attached");
+    // It's not allowed to delete a disk that's attaching.
+    let error = NexusRequest::new(
+        RequestBuilder::new(client, Method::DELETE, &disk_url)
+            .expect_status(Some(StatusCode::BAD_REQUEST)),
+    )
+    .authn_as(AuthnMode::PrivilegedUser)
+    .execute()
+    .await
+    .expect("expected request to fail")
+    .parsed_body::<dropshot::HttpErrorResponseBody>()
+    .expect("cannot parse");
+    assert_eq!(error.message, "disk cannot be deleted in state \"attaching\"");
 
-    /* Now, begin a detach while the disk is still being attached. */
+    // Now, begin a detach while the disk is still being attached.
     client
         .make_request(
             Method::POST,
@@ -411,21 +639,65 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) {
     let disk: Disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Detaching(instance2_id.clone()));
 
-    /* It's not allowed to delete a disk that's detaching, either. */
-    let error = client
-        .make_request_error(Method::DELETE, &disk_url, StatusCode::BAD_REQUEST)
-        .await;
-    assert_eq!(error.message, "disk is attached");
+    // It's not allowed to delete a disk that's detaching, either.
+    let error = NexusRequest::new(
+        RequestBuilder::new(client, Method::DELETE, &disk_url)
+            .expect_status(Some(StatusCode::BAD_REQUEST)),
+    )
+    .authn_as(AuthnMode::PrivilegedUser)
+    .execute()
+    .await
+    .expect("expected request to fail")
+    .parsed_body::<dropshot::HttpErrorResponseBody>()
+    .expect("cannot parse");
+    assert_eq!(error.message, "disk cannot be deleted in state \"detaching\"");
 
-    /* Finish detachment. */
+    // Finish detachment.
     disk_simulate(nexus, &disk.identity.id).await;
     let disk = disk_get(&client, &disk_url).await;
     assert_eq!(disk.state, DiskState::Detached);
 
-    /*
-     * If we're not authenticated, or authenticated as an unprivileged user, we
-     * shouldn't be able to delete this disk.
-     */
+    NexusRequest::object_delete(client, &disk_url)
+        .authn_as(AuthnMode::PrivilegedUser)
+        .execute()
+        .await
+        .expect("failed to delete disk");
+
+    // It should no longer be present in our list of disks.
+ assert_eq!(disks_list(&client, &disks_url).await.len(), 0); + + // We shouldn't find it if we request it explicitly. + let error = client + .make_request_error(Method::GET, &disk_url, StatusCode::NOT_FOUND) + .await; + assert_eq!( + error.message, + format!("not found: disk with name \"{}\"", DISK_NAME) + ); +} + +#[nexus_test] +async fn test_disk_deletion_requires_authentication( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + let disks_url = get_disks_url(); + + // Create a disk. + let disk_url = format!("{}/{}", disks_url, DISK_NAME); + let new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + snapshot_id: None, + size: ByteCount::from_gibibytes_u32(1), + }; + let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; + + // If we're not authenticated, or authenticated as an unprivileged user, we + // shouldn't be able to delete this disk. NexusRequest::new( RequestBuilder::new(client, Method::DELETE, &disk_url) .expect_status(Some(StatusCode::UNAUTHORIZED)), @@ -446,14 +718,122 @@ async fn test_disks(cptestctx: &ControlPlaneTestContext) { .execute() .await .expect("failed to delete disk"); +} - /* It should no longer be present in our list of disks. */ - assert_eq!(disks_list(&client, &url_disks).await.len(), 0); - /* We shouldn't find it if we request it explicitly. */ - let error = client - .make_request_error(Method::GET, &disk_url, StatusCode::NOT_FOUND) +#[nexus_test] +async fn test_disk_creation_region_requested_then_started( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + let test = DiskTest::new(&cptestctx).await; + let disks_url = get_disks_url(); + + // Before we create a disk, set the response from the Crucible Agent: + // no matter what regions get requested, they'll always *start* as + // "Requested", and transition to "Created" on the second call. + for id in &test.dataset_ids { + let crucible = + test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await; + let called = std::sync::atomic::AtomicBool::new(false); + crucible + .set_create_callback(Box::new(move |_| { + if !called.load(std::sync::atomic::Ordering::SeqCst) { + called.store(true, std::sync::atomic::Ordering::SeqCst); + RegionState::Requested + } else { + RegionState::Created + } + })) + .await; + } + + // The disk is created successfully, even when this "requested" -> "started" + // transition occurs. + let new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + snapshot_id: None, + size: ByteCount::from_gibibytes_u32(1), + }; + let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await; +} + +// Tests that region allocation failure causes disk allocation to fail. +#[nexus_test] +async fn test_disk_region_creation_failure( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + let test = DiskTest::new(&cptestctx).await; + + // Before we create a disk, set the response from the Crucible Agent: + // no matter what regions get requested, they'll always fail. 
+    for id in &test.dataset_ids {
+        let crucible =
+            test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await;
+        crucible.set_create_callback(Box::new(|_| RegionState::Failed)).await;
+    }
+
+    let disk_size = ByteCount::from_gibibytes_u32(3);
+    let dataset_count = test.dataset_ids.len() as u64;
+    assert!(
+        disk_size.to_bytes() * dataset_count < test.zpool_size.to_bytes(),
+        "Disk size too big for Zpool size"
+    );
+    assert!(
+        2 * disk_size.to_bytes() * dataset_count > test.zpool_size.to_bytes(),
+        "(test constraint) Zpool needs to be smaller (to store only one disk)",
+    );
+
+    // Attempt to allocate the disk, observe a server error.
+    let disks_url = get_disks_url();
+    let new_disk = params::DiskCreate {
+        identity: IdentityMetadataCreateParams {
+            name: DISK_NAME.parse().unwrap(),
+            description: String::from("sells rainsticks"),
+        },
+        snapshot_id: None,
+        size: disk_size,
+    };
+
+    // Unfortunately, the error message is only posted internally to the
+    // logs, and is not returned to the client.
+    //
+    // TODO: Maybe consider making this a more informative error?
+    // How should we propagate this to the client?
+    client
+        .make_request_error_body(
+            Method::POST,
+            &disks_url,
+            new_disk.clone(),
+            StatusCode::INTERNAL_SERVER_ERROR,
+        )
        .await;
-    assert_eq!(error.message, "not found: disk with name \"just-rainsticks\"");
+
+    // After the failed allocation, the disk should not exist.
+    let disks = disks_list(&client, &disks_url).await;
+    assert_eq!(disks.len(), 0);
+
+    // After the failed allocation, regions will exist, but be "Failed".
+    for id in &test.dataset_ids {
+        let crucible =
+            test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await;
+        let regions = crucible.list().await;
+        assert_eq!(regions.len(), 1);
+        assert_eq!(regions[0].state, RegionState::Failed);
+    }
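    // A note on the capacity arithmetic asserted above (using the DiskTest
    // defaults from this file): a disk needs one region per dataset, so
    // 3 datasets x 3 GiB = 9 GiB, which fits in the 10 GiB zpool, while a
    // second such disk would bring the total to 18 GiB > 10 GiB. The two
    // assert!s encode exactly that: one disk fits, two do not.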
+    // Validate that the underlying regions were released as a part of
+    // unwinding the failed disk allocation, by performing another disk
+    // allocation that should succeed.
+    for id in &test.dataset_ids {
+        let crucible =
+            test.sled_agent.get_crucible_dataset(test.zpool_id, *id).await;
+        crucible.set_create_callback(Box::new(|_| RegionState::Created)).await;
+    }
+    let _: Disk = objects_post(&client, &disks_url, new_disk.clone()).await;
+}
 
 async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk {
diff --git a/oximeter-client/Cargo.toml b/oximeter-client/Cargo.toml
index d2165b95f83..2ee69f74aeb 100644
--- a/oximeter-client/Cargo.toml
+++ b/oximeter-client/Cargo.toml
@@ -8,6 +8,7 @@ license = "MPL-2.0"
 anyhow = "1.0"
 progenitor = { git = "https://github.com/oxidecomputer/progenitor" }
 reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
+schemars = { version = "0.8" }
 percent-encoding = "2.1.0"
 
 [dependencies.chrono]
diff --git a/oximeter/producer/examples/producer.rs b/oximeter/producer/examples/producer.rs
index ef20bb87623..c137f533bcb 100644
--- a/oximeter/producer/examples/producer.rs
+++ b/oximeter/producer/examples/producer.rs
@@ -86,8 +86,11 @@ impl Producer for CpuBusyProducer {
 #[tokio::main]
 async fn main() {
     let address = "[::1]:0".parse().unwrap();
-    let dropshot_config =
-        ConfigDropshot { bind_address: address, request_body_max_bytes: 2048 };
+    let dropshot_config = ConfigDropshot {
+        bind_address: address,
+        request_body_max_bytes: 2048,
+        tls: None,
+    };
     let logging_config =
         ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Debug };
     let server_info = ProducerEndpoint {
diff --git a/oximeter/producer/src/lib.rs b/oximeter/producer/src/lib.rs
index 29c8b3818cc..f7cb59e16f1 100644
--- a/oximeter/producer/src/lib.rs
+++ b/oximeter/producer/src/lib.rs
@@ -180,6 +180,7 @@ pub async fn register(
     client
         .cpapi_producers_post(&server_info.into())
         .await
+        .map(|_| ())
         .map_err(|msg| Error::RegistrationError(msg.to_string()))
 }
diff --git a/package/Cargo.toml b/package/Cargo.toml
index 83fe8e4df19..9a256c84e78 100644
--- a/package/Cargo.toml
+++ b/package/Cargo.toml
@@ -6,7 +6,9 @@ license = "MPL-2.0"
 
 [dependencies]
 anyhow = "1.0"
+crossbeam = "0.8"
 omicron-common = { path = "../common" }
+
 # We depend on the propolis-server here -- a binary, not a library -- to
 # make it visible to the packaging tool, which can compile it and shove
 # it in a tarball.
@@ -28,3 +30,7 @@ walkdir = "2.3"
 [[bin]]
 name = "omicron-package"
 doc = false
+
+[[bin]]
+name = "thing-flinger"
+doc = false
diff --git a/package/README.adoc b/package/README.adoc
new file mode 100644
index 00000000000..eaf17ca782a
--- /dev/null
+++ b/package/README.adoc
@@ -0,0 +1,142 @@
+Omicron is a complex piece of software with many build- and install-time dependencies. It's
+intended to run primarily on illumos-based systems, and as such is built to use runtime facilities
+of illumos, such as https://illumos.org/man/5/smf[SMF]. Furthermore, Omicron is fundamentally a
+distributed system, with its components intended to run on multiple servers communicating over the
+network. In order to secure the system, certain cryptographic primitives, such as asymmetric key
+pairs and shared secrets, are required. Due to the nature of these cryptographic primitives, there is
+a requirement for the distribution or creation of files unique to a specific server, such that no
+other server has access to those files. Examples of this are private keys and threshold key
+shares, although other non-cryptographic unique files may also become necessary over time.
+
+In order to satisfy the above requirements of building and deploying a complex distributed system
+consisting of unique, private files, two CLI tools have been created:
+
+ . link:src/bin/omicron-package.rs[omicron-package] - build, package, install on local machine
+ . link:src/bin/thing-flinger.rs[thing-flinger] - build, package, deploy to remote machines
+
+
+If a user is working on their local illumos-based machine and only wants to run
+omicron in single node mode, they should follow the install instructions in
+the link:../README.adoc[Omicron README] and use `omicron-package`. If the user
+wishes for a more complete workflow, where they can code on their local laptop,
+use a remote build machine, and install to multiple machines for a more realistic
+deployment, they should use `thing-flinger`.
+
+The remainder of this document will describe a typical workflow for using
+thing-flinger, pointing out room for improvement.
+
+== Environment and Configuration
+
+    +------------------+                +------------------+
+    |                  |                |                  |
+    |                  |                |                  |
+    |      Client      |---------------->     Builder      |
+    |                  |                |                  |
+    |                  |                |                  |
+    +------------------+                +------------------+
+                                                  |
+                                                  |
+                                                  |
+                                                  |
+                      +---------------------------+--------------------------+
+                      |                           |                          |
+                      |                           |                          |
+                      |                           |                          |
+             +--------v---------+       +---------v--------+       +---------v--------+
+             |                  |       |                  |       |                  |
+             |                  |       |                  |       |                  |
+             | Deployed Server  |       | Deployed Server  |       | Deployed Server  |
+             |                  |       |                  |       |                  |
+             |                  |       |                  |       |                  |
+             +------------------+       +------------------+       +------------------+
+
+
+`thing-flinger` defines three types of nodes:
+
+ * Client - Where a user typically edits their code and runs thing-flinger. This can run any OS.
+ * Builder - A Helios box where Omicron is built and packaged
+ * Deployed Server - Helios machines where Omicron will be installed and run
+
+It's not at all necessary for these to be separate nodes. For example, a client and builder can be
+the same machine, as long as it's a Helios box. The same goes for the builder and a deployment
+server. The benefit of this separation, though, is that it allows editing on something like a
+laptop, without having to worry about setting up a development environment on an illumos-based
+host.
+
+Machine topology is configured in a `TOML` file that is passed on the command line. All illumos
+machines are listed under `servers`, and just the names are used for configuring a builder and
+deployment servers. An link:src/bin/deployment-example.toml[example] is provided.
+
+`thing-flinger` works over SSH, so the user must have the public key of their client configured
+for their account on all servers. SSH agent forwarding is used to prevent the need for the keys of
+the builder to also be on the other servers, thus minimizing the needed server configuration.
+
+== Typical Workflow
+
+=== Prerequisites
+
+Ensure you have an account on all illumos boxes, with the client public key in
+`~/.ssh/authorized_keys`.
+
+The build machine must have Rust and cargo installed, as well as all of
+Omicron's dependencies. Following the *prerequisites* in the
+https://github.com/oxidecomputer/omicron/#build-and-run[Build and run] section of the main Omicron
+README is probably a good idea.
+
+=== Command Based Workflow
+
+==== Build thing-flinger on client
+`thing-flinger` is part of the `omicron-package` crate.
+
+`cargo build -p omicron-package`
+
+==== sync
+Copy your source code to the builder.
Note that this copies over your `.git` subdirectory on purpose, so
+that a branch can be configured for building with the `git_treeish` field in the toml `builder`
+table.
+
+`./target/debug/thing-flinger -c <path-to-config> sync`
+
+==== build-minimal
+Build the parts of omicron on the builder that are required for later use by thing-flinger.
+
+`./target/debug/thing-flinger -c <path-to-config> build-minimal`
+
+==== package
+Build and package omicron using `omicron-package` on the builder.
+
+`./target/debug/thing-flinger -c <path-to-config> package`
+
+==== overlay
+Create files that are unique to each deployment server.
+
+`./target/debug/thing-flinger -c <path-to-config> overlay`
+
+==== install
+Install omicron to all machines, in parallel. This consists of copying the packaged omicron
+tarballs, the overlay files, and omicron-package and its manifest to a `staging` directory on each
+deployment server, and then running omicron-package, installing the overlay files, and restarting
+services.
+
+`./target/debug/thing-flinger -c <path-to-config> install`
+
+=== Current Limitations
+
+`thing-flinger` is an early prototype. It has served so far to demonstrate that unique files,
+specifically secret shares, can be created and distributed over ssh, and that omicron can be
+installed remotely using `omicron-package`. It is not currently complete enough to fully test a
+distributed omicron setup, as the underlying dependencies are not configured yet. Specifically,
+`CockroachDB`, and perhaps `Clickhouse`, need to be configured to run in multiple server mode. It's
+anticipated that the `overlay` feature of `thing-flinger` can be used to generate and distribute
+configs for this.
+
+=== Design rationale
+
+`thing-flinger` is a command line program written in Rust. It was written this way to build upon
+`omicron-package`, which is also in Rust, as that is our default language of choice at Oxide.
+`thing-flinger` is based around SSH, as that is the minimal viable requirement for a test tool such
+as this. Additionally, it provides for the most straightforward implementation, and takes the least
+effort to use securely. This particular implementation wraps the openssh ssh client via
+`std::process::Command`, rather than using the `ssh2` crate, because ssh2, as a wrapper around
+`libssh2`, does not support agent forwarding.
+
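The wrapping the rationale describes comes down to very little code. A simplified sketch of the approach (a hypothetical `ssh_run` helper, not the actual `ssh_exec` defined later in thing-flinger, which additionally sets `SSH_AUTH_SOCK`, sources `.profile`, and disables strict host key checking):

use std::io;
use std::process::{Command, ExitStatus};

// Shell out to the system OpenSSH client, forwarding the local SSH agent.
fn ssh_run(user: &str, host: &str, remote_cmd: &str) -> io::Result<ExitStatus> {
    Command::new("ssh")
        .arg("-A") // agent forwarding, which the ssh2/libssh2 crate cannot do
        .arg(format!("{}@{}", user, host))
        .arg(remote_cmd)
        .status()
}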
+git_treeish = "thing-flinger2" + +[deployment] +servers = ["sock", "buskin"] +rack_secret_threshold = 2 + +# Location where files to install will be placed before running +# `omicron-package install` +# +# This must be an absolute path +# We specifically allow for $HOME in validating the absolute path +staging_dir = "$HOME/omicron_staging" + +[servers.tarz] +username = "ajs" +addr = "tarz.local" + +[servers.atrium] +username = "andrew" +addr = "atrium.eng.oxide.computer" + +[servers.sock] +username = "andrew" +addr = "sock.eng.oxide.computer" + +[servers.buskin] +username = "andrew" +addr = "buskin.eng.oxide.computer" + + diff --git a/package/src/bin/omicron-package.rs b/package/src/bin/omicron-package.rs index 10f1b64bb52..f9131400dba 100644 --- a/package/src/bin/omicron-package.rs +++ b/package/src/bin/omicron-package.rs @@ -9,6 +9,7 @@ use omicron_common::packaging::sha256_digest; use anyhow::{anyhow, bail, Context, Result}; +use omicron_package::{parse, SubCommand}; use rayon::prelude::*; use reqwest; use serde_derive::Deserialize; @@ -19,7 +20,6 @@ use std::path::{Path, PathBuf}; use std::process::Command; use structopt::StructOpt; use tar::Builder; -use thiserror::Error; use tokio::io::AsyncWriteExt; const S3_BUCKET: &str = "https://oxide-omicron-build.s3.amazonaws.com"; @@ -29,59 +29,6 @@ const PKG: &str = "pkg"; // Name for the directory component where downloaded blobs are stored. const BLOB: &str = "blob"; -#[derive(Debug, StructOpt)] -enum SubCommand { - /// Builds the packages specified in a manifest, and places them into a target - /// directory. - Package { - /// The output directory, where artifacts should be placed. - /// - /// Defaults to "out". - #[structopt(long = "out", default_value = "out")] - artifact_dir: PathBuf, - - /// The binary profile to package. - /// - /// True: release, False: debug (default). - #[structopt( - short, - long, - help = "True if bundling release-mode binaries" - )] - release: bool, - }, - /// Checks the packages specified in a manifest, without building. - Check, - /// Installs the packages to a target machine. - Install { - /// The directory from which artifacts will be pulled. - /// - /// Should match the format from the Package subcommand. - #[structopt(long = "in", default_value = "out")] - artifact_dir: PathBuf, - - /// The directory to which artifacts will be installed. - /// - /// Defaults to "/opt/oxide". - #[structopt(long = "out", default_value = "/opt/oxide")] - install_dir: PathBuf, - }, - /// Removes the packages from the target machine. - Uninstall { - /// The directory from which artifacts were be pulled. - /// - /// Should match the format from the Package subcommand. - #[structopt(long = "in", default_value = "out")] - artifact_dir: PathBuf, - - /// The directory to which artifacts were installed. - /// - /// Defaults to "/opt/oxide". - #[structopt(long = "out", default_value = "/opt/oxide")] - install_dir: PathBuf, - }, -} - #[derive(Debug, StructOpt)] #[structopt(name = "packaging tool")] struct Args { @@ -121,15 +68,6 @@ fn run_cargo_on_package( Ok(()) } -/// Errors which may be returned when parsing the server configuration. 
-#[derive(Error, Debug)]
-enum ParseError {
-    #[error("Cannot parse toml: {0}")]
-    Toml(#[from] toml::de::Error),
-    #[error("IO error: {0}")]
-    Io(#[from] std::io::Error),
-}
-
 #[derive(Deserialize, Debug)]
 #[serde(rename_all = "lowercase")]
 enum Build {
@@ -200,12 +138,6 @@ struct Config {
     packages: BTreeMap<String, Package>,
 }
 
-fn parse<P: AsRef<Path>>(path: P) -> Result<Config, ParseError> {
-    let contents = std::fs::read_to_string(path.as_ref())?;
-    let cfg = toml::from_str::<Config>(&contents)?;
-    Ok(cfg)
-}
-
 async fn do_check(config: &Config) -> Result<()> {
     for (package_name, package) in &config.packages {
         println!("Checking {}", package_name);
@@ -356,6 +288,11 @@ fn do_install(
         anyhow!("Cannot create installation directory: {}", err)
     })?;
 
+    println!(
+        "Copying digest.toml from {} to {}",
+        artifact_dir.to_string_lossy(),
+        install_dir.to_string_lossy()
+    );
     // Move the digest of expected packages.
     std::fs::copy(
         artifact_dir.join("digest.toml"),
@@ -454,7 +391,7 @@ fn do_uninstall(
 #[tokio::main]
 async fn main() -> Result<()> {
     let args = Args::from_args_safe().map_err(|err| anyhow!(err))?;
-    let config = parse(&args.manifest)?;
+    let config = parse::<_, Config>(&args.manifest)?;
 
     // Use a CWD that is the root of the Omicron repository.
     if let Ok(manifest) = env::var("CARGO_MANIFEST_DIR") {
diff --git a/package/src/bin/thing-flinger.rs b/package/src/bin/thing-flinger.rs
new file mode 100644
index 00000000000..284a59226d3
--- /dev/null
+++ b/package/src/bin/thing-flinger.rs
@@ -0,0 +1,634 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Utility for deploying Omicron to remote machines
+
+use omicron_package::{parse, SubCommand as PackageSubCommand};
+
+use std::collections::{BTreeMap, BTreeSet};
+use std::fmt::Write;
+use std::path::{Path, PathBuf};
+use std::process::Command;
+
+use anyhow::{anyhow, Context, Result};
+use crossbeam::thread::{self, ScopedJoinHandle};
+use serde_derive::Deserialize;
+use structopt::StructOpt;
+use thiserror::Error;
+
+#[derive(Deserialize, Debug)]
+struct Builder {
+    pub server: String,
+    pub omicron_path: PathBuf,
+    pub git_treeish: String,
+}
+
+// A server on which an omicron package is deployed
+#[derive(Deserialize, Debug)]
+struct Server {
+    pub username: String,
+    pub addr: String,
+}
+
+#[derive(Deserialize, Debug)]
+struct Deployment {
+    pub servers: BTreeSet<String>,
+    pub rack_secret_threshold: usize,
+    pub staging_dir: PathBuf,
+}
+
+#[derive(Debug, Deserialize)]
+struct Config {
+    pub local_source: PathBuf,
+    pub builder: Builder,
+    pub servers: BTreeMap<String, Server>,
+    pub deployment: Deployment,
+}
+
+fn parse_into_set(src: &str) -> BTreeSet<String> {
+    src.split_whitespace().map(|s| s.to_owned()).collect()
+}
+
+#[derive(Debug, StructOpt)]
+enum SubCommand {
+    /// Run the given command on the given servers, or all servers if none are
+    /// given.
+    ///
+    /// Be careful!
+    Exec {
+        /// The command to run
+        #[structopt(short, long)]
+        cmd: String,
+
+        /// The servers to run the command on
+        #[structopt(short, long, parse(from_str = parse_into_set))]
+        servers: Option<BTreeSet<String>>,
+    },
+
+    /// Sync our local source to the build host
+    Sync,
+
+    /// Build omicron-package and everything needed to run thing-flinger
+    /// commands on the build host.
+    ///
+    /// Package always builds everything, but it can be set in release mode, and
+    /// we expect the existing tools to run from 'target/debug'. Additionally,
+    // you can't run `Package` until you have actually built `omicron-package`,
+    // which `BuildMinimal` does.
+    BuildMinimal,
+
+    /// Use all subcommands from omicron-package
+    #[structopt(flatten)]
+    Package(PackageSubCommand),
+
+    /// Create an overlay directory tree for each deployment server
+    ///
+    /// Each directory tree contains unique files for the given server that will
+    /// be populated in the svc/pkg dir.
+    ///
+    /// This is a separate subcommand so that we can reconstruct overlays
+    /// without rebuilding or repackaging.
+    Overlay,
+}
+
+#[derive(Debug, StructOpt)]
+#[structopt(name = "thing-flinger")]
+struct Args {
+    /// The path to the deployment manifest TOML file
+    #[structopt(short, long, help = "Path to deployment manifest toml file")]
+    config: PathBuf,
+
+    #[structopt(subcommand)]
+    subcommand: SubCommand,
+}
+
+/// Errors which can be returned when executing subcommands
+#[derive(Error, Debug)]
+enum FlingError {
+    #[error("Servers not listed in configuration: {0:?}")]
+    InvalidServers(Vec<String>),
+
+    /// The parameter should be the name of the argument that could not be
+    /// properly converted to a string.
+    #[error("{0} is not valid UTF-8")]
+    BadString(String),
+
+    /// Failed to rsync omicron to build host
+    #[error("Failed to sync {src} with {dst}")]
+    FailedSync { src: String, dst: String },
+
+    /// The given path must be absolute
+    #[error("Path for {field} must be absolute")]
+    NotAbsolutePath { field: &'static str },
+}
+
+// TODO: run in parallel when that option is given
+fn do_exec(
+    config: &Config,
+    cmd: String,
+    servers: Option<BTreeSet<String>>,
+) -> Result<()> {
+    if let Some(ref servers) = servers {
+        validate_servers(servers, &config.servers)?;
+
+        for name in servers {
+            let server = &config.servers[name];
+            ssh_exec(&server, &cmd, false)?;
+        }
+    } else {
+        for (_, server) in config.servers.iter() {
+            ssh_exec(&server, &cmd, false)?;
+        }
+    }
+    Ok(())
+}
+
+fn do_sync(config: &Config) -> Result<()> {
+    let server =
+        config.servers.get(&config.builder.server).ok_or_else(|| {
+            FlingError::InvalidServers(vec![config.builder.server.clone()])
+        })?;
+
+    // For rsync to copy from the source appropriately we must guarantee a
+    // trailing slash.
+ let src = + format!("{}/", config.local_source.canonicalize()?.to_string_lossy()); + let dst = format!( + "{}@{}:{}", + server.username, + server.addr, + config.builder.omicron_path.to_str().unwrap() + ); + let mut cmd = Command::new("rsync"); + cmd.arg("-az") + .arg("-e") + .arg("ssh") + .arg("--delete") + .arg("--progress") + .arg("--exclude") + .arg("target/") + .arg("--exclude") + .arg("out/") + .arg("--exclude") + .arg("cockroachdb/") + .arg("--exclude") + .arg("clickhouse/") + .arg("--exclude") + .arg("*.swp") + .arg("--out-format") + .arg("File changed: %o %t %f") + .arg(&src) + .arg(&dst); + let status = + cmd.status().context(format!("Failed to run command: ({:?})", cmd))?; + if !status.success() { + return Err(FlingError::FailedSync { src, dst }.into()); + } + + Ok(()) +} + +// Build omicron-package and omicron-sled-agent on the builder +// +// We need to build omicron-sled-agent for overlay file generation +fn do_build_minimal(config: &Config) -> Result<()> { + let server = &config.servers[&config.builder.server]; + let cmd = format!( + "cd {} && git checkout {} && cargo build -p {} -p {}", + config.builder.omicron_path.to_string_lossy(), + config.builder.git_treeish, + "omicron-package", + "omicron-sled-agent" + ); + ssh_exec(&server, &cmd, false) +} + +fn do_package( + config: &Config, + artifact_dir: PathBuf, + release: bool, +) -> Result<()> { + let server = &config.servers[&config.builder.server]; + let mut cmd = String::new(); + let mut release_flag = ""; + if release { + release_flag = "--release"; + } + let cmd_path = "./target/debug/omicron-package"; + let artifact_dir = artifact_dir + .to_str() + .ok_or_else(|| FlingError::BadString("artifact_dir".to_string()))?; + + // We use a bash login shell to get a proper environment, so we have a path to + // postgres, and $DEP_PQ_LIBDIRS is filled in. This is required for building + // nexus. 
+ // + // See https://github.com/oxidecomputer/omicron/blob/8757ec542ea4ffbadd6f26094ed4ba357715d70d/rpaths/src/lib.rs + write!( + &mut cmd, + "bash -lc 'cd {} && git checkout {} && {} package --out {} {}'", + config.builder.omicron_path.to_string_lossy(), + config.builder.git_treeish, + cmd_path, + &artifact_dir, + release_flag + )?; + + ssh_exec(&server, &cmd, false) +} + +fn do_check(config: &Config) -> Result<()> { + let server = &config.servers[&config.builder.server]; + let mut cmd = String::new(); + let cmd_path = "./target/debug/omicron-package"; + + write!( + &mut cmd, + "bash -lc 'cd {} && git checkout {} && {} check'", + config.builder.omicron_path.to_string_lossy(), + config.builder.git_treeish, + cmd_path, + )?; + + ssh_exec(&server, &cmd, false) +} + +fn do_uninstall( + config: &Config, + artifact_dir: PathBuf, + install_dir: PathBuf, +) -> Result<()> { + let mut deployment_src = PathBuf::from(&config.deployment.staging_dir); + deployment_src.push(&artifact_dir); + for server_name in &config.deployment.servers { + let server = &config.servers[server_name]; + // Run `omicron-package uninstall` on the deployment server + let cmd = format!( + "cd {} && pfexec ./omicron-package uninstall --in {} --out {}", + config.deployment.staging_dir.to_string_lossy(), + deployment_src.to_string_lossy(), + install_dir.to_string_lossy() + ); + println!("$ {}", cmd); + ssh_exec(&server, &cmd, true)?; + } + Ok(()) +} + +fn do_install(config: &Config, artifact_dir: &Path, install_dir: &Path) { + let builder = &config.servers[&config.builder.server]; + let mut pkg_dir = PathBuf::from(&config.builder.omicron_path); + pkg_dir.push(artifact_dir); + let pkg_dir = pkg_dir.to_string_lossy(); + let pkg_dir = &pkg_dir; + + thread::scope(|s| { + let mut handles = + Vec::<(String, ScopedJoinHandle<'_, Result<()>>)>::new(); + + // Spawn a thread for each server install + for server_name in &config.deployment.servers { + handles.push(( + server_name.to_owned(), + s.spawn(move |_| -> Result<()> { + single_server_install( + config, + &artifact_dir, + &install_dir, + &pkg_dir, + builder, + server_name, + ) + }), + )); + } + + // Join all the handles and print the install status + for (server_name, handle) in handles { + match handle.join() { + Ok(Ok(())) => { + println!("Install completed for server: {}", server_name) + } + Ok(Err(e)) => { + println!( + "Install failed for server: {} with error: {}", + server_name, e + ) + } + Err(_) => { + println!( + "Install failed for server: {}. 
Thread panicked.", + server_name + ) + } + } + } + }) + .unwrap(); +} + +fn do_overlay(config: &Config) -> Result<()> { + let mut root_path = PathBuf::from(&config.builder.omicron_path); + // TODO: This needs to match the artifact_dir in `package` + root_path.push("out/overlay"); + let server_dirs = dir_per_deploy_server(config, &root_path); + let server = &config.servers[&config.builder.server]; + overlay_sled_agent(&server, config, &server_dirs) +} + +fn overlay_sled_agent( + server: &Server, + config: &Config, + server_dirs: &[PathBuf], +) -> Result<()> { + let sled_agent_dirs: Vec = server_dirs + .iter() + .map(|dir| { + let mut dir = PathBuf::from(dir); + dir.push("sled-agent/pkg"); + dir + }) + .collect(); + + // Create directories on builder + let dirs = dir_string(&sled_agent_dirs); + let cmd = format!("sh -c 'for dir in {}; do mkdir -p $dir; done'", dirs); + + let cmd = format!( + "{} && cd {} && ./target/debug/sled-agent-overlay-files \ + --threshold {} --directories {}", + cmd, + config.builder.omicron_path.to_string_lossy(), + config.deployment.rack_secret_threshold, + dirs + ); + ssh_exec(server, &cmd, false) +} + +fn single_server_install( + config: &Config, + artifact_dir: &Path, + install_dir: &Path, + pkg_dir: &str, + builder: &Server, + server_name: &str, +) -> Result<()> { + let server = &config.servers[server_name]; + + copy_package_artifacts_to_staging(config, pkg_dir, builder, server)?; + copy_omicron_package_binary_to_staging(config, builder, server)?; + copy_package_manifest_to_staging(config, builder, server)?; + run_omicron_package_from_staging( + config, + server, + &artifact_dir, + &install_dir, + )?; + copy_overlay_files_to_staging( + config, + pkg_dir, + builder, + server, + server_name, + )?; + install_overlay_files_from_staging(config, server, &install_dir)?; + restart_services(server) +} + +// Copy package artifacts as a result of `omicron-package package` from the +// builder to the deployment server staging directory. 
+fn copy_package_artifacts_to_staging(
+    config: &Config,
+    pkg_dir: &str,
+    builder: &Server,
+    destination: &Server,
+) -> Result<()> {
+    let cmd = format!(
+        "rsync -avz -e 'ssh -o StrictHostKeyChecking=no' \
+         --exclude overlay/ {} {}@{}:{}",
+        pkg_dir,
+        destination.username,
+        destination.addr,
+        config.deployment.staging_dir.to_string_lossy()
+    );
+    println!("$ {}", cmd);
+    ssh_exec(builder, &cmd, true)
+}
+
+fn copy_omicron_package_binary_to_staging(
+    config: &Config,
+    builder: &Server,
+    destination: &Server,
+) -> Result<()> {
+    let mut bin_path = PathBuf::from(&config.builder.omicron_path);
+    bin_path.push("target/debug/omicron-package");
+    let cmd = format!(
+        "rsync -avz {} {}@{}:{}",
+        bin_path.to_string_lossy(),
+        destination.username,
+        destination.addr,
+        config.deployment.staging_dir.to_string_lossy()
+    );
+    println!("$ {}", cmd);
+    ssh_exec(builder, &cmd, true)
+}
+
+fn copy_package_manifest_to_staging(
+    config: &Config,
+    builder: &Server,
+    destination: &Server,
+) -> Result<()> {
+    let mut path = PathBuf::from(&config.builder.omicron_path);
+    path.push("package-manifest.toml");
+    let cmd = format!(
+        "rsync {} {}@{}:{}",
+        path.to_string_lossy(),
+        destination.username,
+        destination.addr,
+        config.deployment.staging_dir.to_string_lossy()
+    );
+    println!("$ {}", cmd);
+    ssh_exec(builder, &cmd, true)
+}
+
+fn run_omicron_package_from_staging(
+    config: &Config,
+    destination: &Server,
+    artifact_dir: &Path,
+    install_dir: &Path,
+) -> Result<()> {
+    let mut deployment_src = PathBuf::from(&config.deployment.staging_dir);
+    deployment_src.push(&artifact_dir);
+
+    // Run `omicron-package install` on the deployment server
+    let cmd = format!(
+        "cd {} && pfexec ./omicron-package install --in {} --out {}",
+        config.deployment.staging_dir.to_string_lossy(),
+        deployment_src.to_string_lossy(),
+        install_dir.to_string_lossy()
+    );
+    println!("$ {}", cmd);
+    ssh_exec(destination, &cmd, true)
+}
+
+fn copy_overlay_files_to_staging(
+    config: &Config,
+    pkg_dir: &str,
+    builder: &Server,
+    destination: &Server,
+    destination_name: &str,
+) -> Result<()> {
+    let cmd = format!(
+        "rsync -avz {}/overlay/{}/ {}@{}:{}/overlay/",
+        pkg_dir,
+        destination_name,
+        destination.username,
+        destination.addr,
+        config.deployment.staging_dir.to_string_lossy()
+    );
+    println!("$ {}", cmd);
+    ssh_exec(builder, &cmd, true)
+}
+
+fn install_overlay_files_from_staging(
+    config: &Config,
+    destination: &Server,
+    install_dir: &Path,
+) -> Result<()> {
+    let cmd = format!(
+        "pfexec cp -r {}/overlay/* {}",
+        config.deployment.staging_dir.to_string_lossy(),
+        install_dir.to_string_lossy()
+    );
+    println!("$ {}", cmd);
+    ssh_exec(&destination, &cmd, false)
+}
+
+// For now, we just restart sled-agent, as that's the only service with an
+// overlay file.
+fn restart_services(destination: &Server) -> Result<()> {
+    ssh_exec(destination, "svcadm restart sled-agent", false)
+}
+
+fn dir_string(dirs: &[PathBuf]) -> String {
+    dirs.iter().map(|dir| dir.to_string_lossy().to_string() + " ").collect()
+}
+
+fn dir_per_deploy_server(config: &Config, root: &Path) -> Vec<PathBuf> {
+    config
+        .deployment
+        .servers
+        .iter()
+        .map(|server_dir| {
+            let mut dir = PathBuf::from(root);
+            dir.push(server_dir);
+            dir
+        })
+        .collect()
+}
+
+fn ssh_exec(
+    server: &Server,
+    remote_cmd: &str,
+    forward_agent: bool,
+) -> Result<()> {
+    // Source .profile, so we have access to cargo. Rustup installs knowledge
+    // about the cargo path here.
+    let remote_cmd = String::from(". $HOME/.profile && ") + remote_cmd;
$HOME/.profile && ") + remote_cmd; + let auth_sock = std::env::var("SSH_AUTH_SOCK")?; + let mut cmd = Command::new("ssh"); + if forward_agent { + cmd.arg("-A"); + } + cmd.arg("-o") + .arg("StrictHostKeyChecking=no") + .arg("-l") + .arg(&server.username) + .arg(&server.addr) + .arg(&remote_cmd); + cmd.env("SSH_AUTH_SOCK", auth_sock); + cmd.status() + .context(format!("Failed to run {} on {}", remote_cmd, server.addr))?; + + Ok(()) +} + +fn validate_servers( + chosen: &BTreeSet, + all: &BTreeMap, +) -> Result<(), FlingError> { + let all = all.keys().cloned().collect(); + let diff: Vec = chosen.difference(&all).cloned().collect(); + if !diff.is_empty() { + Err(FlingError::InvalidServers(diff)) + } else { + Ok(()) + } +} + +fn validate_absolute_path( + path: &Path, + field: &'static str, +) -> Result<(), FlingError> { + if path.is_absolute() || path.starts_with("$HOME") { + Ok(()) + } else { + Err(FlingError::NotAbsolutePath { field }) + } +} + +fn validate(config: &Config) -> Result<(), FlingError> { + validate_absolute_path(&config.local_source, "local_source")?; + validate_absolute_path( + &config.builder.omicron_path, + "builder.omicron_path", + )?; + validate_absolute_path( + &config.deployment.staging_dir, + "deployment.staging_dir", + )?; + + validate_servers(&config.deployment.servers, &config.servers)?; + + validate_servers( + &BTreeSet::from([config.builder.server.clone()]), + &config.servers, + ) +} + +fn main() -> Result<()> { + let args = Args::from_args_safe().map_err(|err| anyhow!(err))?; + let config = parse::<_, Config>(args.config)?; + + validate(&config)?; + + match args.subcommand { + SubCommand::Exec { cmd, servers } => { + do_exec(&config, cmd, servers)?; + } + SubCommand::Sync => do_sync(&config)?, + SubCommand::BuildMinimal => do_build_minimal(&config)?, + SubCommand::Package(PackageSubCommand::Package { + artifact_dir, + release, + }) => { + do_package(&config, artifact_dir, release)?; + } + SubCommand::Package(PackageSubCommand::Install { + artifact_dir, + install_dir, + }) => { + do_install(&config, &artifact_dir, &install_dir); + } + SubCommand::Package(PackageSubCommand::Uninstall { + artifact_dir, + install_dir, + }) => { + do_uninstall(&config, artifact_dir, install_dir)?; + } + SubCommand::Package(PackageSubCommand::Check) => do_check(&config)?, + SubCommand::Overlay => do_overlay(&config)?, + } + Ok(()) +} diff --git a/package/src/lib.rs b/package/src/lib.rs new file mode 100644 index 00000000000..f4a84573fab --- /dev/null +++ b/package/src/lib.rs @@ -0,0 +1,77 @@ +//! Common code shared between `omicron-package` and `thing-flinger` binaries. + +use serde::de::DeserializeOwned; +use std::path::Path; +use std::path::PathBuf; +use structopt::StructOpt; +use thiserror::Error; + +/// Errors which may be returned when parsing the server configuration. +#[derive(Error, Debug)] +pub enum ParseError { + #[error("Cannot parse toml: {0}")] + Toml(#[from] toml::de::Error), + #[error("IO error: {0}")] + Io(#[from] std::io::Error), +} + +pub fn parse, C: DeserializeOwned>( + path: P, +) -> Result { + let contents = std::fs::read_to_string(path.as_ref())?; + let cfg = toml::from_str::(&contents)?; + Ok(cfg) +} + +#[derive(Debug, StructOpt)] +pub enum SubCommand { + /// Builds the packages specified in a manifest, and places them into a target + /// directory. + Package { + /// The output directory, where artifacts should be placed. + /// + /// Defaults to "out". 
+
+#[derive(Debug, StructOpt)]
+pub enum SubCommand {
+    /// Builds the packages specified in a manifest, and places them into a
+    /// target directory.
+    Package {
+        /// The output directory, where artifacts should be placed.
+        ///
+        /// Defaults to "out".
+        #[structopt(long = "out", default_value = "out")]
+        artifact_dir: PathBuf,
+
+        /// The binary profile to package.
+        ///
+        /// True: release, False: debug (default).
+        #[structopt(
+            short,
+            long,
+            help = "True if bundling release-mode binaries"
+        )]
+        release: bool,
+    },
+    /// Checks the packages specified in a manifest, without building.
+    Check,
+    /// Installs the packages to a target machine.
+    Install {
+        /// The directory from which artifacts will be pulled.
+        ///
+        /// Should match the format from the Package subcommand.
+        #[structopt(long = "in", default_value = "out")]
+        artifact_dir: PathBuf,
+
+        /// The directory to which artifacts will be installed.
+        ///
+        /// Defaults to "/opt/oxide".
+        #[structopt(long = "out", default_value = "/opt/oxide")]
+        install_dir: PathBuf,
+    },
+    /// Removes the packages from the target machine.
+    Uninstall {
+        /// The directory from which artifacts were pulled.
+        ///
+        /// Should match the format from the Package subcommand.
+        #[structopt(long = "in", default_value = "out")]
+        artifact_dir: PathBuf,
+
+        /// The directory to which artifacts were installed.
+        ///
+        /// Defaults to "/opt/oxide".
+        #[structopt(long = "out", default_value = "/opt/oxide")]
+        install_dir: PathBuf,
+    },
+}
diff --git a/sled-agent-client/Cargo.toml b/sled-agent-client/Cargo.toml
index 2f233c4ebdc..ebf1f35dfd1 100644
--- a/sled-agent-client/Cargo.toml
+++ b/sled-agent-client/Cargo.toml
@@ -9,6 +9,7 @@ anyhow = "1.0"
 async-trait = "0.1"
 progenitor = { git = "https://github.com/oxidecomputer/progenitor" }
 reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
+schemars = { version = "0.8" }
 percent-encoding = "2.1.0"
 
 [dependencies.chrono]
diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml
index 61241d3bb0d..26057fa6358 100644
--- a/sled-agent/Cargo.toml
+++ b/sled-agent/Cargo.toml
@@ -11,6 +11,8 @@ bincode = "1.3.3"
 bytes = "1.1"
 cfg-if = "1.0"
 chrono = { version = "0.4", features = [ "serde" ] }
+# Only used by the simulated sled agent.
+crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "7e8d0c31" }
 dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] }
 futures = "0.3.18"
 ipnetwork = "0.18"
diff --git a/sled-agent/src/sim/disk.rs b/sled-agent/src/sim/disk.rs
index 95ef478dcc5..1bf395d7701 100644
--- a/sled-agent/src/sim/disk.rs
+++ b/sled-agent/src/sim/disk.rs
@@ -95,6 +95,7 @@ impl Simulatable for SimDisk {
             &nexus_client::types::DiskRuntimeState::from(current),
         )
         .await
+        .map(|_| ())
         .map_err(Error::from)
     }
 }
diff --git a/sled-agent/src/sim/http_entrypoints_storage.rs b/sled-agent/src/sim/http_entrypoints_storage.rs
new file mode 100644
index 00000000000..6ef08a9a373
--- /dev/null
+++ b/sled-agent/src/sim/http_entrypoints_storage.rs
@@ -0,0 +1,107 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! HTTP entrypoint functions for simulating the storage agent API.
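+//!
+//! These endpoints are modeled on the region API of the real crucible agent
+//! (see the TODO below about de-duplicating the two), so the simulated
+//! server speaks the protocol that `crucible-agent-client` expects.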
+
+use crucible_agent_client::types::{CreateRegion, Region, RegionId};
+use dropshot::{
+    endpoint, ApiDescription, HttpError, HttpResponseDeleted, HttpResponseOk,
+    Path as TypedPath, RequestContext, TypedBody,
+};
+use schemars::JsonSchema;
+use serde::Deserialize;
+use std::sync::Arc;
+
+use super::storage::CrucibleData;
+
+type CrucibleAgentApiDescription = ApiDescription<Arc<CrucibleData>>;
+
+/// Returns a description of the Crucible Agent API.
+pub fn api() -> CrucibleAgentApiDescription {
+    fn register_endpoints(
+        api: &mut CrucibleAgentApiDescription,
+    ) -> Result<(), String> {
+        api.register(region_list)?;
+        api.register(region_create)?;
+        api.register(region_get)?;
+        api.register(region_delete)?;
+        Ok(())
+    }
+
+    let mut api = CrucibleAgentApiDescription::new();
+    if let Err(err) = register_endpoints(&mut api) {
+        panic!("failed to register entrypoints: {}", err);
+    }
+    api
+}
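+
+// Sketch of how this description is served: `CrucibleServer::new` (in
+// sim/storage.rs) hands `api()` to a dropshot `HttpServerStarter` together
+// with an `Arc<CrucibleData>` context, roughly:
+//
+//     dropshot::HttpServerStarter::new(&config, api(), data, &log)
+//         .expect("Could not initialize server")
+//         .start();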
+
+// TODO: We'd like to de-duplicate as much as possible with the
+// real crucible agent here, to avoid skew.
+
+#[derive(Deserialize, JsonSchema)]
+struct RegionPath {
+    id: RegionId,
+}
+
+#[endpoint {
+    method = GET,
+    path = "/crucible/0/regions",
+}]
+async fn region_list(
+    rc: Arc<RequestContext<Arc<CrucibleData>>>,
+) -> Result<HttpResponseOk<Vec<Region>>, HttpError> {
+    let crucible = rc.context();
+    Ok(HttpResponseOk(crucible.list().await))
+}
+
+#[endpoint {
+    method = POST,
+    path = "/crucible/0/regions",
+}]
+async fn region_create(
+    rc: Arc<RequestContext<Arc<CrucibleData>>>,
+    body: TypedBody<CreateRegion>,
+) -> Result<HttpResponseOk<Region>, HttpError> {
+    let params = body.into_inner();
+    let crucible = rc.context();
+
+    Ok(HttpResponseOk(crucible.create(params).await))
+}
+
+#[endpoint {
+    method = GET,
+    path = "/crucible/0/regions/{id}",
+}]
+async fn region_get(
+    rc: Arc<RequestContext<Arc<CrucibleData>>>,
+    path: TypedPath<RegionPath>,
+) -> Result<HttpResponseOk<Region>, HttpError> {
+    let id = path.into_inner().id;
+    let crucible = rc.context();
+    match crucible.get(id).await {
+        Some(region) => Ok(HttpResponseOk(region)),
+        None => {
+            Err(HttpError::for_not_found(None, "Region not found".to_string()))
+        }
+    }
+}
+
+#[endpoint {
+    method = DELETE,
+    path = "/crucible/0/regions/{id}",
+}]
+async fn region_delete(
+    rc: Arc<RequestContext<Arc<CrucibleData>>>,
+    path: TypedPath<RegionPath>,
+) -> Result<HttpResponseDeleted, HttpError> {
+    let id = path.into_inner().id;
+    let crucible = rc.context();
+
+    match crucible.delete(id).await {
+        Some(_) => Ok(HttpResponseDeleted()),
+        None => {
+            Err(HttpError::for_not_found(None, "Region not found".to_string()))
+        }
+    }
+}
diff --git a/sled-agent/src/sim/instance.rs b/sled-agent/src/sim/instance.rs
index b7414d4c877..2e57353f878 100644
--- a/sled-agent/src/sim/instance.rs
+++ b/sled-agent/src/sim/instance.rs
@@ -102,6 +102,7 @@ impl Simulatable for SimInstance {
             &nexus_client::types::InstanceRuntimeState::from(current),
         )
         .await
+        .map(|_| ())
         .map_err(Error::from)
     }
 }
diff --git a/sled-agent/src/sim/mod.rs b/sled-agent/src/sim/mod.rs
index 0cad7673f32..3824951d854 100644
--- a/sled-agent/src/sim/mod.rs
+++ b/sled-agent/src/sim/mod.rs
@@ -10,10 +10,13 @@ mod collection;
 mod config;
 mod disk;
 mod http_entrypoints;
+mod http_entrypoints_storage;
 mod instance;
 mod server;
 mod simulatable;
 mod sled_agent;
+mod storage;
 
 pub use config::{Config, SimMode};
 pub use server::{run_server, Server};
+pub use sled_agent::SledAgent;
diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs
index 664d4aef5ea..71e0c692152 100644
--- a/sled-agent/src/sim/sled_agent.rs
+++ b/sled-agent/src/sim/sled_agent.rs
@@ -7,6 +7,7 @@
  */
 
 use crate::params::DiskStateRequested;
+use futures::lock::Mutex;
 use nexus_client::Client as NexusClient;
 use omicron_common::api::external::Error;
 use omicron_common::api::internal::nexus::DiskRuntimeState;
@@ -21,6 +22,7 @@ use super::collection::SimCollection;
 use super::config::SimMode;
 use super::disk::SimDisk;
 use super::instance::SimInstance;
+use super::storage::{CrucibleData, Storage};
 
 /**
  * Simulates management of the control plane on a sled
@@ -32,13 +34,11 @@
  * move later.
  */
 pub struct SledAgent {
-    /** unique id for this server */
-    pub id: Uuid,
-
     /** collection of simulated instances, indexed by instance uuid */
     instances: Arc<SimCollection<SimInstance>>,
 
     /** collection of simulated disks, indexed by disk uuid */
     disks: Arc<SimCollection<SimDisk>>,
+    storage: Mutex<Storage>,
 }
 
 impl SledAgent {
@@ -51,25 +51,30 @@
         id: &Uuid,
         sim_mode: SimMode,
         log: Logger,
-        ctlsc: Arc<NexusClient>,
+        nexus_client: Arc<NexusClient>,
     ) -> SledAgent {
         info!(&log, "created simulated sled agent"; "sim_mode" => ?sim_mode);
 
         let instance_log = log.new(o!("kind" => "instances"));
         let disk_log = log.new(o!("kind" => "disks"));
+        let storage_log = log.new(o!("kind" => "storage"));
 
         SledAgent {
-            id: *id,
             instances: Arc::new(SimCollection::new(
-                Arc::clone(&ctlsc),
+                Arc::clone(&nexus_client),
                 instance_log,
                 sim_mode,
             )),
             disks: Arc::new(SimCollection::new(
-                Arc::clone(&ctlsc),
+                Arc::clone(&nexus_client),
                 disk_log,
                 sim_mode,
             )),
+            storage: Mutex::new(Storage::new(
+                *id,
+                Arc::clone(&nexus_client),
+                storage_log,
+            )),
         }
     }
@@ -111,4 +116,27 @@
     pub async fn disk_poke(&self, id: Uuid) {
         self.disks.sim_poke(id).await;
     }
+
+    /// Adds a Zpool to the simulated sled agent.
+    pub async fn create_zpool(&self, id: Uuid, size: u64) {
+        self.storage.lock().await.insert_zpool(id, size).await;
+    }
+
+    /// Adds a Crucible Dataset within a zpool.
+    pub async fn create_crucible_dataset(
+        &self,
+        zpool_id: Uuid,
+        dataset_id: Uuid,
+    ) {
+        self.storage.lock().await.insert_dataset(zpool_id, dataset_id).await;
+    }
+
+    /// Returns a crucible dataset within a particular zpool.
+    pub async fn get_crucible_dataset(
+        &self,
+        zpool_id: Uuid,
+        dataset_id: Uuid,
+    ) -> Arc<CrucibleData> {
+        self.storage.lock().await.get_dataset(zpool_id, dataset_id).await
+    }
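+
+    // A typical test-side flow, sketched with hypothetical IDs:
+    //
+    //     sled_agent.create_zpool(zpool_id, size).await;
+    //     sled_agent.create_crucible_dataset(zpool_id, dataset_id).await;
+    //     let crucible =
+    //         sled_agent.get_crucible_dataset(zpool_id, dataset_id).await;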
 }
diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs
new file mode 100644
index 00000000000..55b2cb80f45
--- /dev/null
+++ b/sled-agent/src/sim/storage.rs
@@ -0,0 +1,246 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Simulated sled agent storage implementation
+
+use crucible_agent_client::types::{CreateRegion, Region, RegionId, State};
+use futures::lock::Mutex;
+use nexus_client::types::{
+    ByteCount, DatasetKind, DatasetPutRequest, ZpoolPutRequest,
+};
+use nexus_client::Client as NexusClient;
+use slog::Logger;
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::str::FromStr;
+use std::sync::Arc;
+use uuid::Uuid;
+
+type CreateCallback = Box<dyn Fn(&CreateRegion) -> State + Send + 'static>;
+
+struct CrucibleDataInner {
+    regions: HashMap<Uuid, Region>,
+    on_create: Option<CreateCallback>,
+}
+
+impl CrucibleDataInner {
+    fn new() -> Self {
+        Self { regions: HashMap::new(), on_create: None }
+    }
+
+    fn set_create_callback(&mut self, callback: CreateCallback) {
+        self.on_create = Some(callback);
+    }
+
+    fn list(&self) -> Vec<Region> {
+        self.regions.values().cloned().collect()
+    }
+
+    fn create(&mut self, params: CreateRegion) -> Region {
+        let id = Uuid::from_str(&params.id.0).unwrap();
+
+        let state = if let Some(on_create) = &self.on_create {
+            on_create(&params)
+        } else {
+            State::Requested
+        };
+
+        let region = Region {
+            id: params.id,
+            volume_id: params.volume_id,
+            block_size: params.block_size,
+            extent_size: params.extent_size,
+            extent_count: params.extent_count,
+            // NOTE: This is a lie - no server is running.
+            port_number: 0,
+            state,
+        };
+        let old = self.regions.insert(id, region.clone());
+        if let Some(old) = old {
+            assert_eq!(
+                old.id.0, region.id.0,
+                "Region already exists, but with a different ID"
+            );
+        }
+        region
+    }
+
+    fn get(&self, id: RegionId) -> Option<Region> {
+        let id = Uuid::from_str(&id.0).unwrap();
+        self.regions.get(&id).cloned()
+    }
+
+    fn get_mut(&mut self, id: &RegionId) -> Option<&mut Region> {
+        let id = Uuid::from_str(&id.0).unwrap();
+        self.regions.get_mut(&id)
+    }
+
+    fn delete(&mut self, id: RegionId) -> Option<Region> {
+        let id = Uuid::from_str(&id.0).unwrap();
+        let region = self.regions.get_mut(&id)?;
+        region.state = State::Destroyed;
+        Some(region.clone())
+    }
+}
+
+/// Represents a running Crucible Agent. Contains regions.
+pub struct CrucibleData {
+    inner: Mutex<CrucibleDataInner>,
+}
+
+impl CrucibleData {
+    fn new() -> Self {
+        Self { inner: Mutex::new(CrucibleDataInner::new()) }
+    }
+
+    pub async fn set_create_callback(&self, callback: CreateCallback) {
+        self.inner.lock().await.set_create_callback(callback);
+    }
+
+    pub async fn list(&self) -> Vec<Region> {
+        self.inner.lock().await.list()
+    }
+
+    pub async fn create(&self, params: CreateRegion) -> Region {
+        self.inner.lock().await.create(params)
+    }
+
+    pub async fn get(&self, id: RegionId) -> Option<Region> {
+        self.inner.lock().await.get(id)
+    }
+
+    pub async fn delete(&self, id: RegionId) -> Option<Region> {
+        self.inner.lock().await.delete(id)
+    }
+
+    pub async fn set_state(&self, id: &RegionId, state: State) {
+        self.inner
+            .lock()
+            .await
+            .get_mut(id)
+            .expect("region does not exist")
+            .state = state;
+    }
+}
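+
+// `set_create_callback` and `set_state` exist so tests can steer the
+// simulation: a callback can decide the initial state of each new region
+// (e.g. reporting it as already `State::Created`), and `set_state` can walk
+// an existing region through its lifecycle without a real agent running.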
+
+/// A simulated Crucible Dataset.
+///
+/// Contains both the data and the HTTP server.
+pub struct CrucibleServer {
+    server: dropshot::HttpServer<Arc<CrucibleData>>,
+    data: Arc<CrucibleData>,
+}
+
+impl CrucibleServer {
+    fn new(log: &Logger) -> Self {
+        let data = Arc::new(CrucibleData::new());
+        let config = dropshot::ConfigDropshot {
+            bind_address: SocketAddr::new("127.0.0.1".parse().unwrap(), 0),
+            ..Default::default()
+        };
+        let dropshot_log = log
+            .new(o!("component" => "Simulated CrucibleAgent Dropshot Server"));
+        let server = dropshot::HttpServerStarter::new(
+            &config,
+            super::http_entrypoints_storage::api(),
+            data.clone(),
+            &dropshot_log,
+        )
+        .expect("Could not initialize server")
+        .start();
+
+        CrucibleServer { server, data }
+    }
+
+    fn address(&self) -> SocketAddr {
+        self.server.local_addr()
+    }
+}
+
+pub struct Zpool {
+    datasets: HashMap<Uuid, CrucibleServer>,
+}
+
+impl Zpool {
+    pub fn new() -> Self {
+        Zpool { datasets: HashMap::new() }
+    }
+
+    pub fn insert_dataset(
+        &mut self,
+        log: &Logger,
+        id: Uuid,
+    ) -> &CrucibleServer {
+        self.datasets.insert(id, CrucibleServer::new(log));
+        self.datasets
+            .get(&id)
+            .expect("Failed to get the dataset we just inserted")
+    }
+}
+
+/// Simulated representation of all storage on a sled.
+pub struct Storage {
+    sled_id: Uuid,
+    nexus_client: Arc<NexusClient>,
+    log: Logger,
+    zpools: HashMap<Uuid, Zpool>,
+}
+
+impl Storage {
+    pub fn new(
+        sled_id: Uuid,
+        nexus_client: Arc<NexusClient>,
+        log: Logger,
+    ) -> Self {
+        Self { sled_id, nexus_client, log, zpools: HashMap::new() }
+    }
+
+    /// Adds a Zpool to the sled's simulated storage and notifies Nexus.
+    pub async fn insert_zpool(&mut self, zpool_id: Uuid, size: u64) {
+        // Update our local data
+        self.zpools.insert(zpool_id, Zpool::new());
+
+        // Notify Nexus
+        let request = ZpoolPutRequest { size: ByteCount(size) };
+        self.nexus_client
+            .zpool_put(&self.sled_id, &zpool_id, &request)
+            .await
+            .expect("Failed to notify Nexus about new Zpool");
+    }
+
+    /// Adds a Dataset to the sled's simulated storage and notifies Nexus.
+    pub async fn insert_dataset(&mut self, zpool_id: Uuid, dataset_id: Uuid) {
+        // Update our local data
+        let dataset = self
+            .zpools
+            .get_mut(&zpool_id)
+            .expect("Zpool does not exist")
+            .insert_dataset(&self.log, dataset_id);
+
+        // Notify Nexus
+        let request = DatasetPutRequest {
+            address: dataset.address().to_string(),
+            kind: DatasetKind::Crucible,
+        };
+        self.nexus_client
+            .dataset_put(&zpool_id, &dataset_id, &request)
+            .await
+            .expect("Failed to notify Nexus about new Dataset");
+    }
+
+    pub async fn get_dataset(
+        &self,
+        zpool_id: Uuid,
+        dataset_id: Uuid,
+    ) -> Arc<CrucibleData> {
+        self.zpools
+            .get(&zpool_id)
+            .expect("Zpool does not exist")
+            .datasets
+            .get(&dataset_id)
+            .expect("Dataset does not exist")
+            .data
+            .clone()
+    }
+}
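+
+// End-to-end sketch (hypothetical test code): once `Storage::insert_dataset`
+// has run, a per-dataset dropshot server is listening on 127.0.0.1, and a
+// client pointed at `CrucibleServer::address()` -- e.g. one generated by
+// progenitor in `crucible-agent-client` -- can exercise the region endpoints
+// registered in http_entrypoints_storage.rs.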