From 382a8c33f1d61424952e81e5af37f9b38802c8ee Mon Sep 17 00:00:00 2001
From: Greg Colombo
Date: Tue, 28 Mar 2023 22:51:31 +0000
Subject: [PATCH] sled agent: split ensure into "register" and "ensure state"
 APIs

Split the sled agent's `/instances/{id}` PUT endpoint into two endpoints:

- A PUT to `/instances/{id}` "registers" an instance with a sled. This
  creates a record for the instance in the manager, but does not start its
  Propolis and does not try to drive the instance to any particular state.
- A PUT to `/instances/{id}/state` attempts to change the state of a
  previously-registered instance's VM by starting it, stopping it, rebooting
  it, initializing by live migration, or unceremoniously destroying it.
  (This last case is meant to provide a safety valve that lets Nexus get an
  unresponsive Propolis off a sled.)

This allows the instance create saga to avoid a class of problems in which
an instance starts, stops (due to user input to the VM), and then is
errantly restarted by a replayed saga step: because sled agent will only
accept requests to run a registered instance, and stopping an instance
unregisters it, a replayed "run this VM" saga node won't restart the VM.
The migration saga is vulnerable to a similar class of problem, so this
groundwork is necessary to write that saga correctly.

A secondary benefit of this change is that operations on running instances
(like "stop" and "reboot") no longer need to construct an (unused)
`InstanceHardware` to pass to the sled agent's ensure endpoint.

Update the simulated sled agent to support these APIs, update callers in
Nexus to use them, and split the instance create saga's "instance ensure"
step into two steps as described above. This requires some extra affordances
in simulated collections to support simulated disks, since instance state
changes no longer go through a path where an instance's hardware manifest is
available.

Finally, add some Nexus logging to record information about CRDB updates
that Nexus applies when a call to sled agent produces a new
`InstanceRuntimeState`, since these are handy for debugging.

Tested: cargo test; installed Omicron locally and played around with some
instances.
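For illustration only, a minimal sketch of the new two-call flow as a
Nexus-side caller might drive it. `sa` (a generated
`sled_agent_client::Client`), `instance_id`, and `instance_hardware` are
placeholders, not identifiers introduced by this patch; the endpoint methods
and body types are the ones added below:

    use sled_agent_client::types::{
        InstanceEnsureBody, InstancePutStateBody, InstanceStateRequested,
    };

    // Step 1: register the instance with its sled. This only records the
    // instance in the sled's instance manager; no Propolis is started.
    sa.instance_register(
        &instance_id,
        &InstanceEnsureBody { initial: instance_hardware },
    )
    .await?;

    // Step 2: separately drive the registered instance to a state.
    // Stopping or destroying an instance unregisters it, so a replayed
    // "run" request fails instead of restarting a VM the user has since
    // stopped.
    let updated = sa
        .instance_put_state(
            &instance_id,
            &InstancePutStateBody { state: InstanceStateRequested::Running },
        )
        .await?
        .into_inner()
        .updated_runtime;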
---
 nexus/src/app/instance.rs              | 154 +++++++++---
 nexus/src/app/sagas/instance_create.rs | 129 +++++++---
 openapi/sled-agent.json                | 182 +++++++++++---
 sled-agent/src/http_entrypoints.rs     |  41 ++-
 sled-agent/src/instance.rs             | 334 ++++++++++++++++---------
 sled-agent/src/instance_manager.rs     | 295 +++++++++++++---------
 sled-agent/src/params.rs               |  60 +++--
 sled-agent/src/sim/collection.rs       |  50 +++-
 sled-agent/src/sim/disk.rs             |   5 +
 sled-agent/src/sim/http_entrypoints.rs |  27 +-
 sled-agent/src/sim/instance.rs         |  49 ++--
 sled-agent/src/sim/simulatable.rs      |   3 +
 sled-agent/src/sim/sled_agent.rs       | 130 +++++++---
 sled-agent/src/sled_agent.rs           |  29 ++-
 14 files changed, 1043 insertions(+), 445 deletions(-)

diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs
index a46b96b622..4751543572 100644
--- a/nexus/src/app/instance.rs
+++ b/nexus/src/app/instance.rs
@@ -35,6 +35,7 @@ use omicron_common::api::external::NameOrId;
 use omicron_common::api::external::UpdateResult;
 use omicron_common::api::external::Vni;
 use omicron_common::api::internal::nexus;
+use sled_agent_client::types::InstancePutStateBody;
 use sled_agent_client::types::InstanceStateRequested;
 use sled_agent_client::types::SourceNatConfig;
 use sled_agent_client::Client as SledAgentClient;
@@ -327,19 +328,8 @@ impl super::Nexus {
         opctx: &OpContext,
         instance_lookup: &lookup::Instance<'_>,
     ) -> UpdateResult<db::model::Instance> {
-        // To implement reboot, we issue a call to the sled agent to set a
-        // runtime state of "reboot". We cannot simply stop the Instance and
-        // start it again here because if we crash in the meantime, we might
-        // leave it stopped.
-        //
-        // When an instance is rebooted, the "rebooting" flag remains set on
-        // the runtime state as it transitions to "Stopping" and "Stopped".
-        // This flag is cleared when the state goes to "Starting". This way,
-        // even if the whole rack powered off while this was going on, we
-        // would never lose track of the fact that this Instance was supposed
-        // to be running.
         let (.., authz_instance, db_instance) = instance_lookup.fetch().await?;
-        self.instance_set_runtime(
+        self.instance_request_state(
             opctx,
             &authz_instance,
             &db_instance,
@@ -355,8 +345,29 @@ impl super::Nexus {
         opctx: &OpContext,
         instance_lookup: &lookup::Instance<'_>,
     ) -> UpdateResult<db::model::Instance> {
-        let (.., authz_instance, db_instance) = instance_lookup.fetch().await?;
-        self.instance_set_runtime(
+        // TODO(#2824): This needs to be a saga for crash resiliency
+        // purposes (otherwise the instance can be leaked if Nexus crashes
+        // between registration and instance start).
+        let (.., authz_instance, mut db_instance) =
+            instance_lookup.fetch().await?;
+
+        // The instance is not really being "created" (it already exists from
+        // the caller's perspective), but if it does not exist on its sled,
+        // the target sled agent will populate its instance manager with the
+        // contents of this modified record, and that record needs to allow a
+        // transition to the Starting state.
+        //
+        // If the instance does exist on this sled, this initial runtime
+        // state is ignored.
+        let initial_runtime = nexus_db_model::InstanceRuntimeState {
+            state: nexus_db_model::InstanceState(InstanceState::Creating),
+            ..db_instance.runtime_state
+        };
+        db_instance.runtime_state = initial_runtime;
+        self.instance_ensure_registered(opctx, &authz_instance, &db_instance)
+            .await?;
+
+        self.instance_request_state(
             opctx,
             &authz_instance,
             &db_instance,
@@ -373,7 +384,7 @@ impl super::Nexus {
         instance_lookup: &lookup::Instance<'_>,
     ) -> UpdateResult<db::model::Instance> {
         let (.., authz_instance, db_instance) = instance_lookup.fetch().await?;
-        self.instance_set_runtime(
+        self.instance_request_state(
             opctx,
             &authz_instance,
             &db_instance,
@@ -395,13 +406,20 @@ impl super::Nexus {
     fn check_runtime_change_allowed(
         &self,
         runtime: &nexus::InstanceRuntimeState,
+        requested: &InstanceStateRequested,
     ) -> Result<(), Error> {
         // Users are allowed to request a start or stop even if the instance
         // is already in the desired state (or moving to it), and we will
         // issue a request to the SA to make the state change in these cases
         // in case the
-        // runtime state we saw here was stale. However, users are not
-        // allowed to change the state of an instance that's migrating,
-        // failed or destroyed.
+        // runtime state we saw here was stale.
+        //
+        // Users cannot change the state of a failed or destroyed instance.
+        // TODO(#2825): Failed instances should be allowed to stop.
+        //
+        // Migrating instances can't change state until they're done
+        // migrating, but for idempotency, a request to make an incarnation
+        // of an instance into a migration target is allowed if the
+        // incarnation is already a migration target.
         let allowed = match runtime.run_state {
             InstanceState::Creating => true,
             InstanceState::Starting => true,
@@ -409,7 +427,9 @@ impl super::Nexus {
             InstanceState::Stopping => true,
             InstanceState::Stopped => true,
             InstanceState::Rebooting => true,
-            InstanceState::Migrating => false,
+            InstanceState::Migrating => {
+                matches!(requested, InstanceStateRequested::MigrationTarget(_))
+            }
             InstanceState::Repairing => false,
             InstanceState::Failed => false,
             InstanceState::Destroyed => false,
@@ -427,9 +447,7 @@ impl super::Nexus {
         }
     }

-    /// Modifies the runtime state of the Instance as requested. This
-    /// generally means booting or halting the Instance.
-    pub(crate) async fn instance_set_runtime(
+    pub(crate) async fn instance_request_state(
         &self,
         opctx: &OpContext,
         authz_instance: &authz::Instance,
@@ -437,11 +455,35 @@ impl super::Nexus {
         requested: InstanceStateRequested,
     ) -> Result<(), Error> {
         opctx.authorize(authz::Action::Modify, authz_instance).await?;
-
         self.check_runtime_change_allowed(
             &db_instance.runtime().clone().into(),
+            &requested,
         )?;
+
+        let sa = self.instance_sled(&db_instance).await?;
+        let instance_put_result = sa
+            .instance_put_state(
+                &db_instance.id(),
+                &InstancePutStateBody { state: requested },
+            )
+            .await
+            .map(|res| res.into_inner().updated_runtime);
+
+        self.handle_instance_put_result(db_instance, instance_put_result)
+            .await
+            .map(|_| ())
+    }
+
+    /// Registers the instance described by `db_instance` with the sled it
+    /// has been assigned to, without attempting to start it or otherwise
+    /// change its runtime state.
+    pub(crate) async fn instance_ensure_registered(
+        &self,
+        opctx: &OpContext,
+        authz_instance: &authz::Instance,
+        db_instance: &db::model::Instance,
+    ) -> Result<(), Error> {
+        opctx.authorize(authz::Action::Modify, authz_instance).await?;
+
         // Gather disk information and turn that into DiskRequests
         let disks = self
             .db_datastore
@@ -587,31 +629,73 @@

         let sa = self.instance_sled(&db_instance).await?;

-        let instance_put_result = sa
-            .instance_put(
+        let instance_register_result = sa
+            .instance_register(
                 &db_instance.id(),
                 &sled_agent_client::types::InstanceEnsureBody {
                     initial: instance_hardware,
-                    target: requested,
-                    migrate: None,
                 },
             )
-            .await;
+            .await
+            .map(|res| Some(res.into_inner()));
+
+        self.handle_instance_put_result(db_instance, instance_register_result)
+            .await
+            .map(|_| ())
+    }

-        match instance_put_result {
-            Ok(new_runtime) => {
+    /// Updates an instance's CRDB record based on the result of a call to
+    /// sled agent that tried to update the instance's state.
+    ///
+    /// # Parameters
+    ///
+    /// - `db_instance`: The CRDB instance record observed by the caller
+    ///   before it attempted to update the instance's state.
+    /// - `result`: The result of the relevant sled agent operation. If this
+    ///   is `Ok`, the payload is the updated instance runtime state returned
+    ///   from sled agent, if there was one.
+    ///
+    /// # Return value
+    ///
+    /// - `Ok(true)` if the caller supplied an updated instance record and
+    ///   this routine successfully wrote it to CRDB.
+    /// - `Ok(false)` if the sled agent call succeeded, but this routine did
+    ///   not update CRDB. This can happen either because sled agent didn't
+    ///   return an updated record or because the updated record was
+    ///   superseded by a state update with a more advanced generation
+    ///   number.
+    /// - `Err` if the sled agent operation failed or this routine received
+    ///   an error while trying to update CRDB.
+    async fn handle_instance_put_result(
+        &self,
+        db_instance: &db::model::Instance,
+        result: Result<
+            Option<sled_agent_client::types::InstanceRuntimeState>,
+            sled_agent_client::Error<sled_agent_client::types::Error>,
+        >,
+    ) -> Result<bool, Error> {
+        slog::debug!(&self.log, "Handling sled agent instance PUT result";
+                     "result" => ?result);
+
+        match result {
+            Ok(Some(new_runtime)) => {
                 let new_runtime: nexus::InstanceRuntimeState =
-                    new_runtime.into_inner().into();
+                    new_runtime.into();

-                self.db_datastore
+                let update_result = self
+                    .db_datastore
                     .instance_update_runtime(
                         &db_instance.id(),
                         &new_runtime.into(),
                     )
-                    .await
-                    .map(|_| ())
-            }
+                    .await;
+
+                slog::debug!(&self.log,
+                             "Attempted DB update after instance PUT";
+                             "result" => ?update_result);
+                update_result
+            }
+            Ok(None) => Ok(false),
             Err(e) => {
                 // The sled-agent has told us that it can't do what we
                 // requested, but does that mean a failure? One example would be
diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs
index 0504f9d8ed..f96381af3e 100644
--- a/nexus/src/app/sagas/instance_create.rs
+++ b/nexus/src/app/sagas/instance_create.rs
@@ -120,8 +120,12 @@ declare_saga_actions!
{ V2P_ENSURE -> "v2p_ensure" { + sic_v2p_ensure } - INSTANCE_ENSURE -> "instance_ensure" { - + sic_instance_ensure + INSTANCE_ENSURE_REGISTERED -> "instance_ensure_registered" { + + sic_instance_ensure_registered + - sic_instance_ensure_registered_undo + } + INSTANCE_ENSURE_RUNNING -> "instance_ensure_running" { + + sic_instance_ensure_running } } @@ -348,8 +352,10 @@ impl NexusSaga for SagaInstanceCreate { builder.append(v2p_ensure_undo_action()); builder.append(v2p_ensure_action()); - builder.append(instance_ensure_action()); - + builder.append(instance_ensure_registered_action()); + if params.create_params.start { + builder.append(instance_ensure_running_action()); + } Ok(builder.build()?) } } @@ -1255,10 +1261,54 @@ async fn sic_delete_instance_record( Ok(()) } -async fn sic_instance_ensure( +async fn sic_noop(_sagactx: NexusActionContext) -> Result<(), ActionError> { + Ok(()) +} + +/// Ensure that the necessary v2p mappings exist for this instance +async fn sic_v2p_ensure( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + let instance_id = sagactx.lookup::("instance_id")?; + + osagactx + .nexus() + .create_instance_v2p_mappings(&opctx, instance_id) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn sic_v2p_ensure_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + let instance_id = sagactx.lookup::("instance_id")?; + + osagactx + .nexus() + .delete_instance_v2p_mappings(&opctx, instance_id) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn sic_instance_ensure_registered( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - // TODO-correctness is this idempotent? let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; let datastore = osagactx.datastore(); @@ -1281,15 +1331,20 @@ async fn sic_instance_ensure( if !params.create_params.start { let instance_id = db_instance.id(); // If we don't need to start the instance, we can skip the ensure - // and just update the instance runtime state to `Stopped` + // and just update the instance runtime state to `Stopped`. + // + // TODO-correctness: This is dangerous if this step is replayed, since + // a user can discover this instance and ask to start it in between + // attempts to run this step. let runtime_state = db::model::InstanceRuntimeState { state: db::model::InstanceState::new(InstanceState::Stopped), // Must update the generation, or the database query will fail. // - // The runtime state of the instance record is only changed as a result - // of the successful completion of the saga (i.e. after ensure which we're - // skipping in this case) or during saga unwinding. So we're guaranteed - // that the cached generation in the saga log is the most recent in the database. + // The runtime state of the instance record is only changed as a + // result of the successful completion of the saga (i.e. after + // ensure which we're skipping in this case) or during saga + // unwinding. So we're guaranteed that the cached generation in the + // saga log is the most recent in the database. 
             gen: db::model::Generation::from(
                 db_instance.runtime_state.gen.next(),
             ),
@@ -1310,12 +1365,7 @@ async fn sic_instance_ensure(
     } else {
         osagactx
             .nexus()
-            .instance_set_runtime(
-                &opctx,
-                &authz_instance,
-                &db_instance,
-                InstanceStateRequested::Running,
-            )
+            .instance_ensure_registered(&opctx, &authz_instance, &db_instance)
             .await
             .map_err(ActionError::action_failed)?;
     }
@@ -1323,45 +1373,64 @@ async fn sic_instance_ensure(
     Ok(())
 }

-async fn sic_noop(_sagactx: NexusActionContext) -> Result<(), ActionError> {
-    Ok(())
-}
-
-/// Ensure that the necessary v2p mappings exist for this instance
-async fn sic_v2p_ensure(
+async fn sic_instance_ensure_registered_undo(
     sagactx: NexusActionContext,
-) -> Result<(), ActionError> {
+) -> Result<(), anyhow::Error> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params::<Params>()?;
+    let datastore = osagactx.datastore();
+    let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
     let opctx = crate::context::op_context_for_saga_action(
         &sagactx,
         &params.serialized_authn,
     );
-    let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
+
+    let (.., authz_instance, db_instance) = LookupPath::new(&opctx, &datastore)
+        .instance_id(instance_id)
+        .fetch()
+        .await
+        .map_err(ActionError::action_failed)?;

     osagactx
         .nexus()
-        .create_instance_v2p_mappings(&opctx, instance_id)
+        .instance_request_state(
+            &opctx,
+            &authz_instance,
+            &db_instance,
+            InstanceStateRequested::Destroyed,
+        )
         .await
         .map_err(ActionError::action_failed)?;

     Ok(())
 }

-async fn sic_v2p_ensure_undo(
+async fn sic_instance_ensure_running(
     sagactx: NexusActionContext,
-) -> Result<(), anyhow::Error> {
+) -> Result<(), ActionError> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params::<Params>()?;
+    let datastore = osagactx.datastore();
+    let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
     let opctx = crate::context::op_context_for_saga_action(
         &sagactx,
         &params.serialized_authn,
     );
-    let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
+
+    let (.., authz_instance, db_instance) = LookupPath::new(&opctx, &datastore)
+        .instance_id(instance_id)
+        .fetch()
+        .await
+        .map_err(ActionError::action_failed)?;

     osagactx
         .nexus()
-        .delete_instance_v2p_mappings(&opctx, instance_id)
+        .instance_request_state(
+            &opctx,
+            &authz_instance,
+            &db_instance,
+            InstanceStateRequested::Running,
+        )
         .await
         .map_err(ActionError::action_failed)?;
diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json
index 19ce22fee0..c91fc14852 100644
--- a/openapi/sled-agent.json
+++ b/openapi/sled-agent.json
@@ -82,7 +82,7 @@
     },
     "/instances/{instance_id}": {
       "put": {
-        "operationId": "instance_put",
+        "operationId": "instance_register",
         "parameters": [
           {
             "in": "path",
@@ -178,6 +178,50 @@
         }
       }
     },
+    "/instances/{instance_id}/state": {
+      "put": {
+        "operationId": "instance_put_state",
+        "parameters": [
+          {
+            "in": "path",
+            "name": "instance_id",
+            "required": true,
+            "schema": {
+              "type": "string",
+              "format": "uuid"
+            }
+          }
+        ],
+        "requestBody": {
+          "content": {
+            "application/json": {
+              "schema": {
+                "$ref": "#/components/schemas/InstancePutStateBody"
+              }
+            }
+          },
+          "required": true
+        },
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "$ref": "#/components/schemas/InstancePutStateResponse"
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
+    },
     "/services": {
       "put": {
         "operationId": "services_put",
@@ -1008,38 +1052,20 @@
          "minimum": 0
        },
"InstanceEnsureBody": { - "description": "Sent to a sled agent to establish the runtime state of an Instance", + "description": "The body of a request to ensure that an instance is known to a sled agent.", "type": "object", "properties": { "initial": { - "description": "Last runtime state of the Instance known to Nexus (used if the agent has never seen this Instance before).", + "description": "A description of the instance's virtual hardware and the initial runtime state this sled agent should store for this incarnation of the instance.", "allOf": [ { "$ref": "#/components/schemas/InstanceHardware" } ] - }, - "migrate": { - "nullable": true, - "description": "If we're migrating this instance, the details needed to drive the migration", - "allOf": [ - { - "$ref": "#/components/schemas/InstanceMigrationTargetParams" - } - ] - }, - "target": { - "description": "requested runtime state of the Instance", - "allOf": [ - { - "$ref": "#/components/schemas/InstanceStateRequested" - } - ] } }, "required": [ - "initial", - "target" + "initial" ] }, "InstanceHardware": { @@ -1117,12 +1143,15 @@ ] }, "InstanceMigrationTargetParams": { + "description": "Parameters used when directing Propolis to initialize itself via live migration.", "type": "object", "properties": { "src_propolis_addr": { + "description": "The address of the Propolis server that will serve as the migration source.", "type": "string" }, "src_propolis_id": { + "description": "The Propolis ID of the migration source.", "type": "string", "format": "uuid" } @@ -1132,6 +1161,38 @@ "src_propolis_id" ] }, + "InstancePutStateBody": { + "description": "The body of a request to move a previously-ensured instance into a specific runtime state.", + "type": "object", + "properties": { + "state": { + "description": "The state into which the instance should be driven.", + "allOf": [ + { + "$ref": "#/components/schemas/InstanceStateRequested" + } + ] + } + }, + "required": [ + "state" + ] + }, + "InstancePutStateResponse": { + "description": "The response sent from a request to move an instance into a specific runtime state.", + "type": "object", + "properties": { + "updated_runtime": { + "nullable": true, + "description": "The current runtime state of the instance after handling the request to change its state. 
If the instance's state did not change, this field is `None`.", + "allOf": [ + { + "$ref": "#/components/schemas/InstanceRuntimeState" + } + ] + } + } + }, "InstanceRuntimeState": { "description": "Runtime state of the Instance, including the actual running state and minimal metadata\n\nThis state is owned by the sled agent running that Instance.", "type": "object", @@ -1294,32 +1355,83 @@ "InstanceStateRequested": { "description": "Requestable running state of an Instance.\n\nA subset of [`omicron_common::api::external::InstanceState`].", "oneOf": [ + { + "description": "Run this instance by migrating in from a previous running incarnation of the instance.", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "migration_target" + ] + }, + "value": { + "$ref": "#/components/schemas/InstanceMigrationTargetParams" + } + }, + "required": [ + "type", + "value" + ] + }, { "description": "Start the instance if it is not already running.", - "type": "string", - "enum": [ - "running" + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "running" + ] + } + }, + "required": [ + "type" ] }, { "description": "Stop the instance.", - "type": "string", - "enum": [ - "stopped" + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "stopped" + ] + } + }, + "required": [ + "type" ] }, { - "description": "Issue a reset command to the instance, such that it should stop and then immediately become running.", - "type": "string", - "enum": [ - "reboot" + "description": "Immediately reset the instance, as though it had stopped and immediately began to run again.", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "reboot" + ] + } + }, + "required": [ + "type" ] }, { "description": "Stop the instance and delete it.", - "type": "string", - "enum": [ - "destroyed" + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "destroyed" + ] + } + }, + "required": [ + "type" ] } ] diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index d04dfab255..45ece2baa0 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -4,10 +4,10 @@ //! 
 //! HTTP entrypoint functions for the sled agent's exposed API

-use crate::params::VpcFirewallRulesEnsureBody;
 use crate::params::{
-    DatasetEnsureBody, DiskEnsureBody, InstanceEnsureBody, ServiceEnsureBody,
-    SledRole, TimeSync, Zpool,
+    DatasetEnsureBody, DiskEnsureBody, InstanceEnsureBody,
+    InstancePutStateBody, InstancePutStateResponse, ServiceEnsureBody,
+    SledRole, TimeSync, VpcFirewallRulesEnsureBody, Zpool,
 };
 use dropshot::{
     endpoint, ApiDescription, HttpError, HttpResponseOk,
@@ -32,7 +32,8 @@ pub fn api() -> SledApiDescription {
     api.register(disk_put)?;
     api.register(filesystem_put)?;
     api.register(instance_issue_disk_snapshot_request)?;
-    api.register(instance_put)?;
+    api.register(instance_put_state)?;
+    api.register(instance_register)?;
     api.register(services_put)?;
     api.register(sled_role_get)?;
     api.register(set_v2p)?;
@@ -118,7 +119,7 @@ struct InstancePathParam {
     method = PUT,
     path = "/instances/{instance_id}",
 }]
-async fn instance_put(
+async fn instance_register(
     rqctx: RequestContext<SledAgent>,
     path_params: Path<InstancePathParam>,
     body: TypedBody<InstanceEnsureBody>,
@@ -127,14 +128,28 @@ async fn instance_put(
     let instance_id = path_params.into_inner().instance_id;
     let body_args = body.into_inner();
     Ok(HttpResponseOk(
-        sa.instance_ensure(
-            instance_id,
-            body_args.initial,
-            body_args.target,
-            body_args.migrate,
-        )
-        .await
-        .map_err(Error::from)?,
+        sa.instance_ensure_registered(instance_id, body_args.initial)
+            .await
+            .map_err(Error::from)?,
     ))
 }
+
+#[endpoint {
+    method = PUT,
+    path = "/instances/{instance_id}/state",
+}]
+async fn instance_put_state(
+    rqctx: RequestContext<SledAgent>,
+    path_params: Path<InstancePathParam>,
+    body: TypedBody<InstancePutStateBody>,
+) -> Result<HttpResponseOk<InstancePutStateResponse>, HttpError> {
+    let sa = rqctx.context();
+    let instance_id = path_params.into_inner().instance_id;
+    let body_args = body.into_inner();
+    Ok(HttpResponseOk(
+        sa.instance_ensure_state(instance_id, body_args.state)
+            .await
+            .map_err(Error::from)?,
+    ))
+}
diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs
index b4f34c249b..89ca7bcf39 100644
--- a/sled-agent/src/instance.rs
+++ b/sled-agent/src/instance.rs
@@ -89,6 +89,9 @@ pub enum Error {

     #[error("Instance {0} not running!")]
     InstanceNotRunning(Uuid),
+
+    #[error("Instance already registered with Propolis ID {0}")]
+    InstanceAlreadyRegistered(Uuid),
 }

 // Issues read-only, idempotent HTTP requests at propolis until it responds with
@@ -159,14 +162,12 @@ enum Reaction {
 struct RunningState {
     // Connection to Propolis.
     client: Arc<PropolisClient>,
-    // Object representing membership in the "instance manager".
-    instance_ticket: InstanceTicket,
     // Objects representing the instance's OPTE ports in the port manager
     port_tickets: Option<Vec<PortTicket>>,
     // Handle to task monitoring for Propolis state changes.
     monitor_task: Option<JoinHandle<()>>,
     // Handle to the zone.
-    _running_zone: RunningZone,
+    running_zone: RunningZone,
 }

 impl Drop for RunningState {
@@ -234,18 +235,40 @@ struct InstanceInner {

     // Connection to Nexus
     lazy_nexus_client: LazyNexusClient,
+
+    // Object representing membership in the "instance manager".
+    instance_ticket: InstanceTicket,
 }

 impl InstanceInner {
+    /// Yields this instance's ID.
     fn id(&self) -> &Uuid {
         &self.properties.id
     }

-    /// UUID of the underlying propolis-server process
+    /// Yields this instance's Propolis's ID.
     fn propolis_id(&self) -> &Uuid {
         &self.propolis_id
     }

+    async fn publish_state_to_nexus(&self) -> Result<(), Error> {
+        self.lazy_nexus_client
+            .get()
+            .await?
+            .cpapi_instances_put(
+                self.id(),
+                &nexus_client::types::InstanceRuntimeState::from(
+                    self.state.current().clone(),
+                ),
+            )
+            .await
+            .map_err(|e| Error::Notification(e))?;
+
+        Ok(())
+    }
+
+    /// Processes a Propolis state change observed by the Propolis
+    /// monitoring task.
     async fn observe_state(
         &mut self,
         state: propolis_client::api::InstanceState,
@@ -261,18 +284,10 @@ impl InstanceInner {
             action
         );

-        // Notify Nexus of the state change.
-        self.lazy_nexus_client
-            .get()
-            .await?
-            .cpapi_instances_put(
-                self.id(),
-                &nexus_client::types::InstanceRuntimeState::from(
-                    self.state.current().clone(),
-                ),
-            )
-            .await
-            .map_err(|e| Error::Notification(e))?;
+        // TODO(#2727): Any failure here (even a transient failure) causes
+        // the monitoring task to exit, marooning the instance. Decide where
+        // best to handle this.
+        self.publish_state_to_nexus().await?;

         // Take the next action, if any.
         if let Some(action) = action {
@@ -282,6 +297,7 @@ impl InstanceInner {
         }
     }

+    /// Sends an instance state PUT request to this instance's Propolis.
     async fn propolis_state_put(
         &self,
         request: propolis_client::api::InstanceStateRequested,
@@ -297,15 +313,13 @@ impl InstanceInner {
         Ok(())
     }

-    async fn ensure(
-        &mut self,
-        instance: Instance,
-        instance_ticket: InstanceTicket,
-        setup: PropolisSetup,
+    /// Sends an instance ensure request to this instance's Propolis.
+    async fn propolis_ensure(
+        &self,
+        client: &PropolisClient,
+        running_zone: &RunningZone,
         migrate: Option<InstanceMigrationTargetParams>,
     ) -> Result<(), Error> {
-        let PropolisSetup { client, running_zone, port_tickets } = setup;
-
         let nics = running_zone
             .opte_ports()
             .iter()
@@ -349,6 +363,26 @@ impl InstanceInner {
         let result = client.instance_ensure().body(request).send().await;
         info!(self.log, "result of instance_ensure call is {:?}", result);
         result?;
+        Ok(())
+    }
+
+    /// Given a freshly-created Propolis process, sends an ensure request to
+    /// that Propolis and launches all of the tasks needed to monitor the
+    /// resulting Propolis VM.
+    ///
+    /// # Panics
+    ///
+    /// Panics if this routine is called more than once for a given Instance.
+    async fn ensure_propolis_and_tasks(
+        &mut self,
+        instance: Instance,
+        setup: PropolisSetup,
+        migrate: Option<InstanceMigrationTargetParams>,
+    ) -> Result<(), Error> {
+        assert!(self.running_state.is_none());
+
+        let PropolisSetup { client, running_zone, port_tickets } = setup;
+        self.propolis_ensure(&client, &running_zone, migrate).await?;

         // Monitor propolis for state changes in the background.
         let monitor_client = client.clone();
@@ -363,10 +397,9 @@ impl InstanceInner {

         self.running_state = Some(RunningState {
             client,
-            instance_ticket,
             port_tickets,
             monitor_task,
-            _running_zone: running_zone,
+            running_zone,
         });

         Ok(())
@@ -412,22 +445,20 @@ pub struct Instance {
 #[cfg(test)]
 mockall::mock! {
     pub Instance {
+        #[allow(clippy::too_many_arguments)]
         pub fn new(
             log: Logger,
             id: Uuid,
+            ticket: InstanceTicket,
             initial: InstanceHardware,
             vnic_allocator: VnicAllocator<Etherstub>,
             port_manager: PortManager,
             lazy_nexus_client: LazyNexusClient,
         ) -> Result<Self, Error>;
-        pub async fn start(
-            &self,
-            instance_ticket: InstanceTicket,
-            migrate: Option<InstanceMigrationTargetParams>,
-        ) -> Result<(), Error>;
-        pub async fn transition(
+        pub async fn current_state(&self) -> InstanceRuntimeState;
+        pub async fn put_state(
             &self,
-            target: InstanceStateRequested,
+            state: InstanceStateRequested
         ) -> Result<InstanceRuntimeState, Error>;
         pub async fn issue_snapshot_request(
             &self,
@@ -455,9 +486,11 @@ impl Instance {
     ///   ports.
     /// * `lazy_nexus_client`: Connection to Nexus, used for sending notifications.
     // TODO: This arg list is getting a little long; can we clean this up?
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         log: Logger,
         id: Uuid,
+        ticket: InstanceTicket,
         initial: InstanceHardware,
         vnic_allocator: VnicAllocator<Etherstub>,
         port_manager: PortManager,
@@ -492,6 +525,7 @@ impl Instance {
             state: InstanceStates::new(initial.runtime),
             running_state: None,
             lazy_nexus_client,
+            instance_ticket: ticket,
         };

         let inner = Arc::new(Mutex::new(instance));
@@ -499,25 +533,142 @@ impl Instance {
         Ok(Instance { inner })
     }

+    pub async fn current_state(&self) -> InstanceRuntimeState {
+        let inner = self.inner.lock().await;
+        inner.state.current().clone()
+    }
+
+    /// Ensures that a Propolis process exists for this instance, then sends it
+    /// an instance ensure request.
+    async fn propolis_ensure(
+        &self,
+        inner: &mut MutexGuard<'_, InstanceInner>,
+        migration_params: Option<InstanceMigrationTargetParams>,
+    ) -> Result<(), Error> {
+        if let Some(running_state) = inner.running_state.as_ref() {
+            inner
+                .propolis_ensure(
+                    &running_state.client,
+                    &running_state.running_zone,
+                    migration_params,
+                )
+                .await?;
+        } else {
+            let setup_result: Result<(), Error> = 'setup: {
+                // If there's no Propolis yet, and this instance is not being
+                // initialized via migration, immediately send a state update to
+                // Nexus to reflect that the instance is starting (so that the
+                // external API will display this state while the zone is being
+                // started).
+                //
+                // Migration targets don't do this because the instance is still
+                // logically running (on the source) while the target Propolis
+                // is being launched.
+                if migration_params.is_none() {
+                    inner.state.transition(InstanceState::Starting);
+                    if let Err(e) = inner.publish_state_to_nexus().await {
+                        break 'setup Err(e);
+                    }
+                }
+
+                // Set up the Propolis zone and the objects associated with it.
+                let setup = match self.setup_propolis_locked(inner).await {
+                    Ok(setup) => setup,
+                    Err(e) => break 'setup Err(e),
+                };
+
+                // Direct the Propolis server to create its VM and the tasks
+                // associated with it. On success, the zone handle moves into
+                // this instance, preserving the zone.
+                inner
+                    .ensure_propolis_and_tasks(
+                        self.clone(),
+                        setup,
+                        migration_params,
+                    )
+                    .await
+            };
+
+            // If this instance started from scratch, and startup failed, move
+            // the instance to the Failed state instead of leaking the Starting
+            // state.
+            //
+            // Once again, migration targets don't do this, because a failure to
+            // start a migration target simply leaves the VM running untouched
+            // on the source.
+            if migration_params.is_none() && setup_result.is_err() {
+                inner.state.transition(InstanceState::Failed);
+                inner.publish_state_to_nexus().await?;
+            }
+            setup_result?;
+        }
+        Ok(())
+    }
+
+    /// Attempts to update the current state of the instance by launching a
+    /// Propolis process for the instance (if needed) and issuing an appropriate
+    /// request to Propolis to change state.
+    ///
+    /// Returns the instance's state after applying any changes required by this
+    /// call. Note that if the instance's Propolis is in the middle of its own
+    /// state transition, it may publish states that supersede the state
+    /// published by this routine in perhaps-surprising ways. For example, if an
+    /// instance begins to stop when Propolis has just begun to handle a prior
+    /// request to reboot, the instance's state may proceed from Stopping to
+    /// Rebooting to Running to Stopping to Stopped.
+    pub async fn put_state(
+        &self,
+        state: crate::params::InstanceStateRequested,
+    ) -> Result<InstanceRuntimeState, Error> {
+        use propolis_client::api::InstanceStateRequested as PropolisRequest;
+        let mut inner = self.inner.lock().await;
+        let (propolis_state, next_published) = match state {
+            InstanceStateRequested::MigrationTarget(migration_params) => {
+                self.propolis_ensure(&mut inner, Some(migration_params))
+                    .await?;
+                (None, None)
+            }
+            InstanceStateRequested::Running => {
+                self.propolis_ensure(&mut inner, None).await?;
+                (Some(PropolisRequest::Run), None)
+            }
+            InstanceStateRequested::Stopped => {
+                // If the instance has not started yet, unregister it
+                // immediately. Since there is no Propolis to push updates when
+                // this happens, generate an instance record bearing the
+                // "Destroyed" state and return it to the caller.
+                if inner.running_state.is_none() {
+                    inner.instance_ticket.terminate();
+                    (None, Some(InstanceState::Destroyed))
+                } else {
+                    (Some(PropolisRequest::Stop), Some(InstanceState::Stopping))
+                }
+            }
+            InstanceStateRequested::Reboot => {
+                if inner.running_state.is_none() {
+                    return Err(Error::InstanceNotRunning(*inner.id()));
+                }
+                (Some(PropolisRequest::Reboot), Some(InstanceState::Rebooting))
+            }
+            InstanceStateRequested::Destroyed => {
+                inner.instance_ticket.terminate();
+                (None, Some(InstanceState::Destroyed))
+            }
+        };
+
+        if let Some(p) = propolis_state {
+            inner.propolis_state_put(p).await?;
+        }
+        if let Some(s) = next_published {
+            inner.state.transition(s);
+        }
+        Ok(inner.state.current().clone())
+    }
+
     async fn setup_propolis_locked(
         &self,
         inner: &mut MutexGuard<'_, InstanceInner>,
     ) -> Result<PropolisSetup, Error> {
-        // Update nexus with an in-progress state while we set up the
-        // instance.
-        inner.state.transition(InstanceState::Starting);
-        inner
-            .lazy_nexus_client
-            .get()
-            .await?
-            .cpapi_instances_put(
-                inner.id(),
-                &nexus_client::types::InstanceRuntimeState::from(
-                    inner.state.current().clone(),
-                ),
-            )
-            .await
-            .map_err(|e| Error::Notification(e))?;
-
         // Create OPTE ports for the instance
         let mut opte_ports = Vec::with_capacity(inner.requested_nics.len());
         let mut port_tickets = Vec::with_capacity(inner.requested_nics.len());
@@ -698,25 +849,7 @@ impl Instance {
         })
     }

-    /// Begins the execution of the instance's service (Propolis).
-    pub async fn start(
-        &self,
-        instance_ticket: InstanceTicket,
-        migrate: Option<InstanceMigrationTargetParams>,
-    ) -> Result<(), Error> {
-        let mut inner = self.inner.lock().await;
-
-        // Create the propolis zone and resources
-        let setup = self.setup_propolis_locked(&mut inner).await?;
-
-        // Ensure the instance exists in the Propolis Server before we start
-        // using it.
-        inner.ensure(self.clone(), instance_ticket, setup, migrate).await?;
-
-        Ok(())
-    }
-
-    // Terminate the Propolis service.
+    /// Terminates this instance's Propolis zone.
     async fn stop(&self) -> Result<(), Error> {
         let mut inner = self.inner.lock().await;

@@ -725,10 +858,10 @@ impl Instance {
         Zones::halt_and_remove_logged(&inner.log, &zname).await.unwrap();

         // Remove ourselves from the instance manager's map of instances.
-        let running_state = inner.running_state.as_mut().unwrap();
-        running_state.instance_ticket.terminate();
+        inner.instance_ticket.terminate();

         // And remove the OPTE ports from the port manager
+        let running_state = inner.running_state.as_mut().unwrap();
         let mut result = Ok(());
         if let Some(tickets) = running_state.port_tickets.as_mut() {
             for ticket in tickets.iter_mut() {
@@ -776,49 +909,6 @@ impl Instance {
         }
     }

-    /// Attempts to change the current state of the instance by issuing an
-    /// appropriate state change command to its Propolis.
-    ///
-    /// Returns the instance's state after applying any changes required by this
-    /// call. Note that if the instance's Propolis is in the middle of its own
-    /// state transition, it may publish states that supersede the state
-    /// published by this routine in perhaps-surprising ways. For example, if an
-    /// instance begins to stop when Propolis has just begun to handle a prior
-    /// request to reboot, the instance's state may proceed from Stopping to
-    /// Rebooting to Running to Stopping to Stopped.
-    ///
-    /// # Panics
-    ///
-    /// This method may panic if it has been invoked before [`Instance::start`].
-    pub async fn transition(
-        &self,
-        target: InstanceStateRequested,
-    ) -> Result<InstanceRuntimeState, Error> {
-        let mut inner = self.inner.lock().await;
-
-        let (propolis_state, next_published) = match target {
-            InstanceStateRequested::Running => {
-                (propolis_client::api::InstanceStateRequested::Run, None)
-            }
-            InstanceStateRequested::Stopped
-            | InstanceStateRequested::Destroyed => (
-                propolis_client::api::InstanceStateRequested::Stop,
-                Some(InstanceState::Stopping),
-            ),
-            InstanceStateRequested::Reboot => (
-                propolis_client::api::InstanceStateRequested::Reboot,
-                Some(InstanceState::Rebooting),
-            ),
-        };
-
-        inner.propolis_state_put(propolis_state).await?;
-        if let Some(s) = next_published {
-            inner.state.transition(s);
-        }
-
-        Ok(inner.state.current().clone())
-    }
-
     pub async fn issue_snapshot_request(
         &self,
         disk_id: Uuid,
@@ -845,6 +935,7 @@ impl Instance {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::instance_manager::InstanceManager;
     use crate::nexus::LazyNexusClient;
     use crate::params::InstanceStateRequested;
     use crate::params::SourceNatConfig;
@@ -913,9 +1004,6 @@ mod test {

     #[tokio::test]
     #[serial_test::serial]
-    #[should_panic(
-        expected = "Propolis client should be initialized before usage"
-    )]
     async fn transition_before_start() {
         let logctx = test_setup_log("transition_before_start");
         let log = &logctx.log;
@@ -930,10 +1018,19 @@ mod test {
         let lazy_nexus_client =
             LazyNexusClient::new(log.clone(), std::net::Ipv6Addr::LOCALHOST)
                 .unwrap();
+        let instance_manager = InstanceManager::new(
+            log.clone(),
+            lazy_nexus_client.clone(),
+            Etherstub("mylink".to_string()),
+            underlay_ip,
+            mac,
+        )
+        .unwrap();

         let inst = Instance::new(
             log.clone(),
             test_uuid(),
+            instance_manager.test_instance_ticket(test_uuid()),
             new_initial_instance(),
             vnic_allocator,
             port_manager,
@@ -941,12 +1038,9 @@ mod test {
         )
         .unwrap();

-        // Remove the logfile before we expect to panic, or it'll never be
-        // cleaned up.
-        logctx.cleanup_successful();
+        // Pick a state transition that requires the instance to have
+        // started.
+        assert!(inst.put_state(InstanceStateRequested::Reboot).await.is_err());

-        // Trying to transition before the instance has been initialized will
-        // result in a panic.
-        inst.transition(InstanceStateRequested::Running).await.unwrap();
+        logctx.cleanup_successful();
     }
 }
diff --git a/sled-agent/src/instance_manager.rs b/sled-agent/src/instance_manager.rs
index b5a36b8272..9f5091ade9 100644
--- a/sled-agent/src/instance_manager.rs
+++ b/sled-agent/src/instance_manager.rs
@@ -5,9 +5,9 @@
 //! API for controlling multiple instances on a sled.

 use crate::nexus::LazyNexusClient;
-use crate::params::VpcFirewallRule;
 use crate::params::{
-    InstanceHardware, InstanceMigrationTargetParams, InstanceStateRequested,
+    InstanceHardware, InstancePutStateResponse, InstanceStateRequested,
+    VpcFirewallRule,
 };
 use illumos_utils::dladm::Etherstub;
 use illumos_utils::link::VnicAllocator;
@@ -87,97 +87,121 @@ impl InstanceManager {
         })
     }

-    /// Idempotently ensures that the given Instance (described by
-    /// `initial_hardware`) exists on this server in the given runtime state
-    /// (described by `target`).
-    pub async fn ensure(
+    /// Ensures that the instance manager contains a registered instance with
+    /// the supplied instance ID and the Propolis ID specified in
+    /// `initial_hardware`.
+    ///
+    /// # Arguments
+    ///
+    /// * instance_id: The ID of the instance to register.
+    /// * initial_hardware: The initial hardware manifest and runtime state of
+    ///   the instance, to be used if the instance does not already exist.
+    ///
+    /// # Return value
+    ///
+    /// `Ok` if the instance is registered with the supplied Propolis ID,
+    /// `Err` otherwise. This routine is idempotent if called to register the
+    /// same (instance ID, Propolis ID) pair multiple times, but will fail if
+    /// the instance is registered with a Propolis ID different from the one
+    /// the caller supplied.
+    pub async fn ensure_registered(
         &self,
         instance_id: Uuid,
         initial_hardware: InstanceHardware,
-        target: InstanceStateRequested,
-        migrate: Option<InstanceMigrationTargetParams>,
     ) -> Result<InstanceRuntimeState, Error> {
+        let requested_propolis_id = initial_hardware.runtime.propolis_id;
         info!(
             &self.inner.log,
-            "instance_ensure {} -> {:?}", instance_id, target
+            "ensuring instance is registered";
+            "instance_id" => %instance_id,
+            "propolis_id" => %requested_propolis_id
         );
-        let target_propolis_id = initial_hardware.runtime.propolis_id;
-
-        let (instance, maybe_instance_ticket) = {
+
+        let instance = {
             let mut instances = self.inner.instances.lock().unwrap();
-            match (instances.get(&instance_id), &migrate) {
-                (Some((_, instance)), None) => {
-                    // Instance already exists and we're not performing a migration
-                    info!(&self.inner.log, "instance already exists");
-                    (instance.clone(), None)
-                }
-                (Some((propolis_id, instance)), Some(_))
-                    if *propolis_id == target_propolis_id =>
-                {
-                    // A migration was requested but the given propolis id
-                    // already seems to be the propolis backing this instance
-                    // so just return the instance as is without triggering
-                    // another migration.
+            if let Some((existing_propolis_id, existing_instance)) =
+                instances.get(&instance_id)
+            {
+                if requested_propolis_id != *existing_propolis_id {
+                    info!(&self.inner.log,
+                          "instance already registered with another Propolis ID";
+                          "instance_id" => %instance_id,
+                          "existing_propolis_id" => %*existing_propolis_id);
+                    return Err(Error::Instance(
+                        crate::instance::Error::InstanceAlreadyRegistered(
+                            *existing_propolis_id,
+                        ),
+                    ));
+                } else {
                     info!(
                         &self.inner.log,
-                        "instance already exists with given dst propolis"
+                        "instance already registered with requested Propolis ID"
                     );
-                    (instance.clone(), None)
+                    existing_instance.clone()
                 }
-                _ => {
-                    // Instance does not exist or one does but we're performing
-                    // a intra-sled migration. Either way - create an instance
-                    info!(&self.inner.log, "new instance");
-                    let instance_log = self.inner.log.new(o!());
-                    let instance = Instance::new(
-                        instance_log,
-                        instance_id,
-                        initial_hardware,
-                        self.inner.vnic_allocator.clone(),
-                        self.inner.port_manager.clone(),
-                        self.inner.lazy_nexus_client.clone(),
-                    )?;
-                    let instance_clone = instance.clone();
-                    let old_instance = instances
-                        .insert(instance_id, (target_propolis_id, instance));
-                    if let Some((_old_propolis_id, old_instance)) = old_instance
-                    {
-                        // If we had a previous instance, we must be migrating
-                        assert!(migrate.is_some());
-                        // TODO: assert that old_instance.inner.propolis_id() == migrate.src_uuid
-
-                        // We forget the old instance because otherwise if it is the last
-                        // handle, the `InstanceTicket` it holds will also be dropped.
-                        // `InstanceTicket::drop` will try to remove the corresponding
-                        // instance from the instance manager. It does this based off the
-                        // instance's ID. Given that we just replaced said instance using
-                        // the same ID, that would inadvertantly remove our newly created
-                        // instance instead.
-                        // TODO: cleanup source instance properly
-                        std::mem::forget(old_instance);
-                    }
+            } else {
+                info!(&self.inner.log, "new instance");
+                let instance_log = self.inner.log.new(o!());
+                let ticket =
+                    InstanceTicket::new(instance_id, self.inner.clone());
+                let instance = Instance::new(
+                    instance_log,
+                    instance_id,
+                    ticket,
+                    initial_hardware,
+                    self.inner.vnic_allocator.clone(),
+                    self.inner.port_manager.clone(),
+                    self.inner.lazy_nexus_client.clone(),
+                )?;
+                let instance_clone = instance.clone();
+                let _old = instances
+                    .insert(instance_id, (requested_propolis_id, instance));
+                assert!(_old.is_none());
+                instance_clone
+            }
+        };

-                    let ticket = Some(InstanceTicket::new(
-                        instance_id,
-                        self.inner.clone(),
-                    ));
-                    (instance_clone, ticket)
+        Ok(instance.current_state().await)
+    }
+
+    /// Idempotently attempts to drive the supplied instance into the supplied
+    /// runtime state.
+    pub async fn ensure_state(
+        &self,
+        instance_id: Uuid,
+        target: InstanceStateRequested,
+    ) -> Result<InstancePutStateResponse, Error> {
+        let instance = {
+            let instances = self.inner.instances.lock().unwrap();
+            let instance = instances.get(&instance_id);
+
+            if let Some((_, instance)) = instance {
+                instance.clone()
+            } else {
+                match target {
+                    // If the instance isn't registered, then by definition it
+                    // isn't running here. Allow requests to stop or destroy the
+                    // instance to succeed to provide idempotency. This has to
+                    // be handled here (that is, on the "instance not found"
+                    // path) to handle the case where a stop request arrived,
+                    // Propolis handled it, sled agent unregistered the
+                    // instance, and only then did a second stop request
+                    // arrive.
+                    InstanceStateRequested::Stopped
+                    | InstanceStateRequested::Destroyed => {
+                        return Ok(InstancePutStateResponse {
+                            updated_runtime: None,
+                        });
+                    }
+                    _ => {
+                        return Err(Error::NoSuchInstance(instance_id));
+                    }
                 }
             }
         };

-        // If we created a new instance, start or migrate it - but do so outside
-        // the "instances" lock, since initialization may take a while.
-        //
-        // Additionally, this makes it possible to manage the "instance_ticket",
-        // which might need to grab the lock to remove the instance during
-        // teardown.
-        if let Some(instance_ticket) = maybe_instance_ticket {
-            instance.start(instance_ticket, migrate).await?;
-        }
-
-        instance.transition(target).await.map_err(|e| e.into())
+        let new_state = instance.put_state(target).await?;
+        Ok(InstancePutStateResponse { updated_runtime: Some(new_state) })
     }

     pub async fn instance_issue_disk_snapshot_request(
@@ -238,6 +262,14 @@ impl InstanceManager {
         self.inner.port_manager.unset_virtual_nic_host(mapping)?;
         Ok(())
     }
+
+    /// Generates an instance ticket associated with this instance manager.
+    /// This allows tests in other modules to create an Instance even though
+    /// they lack visibility to `InstanceManagerInternal`.
+    #[cfg(test)]
+    pub fn test_instance_ticket(&self, instance_id: Uuid) -> InstanceTicket {
+        InstanceTicket::new(instance_id, self.inner.clone())
+    }
 }

 /// Represents membership of an instance in the [`InstanceManager`].
@@ -366,39 +398,65 @@ mod test {
         let ticket = Arc::new(std::sync::Mutex::new(None));
         let ticket_clone = ticket.clone();
         let instance_new_ctx = MockInstance::new_context();
-        instance_new_ctx.expect().return_once(move |_, _, _, _, _, _| {
+        let mut seq = mockall::Sequence::new();
+
+        // Expect one call to new() that produces an instance that expects
+        // to be cloned twice; the second clone should expect to be asked to
+        // move to the Running state.
+        instance_new_ctx.expect().return_once(move |_, _, t, _, _, _, _| {
             let mut inst = MockInstance::default();
-            inst.expect_clone().return_once(move || {
-                let mut inst = MockInstance::default();
-                inst.expect_start().return_once(move |t, _| {
-                    // Grab hold of the ticket, so we don't try to remove the
-                    // instance immediately after "start" completes.
-                    let mut ticket_guard = ticket_clone.lock().unwrap();
-                    *ticket_guard = Some(t);
-                    Ok(())
-                });
-                inst.expect_transition().return_once(|_| {
-                    let mut rt_state = new_initial_instance();
-                    rt_state.runtime.run_state = InstanceState::Running;
-                    Ok(rt_state.runtime)
-                });
-                inst
-            });
+
+            // Move the instance ticket out to the test, since the mock
+            // instance won't hold onto it.
+            let mut ticket_guard = ticket_clone.lock().unwrap();
+            *ticket_guard = Some(t);
+
+            // Expect to be cloned twice, once during registration (to fish
+            // the current state out of the instance) and once during the
+            // state transition (to hoist the instance reference out of the
+            // instance manager lock).
+            inst.expect_clone().times(1).in_sequence(&mut seq).return_once(
+                move || {
+                    let mut inst = MockInstance::default();
+                    inst.expect_current_state()
+                        .return_once(|| new_initial_instance().runtime);
+                    inst
+                },
+            );
+
+            inst.expect_clone().times(1).in_sequence(&mut seq).return_once(
+                move || {
+                    let mut inst = MockInstance::default();
+                    inst.expect_put_state().return_once(|_| {
+                        let mut rt_state = new_initial_instance();
+                        rt_state.runtime.run_state = InstanceState::Running;
+                        Ok(rt_state.runtime)
+                    });
+                    inst
+                },
+            );
+
             Ok(inst)
         });
+
+        im.ensure_registered(test_uuid(), new_initial_instance())
+            .await
+            .unwrap();
+
+        // The instance exists now.
+        assert_eq!(im.inner.instances.lock().unwrap().len(), 1);
+
         let rt_state = im
-            .ensure(
-                test_uuid(),
-                new_initial_instance(),
-                InstanceStateRequested::Running,
-                None,
-            )
+            .ensure_state(test_uuid(), InstanceStateRequested::Running)
             .await
             .unwrap();

         // At this point, we can observe the expected state of the instance
-        // manager: contianing the created instance...
-        assert_eq!(rt_state.run_state, InstanceState::Running);
+        // manager: containing the created instance...
+        assert_eq!(
+            rt_state.updated_runtime.unwrap().run_state,
+            InstanceState::Running
+        );
         assert_eq!(im.inner.instances.lock().unwrap().len(), 1);

         // ... however, when we drop the ticket of the corresponding instance,
@@ -411,7 +469,7 @@ mod test {

     #[tokio::test]
     #[serial_test::serial]
-    async fn ensure_instance_repeatedly() {
+    async fn ensure_instance_state_repeatedly() {
         let logctx = test_setup_log("ensure_instance_repeatedly");
         let log = &logctx.log;
         let lazy_nexus_client =
@@ -441,18 +499,20 @@ mod test {
         let ticket_clone = ticket.clone();
         let instance_new_ctx = MockInstance::new_context();
         let mut seq = mockall::Sequence::new();
-        instance_new_ctx.expect().return_once(move |_, _, _, _, _, _| {
+        instance_new_ctx.expect().return_once(move |_, _, t, _, _, _, _| {
             let mut inst = MockInstance::default();
+            let mut ticket_guard = ticket_clone.lock().unwrap();
+            *ticket_guard = Some(t);
+
             // First call to ensure (start + transition).
             inst.expect_clone().times(1).in_sequence(&mut seq).return_once(
                 move || {
                     let mut inst = MockInstance::default();
-                    inst.expect_start().return_once(move |t, _| {
-                        let mut ticket_guard = ticket_clone.lock().unwrap();
-                        *ticket_guard = Some(t);
-                        Ok(())
-                    });
-                    inst.expect_transition().return_once(|_| {
+
+                    inst.expect_current_state()
+                        .returning(|| new_initial_instance().runtime);
+
+                    inst.expect_put_state().return_once(|_| {
                         let mut rt_state = new_initial_instance();
                         rt_state.runtime.run_state = InstanceState::Running;
                         Ok(rt_state.runtime)
@@ -460,11 +520,12 @@ mod test {
                     inst
                 },
             );
+
             // Next calls to ensure (transition only).
-            inst.expect_clone().times(2).in_sequence(&mut seq).returning(
+            inst.expect_clone().times(3).in_sequence(&mut seq).returning(
                 move || {
                     let mut inst = MockInstance::default();
-                    inst.expect_transition().returning(|_| {
+                    inst.expect_put_state().returning(|_| {
                         let mut rt_state = new_initial_instance();
                         rt_state.runtime.run_state = InstanceState::Running;
                         Ok(rt_state.runtime)
@@ -477,14 +538,12 @@ mod test {

         let id = test_uuid();
         let rt = new_initial_instance();
-        let target = InstanceStateRequested::Running;
-
-        // Creates instance, start + transition.
-        im.ensure(id, rt.clone(), target, None).await.unwrap();
-        // Transition only.
-        im.ensure(id, rt.clone(), target, None).await.unwrap();
-        // Transition only.
-        im.ensure(id, rt, target, None).await.unwrap();
+
+        // Register the instance, then issue all three state transitions.
+        im.ensure_registered(id, rt).await.unwrap();
+        im.ensure_state(id, InstanceStateRequested::Running).await.unwrap();
+        im.ensure_state(id, InstanceStateRequested::Running).await.unwrap();
+        im.ensure_state(id, InstanceStateRequested::Running).await.unwrap();

         assert_eq!(im.inner.instances.lock().unwrap().len(), 1);
         ticket.lock().unwrap().take();
diff --git a/sled-agent/src/params.rs b/sled-agent/src/params.rs
index 142b459eb9..d1fa3ea8dd 100644
--- a/sled-agent/src/params.rs
+++ b/sled-agent/src/params.rs
@@ -65,47 +65,59 @@ pub struct InstanceHardware {
     pub cloud_init_bytes: Option<String>,
 }

-/// Sent to a sled agent to establish the runtime state of an Instance
+/// The body of a request to ensure that an instance is known to a sled
+/// agent.
 #[derive(Serialize, Deserialize, JsonSchema)]
 pub struct InstanceEnsureBody {
-    /// Last runtime state of the Instance known to Nexus (used if the agent
-    /// has never seen this Instance before).
+    /// A description of the instance's virtual hardware and the initial
+    /// runtime state this sled agent should store for this incarnation of
+    /// the instance.
     pub initial: InstanceHardware,
-
-    /// requested runtime state of the Instance
-    pub target: InstanceStateRequested,
-
-    /// If we're migrating this instance, the details needed to drive the
-    /// migration
-    pub migrate: Option<InstanceMigrationTargetParams>,
 }

-#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
+/// The body of a request to move a previously-ensured instance into a
+/// specific runtime state.
+#[derive(Serialize, Deserialize, JsonSchema)]
+pub struct InstancePutStateBody {
+    /// The state into which the instance should be driven.
+    pub state: InstanceStateRequested,
+}
+
+/// The response sent from a request to move an instance into a specific
+/// runtime state.
+#[derive(Serialize, Deserialize, JsonSchema)]
+pub struct InstancePutStateResponse {
+    /// The current runtime state of the instance after handling the request
+    /// to change its state. If the instance's state did not change, this
+    /// field is `None`.
+    pub updated_runtime: Option<InstanceRuntimeState>,
+}
+
+/// Parameters used when directing Propolis to initialize itself via live
+/// migration.
+#[derive(Copy, Clone, Debug, Deserialize, Serialize, JsonSchema)]
 pub struct InstanceMigrationTargetParams {
+    /// The Propolis ID of the migration source.
     pub src_propolis_id: Uuid,
+
+    /// The address of the Propolis server that will serve as the migration
+    /// source.
     pub src_propolis_addr: SocketAddr,
 }

 /// Requestable running state of an Instance.
 ///
 /// A subset of [`omicron_common::api::external::InstanceState`].
-#[derive(
-    Copy,
-    Clone,
-    Debug,
-    Deserialize,
-    Eq,
-    Ord,
-    PartialEq,
-    PartialOrd,
-    Serialize,
-    JsonSchema,
-)]
-#[serde(rename_all = "lowercase")]
+#[derive(Copy, Clone, Debug, Deserialize, Serialize, JsonSchema)]
+#[serde(rename_all = "snake_case", tag = "type", content = "value")]
 pub enum InstanceStateRequested {
+    /// Run this instance by migrating in from a previous running incarnation
+    /// of the instance.
+    MigrationTarget(InstanceMigrationTargetParams),
     /// Start the instance if it is not already running.
     Running,
     /// Stop the instance.
     Stopped,
-    /// Issue a reset command to the instance, such that it should
-    /// stop and then immediately become running.
+    /// Immediately reset the instance, as though it had stopped and
+    /// immediately began to run again.
     Reboot,
     /// Stop the instance and delete it.
     Destroyed,
@@ -120,6 +132,7 @@ impl Display for InstanceStateRequested {
 impl InstanceStateRequested {
     fn label(&self) -> &str {
         match self {
+            InstanceStateRequested::MigrationTarget(_) => "migrating in",
             InstanceStateRequested::Running => "running",
             InstanceStateRequested::Stopped => "stopped",
             InstanceStateRequested::Reboot => "reboot",
@@ -130,6 +143,7 @@ impl InstanceStateRequested {
     /// Returns true if the state represents a stopped Instance.
     pub fn is_stopped(&self) -> bool {
         match self {
+            InstanceStateRequested::MigrationTarget(_) => false,
             InstanceStateRequested::Running => false,
             InstanceStateRequested::Stopped => true,
             InstanceStateRequested::Reboot => false,
diff --git a/sled-agent/src/sim/collection.rs b/sled-agent/src/sim/collection.rs
index d63623a1ba..fd9d08dec9 100644
--- a/sled-agent/src/sim/collection.rs
+++ b/sled-agent/src/sim/collection.rs
@@ -310,7 +310,7 @@ impl SimCollection
         self: &Arc<Self>,
         id: &Uuid,
         current: S::CurrentState,
-        target: S::RequestedState,
+        target: Option<S::RequestedState>,
     ) -> Result<S::CurrentState, Error> {
         let mut objects = self.objects.lock().await;
         let maybe_current_object = objects.remove(id);
@@ -336,8 +336,12 @@
             }
         };

-        let rv =
-            object.transition(target).map(|_| object.object.current().clone());
+        let rv = if let Some(target) = target {
+            object.transition(target).map(|_| object.object.current().clone())
+        } else {
+            Ok(current.clone())
+        };

         if rv.is_ok() || !is_new {
             objects.insert(*id, object);
@@ -347,6 +351,46 @@
     pub async fn contains_key(self: &Arc<Self>, id: &Uuid) -> bool {
         self.objects.lock().await.contains_key(id)
     }
+
+    pub async fn sim_get_current_state(
+        self: &Arc<Self>,
+        id: &Uuid,
+    ) -> Result<S::CurrentState, Error> {
+        let objects = self.objects.lock().await;
+        let instance = objects
+            .get(id)
+            .ok_or_else(|| Error::not_found_by_id(S::resource_type(), id))?;
+        Ok(instance.object.current().clone())
+    }
+
+    /// Iterates over all of the existing objects in the collection and, for
+    /// any that meet `condition`, asks to transition them into the supplied
+    /// target state.
+    ///
+    /// If any such transition fails, this routine short-circuits and does
+    /// not attempt to transition any other objects.
+    //
+    // TODO: It's likely more idiomatic to have an `iter_mut` routine that
+    // returns a struct that impls Iterator and yields &mut S references. The
+    // tricky bit is that the struct must hold the objects lock during the
+    // iteration. Figure out if there's a better way to arrange all this.
diff --git a/sled-agent/src/sim/disk.rs b/sled-agent/src/sim/disk.rs
index a57eb08875..4f66d8601a 100644
--- a/sled-agent/src/sim/disk.rs
+++ b/sled-agent/src/sim/disk.rs
@@ -14,6 +14,7 @@ use dropshot::ConfigLoggingLevel;
 use omicron_common::api::external::DiskState;
 use omicron_common::api::external::Error;
 use omicron_common::api::external::Generation;
+use omicron_common::api::external::ResourceType;
 use omicron_common::api::internal::nexus::DiskRuntimeState;
 use omicron_common::api::internal::nexus::ProducerEndpoint;
 use oximeter_producer::Server as ProducerServer;
@@ -273,4 +274,8 @@ impl Simulatable for SimDisk {
             .map(|_| ())
             .map_err(Error::from)
     }
+
+    fn resource_type() -> ResourceType {
+        ResourceType::Disk
+    }
 }
diff --git a/sled-agent/src/sim/http_entrypoints.rs b/sled-agent/src/sim/http_entrypoints.rs
index 346bec17c6..423f74403a 100644
--- a/sled-agent/src/sim/http_entrypoints.rs
+++ b/sled-agent/src/sim/http_entrypoints.rs
@@ -4,8 +4,10 @@
 
 //! HTTP entrypoint functions for the sled agent's exposed API
 
-use crate::params::VpcFirewallRulesEnsureBody;
-use crate::params::{DiskEnsureBody, InstanceEnsureBody};
+use crate::params::{
+    DiskEnsureBody, InstanceEnsureBody, InstancePutStateBody,
+    InstancePutStateResponse, VpcFirewallRulesEnsureBody,
+};
 use dropshot::endpoint;
 use dropshot::ApiDescription;
 use dropshot::HttpError;
@@ -31,6 +33,7 @@ type SledApiDescription = ApiDescription<Arc<SledAgent>>;
 pub fn api() -> SledApiDescription {
     fn register_endpoints(api: &mut SledApiDescription) -> Result<(), String> {
         api.register(instance_put)?;
+        api.register(instance_put_state)?;
         api.register(instance_poke_post)?;
         api.register(disk_put)?;
         api.register(disk_poke_post)?;
@@ -69,8 +72,24 @@ async fn instance_put(
     let instance_id = path_params.into_inner().instance_id;
     let body_args = body.into_inner();
     Ok(HttpResponseOk(
-        sa.instance_ensure(instance_id, body_args.initial, body_args.target)
-            .await?,
+        sa.instance_ensure(instance_id, body_args.initial).await?,
     ))
 }
 
+#[endpoint {
+    method = PUT,
+    path = "/instances/{instance_id}/state",
+}]
+async fn instance_put_state(
+    rqctx: RequestContext<Arc<SledAgent>>,
+    path_params: Path<InstancePathParam>,
+    body: TypedBody<InstancePutStateBody>,
+) -> Result<HttpResponseOk<InstancePutStateResponse>, HttpError> {
+    let sa = rqctx.context();
+    let instance_id = path_params.into_inner().instance_id;
+    let body_args = body.into_inner();
+    Ok(HttpResponseOk(
+        sa.instance_ensure_state(instance_id, body_args.state).await?,
+    ))
+}
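From a caller's perspective, the new endpoint takes the `InstancePutStateBody` defined earlier and returns an `InstancePutStateResponse`. A hedged sketch of a Nexus-side call, assuming the regenerated `sled-agent-client` exposes a positional `instance_put_state` method named after the endpoint function (the generated client is not shown in this excerpt):

```rust
use sled_agent_client::types::{InstancePutStateBody, InstanceStateRequested};
use sled_agent_client::Client as SledAgentClient;
use uuid::Uuid;

// Hypothetical caller: stop an instance via PUT /instances/{id}/state.
async fn stop_instance(
    client: &SledAgentClient,
    instance_id: Uuid,
) -> anyhow::Result<()> {
    let response = client
        .instance_put_state(
            &instance_id,
            &InstancePutStateBody { state: InstanceStateRequested::Stopped },
        )
        .await?;

    // `updated_runtime` is None when the request didn't change anything.
    if let Some(runtime) = response.into_inner().updated_runtime {
        println!("instance moved to {:?}", runtime.run_state);
    }
    Ok(())
}
```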
diff --git a/sled-agent/src/sim/instance.rs b/sled-agent/src/sim/instance.rs
index aa4de3e973..93444cd617 100644
--- a/sled-agent/src/sim/instance.rs
+++ b/sled-agent/src/sim/instance.rs
@@ -14,6 +14,7 @@ use nexus_client;
 use omicron_common::api::external::Error;
 use omicron_common::api::external::Generation;
 use omicron_common::api::external::InstanceState as ApiInstanceState;
+use omicron_common::api::external::ResourceType;
 use omicron_common::api::internal::nexus::InstanceRuntimeState;
 use propolis_client::api::InstanceState as PropolisInstanceState;
 use std::collections::VecDeque;
@@ -87,6 +88,23 @@ impl Simulatable for SimInstance {
         target: &InstanceStateRequested,
     ) -> Result<Option<InstanceAction>, Error> {
         match target {
+            InstanceStateRequested::MigrationTarget(_) => {
+                match self.next_resting_state() {
+                    ApiInstanceState::Creating => {
+                        self.propolis_queue
+                            .push_back(PropolisInstanceState::Migrating);
+                        self.propolis_queue
+                            .push_back(PropolisInstanceState::Running);
+                    }
+                    _ => {
+                        return Err(Error::invalid_request(&format!(
+                            "can't request migration in with pending resting \
+                             state {}",
+                            self.next_resting_state()
+                        )))
+                    }
+                }
+            }
             InstanceStateRequested::Running => {
                 match self.next_resting_state() {
                     ApiInstanceState::Creating => {
@@ -103,26 +121,13 @@ impl Simulatable for SimInstance {
                     | ApiInstanceState::Running
                     | ApiInstanceState::Rebooting
                     | ApiInstanceState::Migrating => {}
-                    ApiInstanceState::Stopping | ApiInstanceState::Stopped => {
-                        // TODO: Normally, Propolis forbids direct transitions
-                        // from a stopped state back to a running state.
-                        // Instead, Nexus creates a new Propolis and sends state
-                        // change requests to that. This arm abstracts this
-                        // behavior away and just allows a fake instance to
-                        // transition right back to a running state after being
-                        // stopped.
-                        //
-                        // This will change in the future when the sled agents
-                        // (both real and simulated) split "registering" an
-                        // instance with the agent and actually starting it into
-                        // separate actions.
-                        self.state.transition(ApiInstanceState::Starting);
-                        self.propolis_queue
-                            .push_back(PropolisInstanceState::Starting);
-                        self.propolis_queue
-                            .push_back(PropolisInstanceState::Running);
-                    }
-                    ApiInstanceState::Repairing
+
+                    // Propolis forbids direct transitions from a stopped state
+                    // back to a running state. Callers who want to restart a
+                    // stopped instance must recreate it.
+                    ApiInstanceState::Stopping
+                    | ApiInstanceState::Stopped
+                    | ApiInstanceState::Repairing
                     | ApiInstanceState::Failed
                     | ApiInstanceState::Destroyed => {
                         return Err(Error::invalid_request(&format!(
@@ -256,4 +261,8 @@ impl Simulatable for SimInstance {
             .map(|_| ())
             .map_err(Error::from)
     }
+
+    fn resource_type() -> ResourceType {
+        ResourceType::Instance
+    }
 }
diff --git a/sled-agent/src/sim/simulatable.rs b/sled-agent/src/sim/simulatable.rs
index e005d7b27f..ae1b9d59fc 100644
--- a/sled-agent/src/sim/simulatable.rs
+++ b/sled-agent/src/sim/simulatable.rs
@@ -6,6 +6,7 @@ use crate::nexus::NexusClient;
 use async_trait::async_trait;
 use omicron_common::api::external::Error;
 use omicron_common::api::external::Generation;
+use omicron_common::api::external::ResourceType;
 use std::fmt;
 use std::sync::Arc;
 use uuid::Uuid;
@@ -116,4 +117,6 @@ pub trait Simulatable: fmt::Debug + Send + Sync {
         id: &Uuid,
         current: Self::CurrentState,
     ) -> Result<(), Error>;
+
+    fn resource_type() -> ResourceType;
 }
diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs
index 93546e0129..231271c509 100644
--- a/sled-agent/src/sim/sled_agent.rs
+++ b/sled-agent/src/sim/sled_agent.rs
@@ -6,11 +6,13 @@
 
 use crate::nexus::NexusClient;
 use crate::params::{
-    DiskStateRequested, InstanceHardware, InstanceStateRequested,
+    DiskStateRequested, InstanceHardware, InstancePutStateResponse,
+    InstanceStateRequested,
 };
+use crate::sim::simulatable::Simulatable;
 use crate::updates::UpdateManager;
 use futures::lock::Mutex;
-use omicron_common::api::external::{Error, ResourceType};
+use omicron_common::api::external::{DiskState, Error, ResourceType};
 use omicron_common::api::internal::nexus::DiskRuntimeState;
 use omicron_common::api::internal::nexus::InstanceRuntimeState;
 use slog::Logger;
@@ -216,7 +218,6 @@ impl SledAgent {
         self: &Arc<Self>,
         instance_id: Uuid,
         mut initial_hardware: InstanceHardware,
-        target: InstanceStateRequested,
     ) -> Result<InstanceRuntimeState, Error> {
         // respond with a fake 500 level failure if asked to ensure an instance
         // with more than 16 CPUs.
@@ -229,29 +230,24 @@ impl SledAgent {
 
         for disk in &initial_hardware.disks {
             let initial_state = DiskRuntimeState {
-                disk_state: omicron_common::api::external::DiskState::Attached(
-                    instance_id,
-                ),
+                disk_state: DiskState::Attached(instance_id),
                 gen: omicron_common::api::external::Generation::new(),
                 time_updated: chrono::Utc::now(),
             };
 
-            let target = match target {
-                InstanceStateRequested::Running
-                | InstanceStateRequested::Reboot => {
-                    DiskStateRequested::Attached(instance_id)
-                }
-                InstanceStateRequested::Stopped
-                | InstanceStateRequested::Destroyed => {
-                    DiskStateRequested::Detached
-                }
-            };
-
+            // Ensure that any disks that are in this request are attached to
+            // this instance.
             let id = match disk.volume_construction_request {
                 propolis_client::instance_spec::VolumeConstructionRequest::Volume { id, .. } => id,
                 _ => panic!("Unexpected construction type"),
             };
-            self.disks.sim_ensure(&id, initial_state, target).await?;
+            self.disks
+                .sim_ensure(
+                    &id,
+                    initial_state,
+                    Some(DiskStateRequested::Attached(instance_id)),
+                )
+                .await?;
             self.disks
                 .sim_ensure_producer(&id, (self.nexus_address, id))
                 .await?;
@@ -296,27 +292,11 @@ impl SledAgent {
                 },
             )?;
         }
-
-        let body = match target {
-            InstanceStateRequested::Running => {
-                propolis_client::types::InstanceStateRequested::Run
-            }
-            InstanceStateRequested::Destroyed
-            | InstanceStateRequested::Stopped => {
-                propolis_client::types::InstanceStateRequested::Stop
-            }
-            InstanceStateRequested::Reboot => {
-                propolis_client::types::InstanceStateRequested::Reboot
-            }
-        };
-        client.instance_state_put().body(body).send().await.map_err(
-            |e| Error::internal_error(&format!("propolis-client: {}", e)),
-        )?;
         }
 
         let instance_run_time_state = self
             .instances
-            .sim_ensure(&instance_id, initial_hardware.runtime, target)
+            .sim_ensure(&instance_id, initial_hardware.runtime, None)
             .await?;
 
         for disk_request in &initial_hardware.disks {
@@ -337,6 +317,84 @@ impl SledAgent {
 
         Ok(instance_run_time_state)
     }
 
+    /// Asks the supplied instance to transition to the requested state.
+    pub async fn instance_ensure_state(
+        self: &Arc<Self>,
+        instance_id: Uuid,
+        state: InstanceStateRequested,
+    ) -> Result<InstancePutStateResponse, Error> {
+        if !self.instances.contains_key(&instance_id).await {
+            match state {
+                InstanceStateRequested::Stopped
+                | InstanceStateRequested::Destroyed => {
+                    return Ok(InstancePutStateResponse {
+                        updated_runtime: None,
+                    });
+                }
+                _ => {
+                    return Err(Error::invalid_request(&format!(
+                        "instance {} not registered on sled",
+                        instance_id,
+                    )));
+                }
+            }
+        }
+
+        let mock_lock = self.mock_propolis.lock().await;
+        if let Some((_srv, client)) = mock_lock.as_ref() {
+            let body = match state {
+                InstanceStateRequested::MigrationTarget(_) => {
+                    return Err(Error::internal_error(
+                        "migration not implemented for mock Propolis",
+                    ));
+                }
+                InstanceStateRequested::Running => {
+                    propolis_client::types::InstanceStateRequested::Run
+                }
+                InstanceStateRequested::Destroyed
+                | InstanceStateRequested::Stopped => {
+                    propolis_client::types::InstanceStateRequested::Stop
+                }
+                InstanceStateRequested::Reboot => {
+                    propolis_client::types::InstanceStateRequested::Reboot
+                }
+            };
+            client.instance_state_put().body(body).send().await.map_err(
+                |e| Error::internal_error(&format!("propolis-client: {}", e)),
+            )?;
+        }
+
+        let new_state = self
+            .instances
+            .sim_ensure(
+                &instance_id,
+                self.instances.sim_get_current_state(&instance_id).await?,
+                Some(state),
+            )
+            .await?;
+
+        // If this request will shut down the simulated instance, look for any
+        // disks that are attached to it and drive them to the Detached state.
+        if matches!(
+            state,
+            InstanceStateRequested::Stopped | InstanceStateRequested::Destroyed
+        ) {
+            self.disks
+                .sim_ensure_for_each_where(
+                    |disk| match disk.current().disk_state {
+                        DiskState::Attached(id) | DiskState::Attaching(id) => {
+                            id == instance_id
+                        }
+                        _ => false,
+                    },
+                    &DiskStateRequested::Detached,
+                )
+                .await?;
+        }
+
+        Ok(InstancePutStateResponse { updated_runtime: Some(new_state) })
+    }
+
     /// Idempotently ensures that the given API Disk (described by `api_disk`)
     /// is attached (or not) as specified. This simulates disk attach and
     /// detach, similar to instance boot and halt.
@@ -346,7 +404,7 @@ impl SledAgent {
         initial_state: DiskRuntimeState,
         target: DiskStateRequested,
     ) -> Result<DiskRuntimeState, Error> {
-        self.disks.sim_ensure(&disk_id, initial_state, target).await
+        self.disks.sim_ensure(&disk_id, initial_state, Some(target)).await
     }
 
     pub fn updates(&self) -> &UpdateManager {
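Taken together, the simulated agent now follows the same two-step contract as the real one: register first, then drive state. A hedged sketch of the flow (assumes this module's `SledAgent` and a prebuilt `InstanceHardware`; error handling elided):

```rust
use crate::params::{InstanceHardware, InstanceStateRequested};
use crate::sim::sled_agent::SledAgent;
use omicron_common::api::external::Error;
use std::sync::Arc;
use uuid::Uuid;

// Hypothetical flow against the simulated sled agent.
async fn create_and_run(
    sa: &Arc<SledAgent>,
    instance_id: Uuid,
    hardware: InstanceHardware,
) -> Result<(), Error> {
    // Step 1: register. Attaches the request's disks and stores the runtime
    // state, but requests no instance state transition (note the `None`
    // target inside instance_ensure above).
    let _runtime = sa.instance_ensure(instance_id, hardware).await?;

    // Step 2: drive state. Only now does the simulated instance start moving
    // toward Running.
    let response = sa
        .instance_ensure_state(instance_id, InstanceStateRequested::Running)
        .await?;
    assert!(response.updated_runtime.is_some());
    Ok(())
}
```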
diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs
index 78358e20a5..90f00dddae 100644
--- a/sled-agent/src/sled_agent.rs
+++ b/sled-agent/src/sled_agent.rs
@@ -8,11 +8,10 @@ use crate::bootstrap::params::SledAgentRequest;
 use crate::config::Config;
 use crate::instance_manager::InstanceManager;
 use crate::nexus::{LazyNexusClient, NexusRequestQueue};
-use crate::params::VpcFirewallRule;
 use crate::params::{
     DatasetKind, DiskStateRequested, InstanceHardware,
-    InstanceMigrationTargetParams, InstanceStateRequested, ServiceEnsureBody,
-    SledRole, TimeSync, Zpool,
+    InstancePutStateResponse, InstanceStateRequested, ServiceEnsureBody,
+    SledRole, TimeSync, VpcFirewallRule, Zpool,
 };
 use crate::services::{self, ServiceManager};
 use crate::storage_manager::StorageManager;
@@ -525,17 +524,31 @@ impl SledAgent {
         Ok(())
     }
 
-    /// Idempotently ensures that a given Instance is running on the sled.
-    pub async fn instance_ensure(
+    /// Idempotently ensures that a given instance is registered with this sled,
+    /// i.e., that it can be addressed by future calls to
+    /// [`Self::instance_ensure_state`].
+    pub async fn instance_ensure_registered(
         &self,
         instance_id: Uuid,
         initial: InstanceHardware,
-        target: InstanceStateRequested,
-        migrate: Option<InstanceMigrationTargetParams>,
     ) -> Result<InstanceRuntimeState, Error> {
         self.inner
             .instances
-            .ensure(instance_id, initial, target, migrate)
+            .ensure_registered(instance_id, initial)
             .await
             .map_err(|e| Error::Instance(e))
+    }
+
+    /// Idempotently drives the specified instance into the specified target
+    /// state.
+    pub async fn instance_ensure_state(
+        &self,
+        instance_id: Uuid,
+        target: InstanceStateRequested,
+    ) -> Result<InstancePutStateResponse, Error> {
+        self.inner
+            .instances
+            .ensure_state(instance_id, target)
+            .await
+            .map_err(|e| Error::Instance(e))
     }
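Finally, the early return in the simulated `instance_ensure_state` above encodes the property the commit message relies on: once an instance is unregistered, "safe" requests are no-ops and "run" requests fail. A test-style sketch of that contract (hypothetical, not from the patch; `sa` is a simulated sled agent and `unknown_id` an ID it has never registered):

```rust
use crate::params::InstanceStateRequested;
use crate::sim::sled_agent::SledAgent;
use std::sync::Arc;
use uuid::Uuid;

// Hypothetical check of the unregistered-instance contract.
async fn check_unregistered_contract(sa: &Arc<SledAgent>) {
    let unknown_id = Uuid::new_v4();

    // Stop/Destroy requests for an instance this sled has never seen succeed
    // but report that nothing changed...
    let response = sa
        .instance_ensure_state(unknown_id, InstanceStateRequested::Destroyed)
        .await
        .unwrap();
    assert!(response.updated_runtime.is_none());

    // ...while a request to run one is rejected outright, which is what keeps
    // a replayed "run this VM" saga node from restarting a stopped instance.
    let result = sa
        .instance_ensure_state(unknown_id, InstanceStateRequested::Running)
        .await;
    assert!(result.is_err());
}
```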