From 72515fc466ca7d9cc5ed06dec8c0395410af7984 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 20 Jan 2023 02:21:49 -0500 Subject: [PATCH 1/3] Add collection tokens into oximeter --- Cargo.lock | 1 + oximeter/collector/Cargo.toml | 1 + oximeter/collector/src/lib.rs | 63 +++++++++++++++++++++++++++-------- 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4fc3794b40..39b2673a7f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3989,6 +3989,7 @@ dependencies = [ "clap 4.1.1", "dropshot", "expectorate", + "futures", "internal-dns-client", "nexus-client", "omicron-common", diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index 0f08cda7a43..7dbb061b673 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -9,6 +9,7 @@ license = "MPL-2.0" clap.workspace = true dropshot.workspace = true internal-dns-client.workspace = true +futures.workspace = true nexus-client.workspace = true omicron-common.workspace = true oximeter.workspace = true diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 57874a38af0..4a54b2d3a7b 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -28,7 +28,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::{sync::mpsc, sync::Mutex, task::JoinHandle, time::interval}; +use tokio::{sync::mpsc, sync::oneshot, sync::Mutex, task::JoinHandle, time::interval}; use uuid::Uuid; /// Errors collecting metric data @@ -47,11 +47,16 @@ pub enum Error { ResolveError(#[from] ResolveError), } +type CollectionToken = oneshot::Sender<()>; + // Messages for controlling a collection task -#[derive(Debug, Clone)] +#[derive(Debug)] enum CollectionMessage { // Explicit request that the task collect data from its producer - Collect, + // + // Also sends a oneshot that is signalled once the task scrapes + // data from the Producer, and places it in the Oximeter server. + Collect(CollectionToken), // Request that the task update its interval and the socket address on which it collects data // from its producer. Update(ProducerEndpoint), @@ -64,7 +69,8 @@ async fn perform_collection( log: &Logger, client: &reqwest::Client, producer: &ProducerEndpoint, - outbox: &mpsc::Sender, + outbox: &mpsc::Sender<(Option, ProducerResults)>, + token: Option, ) { info!(log, "collecting from producer"); let res = client @@ -85,7 +91,7 @@ async fn perform_collection( "collected {} total results", results.len(); ); - outbox.send(results).await.unwrap(); + outbox.send((token, results)).await.unwrap(); } Err(e) => { warn!( @@ -123,7 +129,7 @@ async fn collection_task( log: Logger, mut producer: ProducerEndpoint, mut inbox: mpsc::Receiver, - outbox: mpsc::Sender, + outbox: mpsc::Sender<(Option, ProducerResults)>, ) { let client = reqwest::Client::new(); let mut collection_timer = interval(producer.interval); @@ -146,9 +152,9 @@ async fn collection_task( debug!(log, "collection task received shutdown request"); return; }, - Some(CollectionMessage::Collect) => { + Some(CollectionMessage::Collect(token)) => { debug!(log, "collection task received explicit request to collect"); - perform_collection(&log, &client, &producer, &outbox).await; + perform_collection(&log, &client, &producer, &outbox, Some(token)).await; }, Some(CollectionMessage::Update(new_info)) => { producer = new_info; @@ -164,7 +170,7 @@ async fn collection_task( } } _ = collection_timer.tick() => { - perform_collection(&log, &client, &producer, &outbox).await; + perform_collection(&log, &client, &producer, &outbox, None).await; } } } @@ -187,12 +193,13 @@ async fn results_sink( client: Client, batch_size: usize, batch_interval: Duration, - mut rx: mpsc::Receiver, + mut rx: mpsc::Receiver<(Option, ProducerResults)>, ) { let mut timer = interval(batch_interval); timer.tick().await; // completes immediately let mut batch = Vec::with_capacity(batch_size); loop { + let mut collection_token = None; let insert = tokio::select! { _ = timer.tick() => { if batch.is_empty() { @@ -204,7 +211,7 @@ async fn results_sink( } results = rx.recv() => { match results { - Some(results) => { + Some((token, results)) => { let flattened_results = { let mut flattened = Vec::with_capacity(results.len()); for inner_batch in results.into_iter() { @@ -222,7 +229,13 @@ async fn results_sink( flattened }; batch.extend(flattened_results); - batch.len() >= batch_size + + collection_token = token; + if collection_token.is_some() { + true + } else { + batch.len() >= batch_size + } } None => { warn!(log, "result queue closed, exiting"); @@ -249,6 +262,10 @@ async fn results_sink( // or otherwise handle an error here as well. batch.clear(); } + + if let Some(token) = collection_token { + let _ = token.send(()); + } } } @@ -276,7 +293,7 @@ pub struct OximeterAgent { pub id: Uuid, log: Logger, // Handle to the TX-side of a channel for collecting results from the collection tasks - result_sender: mpsc::Sender, + result_sender: mpsc::Sender<(Option, ProducerResults)>, // The actual tokio tasks running the collection on a timer. collection_tasks: Arc>>, } @@ -368,11 +385,29 @@ impl OximeterAgent { Ok(()) } + /// Forces a collection from all producers. + /// + /// Returns once all those values have been inserted into Clickhouse, + /// or an error occurs trying to perform the collection. pub async fn force_collection(&self) { + let mut collection_oneshots = vec![]; let collection_tasks = self.collection_tasks.lock().await; for task in collection_tasks.iter() { - task.1.inbox.send(CollectionMessage::Collect).await.unwrap(); + let (tx, rx) = oneshot::channel(); + // Scrape from each producer, into oximeter... + task.1.inbox.send(CollectionMessage::Collect(tx)).await.unwrap(); + // ... and keep track of the token that indicates once the metric + // has made it into Clickhouse. + collection_oneshots.push(rx); } + drop(collection_tasks); + + // Only return once all producers finish processing the token we + // provided. + // + // NOTE: This can either mean that the colleciton completed + // successfully, or an error occurred in the collection pathway. + futures::future::join_all(collection_oneshots).await; } } From 3cf7a60f0927c766fe580df82088a0e1fbbbf27b Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 20 Jan 2023 02:22:05 -0500 Subject: [PATCH 2/3] fmt --- oximeter/collector/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 4a54b2d3a7b..935059cafee 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -28,7 +28,9 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::{sync::mpsc, sync::oneshot, sync::Mutex, task::JoinHandle, time::interval}; +use tokio::{ + sync::mpsc, sync::oneshot, sync::Mutex, task::JoinHandle, time::interval, +}; use uuid::Uuid; /// Errors collecting metric data From 70bc60b386fb0cc0440a45bdce6529fc3ad6ce5b Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 20 Jan 2023 13:32:10 -0500 Subject: [PATCH 3/3] feedback --- oximeter/collector/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 935059cafee..bcb9908c99a 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -57,7 +57,7 @@ enum CollectionMessage { // Explicit request that the task collect data from its producer // // Also sends a oneshot that is signalled once the task scrapes - // data from the Producer, and places it in the Oximeter server. + // data from the Producer, and places it in the Clickhouse server. Collect(CollectionToken), // Request that the task update its interval and the socket address on which it collects data // from its producer. @@ -407,7 +407,7 @@ impl OximeterAgent { // Only return once all producers finish processing the token we // provided. // - // NOTE: This can either mean that the colleciton completed + // NOTE: This can either mean that the collection completed // successfully, or an error occurred in the collection pathway. futures::future::join_all(collection_oneshots).await; }