diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index a46b96b622..1248dbf085 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -35,6 +35,7 @@ use omicron_common::api::external::NameOrId; use omicron_common::api::external::UpdateResult; use omicron_common::api::external::Vni; use omicron_common::api::internal::nexus; +use sled_agent_client::types::InstancePutStateBody; use sled_agent_client::types::InstanceStateRequested; use sled_agent_client::types::SourceNatConfig; use sled_agent_client::Client as SledAgentClient; @@ -327,19 +328,8 @@ impl super::Nexus { opctx: &OpContext, instance_lookup: &lookup::Instance<'_>, ) -> UpdateResult { - // To implement reboot, we issue a call to the sled agent to set a - // runtime state of "reboot". We cannot simply stop the Instance and - // start it again here because if we crash in the meantime, we might - // leave it stopped. - // - // When an instance is rebooted, the "rebooting" flag remains set on - // the runtime state as it transitions to "Stopping" and "Stopped". - // This flag is cleared when the state goes to "Starting". This way, - // even if the whole rack powered off while this was going on, we would - // never lose track of the fact that this Instance was supposed to be - // running. let (.., authz_instance, db_instance) = instance_lookup.fetch().await?; - self.instance_set_runtime( + self.instance_request_state( opctx, &authz_instance, &db_instance, @@ -355,8 +345,29 @@ impl super::Nexus { opctx: &OpContext, instance_lookup: &lookup::Instance<'_>, ) -> UpdateResult { - let (.., authz_instance, db_instance) = instance_lookup.fetch().await?; - self.instance_set_runtime( + // TODO(#2824): This needs to be a saga for crash resiliency + // purposes (otherwise the instance can be leaked if Nexus crashes + // between registration and instance start). + let (.., authz_instance, mut db_instance) = + instance_lookup.fetch().await?; + + // The instance is not really being "created" (it already exists from + // the caller's perspective), but if it does not exist on its sled, the + // target sled agent will populate its instance manager with the + // contents of this modified record, and that record needs to allow a + // transition to the Starting state. + // + // If the instance does exist on this sled, this initial runtime state + // is ignored. + let initial_runtime = nexus_db_model::InstanceRuntimeState { + state: nexus_db_model::InstanceState(InstanceState::Creating), + ..db_instance.runtime_state + }; + db_instance.runtime_state = initial_runtime; + self.instance_ensure_registered(opctx, &authz_instance, &db_instance) + .await?; + + self.instance_request_state( opctx, &authz_instance, &db_instance, @@ -373,7 +384,7 @@ impl super::Nexus { instance_lookup: &lookup::Instance<'_>, ) -> UpdateResult { let (.., authz_instance, db_instance) = instance_lookup.fetch().await?; - self.instance_set_runtime( + self.instance_request_state( opctx, &authz_instance, &db_instance, @@ -383,6 +394,24 @@ impl super::Nexus { self.db_datastore.instance_refetch(opctx, &authz_instance).await } + /// Idempotently ensures that the sled specified in `db_instance` does not + /// have a record of the instance. If the instance is currently running on + /// this sled, this operation rudely terminates it. 
+ pub(crate) async fn instance_ensure_unregistered( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + db_instance: &db::model::Instance, + ) -> Result<(), Error> { + opctx.authorize(authz::Action::Modify, authz_instance).await?; + let sa = self.instance_sled(&db_instance).await?; + let result = sa + .instance_unregister(&db_instance.id()) + .await + .map(|res| res.into_inner().updated_runtime); + self.handle_instance_put_result(db_instance, result).await.map(|_| ()) + } + /// Returns the SledAgentClient for the host where this Instance is running. pub(crate) async fn instance_sled( &self, @@ -395,13 +424,20 @@ impl super::Nexus { fn check_runtime_change_allowed( &self, runtime: &nexus::InstanceRuntimeState, + requested: &InstanceStateRequested, ) -> Result<(), Error> { // Users are allowed to request a start or stop even if the instance is // already in the desired state (or moving to it), and we will issue a // request to the SA to make the state change in these cases in case the - // runtime state we saw here was stale. However, users are not allowed - // to change the state of an instance that's migrating, failed or - // destroyed. + // runtime state we saw here was stale. + // + // Users cannot change the state of a failed or destroyed instance. + // TODO(#2825): Failed instances should be allowed to stop. + // + // Migrating instances can't change state until they're done migrating, + // but for idempotency, a request to make an incarnation of an instance + // into a migration target is allowed if the incarnation is already a + // migration target. let allowed = match runtime.run_state { InstanceState::Creating => true, InstanceState::Starting => true, @@ -409,7 +445,9 @@ impl super::Nexus { InstanceState::Stopping => true, InstanceState::Stopped => true, InstanceState::Rebooting => true, - InstanceState::Migrating => false, + InstanceState::Migrating => { + matches!(requested, InstanceStateRequested::MigrationTarget(_)) + } InstanceState::Repairing => false, InstanceState::Failed => false, InstanceState::Destroyed => false, @@ -427,9 +465,7 @@ impl super::Nexus { } } - /// Modifies the runtime state of the Instance as requested. This generally - /// means booting or halting the Instance. - pub(crate) async fn instance_set_runtime( + pub(crate) async fn instance_request_state( &self, opctx: &OpContext, authz_instance: &authz::Instance, @@ -437,11 +473,35 @@ impl super::Nexus { requested: InstanceStateRequested, ) -> Result<(), Error> { opctx.authorize(authz::Action::Modify, authz_instance).await?; - self.check_runtime_change_allowed( &db_instance.runtime().clone().into(), + &requested, )?; + let sa = self.instance_sled(&db_instance).await?; + let instance_put_result = sa + .instance_put_state( + &db_instance.id(), + &InstancePutStateBody { state: requested }, + ) + .await + .map(|res| res.into_inner().updated_runtime); + + self.handle_instance_put_result(db_instance, instance_put_result) + .await + .map(|_| ()) + } + + /// Modifies the runtime state of the Instance as requested. This generally + /// means booting or halting the Instance. 
+ pub(crate) async fn instance_ensure_registered( + &self, + opctx: &OpContext, + authz_instance: &authz::Instance, + db_instance: &db::model::Instance, + ) -> Result<(), Error> { + opctx.authorize(authz::Action::Modify, authz_instance).await?; + // Gather disk information and turn that into DiskRequests let disks = self .db_datastore @@ -587,31 +647,73 @@ impl super::Nexus { let sa = self.instance_sled(&db_instance).await?; - let instance_put_result = sa - .instance_put( + let instance_register_result = sa + .instance_register( &db_instance.id(), &sled_agent_client::types::InstanceEnsureBody { initial: instance_hardware, - target: requested, - migrate: None, }, ) - .await; + .await + .map(|res| Some(res.into_inner())); + + self.handle_instance_put_result(db_instance, instance_register_result) + .await + .map(|_| ()) + } - match instance_put_result { - Ok(new_runtime) => { + /// Updates an instance's CRDB record based on the result of a call to sled + /// agent that tried to update the instance's state. + /// + /// # Parameters + /// + /// - `db_instance`: The CRDB instance record observed by the caller before + /// it attempted to update the instance's state. + /// - `result`: The result of the relevant sled agent operation. If this is + /// `Ok`, the payload is the updated instance runtime state returned from + /// sled agent, if there was one. + /// + /// # Return value + /// + /// - `Ok(true)` if the caller supplied an updated instance record and this + /// routine successfully wrote it to CRDB. + /// - `Ok(false)` if the sled agent call succeeded, but this routine did not + /// update CRDB. + /// This can happen either because sled agent didn't return an updated + /// record or because the updated record was superseded by a state update + /// with a more advanced generation number. + /// - `Err` if the sled agent operation failed or this routine received an + /// error while trying to update CRDB. + async fn handle_instance_put_result( + &self, + db_instance: &db::model::Instance, + result: Result< + Option, + sled_agent_client::Error, + >, + ) -> Result { + slog::debug!(&self.log, "Handling sled agent instance PUT result"; + "result" => ?result); + + match result { + Ok(Some(new_runtime)) => { let new_runtime: nexus::InstanceRuntimeState = - new_runtime.into_inner().into(); + new_runtime.into(); - self.db_datastore + let update_result = self + .db_datastore .instance_update_runtime( &db_instance.id(), &new_runtime.into(), ) - .await - .map(|_| ()) - } + .await; + slog::debug!(&self.log, + "Attempted DB update after instance PUT"; + "result" => ?update_result); + update_result + } + Ok(None) => Ok(false), Err(e) => { // The sled-agent has told us that it can't do what we // requested, but does that mean a failure? One example would be diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs index 0504f9d8ed..6afe842308 100644 --- a/nexus/src/app/sagas/instance_create.rs +++ b/nexus/src/app/sagas/instance_create.rs @@ -120,8 +120,12 @@ declare_saga_actions! 
{ V2P_ENSURE -> "v2p_ensure" { + sic_v2p_ensure } - INSTANCE_ENSURE -> "instance_ensure" { - + sic_instance_ensure + INSTANCE_ENSURE_REGISTERED -> "instance_ensure_registered" { + + sic_instance_ensure_registered + - sic_instance_ensure_registered_undo + } + INSTANCE_ENSURE_RUNNING -> "instance_ensure_running" { + + sic_instance_ensure_running } } @@ -348,8 +352,10 @@ impl NexusSaga for SagaInstanceCreate { builder.append(v2p_ensure_undo_action()); builder.append(v2p_ensure_action()); - builder.append(instance_ensure_action()); - + builder.append(instance_ensure_registered_action()); + if params.create_params.start { + builder.append(instance_ensure_running_action()); + } Ok(builder.build()?) } } @@ -1255,10 +1261,54 @@ async fn sic_delete_instance_record( Ok(()) } -async fn sic_instance_ensure( +async fn sic_noop(_sagactx: NexusActionContext) -> Result<(), ActionError> { + Ok(()) +} + +/// Ensure that the necessary v2p mappings exist for this instance +async fn sic_v2p_ensure( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + let instance_id = sagactx.lookup::("instance_id")?; + + osagactx + .nexus() + .create_instance_v2p_mappings(&opctx, instance_id) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn sic_v2p_ensure_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + let instance_id = sagactx.lookup::("instance_id")?; + + osagactx + .nexus() + .delete_instance_v2p_mappings(&opctx, instance_id) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn sic_instance_ensure_registered( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - // TODO-correctness is this idempotent? let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; let datastore = osagactx.datastore(); @@ -1281,15 +1331,23 @@ async fn sic_instance_ensure( if !params.create_params.start { let instance_id = db_instance.id(); // If we don't need to start the instance, we can skip the ensure - // and just update the instance runtime state to `Stopped` + // and just update the instance runtime state to `Stopped`. + // + // TODO-correctness: This is dangerous if this step is replayed, since + // a user can discover this instance and ask to start it in between + // attempts to run this step. One way to fix this is to avoid refetching + // the previous runtime state each time this step is taken, such that + // once this update is applied once, subsequent attempts to apply it + // will have an already-used generation number. let runtime_state = db::model::InstanceRuntimeState { state: db::model::InstanceState::new(InstanceState::Stopped), // Must update the generation, or the database query will fail. // - // The runtime state of the instance record is only changed as a result - // of the successful completion of the saga (i.e. after ensure which we're - // skipping in this case) or during saga unwinding. So we're guaranteed - // that the cached generation in the saga log is the most recent in the database. + // The runtime state of the instance record is only changed as a + // result of the successful completion of the saga (i.e. 
after + // ensure which we're skipping in this case) or during saga + // unwinding. So we're guaranteed that the cached generation in the + // saga log is the most recent in the database. gen: db::model::Generation::from( db_instance.runtime_state.gen.next(), ), @@ -1310,12 +1368,7 @@ async fn sic_instance_ensure( } else { osagactx .nexus() - .instance_set_runtime( - &opctx, - &authz_instance, - &db_instance, - InstanceStateRequested::Running, - ) + .instance_ensure_registered(&opctx, &authz_instance, &db_instance) .await .map_err(ActionError::action_failed)?; } @@ -1323,45 +1376,59 @@ async fn sic_instance_ensure( Ok(()) } -async fn sic_noop(_sagactx: NexusActionContext) -> Result<(), ActionError> { - Ok(()) -} - -/// Ensure that the necessary v2p mappings exist for this instance -async fn sic_v2p_ensure( +async fn sic_instance_ensure_registered_undo( sagactx: NexusActionContext, -) -> Result<(), ActionError> { +) -> Result<(), anyhow::Error> { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let datastore = osagactx.datastore(); + let instance_id = sagactx.lookup::("instance_id")?; let opctx = crate::context::op_context_for_saga_action( &sagactx, ¶ms.serialized_authn, ); - let instance_id = sagactx.lookup::("instance_id")?; + + let (.., authz_instance, db_instance) = LookupPath::new(&opctx, &datastore) + .instance_id(instance_id) + .fetch() + .await + .map_err(ActionError::action_failed)?; osagactx .nexus() - .create_instance_v2p_mappings(&opctx, instance_id) + .instance_ensure_unregistered(&opctx, &authz_instance, &db_instance) .await .map_err(ActionError::action_failed)?; Ok(()) } -async fn sic_v2p_ensure_undo( +async fn sic_instance_ensure_running( sagactx: NexusActionContext, -) -> Result<(), anyhow::Error> { +) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let datastore = osagactx.datastore(); + let instance_id = sagactx.lookup::("instance_id")?; let opctx = crate::context::op_context_for_saga_action( &sagactx, ¶ms.serialized_authn, ); - let instance_id = sagactx.lookup::("instance_id")?; + + let (.., authz_instance, db_instance) = LookupPath::new(&opctx, &datastore) + .instance_id(instance_id) + .fetch() + .await + .map_err(ActionError::action_failed)?; osagactx .nexus() - .delete_instance_v2p_mappings(&opctx, instance_id) + .instance_request_state( + &opctx, + &authz_instance, + &db_instance, + InstanceStateRequested::Running, + ) .await .map_err(ActionError::action_failed)?; diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index c274cc894f..f2f020ddfb 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -82,7 +82,7 @@ }, "/instances/{instance_id}": { "put": { - "operationId": "instance_put", + "operationId": "instance_register", "parameters": [ { "in": "path", @@ -122,6 +122,38 @@ "$ref": "#/components/responses/Error" } } + }, + "delete": { + "operationId": "instance_unregister", + "parameters": [ + { + "in": "path", + "name": "instance_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InstanceUnregisterResponse" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } } }, "/instances/{instance_id}/disks/{disk_id}/snapshot": { @@ -178,6 +210,50 @@ } } }, + "/instances/{instance_id}/state": { + 
"put": { + "operationId": "instance_put_state", + "parameters": [ + { + "in": "path", + "name": "instance_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InstancePutStateBody" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InstancePutStateResponse" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/services": { "put": { "operationId": "services_put", @@ -1008,38 +1084,20 @@ "minimum": 0 }, "InstanceEnsureBody": { - "description": "Sent to a sled agent to establish the runtime state of an Instance", + "description": "The body of a request to ensure that an instance is known to a sled agent.", "type": "object", "properties": { "initial": { - "description": "Last runtime state of the Instance known to Nexus (used if the agent has never seen this Instance before).", + "description": "A description of the instance's virtual hardware and the initial runtime state this sled agent should store for this incarnation of the instance.", "allOf": [ { "$ref": "#/components/schemas/InstanceHardware" } ] - }, - "migrate": { - "nullable": true, - "description": "If we're migrating this instance, the details needed to drive the migration", - "allOf": [ - { - "$ref": "#/components/schemas/InstanceMigrationTargetParams" - } - ] - }, - "target": { - "description": "requested runtime state of the Instance", - "allOf": [ - { - "$ref": "#/components/schemas/InstanceStateRequested" - } - ] } }, "required": [ - "initial", - "target" + "initial" ] }, "InstanceHardware": { @@ -1117,12 +1175,15 @@ ] }, "InstanceMigrationTargetParams": { + "description": "Parameters used when directing Propolis to initialize itself via live migration.", "type": "object", "properties": { "src_propolis_addr": { + "description": "The address of the Propolis server that will serve as the migration source.", "type": "string" }, "src_propolis_id": { + "description": "The Propolis ID of the migration source.", "type": "string", "format": "uuid" } @@ -1132,6 +1193,38 @@ "src_propolis_id" ] }, + "InstancePutStateBody": { + "description": "The body of a request to move a previously-ensured instance into a specific runtime state.", + "type": "object", + "properties": { + "state": { + "description": "The state into which the instance should be driven.", + "allOf": [ + { + "$ref": "#/components/schemas/InstanceStateRequested" + } + ] + } + }, + "required": [ + "state" + ] + }, + "InstancePutStateResponse": { + "description": "The response sent from a request to move an instance into a specific runtime state.", + "type": "object", + "properties": { + "updated_runtime": { + "nullable": true, + "description": "The current runtime state of the instance after handling the request to change its state. 
If the instance's state did not change, this field is `None`.", + "allOf": [ + { + "$ref": "#/components/schemas/InstanceRuntimeState" + } + ] + } + } + }, "InstanceRuntimeState": { "description": "Runtime state of the Instance, including the actual running state and minimal metadata\n\nThis state is owned by the sled agent running that Instance.", "type": "object", @@ -1295,35 +1388,86 @@ "description": "Requestable running state of an Instance.\n\nA subset of [`omicron_common::api::external::InstanceState`].", "oneOf": [ { - "description": "Start the instance if it is not already running.", - "type": "string", - "enum": [ - "running" + "description": "Run this instance by migrating in from a previous running incarnation of the instance.", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "migration_target" + ] + }, + "value": { + "$ref": "#/components/schemas/InstanceMigrationTargetParams" + } + }, + "required": [ + "type", + "value" ] }, { - "description": "Stop the instance.", - "type": "string", - "enum": [ - "stopped" + "description": "Start the instance if it is not already running.", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "running" + ] + } + }, + "required": [ + "type" ] }, { - "description": "Issue a reset command to the instance, such that it should stop and then immediately become running.", - "type": "string", - "enum": [ - "reboot" + "description": "Stop the instance.", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "stopped" + ] + } + }, + "required": [ + "type" ] }, { - "description": "Stop the instance and delete it.", - "type": "string", - "enum": [ - "destroyed" + "description": "Immediately reset the instance, as though it had stopped and immediately began to run again.", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "reboot" + ] + } + }, + "required": [ + "type" ] } ] }, + "InstanceUnregisterResponse": { + "description": "The response sent from a request to unregister an instance.", + "type": "object", + "properties": { + "updated_runtime": { + "nullable": true, + "description": "The current state of the instance after handling the request to unregister it. If the instance's state did not change, this field is `None`.", + "allOf": [ + { + "$ref": "#/components/schemas/InstanceRuntimeState" + } + ] + } + } + }, "IpNet": { "oneOf": [ { diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index d04dfab255..fb3f1ab40c 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -4,10 +4,10 @@ //! 
HTTP entrypoint functions for the sled agent's exposed API -use crate::params::VpcFirewallRulesEnsureBody; use crate::params::{ - DatasetEnsureBody, DiskEnsureBody, InstanceEnsureBody, ServiceEnsureBody, - SledRole, TimeSync, Zpool, + DatasetEnsureBody, DiskEnsureBody, InstanceEnsureBody, + InstancePutStateBody, InstancePutStateResponse, InstanceUnregisterResponse, + ServiceEnsureBody, SledRole, TimeSync, VpcFirewallRulesEnsureBody, Zpool, }; use dropshot::{ endpoint, ApiDescription, HttpError, HttpResponseOk, @@ -32,7 +32,9 @@ pub fn api() -> SledApiDescription { api.register(disk_put)?; api.register(filesystem_put)?; api.register(instance_issue_disk_snapshot_request)?; - api.register(instance_put)?; + api.register(instance_put_state)?; + api.register(instance_register)?; + api.register(instance_unregister)?; api.register(services_put)?; api.register(sled_role_get)?; api.register(set_v2p)?; @@ -118,7 +120,7 @@ struct InstancePathParam { method = PUT, path = "/instances/{instance_id}", }] -async fn instance_put( +async fn instance_register( rqctx: RequestContext, path_params: Path, body: TypedBody, @@ -127,14 +129,45 @@ async fn instance_put( let instance_id = path_params.into_inner().instance_id; let body_args = body.into_inner(); Ok(HttpResponseOk( - sa.instance_ensure( - instance_id, - body_args.initial, - body_args.target, - body_args.migrate, - ) - .await - .map_err(Error::from)?, + sa.instance_ensure_registered(instance_id, body_args.initial) + .await + .map_err(Error::from)?, + )) +} + +#[endpoint { + method = DELETE, + path = "/instances/{instance_id}", +}] +async fn instance_unregister( + rqctx: RequestContext, + path_params: Path, +) -> Result, HttpError> { + let sa = rqctx.context(); + let instance_id = path_params.into_inner().instance_id; + Ok(HttpResponseOk( + sa.instance_ensure_unregistered(instance_id) + .await + .map_err(Error::from)?, + )) +} + +#[endpoint { + method = PUT, + path = "/instances/{instance_id}/state", +}] +async fn instance_put_state( + rqctx: RequestContext, + path_params: Path, + body: TypedBody, +) -> Result, HttpError> { + let sa = rqctx.context(); + let instance_id = path_params.into_inner().instance_id; + let body_args = body.into_inner(); + Ok(HttpResponseOk( + sa.instance_ensure_state(instance_id, body_args.state) + .await + .map_err(Error::from)?, )) } diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs index 3c39314321..901d3bc990 100644 --- a/sled-agent/src/instance.rs +++ b/sled-agent/src/instance.rs @@ -89,6 +89,9 @@ pub enum Error { #[error("Instance {0} not running!")] InstanceNotRunning(Uuid), + + #[error("Instance already registered with Propolis ID {0}")] + InstanceAlreadyRegistered(Uuid), } // Issues read-only, idempotent HTTP requests at propolis until it responds with @@ -159,14 +162,12 @@ enum Reaction { struct RunningState { // Connection to Propolis. client: Arc, - // Object representing membership in the "instance manager". - instance_ticket: InstanceTicket, // Objects representing the instance's OPTE ports in the port manager port_tickets: Option>, // Handle to task monitoring for Propolis state changes. monitor_task: Option>, // Handle to the zone. - _running_zone: RunningZone, + running_zone: RunningZone, } impl Drop for RunningState { @@ -234,24 +235,61 @@ struct InstanceInner { // Connection to Nexus lazy_nexus_client: LazyNexusClient, + + // Object representing membership in the "instance manager". + instance_ticket: InstanceTicket, } impl InstanceInner { + /// Yields this instance's ID. 
fn id(&self) -> &Uuid { &self.properties.id } - /// UUID of the underlying propolis-server process + /// Yields this instance's Propolis's ID. fn propolis_id(&self) -> &Uuid { &self.propolis_id } + async fn publish_state_to_nexus(&self) -> Result<(), Error> { + self.lazy_nexus_client + .get() + .await? + .cpapi_instances_put( + self.id(), + &nexus_client::types::InstanceRuntimeState::from( + self.state.current().clone(), + ), + ) + .await + .map_err(|e| Error::Notification(e))?; + + Ok(()) + } + + /// Processes a Propolis state change observed by the Propolis monitoring + /// task. async fn observe_state( &mut self, state: propolis_client::api::InstanceState, ) -> Result { info!(self.log, "Observing new propolis state: {:?}", state); + // The instance might have been rudely terminated between the time the + // call to Propolis returned and the time the instance lock was + // acquired for this call. If that happened, do not publish the Propolis + // state; simply remain in the Destroyed state. + // + // Returning the `Terminate` action is OK because terminating a + // previously-terminated instance is a no-op. + if self.state.current().run_state == InstanceState::Destroyed { + info!( + self.log, + "Ignoring new propolis state: instance is already destroyed" + ); + return Ok(Reaction::Terminate); + } + // Update the Sled Agent's internal state machine. let action = self.state.observe_transition(&state); info!( @@ -261,18 +299,10 @@ impl InstanceInner { action ); - // Notify Nexus of the state change. - self.lazy_nexus_client - .get() - .await? - .cpapi_instances_put( - self.id(), - &nexus_client::types::InstanceRuntimeState::from( - self.state.current().clone(), - ), - ) - .await - .map_err(|e| Error::Notification(e))?; + // TODO(#2727): Any failure here (even a transient failure) causes the + // monitoring task to exit, marooning the instance. Decide where best + // to handle this. + self.publish_state_to_nexus().await?; // Take the next action, if any. if let Some(action) = action { @@ -282,6 +312,7 @@ impl InstanceInner { } } + /// Sends an instance state PUT request to this instance's Propolis. async fn propolis_state_put( &self, request: propolis_client::api::InstanceStateRequested, @@ -297,15 +328,13 @@ impl InstanceInner { Ok(()) } - async fn ensure( - &mut self, - instance: Instance, - instance_ticket: InstanceTicket, - setup: PropolisSetup, + /// Sends an instance ensure request to this instance's Propolis. + async fn propolis_ensure( + &self, + client: &PropolisClient, + running_zone: &RunningZone, migrate: Option, ) -> Result<(), Error> { - let PropolisSetup { client, running_zone, port_tickets } = setup; - let nics = running_zone .opte_ports() .iter() @@ -349,6 +378,26 @@ impl InstanceInner { let result = client.instance_ensure().body(request).send().await; info!(self.log, "result of instance_ensure call is {:?}", result); result?; + Ok(()) + } + + /// Given a freshly-created Propolis process, sends an ensure request to + /// that Propolis and launches all of the tasks needed to monitor the + /// resulting Propolis VM. + /// + /// # Panics + /// + /// Panics if this routine is called more than once for a given Instance. 
+ async fn ensure_propolis_and_tasks( + &mut self, + instance: Instance, + setup: PropolisSetup, + migrate: Option, + ) -> Result<(), Error> { + assert!(self.running_state.is_none()); + + let PropolisSetup { client, running_zone, port_tickets } = setup; + self.propolis_ensure(&client, &running_zone, migrate).await?; // Monitor propolis for state changes in the background. let monitor_client = client.clone(); @@ -363,10 +412,9 @@ impl InstanceInner { self.running_state = Some(RunningState { client, - instance_ticket, port_tickets, monitor_task, - _running_zone: running_zone, + running_zone, }); Ok(()) @@ -398,6 +446,44 @@ impl InstanceInner { self.propolis_state_put(requested_state).await?; Ok(Reaction::Continue) } + + /// Immediately terminates this instance's Propolis zone and cleans up any + /// runtime objects associated with the instance. + /// + /// This routine is safe to call even if the instance's zone was never + /// started. It is also safe to call multiple times on a single instance. + async fn terminate(&mut self) -> Result<(), Error> { + // Ensure that no zone exists. This succeeds even if no zone was ever + // created. + let zname = propolis_zone_name(self.propolis_id()); + warn!(self.log, "Halting and removing zone: {}", zname); + Zones::halt_and_remove_logged(&self.log, &zname).await.unwrap(); + + // Remove ourselves from the instance manager's map of instances. + self.instance_ticket.terminate(); + + // See if there are any runtime objects to clean up. + let mut running_state = if let Some(state) = self.running_state.take() { + state + } else { + return Ok(()); + }; + + // Remove any OPTE ports from the port manager. + let mut result = Ok(()); + if let Some(tickets) = running_state.port_tickets.as_mut() { + for ticket in tickets.iter_mut() { + // Release the port from the manager, and store any error. We + // don't return immediately so that we can try to clean up all + // ports, even if early ones fail. Return the last error, which + // is OK for now. + if let Err(e) = ticket.release() { + result = Err(e.into()); + } + } + } + result + } } /// A reference to a single instance running a running Propolis server. @@ -412,28 +498,27 @@ pub struct Instance { #[cfg(test)] mockall::mock! { pub Instance { + #[allow(clippy::too_many_arguments)] pub fn new( log: Logger, id: Uuid, + ticket: InstanceTicket, initial: InstanceHardware, vnic_allocator: VnicAllocator, port_manager: PortManager, lazy_nexus_client: LazyNexusClient, ) -> Result; - pub async fn start( + pub async fn current_state(&self) -> InstanceRuntimeState; + pub async fn put_state( &self, - instance_ticket: InstanceTicket, - migrate: Option, - ) -> Result<(), Error>; - pub async fn transition( - &self, - target: InstanceStateRequested, + state: InstanceStateRequested ) -> Result; pub async fn issue_snapshot_request( &self, disk_id: Uuid, snapshot_name: Uuid, ) -> Result<(), Error>; + pub async fn terminate(&self) -> Result; } impl Clone for Instance { fn clone(&self) -> Self; @@ -455,9 +540,11 @@ impl Instance { /// ports. /// * `lazy_nexus_client`: Connection to Nexus, used for sending notifications. // TODO: This arg list is getting a little long; can we clean this up? 
+ #[allow(clippy::too_many_arguments)] pub fn new( log: Logger, id: Uuid, + ticket: InstanceTicket, initial: InstanceHardware, vnic_allocator: VnicAllocator, port_manager: PortManager, @@ -492,6 +579,7 @@ impl Instance { state: InstanceStates::new(initial.runtime), running_state: None, lazy_nexus_client, + instance_ticket: ticket, }; let inner = Arc::new(Mutex::new(instance)); @@ -499,25 +587,138 @@ impl Instance { Ok(Instance { inner }) } + pub async fn current_state(&self) -> InstanceRuntimeState { + let inner = self.inner.lock().await; + inner.state.current().clone() + } + + /// Ensures that a Propolis process exists for this instance, then sends it + /// an instance ensure request. + async fn propolis_ensure( + &self, + inner: &mut MutexGuard<'_, InstanceInner>, + migration_params: Option, + ) -> Result<(), Error> { + if let Some(running_state) = inner.running_state.as_ref() { + inner + .propolis_ensure( + &running_state.client, + &running_state.running_zone, + migration_params, + ) + .await?; + } else { + let setup_result: Result<(), Error> = 'setup: { + // If there's no Propolis yet, and this instance is not being + // initialized via migration, immediately send a state update to + // Nexus to reflect that the instance is starting (so that the + // external API will display this state while the zone is being + // started). + // + // Migration targets don't do this because the instance is still + // logically running (on the source) while the target Propolis + // is being launched. + if migration_params.is_none() { + inner.state.transition(InstanceState::Starting); + if let Err(e) = inner.publish_state_to_nexus().await { + break 'setup Err(e); + } + } + + // Set up the Propolis zone and the objects associated with it. + let setup = match self.setup_propolis_locked(inner).await { + Ok(setup) => setup, + Err(e) => break 'setup Err(e), + }; + + // Direct the Propolis server to create its VM and the tasks + // associated with it. On success, the zone handle moves into + // this instance, preserving the zone. + inner + .ensure_propolis_and_tasks( + self.clone(), + setup, + migration_params, + ) + .await + }; + + // If this instance started from scratch, and startup failed, move + // the instance to the Failed state instead of leaking the Starting + // state. + // + // Once again, migration targets don't do this, because a failure to + // start a migration target simply leaves the VM running untouched + // on the source. + if migration_params.is_none() && setup_result.is_err() { + inner.state.transition(InstanceState::Failed); + inner.publish_state_to_nexus().await?; + } + setup_result?; + } + Ok(()) + } + + /// Attempts to update the current state of the instance by launching a + /// Propolis process for the instance (if needed) and issuing an appropriate + /// request to Propolis to change state. + /// + /// Returns the instance's state after applying any changes required by this + /// call. Note that if the instance's Propolis is in the middle of its own + /// state transition, it may publish states that supersede the state + /// published by this routine in perhaps-surprising ways. For example, if an + /// instance begins to stop when Propolis has just begun to handle a prior + /// request to reboot, the instance's state may proceed from Stopping to + /// Rebooting to Running to Stopping to Stopped. 
+ pub async fn put_state( + &self, + state: crate::params::InstanceStateRequested, + ) -> Result { + use propolis_client::api::InstanceStateRequested as PropolisRequest; + let mut inner = self.inner.lock().await; + let (propolis_state, next_published) = match state { + InstanceStateRequested::MigrationTarget(migration_params) => { + self.propolis_ensure(&mut inner, Some(migration_params)) + .await?; + (None, None) + } + InstanceStateRequested::Running => { + self.propolis_ensure(&mut inner, None).await?; + (Some(PropolisRequest::Run), None) + } + InstanceStateRequested::Stopped => { + // If the instance has not started yet, unregister it + // immediately. Since there is no Propolis to push updates when + // this happens, generate an instance record bearing the + // "Destroyed" state and return it to the caller. + if inner.running_state.is_none() { + inner.terminate().await?; + (None, Some(InstanceState::Destroyed)) + } else { + (Some(PropolisRequest::Stop), Some(InstanceState::Stopping)) + } + } + InstanceStateRequested::Reboot => { + if inner.running_state.is_none() { + return Err(Error::InstanceNotRunning(*inner.id())); + } + (Some(PropolisRequest::Reboot), Some(InstanceState::Rebooting)) + } + }; + + if let Some(p) = propolis_state { + inner.propolis_state_put(p).await?; + } + if let Some(s) = next_published { + inner.state.transition(s); + } + Ok(inner.state.current().clone()) + } + async fn setup_propolis_locked( &self, inner: &mut MutexGuard<'_, InstanceInner>, ) -> Result { - // Update nexus with an in-progress state while we set up the instance. - inner.state.transition(InstanceState::Starting); - inner - .lazy_nexus_client - .get() - .await? - .cpapi_instances_put( - inner.id(), - &nexus_client::types::InstanceRuntimeState::from( - inner.state.current().clone(), - ), - ) - .await - .map_err(|e| Error::Notification(e))?; - // Create OPTE ports for the instance let mut opte_ports = Vec::with_capacity(inner.requested_nics.len()); let mut port_tickets = Vec::with_capacity(inner.requested_nics.len()); @@ -698,50 +899,13 @@ impl Instance { }) } - /// Begins the execution of the instance's service (Propolis). - pub async fn start( - &self, - instance_ticket: InstanceTicket, - migrate: Option, - ) -> Result<(), Error> { - let mut inner = self.inner.lock().await; - - // Create the propolis zone and resources - let setup = self.setup_propolis_locked(&mut inner).await?; - - // Ensure the instance exists in the Propolis Server before we start - // using it. - inner.ensure(self.clone(), instance_ticket, setup, migrate).await?; - - Ok(()) - } - - // Terminate the Propolis service. - async fn stop(&self) -> Result<(), Error> { + /// Rudely terminates this instance's Propolis (if it has one) and + /// immediately transitions the instance to the Destroyed state. + pub async fn terminate(&self) -> Result { let mut inner = self.inner.lock().await; - - let zname = propolis_zone_name(inner.propolis_id()); - warn!(inner.log, "Halting and removing zone: {}", zname); - Zones::halt_and_remove_logged(&inner.log, &zname).await.unwrap(); - - // Remove ourselves from the instance manager's map of instances. - let running_state = inner.running_state.as_mut().unwrap(); - running_state.instance_ticket.terminate(); - - // And remove the OPTE ports from the port manager - let mut result = Ok(()); - if let Some(tickets) = running_state.port_tickets.as_mut() { - for ticket in tickets.iter_mut() { - // Release the port from the manager, and store any error. 
We - // don't return immediately so that we can try to clean up all - // ports, even if early ones fail. Return the last error, which - // is OK for now. - if let Err(e) = ticket.release() { - result = Err(e.into()); - } - } - } - result + inner.terminate().await?; + inner.state.transition(InstanceState::Destroyed); + Ok(inner.state.current().clone()) } // Monitors propolis until explicitly told to disconnect. @@ -766,7 +930,7 @@ impl Instance { match reaction { Reaction::Continue => {} Reaction::Terminate => { - return self.stop().await; + return self.terminate().await.map(|_| ()); } } @@ -776,49 +940,6 @@ impl Instance { } } - /// Attempts to change the current state of the instance by issuing an - /// appropriate state change command to its Propolis. - /// - /// Returns the instance's state after applying any changes required by this - /// call. Note that if the instance's Propolis is in the middle of its own - /// state transition, it may publish states that supersede the state - /// published by this routine in perhaps-surprising ways. For example, if an - /// instance begins to stop when Propolis has just begun to handle a prior - /// request to reboot, the instance's state may proceed from Stopping to - /// Rebooting to Running to Stopping to Stopped. - /// - /// # Panics - /// - /// This method may panic if it has been invoked before [`Instance::start`]. - pub async fn transition( - &self, - target: InstanceStateRequested, - ) -> Result { - let mut inner = self.inner.lock().await; - - let (propolis_state, next_published) = match target { - InstanceStateRequested::Running => { - (propolis_client::api::InstanceStateRequested::Run, None) - } - InstanceStateRequested::Stopped - | InstanceStateRequested::Destroyed => ( - propolis_client::api::InstanceStateRequested::Stop, - Some(InstanceState::Stopping), - ), - InstanceStateRequested::Reboot => ( - propolis_client::api::InstanceStateRequested::Reboot, - Some(InstanceState::Rebooting), - ), - }; - - inner.propolis_state_put(propolis_state).await?; - if let Some(s) = next_published { - inner.state.transition(s); - } - - Ok(inner.state.current().clone()) - } - pub async fn issue_snapshot_request( &self, disk_id: Uuid, @@ -845,6 +966,7 @@ impl Instance { #[cfg(test)] mod test { use super::*; + use crate::instance_manager::InstanceManager; use crate::nexus::LazyNexusClient; use crate::params::InstanceStateRequested; use crate::params::SourceNatConfig; @@ -913,9 +1035,6 @@ mod test { #[tokio::test] #[serial_test::serial] - #[should_panic( - expected = "Propolis client should be initialized before usage" - )] async fn transition_before_start() { let logctx = test_setup_log("transition_before_start"); let log = &logctx.log; @@ -930,10 +1049,19 @@ mod test { let lazy_nexus_client = LazyNexusClient::new(log.clone(), std::net::Ipv6Addr::LOCALHOST) .unwrap(); + let instance_manager = InstanceManager::new( + log.clone(), + lazy_nexus_client.clone(), + Etherstub("mylink".to_string()), + underlay_ip, + mac, + ) + .unwrap(); let inst = Instance::new( log.clone(), test_uuid(), + instance_manager.test_instance_ticket(test_uuid()), new_initial_instance(), vnic_allocator, port_manager, @@ -941,12 +1069,9 @@ mod test { ) .unwrap(); - // Remove the logfile before we expect to panic, or it'll never be - // cleaned up. - logctx.cleanup_successful(); + // Pick a state transition that requires the instance to have started. 
+ assert!(inst.put_state(InstanceStateRequested::Reboot).await.is_err()); - // Trying to transition before the instance has been initialized will - // result in a panic. - inst.transition(InstanceStateRequested::Running).await.unwrap(); + logctx.cleanup_successful(); } } diff --git a/sled-agent/src/instance_manager.rs b/sled-agent/src/instance_manager.rs index b5a36b8272..d04c828167 100644 --- a/sled-agent/src/instance_manager.rs +++ b/sled-agent/src/instance_manager.rs @@ -5,9 +5,9 @@ //! API for controlling multiple instances on a sled. use crate::nexus::LazyNexusClient; -use crate::params::VpcFirewallRule; use crate::params::{ - InstanceHardware, InstanceMigrationTargetParams, InstanceStateRequested, + InstanceHardware, InstancePutStateResponse, InstanceStateRequested, + InstanceUnregisterResponse, VpcFirewallRule, }; use illumos_utils::dladm::Etherstub; use illumos_utils::link::VnicAllocator; @@ -87,97 +87,146 @@ impl InstanceManager { }) } - /// Idempotently ensures that the given Instance (described by - /// `initial_hardware`) exists on this server in the given runtime state - /// (described by `target`). - pub async fn ensure( + /// Ensures that the instance manager contains a registered instance with + /// the supplied instance ID and the Propolis ID specified in + /// `initial_hardware`. + /// + /// # Arguments + /// + /// * instance_id: The ID of the instance to register. + /// * initial_hardware: The initial hardware manifest and runtime state of + /// the instance, to be used if the instance does not already exist. + /// + /// # Return value + /// + /// `Ok` if the instance is registered with the supplied Propolis ID, `Err` + /// otherwise. This routine is idempotent if called to register the same + /// (instance ID, Propolis ID) pair multiple times, but will fail if the + /// instance is registered with a Propolis ID different from the one the + /// caller supplied. + pub async fn ensure_registered( &self, instance_id: Uuid, initial_hardware: InstanceHardware, - target: InstanceStateRequested, - migrate: Option, ) -> Result { + let requested_propolis_id = initial_hardware.runtime.propolis_id; info!( &self.inner.log, - "instance_ensure {} -> {:?}", instance_id, target + "ensuring instance is registered"; + "instance_id" => %instance_id, + "propolis_id" => %requested_propolis_id ); - let target_propolis_id = initial_hardware.runtime.propolis_id; - - let (instance, maybe_instance_ticket) = { + let instance = { let mut instances = self.inner.instances.lock().unwrap(); - match (instances.get(&instance_id), &migrate) { - (Some((_, instance)), None) => { - // Instance already exists and we're not performing a migration - info!(&self.inner.log, "instance already exists"); - (instance.clone(), None) - } - (Some((propolis_id, instance)), Some(_)) - if *propolis_id == target_propolis_id => - { - // A migration was requested but the given propolis id - // already seems to be the propolis backing this instance - // so just return the instance as is without triggering - // another migration. 
+ if let Some((existing_propolis_id, existing_instance)) = + instances.get(&instance_id) + { + if requested_propolis_id != *existing_propolis_id { + info!(&self.inner.log, + "instance already registered with another Propolis ID"; + "instance_id" => %instance_id, + "existing_propolis_id" => %*existing_propolis_id); + return Err(Error::Instance( + crate::instance::Error::InstanceAlreadyRegistered( + *existing_propolis_id, + ), + )); + } else { info!( &self.inner.log, - "instance already exists with given dst propolis" + "instance already registered with requested Propolis ID" ); - (instance.clone(), None) + existing_instance.clone() } - _ => { - // Instance does not exist or one does but we're performing - // a intra-sled migration. Either way - create an instance - info!(&self.inner.log, "new instance"); - let instance_log = self.inner.log.new(o!()); - let instance = Instance::new( - instance_log, - instance_id, - initial_hardware, - self.inner.vnic_allocator.clone(), - self.inner.port_manager.clone(), - self.inner.lazy_nexus_client.clone(), - )?; - let instance_clone = instance.clone(); - let old_instance = instances - .insert(instance_id, (target_propolis_id, instance)); - if let Some((_old_propolis_id, old_instance)) = old_instance - { - // If we had a previous instance, we must be migrating - assert!(migrate.is_some()); - // TODO: assert that old_instance.inner.propolis_id() == migrate.src_uuid - - // We forget the old instance because otherwise if it is the last - // handle, the `InstanceTicket` it holds will also be dropped. - // `InstanceTicket::drop` will try to remove the corresponding - // instance from the instance manager. It does this based off the - // instance's ID. Given that we just replaced said instance using - // the same ID, that would inadvertantly remove our newly created - // instance instead. - // TODO: cleanup source instance properly - std::mem::forget(old_instance); - } + } else { + info!(&self.inner.log, + "registering new instance"; + "instance_id" => ?instance_id); + let instance_log = self.inner.log.new(o!()); + let ticket = + InstanceTicket::new(instance_id, self.inner.clone()); + let instance = Instance::new( + instance_log, + instance_id, + ticket, + initial_hardware, + self.inner.vnic_allocator.clone(), + self.inner.port_manager.clone(), + self.inner.lazy_nexus_client.clone(), + )?; + let instance_clone = instance.clone(); + let _old = instances + .insert(instance_id, (requested_propolis_id, instance)); + assert!(_old.is_none()); + instance_clone + } + }; - let ticket = Some(InstanceTicket::new( - instance_id, - self.inner.clone(), - )); - (instance_clone, ticket) - } + Ok(instance.current_state().await) + } + + /// Idempotently ensures the instance is not registered with this instance + /// manager. If the instance exists and has a running Propolis, that + /// Propolis is rudely terminated. + pub async fn ensure_unregistered( + &self, + instance_id: Uuid, + ) -> Result { + let instance = { + let instances = self.inner.instances.lock().unwrap(); + let instance = instances.get(&instance_id); + if let Some((_, instance)) = instance { + instance.clone() + } else { + return Ok(InstanceUnregisterResponse { + updated_runtime: None, + }); } }; - // If we created a new instance, start or migrate it - but do so outside - // the "instances" lock, since initialization may take a while. - // - // Additionally, this makes it possible to manage the "instance_ticket", - // which might need to grab the lock to remove the instance during - // teardown. 
- if let Some(instance_ticket) = maybe_instance_ticket { - instance.start(instance_ticket, migrate).await?; - } + Ok(InstanceUnregisterResponse { + updated_runtime: Some(instance.terminate().await?), + }) + } + + /// Idempotently attempts to drive the supplied instance into the supplied + /// runtime state. + pub async fn ensure_state( + &self, + instance_id: Uuid, + target: InstanceStateRequested, + ) -> Result { + let instance = { + let instances = self.inner.instances.lock().unwrap(); + let instance = instances.get(&instance_id); + + if let Some((_, instance)) = instance { + instance.clone() + } else { + match target { + // If the instance isn't registered, then by definition it + // isn't running here. Allow requests to stop or destroy the + // instance to succeed to provide idempotency. This has to + // be handled here (that is, on the "instance not found" + // path) to handle the case where a stop request arrived, + // Propolis handled it, sled agent unregistered the + // instance, and only then did a second stop request + // arrive. + InstanceStateRequested::Stopped => { + return Ok(InstancePutStateResponse { + updated_runtime: None, + }); + } + _ => { + return Err(Error::NoSuchInstance(instance_id)); + } + } + } + }; - instance.transition(target).await.map_err(|e| e.into()) + let new_state = instance.put_state(target).await?; + Ok(InstancePutStateResponse { updated_runtime: Some(new_state) }) } pub async fn instance_issue_disk_snapshot_request( @@ -238,6 +287,14 @@ impl InstanceManager { self.inner.port_manager.unset_virtual_nic_host(mapping)?; Ok(()) } + + /// Generates an instance ticket associated with this instance manager. This + /// allows tests in other modules to create an Instance even though they + /// lack visibility to `InstanceManagerInternal`. + #[cfg(test)] + pub fn test_instance_ticket(&self, instance_id: Uuid) -> InstanceTicket { + InstanceTicket::new(instance_id, self.inner.clone()) + } } /// Represents membership of an instance in the [`InstanceManager`]. @@ -366,39 +423,65 @@ mod test { let ticket = Arc::new(std::sync::Mutex::new(None)); let ticket_clone = ticket.clone(); let instance_new_ctx = MockInstance::new_context(); - instance_new_ctx.expect().return_once(move |_, _, _, _, _, _| { + let mut seq = mockall::Sequence::new(); + + // Expect one call to new() that produces an instance that expects to be + // cloned once. The clone should expect to ask to be put into the + // Running state. + instance_new_ctx.expect().return_once(move |_, _, t, _, _, _, _| { let mut inst = MockInstance::default(); - inst.expect_clone().return_once(move || { - let mut inst = MockInstance::default(); - inst.expect_start().return_once(move |t, _| { - // Grab hold of the ticket, so we don't try to remove the - // instance immediately after "start" completes. - let mut ticket_guard = ticket_clone.lock().unwrap(); - *ticket_guard = Some(t); - Ok(()) - }); - inst.expect_transition().return_once(|_| { - let mut rt_state = new_initial_instance(); - rt_state.runtime.run_state = InstanceState::Running; - Ok(rt_state.runtime) - }); - inst - }); + + // Move the instance ticket out to the test, since the mock instance + // won't hold onto it. + let mut ticket_guard = ticket_clone.lock().unwrap(); + *ticket_guard = Some(t); + + // Expect to be cloned twice, once during registration (to fish the + // current state out of the instance) and once during the state + // transition (to hoist the instance reference out of the instance + // manager lock). 
+ inst.expect_clone().times(1).in_sequence(&mut seq).return_once( + move || { + let mut inst = MockInstance::default(); + inst.expect_current_state() + .return_once(|| new_initial_instance().runtime); + inst + }, + ); + + inst.expect_clone().times(1).in_sequence(&mut seq).return_once( + move || { + let mut inst = MockInstance::default(); + inst.expect_put_state().return_once(|_| { + let mut rt_state = new_initial_instance(); + rt_state.runtime.run_state = InstanceState::Running; + Ok(rt_state.runtime) + }); + inst + }, + ); + Ok(inst) }); + + im.ensure_registered(test_uuid(), new_initial_instance()) + .await + .unwrap(); + + // The instance exists now. + assert_eq!(im.inner.instances.lock().unwrap().len(), 1); + let rt_state = im - .ensure( - test_uuid(), - new_initial_instance(), - InstanceStateRequested::Running, - None, - ) + .ensure_state(test_uuid(), InstanceStateRequested::Running) .await .unwrap(); // At this point, we can observe the expected state of the instance - // manager: contianing the created instance... - assert_eq!(rt_state.run_state, InstanceState::Running); + // manager: containing the created instance... + assert_eq!( + rt_state.updated_runtime.unwrap().run_state, + InstanceState::Running + ); assert_eq!(im.inner.instances.lock().unwrap().len(), 1); // ... however, when we drop the ticket of the corresponding instance, @@ -411,7 +494,7 @@ mod test { #[tokio::test] #[serial_test::serial] - async fn ensure_instance_repeatedly() { + async fn ensure_instance_state_repeatedly() { let logctx = test_setup_log("ensure_instance_repeatedly"); let log = &logctx.log; let lazy_nexus_client = @@ -441,18 +524,20 @@ mod test { let ticket_clone = ticket.clone(); let instance_new_ctx = MockInstance::new_context(); let mut seq = mockall::Sequence::new(); - instance_new_ctx.expect().return_once(move |_, _, _, _, _, _| { + instance_new_ctx.expect().return_once(move |_, _, t, _, _, _, _| { let mut inst = MockInstance::default(); + let mut ticket_guard = ticket_clone.lock().unwrap(); + *ticket_guard = Some(t); + // First call to ensure (start + transition). inst.expect_clone().times(1).in_sequence(&mut seq).return_once( move || { let mut inst = MockInstance::default(); - inst.expect_start().return_once(move |t, _| { - let mut ticket_guard = ticket_clone.lock().unwrap(); - *ticket_guard = Some(t); - Ok(()) - }); - inst.expect_transition().return_once(|_| { + + inst.expect_current_state() + .returning(|| new_initial_instance().runtime); + + inst.expect_put_state().return_once(|_| { let mut rt_state = new_initial_instance(); rt_state.runtime.run_state = InstanceState::Running; Ok(rt_state.runtime) @@ -460,11 +545,12 @@ mod test { inst }, ); + // Next calls to ensure (transition only). - inst.expect_clone().times(2).in_sequence(&mut seq).returning( + inst.expect_clone().times(3).in_sequence(&mut seq).returning( move || { let mut inst = MockInstance::default(); - inst.expect_transition().returning(|_| { + inst.expect_put_state().returning(|_| { let mut rt_state = new_initial_instance(); rt_state.runtime.run_state = InstanceState::Running; Ok(rt_state.runtime) @@ -477,14 +563,12 @@ mod test { let id = test_uuid(); let rt = new_initial_instance(); - let target = InstanceStateRequested::Running; - - // Creates instance, start + transition. - im.ensure(id, rt.clone(), target, None).await.unwrap(); - // Transition only. - im.ensure(id, rt.clone(), target, None).await.unwrap(); - // Transition only. 
- im.ensure(id, rt, target, None).await.unwrap(); + + // Register the instance, then issue all three state transitions. + im.ensure_registered(id, rt).await.unwrap(); + im.ensure_state(id, InstanceStateRequested::Running).await.unwrap(); + im.ensure_state(id, InstanceStateRequested::Running).await.unwrap(); + im.ensure_state(id, InstanceStateRequested::Running).await.unwrap(); assert_eq!(im.inner.instances.lock().unwrap().len(), 1); ticket.lock().unwrap().take(); diff --git a/sled-agent/src/params.rs b/sled-agent/src/params.rs index 4d4df4bf17..e9921842ec 100644 --- a/sled-agent/src/params.rs +++ b/sled-agent/src/params.rs @@ -65,50 +65,69 @@ pub struct InstanceHardware { pub cloud_init_bytes: Option, } -/// Sent to a sled agent to establish the runtime state of an Instance +/// The body of a request to ensure that an instance is known to a sled agent. #[derive(Serialize, Deserialize, JsonSchema)] pub struct InstanceEnsureBody { - /// Last runtime state of the Instance known to Nexus (used if the agent - /// has never seen this Instance before). + /// A description of the instance's virtual hardware and the initial runtime + /// state this sled agent should store for this incarnation of the instance. pub initial: InstanceHardware, - /// requested runtime state of the Instance - pub target: InstanceStateRequested, - /// If we're migrating this instance, the details needed to drive the migration - pub migrate: Option, } -#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +/// The body of a request to move a previously-ensured instance into a specific +/// runtime state. +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct InstancePutStateBody { + /// The state into which the instance should be driven. + pub state: InstanceStateRequested, +} + +/// The response sent from a request to move an instance into a specific runtime +/// state. +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct InstancePutStateResponse { + /// The current runtime state of the instance after handling the request to + /// change its state. If the instance's state did not change, this field is + /// `None`. + pub updated_runtime: Option, +} + +/// The response sent from a request to unregister an instance. +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct InstanceUnregisterResponse { + /// The current state of the instance after handling the request to + /// unregister it. If the instance's state did not change, this field is + /// `None`. + pub updated_runtime: Option, +} + +/// Parameters used when directing Propolis to initialize itself via live +/// migration. +#[derive(Copy, Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct InstanceMigrationTargetParams { + /// The Propolis ID of the migration source. pub src_propolis_id: Uuid, + + /// The address of the Propolis server that will serve as the migration + /// source. pub src_propolis_addr: SocketAddr, } /// Requestable running state of an Instance. /// /// A subset of [`omicron_common::api::external::InstanceState`]. -#[derive( - Copy, - Clone, - Debug, - Deserialize, - Eq, - Ord, - PartialEq, - PartialOrd, - Serialize, - JsonSchema, -)] -#[serde(rename_all = "lowercase")] +#[derive(Copy, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "snake_case", tag = "type", content = "value")] pub enum InstanceStateRequested { + /// Run this instance by migrating in from a previous running incarnation of + /// the instance. 
+ MigrationTarget(InstanceMigrationTargetParams), /// Start the instance if it is not already running. Running, /// Stop the instance. Stopped, - /// Issue a reset command to the instance, such that it should - /// stop and then immediately become running. + /// Immediately reset the instance, as though it had stopped and immediately + /// began to run again. Reboot, - /// Stop the instance and delete it. - Destroyed, } impl Display for InstanceStateRequested { @@ -120,20 +139,20 @@ impl Display for InstanceStateRequested { impl InstanceStateRequested { fn label(&self) -> &str { match self { + InstanceStateRequested::MigrationTarget(_) => "migrating in", InstanceStateRequested::Running => "running", InstanceStateRequested::Stopped => "stopped", InstanceStateRequested::Reboot => "reboot", - InstanceStateRequested::Destroyed => "destroyed", } } /// Returns true if the state represents a stopped Instance. pub fn is_stopped(&self) -> bool { match self { + InstanceStateRequested::MigrationTarget(_) => false, InstanceStateRequested::Running => false, InstanceStateRequested::Stopped => true, InstanceStateRequested::Reboot => false, - InstanceStateRequested::Destroyed => true, } } } diff --git a/sled-agent/src/sim/collection.rs b/sled-agent/src/sim/collection.rs index d63623a1ba..fd9d08dec9 100644 --- a/sled-agent/src/sim/collection.rs +++ b/sled-agent/src/sim/collection.rs @@ -310,7 +310,7 @@ impl SimCollection { self: &Arc, id: &Uuid, current: S::CurrentState, - target: S::RequestedState, + target: Option, ) -> Result { let mut objects = self.objects.lock().await; let maybe_current_object = objects.remove(id); @@ -336,8 +336,12 @@ impl SimCollection { } }; - let rv = - object.transition(target).map(|_| object.object.current().clone()); + let rv = if let Some(target) = target { + object.transition(target).map(|_| object.object.current().clone()) + } else { + Ok(current.clone()) + }; + if rv.is_ok() || !is_new { objects.insert(*id, object); } @@ -347,6 +351,46 @@ impl SimCollection { pub async fn contains_key(self: &Arc, id: &Uuid) -> bool { self.objects.lock().await.contains_key(id) } + + pub async fn sim_get_current_state( + self: &Arc, + id: &Uuid, + ) -> Result { + let objects = self.objects.lock().await; + let instance = objects + .get(id) + .ok_or_else(|| Error::not_found_by_id(S::resource_type(), id))?; + Ok(instance.object.current().clone()) + } + + /// Iterates over all of the existing objects in the collection and, for any + /// that meet `condition`, asks to transition them into the supplied target + /// state. + /// + /// If any such transition fails, this routine short-circuits and does not + /// attempt to transition any other objects. + // + // TODO: It's likely more idiomatic to have an `iter_mut` routine that + // returns a struct that impls Iterator and yields &mut S references. The + // tricky bit is that the struct must hold the objects lock during the + // iteration. Figure out if there's a better way to arrange all this. 
+ pub async fn sim_ensure_for_each_where( + self: &Arc, + condition: C, + target: &S::RequestedState, + ) -> Result<(), Error> + where + C: Fn(&S) -> bool, + { + let mut objects = self.objects.lock().await; + for o in objects.values_mut() { + if condition(&o.object) { + o.transition(target.clone())?; + } + } + + Ok(()) + } } #[cfg(test)] diff --git a/sled-agent/src/sim/disk.rs b/sled-agent/src/sim/disk.rs index a57eb08875..4f66d8601a 100644 --- a/sled-agent/src/sim/disk.rs +++ b/sled-agent/src/sim/disk.rs @@ -14,6 +14,7 @@ use dropshot::ConfigLoggingLevel; use omicron_common::api::external::DiskState; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; +use omicron_common::api::external::ResourceType; use omicron_common::api::internal::nexus::DiskRuntimeState; use omicron_common::api::internal::nexus::ProducerEndpoint; use oximeter_producer::Server as ProducerServer; @@ -273,4 +274,8 @@ impl Simulatable for SimDisk { .map(|_| ()) .map_err(Error::from) } + + fn resource_type() -> ResourceType { + ResourceType::Disk + } } diff --git a/sled-agent/src/sim/http_entrypoints.rs b/sled-agent/src/sim/http_entrypoints.rs index 346bec17c6..a14034ae1f 100644 --- a/sled-agent/src/sim/http_entrypoints.rs +++ b/sled-agent/src/sim/http_entrypoints.rs @@ -4,8 +4,11 @@ //! HTTP entrypoint functions for the sled agent's exposed API -use crate::params::VpcFirewallRulesEnsureBody; -use crate::params::{DiskEnsureBody, InstanceEnsureBody}; +use crate::params::{ + DiskEnsureBody, InstanceEnsureBody, InstancePutStateBody, + InstancePutStateResponse, InstanceUnregisterResponse, + VpcFirewallRulesEnsureBody, +}; use dropshot::endpoint; use dropshot::ApiDescription; use dropshot::HttpError; @@ -30,7 +33,9 @@ type SledApiDescription = ApiDescription>; /// Returns a description of the sled agent API pub fn api() -> SledApiDescription { fn register_endpoints(api: &mut SledApiDescription) -> Result<(), String> { - api.register(instance_put)?; + api.register(instance_put_state)?; + api.register(instance_register)?; + api.register(instance_unregister)?; api.register(instance_poke_post)?; api.register(disk_put)?; api.register(disk_poke_post)?; @@ -60,7 +65,7 @@ struct InstancePathParam { method = PUT, path = "/instances/{instance_id}", }] -async fn instance_put( +async fn instance_register( rqctx: RequestContext>, path_params: Path, body: TypedBody, @@ -69,8 +74,37 @@ async fn instance_put( let instance_id = path_params.into_inner().instance_id; let body_args = body.into_inner(); Ok(HttpResponseOk( - sa.instance_ensure(instance_id, body_args.initial, body_args.target) - .await?, + sa.instance_register(instance_id, body_args.initial).await?, + )) +} + +#[endpoint { + method = DELETE, + path = "/instances/{instance_id}", +}] +async fn instance_unregister( + rqctx: RequestContext>, + path_params: Path, +) -> Result, HttpError> { + let sa = rqctx.context(); + let instance_id = path_params.into_inner().instance_id; + Ok(HttpResponseOk(sa.instance_unregister(instance_id).await?)) +} + +#[endpoint { + method = PUT, + path = "/instances/{instance_id}/state", +}] +async fn instance_put_state( + rqctx: RequestContext>, + path_params: Path, + body: TypedBody, +) -> Result, HttpError> { + let sa = rqctx.context(); + let instance_id = path_params.into_inner().instance_id; + let body_args = body.into_inner(); + Ok(HttpResponseOk( + sa.instance_ensure_state(instance_id, body_args.state).await?, )) } diff --git a/sled-agent/src/sim/instance.rs b/sled-agent/src/sim/instance.rs index 
aa4de3e973..f80e16db4e 100644 --- a/sled-agent/src/sim/instance.rs +++ b/sled-agent/src/sim/instance.rs @@ -14,6 +14,7 @@ use nexus_client; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; use omicron_common::api::external::InstanceState as ApiInstanceState; +use omicron_common::api::external::ResourceType; use omicron_common::api::internal::nexus::InstanceRuntimeState; use propolis_client::api::InstanceState as PropolisInstanceState; use std::collections::VecDeque; @@ -87,6 +88,23 @@ impl Simulatable for SimInstance { target: &InstanceStateRequested, ) -> Result, Error> { match target { + InstanceStateRequested::MigrationTarget(_) => { + match self.next_resting_state() { + ApiInstanceState::Creating => { + self.propolis_queue + .push_back(PropolisInstanceState::Migrating); + self.propolis_queue + .push_back(PropolisInstanceState::Running); + } + _ => { + return Err(Error::invalid_request(&format!( + "can't request migration in with pending resting \ + state {}", + self.next_resting_state() + ))) + } + } + } InstanceStateRequested::Running => { match self.next_resting_state() { ApiInstanceState::Creating => { @@ -103,26 +121,13 @@ impl Simulatable for SimInstance { | ApiInstanceState::Running | ApiInstanceState::Rebooting | ApiInstanceState::Migrating => {} - ApiInstanceState::Stopping | ApiInstanceState::Stopped => { - // TODO: Normally, Propolis forbids direct transitions - // from a stopped state back to a running state. - // Instead, Nexus creates a new Propolis and sends state - // change requests to that. This arm abstracts this - // behavior away and just allows a fake instance to - // transition right back to a running state after being - // stopped. - // - // This will change in the future when the sled agents - // (both real and simulated) split "registering" an - // instance with the agent and actually starting it into - // separate actions. - self.state.transition(ApiInstanceState::Starting); - self.propolis_queue - .push_back(PropolisInstanceState::Starting); - self.propolis_queue - .push_back(PropolisInstanceState::Running); - } - ApiInstanceState::Repairing + + // Propolis forbids direct transitions from a stopped state + // back to a running state. Callers who want to restart a + // stopped instance must recreate it. 
+ ApiInstanceState::Stopping + | ApiInstanceState::Stopped + | ApiInstanceState::Repairing | ApiInstanceState::Failed | ApiInstanceState::Destroyed => { return Err(Error::invalid_request(&format!( @@ -183,11 +188,6 @@ impl Simulatable for SimInstance { ))) } }, - InstanceStateRequested::Destroyed => { - self.state - .observe_transition(&PropolisInstanceState::Destroyed); - self.propolis_queue.clear(); - } } Ok(None) @@ -256,4 +256,8 @@ impl Simulatable for SimInstance { .map(|_| ()) .map_err(Error::from) } + + fn resource_type() -> ResourceType { + ResourceType::Instance + } } diff --git a/sled-agent/src/sim/simulatable.rs b/sled-agent/src/sim/simulatable.rs index e005d7b27f..ae1b9d59fc 100644 --- a/sled-agent/src/sim/simulatable.rs +++ b/sled-agent/src/sim/simulatable.rs @@ -6,6 +6,7 @@ use crate::nexus::NexusClient; use async_trait::async_trait; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; +use omicron_common::api::external::ResourceType; use std::fmt; use std::sync::Arc; use uuid::Uuid; @@ -116,4 +117,6 @@ pub trait Simulatable: fmt::Debug + Send + Sync { id: &Uuid, current: Self::CurrentState, ) -> Result<(), Error>; + + fn resource_type() -> ResourceType; } diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index 93546e0129..fc9ce822ad 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -6,11 +6,13 @@ use crate::nexus::NexusClient; use crate::params::{ - DiskStateRequested, InstanceHardware, InstanceStateRequested, + DiskStateRequested, InstanceHardware, InstancePutStateResponse, + InstanceStateRequested, InstanceUnregisterResponse, }; +use crate::sim::simulatable::Simulatable; use crate::updates::UpdateManager; use futures::lock::Mutex; -use omicron_common::api::external::{Error, ResourceType}; +use omicron_common::api::external::{DiskState, Error, ResourceType}; use omicron_common::api::internal::nexus::DiskRuntimeState; use omicron_common::api::internal::nexus::InstanceRuntimeState; use slog::Logger; @@ -212,11 +214,10 @@ impl SledAgent { /// Idempotently ensures that the given API Instance (described by /// `api_instance`) exists on this server in the given runtime state /// (described by `target`). - pub async fn instance_ensure( + pub async fn instance_register( self: &Arc, instance_id: Uuid, mut initial_hardware: InstanceHardware, - target: InstanceStateRequested, ) -> Result { // respond with a fake 500 level failure if asked to ensure an instance // with more than 16 CPUs. @@ -229,29 +230,24 @@ impl SledAgent { for disk in &initial_hardware.disks { let initial_state = DiskRuntimeState { - disk_state: omicron_common::api::external::DiskState::Attached( - instance_id, - ), + disk_state: DiskState::Attached(instance_id), gen: omicron_common::api::external::Generation::new(), time_updated: chrono::Utc::now(), }; - let target = match target { - InstanceStateRequested::Running - | InstanceStateRequested::Reboot => { - DiskStateRequested::Attached(instance_id) - } - InstanceStateRequested::Stopped - | InstanceStateRequested::Destroyed => { - DiskStateRequested::Detached - } - }; - + // Ensure that any disks that are in this request are attached to + // this instance. let id = match disk.volume_construction_request { propolis_client::instance_spec::VolumeConstructionRequest::Volume { id, .. 
} => id, _ => panic!("Unexpected construction type"), }; - self.disks.sim_ensure(&id, initial_state, target).await?; + self.disks + .sim_ensure( + &id, + initial_state, + Some(DiskStateRequested::Attached(instance_id)), + ) + .await?; self.disks .sim_ensure_producer(&id, (self.nexus_address, id)) .await?; @@ -296,27 +292,11 @@ impl SledAgent { }, )?; } - - let body = match target { - InstanceStateRequested::Running => { - propolis_client::types::InstanceStateRequested::Run - } - InstanceStateRequested::Destroyed - | InstanceStateRequested::Stopped => { - propolis_client::types::InstanceStateRequested::Stop - } - InstanceStateRequested::Reboot => { - propolis_client::types::InstanceStateRequested::Reboot - } - }; - client.instance_state_put().body(body).send().await.map_err( - |e| Error::internal_error(&format!("propolis-client: {}", e)), - )?; } let instance_run_time_state = self .instances - .sim_ensure(&instance_id, initial_hardware.runtime, target) + .sim_ensure(&instance_id, initial_hardware.runtime, None) .await?; for disk_request in &initial_hardware.disks { @@ -337,6 +317,123 @@ impl SledAgent { Ok(instance_run_time_state) } + /// Forcibly unregisters an instance. To simulate the rude termination that + /// this produces in the real sled agent, the instance's mock Propolis is + /// not notified. + pub async fn instance_unregister( + self: &Arc, + instance_id: Uuid, + ) -> Result { + if !self.instances.contains_key(&instance_id).await { + return Ok(InstanceUnregisterResponse { updated_runtime: None }); + } + + self.detach_disks_from_instance(instance_id).await?; + + // TODO: Simulated instances can currently only be accessed via their + // collections, which only support operations that are generic across + // all simulated object types. Instantly destroying an object is not + // such an operation. Work around this for now by stopping the instance + // and immediately draining its state queue so that the instance is + // destroyed before this call returns. + self.instances + .sim_ensure( + &instance_id, + self.instances.sim_get_current_state(&instance_id).await?, + Some(InstanceStateRequested::Stopped), + ) + .await?; + self.instances.sim_poke(instance_id, PokeMode::Drain).await; + + Ok(InstanceUnregisterResponse { + updated_runtime: Some( + self.instances.sim_get_current_state(&instance_id).await?, + ), + }) + } + + /// Asks the supplied instance to transition to the requested state. 
+ pub async fn instance_ensure_state( + self: &Arc, + instance_id: Uuid, + state: InstanceStateRequested, + ) -> Result { + if !self.instances.contains_key(&instance_id).await { + match state { + InstanceStateRequested::Stopped => { + return Ok(InstancePutStateResponse { + updated_runtime: None, + }); + } + _ => { + return Err(Error::invalid_request(&format!( + "instance {} not registered on sled", + instance_id, + ))); + } + } + } + + let mock_lock = self.mock_propolis.lock().await; + if let Some((_srv, client)) = mock_lock.as_ref() { + let body = match state { + InstanceStateRequested::MigrationTarget(_) => { + return Err(Error::internal_error( + "migration not implemented for mock Propolis", + )); + } + InstanceStateRequested::Running => { + propolis_client::types::InstanceStateRequested::Run + } + InstanceStateRequested::Stopped => { + propolis_client::types::InstanceStateRequested::Stop + } + InstanceStateRequested::Reboot => { + propolis_client::types::InstanceStateRequested::Reboot + } + }; + client.instance_state_put().body(body).send().await.map_err( + |e| Error::internal_error(&format!("propolis-client: {}", e)), + )?; + } + + let new_state = self + .instances + .sim_ensure( + &instance_id, + self.instances.sim_get_current_state(&instance_id).await?, + Some(state), + ) + .await?; + + // If this request will shut down the simulated instance, look for any + // disks that are attached to it and drive them to the Detached state. + if matches!(state, InstanceStateRequested::Stopped) { + self.detach_disks_from_instance(instance_id).await?; + } + + Ok(InstancePutStateResponse { updated_runtime: Some(new_state) }) + } + + async fn detach_disks_from_instance( + &self, + instance_id: Uuid, + ) -> Result<(), Error> { + self.disks + .sim_ensure_for_each_where( + |disk| match disk.current().disk_state { + DiskState::Attached(id) | DiskState::Attaching(id) => { + id == instance_id + } + _ => false, + }, + &DiskStateRequested::Detached, + ) + .await?; + + Ok(()) + } + /// Idempotently ensures that the given API Disk (described by `api_disk`) /// is attached (or not) as specified. This simulates disk attach and /// detach, similar to instance boot and halt. @@ -346,7 +443,7 @@ impl SledAgent { initial_state: DiskRuntimeState, target: DiskStateRequested, ) -> Result { - self.disks.sim_ensure(&disk_id, initial_state, target).await + self.disks.sim_ensure(&disk_id, initial_state, Some(target)).await } pub fn updates(&self) -> &UpdateManager { diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs index 78358e20a5..9f47aa799a 100644 --- a/sled-agent/src/sled_agent.rs +++ b/sled-agent/src/sled_agent.rs @@ -8,11 +8,11 @@ use crate::bootstrap::params::SledAgentRequest; use crate::config::Config; use crate::instance_manager::InstanceManager; use crate::nexus::{LazyNexusClient, NexusRequestQueue}; -use crate::params::VpcFirewallRule; use crate::params::{ DatasetKind, DiskStateRequested, InstanceHardware, - InstanceMigrationTargetParams, InstanceStateRequested, ServiceEnsureBody, - SledRole, TimeSync, Zpool, + InstancePutStateResponse, InstanceStateRequested, + InstanceUnregisterResponse, ServiceEnsureBody, SledRole, TimeSync, + VpcFirewallRule, Zpool, }; use crate::services::{self, ServiceManager}; use crate::storage_manager::StorageManager; @@ -525,17 +525,47 @@ impl SledAgent { Ok(()) } - /// Idempotently ensures that a given Instance is running on the sled. 
-    pub async fn instance_ensure(
+    /// Idempotently ensures that a given instance is registered with this sled,
+    /// i.e., that it can be addressed by future calls to
+    /// [`instance_ensure_state`].
+    pub async fn instance_ensure_registered(
         &self,
         instance_id: Uuid,
         initial: InstanceHardware,
-        target: InstanceStateRequested,
-        migrate: Option<InstanceMigrationTargetParams>,
     ) -> Result<InstanceRuntimeState, Error> {
         self.inner
             .instances
-            .ensure(instance_id, initial, target, migrate)
+            .ensure_registered(instance_id, initial)
             .await
             .map_err(|e| Error::Instance(e))
+    }
+
+    /// Idempotently ensures that the specified instance is no longer registered
+    /// on this sled.
+    ///
+    /// If the instance is registered and has a running Propolis, this operation
+    /// rudely terminates the instance.
+    pub async fn instance_ensure_unregistered(
+        &self,
+        instance_id: Uuid,
+    ) -> Result<InstanceUnregisterResponse, Error> {
+        self.inner
+            .instances
+            .ensure_unregistered(instance_id)
+            .await
+            .map_err(|e| Error::Instance(e))
+    }
+
+    /// Idempotently drives the specified instance into the specified target
+    /// state.
+    pub async fn instance_ensure_state(
+        &self,
+        instance_id: Uuid,
+        target: InstanceStateRequested,
+    ) -> Result<InstancePutStateResponse, Error> {
+        self.inner
+            .instances
+            .ensure_state(instance_id, target)
+            .await
+            .map_err(|e| Error::Instance(e))
     }
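
The params.rs hunk above retags `InstanceStateRequested`: instead of serializing as a bare lowercase string, each request now serializes as an adjacently tagged object, which is what lets `MigrationTarget` carry `InstanceMigrationTargetParams`. A minimal runnable sketch of the resulting JSON, using local stand-in types rather than the real omicron definitions and assuming the serde, serde_json, and uuid (with its serde feature) crates:

use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use uuid::Uuid;

// Stand-in for InstanceMigrationTargetParams.
#[derive(Serialize, Deserialize)]
struct MigrationTargetParams {
    src_propolis_id: Uuid,
    src_propolis_addr: SocketAddr,
}

// Stand-in mirroring the serde attributes now on InstanceStateRequested.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type", content = "value")]
enum StateRequested {
    MigrationTarget(MigrationTargetParams),
    Running,
    Stopped,
    Reboot,
}

fn main() {
    // Unit variants carry only the tag...
    assert_eq!(
        serde_json::to_string(&StateRequested::Running).unwrap(),
        r#"{"type":"running"}"#
    );

    // ...while the migration-target variant nests its parameters under
    // "value", e.g. {"type":"migration_target","value":{...}}.
    let req = StateRequested::MigrationTarget(MigrationTargetParams {
        src_propolis_id: Uuid::nil(),
        src_propolis_addr: "[::1]:12400".parse().unwrap(),
    });
    println!("{}", serde_json::to_string_pretty(&req).unwrap());
}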
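
Taken together, the sled-agent changes split the old single ensure call into a register / drive-state / unregister lifecycle with specific idempotency rules: stopping an unregistered instance is a successful no-op, while any other state request requires prior registration. A self-contained toy model of that contract, with illustrative names and types that are not the omicron APIs:

use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum RunState { Creating, Running, Stopped }

#[derive(Default)]
struct ToySled {
    instances: HashMap<u64, RunState>,
}

impl ToySled {
    // Idempotently record the instance; an existing record is left untouched.
    fn ensure_registered(&mut self, id: u64) -> RunState {
        *self.instances.entry(id).or_insert(RunState::Creating)
    }

    // Drive a previously registered instance toward a target state. Stopping
    // an unknown instance reports "no change" (None); anything else errors.
    fn ensure_state(&mut self, id: u64, target: RunState) -> Result<Option<RunState>, String> {
        match self.instances.get_mut(&id) {
            None if target == RunState::Stopped => Ok(None),
            None => Err(format!("instance {id} not registered on sled")),
            Some(state) => {
                *state = target;
                Ok(Some(*state))
            }
        }
    }

    // Idempotently forget the instance, reporting its last state if present.
    fn ensure_unregistered(&mut self, id: u64) -> Option<RunState> {
        self.instances.remove(&id)
    }
}

fn main() {
    let mut sled = ToySled::default();
    // Stop-before-register is tolerated for idempotency.
    assert_eq!(sled.ensure_state(7, RunState::Stopped), Ok(None));
    // The normal flow: register first, then request states, then unregister.
    sled.ensure_registered(7);
    assert_eq!(sled.ensure_state(7, RunState::Running), Ok(Some(RunState::Running)));
    assert_eq!(sled.ensure_unregistered(7), Some(RunState::Running));
}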
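
`detach_disks_from_instance` above leans on the new `sim_ensure_for_each_where` helper: walk every simulated disk and transition the ones attached (or attaching) to the stopping instance. A small stand-alone sketch of the same predicate-then-transition pattern, with toy types and assuming the uuid crate with its v4 feature:

use std::collections::HashMap;
use uuid::Uuid;

#[derive(Clone, Debug, PartialEq)]
enum DiskState {
    Attached(Uuid),
    Attaching(Uuid),
    Detached,
}

// Detach every disk that is attached or attaching to `instance_id`.
fn detach_disks_for(disks: &mut HashMap<Uuid, DiskState>, instance_id: Uuid) {
    for state in disks.values_mut() {
        let owned_by_instance = match state {
            DiskState::Attached(id) | DiskState::Attaching(id) => *id == instance_id,
            DiskState::Detached => false,
        };
        if owned_by_instance {
            *state = DiskState::Detached;
        }
    }
}

fn main() {
    let instance = Uuid::new_v4();
    let mut disks = HashMap::from([
        (Uuid::new_v4(), DiskState::Attached(instance)),
        (Uuid::new_v4(), DiskState::Attaching(instance)),
        (Uuid::new_v4(), DiskState::Attached(Uuid::new_v4())),
    ]);
    detach_disks_for(&mut disks, instance);
    // Only the two disks belonging to `instance` are detached.
    assert_eq!(disks.values().filter(|s| matches!(s, DiskState::Detached)).count(), 2);
}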