diff --git a/Cargo.lock b/Cargo.lock index 88b9029aa93..e862a10ebfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7173,6 +7173,7 @@ dependencies = [ "omicron-workspace-hack", "pq-sys", "slog", + "tokio", "uuid", ] diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index cfbc140bd3e..b39d05ca480 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -56,6 +56,7 @@ use nexus_types::internal_api::background::BlueprintRendezvousStatus; use nexus_types::internal_api::background::EreporterStatus; use nexus_types::internal_api::background::InstanceReincarnationStatus; use nexus_types::internal_api::background::InstanceUpdaterStatus; +use nexus_types::internal_api::background::InventoryLoadStatus; use nexus_types::internal_api::background::LookupRegionPortStatus; use nexus_types::internal_api::background::ReadOnlyRegionReplacementStartStatus; use nexus_types::internal_api::background::RegionReplacementDriverStatus; @@ -1158,6 +1159,9 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { "inventory_collection" => { print_task_inventory_collection(details); } + "inventory_loader" => { + print_task_inventory_load(details); + } "lookup_region_port" => { print_task_lookup_region_port(details); } @@ -1971,6 +1975,37 @@ fn print_task_inventory_collection(details: &serde_json::Value) { }; } +fn print_task_inventory_load(details: &serde_json::Value) { + match serde_json::from_value::(details.clone()) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + Ok(status) => match status { + InventoryLoadStatus::Error(error) => { + println!(" task did not complete successfully: {error}"); + } + InventoryLoadStatus::NoCollections => { + println!(" no collections available to load"); + } + InventoryLoadStatus::Loaded { + collection_id, + time_started, + time_loaded, + } => { + println!( + " loaded latest inventory collection as of 
{}:", + humantime::format_rfc3339_millis(time_loaded.into()) + ); + println!( + " collection {collection_id}, taken at {}", + humantime::format_rfc3339_millis(time_started.into()), + ); + } + }, + }; +} + fn print_task_lookup_region_port(details: &serde_json::Value) { match serde_json::from_value::(details.clone()) { Ok(LookupRegionPortStatus { found_port_ok, errors }) => { diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index b1dae6cf103..6cfcee3e39b 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -116,6 +116,10 @@ task: "inventory_collection" collects hardware and software inventory data from the whole system +task: "inventory_loader" + loads the latest inventory collection from the DB + + task: "lookup_region_port" fill in missing ports for region records @@ -328,6 +332,10 @@ task: "inventory_collection" collects hardware and software inventory data from the whole system +task: "inventory_loader" + loads the latest inventory collection from the DB + + task: "lookup_region_port" fill in missing ports for region records @@ -527,6 +535,10 @@ task: "inventory_collection" collects hardware and software inventory data from the whole system +task: "inventory_loader" + loads the latest inventory collection from the DB + + task: "lookup_region_port" fill in missing ports for region records diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index dbe9411c0ee..1eed12b2fbb 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -351,6 +351,10 @@ task: "inventory_collection" collects hardware and software inventory data from the whole system +task: "inventory_loader" + loads the latest inventory collection from the DB + + task: "lookup_region_port" fill in missing ports for region records @@ -649,6 +653,13 @@ task: "inventory_collection" last collection started: last collection done: +task: "inventory_loader" + configured period: every s + last 
completed activation: , triggered by + started at (s ago) and ran for ms + loaded latest inventory collection as of : + collection ....................., taken at + task: "lookup_region_port" configured period: every m last completed activation: , triggered by @@ -1177,6 +1188,13 @@ task: "inventory_collection" last collection started: last collection done: +task: "inventory_loader" + configured period: every s + last completed activation: , triggered by + started at (s ago) and ran for ms + loaded latest inventory collection as of : + collection ....................., taken at + task: "lookup_region_port" configured period: every m last completed activation: , triggered by diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index a352cbe351d..f39c80e6397 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -562,13 +562,25 @@ pub struct SwitchPortSettingsManagerConfig { #[serde_as] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct InventoryConfig { - /// period (in seconds) for periodic activations of this background task + /// period (in seconds) for periodic activations of the background task to + /// load the latest inventory collection + /// + /// Each activation runs a fast query to check whether there is a new + /// collection, and only follows up with the set of queries required to load + /// its contents if there's been a change. This period should be pretty + /// aggressive to ensure consumers are usually acting on the latest + /// collection. + #[serde_as(as = "DurationSeconds")] + pub period_secs_load: Duration, + + /// period (in seconds) for periodic activations of the background task to + /// collect inventory /// /// Each activation fetches information about all hardware and software in /// the system and inserts it into the database. This generates a moderate /// amount of data. 
#[serde_as(as = "DurationSeconds")] - pub period_secs: Duration, + pub period_secs_collect: Duration, /// maximum number of past collections to keep in the database /// @@ -580,7 +592,7 @@ pub struct InventoryConfig { /// /// This is an emergency lever for support / operations. It should never be /// necessary. - pub disable: bool, + pub disable_collect: bool, } #[serde_as] @@ -1109,9 +1121,10 @@ mod test { external_endpoints.period_secs = 9 nat_cleanup.period_secs = 30 bfd_manager.period_secs = 30 - inventory.period_secs = 10 - inventory.nkeep = 11 - inventory.disable = false + inventory.period_secs_load = 10 + inventory.period_secs_collect = 11 + inventory.nkeep = 12 + inventory.disable_collect = false support_bundle_collector.period_secs = 30 physical_disk_adoption.period_secs = 30 decommissioned_disk_cleaner.period_secs = 30 @@ -1274,9 +1287,10 @@ mod test { period_secs: Duration::from_secs(30), }, inventory: InventoryConfig { - period_secs: Duration::from_secs(10), - nkeep: 11, - disable: false, + period_secs_load: Duration::from_secs(10), + period_secs_collect: Duration::from_secs(11), + nkeep: 12, + disable_collect: false, }, support_bundle_collector: SupportBundleCollectorConfig { @@ -1448,9 +1462,10 @@ mod test { external_endpoints.period_secs = 9 nat_cleanup.period_secs = 30 bfd_manager.period_secs = 30 - inventory.period_secs = 10 + inventory.period_secs_load = 10 + inventory.period_secs_collect = 10 inventory.nkeep = 3 - inventory.disable = false + inventory.disable_collect = false support_bundle_collector.period_secs = 30 physical_disk_adoption.period_secs = 30 decommissioned_disk_cleaner.period_secs = 30 diff --git a/nexus/background-task-interface/src/init.rs b/nexus/background-task-interface/src/init.rs index f0bf7766d15..bc71033bf3b 100644 --- a/nexus/background-task-interface/src/init.rs +++ b/nexus/background-task-interface/src/init.rs @@ -17,6 +17,7 @@ pub struct BackgroundTasks { pub task_nat_cleanup: Activator, pub task_bfd_manager: Activator, 
pub task_inventory_collection: Activator, + pub task_inventory_loader: Activator, pub task_support_bundle_collector: Activator, pub task_physical_disk_adoption: Activator, pub task_decommissioned_disk_cleaner: Activator, diff --git a/nexus/db-queries/src/db/datastore/inventory.rs b/nexus/db-queries/src/db/datastore/inventory.rs index 04949d241a8..5899cbedecd 100644 --- a/nexus/db-queries/src/db/datastore/inventory.rs +++ b/nexus/db-queries/src/db/datastore/inventory.rs @@ -2329,16 +2329,18 @@ impl DataStore { }) } - /// Attempt to read the latest collection. + /// Attempt to get the ID of the latest collection. /// /// If there aren't any collections, return `Ok(None)`. - pub async fn inventory_get_latest_collection( + pub async fn inventory_get_latest_collection_id( &self, opctx: &OpContext, - ) -> Result, Error> { + ) -> Result, Error> { + use nexus_db_schema::schema::inv_collection::dsl; + opctx.authorize(authz::Action::Read, &authz::INVENTORY).await?; let conn = self.pool_connection_authorized(opctx).await?; - use nexus_db_schema::schema::inv_collection::dsl; + let collection_id = dsl::inv_collection .select(dsl::id) .order_by(dsl::time_started.desc()) @@ -2347,17 +2349,23 @@ impl DataStore { .optional() .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; - let Some(collection_id) = collection_id else { + Ok(collection_id.map(CollectionUuid::from_untyped_uuid)) + } + + /// Attempt to read the latest collection. + /// + /// If there aren't any collections, return `Ok(None)`. + pub async fn inventory_get_latest_collection( + &self, + opctx: &OpContext, + ) -> Result, Error> { + let Some(collection_id) = + self.inventory_get_latest_collection_id(opctx).await? 
+ else { return Ok(None); }; - Ok(Some( - self.inventory_collection_read( - opctx, - CollectionUuid::from_untyped_uuid(collection_id), - ) - .await?, - )) + Ok(Some(self.inventory_collection_read(opctx, collection_id).await?)) } /// Attempt to read the current collection diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index 9b24578215c..fd1d48c022d 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -114,13 +114,16 @@ metrics_producer_gc.period_secs = 60 external_endpoints.period_secs = 60 nat_cleanup.period_secs = 30 bfd_manager.period_secs = 30 +# How frequently to check for a new inventory collection (made by any Nexus). +# This is cheap, so we should check frequently. +inventory.period_secs_load = 15 # How frequently to collect hardware/software inventory from the whole system # (even if we don't have reason to believe anything has changed). -inventory.period_secs = 600 +inventory.period_secs_collect = 600 # Maximum number of past collections to keep in the database inventory.nkeep = 5 # Disable inventory collection altogether (for emergencies) -inventory.disable = false +inventory.disable_collect = false phantom_disks.period_secs = 30 physical_disk_adoption.period_secs = 30 support_bundle_collector.period_secs = 30 diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index 3a3d3ec294c..9b88eba8317 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -98,13 +98,16 @@ metrics_producer_gc.period_secs = 60 external_endpoints.period_secs = 60 nat_cleanup.period_secs = 30 bfd_manager.period_secs = 30 +# How frequently to check for a new inventory collection (made by any Nexus). +# This is cheap, so we should check frequently. +inventory.period_secs_load = 15 # How frequently to collect hardware/software inventory from the whole system # (even if we don't have reason to believe anything has changed). 
-inventory.period_secs = 600 +inventory.period_secs_collect = 600 # Maximum number of past collections to keep in the database inventory.nkeep = 5 # Disable inventory collection altogether (for emergencies) -inventory.disable = false +inventory.disable_collect = false phantom_disks.period_secs = 30 physical_disk_adoption.period_secs = 30 support_bundle_collector.period_secs = 30 diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 10fe0e74944..56a1ad59625 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -107,6 +107,7 @@ use super::tasks::instance_reincarnation; use super::tasks::instance_updater; use super::tasks::instance_watcher; use super::tasks::inventory_collection; +use super::tasks::inventory_load; use super::tasks::lookup_region_port; use super::tasks::metrics_producer_gc; use super::tasks::nat_cleanup; @@ -144,6 +145,7 @@ use nexus_db_queries::db::DataStore; use nexus_types::deployment::Blueprint; use nexus_types::deployment::BlueprintTarget; use nexus_types::deployment::PendingMgsUpdates; +use nexus_types::inventory::Collection; use omicron_uuid_kinds::OmicronZoneUuid; use oximeter::types::ProducerRegistry; use std::collections::BTreeMap; @@ -160,6 +162,17 @@ use uuid::Uuid; pub(crate) struct BackgroundTasksInternal { pub(crate) external_endpoints: watch::Receiver>, + inventory_load_rx: watch::Receiver>>, +} + +impl BackgroundTasksInternal { + pub(crate) fn inventory_load_rx( + &self, + ) -> watch::Receiver>> { + let mut rx = self.inventory_load_rx.clone(); + rx.mark_unchanged(); + rx + } } /// Initializes the background task subsystem @@ -171,6 +184,7 @@ pub struct BackgroundTasksInitializer { driver: Driver, external_endpoints_tx: watch::Sender>, + inventory_load_tx: watch::Sender>>, } impl BackgroundTasksInitializer { @@ -187,10 +201,12 @@ impl BackgroundTasksInitializer { { let (external_endpoints_tx, external_endpoints_rx) = watch::channel(None); + let (inventory_load_tx, 
inventory_load_rx) = watch::channel(None); let initializer = BackgroundTasksInitializer { driver: Driver::new(), external_endpoints_tx, + inventory_load_tx, }; let background_tasks = BackgroundTasks { @@ -203,6 +219,7 @@ impl BackgroundTasksInitializer { task_nat_cleanup: Activator::new(), task_bfd_manager: Activator::new(), task_inventory_collection: Activator::new(), + task_inventory_loader: Activator::new(), task_support_bundle_collector: Activator::new(), task_physical_disk_adoption: Activator::new(), task_decommissioned_disk_cleaner: Activator::new(), @@ -244,6 +261,7 @@ impl BackgroundTasksInitializer { let internal = BackgroundTasksInternal { external_endpoints: external_endpoints_rx, + inventory_load_rx, }; (initializer, background_tasks, internal) @@ -282,6 +300,7 @@ impl BackgroundTasksInitializer { task_nat_cleanup, task_bfd_manager, task_inventory_collection, + task_inventory_loader, task_support_bundle_collector, task_physical_disk_adoption, task_decommissioned_disk_cleaner, @@ -463,14 +482,14 @@ impl BackgroundTasksInitializer { // This depends on the "output" of the blueprint executor in // order to automatically trigger inventory collection whenever the // blueprint executor runs. 
- let inventory_watcher = { + let inventory_collect_watcher = { let collector = inventory_collection::InventoryCollector::new( &opctx, datastore.clone(), resolver.clone(), &nexus_id.to_string(), config.inventory.nkeep, - config.inventory.disable, + config.inventory.disable_collect, ); let inventory_watcher = collector.watcher(); driver.register(TaskDefinition { @@ -478,7 +497,7 @@ impl BackgroundTasksInitializer { description: "collects hardware and software inventory data from the \ whole system", - period: config.inventory.period_secs, + period: config.inventory.period_secs_collect, task_impl: Box::new(collector), opctx: opctx.child(BTreeMap::new()), watchers: vec![Box::new(rx_blueprint_exec.clone())], @@ -488,6 +507,22 @@ impl BackgroundTasksInitializer { inventory_watcher }; + // Background task: inventory loader + let inventory_loader = inventory_load::InventoryLoader::new( + datastore.clone(), + self.inventory_load_tx, + ); + driver.register(TaskDefinition { + name: "inventory_loader", + description: "loads the latest inventory collection from the DB", + period: config.inventory.period_secs_load, + task_impl: Box::new(inventory_loader), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![Box::new(inventory_collect_watcher.clone())], + activator: task_inventory_loader, + }); + + // Background task: reconfigurator config loader let reconfigurator_config_loader = ReconfiguratorConfigLoader::new(datastore.clone()); let reconfigurator_config_watcher = @@ -509,7 +544,7 @@ impl BackgroundTasksInitializer { let blueprint_planner = blueprint_planner::BlueprintPlanner::new( datastore.clone(), reconfigurator_config_watcher.clone(), - inventory_watcher.clone(), + inventory_collect_watcher.clone(), rx_blueprint.clone(), ); let rx_planner = blueprint_planner.watcher(); @@ -520,7 +555,7 @@ impl BackgroundTasksInitializer { task_impl: Box::new(blueprint_planner), opctx: opctx.child(BTreeMap::new()), watchers: vec![ - Box::new(inventory_watcher.clone()), + 
Box::new(inventory_collect_watcher.clone()), Box::new(rx_blueprint.clone()), Box::new(reconfigurator_config_watcher), ], @@ -585,13 +620,13 @@ impl BackgroundTasksInitializer { task_impl: Box::new( physical_disk_adoption::PhysicalDiskAdoption::new( datastore.clone(), - inventory_watcher.clone(), + inventory_collect_watcher.clone(), config.physical_disk_adoption.disable, rack_id, ), ), opctx: opctx.child(BTreeMap::new()), - watchers: vec![Box::new(inventory_watcher.clone())], + watchers: vec![Box::new(inventory_collect_watcher.clone())], activator: task_physical_disk_adoption, }); @@ -609,7 +644,7 @@ impl BackgroundTasksInitializer { ), ), opctx: opctx.child(BTreeMap::new()), - watchers: vec![Box::new(inventory_watcher.clone())], + watchers: vec![Box::new(inventory_collect_watcher.clone())], activator: task_blueprint_rendezvous, }); diff --git a/nexus/src/app/background/status.rs b/nexus/src/app/background/status.rs index fae66942f44..b308c41f6f2 100644 --- a/nexus/src/app/background/status.rs +++ b/nexus/src/app/background/status.rs @@ -9,12 +9,15 @@ use crate::Nexus; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_types::internal_api::views::BackgroundTask; +use nexus_types::inventory::Collection; use omicron_common::api::external::Error; use omicron_common::api::external::LookupResult; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; use std::collections::BTreeMap; use std::collections::BTreeSet; +use std::sync::Arc; +use tokio::sync::watch; impl Nexus { pub(crate) async fn bgtasks_list( @@ -90,6 +93,12 @@ impl Nexus { Ok(()) } + pub(crate) fn inventory_load_rx( + &self, + ) -> watch::Receiver<Option<Arc<Collection>>> { + self.background_tasks_internal.inventory_load_rx() + } + fn driver(&self) -> Result<&Driver, Error> { self.background_tasks_driver.get().ok_or_else(|| { Error::unavail("background tasks not yet initialized") diff --git a/nexus/src/app/background/tasks/inventory_load.rs 
b/nexus/src/app/background/tasks/inventory_load.rs new file mode 100644 index 00000000000..3b28d1b5601 --- /dev/null +++ b/nexus/src/app/background/tasks/inventory_load.rs @@ -0,0 +1,268 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for loading the latest inventory collection from the DB + +use crate::app::background::BackgroundTask; +use chrono::Utc; +use futures::future::BoxFuture; +use nexus_auth::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::internal_api::background::InventoryLoadStatus; +use nexus_types::inventory::Collection; +use serde_json::json; +use slog_error_chain::InlineErrorChain; +use std::sync::Arc; +use tokio::sync::watch; + +pub struct InventoryLoader { + datastore: Arc<DataStore>, + // We store an `Arc<Collection>` in this channel instead of just a + // `Collection` so that cloning it is cheap: we want callers to just grab a + // snapshot of the collection to do whatever they're going to do without + // holding the lock on the channel. + tx: watch::Sender<Option<Arc<Collection>>>, +} + +impl BackgroundTask for InventoryLoader { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + Box::pin(async { + let status = self.load_if_needed(opctx).await; + match serde_json::to_value(status) { + Ok(val) => val, + Err(err) => { + let err = format!( + "could not serialize task status: {}", + InlineErrorChain::new(&err) + ); + json!({ "error": err }) + } + } + }) + } +} + +impl InventoryLoader { + pub fn new( + datastore: Arc<DataStore>, + tx: watch::Sender<Option<Arc<Collection>>>, + ) -> Self { + Self { datastore, tx } + } + + async fn load_if_needed(&self, opctx: &OpContext) -> InventoryLoadStatus { + // Set up a logger for this activation that includes metadata about + // the previously loaded collection (if any). 
+ let (old, log) = match &*self.tx.borrow() { + None => (None, opctx.log.clone()), + Some(old) => { + let log = opctx.log.new(slog::o!( + "original_id" => old.id.to_string(), + "original_time_started" => old.time_started.to_string(), + )); + (Some((old.id, old.time_started)), log) + } + }; + + // Get the ID of the latest collection. + let time_loaded = Utc::now(); + let latest_id = match self + .datastore + .inventory_get_latest_collection_id(opctx) + .await + { + Ok(Some(id)) => id, + Ok(None) => match old { + Some((old_id, _)) => { + // We should never go from "some collection" to "no + // collections"; pruning should always keep a small number + // of old collections around until we have new ones to + // replace them. + // + // In this case we won't replace our channel contents with + // `None`; we'll keep around whatever old collection we had + // loaded. + warn!( + log, + "previously had a collection, but now none exist" + ); + return InventoryLoadStatus::Error(format!( + "previously loaded collection {old_id}, but now \ + no collections exist" + )); + } + None => { + // Had no collections; still have no collections. + return InventoryLoadStatus::NoCollections; + } + }, + Err(err) => { + let err = InlineErrorChain::new(&err); + warn!( + log, + "failed to read latest inventory collection ID"; + &err + ); + return InventoryLoadStatus::Error(format!( + "failed to read latest inventory collection ID: {err}" + )); + } + }; + + // Have we already loaded this collection? + match old { + Some((old_id, old_time_started)) if old_id == latest_id => { + debug!(log, "latest inventory collection is unchanged"); + return InventoryLoadStatus::Loaded { + collection_id: old_id, + time_started: old_time_started, + time_loaded, + }; + } + _ => (), + } + + // It's new - load the full collection. 
+ let collection = match self + .datastore + .inventory_collection_read(opctx, latest_id) + .await + { + Ok(collection) => collection, + Err(err) => { + let err = InlineErrorChain::new(&err); + warn!( + log, + "failed to read inventory collection {latest_id}"; + &err + ); + return InventoryLoadStatus::Error(format!( + "failed to read inventory collection {latest_id}: {err}" + )); + } + }; + + let new_id = collection.id; + let new_time_started = collection.time_started; + let collection = Arc::new(collection); + self.tx.send_modify(|c| { + *c = Some(collection); + }); + + InventoryLoadStatus::Loaded { + collection_id: new_id, + time_started: new_time_started, + time_loaded, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use nexus_db_queries::db::pub_test_utils::TestDatabase; + use nexus_inventory::CollectionBuilder; + use omicron_test_utils::dev; + + #[tokio::test] + async fn test_inventory_loader() { + let logctx = dev::test_setup_log("test_inventory_loader"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let (tx, mut rx) = watch::channel(None); + let loader = InventoryLoader::new(datastore.clone(), tx); + + // Initial state is `None` + assert_eq!(*rx.borrow_and_update(), None); + + // We haven't inserted any collections. + let status = loader.load_if_needed(opctx).await; + assert_eq!(status, InventoryLoadStatus::NoCollections); + assert!(!rx.has_changed().unwrap()); + + // Insert a collection and activate; we should load it. 
+ let coll0 = Arc::new(CollectionBuilder::new("test").build()); + datastore + .inventory_insert_collection(opctx, &coll0) + .await + .expect("inserted collection"); + let status = loader.load_if_needed(opctx).await; + let first_time_loaded = match status { + InventoryLoadStatus::Loaded { + collection_id, + time_started, + time_loaded, + } => { + assert_eq!(collection_id, coll0.id); + assert_eq!(time_started, coll0.time_started); + time_loaded + } + InventoryLoadStatus::Error(_) + | InventoryLoadStatus::NoCollections => { + panic!("unexpected status: {status:?}") + } + }; + assert!(rx.has_changed().unwrap()); + assert_eq!(*rx.borrow_and_update(), Some(coll0.clone())); + + // Activating again should result in a later `time_loaded` but still the + // same collection. + let status = loader.load_if_needed(opctx).await; + match status { + InventoryLoadStatus::Loaded { + collection_id, + time_started, + time_loaded, + } => { + assert_eq!(collection_id, coll0.id); + assert_eq!(time_started, coll0.time_started); + assert!(time_loaded > first_time_loaded); + } + InventoryLoadStatus::Error(_) + | InventoryLoadStatus::NoCollections => { + panic!("unexpected status: {status:?}") + } + } + assert!(!rx.has_changed().unwrap()); + + // Insert two more collections. + let coll1 = CollectionBuilder::new("test").build(); + datastore + .inventory_insert_collection(opctx, &coll1) + .await + .expect("inserted collection"); + let coll2 = Arc::new(CollectionBuilder::new("test").build()); + datastore + .inventory_insert_collection(opctx, &coll2) + .await + .expect("inserted collection"); + + // Activating should find the latest. 
+ let status = loader.load_if_needed(opctx).await; + match status { + InventoryLoadStatus::Loaded { + collection_id, + time_started, + time_loaded: _, + } => { + assert_eq!(collection_id, coll2.id); + assert_eq!(time_started, coll2.time_started); + } + InventoryLoadStatus::Error(_) + | InventoryLoadStatus::NoCollections => { + panic!("unexpected status: {status:?}") + } + } + assert!(rx.has_changed().unwrap()); + assert_eq!(*rx.borrow_and_update(), Some(coll2.clone())); + + // Cleanup + db.terminate().await; + logctx.cleanup_successful(); + } +} diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index ac4d4ccc875..6ec34c5b2b8 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -22,6 +22,7 @@ pub mod instance_reincarnation; pub mod instance_updater; pub mod instance_watcher; pub mod inventory_collection; +pub mod inventory_load; pub mod lookup_region_port; pub mod metrics_producer_gc; pub mod nat_cleanup; diff --git a/nexus/src/lib.rs b/nexus/src/lib.rs index eddb0520761..b41a525e281 100644 --- a/nexus/src/lib.rs +++ b/nexus/src/lib.rs @@ -52,6 +52,7 @@ use slog::Logger; use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV6}; use std::sync::Arc; +use tokio::sync::watch; #[macro_use] extern crate slog; @@ -401,6 +402,10 @@ impl nexus_test_interface::NexusServer for Server { self.apictx.context.nexus.datastore() } + fn inventory_load_rx(&self) -> watch::Receiver>> { + self.apictx.context.nexus.inventory_load_rx() + } + async fn get_http_server_external_address(&self) -> SocketAddr { self.apictx.context.nexus.get_external_server_address().await.unwrap() } diff --git a/nexus/test-interface/Cargo.toml b/nexus/test-interface/Cargo.toml index 66b3275095d..837786d678e 100644 --- a/nexus/test-interface/Cargo.toml +++ b/nexus/test-interface/Cargo.toml @@ -20,6 +20,7 @@ omicron-common.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. 
pq-sys = "*" slog.workspace = true +tokio.workspace = true uuid.workspace = true omicron-workspace-hack.workspace = true omicron-uuid-kinds.workspace = true diff --git a/nexus/test-interface/src/lib.rs b/nexus/test-interface/src/lib.rs index 361daece4cf..34caf173bdb 100644 --- a/nexus/test-interface/src/lib.rs +++ b/nexus/test-interface/src/lib.rs @@ -45,6 +45,7 @@ use omicron_uuid_kinds::DatasetUuid; use slog::Logger; use std::net::{SocketAddr, SocketAddrV6}; use std::sync::Arc; +use tokio::sync::watch; #[async_trait] pub trait NexusServer: Send + Sync + 'static { @@ -85,6 +86,8 @@ pub trait NexusServer: Send + Sync + 'static { fn datastore(&self) -> &Arc; + fn inventory_load_rx(&self) -> watch::Receiver>>; + async fn get_http_server_external_address(&self) -> SocketAddr; async fn get_http_server_techport_address(&self) -> SocketAddr; async fn get_http_server_internal_address(&self) -> SocketAddr; diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 37640ab2e8a..ae50331f13a 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -35,7 +35,6 @@ use nexus_config::InternalDns; use nexus_config::MgdConfig; use nexus_config::NUM_INITIAL_RESERVED_IP_ADDRESSES; use nexus_config::NexusConfig; -use nexus_db_queries::context::OpContext; use nexus_db_queries::db::pub_test_utils::crdb; use nexus_sled_agent_shared::inventory::HostPhase2DesiredSlots; use nexus_sled_agent_shared::inventory::OmicronSledConfig; @@ -88,6 +87,7 @@ use omicron_common::zpool_name::ZpoolName; use omicron_sled_agent::sim; use omicron_test_utils::dev; use omicron_test_utils::dev::poll; +use omicron_test_utils::dev::poll::wait_for_watch_channel_condition; use omicron_test_utils::dev::poll::{CondCheckError, wait_for_condition}; use omicron_uuid_kinds::BlueprintUuid; use omicron_uuid_kinds::DatasetUuid; @@ -104,7 +104,6 @@ use sled_agent_client::types::EarlyNetworkConfig; use sled_agent_client::types::EarlyNetworkConfigBody; use 
sled_agent_client::types::RackNetworkConfigV2; use slog::{Logger, debug, error, o}; -use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; use std::collections::HashMap; use std::fmt::Debug; @@ -247,20 +246,18 @@ impl ControlPlaneTestContext { &self, timeout: Duration, ) { - let datastore = self.server.datastore(); - let opctx = - OpContext::for_tests(self.logctx.log.clone(), datastore.clone()); - - match wait_for_condition( - || async { - match datastore.inventory_get_latest_collection(&opctx).await { - Ok(Some(_)) => Ok(()), - Ok(None) => Err(CondCheckError::NotYet), - Err(err) => Err(CondCheckError::Failed(err)), + let mut inv_rx = self.server.inventory_load_rx(); + + match wait_for_watch_channel_condition( + &mut inv_rx, + async |inv| { + if inv.is_some() { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) } }, - &Duration::from_millis(500), - &timeout, + timeout, ) .await { @@ -268,11 +265,8 @@ impl ControlPlaneTestContext { Err(poll::Error::TimedOut(elapsed)) => { panic!("no inventory collection found within {elapsed:?}"); } - Err(poll::Error::PermanentError(err)) => { - panic!( - "failed waiting for inventory collection: {}", - InlineErrorChain::new(&err) - ); + Err(poll::Error::PermanentError(())) => { + unreachable!("check can only fail via timeout") } } } diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index 7f165286eee..493d9180bd0 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -97,13 +97,16 @@ metrics_producer_gc.period_secs = 60 external_endpoints.period_secs = 60 nat_cleanup.period_secs = 30 bfd_manager.period_secs = 30 +# How frequently to check for a new inventory collection (made by any Nexus). +# This is cheap, so we should check frequently. +inventory.period_secs_load = 15 # How frequently to collect hardware/software inventory from the whole system # (even if we don't have reason to believe anything has changed). 
-inventory.period_secs = 600 +inventory.period_secs_collect = 600 # Maximum number of past collections to keep in the database inventory.nkeep = 3 # Disable inventory collection altogether (for emergencies) -inventory.disable = false +inventory.disable_collect = false phantom_disks.period_secs = 30 physical_disk_adoption.period_secs = 30 # Disable automatic disk adoption to avoid interfering with tests. diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index d363202eeb3..eb9a965c6d2 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -593,6 +593,23 @@ impl slog::KV for DebugDatasetsRendezvousStats { } } +/// The status of an `inventory_loader` background task activation. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub enum InventoryLoadStatus { + /// An error occurred. + Error(String), + + /// We have no collections. + NoCollections, + + /// We've loaded the most recent collection as of `time_loaded`. + Loaded { + collection_id: CollectionUuid, + time_started: DateTime<Utc>, + time_loaded: DateTime<Utc>, + }, +} + /// The status of a `blueprint_planner` background task activation. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub enum BlueprintPlannerStatus { diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index 51ec021af3f..cc4b85c7c2d 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -40,13 +40,16 @@ metrics_producer_gc.period_secs = 60 external_endpoints.period_secs = 60 nat_cleanup.period_secs = 30 bfd_manager.period_secs = 30 +# How frequently to check for a new inventory collection (made by any Nexus). +# This is cheap, so we should check frequently. +inventory.period_secs_load = 15 # How frequently to collect hardware/software inventory from the whole system # (even if we don't have reason to believe anything has changed). 
-inventory.period_secs = 600 +inventory.period_secs_collect = 600 # Maximum number of past collections to keep in the database inventory.nkeep = 3 # Disable inventory collection altogether (for emergencies) -inventory.disable = false +inventory.disable_collect = false phantom_disks.period_secs = 30 physical_disk_adoption.period_secs = 30 support_bundle_collector.period_secs = 30 diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index e8e0eea2d6c..98a77f48527 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -40,13 +40,16 @@ metrics_producer_gc.period_secs = 60 external_endpoints.period_secs = 60 nat_cleanup.period_secs = 30 bfd_manager.period_secs = 30 +# How frequently to check for a new inventory collection (made by any Nexus). +# This is cheap, so we should check frequently. +inventory.period_secs_load = 15 # How frequently to collect hardware/software inventory from the whole system # (even if we don't have reason to believe anything has changed). -inventory.period_secs = 600 +inventory.period_secs_collect = 600 # Maximum number of past collections to keep in the database inventory.nkeep = 3 # Disable inventory collection altogether (for emergencies) -inventory.disable = false +inventory.disable_collect = false phantom_disks.period_secs = 30 physical_disk_adoption.period_secs = 30 support_bundle_collector.period_secs = 30