From d5a43d89d9afd899c864009ea97bee30889d6e8d Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Fri, 22 Mar 2024 18:52:06 +0100 Subject: [PATCH 01/13] proxy: upload consumption events to S3 --- proxy/src/bin/proxy.rs | 27 +- proxy/src/config.rs | 17 ++ proxy/src/context/parquet.rs | 16 +- proxy/src/proxy/passthrough.rs | 2 +- proxy/src/serverless/sql_over_http.rs | 1 + proxy/src/usage_metrics.rs | 363 +++++++++++++++++++++----- vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- 9 files changed, 355 insertions(+), 77 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index d38439c2a0d6..1884f4a6e992 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -10,6 +10,7 @@ use proxy::auth; use proxy::auth::backend::MaybeOwned; use proxy::cancellation::CancelMap; use proxy::cancellation::CancellationHandler; +use proxy::config::remote_storage_from_toml; use proxy::config::AuthenticationConfig; use proxy::config::CacheOptions; use proxy::config::HttpConfig; @@ -184,6 +185,18 @@ struct ProxyCliArgs { #[clap(flatten)] parquet_upload: ParquetUploadArgs, + + /// interval for backup metric collection + #[clap(long, default_value = "10m", value_parser = humantime::parse_duration)] + metric_backup_collection_interval: std::time::Duration, + /// remote storage configuration for backup metric collection + /// Encoded as toml (same format as pageservers), eg + /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}` + #[clap(long, default_value = "{}")] + metric_backup_collection_remote_storage: String, + /// chunk size for backup metric collection + #[clap(long, default_value = "8192")] + metric_backup_collection_chunk_size: usize, } #[derive(clap::Args, Clone, Copy, Debug)] @@ -365,13 +378,17 @@ async fn main() -> anyhow::Result<()> { // maintenance tasks. these never return unless there's an error let mut maintenance_tasks = JoinSet::new(); - maintenance_tasks.spawn(proxy::handle_signals(cancellation_token)); + maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone())); maintenance_tasks.spawn(http::health_server::task_main(http_listener)); maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener)); if let Some(metrics_config) = &config.metric_collection { maintenance_tasks.spawn(usage_metrics::task_main(metrics_config)); } + client_tasks.spawn(usage_metrics::task_backup( + &config.backup_metric_collection, + cancellation_token, + )); if let auth::BackendType::Console(api, _) = &config.auth_backend { if let proxy::console::provider::ConsoleBackend::Console(api) = &**api { @@ -442,6 +459,13 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { and metric-collection-interval must be specified" ), }; + let backup_metric_collection = config::MetricBackupCollectionConfig { + interval: args.metric_backup_collection_interval, + remote_storage_config: remote_storage_from_toml( + &args.metric_backup_collection_remote_storage, + )?, + chunk_size: args.metric_backup_collection_chunk_size, + }; let rate_limiter_config = RateLimiterConfig { disable: args.disable_dynamic_rate_limiter, algorithm: args.rate_limit_algorithm, @@ -531,6 +555,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { handshake_timeout: args.handshake_timeout, region: args.region.clone(), aws_region: args.aws_region.clone(), + backup_metric_collection, })); Ok(config) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 45f8d7614439..69490ffc9bc6 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,6 +1,7 @@ use crate::{auth, rate_limiter::RateBucketInfo, serverless::GlobalConnPoolOptions}; use anyhow::{bail, ensure, Context, Ok}; use itertools::Itertools; +use remote_storage::RemoteStorageConfig; use rustls::{ crypto::ring::sign, pki_types::{CertificateDer, PrivateKeyDer}, @@ -19,6 +20,7 @@ pub struct ProxyConfig { pub tls_config: Option, pub auth_backend: auth::BackendType<'static, (), ()>, pub metric_collection: Option, + pub backup_metric_collection: MetricBackupCollectionConfig, pub allow_self_signed_compute: bool, pub http_config: HttpConfig, pub authentication_config: AuthenticationConfig, @@ -305,6 +307,21 @@ impl CertResolver { } } +#[derive(Debug)] +pub struct MetricBackupCollectionConfig { + pub interval: Duration, + pub remote_storage_config: OptRemoteStorageConfig, + pub chunk_size: usize, +} + +/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get +/// runtime type errors from the value parser we use. +pub type OptRemoteStorageConfig = Option; + +pub fn remote_storage_from_toml(s: &str) -> anyhow::Result { + RemoteStorageConfig::from_toml(&s.parse()?) +} + /// Helper for cmdline cache options parsing. #[derive(Debug)] pub struct CacheOptions { diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index a2be1c41861f..04e5695255a4 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -13,12 +13,14 @@ use parquet::{ }, record::RecordWriter, }; -use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel}; +use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use tokio::{sync::mpsc, time}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, Span}; use utils::backoff; +use crate::config::{remote_storage_from_toml, OptRemoteStorageConfig}; + use super::{RequestMonitoring, LOG_CHAN}; #[derive(clap::Args, Clone, Debug)] @@ -50,21 +52,13 @@ pub struct ParquetUploadArgs { parquet_upload_compression: Compression, } -/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get -/// runtime type errors from the value parser we use. -type OptRemoteStorageConfig = Option; - -fn remote_storage_from_toml(s: &str) -> anyhow::Result { - RemoteStorageConfig::from_toml(&s.parse()?) -} - // Occasional network issues and such can cause remote operations to fail, and // that's expected. If a upload fails, we log it at info-level, and retry. // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN // level instead, as repeated failures can mean a more serious problem. If it // fails more than FAILED_UPLOAD_RETRIES times, we give up -pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3; -pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10; +pub const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3; +pub const FAILED_UPLOAD_MAX_RETRIES: u32 = 10; // the parquet crate leaves a lot to be desired... // what follows is an attempt to write parquet files with minimal allocs. diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index f6d43143915a..cf53c6e673bd 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -4,7 +4,7 @@ use crate::{ console::messages::MetricsAuxInfo, metrics::NUM_BYTES_PROXIED_COUNTER, stream::Stream, - usage_metrics::{Ids, USAGE_METRICS}, + usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS}, }; use metrics::IntCounterPairGuard; use tokio::io::{AsyncRead, AsyncWrite}; diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index f675375ff1d6..d5f2fea48717 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -44,6 +44,7 @@ use crate::metrics::HTTP_CONTENT_LENGTH; use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE; use crate::proxy::NeonOptions; use crate::serverless::backend::HttpConnError; +use crate::usage_metrics::MetricCounterRecorder; use crate::DbName; use crate::RoleName; diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index d75aedf89b07..4c25b1325b85 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -1,20 +1,32 @@ //! Periodically collect proxy consumption metrics //! and push them to a HTTP endpoint. -use crate::{config::MetricCollectionConfig, http, BranchId, EndpointId}; -use chrono::{DateTime, Utc}; +use crate::{ + config::{MetricBackupCollectionConfig, MetricCollectionConfig}, + context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD}, + http, BranchId, EndpointId, +}; +use anyhow::Context; +use bytes::Bytes; +use chrono::{DateTime, Datelike, Timelike, Utc}; use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; use dashmap::{mapref::entry::Entry, DashMap}; +use futures::future::select; use once_cell::sync::Lazy; +use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use serde::{Deserialize, Serialize}; use std::{ convert::Infallible, + pin::pin, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, time::Duration, }; +use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, trace}; +use utils::backoff; +use uuid::{NoContext, Timestamp}; const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client"; @@ -33,19 +45,93 @@ pub struct Ids { pub branch_id: BranchId, } +pub trait MetricCounterRecorder { + /// Record that some bytes were sent from the proxy to the client + fn record_egress(&self, bytes: u64); + /// Record that some connections were opened + fn record_connection(&self, count: usize); +} + +trait MetricCounterReporter { + fn get_metrics(&self) -> (u64, usize); + fn move_metrics(&self) -> (u64, usize); +} + +#[derive(Debug)] +struct MetricBackupCounter { + transmitted: AtomicU64, + opened_connections: AtomicUsize, +} + +impl MetricCounterRecorder for MetricBackupCounter { + fn record_egress(&self, bytes: u64) { + self.transmitted.fetch_add(bytes, Ordering::AcqRel); + } + + fn record_connection(&self, count: usize) { + self.opened_connections.fetch_add(count, Ordering::AcqRel); + } +} + +impl MetricCounterReporter for MetricBackupCounter { + fn get_metrics(&self) -> (u64, usize) { + ( + self.transmitted.load(Ordering::Acquire), + self.opened_connections.load(Ordering::Acquire), + ) + } + fn move_metrics(&self) -> (u64, usize) { + ( + self.transmitted.swap(0, Ordering::AcqRel), + self.opened_connections.swap(0, Ordering::AcqRel), + ) + } +} + #[derive(Debug)] pub struct MetricCounter { transmitted: AtomicU64, opened_connections: AtomicUsize, + backup: Arc, } -impl MetricCounter { +impl MetricCounterRecorder for MetricCounter { /// Record that some bytes were sent from the proxy to the client - pub fn record_egress(&self, bytes: u64) { + fn record_egress(&self, bytes: u64) { self.transmitted.fetch_add(bytes, Ordering::AcqRel); + self.backup.record_egress(bytes); } + /// Record that some connections were opened + fn record_connection(&self, count: usize) { + self.opened_connections.fetch_add(count, Ordering::AcqRel); + self.backup.record_connection(count); + } +} + +impl MetricCounterReporter for MetricCounter { + fn get_metrics(&self) -> (u64, usize) { + ( + self.transmitted.load(Ordering::Acquire), + self.opened_connections.load(Ordering::Acquire), + ) + } + fn move_metrics(&self) -> (u64, usize) { + ( + self.transmitted.swap(0, Ordering::AcqRel), + self.opened_connections.swap(0, Ordering::AcqRel), + ) + } +} + +trait Clearable { /// extract the value that should be reported + fn should_report(self: &Arc) -> Option; + /// Determine whether the counter should be cleared from the global map. + fn should_clear(self: &mut Arc) -> bool; +} + +impl Clearable for C { fn should_report(self: &Arc) -> Option { // heuristic to see if the branch is still open // if a clone happens while we are observing, the heuristic will be incorrect. @@ -54,13 +140,12 @@ impl MetricCounter { // However, for the strong count to be 1 it must have occured that at one instant // all the endpoints were closed, so missing a report because the endpoints are closed is valid. let is_open = Arc::strong_count(self) > 1; - let opened = self.opened_connections.swap(0, Ordering::AcqRel); // update cached metrics eagerly, even if they can't get sent // (to avoid sending the same metrics twice) // see the relevant discussion on why to do so even if the status is not success: // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956 - let value = self.transmitted.swap(0, Ordering::AcqRel); + let (value, opened) = self.move_metrics(); // Our only requirement is that we report in every interval if there was an open connection // if there were no opened connections since, then we don't need to report @@ -70,15 +155,12 @@ impl MetricCounter { Some(value) } } - - /// Determine whether the counter should be cleared from the global map. fn should_clear(self: &mut Arc) -> bool { // we can't clear this entry if it's acquired elsewhere let Some(counter) = Arc::get_mut(self) else { return false; }; - let opened = *counter.opened_connections.get_mut(); - let value = *counter.transmitted.get_mut(); + let (opened, value) = counter.get_metrics(); // clear if there's no data to report value == 0 && opened == 0 } @@ -90,11 +172,26 @@ type FastHasher = std::hash::BuildHasherDefault; #[derive(Default)] pub struct Metrics { endpoints: DashMap, FastHasher>, + backup_endpoints: DashMap, FastHasher>, } impl Metrics { /// Register a new byte metrics counter for this endpoint pub fn register(&self, ids: Ids) -> Arc { + let backup = if let Some(entry) = self.backup_endpoints.get(&ids) { + entry.clone() + } else { + self.backup_endpoints + .entry(ids.clone()) + .or_insert_with(|| { + Arc::new(MetricBackupCounter { + transmitted: AtomicU64::new(0), + opened_connections: AtomicUsize::new(0), + }) + }) + .clone() + }; + let entry = if let Some(entry) = self.endpoints.get(&ids) { entry.clone() } else { @@ -104,12 +201,13 @@ impl Metrics { Arc::new(MetricCounter { transmitted: AtomicU64::new(0), opened_connections: AtomicUsize::new(0), + backup: backup.clone(), }) }) .clone() }; - entry.opened_connections.fetch_add(1, Ordering::AcqRel); + entry.record_connection(1); entry } } @@ -132,7 +230,7 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result anyhow::Result( + endpoints: &DashMap, FastHasher>, +) -> Vec<(Ids, u64)> { + let mut metrics_to_clear = Vec::new(); + + let metrics_to_send: Vec<(Ids, u64)> = endpoints + .iter() + .filter_map(|counter| { + let key = counter.key().clone(); + let Some(value) = counter.should_report() else { + metrics_to_clear.push(key); + return None; + }; + Some((key, value)) + }) + .collect(); + + for metric in metrics_to_clear { + match endpoints.entry(metric) { + Entry::Occupied(mut counter) => { + if counter.get_mut().should_clear() { + counter.remove_entry(); + } + } + Entry::Vacant(_) => {} + } + } + metrics_to_send +} + +fn create_event_chunks<'a>( + metrics_to_send: &'a [(Ids, u64)], + hostname: &'a str, + prev: DateTime, + now: DateTime, + chunk_size: usize, +) -> impl Iterator>> + 'a { + // Split into chunks of 1000 metrics to avoid exceeding the max request size + metrics_to_send + .chunks(chunk_size) + .map(move |chunk| EventChunk { + events: chunk + .iter() + .map(|(ids, value)| Event { + kind: EventType::Incremental { + start_time: prev, + stop_time: now, + }, + metric: PROXY_IO_BYTES_PER_CLIENT, + idempotency_key: idempotency_key(hostname), + value: *value, + extra: ids.clone(), + }) + .collect(), + }) +} + #[instrument(skip_all)] async fn collect_metrics_iteration( - metrics: &Metrics, + endpoints: &DashMap, FastHasher>, client: &http::ClientWithMiddleware, metric_collection_endpoint: &reqwest::Url, hostname: &str, @@ -158,48 +313,17 @@ async fn collect_metrics_iteration( metric_collection_endpoint ); - let mut metrics_to_clear = Vec::new(); - - let metrics_to_send: Vec<(Ids, u64)> = metrics - .endpoints - .iter() - .filter_map(|counter| { - let key = counter.key().clone(); - let Some(value) = counter.should_report() else { - metrics_to_clear.push(key); - return None; - }; - Some((key, value)) - }) - .collect(); + let metrics_to_send = collect_and_clear_metrics(endpoints); if metrics_to_send.is_empty() { trace!("no new metrics to send"); } // Send metrics. - // Split into chunks of 1000 metrics to avoid exceeding the max request size - for chunk in metrics_to_send.chunks(CHUNK_SIZE) { - let events = chunk - .iter() - .map(|(ids, value)| Event { - kind: EventType::Incremental { - start_time: prev, - stop_time: now, - }, - metric: PROXY_IO_BYTES_PER_CLIENT, - idempotency_key: idempotency_key(hostname), - value: *value, - extra: Ids { - endpoint_id: ids.endpoint_id.clone(), - branch_id: ids.branch_id.clone(), - }, - }) - .collect(); - + for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) { let res = client .post(metric_collection_endpoint.clone()) - .json(&EventChunk { events }) + .json(&chunk) .send() .await; @@ -213,25 +337,141 @@ async fn collect_metrics_iteration( if !res.status().is_success() { error!("metrics endpoint refused the sent metrics: {:?}", res); - for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) { + for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) { // Report if the metric value is suspiciously large error!("potentially abnormal metric value: {:?}", metric); } } } +} - for metric in metrics_to_clear { - match metrics.endpoints.entry(metric) { - Entry::Occupied(mut counter) => { - if counter.get_mut().should_clear() { - counter.remove_entry(); - } +pub async fn task_backup( + backup_config: &MetricBackupCollectionConfig, + cancellation_token: CancellationToken, +) -> anyhow::Result<()> { + info!("metrics backup config: {backup_config:?}"); + scopeguard::defer! { + info!("metrics backup has shut down"); + } + // Even if the remote storage is not configured, we still want to clear the metrics. + let storage = backup_config + .remote_storage_config + .as_ref() + .map(|config| GenericRemoteStorage::from_config(config).context("remote storage init")) + .transpose()?; + let mut ticker = tokio::time::interval(backup_config.interval); + let mut prev = Utc::now(); + let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned(); + loop { + select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await; + let now = Utc::now(); + collect_metrics_backup_iteration( + &USAGE_METRICS.backup_endpoints, + &storage, + &hostname, + prev, + now, + backup_config.chunk_size, + ) + .await; + + prev = now; + if cancellation_token.is_cancelled() { + info!("metrics backup has been cancelled"); + break; + } + } + Ok(()) +} + +#[instrument(skip_all)] +async fn collect_metrics_backup_iteration( + endpoints: &DashMap, FastHasher>, + storage: &Option, + hostname: &str, + prev: DateTime, + now: DateTime, + chunk_size: usize, +) { + let year = now.year(); + let month = now.month(); + let day = now.day(); + let hour = now.hour(); + let minute = now.minute(); + let cancel = CancellationToken::new(); + + info!("starting collect_metrics_backup_iteration"); + + let metrics_to_send = collect_and_clear_metrics(endpoints); + + if metrics_to_send.is_empty() { + trace!("no new metrics to send"); + } + + // Send metrics. + for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) { + let real_now = Utc::now(); + let id = uuid::Uuid::new_v7(Timestamp::from_unix( + NoContext, + real_now.second().into(), + real_now.nanosecond(), + )); + let path = format!("{year:04}/{month:02}/{day:02}/{hour:02}:{minute:02}/events-{id}.json"); + let remote_path = match RemotePath::from_string(&path) { + Ok(remote_path) => remote_path, + Err(e) => { + error!("failed to create remote path from str {path}: {:?}", e); + continue; } - Entry::Vacant(_) => {} + }; + + let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await; + + if let Err(e) = res { + error!( + "failed to upload consumption events to remote storage: {:?}", + e + ); } } } +async fn upload_events_chunk( + storage: &Option, + chunk: EventChunk<'_, Event>, + remote_path: &RemotePath, + cancel: &CancellationToken, +) -> anyhow::Result<()> { + let storage = match storage { + Some(storage) => storage, + None => { + error!("no remote storage configured"); + return Ok(()); + } + }; + let data: Bytes = serde_json::to_vec(&chunk) + .context("serialize metrics")? + .into(); + backoff::retry( + || async { + let stream = futures::stream::once(futures::future::ready(Ok(data.clone()))); + storage + .upload(stream, data.len(), remote_path, None, cancel) + .await + }, + TimeoutOrCancel::caused_by_cancel, + FAILED_UPLOAD_WARN_THRESHOLD, + FAILED_UPLOAD_MAX_RETRIES, + "request_data_upload", + cancel, + ) + .await + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) + .and_then(|x| x) + .context("request_data_upload")?; + Ok(()) +} + #[cfg(test)] mod tests { use std::{ @@ -248,7 +488,7 @@ mod tests { }; use url::Url; - use super::{collect_metrics_iteration, Ids, Metrics}; + use super::*; use crate::{http, rate_limiter::RateLimiterConfig}; #[tokio::test] @@ -284,18 +524,19 @@ mod tests { let now = Utc::now(); // no counters have been registered - collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await; + collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await; let r = std::mem::take(&mut *reports2.lock().unwrap()); assert!(r.is_empty()); - // register a new counter + // register a new counteruse itertools::Itertools; + let counter = metrics.register(Ids { endpoint_id: "e1".into(), branch_id: "b1".into(), }); // the counter should be observed despite 0 egress - collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await; + collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await; let r = std::mem::take(&mut *reports2.lock().unwrap()); assert_eq!(r.len(), 1); assert_eq!(r[0].events.len(), 1); @@ -305,7 +546,7 @@ mod tests { counter.record_egress(1); // egress should be observered - collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await; + collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await; let r = std::mem::take(&mut *reports2.lock().unwrap()); assert_eq!(r.len(), 1); assert_eq!(r[0].events.len(), 1); @@ -315,7 +556,7 @@ mod tests { drop(counter); // we do not observe the counter - collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await; + collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await; let r = std::mem::take(&mut *reports2.lock().unwrap()); assert!(r.is_empty()); diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index c5d920a7d9e9..3b09894ddb88 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit c5d920a7d9e9cbeb62b6c46f292db08162763f68 +Subproject commit 3b09894ddb8825b50c963942059eab1a2a0b0a89 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index af9ab67bc80a..80cef885add1 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit af9ab67bc80afd94e4eb11c34f50c0a29c37eb1b +Subproject commit 80cef885add1af6741aa31944c7d2c84d8f9098f diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 111e82c45d79..90078947229a 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 111e82c45d79728fdd3a4816605378c3cc5cfe84 +Subproject commit 90078947229aa7f9ac5f7ed4527b2c7386d5332b From 6ce0c63c5b65306b3d27fc3ef789399faa3d36b8 Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Fri, 22 Mar 2024 18:55:18 +0100 Subject: [PATCH 02/13] Merge submodule --- vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 3b09894ddb88..c5d920a7d9e9 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 3b09894ddb8825b50c963942059eab1a2a0b0a89 +Subproject commit c5d920a7d9e9cbeb62b6c46f292db08162763f68 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 80cef885add1..af9ab67bc80a 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 80cef885add1af6741aa31944c7d2c84d8f9098f +Subproject commit af9ab67bc80afd94e4eb11c34f50c0a29c37eb1b diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 90078947229a..111e82c45d79 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 90078947229aa7f9ac5f7ed4527b2c7386d5332b +Subproject commit 111e82c45d79728fdd3a4816605378c3cc5cfe84 From 6372d58639dc0175f8e5875fca69f991a2671fd4 Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Fri, 22 Mar 2024 18:58:46 +0100 Subject: [PATCH 03/13] Fix comment --- proxy/src/usage_metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 4c25b1325b85..159764c4116e 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -528,7 +528,7 @@ mod tests { let r = std::mem::take(&mut *reports2.lock().unwrap()); assert!(r.is_empty()); - // register a new counteruse itertools::Itertools; + // register a new counter let counter = metrics.register(Ids { endpoint_id: "e1".into(), From 3b7abc563400c66d82188a95e93fa8ac750ccafc Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 10:47:29 +0100 Subject: [PATCH 04/13] Added test --- proxy/src/usage_metrics.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 159764c4116e..7b73432336e9 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -562,5 +562,8 @@ mod tests { // counter is unregistered assert!(metrics.endpoints.is_empty()); + collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000).await; + // backup counter is unregistered + assert!(metrics.backup_endpoints.is_empty()); } } From 0efdad3e88e3d14c181eabc9b7ff7a5b5832c74f Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 10:49:39 +0100 Subject: [PATCH 05/13] Fmt --- proxy/src/usage_metrics.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 7b73432336e9..07a163c4fbaf 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -562,7 +562,8 @@ mod tests { // counter is unregistered assert!(metrics.endpoints.is_empty()); - collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000).await; + collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000) + .await; // backup counter is unregistered assert!(metrics.backup_endpoints.is_empty()); } From 029443edebc10acfa7b2010822480acc816c72fe Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 11:03:51 +0100 Subject: [PATCH 06/13] Change default size --- proxy/src/bin/proxy.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 1884f4a6e992..e378b3658f7e 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -195,7 +195,8 @@ struct ProxyCliArgs { #[clap(long, default_value = "{}")] metric_backup_collection_remote_storage: String, /// chunk size for backup metric collection - #[clap(long, default_value = "8192")] + /// Size of each event is no more than 400 bytes, so 2**18 is about 12.5MB. + #[clap(long, default_value = "262144")] metric_backup_collection_chunk_size: usize, } From c7cdcae5ffbdb9539fab9147b6abd5ad9253f36a Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 12:33:52 +0100 Subject: [PATCH 07/13] Added gzip compression --- Cargo.lock | 1 + proxy/Cargo.toml | 1 + proxy/src/usage_metrics.rs | 12 ++++++++---- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6409c79ef95c..793fbd59bb63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4222,6 +4222,7 @@ name = "proxy" version = "0.1.0" dependencies = [ "anyhow", + "async-compression", "async-trait", "aws-config", "aws-sdk-iam", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 57a2736d5b32..b327890be29e 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -10,6 +10,7 @@ testing = [] [dependencies] anyhow.workspace = true +async-compression.workspace = true async-trait.workspace = true aws-config.workspace = true aws-sdk-iam.workspace = true diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 07a163c4fbaf..40240e194e5e 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -6,6 +6,7 @@ use crate::{ http, BranchId, EndpointId, }; use anyhow::Context; +use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; use chrono::{DateTime, Datelike, Timelike, Utc}; use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; @@ -23,6 +24,7 @@ use std::{ }, time::Duration, }; +use tokio::io::AsyncWriteExt; use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, trace}; use utils::backoff; @@ -449,12 +451,14 @@ async fn upload_events_chunk( return Ok(()); } }; - let data: Bytes = serde_json::to_vec(&chunk) - .context("serialize metrics")? - .into(); + let data = serde_json::to_vec(&chunk).context("serialize metrics")?; + let mut encoder = GzipEncoder::new(Vec::new()); + encoder.write_all(&data).await.context("compress metrics")?; + encoder.shutdown().await.context("compress metrics")?; + let compressed_data: Bytes = encoder.get_ref().clone().into(); backoff::retry( || async { - let stream = futures::stream::once(futures::future::ready(Ok(data.clone()))); + let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone()))); storage .upload(stream, data.len(), remote_path, None, cancel) .await From 8c5b4805a17edef7b6315144736baff6e34baca4 Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 12:50:41 +0100 Subject: [PATCH 08/13] Review --- proxy/src/bin/proxy.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index e378b3658f7e..c693f8138f05 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -384,12 +384,13 @@ async fn main() -> anyhow::Result<()> { maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener)); if let Some(metrics_config) = &config.metric_collection { + // TODO: Add gc regardles of the metric collection being enabled. maintenance_tasks.spawn(usage_metrics::task_main(metrics_config)); + client_tasks.spawn(usage_metrics::task_backup( + &config.backup_metric_collection, + cancellation_token, + )); } - client_tasks.spawn(usage_metrics::task_backup( - &config.backup_metric_collection, - cancellation_token, - )); if let auth::BackendType::Console(api, _) = &config.auth_backend { if let proxy::console::provider::ConsoleBackend::Console(api) = &**api { From 04f2efbf4b710273ba343f2517034c12174b32bf Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 12:52:56 +0100 Subject: [PATCH 09/13] Config --- proxy/src/bin/proxy.rs | 18 +++++++++--------- proxy/src/config.rs | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index c693f8138f05..a55d1a344796 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -387,7 +387,7 @@ async fn main() -> anyhow::Result<()> { // TODO: Add gc regardles of the metric collection being enabled. maintenance_tasks.spawn(usage_metrics::task_main(metrics_config)); client_tasks.spawn(usage_metrics::task_backup( - &config.backup_metric_collection, + &metrics_config.backup_metric_collection_config, cancellation_token, )); } @@ -446,6 +446,13 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { if args.allow_self_signed_compute { warn!("allowing self-signed compute certificates"); } + let backup_metric_collection_config = config::MetricBackupCollectionConfig { + interval: args.metric_backup_collection_interval, + remote_storage_config: remote_storage_from_toml( + &args.metric_backup_collection_remote_storage, + )?, + chunk_size: args.metric_backup_collection_chunk_size, + }; let metric_collection = match ( &args.metric_collection_endpoint, @@ -454,6 +461,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { (Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig { endpoint: endpoint.parse()?, interval: humantime::parse_duration(interval)?, + backup_metric_collection_config, }), (None, None) => None, _ => bail!( @@ -461,13 +469,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { and metric-collection-interval must be specified" ), }; - let backup_metric_collection = config::MetricBackupCollectionConfig { - interval: args.metric_backup_collection_interval, - remote_storage_config: remote_storage_from_toml( - &args.metric_backup_collection_remote_storage, - )?, - chunk_size: args.metric_backup_collection_chunk_size, - }; let rate_limiter_config = RateLimiterConfig { disable: args.disable_dynamic_rate_limiter, algorithm: args.rate_limit_algorithm, @@ -557,7 +558,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { handshake_timeout: args.handshake_timeout, region: args.region.clone(), aws_region: args.aws_region.clone(), - backup_metric_collection, })); Ok(config) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 69490ffc9bc6..055fa42baa45 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -20,7 +20,6 @@ pub struct ProxyConfig { pub tls_config: Option, pub auth_backend: auth::BackendType<'static, (), ()>, pub metric_collection: Option, - pub backup_metric_collection: MetricBackupCollectionConfig, pub allow_self_signed_compute: bool, pub http_config: HttpConfig, pub authentication_config: AuthenticationConfig, @@ -37,6 +36,7 @@ pub struct ProxyConfig { pub struct MetricCollectionConfig { pub endpoint: reqwest::Url, pub interval: Duration, + pub backup_metric_collection_config: MetricBackupCollectionConfig, } pub struct TlsConfig { From b0fa5edfbe38c15af0aa29649051647301774de1 Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 12:58:22 +0100 Subject: [PATCH 10/13] Increase chunk size --- proxy/src/bin/proxy.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index a55d1a344796..49a117a856ba 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -195,8 +195,8 @@ struct ProxyCliArgs { #[clap(long, default_value = "{}")] metric_backup_collection_remote_storage: String, /// chunk size for backup metric collection - /// Size of each event is no more than 400 bytes, so 2**18 is about 12.5MB. - #[clap(long, default_value = "262144")] + /// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression. + #[clap(long, default_value = "4194304")] metric_backup_collection_chunk_size: usize, } From eb1febcef4ec69e1265184f3f5fce2f8071aabb6 Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 12:59:26 +0100 Subject: [PATCH 11/13] Updated path --- proxy/src/usage_metrics.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 40240e194e5e..93e84a1c6f68 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -400,6 +400,7 @@ async fn collect_metrics_backup_iteration( let day = now.day(); let hour = now.hour(); let minute = now.minute(); + let second = now.second(); let cancel = CancellationToken::new(); info!("starting collect_metrics_backup_iteration"); @@ -418,7 +419,7 @@ async fn collect_metrics_backup_iteration( real_now.second().into(), real_now.nanosecond(), )); - let path = format!("{year:04}/{month:02}/{day:02}/{hour:02}:{minute:02}/events-{id}.json"); + let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz"); let remote_path = match RemotePath::from_string(&path) { Ok(remote_path) => remote_path, Err(e) => { From d16dfe7d0d8c91204285c8864e06f3ad81b43b2e Mon Sep 17 00:00:00 2001 From: Anna Khanova Date: Mon, 25 Mar 2024 13:28:53 +0100 Subject: [PATCH 12/13] Fix tests --- proxy/src/usage_metrics.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 93e84a1c6f68..89d161f139c8 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -567,9 +567,13 @@ mod tests { // counter is unregistered assert!(metrics.endpoints.is_empty()); + + collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000) + .await; + assert!(!metrics.backup_endpoints.is_empty()); collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000) .await; - // backup counter is unregistered + // backup counter is unregistered after the second iteration assert!(metrics.backup_endpoints.is_empty()); } } From 12a99a6daa13fd4aba370f07c5cff765a2e1d528 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 28 Mar 2024 16:27:15 +0000 Subject: [PATCH 13/13] use get_mut --- proxy/src/usage_metrics.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 89d161f139c8..2ad0883fb062 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -55,7 +55,7 @@ pub trait MetricCounterRecorder { } trait MetricCounterReporter { - fn get_metrics(&self) -> (u64, usize); + fn get_metrics(&mut self) -> (u64, usize); fn move_metrics(&self) -> (u64, usize); } @@ -76,10 +76,10 @@ impl MetricCounterRecorder for MetricBackupCounter { } impl MetricCounterReporter for MetricBackupCounter { - fn get_metrics(&self) -> (u64, usize) { + fn get_metrics(&mut self) -> (u64, usize) { ( - self.transmitted.load(Ordering::Acquire), - self.opened_connections.load(Ordering::Acquire), + *self.transmitted.get_mut(), + *self.opened_connections.get_mut(), ) } fn move_metrics(&self) -> (u64, usize) { @@ -112,10 +112,10 @@ impl MetricCounterRecorder for MetricCounter { } impl MetricCounterReporter for MetricCounter { - fn get_metrics(&self) -> (u64, usize) { + fn get_metrics(&mut self) -> (u64, usize) { ( - self.transmitted.load(Ordering::Acquire), - self.opened_connections.load(Ordering::Acquire), + *self.transmitted.get_mut(), + *self.opened_connections.get_mut(), ) } fn move_metrics(&self) -> (u64, usize) {