diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs
index e5afca7f6a9..aaec6c7384e 100644
--- a/nexus/src/app/sagas/disk_create.rs
+++ b/nexus/src/app/sagas/disk_create.rs
@@ -581,7 +581,7 @@ fn randomize_volume_construction_request_ids(
 }
 
 #[cfg(test)]
-mod test {
+pub(crate) mod test {
     use crate::{
         app::saga::create_saga_dag, app::sagas::disk_create::Params,
         app::sagas::disk_create::SagaDiskCreate, authn::saga::Serialized,
@@ -677,6 +677,20 @@ mod test {
             .is_none()
     }
 
+    async fn no_volume_records_exist(datastore: &DataStore) -> bool {
+        use crate::db::model::Volume;
+        use crate::db::schema::volume::dsl;
+
+        dsl::volume
+            .filter(dsl::time_deleted.is_null())
+            .select(Volume::as_select())
+            .first_async::<Volume>(datastore.pool_for_tests().await.unwrap())
+            .await
+            .optional()
+            .unwrap()
+            .is_none()
+    }
+
     async fn no_region_allocations_exist(
         datastore: &DataStore,
         test: &DiskTest,
@@ -712,6 +726,19 @@ mod test {
         true
     }
 
+    pub(crate) async fn verify_clean_slate(
+        cptestctx: &ControlPlaneTestContext,
+        test: &DiskTest,
+    ) {
+        let sled_agent = &cptestctx.sled_agent.sled_agent;
+        let datastore = cptestctx.server.apictx.nexus.datastore();
+
+        assert!(no_disk_records_exist(datastore).await);
+        assert!(no_volume_records_exist(datastore).await);
+        assert!(no_region_allocations_exist(datastore, &test).await);
+        assert!(no_regions_ensured(&sled_agent, &test).await);
+    }
+
     #[nexus_test(server = crate::Server)]
     async fn test_action_failure_can_unwind(
         cptestctx: &ControlPlaneTestContext,
@@ -753,15 +780,8 @@ mod test {
                 .await
                 .expect_err("Saga should have failed");
 
-            let datastore = nexus.datastore();
-
             // Check that no partial artifacts of disk creation exist:
-            assert!(no_disk_records_exist(datastore).await);
-            assert!(no_region_allocations_exist(datastore, &test).await);
-            assert!(
-                no_regions_ensured(&cptestctx.sled_agent.sled_agent, &test)
-                    .await
-            );
+            verify_clean_slate(&cptestctx, &test).await;
         }
     }
diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs
index 5dc1dface18..68783ccf33d 100644
--- a/nexus/src/app/sagas/snapshot_create.rs
+++ b/nexus/src/app/sagas/snapshot_create.rs
@@ -82,8 +82,11 @@
 //!
 
 use super::{
-    common_storage::ensure_all_datasets_and_regions, ActionRegistry,
-    NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID,
+    common_storage::{
+        delete_crucible_regions, ensure_all_datasets_and_regions,
+    },
+    ActionRegistry, NexusActionContext, NexusSaga, SagaInitError,
+    ACTION_GENERATE_ID,
 };
 use crate::app::sagas::declare_saga_actions;
 use crate::context::OpContext;
@@ -125,9 +128,11 @@ declare_saga_actions! {
     snapshot_create;
     REGIONS_ALLOC -> "datasets_and_regions" {
         + ssc_alloc_regions
+        - ssc_alloc_regions_undo
     }
     REGIONS_ENSURE -> "regions_ensure" {
         + ssc_regions_ensure
+        - ssc_regions_ensure_undo
     }
     CREATE_DESTINATION_VOLUME_RECORD -> "created_destination_volume" {
         + ssc_create_destination_volume_record
@@ -139,9 +144,11 @@ declare_saga_actions! {
     }
     SEND_SNAPSHOT_REQUEST -> "snapshot_request" {
         + ssc_send_snapshot_request
+        - ssc_send_snapshot_request_undo
     }
     START_RUNNING_SNAPSHOT -> "replace_sockets_map" {
         + ssc_start_running_snapshot
+        - ssc_start_running_snapshot_undo
     }
     CREATE_VOLUME_RECORD -> "created_volume" {
         + ssc_create_volume_record
@@ -187,17 +194,27 @@ impl NexusSaga for SagaSnapshotCreate {
             ACTION_GENERATE_ID.as_ref(),
         ));
 
-        // Allocate region space for snapshot to store blocks post-scrub
+        // (DB) Allocate region space for snapshot to store blocks post-scrub
         builder.append(regions_alloc_action());
+        // (Sleds) Reaches out to each dataset, and ensures the regions exist
+        // for the destination volume
         builder.append(regions_ensure_action());
+        // (DB) Creates a record of the destination volume in the DB
         builder.append(create_destination_volume_record_action());
+        // (DB) Creates a record of the snapshot, referencing both the
+        // original disk ID and the destination volume
         builder.append(create_snapshot_record_action());
+        // (Sleds) Sends a request for the disk to create a ZFS snapshot
         builder.append(send_snapshot_request_action());
-        // Validate with crucible agent and start snapshot downstairs
+        // (Sleds + DB) Start snapshot downstairs, add an entry in the DB for
+        // the dataset's snapshot.
+        //
+        // TODO: Should this be two separate saga steps?
         builder.append(start_running_snapshot_action());
-        // Copy and modify the disk volume construction request to point to the new
-        // running snapshot
+        // (DB) Copy and modify the disk volume construction request to point
+        // to the new running snapshot
         builder.append(create_volume_record_action());
+        // (DB) Mark snapshot as "ready"
         builder.append(finalize_snapshot_record_action());
 
         Ok(builder.build()?)
@@ -252,6 +269,23 @@ async fn ssc_alloc_regions(
     Ok(datasets_and_regions)
 }
 
+async fn ssc_alloc_regions_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+
+    let region_ids = sagactx
+        .lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
+            "datasets_and_regions",
+        )?
+        .into_iter()
+        .map(|(_, region)| region.id())
+        .collect::<Vec<Uuid>>();
+
+    osagactx.datastore().regions_hard_delete(region_ids).await?;
+    Ok(())
+}
+
 async fn ssc_regions_ensure(
     sagactx: NexusActionContext,
 ) -> Result<String, ActionError> {
@@ -332,6 +366,21 @@ async fn ssc_regions_ensure(
     Ok(volume_data)
 }
 
+async fn ssc_regions_ensure_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let log = sagactx.user_data().log();
+    warn!(log, "ssc_regions_ensure_undo: Deleting crucible regions");
+    delete_crucible_regions(
+        sagactx.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
+            "datasets_and_regions",
+        )?,
+    )
+    .await?;
+    info!(log, "ssc_regions_ensure_undo: Deleted crucible regions");
+    Ok(())
+}
+
 async fn ssc_create_destination_volume_record(
     sagactx: NexusActionContext,
 ) -> Result<db::model::Volume, ActionError> {
@@ -498,6 +547,7 @@ async fn ssc_send_snapshot_request(
                 .await
                 .map_err(|e| e.to_string())
                 .map_err(ActionError::action_failed)?;
+            Ok(())
         }
 
         None => {
@@ -526,14 +576,20 @@
             })?;
 
             // Send the snapshot request to a random sled agent
-            let sled_agent_client = osagactx
-                .random_sled_client()
+            let sled_id = osagactx
+                .nexus()
+                .random_sled_id()
                 .await
                 .map_err(ActionError::action_failed)?
                .ok_or_else(|| {
                    "no sled found when looking for random sled?!".to_string()
                })
                .map_err(ActionError::action_failed)?;
+            let sled_agent_client = osagactx
+                .nexus()
+                .sled_client(&sled_id)
+                .await
+                .map_err(ActionError::action_failed)?;
 
             sled_agent_client
                 .issue_disk_snapshot_request(
@@ -547,9 +603,43 @@
                 .await
                 .map_err(|e| e.to_string())
                 .map_err(ActionError::action_failed)?;
+
+            Ok(())
         }
     }
+}
+
+async fn ssc_send_snapshot_request_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let log = sagactx.user_data().log();
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+
+    let snapshot_id = sagactx.lookup::<Uuid>("snapshot_id")?;
+    info!(log, "Undoing snapshot request for {snapshot_id}");
+    // Lookup the regions used by the source disk...
+    let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore())
+        .disk_id(params.disk_id)
+        .fetch()
+        .await?;
+    let datasets_and_regions =
+        osagactx.datastore().get_allocated_regions(disk.volume_id).await?;
+
+    // ... and instruct each of those regions to delete the snapshot.
+    for (dataset, region) in datasets_and_regions {
+        let url = format!("http://{}", dataset.address());
+        let client = CrucibleAgentClient::new(&url);
+
+        client
+            .region_delete_snapshot(
+                &RegionId(region.id().to_string()),
+                &snapshot_id.to_string(),
+            )
+            .await?;
+    }
 
     Ok(())
 }
@@ -562,6 +652,7 @@ async fn ssc_start_running_snapshot(
     let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
 
     let snapshot_id = sagactx.lookup::<Uuid>("snapshot_id")?;
+    info!(log, "starting running snapshot for {snapshot_id}");
 
     let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore())
         .disk_id(params.disk_id)
@@ -649,6 +740,60 @@ async fn ssc_start_running_snapshot(
     Ok(map)
 }
 
+async fn ssc_start_running_snapshot_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let log = sagactx.user_data().log();
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+
+    let snapshot_id = sagactx.lookup::<Uuid>("snapshot_id")?;
+    info!(log, "Undoing snapshot start running request for {snapshot_id}");
+
+    // Lookup the regions used by the source disk...
+    let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore())
+        .disk_id(params.disk_id)
+        .fetch()
+        .await?;
+    let datasets_and_regions =
+        osagactx.datastore().get_allocated_regions(disk.volume_id).await?;
+
+    // ... and instruct each of those regions to delete the running snapshot.
+    for (dataset, region) in datasets_and_regions {
+        let url = format!("http://{}", dataset.address());
+        let client = CrucibleAgentClient::new(&url);
+
+        use crucible_agent_client::Error::ErrorResponse;
+        use http::status::StatusCode;
+
+        client
+            .region_delete_running_snapshot(
+                &RegionId(region.id().to_string()),
+                &snapshot_id.to_string(),
+            )
+            .await
+            .map(|_| ())
+            // NOTE: If we later create a volume record and delete it, the
+            // running snapshot may be deleted (see:
+            // ssc_create_volume_record_undo).
+            //
+            // To cope, we treat "running snapshot not found" as "Ok", since it
+            // may just be the result of the volume deletion steps completing.
+            .or_else(|err| match err {
+                ErrorResponse(r) if r.status() == StatusCode::NOT_FOUND => {
+                    Ok(())
+                }
+                _ => Err(err),
+            })?;
+        osagactx
+            .datastore()
+            .region_snapshot_remove(dataset.id(), region.id(), snapshot_id)
+            .await?;
+    }
+    Ok(())
+}
+
 async fn ssc_create_volume_record(
     sagactx: NexusActionContext,
 ) -> Result<db::model::Volume, ActionError> {
@@ -866,7 +1011,22 @@ fn create_snapshot_from_disk(
 mod test {
     use super::*;
 
+    use crate::app::saga::create_saga_dag;
+    use crate::db::DataStore;
+    use async_bb8_diesel::{AsyncRunQueryDsl, OptionalExtension};
+    use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
+    use dropshot::test_util::ClientTestContext;
+    use nexus_test_utils::resource_helpers::create_disk;
+    use nexus_test_utils::resource_helpers::create_ip_pool;
+    use nexus_test_utils::resource_helpers::create_organization;
+    use nexus_test_utils::resource_helpers::create_project;
+    use nexus_test_utils::resource_helpers::delete_disk;
+    use nexus_test_utils::resource_helpers::DiskTest;
+    use nexus_test_utils_macros::nexus_test;
+    use omicron_common::api::external::IdentityMetadataCreateParams;
+    use omicron_common::api::external::Name;
     use sled_agent_client::types::CrucibleOpts;
+    use std::str::FromStr;
 
     #[test]
     fn test_create_snapshot_from_disk_modify_request() {
@@ -1062,4 +1222,236 @@ mod test {
         assert!(snapshot_first_opts.control.is_none());
         assert!(disk_first_opts.control.is_some());
     }
+
+    type ControlPlaneTestContext =
+        nexus_test_utils::ControlPlaneTestContext<crate::Server>;
+
+    const ORG_NAME: &str = "test-org";
+    const PROJECT_NAME: &str = "springfield-squidport";
+    const DISK_NAME: &str = "disky-mcdiskface";
+
+    async fn create_org_project_and_disk(client: &ClientTestContext) -> Uuid {
+        create_ip_pool(&client, "p0", None).await;
+        create_organization(&client, ORG_NAME).await;
+        create_project(client, ORG_NAME, PROJECT_NAME).await;
+        create_disk(client, ORG_NAME, PROJECT_NAME, DISK_NAME).await.identity.id
+    }
+
+    // Helper for creating snapshot create parameters
+    fn new_test_params(
+        opctx: &OpContext,
+        silo_id: Uuid,
+        project_id: Uuid,
+        disk_id: Uuid,
+        disk: Name,
+    ) -> Params {
+        Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(opctx),
+            silo_id,
+            project_id,
+            disk_id,
+            create_params: params::SnapshotCreate {
+                identity: IdentityMetadataCreateParams {
+                    name: "my-snapshot".parse().expect("Invalid disk name"),
+                    description: "My snapshot".to_string(),
+                },
+                disk,
+            },
+        }
+    }
+
+    pub fn test_opctx(cptestctx: &ControlPlaneTestContext) -> OpContext {
+        OpContext::for_tests(
+            cptestctx.logctx.log.new(o!()),
+            cptestctx.server.apictx.nexus.datastore().clone(),
+        )
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_saga_basic_usage_succeeds(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+
+        let client = &cptestctx.external_client;
+        let nexus = &cptestctx.server.apictx.nexus;
+        let disk_id = create_org_project_and_disk(&client).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(cptestctx);
+
+        let (authz_silo, _authz_org, authz_project, _authz_disk) =
+            LookupPath::new(&opctx, nexus.datastore())
+                .disk_id(disk_id)
+                .lookup_for(authz::Action::Read)
+                .await
+                .expect("Failed to look up created disk");
+
+        let silo_id = authz_silo.id();
+        let project_id = authz_project.id();
+
+        let params = new_test_params(
+            &opctx,
+            silo_id,
+            project_id,
+            disk_id,
+            Name::from_str(DISK_NAME).unwrap().into(),
+        );
+        let dag = create_saga_dag::<SagaSnapshotCreate>(params).unwrap();
+        let runnable_saga =
+            nexus.create_runnable_saga(dag).await.unwrap();
+
+        // Actually run the saga
+        let output = nexus.run_saga(runnable_saga).await.unwrap();
+
+        let snapshot = output
+            .lookup_node_output::<db::model::Snapshot>(
+                "finalized_snapshot",
+            )
+            .unwrap();
+        assert_eq!(snapshot.project_id, project_id);
+    }
+
+    async fn no_snapshot_records_exist(datastore: &DataStore) -> bool {
+        use crate::db::model::Snapshot;
+        use crate::db::schema::snapshot::dsl;
+
+        dsl::snapshot
+            .filter(dsl::time_deleted.is_null())
+            .select(Snapshot::as_select())
+            .first_async::<Snapshot>(datastore.pool_for_tests().await.unwrap())
+            .await
+            .optional()
+            .unwrap()
+            .is_none()
+    }
+
+    async fn no_region_snapshot_records_exist(datastore: &DataStore) -> bool {
+        use crate::db::model::RegionSnapshot;
+        use crate::db::schema::region_snapshot::dsl;
+
+        dsl::region_snapshot
+            .select(RegionSnapshot::as_select())
+            .first_async::<RegionSnapshot>(
+                datastore.pool_for_tests().await.unwrap(),
+            )
+            .await
+            .optional()
+            .unwrap()
+            .is_none()
+    }
+
+    async fn verify_clean_slate(
+        cptestctx: &ControlPlaneTestContext,
+        test: &DiskTest,
+    ) {
+        // Verifies:
+        // - No disk records exist
+        // - No volume records exist
+        // - No region allocations exist
+        // - No regions are ensured in the sled agent
+        crate::app::sagas::disk_create::test::verify_clean_slate(
+            cptestctx, test,
+        )
+        .await;
+
+        // Verifies:
+        // - No snapshot records exist
+        // - No region snapshot records exist
+        let datastore = cptestctx.server.apictx.nexus.datastore();
+        assert!(no_snapshot_records_exist(datastore).await);
+        assert!(no_region_snapshot_records_exist(datastore).await);
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let test = DiskTest::new(cptestctx).await;
+        let log = &cptestctx.logctx.log;
+
+        let client = &cptestctx.external_client;
+        let nexus = &cptestctx.server.apictx.nexus;
+        let mut disk_id = create_org_project_and_disk(&client).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(&cptestctx);
+        let (authz_silo, _authz_org, authz_project, _authz_disk) =
+            LookupPath::new(&opctx, nexus.datastore())
+                .disk_id(disk_id)
+                .lookup_for(authz::Action::Read)
+                .await
+                .expect("Failed to look up created disk");
+
+        let silo_id = authz_silo.id();
+        let project_id = authz_project.id();
+
+        let params = new_test_params(
+            &opctx,
+            silo_id,
+            project_id,
+            disk_id,
+            Name::from_str(DISK_NAME).unwrap().into(),
+        );
+        let mut dag = create_saga_dag::<SagaSnapshotCreate>(params).unwrap();
+
+        // The saga's input parameters include a disk UUID, which makes sense,
+        // since the snapshot is created from a disk.
+        //
+        // Unfortunately, for our idempotency checks, checking for a "clean
+        // slate" gets more expensive when we need to compare region allocations
+        // between the disk and the snapshot. If we can undo the snapshot
+        // provisioning AND delete the disk together, these checks are much
+        // simpler to write.
+        //
+        // So, in summary: We do some odd indexing here...
+        // ... because we re-create the whole DAG on each iteration...
+        // ... because we also delete the disk on each iteration, making the
+        // parameters invalid...
+        // ... because doing so provides a really easy-to-verify "clean slate"
+        // for us to test against.
+        let mut n: usize = 0;
+        while let Some(node) = dag.get_nodes().nth(n) {
+            n = n + 1;
+
+            // Create a new saga for this node.
+            info!(
+                log,
+                "Creating new saga which will fail at index {:?}", node.index();
+                "node_name" => node.name().as_ref(),
+                "label" => node.label(),
+            );
+
+            let runnable_saga =
+                nexus.create_runnable_saga(dag.clone()).await.unwrap();
+
+            // Inject an error instead of running the node.
+            //
+            // This should cause the saga to unwind.
+            nexus
+                .sec()
+                .saga_inject_error(runnable_saga.id(), node.index())
+                .await
+                .unwrap();
+            nexus
+                .run_saga(runnable_saga)
+                .await
+                .expect_err("Saga should have failed");
+
+            delete_disk(client, ORG_NAME, PROJECT_NAME, DISK_NAME).await;
+            verify_clean_slate(cptestctx, &test).await;
+            disk_id = create_disk(client, ORG_NAME, PROJECT_NAME, DISK_NAME)
+                .await
+                .identity
+                .id;
+
+            let params = new_test_params(
+                &opctx,
+                silo_id,
+                project_id,
+                disk_id,
+                Name::from_str(DISK_NAME).unwrap().into(),
+            );
+            dag = create_saga_dag::<SagaSnapshotCreate>(params).unwrap();
+        }
+    }
 }
diff --git a/nexus/src/saga_interface.rs b/nexus/src/saga_interface.rs
index b6a15f9a77e..97255cc1082 100644
--- a/nexus/src/saga_interface.rs
+++ b/nexus/src/saga_interface.rs
@@ -59,13 +59,4 @@ impl SagaContext {
     ) -> Result<Arc<SledAgentClient>, Error> {
         self.nexus.sled_client(sled_id).await
     }
-
-    pub async fn random_sled_client(
-        &self,
-    ) -> Result<Option<Arc<SledAgentClient>>, Error> {
-        Ok(match self.nexus.random_sled_id().await? {
-            Some(sled_id) => Some(self.nexus.sled_client(&sled_id).await?),
-            None => None,
-        })
-    }
 }
diff --git a/nexus/test-utils/src/resource_helpers.rs b/nexus/test-utils/src/resource_helpers.rs
index ea9591c0109..5efc14c2ec7 100644
--- a/nexus/test-utils/src/resource_helpers.rs
+++ b/nexus/test-utils/src/resource_helpers.rs
@@ -68,6 +68,14 @@ where
         .unwrap()
 }
 
+pub async fn object_delete(client: &ClientTestContext, path: &str) {
+    NexusRequest::object_delete(client, path)
+        .authn_as(AuthnMode::PrivilegedUser)
+        .execute()
+        .await
+        .expect(&format!("failed to make \"delete\" request to {path}"));
+}
+
 pub async fn populate_ip_pool(
     client: &ClientTestContext,
     pool_name: &str,
@@ -217,6 +225,19 @@ pub async fn create_disk(
         .await
 }
 
+pub async fn delete_disk(
+    client: &ClientTestContext,
+    organization_name: &str,
+    project_name: &str,
+    disk_name: &str,
+) {
+    let url = format!(
+        "/organizations/{}/projects/{}/disks/{}",
+        organization_name, project_name, disk_name
+    );
+    object_delete(client, &url).await
+}
+
 /// Creates an instance with a default NIC and no disks.
 ///
 /// Wrapper around [`create_instance_with`].
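
A minimal, self-contained sketch of the unwind-test pattern exercised by the new test_action_failure_can_unwind test above. This is not Nexus or Steno code; Step, run_with_failure, and the two example steps are hypothetical stand-ins. The idea is the same, under those assumptions: every forward action has an undo, the "saga" is failed at each node index in turn, the undos run in reverse, and a clean slate is asserted after each unwind.

use std::collections::HashSet;

// A hypothetical saga step: a forward action that records a side effect in
// `state`, and an undo that removes it again.
struct Step {
    name: &'static str,
    action: fn(&mut HashSet<&'static str>),
    undo: fn(&mut HashSet<&'static str>),
}

// Run the actions for steps 0..fail_at, pretend step `fail_at` failed, and
// unwind by running the undo actions in reverse order.
fn run_with_failure(
    steps: &[Step],
    fail_at: usize,
    state: &mut HashSet<&'static str>,
) {
    let executed = fail_at.min(steps.len());
    for step in &steps[..executed] {
        println!("running action: {}", step.name);
        (step.action)(state);
    }
    for step in steps[..executed].iter().rev() {
        println!("unwinding: {}", step.name);
        (step.undo)(state);
    }
}

fn main() {
    let steps = [
        Step {
            name: "regions_alloc",
            action: |s| { s.insert("region"); },
            undo: |s| { s.remove("region"); },
        },
        Step {
            name: "snapshot_record",
            action: |s| { s.insert("snapshot"); },
            undo: |s| { s.remove("snapshot"); },
        },
    ];

    // Mirror the test loop above: inject a failure at every node index and
    // verify the "clean slate" after each unwind.
    for fail_at in 0..=steps.len() {
        let mut state = HashSet::new();
        run_with_failure(&steps, fail_at, &mut state);
        assert!(state.is_empty(), "leftover state after failing at {fail_at}");
    }
    println!("clean slate verified after every injected failure");
}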