Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: upload consumption events to S3 #7213

Merged
merged 13 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ testing = []

[dependencies]
anyhow.workspace = true
async-compression.workspace = true
async-trait.workspace = true
aws-config.workspace = true
aws-sdk-iam.workspace = true
Expand Down
29 changes: 28 additions & 1 deletion proxy/src/bin/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use proxy::auth;
use proxy::auth::backend::MaybeOwned;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
use proxy::config::remote_storage_from_toml;
use proxy::config::AuthenticationConfig;
use proxy::config::CacheOptions;
use proxy::config::HttpConfig;
Expand Down Expand Up @@ -184,6 +185,19 @@ struct ProxyCliArgs {

#[clap(flatten)]
parquet_upload: ParquetUploadArgs,

/// interval for backup metric collection
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
metric_backup_collection_interval: std::time::Duration,
/// remote storage configuration for backup metric collection
/// Encoded as toml (same format as pageservers), eg
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
#[clap(long, default_value = "{}")]
metric_backup_collection_remote_storage: String,
/// chunk size for backup metric collection
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
#[clap(long, default_value = "4194304")]
metric_backup_collection_chunk_size: usize,
}

#[derive(clap::Args, Clone, Copy, Debug)]
Expand Down Expand Up @@ -365,12 +379,17 @@ async fn main() -> anyhow::Result<()> {

// maintenance tasks. these never return unless there's an error
let mut maintenance_tasks = JoinSet::new();
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token));
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone()));
maintenance_tasks.spawn(http::health_server::task_main(http_listener));
maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener));

if let Some(metrics_config) = &config.metric_collection {
// TODO: Add gc regardles of the metric collection being enabled.
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
client_tasks.spawn(usage_metrics::task_backup(
&metrics_config.backup_metric_collection_config,
cancellation_token,
));
}

if let auth::BackendType::Console(api, _) = &config.auth_backend {
Expand Down Expand Up @@ -427,6 +446,13 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
if args.allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
}
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
interval: args.metric_backup_collection_interval,
remote_storage_config: remote_storage_from_toml(
&args.metric_backup_collection_remote_storage,
)?,
chunk_size: args.metric_backup_collection_chunk_size,
};

let metric_collection = match (
&args.metric_collection_endpoint,
Expand All @@ -435,6 +461,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
(Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig {
endpoint: endpoint.parse()?,
interval: humantime::parse_duration(interval)?,
backup_metric_collection_config,
}),
(None, None) => None,
_ => bail!(
Expand Down
17 changes: 17 additions & 0 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{auth, rate_limiter::RateBucketInfo, serverless::GlobalConnPoolOptions};
use anyhow::{bail, ensure, Context, Ok};
use itertools::Itertools;
use remote_storage::RemoteStorageConfig;
use rustls::{
crypto::ring::sign,
pki_types::{CertificateDer, PrivateKeyDer},
Expand Down Expand Up @@ -35,6 +36,7 @@ pub struct ProxyConfig {
pub struct MetricCollectionConfig {
pub endpoint: reqwest::Url,
pub interval: Duration,
pub backup_metric_collection_config: MetricBackupCollectionConfig,
}

pub struct TlsConfig {
Expand Down Expand Up @@ -305,6 +307,21 @@ impl CertResolver {
}
}

#[derive(Debug)]
pub struct MetricBackupCollectionConfig {
pub interval: Duration,
pub remote_storage_config: OptRemoteStorageConfig,
pub chunk_size: usize,
}

/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get
/// runtime type errors from the value parser we use.
pub type OptRemoteStorageConfig = Option<RemoteStorageConfig>;

pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
RemoteStorageConfig::from_toml(&s.parse()?)
}

/// Helper for cmdline cache options parsing.
#[derive(Debug)]
pub struct CacheOptions {
Expand Down
16 changes: 5 additions & 11 deletions proxy/src/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use parquet::{
},
record::RecordWriter,
};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio::{sync::mpsc, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, Span};
use utils::backoff;

use crate::config::{remote_storage_from_toml, OptRemoteStorageConfig};

use super::{RequestMonitoring, LOG_CHAN};

#[derive(clap::Args, Clone, Debug)]
Expand Down Expand Up @@ -50,21 +52,13 @@ pub struct ParquetUploadArgs {
parquet_upload_compression: Compression,
}

/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get
/// runtime type errors from the value parser we use.
type OptRemoteStorageConfig = Option<RemoteStorageConfig>;

fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
RemoteStorageConfig::from_toml(&s.parse()?)
}

// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a upload fails, we log it at info-level, and retry.
// But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
// level instead, as repeated failures can mean a more serious problem. If it
// fails more than FAILED_UPLOAD_RETRIES times, we give up
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
pub const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;

// the parquet crate leaves a lot to be desired...
// what follows is an attempt to write parquet files with minimal allocs.
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/proxy/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
console::messages::MetricsAuxInfo,
metrics::NUM_BYTES_PROXIED_COUNTER,
stream::Stream,
usage_metrics::{Ids, USAGE_METRICS},
usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS},
};
use metrics::IntCounterPairGuard;
use tokio::io::{AsyncRead, AsyncWrite};
Expand Down
1 change: 1 addition & 0 deletions proxy/src/serverless/sql_over_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::metrics::HTTP_CONTENT_LENGTH;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::usage_metrics::MetricCounterRecorder;
use crate::DbName;
use crate::RoleName;

Expand Down
Loading
Loading