From 9d26f936977bbc39cd440d120b9a63d05258d1cf Mon Sep 17 00:00:00 2001 From: iliana etaoin Date: Sat, 2 Jul 2022 03:49:36 +0000 Subject: [PATCH 01/11] add disk metrics endpoint --- nexus/src/app/oximeter.rs | 89 ++- nexus/src/external_api/http_entrypoints.rs | 79 ++- nexus/src/external_api/params.rs | 14 + nexus/src/external_api/views.rs | 2 + nexus/tests/integration_tests/endpoints.rs | 16 + nexus/tests/output/nexus_tags.txt | 1 + openapi/nexus.json | 605 +++++++++++++++++++++ oximeter/db/src/client.rs | 14 +- 8 files changed, 800 insertions(+), 20 deletions(-) diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index e270868f90c..4e621079b37 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -8,6 +8,8 @@ use crate::authz; use crate::context::OpContext; use crate::db; use crate::db::identity::Asset; +use crate::external_api::params::ResourceMetrics; +use crate::external_api::params::ResourceMetricsPagination; use crate::internal_api::params::OximeterInfo; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; @@ -16,6 +18,8 @@ use omicron_common::api::external::PaginationOrder; use omicron_common::api::internal::nexus; use omicron_common::backoff; use oximeter_client::Client as OximeterClient; +use oximeter_db::query::Timestamp; +use oximeter_db::Measurement; use oximeter_db::TimeseriesSchema; use oximeter_db::TimeseriesSchemaPaginationParams; use oximeter_producer::register; @@ -162,14 +166,74 @@ impl super::Nexus { self.timeseries_client .timeseries_schema_list(&pag_params.page, limit) .await - .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { - Error::ServiceUnavailable { - internal_message: e.to_string(), - } - } - _ => Error::InternalError { internal_message: e.to_string() }, - }) + .map_err(map_oximeter_err) + } + + pub async fn select_timeseries( + &self, + timeseries_name: &str, + criteria: &[&str], + interval: Duration, + query_params: ResourceMetricsPagination, + limit: NonZeroU32, + ) -> Result, Error> { + #[inline] + fn no_results() -> dropshot::ResultsPage { + dropshot::ResultsPage { next_page: None, items: Vec::new() } + } + + let query = match query_params.page { + dropshot::WhichPage::First(query) => query, + dropshot::WhichPage::Next(query) => query, + }; + let start_time = query.start_time; + if start_time >= query.end_time { + return Ok(no_results()); + } + let max_timeframe = chrono::Duration::from_std(interval * limit.get()) + .map_err(|e| Error::internal_error(&e.to_string()))?; + let end_time = query.end_time.min(start_time + max_timeframe); + + let timeseries_list = self + .timeseries_client + .select_timeseries_with( + timeseries_name, + criteria, + Some(Timestamp::Inclusive(start_time)), + Some(Timestamp::Exclusive(end_time)), + ) + .await + .map_err(map_oximeter_err)?; + + if timeseries_list.len() > 1 { + return Err(Error::internal_error(&format!( + "expected 1 timeseries but got {} ({:?} {:?})", + timeseries_list.len(), + timeseries_name, + criteria + ))); + } + + Ok(if let Some(timeseries) = timeseries_list.into_iter().next() { + let next_start_time = end_time; + + dropshot::ResultsPage { + next_page: if next_start_time >= query.end_time { + None + } else { + Some(base64::encode_config( + serde_json::to_string(&ResourceMetrics { + start_time: next_start_time, + end_time: query.end_time, + })?, + base64::URL_SAFE, + )) + }, + items: timeseries.measurements, + } + } else { + no_results() + }) } // Internal helper to build an Oximeter client from its ID and address 
(common data between @@ -209,3 +273,12 @@ impl super::Nexus { Ok((self.build_oximeter_client(&id, address), id)) } } + +fn map_oximeter_err(error: oximeter_db::Error) -> Error { + match error { + oximeter_db::Error::DatabaseUnavailable(_) => { + Error::ServiceUnavailable { internal_message: error.to_string() } + } + _ => Error::InternalError { internal_message: error.to_string() }, + } +} diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index 5384b9f412a..ac07f693db0 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -9,9 +9,9 @@ use super::views::IpPoolRange; use super::{ console_api, device_auth, params, views, views::{ - GlobalImage, IdentityProvider, Image, Organization, Project, Rack, - Role, Silo, Sled, Snapshot, SshKey, User, UserBuiltin, Vpc, VpcRouter, - VpcSubnet, + GlobalImage, IdentityProvider, Image, Measurement, Organization, + Project, Rack, Role, Silo, Sled, Snapshot, SshKey, User, UserBuiltin, + Vpc, VpcRouter, VpcSubnet, }, }; use crate::authz; @@ -67,11 +67,13 @@ use omicron_common::api::external::VpcFirewallRules; use omicron_common::{ api::external::http_pagination::data_page_params_for, bail_unless, }; +use parse_display::Display; use ref_cast::RefCast; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; use std::sync::Arc; +use std::time::Duration; use uuid::Uuid; type NexusApiDescription = ApiDescription>; @@ -123,6 +125,7 @@ pub fn external_api() -> NexusApiDescription { api.register(project_disks_post)?; api.register(project_disks_get_disk)?; api.register(project_disks_delete_disk)?; + api.register(project_disks_get_metrics)?; api.register(project_instances_get)?; api.register(project_instances_post)?; @@ -1420,6 +1423,67 @@ async fn project_disks_delete_disk( apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await } +#[derive(Display, Deserialize, JsonSchema)] +#[display(style = "snake_case")] +#[serde(rename_all = "snake_case")] +pub enum DiskMetricName { + Activated, + Flush, + Read, + ReadBytes, + Write, + WriteBytes, +} + +/// Fetch metrics for a disk. 
+#[endpoint { + method = GET, + path = "/organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name}", + tags = ["disks"], +}] +async fn project_disks_get_metrics( + rqctx: Arc>>, + path_params: Path>, + query_params: Query, +) -> Result>, HttpError> { + let apictx = rqctx.context(); + let nexus = &apictx.nexus; + + let path = path_params.into_inner(); + let organization_name = &path.inner.organization_name; + let project_name = &path.inner.project_name; + let disk_name = &path.inner.disk_name; + let metric_name = path.metric_name; + + let query = query_params.into_inner(); + let limit = rqctx.page_limit(&query)?; + + let handler = async { + let opctx = OpContext::for_external_api(&rqctx).await?; + + // this ensures the user is authorized on Action::Read for this disk + nexus + .disk_fetch(&opctx, organization_name, project_name, disk_name) + .await?; + + // FIXME fill this in with the code that gets the upstairs UUID from the disk UUID + let upstairs_uuid = uuid::uuid!("4fe353bf-c3a6-421f-a51d-c833091637fa"); + + Ok(HttpResponseOk( + nexus + .select_timeseries( + &format!("crucible_upstairs:{}", metric_name), + &[&format!("upstairs_uuid=={}", upstairs_uuid)], + Duration::from_secs(10), + query, + limit, + ) + .await?, + )) + }; + apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await +} + // Instances /// List instances in a project. @@ -3753,6 +3817,15 @@ async fn sshkeys_delete_key( apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await } +/// Path parameters for metrics requests where `/metrics/{metric_name}` is +/// appended to an existing path parameter type +#[derive(Deserialize, JsonSchema)] +struct MetricsPathParam { + #[serde(flatten)] + inner: T, + metric_name: M, +} + #[cfg(test)] mod test { use super::external_api; diff --git a/nexus/src/external_api/params.rs b/nexus/src/external_api/params.rs index 68d81d05bec..2f3d72ccaa2 100644 --- a/nexus/src/external_api/params.rs +++ b/nexus/src/external_api/params.rs @@ -5,6 +5,8 @@ //! Params define the request bodies of API endpoints for creating or updating resources. use crate::external_api::shared; +use chrono::{DateTime, Utc}; +use dropshot::PaginationParams; use omicron_common::api::external::{ ByteCount, IdentityMetadataCreateParams, IdentityMetadataUpdateParams, InstanceCpuCount, Ipv4Net, Ipv6Net, Name, @@ -779,6 +781,18 @@ pub struct SshKeyCreate { pub public_key: String, } +// METRICS + +/// Query parameters common to resource metrics endpoints. +#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +pub struct ResourceMetrics { + pub start_time: DateTime, + pub end_time: DateTime, +} + +pub type ResourceMetricsPagination = + PaginationParams; + #[cfg(test)] mod test { use super::*; diff --git a/nexus/src/external_api/views.rs b/nexus/src/external_api/views.rs index cc37f2763f0..e4729661292 100644 --- a/nexus/src/external_api/views.rs +++ b/nexus/src/external_api/views.rs @@ -20,6 +20,8 @@ use serde::{Deserialize, Serialize}; use std::net::SocketAddrV6; use uuid::Uuid; +pub use oximeter_db::Measurement; + // IDENTITY METADATA /// Identity-related metadata that's included in "asset" public API objects diff --git a/nexus/tests/integration_tests/endpoints.rs b/nexus/tests/integration_tests/endpoints.rs index e928c365c9a..6edf3b0a1bf 100644 --- a/nexus/tests/integration_tests/endpoints.rs +++ b/nexus/tests/integration_tests/endpoints.rs @@ -8,6 +8,7 @@ //! THERE ARE NO TESTS IN THIS FILE. 
use crate::integration_tests::unauthorized::HTTP_SERVER; +use chrono::Utc; use http::method::Method; use lazy_static::lazy_static; use nexus_test_utils::RACK_UUID; @@ -177,6 +178,13 @@ lazy_static! { disk_source: params::DiskSource::Blank { block_size: params::BlockSize::try_from(4096).unwrap() }, size: ByteCount::from_gibibytes_u32(16), }; + pub static ref DEMO_DISK_METRICS_URL: String = + format!( + "{}/metrics/activated?start_time={:?}&end_time={:?}", + *DEMO_DISK_URL, + Utc::now(), + Utc::now(), + ); // Instance used for testing pub static ref DEMO_INSTANCE_NAME: Name = "demo-instance".parse().unwrap(); @@ -829,6 +837,14 @@ lazy_static! { ], }, + VerifyEndpoint { + url: &*DEMO_DISK_METRICS_URL, + visibility: Visibility::Protected, + allowed_methods: vec![ + AllowedMethod::Get, + ], + }, + VerifyEndpoint { url: &*DEMO_INSTANCE_DISKS_URL, visibility: Visibility::Protected, diff --git a/nexus/tests/output/nexus_tags.txt b/nexus/tests/output/nexus_tags.txt index 95d99abfe46..34fc94ae529 100644 --- a/nexus/tests/output/nexus_tags.txt +++ b/nexus/tests/output/nexus_tags.txt @@ -3,6 +3,7 @@ OPERATION ID URL PATH project_disks_delete_disk /organizations/{organization_name}/projects/{project_name}/disks/{disk_name} project_disks_get /organizations/{organization_name}/projects/{project_name}/disks project_disks_get_disk /organizations/{organization_name}/projects/{project_name}/disks/{disk_name} +project_disks_get_metrics /organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name} project_disks_post /organizations/{organization_name}/projects/{project_name}/disks API operations found with tag "firewall" diff --git a/openapi/nexus.json b/openapi/nexus.json index 9e354e60d11..d1868295558 100644 --- a/openapi/nexus.json +++ b/openapi/nexus.json @@ -1803,6 +1803,112 @@ } } }, + "/organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name}": { + "get": { + "tags": [ + "disks" + ], + "summary": "Fetch metrics for a disk.", + "operationId": "project_disks_get_metrics", + "parameters": [ + { + "in": "path", + "name": "disk_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/Name" + }, + "style": "simple" + }, + { + "in": "path", + "name": "metric_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/DiskMetricName" + }, + "style": "simple" + }, + { + "in": "path", + "name": "organization_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/Name" + }, + "style": "simple" + }, + { + "in": "path", + "name": "project_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/Name" + }, + "style": "simple" + }, + { + "in": "query", + "name": "end_time", + "schema": { + "type": "string", + "format": "date-time" + }, + "style": "form" + }, + { + "in": "query", + "name": "limit", + "description": "Maximum number of items returned by a single call", + "schema": { + "nullable": true, + "type": "integer", + "format": "uint32", + "minimum": 1 + }, + "style": "form" + }, + { + "in": "query", + "name": "page_token", + "description": "Token returned by previous call to retrieve the subsequent page", + "schema": { + "nullable": true, + "type": "string" + }, + "style": "form" + }, + { + "in": "query", + "name": "start_time", + "schema": { + "type": "string", + "format": "date-time" + }, + "style": "form" + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": 
"#/components/schemas/MeasurementResultsPage" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + }, + "x-dropshot-pagination": true + } + }, "/organizations/{organization_name}/projects/{project_name}/images": { "get": { "tags": [ @@ -6226,6 +6332,194 @@ } }, "schemas": { + "BinRangedouble": { + "description": "A type storing a range over `T`.\n\nThis type supports ranges similar to the `RangeTo`, `Range` and `RangeFrom` types in the standard library. Those cover `(..end)`, `(start..end)`, and `(start..)` respectively.", + "oneOf": [ + { + "description": "A range unbounded below and exclusively above, `..end`.", + "type": "object", + "properties": { + "end": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "range_to" + ] + } + }, + "required": [ + "end", + "type" + ] + }, + { + "description": "A range bounded inclusively below and exclusively above, `start..end`.", + "type": "object", + "properties": { + "end": { + "type": "number", + "format": "double" + }, + "start": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "range" + ] + } + }, + "required": [ + "end", + "start", + "type" + ] + }, + { + "description": "A range bounded inclusively below and unbounded above, `start..`.", + "type": "object", + "properties": { + "start": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "range_from" + ] + } + }, + "required": [ + "start", + "type" + ] + } + ] + }, + "BinRangeint64": { + "description": "A type storing a range over `T`.\n\nThis type supports ranges similar to the `RangeTo`, `Range` and `RangeFrom` types in the standard library. Those cover `(..end)`, `(start..end)`, and `(start..)` respectively.", + "oneOf": [ + { + "description": "A range unbounded below and exclusively above, `..end`.", + "type": "object", + "properties": { + "end": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "range_to" + ] + } + }, + "required": [ + "end", + "type" + ] + }, + { + "description": "A range bounded inclusively below and exclusively above, `start..end`.", + "type": "object", + "properties": { + "end": { + "type": "integer", + "format": "int64" + }, + "start": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "range" + ] + } + }, + "required": [ + "end", + "start", + "type" + ] + }, + { + "description": "A range bounded inclusively below and unbounded above, `start..`.", + "type": "object", + "properties": { + "start": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "range_from" + ] + } + }, + "required": [ + "start", + "type" + ] + } + ] + }, + "Bindouble": { + "description": "Type storing bin edges and a count of samples within it.", + "type": "object", + "properties": { + "count": { + "description": "The total count of samples in this bin.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "range": { + "description": "The range of the support covered by this bin.", + "allOf": [ + { + "$ref": "#/components/schemas/BinRangedouble" + } + ] + } + }, + "required": [ + "count", + "range" + ] + }, + "Binint64": { + "description": "Type storing bin edges and a count of samples within it.", + "type": "object", + "properties": { + "count": { + "description": "The total count of samples in this bin.", + "type": "integer", + "format": "uint64", + 
"minimum": 0 + }, + "range": { + "description": "The range of the support covered by this bin.", + "allOf": [ + { + "$ref": "#/components/schemas/BinRangeint64" + } + ] + } + }, + "required": [ + "count", + "range" + ] + }, "BlockSize": { "title": "disk block size in bytes", "type": "integer", @@ -6241,6 +6535,216 @@ "format": "uint64", "minimum": 0 }, + "Cumulativedouble": { + "description": "A cumulative or counter data type.", + "type": "object", + "properties": { + "start_time": { + "type": "string", + "format": "date-time" + }, + "value": { + "type": "number", + "format": "double" + } + }, + "required": [ + "start_time", + "value" + ] + }, + "Cumulativeint64": { + "description": "A cumulative or counter data type.", + "type": "object", + "properties": { + "start_time": { + "type": "string", + "format": "date-time" + }, + "value": { + "type": "integer", + "format": "int64" + } + }, + "required": [ + "start_time", + "value" + ] + }, + "Datum": { + "description": "A `Datum` is a single sampled data point from a metric.", + "oneOf": [ + { + "type": "object", + "properties": { + "datum": { + "type": "boolean" + }, + "type": { + "type": "string", + "enum": [ + "bool" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "i64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "f64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "string" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "array", + "items": { + "type": "integer", + "format": "uint8", + "minimum": 0 + } + }, + "type": { + "type": "string", + "enum": [ + "bytes" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Cumulativeint64" + }, + "type": { + "type": "string", + "enum": [ + "cumulative_i64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Cumulativedouble" + }, + "type": { + "type": "string", + "enum": [ + "cumulative_f64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Histogramint64" + }, + "type": { + "type": "string", + "enum": [ + "histogram_i64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Histogramdouble" + }, + "type": { + "type": "string", + "enum": [ + "histogram_f64" + ] + } + }, + "required": [ + "datum", + "type" + ] + } + ] + }, "DatumType": { "description": "The type of an individual datum of a metric.", "type": "string", @@ -6961,6 +7465,58 @@ "items" ] }, + "Histogramdouble": { + "description": "A simple type for managing a histogram metric.\n\nA histogram maintains the count of any number of samples, over a set of bins. Bins are specified on construction via their _left_ edges, inclusive. 
There can't be any \"gaps\" in the bins, and an additional bin may be added to the left, right, or both so that the bins extend to the entire range of the support.\n\nNote that any gaps, unsorted bins, or non-finite values will result in an error.\n\nExample ------- ```rust use oximeter::histogram::{BinRange, Histogram};\n\nlet edges = [0i64, 10, 20]; let mut hist = Histogram::new(&edges).unwrap(); assert_eq!(hist.n_bins(), 4); // One additional bin for the range (20..) assert_eq!(hist.n_samples(), 0); hist.sample(4); hist.sample(100); assert_eq!(hist.n_samples(), 2);\n\nlet data = hist.iter().collect::>(); assert_eq!(data[0].range, BinRange::range(i64::MIN, 0)); // An additional bin for `..0` assert_eq!(data[0].count, 0); // Nothing is in this bin\n\nassert_eq!(data[1].range, BinRange::range(0, 10)); // The range `0..10` assert_eq!(data[1].count, 1); // 4 is sampled into this bin ```\n\nNotes -----\n\nHistograms may be constructed either from their left bin edges, or from a sequence of ranges. In either case, the left-most bin may be converted upon construction. In particular, if the left-most value is not equal to the minimum of the support, a new bin will be added from the minimum to that provided value. If the left-most value _is_ the support's minimum, because the provided bin was unbounded below, such as `(..0)`, then that bin will be converted into one bounded below, `(MIN..0)` in this case.\n\nThe short of this is that, most of the time, it shouldn't matter. If one specifies the extremes of the support as their bins, be aware that the left-most may be converted from a `BinRange::RangeTo` into a `BinRange::Range`. In other words, the first bin of a histogram is _always_ a `Bin::Range` or a `Bin::RangeFrom` after construction. In fact, every bin is one of those variants, the `BinRange::RangeTo` is only provided as a convenience during construction.", + "type": "object", + "properties": { + "bins": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Bindouble" + } + }, + "n_samples": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "start_time": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "bins", + "n_samples", + "start_time" + ] + }, + "Histogramint64": { + "description": "A simple type for managing a histogram metric.\n\nA histogram maintains the count of any number of samples, over a set of bins. Bins are specified on construction via their _left_ edges, inclusive. There can't be any \"gaps\" in the bins, and an additional bin may be added to the left, right, or both so that the bins extend to the entire range of the support.\n\nNote that any gaps, unsorted bins, or non-finite values will result in an error.\n\nExample ------- ```rust use oximeter::histogram::{BinRange, Histogram};\n\nlet edges = [0i64, 10, 20]; let mut hist = Histogram::new(&edges).unwrap(); assert_eq!(hist.n_bins(), 4); // One additional bin for the range (20..) assert_eq!(hist.n_samples(), 0); hist.sample(4); hist.sample(100); assert_eq!(hist.n_samples(), 2);\n\nlet data = hist.iter().collect::>(); assert_eq!(data[0].range, BinRange::range(i64::MIN, 0)); // An additional bin for `..0` assert_eq!(data[0].count, 0); // Nothing is in this bin\n\nassert_eq!(data[1].range, BinRange::range(0, 10)); // The range `0..10` assert_eq!(data[1].count, 1); // 4 is sampled into this bin ```\n\nNotes -----\n\nHistograms may be constructed either from their left bin edges, or from a sequence of ranges. 
In either case, the left-most bin may be converted upon construction. In particular, if the left-most value is not equal to the minimum of the support, a new bin will be added from the minimum to that provided value. If the left-most value _is_ the support's minimum, because the provided bin was unbounded below, such as `(..0)`, then that bin will be converted into one bounded below, `(MIN..0)` in this case.\n\nThe short of this is that, most of the time, it shouldn't matter. If one specifies the extremes of the support as their bins, be aware that the left-most may be converted from a `BinRange::RangeTo` into a `BinRange::Range`. In other words, the first bin of a histogram is _always_ a `Bin::Range` or a `Bin::RangeFrom` after construction. In fact, every bin is one of those variants, the `BinRange::RangeTo` is only provided as a convenience during construction.", + "type": "object", + "properties": { + "bins": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Binint64" + } + }, + "n_samples": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "start_time": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "bins", + "n_samples", + "start_time" + ] + }, "IdentityProvider": { "description": "Client view of an [`IdentityProvider`]", "type": "object", @@ -7864,6 +8420,44 @@ "minLength": 17, "maxLength": 17 }, + "Measurement": { + "description": "A `Measurement` is a timestamped datum from a single metric", + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Datum" + }, + "timestamp": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "datum", + "timestamp" + ] + }, + "MeasurementResultsPage": { + "description": "A single page of results", + "type": "object", + "properties": { + "items": { + "description": "list of items on this page of results", + "type": "array", + "items": { + "$ref": "#/components/schemas/Measurement" + } + }, + "next_page": { + "nullable": true, + "description": "token used to fetch the next page of results (if any)", + "type": "string" + } + }, + "required": [ + "items" + ] + }, "Name": { "title": "A name used in the API", "description": "Names must begin with a lower case ASCII letter, be composed exclusively of lowercase ASCII, uppercase ASCII, numbers, and '-', and may not end with a '-'.", @@ -10500,6 +11094,17 @@ "name_descending", "id_ascending" ] + }, + "DiskMetricName": { + "type": "string", + "enum": [ + "activated", + "flush", + "read", + "read_bytes", + "write", + "write_bytes" + ] } } }, diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 60e04958eb2..4f17076fe5d 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -79,15 +79,11 @@ impl Client { // to/from the database, as well as the cost of parsing them for each measurement, only to // promptly throw away almost all of them (except for the first). let timeseries_name = TimeseriesName::try_from(timeseries_name)?; - let schema = self - .schema_for_timeseries(×eries_name) - .await? - .ok_or_else(|| { - Error::QueryError(format!( - "No such timeseries: '{}'", - timeseries_name - )) - })?; + let schema = match self.schema_for_timeseries(×eries_name).await? { + Some(schema) => schema, + // If the timeseries doesn't exist, just return an empty Vec. 
+ None => return Ok(Vec::new()), + }; let mut query_builder = query::SelectQueryBuilder::new(&schema) .start_time(start_time) .end_time(end_time); From b427eded0dfdc10b7c9461a38f110e9c620d0f5f Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 11 Jul 2022 14:08:34 -0400 Subject: [PATCH 02/11] Fix tags, openapi --- nexus/tests/output/nexus_tags.txt | 2 +- openapi/nexus.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nexus/tests/output/nexus_tags.txt b/nexus/tests/output/nexus_tags.txt index 503be4ed28e..ee64b60c58f 100644 --- a/nexus/tests/output/nexus_tags.txt +++ b/nexus/tests/output/nexus_tags.txt @@ -2,8 +2,8 @@ API operations found with tag "disks" OPERATION ID URL PATH disk_create /organizations/{organization_name}/projects/{project_name}/disks disk_delete /organizations/{organization_name}/projects/{project_name}/disks/{disk_name} -disk_get_metrics /organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name} disk_list /organizations/{organization_name}/projects/{project_name}/disks +disk_metrics_list /organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name} disk_view /organizations/{organization_name}/projects/{project_name}/disks/{disk_name} API operations found with tag "hardware" diff --git a/openapi/nexus.json b/openapi/nexus.json index 367c725d9fd..584ecfa9bb4 100644 --- a/openapi/nexus.json +++ b/openapi/nexus.json @@ -1778,7 +1778,7 @@ "disks" ], "summary": "Fetch metrics for a disk.", - "operationId": "project_disks_get_metrics", + "operationId": "disk_metrics_list", "parameters": [ { "in": "path", From 01be1653518e1f524d8accbc9dc51d36300d166a Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 11 Jul 2022 16:44:38 -0400 Subject: [PATCH 03/11] Use volume ID as upstairs ID --- nexus/src/external_api/http_entrypoints.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index ee921d00351..f3526f4906b 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -1465,18 +1465,15 @@ async fn disk_metrics_list( let opctx = OpContext::for_external_api(&rqctx).await?; // this ensures the user is authorized on Action::Read for this disk - nexus + let disk = nexus .disk_fetch(&opctx, organization_name, project_name, disk_name) .await?; - // FIXME fill this in with the code that gets the upstairs UUID from the disk UUID - let upstairs_uuid = uuid::uuid!("4fe353bf-c3a6-421f-a51d-c833091637fa"); - Ok(HttpResponseOk( nexus .select_timeseries( &format!("crucible_upstairs:{}", metric_name), - &[&format!("upstairs_uuid=={}", upstairs_uuid)], + &[&format!("upstairs_uuid=={}", disk.volume_id)], Duration::from_secs(10), query, limit, From 609ea2872eea885f898a369426431f895a04d2b4 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 11 Jul 2022 17:39:07 -0400 Subject: [PATCH 04/11] Make clippy happy --- nexus/src/app/oximeter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 1066bb20004..3376be7363c 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -251,7 +251,7 @@ impl super::Nexus { .map_err(|e| { Error::internal_error(&format!( "Cannot access timeseries DB: {}", - e.to_string() + e )) })? 
.select_timeseries_with( From 506e1b8ef7b9a8f6b447ee96ad1ee1933de95a15 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 13 Jul 2022 22:51:10 -0400 Subject: [PATCH 05/11] Add integration test for metrics collection --- Cargo.lock | 3 + nexus/src/external_api/http_entrypoints.rs | 22 +-- nexus/test-utils/src/lib.rs | 23 ++- nexus/test-utils/src/resource_helpers.rs | 13 +- nexus/tests/integration_tests/disks.rs | 78 ++++++++ nexus/tests/integration_tests/oximeter.rs | 2 + .../integration_tests/subnet_allocation.rs | 6 +- sled-agent/Cargo.toml | 7 + sled-agent/src/sim/collection.rs | 16 ++ sled-agent/src/sim/disk.rs | 183 +++++++++++++++++- sled-agent/src/sim/instance.rs | 9 + sled-agent/src/sim/server.rs | 1 + sled-agent/src/sim/simulatable.rs | 9 + sled-agent/src/sim/sled_agent.rs | 23 +++ 14 files changed, 374 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 21c24c9e139..684cd37be24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3164,6 +3164,7 @@ dependencies = [ "cfg-if 1.0.0", "chrono", "clap 3.2.6", + "crucible", "crucible-agent-client", "ddm-admin-client", "dropshot", @@ -3182,6 +3183,8 @@ dependencies = [ "openapiv3", "opte", "opte-ioctl", + "oximeter 0.1.0", + "oximeter-producer 0.1.0", "p256", "percent-encoding", "progenitor", diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index 1e67316d14a..f56b4cbc6d4 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -1557,18 +1557,18 @@ async fn disk_metrics_list( let disk = nexus .disk_fetch(&opctx, organization_name, project_name, disk_name) .await?; + let upstairs_uuid = disk.id(); + let result = nexus + .select_timeseries( + &format!("crucible_upstairs:{}", metric_name), + &[&format!("upstairs_uuid=={}", upstairs_uuid)], + Duration::from_secs(10), + query, + limit, + ) + .await?; - Ok(HttpResponseOk( - nexus - .select_timeseries( - &format!("crucible_upstairs:{}", metric_name), - &[&format!("upstairs_uuid=={}", disk.volume_id)], - Duration::from_secs(10), - query, - limit, - ) - .await?, - )) + Ok(HttpResponseOk(result)) }; apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await } diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 211e3116a8a..73279124b53 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -159,6 +159,7 @@ pub async fn test_setup_with_config( ) .await .unwrap(); + register_test_producer(&producer).unwrap(); ControlPlaneTestContext { server, @@ -253,6 +254,10 @@ impl oximeter::Producer for IntegrationProducer { } } +/// Creates and starts a producer server. +/// +/// Actual producers can be registered with the [`register_producer`] +/// helper function. pub async fn start_producer_server( nexus_address: SocketAddr, id: Uuid, @@ -281,9 +286,22 @@ pub async fn start_producer_server( }; let server = ProducerServer::start(&config).await.map_err(|e| e.to_string())?; + Ok(server) +} +/// Registers an arbitrary producer with the test server. +pub fn register_producer( + server: &ProducerServer, + producer: impl oximeter::Producer, +) -> Result<(), String> { + server.registry().register_producer(producer).map_err(|e| e.to_string())?; + Ok(()) +} + +/// Registers a sample-generating test-specific producer. +pub fn register_test_producer(server: &ProducerServer) -> Result<(), String> { // Create and register an actual metric producer. 
- let producer = IntegrationProducer { + let test_producer = IntegrationProducer { target: IntegrationTarget { name: "integration-test-target".to_string(), }, @@ -292,8 +310,7 @@ pub async fn start_producer_server( datum: 0, }, }; - server.registry().register_producer(producer).map_err(|e| e.to_string())?; - Ok(server) + register_producer(server, test_producer) } /// Returns whether the two identity metadata objects are identical. diff --git a/nexus/test-utils/src/resource_helpers.rs b/nexus/test-utils/src/resource_helpers.rs index 6da36b46396..b7236bd9d21 100644 --- a/nexus/test-utils/src/resource_helpers.rs +++ b/nexus/test-utils/src/resource_helpers.rs @@ -186,28 +186,35 @@ pub async fn create_disk( .await } +/// Creates an instance with a default NIC and no disks. +/// +/// Wrapper around [`create_instance_with`]. pub async fn create_instance( client: &ClientTestContext, organization_name: &str, project_name: &str, instance_name: &str, ) -> Instance { - create_instance_with_nics( + create_instance_with( client, organization_name, project_name, instance_name, ¶ms::InstanceNetworkInterfaceAttachment::Default, + // Disks= + vec![], ) .await } -pub async fn create_instance_with_nics( +/// Creates an instance with attached resou8rces. +pub async fn create_instance_with( client: &ClientTestContext, organization_name: &str, project_name: &str, instance_name: &str, nics: ¶ms::InstanceNetworkInterfaceAttachment, + disks: Vec, ) -> Instance { let url = format!( "/organizations/{}/projects/{}/instances", @@ -228,7 +235,7 @@ pub async fn create_instance_with_nics( b"#cloud-config\nsystem_info:\n default_user:\n name: oxide" .to_vec(), network_interfaces: nics.clone(), - disks: vec![], + disks, }, ) .await diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index ea3a64c5f82..9a64b8cf336 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -4,9 +4,11 @@ //! 
Tests basic disk support in the API +use chrono::Utc; use crucible_agent_client::types::State as RegionState; use dropshot::test_util::ClientTestContext; use dropshot::HttpErrorResponseBody; +use dropshot::ResultsPage; use http::method::Method; use http::StatusCode; use nexus_test_utils::http_testing::AuthnMode; @@ -15,9 +17,11 @@ use nexus_test_utils::http_testing::RequestBuilder; use nexus_test_utils::identity_eq; use nexus_test_utils::resource_helpers::create_disk; use nexus_test_utils::resource_helpers::create_instance; +use nexus_test_utils::resource_helpers::create_instance_with; use nexus_test_utils::resource_helpers::create_ip_pool; use nexus_test_utils::resource_helpers::create_organization; use nexus_test_utils::resource_helpers::create_project; +use nexus_test_utils::resource_helpers::objects_list_page_authz; use nexus_test_utils::resource_helpers::DiskTest; use nexus_test_utils::ControlPlaneTestContext; use nexus_test_utils_macros::nexus_test; @@ -29,6 +33,8 @@ use omicron_common::api::external::Instance; use omicron_common::api::external::Name; use omicron_nexus::TestInterfaces as _; use omicron_nexus::{external_api::params, Nexus}; +use oximeter::types::Datum; +use oximeter::types::Measurement; use sled_agent_client::TestInterfaces as _; use std::sync::Arc; use uuid::Uuid; @@ -822,6 +828,78 @@ async fn test_disk_reject_total_size_not_divisible_by_min_disk_size( ); } +#[nexus_test] +async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + create_org_and_project(client).await; + let disks_url = get_disks_url(); + + // Create a disk. + let _new_disk = params::DiskCreate { + identity: IdentityMetadataCreateParams { + name: DISK_NAME.parse().unwrap(), + description: String::from("sells rainsticks"), + }, + disk_source: params::DiskSource::Blank { + block_size: params::BlockSize::try_from(512).unwrap(), + }, + size: ByteCount::from_gibibytes_u32(1), + }; + let _disk = create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; + let disk_url = format!("{}/{}", disks_url, DISK_NAME); + + // Whenever we grab this URL, get the last few seconds of metrics. + let metric_url = |metric_type: &str| { + format!( + "{disk_url}/metrics/{metric_type}?start_time={:?}&end_time={:?}", + Utc::now() - chrono::Duration::seconds(5), + Utc::now(), + ) + }; + + // Try accessing metrics before we attach the disk to an instance. + // + // Observe that no metrics exist yet; no "upstairs" should have been + // instantiated on a sled. + let measurements: ResultsPage = + objects_list_page_authz(client, &metric_url("read")).await; + assert!(measurements.items.is_empty()); + + // Create an instance, attach the disk to it. + let _instance = create_instance_with( + &client, + ORG_NAME, + PROJECT_NAME, + INSTANCE_NAME, + ¶ms::InstanceNetworkInterfaceAttachment::Default, + vec![params::InstanceDiskAttachment::Attach( + params::InstanceDiskAttach { name: DISK_NAME.parse().unwrap() }, + )], + ) + .await; + + // TODO: test that get as unprivileged fails? + // TODO: make the intervals smaller to avoid sleeping? 
+ tokio::time::sleep(std::time::Duration::from_secs(3)).await; + + let metrics = + ["activated", "read", "write", "read_bytes", "write_bytes", "flush"]; + + for metric in &metrics { + let measurements: ResultsPage = + objects_list_page_authz(client, &metric_url(metric)).await; + assert!(!measurements.items.is_empty()); + for item in &measurements.items { + let cumulative = match item.datum() { + Datum::CumulativeI64(c) => c, + _ => panic!("Unexpected datum type {:?}", item.datum()), + }; + assert!(cumulative.start_time() <= item.timestamp()); + } + } +} + async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk { NexusRequest::object_get(client, disk_url) .authn_as(AuthnMode::PrivilegedUser) diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 65046ac68da..e1d37f3320b 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -204,6 +204,8 @@ async fn test_oximeter_reregistration() { ) .await .expect("Failed to restart metric producer server"); + nexus_test_utils::register_test_producer(&context.producer) + .expect("Failed to register producer"); // Run the verification in a loop using wait_for_condition, waiting until there is more data, // or failing the test otherwise. diff --git a/nexus/tests/integration_tests/subnet_allocation.rs b/nexus/tests/integration_tests/subnet_allocation.rs index a270e1c14b0..afc289d9790 100644 --- a/nexus/tests/integration_tests/subnet_allocation.rs +++ b/nexus/tests/integration_tests/subnet_allocation.rs @@ -13,7 +13,7 @@ use ipnetwork::Ipv4Network; use nexus_test_utils::http_testing::AuthnMode; use nexus_test_utils::http_testing::NexusRequest; use nexus_test_utils::http_testing::RequestBuilder; -use nexus_test_utils::resource_helpers::create_instance_with_nics; +use nexus_test_utils::resource_helpers::create_instance_with; use nexus_test_utils::resource_helpers::create_ip_pool; use nexus_test_utils::resource_helpers::objects_list_page_authz; use nexus_test_utils::resource_helpers::{create_organization, create_project}; @@ -137,12 +137,14 @@ async fn test_subnet_allocation(cptestctx: &ControlPlaneTestContext) { NUM_INITIAL_RESERVED_IP_ADDRESSES + n_final_reserved_addresses; let subnet_size = subnet.size() as usize - n_reserved_addresses; for i in 0..subnet_size { - create_instance_with_nics( + create_instance_with( client, organization_name, project_name, &format!("i{}", i), &nic, + // Disks= + vec![], ) .await; } diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index d4681ba60eb..a06038e26c4 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -14,6 +14,11 @@ cfg-if = "1.0" chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "3.2", features = ["derive"] } # Only used by the simulated sled agent. +# TODO: This is probably overkill. We'd like to only depend on +# the "crucible-agent-client", but the VolumeConstructionRequest object +# does not exist there. +crucible = { git = "https://github.com/oxidecomputer/crucible", rev = "fed3e8ca7762130ee146fc516a4ef6eed2b91629" } +# Only used by the simulated sled agent. 
crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "fed3e8ca7762130ee146fc516a4ef6eed2b91629" } ddm-admin-client = { path = "../ddm-admin-client" } dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] } @@ -24,6 +29,8 @@ libc = "0.2.126" macaddr = { version = "1.0.1", features = [ "serde_std" ] } nexus-client = { path = "../nexus-client" } omicron-common = { path = "../common" } +oximeter = { version = "0.1.0", path = "../oximeter/oximeter" } +oximeter-producer = { version = "0.1.0", path = "../oximeter/producer" } p256 = "0.9.0" percent-encoding = "2.1.0" progenitor = { git = "https://github.com/oxidecomputer/progenitor" } diff --git a/sled-agent/src/sim/collection.rs b/sled-agent/src/sim/collection.rs index 68841311077..225bf6cd0f4 100644 --- a/sled-agent/src/sim/collection.rs +++ b/sled-agent/src/sim/collection.rs @@ -250,6 +250,22 @@ impl SimCollection { } } + pub async fn sim_ensure_producer( + self: &Arc, + id: &Uuid, + args: S::ProducerArgs, + ) -> Result<(), Error> { + self.objects + .lock() + .await + .get_mut(id) + .expect("Setting producer on object that does not exist") + .object + .set_producer(args) + .await?; + Ok(()) + } + /// Move the object identified by `id` from its current state to the /// requested state `target`. The object does not need to exist already; if /// not, it will be created from `current`. (This is the only case where diff --git a/sled-agent/src/sim/disk.rs b/sled-agent/src/sim/disk.rs index 5d67b8decb3..02ba7b2a477 100644 --- a/sled-agent/src/sim/disk.rs +++ b/sled-agent/src/sim/disk.rs @@ -8,32 +8,211 @@ use crate::nexus::NexusClient; use crate::params::DiskStateRequested; use crate::sim::simulatable::Simulatable; use async_trait::async_trait; +use dropshot::ConfigDropshot; +use dropshot::ConfigLogging; +use dropshot::ConfigLoggingLevel; use omicron_common::api::external::DiskState; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; use omicron_common::api::internal::nexus::DiskRuntimeState; +use omicron_common::api::internal::nexus::ProducerEndpoint; +use oximeter_producer::Server as ProducerServer; use propolis_client::api::DiskAttachmentState as PropolisDiskState; +use std::net::{Ipv6Addr, SocketAddr}; use std::sync::Arc; +use std::time::Duration; use uuid::Uuid; use crate::common::disk::{Action as DiskAction, DiskStates}; +// Oximeter timeseries names are derived based on the precise names of structs, +// so we shove this in a module to more liberally use arbitrary names (like +// "Read"). +mod producers { + use super::*; + use oximeter::{ + types::{Cumulative, Sample}, + Metric, Target, + }; + + #[derive(Debug, Clone, Target)] + pub struct CrucibleUpstairs { + pub upstairs_uuid: Uuid, + } + + // TODO: It would be a lot nicer if we could just depend on the Crucible + // types here directly, rather than recreate them. However, in doing so, + // we bump into issues with the "Metric" trait - the implementation of + // oximeter::Producer claims that "Metric" is not implemented for the + // Crucible-defined structure, even though it is derived. + // I suspect this is due to version skew between Crucible vs Omicron's copy + // of Oximeter. + + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Activated { + /// Count of times this upstairs has activated. 
+ #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Write { + /// Count of region writes this upstairs has completed + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct WriteBytes { + /// Count of bytes written + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Read { + /// Count of region reads this upstairs has completed + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct ReadBytes { + /// Count of bytes read + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Flush { + /// Count of region flushes this upstairs has completed + #[datum] + count: Cumulative, + } + + #[derive(Debug, Clone)] + pub struct DiskProducer { + target: CrucibleUpstairs, + activated_count: Activated, + write_count: Write, + write_bytes: WriteBytes, + read_count: Read, + read_bytes: ReadBytes, + flush_count: Flush, + } + + impl DiskProducer { + pub fn new(id: Uuid) -> Self { + Self { + target: CrucibleUpstairs { upstairs_uuid: id }, + activated_count: Default::default(), + write_count: Default::default(), + write_bytes: Default::default(), + read_count: Default::default(), + read_bytes: Default::default(), + flush_count: Default::default(), + } + } + } + + impl oximeter::Producer for DiskProducer { + fn produce( + &mut self, + ) -> Result< + Box<(dyn Iterator + 'static)>, + oximeter::MetricsError, + > { + let samples = vec![ + Sample::new(&self.target, &self.activated_count), + Sample::new(&self.target, &self.write_count), + Sample::new(&self.target, &self.write_bytes), + Sample::new(&self.target, &self.read_count), + Sample::new(&self.target, &self.read_bytes), + Sample::new(&self.target, &self.flush_count), + ]; + + *self.activated_count.datum_mut() += 1; + *self.write_count.datum_mut() += 1; + *self.write_bytes.datum_mut() += 1; + *self.read_count.datum_mut() += 1; + *self.read_bytes.datum_mut() += 1; + *self.flush_count.datum_mut() += 1; + + Ok(Box::new(samples.into_iter())) + } + } +} + /// Simulated Disk (network block device), as created by the external Oxide API /// /// See `Simulatable` for how this works. -#[derive(Debug)] pub struct SimDisk { state: DiskStates, + producer: Option, +} + +// "producer" doesn't implement Debug, so we can't derive it on SimDisk. +impl std::fmt::Debug for SimDisk { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SimDisk").field("state", &self.state).finish() + } +} + +impl SimDisk { + pub async fn start_producer_server( + &mut self, + nexus_address: SocketAddr, + id: Uuid, + ) -> Result<(), String> { + // Set up a producer server. + // + // This listens on any available port, and the server internally updates this to the actual + // bound port of the Dropshot HTTP server. 
+ let producer_address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 0); + let server_info = ProducerEndpoint { + id, + address: producer_address, + base_route: "/collect".to_string(), + interval: Duration::from_secs(1), + }; + let config = oximeter_producer::Config { + server_info, + registration_address: nexus_address, + dropshot_config: ConfigDropshot { + bind_address: producer_address, + ..Default::default() + }, + logging_config: ConfigLogging::StderrTerminal { + level: ConfigLoggingLevel::Error, + }, + }; + let server = + ProducerServer::start(&config).await.map_err(|e| e.to_string())?; + + let producer = producers::DiskProducer::new(id); + server + .registry() + .register_producer(producer) + .map_err(|e| e.to_string())?; + self.producer.replace(server); + Ok(()) + } } #[async_trait] impl Simulatable for SimDisk { type CurrentState = DiskRuntimeState; type RequestedState = DiskStateRequested; + type ProducerArgs = (std::net::SocketAddr, Uuid); type Action = DiskAction; fn new(current: DiskRuntimeState) -> Self { - SimDisk { state: DiskStates::new(current) } + SimDisk { state: DiskStates::new(current), producer: None } + } + + async fn set_producer( + &mut self, + args: Self::ProducerArgs, + ) -> Result<(), Error> { + self.start_producer_server(args.0, args.1).await.map_err(|e| { + Error::internal_error(&format!("Setting producer server: {e}")) + })?; + Ok(()) } fn request_transition( diff --git a/sled-agent/src/sim/instance.rs b/sled-agent/src/sim/instance.rs index 428138348ec..39d9bc624d0 100644 --- a/sled-agent/src/sim/instance.rs +++ b/sled-agent/src/sim/instance.rs @@ -30,12 +30,21 @@ pub struct SimInstance { impl Simulatable for SimInstance { type CurrentState = InstanceRuntimeState; type RequestedState = InstanceRuntimeStateRequested; + type ProducerArgs = (); type Action = InstanceAction; fn new(current: InstanceRuntimeState) -> Self { SimInstance { state: InstanceStates::new(current) } } + async fn set_producer( + &mut self, + _args: Self::ProducerArgs, + ) -> Result<(), Error> { + // NOTE: Not implemented, yet. + Ok(()) + } + fn request_transition( &mut self, target: &InstanceRuntimeStateRequested, diff --git a/sled-agent/src/sim/server.rs b/sled-agent/src/sim/server.rs index 555fa2f56fc..6413fdf6ff5 100644 --- a/sled-agent/src/sim/server.rs +++ b/sled-agent/src/sim/server.rs @@ -46,6 +46,7 @@ impl Server { let sled_agent = Arc::new(SledAgent::new_simulated_with_id( &config, sa_log, + config.nexus_address, Arc::clone(&nexus_client), )); diff --git a/sled-agent/src/sim/simulatable.rs b/sled-agent/src/sim/simulatable.rs index ae94409ddee..4e8f88f7119 100644 --- a/sled-agent/src/sim/simulatable.rs +++ b/sled-agent/src/sim/simulatable.rs @@ -61,6 +61,9 @@ pub trait Simulatable: fmt::Debug + Send + Sync { /// transitions to intermediate states. type RequestedState: Send + Clone + fmt::Debug; + /// Arguments to start a producer on the simulated object. + type ProducerArgs: Send + Clone + fmt::Debug; + /// Represents an action that should be taken by the Sled Agent. /// Generated in response to a state change, either requested or observed. type Action: Send + Clone + fmt::Debug; @@ -68,6 +71,12 @@ pub trait Simulatable: fmt::Debug + Send + Sync { /// Creates a new Simulatable object. fn new(current: Self::CurrentState) -> Self; + /// Sets the producer based on the provided arguments. + async fn set_producer( + &mut self, + args: Self::ProducerArgs, + ) -> Result<(), Error>; + /// Requests that the simulated object transition to a new target. 
/// /// If successful, returns the action that must be taken by the Sled Agent diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index b11baf5665c..5a7d3dd0c2f 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -15,6 +15,7 @@ use omicron_common::api::external::Error; use omicron_common::api::internal::nexus::DiskRuntimeState; use omicron_common::api::internal::nexus::InstanceRuntimeState; use slog::Logger; +use std::net::SocketAddr; use std::sync::Arc; use uuid::Uuid; @@ -37,6 +38,7 @@ pub struct SledAgent { /// collection of simulated disks, indexed by disk uuid disks: Arc>, storage: Mutex, + nexus_address: SocketAddr, pub nexus_client: Arc, } @@ -47,6 +49,7 @@ impl SledAgent { pub fn new_simulated_with_id( config: &Config, log: Logger, + nexus_address: SocketAddr, nexus_client: Arc, ) -> SledAgent { let id = config.id; @@ -74,6 +77,7 @@ impl SledAgent { config.storage.ip, storage_log, )), + nexus_address, nexus_client, } } @@ -87,6 +91,25 @@ impl SledAgent { initial_hardware: InstanceHardware, target: InstanceRuntimeStateRequested, ) -> Result { + for disk in &initial_hardware.disks { + let initial_state = DiskRuntimeState { + disk_state: omicron_common::api::external::DiskState::Attached( + instance_id, + ), + gen: omicron_common::api::external::Generation::new(), + time_updated: chrono::Utc::now(), + }; + let target = DiskStateRequested::Attached(instance_id); + + let id = match disk.volume_construction_request { + crucible::VolumeConstructionRequest::Volume { id, .. } => id, + _ => panic!("Unexpected construction type"), + }; + self.disks.sim_ensure(&id, initial_state, target).await?; + self.disks + .sim_ensure_producer(&id, (self.nexus_address, id)) + .await?; + } self.instances .sim_ensure(&instance_id, initial_hardware.runtime, target) .await From 9f8ab70f788746eb0f3af9b9788b9de8ffc003a3 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Thu, 14 Jul 2022 09:57:22 -0400 Subject: [PATCH 06/11] Unprivileged_access added to VerifyEndpoint --- nexus/tests/integration_tests/endpoints.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/nexus/tests/integration_tests/endpoints.rs b/nexus/tests/integration_tests/endpoints.rs index 5d94277f567..37a2e358717 100644 --- a/nexus/tests/integration_tests/endpoints.rs +++ b/nexus/tests/integration_tests/endpoints.rs @@ -975,6 +975,7 @@ lazy_static! { VerifyEndpoint { url: &*DEMO_DISK_METRICS_URL, visibility: Visibility::Protected, + unprivileged_access: UnprivilegedAccess::None, allowed_methods: vec![ AllowedMethod::Get, ], From 4e5b70c1e6ba65687c7634e16fed4c8390dde8fc Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Thu, 14 Jul 2022 12:00:35 -0400 Subject: [PATCH 07/11] Add limits, fix pagination (hopefully. Tests incoming) --- nexus/src/app/oximeter.rs | 79 +++++++++++++++------- nexus/src/external_api/http_entrypoints.rs | 4 +- nexus/tests/integration_tests/oximeter.rs | 2 +- oximeter/db/src/bin/oxdb.rs | 1 + oximeter/db/src/client.rs | 14 +++- 5 files changed, 71 insertions(+), 29 deletions(-) diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 3376be7363c..adb955465c1 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -219,11 +219,21 @@ impl super::Nexus { .map_err(map_oximeter_err) } + /// Returns a results from the timeseries DB based on the provided query + /// parameters. + /// + /// * `timeseries_name`: The "target:metric" name identifying the metric to + /// be queried. 
+ /// * `criteria`: Any additional parameters to help narrow down the query + /// selection further. + /// * `query_params`: Pagination parameter, identifying which page of + /// results to return. + /// * `limit`: The maximum number of results to return in a paginated + /// request. pub async fn select_timeseries( &self, timeseries_name: &str, criteria: &[&str], - interval: Duration, query_params: ResourceMetricsPagination, limit: NonZeroU32, ) -> Result, Error> { @@ -237,12 +247,10 @@ impl super::Nexus { dropshot::WhichPage::Next(query) => query, }; let start_time = query.start_time; - if start_time >= query.end_time { + let end_time = query.end_time; + if start_time >= end_time { return Ok(no_results()); } - let max_timeframe = chrono::Duration::from_std(interval * limit.get()) - .map_err(|e| Error::internal_error(&e.to_string()))?; - let end_time = query.end_time.min(start_time + max_timeframe); let timeseries_list = self .timeseries_client @@ -259,6 +267,7 @@ impl super::Nexus { criteria, Some(Timestamp::Inclusive(start_time)), Some(Timestamp::Exclusive(end_time)), + Some(limit), ) .await .map_err(map_oximeter_err)?; @@ -272,26 +281,48 @@ impl super::Nexus { ))); } - Ok(if let Some(timeseries) = timeseries_list.into_iter().next() { - let next_start_time = end_time; + // If we received no data, exit early. + let timeseries = + if let Some(timeseries) = timeseries_list.into_iter().next() { + timeseries + } else { + return Ok(no_results()); + }; - dropshot::ResultsPage { - next_page: if next_start_time >= query.end_time { - None - } else { - Some(base64::encode_config( - serde_json::to_string(&ResourceMetrics { - start_time: next_start_time, - end_time: query.end_time, - })?, - base64::URL_SAFE, - )) - }, - items: timeseries.measurements, - } - } else { - no_results() - }) + // Otherwise, set up the next page to-be-queried before returning data. + // + // This is only relevant if we hit the limit of measurements when + // querying. + let next_page = + if timeseries.measurements.len() >= limit.get() as usize { + // The "next start time" is the latest value we saw from the query. + let next_start_time = timeseries + .measurements + .iter() + .map(|m| m.timestamp()) + .reduce(|latest, timestamp| { + if latest >= timestamp { + latest + } else { + timestamp + } + }) + // Unwrap safety: This case shouldn't be possible to hit; we + // only enter this conditional when we're at the (nonzero) + // pagination limit of measurements. 
+ .expect("Expected measurements"); + Some(base64::encode_config( + serde_json::to_string(&ResourceMetrics { + start_time: next_start_time, + end_time: query.end_time, + })?, + base64::URL_SAFE, + )) + } else { + None + }; + + Ok(dropshot::ResultsPage { next_page, items: timeseries.measurements }) } // Internal helper to build an Oximeter client from its ID and address (common data between diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index e10d124c386..6c059894d8c 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -73,7 +73,6 @@ use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; use std::sync::Arc; -use std::time::Duration; use uuid::Uuid; type NexusApiDescription = ApiDescription>; @@ -1554,7 +1553,7 @@ async fn disk_metrics_list( let handler = async { let opctx = OpContext::for_external_api(&rqctx).await?; - // this ensures the user is authorized on Action::Read for this disk + // This ensures the user is authorized on Action::Read for this disk let disk = nexus .disk_fetch(&opctx, organization_name, project_name, disk_name) .await?; @@ -1563,7 +1562,6 @@ async fn disk_metrics_list( .select_timeseries( &format!("crucible_upstairs:{}", metric_name), &[&format!("upstairs_uuid=={}", upstairs_uuid)], - Duration::from_secs(10), query, limit, ) diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index e1d37f3320b..587134db67b 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -111,7 +111,7 @@ async fn test_oximeter_reregistration() { let timeseries_name = "integration_target:integration_metric"; let retrieve_timeseries = || async { match client - .select_timeseries_with(timeseries_name, &[], None, None) + .select_timeseries_with(timeseries_name, &[], None, None, None) .await { Ok(maybe_series) => { diff --git a/oximeter/db/src/bin/oxdb.rs b/oximeter/db/src/bin/oxdb.rs index 5410bc5a74a..478aa796150 100644 --- a/oximeter/db/src/bin/oxdb.rs +++ b/oximeter/db/src/bin/oxdb.rs @@ -287,6 +287,7 @@ async fn query( filters.as_slice(), start, end, + None, ) .await?; println!("{}", serde_json::to_string(×eries).unwrap()); diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 4f17076fe5d..4ea7ad6ac71 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -66,6 +66,7 @@ impl Client { criteria: &[&str], start_time: Option, end_time: Option, + limit: Option, ) -> Result, Error> { // Querying uses up to three queries to the database: // 1. Retrieve the schema @@ -84,9 +85,16 @@ impl Client { // If the timeseries doesn't exist, just return an empty Vec. 
None => return Ok(Vec::new()), }; - let mut query_builder = query::SelectQueryBuilder::new(&schema) + let query_builder = query::SelectQueryBuilder::new(&schema) .start_time(start_time) .end_time(end_time); + + let mut query_builder = if let Some(limit) = limit { + query_builder.limit(limit) + } else { + query_builder + }; + for criterion in criteria.iter() { query_builder = query_builder.filter_raw(criterion)?; } @@ -834,6 +842,7 @@ mod tests { &criteria.iter().map(|x| x.as_str()).collect::>(), None, None, + None, ) .await .unwrap(); @@ -998,6 +1007,7 @@ mod tests { &["id==0"], None, None, + None, ) .await .expect("Failed to select test samples"); @@ -1086,6 +1096,7 @@ mod tests { criteria, start_time, end_time, + None, ) .await .expect("Failed to select timeseries"); @@ -1342,6 +1353,7 @@ mod tests { &[], Some(query::Timestamp::Exclusive(start_time)), None, + None, ) .await .expect("Failed to select timeseries"); From 24a6488032ad19e8d32cd51498bb3d6330fc6f61 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Thu, 14 Jul 2022 13:00:24 -0400 Subject: [PATCH 08/11] Add test for limit --- oximeter/db/src/client.rs | 75 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 4ea7ad6ac71..1d37d555504 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -1371,6 +1371,81 @@ mod tests { db.cleanup().await.expect("Failed to cleanup database"); } + #[tokio::test] + async fn test_select_timeseries_with_limit() { + let (_, _, samples) = setup_select_test(); + let mut db = ClickHouseInstance::new(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new("::1".parse().unwrap(), db.port()); + let log = Logger::root(slog::Discard, o!()); + let client = Client::new(address, &log); + client + .init_db() + .await + .expect("Failed to initialize timeseries database"); + client + .insert_samples(&samples) + .await + .expect("Failed to insert samples"); + let timeseries_name = "service:request_latency"; + + // First, query without a limit. We should see all the results. + let all_measurements = &client + .select_timeseries_with(timeseries_name, &[], None, None, None) + .await + .expect("Failed to select timeseries")[0] + .measurements; + + // Check some constraints on the number of measurements - we + // can change these, but these assumptions make the test simpler. + // + // For now, assume we can cleanly cut the number of measurements in + // half. + assert!(all_measurements.len() >= 2); + assert!(all_measurements.len() % 2 == 0); + + // Next, let's set a limit to half the results and query again. + let limit = + NonZeroU32::new(u32::try_from(all_measurements.len() / 2).unwrap()) + .unwrap(); + let timeseries = &client + .select_timeseries_with( + timeseries_name, + &[], + None, + None, + Some(limit), + ) + .await + .expect("Failed to select timeseries")[0]; + assert_eq!(timeseries.measurements.len() as u32, limit.get()); + + let get_last = |timeseries: &Timeseries| { + timeseries + .measurements + .iter() + .map(|m| m.timestamp()) + .reduce(|latest, ts| if latest >= ts { latest } else { ts }) + .unwrap() + }; + + // Get the other half of the results. 
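+        // Using an exclusive lower bound at the final timestamp of the first
+        // page means the second page can neither overlap the first nor skip a
+        // sample (assuming, as this test does, that timestamps are unique).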
+ let timeseries = &client + .select_timeseries_with( + timeseries_name, + &[], + Some(query::Timestamp::Exclusive(get_last(timeseries))), + None, + Some(limit), + ) + .await + .expect("Failed to select timeseries")[0]; + assert_eq!(timeseries.measurements.len() as u32, limit.get()); + + db.cleanup().await.expect("Failed to cleanup database"); + } + #[tokio::test] async fn test_get_schema_no_new_values() { let (mut db, client, _) = setup_filter_testcase().await; From 5b0d56085cb37e29dea83c5d6c8ef3d9f119eddf Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 15 Jul 2022 14:54:45 -0400 Subject: [PATCH 09/11] Fix pagination, add tests --- nexus/src/app/oximeter.rs | 71 +++++++-------- nexus/tests/integration_tests/disks.rs | 115 ++++++++++++++++++------- sled-agent/src/sim/disk.rs | 2 +- 3 files changed, 112 insertions(+), 76 deletions(-) diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index adb955465c1..5a5be0d68c0 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -242,13 +242,19 @@ impl super::Nexus { dropshot::ResultsPage { next_page: None, items: Vec::new() } } - let query = match query_params.page { - dropshot::WhichPage::First(query) => query, - dropshot::WhichPage::Next(query) => query, + let (start_time, end_time, query) = match query_params.page { + dropshot::WhichPage::First(query) => ( + Timestamp::Inclusive(query.start_time), + Timestamp::Exclusive(query.end_time), + query, + ), + dropshot::WhichPage::Next(query) => ( + Timestamp::Exclusive(query.start_time), + Timestamp::Exclusive(query.end_time), + query, + ), }; - let start_time = query.start_time; - let end_time = query.end_time; - if start_time >= end_time { + if query.start_time >= query.end_time { return Ok(no_results()); } @@ -265,8 +271,8 @@ impl super::Nexus { .select_timeseries_with( timeseries_name, criteria, - Some(Timestamp::Inclusive(start_time)), - Some(Timestamp::Exclusive(end_time)), + Some(start_time), + Some(end_time), Some(limit), ) .await @@ -282,47 +288,28 @@ impl super::Nexus { } // If we received no data, exit early. - let timeseries = + let mut timeseries = if let Some(timeseries) = timeseries_list.into_iter().next() { timeseries } else { return Ok(no_results()); }; - // Otherwise, set up the next page to-be-queried before returning data. - // - // This is only relevant if we hit the limit of measurements when - // querying. - let next_page = - if timeseries.measurements.len() >= limit.get() as usize { - // The "next start time" is the latest value we saw from the query. - let next_start_time = timeseries - .measurements - .iter() - .map(|m| m.timestamp()) - .reduce(|latest, timestamp| { - if latest >= timestamp { - latest - } else { - timestamp - } - }) - // Unwrap safety: This case shouldn't be possible to hit; we - // only enter this conditional when we're at the (nonzero) - // pagination limit of measurements. - .expect("Expected measurements"); - Some(base64::encode_config( - serde_json::to_string(&ResourceMetrics { - start_time: next_start_time, - end_time: query.end_time, - })?, - base64::URL_SAFE, - )) - } else { - None - }; + // Otherwise, sort the output result, and prepare the next page + // to-be-queried. 
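+        // Sorting by timestamp ensures the last element handed to
+        // `ResultsPage::new` below is the newest measurement, so the page
+        // token derived from it is a valid exclusive start for the next
+        // query window.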
+ timeseries.measurements.sort_by_key(|m| m.timestamp()); - Ok(dropshot::ResultsPage { next_page, items: timeseries.measurements }) + Ok(dropshot::ResultsPage::new( + timeseries.measurements, + &query, + |last_measurement: &Measurement, query: &ResourceMetrics| { + ResourceMetrics { + start_time: last_measurement.timestamp(), + end_time: query.end_time, + } + }, + ) + .unwrap()) } // Internal helper to build an Oximeter client from its ID and address (common data between diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 9a64b8cf336..484807e0b61 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -12,6 +12,7 @@ use dropshot::ResultsPage; use http::method::Method; use http::StatusCode; use nexus_test_utils::http_testing::AuthnMode; +use nexus_test_utils::http_testing::Collection; use nexus_test_utils::http_testing::NexusRequest; use nexus_test_utils::http_testing::RequestBuilder; use nexus_test_utils::identity_eq; @@ -828,29 +829,33 @@ async fn test_disk_reject_total_size_not_divisible_by_min_disk_size( ); } +async fn create_instance_with_disk(client: &ClientTestContext) { + create_instance_with( + &client, + ORG_NAME, + PROJECT_NAME, + INSTANCE_NAME, + ¶ms::InstanceNetworkInterfaceAttachment::Default, + vec![params::InstanceDiskAttachment::Attach( + params::InstanceDiskAttach { name: DISK_NAME.parse().unwrap() }, + )], + ) + .await; +} + +const ALL_METRICS: [&'static str; 6] = + ["activated", "read", "write", "read_bytes", "write_bytes", "flush"]; + #[nexus_test] async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { let client = &cptestctx.external_client; DiskTest::new(&cptestctx).await; create_org_and_project(client).await; - let disks_url = get_disks_url(); - - // Create a disk. - let _new_disk = params::DiskCreate { - identity: IdentityMetadataCreateParams { - name: DISK_NAME.parse().unwrap(), - description: String::from("sells rainsticks"), - }, - disk_source: params::DiskSource::Blank { - block_size: params::BlockSize::try_from(512).unwrap(), - }, - size: ByteCount::from_gibibytes_u32(1), - }; - let _disk = create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; - let disk_url = format!("{}/{}", disks_url, DISK_NAME); + create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; // Whenever we grab this URL, get the last few seconds of metrics. let metric_url = |metric_type: &str| { + let disk_url = format!("{}/{}", get_disks_url(), DISK_NAME); format!( "{disk_url}/metrics/{metric_type}?start_time={:?}&end_time={:?}", Utc::now() - chrono::Duration::seconds(5), @@ -867,26 +872,14 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { assert!(measurements.items.is_empty()); // Create an instance, attach the disk to it. - let _instance = create_instance_with( - &client, - ORG_NAME, - PROJECT_NAME, - INSTANCE_NAME, - ¶ms::InstanceNetworkInterfaceAttachment::Default, - vec![params::InstanceDiskAttachment::Attach( - params::InstanceDiskAttach { name: DISK_NAME.parse().unwrap() }, - )], - ) - .await; - - // TODO: test that get as unprivileged fails? - // TODO: make the intervals smaller to avoid sleeping? - tokio::time::sleep(std::time::Duration::from_secs(3)).await; + create_instance_with_disk(client).await; - let metrics = - ["activated", "read", "write", "read_bytes", "write_bytes", "flush"]; + // NOTE: This is a little gnarly, but we sleep long enough to allow some + // metrics to be populated. 
Noted that this relies on the producer server + // within the simulated disk using an interval smaller than a second. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; - for metric in &metrics { + for metric in &ALL_METRICS { let measurements: ResultsPage = objects_list_page_authz(client, &metric_url(metric)).await; assert!(!measurements.items.is_empty()); @@ -900,6 +893,62 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { } } +#[nexus_test] +async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) { + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + create_org_and_project(client).await; + create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; + create_instance_with_disk(client).await; + + // NOTE: This is a little gnarly, but we sleep long enough to allow some + // metrics to be populated. Noted that this relies on the producer server + // within the simulated disk using an interval smaller than a second. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + for metric in &ALL_METRICS { + let collection_url = + format!("{}/{DISK_NAME}/metrics/{metric}", get_disks_url(),); + let initial_params = format!( + "start_time={:?}&end_time={:?}", + Utc::now() - chrono::Duration::seconds(5), + Utc::now(), + ); + let measurements_paginated: Collection = + NexusRequest::iter_collection_authn( + client, + &collection_url, + &initial_params, + Some(10), + ) + .await + .expect("failed to iterate over metrics"); + assert!(!measurements_paginated.all_items.is_empty()); + + let mut last_timestamp = None; + let mut last_value = None; + for item in &measurements_paginated.all_items { + let cumulative = match item.datum() { + Datum::CumulativeI64(c) => c, + _ => panic!("Unexpected datum type {:?}", item.datum()), + }; + assert!(cumulative.start_time() <= item.timestamp()); + + // Validate that the timestamps increase. + if let Some(last_ts) = last_timestamp { + assert!(last_ts < item.timestamp()); + } + // Validate that the values increase. 
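+            // (Cumulative counters only ever grow, so alongside increasing
+            // timestamps the test also expects strictly increasing values
+            // from the simulated producer.)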
+ if let Some(last_value) = last_value { + assert!(last_value < cumulative.value()); + } + + last_timestamp = Some(item.timestamp()); + last_value = Some(cumulative.value()); + } + } +} + async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk { NexusRequest::object_get(client, disk_url) .authn_as(AuthnMode::PrivilegedUser) diff --git a/sled-agent/src/sim/disk.rs b/sled-agent/src/sim/disk.rs index 02ba7b2a477..4c8fc55ac3e 100644 --- a/sled-agent/src/sim/disk.rs +++ b/sled-agent/src/sim/disk.rs @@ -168,7 +168,7 @@ impl SimDisk { id, address: producer_address, base_route: "/collect".to_string(), - interval: Duration::from_secs(1), + interval: Duration::from_millis(200), }; let config = oximeter_producer::Config { server_info, From 12a247d35c149c7baca75cdccf86b7d53d668799 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Thu, 21 Jul 2022 13:53:00 -0400 Subject: [PATCH 10/11] NotFound -> Empty Vec in Nexus --- nexus/src/app/oximeter.rs | 8 ++++++++ nexus/tests/integration_tests/oximeter.rs | 2 +- oximeter/db/src/client.rs | 14 +++++--------- oximeter/db/src/lib.rs | 5 ++--- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index f9394790267..be394f0c567 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -284,6 +284,14 @@ impl super::Nexus { Some(limit), ) .await + .or_else(|err| { + // If the timeseries name exists in the API, but not in Clickhouse, + // it might just not have been populated yet. + match err { + oximeter_db::Error::TimeseriesNotFound(_) => Ok(vec![]), + _ => Err(err), + } + }) .map_err(map_oximeter_err)?; if timeseries_list.len() > 1 { diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 587134db67b..8455277b834 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -121,7 +121,7 @@ async fn test_oximeter_reregistration() { Ok(maybe_series) } } - Err(oximeter_db::Error::QueryError(_)) => { + Err(oximeter_db::Error::TimeseriesNotFound(_)) => { Err(CondCheckError::NotYet) } Err(e) => Err(CondCheckError::from(e)), diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 3aa0fd1f0d4..d476b1189cf 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -80,11 +80,10 @@ impl Client { // to/from the database, as well as the cost of parsing them for each measurement, only to // promptly throw away almost all of them (except for the first). let timeseries_name = TimeseriesName::try_from(timeseries_name)?; - let schema = match self.schema_for_timeseries(×eries_name).await? { - Some(schema) => schema, - // If the timeseries doesn't exist, just return an empty Vec. - None => return Ok(Vec::new()), - }; + let schema = + self.schema_for_timeseries(×eries_name).await?.ok_or_else( + || Error::TimeseriesNotFound(format!("{timeseries_name}")), + )?; let query_builder = query::SelectQueryBuilder::new(&schema) .start_time(start_time) .end_time(end_time); @@ -127,10 +126,7 @@ impl Client { .schema_for_timeseries(¶ms.timeseries_name) .await? .ok_or_else(|| { - Error::QueryError(format!( - "No such timeseries: '{}'", - params.timeseries_name - )) + Error::TimeseriesNotFound(format!("{}", params.timeseries_name)) })?; // TODO: Handle inclusive/exclusive timestamps in general. 
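        // (The external disk-metrics path added earlier in this series already
        // draws that distinction: the first page is queried with an inclusive
        // start time and every subsequent page with an exclusive one.)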
// diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 27b50d65727..10890de04f9 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -43,9 +43,8 @@ pub enum Error { actual: BTreeMap, }, - /// An error querying or filtering data - #[error("Invalid query or data filter: {0}")] - QueryError(String), + #[error("Timeseries not found for: {0}")] + TimeseriesNotFound(String), #[error("The field comparison operation '{op}' is not valid for field '{field_name}' with type {field_type}")] InvalidSelectionOp { op: String, field_name: String, field_type: FieldType }, From 0e357531cfafa463bd6c342b48a890d1166a9735 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 26 Jul 2022 10:35:06 -0400 Subject: [PATCH 11/11] Fix merge --- nexus/src/app/oximeter.rs | 4 ++-- nexus/src/external_api/http_entrypoints.rs | 12 +++++++----- nexus/types/src/external_api/params.rs | 4 ---- nexus/types/src/external_api/views.rs | 2 -- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index be394f0c567..050e228b71f 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -9,8 +9,8 @@ use crate::context::OpContext; use crate::db; use crate::db::identity::Asset; use crate::external_api::params::ResourceMetrics; -use crate::external_api::params::ResourceMetricsPagination; use crate::internal_api::params::OximeterInfo; +use dropshot::PaginationParams; use internal_dns_client::{ multiclient::{ResolveError, Resolver}, names::{ServiceName, SRV}, @@ -235,7 +235,7 @@ impl super::Nexus { &self, timeseries_name: &str, criteria: &[&str], - query_params: ResourceMetricsPagination, + query_params: PaginationParams, limit: NonZeroU32, ) -> Result, Error> { #[inline] diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index a6c2a5c5708..dde6ca166bb 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -9,9 +9,9 @@ use super::views::IpPoolRange; use super::{ console_api, device_auth, params, views, views::{ - GlobalImage, IdentityProvider, Image, Measurement, Organization, - Project, Rack, Role, Silo, Sled, Snapshot, SshKey, User, UserBuiltin, - Vpc, VpcRouter, VpcSubnet, + GlobalImage, IdentityProvider, Image, Organization, Project, Rack, + Role, Silo, Sled, Snapshot, SshKey, User, UserBuiltin, Vpc, VpcRouter, + VpcSubnet, }, }; use crate::authz; @@ -1538,8 +1538,10 @@ pub enum DiskMetricName { async fn disk_metrics_list( rqctx: Arc>>, path_params: Path>, - query_params: Query, -) -> Result>, HttpError> { + query_params: Query< + PaginationParams, + >, +) -> Result>, HttpError> { let apictx = rqctx.context(); let nexus = &apictx.nexus; diff --git a/nexus/types/src/external_api/params.rs b/nexus/types/src/external_api/params.rs index dfff48cfad1..0f59257a616 100644 --- a/nexus/types/src/external_api/params.rs +++ b/nexus/types/src/external_api/params.rs @@ -6,7 +6,6 @@ use crate::external_api::shared; use chrono::{DateTime, Utc}; -use dropshot::PaginationParams; use omicron_common::api::external::{ ByteCount, IdentityMetadataCreateParams, IdentityMetadataUpdateParams, InstanceCpuCount, Ipv4Net, Ipv6Net, Name, @@ -848,9 +847,6 @@ pub struct ResourceMetrics { pub end_time: DateTime, } -pub type ResourceMetricsPagination = - PaginationParams; - #[cfg(test)] mod test { use super::*; diff --git a/nexus/types/src/external_api/views.rs b/nexus/types/src/external_api/views.rs index 9474b858be5..081cd459d55 100644 --- 
a/nexus/types/src/external_api/views.rs +++ b/nexus/types/src/external_api/views.rs @@ -19,8 +19,6 @@ use std::net::IpAddr; use std::net::SocketAddrV6; use uuid::Uuid; -pub use oximeter_db::Measurement; - // IDENTITY METADATA /// Identity-related metadata that's included in "asset" public API objects
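Taken together, these patches leave the external disk-metrics endpoint paginated in the standard dropshot style: the first request carries the time window as query parameters, and each response hands back an opaque `next_page` token built from the newest measurement returned. A rough sketch of a client session follows; the URL prefix, timestamps, and token are illustrative placeholders rather than values taken from the patches, and the response bodies are abbreviated:

    GET /organizations/{org}/projects/{project}/disks/{disk}/metrics/read
        ?start_time=2022-07-01T00:00:00Z&end_time=2022-07-01T01:00:00Z&limit=10
    200 OK  { "items": [ ...up to 10 measurements... ], "next_page": "<token>" }

    GET /organizations/{org}/projects/{project}/disks/{disk}/metrics/read
        ?page_token=<token>&limit=10
    200 OK  { "items": [ ...next 10 measurements... ], "next_page": "<token>" }

    ...repeated until a response comes back with no further items.

The integration test `test_disk_metrics_paginated` above drives the same loop through `NexusRequest::iter_collection_authn` rather than raw HTTP.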