diff --git a/Cargo.lock b/Cargo.lock
index ba9d91c7dfa..f4f357edeb3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -88,9 +88,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
 
 [[package]]
 name = "anyhow"
-version = "1.0.66"
+version = "1.0.68"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
+checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
 dependencies = [
  "backtrace",
 ]
@@ -5896,9 +5896,8 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
 
 [[package]]
 name = "steno"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2d803a4bde6e855daf47c2f78c5e1eb3cabf5b416eda9b83666ee9f0a83f199"
+version = "0.3.1-dev"
+source = "git+http://github.com/oxidecomputer/steno?rev=9cf16057516f97f3eaff198e46d78995352b6bf6#9cf16057516f97f3eaff198e46d78995352b6bf6"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 17281b877f1..b1f9910b793 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -206,7 +206,8 @@ sp-sim = { path = "sp-sim" }
 sprockets-common = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
 sprockets-host = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
 sprockets-rot = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
-steno = "0.3"
+# TODO: Use versions once more when a new release drops
+steno = { git = "http://github.com/oxidecomputer/steno", rev = "9cf16057516f97f3eaff198e46d78995352b6bf6" }
 strum = { version = "0.24", features = [ "derive" ] }
 subprocess = "0.2.9"
 syn = { version = "1.0" }
diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs
index 7fd6289a993..643b71f8984 100644
--- a/nexus/src/app/sagas/instance_create.rs
+++ b/nexus/src/app/sagas/instance_create.rs
@@ -353,7 +353,7 @@ async fn sic_create_network_interface(
     match interface_params {
         params::InstanceNetworkInterfaceAttachment::None => Ok(()),
         params::InstanceNetworkInterfaceAttachment::Default => {
-            sic_create_default_primary_network_interface(
+            create_default_primary_network_interface(
                 &sagactx,
                 &saga_params,
                 nic_index,
@@ -367,7 +367,7 @@
         ) => match create_params.get(nic_index) {
             None => Ok(()),
             Some(ref prs) => {
-                sic_create_custom_network_interface(
+                create_custom_network_interface(
                     &sagactx,
                     &saga_params,
                     instance_id,
@@ -432,7 +432,7 @@ async fn sic_create_network_interface_undo(
 }
 
 /// Create one custom (non-default) network interface for the provided instance.
-async fn sic_create_custom_network_interface(
+async fn create_custom_network_interface(
     sagactx: &NexusActionContext,
     saga_params: &Params,
     instance_id: Uuid,
@@ -487,13 +487,22 @@ async fn sic_create_custom_network_interface(
             interface,
         )
         .await
-        .map_err(|e| ActionError::action_failed(e.into_external()))?;
+        .map(|_| ())
+        .or_else(|err| {
+            match err {
+                // Necessary for idempotency
+                InsertNicError::InterfaceAlreadyExists(_) => Ok(()),
+                _ => Err(err),
+            }
+        })
+        .map_err(InsertNicError::into_external)
+        .map_err(ActionError::action_failed)?;
     Ok(())
 }
 
 /// Create a default primary network interface for an instance during the create
 /// saga.
-async fn sic_create_default_primary_network_interface(
+async fn create_default_primary_network_interface(
     sagactx: &NexusActionContext,
     saga_params: &Params,
     nic_index: usize,
@@ -569,6 +578,14 @@ async fn sic_create_default_primary_network_interface(
             interface,
         )
         .await
+        .map(|_| ())
+        .or_else(|err| {
+            match err {
+                // Necessary for idempotency
+                InsertNicError::InterfaceAlreadyExists(_) => Ok(()),
+                _ => Err(err),
+            }
+        })
         .map_err(InsertNicError::into_external)
         .map_err(ActionError::action_failed)?;
     Ok(())
@@ -609,10 +626,7 @@ async fn sic_allocate_instance_snat_ip_undo(
     let opctx =
         OpContext::for_saga_action(&sagactx, &saga_params.serialized_authn);
     let ip_id = sagactx.lookup::<Uuid>("snat_ip_id")?;
-    datastore
-        .deallocate_external_ip(&opctx, ip_id)
-        .await
-        .map_err(ActionError::action_failed)?;
+    datastore.deallocate_external_ip(&opctx, ip_id).await?;
     Ok(())
 }
 
@@ -666,10 +680,7 @@ async fn sic_allocate_instance_external_ip_undo(
     let opctx =
         OpContext::for_saga_action(&sagactx, &saga_params.serialized_authn);
     let ip_id = repeat_saga_params.new_id;
-    datastore
-        .deallocate_external_ip(&opctx, ip_id)
-        .await
-        .map_err(ActionError::action_failed)?;
+    datastore.deallocate_external_ip(&opctx, ip_id).await?;
     Ok(())
 }
 
@@ -833,12 +844,23 @@ async fn sic_delete_instance_record(
     // failed, so update the state accordingly to allow deletion.
     // TODO-correctness TODO-security It's not correct to re-resolve the
     // instance name now. See oxidecomputer/omicron#1536.
-    let (.., authz_instance, db_instance) = LookupPath::new(&opctx, &datastore)
+    let result = LookupPath::new(&opctx, &datastore)
         .project_id(params.project_id)
         .instance_name(&instance_name)
         .fetch()
-        .await
-        .map_err(ActionError::action_failed)?;
+        .await;
+    // Although, as mentioned in the comment above, we should not be doing the
+    // lookup by name here, we do want this operation to be idempotent.
+    //
+    // As such, if the instance has already been deleted, we should return with
+    // a no-op.
+    let (authz_instance, db_instance) = match result {
+        Ok((.., authz_instance, db_instance)) => (authz_instance, db_instance),
+        Err(err) => match err {
+            Error::ObjectNotFound { .. } => return Ok(()),
+            _ => return Err(err.into()),
+        },
+    };
 
     let runtime_state = db::model::InstanceRuntimeState {
         state: db::model::InstanceState::new(InstanceState::Failed),
@@ -852,10 +874,8 @@ async fn sic_delete_instance_record(
         ..db_instance.runtime_state
     };
 
-    let updated = datastore
-        .instance_update_runtime(&instance_id, &runtime_state)
-        .await
-        .map_err(ActionError::action_failed)?;
+    let updated =
+        datastore.instance_update_runtime(&instance_id, &runtime_state).await?;
 
     if !updated {
         warn!(
@@ -865,10 +885,7 @@
         );
     }
 
     // Actually delete the record.
-    datastore
-        .project_delete_instance(&opctx, &authz_instance)
-        .await
-        .map_err(ActionError::action_failed)?;
+    datastore.project_delete_instance(&opctx, &authz_instance).await?;
     Ok(())
 }
 
@@ -963,11 +980,13 @@ mod test {
         ByteCount, IdentityMetadataCreateParams, InstanceCpuCount,
     };
     use omicron_sled_agent::sim::SledAgent;
+    use std::num::NonZeroU32;
     use uuid::Uuid;
 
     type ControlPlaneTestContext =
         nexus_test_utils::ControlPlaneTestContext<crate::Server>;
 
+    const INSTANCE_NAME: &str = "my-instance";
     const ORG_NAME: &str = "test-org";
     const PROJECT_NAME: &str = "springfield-squidport";
     const DISK_NAME: &str = "my-disk";
@@ -987,7 +1006,7 @@ mod test {
             project_id,
             create_params: params::InstanceCreate {
                 identity: IdentityMetadataCreateParams {
-                    name: "my-instance".parse().unwrap(),
+                    name: INSTANCE_NAME.parse().unwrap(),
                     description: "My instance".to_string(),
                 },
                 ncpus: InstanceCpuCount::try_from(2).unwrap(),
@@ -1004,7 +1023,7 @@ mod test {
                     name: DISK_NAME.parse().unwrap(),
                 },
             )],
-            start: true,
+            start: false,
         },
     }
 }
@@ -1032,7 +1051,10 @@ mod test {
         let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap();
 
         // Actually run the saga
-        nexus.run_saga(runnable_saga).await.unwrap();
+        nexus
+            .run_saga(runnable_saga)
+            .await
+            .expect("Saga should have succeeded");
     }
 
     async fn no_instance_records_exist(datastore: &DataStore) -> bool {
@@ -1102,6 +1124,18 @@ mod test {
            && sled_agent.disk_count().await == 0
     }
 
+    async fn verify_clean_slate(cptestctx: &ControlPlaneTestContext) {
+        let sled_agent = &cptestctx.sled_agent.sled_agent;
+        let datastore = cptestctx.server.apictx.nexus.datastore();
+
+        // Check that no partial artifacts of instance creation exist
+        assert!(no_instance_records_exist(datastore).await);
+        assert!(no_network_interface_records_exist(datastore).await);
+        assert!(no_external_ip_records_exist(datastore).await);
+        assert!(disk_is_detached(datastore).await);
+        assert!(no_instances_or_disks_on_sled(&sled_agent).await);
+    }
+
     #[nexus_test(server = crate::Server)]
     async fn test_action_failure_can_unwind(
         cptestctx: &ControlPlaneTestContext,
     ) {
@@ -1144,17 +1178,136 @@ mod test {
             .await
             .expect_err("Saga should have failed");
 
-        let datastore = nexus.datastore();
+            verify_clean_slate(&cptestctx).await;
+        }
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+        let log = &cptestctx.logctx.log;
 
-        // Check that no partial artifacts of instance creation exist
-        assert!(no_instance_records_exist(datastore).await);
-        assert!(no_network_interface_records_exist(datastore).await);
-        assert!(no_external_ip_records_exist(datastore).await);
-        assert!(disk_is_detached(datastore).await);
-        assert!(
-            no_instances_or_disks_on_sled(&cptestctx.sled_agent.sled_agent)
-                .await
+        let client = &cptestctx.external_client;
+        let nexus = &cptestctx.server.apictx.nexus;
+        let project_id = create_org_project_and_disk(&client).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(&cptestctx);
+
+        let params = new_test_params(&opctx, project_id);
+        let dag = create_saga_dag::<SagaInstanceCreate>(params).unwrap();
+
+        // The "undo_node" should always be immediately preceding the
+        // "error_node".
+        for (undo_node, error_node) in
+            dag.get_nodes().zip(dag.get_nodes().skip(1))
+        {
+            // Create a new saga for this node.
+            info!(
+                log,
+                "Creating new saga which will fail at index {:?}", error_node.index();
+                "node_name" => error_node.name().as_ref(),
+                "label" => error_node.label(),
             );
+
+            let runnable_saga =
+                nexus.create_runnable_saga(dag.clone()).await.unwrap();
+
+            // Inject an error instead of running the node.
+            //
+            // This should cause the saga to unwind.
+            nexus
+                .sec()
+                .saga_inject_error(runnable_saga.id(), error_node.index())
+                .await
+                .unwrap();
+
+            // Inject a repetition for the node being undone.
+            //
+            // This means the undo action executes twice while unwinding.
+            nexus
+                .sec()
+                .saga_inject_repeat(
+                    runnable_saga.id(),
+                    undo_node.index(),
+                    steno::RepeatInjected {
+                        action: NonZeroU32::new(1).unwrap(),
+                        undo: NonZeroU32::new(2).unwrap(),
+                    },
+                )
+                .await
+                .unwrap();
+
+            nexus
+                .run_saga(runnable_saga)
+                .await
+                .expect_err("Saga should have failed");
+
+            verify_clean_slate(&cptestctx).await;
+        }
+    }
+
+    async fn destroy_instance(cptestctx: &ControlPlaneTestContext) {
+        let nexus = &cptestctx.server.apictx.nexus;
+        let opctx = test_opctx(&cptestctx);
+
+        let instance_selector = params::InstanceSelector::new(
+            Some(ORG_NAME.to_string().try_into().unwrap()),
+            Some(PROJECT_NAME.to_string().try_into().unwrap()),
+            INSTANCE_NAME.to_string().try_into().unwrap(),
+        );
+        let instance_lookup =
+            nexus.instance_lookup(&opctx, &instance_selector).unwrap();
+        nexus.project_destroy_instance(&opctx, &instance_lookup).await.unwrap();
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_actions_succeed_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+
+        let client = &cptestctx.external_client;
+        let nexus = &cptestctx.server.apictx.nexus;
+        let project_id = create_org_project_and_disk(&client).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(&cptestctx);
+
+        let params = new_test_params(&opctx, project_id);
+        let dag = create_saga_dag::<SagaInstanceCreate>(params).unwrap();
+
+        let runnable_saga =
+            nexus.create_runnable_saga(dag.clone()).await.unwrap();
+
+        // Cause all actions to run twice. The saga should succeed regardless!
+        for node in dag.get_nodes() {
+            nexus
+                .sec()
+                .saga_inject_repeat(
+                    runnable_saga.id(),
+                    node.index(),
+                    steno::RepeatInjected {
+                        action: NonZeroU32::new(2).unwrap(),
+                        undo: NonZeroU32::new(1).unwrap(),
+                    },
+                )
+                .await
+                .unwrap();
        }
+
+        // Verify that the saga's execution succeeded.
+        nexus
+            .run_saga(runnable_saga)
+            .await
+            .expect("Saga should have succeeded");
+
+        // Verify that if the instance is destroyed, no detritus remains.
+        // This is important to ensure that our original saga didn't
+        // double-allocate during repeated actions.
+        destroy_instance(&cptestctx).await;
+        verify_clean_slate(&cptestctx).await;
    }
 }
diff --git a/nexus/src/db/datastore/instance.rs b/nexus/src/db/datastore/instance.rs
index 46d7eff14c3..2b9cef3c299 100644
--- a/nexus/src/db/datastore/instance.rs
+++ b/nexus/src/db/datastore/instance.rs
@@ -80,7 +80,8 @@ impl DataStore {
             diesel::insert_into(dsl::instance)
                 .values(instance)
                 .on_conflict(dsl::id)
-                .do_nothing(),
+                .do_update()
+                .set(dsl::time_modified.eq(dsl::time_modified)),
         )
         .insert_and_get_result_async(self.pool_authorized(opctx).await?)
         .await
diff --git a/nexus/src/db/queries/network_interface.rs b/nexus/src/db/queries/network_interface.rs
index e37cf78a5f4..ba42c3107e7 100644
--- a/nexus/src/db/queries/network_interface.rs
+++ b/nexus/src/db/queries/network_interface.rs
@@ -81,6 +81,11 @@ const NO_INSTANCE_ERROR_MESSAGE: &'static str =
 /// Errors related to inserting or attaching a NetworkInterface
 #[derive(Debug)]
 pub enum InsertError {
+    /// This interface already exists
+    ///
+    /// Note: An interface with the same name existing is a different error;
+    /// this error matches the case where the UUID already exists.
+    InterfaceAlreadyExists(String),
     /// The instance specified for this interface is already associated with a
     /// different VPC from this interface.
     InstanceSpansMultipleVpcs(Uuid),
@@ -134,6 +139,12 @@ impl InsertError {
     /// Convert this error into an external one.
     pub fn into_external(self) -> external::Error {
         match self {
+            InsertError::InterfaceAlreadyExists(name) => {
+                external::Error::ObjectAlreadyExists {
+                    type_name: external::ResourceType::NetworkInterface,
+                    object_name: name,
+                }
+            }
             InsertError::NoAvailableIpAddresses => {
                 external::Error::invalid_request(
                     "No available IP addresses for interface",
@@ -222,6 +233,10 @@ fn decode_database_error(
     const NAME_CONFLICT_CONSTRAINT: &str =
         "network_interface_instance_id_name_key";
 
+    // The name of the constraint violated if we try to re-insert an already
+    // existing interface with the same UUID.
+    const ID_CONFLICT_CONSTRAINT: &str = "network_interface_pkey";
+
     // The check violated in the case where we try to insert more that the
     // maximum number of NICs (`MAX_NICS_PER_INSTANCE`).
     const NO_SLOTS_AVAILABLE_ERROR_MESSAGE: &str = concat!(
@@ -320,7 +335,6 @@ fn decode_database_error(
                 .unwrap_or_else(|| std::net::Ipv4Addr::UNSPECIFIED.into());
             InsertError::IpAddressNotAvailable(ip)
         }
-
         // Constraint violated if the user-requested name is already
         // assigned to an interface on this instance.
         Some(constraint) if constraint == NAME_CONFLICT_CONSTRAINT => {
             InsertError::External(error::public_error_from_diesel_pool(
                 err,
                 error::ErrorHandler::Conflict(
                     external::ResourceType::NetworkInterface,
                     interface.identity.name.as_str(),
                 ),
             ))
         }
-
+        // Constraint violated if the user-requested UUID has already
+        // been inserted.
+        Some(constraint) if constraint == ID_CONFLICT_CONSTRAINT => {
+            InsertError::InterfaceAlreadyExists(
+                interface.identity.name.to_string(),
+            )
+        }
         // Any other constraint violation is a bug
         _ => InsertError::External(error::public_error_from_diesel_pool(
             err,
@@ -638,6 +658,7 @@ fn push_ensure_unique_vpc_expression<'a>(
 /// SELECT subnet_id
 /// FROM network_interface
 /// WHERE
+///     id != <interface_id> AND
 ///     instance_id = <instance_id> AND
 ///     time_deleted IS NULL AND
 ///     subnet_id = <subnet_id>
@@ -653,6 +674,7 @@
 /// `'non-unique-subnets'`, which will fail casting to a UUID.
 fn push_ensure_unique_vpc_subnet_expression<'a>(
     mut out: AstPass<'_, 'a, Pg>,
+    interface_id: &'a Uuid,
     subnet_id: &'a Uuid,
     subnet_id_str: &'a String,
     instance_id: &'a Uuid,
@@ -662,6 +684,10 @@
     out.push_sql(" FROM ");
     NETWORK_INTERFACE_FROM_CLAUSE.walk_ast(out.reborrow())?;
     out.push_sql(" WHERE ");
+    out.push_identifier(dsl::id::NAME)?;
+    out.push_sql(" != ");
+    out.push_bind_param::<sql_types::Uuid, Uuid>(interface_id)?;
+    out.push_sql(" AND ");
     out.push_identifier(dsl::instance_id::NAME)?;
     out.push_sql(" = ");
     out.push_bind_param::<sql_types::Uuid, Uuid>(instance_id)?;
@@ -694,6 +720,7 @@
 #[allow(clippy::too_many_arguments)]
 fn push_instance_validation_cte<'a>(
     mut out: AstPass<'_, 'a, Pg>,
+    interface_id: &'a Uuid,
     vpc_id: &'a Uuid,
     vpc_id_str: &'a String,
     subnet_id: &'a Uuid,
@@ -727,6 +754,7 @@
     out.push_sql(", ");
     push_ensure_unique_vpc_subnet_expression(
         out.reborrow(),
+        interface_id,
         subnet_id,
         subnet_id_str,
         instance_id,
@@ -924,6 +952,7 @@ impl QueryFragment<Pg> for InsertQuery {
     //  - `is_primary`
     push_instance_validation_cte(
         out.reborrow(),
+        &self.interface.identity.id,
         &self.interface.vpc_id,
         &self.vpc_id_str,
         &self.interface.subnet.identity.id,
@@ -2071,6 +2100,43 @@ mod tests {
         context.success().await;
     }
 
+    #[tokio::test]
+    async fn test_insert_same_interface_fails() {
+        let context =
+            TestContext::new("test_insert_same_interface_fails", 2).await;
+        let instance =
+            context.create_instance(external::InstanceState::Stopped).await;
+        let interface = IncompleteNetworkInterface::new(
+            Uuid::new_v4(),
+            instance.id(),
+            context.net1.vpc_id,
+            context.net1.subnets[0].clone(),
+            IdentityMetadataCreateParams {
+                name: "interface-c".parse().unwrap(),
+                description: String::from("description"),
+            },
+            None,
+        )
+        .unwrap();
+        let _ = context
+            .db_datastore
+            .instance_create_network_interface_raw(
+                &context.opctx,
+                interface.clone(),
+            )
+            .await
+            .expect("Failed to insert interface");
+        let result = context
+            .db_datastore
+            .instance_create_network_interface_raw(&context.opctx, interface)
+            .await;
+        assert!(
+            matches!(result, Err(InsertError::InterfaceAlreadyExists(_))),
+            "Expected that interface would already exist",
+        );
+        context.success().await;
+    }
+
     #[tokio::test]
     async fn test_insert_multiple_vpcs_fails() {
         let context =
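
The `create_custom_network_interface` and `create_default_primary_network_interface` hunks both convert one specific failure, `InsertNicError::InterfaceAlreadyExists`, into success, so that re-executing the same saga node is a no-op. A minimal, std-only sketch of that pattern follows; `DatabaseError`, `insert_interface`, and `action` are hypothetical stand-ins for the nexus types, not the real API:

    #[derive(Debug)]
    enum DatabaseError {
        // The same UUID was inserted before: the action is being replayed.
        InterfaceAlreadyExists(String),
        Other(String),
    }

    fn insert_interface(already_inserted: bool) -> Result<u64, DatabaseError> {
        // Stand-in for the real database insert.
        if already_inserted {
            Err(DatabaseError::InterfaceAlreadyExists("interface-c".into()))
        } else {
            Ok(42) // stand-in for the inserted row
        }
    }

    // The saga action: discard the returned row, and treat "already exists"
    // as success so a second execution of the same node is a no-op.
    fn action(already_inserted: bool) -> Result<(), DatabaseError> {
        insert_interface(already_inserted)
            .map(|_| ())
            .or_else(|err| match err {
                // Necessary for idempotency, as in the hunks above.
                DatabaseError::InterfaceAlreadyExists(_) => Ok(()),
                other => Err(other),
            })
    }

    fn main() {
        assert!(action(false).is_ok()); // first execution inserts
        assert!(action(true).is_ok()); // replay finds the row and succeeds
    }

The same shape appears in `sic_delete_instance_record`, where `Error::ObjectNotFound` from the lookup is likewise mapped to `Ok(())`.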
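The new tests exercise this with steno's fault injection: `saga_inject_repeat` forces a node's action (or undo action) to run twice, and `verify_clean_slate` asserts that nothing leaked. Stripped of the steno machinery, the invariant under test is roughly the following; `Action` and `run` are invented for illustration and are not the steno API:

    type State = Vec<&'static str>;

    struct Action {
        name: &'static str,
        apply: fn(&mut State),
    }

    // Apply every action `repeats` times, mimicking saga_inject_repeat.
    fn run(actions: &[Action], repeats: u32) -> State {
        let mut state = State::new();
        for action in actions {
            for _ in 0..repeats {
                (action.apply)(&mut state);
            }
        }
        state
    }

    fn main() {
        let actions = [Action {
            name: "create_record",
            // Idempotent: insert only if missing, the moral equivalent of
            // mapping InterfaceAlreadyExists to Ok(()) above.
            apply: |state| {
                if !state.contains(&"record") {
                    state.push("record");
                }
            },
        }];
        // Running every action twice must leave the same state as once.
        assert_eq!(run(&actions, 1), run(&actions, 2));
        println!("{} is idempotent", actions[0].name);
    }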
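The `nexus/src/db/datastore/instance.rs` hunk swaps `on_conflict(dsl::id).do_nothing()` for a no-op `do_update()`. Under `ON CONFLICT DO NOTHING`, a conflicting (replayed) insert produces zero result rows, so a helper that inserts and then returns the resulting record finds nothing; a self-assigning `DO UPDATE` makes the statement return the existing, unchanged row instead. A std-only simulation of the difference, where the `HashMap` "table" and function names are illustrative assumptions rather than the diesel API:

    use std::collections::HashMap;

    // Mimics `INSERT ... ON CONFLICT (id) DO UPDATE SET time_modified =
    // time_modified RETURNING *`: the conflicting row is "updated" to its
    // own current value, so the statement always yields a row.
    fn upsert_noop(table: &mut HashMap<u32, String>, id: u32, val: String) -> String {
        table.entry(id).or_insert(val).clone()
    }

    // Mimics `INSERT ... ON CONFLICT (id) DO NOTHING RETURNING *`: a
    // conflicting insert yields no row, so a replayed "insert and get the
    // record back" step has nothing to return.
    fn insert_do_nothing(
        table: &mut HashMap<u32, String>,
        id: u32,
        val: String,
    ) -> Option<String> {
        if table.contains_key(&id) {
            return None;
        }
        table.insert(id, val.clone());
        Some(val)
    }

    fn main() {
        let mut table = HashMap::new();
        assert!(insert_do_nothing(&mut table, 1, "instance".into()).is_some());
        // Replay under DO NOTHING: the row exists, nothing comes back.
        assert!(insert_do_nothing(&mut table, 1, "instance".into()).is_none());
        // Replay under the no-op DO UPDATE: the existing row comes back.
        assert_eq!(upsert_noop(&mut table, 1, "instance".into()), "instance");
    }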
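Finally, in `decode_database_error`, the new `ID_CONFLICT_CONSTRAINT` arm distinguishes "same UUID re-inserted" (a saga replay, mapped to `InterfaceAlreadyExists` and swallowed by the saga action) from the user-facing name conflict. The shape of that decoding, reduced to a sketch with a simplified signature (the real function inspects a diesel pool error rather than a plain `Option<&str>`):

    #[derive(Debug, PartialEq)]
    enum InsertError {
        InterfaceAlreadyExists(String),
        NameConflict(String),
        Other,
    }

    const NAME_CONFLICT_CONSTRAINT: &str = "network_interface_instance_id_name_key";
    const ID_CONFLICT_CONSTRAINT: &str = "network_interface_pkey";

    fn decode_unique_violation(constraint: Option<&str>, name: &str) -> InsertError {
        match constraint {
            // Same (instance_id, name) pair: a user-visible naming conflict.
            Some(NAME_CONFLICT_CONSTRAINT) => {
                InsertError::NameConflict(name.to_string())
            }
            // Same primary-key UUID: a replayed saga action re-inserting the
            // identical interface; callers can treat this as success.
            Some(ID_CONFLICT_CONSTRAINT) => {
                InsertError::InterfaceAlreadyExists(name.to_string())
            }
            // Any other constraint violation is a bug.
            _ => InsertError::Other,
        }
    }

    fn main() {
        assert_eq!(
            decode_unique_violation(Some("network_interface_pkey"), "interface-c"),
            InsertError::InterfaceAlreadyExists("interface-c".to_string()),
        );
    }

The added `id != <interface_id>` predicate in the validation CTE serves the same replay case: when the identical interface row is re-checked, it must not collide with itself in the subnet-uniqueness test.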