diff --git a/.sqlx/query-0107ab57a47a423721cc6257cf1572348bf76ecf16632fe625ebafa17f45738a.json b/.sqlx/query-0107ab57a47a423721cc6257cf1572348bf76ecf16632fe625ebafa17f45738a.json new file mode 100644 index 000000000..545d2e451 --- /dev/null +++ b/.sqlx/query-0107ab57a47a423721cc6257cf1572348bf76ecf16632fe625ebafa17f45738a.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO crates (name) VALUES ($1)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "0107ab57a47a423721cc6257cf1572348bf76ecf16632fe625ebafa17f45738a" +} diff --git a/.sqlx/query-f550ed904fdb5d3ee6581fe1ad036c9b5b8db8765d5665042deb9ade67394d3c.json b/.sqlx/query-f550ed904fdb5d3ee6581fe1ad036c9b5b8db8765d5665042deb9ade67394d3c.json new file mode 100644 index 000000000..44b1f373a --- /dev/null +++ b/.sqlx/query-f550ed904fdb5d3ee6581fe1ad036c9b5b8db8765d5665042deb9ade67394d3c.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT name as \"name: KrateName\" FROM crates", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name: KrateName", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "f550ed904fdb5d3ee6581fe1ad036c9b5b8db8765d5665042deb9ade67394d3c" +} diff --git a/Cargo.lock b/Cargo.lock index ef78adb86..0e8170576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2051,6 +2051,7 @@ dependencies = [ "derive_builder", "derive_more 2.0.1", "docsrs-metadata", + "fastly-api", "flate2", "fn-error-context", "font-awesome-as-a-crate", @@ -2264,6 +2265,19 @@ dependencies = [ "serde", ] +[[package]] +name = "fastly-api" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6aa65a56cdc77f4d8a1c6e8ff46e752823b7bbc7ed2b708e2fb488e892e8f9f" +dependencies = [ + "reqwest", + "serde", + "serde_derive", + "serde_json", + "url", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -6739,6 +6753,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index e937cf627..98fd0e803 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ chrono = { version = "0.4.11", default-features = false, features = ["clock", "s # Transitive dependencies we don't use directly but need to have specific versions of thread_local = "1.1.3" constant_time_eq = "0.4.2" +fastly-api = "12.0.0" [dev-dependencies] criterion = "0.7.0" diff --git a/src/build_queue.rs b/src/build_queue.rs index 4ee79b5a4..84c4b037b 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -1,7 +1,9 @@ use crate::{ - BuildPackageSummary, Config, Context, Index, InstanceMetrics, RustwideBuilder, cdn, + BuildPackageSummary, Config, Context, Index, InstanceMetrics, RustwideBuilder, + cdn::{self, CdnMetrics}, db::{ - CrateId, Pool, delete_crate, delete_version, types::version::Version, + CrateId, Pool, delete_crate, delete_version, + types::{krate_name::KrateName, version::Version}, update_latest_version_id, }, docbuilder::{BuilderMetrics, PackageKind}, @@ -62,6 +64,7 @@ pub struct AsyncBuildQueue { metrics: Arc, queue_metrics: BuildQueueMetrics, builder_metrics: Arc, + cdn_metrics: Arc, max_attempts: i32, } @@ -71,6 +74,7 @@ impl AsyncBuildQueue { metrics: Arc, config: Arc, storage: Arc, + cdn_metrics: Arc, otel_meter_provider: &AnyMeterProvider, ) -> Self { AsyncBuildQueue { @@ -81,6 +85,7 @@ impl AsyncBuildQueue { storage, queue_metrics: BuildQueueMetrics::new(otel_meter_provider), builder_metrics: Arc::new(BuilderMetrics::new(otel_meter_provider)), + cdn_metrics, } } @@ -259,6 +264,25 @@ impl AsyncBuildQueue { /// Index methods. impl AsyncBuildQueue { + async fn queue_crate_invalidation(&self, conn: &mut sqlx::PgConnection, krate: &str) { + let krate = match krate + .parse::() + .with_context(|| format!("can't parse crate name '{}'", krate)) + { + Ok(krate) => krate, + Err(err) => { + report_error(&err); + return; + } + }; + + if let Err(err) = + cdn::queue_crate_invalidation(conn, &self.config, &self.cdn_metrics, &krate).await + { + report_error(&err); + } + } + /// Updates registry index repository and adds new crates into build queue. /// /// Returns the number of crates added @@ -296,11 +320,8 @@ impl AsyncBuildQueue { ), Err(err) => report_error(&err), } - if let Err(err) = - cdn::queue_crate_invalidation(&mut conn, &self.config, krate).await - { - report_error(&err); - } + + self.queue_crate_invalidation(&mut conn, krate).await; continue; } @@ -328,11 +349,9 @@ impl AsyncBuildQueue { ), Err(err) => report_error(&err), } - if let Err(err) = - cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await - { - report_error(&err); - } + + self.queue_crate_invalidation(&mut conn, &release.name) + .await; continue; } @@ -387,11 +406,8 @@ impl AsyncBuildQueue { report_error(&err); } - if let Err(err) = - cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await - { - report_error(&err); - } + self.queue_crate_invalidation(&mut conn, &release.name) + .await; } } @@ -581,13 +597,11 @@ impl BuildQueue { self.inner.metrics.total_builds.inc(); self.inner.builder_metrics.total_builds.add(1, &[]); - if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation( - &mut transaction, - &self.inner.config, - &to_process.name, - )) { - report_error(&err); - } + + self.runtime.block_on( + self.inner + .queue_crate_invalidation(&mut transaction, &to_process.name), + ); let mut increase_attempt_count = || -> Result<()> { let attempt: i32 = self.runtime.block_on( @@ -1119,7 +1133,7 @@ mod tests { assert!( env.runtime() .block_on(async { - cdn::queued_or_active_crate_invalidations( + cdn::cloudfront::queued_or_active_crate_invalidations( &mut *env.async_db().async_conn().await, ) .await @@ -1148,7 +1162,7 @@ mod tests { env.runtime() .block_on(async { let mut conn = env.async_db().async_conn().await; - cdn::queued_or_active_crate_invalidations(&mut conn).await + cdn::cloudfront::queued_or_active_crate_invalidations(&mut conn).await }) .unwrap() }; diff --git a/src/cdn.rs b/src/cdn/cloudfront.rs similarity index 93% rename from src/cdn.rs rename to src/cdn/cloudfront.rs index 13006d50f..8ec806407 100644 --- a/src/cdn.rs +++ b/src/cdn/cloudfront.rs @@ -1,8 +1,5 @@ -use crate::{ - Config, InstanceMetrics, - metrics::{CDN_INVALIDATION_HISTOGRAM_BUCKETS, otel::AnyMeterProvider}, - utils::report_error, -}; +use super::CdnMetrics; +use crate::{Config, InstanceMetrics, utils::report_error}; use anyhow::{Context, Error, Result, anyhow, bail}; use aws_config::BehaviorVersion; use aws_sdk_cloudfront::{ @@ -13,7 +10,7 @@ use aws_sdk_cloudfront::{ }; use chrono::{DateTime, Utc}; use futures_util::stream::TryStreamExt; -use opentelemetry::{KeyValue, metrics::Histogram}; +use opentelemetry::KeyValue; use serde::Serialize; use sqlx::Connection as _; use std::{ @@ -29,31 +26,6 @@ use uuid::Uuid; /// triggered invalidations const MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS: i32 = 13; -#[derive(Debug)] -pub struct CdnMetrics { - invalidation_time: Histogram, - queue_time: Histogram, -} - -impl CdnMetrics { - pub fn new(meter_provider: &AnyMeterProvider) -> Self { - let meter = meter_provider.meter("cdn"); - const PREFIX: &str = "docsrs.cdn"; - Self { - invalidation_time: meter - .f64_histogram(format!("{PREFIX}.invalidation_time")) - .with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec()) - .with_unit("s") - .build(), - queue_time: meter - .f64_histogram(format!("{PREFIX}.queue_time")) - .with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec()) - .with_unit("s") - .build(), - } - } -} - #[derive(Debug, EnumString)] pub enum CdnKind { #[strum(ascii_case_insensitive)] @@ -567,57 +539,6 @@ pub(crate) async fn handle_queued_invalidation_requests( Ok(()) } -#[instrument(skip(conn, config))] -pub(crate) async fn queue_crate_invalidation( - conn: &mut sqlx::PgConnection, - config: &Config, - name: &str, -) -> Result<()> { - if !config.cache_invalidatable_responses { - info!("full page cache disabled, skipping queueing invalidation"); - return Ok(()); - } - - async fn add( - conn: &mut sqlx::PgConnection, - name: &str, - distribution_id: &str, - path_patterns: &[&str], - ) -> Result<()> { - for pattern in path_patterns { - debug!(distribution_id, pattern, "enqueueing web CDN invalidation"); - sqlx::query!( - "INSERT INTO cdn_invalidation_queue (crate, cdn_distribution_id, path_pattern) - VALUES ($1, $2, $3)", - name, - distribution_id, - pattern - ) - .execute(&mut *conn) - .await?; - } - Ok(()) - } - - if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { - add( - conn, - name, - distribution_id, - &[&format!("/{name}*"), &format!("/crate/{name}*")], - ) - .await - .context("error enqueueing web CDN invalidation")?; - } - if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { - add(conn, name, distribution_id, &[&format!("/rustdoc/{name}*")]) - .await - .context("error enqueueing static CDN invalidation")?; - } - - Ok(()) -} - #[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)] pub(crate) struct QueuedInvalidation { pub krate: String, @@ -683,14 +604,12 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution( #[cfg(test)] mod tests { - use std::time::Duration; - use super::*; - use crate::test::TestEnvironment; - + use crate::{cdn::queue_crate_invalidation, test::TestEnvironment}; use aws_sdk_cloudfront::{Config, config::Credentials}; use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient}; use aws_smithy_types::body::SdkBody; + use std::time::Duration; const DISTRIBUTION_ID_WEB: &str = "distribution_id_web"; const DISTRIBUTION_ID_STATIC: &str = "distribution_id_static"; @@ -817,7 +736,9 @@ mod tests { .is_empty() ); - queue_crate_invalidation(&mut conn, env.config(), "krate").await?; + let metrics = otel_metrics(&env); + queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) + .await?; // invalidation paths are queued. assert_eq!( @@ -922,7 +843,9 @@ mod tests { .is_empty() ); - queue_crate_invalidation(&mut conn, env.config(), "krate").await?; + let metrics = otel_metrics(&env); + queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) + .await?; // invalidation paths are queued. assert_eq!( @@ -1088,7 +1011,9 @@ mod tests { .await?; // queue an invalidation - queue_crate_invalidation(&mut conn, env.config(), "krate").await?; + let metrics = otel_metrics(&env); + queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) + .await?; let metrics = otel_metrics(&env); @@ -1150,7 +1075,9 @@ mod tests { .await?; // queue an invalidation - queue_crate_invalidation(&mut conn, env.config(), "krate").await?; + let metrics = otel_metrics(&env); + queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) + .await?; let metrics = otel_metrics(&env); diff --git a/src/cdn/fastly.rs b/src/cdn/fastly.rs new file mode 100644 index 000000000..dfb7f6667 --- /dev/null +++ b/src/cdn/fastly.rs @@ -0,0 +1,109 @@ +use crate::{ + cdn::CdnMetrics, + config::Config, + web::headers::{SurrogateKey, SurrogateKeys}, +}; +use anyhow::{Result, bail}; +use chrono::{DateTime, Utc}; +use fastly_api::apis::{ + configuration::{ApiKey, Configuration}, + purge_api::{BulkPurgeTagParams, bulk_purge_tag}, +}; +use itertools::Itertools as _; +use opentelemetry::KeyValue; +use tracing::error; + +/// Purge the given surrogate keys from all configured fastly services. +/// +/// Accepts any number of surrogate keys, and splits them into appropriately sized +/// batches for the Fastly API. +pub(crate) async fn purge_surrogate_keys( + config: &Config, + metrics: &CdnMetrics, + keys: I, +) -> Result<()> +where + I: IntoIterator, +{ + let Some(api_token) = &config.fastly_api_token else { + bail!("Fastly API token not configured"); + }; + + let mut cfg = Configuration { + api_key: Some(ApiKey { + prefix: None, + key: api_token.to_owned(), + }), + ..Default::default() + }; + + // the `bulk_purge_tag` supports up to 256 surrogate keys in its list, + // but I believe we also have to respect the length limits for the full + // surrogate key header. + for encoded_surrogate_keys in keys.into_iter().batching(|it| { + const MAX_SURROGATE_KEYS_IN_BATCH_PURGE: usize = 256; + + // SurrogateKeys::from_iter::until_full only consumes as many elements as will fit into + // the header. + // The rest is up to the next `batching` iteration. + Some(SurrogateKeys::from_iter_until_full( + it.take(MAX_SURROGATE_KEYS_IN_BATCH_PURGE), + )) + }) { + for sid in config + .fastly_service_sid_web + .iter() + .chain(config.fastly_service_sid_static.iter()) + { + // NOTE: we start with just calling the API, and logging an error if they happen. + // We can then see if we need retries or escalation to full purges. + + let kv = [KeyValue::new("service_sid", sid.clone())]; + match bulk_purge_tag( + &mut cfg, + BulkPurgeTagParams { + service_id: sid.to_owned(), + // TODO: investigate how they could help & test + // soft purge. But later, after the initial migration. + fastly_soft_purge: None, + surrogate_key: Some(encoded_surrogate_keys.to_string()), + ..Default::default() + }, + ) + .await + { + Ok(_) => { + metrics.fastly_batch_purges_with_surrogate.add(1, &kv); + metrics + .fastly_purge_surrogate_keys + .add(encoded_surrogate_keys.key_count() as u64, &kv); + } + Err(err) => { + metrics.fastly_batch_purge_errors.add(1, &kv); + let rate_limit_reset = + DateTime::::from_timestamp(cfg.rate_limit_reset as i64, 0) + .map(|dt| dt.to_rfc3339()); + error!( + sid, + ?err, + %encoded_surrogate_keys, + rate_limit_remaining=cfg.rate_limit_remaining, + rate_limit_reset, + "Failed to purge Fastly surrogate keys for service" + ); + } + } + } + } + + metrics + .fastly_rate_limit_remaining + .record(cfg.rate_limit_remaining, &[]); + metrics.fastly_time_until_rate_limit_reset.record( + cfg.rate_limit_reset + .saturating_sub(Utc::now().timestamp() as u64), + &[], + ); + + Ok(()) +} diff --git a/src/cdn/mod.rs b/src/cdn/mod.rs new file mode 100644 index 000000000..1ad7019dd --- /dev/null +++ b/src/cdn/mod.rs @@ -0,0 +1,134 @@ +use crate::{ + Config, + db::types::krate_name::KrateName, + metrics::{CDN_INVALIDATION_HISTOGRAM_BUCKETS, otel::AnyMeterProvider}, + web::headers::SurrogateKey, +}; +use anyhow::{Context, Result}; +use opentelemetry::metrics::{Counter, Gauge, Histogram}; +use tracing::{debug, error, info, instrument}; + +pub(crate) mod cloudfront; +pub(crate) mod fastly; + +#[derive(Debug)] +pub struct CdnMetrics { + invalidation_time: Histogram, + queue_time: Histogram, + fastly_batch_purges_with_surrogate: Counter, + fastly_batch_purge_errors: Counter, + fastly_purge_surrogate_keys: Counter, + fastly_rate_limit_remaining: Gauge, + fastly_time_until_rate_limit_reset: Gauge, +} + +impl CdnMetrics { + pub fn new(meter_provider: &AnyMeterProvider) -> Self { + let meter = meter_provider.meter("cdn"); + const PREFIX: &str = "docsrs.cdn"; + Self { + invalidation_time: meter + .f64_histogram(format!("{PREFIX}.invalidation_time")) + .with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec()) + .with_unit("s") + .build(), + queue_time: meter + .f64_histogram(format!("{PREFIX}.queue_time")) + .with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec()) + .with_unit("s") + .build(), + fastly_batch_purges_with_surrogate: meter + .u64_counter(format!("{PREFIX}.fastly_batch_purges_with_surrogate")) + .with_unit("1") + .build(), + fastly_batch_purge_errors: meter + .u64_counter(format!("{PREFIX}.fastly_batch_purge_errors")) + .with_unit("1") + .build(), + fastly_purge_surrogate_keys: meter + .u64_counter(format!("{PREFIX}.fastly_purge_surrogate_keys")) + .with_unit("1") + .build(), + fastly_rate_limit_remaining: meter + .u64_gauge(format!("{PREFIX}.fasty_rate_limit_remaining")) + .with_unit("1") + .build(), + fastly_time_until_rate_limit_reset: meter + .u64_gauge(format!("{PREFIX}.fastly_time_until_rate_limit_reset")) + .with_unit("s") + .build(), + } + } +} + +#[instrument(skip(conn, config))] +pub(crate) async fn queue_crate_invalidation( + conn: &mut sqlx::PgConnection, + config: &Config, + metrics: &CdnMetrics, + krate_name: &KrateName, +) -> Result<()> { + if !config.cache_invalidatable_responses { + info!("full page cache disabled, skipping queueing invalidation"); + return Ok(()); + } + + if config.fastly_api_token.is_some() + && let Err(err) = fastly::purge_surrogate_keys( + config, + metrics, + std::iter::once(SurrogateKey::from(krate_name.clone())), + ) + .await + { + // TODO: for now just consume & report the error, I want to see how often that happens. + // We can then decide if we need more protection mechanisms (like retries or queuing). + error!(%krate_name, ?err, "error purging Fastly surrogate keys"); + } + + /// cloudfront needs a queue to work around a concurrency limit of just 15 parallel + /// wildcard invalidations. + async fn add( + conn: &mut sqlx::PgConnection, + name: &str, + distribution_id: &str, + path_patterns: &[&str], + ) -> Result<()> { + for pattern in path_patterns { + debug!(distribution_id, pattern, "enqueueing web CDN invalidation"); + sqlx::query!( + "INSERT INTO cdn_invalidation_queue (crate, cdn_distribution_id, path_pattern) + VALUES ($1, $2, $3)", + name, + distribution_id, + pattern + ) + .execute(&mut *conn) + .await?; + } + Ok(()) + } + + if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { + add( + conn, + krate_name, + distribution_id, + &[&format!("/{krate_name}*"), &format!("/crate/{krate_name}*")], + ) + .await + .context("error enqueueing web CDN invalidation")?; + } + if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { + add( + conn, + krate_name, + distribution_id, + &[&format!("/rustdoc/{krate_name}*")], + ) + .await + .context("error enqueueing static CDN invalidation")?; + } + + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index 246736b94..51528880d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use crate::{cdn::CdnKind, storage::StorageKind}; +use crate::{cdn::cloudfront::CdnKind, storage::StorageKind}; use anyhow::{Context, Result, anyhow, bail}; use std::{env::VarError, error::Error, io, path, path::PathBuf, str::FromStr, time::Duration}; use tracing::trace; @@ -102,6 +102,8 @@ pub struct Config { // This only affects pages that depend on invalidations to work. pub(crate) cache_invalidatable_responses: bool, + /// CDN backend to use for invalidations. + /// Only needed for cloudfront. pub(crate) cdn_backend: CdnKind, /// The maximum age of a queued invalidation request before it is @@ -112,8 +114,19 @@ pub struct Config { // CloudFront distribution ID for the web server. // Will be used for invalidation-requests. pub cloudfront_distribution_id_web: Option, + /// same for the `static.docs.rs` distribution pub cloudfront_distribution_id_static: Option, + + /// Fastly API token for purging the services below. + pub fastly_api_token: Option, + + /// fastly service SID for the main domain + pub fastly_service_sid_web: Option, + + /// same for the `static.docs.rs` distribution + pub fastly_service_sid_static: Option, + pub(crate) build_workspace_reinitialization_interval: Duration, // Build params @@ -213,6 +226,9 @@ impl Config { .cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?)) .cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?) .cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?) + .fastly_api_token(maybe_env("DOCSRS_FASTLY_API_TOKEN")?) + .fastly_service_sid_web(maybe_env("DOCSRS_FASTLY_SERVICE_SID_WEB")?) + .fastly_service_sid_static(maybe_env("DOCSRS_FASTLY_SERVICE_SID_STATIC")?) .local_archive_cache_path(ensure_absolute_path(env( "DOCSRS_ARCHIVE_INDEX_CACHE_PATH", prefix.join("archive_cache"), diff --git a/src/context.rs b/src/context.rs index d8640f268..aefe457e1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,7 +1,7 @@ use crate::{ AsyncBuildQueue, AsyncStorage, BuildQueue, Config, InstanceMetrics, RegistryApi, ServiceMetrics, Storage, - cdn::CdnBackend, + cdn::{CdnMetrics, cloudfront::CdnBackend}, db::Pool, metrics::otel::{AnyMeterProvider, get_meter_provider}, repositories::RepositoryStatsUpdater, @@ -17,6 +17,7 @@ pub struct Context { pub storage: Arc, pub async_storage: Arc, pub cdn: Arc, + pub cdn_metrics: Arc, pub pool: Pool, pub service_metrics: Arc, pub instance_metrics: Arc, @@ -69,16 +70,17 @@ impl Context { .await?, ); + let cdn_metrics = Arc::new(CdnMetrics::new(&meter_provider)); + let cdn = Arc::new(CdnBackend::new(&config).await); let async_build_queue = Arc::new(AsyncBuildQueue::new( pool.clone(), instance_metrics.clone(), config.clone(), async_storage.clone(), + cdn_metrics.clone(), &meter_provider, )); - let cdn = Arc::new(CdnBackend::new(&config).await); - let runtime = runtime::Handle::current(); // sync wrappers around build-queue & storage async resources @@ -91,6 +93,7 @@ impl Context { storage, async_storage, cdn, + cdn_metrics, pool: pool.clone(), service_metrics: Arc::new(ServiceMetrics::new()?), instance_metrics, diff --git a/src/db/types/krate_name.rs b/src/db/types/krate_name.rs new file mode 100644 index 000000000..44e963b67 --- /dev/null +++ b/src/db/types/krate_name.rs @@ -0,0 +1,143 @@ +use anyhow::{Result, bail}; +use derive_more::{Deref, Display, Into}; +use serde_with::{DeserializeFromStr, SerializeDisplay}; +use sqlx::{ + Decode, Encode, Postgres, + encode::IsNull, + error::BoxDynError, + postgres::{PgArgumentBuffer, PgTypeInfo, PgValueRef}, + prelude::*, +}; +use std::{io::Write, str::FromStr}; + +/// validated crate name +/// +/// Right now only used in web::cache, but we'll probably also use it +/// to match our routes later. +/// +/// FIXME: this should actually come from some shared crate between the rust projects, +/// so the amount of duplication is less. +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Deref, Into, Display, DeserializeFromStr, SerializeDisplay, +)] +pub struct KrateName(String); + +impl FromStr for KrateName { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + validate_crate_name(s)?; + Ok(KrateName(s.to_string())) + } +} + +impl Type for KrateName { + fn type_info() -> PgTypeInfo { + >::type_info() + } + + fn compatible(ty: &PgTypeInfo) -> bool { + >::compatible(ty) + } +} + +impl<'q> Encode<'q, Postgres> for KrateName { + fn encode_by_ref(&self, buf: &mut PgArgumentBuffer) -> Result { + write!(**buf, "{}", self.0)?; + Ok(IsNull::No) + } +} + +impl<'r> Decode<'r, Postgres> for KrateName { + fn decode(value: PgValueRef<'r>) -> Result { + let s: &str = Decode::::decode(value)?; + Ok(Self(s.parse()?)) + } +} + +impl PartialEq for KrateName +where + T: AsRef, +{ + fn eq(&self, other: &T) -> bool { + self.0 == other.as_ref() + } +} + +/// validate if a string is a valid crate name. +/// Based on the crates.io implementation in their publish-endpoint: +/// https://github.com/rust-lang/crates.io/blob/9651eaab14887e0442849d5e81c1f2bbf10a73a2/crates/crates_io_database/src/models/krate.rs#L218-L252 +fn validate_crate_name(name: &str) -> Result<()> { + if name.is_empty() { + bail!("empty crate name"); + } + if name.len() > 64 { + bail!("crate name too long (maximum is 64 characters)"); + } + + let mut chars = name.chars(); + if let Some(ch) = chars.next() { + if ch.is_ascii_digit() { + bail!("crate name cannot start with a digit"); + } + if !ch.is_ascii_alphabetic() { + bail!("crate name must start with an alphabetic character"); + } + } + + for ch in chars { + if !(ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') { + bail!("invalid character '{}' in crate name", ch); + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::test::TestEnvironment; + + use super::*; + use test_case::test_case; + + #[test_case("valid_crate_name")] + #[test_case("with-dash")] + #[test_case("CapitalLetter")] + fn test_valid_crate_name(name: &str) { + assert!(validate_crate_name(name).is_ok()); + assert_eq!(name.parse::().unwrap(), name); + } + + #[test_case("with space")] + #[test_case("line break\n")] + #[test_case("non ascii äöü")] + #[test_case("0123456789101112131415161718192021222324252627282930313233343536373839"; "too long")] + fn test_invalid_crate_name(name: &str) { + assert!(validate_crate_name(name).is_err()); + assert!(name.parse::().is_err()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_sqlx_encode_decode() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_db().async_conn().await; + + let some_crate_name = "some-krate-123".parse::()?; + + sqlx::query!( + "INSERT INTO crates (name) VALUES ($1)", + some_crate_name as _ + ) + .execute(&mut *conn) + .await?; + + let new_name = sqlx::query_scalar!(r#"SELECT name as "name: KrateName" FROM crates"#) + .fetch_one(&mut *conn) + .await?; + + assert_eq!(new_name, some_crate_name); + + Ok(()) + } +} diff --git a/src/db/types/mod.rs b/src/db/types/mod.rs index 54a2bf3be..a7e863537 100644 --- a/src/db/types/mod.rs +++ b/src/db/types/mod.rs @@ -2,6 +2,7 @@ use derive_more::{Display, FromStr}; use serde::{Deserialize, Serialize}; pub mod dependencies; +pub mod krate_name; pub mod version; #[derive(Debug, Clone, Copy, Display, PartialEq, Eq, Hash, Serialize, sqlx::Type)] diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 54003e11b..898283b36 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -366,8 +366,10 @@ impl ServiceMetrics { let mut conn = pool.get_async().await?; for (distribution_id, count) in - cdn::queued_or_active_crate_invalidation_count_by_distribution(&mut conn, config) - .await? + cdn::cloudfront::queued_or_active_crate_invalidation_count_by_distribution( + &mut conn, config, + ) + .await? { self.queued_cdn_invalidations_by_distribution .with_label_values(&[&distribution_id]) diff --git a/src/metrics/service.rs b/src/metrics/service.rs index 9483f47b9..9203593bb 100644 --- a/src/metrics/service.rs +++ b/src/metrics/service.rs @@ -84,8 +84,10 @@ impl OtelServiceMetrics { } for (distribution_id, count) in - cdn::queued_or_active_crate_invalidation_count_by_distribution(&mut *conn, config) - .await? + cdn::cloudfront::queued_or_active_crate_invalidation_count_by_distribution( + &mut *conn, config, + ) + .await? { self.queued_cdn_invalidations_by_distribution.record( count as u64, diff --git a/src/test/headers.rs b/src/test/headers.rs new file mode 100644 index 000000000..e67c1a57f --- /dev/null +++ b/src/test/headers.rs @@ -0,0 +1,24 @@ +use axum_extra::headers::{self, Header, HeaderMapExt}; +use http::{HeaderMap, HeaderValue}; + +pub(crate) fn test_typed_decode(value: V) -> Result, headers::Error> +where + H: Header, + V: TryInto, + >::Error: std::fmt::Debug, +{ + let mut map = HeaderMap::new(); + map.append( + H::name(), + // this `.try_into` only generates the `HeaderValue` items. + value.try_into().unwrap(), + ); + // parsing errors from the typed header end up here. + map.typed_try_get() +} + +pub(crate) fn test_typed_encode(header: H) -> HeaderValue { + let mut map = HeaderMap::new(); + map.typed_insert(header); + map.get(H::name()).cloned().unwrap() +} diff --git a/src/test/mod.rs b/src/test/mod.rs index 42012da77..44419a4bc 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1,23 +1,30 @@ mod fakes; +pub(crate) mod headers; mod test_metrics; pub(crate) use self::fakes::{FakeBuild, fake_release_that_failed_before_build}; use crate::{ AsyncBuildQueue, BuildQueue, Config, Context, InstanceMetrics, - cdn::CdnBackend, + cdn::{CdnMetrics, cloudfront::CdnBackend}, config::ConfigBuilder, db::{self, AsyncPoolClient, Pool, types::version::Version}, error::Result, metrics::otel::AnyMeterProvider, storage::{AsyncStorage, Storage, StorageKind}, test::test_metrics::CollectedMetrics, - web::{build_axum_app, cache, page::TemplateData}, + web::{ + build_axum_app, + cache::{self, TargetCdn}, + headers::SURROGATE_CONTROL, + page::TemplateData, + }, }; use anyhow::Context as _; use axum::body::Bytes; use axum::{Router, body::Body, http::Request, response::Response as AxumResponse}; use fn_error_context::context; use futures_util::stream::TryStreamExt; +use http::header::CACHE_CONTROL; use http_body_util::BodyExt; use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader}; use serde::de::DeserializeOwned; @@ -46,6 +53,22 @@ where env.runtime().block_on(f(env.clone())).expect("test failed"); } +pub(crate) fn assert_cache_headers_eq( + response: &axum::response::Response, + expected_headers: &cache::ResponseCacheHeaders, +) { + assert_eq!( + expected_headers.cache_control.as_ref(), + response.headers().get(CACHE_CONTROL), + "cache control header mismatch" + ); + assert_eq!( + expected_headers.surrogate_control.as_ref(), + response.headers().get(&SURROGATE_CONTROL), + "surrogate control header mismatch" + ); +} + pub(crate) trait AxumResponseTestExt { async fn text(self) -> Result; async fn bytes(self) -> Result; @@ -73,19 +96,15 @@ impl AxumResponseTestExt for axum::response::Response { } fn assert_cache_control(&self, cache_policy: cache::CachePolicy, config: &Config) { assert!(config.cache_control_stale_while_revalidate.is_some()); - let cache_control = self.headers().get("Cache-Control"); - if let Some(expected_directives) = cache_policy.render(config) { - assert_eq!( - cache_control - .expect("missing cache-control header") - .to_str() - .unwrap(), - expected_directives.to_str().unwrap(), - ); - } else { - assert!(cache_control.is_none(), "{:?}", cache_control); - } + // This method is only about asserting if the handler did set the right _policy_. + // + // But we only test for CloudFront here. + // The different policies are unique enough so we would have a test failure when + // we emit the wrong cache policy in a handler. + // + // The fastly specifics are tested in web::cache unittests. + assert_cache_headers_eq(self, &cache_policy.render(config, TargetCdn::CloudFront)); } fn error_for_status(self) -> Result @@ -160,16 +179,7 @@ impl AxumRouterTestExt for axum::Router { let response = self.get(path).await?; // for now, 404s should always have `no-cache` - // assert_no_cache(&response); - assert_eq!( - response - .headers() - .get("Cache-Control") - .expect("missing cache-control header") - .to_str() - .unwrap(), - cache::NO_CACHING.to_str().unwrap(), - ); + assert_cache_headers_eq(&response, &cache::NO_CACHING); assert_eq!(response.status(), 404, "GET {path} should have been a 404"); Ok(()) @@ -429,6 +439,10 @@ impl TestEnvironment { &self.context.config } + pub(crate) fn cdn_metrics(&self) -> &CdnMetrics { + &self.context.cdn_metrics + } + pub(crate) fn async_storage(&self) -> &AsyncStorage { &self.context.async_storage } diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 5cac36af1..ce0da0a93 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -153,7 +153,7 @@ pub fn start_background_cdn_invalidator(context: &Context) -> Result<(), Error> let runtime = context.runtime.clone(); let cdn = context.cdn.clone(); - let otel_metrics = Arc::new(cdn::CdnMetrics::new(&context.meter_provider)); + let otel_metrics = context.cdn_metrics.clone(); if config.cloudfront_distribution_id_web.is_none() && config.cloudfront_distribution_id_static.is_none() @@ -180,7 +180,7 @@ pub fn start_background_cdn_invalidator(context: &Context) -> Result<(), Error> async move { let mut conn = pool.get_async().await?; if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { - cdn::handle_queued_invalidation_requests( + cdn::cloudfront::handle_queued_invalidation_requests( &config, &cdn, &metrics, @@ -192,7 +192,7 @@ pub fn start_background_cdn_invalidator(context: &Context) -> Result<(), Error> .context("error handling queued invalidations for web CDN invalidation")?; } if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { - cdn::handle_queued_invalidation_requests( + cdn::cloudfront::handle_queued_invalidation_requests( &config, &cdn, &metrics, diff --git a/src/web/cache.rs b/src/web/cache.rs index 6d1ec98b8..f3e176558 100644 --- a/src/web/cache.rs +++ b/src/web/cache.rs @@ -1,21 +1,143 @@ -use crate::config::Config; +use crate::{ + config::Config, + db::types::krate_name::KrateName, + web::{ + extractors::Path, + headers::{SURROGATE_CONTROL, SURROGATE_KEY, SurrogateKeys}, + }, +}; use axum::{ - extract::Request as AxumHttpRequest, middleware::Next, response::Response as AxumResponse, + Extension, + extract::{FromRequestParts, MatchedPath, Request as AxumHttpRequest}, + middleware::Next, + response::Response as AxumResponse, +}; +use axum_extra::headers::HeaderMapExt as _; +use http::{HeaderMap, HeaderName, HeaderValue, header::CACHE_CONTROL, request::Parts}; +use serde::Deserialize; +use std::{convert::Infallible, sync::Arc}; +use tracing::error; + +const FASTLY_FF: HeaderName = HeaderName::from_static("fastly-ff"); + +#[derive(Debug, Clone, PartialEq)] +pub struct ResponseCacheHeaders { + pub cache_control: Option, + pub surrogate_control: Option, + pub needs_cdn_invalidation: bool, +} + +impl ResponseCacheHeaders { + fn set_on_response(&self, headers: &mut HeaderMap) { + if let Some(ref cache_control) = self.cache_control { + headers.insert(CACHE_CONTROL, cache_control.clone()); + } + if let Some(ref surrogate_control) = self.surrogate_control { + headers.insert(&SURROGATE_CONTROL, surrogate_control.clone()); + } + } +} + +/// No caching in the CDN & in the browser. +/// Browser & CDN often still store the file, +/// but then always revalidate using `If-Modified-Since` (with last modified) +/// or `If-None-Match` (with etag). +/// Browser might still sometimes use cached content, for example when using +/// the "back" button. +pub static NO_CACHING: ResponseCacheHeaders = ResponseCacheHeaders { + cache_control: Some(HeaderValue::from_static("max-age=0")), + surrogate_control: None, + needs_cdn_invalidation: false, +}; + +/// Cache for a short time in the browser & in the CDN. +/// Helps protecting against traffic spikes. +pub static SHORT: ResponseCacheHeaders = ResponseCacheHeaders { + cache_control: Some(HeaderValue::from_static("public, max-age=60")), + surrogate_control: None, + needs_cdn_invalidation: false, +}; + +/// don't cache, don't even store. Never. Ever. +pub static NO_STORE_MUST_REVALIDATE: ResponseCacheHeaders = ResponseCacheHeaders { + cache_control: Some(HeaderValue::from_static( + "no-cache, no-store, must-revalidate, max-age=0", + )), + surrogate_control: None, + needs_cdn_invalidation: false, +}; + +pub static FOREVER_IN_FASTLY_CDN: ResponseCacheHeaders = ResponseCacheHeaders { + // explicitly forbid browser caching, same as NO_CACHING above. + cache_control: Some(HeaderValue::from_static("max-age=0")), + + // set `surrogate-control`, cache forever in the CDN + // https://www.fastly.com/documentation/reference/http/http-headers/Surrogate-Control/ + // + // TODO: evaluate if we can / should set `stale-while-revalidate` or `stale-if-error` here, + // especially in combination with our fastly compute service. + // https://www.fastly.com/documentation/guides/concepts/edge-state/cache/stale/ + surrogate_control: Some(HeaderValue::from_static("max-age=31536000")), + + needs_cdn_invalidation: true, +}; + +pub static FOREVER_IN_CLOUDFRONT_CDN: ResponseCacheHeaders = ResponseCacheHeaders { + // A missing `max-age` or `s-maxage` in the Cache-Control header will lead to + // CloudFront using the default TTL, while the browser not seeing any caching header. + // + // Default TTL is set here: + // https://github.com/rust-lang/simpleinfra/blob/becf4532a10a7a218aedb34d4648ecb73e61f5fd/terraform/docs-rs/cloudfront.tf#L24 + // + // This means we can have the CDN caching the documentation while just + // issuing a purge after a build. + // https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/Expiration.html#ExpirationDownloadDist + // + // There might be edge cases where browsers add caching based on arbitraty heuristics + // when `Cache-Control` is missing. + cache_control: None, + surrogate_control: None, + needs_cdn_invalidation: true, }; -use http::{HeaderValue, header::CACHE_CONTROL}; -use std::sync::Arc; -pub static NO_CACHING: HeaderValue = HeaderValue::from_static("max-age=0"); -pub static SHORT: HeaderValue = HeaderValue::from_static("max-age=60"); +/// cache forever in browser & CDN. +/// Only usable for content with unique filenames. +/// +/// We use this policy mostly for static files, rustdoc toolchain assets, +/// or build assets. +pub static FOREVER_IN_CDN_AND_BROWSER: ResponseCacheHeaders = ResponseCacheHeaders { + cache_control: Some(HeaderValue::from_static( + "public, max-age=31104000, immutable", + )), + surrogate_control: None, + needs_cdn_invalidation: false, +}; + +#[derive(Debug, Copy, Clone, PartialEq)] +#[cfg_attr(test, derive(strum::EnumIter))] +pub enum TargetCdn { + Fastly, + CloudFront, +} -pub static NO_STORE_MUST_REVALIDATE: HeaderValue = - HeaderValue::from_static("no-cache, no-store, must-revalidate, max-age=0"); +impl FromRequestParts for TargetCdn +where + S: Send + Sync, +{ + type Rejection = Infallible; -pub static FOREVER_IN_CDN_AND_BROWSER: HeaderValue = - HeaderValue::from_static("public, max-age=31104000, immutable"); + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + if parts.headers.contains_key(FASTLY_FF) { + Ok(TargetCdn::Fastly) + } else { + Ok(TargetCdn::CloudFront) + } + } +} /// defines the wanted caching behaviour for a web response. #[derive(Debug, Clone)] +#[cfg_attr(test, derive(strum::EnumIter))] pub enum CachePolicy { /// no browser or CDN caching. /// In some cases the browser might still use cached content, @@ -49,72 +171,262 @@ pub enum CachePolicy { } impl CachePolicy { - pub fn render(&self, config: &Config) -> Option { + pub fn render(&self, config: &Config, target_cdn: TargetCdn) -> ResponseCacheHeaders { match *self { - CachePolicy::NoCaching => Some(NO_CACHING.clone()), - CachePolicy::NoStoreMustRevalidate => Some(NO_STORE_MUST_REVALIDATE.clone()), - CachePolicy::ShortInCdnAndBrowser => Some(SHORT.clone()), - CachePolicy::ForeverInCdnAndBrowser => Some(FOREVER_IN_CDN_AND_BROWSER.clone()), + CachePolicy::NoCaching => NO_CACHING.clone(), + CachePolicy::NoStoreMustRevalidate => NO_STORE_MUST_REVALIDATE.clone(), + CachePolicy::ShortInCdnAndBrowser => SHORT.clone(), + CachePolicy::ForeverInCdnAndBrowser => FOREVER_IN_CDN_AND_BROWSER.clone(), CachePolicy::ForeverInCdn => { if config.cache_invalidatable_responses { - // A missing `max-age` or `s-maxage` in the Cache-Control header will lead to - // CloudFront using the default TTL, while the browser not seeing any caching header. - // This means we can have the CDN caching the documentation while just - // issuing a purge after a build. - // https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/Expiration.html#ExpirationDownloadDist - None + match target_cdn { + TargetCdn::Fastly => FOREVER_IN_FASTLY_CDN.clone(), + TargetCdn::CloudFront => FOREVER_IN_CLOUDFRONT_CDN.clone(), + } } else { - Some(NO_CACHING.clone()) + NO_CACHING.clone() } } CachePolicy::ForeverInCdnAndStaleInBrowser => { - if config.cache_invalidatable_responses { - config - .cache_control_stale_while_revalidate - .map(|seconds| format!("stale-while-revalidate={seconds}").parse().unwrap()) - } else { - Some(NO_CACHING.clone()) + // when caching invalidatable responses is disabled, this results in NO_CACHING + let mut forever_in_cdn = CachePolicy::ForeverInCdn.render(config, target_cdn); + + if config.cache_invalidatable_responses + && let Some(cache_control) = + config.cache_control_stale_while_revalidate.map(|seconds| { + format!("stale-while-revalidate={seconds}") + .parse::() + .unwrap() + }) + { + forever_in_cdn.cache_control = Some(cache_control); } + + forever_in_cdn } } } } -pub(crate) async fn cache_middleware(req: AxumHttpRequest, next: Next) -> AxumResponse { - let config = req - .extensions() - .get::>() - .cloned() - .expect("missing config extension in request"); +/// All our routes use `{name}` to identify the crate name +/// in routes. +/// With this struct we can extract only that, if it exists. +#[derive(Deserialize)] +pub(crate) struct CrateParam { + name: Option, +} +pub(crate) async fn cache_middleware( + Path(param): Path, + matched_route: Option, + Extension(config): Extension>, + target_cdn: TargetCdn, + req: AxumHttpRequest, + next: Next, +) -> AxumResponse { let mut response = next.run(req).await; - let cache = response + debug_assert!( + !(response + .headers() + .keys() + .any(|h| { h == CACHE_CONTROL || h == SURROGATE_CONTROL || h == SURROGATE_KEY })), + "handlers should never set their own caching headers and only use CachePolicy to control caching. \n{:?}", + response.headers(), + ); + + // extract cache policy, default to "forbid caching everywhere". + // We only use cache policies in our successful responses (with content, or redirect), + // so any errors (4xx, 5xx) should always get "NoCaching". + let cache_policy = response .extensions() .get::() .unwrap_or(&CachePolicy::NoCaching); + let cache_headers = cache_policy.render(&config, target_cdn); + let resp_status = response.status(); + let resp_headers = response.headers_mut(); - if cfg!(test) { - assert!( - !response.headers().contains_key(CACHE_CONTROL), - "handlers should never set their own caching headers and only use CachePolicy to control caching." - ); + // early return for CloudFront, as it doesn't support the `Surrogate-Control` header, + // but also doesn't need surrogate keys. + // While that sounds nice, CloudFront invalidations with path prefixes are suuper slow, + // and have a concurrency limit. + if let TargetCdn::CloudFront = target_cdn { + debug_assert!(cache_headers.surrogate_control.is_none()); + cache_headers.set_on_response(resp_headers); + return response; } - if let Some(cache_directive) = cache.render(&config) { - response - .headers_mut() - .insert(CACHE_CONTROL, cache_directive); - } + // simple implementation first: + // This is for content we need to invalidate in the CDN level. + // We don't care about content that is filename-hashed and can be cached + // forever, or content that is not cached at all. + // + // Generally Fastly can either purge single URLs, or a whole service. + // When you want to purge the cache for a bigger subset, but not everything, you need to "tag" + // your content with surrogate keys when delivering it to Fastly for caching. + // https://www.fastly.com/documentation/guides/full-site-delivery/purging/working-with-surrogate-keys/ + // + // At some point we should extend this system and make it explicit, so in all places you return + // a cache policy you also return these surrogate keys, probably based on the krate, release + // or other things. For now we stick to invalidating the whole crate on all changes. + // + // For the first version I found an easy "hack" that doesn't need the full refactor across + // all our handlers; + // If the URL contains a crate name, we create a surrogate key based on that. + // Since we always call the crate name (and only the crate name) `{name}` in our routes, + // we're safe here. I added some debug assertions to ensure my assumptions are right, and + // any change to these in the routes would lead to test failures. + let cache_headers = if let Some(ref name) = param.name { + // we could theoretically only run this part when cache_invalidatable_responses and + // cache_headers.needs_cdn_invalidation are true, + // but let's always to this validation and add the surrogate-key to know if + // our "hack" still works. + // + // I didn't think through the possible edge-cases yet, but I feel safer + // always adding a surrogate key if we have one. + debug_assert!( + matched_route + .map(|matched_route| { + let matched_route = matched_route.as_str(); + matched_route.starts_with("/crate/{name}") + || matched_route.starts_with("/{name}") + }) + .unwrap_or(true), + "there shouldn't be a name on any other routes" + ); + if let Ok(krate_name) = name.parse::() { + let keys = SurrogateKeys::from_iter_until_full(vec![krate_name.into()]); + + resp_headers.typed_insert(keys); + + // only allow caching in the CDN when we have a surrogate key to invalidate it later. + // This is just the default for all routes that include a crate name. + // Then we build build & add the surrugate yet. + // It's totally possible that this policy here then states NO_CACHING, + // or FOREVER_IN_CDN_AND_BROWSER, where we wouln't need the surrogate key. + &cache_headers + } else { + // This theoretically shouldn't happen, all current crate names would be valid + // for surrogate keys, and the `KrateName` validation matches the crates.io crate + // publish validation. + // But I'll leave this error log here just in case, until I migrated to using the + // `KrateName` type in all entrypoints (web, builds). + if resp_status.is_success() || resp_status.is_redirection() { + error!( + name = param.name, + "failed to create surrogate key for crate" + ); + } + if cache_headers.needs_cdn_invalidation { + &NO_CACHING + } else { + &cache_headers + } + } + } else { + debug_assert!( + matched_route + .map(|matched_route| { + let matched_route = matched_route.as_str(); + !(matched_route.starts_with("/crate/{name}") + || matched_route.starts_with("/{name}")) + }) + .unwrap_or(true), + "for rustdoc & crate-detail routes the `name` param should always be present" + ); + debug_assert!( + !(config.cache_invalidatable_responses && cache_headers.needs_cdn_invalidation), + "We got to a route without crate name, and a cache policy that needs invalidation. + This doesn't work because Fastly only supports surrogate keys for partial + invalidation." + ); + + // standard case, just use the cache policy, no surrogate keys needed. + &cache_headers + }; + + cache_headers.set_on_response(resp_headers); + response } #[cfg(test)] mod tests { use super::*; - use crate::test::TestEnvironment; - use anyhow::Result; - use test_case::test_case; + use crate::test::{ + AxumResponseTestExt as _, TestEnvironment, assert_cache_headers_eq, + headers::{test_typed_decode, test_typed_encode}, + }; + use anyhow::{Context as _, Result}; + use axum::{Router, body::Body, http::Request, routing::get}; + use axum_extra::headers::CacheControl; + use itertools::Itertools as _; + use strum::IntoEnumIterator as _; + use test_case::{test_case, test_matrix}; + use tower::{ServiceBuilder, ServiceExt as _}; + + fn validate_cache_control(value: &HeaderValue) -> Result<()> { + assert!(!value.as_bytes().is_empty()); + + // first parse attempt. + // The `CacheControl` typed header impl will just skip over unknown directives. + let parsed: CacheControl = test_typed_decode(value.clone())?.unwrap(); + + // So we just re-render it, re-parse and compare both. + let re_rendered = test_typed_encode(parsed.clone()); + let re_parsed: CacheControl = test_typed_decode(re_rendered)?.unwrap(); + + assert_eq!(parsed, re_parsed); + + Ok(()) + } + + #[test] + fn test_const_response_consistency() { + assert_eq!( + FOREVER_IN_FASTLY_CDN.cache_control, + NO_CACHING.cache_control + ); + assert!(FOREVER_IN_CLOUDFRONT_CDN.cache_control.is_none()); + } + + #[test_matrix( + [true, false], + [Some(86400), None] + )] + fn test_validate_header_syntax_for_all_possible_combinations( + cache_invalidatable_responses: bool, + stale_while_revalidate: Option, + ) -> Result<()> { + let config = TestEnvironment::base_config() + .cache_invalidatable_responses(cache_invalidatable_responses) + .cache_control_stale_while_revalidate(stale_while_revalidate) + .build()?; + + for (policy, target_cdn) in CachePolicy::iter().cartesian_product(TargetCdn::iter()) { + let headers = policy.render(&config, target_cdn); + + if let Some(cache_control) = headers.cache_control { + validate_cache_control(&cache_control).with_context(|| { + format!( + "couldn't validate Cache-Control header syntax for policy {:?}, CDN: {:?}", + policy, target_cdn, + ) + })?; + } + + if let Some(surrogate_control) = headers.surrogate_control { + validate_cache_control(&surrogate_control).with_context(|| { + format!( + "couldn't validate Surrogate-Control header syntax for policy {:?}, CDN: {:?}", + policy, + target_cdn, + ) + })?; + } + } + Ok(()) + } #[test_case(CachePolicy::NoCaching, Some("max-age=0"))] #[test_case( @@ -130,12 +442,17 @@ mod tests { CachePolicy::ForeverInCdnAndStaleInBrowser, Some("stale-while-revalidate=86400") )] - fn render(cache: CachePolicy, expected: Option<&str>) -> Result<()> { + fn render(cache: CachePolicy, cache_control: Option<&str>) -> Result<()> { let config = TestEnvironment::base_config().build()?; + let headers = cache.render(&config, TargetCdn::CloudFront); + assert_eq!( - cache.render(&config), - expected.map(|s| HeaderValue::from_str(s).unwrap()) + headers.cache_control, + cache_control.map(|s| HeaderValue::from_str(s).unwrap()) ); + + assert!(headers.surrogate_control.is_none()); + Ok(()) } @@ -145,11 +462,10 @@ mod tests { .cache_control_stale_while_revalidate(None) .build()?; - assert!( - CachePolicy::ForeverInCdnAndStaleInBrowser - .render(&config) - .is_none() - ); + let headers = + CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::CloudFront); + assert!(headers.cache_control.is_none()); + assert!(headers.surrogate_control.is_none()); Ok(()) } @@ -160,12 +476,10 @@ mod tests { .cache_control_stale_while_revalidate(Some(666)) .build()?; - assert_eq!( - CachePolicy::ForeverInCdnAndStaleInBrowser - .render(&config) - .unwrap(), - "stale-while-revalidate=666" - ); + let headers = + CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::CloudFront); + assert_eq!(headers.cache_control.unwrap(), "stale-while-revalidate=666"); + assert!(headers.surrogate_control.is_none()); Ok(()) } @@ -176,10 +490,9 @@ mod tests { .cache_invalidatable_responses(false) .build()?; - assert_eq!( - CachePolicy::ForeverInCdn.render(&config).unwrap(), - "max-age=0" - ); + let headers = CachePolicy::ForeverInCdn.render(&config, TargetCdn::CloudFront); + assert_eq!(headers.cache_control.unwrap(), "max-age=0"); + assert!(headers.surrogate_control.is_none()); Ok(()) } @@ -190,12 +503,204 @@ mod tests { .cache_invalidatable_responses(false) .build()?; + let headers = + CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::CloudFront); + assert_eq!(headers.cache_control.unwrap(), "max-age=0"); + assert!(headers.surrogate_control.is_none()); + + Ok(()) + } + + #[test_case(CachePolicy::NoCaching, Some("max-age=0"), None)] + #[test_case( + CachePolicy::NoStoreMustRevalidate, + Some("no-cache, no-store, must-revalidate, max-age=0"), + None + )] + #[test_case( + CachePolicy::ForeverInCdnAndBrowser, + Some("public, max-age=31104000, immutable"), + None + )] + #[test_case(CachePolicy::ForeverInCdn, Some("max-age=0"), Some("max-age=31536000"))] + #[test_case( + CachePolicy::ForeverInCdnAndStaleInBrowser, + Some("stale-while-revalidate=86400"), + Some("max-age=31536000") + )] + fn render_fastly( + cache: CachePolicy, + cache_control: Option<&str>, + surrogate_control: Option<&str>, + ) -> Result<()> { + let config = TestEnvironment::base_config().build()?; + let headers = cache.render(&config, TargetCdn::Fastly); + + assert_eq!( + headers.cache_control, + cache_control.map(|s| HeaderValue::from_str(s).unwrap()) + ); + + assert_eq!( + headers.surrogate_control, + surrogate_control.map(|s| HeaderValue::from_str(s).unwrap()) + ); + + Ok(()) + } + + #[test] + fn render_stale_without_config_fastly() -> Result<()> { + let config = TestEnvironment::base_config() + .cache_control_stale_while_revalidate(None) + .build()?; + + let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::Fastly); + assert_eq!(headers, FOREVER_IN_FASTLY_CDN); + + Ok(()) + } + + #[test] + fn render_stale_with_config_fastly() -> Result<()> { + let config = TestEnvironment::base_config() + .cache_control_stale_while_revalidate(Some(666)) + .build()?; + + let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::Fastly); + assert_eq!(headers.cache_control.unwrap(), "stale-while-revalidate=666"); assert_eq!( - CachePolicy::ForeverInCdnAndStaleInBrowser - .render(&config) - .unwrap(), - "max-age=0" + headers.surrogate_control, + FOREVER_IN_FASTLY_CDN.surrogate_control + ); + + Ok(()) + } + + #[test] + fn render_forever_in_cdn_disabled_fastly() -> Result<()> { + let config = TestEnvironment::base_config() + .cache_invalidatable_responses(false) + .build()?; + + let headers = CachePolicy::ForeverInCdn.render(&config, TargetCdn::Fastly); + assert_eq!(headers.cache_control.unwrap(), "max-age=0"); + assert!(headers.surrogate_control.is_none()); + + Ok(()) + } + + #[test] + fn render_forever_in_cdn_or_stale_disabled_fastly() -> Result<()> { + let config = TestEnvironment::base_config() + .cache_invalidatable_responses(false) + .build()?; + + let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::Fastly); + assert_eq!(headers.cache_control.unwrap(), "max-age=0"); + assert!(headers.surrogate_control.is_none()); + + Ok(()) + } + + #[test_case(TargetCdn::Fastly, &FOREVER_IN_FASTLY_CDN)] + #[test_case(TargetCdn::CloudFront, &FOREVER_IN_CLOUDFRONT_CDN)] + #[tokio::test] + async fn test_middleware_reacts_to_fastly_header_in_crate_route( + expected_target_cdn: TargetCdn, + expected_headers: &ResponseCacheHeaders, + ) -> Result<()> { + let config = TestEnvironment::base_config() + .cache_invalidatable_responses(true) + .build()?; + + let app = Router::new() + .route( + "/{name}", + get(move |target_cdn: TargetCdn| async move { + assert_eq!(target_cdn, expected_target_cdn); + + ( + // this cache policy leads to the same result in both CDNs + Extension(CachePolicy::ForeverInCdn), + "Hello, World!", + ) + }), + ) + .layer( + ServiceBuilder::new() + .layer(Extension(Arc::new(config))) + .layer(axum::middleware::from_fn(cache_middleware)), + ); + + let mut builder = Request::builder().uri("/krate"); + + if let TargetCdn::Fastly = expected_target_cdn { + builder = builder.header(FASTLY_FF, "some_value"); + } + + let response = app + .clone() + .oneshot(builder.body(Body::empty()).unwrap()) + .await?; + + assert!( + response.status().is_success(), + "{}", + response.text().await.unwrap(), ); + assert_cache_headers_eq(&response, expected_headers); + + Ok(()) + } + + #[test_case(TargetCdn::Fastly)] + #[test_case(TargetCdn::CloudFront)] + #[tokio::test] + async fn test_middleware_reacts_to_fastly_header_in_other_route( + expected_target_cdn: TargetCdn, + ) -> Result<()> { + let config = TestEnvironment::base_config().build()?; + + let app = Router::new() + .route( + "/", + get(move |target_cdn: TargetCdn| async move { + assert_eq!(target_cdn, expected_target_cdn); + + ( + // this cache policy leads to the same result in both CDNs + Extension(CachePolicy::ForeverInCdnAndBrowser), + "Hello, World!", + ) + }), + ) + .layer( + ServiceBuilder::new() + .layer(Extension(Arc::new(config))) + .layer(axum::middleware::from_fn(cache_middleware)), + ); + + let mut builder = Request::builder().uri("/"); + + if let TargetCdn::Fastly = expected_target_cdn { + builder = builder.header(FASTLY_FF, "some_value"); + } + + let response = app + .clone() + .oneshot(builder.body(Body::empty()).unwrap()) + .await?; + + assert!( + response.status().is_success(), + "{}", + response.text().await.unwrap(), + ); + + // this cache policy leads to the same result in both CDNs + assert_cache_headers_eq(&response, &FOREVER_IN_CDN_AND_BROWSER); + Ok(()) } } diff --git a/src/web/headers.rs b/src/web/headers/canonical_url.rs similarity index 98% rename from src/web/headers.rs rename to src/web/headers/canonical_url.rs index e5650547b..c7560c807 100644 --- a/src/web/headers.rs +++ b/src/web/headers/canonical_url.rs @@ -1,4 +1,4 @@ -use super::escaped_uri::EscapedURI; +use crate::web::escaped_uri::EscapedURI; use anyhow::Result; use askama::filters::HtmlSafe; use axum::http::uri::Uri; diff --git a/src/web/headers/mod.rs b/src/web/headers/mod.rs new file mode 100644 index 000000000..e89abd1da --- /dev/null +++ b/src/web/headers/mod.rs @@ -0,0 +1,10 @@ +mod canonical_url; +mod surrogate_key; + +pub use canonical_url::CanonicalUrl; +use http::HeaderName; +pub use surrogate_key::{SURROGATE_KEY, SurrogateKey, SurrogateKeys}; + +/// Fastly's Surrogate-Control header +/// https://www.fastly.com/documentation/reference/http/http-headers/Surrogate-Control/ +pub static SURROGATE_CONTROL: HeaderName = HeaderName::from_static("surrogate-control"); diff --git a/src/web/headers/surrogate_key.rs b/src/web/headers/surrogate_key.rs new file mode 100644 index 000000000..cf1ff64f9 --- /dev/null +++ b/src/web/headers/surrogate_key.rs @@ -0,0 +1,239 @@ +//! Structs to build Surrogate-Key header for Fastly CDN +//! see +//! https://www.fastly.com/documentation/reference/http/http-headers/Surrogate-Key/haeders.surrogate keys + +use anyhow::{Context as _, bail}; +use axum_extra::headers::{self, Header}; +use derive_more::Deref; +use http::{HeaderName, HeaderValue}; +use itertools::Itertools as _; +use std::{fmt::Display, iter, str::FromStr}; + +use crate::db::types::krate_name::KrateName; + +pub static SURROGATE_KEY: HeaderName = HeaderName::from_static("surrogate-key"); + +/// a single surrogate key. +/// +/// The typical Fastly `Surrogate-Key` header might include more than one. +#[derive(Debug, Clone, Deref, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct SurrogateKey(HeaderValue); + +impl FromStr for SurrogateKey { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + // From the Fastly documentation: + // + // > Surrogate keys must contain only printable ASCII characters + // > (those between 0x21 and 0x7E, inclusive). + // > Any invalid keys will be ignored. + // > Individual keys are limited to 1024 bytes in length, and the + // > total length of the Surrogate-Key header may not exceed 16,384 bytes. + // > If either of these limits are reached while parsing a Surrogate-Key header, + // > the key currently being parsed and all keys following it within the same header + // > will be ignored. + // + // https://www.fastly.com/documentation/reference/http/http-headers/Surrogate-Key/ + + if s.is_empty() { + bail!("surrogate key cannot be empty"); + } + if s.len() > 1024 { + bail!("surrogate key exceeds maximum length of 1024 bytes"); + } + if !s.as_bytes().iter().all(|b| (0x21..=0x7E).contains(b)) { + bail!("invalid character found"); + } + + Ok(SurrogateKey(s.parse().context("invalid header value")?)) + } +} + +impl PartialEq for SurrogateKey +where + T: AsRef, +{ + fn eq(&self, other: &T) -> bool { + self.0 == other.as_ref() + } +} + +/// Create a surrogate key from a crate name. +impl From for SurrogateKey { + fn from(value: KrateName) -> Self { + // valid crate names only contain chars that are also + // valid in surrogate keys. + // And all these are also valid in header-values. + let key = format!("crate-{}", value); + Self( + key.parse() + .expect("crate name that can't be parsed into HeaderValue"), + ) + } +} + +/// A full Fastly Surrogate-Key header, containing one or more keys. +#[derive(Debug, PartialEq)] +pub struct SurrogateKeys(Vec); + +impl Display for SurrogateKeys { + #[allow(unstable_name_collisions)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for key_or_space in self + .0 + .iter() + .map(|key| { + key.0.to_str().expect( + "single SurrogateKeys can only be created from strings, so this always succeeds", + ) + }) + .intersperse(" ") + { + write!(f, "{}", key_or_space)?; + } + Ok(()) + } +} + +impl Header for SurrogateKeys { + fn name() -> &'static http::HeaderName { + &SURROGATE_KEY + } + + fn decode<'i, I>(_values: &mut I) -> Result + where + Self: Sized, + I: Iterator, + { + unimplemented!(); + } + + fn encode>(&self, values: &mut E) { + let header_value: HeaderValue = self.to_string().parse().expect( + "we know the single keys are valid HeaderValue, and valid Strings, + so after joining using spaces, the joined version one must be valid too", + ); + values.extend(iter::once(header_value)) + } +} + +impl SurrogateKeys { + /// Build SurrogateKeys from an iterator, de-duplicating keys. + /// Takes only as many elements as would fit into the header, + /// then stops consuming the iterator. + pub fn from_iter_until_full(iter: I) -> Self + where + I: IntoIterator, + { + // From the Fastly documentation: + // + // > [...] and the total length of the Surrogate-Key header may not + // > exceed 16,384 bytes. + // + // https://www.fastly.com/documentation/reference/http/http-headers/Surrogate-Key/ + + const MAX_LEN: u64 = 16_384; + + let mut current_key_size: u64 = 0; + + SurrogateKeys( + iter.into_iter() + .unique() + .take_while(|key| { + let key_size = key.len() as u64 + 1; // +1 for the space or terminator + if current_key_size + key_size > MAX_LEN { + false + } else { + current_key_size += key_size; + true + } + }) + .collect(), + ) + } + + #[cfg(test)] + pub fn encoded_len(&self) -> usize { + self.0.iter().map(|k| k.0.len() + 1).sum::() + } + + pub fn key_count(&self) -> usize { + self.0.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::headers::test_typed_encode; + use std::ops::RangeInclusive; + use test_case::test_case; + + #[test] + fn test_parse_surrogate_key_too_long() { + let input = "X".repeat(1025); + assert!(SurrogateKey::from_str(&input).is_err()); + } + + #[test_case(""; "empty")] + #[test_case(" "; "space")] + #[test_case("\n"; "newline")] + fn test_parse_surrogate_key_err(input: &str) { + assert!(SurrogateKey::from_str(input).is_err()); + } + + #[test_case("some-key")] + #[test_case("1234")] + #[test_case("crate-some-crate")] + #[test_case("release-some-crate-1.2.3")] + fn test_parse_surrogate_key_ok(input: &str) { + assert_eq!(SurrogateKey::from_str(input).unwrap(), input); + } + + #[test] + fn test_encode() -> anyhow::Result<()> { + let k1 = SurrogateKey::from_str("key-2").unwrap(); + let k2 = SurrogateKey::from_str("key-1").unwrap(); + // this key is duplicate, should be removed + let k3 = SurrogateKey::from_str("key-2").unwrap(); + + assert_eq!(k1, k3); + assert_ne!(k1, k2); + assert_ne!(k3, k2); + + assert_eq!( + test_typed_encode(SurrogateKeys::from_iter_until_full([k1, k2, k3])), + "key-2 key-1" + ); + + Ok(()) + } + + #[test_case('0'..='9'; "numbers")] + #[test_case('a'..='z'; "lower case")] + #[test_case('A'..='Z'; "upper case")] + fn test_from_krate_name(range: RangeInclusive) { + // ensure that the valid character range for crate names also fits + // into surrogate keys, and header values. + for ch in range { + let name = format!("k{}", ch); + let krate_name: KrateName = name.parse().unwrap(); + let surrogate_key: SurrogateKey = krate_name.into(); + assert_eq!(surrogate_key, format!("crate-{name}")); + } + } + + #[test] + fn test_try_from_iter_checks_full_length() -> anyhow::Result<()> { + let mut it = (0..10_000).map(|n| SurrogateKey::from_str(&format!("key-{n}")).unwrap()); + + let first_key = SurrogateKeys::from_iter_until_full(&mut it); + assert_eq!(first_key.encoded_len(), 16377); // < the max length of 16384 + + // elements remaining in the iterator + assert_eq!(it.count(), 8056); + + Ok(()) + } +} diff --git a/src/web/mod.rs b/src/web/mod.rs index cf3400759..f4492d60b 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -29,7 +29,7 @@ mod escaped_uri; mod extractors; mod features; mod file; -mod headers; +pub(crate) mod headers; mod highlight; mod licenses; mod markdown; diff --git a/src/web/releases.rs b/src/web/releases.rs index a8b4175ce..84d997ada 100644 --- a/src/web/releases.rs +++ b/src/web/releases.rs @@ -735,11 +735,12 @@ pub(crate) async fn build_queue_handler( mut conn: DbConnection, Query(params): Query, ) -> AxumResult { - let mut active_cdn_deployments: Vec<_> = cdn::queued_or_active_crate_invalidations(&mut conn) - .await? - .into_iter() - .map(|i| i.krate) - .collect(); + let mut active_cdn_deployments: Vec<_> = + cdn::cloudfront::queued_or_active_crate_invalidations(&mut conn) + .await? + .into_iter() + .map(|i| i.krate) + .collect(); // deduplicate the list of crates while keeping their order let mut set = HashSet::new(); @@ -1822,7 +1823,13 @@ mod tests { let web = env.web_app().await; let mut conn = env.async_db().async_conn().await; - cdn::queue_crate_invalidation(&mut conn, env.config(), "krate_2").await?; + cdn::queue_crate_invalidation( + &mut conn, + env.config(), + env.cdn_metrics(), + &"krate_2".parse().unwrap(), + ) + .await?; let content = kuchikiki::parse_html().one(web.get("/releases/queue").await?.text().await?); assert!( diff --git a/src/web/rustdoc.rs b/src/web/rustdoc.rs index fe34c60d4..32d2969ab 100644 --- a/src/web/rustdoc.rs +++ b/src/web/rustdoc.rs @@ -750,6 +750,8 @@ pub(crate) async fn json_download_handler( // TODO: we could also additionally read the accept-encoding header here. But especially // in combination with priorities it's complex to parse correctly. So for now only // file extensions in the URL. + // When using Accept-Encoding, we also have to return "Vary: Accept-Encoding" to ensure + // the cache behaves correctly. let wanted_compression = if let Some(ext) = file_extension.map(|ext| ext.0) { Some(compression_from_file_extension(&ext).ok_or_else(|| { diff --git a/src/web/statics.rs b/src/web/statics.rs index 839c1bf47..99266634e 100644 --- a/src/web/statics.rs +++ b/src/web/statics.rs @@ -131,7 +131,7 @@ mod tests { let web = env.web_app().await; let resp = web.get("/-/static/vendored.css").await?; - assert!(resp.status().is_success()); + assert!(resp.status().is_success(), "{}", resp.text().await?); resp.assert_cache_control(CachePolicy::ForeverInCdnAndBrowser, env.config()); assert_eq!( resp.headers().get("Content-Type"),