1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions oximeter/collector/Cargo.toml
@@ -9,6 +9,7 @@ license = "MPL-2.0"
clap.workspace = true
dropshot.workspace = true
internal-dns-client.workspace = true
+futures.workspace = true
nexus-client.workspace = true
omicron-common.workspace = true
oximeter.workspace = true
65 changes: 51 additions & 14 deletions oximeter/collector/src/lib.rs
@@ -28,7 +28,9 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
-use tokio::{sync::mpsc, sync::Mutex, task::JoinHandle, time::interval};
+use tokio::{
+sync::mpsc, sync::oneshot, sync::Mutex, task::JoinHandle, time::interval,
+};
use uuid::Uuid;

/// Errors collecting metric data
@@ -47,11 +49,16 @@ pub enum Error {
ResolveError(#[from] ResolveError),
}

+type CollectionToken = oneshot::Sender<()>;
+
// Messages for controlling a collection task
-#[derive(Debug, Clone)]
+#[derive(Debug)]
enum CollectionMessage {
// Explicit request that the task collect data from its producer
-Collect,
+//
+// Also sends a oneshot that is signalled once the task scrapes
+// data from the Producer, and places it in the Clickhouse server.
+Collect(CollectionToken),
// Request that the task update its interval and the socket address on which it collects data
// from its producer.
Update(ProducerEndpoint),
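
The Collect variant now carries a CollectionToken (a tokio oneshot sender), so the requester is told when its data has actually reached ClickHouse rather than just when the scrape was kicked off. Below is a minimal, self-contained sketch of the same request/acknowledgement pattern, using a hypothetical Message/DoWork type in place of the oximeter types (assumes tokio with the "full" feature); it is an illustration of the channel pattern, not code from this PR.

use tokio::sync::{mpsc, oneshot};

// Hypothetical message type mirroring CollectionMessage::Collect(CollectionToken).
enum Message {
    DoWork(oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Message>(4);

    // Worker task: perform the work, then signal the token so the requester can proceed.
    let worker = tokio::spawn(async move {
        while let Some(Message::DoWork(token)) = rx.recv().await {
            // ... perform the actual work here ...
            let _ = token.send(()); // ignore the error if the requester gave up waiting
        }
    });

    // Requester: send a request carrying the oneshot, then await the acknowledgement.
    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(Message::DoWork(ack_tx)).await.unwrap();
    ack_rx.await.unwrap();

    drop(tx);
    worker.await.unwrap();
}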
@@ -64,7 +71,8 @@ async fn perform_collection(
log: &Logger,
client: &reqwest::Client,
producer: &ProducerEndpoint,
-outbox: &mpsc::Sender<ProducerResults>,
+outbox: &mpsc::Sender<(Option<CollectionToken>, ProducerResults)>,
+token: Option<CollectionToken>,
) {
info!(log, "collecting from producer");
let res = client
@@ -85,7 +93,7 @@
"collected {} total results",
results.len();
);
-outbox.send(results).await.unwrap();
+outbox.send((token, results)).await.unwrap();
}
Err(e) => {
warn!(
@@ -123,7 +131,7 @@ async fn collection_task(
log: Logger,
mut producer: ProducerEndpoint,
mut inbox: mpsc::Receiver<CollectionMessage>,
-outbox: mpsc::Sender<ProducerResults>,
+outbox: mpsc::Sender<(Option<CollectionToken>, ProducerResults)>,
) {
let client = reqwest::Client::new();
let mut collection_timer = interval(producer.interval);
@@ -146,9 +154,9 @@
debug!(log, "collection task received shutdown request");
return;
},
-Some(CollectionMessage::Collect) => {
+Some(CollectionMessage::Collect(token)) => {
debug!(log, "collection task received explicit request to collect");
-perform_collection(&log, &client, &producer, &outbox).await;
+perform_collection(&log, &client, &producer, &outbox, Some(token)).await;
},
Some(CollectionMessage::Update(new_info)) => {
producer = new_info;
@@ -164,7 +172,7 @@
}
}
_ = collection_timer.tick() => {
-perform_collection(&log, &client, &producer, &outbox).await;
+perform_collection(&log, &client, &producer, &outbox, None).await;
}
}
}
@@ -187,12 +195,13 @@ async fn results_sink(
client: Client,
batch_size: usize,
batch_interval: Duration,
-mut rx: mpsc::Receiver<ProducerResults>,
+mut rx: mpsc::Receiver<(Option<CollectionToken>, ProducerResults)>,
) {
let mut timer = interval(batch_interval);
timer.tick().await; // completes immediately
let mut batch = Vec::with_capacity(batch_size);
loop {
+let mut collection_token = None;
let insert = tokio::select! {
_ = timer.tick() => {
if batch.is_empty() {
@@ -204,7 +213,7 @@
}
results = rx.recv() => {
match results {
-Some(results) => {
+Some((token, results)) => {
let flattened_results = {
let mut flattened = Vec::with_capacity(results.len());
for inner_batch in results.into_iter() {
@@ -222,7 +231,13 @@
flattened
};
batch.extend(flattened_results);
-batch.len() >= batch_size
+
+collection_token = token;
+if collection_token.is_some() {
+true
+} else {
+batch.len() >= batch_size
+}
}
None => {
warn!(log, "result queue closed, exiting");
@@ -249,6 +264,10 @@
// or otherwise handle an error here as well.
batch.clear();
}
+
+if let Some(token) = collection_token {
+let _ = token.send(());
+}
}
}
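
In results_sink, an attached token changes two things: the batch is inserted right away instead of waiting for batch_size or the timer, and the token is signalled only after that insert, so a completed oneshot means the data has at least been handed to ClickHouse. A condensed sketch of that ordering follows, with hypothetical Sample and insert_batch stand-ins for the oximeter sample type and the ClickHouse client call; it is an illustration, not the PR's code.

use tokio::sync::oneshot;

// Hypothetical stand-ins for oximeter's sample type and the ClickHouse insert call.
type Sample = String;
async fn insert_batch(_batch: &[Sample]) { /* e.g. client.insert_samples(...).await */ }

async fn handle_results(
    batch: &mut Vec<Sample>,
    incoming: Vec<Sample>,
    token: Option<oneshot::Sender<()>>,
    batch_size: usize,
) {
    batch.extend(incoming);
    // An explicit collection forces an insert, regardless of how full the batch is.
    if token.is_some() || batch.len() >= batch_size {
        insert_batch(batch).await;
        batch.clear();
    }
    // Signal only after the insert attempt, so the waiter knows the data has been
    // sent on to ClickHouse.
    if let Some(token) = token {
        let _ = token.send(());
    }
}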

@@ -276,7 +295,7 @@ pub struct OximeterAgent {
pub id: Uuid,
log: Logger,
// Handle to the TX-side of a channel for collecting results from the collection tasks
-result_sender: mpsc::Sender<ProducerResults>,
+result_sender: mpsc::Sender<(Option<CollectionToken>, ProducerResults)>,
// The actual tokio tasks running the collection on a timer.
collection_tasks: Arc<Mutex<BTreeMap<Uuid, CollectionTask>>>,
}
@@ -368,11 +387,29 @@ impl OximeterAgent {
Ok(())
}

+/// Forces a collection from all producers.
+///
+/// Returns once all those values have been inserted into Clickhouse,
+/// or an error occurs trying to perform the collection.
pub async fn force_collection(&self) {
+let mut collection_oneshots = vec![];
let collection_tasks = self.collection_tasks.lock().await;
for task in collection_tasks.iter() {
-task.1.inbox.send(CollectionMessage::Collect).await.unwrap();
+let (tx, rx) = oneshot::channel();
+// Scrape from each producer, into oximeter...
+task.1.inbox.send(CollectionMessage::Collect(tx)).await.unwrap();
+// ... and keep track of the token that indicates once the metric
+// has made it into Clickhouse.
+collection_oneshots.push(rx);
}
+drop(collection_tasks);
+
+// Only return once all producers finish processing the token we
+// provided.
+//
+// NOTE: This can either mean that the collection completed
+// successfully, or an error occurred in the collection pathway.
+futures::future::join_all(collection_oneshots).await;
}
}
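
For callers, awaiting force_collection therefore means every registered producer has been scraped and the resulting samples have been handed to ClickHouse (or the attempt has failed) before the next statement runs. A hedged usage sketch follows; the function name, the import path, and the surrounding context are assumptions for illustration only.

use oximeter_collector::OximeterAgent; // crate path assumed

// Hypothetical caller, e.g. a test or an HTTP handler holding an agent.
async fn refresh_then_query(agent: &OximeterAgent) {
    // Ask every collection task to scrape now, and wait until the results
    // have been inserted into ClickHouse (or the collection attempt failed).
    agent.force_collection().await;

    // Only now is it reasonable to query ClickHouse and expect fresh samples,
    // using whatever query client the caller holds.
}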
