From 2baae91e4b1e689f0f7d412628c7f1dde667ab39 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Wed, 28 Sep 2022 13:48:14 -0400 Subject: [PATCH 1/7] Provision destination volume for snapshot blocks During snapshot creation, provision a destination volume as an eventual landing place of the snapshot blocks. This will cause snapshots to fail if there isn't space to store all the blocks in regions of their own. This commit adds an optional destination volume ID to the snapshot model. Once a snapshot is created, some task will have to copy blocks from the source volume to this destination volume, then swap the entries accordingly. This commit also moves common Crucible agent interaction functions into the saga mod.rs, and generalizes region_allocate's arguments so that it can be called from more than just the disk creation saga. Closes https://github.com/oxidecomputer/omicron/issues/1642 --- common/src/sql/dbinit.sql | 5 + nexus/db-model/src/schema.rs | 2 + nexus/db-model/src/snapshot.rs | 4 + nexus/src/app/disk.rs | 5 +- nexus/src/app/sagas/disk_create.rs | 135 +-------- nexus/src/app/sagas/mod.rs | 123 ++++++++ nexus/src/app/sagas/snapshot_create.rs | 282 +++++++++++++++++- nexus/src/db/datastore/mod.rs | 59 +++- nexus/src/db/datastore/region.rs | 118 +++++++- nexus/tests/integration_tests/endpoints.rs | 3 +- nexus/tests/integration_tests/snapshots.rs | 61 ++++ .../integration_tests/volume_management.rs | 4 +- nexus/types/src/external_api/params.rs | 83 ------ 13 files changed, 650 insertions(+), 234 deletions(-) diff --git a/common/src/sql/dbinit.sql b/common/src/sql/dbinit.sql index 198bb09a6f4..829702d7137 100644 --- a/common/src/sql/dbinit.sql +++ b/common/src/sql/dbinit.sql @@ -654,6 +654,8 @@ CREATE TABLE omicron.public.disk ( /* Indicates that the object has been deleted */ /* This is redundant for Disks, but we keep it here for consistency. */ time_deleted TIMESTAMPTZ, + + /* child resource generation number, per RFD 192 */ rcgen INT NOT NULL, /* Every Disk is in exactly one Project at a time. */ @@ -772,6 +774,9 @@ CREATE TABLE omicron.public.snapshot ( /* Every Snapshot consists of a root volume */ volume_id UUID NOT NULL, + /* Where will the scrubbed blocks eventually land? */ + destination_volume_id UUID, + gen INT NOT NULL, state omicron.public.snapshot_state NOT NULL, block_size omicron.public.block_size NOT NULL, diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index b2e30402e96..42df422586e 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -77,6 +77,8 @@ table! 
{ disk_id -> Uuid, volume_id -> Uuid, + destination_volume_id -> Nullable, + gen -> Int8, state -> crate::SnapshotStateEnum, block_size -> crate::BlockSizeEnum, diff --git a/nexus/db-model/src/snapshot.rs b/nexus/db-model/src/snapshot.rs index b6fadefbc29..b34bd668f2a 100644 --- a/nexus/db-model/src/snapshot.rs +++ b/nexus/db-model/src/snapshot.rs @@ -43,9 +43,13 @@ pub struct Snapshot { pub identity: SnapshotIdentity, pub project_id: Uuid, + // which disk is this a snapshot of pub disk_id: Uuid, pub volume_id: Uuid, + // destination of all snapshot blocks + pub destination_volume_id: Option, + pub gen: Generation, pub state: SnapshotState, pub block_size: BlockSize, diff --git a/nexus/src/app/disk.rs b/nexus/src/app/disk.rs index e26955f12fe..5d7e8b3c0f8 100644 --- a/nexus/src/app/disk.rs +++ b/nexus/src/app/disk.rs @@ -510,8 +510,11 @@ impl super::Nexus { .project_delete_snapshot(opctx, &authz_snapshot, &db_snapshot) .await?; - // Kick off volume deletion saga + // Kick off volume deletion saga(s) self.volume_delete(db_snapshot.volume_id).await?; + if let Some(volume_id) = db_snapshot.destination_volume_id { + self.volume_delete(volume_id).await?; + } Ok(()) } diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs index c65f97d7331..a03f81a241c 100644 --- a/nexus/src/app/sagas/disk_create.rs +++ b/nexus/src/app/sagas/disk_create.rs @@ -3,8 +3,8 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use super::{ - ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, - ACTION_GENERATE_ID, MAX_CONCURRENT_REGION_REQUESTS, + ensure_all_datasets_and_regions, ActionRegistry, NexusActionContext, + NexusSaga, SagaInitError, ACTION_GENERATE_ID, }; use crate::app::sagas::NexusAction; use crate::context::OpContext; @@ -12,22 +12,13 @@ use crate::db::identity::{Asset, Resource}; use crate::db::lookup::LookupPath; use crate::external_api::params; use crate::{authn, authz, db}; -use anyhow::anyhow; -use crucible_agent_client::{ - types::{CreateRegion, RegionId, State as RegionState}, - Client as CrucibleAgentClient, -}; -use futures::StreamExt; use lazy_static::lazy_static; use omicron_common::api::external::Error; -use omicron_common::backoff::{self, BackoffError}; use rand::{rngs::StdRng, RngCore, SeedableRng}; use serde::Deserialize; use serde::Serialize; use sled_agent_client::types::{CrucibleOpts, VolumeConstructionRequest}; -use slog::warn; -use slog::Logger; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use std::sync::Arc; use steno::ActionError; use steno::ActionFunc; @@ -240,69 +231,17 @@ async fn sdc_alloc_regions( let opctx = OpContext::for_saga_action(&sagactx, ¶ms.serialized_authn); let datasets_and_regions = osagactx .datastore() - .region_allocate(&opctx, volume_id, ¶ms.create_params) + .region_allocate( + &opctx, + volume_id, + ¶ms.create_params.disk_source, + params.create_params.size, + ) .await .map_err(ActionError::action_failed)?; Ok(datasets_and_regions) } -/// Call out to Crucible agent and perform region creation. -async fn ensure_region_in_dataset( - log: &Logger, - dataset: &db::model::Dataset, - region: &db::model::Region, -) -> Result { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - let region_request = CreateRegion { - block_size: region.block_size().to_bytes(), - extent_count: region.extent_count().try_into().unwrap(), - extent_size: region.blocks_per_extent().try_into().unwrap(), - // TODO: Can we avoid casting from UUID to string? 
- // NOTE: This'll require updating the crucible agent client. - id: RegionId(region.id().to_string()), - encrypted: region.encrypted(), - cert_pem: None, - key_pem: None, - root_pem: None, - }; - - let create_region = || async { - let region = client - .region_create(®ion_request) - .await - .map_err(|e| BackoffError::Permanent(e.into()))?; - match region.state { - RegionState::Requested => Err(BackoffError::transient(anyhow!( - "Region creation in progress" - ))), - RegionState::Created => Ok(region), - _ => Err(BackoffError::Permanent(anyhow!( - "Failed to create region, unexpected state: {:?}", - region.state - ))), - } - }; - - let log_create_failure = |_, delay| { - warn!( - log, - "Region requested, not yet created. Retrying in {:?}", delay - ); - }; - - let region = backoff::retry_notify( - backoff::internal_service_policy(), - create_region, - log_create_failure, - ) - .await - .map_err(|e| Error::internal_error(&e.to_string()))?; - - Ok(region.into_inner()) -} - /// Call out to Crucible agent and perform region creation. async fn sdc_regions_ensure( sagactx: NexusActionContext, @@ -310,55 +249,13 @@ async fn sdc_regions_ensure( let log = sagactx.user_data().log(); let disk_id = sagactx.lookup::("disk_id")?; - let datasets_and_regions = sagactx - .lookup::>( + let datasets_and_regions = ensure_all_datasets_and_regions( + &log, + sagactx.lookup::>( "datasets_and_regions", - )?; - - let request_count = datasets_and_regions.len(); - - // Allocate regions, and additionally return the dataset that the region was - // allocated in. - let datasets_and_regions: Vec<( - db::model::Dataset, - crucible_agent_client::types::Region, - )> = futures::stream::iter(datasets_and_regions) - .map(|(dataset, region)| async move { - match ensure_region_in_dataset(log, &dataset, ®ion).await { - Ok(result) => Ok((dataset, result)), - Err(e) => Err(e), - } - }) - // Execute the allocation requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::, - >>() - .await - .into_iter() - .collect::, - Error, - >>() - .map_err(ActionError::action_failed)?; - - // Assert each region has the same block size, otherwise Volume creation - // will fail. - let all_region_have_same_block_size = datasets_and_regions - .windows(2) - .all(|w| w[0].1.block_size == w[1].1.block_size); - - if !all_region_have_same_block_size { - return Err(ActionError::action_failed(Error::internal_error( - "volume creation will fail due to block size mismatch", - ))); - } + )?, + ) + .await?; let block_size = datasets_and_regions[0].1.block_size; @@ -519,8 +416,6 @@ async fn sdc_regions_ensure( key_pem: None, root_cert_pem: None, - // TODO open a control socket for the whole volume, not - // in the sub volumes control: None, read_only: false, diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 564f1b342ad..80dd1b056ad 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -9,8 +9,19 @@ // correctness, idempotence, etc. The more constrained this interface is, the // easier it will be to test, version, and update in deployed systems. 
+use crate::db; +use crate::db::identity::Asset; use crate::saga_interface::SagaContext; +use anyhow::anyhow; +use crucible_agent_client::{ + types::{CreateRegion, RegionId, State as RegionState}, + Client as CrucibleAgentClient, +}; +use futures::StreamExt; use lazy_static::lazy_static; +use omicron_common::api::external::Error; +use omicron_common::backoff::{self, BackoffError}; +use slog::Logger; use std::sync::Arc; use steno::new_action_noop_undo; use steno::ActionContext; @@ -114,3 +125,115 @@ pub(super) async fn saga_generate_uuid( // Arbitrary limit on concurrency, for operations issued on multiple regions // within a disk at the same time. const MAX_CONCURRENT_REGION_REQUESTS: usize = 3; + +/// Call out to Crucible agent and perform region creation. +pub async fn ensure_region_in_dataset( + log: &Logger, + dataset: &db::model::Dataset, + region: &db::model::Region, +) -> Result { + let url = format!("http://{}", dataset.address()); + let client = CrucibleAgentClient::new(&url); + + let region_request = CreateRegion { + block_size: region.block_size().to_bytes(), + extent_count: region.extent_count().try_into().unwrap(), + extent_size: region.blocks_per_extent().try_into().unwrap(), + // TODO: Can we avoid casting from UUID to string? + // NOTE: This'll require updating the crucible agent client. + id: RegionId(region.id().to_string()), + encrypted: region.encrypted(), + cert_pem: None, + key_pem: None, + root_pem: None, + }; + + let create_region = || async { + let region = client + .region_create(®ion_request) + .await + .map_err(|e| BackoffError::Permanent(e.into()))?; + match region.state { + RegionState::Requested => Err(BackoffError::transient(anyhow!( + "Region creation in progress" + ))), + RegionState::Created => Ok(region), + _ => Err(BackoffError::Permanent(anyhow!( + "Failed to create region, unexpected state: {:?}", + region.state + ))), + } + }; + + let log_create_failure = |_, delay| { + warn!( + log, + "Region requested, not yet created. Retrying in {:?}", delay + ); + }; + + let region = backoff::retry_notify( + backoff::internal_service_policy(), + create_region, + log_create_failure, + ) + .await + .map_err(|e| Error::internal_error(&e.to_string()))?; + + Ok(region.into_inner()) +} + +pub async fn ensure_all_datasets_and_regions( + log: &Logger, + datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, +) -> Result< + Vec<(db::model::Dataset, crucible_agent_client::types::Region)>, + ActionError, +> { + let request_count = datasets_and_regions.len(); + + // Allocate regions, and additionally return the dataset that the region was + // allocated in. + let datasets_and_regions: Vec<( + db::model::Dataset, + crucible_agent_client::types::Region, + )> = futures::stream::iter(datasets_and_regions) + .map(|(dataset, region)| async move { + match ensure_region_in_dataset(log, &dataset, ®ion).await { + Ok(result) => Ok((dataset, result)), + Err(e) => Err(e), + } + }) + // Execute the allocation requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::, + >>() + .await + .into_iter() + .collect::, + Error, + >>() + .map_err(ActionError::action_failed)?; + + // Assert each region has the same block size, otherwise Volume creation + // will fail. 
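+    // (Context: the volume construction request built from these regions uses a
+    // single block_size for the whole region set, so a mismatched region here
+    // would only surface later as a volume-creation failure; checking up front
+    // gives a clearer error.)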
+ let all_region_have_same_block_size = datasets_and_regions + .windows(2) + .all(|w| w[0].1.block_size == w[1].1.block_size); + + if !all_region_have_same_block_size { + return Err(ActionError::action_failed(Error::internal_error( + "volume creation will fail due to block size mismatch", + ))); + } + + Ok(datasets_and_regions) +} diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index 2d11ad91e01..b79c971037e 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -82,24 +82,27 @@ //! use super::{ - ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, - ACTION_GENERATE_ID, + ensure_all_datasets_and_regions, ActionRegistry, NexusActionContext, + NexusSaga, SagaInitError, ACTION_GENERATE_ID, }; use crate::app::sagas::NexusAction; use crate::context::OpContext; use crate::db::identity::{Asset, Resource}; use crate::db::lookup::LookupPath; +use crate::db::model::Generation; use crate::external_api::params; use crate::{authn, authz, db}; use anyhow::anyhow; use crucible_agent_client::{types::RegionId, Client as CrucibleAgentClient}; use lazy_static::lazy_static; +use omicron_common::api::external; use omicron_common::api::external::Error; +use rand::{rngs::StdRng, RngCore, SeedableRng}; use serde::Deserialize; use serde::Serialize; use sled_agent_client::types::{ - DiskSnapshotRequestBody, InstanceIssueDiskSnapshotRequestBody, - VolumeConstructionRequest, + CrucibleOpts, DiskSnapshotRequestBody, + InstanceIssueDiskSnapshotRequestBody, VolumeConstructionRequest, }; use slog::info; use std::collections::BTreeMap; @@ -124,6 +127,24 @@ pub struct Params { // snapshot create saga: actions lazy_static! { + static ref SAVE_DISK_GEN_TO_CONTEXT: NexusAction = new_action_noop_undo( + "snapshot-create.save-disk-gen-to-ctx", + ssc_save_disk_gen, + ); + static ref REGIONS_ALLOC: NexusAction = new_action_noop_undo( + "snapshot-create.regions-alloc", + ssc_alloc_regions, + ); + static ref REGIONS_ENSURE: NexusAction = new_action_noop_undo( + "snapshot-create.regions-ensure", + ssc_regions_ensure, + ); + static ref CREATE_DESTINATION_VOLUME_RECORD: NexusAction = + ActionFunc::new_action( + "snapshot-create.create-destination-volume-record", + ssc_create_destination_volume_record, + ssc_create_destination_volume_record_undo, + ); static ref CREATE_SNAPSHOT_RECORD: NexusAction = ActionFunc::new_action( "snapshot-create.create-snapshot-record", ssc_create_snapshot_record, @@ -157,6 +178,10 @@ impl NexusSaga for SagaSnapshotCreate { type Params = Params; fn register_actions(registry: &mut ActionRegistry) { + registry.register(Arc::clone(&*SAVE_DISK_GEN_TO_CONTEXT)); + registry.register(Arc::clone(&*REGIONS_ALLOC)); + registry.register(Arc::clone(&*REGIONS_ENSURE)); + registry.register(Arc::clone(&*CREATE_DESTINATION_VOLUME_RECORD)); registry.register(Arc::clone(&*CREATE_SNAPSHOT_RECORD)); registry.register(Arc::clone(&*SEND_SNAPSHOT_REQUEST)); registry.register(Arc::clone(&*START_RUNNING_SNAPSHOT)); @@ -181,6 +206,39 @@ impl NexusSaga for SagaSnapshotCreate { ACTION_GENERATE_ID.as_ref(), )); + builder.append(Node::action( + "destination_volume_id", + "GenerateDestinationVolumeId", + ACTION_GENERATE_ID.as_ref(), + )); + + // At the beginning of this saga, save disk generation number. Compare + // later to see if the disk changed mid-saga. 
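+        // No lock is held on the disk across the saga, so the saved generation
+        // number acts as an optimistic concurrency check: later actions
+        // re-fetch the disk and fail with "disk changed during snapshot create"
+        // if the generation no longer matches.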
+ builder.append(Node::action( + "disk_gen", + "SaveDiskGenToCtx", + SAVE_DISK_GEN_TO_CONTEXT.as_ref(), + )); + + // Allocate region space for snapshot to store blocks post-scrub + builder.append(Node::action( + "datasets_and_regions", + "RegionsAlloc", + REGIONS_ALLOC.as_ref(), + )); + + builder.append(Node::action( + "regions_ensure", + "RegionsEnsure", + REGIONS_ENSURE.as_ref(), + )); + + builder.append(Node::action( + "created_destination_volume", + "CreateDestinationVolumeRecord", + CREATE_DESTINATION_VOLUME_RECORD.as_ref(), + )); + // Create the Snapshot DB object builder.append(Node::action( "created_snapshot", @@ -222,6 +280,187 @@ impl NexusSaga for SagaSnapshotCreate { // snapshot create saga: action implementations +async fn ssc_save_disk_gen( + sagactx: NexusActionContext, +) -> Result { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = OpContext::for_saga_action(&sagactx, ¶ms.serialized_authn); + + let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) + .project_id(params.project_id) + .disk_name(¶ms.create_params.disk.clone().into()) + .fetch() + .await + .map_err(ActionError::action_failed)?; + + Ok(disk.runtime_state.gen) +} + +async fn ssc_alloc_regions( + sagactx: NexusActionContext, +) -> Result, ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let destination_volume_id = + sagactx.lookup::("destination_volume_id")?; + + // Ensure the destination volume is backed by appropriate regions. + // + // This allocates regions in the database, but the disk state is still + // "creating" - the respective Crucible Agents must be instructed to + // allocate the necessary regions before we can mark the disk as "ready to + // be used". + // + // TODO: Depending on the result of + // https://github.com/oxidecomputer/omicron/issues/613 , we + // should consider using a paginated API to access regions, rather than + // returning all of them at once. + let opctx = OpContext::for_saga_action(&sagactx, ¶ms.serialized_authn); + + let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) + .project_id(params.project_id) + .disk_name(¶ms.create_params.disk.clone().into()) + .fetch() + .await + .map_err(ActionError::action_failed)?; + + if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? 
{ + return Err(ActionError::action_failed( + "disk changed during snapshot create".to_string(), + )); + } + + let datasets_and_regions = osagactx + .datastore() + .region_allocate( + &opctx, + destination_volume_id, + ¶ms::DiskSource::Blank { + block_size: params::BlockSize::try_from( + disk.block_size.to_bytes(), + ) + .map_err(|e| ActionError::action_failed(e.to_string()))?, + }, + external::ByteCount::from(disk.size), + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(datasets_and_regions) +} + +async fn ssc_regions_ensure( + sagactx: NexusActionContext, +) -> Result { + let osagactx = sagactx.user_data(); + let log = osagactx.log(); + let destination_volume_id = + sagactx.lookup::("destination_volume_id")?; + + let datasets_and_regions = ensure_all_datasets_and_regions( + &log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await?; + + let block_size = datasets_and_regions[0].1.block_size; + + // Create volume construction request + let mut rng = StdRng::from_entropy(); + let volume_construction_request = VolumeConstructionRequest::Volume { + id: destination_volume_id, + block_size, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size, + // gen of 0 is here, these regions were just allocated. + gen: 0, + opts: CrucibleOpts { + id: destination_volume_id, + target: datasets_and_regions + .iter() + .map(|(dataset, region)| { + dataset + .address_with_port(region.port_number) + .to_string() + }) + .collect(), + + lossy: false, + flush_timeout: None, + + // all downstairs will expect encrypted blocks + key: Some(base64::encode({ + // TODO the current encryption key + // requirement is 32 bytes, what if that + // changes? + let mut random_bytes: [u8; 32] = [0; 32]; + rng.fill_bytes(&mut random_bytes); + random_bytes + })), + + // TODO TLS, which requires sending X509 stuff during + // downstairs region allocation too. + cert_pem: None, + key_pem: None, + root_cert_pem: None, + + control: None, + + // TODO while the transfer of blocks is occurring to the + // destination volume, the opt here should be read-write. When + // the transfer has completed, update the volume to make it + // read-only. 
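+                    // (The block copy itself is not implemented in this change;
+                    // per the commit message, a later task will move the blocks
+                    // into these regions and then swap the volume entries.)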
+ read_only: false, + }, + }], + read_only_parent: None, + }; + + let volume_data = serde_json::to_string(&volume_construction_request) + .map_err(|e| { + ActionError::action_failed(Error::internal_error(&e.to_string())) + })?; + + Ok(volume_data) +} + +async fn ssc_create_destination_volume_record( + sagactx: NexusActionContext, +) -> Result { + let osagactx = sagactx.user_data(); + + let destination_volume_id = + sagactx.lookup::("destination_volume_id")?; + let destination_volume_data = sagactx.lookup::("regions_ensure")?; + + let volume = + db::model::Volume::new(destination_volume_id, destination_volume_data); + + let volume_created = osagactx + .datastore() + .volume_create(volume) + .await + .map_err(ActionError::action_failed)?; + + Ok(volume_created) +} + +async fn ssc_create_destination_volume_record_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + + let destination_volume_id = + sagactx.lookup::("destination_volume_id")?; + osagactx.nexus().volume_delete(destination_volume_id).await?; + + Ok(()) +} + async fn ssc_create_snapshot_record( sagactx: NexusActionContext, ) -> Result { @@ -232,10 +471,12 @@ async fn ssc_create_snapshot_record( let snapshot_id = sagactx.lookup::("snapshot_id")?; - // We admittedly reference the volume before it has been allocated, - // but this should be acceptable because the snapshot remains in a "Creating" - // state until the saga has completed. + // We admittedly reference the volume(s) before they have been allocated, + // but this should be acceptable because the snapshot remains in a + // "Creating" state until the saga has completed. let volume_id = sagactx.lookup::("volume_id")?; + let destination_volume_id = + sagactx.lookup::("destination_volume_id")?; info!(log, "grabbing disk by name {}", params.create_params.disk); @@ -246,6 +487,12 @@ async fn ssc_create_snapshot_record( .await .map_err(ActionError::action_failed)?; + if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? { + return Err(ActionError::action_failed( + "disk changed during snapshot create".to_string(), + )); + } + info!(log, "creating snapshot {} from disk {}", snapshot_id, disk.id()); let snapshot = db::model::Snapshot { @@ -257,6 +504,7 @@ async fn ssc_create_snapshot_record( project_id: params.project_id, disk_id: disk.id(), volume_id, + destination_volume_id: Some(destination_volume_id), gen: db::model::Generation::new(), state: db::model::SnapshotState::Creating, @@ -325,6 +573,12 @@ async fn ssc_send_snapshot_request( .await .map_err(ActionError::action_failed)?; + if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? { + return Err(ActionError::action_failed( + "disk changed during snapshot create".to_string(), + )); + } + match disk.runtime().attach_instance_id { Some(instance_id) => { info!(log, "disk {} instance is {}", disk.id(), instance_id); @@ -426,6 +680,12 @@ async fn ssc_start_running_snapshot( .await .map_err(ActionError::action_failed)?; + if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? { + return Err(ActionError::action_failed( + "disk changed during snapshot create".to_string(), + )); + } + // For each dataset and region that makes up the disk, create a map from the // region information to the new running snapshot information. let datasets_and_regions = osagactx @@ -526,6 +786,12 @@ async fn ssc_create_volume_record( .await .map_err(ActionError::action_failed)?; + if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? 
{ + return Err(ActionError::action_failed( + "disk changed during snapshot create".to_string(), + )); + } + let disk_volume = osagactx .datastore() .volume_get(disk.volume_id) @@ -628,6 +894,8 @@ async fn ssc_finalize_snapshot_record( Ok(snapshot) } +// helper functions + /// Create a Snapshot VolumeConstructionRequest by copying a disk's /// VolumeConstructionRequest and modifying it accordingly. fn create_snapshot_from_disk( diff --git a/nexus/src/db/datastore/mod.rs b/nexus/src/db/datastore/mod.rs index 4810ea7723a..a048cfab72c 100644 --- a/nexus/src/db/datastore/mod.rs +++ b/nexus/src/db/datastore/mod.rs @@ -235,6 +235,7 @@ mod test { use crate::db::identity::Asset; use crate::db::identity::Resource; use crate::db::lookup::LookupPath; + use crate::db::model::BlockSize; use crate::db::model::Dataset; use crate::db::model::ExternalIp; use crate::db::model::Rack; @@ -514,10 +515,16 @@ mod test { ByteCount::from_mebibytes_u32(500), ); let volume1_id = Uuid::new_v4(); + // Currently, we only allocate one Region Set per volume. let expected_region_count = REGION_REDUNDANCY_THRESHOLD; let dataset_and_regions = datastore - .region_allocate(&opctx, volume1_id, ¶ms) + .region_allocate( + &opctx, + volume1_id, + ¶ms.disk_source, + params.size, + ) .await .unwrap(); @@ -528,8 +535,11 @@ mod test { assert!(disk1_datasets.insert(dataset.id())); assert_eq!(volume1_id, region.volume_id()); assert_eq!(ByteCount::from(4096), region.block_size()); - assert_eq!(params.extent_size() / 4096, region.blocks_per_extent()); - assert_eq!(params.extent_count(), region.extent_count()); + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::AdvancedFormat, + params.size, + ); + assert_eq!(extent_count, region.extent_count()); } // Allocate regions for a second disk. 
Observe that we allocate from @@ -540,17 +550,26 @@ mod test { ); let volume2_id = Uuid::new_v4(); let dataset_and_regions = datastore - .region_allocate(&opctx, volume2_id, ¶ms) + .region_allocate( + &opctx, + volume2_id, + ¶ms.disk_source, + params.size, + ) .await .unwrap(); + assert_eq!(expected_region_count, dataset_and_regions.len()); let mut disk2_datasets = HashSet::new(); for (dataset, region) in dataset_and_regions { assert!(disk2_datasets.insert(dataset.id())); assert_eq!(volume2_id, region.volume_id()); assert_eq!(ByteCount::from(4096), region.block_size()); - assert_eq!(params.extent_size() / 4096, region.blocks_per_extent()); - assert_eq!(params.extent_count(), region.extent_count()); + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::AdvancedFormat, + params.size, + ); + assert_eq!(extent_count, region.extent_count()); } // Double-check that the datasets used for the first disk weren't @@ -597,11 +616,21 @@ mod test { ); let volume_id = Uuid::new_v4(); let mut dataset_and_regions1 = datastore - .region_allocate(&opctx, volume_id, ¶ms) + .region_allocate( + &opctx, + volume_id, + ¶ms.disk_source, + params.size, + ) .await .unwrap(); let mut dataset_and_regions2 = datastore - .region_allocate(&opctx, volume_id, ¶ms) + .region_allocate( + &opctx, + volume_id, + ¶ms.disk_source, + params.size, + ) .await .unwrap(); @@ -664,7 +693,12 @@ mod test { ); let volume1_id = Uuid::new_v4(); let err = datastore - .region_allocate(&opctx, volume1_id, ¶ms) + .region_allocate( + &opctx, + volume1_id, + ¶ms.disk_source, + params.size, + ) .await .unwrap_err(); @@ -718,7 +752,12 @@ mod test { let volume1_id = Uuid::new_v4(); assert!(datastore - .region_allocate(&opctx, volume1_id, ¶ms) + .region_allocate( + &opctx, + volume1_id, + ¶ms.disk_source, + params.size + ) .await .is_err()); diff --git a/nexus/src/db/datastore/region.rs b/nexus/src/db/datastore/region.rs index 4fe417873ee..658c8df2333 100644 --- a/nexus/src/db/datastore/region.rs +++ b/nexus/src/db/datastore/region.rs @@ -18,6 +18,7 @@ use crate::external_api::params; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; +use omicron_common::api::external; use omicron_common::api::external::DeleteResult; use omicron_common::api::external::Error; use uuid::Uuid; @@ -52,12 +53,12 @@ impl DataStore { .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server)) } - async fn get_block_size_from_disk_create( + async fn get_block_size_from_disk_source( &self, opctx: &OpContext, - disk_create: ¶ms::DiskCreate, + disk_source: ¶ms::DiskSource, ) -> Result { - match &disk_create.disk_source { + match &disk_source { params::DiskSource::Blank { block_size } => { Ok(db::model::BlockSize::try_from(*block_size) .map_err(|e| Error::invalid_request(&e.to_string()))?) @@ -91,6 +92,28 @@ impl DataStore { } } + // TODO for now, extent size is fixed at 64 MiB. In the future, this may be + // tunable at runtime. + pub const EXTENT_SIZE: i64 = 64_i64 << 20; + + /// Given a block size and total disk size, get Crucible allocation values + pub fn get_crucible_allocation( + block_size: &db::model::BlockSize, + size: external::ByteCount, + ) -> (i64, i64) { + let blocks_per_extent = + Self::EXTENT_SIZE / block_size.to_bytes() as i64; + + let size = size.to_bytes() as i64; + + // allocate enough extents to fit all the disk blocks, rounding up. 
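+        // This is ceiling division: any remainder adds exactly one extent.
+        // For example, with the 64 MiB extent size, a 1 GiB disk needs 16
+        // extents, a disk of 64 MiB + 1 byte rounds up to 2 extents, and a
+        // zero-sized disk gets zero extents (see the tests below).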
+ let extent_count = size / Self::EXTENT_SIZE + + ((size % Self::EXTENT_SIZE) + Self::EXTENT_SIZE - 1) + / Self::EXTENT_SIZE; + + (blocks_per_extent, extent_count) + } + /// Idempotently allocates enough regions to back a disk. /// /// Returns the allocated regions, as well as the datasets to which they @@ -99,7 +122,8 @@ impl DataStore { &self, opctx: &OpContext, volume_id: Uuid, - params: ¶ms::DiskCreate, + disk_source: ¶ms::DiskSource, + size: external::ByteCount, ) -> Result, Error> { // ALLOCATION POLICY // @@ -120,18 +144,17 @@ impl DataStore { // - Sled placement of datasets // - What sort of loads we'd like to create (even split across all disks // may not be preferable, especially if maintenance is expected) - let params: params::DiskCreate = params.clone(); - let block_size = - self.get_block_size_from_disk_create(opctx, ¶ms).await?; - let blocks_per_extent = - params.extent_size() / block_size.to_bytes() as i64; + + let block_size = self.get_block_size_from_disk_source(opctx, &disk_source).await?; + let (blocks_per_extent, extent_count) = + Self::get_crucible_allocation(&block_size, size); let dataset_and_regions: Vec<(Dataset, Region)> = crate::db::queries::region_allocation::RegionAllocate::new( volume_id, block_size.into(), blocks_per_extent, - params.extent_count(), + extent_count, ) .get_results_async(self.pool()) .await @@ -255,3 +278,78 @@ impl DataStore { } } } + +#[cfg(test)] +mod test { + use super::*; + use crate::db::model::BlockSize; + use omicron_common::api::external::ByteCount; + + #[test] + fn test_extent_count() { + // Zero sized disks should get zero extents + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::Traditional, + ByteCount::try_from(0u64).unwrap(), + ); + assert_eq!(0, extent_count); + + // Test 1 byte disk + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::Traditional, + ByteCount::try_from(1u64).unwrap(), + ); + assert_eq!(1, extent_count); + + // Test 1 less than the (current) maximum extent size + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::Traditional, + ByteCount::try_from(DataStore::EXTENT_SIZE - 1).unwrap(), + ); + assert_eq!(1, extent_count); + + // Test at than the (current) maximum extent size + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::Traditional, + ByteCount::try_from(DataStore::EXTENT_SIZE).unwrap(), + ); + assert_eq!(1, extent_count); + + // Test at 1 byte more than the (current) maximum extent size + let (_, extent_count) = DataStore::get_crucible_allocation( + &BlockSize::Traditional, + ByteCount::try_from(DataStore::EXTENT_SIZE + 1).unwrap(), + ); + assert_eq!(2, extent_count); + + // Mostly just checking we don't blow up on an unwrap here. + let (_, _extent_count) = DataStore::get_crucible_allocation( + &BlockSize::Traditional, + ByteCount::try_from(i64::MAX).unwrap(), + ); + + // Note that i64::MAX bytes is an invalid disk size as it's not + // divisible by 4096. Create the maximum sized disk here. + let max_disk_size = i64::MAX + - (i64::MAX % (BlockSize::AdvancedFormat.to_bytes() as i64)); + let (blocks_per_extent, extent_count) = + DataStore::get_crucible_allocation( + &BlockSize::AdvancedFormat, + ByteCount::try_from(max_disk_size).unwrap(), + ); + + // We should still be rounding up to the nearest extent size. 
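+        // (max_disk_size is 2^63 - 4096 bytes, which rounds up to 2^37 extents
+        // of 64 MiB, i.e. exactly 2^63 = i64::MAX + 1 bytes.)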
+ assert_eq!( + extent_count as u128 * DataStore::EXTENT_SIZE as u128, + i64::MAX as u128 + 1, + ); + + // Assert that the regions allocated will fit this disk + assert!( + max_disk_size as u128 + <= extent_count as u128 + * blocks_per_extent as u128 + * DataStore::EXTENT_SIZE as u128 + ); + } +} diff --git a/nexus/tests/integration_tests/endpoints.rs b/nexus/tests/integration_tests/endpoints.rs index 2de8fb1e4f3..b1aebe01599 100644 --- a/nexus/tests/integration_tests/endpoints.rs +++ b/nexus/tests/integration_tests/endpoints.rs @@ -181,7 +181,8 @@ lazy_static! { block_size: params::BlockSize::try_from(4096).unwrap(), }, size: ByteCount::from_gibibytes_u32( - DiskTest::DEFAULT_ZPOOL_SIZE_GIB + // divide by two to leave space for snapshot blocks + DiskTest::DEFAULT_ZPOOL_SIZE_GIB / 2 ), }; pub static ref DEMO_DISK_METRICS_URL: String = diff --git a/nexus/tests/integration_tests/snapshots.rs b/nexus/tests/integration_tests/snapshots.rs index 04ec154b1c1..023f2fc3a0a 100644 --- a/nexus/tests/integration_tests/snapshots.rs +++ b/nexus/tests/integration_tests/snapshots.rs @@ -421,6 +421,7 @@ async fn test_reject_creating_disk_from_snapshot( project_id, disk_id: Uuid::new_v4(), volume_id: Uuid::new_v4(), + destination_volume_id: None, gen: db::model::Generation::new(), state: db::model::SnapshotState::Creating, @@ -577,6 +578,7 @@ async fn test_reject_creating_disk_from_illegal_snapshot( project_id, disk_id: Uuid::new_v4(), volume_id: Uuid::new_v4(), + destination_volume_id: None, gen: db::model::Generation::new(), state: db::model::SnapshotState::Creating, @@ -634,6 +636,64 @@ async fn test_reject_creating_disk_from_illegal_snapshot( ); } +#[nexus_test] +async fn test_cannot_snapshot_if_no_space(cptestctx: &ControlPlaneTestContext) { + // Test that snapshots cannot be created if there is no space for the blocks + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + create_ip_pool(&client, "p0", None, None).await; + create_org_and_project(client).await; + let disks_url = get_disks_url(); + + // Create a disk at just over half the capacity of what DiskTest allocates + let gibibytes: u64 = DiskTest::DEFAULT_ZPOOL_SIZE_GIB as u64 / 2 + 1; + let disk_size = + ByteCount::try_from(gibibytes * 1024 * 1024 * 1024).unwrap(); + let base_disk_name: Name = "base-disk".parse().unwrap(); + let base_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: base_disk_name.clone(), + description: String::from("sells rainsticks"), + }, + disk_source: params::DiskSource::Blank { + block_size: params::BlockSize::try_from(512).unwrap(), + }, + size: disk_size, + }; + + NexusRequest::new( + RequestBuilder::new(client, Method::POST, &disks_url) + .body(Some(&base_disk)) + .expect_status(Some(StatusCode::CREATED)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("unexpected error creating disk"); + + // Issue snapshot request, expect it to fail + let snapshots_url = format!( + "/organizations/{}/projects/{}/snapshots", + ORG_NAME, PROJECT_NAME + ); + + NexusRequest::new( + RequestBuilder::new(client, Method::POST, &snapshots_url) + .body(Some(¶ms::SnapshotCreate { + identity: IdentityMetadataCreateParams { + name: "not-attached".parse().unwrap(), + description: "not attached to instance".into(), + }, + disk: base_disk_name, + })) + .expect_status(Some(StatusCode::SERVICE_UNAVAILABLE)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("unexpected success creating snapshot"); +} + // Test that the code that Saga nodes call 
is idempotent #[nexus_test] @@ -662,6 +722,7 @@ async fn test_create_snapshot_record_idempotent( project_id, disk_id: Uuid::new_v4(), volume_id: Uuid::new_v4(), + destination_volume_id: None, gen: db::model::Generation::new(), state: db::model::SnapshotState::Creating, diff --git a/nexus/tests/integration_tests/volume_management.rs b/nexus/tests/integration_tests/volume_management.rs index 54d41c19e28..ca97fed15c5 100644 --- a/nexus/tests/integration_tests/volume_management.rs +++ b/nexus/tests/integration_tests/volume_management.rs @@ -371,7 +371,7 @@ async fn test_multiple_snapshots(cptestctx: &ControlPlaneTestContext) { .unwrap(); // Create a disk from this image - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::try_from(1u64 * 1024 * 1024 * 1024).unwrap(); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -926,7 +926,7 @@ async fn prepare_for_test_multiple_layers_of_snapshots( let disks_url = get_disks_url(); // Create a blank disk - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::try_from(1u64 * 1024 * 1024 * 1024).unwrap(); let layer_1_disk_name: Name = "layer-1-disk".parse().unwrap(); let layer_1_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { diff --git a/nexus/types/src/external_api/params.rs b/nexus/types/src/external_api/params.rs index 7ff37c83ba0..b5045da5d93 100644 --- a/nexus/types/src/external_api/params.rs +++ b/nexus/types/src/external_api/params.rs @@ -727,21 +727,6 @@ pub struct DiskCreate { pub size: ByteCount, } -const EXTENT_SIZE: u32 = 64_u32 << 20; // 64 MiB - -impl DiskCreate { - pub fn extent_size(&self) -> i64 { - EXTENT_SIZE as i64 - } - - pub fn extent_count(&self) -> i64 { - let extent_size = EXTENT_SIZE as i64; - let size = self.size.to_bytes() as i64; - size / extent_size - + ((size % extent_size) + extent_size - 1) / extent_size - } -} - /// Parameters for the [`Disk`](omicron_common::api::external::Disk) to be /// attached or detached to an instance #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] @@ -869,71 +854,3 @@ pub struct ResourceMetrics { /// An exclusive end time of metrics. pub end_time: DateTime, } - -#[cfg(test)] -mod test { - use super::*; - use std::convert::TryFrom; - - const BLOCK_SIZE: u32 = 4096; - - fn new_disk_create_params(size: ByteCount) -> DiskCreate { - DiskCreate { - identity: IdentityMetadataCreateParams { - name: Name::try_from("myobject".to_string()).unwrap(), - description: "desc".to_string(), - }, - disk_source: DiskSource::Blank { - block_size: BlockSize(BLOCK_SIZE), - }, - size, - } - } - - #[test] - fn test_extent_count() { - let params = new_disk_create_params(ByteCount::try_from(0u64).unwrap()); - assert_eq!(0, params.extent_count()); - - let params = new_disk_create_params(ByteCount::try_from(1u64).unwrap()); - assert_eq!(1, params.extent_count()); - let params = new_disk_create_params( - ByteCount::try_from(EXTENT_SIZE - 1).unwrap(), - ); - assert_eq!(1, params.extent_count()); - let params = - new_disk_create_params(ByteCount::try_from(EXTENT_SIZE).unwrap()); - assert_eq!(1, params.extent_count()); - - let params = new_disk_create_params( - ByteCount::try_from(EXTENT_SIZE + 1).unwrap(), - ); - assert_eq!(2, params.extent_count()); - - // Mostly just checking we don't blow up on an unwrap here. 
- let _params = - new_disk_create_params(ByteCount::try_from(i64::MAX).unwrap()); - - // Note that i64::MAX bytes is an invalid disk size as it's not - // divisible by 4096. - let max_disk_size = i64::MAX - (i64::MAX % (BLOCK_SIZE as i64)); - let params = - new_disk_create_params(ByteCount::try_from(max_disk_size).unwrap()); - let blocks_per_extent: u64 = - params.extent_size() as u64 / BLOCK_SIZE as u64; - - // We should still be rounding up to the nearest extent size. - assert_eq!( - params.extent_count() as u128 * EXTENT_SIZE as u128, - i64::MAX as u128 + 1, - ); - - // Assert that the regions allocated will fit this disk - assert!( - params.size.to_bytes() as u64 - <= (params.extent_count() as u64) - * blocks_per_extent - * BLOCK_SIZE as u64 - ); - } -} From 110623ebb13516c022c395b4385d0133fe5e790d Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Wed, 28 Sep 2022 16:06:17 -0400 Subject: [PATCH 2/7] fmt --- nexus/src/db/datastore/region.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nexus/src/db/datastore/region.rs b/nexus/src/db/datastore/region.rs index 658c8df2333..5e1f85911a4 100644 --- a/nexus/src/db/datastore/region.rs +++ b/nexus/src/db/datastore/region.rs @@ -145,7 +145,8 @@ impl DataStore { // - What sort of loads we'd like to create (even split across all disks // may not be preferable, especially if maintenance is expected) - let block_size = self.get_block_size_from_disk_source(opctx, &disk_source).await?; + let block_size = + self.get_block_size_from_disk_source(opctx, &disk_source).await?; let (blocks_per_extent, extent_count) = Self::get_crucible_allocation(&block_size, size); From d2d3aefba9a46ef07d4d0174369af8324d15818e Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Mon, 3 Oct 2022 11:21:58 -0400 Subject: [PATCH 3/7] use ByteCount::from_gibibytes_u32 instead of ByteCount::try_from --- nexus/tests/integration_tests/disks.rs | 26 +++++++------------ nexus/tests/integration_tests/snapshots.rs | 8 +++--- .../integration_tests/volume_management.rs | 16 ++++++------ 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 26f0043e3ab..954cd028a3c 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -837,7 +837,7 @@ async fn test_disk_backed_by_multiple_region_sets( create_org_and_project(client).await; // Ask for a 20 gibibyte disk. 
- let disk_size = ByteCount::try_from(20u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(20); let disks_url = get_disks_url(); let new_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -873,7 +873,7 @@ async fn test_disk_too_big(cptestctx: &ControlPlaneTestContext) { assert_eq!(10, DiskTest::DEFAULT_ZPOOL_SIZE_GIB); // Ask for a 300 gibibyte disk (but only 10 is available) - let disk_size = ByteCount::try_from(300u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(300); let disks_url = get_disks_url(); let new_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -926,7 +926,7 @@ async fn test_disk_size_accounting(cptestctx: &ControlPlaneTestContext) { } // Ask for a 7 gibibyte disk, this should succeed - let disk_size = ByteCount::try_from(7u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(7); let disks_url = get_disks_url(); let disk_one = params::DiskCreate { @@ -959,16 +959,14 @@ async fn test_disk_size_accounting(cptestctx: &ControlPlaneTestContext) { .regions_total_occupied_size(dataset.id) .await .unwrap(), - ByteCount::try_from(7u64 * 1024 * 1024 * 1024) - .unwrap() - .to_bytes(), + ByteCount::from_gibibytes_u32(7).to_bytes(), ); } } // Ask for a 4 gibibyte disk, this should fail because there isn't space // available. - let disk_size = ByteCount::try_from(4u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(4); let disk_two = params::DiskCreate { identity: IdentityMetadataCreateParams { name: "disk-two".parse().unwrap(), @@ -998,9 +996,7 @@ async fn test_disk_size_accounting(cptestctx: &ControlPlaneTestContext) { .regions_total_occupied_size(dataset.id) .await .unwrap(), - ByteCount::try_from(7u64 * 1024 * 1024 * 1024) - .unwrap() - .to_bytes(), + ByteCount::from_gibibytes_u32(7).to_bytes(), ); } } @@ -1030,7 +1026,7 @@ async fn test_disk_size_accounting(cptestctx: &ControlPlaneTestContext) { } // Ask for a 10 gibibyte disk. 
- let disk_size = ByteCount::try_from(10u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(10); let disk_three = params::DiskCreate { identity: IdentityMetadataCreateParams { name: "disk-three".parse().unwrap(), @@ -1060,9 +1056,7 @@ async fn test_disk_size_accounting(cptestctx: &ControlPlaneTestContext) { .regions_total_occupied_size(dataset.id) .await .unwrap(), - ByteCount::try_from(10u64 * 1024 * 1024 * 1024) - .unwrap() - .to_bytes(), + ByteCount::from_gibibytes_u32(10).to_bytes(), ); } } @@ -1088,7 +1082,7 @@ async fn test_multiple_disks_multiple_zpools( create_org_and_project(client).await; // Ask for a 10 gibibyte disk, this should succeed - let disk_size = ByteCount::try_from(10u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(10); let disks_url = get_disks_url(); let disk_one = params::DiskCreate { @@ -1113,7 +1107,7 @@ async fn test_multiple_disks_multiple_zpools( .unwrap(); // Ask for another 10 gibibyte disk - let disk_size = ByteCount::try_from(10u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(10); let disk_two = params::DiskCreate { identity: IdentityMetadataCreateParams { name: "disk-two".parse().unwrap(), diff --git a/nexus/tests/integration_tests/snapshots.rs b/nexus/tests/integration_tests/snapshots.rs index 023f2fc3a0a..7231d1a1c92 100644 --- a/nexus/tests/integration_tests/snapshots.rs +++ b/nexus/tests/integration_tests/snapshots.rs @@ -104,7 +104,7 @@ async fn test_snapshot(cptestctx: &ControlPlaneTestContext) { .unwrap(); // Create a disk from this image - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -235,7 +235,7 @@ async fn test_snapshot_without_instance(cptestctx: &ControlPlaneTestContext) { .unwrap(); // Create a disk from this image - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -292,7 +292,7 @@ async fn test_delete_snapshot(cptestctx: &ControlPlaneTestContext) { let disks_url = get_disks_url(); // Create a blank disk - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -340,7 +340,7 @@ async fn test_delete_snapshot(cptestctx: &ControlPlaneTestContext) { assert_eq!(snapshot.size, base_disk.size); // Create a disk from this snapshot - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let snap_disk_name: Name = "snap-disk".parse().unwrap(); let snap_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { diff --git a/nexus/tests/integration_tests/volume_management.rs b/nexus/tests/integration_tests/volume_management.rs index ca97fed15c5..1943ec3e6c8 100644 --- a/nexus/tests/integration_tests/volume_management.rs +++ b/nexus/tests/integration_tests/volume_management.rs @@ -107,7 +107,7 @@ async fn test_snapshot_then_delete_disk(cptestctx: &ControlPlaneTestContext) { .unwrap(); // Create a disk from this image - let disk_size = 
ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -239,7 +239,7 @@ async fn test_delete_snapshot_then_disk(cptestctx: &ControlPlaneTestContext) { .unwrap(); // Create a disk from this image - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -371,7 +371,7 @@ async fn test_multiple_snapshots(cptestctx: &ControlPlaneTestContext) { .unwrap(); // Create a disk from this image - let disk_size = ByteCount::try_from(1u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(1); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -504,7 +504,7 @@ async fn test_snapshot_prevents_other_disk( .unwrap(); // Create a disk from this image - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let base_disk_name: Name = "base-disk".parse().unwrap(); let base_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -567,7 +567,7 @@ async fn test_snapshot_prevents_other_disk( // Attempt disk allocation, which will fail - the presense of the snapshot // means the region wasn't deleted. - let disk_size = ByteCount::try_from(10u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(10); let next_disk_name: Name = "next-disk".parse().unwrap(); let next_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -638,7 +638,7 @@ async fn test_multiple_disks_multiple_snapshots_order_1( let disks_url = get_disks_url(); // Create a blank disk - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let first_disk_name: Name = "first-disk".parse().unwrap(); let first_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -785,7 +785,7 @@ async fn test_multiple_disks_multiple_snapshots_order_2( let disks_url = get_disks_url(); // Create a blank disk - let disk_size = ByteCount::try_from(2u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(2); let first_disk_name: Name = "first-disk".parse().unwrap(); let first_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { @@ -926,7 +926,7 @@ async fn prepare_for_test_multiple_layers_of_snapshots( let disks_url = get_disks_url(); // Create a blank disk - let disk_size = ByteCount::try_from(1u64 * 1024 * 1024 * 1024).unwrap(); + let disk_size = ByteCount::from_gibibytes_u32(1); let layer_1_disk_name: Name = "layer-1-disk".parse().unwrap(); let layer_1_disk = params::DiskCreate { identity: IdentityMetadataCreateParams { From a554ef53270694ec666e19e24a6c7224f6f78922 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Mon, 3 Oct 2022 11:47:23 -0400 Subject: [PATCH 4/7] move functions that interact with the Crucible agent to common_storage.rs --- nexus/src/app/sagas/common_storage.rs | 262 +++++++++++++++++++++++++ nexus/src/app/sagas/disk_create.rs | 4 +- nexus/src/app/sagas/mod.rs | 129 +----------- nexus/src/app/sagas/snapshot_create.rs | 4 +- nexus/src/app/sagas/volume_delete.rs | 136 +------------ 5 files 
changed, 270 insertions(+), 265 deletions(-) create mode 100644 nexus/src/app/sagas/common_storage.rs diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs new file mode 100644 index 00000000000..2a692f545e8 --- /dev/null +++ b/nexus/src/app/sagas/common_storage.rs @@ -0,0 +1,262 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Functions common to interacting with the Crucible agent in saga actions + +use super::*; + +use crate::db; +use crate::db::identity::Asset; +use anyhow::anyhow; +use crucible_agent_client::{ + types::{CreateRegion, RegionId, State as RegionState}, + Client as CrucibleAgentClient, +}; +use futures::StreamExt; +use omicron_common::api::external::Error; +use omicron_common::backoff::{self, BackoffError}; +use slog::Logger; + +// Arbitrary limit on concurrency, for operations issued on multiple regions +// within a disk at the same time. +const MAX_CONCURRENT_REGION_REQUESTS: usize = 3; + +/// Call out to Crucible agent and perform region creation. +pub async fn ensure_region_in_dataset( + log: &Logger, + dataset: &db::model::Dataset, + region: &db::model::Region, +) -> Result { + let url = format!("http://{}", dataset.address()); + let client = CrucibleAgentClient::new(&url); + + let region_request = CreateRegion { + block_size: region.block_size().to_bytes(), + extent_count: region.extent_count().try_into().unwrap(), + extent_size: region.blocks_per_extent().try_into().unwrap(), + // TODO: Can we avoid casting from UUID to string? + // NOTE: This'll require updating the crucible agent client. + id: RegionId(region.id().to_string()), + encrypted: region.encrypted(), + cert_pem: None, + key_pem: None, + root_pem: None, + }; + + let create_region = || async { + let region = client + .region_create(®ion_request) + .await + .map_err(|e| BackoffError::Permanent(e.into()))?; + match region.state { + RegionState::Requested => Err(BackoffError::transient(anyhow!( + "Region creation in progress" + ))), + RegionState::Created => Ok(region), + _ => Err(BackoffError::Permanent(anyhow!( + "Failed to create region, unexpected state: {:?}", + region.state + ))), + } + }; + + let log_create_failure = |_, delay| { + warn!( + log, + "Region requested, not yet created. Retrying in {:?}", delay + ); + }; + + let region = backoff::retry_notify( + backoff::internal_service_policy(), + create_region, + log_create_failure, + ) + .await + .map_err(|e| Error::internal_error(&e.to_string()))?; + + Ok(region.into_inner()) +} + +pub async fn ensure_all_datasets_and_regions( + log: &Logger, + datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, +) -> Result< + Vec<(db::model::Dataset, crucible_agent_client::types::Region)>, + ActionError, +> { + let request_count = datasets_and_regions.len(); + + // Allocate regions, and additionally return the dataset that the region was + // allocated in. + let datasets_and_regions: Vec<( + db::model::Dataset, + crucible_agent_client::types::Region, + )> = futures::stream::iter(datasets_and_regions) + .map(|(dataset, region)| async move { + match ensure_region_in_dataset(log, &dataset, ®ion).await { + Ok(result) => Ok((dataset, result)), + Err(e) => Err(e), + } + }) + // Execute the allocation requests concurrently. 
+ .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::, + >>() + .await + .into_iter() + .collect::, + Error, + >>() + .map_err(ActionError::action_failed)?; + + // Assert each region has the same block size, otherwise Volume creation + // will fail. + let all_region_have_same_block_size = datasets_and_regions + .windows(2) + .all(|w| w[0].1.block_size == w[1].1.block_size); + + if !all_region_have_same_block_size { + return Err(ActionError::action_failed(Error::internal_error( + "volume creation will fail due to block size mismatch", + ))); + } + + Ok(datasets_and_regions) +} + +// Given a list of datasets and regions, send DELETE calls to the datasets +// corresponding Crucible Agent for each region. +pub(super) async fn delete_crucible_regions( + datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, +) -> Result<(), Error> { + let request_count = datasets_and_regions.len(); + if request_count == 0 { + return Ok(()); + } + + futures::stream::iter(datasets_and_regions) + .map(|(dataset, region)| async move { + let url = format!("http://{}", dataset.address()); + let client = CrucibleAgentClient::new(&url); + let id = RegionId(region.id().to_string()); + client.region_delete(&id).await.map_err(|e| match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + http::StatusCode::SERVICE_UNAVAILABLE => { + Error::unavail(&rv.message) + } + status if status.is_client_error() => { + Error::invalid_request(&rv.message) + } + _ => Error::internal_error(&rv.message), + } + } + _ => Error::internal_error( + "unexpected failure during `delete_crucible_regions`", + ), + }) + }) + // Execute the allocation requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + + Ok(()) +} + +// Given a list of datasets and region snapshots, send DELETE calls to the +// datasets corresponding Crucible Agent for each running read-only downstairs +// and snapshot. 
+pub(super) async fn delete_crucible_snapshots( + datasets_and_snapshots: Vec<( + db::model::Dataset, + db::model::RegionSnapshot, + )>, +) -> Result<(), Error> { + let request_count = datasets_and_snapshots.len(); + if request_count == 0 { + return Ok(()); + } + + futures::stream::iter(datasets_and_snapshots) + .map(|(dataset, region_snapshot)| async move { + let url = format!("http://{}", dataset.address()); + let client = CrucibleAgentClient::new(&url); + + // delete running snapshot + client + .region_delete_running_snapshot( + &RegionId(region_snapshot.region_id.to_string()), + ®ion_snapshot.snapshot_id.to_string(), + ) + .await + .map_err(|e| match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + http::StatusCode::SERVICE_UNAVAILABLE => { + Error::unavail(&rv.message) + } + status if status.is_client_error() => { + Error::invalid_request(&rv.message) + } + _ => Error::internal_error(&rv.message), + } + } + _ => Error::internal_error( + "unexpected failure during `region_delete_running_snapshot`", + ), + })?; + + // delete snapshot + client + .region_delete_snapshot( + &RegionId(region_snapshot.region_id.to_string()), + ®ion_snapshot.snapshot_id.to_string(), + ) + .await + .map_err(|e| match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + http::StatusCode::SERVICE_UNAVAILABLE => { + Error::unavail(&rv.message) + } + status if status.is_client_error() => { + Error::invalid_request(&rv.message) + } + _ => Error::internal_error(&rv.message), + } + } + _ => Error::internal_error( + "unexpected failure during `region_delete_snapshot`", + ), + })?; + + Ok(()) + }) + // Execute the allocation requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + + Ok(()) +} diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs index a03f81a241c..9bbbb658277 100644 --- a/nexus/src/app/sagas/disk_create.rs +++ b/nexus/src/app/sagas/disk_create.rs @@ -3,8 +3,8 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use super::{ - ensure_all_datasets_and_regions, ActionRegistry, NexusActionContext, - NexusSaga, SagaInitError, ACTION_GENERATE_ID, + common_storage::ensure_all_datasets_and_regions, ActionRegistry, + NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID, }; use crate::app::sagas::NexusAction; use crate::context::OpContext; diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 80dd1b056ad..b73cf62c677 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -9,19 +9,8 @@ // correctness, idempotence, etc. The more constrained this interface is, the // easier it will be to test, version, and update in deployed systems. 
-use crate::db; -use crate::db::identity::Asset; use crate::saga_interface::SagaContext; -use anyhow::anyhow; -use crucible_agent_client::{ - types::{CreateRegion, RegionId, State as RegionState}, - Client as CrucibleAgentClient, -}; -use futures::StreamExt; use lazy_static::lazy_static; -use omicron_common::api::external::Error; -use omicron_common::backoff::{self, BackoffError}; -use slog::Logger; use std::sync::Arc; use steno::new_action_noop_undo; use steno::ActionContext; @@ -37,6 +26,8 @@ pub mod instance_migrate; pub mod snapshot_create; pub mod volume_delete; +pub mod common_storage; + #[derive(Debug)] pub struct NexusSagaType; impl steno::SagaType for NexusSagaType { @@ -121,119 +112,3 @@ pub(super) async fn saga_generate_uuid( ) -> Result { Ok(Uuid::new_v4()) } - -// Arbitrary limit on concurrency, for operations issued on multiple regions -// within a disk at the same time. -const MAX_CONCURRENT_REGION_REQUESTS: usize = 3; - -/// Call out to Crucible agent and perform region creation. -pub async fn ensure_region_in_dataset( - log: &Logger, - dataset: &db::model::Dataset, - region: &db::model::Region, -) -> Result { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - let region_request = CreateRegion { - block_size: region.block_size().to_bytes(), - extent_count: region.extent_count().try_into().unwrap(), - extent_size: region.blocks_per_extent().try_into().unwrap(), - // TODO: Can we avoid casting from UUID to string? - // NOTE: This'll require updating the crucible agent client. - id: RegionId(region.id().to_string()), - encrypted: region.encrypted(), - cert_pem: None, - key_pem: None, - root_pem: None, - }; - - let create_region = || async { - let region = client - .region_create(®ion_request) - .await - .map_err(|e| BackoffError::Permanent(e.into()))?; - match region.state { - RegionState::Requested => Err(BackoffError::transient(anyhow!( - "Region creation in progress" - ))), - RegionState::Created => Ok(region), - _ => Err(BackoffError::Permanent(anyhow!( - "Failed to create region, unexpected state: {:?}", - region.state - ))), - } - }; - - let log_create_failure = |_, delay| { - warn!( - log, - "Region requested, not yet created. Retrying in {:?}", delay - ); - }; - - let region = backoff::retry_notify( - backoff::internal_service_policy(), - create_region, - log_create_failure, - ) - .await - .map_err(|e| Error::internal_error(&e.to_string()))?; - - Ok(region.into_inner()) -} - -pub async fn ensure_all_datasets_and_regions( - log: &Logger, - datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, -) -> Result< - Vec<(db::model::Dataset, crucible_agent_client::types::Region)>, - ActionError, -> { - let request_count = datasets_and_regions.len(); - - // Allocate regions, and additionally return the dataset that the region was - // allocated in. - let datasets_and_regions: Vec<( - db::model::Dataset, - crucible_agent_client::types::Region, - )> = futures::stream::iter(datasets_and_regions) - .map(|(dataset, region)| async move { - match ensure_region_in_dataset(log, &dataset, ®ion).await { - Ok(result) => Ok((dataset, result)), - Err(e) => Err(e), - } - }) - // Execute the allocation requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::, - >>() - .await - .into_iter() - .collect::, - Error, - >>() - .map_err(ActionError::action_failed)?; - - // Assert each region has the same block size, otherwise Volume creation - // will fail. 
- let all_region_have_same_block_size = datasets_and_regions - .windows(2) - .all(|w| w[0].1.block_size == w[1].1.block_size); - - if !all_region_have_same_block_size { - return Err(ActionError::action_failed(Error::internal_error( - "volume creation will fail due to block size mismatch", - ))); - } - - Ok(datasets_and_regions) -} diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index b79c971037e..64af12674bd 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -82,8 +82,8 @@ //! use super::{ - ensure_all_datasets_and_regions, ActionRegistry, NexusActionContext, - NexusSaga, SagaInitError, ACTION_GENERATE_ID, + common_storage::ensure_all_datasets_and_regions, ActionRegistry, + NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID, }; use crate::app::sagas::NexusAction; use crate::context::OpContext; diff --git a/nexus/src/app/sagas/volume_delete.rs b/nexus/src/app/sagas/volume_delete.rs index 74efcc5eb3b..cb29972885f 100644 --- a/nexus/src/app/sagas/volume_delete.rs +++ b/nexus/src/app/sagas/volume_delete.rs @@ -23,18 +23,15 @@ //! resources, and when they are inserted or deleted the accounting needs to //! change. Saga nodes must be idempotent in order to work correctly. +use super::common_storage::delete_crucible_regions; +use super::common_storage::delete_crucible_snapshots; use super::ActionRegistry; use super::NexusActionContext; use super::NexusSaga; -use super::MAX_CONCURRENT_REGION_REQUESTS; use crate::app::sagas::NexusAction; -use crate::db; use crate::db::datastore::CrucibleResources; -use crucible_agent_client::{types::RegionId, Client as CrucibleAgentClient}; -use futures::StreamExt; use lazy_static::lazy_static; use nexus_types::identity::Asset; -use omicron_common::api::external::Error; use serde::Deserialize; use serde::Serialize; use std::sync::Arc; @@ -330,132 +327,3 @@ async fn svd_hard_delete_volume_record( Ok(()) } - -// helper functions - -// Given a list of datasets and regions, send DELETE calls to the datasets -// corresponding Crucible Agent for each region. -pub(super) async fn delete_crucible_regions( - datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, -) -> Result<(), Error> { - let request_count = datasets_and_regions.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_regions) - .map(|(dataset, region)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - let id = RegionId(region.id().to_string()); - client.region_delete(&id).await.map_err(|e| match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::SERVICE_UNAVAILABLE => { - Error::unavail(&rv.message) - } - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `delete_crucible_regions`", - ), - }) - }) - // Execute the allocation requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} - -// Given a list of datasets and region snapshots, send DELETE calls to the -// datasets corresponding Crucible Agent for each running read-only downstairs -// and snapshot. 
-pub(super) async fn delete_crucible_snapshots( - datasets_and_snapshots: Vec<( - db::model::Dataset, - db::model::RegionSnapshot, - )>, -) -> Result<(), Error> { - let request_count = datasets_and_snapshots.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_snapshots) - .map(|(dataset, region_snapshot)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - // delete running snapshot - client - .region_delete_running_snapshot( - &RegionId(region_snapshot.region_id.to_string()), - ®ion_snapshot.snapshot_id.to_string(), - ) - .await - .map_err(|e| match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::SERVICE_UNAVAILABLE => { - Error::unavail(&rv.message) - } - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_running_snapshot`", - ), - })?; - - // delete snapshot - client - .region_delete_snapshot( - &RegionId(region_snapshot.region_id.to_string()), - ®ion_snapshot.snapshot_id.to_string(), - ) - .await - .map_err(|e| match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::SERVICE_UNAVAILABLE => { - Error::unavail(&rv.message) - } - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_snapshot`", - ), - })?; - - Ok(()) - }) - // Execute the allocation requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} From 89d30cb504490c797faba6a511b909b1746f82dd Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Mon, 3 Oct 2022 12:09:33 -0400 Subject: [PATCH 5/7] operate on disk id, not disk name, in snapshot create saga --- nexus/src/app/disk.rs | 18 ++++++++++++------ nexus/src/app/sagas/snapshot_create.rs | 20 +++++++------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/nexus/src/app/disk.rs b/nexus/src/app/disk.rs index 5d7e8b3c0f8..f83341bf280 100644 --- a/nexus/src/app/disk.rs +++ b/nexus/src/app/disk.rs @@ -400,11 +400,10 @@ impl super::Nexus { project_name: &Name, params: ¶ms::SnapshotCreate, ) -> CreateResult { - let (authz_silo, authz_org) = - LookupPath::new(opctx, &self.db_datastore) - .organization_name(organization_name) - .lookup_for(authz::Action::ListChildren) - .await?; + let (authz_silo, _) = LookupPath::new(opctx, &self.db_datastore) + .organization_name(organization_name) + .lookup_for(authz::Action::ListChildren) + .await?; let (.., authz_project) = LookupPath::new(opctx, &self.db_datastore) .organization_name(organization_name) @@ -412,11 +411,18 @@ impl super::Nexus { .lookup_for(authz::Action::ListChildren) .await?; + let (.., authz_disk) = LookupPath::new(opctx, &self.db_datastore) + .organization_name(organization_name) + .project_name(project_name) + .disk_name(&db::model::Name(params.disk.clone())) + .lookup_for(authz::Action::Read) + .await?; + let saga_params = sagas::snapshot_create::Params { serialized_authn: authn::saga::Serialized::for_opctx(opctx), silo_id: authz_silo.id(), - organization_id: authz_org.id(), project_id: authz_project.id(), + disk_id: authz_disk.id(), create_params: params.clone(), }; 
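The pattern adopted throughout the snapshot create saga below is to resolve the disk once, up front, and carry only its immutable `disk_id` in the saga `Params`; each action then re-fetches the disk by ID rather than by the user-supplied name. A minimal sketch of one such action, assuming the saga context, lookup, and parameter types used elsewhere in this series (the action name here is hypothetical):

async fn ssc_fetch_disk_by_id_example(
    sagactx: NexusActionContext,
) -> Result<db::model::Disk, ActionError> {
    let osagactx = sagactx.user_data();
    let params = sagactx.saga_params::<Params>()?;
    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);

    // The saga carries the disk's UUID, not its name, so renaming the disk
    // while the saga is running cannot change which resource is operated on.
    let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore())
        .disk_id(params.disk_id)
        .fetch()
        .await
        .map_err(ActionError::action_failed)?;

    Ok(disk)
}
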
diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index 64af12674bd..e8293e42cd1 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -119,8 +119,8 @@ use uuid::Uuid; pub struct Params { pub serialized_authn: authn::saga::Serialized, pub silo_id: Uuid, - pub organization_id: Uuid, pub project_id: Uuid, + pub disk_id: Uuid, pub create_params: params::SnapshotCreate, } @@ -289,8 +289,7 @@ async fn ssc_save_disk_gen( let opctx = OpContext::for_saga_action(&sagactx, ¶ms.serialized_authn); let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) - .project_id(params.project_id) - .disk_name(¶ms.create_params.disk.clone().into()) + .disk_id(params.disk_id) .fetch() .await .map_err(ActionError::action_failed)?; @@ -320,8 +319,7 @@ async fn ssc_alloc_regions( let opctx = OpContext::for_saga_action(&sagactx, ¶ms.serialized_authn); let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) - .project_id(params.project_id) - .disk_name(¶ms.create_params.disk.clone().into()) + .disk_id(params.disk_id) .fetch() .await .map_err(ActionError::action_failed)?; @@ -481,8 +479,7 @@ async fn ssc_create_snapshot_record( info!(log, "grabbing disk by name {}", params.create_params.disk); let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) - .project_id(params.project_id) - .disk_name(¶ms.create_params.disk.clone().into()) + .disk_id(params.disk_id) .fetch() .await .map_err(ActionError::action_failed)?; @@ -567,8 +564,7 @@ async fn ssc_send_snapshot_request( // Find if this disk is attached to an instance let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) - .project_id(params.project_id) - .disk_name(¶ms.create_params.disk.clone().into()) + .disk_id(params.disk_id) .fetch() .await .map_err(ActionError::action_failed)?; @@ -674,8 +670,7 @@ async fn ssc_start_running_snapshot( let snapshot_id = sagactx.lookup::("snapshot_id")?; let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) - .project_id(params.project_id) - .disk_name(¶ms.create_params.disk.clone().into()) + .disk_id(params.disk_id) .fetch() .await .map_err(ActionError::action_failed)?; @@ -780,8 +775,7 @@ async fn ssc_create_volume_record( let opctx = OpContext::for_saga_action(&sagactx, ¶ms.serialized_authn); let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) - .project_id(params.project_id) - .disk_name(¶ms.create_params.disk.clone().into()) + .disk_id(params.disk_id) .fetch() .await .map_err(ActionError::action_failed)?; From 4ebe2979f36ed8706b474fe7c631eb721a2188c4 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Mon, 3 Oct 2022 12:32:23 -0400 Subject: [PATCH 6/7] if a disk is modifed during the saga, the saga will fail anyway - do not need to check --- nexus/src/app/sagas/snapshot_create.rs | 61 -------------------------- 1 file changed, 61 deletions(-) diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index e8293e42cd1..289aec63a86 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -89,7 +89,6 @@ use crate::app::sagas::NexusAction; use crate::context::OpContext; use crate::db::identity::{Asset, Resource}; use crate::db::lookup::LookupPath; -use crate::db::model::Generation; use crate::external_api::params; use crate::{authn, authz, db}; use anyhow::anyhow; @@ -127,10 +126,6 @@ pub struct Params { // snapshot create saga: actions lazy_static! 
{ - static ref SAVE_DISK_GEN_TO_CONTEXT: NexusAction = new_action_noop_undo( - "snapshot-create.save-disk-gen-to-ctx", - ssc_save_disk_gen, - ); static ref REGIONS_ALLOC: NexusAction = new_action_noop_undo( "snapshot-create.regions-alloc", ssc_alloc_regions, @@ -178,7 +173,6 @@ impl NexusSaga for SagaSnapshotCreate { type Params = Params; fn register_actions(registry: &mut ActionRegistry) { - registry.register(Arc::clone(&*SAVE_DISK_GEN_TO_CONTEXT)); registry.register(Arc::clone(&*REGIONS_ALLOC)); registry.register(Arc::clone(&*REGIONS_ENSURE)); registry.register(Arc::clone(&*CREATE_DESTINATION_VOLUME_RECORD)); @@ -212,14 +206,6 @@ impl NexusSaga for SagaSnapshotCreate { ACTION_GENERATE_ID.as_ref(), )); - // At the beginning of this saga, save disk generation number. Compare - // later to see if the disk changed mid-saga. - builder.append(Node::action( - "disk_gen", - "SaveDiskGenToCtx", - SAVE_DISK_GEN_TO_CONTEXT.as_ref(), - )); - // Allocate region space for snapshot to store blocks post-scrub builder.append(Node::action( "datasets_and_regions", @@ -280,23 +266,6 @@ impl NexusSaga for SagaSnapshotCreate { // snapshot create saga: action implementations -async fn ssc_save_disk_gen( - sagactx: NexusActionContext, -) -> Result { - let osagactx = sagactx.user_data(); - let params = sagactx.saga_params::()?; - - let opctx = OpContext::for_saga_action(&sagactx, ¶ms.serialized_authn); - - let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) - .disk_id(params.disk_id) - .fetch() - .await - .map_err(ActionError::action_failed)?; - - Ok(disk.runtime_state.gen) -} - async fn ssc_alloc_regions( sagactx: NexusActionContext, ) -> Result, ActionError> { @@ -324,12 +293,6 @@ async fn ssc_alloc_regions( .await .map_err(ActionError::action_failed)?; - if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? { - return Err(ActionError::action_failed( - "disk changed during snapshot create".to_string(), - )); - } - let datasets_and_regions = osagactx .datastore() .region_allocate( @@ -484,12 +447,6 @@ async fn ssc_create_snapshot_record( .await .map_err(ActionError::action_failed)?; - if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? { - return Err(ActionError::action_failed( - "disk changed during snapshot create".to_string(), - )); - } - info!(log, "creating snapshot {} from disk {}", snapshot_id, disk.id()); let snapshot = db::model::Snapshot { @@ -569,12 +526,6 @@ async fn ssc_send_snapshot_request( .await .map_err(ActionError::action_failed)?; - if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? { - return Err(ActionError::action_failed( - "disk changed during snapshot create".to_string(), - )); - } - match disk.runtime().attach_instance_id { Some(instance_id) => { info!(log, "disk {} instance is {}", disk.id(), instance_id); @@ -675,12 +626,6 @@ async fn ssc_start_running_snapshot( .await .map_err(ActionError::action_failed)?; - if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? { - return Err(ActionError::action_failed( - "disk changed during snapshot create".to_string(), - )); - } - // For each dataset and region that makes up the disk, create a map from the // region information to the new running snapshot information. let datasets_and_regions = osagactx @@ -780,12 +725,6 @@ async fn ssc_create_volume_record( .await .map_err(ActionError::action_failed)?; - if disk.runtime_state.gen != sagactx.lookup::("disk_gen")? 
{ - return Err(ActionError::action_failed( - "disk changed during snapshot create".to_string(), - )); - } - let disk_volume = osagactx .datastore() .volume_get(disk.volume_id) From 6ed3354779d96d580b20a68c57678f9c510da885 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Mon, 3 Oct 2022 23:07:08 -0400 Subject: [PATCH 7/7] one LookupPath to avoid grabbing stale entries --- nexus/src/app/disk.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/nexus/src/app/disk.rs b/nexus/src/app/disk.rs index f83341bf280..d8c0f60556b 100644 --- a/nexus/src/app/disk.rs +++ b/nexus/src/app/disk.rs @@ -400,23 +400,18 @@ impl super::Nexus { project_name: &Name, params: ¶ms::SnapshotCreate, ) -> CreateResult { - let (authz_silo, _) = LookupPath::new(opctx, &self.db_datastore) - .organization_name(organization_name) - .lookup_for(authz::Action::ListChildren) - .await?; - - let (.., authz_project) = LookupPath::new(opctx, &self.db_datastore) - .organization_name(organization_name) - .project_name(project_name) - .lookup_for(authz::Action::ListChildren) - .await?; + let authz_silo: authz::Silo; + let _authz_org: authz::Organization; + let authz_project: authz::Project; + let authz_disk: authz::Disk; - let (.., authz_disk) = LookupPath::new(opctx, &self.db_datastore) - .organization_name(organization_name) - .project_name(project_name) - .disk_name(&db::model::Name(params.disk.clone())) - .lookup_for(authz::Action::Read) - .await?; + (authz_silo, _authz_org, authz_project, authz_disk) = + LookupPath::new(opctx, &self.db_datastore) + .organization_name(organization_name) + .project_name(project_name) + .disk_name(&db::model::Name(params.disk.clone())) + .lookup_for(authz::Action::Read) + .await?; let saga_params = sagas::snapshot_create::Params { serialized_authn: authn::saga::Serialized::for_opctx(opctx),