Skip to content

Commit

Permalink
nexus: update instance networking config after live migration (#3127)
Browse files Browse the repository at this point in the history
Whenever Nexus gets a new instance runtime state from a sled agent,
compare the state to the existing runtime state to see if applying the
new state will update the instance's Propolis generation. If it will,
use the sled ID in the new record to create updated OPTE V2P mappings
and Dendrite NAT entries for the instance.

Retry with backoff when sled agent fails to publish a state update to
Nexus. This was required for correctness anyway (see #2727) but is
especially important now that there are many more ways for Nexus to fail
to apply a state update. See the comments in the new code for more
details.

In the future, it might be better to update this configuration using a
reliable persistent workflow that's triggered by Propolis location
changes. This approach will require at least some additional work in
OPTE to assign generation numbers to V2P mappings (Dendrite might have a
similar problem but I'm not as familiar with the tables Nexus is trying
to maintain in this change).
  • Loading branch information
gjcolombo committed May 17, 2023
1 parent 891acc7 commit f77cf31
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 152 deletions.
305 changes: 305 additions & 0 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use sled_agent_client::types::InstanceStateRequested;
use sled_agent_client::types::SourceNatConfig;
use sled_agent_client::Client as SledAgentClient;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
Expand Down Expand Up @@ -993,6 +994,7 @@ impl super::Nexus {
/// Instance.
pub async fn notify_instance_updated(
&self,
opctx: &OpContext,
id: &Uuid,
new_runtime_state: &nexus::InstanceRuntimeState,
) -> Result<(), Error> {
Expand All @@ -1002,6 +1004,49 @@ impl super::Nexus {
"instance_id" => %id,
"runtime_state" => ?new_runtime_state);

// If the new state has a newer Propolis ID generation than the current
// instance state in CRDB, notify interested parties of this change.
//
// The synchronization rules here are as follows:
//
// - Sled agents own an instance's runtime state while an instance is
// running on a sled. Each sled agent prevents concurrent conflicting
// Propolis identifier updates from being sent until previous updates
// are processed.
// - Operations that can dispatch an instance to a brand-new sled (e.g.
// live migration) can only start if the appropriate instance runtime
// state fields are cleared in CRDB. For example, while a live
// migration is in progress, the instance's `migration_id` field will
// be non-NULL, and a new migration cannot start until it is cleared.
// This routine must notify recipients before writing new records
// back to CRDB so that these "locks" remain held until all
// notifications have been sent. Otherwise, Nexus might allow new
// operations to proceed that will produce system updates that might
// race with this one.
// - This work is not done in a saga. The presumption is instead that
// if any of these operations fail, the entire update will fail, and
// sled agent will retry the update. Unwinding on failure isn't needed
// because (a) any partially-applied configuration is correct
// configuration, (b) if the instance is migrating, it can't migrate
// again until this routine successfully updates configuration and
// writes an update back to CRDB, and (c) sled agent won't process any
// new instance state changes (e.g. a change that stops an instance)
// until this state change is successfully committed.
let (.., db_instance) = LookupPath::new(&opctx, &self.db_datastore)
.instance_id(*id)
.fetch_for(authz::Action::Read)
.await?;

if new_runtime_state.propolis_gen > *db_instance.runtime().propolis_gen
{
self.handle_instance_propolis_gen_change(
opctx,
new_runtime_state,
&db_instance,
)
.await?;
}

let result = self
.db_datastore
.instance_update_runtime(id, &(new_runtime_state.clone().into()))
Expand Down Expand Up @@ -1052,6 +1097,266 @@ impl super::Nexus {
}
}

async fn handle_instance_propolis_gen_change(
&self,
opctx: &OpContext,
new_runtime: &nexus::InstanceRuntimeState,
db_instance: &nexus_db_model::Instance,
) -> Result<(), Error> {
let log = &self.log;
let instance_id = db_instance.id();

info!(log,
"updating configuration after Propolis generation change";
"instance_id" => %instance_id,
"new_sled_id" => %new_runtime.sled_id,
"old_sled_id" => %db_instance.runtime().sled_id);

// Push updated V2P mappings to all interested sleds. This needs to be
// done irrespective of whether the sled ID actually changed, because
// merely creating the target Propolis on the target sled will create
// XDE devices for its NICs, and creating an XDE device for a virtual IP
// creates a V2P mapping that maps that IP to that sled. This is fine if
// migration succeeded, but if it failed, the instance is running on the
// source sled, and the incorrect mapping needs to be replaced.
//
// TODO(#3107): When XDE no longer creates mappings implicitly, this
// can be restricted to cases where an instance's sled has actually
// changed.
self.create_instance_v2p_mappings(
opctx,
instance_id,
new_runtime.sled_id,
)
.await?;

let (.., sled) = LookupPath::new(opctx, &self.db_datastore)
.sled_id(new_runtime.sled_id)
.fetch()
.await?;

self.instance_ensure_dpd_config(
opctx,
db_instance.id(),
&sled.address(),
None,
)
.await?;

Ok(())
}

/// Ensures that the Dendrite configuration for the supplied instance is
/// up-to-date.
///
/// # Parameters
///
/// - `opctx`: An operation context that grants read and list-children
/// permissions on the identified instance.
/// - `instance_id`: The ID of the instance to act on.
/// - `sled_ip_address`: The internal IP address assigned to the sled's
/// sled agent.
/// - `ip_index_filter`: An optional filter on the index into the instance's
/// external IP array.
/// - If this is `Some(n)`, this routine configures DPD state for only the
/// Nth external IP in the collection returned from CRDB. The caller is
/// responsible for ensuring that the IP collection has stable indices
/// when making this call.
/// - If this is `None`, this routine configures DPD for all external
/// IPs.
pub(crate) async fn instance_ensure_dpd_config(
&self,
opctx: &OpContext,
instance_id: Uuid,
sled_ip_address: &std::net::SocketAddrV6,
ip_index_filter: Option<usize>,
) -> Result<(), Error> {
let log = &self.log;
let dpd_client = &self.dpd_client;

info!(log, "looking up instance's primary network interface";
"instance_id" => %instance_id);

let (.., authz_instance) = LookupPath::new(opctx, &self.db_datastore)
.instance_id(instance_id)
.lookup_for(authz::Action::ListChildren)
.await?;

// All external IPs map to the primary network interface, so find that
// interface. If there is no such interface, there's no way to route
// traffic destined to those IPs, so there's nothing to configure and
// it's safe to return early.
let network_interface = match self
.db_datastore
.derive_guest_network_interface_info(&opctx, &authz_instance)
.await?
.into_iter()
.find(|interface| interface.primary)
{
Some(interface) => interface,
None => {
info!(log, "Instance has no primary network interface";
"instance_id" => %instance_id);
return Ok(());
}
};

let mac_address =
macaddr::MacAddr6::from_str(&network_interface.mac.to_string())
.map_err(|e| {
Error::internal_error(&format!(
"failed to convert mac address: {e}"
))
})?;

let vni: u32 = network_interface.vni.into();

info!(log, "looking up instance's external IPs";
"instance_id" => %instance_id);

let ips = self
.db_datastore
.instance_lookup_external_ips(&opctx, instance_id)
.await?;

if let Some(wanted_index) = ip_index_filter {
if let None = ips.get(wanted_index) {
return Err(Error::internal_error(&format!(
"failed to find external ip address at index: {}",
wanted_index
)));
}
}

for target_ip in ips
.iter()
.enumerate()
.filter(|(index, _)| {
if let Some(wanted_index) = ip_index_filter {
*index == wanted_index
} else {
true
}
})
.map(|(_, ip)| ip)
{
info!(log, "setting up dpd for external IP";
"instance_id" => %instance_id,
"external_ip" => ?target_ip);

let existing_nat = match target_ip.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_get(&network.ip(), *target_ip.first_port)
.await
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_get(&network.ip(), *target_ip.first_port)
.await
}
};

// If a NAT entry already exists, but has the wrong internal
// IP address, delete the old entry before continuing (the
// DPD entry-creation API won't replace an existing entry).
// If the entry exists and has the right internal IP, there's
// no more work to do for this external IP.
match existing_nat {
Ok(existing) => {
let existing = existing.into_inner();
if existing.internal_ip != *sled_ip_address.ip() {
info!(log, "deleting old nat entry";
"instance_id" => %instance_id,
"external_ip" => ?target_ip);

match target_ip.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_delete(
&network.ip(),
*target_ip.first_port,
)
.await
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_delete(
&network.ip(),
*target_ip.first_port,
)
.await
}
}
.map_err(|e| {
Error::internal_error(&format!(
"failed to clear dpd entry: {e}"
))
})?;
} else {
info!(log,
"nat entry with expected internal ip exists, continuing";
"instance_id" => %instance_id,
"external_ip" => ?target_ip,
"existing_entry" => ?existing);

continue;
}
}
Err(e) => {
if e.status() == Some(http::StatusCode::NOT_FOUND) {
info!(log, "no nat entry found for: {target_ip:#?}");
} else {
return Err(Error::internal_error(&format!(
"failed to query dpd: {e}"
)));
}
}
}

info!(log, "creating nat entry for: {target_ip:#?}");
let nat_target = dpd_client::types::NatTarget {
inner_mac: dpd_client::types::MacAddr {
a: mac_address.into_array(),
},
internal_ip: *sled_ip_address.ip(),
vni: vni.into(),
};

match target_ip.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_create(
&network.ip(),
*target_ip.first_port,
*target_ip.last_port,
&nat_target,
)
.await
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_create(
&network.ip(),
*target_ip.first_port,
*target_ip.last_port,
&nat_target,
)
.await
}
}
.map_err(|e| {
Error::internal_error(&format!(
"failed to create nat entry: {e}"
))
})?;

info!(log, "creation of nat entry successful for: {target_ip:#?}");
}

Ok(())
}

/// Returns the requested range of serial console output bytes,
/// provided they are still in the propolis-server's cache.
pub(crate) async fn instance_serial_console_data(
Expand Down
Loading

0 comments on commit f77cf31

Please sign in to comment.