-
Notifications
You must be signed in to change notification settings - Fork 62
[nexus] Make instance creation actions/undo actions idempotent #2095
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b2a2ba1
f399ff7
66cf0c0
828ed21
2e0a199
533ee76
6abce08
169ed10
29a3ab5
3b62a6a
05cbd9d
f7f5fbe
2f89973
b9fef96
d68c763
9a8504b
79271d2
23801dd
80932d8
bfc6048
f9c635a
8c01fcd
3f37a48
e1a5e5c
163d9c5
fedfbb8
ffdc483
839fac6
e4190e8
c4b16ce
9008dce
666bf64
af15ed0
ee2519e
bd2786c
c6e929d
1d68f03
344df7a
de1213e
ae4d65f
d59ef33
805ee99
61d9d5c
4e0ab41
0163d16
c8fcbf5
6f8ab62
92ec4a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -353,7 +353,7 @@ async fn sic_create_network_interface( | |
| match interface_params { | ||
| params::InstanceNetworkInterfaceAttachment::None => Ok(()), | ||
| params::InstanceNetworkInterfaceAttachment::Default => { | ||
| sic_create_default_primary_network_interface( | ||
| create_default_primary_network_interface( | ||
| &sagactx, | ||
| &saga_params, | ||
| nic_index, | ||
|
|
@@ -367,7 +367,7 @@ async fn sic_create_network_interface( | |
| ) => match create_params.get(nic_index) { | ||
| None => Ok(()), | ||
| Some(ref prs) => { | ||
| sic_create_custom_network_interface( | ||
| create_custom_network_interface( | ||
| &sagactx, | ||
| &saga_params, | ||
| instance_id, | ||
|
|
@@ -432,7 +432,7 @@ async fn sic_create_network_interface_undo( | |
| } | ||
|
|
||
| /// Create one custom (non-default) network interface for the provided instance. | ||
| async fn sic_create_custom_network_interface( | ||
| async fn create_custom_network_interface( | ||
| sagactx: &NexusActionContext, | ||
| saga_params: &Params, | ||
| instance_id: Uuid, | ||
|
|
@@ -487,13 +487,22 @@ async fn sic_create_custom_network_interface( | |
| interface, | ||
| ) | ||
| .await | ||
| .map_err(|e| ActionError::action_failed(e.into_external()))?; | ||
| .map(|_| ()) | ||
| .or_else(|err| { | ||
| match err { | ||
| // Necessary for idempotency | ||
| InsertNicError::InterfaceAlreadyExists(_) => Ok(()), | ||
| _ => Err(err), | ||
| } | ||
| }) | ||
| .map_err(InsertNicError::into_external) | ||
| .map_err(ActionError::action_failed)?; | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Create a default primary network interface for an instance during the create | ||
| /// saga. | ||
| async fn sic_create_default_primary_network_interface( | ||
| async fn create_default_primary_network_interface( | ||
| sagactx: &NexusActionContext, | ||
| saga_params: &Params, | ||
| nic_index: usize, | ||
|
|
@@ -569,6 +578,14 @@ async fn sic_create_default_primary_network_interface( | |
| interface, | ||
| ) | ||
| .await | ||
| .map(|_| ()) | ||
| .or_else(|err| { | ||
| match err { | ||
| // Necessary for idempotency | ||
| InsertNicError::InterfaceAlreadyExists(_) => Ok(()), | ||
| _ => Err(err), | ||
| } | ||
| }) | ||
| .map_err(InsertNicError::into_external) | ||
| .map_err(ActionError::action_failed)?; | ||
| Ok(()) | ||
|
|
@@ -609,10 +626,7 @@ async fn sic_allocate_instance_snat_ip_undo( | |
| let opctx = | ||
| OpContext::for_saga_action(&sagactx, &saga_params.serialized_authn); | ||
| let ip_id = sagactx.lookup::<Uuid>("snat_ip_id")?; | ||
| datastore | ||
| .deallocate_external_ip(&opctx, ip_id) | ||
| .await | ||
| .map_err(ActionError::action_failed)?; | ||
| datastore.deallocate_external_ip(&opctx, ip_id).await?; | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -666,10 +680,7 @@ async fn sic_allocate_instance_external_ip_undo( | |
| let opctx = | ||
| OpContext::for_saga_action(&sagactx, &saga_params.serialized_authn); | ||
| let ip_id = repeat_saga_params.new_id; | ||
| datastore | ||
| .deallocate_external_ip(&opctx, ip_id) | ||
| .await | ||
| .map_err(ActionError::action_failed)?; | ||
| datastore.deallocate_external_ip(&opctx, ip_id).await?; | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -833,12 +844,23 @@ async fn sic_delete_instance_record( | |
| // failed, so update the state accordingly to allow deletion. | ||
| // TODO-correctness TODO-security It's not correct to re-resolve the | ||
| // instance name now. See oxidecomputer/omicron#1536. | ||
| let (.., authz_instance, db_instance) = LookupPath::new(&opctx, &datastore) | ||
| let result = LookupPath::new(&opctx, &datastore) | ||
| .project_id(params.project_id) | ||
| .instance_name(&instance_name) | ||
| .fetch() | ||
| .await | ||
| .map_err(ActionError::action_failed)?; | ||
| .await; | ||
| // Although, as mentioned in the comment above, we should not be doing the | ||
| // lookup by name here, we do want this operation to be idempotent. | ||
| // | ||
| // As such, if the instance has already been deleted, we should return with | ||
| // a no-op. | ||
| let (authz_instance, db_instance) = match result { | ||
| Ok((.., authz_instance, db_instance)) => (authz_instance, db_instance), | ||
| Err(err) => match err { | ||
| Error::ObjectNotFound { .. } => return Ok(()), | ||
| _ => return Err(err.into()), | ||
| }, | ||
| }; | ||
|
|
||
| let runtime_state = db::model::InstanceRuntimeState { | ||
| state: db::model::InstanceState::new(InstanceState::Failed), | ||
|
|
@@ -852,10 +874,8 @@ async fn sic_delete_instance_record( | |
| ..db_instance.runtime_state | ||
| }; | ||
|
|
||
| let updated = datastore | ||
| .instance_update_runtime(&instance_id, &runtime_state) | ||
| .await | ||
| .map_err(ActionError::action_failed)?; | ||
| let updated = | ||
| datastore.instance_update_runtime(&instance_id, &runtime_state).await?; | ||
|
|
||
| if !updated { | ||
| warn!( | ||
|
|
@@ -865,10 +885,7 @@ async fn sic_delete_instance_record( | |
| } | ||
|
|
||
| // Actually delete the record. | ||
| datastore | ||
| .project_delete_instance(&opctx, &authz_instance) | ||
| .await | ||
| .map_err(ActionError::action_failed)?; | ||
| datastore.project_delete_instance(&opctx, &authz_instance).await?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
@@ -963,11 +980,13 @@ mod test { | |
| ByteCount, IdentityMetadataCreateParams, InstanceCpuCount, | ||
| }; | ||
| use omicron_sled_agent::sim::SledAgent; | ||
| use std::num::NonZeroU32; | ||
| use uuid::Uuid; | ||
|
|
||
| type ControlPlaneTestContext = | ||
| nexus_test_utils::ControlPlaneTestContext<crate::Server>; | ||
|
|
||
| const INSTANCE_NAME: &str = "my-instance"; | ||
| const ORG_NAME: &str = "test-org"; | ||
| const PROJECT_NAME: &str = "springfield-squidport"; | ||
| const DISK_NAME: &str = "my-disk"; | ||
|
|
@@ -987,7 +1006,7 @@ mod test { | |
| project_id, | ||
| create_params: params::InstanceCreate { | ||
| identity: IdentityMetadataCreateParams { | ||
| name: "my-instance".parse().unwrap(), | ||
| name: INSTANCE_NAME.parse().unwrap(), | ||
| description: "My instance".to_string(), | ||
| }, | ||
| ncpus: InstanceCpuCount::try_from(2).unwrap(), | ||
|
|
@@ -1004,7 +1023,7 @@ mod test { | |
| name: DISK_NAME.parse().unwrap(), | ||
| }, | ||
| )], | ||
| start: true, | ||
| start: false, | ||
| }, | ||
| } | ||
| } | ||
|
|
@@ -1032,7 +1051,10 @@ mod test { | |
| let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap(); | ||
|
|
||
| // Actually run the saga | ||
| nexus.run_saga(runnable_saga).await.unwrap(); | ||
| nexus | ||
| .run_saga(runnable_saga) | ||
| .await | ||
| .expect("Saga should have succeeded"); | ||
| } | ||
|
|
||
| async fn no_instance_records_exist(datastore: &DataStore) -> bool { | ||
|
|
@@ -1102,6 +1124,18 @@ mod test { | |
| && sled_agent.disk_count().await == 0 | ||
| } | ||
|
|
||
| async fn verify_clean_clate(cptestctx: &ControlPlaneTestContext) { | ||
| let sled_agent = &cptestctx.sled_agent.sled_agent; | ||
| let datastore = cptestctx.server.apictx.nexus.datastore(); | ||
|
|
||
| // Check that no partial artifacts of instance creation exist | ||
| assert!(no_instance_records_exist(datastore).await); | ||
| assert!(no_network_interface_records_exist(datastore).await); | ||
| assert!(no_external_ip_records_exist(datastore).await); | ||
| assert!(disk_is_detached(datastore).await); | ||
| assert!(no_instances_or_disks_on_sled(&sled_agent).await); | ||
| } | ||
|
|
||
| #[nexus_test(server = crate::Server)] | ||
| async fn test_action_failure_can_unwind( | ||
| cptestctx: &ControlPlaneTestContext, | ||
|
|
@@ -1144,17 +1178,136 @@ mod test { | |
| .await | ||
| .expect_err("Saga should have failed"); | ||
|
|
||
| let datastore = nexus.datastore(); | ||
| verify_clean_clate(&cptestctx).await; | ||
| } | ||
| } | ||
|
|
||
| #[nexus_test(server = crate::Server)] | ||
| async fn test_action_failure_can_unwind_idempotently( | ||
| cptestctx: &ControlPlaneTestContext, | ||
| ) { | ||
| DiskTest::new(cptestctx).await; | ||
| let log = &cptestctx.logctx.log; | ||
|
|
||
| // Check that no partial artifacts of instance creation exist | ||
| assert!(no_instance_records_exist(datastore).await); | ||
| assert!(no_network_interface_records_exist(datastore).await); | ||
| assert!(no_external_ip_records_exist(datastore).await); | ||
| assert!(disk_is_detached(datastore).await); | ||
| assert!( | ||
| no_instances_or_disks_on_sled(&cptestctx.sled_agent.sled_agent) | ||
| .await | ||
| let client = &cptestctx.external_client; | ||
| let nexus = &cptestctx.server.apictx.nexus; | ||
| let project_id = create_org_project_and_disk(&client).await; | ||
|
|
||
| // Build the saga DAG with the provided test parameters | ||
| let opctx = test_opctx(&cptestctx); | ||
|
|
||
| let params = new_test_params(&opctx, project_id); | ||
| let dag = create_saga_dag::<SagaInstanceCreate>(params).unwrap(); | ||
|
|
||
| // The "undo_node" should always be immediately preceding the | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. While perfectly reasonable with our current implementation, it makes me a tad uncomfortable that we are expecting a steno internal Dag structure to look a certain way for this test. I'm not really sure of the best way to handle this, except for perhaps first-classing this type of injection in steno. I'm curious to hear @davepacheco's thoughts when he gets back, although I don't think we should hold up the PR for this.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We aren't expecting anything of the internal DAG structure, other than "it's ordered". "Which node is which" is entirely up to the test -- the test is making the call "whenever we fail, the node preceding the failure node should have an undo action called twice to test idempotency". We can arbitrarily change the saga DAG, and this test should be able to remain unchanged. All nodes we're iterating over are either actions, constants, or subsagas, as discussed in oxidecomputer/steno#67.
What would this mean to you? I thought the APIs I added to steno to iterate over the dag, expose indices which could be targets for injection, and add the ability to inject repetitions alongside the already-existing "error injection" counted as first-class support?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yup, thinking about this more, you are 100% correct.
I was overthinking this. What you did makes sense. My initial thought was something unnecessarily complicated. We'd inject errors then iterate again, reading the annotated error nodes and getting the prior nodes affected by the errors. However, as you point out that's unnecessary, since we know based on the dag that the node immediately prior to the error node will be undone. Sorry for unnecessarily getting your hackles up.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No worries! Just want to make sure I wasn't missing a cleaner solution - probing at what's going on is always appreciated. |
||
| // "error_node". | ||
| for (undo_node, error_node) in | ||
| dag.get_nodes().zip(dag.get_nodes().skip(1)) | ||
| { | ||
| // Create a new saga for this node. | ||
| info!( | ||
| log, | ||
| "Creating new saga which will fail at index {:?}", error_node.index(); | ||
| "node_name" => error_node.name().as_ref(), | ||
| "label" => error_node.label(), | ||
| ); | ||
|
|
||
| let runnable_saga = | ||
| nexus.create_runnable_saga(dag.clone()).await.unwrap(); | ||
|
|
||
| // Inject an error instead of running the node. | ||
| // | ||
| // This should cause the saga to unwind. | ||
| nexus | ||
| .sec() | ||
| .saga_inject_error(runnable_saga.id(), error_node.index()) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| // Inject a repetition for the node being undone. | ||
| // | ||
| // This means it is executing twice while unwinding. | ||
| nexus | ||
| .sec() | ||
| .saga_inject_repeat( | ||
| runnable_saga.id(), | ||
| undo_node.index(), | ||
| steno::RepeatInjected { | ||
| action: NonZeroU32::new(1).unwrap(), | ||
| undo: NonZeroU32::new(2).unwrap(), | ||
| }, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| nexus | ||
| .run_saga(runnable_saga) | ||
| .await | ||
| .expect_err("Saga should have failed"); | ||
|
|
||
| verify_clean_clate(&cptestctx).await; | ||
| } | ||
| } | ||
|
|
||
| async fn destroy_instance(cptestctx: &ControlPlaneTestContext) { | ||
| let nexus = &cptestctx.server.apictx.nexus; | ||
| let opctx = test_opctx(&cptestctx); | ||
|
|
||
| let instance_selector = params::InstanceSelector::new( | ||
| Some(ORG_NAME.to_string().try_into().unwrap()), | ||
| Some(PROJECT_NAME.to_string().try_into().unwrap()), | ||
| INSTANCE_NAME.to_string().try_into().unwrap(), | ||
| ); | ||
| let instance_lookup = | ||
| nexus.instance_lookup(&opctx, &instance_selector).unwrap(); | ||
| nexus.project_destroy_instance(&opctx, &instance_lookup).await.unwrap(); | ||
| } | ||
|
|
||
| #[nexus_test(server = crate::Server)] | ||
| async fn test_actions_succeed_idempotently( | ||
| cptestctx: &ControlPlaneTestContext, | ||
| ) { | ||
| DiskTest::new(cptestctx).await; | ||
|
|
||
| let client = &cptestctx.external_client; | ||
| let nexus = &cptestctx.server.apictx.nexus; | ||
| let project_id = create_org_project_and_disk(&client).await; | ||
|
|
||
| // Build the saga DAG with the provided test parameters | ||
| let opctx = test_opctx(&cptestctx); | ||
|
|
||
| let params = new_test_params(&opctx, project_id); | ||
| let dag = create_saga_dag::<SagaInstanceCreate>(params).unwrap(); | ||
|
|
||
| let runnable_saga = | ||
| nexus.create_runnable_saga(dag.clone()).await.unwrap(); | ||
|
|
||
| // Cause all actions to run twice. The saga should succeed regardless! | ||
| for node in dag.get_nodes() { | ||
| nexus | ||
| .sec() | ||
| .saga_inject_repeat( | ||
| runnable_saga.id(), | ||
| node.index(), | ||
| steno::RepeatInjected { | ||
| action: NonZeroU32::new(2).unwrap(), | ||
| undo: NonZeroU32::new(1).unwrap(), | ||
| }, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
| } | ||
|
|
||
| // Verify that the saga's execution succeeded. | ||
| nexus | ||
| .run_saga(runnable_saga) | ||
| .await | ||
| .expect("Saga should have succeeded"); | ||
|
|
||
| // Verify that if the instance is destroyed, no detritus remains. | ||
| // This is important to ensure that our original saga didn't | ||
| // double-allocate during repeated actions. | ||
| destroy_instance(&cptestctx).await; | ||
| verify_clean_clate(&cptestctx).await; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I admittedly did flip this bit - otherwise, I need to poke the instance and update state to "stopped" before I can delete it anyway in the tests below.