nexus: update instance networking config after live migration
Whenever Nexus gets a new instance runtime state from a sled agent, compare the
state to the existing runtime state to see if applying the new state will update
the instance's Propolis generation. If it will, use the sled ID in the new
record to create updated OPTE V2P mappings and Dendrite NAT entries for the
instance.

Retry with backoff when sled agent fails to publish a state update to Nexus.
This was required for correctness anyway (see #2727) but is especially
important now that there are many more ways for Nexus to fail to apply a state
update. See the comments in the new code for more details.
gjcolombo committed May 17, 2023
1 parent 891acc7 commit 66b04a6
Showing 7 changed files with 550 additions and 153 deletions.
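The retry-with-backoff change lives in the sled agent's half of this commit, which is not part of the file shown below. As a rough illustration of the pattern the commit message describes — a minimal sketch, not the actual sled-agent code, which presumably uses omicron's shared backoff helpers — a publish loop with capped exponential backoff might look like the following, where `publish` and `PublishError` are hypothetical stand-ins for the sled agent's call into the Nexus client and its error type:

use std::future::Future;
use std::time::Duration;

// Hypothetical stand-in for the Nexus client's error type.
type PublishError = String;

// Sketch only: retry a runtime-state publish until it succeeds, doubling
// the delay after each failure up to a cap, so updates are never dropped.
async fn publish_with_backoff<F, Fut>(mut publish: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<(), PublishError>>,
{
    let mut delay = Duration::from_millis(250);
    let max_delay = Duration::from_secs(30);
    loop {
        match publish().await {
            // Nexus accepted and fully applied the state update.
            Ok(()) => return,
            // Nexus may be unreachable or may have failed partway through
            // applying the update (e.g. a V2P or NAT step failed), so wait
            // and try again rather than losing the new state.
            Err(e) => {
                eprintln!("failed to publish state: {e}; retrying in {delay:?}");
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(max_delay);
            }
        }
    }
}

The important property is the one the commit message calls out: a state update that Nexus fails to apply is retried rather than dropped, which matters more now that applying an update can involve several fallible networking-config calls.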
297 changes: 297 additions & 0 deletions nexus/src/app/instance.rs
@@ -42,6 +42,7 @@ use sled_agent_client::types::InstanceStateRequested;
use sled_agent_client::types::SourceNatConfig;
use sled_agent_client::Client as SledAgentClient;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
@@ -993,6 +994,7 @@ impl super::Nexus {
/// Instance.
pub async fn notify_instance_updated(
&self,
opctx: &OpContext,
id: &Uuid,
new_runtime_state: &nexus::InstanceRuntimeState,
) -> Result<(), Error> {
@@ -1002,6 +1004,43 @@
"instance_id" => %id,
"runtime_state" => ?new_runtime_state);

// If the new state has a newer Propolis ID generation than the current
// instance state in CRDB, notify interested parties of this change.
//
// The synchronization rules here are as follows:
//
// - Sled agents own an instance's runtime state while an instance is
// running on a sled. Each sled agent prevents concurrent conflicting
// Propolis identifier updates from being sent until previous updates
// are processed.
// - Operations that can dispatch an instance to a brand-new sled (e.g.
// live migration) can only start if the appropriate instance runtime
// state fields are cleared in CRDB. For example, while a live
// migration is in progress, the instance's `migration_id` field will
// be non-NULL, and a new migration cannot start until it is cleared.
// This routine must notify recipients before writing new records
// back to CRDB so that these "locks" remain held until all
// notifications have been sent. Otherwise, Nexus might allow new
// operations to proceed that will produce system updates that might
// race with this one.
// - This work is not done in a saga. The presumption is instead that
// if any of these operations fail, the entire update will fail, and
// sled agent will retry the update.
let (.., db_instance) = LookupPath::new(&opctx, &self.db_datastore)
.instance_id(*id)
.fetch_for(authz::Action::Read)
.await?;

if new_runtime_state.propolis_gen > *db_instance.runtime().propolis_gen
{
self.handle_instance_propolis_gen_change(
opctx,
new_runtime_state,
&db_instance,
)
.await?;
}

let result = self
.db_datastore
.instance_update_runtime(id, &(new_runtime_state.clone().into()))
@@ -1052,6 +1091,264 @@
}
}

async fn handle_instance_propolis_gen_change(
&self,
opctx: &OpContext,
new_runtime: &nexus::InstanceRuntimeState,
db_instance: &nexus_db_model::Instance,
) -> Result<(), Error> {
let log = &self.log;
let instance_id = db_instance.id();

info!(log,
"updating configuration after Propolis generation change";
"instance_id" => %instance_id,
"new_sled_id" => %new_runtime.sled_id,
"old_sled_id" => %db_instance.runtime().sled_id);

// Push updated V2P mappings to all interested sleds. This needs to be
// done irrespective of whether the sled ID actually changed, because
// merely creating the target Propolis on the target sled will create
// XDE devices for its NICs, and creating an XDE device for a virtual IP
// creates a V2P mapping that maps that IP to that sled. This is fine if
// migration succeeded, but if it failed, the instance is running on the
// source sled, and the incorrect mapping needs to be replaced.
//
// TODO(#3107): When XDE no longer creates mappings implicitly, this
// can be restricted to cases where an instance's sled has actually
// changed.
self.create_instance_v2p_mappings(
opctx,
instance_id,
new_runtime.sled_id,
)
.await?;

let (.., sled) = LookupPath::new(opctx, &self.db_datastore)
.sled_id(new_runtime.sled_id)
.fetch()
.await?;

self.instance_ensure_dpd_config(
opctx,
db_instance.id(),
&sled.address(),
None,
)
.await?;

Ok(())
}

/// Ensures that the Dendrite configuration for the supplied instance is
/// up-to-date.
///
/// # Parameters
///
/// - `opctx`: An operation context that grants read and list-children
/// permissions on the identified instance.
/// - `instance_id`: The ID of the instance to act on.
/// - `sled_ip_address`: The internal IP address assigned to the sled's
/// sled agent.
/// - `ip_index_filter`: An optional filter on the index into the instance's
/// external IP array.
/// - If this is `Some(n)`, this routine configures DPD state for only the
/// Nth external IP in the collection returned from CRDB. The caller is
/// responsible for ensuring that the IP collection has stable indices
/// when making this call.
/// - If this is `None`, this routine configures DPD for all external
/// IPs.
pub(crate) async fn instance_ensure_dpd_config(
&self,
opctx: &OpContext,
instance_id: Uuid,
sled_ip_address: &std::net::SocketAddrV6,
ip_index_filter: Option<usize>,
) -> Result<(), Error> {
let log = &self.log;
let dpd_client = &self.dpd_client;

info!(log, "looking up instance's primary network interface";
"instance_id" => %instance_id);

let (.., authz_instance) = LookupPath::new(opctx, &self.db_datastore)
.instance_id(instance_id)
.lookup_for(authz::Action::ListChildren)
.await?;

let network_interface = match self
.db_datastore
.derive_guest_network_interface_info(&opctx, &authz_instance)
.await?
.into_iter()
.find(|interface| interface.primary)
{
Some(interface) => interface,
// Return early if the instance does not have a primary network
// interface.
None => {
info!(log, "Instance has no primary network interface";
"instance_id" => %instance_id);
return Ok(());
}
};

let mac_address =
macaddr::MacAddr6::from_str(&network_interface.mac.to_string())
.map_err(|e| {
Error::internal_error(&format!(
"failed to convert mac address: {e}"
))
})?;

let vni: u32 = network_interface.vni.into();

info!(log, "looking up instance's external IPs";
"instance_id" => %instance_id);

let ips = self
.db_datastore
.instance_lookup_external_ips(&opctx, instance_id)
.await?;

if let Some(wanted_index) = ip_index_filter {
if ips.get(wanted_index).is_none() {
return Err(Error::internal_error(&format!(
"failed to find external ip address at index: {}",
wanted_index
)));
}
}

for target_ip in ips
.iter()
.enumerate()
.filter(|(index, _)| {
if let Some(wanted_index) = ip_index_filter {
*index == wanted_index
} else {
true
}
})
.map(|(_, ip)| ip)
{
info!(log, "setting up dpd for external IP";
"instance_id" => %instance_id,
"external_ip" => ?target_ip);

let existing_nat = match target_ip.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_get(&network.ip(), *target_ip.first_port)
.await
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_get(&network.ip(), *target_ip.first_port)
.await
}
};

// If a NAT entry already exists, but has the wrong internal
// IP address, delete the old entry before continuing (the
// DPD entry-creation API won't replace an existing entry).
// If the entry exists and has the right internal IP, there's
// no more work to do for this external IP.
match existing_nat {
Ok(existing) => {
let existing = existing.into_inner();
if existing.internal_ip != *sled_ip_address.ip() {
info!(log, "deleting old nat entry";
"instance_id" => %instance_id,
"external_ip" => ?target_ip);

match target_ip.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_delete(
&network.ip(),
*target_ip.first_port,
)
.await
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_delete(
&network.ip(),
*target_ip.first_port,
)
.await
}
}
.map_err(|e| {
Error::internal_error(&format!(
"failed to clear dpd entry: {e}"
))
})?;
} else {
info!(log,
"nat entry with expected internal ip exists, continuing";
"instance_id" => %instance_id,
"external_ip" => ?target_ip,
"existing_entry" => ?existing);

continue;
}
}
Err(e) => {
if e.status() == Some(http::StatusCode::NOT_FOUND) {
info!(log, "no nat entry found for: {target_ip:#?}");
} else {
return Err(Error::internal_error(&format!(
"failed to query dpd: {e}"
)));
}
}
}

info!(log, "creating nat entry for: {target_ip:#?}");
let nat_target = dpd_client::types::NatTarget {
inner_mac: dpd_client::types::MacAddr {
a: mac_address.into_array(),
},
internal_ip: *sled_ip_address.ip(),
vni: vni.into(),
};

match target_ip.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_create(
&network.ip(),
*target_ip.first_port,
*target_ip.last_port,
&nat_target,
)
.await
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_create(
&network.ip(),
*target_ip.first_port,
*target_ip.last_port,
&nat_target,
)
.await
}
}
.map_err(|e| {
Error::internal_error(&format!(
"failed to create nat entry: {e}"
))
})?;

debug!(log, "creation of nat entry successful for: {target_ip:#?}");
}

Ok(())
}

/// Returns the requested range of serial console output bytes,
/// provided they are still in the propolis-server's cache.
pub(crate) async fn instance_serial_console_data(