diff --git a/common/src/sql/dbinit.sql b/common/src/sql/dbinit.sql
index 3b7133f5af2..3c396799f69 100644
--- a/common/src/sql/dbinit.sql
+++ b/common/src/sql/dbinit.sql
@@ -128,6 +128,64 @@ CREATE INDEX ON omicron.public.service (
     sled_id
 );
 
+-- A table describing virtual resource provisioning which may be associated
+-- with a collection of objects, including:
+-- - Projects
+-- - Organizations
+-- - Silos
+-- - Fleet
+CREATE TABLE omicron.public.virtual_provisioning_collection (
+    -- Should match the UUID of the corresponding collection.
+    id UUID PRIMARY KEY,
+    time_modified TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+
+    -- Identifies the type of the collection.
+    collection_type STRING(63) NOT NULL,
+
+    -- The amount of virtual disk space which has been provisioned
+    -- on behalf of the collection.
+    virtual_disk_bytes_provisioned INT8 NOT NULL,
+
+    -- The number of CPUs provisioned by VMs.
+    cpus_provisioned INT8 NOT NULL,
+
+    -- The amount of RAM provisioned by VMs.
+    ram_provisioned INT8 NOT NULL
+);
+
+-- A table describing a single virtual resource which has been provisioned.
+-- This may include:
+-- - Disks
+-- - Instances
+-- - Snapshots
+--
+-- NOTE: You might think to yourself: "This table looks an awful lot like
+-- the 'virtual_provisioning_collection' table, could they be condensed into
+-- a single table?"
+-- The answer to this question is unfortunately: "No". We use CTEs to
+-- UPDATE the collection table while INSERTing rows into the resource table,
+-- and this would not be allowed if they came from the same table due to:
+-- https://www.cockroachlabs.com/docs/v22.2/known-limitations#statements-containing-multiple-modification-subqueries-of-the-same-table-are-disallowed
+-- However, by using separate tables, the CTE is able to function correctly.
+CREATE TABLE omicron.public.virtual_provisioning_resource (
+    -- Should match the UUID of the corresponding resource.
+    id UUID PRIMARY KEY,
+    time_modified TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+
+    -- Identifies the type of the resource.
+    resource_type STRING(63) NOT NULL,
+
+    -- The amount of virtual disk space which has been provisioned
+    -- on behalf of the resource.
+    virtual_disk_bytes_provisioned INT8 NOT NULL,
+
+    -- The number of CPUs provisioned.
+    cpus_provisioned INT8 NOT NULL,
+
+    -- The amount of RAM provisioned.
+    ram_provisioned INT8 NOT NULL
+);
+
 /*
  * ZPools of Storage, attached to Sleds.
  * Typically these are backed by a single physical disk.
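[Review note] To make the NOTE above concrete: the CTE in question modifies both tables in a single statement. Below is a hand-written sketch of its shape — illustrative only; the real query is generated in Rust by the `VirtualProvisioningCollectionUpdate` type added later in this diff, and the column handling here is simplified.

```rust
// Illustrative only: one statement INSERTs into the resource table while
// UPDATEing the collection table. Because two *different* tables are
// modified, CockroachDB's restriction on multiple modification subqueries
// of the same table does not apply.
const PROVISIONING_CTE_SKETCH: &str = r#"
WITH inserted_resource AS (
    INSERT INTO virtual_provisioning_resource
        (id, resource_type, virtual_disk_bytes_provisioned,
         cpus_provisioned, ram_provisioned)
    VALUES ($1, 'disk', $2, 0, 0)
    RETURNING id
)
UPDATE virtual_provisioning_collection
SET virtual_disk_bytes_provisioned = virtual_disk_bytes_provisioned + $2
WHERE id = ANY ($3) -- the project, organization, silo, and fleet IDs
RETURNING id
"#;

fn main() {
    println!("{PROVISIONING_CTE_SKETCH}");
}
```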
diff --git a/nexus/db-model/src/instance.rs b/nexus/db-model/src/instance.rs
index 430c8c675bd..77b09a3e30c 100644
--- a/nexus/db-model/src/instance.rs
+++ b/nexus/db-model/src/instance.rs
@@ -12,11 +12,15 @@ use nexus_types::identity::Resource;
 use omicron_common::address::PROPOLIS_PORT;
 use omicron_common::api::external;
 use omicron_common::api::internal;
+use serde::Deserialize;
+use serde::Serialize;
 use std::net::SocketAddr;
 use uuid::Uuid;
 
 /// An Instance (VM).
-#[derive(Queryable, Insertable, Debug, Selectable, Resource)]
+#[derive(
+    Queryable, Insertable, Debug, Selectable, Resource, Serialize, Deserialize,
+)]
 #[diesel(table_name = instance)]
 pub struct Instance {
     #[diesel(embed)]
@@ -84,7 +88,16 @@ impl DatastoreAttachTargetConfig for Instance {
 /// metadata
 ///
 /// This state is owned by the sled agent running that Instance.
-#[derive(Clone, Debug, AsChangeset, Selectable, Insertable, Queryable)]
+#[derive(
+    Clone,
+    Debug,
+    AsChangeset,
+    Selectable,
+    Insertable,
+    Queryable,
+    Serialize,
+    Deserialize,
+)]
 #[diesel(table_name = instance)]
 pub struct InstanceRuntimeState {
     /// runtime state of the Instance
diff --git a/nexus/db-model/src/instance_cpu_count.rs b/nexus/db-model/src/instance_cpu_count.rs
index 6923d8a1c79..2a84585ad2e 100644
--- a/nexus/db-model/src/instance_cpu_count.rs
+++ b/nexus/db-model/src/instance_cpu_count.rs
@@ -8,9 +8,13 @@ use diesel::pg::Pg;
 use diesel::serialize::{self, ToSql};
 use diesel::sql_types;
 use omicron_common::api::external;
+use serde::Deserialize;
+use serde::Serialize;
 use std::convert::TryFrom;
 
-#[derive(Copy, Clone, Debug, AsExpression, FromSqlRow)]
+#[derive(
+    Copy, Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize,
+)]
 #[diesel(sql_type = sql_types::BigInt)]
 pub struct InstanceCpuCount(pub external::InstanceCpuCount);
 
diff --git a/nexus/db-model/src/instance_state.rs b/nexus/db-model/src/instance_state.rs
index c06118433d6..6baec7afbdf 100644
--- a/nexus/db-model/src/instance_state.rs
+++ b/nexus/db-model/src/instance_state.rs
@@ -4,6 +4,8 @@
 use super::impl_enum_wrapper;
 use omicron_common::api::external;
+use serde::Deserialize;
+use serde::Serialize;
 use std::io::Write;
 
 impl_enum_wrapper!(
@@ -11,7 +13,7 @@ impl_enum_wrapper!(
     #[diesel(postgres_type(name = "instance_state"))]
     pub struct InstanceStateEnum;
 
-    #[derive(Clone, Debug, PartialEq, AsExpression, FromSqlRow)]
+    #[derive(Clone, Debug, PartialEq, AsExpression, FromSqlRow, Serialize, Deserialize)]
     #[diesel(sql_type = InstanceStateEnum)]
     pub struct InstanceState(pub external::InstanceState);
 
diff --git a/nexus/db-model/src/lib.rs b/nexus/db-model/src/lib.rs
index 0adebd901a7..65d29caa1d4 100644
--- a/nexus/db-model/src/lib.rs
+++ b/nexus/db-model/src/lib.rs
@@ -62,6 +62,8 @@ mod ssh_key;
 mod u16;
 mod update_artifact;
 mod user_builtin;
+mod virtual_provisioning_collection;
+mod virtual_provisioning_resource;
 mod vni;
 mod volume;
 mod vpc;
@@ -126,6 +128,8 @@ pub use snapshot::*;
 pub use ssh_key::*;
 pub use update_artifact::*;
 pub use user_builtin::*;
+pub use virtual_provisioning_collection::*;
+pub use virtual_provisioning_resource::*;
 pub use vni::*;
 pub use volume::*;
 pub use vpc::*;
diff --git a/nexus/db-model/src/queries/mod.rs b/nexus/db-model/src/queries/mod.rs
index 20c94b8285e..7724d48bab9 100644
--- a/nexus/db-model/src/queries/mod.rs
+++ b/nexus/db-model/src/queries/mod.rs
@@ -5,3 +5,4 @@
 //! Subqueries used in CTEs.
 
 pub mod region_allocation;
+pub mod virtual_provisioning_collection_update;
diff --git a/nexus/db-model/src/queries/virtual_provisioning_collection_update.rs b/nexus/db-model/src/queries/virtual_provisioning_collection_update.rs
new file mode 100644
index 00000000000..388737bf965
--- /dev/null
+++ b/nexus/db-model/src/queries/virtual_provisioning_collection_update.rs
@@ -0,0 +1,47 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Describes the resource provisioning update CTE
+//!
+//! Refer to <crate::db::queries::virtual_provisioning_collection_update>
+//! for the construction of this query.
+
+use crate::schema::organization;
+use crate::schema::silo;
+use crate::schema::virtual_provisioning_collection;
+
+table! {
+    parent_org {
+        id -> Uuid,
+    }
+}
+
+table! {
+    parent_silo {
+        id -> Uuid,
+    }
+}
+
+table! {
+    all_collections {
+        id -> Uuid,
+    }
+}
+
+table! {
+    do_update (update) {
+        update -> Bool,
+    }
+}
+
+diesel::allow_tables_to_appear_in_same_query!(organization, parent_org,);
+diesel::allow_tables_to_appear_in_same_query!(silo, parent_silo,);
+
+diesel::allow_tables_to_appear_in_same_query!(
+    virtual_provisioning_collection,
+    parent_org,
+    parent_silo,
+    all_collections,
+    do_update,
+);
diff --git a/nexus/db-model/src/rack.rs b/nexus/db-model/src/rack.rs
index 97f0291289e..0f1ef2a853c 100644
--- a/nexus/db-model/src/rack.rs
+++ b/nexus/db-model/src/rack.rs
@@ -13,7 +13,6 @@ use uuid::Uuid;
 pub struct Rack {
     #[diesel(embed)]
     pub identity: RackIdentity,
-    pub initialized: bool,
     pub tuf_base_url: Option<String>,
 }
 
diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs
index e6e5a282a85..038f26b2e2f 100644
--- a/nexus/db-model/src/schema.rs
+++ b/nexus/db-model/src/schema.rs
@@ -404,6 +404,36 @@ table! {
     }
 }
 
+table! {
+    virtual_provisioning_collection {
+        id -> Uuid,
+        // This type isn't actually "Nullable" - it's just handy to use the
+        // same type for insertion and querying, and doing so requires this
+        // field to appear optional so that the database can populate it with
+        // its default value.
+        time_modified -> Nullable<Timestamptz>,
+        collection_type -> Text,
+        virtual_disk_bytes_provisioned -> Int8,
+        cpus_provisioned -> Int8,
+        ram_provisioned -> Int8,
+    }
+}
+
+table! {
+    virtual_provisioning_resource {
+        id -> Uuid,
+        // This type isn't actually "Nullable" - it's just handy to use the
+        // same type for insertion and querying, and doing so requires this
+        // field to appear optional so that the database can populate it with
+        // its default value.
+        time_modified -> Nullable<Timestamptz>,
+        resource_type -> Text,
+        virtual_disk_bytes_provisioned -> Int8,
+        cpus_provisioned -> Int8,
+        ram_provisioned -> Int8,
+    }
+}
+
 table! {
     zpool (id) {
         id -> Uuid,
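[Review note] The `Nullable<Timestamptz>` trick above deserves an example. The column is `NOT NULL DEFAULT NOW()` in SQL, but declaring it nullable in `table!` lets one struct serve both inserts and queries: combined with `treat_none_as_default_value`, a `None` is rendered as `DEFAULT` on insert. A minimal standalone sketch (toy table, assuming diesel's `uuid` and `chrono` features):

```rust
use chrono::{DateTime, Utc};
use diesel::prelude::*;

diesel::table! {
    gadget (id) {
        id -> Uuid,
        // NOT NULL in SQL, but declared Nullable so the same struct can be
        // used for inserts (None => DEFAULT) and queries (always Some).
        time_modified -> Nullable<Timestamptz>,
    }
}

#[derive(Queryable, Insertable)]
#[diesel(table_name = gadget)]
// `None` fields become DEFAULT, so `DEFAULT NOW()` fills the column.
#[diesel(treat_none_as_default_value = true)]
pub struct Gadget {
    pub id: uuid::Uuid,
    pub time_modified: Option<DateTime<Utc>>,
}
```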
diff --git a/nexus/db-model/src/virtual_provisioning_collection.rs b/nexus/db-model/src/virtual_provisioning_collection.rs
new file mode 100644
index 00000000000..0a4d9e27958
--- /dev/null
+++ b/nexus/db-model/src/virtual_provisioning_collection.rs
@@ -0,0 +1,53 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use crate::schema::virtual_provisioning_collection;
+use crate::ByteCount;
+use chrono::{DateTime, Utc};
+use omicron_common::api::external;
+use parse_display::Display;
+use uuid::Uuid;
+
+#[derive(Debug, Display)]
+pub enum CollectionTypeProvisioned {
+    Project,
+    Organization,
+    Silo,
+    Fleet,
+}
+
+/// Describes virtual_provisioning_collection for a collection
+#[derive(Clone, Selectable, Queryable, Insertable, Debug)]
+#[diesel(table_name = virtual_provisioning_collection)]
+#[diesel(treat_none_as_default_value = true)]
+pub struct VirtualProvisioningCollection {
+    pub id: Uuid,
+    pub time_modified: Option<DateTime<Utc>>,
+    pub collection_type: String,
+
+    pub virtual_disk_bytes_provisioned: ByteCount,
+    pub cpus_provisioned: i64,
+    pub ram_provisioned: ByteCount,
+}
+
+impl VirtualProvisioningCollection {
+    pub fn new(id: Uuid, collection_type: CollectionTypeProvisioned) -> Self {
+        Self {
+            id,
+            time_modified: None,
+            collection_type: collection_type.to_string(),
+            virtual_disk_bytes_provisioned: ByteCount(
+                external::ByteCount::from(0),
+            ),
+            cpus_provisioned: 0,
+            ram_provisioned: ByteCount(external::ByteCount::from(0)),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.virtual_disk_bytes_provisioned.to_bytes() == 0
+            && self.cpus_provisioned == 0
+            && self.ram_provisioned.to_bytes() == 0
+    }
+}
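[Review note] A quick usage sketch of the model above (written as a test; the import path is assumed — within nexus these types re-export as `db::model::*`): a freshly constructed collection is zeroed out, so `is_empty` holds until a resource is accounted against it, and `collection_type` is rendered by the `parse_display::Display` derive as the bare variant name.

```rust
#[cfg(test)]
mod sketch {
    // Assumed crate path for the db-model library.
    use nexus_db_model::{
        CollectionTypeProvisioned, VirtualProvisioningCollection,
    };
    use uuid::Uuid;

    #[test]
    fn fresh_collection_is_empty() {
        let c = VirtualProvisioningCollection::new(
            Uuid::new_v4(),
            CollectionTypeProvisioned::Project,
        );
        // No disk, CPU, or RAM accounted yet.
        assert!(c.is_empty());
        assert_eq!(c.collection_type, "Project");
    }
}
```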
diff --git a/nexus/db-model/src/virtual_provisioning_resource.rs b/nexus/db-model/src/virtual_provisioning_resource.rs
new file mode 100644
index 00000000000..237e37b17b4
--- /dev/null
+++ b/nexus/db-model/src/virtual_provisioning_resource.rs
@@ -0,0 +1,55 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use crate::schema::virtual_provisioning_resource;
+use crate::ByteCount;
+use chrono::{DateTime, Utc};
+use omicron_common::api::external;
+use uuid::Uuid;
+
+#[derive(Debug)]
+pub enum ResourceTypeProvisioned {
+    Instance,
+    Disk,
+    Snapshot,
+}
+
+impl std::fmt::Display for ResourceTypeProvisioned {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ResourceTypeProvisioned::Instance => write!(f, "instance"),
+            ResourceTypeProvisioned::Disk => write!(f, "disk"),
+            ResourceTypeProvisioned::Snapshot => write!(f, "snapshot"),
+        }
+    }
+}
+
+/// Describes virtual_provisioning_resource for a resource.
+#[derive(Clone, Selectable, Queryable, Insertable, Debug)]
+#[diesel(table_name = virtual_provisioning_resource)]
+#[diesel(treat_none_as_default_value = true)]
+pub struct VirtualProvisioningResource {
+    pub id: Uuid,
+    pub time_modified: Option<DateTime<Utc>>,
+    pub resource_type: String,
+
+    pub virtual_disk_bytes_provisioned: ByteCount,
+    pub cpus_provisioned: i64,
+    pub ram_provisioned: ByteCount,
+}
+
+impl VirtualProvisioningResource {
+    pub fn new(id: Uuid, resource_type: ResourceTypeProvisioned) -> Self {
+        Self {
+            id,
+            time_modified: None,
+            resource_type: resource_type.to_string(),
+            virtual_disk_bytes_provisioned: ByteCount(
+                external::ByteCount::from(0),
+            ),
+            cpus_provisioned: 0,
+            ram_provisioned: ByteCount(external::ByteCount::from(0)),
+        }
+    }
+}
diff --git a/nexus/src/app/disk.rs b/nexus/src/app/disk.rs
index 4a24950b6b0..0a4759d3fec 100644
--- a/nexus/src/app/disk.rs
+++ b/nexus/src/app/disk.rs
@@ -386,13 +386,17 @@ impl super::Nexus {
 
     pub async fn project_delete_disk(
         self: &Arc<Self>,
+        opctx: &OpContext,
         disk_lookup: &lookup::Disk<'_>,
     ) -> DeleteResult {
-        let (.., authz_disk) =
+        let (.., project, authz_disk) =
             disk_lookup.lookup_for(authz::Action::Delete).await?;
 
-        let saga_params =
-            sagas::disk_delete::Params { disk_id: authz_disk.id() };
+        let saga_params = sagas::disk_delete::Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(opctx),
+            project_id: project.id(),
+            disk_id: authz_disk.id(),
+        };
         self.execute_saga::<sagas::disk_delete::SagaDiskDelete>(saga_params)
             .await?;
         Ok(())
@@ -542,7 +546,7 @@ impl super::Nexus {
             .fetch()
             .await?;
 
-        self.volume_remove_read_only_parent(db_disk.volume_id).await?;
+        self.volume_remove_read_only_parent(&opctx, db_disk.volume_id).await?;
 
         Ok(())
     }
diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs
index 06b515b2853..1d4fc6ba4e6 100644
--- a/nexus/src/app/instance.rs
+++ b/nexus/src/app/instance.rs
@@ -265,12 +265,13 @@ impl super::Nexus {
         // TODO-robustness We need to figure out what to do with Destroyed
         // instances? Presumably we need to clean them up at some point, but
         // not right away so that callers can see that they've been destroyed.
-        let (.., authz_instance) =
-            instance_lookup.lookup_for(authz::Action::Delete).await?;
+        let (.., authz_instance, instance) =
+            instance_lookup.fetch_for(authz::Action::Delete).await?;
 
         let saga_params = sagas::instance_delete::Params {
             serialized_authn: authn::saga::Serialized::for_opctx(opctx),
             authz_instance,
+            instance,
         };
         self.execute_saga::<sagas::instance_delete::SagaInstanceDelete>(
             saga_params,
diff --git a/nexus/src/app/metrics.rs b/nexus/src/app/metrics.rs
new file mode 100644
index 00000000000..947c8ef1a6f
--- /dev/null
+++ b/nexus/src/app/metrics.rs
@@ -0,0 +1,39 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Metrics
+
+use crate::authz;
+use crate::context::OpContext;
+use crate::external_api::http_entrypoints::SystemMetricName;
+use crate::external_api::http_entrypoints::SystemMetricParams;
+use omicron_common::api::external::Error;
+use oximeter_db::Measurement;
+use std::num::NonZeroU32;
+
+impl super::Nexus {
+    pub async fn system_metric_lookup(
+        &self,
+        opctx: &OpContext,
+        metric_name: SystemMetricName,
+        query: SystemMetricParams,
+        limit: NonZeroU32,
+    ) -> Result<Vec<Measurement>, Error> {
+        let timeseries = match metric_name {
+            SystemMetricName::VirtualDiskSpaceProvisioned
+            | SystemMetricName::CpusProvisioned
+            | SystemMetricName::RamProvisioned => {
+                opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
+                format!("collection_target:{metric_name}")
+            }
+        };
+        self.select_timeseries(
+            &timeseries,
+            &[&format!("id=={}", query.id)],
+            query.pagination,
+            limit,
+        )
+        .await
+    }
+}
diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs
index ce54bc347b3..1f293e3790b 100644
--- a/nexus/src/app/mod.rs
+++ b/nexus/src/app/mod.rs
@@ -14,6 +14,7 @@ use crate::populate::populate_start;
 use crate::populate::PopulateArgs;
 use crate::populate::PopulateStatus;
 use crate::saga_interface::SagaContext;
+use ::oximeter::types::ProducerRegistry;
 use anyhow::anyhow;
 use omicron_common::api::external::Error;
 use slog::Logger;
@@ -29,9 +30,11 @@ mod iam;
 mod image;
 mod instance;
 mod ip_pool;
+mod metrics;
 mod organization;
 mod oximeter;
 mod project;
+pub mod provisioning;
 mod rack;
 pub mod saga;
 mod session;
@@ -123,12 +126,15 @@ impl Nexus {
         log: Logger,
         resolver: internal_dns_client::multiclient::Resolver,
         pool: db::Pool,
+        producer_registry: &ProducerRegistry,
         config: &config::Config,
         authz: Arc<authz::Authz>,
     ) -> Arc<Nexus> {
         let pool = Arc::new(pool);
-        let my_sec_id = db::SecId::from(config.deployment.id);
         let db_datastore = Arc::new(db::DataStore::new(Arc::clone(&pool)));
+        db_datastore.register_producers(&producer_registry);
+
+        let my_sec_id = db::SecId::from(config.deployment.id);
         let sec_store = Arc::new(db::CockroachDbSecStore::new(
             my_sec_id,
             Arc::clone(&db_datastore),
diff --git a/nexus/src/app/provisioning.rs b/nexus/src/app/provisioning.rs
new file mode 100644
index 00000000000..1501ceed818
--- /dev/null
+++ b/nexus/src/app/provisioning.rs
@@ -0,0 +1,129 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Types to export metrics about provisioning information.
+
+use crate::db::model::VirtualProvisioningCollection;
+use oximeter::{types::Sample, Metric, MetricsError, Target};
+use std::sync::{Arc, Mutex};
+use uuid::Uuid;
+
+/// Describes a collection that holds other resources.
+///
+/// Example targets might include projects, organizations, silos or fleets.
+#[derive(Debug, Clone, Target)]
+struct CollectionTarget {
+    id: Uuid,
+}
+
+#[derive(Debug, Clone, Metric)]
+struct VirtualDiskSpaceProvisioned {
+    #[datum]
+    bytes_used: i64,
+}
+
+#[derive(Debug, Clone, Metric)]
+struct CpusProvisioned {
+    #[datum]
+    cpus: i64,
+}
+
+#[derive(Debug, Clone, Metric)]
+struct RamProvisioned {
+    #[datum]
+    bytes: i64,
+}
+
+/// An oximeter producer for reporting [`VirtualProvisioningCollection`]
+/// information to Clickhouse.
+///
+/// This producer collects samples whenever the database record for a
+/// collection is created or updated. This implies that the CockroachDB
+/// record is always kept up-to-date, and the Clickhouse historical records
+/// are batched and transmitted once they are collected (as is the norm
+/// for Clickhouse metrics).
+#[derive(Debug, Default, Clone)]
+pub(crate) struct Producer {
+    samples: Arc<Mutex<Vec<Sample>>>,
+}
+
+impl Producer {
+    pub fn new() -> Self {
+        Self { samples: Arc::new(Mutex::new(vec![])) }
+    }
+
+    pub fn append_all_metrics(
+        &self,
+        provisions: &Vec<VirtualProvisioningCollection>,
+    ) {
+        self.append_cpu_metrics(&provisions);
+        self.append_disk_metrics(&provisions);
+    }
+
+    pub fn append_disk_metrics(
+        &self,
+        provisions: &Vec<VirtualProvisioningCollection>,
+    ) {
+        let new_samples = provisions
+            .iter()
+            .map(|provision| {
+                Sample::new_with_timestamp(
+                    provision
+                        .time_modified
+                        .expect("Should always have default value"),
+                    &CollectionTarget { id: provision.id },
+                    &VirtualDiskSpaceProvisioned {
+                        bytes_used: provision
+                            .virtual_disk_bytes_provisioned
+                            .into(),
+                    },
+                )
+            })
+            .collect::<Vec<_>>();
+
+        self.append(new_samples);
+    }
+
+    pub fn append_cpu_metrics(
+        &self,
+        provisions: &Vec<VirtualProvisioningCollection>,
+    ) {
+        let new_samples = provisions
+            .iter()
+            .map(|provision| {
+                Sample::new_with_timestamp(
+                    provision
+                        .time_modified
+                        .expect("Should always have default value"),
+                    &CollectionTarget { id: provision.id },
+                    &CpusProvisioned { cpus: provision.cpus_provisioned },
+                )
+            })
+            .chain(provisions.iter().map(|provision| {
+                Sample::new_with_timestamp(
+                    provision
+                        .time_modified
+                        .expect("Should always have default value"),
+                    &CollectionTarget { id: provision.id },
+                    &RamProvisioned { bytes: provision.ram_provisioned.into() },
+                )
+            }))
+            .collect::<Vec<_>>();
+
+        self.append(new_samples);
+    }
+
+    fn append(&self, mut new_samples: Vec<Sample>) {
+        let mut pending_samples = self.samples.lock().unwrap();
+        pending_samples.append(&mut new_samples);
+    }
+}
+
+impl oximeter::Producer for Producer {
+    fn produce(
+        &mut self,
+    ) -> Result<Box<dyn Iterator<Item = Sample> + 'static>, MetricsError> {
+        let samples =
+            std::mem::replace(&mut *self.samples.lock().unwrap(), vec![]);
+        Ok(Box::new(samples.into_iter()))
+    }
+}
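[Review note] On the producer above: oximeter derives target and metric names by snake_casing the struct names, so these samples land in timeseries like `collection_target:virtual_disk_space_provisioned` — the same string that `system_metric_lookup()` in metrics.rs rebuilds. A standalone sketch using the same derives (the `timeseries_name` field access is my assumption about the `Sample` type):

```rust
use oximeter::{types::Sample, Metric, Target};
use uuid::Uuid;

#[derive(Debug, Clone, Target)]
struct CollectionTarget {
    id: Uuid,
}

#[derive(Debug, Clone, Metric)]
struct VirtualDiskSpaceProvisioned {
    #[datum]
    bytes_used: i64,
}

fn main() {
    let sample = Sample::new(
        &CollectionTarget { id: Uuid::new_v4() },
        &VirtualDiskSpaceProvisioned { bytes_used: 1 << 30 },
    );
    // Timeseries name is "<target_name>:<metric_name>".
    println!("{}", sample.timeseries_name);
}
```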
diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs
index 256c3a73290..e12d32518ce 100644
--- a/nexus/src/app/saga.rs
+++ b/nexus/src/app/saga.rs
@@ -101,6 +101,15 @@ impl super::Nexus {
     ) -> Result<RunnableSaga, Error> {
         // Construct the context necessary to execute this saga.
         let saga_id = SagaId(Uuid::new_v4());
+
+        self.create_runnable_saga_with_id(dag, saga_id).await
+    }
+
+    pub async fn create_runnable_saga_with_id(
+        self: &Arc<Self>,
+        dag: SagaDag,
+        saga_id: SagaId,
+    ) -> Result<RunnableSaga, Error> {
         let saga_logger = self.log.new(o!(
             "saga_name" => dag.saga_name().to_string(),
             "saga_id" => saga_id.to_string()
diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs
index 12a00211996..9bd96dd5cf4 100644
--- a/nexus/src/app/sagas/disk_create.rs
+++ b/nexus/src/app/sagas/disk_create.rs
@@ -46,6 +46,10 @@ declare_saga_actions! {
         + sdc_alloc_regions
         - sdc_alloc_regions_undo
     }
+    SPACE_ACCOUNT -> "no_result" {
+        + sdc_account_space
+        - sdc_account_space_undo
+    }
     REGIONS_ENSURE -> "regions_ensure" {
         + sdc_regions_ensure
         - sdc_regions_ensure_undo
@@ -89,6 +93,7 @@ impl NexusSaga for SagaDiskCreate {
 
         builder.append(create_disk_record_action());
         builder.append(regions_alloc_action());
+        builder.append(space_account_action());
         builder.append(regions_ensure_action());
         builder.append(create_volume_record_action());
         builder.append(finalize_disk_record_action());
@@ -239,6 +244,48 @@ async fn sdc_alloc_regions_undo(
     Ok(())
 }
 
+async fn sdc_account_space(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let disk_created = sagactx.lookup::<db::model::Disk>("created_disk")?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_insert_disk(
+            &opctx,
+            disk_created.id(),
+            params.project_id,
+            disk_created.size,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
+
+async fn sdc_account_space_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let disk_created = sagactx.lookup::<db::model::Disk>("created_disk")?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_delete_disk(
+            &opctx,
+            disk_created.id(),
+            params.project_id,
+            disk_created.size,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
+
 /// Call out to Crucible agent and perform region creation.
 async fn sdc_regions_ensure(
     sagactx: NexusActionContext,
@@ -472,9 +519,11 @@ async fn sdc_create_volume_record_undo(
     sagactx: NexusActionContext,
 ) -> Result<(), anyhow::Error> {
     let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
 
     let volume_id = sagactx.lookup::<Uuid>("volume_id")?;
-    osagactx.nexus().volume_delete(volume_id).await?;
+    osagactx.nexus().volume_delete(&opctx, volume_id).await?;
     Ok(())
 }
 
@@ -590,7 +639,10 @@ pub(crate) mod test {
         app::sagas::disk_create::SagaDiskCreate, authn::saga::Serialized,
         context::OpContext, db::datastore::DataStore, external_api::params,
     };
-    use async_bb8_diesel::{AsyncRunQueryDsl, OptionalExtension};
+    use async_bb8_diesel::{
+        AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
+        OptionalExtension,
+    };
     use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
     use dropshot::test_util::ClientTestContext;
     use nexus_test_utils::resource_helpers::create_ip_pool;
@@ -701,6 +753,53 @@ pub(crate) mod test {
             .is_none()
     }
 
+    async fn no_virtual_provisioning_resource_records_exist(
+        datastore: &DataStore,
+    ) -> bool {
+        use crate::db::model::VirtualProvisioningResource;
+        use crate::db::schema::virtual_provisioning_resource::dsl;
+
+        dsl::virtual_provisioning_resource
+            .select(VirtualProvisioningResource::as_select())
+            .first_async::<VirtualProvisioningResource>(
+                datastore.pool_for_tests().await.unwrap(),
+            )
+            .await
+            .optional()
+            .unwrap()
+            .is_none()
+    }
+
+    async fn no_virtual_provisioning_collection_records_using_storage(
+        datastore: &DataStore,
+    ) -> bool {
+        use crate::db::model::VirtualProvisioningCollection;
+        use crate::db::schema::virtual_provisioning_collection::dsl;
+
+        datastore
+            .pool_for_tests()
+            .await
+            .unwrap()
+            .transaction_async(|conn| async move {
+                conn.batch_execute_async(crate::db::ALLOW_FULL_TABLE_SCAN_SQL)
+                    .await
+                    .unwrap();
+                Ok::<_, crate::db::TransactionError<()>>(
+                    dsl::virtual_provisioning_collection
+                        .filter(dsl::virtual_disk_bytes_provisioned.ne(0))
+                        .select(VirtualProvisioningCollection::as_select())
+                        .get_results_async::<VirtualProvisioningCollection>(
+                            &conn,
+                        )
+                        .await
+                        .unwrap()
+                        .is_empty(),
+                )
+            })
+            .await
+            .unwrap()
+    }
+
     async fn no_region_allocations_exist(
         datastore: &DataStore,
         test: &DiskTest,
@@ -745,6 +844,13 @@ pub(crate) mod test {
 
         assert!(no_disk_records_exist(datastore).await);
         assert!(no_volume_records_exist(datastore).await);
+        assert!(
+            no_virtual_provisioning_resource_records_exist(datastore).await
+        );
+        assert!(
+            no_virtual_provisioning_collection_records_using_storage(datastore)
+                .await
+        );
         assert!(no_region_allocations_exist(datastore, &test).await);
         assert!(no_regions_ensured(&sled_agent, &test).await);
     }
@@ -873,7 +979,7 @@ pub(crate) mod test {
         let disk_lookup = nexus.disk_lookup(&opctx, &disk_selector).unwrap();
 
         nexus
-            .project_delete_disk(&disk_lookup)
+            .project_delete_disk(&opctx, &disk_lookup)
             .await
            .expect("Failed to delete disk");
     }
diff --git a/nexus/src/app/sagas/disk_delete.rs b/nexus/src/app/sagas/disk_delete.rs
index 3634217a3a9..c149f1cc6b7 100644
--- a/nexus/src/app/sagas/disk_delete.rs
+++ b/nexus/src/app/sagas/disk_delete.rs
@@ -6,6 +6,9 @@ use super::ActionRegistry;
 use super::NexusActionContext;
 use super::NexusSaga;
 use crate::app::sagas::declare_saga_actions;
+use crate::authn;
+use crate::context::OpContext;
+use crate::db;
 use serde::Deserialize;
 use serde::Serialize;
 use steno::ActionError;
@@ -15,6 +18,8 @@ use uuid::Uuid;
 
 #[derive(Debug, Deserialize, Serialize)]
 pub struct Params {
+    pub serialized_authn: authn::saga::Serialized,
+    pub project_id: Uuid,
     pub disk_id: Uuid,
 }
 
@@ -22,13 +27,17 @@ declare_saga_actions! {
     disk_delete;
-    DELETE_DISK_RECORD -> "volume_id" {
+    DELETE_DISK_RECORD -> "deleted_disk" {
         // TODO: See the comment on the "DeleteRegions" step,
         // we may want to un-delete the disk if we cannot remove
         // underlying regions.
         + sdd_delete_disk_record
     }
-    DELETE_VOLUME -> "no_result" {
+    SPACE_ACCOUNT -> "no_result1" {
+        + sdd_account_space
+        - sdd_account_space_undo
+    }
+    DELETE_VOLUME -> "no_result2" {
         + sdd_delete_volume
     }
 }
@@ -50,6 +59,7 @@ impl NexusSaga for SagaDiskDelete {
         mut builder: steno::DagBuilder,
     ) -> Result<steno::Dag, SagaInitError> {
         builder.append(delete_disk_record_action());
+        builder.append(space_account_action());
         builder.append(delete_volume_action());
         Ok(builder.build()?)
     }
@@ -59,26 +69,71 @@ impl NexusSaga for SagaDiskDelete {
 
 async fn sdd_delete_disk_record(
     sagactx: NexusActionContext,
-) -> Result<Uuid, ActionError> {
+) -> Result<db::model::Disk, ActionError> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params::<Params>()?;
 
-    let volume_id = osagactx
+    let disk = osagactx
         .datastore()
         .project_delete_disk_no_auth(&params.disk_id)
         .await
         .map_err(ActionError::action_failed)?;
 
-    Ok(volume_id)
+    Ok(disk)
+}
+
+async fn sdd_account_space(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let deleted_disk = sagactx.lookup::<db::model::Disk>("deleted_disk")?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_delete_disk(
+            &opctx,
+            deleted_disk.id(),
+            params.project_id,
+            deleted_disk.size,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
+
+async fn sdd_account_space_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let deleted_disk = sagactx.lookup::<db::model::Disk>("deleted_disk")?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_insert_disk(
+            &opctx,
+            deleted_disk.id(),
+            params.project_id,
+            deleted_disk.size,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
 }
 
 async fn sdd_delete_volume(
     sagactx: NexusActionContext,
 ) -> Result<(), ActionError> {
     let osagactx = sagactx.user_data();
-    let volume_id = sagactx.lookup::<Uuid>("volume_id")?;
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    let volume_id =
+        sagactx.lookup::<db::model::Disk>("deleted_disk")?.volume_id;
     osagactx
         .nexus()
-        .volume_delete(volume_id)
+        .volume_delete(&opctx, volume_id)
         .await
         .map_err(ActionError::action_failed)?;
     Ok(())
@@ -88,7 +143,8 @@ async fn sdd_delete_volume(
 pub(crate) mod test {
     use crate::{
         app::saga::create_saga_dag, app::sagas::disk_delete::Params,
-        app::sagas::disk_delete::SagaDiskDelete, context::OpContext,
+        app::sagas::disk_delete::SagaDiskDelete, authn::saga::Serialized,
+        context::OpContext,
     };
     use dropshot::test_util::ClientTestContext;
     use nexus_test_utils::resource_helpers::create_ip_pool;
@@ -151,11 +207,16 @@ pub(crate) mod test {
         let client = &cptestctx.external_client;
         let nexus = &cptestctx.server.apictx.nexus;
 
-        create_org_and_project(&client).await;
+        let project_id = create_org_and_project(&client).await;
         let disk_id = create_disk(&cptestctx).await;
 
         // Build the saga DAG with the provided test parameters
-        let params = Params { disk_id };
+        let opctx = test_opctx(&cptestctx);
+        let params = Params {
+            serialized_authn: Serialized::for_opctx(&opctx),
+            project_id,
+            disk_id,
+        };
         let dag = create_saga_dag::<SagaDiskDelete>(params).unwrap();
         let runnable_saga = nexus.create_runnable_saga(dag).await.unwrap();
 
@@ -171,11 +232,16 @@ pub(crate) mod test {
         let client = &cptestctx.external_client;
         let nexus = &cptestctx.server.apictx.nexus;
 
-        create_org_and_project(&client).await;
+        let project_id = create_org_and_project(&client).await;
         let disk_id = create_disk(&cptestctx).await;
 
         // Build the saga DAG with the provided test parameters
-        let params = Params { disk_id };
+        let opctx = test_opctx(&cptestctx);
+        let params = Params {
+            serialized_authn: Serialized::for_opctx(&opctx),
+            project_id,
+            disk_id,
+        };
         let dag = create_saga_dag::<SagaDiskDelete>(params).unwrap();
 
         let runnable_saga =
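[Review note] The SPACE_ACCOUNT steps in the two disk sagas above are mirror images: disk_create's forward action inserts the accounting record and its undo deletes it, while disk_delete's forward action deletes and its undo re-inserts. A toy, self-contained illustration of why this pairing keeps totals correct when a saga unwinds partway:

```rust
#[derive(Default, Debug, PartialEq)]
struct Totals {
    virtual_disk_bytes: i64,
}

// Forward action: account for a newly provisioned disk.
fn account_space(t: &mut Totals, bytes: i64) {
    t.virtual_disk_bytes += bytes;
}

// Undo action: apply the exact inverse delta.
fn account_space_undo(t: &mut Totals, bytes: i64) {
    t.virtual_disk_bytes -= bytes;
}

fn main() {
    let mut totals = Totals::default();
    account_space(&mut totals, 1 << 30);
    // A later saga node fails; steno runs undo actions in reverse order.
    account_space_undo(&mut totals, 1 << 30);
    assert_eq!(totals, Totals::default());
}
```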
diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs
index 8f41eb47ccd..afc71a2c9b8 100644
--- a/nexus/src/app/sagas/instance_create.rs
+++ b/nexus/src/app/sagas/instance_create.rs
@@ -12,6 +12,7 @@ use crate::app::{
 use crate::context::OpContext;
 use crate::db::identity::Resource;
 use crate::db::lookup::LookupPath;
+use crate::db::model::ByteCount as DbByteCount;
 use crate::db::queries::network_interface::InsertError as InsertNicError;
 use crate::external_api::params;
 use crate::{authn, authz, db};
@@ -75,6 +76,10 @@ declare_saga_actions! {
     ALLOC_SERVER -> "server_id" {
         + sic_alloc_server
     }
+    RESOURCES_ACCOUNT -> "no_result" {
+        + sic_account_resources
+        - sic_account_resources_undo
+    }
     ALLOC_PROPOLIS_IP -> "propolis_ip" {
         + sic_allocate_propolis_ip
     }
@@ -135,6 +140,7 @@ impl NexusSaga for SagaInstanceCreate {
         ));
 
         builder.append(alloc_server_action());
+        builder.append(resources_account_action());
         builder.append(alloc_propolis_ip_action());
         builder.append(create_instance_record_action());
 
@@ -771,6 +777,50 @@ pub(super) async fn allocate_sled_ipv6(
         .map_err(ActionError::action_failed)
 }
 
+async fn sic_account_resources(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
+
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_insert_instance(
+            &opctx,
+            instance_id,
+            params.project_id,
+            i64::from(params.create_params.ncpus.0),
+            DbByteCount(params.create_params.memory),
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
+
+async fn sic_account_resources_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
+
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_delete_instance(
+            &opctx,
+            instance_id,
+            params.project_id,
+            i64::from(params.create_params.ncpus.0),
+            DbByteCount(params.create_params.memory),
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
+
 // Allocate an IP address on the destination sled for the Propolis server
 async fn sic_allocate_propolis_ip(
     sagactx: NexusActionContext,
@@ -967,8 +1017,13 @@ pub mod test {
         authn::saga::Serialized, context::OpContext,
         db::datastore::DataStore, external_api::params,
     };
-    use async_bb8_diesel::{AsyncRunQueryDsl, OptionalExtension};
-    use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
+    use async_bb8_diesel::{
+        AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
+        OptionalExtension,
+    };
+    use diesel::{
+        BoolExpressionMethods, ExpressionMethods, QueryDsl, SelectableHelper,
+    };
     use dropshot::test_util::ClientTestContext;
     use nexus_test_utils::resource_helpers::create_disk;
     use nexus_test_utils::resource_helpers::create_organization;
@@ -1103,6 +1158,67 @@ pub mod test {
             .is_none()
     }
 
+    async fn no_virtual_provisioning_resource_records_exist(
+        datastore: &DataStore,
+    ) -> bool {
+        use crate::db::model::VirtualProvisioningResource;
+        use crate::db::schema::virtual_provisioning_resource::dsl;
+
+        datastore.pool_for_tests()
+            .await
+            .unwrap()
+            .transaction_async(|conn| async move {
+                conn
+                    .batch_execute_async(crate::db::ALLOW_FULL_TABLE_SCAN_SQL)
+                    .await
+                    .unwrap();
+
+                Ok::<_, crate::db::TransactionError<()>>(
+                    dsl::virtual_provisioning_resource
+                        .filter(dsl::resource_type.eq(crate::db::model::ResourceTypeProvisioned::Instance.to_string()))
+                        .select(VirtualProvisioningResource::as_select())
+                        .get_results_async::<VirtualProvisioningResource>(&conn)
+                        .await
+                        .unwrap()
+                        .is_empty()
+                )
+            }).await.unwrap()
+    }
+
+    async fn no_virtual_provisioning_collection_records_using_instances(
+        datastore: &DataStore,
+    ) -> bool {
+        use crate::db::model::VirtualProvisioningCollection;
+        use crate::db::schema::virtual_provisioning_collection::dsl;
+
+        datastore
+            .pool_for_tests()
+            .await
+            .unwrap()
+            .transaction_async(|conn| async move {
+                conn.batch_execute_async(crate::db::ALLOW_FULL_TABLE_SCAN_SQL)
+                    .await
+                    .unwrap();
+                Ok::<_, crate::db::TransactionError<()>>(
+                    dsl::virtual_provisioning_collection
+                        .filter(
+                            dsl::cpus_provisioned
+                                .ne(0)
+                                .or(dsl::ram_provisioned.ne(0)),
+                        )
+                        .select(VirtualProvisioningCollection::as_select())
+                        .get_results_async::<VirtualProvisioningCollection>(
+                            &conn,
+                        )
+                        .await
+                        .unwrap()
+                        .is_empty(),
+                )
+            })
+            .await
+            .unwrap()
+    }
+
     async fn disk_is_detached(datastore: &DataStore) -> bool {
         use crate::db::model::Disk;
         use crate::db::schema::disk::dsl;
@@ -1134,6 +1250,15 @@ pub mod test {
         assert!(no_instance_records_exist(datastore).await);
         assert!(no_network_interface_records_exist(datastore).await);
         assert!(no_external_ip_records_exist(datastore).await);
+        assert!(
+            no_virtual_provisioning_resource_records_exist(datastore).await
+        );
+        assert!(
+            no_virtual_provisioning_collection_records_using_instances(
+                datastore
+            )
+            .await
+        );
         assert!(disk_is_detached(datastore).await);
         assert!(no_instances_or_disks_on_sled(&sled_agent).await);
     }
diff --git a/nexus/src/app/sagas/instance_delete.rs b/nexus/src/app/sagas/instance_delete.rs
index e96d3c38534..7a7b5da8376 100644
--- a/nexus/src/app/sagas/instance_delete.rs
+++ b/nexus/src/app/sagas/instance_delete.rs
@@ -7,7 +7,9 @@ use super::NexusActionContext;
 use super::NexusSaga;
 use crate::app::sagas::declare_saga_actions;
 use crate::context::OpContext;
+use crate::db;
 use crate::{authn, authz};
+use nexus_types::identity::Resource;
 use omicron_common::api::external::{Error, ResourceType};
 use serde::Deserialize;
 use serde::Serialize;
@@ -19,6 +21,7 @@ use steno::ActionError;
 pub struct Params {
     pub serialized_authn: authn::saga::Serialized,
     pub authz_instance: authz::Instance,
+    pub instance: db::model::Instance,
 }
 
 // instance delete saga: actions
@@ -34,6 +37,9 @@ declare_saga_actions! {
     DEALLOCATE_EXTERNAL_IP -> "no_result3" {
         + sid_deallocate_external_ip
     }
+    RESOURCES_ACCOUNT -> "no_result4" {
+        + sid_account_resources
+    }
 }
 
 // instance delete saga: definition
@@ -55,6 +61,7 @@ impl NexusSaga for SagaInstanceDelete {
         builder.append(instance_delete_record_action());
         builder.append(delete_network_interfaces_action());
         builder.append(deallocate_external_ip_action());
+        builder.append(resources_account_action());
         Ok(builder.build()?)
     }
 }
@@ -116,6 +123,27 @@ async fn sid_deallocate_external_ip(
     Ok(())
 }
 
+async fn sid_account_resources(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_delete_instance(
+            &opctx,
+            params.instance.id(),
+            params.instance.project_id,
+            i64::from(params.instance.runtime_state.ncpus.0 .0),
+            params.instance.runtime_state.memory,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
+
 #[cfg(test)]
 mod test {
     use crate::{
@@ -163,7 +191,7 @@ mod test {
         let opctx = test_opctx(&cptestctx);
         let datastore = cptestctx.server.apictx.nexus.datastore();
 
-        let (.., authz_instance, _instance) =
+        let (.., authz_instance, instance) =
             LookupPath::new(&opctx, &datastore)
                 .instance_id(instance_id)
                 .fetch()
@@ -172,6 +200,7 @@ mod test {
         Params {
             serialized_authn: Serialized::for_opctx(&opctx),
             authz_instance,
+            instance,
         }
     }
diff --git a/nexus/src/app/sagas/project_create.rs b/nexus/src/app/sagas/project_create.rs
index 22c6cc7a907..14a13bca44f 100644
--- a/nexus/src/app/sagas/project_create.rs
+++ b/nexus/src/app/sagas/project_create.rs
@@ -151,7 +151,10 @@ mod test {
         authz, context::OpContext, db::datastore::DataStore,
         external_api::params,
     };
-    use async_bb8_diesel::{AsyncRunQueryDsl, OptionalExtension};
+    use async_bb8_diesel::{
+        AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
+        OptionalExtension,
+    };
     use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
     use dropshot::test_util::ClientTestContext;
     use nexus_test_utils::resource_helpers::create_organization;
@@ -216,6 +219,10 @@ mod test {
 
     async fn verify_clean_slate(datastore: &DataStore) {
         assert!(no_projects_exist(datastore).await);
+        assert!(
+            no_virtual_provisioning_collection_records_for_projects(datastore)
+                .await
+        );
         crate::app::sagas::vpc_create::test::verify_clean_slate(datastore)
             .await;
     }
@@ -237,6 +244,33 @@ mod test {
             .is_none()
     }
 
+    async fn no_virtual_provisioning_collection_records_for_projects(
+        datastore: &DataStore,
+    ) -> bool {
+        use crate::db::model::VirtualProvisioningCollection;
+        use crate::db::schema::virtual_provisioning_collection::dsl;
+
+        datastore.pool_for_tests()
+            .await
+            .unwrap()
+            .transaction_async(|conn| async move {
+                conn
+                    .batch_execute_async(crate::db::ALLOW_FULL_TABLE_SCAN_SQL)
+                    .await
+                    .unwrap();
+                Ok::<_, crate::db::TransactionError<()>>(
+                    dsl::virtual_provisioning_collection
+                        .filter(dsl::collection_type.eq(crate::db::model::CollectionTypeProvisioned::Project.to_string()))
+
+                        .select(VirtualProvisioningCollection::as_select())
+                        .get_results_async::<VirtualProvisioningCollection>(&conn)
+                        .await
+                        .unwrap()
+                        .is_empty()
+                )
+            }).await.unwrap()
+    }
+
     #[nexus_test(server = crate::Server)]
     async fn test_saga_basic_usage_succeeds(
         cptestctx: &ControlPlaneTestContext,
diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs
index f2d7feb555a..50655ec2dbd 100644
--- a/nexus/src/app/sagas/snapshot_create.rs
+++ b/nexus/src/app/sagas/snapshot_create.rs
@@ -142,6 +142,10 @@ declare_saga_actions! {
         + ssc_create_snapshot_record
         - ssc_create_snapshot_record_undo
     }
+    SPACE_ACCOUNT -> "no_result" {
+        + ssc_account_space
+        - ssc_account_space_undo
+    }
     SEND_SNAPSHOT_REQUEST -> "snapshot_request" {
         + ssc_send_snapshot_request
         - ssc_send_snapshot_request_undo
@@ -204,6 +208,8 @@ impl NexusSaga for SagaSnapshotCreate {
         // (DB) Creates a record of the snapshot, referencing both the
         // original disk ID and the destination volume
         builder.append(create_snapshot_record_action());
+        // (DB) Tracks virtual resource provisioning.
+        builder.append(space_account_action());
         // (Sleds) Sends a request for the disk to create a ZFS snapshot
         builder.append(send_snapshot_request_action());
         // (Sleds + DB) Start snapshot downstairs, add an entry in the DB for
@@ -409,10 +415,12 @@ async fn ssc_create_destination_volume_record_undo(
     sagactx: NexusActionContext,
 ) -> Result<(), anyhow::Error> {
     let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
 
     let destination_volume_id =
         sagactx.lookup::<Uuid>("destination_volume_id")?;
-    osagactx.nexus().volume_delete(destination_volume_id).await?;
+    osagactx.nexus().volume_delete(&opctx, destination_volume_id).await?;
 
     Ok(())
 }
@@ -504,6 +512,49 @@ async fn ssc_create_snapshot_record_undo(
     Ok(())
 }
 
+async fn ssc_account_space(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let snapshot_created =
+        sagactx.lookup::<db::model::Snapshot>("created_snapshot")?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_insert_snapshot(
+            &opctx,
+            snapshot_created.id(),
+            params.project_id,
+            snapshot_created.size,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
+
+async fn ssc_account_space_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let snapshot_created =
+        sagactx.lookup::<db::model::Snapshot>("created_snapshot")?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_delete_snapshot(
+            &opctx,
+            snapshot_created.id(),
+            params.project_id,
+            snapshot_created.size,
+        )
+        .await?;
+    Ok(())
+}
+
 async fn ssc_send_snapshot_request(
     sagactx: NexusActionContext,
 ) -> Result<(), ActionError> {
@@ -874,10 +925,12 @@ async fn ssc_create_volume_record_undo(
 ) -> Result<(), anyhow::Error> {
     let log = sagactx.user_data().log();
     let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
 
     let volume_id = sagactx.lookup::<Uuid>("volume_id")?;
 
     info!(log, "deleting volume {}", volume_id);
-    osagactx.nexus().volume_delete(volume_id).await?;
+    osagactx.nexus().volume_delete(&opctx, volume_id).await?;
 
     Ok(())
 }
diff --git a/nexus/src/app/sagas/snapshot_delete.rs b/nexus/src/app/sagas/snapshot_delete.rs
index 516010a58b2..9ff8283ae6f 100644
--- a/nexus/src/app/sagas/snapshot_delete.rs
+++ b/nexus/src/app/sagas/snapshot_delete.rs
@@ -21,9 +21,12 @@ declare_saga_actions! {
     snapshot_delete;
-    DELETE_SNAPSHOT_RECORD -> "no_result" {
+    DELETE_SNAPSHOT_RECORD -> "no_result1" {
         + ssd_delete_snapshot_record
     }
+    SPACE_ACCOUNT -> "no_result2" {
+        + ssd_account_space
+    }
 }
 
 #[derive(Debug)]
@@ -41,12 +44,14 @@ impl NexusSaga for SagaSnapshotDelete {
         mut builder: steno::DagBuilder,
     ) -> Result<steno::Dag, SagaInitError> {
         builder.append(delete_snapshot_record_action());
+        builder.append(space_account_action());
 
         const DELETE_VOLUME_PARAMS: &'static str = "delete_volume_params";
         const DELETE_VOLUME_DESTINATION_PARAMS: &'static str =
             "delete_volume_destination_params";
 
         let volume_delete_params = sagas::volume_delete::Params {
+            serialized_authn: params.serialized_authn.clone(),
             volume_id: params.snapshot.volume_id,
         };
         builder.append(Node::constant(
@@ -60,6 +65,7 @@ impl NexusSaga for SagaSnapshotDelete {
         ));
 
         let volume_delete_params = sagas::volume_delete::Params {
+            serialized_authn: params.serialized_authn.clone(),
             volume_id: params.snapshot.destination_volume_id,
         };
         builder.append(Node::constant(
@@ -93,6 +99,8 @@ impl NexusSaga for SagaSnapshotDelete {
     }
 }
 
+// snapshot delete saga: action implementations
+
 async fn ssd_delete_snapshot_record(
     sagactx: NexusActionContext,
 ) -> Result<(), ActionError> {
@@ -111,3 +119,22 @@ async fn ssd_delete_snapshot_record(
         .map_err(ActionError::action_failed)?;
     Ok(())
 }
+
+async fn ssd_account_space(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+    osagactx
+        .datastore()
+        .virtual_provisioning_collection_delete_snapshot(
+            &opctx,
+            params.authz_snapshot.id(),
+            params.snapshot.project_id,
+            params.snapshot.size,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+    Ok(())
+}
diff --git a/nexus/src/app/sagas/volume_delete.rs b/nexus/src/app/sagas/volume_delete.rs
index 53c51f8facc..df449f67f95 100644
--- a/nexus/src/app/sagas/volume_delete.rs
+++ b/nexus/src/app/sagas/volume_delete.rs
@@ -29,6 +29,7 @@ use super::ActionRegistry;
 use super::NexusActionContext;
 use super::NexusSaga;
 use crate::app::sagas::declare_saga_actions;
+use crate::authn;
 use crate::db::datastore::CrucibleResources;
 use nexus_types::identity::Asset;
 use serde::Deserialize;
@@ -40,9 +41,9 @@ use uuid::Uuid;
 
 #[derive(Debug, Deserialize, Serialize)]
 pub struct Params {
+    pub serialized_authn: authn::saga::Serialized,
     pub volume_id: Uuid,
 }
-
 // volume delete saga: actions
 
 declare_saga_actions! {
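[Review note] `serialized_authn` is now threaded through every one of these saga `Params` (including subsaga params, cloned above) because saga nodes can execute long after the originating HTTP request — even after a Nexus restart — so the authn context must survive serialization. A minimal sketch of the pattern with hypothetical stand-in types:

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

// Stand-in for authn::saga::Serialized.
#[derive(Clone, Serialize, Deserialize)]
struct SerializedAuthn {
    actor_id: Uuid,
}

#[derive(Serialize, Deserialize)]
struct Params {
    serialized_authn: SerializedAuthn,
    volume_id: Uuid,
}

fn main() -> Result<(), serde_json::Error> {
    let params = Params {
        serialized_authn: SerializedAuthn { actor_id: Uuid::new_v4() },
        volume_id: Uuid::new_v4(),
    };
    // Steno records params durably when the saga is created...
    let stored = serde_json::to_string(&params)?;
    // ...and each action later rehydrates them (OpContext::for_saga_action
    // does the equivalent for the real Serialized type).
    let recovered: Params = serde_json::from_str(&stored)?;
    assert_eq!(params.volume_id, recovered.volume_id);
    Ok(())
}
```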
diff --git a/nexus/src/app/sagas/volume_remove_rop.rs b/nexus/src/app/sagas/volume_remove_rop.rs
index b7eb30cc125..c97660dc8ed 100644
--- a/nexus/src/app/sagas/volume_remove_rop.rs
+++ b/nexus/src/app/sagas/volume_remove_rop.rs
@@ -5,6 +5,7 @@
 use super::{ActionRegistry, NexusActionContext, NexusSaga, SagaInitError};
 use crate::app::sagas;
 use crate::app::sagas::declare_saga_actions;
+use crate::authn;
 use crate::db;
 use omicron_common::api::external::Error;
 use serde::Deserialize;
@@ -17,6 +18,7 @@ use uuid::Uuid;
 
 #[derive(Debug, Deserialize, Serialize)]
 pub struct Params {
+    pub serialized_authn: authn::saga::Serialized,
     pub volume_id: Uuid,
 }
 
@@ -65,14 +67,16 @@ impl NexusSaga for SagaVolumeRemoveROP {
     }
 
     fn make_saga_dag(
-        _params: &Self::Params,
+        params: &Self::Params,
         mut builder: steno::DagBuilder,
     ) -> Result<steno::Dag, SagaInitError> {
         // Generate the temp volume ID this saga will use.
         let temp_volume_id = Uuid::new_v4();
 
         // Generate the params for the subsaga called at the end.
-        let subsaga_params =
-            sagas::volume_delete::Params { volume_id: temp_volume_id };
+        let subsaga_params = sagas::volume_delete::Params {
+            serialized_authn: params.serialized_authn.clone(),
+            volume_id: temp_volume_id,
+        };
         let subsaga_dag = {
             let subsaga_builder = steno::DagBuilder::new(steno::SagaName::new(
                 sagas::volume_delete::SagaVolumeDelete::NAME,
diff --git a/nexus/src/app/volume.rs b/nexus/src/app/volume.rs
index 86c01b19c2a..99018083f0d 100644
--- a/nexus/src/app/volume.rs
+++ b/nexus/src/app/volume.rs
@@ -5,6 +5,8 @@
 //! Volumes
 
 use crate::app::sagas;
+use crate::authn;
+use crate::context::OpContext;
 use omicron_common::api::external::DeleteResult;
 use omicron_common::api::external::Error;
 use std::sync::Arc;
@@ -24,9 +26,13 @@ impl super::Nexus {
     /// and the user's query shouldn't wait on those DELETE calls.
     pub async fn volume_delete(
         self: &Arc<Self>,
+        opctx: &OpContext,
         volume_id: Uuid,
     ) -> DeleteResult {
-        let saga_params = sagas::volume_delete::Params { volume_id };
+        let saga_params = sagas::volume_delete::Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(opctx),
+            volume_id,
+        };
 
         // TODO execute this in the background instead, not using the usual SEC
         let saga_outputs = self
@@ -44,9 +50,13 @@ impl super::Nexus {
     /// Start a saga to remove a read only parent from a volume.
     pub async fn volume_remove_read_only_parent(
         self: &Arc<Self>,
+        opctx: &OpContext,
         volume_id: Uuid,
     ) -> DeleteResult {
-        let saga_params = sagas::volume_remove_rop::Params { volume_id };
+        let saga_params = sagas::volume_remove_rop::Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(opctx),
+            volume_id,
+        };
 
         self.execute_saga::<sagas::volume_remove_rop::SagaVolumeRemoveROP>(
             saga_params,
diff --git a/nexus/src/context.rs b/nexus/src/context.rs
index 6f44504e6ae..f82ed523d10 100644
--- a/nexus/src/context.rs
+++ b/nexus/src/context.rs
@@ -173,6 +173,7 @@ impl ServerContext {
             log.new(o!("component" => "nexus")),
             resolver,
             pool,
+            &producer_registry,
             config,
             Arc::clone(&authz),
         )
diff --git a/nexus/src/db/datastore/disk.rs b/nexus/src/db/datastore/disk.rs
index 33a6b257cf1..98a35d8ae6d 100644
--- a/nexus/src/db/datastore/disk.rs
+++ b/nexus/src/db/datastore/disk.rs
@@ -523,7 +523,7 @@ impl DataStore {
     pub async fn project_delete_disk_no_auth(
         &self,
         disk_id: &Uuid,
-    ) -> Result<Uuid, Error> {
+    ) -> Result<db::model::Disk, Error> {
         use db::schema::disk::dsl;
         let pool = self.pool();
         let now = Utc::now();
@@ -558,7 +558,7 @@ impl DataStore {
         })?;
 
         match result.status {
-            UpdateStatus::Updated => Ok(result.found.volume_id),
+            UpdateStatus::Updated => Ok(result.found),
             UpdateStatus::NotUpdatedButExists => {
                 let disk = result.found;
                 let disk_state = disk.state();
@@ -568,7 +568,7 @@ impl DataStore {
                 {
                     // To maintain idempotency, if the disk has already been
                     // destroyed, don't throw an error.
-                    return Ok(disk.volume_id);
+                    return Ok(disk);
                 } else if !ok_to_delete_states.contains(disk_state.state()) {
                     return Err(Error::InvalidRequest {
                         message: format!(
diff --git a/nexus/src/db/datastore/mod.rs b/nexus/src/db/datastore/mod.rs
index 460fabfa958..2c201e1b86d 100644
--- a/nexus/src/db/datastore/mod.rs
+++ b/nexus/src/db/datastore/mod.rs
@@ -26,6 +26,7 @@ use crate::db::{
     self,
     error::{public_error_from_diesel_pool, ErrorHandler},
 };
+use ::oximeter::types::ProducerRegistry;
 use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager};
 use diesel::pg::Pg;
 use diesel::prelude::*;
@@ -66,10 +67,12 @@ mod sled;
 mod snapshot;
 mod ssh_key;
 mod update;
+mod virtual_provisioning_collection;
 mod volume;
 mod vpc;
 mod zpool;
 
+pub use virtual_provisioning_collection::StorageType;
 pub use volume::CrucibleResources;
 
 // Number of unique datasets required to back a region.
@@ -103,6 +106,8 @@ impl<U, T> RunnableQuery<U> for T where
 pub struct DataStore {
     pool: Arc<Pool>,
+    virtual_provisioning_collection_producer:
+        crate::app::provisioning::Producer,
 }
 
 // The majority of `DataStore`'s methods live in our submodules as a concession
@@ -110,7 +115,19 @@ pub struct DataStore {
 // recompilation of that query's module instead of all queries on `DataStore`.
 impl DataStore {
     pub fn new(pool: Arc<Pool>) -> Self {
-        DataStore { pool }
+        DataStore {
+            pool,
+            virtual_provisioning_collection_producer:
+                crate::app::provisioning::Producer::new(),
+        }
+    }
+
+    pub fn register_producers(&self, registry: &ProducerRegistry) {
+        registry
+            .register_producer(
+                self.virtual_provisioning_collection_producer.clone(),
+            )
+            .unwrap();
     }
 
     // TODO-security This should be deprecated in favor of pool_authorized(),
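[Review note] `register_producers` hooks the datastore's in-memory sample buffer into the oximeter registry that Nexus already exposes for scraping. A minimal sketch of that flow with a trivial producer standing in for `crate::app::provisioning::Producer`:

```rust
use oximeter::types::{ProducerRegistry, Sample};
use oximeter::{MetricsError, Producer};

#[derive(Debug, Clone)]
struct NoopProducer;

impl Producer for NoopProducer {
    fn produce(
        &mut self,
    ) -> Result<Box<dyn Iterator<Item = Sample> + 'static>, MetricsError> {
        // A real producer drains its pending samples here.
        Ok(Box::new(std::iter::empty()))
    }
}

fn main() {
    // Nexus builds one registry and hands it to DataStore::register_producers,
    // so collection updates are scraped alongside other Nexus metrics.
    let registry = ProducerRegistry::new();
    registry.register_producer(NoopProducer).unwrap();
}
```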
diff --git a/nexus/src/db/datastore/organization.rs b/nexus/src/db/datastore/organization.rs
index e237aa63819..74c93c00b2a 100644
--- a/nexus/src/db/datastore/organization.rs
+++ b/nexus/src/db/datastore/organization.rs
@@ -6,6 +6,7 @@
 
 use super::DataStore;
 use crate::authz;
+use crate::authz::ApiResource;
 use crate::context::OpContext;
 use crate::db;
 use crate::db::collection_insert::AsyncInsertError;
@@ -13,14 +14,17 @@ use crate::db::collection_insert::DatastoreCollection;
 use crate::db::error::diesel_pool_result_optional;
 use crate::db::error::public_error_from_diesel_pool;
 use crate::db::error::ErrorHandler;
+use crate::db::error::TransactionError;
 use crate::db::identity::Resource;
+use crate::db::model::CollectionTypeProvisioned;
 use crate::db::model::Name;
 use crate::db::model::Organization;
 use crate::db::model::OrganizationUpdate;
 use crate::db::model::Silo;
+use crate::db::model::VirtualProvisioningCollection;
 use crate::db::pagination::paginated;
 use crate::external_api::params;
-use async_bb8_diesel::AsyncRunQueryDsl;
+use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl, PoolError};
 use chrono::Utc;
 use diesel::prelude::*;
 use omicron_common::api::external::CreateResult;
@@ -51,27 +55,48 @@ impl DataStore {
         let organization = Organization::new(organization.clone(), silo_id);
         let name = organization.name().as_str().to_string();
 
-        Silo::insert_resource(
-            silo_id,
-            diesel::insert_into(dsl::organization).values(organization),
-        )
-        .insert_and_get_result_async(self.pool_authorized(opctx).await?)
-        .await
-        .map_err(|e| match e {
-            AsyncInsertError::CollectionNotFound => Error::InternalError {
-                internal_message: format!(
-                    "attempting to create an \
-                    organization under non-existent silo {}",
-                    silo_id
-                ),
-            },
-            AsyncInsertError::DatabaseError(e) => {
-                public_error_from_diesel_pool(
-                    e,
-                    ErrorHandler::Conflict(ResourceType::Organization, &name),
-                )
-            }
-        })
+        self.pool_authorized(opctx)
+            .await?
+            .transaction_async(|conn| async move {
+                let org = Silo::insert_resource(
+                    silo_id,
+                    diesel::insert_into(dsl::organization).values(organization),
+                )
+                .insert_and_get_result_async(&conn)
+                .await
+                .map_err(|e| match e {
+                    AsyncInsertError::CollectionNotFound => {
+                        authz_silo.not_found()
+                    }
+                    AsyncInsertError::DatabaseError(e) => {
+                        public_error_from_diesel_pool(
+                            e,
+                            ErrorHandler::Conflict(
+                                ResourceType::Organization,
+                                &name,
+                            ),
+                        )
+                    }
+                })?;
+
+                self.virtual_provisioning_collection_create_on_connection(
+                    &conn,
+                    VirtualProvisioningCollection::new(
+                        org.id(),
+                        CollectionTypeProvisioned::Organization,
+                    ),
+                )
+                .await?;
+
+                Ok(org)
+            })
+            .await
+            .map_err(|e| match e {
+                TransactionError::CustomError(e) => e,
+                TransactionError::Pool(e) => {
+                    public_error_from_diesel_pool(e, ErrorHandler::Server)
+                }
+            })
     }
 
     /// Delete a organization
@@ -105,27 +130,48 @@ impl DataStore {
         }
 
         let now = Utc::now();
-        let updated_rows = diesel::update(dsl::organization)
-            .filter(dsl::time_deleted.is_null())
-            .filter(dsl::id.eq(authz_org.id()))
-            .filter(dsl::rcgen.eq(db_org.rcgen))
-            .set(dsl::time_deleted.eq(now))
-            .execute_async(self.pool_authorized(opctx).await?)
-            .await
-            .map_err(|e| {
-                public_error_from_diesel_pool(
-                    e,
-                    ErrorHandler::NotFoundByResource(authz_org),
+
+        type TxnError = TransactionError<Error>;
+        self.pool_authorized(opctx)
+            .await?
+            .transaction_async(|conn| async move {
+                let updated_rows = diesel::update(dsl::organization)
+                    .filter(dsl::time_deleted.is_null())
+                    .filter(dsl::id.eq(authz_org.id()))
+                    .filter(dsl::rcgen.eq(db_org.rcgen))
+                    .set(dsl::time_deleted.eq(now))
+                    .execute_async(&conn)
+                    .await
+                    .map_err(|e| {
+                        public_error_from_diesel_pool(
+                            PoolError::from(e),
+                            ErrorHandler::NotFoundByResource(authz_org),
+                        )
+                    })?;
+
+                if updated_rows == 0 {
+                    return Err(TxnError::CustomError(Error::InvalidRequest {
+                        message:
+                            "deletion failed due to concurrent modification"
+                                .to_string(),
+                    }));
+                }
+
+                self.virtual_provisioning_collection_delete_on_connection(
+                    &conn,
+                    authz_org.id(),
                 )
-            })?;
+                .await?;
 
-        if updated_rows == 0 {
-            return Err(Error::InvalidRequest {
-                message: "deletion failed due to concurrent modification"
-                    .to_string(),
-            });
-        }
-        Ok(())
+                Ok(())
+            })
+            .await
+            .map_err(|e| match e {
+                TxnError::CustomError(e) => e,
+                TxnError::Pool(e) => {
+                    public_error_from_diesel_pool(e, ErrorHandler::Server)
+                }
+            })
     }
 
     pub async fn organizations_list_by_id(
diff --git a/nexus/src/db/datastore/project.rs b/nexus/src/db/datastore/project.rs
index aec73001aed..707edaa4e1b 100644
--- a/nexus/src/db/datastore/project.rs
+++ b/nexus/src/db/datastore/project.rs
@@ -13,13 +13,16 @@ use crate::db::collection_insert::DatastoreCollection;
 use crate::db::error::diesel_pool_result_optional;
 use crate::db::error::public_error_from_diesel_pool;
 use crate::db::error::ErrorHandler;
+use crate::db::error::TransactionError;
 use crate::db::identity::Resource;
+use crate::db::model::CollectionTypeProvisioned;
 use crate::db::model::Name;
 use crate::db::model::Organization;
 use crate::db::model::Project;
 use crate::db::model::ProjectUpdate;
+use crate::db::model::VirtualProvisioningCollection;
 use crate::db::pagination::paginated;
-use async_bb8_diesel::AsyncRunQueryDsl;
+use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl, PoolError};
 use chrono::Utc;
 use diesel::prelude::*;
 use omicron_common::api::external::CreateResult;
@@ -101,24 +104,52 @@ impl DataStore {
         let name = project.name().as_str().to_string();
         let organization_id = project.organization_id;
 
-        let db_project = Organization::insert_resource(
-            organization_id,
-            diesel::insert_into(dsl::project).values(project),
-        )
-        .insert_and_get_result_async(self.pool_authorized(opctx).await?)
-        .await
-        .map_err(|e| match e {
-            AsyncInsertError::CollectionNotFound => Error::ObjectNotFound {
-                type_name: ResourceType::Organization,
-                lookup_type: LookupType::ById(organization_id),
-            },
-            AsyncInsertError::DatabaseError(e) => {
-                public_error_from_diesel_pool(
-                    e,
-                    ErrorHandler::Conflict(ResourceType::Project, &name),
+        let db_project = self
+            .pool_authorized(opctx)
+            .await?
+            .transaction_async(|conn| async move {
+                let project = Organization::insert_resource(
+                    organization_id,
+                    diesel::insert_into(dsl::project).values(project),
                 )
-            }
-        })?;
+                .insert_and_get_result_async(&conn)
+                .await
+                .map_err(|e| match e {
+                    AsyncInsertError::CollectionNotFound => {
+                        Error::ObjectNotFound {
+                            type_name: ResourceType::Organization,
+                            lookup_type: LookupType::ById(organization_id),
+                        }
+                    }
+                    AsyncInsertError::DatabaseError(e) => {
+                        public_error_from_diesel_pool(
+                            e,
+                            ErrorHandler::Conflict(
+                                ResourceType::Project,
+                                &name,
+                            ),
+                        )
+                    }
+                })?;
+
+                // Create resource provisioning for the project.
+                self.virtual_provisioning_collection_create_on_connection(
+                    &conn,
+                    VirtualProvisioningCollection::new(
+                        project.id(),
+                        CollectionTypeProvisioned::Project,
+                    ),
+                )
+                .await?;
+                Ok(project)
+            })
+            .await
+            .map_err(|e| match e {
+                TransactionError::CustomError(e) => e,
+                TransactionError::Pool(e) => {
+                    public_error_from_diesel_pool(e, ErrorHandler::Server)
+                }
+            })?;
 
         Ok((
             authz::Project::new(
@@ -154,28 +185,48 @@ impl DataStore {
 
         use db::schema::project::dsl;
 
-        let now = Utc::now();
-        let updated_rows = diesel::update(dsl::project)
-            .filter(dsl::time_deleted.is_null())
-            .filter(dsl::id.eq(authz_project.id()))
-            .filter(dsl::rcgen.eq(db_project.rcgen))
-            .set(dsl::time_deleted.eq(now))
-            .returning(Project::as_returning())
-            .execute_async(self.pool_authorized(opctx).await?)
-            .await
-            .map_err(|e| {
-                public_error_from_diesel_pool(
-                    e,
-                    ErrorHandler::NotFoundByResource(authz_project),
+        type TxnError = TransactionError<Error>;
+        self.pool_authorized(opctx)
+            .await?
+            .transaction_async(|conn| async move {
+                let now = Utc::now();
+                let updated_rows = diesel::update(dsl::project)
+                    .filter(dsl::time_deleted.is_null())
+                    .filter(dsl::id.eq(authz_project.id()))
+                    .filter(dsl::rcgen.eq(db_project.rcgen))
+                    .set(dsl::time_deleted.eq(now))
+                    .returning(Project::as_returning())
+                    .execute_async(&conn)
+                    .await
+                    .map_err(|e| {
+                        public_error_from_diesel_pool(
+                            PoolError::from(e),
+                            ErrorHandler::NotFoundByResource(authz_project),
+                        )
+                    })?;
+
+                if updated_rows == 0 {
+                    return Err(TxnError::CustomError(Error::InvalidRequest {
+                        message:
+                            "deletion failed due to concurrent modification"
+                                .to_string(),
+                    }));
+                }
+
+                self.virtual_provisioning_collection_delete_on_connection(
+                    &conn,
+                    db_project.id(),
                 )
+                .await?;
+                Ok(())
+            })
+            .await
+            .map_err(|e| match e {
+                TxnError::CustomError(e) => e,
+                TxnError::Pool(e) => {
+                    public_error_from_diesel_pool(e, ErrorHandler::Server)
+                }
             })?;
-
-        if updated_rows == 0 {
-            return Err(Error::InvalidRequest {
-                message: "deletion failed due to concurrent modification"
-                    .to_string(),
-            });
-        }
         Ok(())
     }
diff --git a/nexus/src/db/datastore/silo.rs b/nexus/src/db/datastore/silo.rs
index f1e85033f88..165ac5ef316 100644
--- a/nexus/src/db/datastore/silo.rs
+++ b/nexus/src/db/datastore/silo.rs
@@ -15,13 +15,16 @@ use crate::db::error::ErrorHandler;
 use crate::db::error::TransactionError;
 use crate::db::fixed_data::silo::DEFAULT_SILO;
 use crate::db::identity::Resource;
+use crate::db::model::CollectionTypeProvisioned;
 use crate::db::model::Name;
 use crate::db::model::Silo;
+use crate::db::model::VirtualProvisioningCollection;
 use crate::db::pagination::paginated;
 use crate::external_api::params;
 use crate::external_api::shared;
 use async_bb8_diesel::AsyncConnection;
 use async_bb8_diesel::AsyncRunQueryDsl;
+use async_bb8_diesel::PoolError;
 use chrono::Utc;
 use diesel::prelude::*;
 use omicron_common::api::external::CreateResult;
@@ -53,10 +56,20 @@ impl DataStore {
                 public_error_from_diesel_pool(e, ErrorHandler::Server)
             })?;
         info!(opctx.log, "created {} built-in silos", count);
+
+        self.virtual_provisioning_collection_create(
+            opctx,
+            VirtualProvisioningCollection::new(
+                DEFAULT_SILO.id(),
+                CollectionTypeProvisioned::Silo,
+            ),
+        )
+        .await?;
+
         Ok(())
     }
 
-    pub async fn silo_create_query(
+    async fn silo_create_query(
         opctx: &OpContext,
         silo: Silo,
     ) -> Result<impl RunnableQuery<Silo>, Error> {
@@ -77,7 +90,7 @@ impl DataStore {
         let silo_id = Uuid::new_v4();
         let silo_group_id = Uuid::new_v4();
 
-        let silo_create_query = DataStore::silo_create_query(
+        let silo_create_query = Self::silo_create_query(
             opctx,
             db::model::Silo::new_with_id(silo_id, new_silo_params.clone()),
         )
@@ -134,6 +147,14 @@ impl DataStore {
             .await?
             .transaction_async(|conn| async move {
                 let silo = silo_create_query.get_result_async(&conn).await?;
+                self.virtual_provisioning_collection_create_on_connection(
+                    &conn,
+                    VirtualProvisioningCollection::new(
+                        silo.id(),
+                        CollectionTypeProvisioned::Silo,
+                    ),
+                )
+                .await?;
 
                 if let Some(query) = silo_admin_group_ensure_query {
                     query.get_result_async(&conn).await?;
@@ -229,28 +250,48 @@ impl DataStore {
         }
 
         let now = Utc::now();
-        let updated_rows = diesel::update(silo::dsl::silo)
-            .filter(silo::dsl::time_deleted.is_null())
-            .filter(silo::dsl::id.eq(id))
-            .filter(silo::dsl::rcgen.eq(rcgen))
-            .set(silo::dsl::time_deleted.eq(now))
-            .execute_async(self.pool_authorized(opctx).await?)
-            .await
-            .map_err(|e| {
-                public_error_from_diesel_pool(
-                    e,
-                    ErrorHandler::NotFoundByResource(authz_silo),
-                )
-            })?;
 
-        if updated_rows == 0 {
-            return Err(Error::InvalidRequest {
-                message: "silo deletion failed due to concurrent modification"
-                    .to_string(),
-            });
-        }
+        type TxnError = TransactionError<Error>;
+        self.pool_authorized(opctx)
+            .await?
+            .transaction_async(|conn| async move {
+                let updated_rows = diesel::update(silo::dsl::silo)
+                    .filter(silo::dsl::time_deleted.is_null())
+                    .filter(silo::dsl::id.eq(id))
+                    .filter(silo::dsl::rcgen.eq(rcgen))
+                    .set(silo::dsl::time_deleted.eq(now))
+                    .execute_async(&conn)
+                    .await
+                    .map_err(|e| {
+                        public_error_from_diesel_pool(
+                            PoolError::from(e),
+                            ErrorHandler::NotFoundByResource(authz_silo),
+                        )
+                    })?;
+
+                if updated_rows == 0 {
+                    return Err(TxnError::CustomError(Error::InvalidRequest {
+                        message: "silo deletion failed due to concurrent modification"
+                            .to_string(),
+                    }));
+                }
+
+                info!(opctx.log, "deleted silo {}", id);
 
-        info!(opctx.log, "deleted silo {}", id);
+                self.virtual_provisioning_collection_delete_on_connection(
+                    &conn,
+                    id,
+                ).await?;
+
+                Ok(())
+            })
+            .await
+            .map_err(|e| match e {
+                TxnError::CustomError(e) => e,
+                TxnError::Pool(e) => {
+                    public_error_from_diesel_pool(e, ErrorHandler::Server)
+                }
+            })?;
 
         // TODO-correctness This needs to happen in a saga or some other
         // mechanism that ensures it happens even if we crash at this point.
diff --git a/nexus/src/db/datastore/virtual_provisioning_collection.rs b/nexus/src/db/datastore/virtual_provisioning_collection.rs
new file mode 100644
index 00000000000..da64f76c6f1
--- /dev/null
+++ b/nexus/src/db/datastore/virtual_provisioning_collection.rs
@@ -0,0 +1,323 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! [`DataStore`] methods on [`VirtualProvisioningCollection`]s.
+
+use super::DataStore;
+use crate::context::OpContext;
+use crate::db;
+use crate::db::error::public_error_from_diesel_pool;
+use crate::db::error::ErrorHandler;
+use crate::db::model::ByteCount;
+use crate::db::model::VirtualProvisioningCollection;
+use crate::db::pool::DbConnection;
+use crate::db::queries::virtual_provisioning_collection_update::VirtualProvisioningCollectionUpdate;
+use async_bb8_diesel::{AsyncRunQueryDsl, PoolError};
+use diesel::prelude::*;
+use omicron_common::api::external::{DeleteResult, Error};
+use uuid::Uuid;
+
+/// The types of resources which can consume storage space.
+pub enum StorageType {
+    Disk,
+    Snapshot,
+}
+
+impl From<StorageType> for crate::db::model::ResourceTypeProvisioned {
+    fn from(
+        storage_type: StorageType,
+    ) -> crate::db::model::ResourceTypeProvisioned {
+        match storage_type {
+            StorageType::Disk => {
+                crate::db::model::ResourceTypeProvisioned::Disk
+            }
+            StorageType::Snapshot => {
+                crate::db::model::ResourceTypeProvisioned::Snapshot
+            }
+        }
+    }
+}
+
+impl DataStore {
+    /// Create a [`VirtualProvisioningCollection`] object.
+    pub async fn virtual_provisioning_collection_create(
+        &self,
+        opctx: &OpContext,
+        virtual_provisioning_collection: VirtualProvisioningCollection,
+    ) -> Result<Vec<VirtualProvisioningCollection>, Error> {
+        let pool = self.pool_authorized(opctx).await?;
+        self.virtual_provisioning_collection_create_on_connection(
+            pool,
+            virtual_provisioning_collection,
+        )
+        .await
+    }
+
+    pub(crate) async fn virtual_provisioning_collection_create_on_connection<
+        ConnErr,
+    >(
+        &self,
+        conn: &(impl async_bb8_diesel::AsyncConnection<DbConnection, ConnErr>
+              + Sync),
+        virtual_provisioning_collection: VirtualProvisioningCollection,
+    ) -> Result<Vec<VirtualProvisioningCollection>, Error>
+    where
+        ConnErr: From<diesel::result::Error> + Send + 'static,
+        PoolError: From<ConnErr>,
+    {
+        use db::schema::virtual_provisioning_collection::dsl;
+
+        let provisions: Vec<VirtualProvisioningCollection> =
+            diesel::insert_into(dsl::virtual_provisioning_collection)
+                .values(virtual_provisioning_collection)
+                .on_conflict_do_nothing()
+                .get_results_async(conn)
+                .await
+                .map_err(|e| {
+                    public_error_from_diesel_pool(
+                        PoolError::from(e),
+                        ErrorHandler::Server,
+                    )
+                })?;
+        self.virtual_provisioning_collection_producer
+            .append_all_metrics(&provisions);
+        Ok(provisions)
+    }
+
+    pub async fn virtual_provisioning_collection_get(
+        &self,
+        opctx: &OpContext,
+        id: Uuid,
+    ) -> Result<VirtualProvisioningCollection, Error> {
+        use db::schema::virtual_provisioning_collection::dsl;
+
+        let virtual_provisioning_collection =
+            dsl::virtual_provisioning_collection
+                .find(id)
+                .select(VirtualProvisioningCollection::as_select())
+                .get_result_async(self.pool_authorized(opctx).await?)
+                .await
+                .map_err(|e| {
+                    public_error_from_diesel_pool(e, ErrorHandler::Server)
+                })?;
+        Ok(virtual_provisioning_collection)
+    }
+
+    /// Delete a [`VirtualProvisioningCollection`] object.
+    pub async fn virtual_provisioning_collection_delete(
+        &self,
+        opctx: &OpContext,
+        id: Uuid,
+    ) -> DeleteResult {
+        let pool = self.pool_authorized(opctx).await?;
+        self.virtual_provisioning_collection_delete_on_connection(pool, id)
+            .await
+    }
+
+    /// Delete a [`VirtualProvisioningCollection`] object.
+    pub(crate) async fn virtual_provisioning_collection_delete_on_connection<
+        ConnErr,
+    >(
+        &self,
+        conn: &(impl async_bb8_diesel::AsyncConnection<DbConnection, ConnErr>
+              + Sync),
+        id: Uuid,
+    ) -> DeleteResult
+    where
+        ConnErr: From<diesel::result::Error> + Send + 'static,
+        PoolError: From<ConnErr>,
+    {
+        use db::schema::virtual_provisioning_collection::dsl;
+
+        // NOTE: We don't really need to extract the value we're deleting from
+        // the DB, but by doing so, we can validate that we haven't
+        // miscalculated our usage accounting.
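+        //
+        // For illustration, the `is_empty` assertion below amounts to
+        // checking that every counter in the returned row is zero (sketch;
+        // the real check lives on the model type):
+        //
+        //   collection.cpus_provisioned == 0
+        //       && collection.ram_provisioned.to_bytes() == 0
+        //       && collection.virtual_disk_bytes_provisioned.to_bytes() == 0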
+ let collection = diesel::delete(dsl::virtual_provisioning_collection) + .filter(dsl::id.eq(id)) + .returning(VirtualProvisioningCollection::as_select()) + .get_result_async(conn) + .await + .map_err(|e| { + public_error_from_diesel_pool( + PoolError::from(e), + ErrorHandler::Server, + ) + })?; + assert!( + collection.is_empty(), + "Collection deleted while non-empty: {collection:?}" + ); + Ok(()) + } + + // TODO: These could 100% act on model types: + // - Would help with identifying UUID + // - Would help with project ID lookup + // - Would help with calculating resource usage + // + // I think we just need to validate that the model exists when we make these + // calls? Maybe it could be an optional helper? + + pub async fn virtual_provisioning_collection_insert_disk( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + disk_byte_diff: ByteCount, + ) -> Result, Error> { + self.virtual_provisioning_collection_insert_storage( + opctx, + id, + project_id, + disk_byte_diff, + StorageType::Disk, + ) + .await + } + + pub async fn virtual_provisioning_collection_insert_snapshot( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + disk_byte_diff: ByteCount, + ) -> Result, Error> { + self.virtual_provisioning_collection_insert_storage( + opctx, + id, + project_id, + disk_byte_diff, + StorageType::Snapshot, + ) + .await + } + + /// Transitively updates all provisioned disk provisions from project -> fleet. + async fn virtual_provisioning_collection_insert_storage( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + disk_byte_diff: ByteCount, + storage_type: StorageType, + ) -> Result, Error> { + let provisions = + VirtualProvisioningCollectionUpdate::new_insert_storage( + id, + disk_byte_diff, + project_id, + storage_type, + ) + .get_results_async(self.pool_authorized(opctx).await?) + .await + .map_err(|e| { + public_error_from_diesel_pool(e, ErrorHandler::Server) + })?; + self.virtual_provisioning_collection_producer + .append_disk_metrics(&provisions); + Ok(provisions) + } + + pub async fn virtual_provisioning_collection_delete_disk( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + disk_byte_diff: ByteCount, + ) -> Result, Error> { + self.virtual_provisioning_collection_delete_storage( + opctx, + id, + project_id, + disk_byte_diff, + ) + .await + } + + pub async fn virtual_provisioning_collection_delete_snapshot( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + disk_byte_diff: ByteCount, + ) -> Result, Error> { + self.virtual_provisioning_collection_delete_storage( + opctx, + id, + project_id, + disk_byte_diff, + ) + .await + } + + // Transitively updates all provisioned disk provisions from project -> fleet. + async fn virtual_provisioning_collection_delete_storage( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + disk_byte_diff: ByteCount, + ) -> Result, Error> { + let provisions = + VirtualProvisioningCollectionUpdate::new_delete_storage( + id, + disk_byte_diff, + project_id, + ) + .get_results_async(self.pool_authorized(opctx).await?) + .await + .map_err(|e| { + public_error_from_diesel_pool(e, ErrorHandler::Server) + })?; + self.virtual_provisioning_collection_producer + .append_disk_metrics(&provisions); + Ok(provisions) + } + + /// Transitively updates all CPU/RAM provisions from project -> fleet. 
+ pub async fn virtual_provisioning_collection_insert_instance( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + cpus_diff: i64, + ram_diff: ByteCount, + ) -> Result, Error> { + let provisions = + VirtualProvisioningCollectionUpdate::new_insert_instance( + id, cpus_diff, ram_diff, project_id, + ) + .get_results_async(self.pool_authorized(opctx).await?) + .await + .map_err(|e| { + public_error_from_diesel_pool(e, ErrorHandler::Server) + })?; + self.virtual_provisioning_collection_producer + .append_cpu_metrics(&provisions); + Ok(provisions) + } + + /// Transitively updates all CPU/RAM provisions from project -> fleet. + pub async fn virtual_provisioning_collection_delete_instance( + &self, + opctx: &OpContext, + id: Uuid, + project_id: Uuid, + cpus_diff: i64, + ram_diff: ByteCount, + ) -> Result, Error> { + let provisions = + VirtualProvisioningCollectionUpdate::new_delete_instance( + id, cpus_diff, ram_diff, project_id, + ) + .get_results_async(self.pool_authorized(opctx).await?) + .await + .map_err(|e| { + public_error_from_diesel_pool(e, ErrorHandler::Server) + })?; + self.virtual_provisioning_collection_producer + .append_cpu_metrics(&provisions); + Ok(provisions) + } +} diff --git a/nexus/src/db/fixed_data/silo.rs b/nexus/src/db/fixed_data/silo.rs index 3c910177291..dea6477a1be 100644 --- a/nexus/src/db/fixed_data/silo.rs +++ b/nexus/src/db/fixed_data/silo.rs @@ -21,6 +21,6 @@ lazy_static! { discoverable: false, identity_mode: shared::SiloIdentityMode::LocalOnly, admin_group_name: None, - } + }, ); } diff --git a/nexus/src/db/mod.rs b/nexus/src/db/mod.rs index 26eee7eec0b..d76d5dac3d7 100644 --- a/nexus/src/db/mod.rs +++ b/nexus/src/db/mod.rs @@ -37,9 +37,13 @@ pub use nexus_db_model as model; use nexus_db_model::saga_types; pub use nexus_db_model::schema; +#[cfg(test)] +pub use crate::db::error::TransactionError; pub use config::Config; pub use datastore::DataStore; pub use pool::Pool; +#[cfg(test)] +pub use pool::ALLOW_FULL_TABLE_SCAN_SQL; pub use saga_recovery::{recover, RecoveryTask}; pub use saga_types::SecId; pub use sec_store::CockroachDbSecStore; diff --git a/nexus/src/db/pool.rs b/nexus/src/db/pool.rs index 335b5bafb73..8132ecee29f 100644 --- a/nexus/src/db/pool.rs +++ b/nexus/src/db/pool.rs @@ -53,6 +53,7 @@ impl Pool { Pool { pool } } + #[cfg(test)] pub fn new_failfast(db_config: &DbConfig) -> Self { let manager = ConnectionManager::::new(&db_config.url.url()); @@ -69,6 +70,25 @@ impl Pool { } } +const DISALLOW_FULL_TABLE_SCAN_SQL: &str = + "set disallow_full_table_scans = on; set large_full_scan_rows = 0;"; + +/// SQL used to enable full table scans for the duration of the current +/// transaction. +/// +/// We normally disallow table scans in effort to identify scalability issues +/// during development. But it's preferable for some ad hoc test-only queries to +/// do table scans (rather than add indexes that are only used for the test +/// suite). +/// +/// This SQL appears to have no effect when used outside of a transaction. +/// That's intentional. We do not want to use `SET` (rather than `SET LOCAL`) +/// here because that would change the behavior for any code that happens to use +/// the same pooled connection after this SQL gets run. 
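+///
+/// A test opts in inside a transaction, along these lines (sketch; the
+/// helpers shown exist in this codebase, but the wiring is illustrative):
+///
+/// ```text
+/// conn.transaction_async(|conn| async move {
+///     conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?;
+///     // ... run the ad hoc, full-scan query here ...
+///     Ok::<_, TransactionError<Error>>(())
+/// }).await
+/// ```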
+#[cfg(test)] +pub const ALLOW_FULL_TABLE_SCAN_SQL: &str = + "set local disallow_full_table_scans = off; set local large_full_scan_rows = 1000;"; + #[derive(Debug)] struct DisallowFullTableScans {} #[async_trait] @@ -79,10 +99,6 @@ impl CustomizeConnection, ConnectionError> &self, conn: &mut Connection, ) -> Result<(), ConnectionError> { - conn.batch_execute_async( - "set disallow_full_table_scans = on;\ - set large_full_scan_rows = 0;", - ) - .await + conn.batch_execute_async(DISALLOW_FULL_TABLE_SCAN_SQL).await } } diff --git a/nexus/src/db/queries/mod.rs b/nexus/src/db/queries/mod.rs index 077d542dbbb..d413dce6aad 100644 --- a/nexus/src/db/queries/mod.rs +++ b/nexus/src/db/queries/mod.rs @@ -11,5 +11,6 @@ pub mod ip_pool; mod next_item; pub mod network_interface; pub mod region_allocation; +pub mod virtual_provisioning_collection_update; pub mod vpc; pub mod vpc_subnet; diff --git a/nexus/src/db/queries/virtual_provisioning_collection_update.rs b/nexus/src/db/queries/virtual_provisioning_collection_update.rs new file mode 100644 index 00000000000..cd9c5fd4d63 --- /dev/null +++ b/nexus/src/db/queries/virtual_provisioning_collection_update.rs @@ -0,0 +1,456 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Implementation of queries for updating resource provisioning info. + +use crate::db::alias::ExpressionAlias; +use crate::db::model::ByteCount; +use crate::db::model::ResourceTypeProvisioned; +use crate::db::model::VirtualProvisioningCollection; +use crate::db::model::VirtualProvisioningResource; +use crate::db::pool::DbConnection; +use crate::db::schema::virtual_provisioning_collection; +use crate::db::schema::virtual_provisioning_resource; +use crate::db::subquery::{AsQuerySource, Cte, CteBuilder, CteQuery}; +use db_macros::Subquery; +use diesel::pg::Pg; +use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId}; +use diesel::{ + sql_types, CombineDsl, ExpressionMethods, IntoSql, + NullableExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper, +}; +use nexus_db_model::queries::virtual_provisioning_collection_update::{ + all_collections, do_update, parent_org, parent_silo, +}; + +#[derive(Subquery, QueryId)] +#[subquery(name = parent_org)] +struct ParentOrg { + query: Box>, +} + +impl ParentOrg { + fn new(project_id: uuid::Uuid) -> Self { + use crate::db::schema::project::dsl; + Self { + query: Box::new( + dsl::project.filter(dsl::id.eq(project_id)).select(( + ExpressionAlias::new::( + dsl::organization_id, + ), + )), + ), + } + } +} + +#[derive(Subquery, QueryId)] +#[subquery(name = parent_silo)] +struct ParentSilo { + query: Box>, +} + +impl ParentSilo { + fn new(parent_org: &ParentOrg) -> Self { + use crate::db::schema::organization::dsl; + Self { + query: Box::new( + dsl::organization + .filter(dsl::id.eq_any( + parent_org.query_source().select(parent_org::id), + )) + .select((ExpressionAlias::new::( + dsl::silo_id, + ),)), + ), + } + } +} + +#[derive(Subquery, QueryId)] +#[subquery(name = all_collections)] +struct AllCollections { + query: Box>, +} + +impl AllCollections { + fn new( + project_id: uuid::Uuid, + parent_org: &ParentOrg, + parent_silo: &ParentSilo, + fleet_id: uuid::Uuid, + ) -> Self { + let project_id = project_id.into_sql::(); + let fleet_id = fleet_id.into_sql::(); + Self { + query: Box::new( + diesel::select((ExpressionAlias::new::< + all_collections::dsl::id, + >(project_id),)) + 
.union(parent_org.query_source().select((
+                    ExpressionAlias::new::<all_collections::dsl::id>(
+                        parent_org::id,
+                    ),
+                )))
+                .union(parent_silo.query_source().select((
+                    ExpressionAlias::new::<all_collections::dsl::id>(
+                        parent_silo::id,
+                    ),
+                )))
+                .union(diesel::select((ExpressionAlias::new::<
+                    all_collections::dsl::id,
+                >(fleet_id),))),
+            ),
+        }
+    }
+}
+
+#[derive(Subquery, QueryId)]
+#[subquery(name = do_update)]
+struct DoUpdate {
+    query: Box<dyn CteQuery<SqlType = do_update::SqlType>>,
+}
+
+impl DoUpdate {
+    fn new_for_insert(id: uuid::Uuid) -> Self {
+        use virtual_provisioning_resource::dsl;
+
+        let not_allocated = dsl::virtual_provisioning_resource
+            .find(id)
+            .count()
+            .single_value()
+            .assume_not_null()
+            .eq(0);
+
+        Self {
+            query: Box::new(diesel::select((ExpressionAlias::new::<
+                do_update::update,
+            >(not_allocated),))),
+        }
+    }
+
+    fn new_for_delete(id: uuid::Uuid) -> Self {
+        use virtual_provisioning_resource::dsl;
+
+        let already_allocated = dsl::virtual_provisioning_resource
+            .find(id)
+            .count()
+            .single_value()
+            .assume_not_null()
+            .eq(1);
+
+        Self {
+            query: Box::new(diesel::select((ExpressionAlias::new::<
+                do_update::update,
+            >(already_allocated),))),
+        }
+    }
+}
+
+#[derive(Subquery, QueryId)]
+#[subquery(name = virtual_provisioning_collection)]
+struct UpdatedProvisions {
+    query:
+        Box<dyn CteQuery<SqlType = virtual_provisioning_collection::SqlType>>,
+}
+
+impl UpdatedProvisions {
+    fn new<V>(
+        all_collections: &AllCollections,
+        do_update: &DoUpdate,
+        values: V,
+    ) -> Self
+    where
+        V: diesel::AsChangeset<Target = virtual_provisioning_collection::table>,
+        <V as diesel::AsChangeset>::Changeset:
+            QueryFragment<Pg> + Send + 'static,
+    {
+        use virtual_provisioning_collection::dsl;
+
+        Self {
+            query: Box::new(
+                diesel::update(dsl::virtual_provisioning_collection)
+                    .set(values)
+                    .filter(
+                        dsl::id.eq_any(
+                            all_collections
+                                .query_source()
+                                .select(all_collections::id),
+                        ),
+                    )
+                    .filter(
+                        do_update
+                            .query_source()
+                            .select(do_update::update)
+                            .single_value()
+                            .assume_not_null(),
+                    )
+                    .returning(virtual_provisioning_collection::all_columns),
+            ),
+        }
+    }
+}
+
+// This structure wraps a query, such that it can be used within a CTE.
+//
+// It generates a name that can be used by the "CteBuilder", but does not
+// implement "AsQuerySource". This basically means:
+// - It can be used to add data-modifying statements to the CTE
+// - The result of the query cannot be referenced by subsequent queries
+//
+// NOTE: The name for each CTE arm should be unique, so this shouldn't be used
+// multiple times within a single CTE. This restriction could be removed by
+// generating unique identifiers.
+struct UnreferenceableSubquery<Q>(Q);
+
+impl<Q> QueryFragment<Pg> for UnreferenceableSubquery<Q>
+where
+    Q: QueryFragment<Pg> + Send + 'static,
+{
+    fn walk_ast<'a>(
+        &'a self,
+        mut out: diesel::query_builder::AstPass<'_, 'a, Pg>,
+    ) -> diesel::QueryResult<()> {
+        out.push_identifier("unused_cte_arm")?;
+        Ok(())
+    }
+}
+
+impl<Q> crate::db::subquery::Subquery for UnreferenceableSubquery<Q>
+where
+    Q: QueryFragment<Pg> + Send + 'static,
+{
+    fn query(&self) -> &dyn QueryFragment<Pg> {
+        &self.0
+    }
+}
+
+/// Constructs a CTE for updating resource provisioning information in all
+/// collections for a particular object.
+#[derive(QueryId)]
+pub struct VirtualProvisioningCollectionUpdate {
+    cte: Cte,
+}
+
+impl VirtualProvisioningCollectionUpdate {
+    // Generic utility for updating all collections including this resource,
+    // even transitively.
+    //
+    // Propagated updates include:
+    // - Project
+    // - Organization
+    // - Silo
+    // - Fleet
+    //
+    // Arguments:
+    // - do_update: A boolean SQL query to answer the question: "Should this
+    //   update be applied?" This query is necessary for idempotency.
+ // - update: A SQL query to actually modify the resource record. Generally + // this is an "INSERT", "UPDATE", or "DELETE". + // - project_id: The project to which the resource belongs. + // - values: The updated values to propagate through collections (iff + // "do_update" evaluates to "true"). + fn apply_update( + do_update: DoUpdate, + update: U, + project_id: uuid::Uuid, + values: V, + ) -> Self + where + U: QueryFragment + crate::db::subquery::Subquery + Send + 'static, + V: diesel::AsChangeset, + ::Changeset: + QueryFragment + Send + 'static, + { + let parent_org = ParentOrg::new(project_id); + let parent_silo = ParentSilo::new(&parent_org); + let all_collections = AllCollections::new( + project_id, + &parent_org, + &parent_silo, + *crate::db::fixed_data::FLEET_ID, + ); + let updated_collections = + UpdatedProvisions::new(&all_collections, &do_update, values); + + // TODO: Do we want to select from "all_collections" instead? Seems more + // idempotent; it'll work even when we don't update anything... + let final_select = Box::new( + updated_collections + .query_source() + .select(VirtualProvisioningCollection::as_select()), + ); + + let cte = CteBuilder::new() + .add_subquery(parent_org) + .add_subquery(parent_silo) + .add_subquery(all_collections) + .add_subquery(do_update) + .add_subquery(update) + .add_subquery(updated_collections) + .build(final_select); + + Self { cte } + } + + pub fn new_insert_storage( + id: uuid::Uuid, + disk_byte_diff: ByteCount, + project_id: uuid::Uuid, + storage_type: crate::db::datastore::StorageType, + ) -> Self { + use virtual_provisioning_collection::dsl as collection_dsl; + use virtual_provisioning_resource::dsl as resource_dsl; + + let mut provision = + VirtualProvisioningResource::new(id, storage_type.into()); + provision.virtual_disk_bytes_provisioned = disk_byte_diff; + + Self::apply_update( + // We should insert the record if it does not already exist. + DoUpdate::new_for_insert(id), + // The query to actually insert the record. + UnreferenceableSubquery( + diesel::insert_into( + resource_dsl::virtual_provisioning_resource, + ) + .values(provision) + .on_conflict_do_nothing() + .returning(virtual_provisioning_resource::all_columns), + ), + // Within this project, org, silo, fleet... + project_id, + // ... We add the disk usage. + ( + collection_dsl::time_modified.eq(diesel::dsl::now), + collection_dsl::virtual_disk_bytes_provisioned + .eq(collection_dsl::virtual_disk_bytes_provisioned + + disk_byte_diff), + ), + ) + } + + pub fn new_delete_storage( + id: uuid::Uuid, + disk_byte_diff: ByteCount, + project_id: uuid::Uuid, + ) -> Self { + use virtual_provisioning_collection::dsl as collection_dsl; + use virtual_provisioning_resource::dsl as resource_dsl; + + Self::apply_update( + // We should delete the record if it exists. + DoUpdate::new_for_delete(id), + // The query to actually delete the record. + UnreferenceableSubquery( + diesel::delete(resource_dsl::virtual_provisioning_resource) + .filter(resource_dsl::id.eq(id)) + .returning(virtual_provisioning_resource::all_columns), + ), + // Within this project, org, silo, fleet... + project_id, + // ... We subtract the disk usage. 
+ ( + collection_dsl::time_modified.eq(diesel::dsl::now), + collection_dsl::virtual_disk_bytes_provisioned + .eq(collection_dsl::virtual_disk_bytes_provisioned + - disk_byte_diff), + ), + ) + } + + pub fn new_insert_instance( + id: uuid::Uuid, + cpus_diff: i64, + ram_diff: ByteCount, + project_id: uuid::Uuid, + ) -> Self { + use virtual_provisioning_collection::dsl as collection_dsl; + use virtual_provisioning_resource::dsl as resource_dsl; + + let mut provision = VirtualProvisioningResource::new( + id, + ResourceTypeProvisioned::Instance, + ); + provision.cpus_provisioned = cpus_diff; + provision.ram_provisioned = ram_diff; + + Self::apply_update( + // We should insert the record if it does not already exist. + DoUpdate::new_for_insert(id), + // The query to actually insert the record. + UnreferenceableSubquery( + diesel::insert_into( + resource_dsl::virtual_provisioning_resource, + ) + .values(provision) + .on_conflict_do_nothing() + .returning(virtual_provisioning_resource::all_columns), + ), + // Within this project, org, silo, fleet... + project_id, + // ... We update the resource usage. + ( + collection_dsl::time_modified.eq(diesel::dsl::now), + collection_dsl::cpus_provisioned + .eq(collection_dsl::cpus_provisioned + cpus_diff), + collection_dsl::ram_provisioned + .eq(collection_dsl::ram_provisioned + ram_diff), + ), + ) + } + + pub fn new_delete_instance( + id: uuid::Uuid, + cpus_diff: i64, + ram_diff: ByteCount, + project_id: uuid::Uuid, + ) -> Self { + use virtual_provisioning_collection::dsl as collection_dsl; + use virtual_provisioning_resource::dsl as resource_dsl; + + Self::apply_update( + // We should delete the record if it exists. + DoUpdate::new_for_delete(id), + // The query to actually delete the record. + UnreferenceableSubquery( + diesel::delete(resource_dsl::virtual_provisioning_resource) + .filter(resource_dsl::id.eq(id)) + .returning(virtual_provisioning_resource::all_columns), + ), + // Within this project, org, silo, fleet... + project_id, + // ... We update the resource usage. 
+            (
+                collection_dsl::time_modified.eq(diesel::dsl::now),
+                collection_dsl::cpus_provisioned
+                    .eq(collection_dsl::cpus_provisioned - cpus_diff),
+                collection_dsl::ram_provisioned
+                    .eq(collection_dsl::ram_provisioned - ram_diff),
+            ),
+        )
+    }
+}
+
+impl QueryFragment<Pg> for VirtualProvisioningCollectionUpdate {
+    fn walk_ast<'a>(
+        &'a self,
+        mut out: AstPass<'_, 'a, Pg>,
+    ) -> diesel::QueryResult<()> {
+        out.unsafe_to_cache_prepared();
+
+        self.cte.walk_ast(out.reborrow())?;
+        Ok(())
+    }
+}
+
+type SelectableSql<T> = <
+    <T as diesel::Selectable<Pg>>::SelectExpression as diesel::Expression
+>::SqlType;
+
+impl Query for VirtualProvisioningCollectionUpdate {
+    type SqlType = SelectableSql<VirtualProvisioningCollection>;
+}
+
+impl RunQueryDsl<DbConnection> for VirtualProvisioningCollectionUpdate {}
diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs
index 71f70d76be3..5da8de7f3b9 100644
--- a/nexus/src/external_api/http_entrypoints.rs
+++ b/nexus/src/external_api/http_entrypoints.rs
@@ -278,6 +278,7 @@ pub fn external_api() -> NexusApiDescription {
         api.register(system_image_view_by_id)?;
         api.register(system_image_delete)?;
 
+        api.register(system_metric)?;
         api.register(updates_refresh)?;
 
         api.register(user_list)?;
@@ -2564,7 +2565,7 @@ async fn disk_delete_v1(
     let handler = async {
         let opctx = OpContext::for_external_api(&rqctx).await?;
         let disk_lookup = nexus.disk_lookup(&opctx, &disk_selector)?;
-        nexus.project_delete_disk(&disk_lookup).await?;
+        nexus.project_delete_disk(&opctx, &disk_lookup).await?;
         Ok(HttpResponseDeleted())
     };
     apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
@@ -2592,7 +2593,7 @@ async fn disk_delete(
     let handler = async {
         let opctx = OpContext::for_external_api(&rqctx).await?;
         let disk_lookup = nexus.disk_lookup(&opctx, &disk_selector)?;
-        nexus.project_delete_disk(&disk_lookup).await?;
+        nexus.project_delete_disk(&opctx, &disk_lookup).await?;
        Ok(HttpResponseDeleted())
     };
     apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
@@ -5342,6 +5343,64 @@ async fn updates_refresh(
     apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
 }
 
+// Metrics
+
+#[derive(Debug, Deserialize, JsonSchema)]
+pub struct SystemMetricParams {
+    #[serde(flatten)]
+    pub pagination: dropshot::PaginationParams<
+        params::ResourceMetrics,
+        params::ResourceMetrics,
+    >,
+
+    /// The UUID of the container being queried
+    // TODO: I might want to force the caller to specify type here?
+    pub id: Uuid,
+}
+
+#[derive(Display, Deserialize, JsonSchema)]
+#[display(style = "snake_case")]
+#[serde(rename_all = "snake_case")]
+pub enum SystemMetricName {
+    VirtualDiskSpaceProvisioned,
+    CpusProvisioned,
+    RamProvisioned,
+}
+
+#[derive(Deserialize, JsonSchema)]
+struct SystemMetricsPathParam {
+    metric_name: SystemMetricName,
+}
+
+/// Access metrics data
+#[endpoint {
+    method = GET,
+    path = "/system/metrics/{metric_name}",
+    tags = ["system"],
+}]
+async fn system_metric(
+    rqctx: Arc<RequestContext<Arc<ServerContext>>>,
+    path_params: Path<SystemMetricsPathParam>,
+    query_params: Query<SystemMetricParams>,
+) -> Result<HttpResponseOk<ResultsPage<Measurement>>, HttpError> {
+    let apictx = rqctx.context();
+    let nexus = &apictx.nexus;
+    let metric_name = path_params.into_inner().metric_name;
+
+    let query = query_params.into_inner();
+    let limit = rqctx.page_limit(&query.pagination)?;
+
+    let handler = async {
+        let opctx = OpContext::for_external_api(&rqctx).await?;
+        let result = nexus
+            .system_metric_lookup(&opctx, metric_name, query, limit)
+            .await?;
+
+        Ok(HttpResponseOk(result))
+    };
+    apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
+}
+
 // Sagas
 
 /// List sagas
diff --git a/nexus/src/external_api/tag-config.json b/nexus/src/external_api/tag-config.json
index 960fe81c8f8..ce3a069132a 100644
--- a/nexus/src/external_api/tag-config.json
+++ b/nexus/src/external_api/tag-config.json
@@ -147,4 +147,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs
index f5c2cae8e52..8f22df4ca43 100644
--- a/nexus/src/internal_api/http_entrypoints.rs
+++ b/nexus/src/internal_api/http_entrypoints.rs
@@ -221,7 +221,8 @@ async fn cpapi_volume_remove_read_only_parent(
     let path = path_params.into_inner();
 
     let handler = async {
-        nexus.volume_remove_read_only_parent(path.volume_id).await?;
+        let opctx = OpContext::for_internal_api(&rqctx).await;
+        nexus.volume_remove_read_only_parent(&opctx, path.volume_id).await?;
         Ok(HttpResponseUpdatedNoContent())
     };
     apictx.internal_latencies.instrument_dropshot_handler(&rqctx, handler).await
diff --git a/nexus/src/populate.rs b/nexus/src/populate.rs
index ae4cfe423e5..31a1cfcd715 100644
--- a/nexus/src/populate.rs
+++ b/nexus/src/populate.rs
@@ -266,6 +266,36 @@ impl Populator for PopulateSiloUserRoleAssignments {
     }
 }
 
+#[derive(Debug)]
+struct PopulateFleet;
+impl Populator for PopulateFleet {
+    fn populate<'a, 'b>(
+        &self,
+        opctx: &'a OpContext,
+        datastore: &'a DataStore,
+        _args: &'a PopulateArgs,
+    ) -> BoxFuture<'b, Result<(), Error>>
+    where
+        'a: 'b,
+    {
+        async {
+            let id = *db::fixed_data::FLEET_ID;
+            datastore
+                .virtual_provisioning_collection_create(
+                    opctx,
+                    db::model::VirtualProvisioningCollection::new(
+                        id,
+                        db::model::CollectionTypeProvisioned::Fleet,
+                    ),
+                )
+                .await?;
+
+            Ok(())
+        }
+        .boxed()
+    }
+}
+
 #[derive(Debug)]
 struct PopulateRack;
 impl Populator for PopulateRack {
@@ -320,13 +350,14 @@ impl Populator for PopulateRack {
 }
 
 lazy_static! {
-    static ref ALL_POPULATORS: [&'static dyn Populator; 7] = [
+    static ref ALL_POPULATORS: [&'static dyn Populator; 8] = [
         &PopulateBuiltinUsers,
         &PopulateBuiltinRoles,
         &PopulateBuiltinRoleAssignments,
         &PopulateBuiltinSilos,
         &PopulateSiloUsers,
         &PopulateSiloUserRoleAssignments,
+        &PopulateFleet,
         &PopulateRack,
     ];
 }
diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs
index 2ddacc27470..a4a24bd8152 100644
--- a/nexus/tests/integration_tests/disks.rs
+++ b/nexus/tests/integration_tests/disks.rs
@@ -4,11 +4,14 @@
 //!
Tests basic disk support in the API +use super::metrics::{ + query_for_latest_metric, query_for_metrics_until_they_exist, +}; + use chrono::Utc; use crucible_agent_client::types::State as RegionState; use dropshot::test_util::ClientTestContext; use dropshot::HttpErrorResponseBody; -use dropshot::ResultsPage; use http::method::Method; use http::StatusCode; use nexus_test_utils::http_testing::AuthnMode; @@ -31,9 +34,9 @@ use omicron_common::api::external::DiskState; use omicron_common::api::external::IdentityMetadataCreateParams; use omicron_common::api::external::Instance; use omicron_common::api::external::Name; -use omicron_common::backoff; +use omicron_nexus::db::fixed_data::{silo::SILO_ID, FLEET_ID}; use omicron_nexus::TestInterfaces as _; -use omicron_nexus::{external_api::params, Nexus}; +use omicron_nexus::{context::OpContext, external_api::params, Nexus}; use oximeter::types::Datum; use oximeter::types::Measurement; use sled_agent_client::TestInterfaces as _; @@ -45,6 +48,7 @@ type ControlPlaneTestContext = const ORG_NAME: &str = "test-org"; const PROJECT_NAME: &str = "springfield-squidport-disks"; +const PROJECT_NAME_2: &str = "bouncymeadow-octopusharbor-disks"; const DISK_NAME: &str = "just-rainsticks"; const INSTANCE_NAME: &str = "just-rainsticks"; @@ -898,6 +902,237 @@ async fn test_disk_too_big(cptestctx: &ControlPlaneTestContext) { .unwrap(); } +#[nexus_test] +async fn test_disk_virtual_provisioning_collection( + cptestctx: &ControlPlaneTestContext, +) { + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.apictx.nexus; + let datastore = nexus.datastore(); + + let _test = DiskTest::new(&cptestctx).await; + + populate_ip_pool(&client, "default", None).await; + let org_id = create_organization(&client, ORG_NAME).await.identity.id; + let project_id1 = + create_project(client, ORG_NAME, PROJECT_NAME).await.identity.id; + let project_id2 = + create_project(client, ORG_NAME, PROJECT_NAME_2).await.identity.id; + + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // The project and organization should start as empty. + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id1) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 0 + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id2) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 0 + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, org_id) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 0 + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, *SILO_ID) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 0 + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, *FLEET_ID) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 0 + ); + + // Ask for a 1 gibibyte disk in the first project. 
+ // + // This disk should appear in the accounting information for the project + // in which it was allocated + let disk_size = ByteCount::from_gibibytes_u32(1); + let disks_url = + format!("/organizations/{}/projects/{}/disks", ORG_NAME, PROJECT_NAME); + let disk_one = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: "disk-one".parse().unwrap(), + description: String::from("sells rainsticks"), + }, + disk_source: params::DiskSource::Blank { + block_size: params::BlockSize::try_from(512).unwrap(), + }, + size: disk_size, + }; + NexusRequest::new( + RequestBuilder::new(client, Method::POST, &disks_url) + .body(Some(&disk_one)) + .expect_status(Some(StatusCode::CREATED)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("unexpected failure creating 1 GiB disk"); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id1) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id2) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 0 + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, org_id) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, *SILO_ID) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, *FLEET_ID) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size + ); + + // Ask for a 1 gibibyte disk in the second project. + // + // Each project should be using "one disk" of real storage, but the org + // should be using both. 
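+    //
+    // Expected ledger after this create (the assertions below encode exactly
+    // this arithmetic):
+    //
+    //   project 1: 1 GiB    project 2: 1 GiB
+    //   org (and, transitively, silo and fleet): 1 GiB + 1 GiB = 2 GiB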
+ let disks_url = format!( + "/organizations/{}/projects/{}/disks", + ORG_NAME, PROJECT_NAME_2 + ); + let disk_one = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: "disk-two".parse().unwrap(), + description: String::from("sells rainsticks"), + }, + disk_source: params::DiskSource::Blank { + block_size: params::BlockSize::try_from(512).unwrap(), + }, + size: disk_size, + }; + NexusRequest::new( + RequestBuilder::new(client, Method::POST, &disks_url) + .body(Some(&disk_one)) + .expect_status(Some(StatusCode::CREATED)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("unexpected failure creating 1 GiB disk"); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id1) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id2) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, org_id) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 2 * disk_size.to_bytes() + ); + + // Delete the disk we just created, observe the utilization drop + // accordingly. + let disk_url = format!("{}/{}", disks_url, "disk-two"); + NexusRequest::object_delete(client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk"); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id1) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id2) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection + .virtual_disk_bytes_provisioned + .to_bytes(), + 0 + ); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, org_id) + .await + .unwrap(); + assert_eq!( + virtual_provisioning_collection.virtual_disk_bytes_provisioned.0, + disk_size, + ); +} + // Test disk size accounting #[nexus_test] async fn test_disk_size_accounting(cptestctx: &ControlPlaneTestContext) { @@ -1148,41 +1383,35 @@ async fn create_instance_with_disk(client: &ClientTestContext) { const ALL_METRICS: [&'static str; 6] = ["activated", "read", "write", "read_bytes", "write_bytes", "flush"]; -async fn query_for_metrics_until_they_exist( - client: &ClientTestContext, - path: &str, -) -> ResultsPage { - backoff::retry_notify( - backoff::retry_policy_local(), - || async { - let measurements: ResultsPage = - objects_list_page_authz(client, path).await; - - if measurements.items.is_empty() { - return Err(backoff::BackoffError::transient("No metrics yet")); - } - Ok(measurements) - }, - |_, _| {}, - ) - .await - .expect("Failed to query for measurements") -} - #[nexus_test] async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { + // Normally, Nexus is not registered as a producer for tests. + // Turn this bit on so we can also test some metrics from Nexus itself. 
+ cptestctx.server.register_as_producer().await; + + let oximeter = &cptestctx.oximeter; let client = &cptestctx.external_client; DiskTest::new(&cptestctx).await; - create_org_and_project(client).await; - create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; + let project_id = create_org_and_project(client).await; + let disk = create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; + oximeter.force_collect().await; // Whenever we grab this URL, get the surrounding few seconds of metrics. let metric_url = |metric_type: &str| { let disk_url = format!("/organizations/{ORG_NAME}/projects/{PROJECT_NAME}/disks/{DISK_NAME}"); format!( "{disk_url}/metrics/{metric_type}?start_time={:?}&end_time={:?}", - Utc::now() - chrono::Duration::seconds(2), - Utc::now() + chrono::Duration::seconds(2), + Utc::now() - chrono::Duration::seconds(10), + Utc::now() + chrono::Duration::seconds(10), + ) + }; + // Check the utilization info for the whole project too. + let utilization_url = |id: Uuid| { + format!( + "/system/metrics/virtual_disk_space_provisioned?start_time={:?}&end_time={:?}&id={:?}", + Utc::now() - chrono::Duration::seconds(10), + Utc::now() + chrono::Duration::seconds(10), + id, ) }; @@ -1190,12 +1419,19 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { // // Observe that no metrics exist yet; no "upstairs" should have been // instantiated on a sled. - let measurements: ResultsPage = - objects_list_page_authz(client, &metric_url("read")).await; + let measurements = + objects_list_page_authz::(client, &metric_url("read")) + .await; assert!(measurements.items.is_empty()); + assert_eq!( + query_for_latest_metric(client, &utilization_url(project_id)).await, + i64::from(disk.size) + ); + // Create an instance, attach the disk to it. create_instance_with_disk(client).await; + oximeter.force_collect().await; for metric in &ALL_METRICS { let measurements = @@ -1211,10 +1447,20 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { assert!(cumulative.start_time() <= item.timestamp()); } } + + // Check the utilization info for the whole project too. + assert_eq!( + query_for_latest_metric(client, &utilization_url(project_id)).await, + i64::from(disk.size) + ); } #[nexus_test] async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) { + // Normally, Nexus is not registered as a producer for tests. + // Turn this bit on so we can also test some metrics from Nexus itself. + cptestctx.server.register_as_producer().await; + let client = &cptestctx.external_client; DiskTest::new(&cptestctx).await; create_org_and_project(client).await; @@ -1224,6 +1470,8 @@ async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) { "/organizations/{ORG_NAME}/projects/{PROJECT_NAME}/disks/{DISK_NAME}" ); + let oximeter = &cptestctx.oximeter; + oximeter.force_collect().await; for metric in &ALL_METRICS { let collection_url = format!("{}/metrics/{metric}", disk_url); let initial_params = format!( @@ -1232,7 +1480,7 @@ async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) { Utc::now() + chrono::Duration::seconds(2), ); - query_for_metrics_until_they_exist( + objects_list_page_authz::( client, &format!("{collection_url}?{initial_params}"), ) diff --git a/nexus/tests/integration_tests/endpoints.rs b/nexus/tests/integration_tests/endpoints.rs index fa83cd73dbb..ad58959aa09 100644 --- a/nexus/tests/integration_tests/endpoints.rs +++ b/nexus/tests/integration_tests/endpoints.rs @@ -423,6 +423,14 @@ lazy_static! 
{ group_attribute_name: None, }; + pub static ref DEMO_SYSTEM_METRICS_URL: String = + format!( + "/system/metrics/virtual_disk_space_provisioned?start_time={:?}&end_time={:?}&id={}", + Utc::now(), + Utc::now(), + "3aaf22ae-5691-4f6d-b62c-aa532512fa78", + ); + // Users pub static ref DEMO_USER_CREATE: params::UserCreate = params::UserCreate { external_id: params::UserId::from_str("dummy-user").unwrap(), @@ -1462,6 +1470,17 @@ lazy_static! { )], }, + /* Metrics */ + + VerifyEndpoint { + url: &*DEMO_SYSTEM_METRICS_URL, + visibility: Visibility::Public, + unprivileged_access: UnprivilegedAccess::None, + allowed_methods: vec![ + AllowedMethod::Get, + ], + }, + /* Global Images */ VerifyEndpoint { diff --git a/nexus/tests/integration_tests/instances.rs b/nexus/tests/integration_tests/instances.rs index 7f3ccf8e8df..08ceea19113 100644 --- a/nexus/tests/integration_tests/instances.rs +++ b/nexus/tests/integration_tests/instances.rs @@ -4,6 +4,9 @@ //! Tests basic instance support in the API +use super::metrics::query_for_latest_metric; + +use chrono::Utc; use http::method::Method; use http::StatusCode; use nexus_test_utils::http_testing::AuthnMode; @@ -30,6 +33,7 @@ use omicron_common::api::external::Ipv4Net; use omicron_common::api::external::Name; use omicron_common::api::external::NetworkInterface; use omicron_nexus::authz::SiloRole; +use omicron_nexus::context::OpContext; use omicron_nexus::external_api::shared::IpKind; use omicron_nexus::external_api::shared::IpRange; use omicron_nexus::external_api::shared::Ipv4Range; @@ -513,6 +517,178 @@ async fn test_instances_create_reboot_halt( .unwrap(); } +#[nexus_test] +async fn test_instance_metrics(cptestctx: &ControlPlaneTestContext) { + // Normally, Nexus is not registered as a producer for tests. + // Turn this bit on so we can also test some metrics from Nexus itself. + cptestctx.server.register_as_producer().await; + + let client = &cptestctx.external_client; + let oximeter = &cptestctx.oximeter; + let apictx = &cptestctx.server.apictx; + let nexus = &apictx.nexus; + let datastore = nexus.datastore(); + + // Create an IP pool and project that we'll use for testing. 
+ populate_ip_pool(&client, "default", None).await; + let organization_id = + create_organization(&client, ORGANIZATION_NAME).await.identity.id; + let url_instances = format!( + "/organizations/{}/projects/{}/instances", + ORGANIZATION_NAME, PROJECT_NAME + ); + let project_id = create_project(&client, ORGANIZATION_NAME, PROJECT_NAME) + .await + .identity + .id; + + // Query the view of these metrics stored within CRDB + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id) + .await + .unwrap(); + assert_eq!(virtual_provisioning_collection.cpus_provisioned, 0); + assert_eq!(virtual_provisioning_collection.ram_provisioned.to_bytes(), 0); + + // Query the view of these metrics stored within Clickhouse + let metric_url = |metric_type: &str, id: Uuid| { + format!( + "/system/metrics/{metric_type}?start_time={:?}&end_time={:?}&id={id}", + Utc::now() - chrono::Duration::seconds(30), + Utc::now() + chrono::Duration::seconds(30), + ) + }; + oximeter.force_collect().await; + for id in vec![organization_id, project_id] { + assert_eq!( + query_for_latest_metric( + client, + &metric_url("virtual_disk_space_provisioned", id), + ) + .await, + 0 + ); + assert_eq!( + query_for_latest_metric( + client, + &metric_url("cpus_provisioned", id), + ) + .await, + 0 + ); + assert_eq!(query_for_latest_metric( + client, + &metric_url("ram_provisioned", id), + ) + .await, 0); + } + + // Create an instance. + let instance_name = "just-rainsticks"; + let instance_url = format!("{url_instances}/{instance_name}"); + create_instance(client, ORGANIZATION_NAME, PROJECT_NAME, instance_name) + .await; + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id) + .await + .unwrap(); + assert_eq!(virtual_provisioning_collection.cpus_provisioned, 4); + assert_eq!( + virtual_provisioning_collection.ram_provisioned.0, + ByteCount::from_gibibytes_u32(1), + ); + + // Stop the instance + let instance = + instance_post(&client, instance_name, InstanceOp::Stop).await; + instance_simulate(nexus, &instance.identity.id).await; + let instance = instance_get(&client, &instance_url).await; + assert_eq!(instance.runtime.run_state, InstanceState::Stopped); + // NOTE: I think it's arguably "more correct" to identify that the + // number of CPUs being used by guests at this point is actually "0", + // not "4", because the instance is stopped (same re: RAM usage). + // + // However, for implementation reasons, this is complicated (we have a + // tendency to update the runtime without checking the prior state, which + // makes edge-triggered behavior trickier to notice). 
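+    // A hypothetical edge-triggered fix would have to compare the prior and
+    // new runtime states before adjusting the counters, e.g. (illustration
+    // only, not current behavior):
+    //
+    //   if prior.run_state == InstanceState::Running
+    //       && next.run_state == InstanceState::Stopped
+    //   {
+    //       // release cpus_provisioned / ram_provisioned here
+    //   }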
+ let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id) + .await + .unwrap(); + let expected_cpus = 4; + let expected_ram = + i64::try_from(ByteCount::from_gibibytes_u32(1).to_bytes()).unwrap(); + assert_eq!(virtual_provisioning_collection.cpus_provisioned, expected_cpus); + assert_eq!( + i64::from(virtual_provisioning_collection.ram_provisioned.0), + expected_ram + ); + oximeter.force_collect().await; + for id in vec![organization_id, project_id] { + assert_eq!( + query_for_latest_metric( + client, + &metric_url("virtual_disk_space_provisioned", id), + ) + .await, + 0 + ); + assert_eq!( + query_for_latest_metric( + client, + &metric_url("cpus_provisioned", id), + ) + .await, + expected_cpus + ); + assert_eq!(query_for_latest_metric( + client, + &metric_url("ram_provisioned", id), + ) + .await, expected_ram); + } + + // Stop the instance + NexusRequest::object_delete(client, &instance_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .unwrap(); + + let virtual_provisioning_collection = datastore + .virtual_provisioning_collection_get(&opctx, project_id) + .await + .unwrap(); + assert_eq!(virtual_provisioning_collection.cpus_provisioned, 0); + assert_eq!(virtual_provisioning_collection.ram_provisioned.to_bytes(), 0); + oximeter.force_collect().await; + for id in vec![organization_id, project_id] { + assert_eq!( + query_for_latest_metric( + client, + &metric_url("virtual_disk_space_provisioned", id), + ) + .await, + 0 + ); + assert_eq!( + query_for_latest_metric( + client, + &metric_url("cpus_provisioned", id), + ) + .await, + 0 + ); + assert_eq!(query_for_latest_metric( + client, + &metric_url("ram_provisioned", id), + ) + .await, 0); + } +} + #[nexus_test] async fn test_instances_create_stopped_start( cptestctx: &ControlPlaneTestContext, @@ -580,13 +756,9 @@ async fn test_instances_delete_fails_when_running_succeeds_when_stopped( // Create an instance. let instance_url = get_instance_url(instance_name); - let instance = create_instance( - client, - ORGANIZATION_NAME, - PROJECT_NAME, - "just-rainsticks", - ) - .await; + let instance = + create_instance(client, ORGANIZATION_NAME, PROJECT_NAME, instance_name) + .await; // Simulate the instance booting. instance_simulate(nexus, &instance.identity.id).await; diff --git a/nexus/tests/integration_tests/metrics.rs b/nexus/tests/integration_tests/metrics.rs new file mode 100644 index 00000000000..2a39892e05a --- /dev/null +++ b/nexus/tests/integration_tests/metrics.rs @@ -0,0 +1,39 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
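+
+//! Test helpers for reading oximeter metrics.
+//!
+//! Callers build the metric URL themselves; typical use looks like this
+//! (sketch, with a hypothetical `url`):
+//!
+//! ```text
+//! let latest = query_for_latest_metric(client, &url).await;
+//! assert_eq!(latest, 0);
+//! ```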
+ +use dropshot::test_util::ClientTestContext; +use dropshot::ResultsPage; +use nexus_test_utils::resource_helpers::objects_list_page_authz; +use oximeter::types::Datum; +use oximeter::types::Measurement; + +pub async fn query_for_metrics_until_they_exist( + client: &ClientTestContext, + path: &str, +) -> ResultsPage { + loop { + let measurements: ResultsPage = + objects_list_page_authz(client, path).await; + + if !measurements.items.is_empty() { + return measurements; + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } +} + +pub async fn query_for_latest_metric( + client: &ClientTestContext, + path: &str, +) -> i64 { + let measurements: ResultsPage = + objects_list_page_authz(client, path).await; + + let item = &measurements.items[measurements.items.len() - 1]; + let datum = match item.datum() { + Datum::I64(c) => c, + _ => panic!("Unexpected datum type {:?}", item.datum()), + }; + return *datum; +} diff --git a/nexus/tests/integration_tests/mod.rs b/nexus/tests/integration_tests/mod.rs index 454114d53e0..0533841b305 100644 --- a/nexus/tests/integration_tests/mod.rs +++ b/nexus/tests/integration_tests/mod.rs @@ -13,6 +13,7 @@ mod disks; mod images; mod instances; mod ip_pools; +mod metrics; mod organizations; mod oximeter; mod password_login; diff --git a/nexus/tests/integration_tests/snapshots.rs b/nexus/tests/integration_tests/snapshots.rs index 4495191a15a..fcfe7118386 100644 --- a/nexus/tests/integration_tests/snapshots.rs +++ b/nexus/tests/integration_tests/snapshots.rs @@ -288,9 +288,11 @@ async fn test_snapshot_without_instance(cptestctx: &ControlPlaneTestContext) { #[nexus_test] async fn test_delete_snapshot(cptestctx: &ControlPlaneTestContext) { let client = &cptestctx.external_client; + let nexus = &cptestctx.server.apictx.nexus; + let datastore = nexus.datastore(); DiskTest::new(&cptestctx).await; populate_ip_pool(&client, "default", None).await; - create_org_and_project(client).await; + let project_id = create_org_and_project(client).await; let disks_url = get_disks_url(); // Create a blank disk @@ -319,6 +321,14 @@ async fn test_delete_snapshot(cptestctx: &ControlPlaneTestContext) { .parsed_body() .unwrap(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + let provision = datastore + .virtual_provisioning_collection_get(&opctx, project_id) + .await + .unwrap(); + assert_eq!(provision.virtual_disk_bytes_provisioned.0, disk_size); + // Issue snapshot request let snapshots_url = format!( "/organizations/{}/projects/{}/snapshots", @@ -333,13 +343,21 @@ async fn test_delete_snapshot(cptestctx: &ControlPlaneTestContext) { name: "not-attached".parse().unwrap(), description: "not attached to instance".into(), }, - disk: base_disk_name, + disk: base_disk_name.clone(), }, ) .await; assert_eq!(snapshot.disk_id, base_disk.identity.id); assert_eq!(snapshot.size, base_disk.size); + let provision = datastore + .virtual_provisioning_collection_get(&opctx, project_id) + .await + .unwrap(); + assert_eq!( + provision.virtual_disk_bytes_provisioned.to_bytes(), + 2 * disk_size.to_bytes() + ); // Create a disk from this snapshot let disk_size = ByteCount::from_gibibytes_u32(2); @@ -367,6 +385,15 @@ async fn test_delete_snapshot(cptestctx: &ControlPlaneTestContext) { .parsed_body() .unwrap(); + let provision = datastore + .virtual_provisioning_collection_get(&opctx, project_id) + .await + .unwrap(); + assert_eq!( + provision.virtual_disk_bytes_provisioned.to_bytes(), + 3 * disk_size.to_bytes() + ); + // Delete snapshot let 
     // Delete snapshot
     let snapshot_url = format!(
         "/organizations/{}/projects/{}/snapshots/not-attached",
@@ -381,6 +408,41 @@ async fn test_delete_snapshot(cptestctx: &ControlPlaneTestContext) {
         .execute()
         .await
         .unwrap();
+
+    let provision = datastore
+        .virtual_provisioning_collection_get(&opctx, project_id)
+        .await
+        .unwrap();
+    assert_eq!(
+        provision.virtual_disk_bytes_provisioned.to_bytes(),
+        2 * disk_size.to_bytes()
+    );
+
+    // Delete the disk created from the snapshot
+    let disk_url = format!("{}/{}", disks_url, snap_disk_name);
+    NexusRequest::object_delete(client, &disk_url)
+        .authn_as(AuthnMode::PrivilegedUser)
+        .execute()
+        .await
+        .expect("failed to delete disk");
+    let provision = datastore
+        .virtual_provisioning_collection_get(&opctx, project_id)
+        .await
+        .unwrap();
+    assert_eq!(provision.virtual_disk_bytes_provisioned.0, disk_size);
+
+    // Delete the original base disk
+    let disk_url = format!("{}/{}", disks_url, base_disk_name);
+    NexusRequest::object_delete(client, &disk_url)
+        .authn_as(AuthnMode::PrivilegedUser)
+        .execute()
+        .await
+        .expect("failed to delete disk");
+    let provision = datastore
+        .virtual_provisioning_collection_get(&opctx, project_id)
+        .await
+        .unwrap();
+    assert_eq!(provision.virtual_disk_bytes_provisioned.to_bytes(), 0);
 }
 
 // Test the various ways Nexus can reject a disk created from a snapshot
diff --git a/nexus/tests/integration_tests/unauthorized_coverage.rs b/nexus/tests/integration_tests/unauthorized_coverage.rs
index 856dafe394d..68c6465d44c 100644
--- a/nexus/tests/integration_tests/unauthorized_coverage.rs
+++ b/nexus/tests/integration_tests/unauthorized_coverage.rs
@@ -85,8 +85,9 @@ fn test_unauthorized_coverage() {
         // a VerifyEndpoint for it.
         let method_string = m.http_method().to_string().to_uppercase();
         let found = spec_operations.iter().find(|(op, regex)| {
-            op.method.to_uppercase() == method_string
-                && regex.is_match(v.url.split('?').next().unwrap_or(v.url))
+            // Strip query parameters, if they exist.
+            let url = v.url.split('?').next().unwrap();
+            op.method.to_uppercase() == method_string && regex.is_match(url)
         });
         if let Some((op, _)) = found {
             println!(
diff --git a/nexus/tests/output/nexus_tags.txt b/nexus/tests/output/nexus_tags.txt
index 8c1b793a07f..1df1bac6306 100644
--- a/nexus/tests/output/nexus_tags.txt
+++ b/nexus/tests/output/nexus_tags.txt
@@ -185,6 +185,7 @@ system_image_delete        /system/images/{image_name}
 system_image_list          /system/images
 system_image_view          /system/images/{image_name}
 system_image_view_by_id    /system/by-id/images/{id}
+system_metric              /system/metrics/{metric_name}
 system_user_list           /system/user
 system_user_view           /system/user/{user_name}
 updates_refresh            /system/updates/refresh
diff --git a/openapi/nexus.json b/openapi/nexus.json
index 8446c1cead4..766fef6d56f 100644
--- a/openapi/nexus.json
+++ b/openapi/nexus.json
@@ -6295,6 +6295,92 @@
         }
       }
     },
+    "/system/metrics/{metric_name}": {
+      "get": {
+        "tags": [
+          "system"
+        ],
+        "summary": "Access metrics data",
+        "operationId": "system_metric",
+        "parameters": [
+          {
+            "in": "path",
+            "name": "metric_name",
+            "required": true,
+            "schema": {
+              "$ref": "#/components/schemas/SystemMetricName"
+            }
+          },
+          {
+            "in": "query",
+            "name": "end_time",
+            "description": "An exclusive end time of metrics.",
+            "schema": {
+              "type": "string",
+              "format": "date-time"
+            }
+          },
+          {
+            "in": "query",
+            "name": "id",
+            "description": "The UUID of the container being queried",
+            "required": true,
+            "schema": {
+              "type": "string",
+              "format": "uuid"
+            }
+          },
+          {
+            "in": "query",
+            "name": "limit",
+            "description": "Maximum number of items returned by a single call",
+            "schema": {
+              "nullable": true,
+              "type": "integer",
+              "format": "uint32",
+              "minimum": 1
+            }
+          },
+          {
+            "in": "query",
+            "name": "page_token",
+            "description": "Token returned by previous call to retrieve the subsequent page",
+            "schema": {
+              "nullable": true,
+              "type": "string"
+            }
+          },
+          {
+            "in": "query",
+            "name": "start_time",
+            "description": "An inclusive start time of metrics.",
+            "schema": {
+              "type": "string",
+              "format": "date-time"
+            }
+          }
+        ],
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "$ref": "#/components/schemas/MeasurementResultsPage"
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        },
+        "x-dropshot-pagination": true
+      }
+    },
     "/system/policy": {
       "get": {
         "tags": [
@@ -14117,6 +14203,14 @@
           "write",
           "write_bytes"
         ]
+      },
+      "SystemMetricName": {
+        "type": "string",
+        "enum": [
+          "virtual_disk_space_provisioned",
+          "cpus_provisioned",
+          "ram_provisioned"
+        ]
       }
     }
   }
diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs
index a6f3b551b54..727b81b1e0d 100644
--- a/oximeter/collector/src/lib.rs
+++ b/oximeter/collector/src/lib.rs
@@ -51,7 +51,6 @@ pub enum Error {
 
 #[derive(Debug, Clone)]
 enum CollectionMessage {
     // Explicit request that the task collect data from its producer
-    #[allow(dead_code)]
     Collect,
     // Request that the task update its interval and the socket address on which it collects data
     // from its producer.
@@ -61,6 +60,59 @@ enum CollectionMessage {
     Shutdown,
 }
 
+// Collect from a single producer, forwarding any results to the outbox and
+// logging failures along the way.
+async fn perform_collection(
+    log: &Logger,
+    client: &reqwest::Client,
+    producer: &ProducerEndpoint,
+    outbox: &mpsc::Sender<ProducerResults>,
+) {
+    info!(log, "collecting from producer");
+    let res = client
+        .get(format!(
+            "http://{}{}",
+            producer.address,
+            producer.collection_route()
+        ))
+        .send()
+        .await;
+    match res {
+        Ok(res) => {
+            if res.status().is_success() {
+                match res.json::<ProducerResults>().await {
+                    Ok(results) => {
+                        debug!(
+                            log,
+                            "collected {} total results",
+                            results.len();
+                        );
+                        outbox.send(results).await.unwrap();
+                    }
+                    Err(e) => {
+                        warn!(
+                            log,
+                            "failed to collect results from producer: {}",
+                            e.to_string();
+                        );
+                    }
+                }
+            } else {
+                warn!(
+                    log,
+                    "failed to receive metric results from producer";
+                    "status_code" => res.status().as_u16(),
+                );
+            }
+        }
+        Err(e) => {
+            warn!(
+                log,
+                "failed to send collection request to producer: {}",
+                e.to_string();
+            );
+        }
+    }
+}
+
 // Background task used to collect metrics from one producer on an interval.
 //
 // This function is started by the `OximeterAgent`, when a producer is registered. The task loops
@@ -81,6 +133,7 @@ async fn collection_task(
         "starting oximeter collection task";
         "interval" => ?producer.interval,
     );
+
     loop {
         tokio::select! {
             message = inbox.recv() => {
@@ -91,9 +144,11 @@ async fn collection_task(
                     }
                     Some(CollectionMessage::Shutdown) => {
                         debug!(log, "collection task received shutdown request");
+                        return;
                     },
                     Some(CollectionMessage::Collect) => {
-                        debug!(log, "collection task received request to collect");
+                        debug!(log, "collection task received explicit request to collect");
+                        perform_collection(&log, &client, &producer, &outbox).await;
                     },
                     Some(CollectionMessage::Update(new_info)) => {
                         producer = new_info;
@@ -109,46 +164,7 @@ async fn collection_task(
                 }
             }
             _ = collection_timer.tick() => {
-                info!(log, "collecting from producer");
-                let res = client.get(format!("http://{}{}", producer.address, producer.collection_route()))
-                    .send()
-                    .await;
-                match res {
-                    Ok(res) => {
-                        if res.status().is_success() {
-                            match res.json::<ProducerResults>().await {
-                                Ok(results) => {
-                                    debug!(
-                                        log,
-                                        "collected {} total results",
-                                        results.len();
-                                    );
-                                    outbox.send(results).await.unwrap();
-                                },
-                                Err(e) => {
-                                    warn!(
-                                        log,
-                                        "failed to collect results from producer: {}",
-                                        e.to_string();
-                                    );
-                                }
-                            }
-                        } else {
-                            warn!(
-                                log,
-                                "failed to receive metric results from producer";
-                                "status_code" => res.status().as_u16(),
-                            );
-                        }
-                    },
-                    Err(e) => {
-                        warn!(
-                            log,
-                            "failed to send collection request to producer: {}",
-                            e.to_string();
-                        );
-                    }
-                }
+                perform_collection(&log, &client, &producer, &outbox).await;
             }
         }
     }
@@ -351,6 +367,13 @@ impl OximeterAgent {
         }
         Ok(())
     }
+
+    pub async fn force_collection(&self) {
+        let collection_tasks = self.collection_tasks.lock().await;
+        for task in collection_tasks.iter() {
+            task.1.inbox.send(CollectionMessage::Collect).await.unwrap();
+        }
+    }
 }
 
 /// Configuration used to initialize an oximeter server
@@ -523,6 +546,14 @@ impl Oximeter {
     pub async fn close(self) -> Result<(), Error> {
         self.server.close().await.map_err(Error::Server)
     }
+
+    /// Forces Oximeter to perform a collection immediately.
+    ///
+    /// This is particularly useful during tests, which would prefer to
+    /// avoid waiting until a collection interval completes.
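+    ///
+    /// A sketch of the intended test-side usage (the `oximeter` handle is
+    /// assumed to come from the test context):
+    ///
+    /// ```ignore
+    /// oximeter.force_collect().await;
+    /// // Metrics queried after this point reflect the forced collection.
+    /// ```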
+    pub async fn force_collect(&self) {
+        self.server.app_private().force_collection().await
+    }
 }
 
 // Build the HTTP API internal to the control plane
diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs
index 78e769bc476..dde3ed4f007 100644
--- a/oximeter/db/src/model.rs
+++ b/oximeter/db/src/model.rs
@@ -661,7 +661,7 @@ where
 {
     fn from(sample: DbTimeseriesScalarGaugeSample<T>) -> Measurement {
         let datum = Datum::from(sample.datum);
-        Measurement::with_timestamp(sample.timestamp, datum)
+        Measurement::new(sample.timestamp, datum)
     }
 }
 
@@ -674,7 +674,7 @@ where
         let cumulative =
             Cumulative::with_start_time(sample.start_time, sample.datum);
         let datum = Datum::from(cumulative);
-        Measurement::with_timestamp(sample.timestamp, datum)
+        Measurement::new(sample.timestamp, datum)
     }
 }
 
@@ -692,7 +692,7 @@ where
             )
             .unwrap(),
         );
-        Measurement::with_timestamp(sample.timestamp, datum)
+        Measurement::new(sample.timestamp, datum)
     }
 }
 
diff --git a/oximeter/oximeter-macro-impl/src/lib.rs b/oximeter/oximeter-macro-impl/src/lib.rs
index 604e8b234a7..f7bb407ee32 100644
--- a/oximeter/oximeter-macro-impl/src/lib.rs
+++ b/oximeter/oximeter-macro-impl/src/lib.rs
@@ -207,8 +207,11 @@ fn build_metric_trait_impl(
             &mut self.#datum_field_ident
         }
 
-        fn measure(&self) -> ::oximeter::Measurement {
-            ::oximeter::Measurement::new(::oximeter::Datum::from(&self.#datum_field_ident))
+        fn measure(&self, timestamp: ::chrono::DateTime<::chrono::Utc>) -> ::oximeter::Measurement {
+            ::oximeter::Measurement::new(
+                timestamp,
+                ::oximeter::Datum::from(&self.#datum_field_ident)
+            )
         }
 
         fn start_time(&self) -> Option<::chrono::DateTime<::chrono::Utc>> {
diff --git a/oximeter/oximeter/src/traits.rs b/oximeter/oximeter/src/traits.rs
index b9b70f240df..cb6a7e8b935 100644
--- a/oximeter/oximeter/src/traits.rs
+++ b/oximeter/oximeter/src/traits.rs
@@ -107,6 +107,7 @@ pub trait Target {
 /// Example
 /// -------
 /// ```rust
+/// use chrono::Utc;
 /// use oximeter::Metric;
 ///
 /// // A gauge with a floating-point value.
@@ -118,7 +119,7 @@ pub trait Target {
 ///
 /// let met = MyMetric { name: "name".into(), datum: 0.0 };
 /// assert_eq!(met.datum_type(), oximeter::DatumType::F64);
-/// let measurement = met.measure();
+/// let measurement = met.measure(Utc::now());
 /// assert!(measurement.start_time().is_none());
 /// assert_eq!(measurement.datum(), &oximeter::Datum::F64(0.0));
 /// ```
@@ -166,8 +167,8 @@ pub trait Metric {
     /// Return a mutable reference to the underlying metric itself.
     fn datum_mut(&mut self) -> &mut Self::Datum;
 
-    /// Sample the underlying metric, returning a measurement from it.
-    fn measure(&self) -> Measurement;
+    /// Sample the underlying metric, with a caller-supplied timestamp.
+    fn measure(&self, timestamp: DateTime<Utc>) -> Measurement;
 
     /// Return true if the metric is cumulative, else false.
     fn is_cumulative(&self) -> bool {
diff --git a/oximeter/oximeter/src/types.rs b/oximeter/oximeter/src/types.rs
index 0a25c2ddf75..a7d1c273da8 100644
--- a/oximeter/oximeter/src/types.rs
+++ b/oximeter/oximeter/src/types.rs
@@ -355,13 +355,8 @@ impl PartialEq<&Measurement> for Measurement {
 
 impl Measurement {
     /// Construct a `Measurement` with the given timestamp.
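+    ///
+    /// For example, `Measurement::new(timestamp, 0i64)` records the datum at
+    /// the caller-supplied `timestamp` rather than at the current time.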
-    pub fn with_timestamp(timestamp: DateTime<Utc>, datum: Datum) -> Self {
-        Self { timestamp, datum }
-    }
-
-    /// Generate a new measurement from a `Datum`, using the current time as the timestamp
-    pub fn new<D: Into<Datum>>(datum: D) -> Measurement {
-        Measurement { timestamp: Utc::now(), datum: datum.into() }
+    pub fn new<D: Into<Datum>>(timestamp: DateTime<Utc>, datum: D) -> Self {
+        Self { timestamp, datum: datum.into() }
     }
 
     /// Return the datum for this measurement
@@ -531,11 +526,15 @@ impl PartialEq for Sample {
 }
 
 impl Sample {
-    /// Construct a new sample.
+    /// Construct a new sample, recorded at the time of the supplied timestamp.
     ///
     /// This materializes the data from the target and metric, and stores that information along
     /// with the measurement data itself.
-    pub fn new<T, M>(target: &T, metric: &M) -> Self
+    pub fn new_with_timestamp<T, M>(
+        timestamp: DateTime<Utc>,
+        target: &T,
+        metric: &M,
+    ) -> Self
     where
         T: traits::Target,
         M: traits::Metric,
@@ -544,10 +543,22 @@ impl Sample {
             timeseries_name: format!("{}:{}", target.name(), metric.name()),
             target: FieldSet::from_target(target),
             metric: FieldSet::from_metric(metric),
-            measurement: metric.measure(),
+            measurement: metric.measure(timestamp),
         }
     }
 
+    /// Construct a new sample, created at the time the function is called.
+    ///
+    /// This materializes the data from the target and metric, and stores that information along
+    /// with the measurement data itself.
+    pub fn new<T, M>(target: &T, metric: &M) -> Self
+    where
+        T: traits::Target,
+        M: traits::Metric,
+    {
+        Self::new_with_timestamp(Utc::now(), target, metric)
+    }
+
     /// Return the fields for this sample.
     ///
     /// This returns the target fields and metric fields, chained, although there is no distinction
@@ -709,12 +720,12 @@ mod tests {
 
     #[test]
     fn test_measurement() {
-        let measurement = Measurement::new(0i64);
+        let measurement = Measurement::new(chrono::Utc::now(), 0i64);
         assert_eq!(measurement.datum_type(), DatumType::I64);
         assert_eq!(measurement.start_time(), None);
 
         let datum = Cumulative::new(0i64);
-        let measurement = Measurement::new(datum.clone());
+        let measurement = Measurement::new(chrono::Utc::now(), datum.clone());
         assert_eq!(measurement.datum(), &Datum::from(datum));
         assert!(measurement.start_time().is_some());
         assert!(measurement.timestamp() >= measurement.start_time().unwrap());
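+        // The cumulative datum carries its own start time; the timestamp
+        // passed to `Measurement::new` stamps the measurement itself, so it
+        // should never precede that start time here.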