Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ chrono = { version = "0.4.11", default-features = false, features = ["clock", "s
# Transitive dependencies we don't use directly but need to have specific versions of
thread_local = "1.1.3"
constant_time_eq = "0.4.2"
fastly-api = "12.0.0"

[dev-dependencies]
criterion = "0.7.0"
Expand Down
66 changes: 40 additions & 26 deletions src/build_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::{
BuildPackageSummary, Config, Context, Index, InstanceMetrics, RustwideBuilder, cdn,
BuildPackageSummary, Config, Context, Index, InstanceMetrics, RustwideBuilder,
cdn::{self, CdnMetrics},
db::{
CrateId, Pool, delete_crate, delete_version, types::version::Version,
CrateId, Pool, delete_crate, delete_version,
types::{krate_name::KrateName, version::Version},
update_latest_version_id,
},
docbuilder::{BuilderMetrics, PackageKind},
Expand Down Expand Up @@ -62,6 +64,7 @@ pub struct AsyncBuildQueue {
metrics: Arc<InstanceMetrics>,
queue_metrics: BuildQueueMetrics,
builder_metrics: Arc<BuilderMetrics>,
cdn_metrics: Arc<CdnMetrics>,
max_attempts: i32,
}

Expand All @@ -71,6 +74,7 @@ impl AsyncBuildQueue {
metrics: Arc<InstanceMetrics>,
config: Arc<Config>,
storage: Arc<AsyncStorage>,
cdn_metrics: Arc<CdnMetrics>,
otel_meter_provider: &AnyMeterProvider,
) -> Self {
AsyncBuildQueue {
Expand All @@ -81,6 +85,7 @@ impl AsyncBuildQueue {
storage,
queue_metrics: BuildQueueMetrics::new(otel_meter_provider),
builder_metrics: Arc::new(BuilderMetrics::new(otel_meter_provider)),
cdn_metrics,
}
}

Expand Down Expand Up @@ -259,6 +264,25 @@ impl AsyncBuildQueue {

/// Index methods.
impl AsyncBuildQueue {
async fn queue_crate_invalidation(&self, conn: &mut sqlx::PgConnection, krate: &str) {
let krate = match krate
.parse::<KrateName>()
.with_context(|| format!("can't parse crate name '{}'", krate))
{
Ok(krate) => krate,
Err(err) => {
report_error(&err);
return;
}
};

if let Err(err) =
cdn::queue_crate_invalidation(conn, &self.config, &self.cdn_metrics, &krate).await
{
report_error(&err);
}
}

/// Updates registry index repository and adds new crates into build queue.
///
/// Returns the number of crates added
Expand Down Expand Up @@ -296,11 +320,8 @@ impl AsyncBuildQueue {
),
Err(err) => report_error(&err),
}
if let Err(err) =
cdn::queue_crate_invalidation(&mut conn, &self.config, krate).await
{
report_error(&err);
}

self.queue_crate_invalidation(&mut conn, krate).await;
continue;
}

Expand Down Expand Up @@ -328,11 +349,9 @@ impl AsyncBuildQueue {
),
Err(err) => report_error(&err),
}
if let Err(err) =
cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await
{
report_error(&err);
}

self.queue_crate_invalidation(&mut conn, &release.name)
.await;
continue;
}

Expand Down Expand Up @@ -387,11 +406,8 @@ impl AsyncBuildQueue {
report_error(&err);
}

if let Err(err) =
cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await
{
report_error(&err);
}
self.queue_crate_invalidation(&mut conn, &release.name)
.await;
}
}

Expand Down Expand Up @@ -581,13 +597,11 @@ impl BuildQueue {

self.inner.metrics.total_builds.inc();
self.inner.builder_metrics.total_builds.add(1, &[]);
if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation(
&mut transaction,
&self.inner.config,
&to_process.name,
)) {
report_error(&err);
}

self.runtime.block_on(
self.inner
.queue_crate_invalidation(&mut transaction, &to_process.name),
);

let mut increase_attempt_count = || -> Result<()> {
let attempt: i32 = self.runtime.block_on(
Expand Down Expand Up @@ -1119,7 +1133,7 @@ mod tests {
assert!(
env.runtime()
.block_on(async {
cdn::queued_or_active_crate_invalidations(
cdn::cloudfront::queued_or_active_crate_invalidations(
&mut *env.async_db().async_conn().await,
)
.await
Expand Down Expand Up @@ -1148,7 +1162,7 @@ mod tests {
env.runtime()
.block_on(async {
let mut conn = env.async_db().async_conn().await;
cdn::queued_or_active_crate_invalidations(&mut conn).await
cdn::cloudfront::queued_or_active_crate_invalidations(&mut conn).await
})
.unwrap()
};
Expand Down
107 changes: 17 additions & 90 deletions src/cdn.rs → src/cdn/cloudfront.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{
Config, InstanceMetrics,
metrics::{CDN_INVALIDATION_HISTOGRAM_BUCKETS, otel::AnyMeterProvider},
utils::report_error,
};
use super::CdnMetrics;
use crate::{Config, InstanceMetrics, utils::report_error};
use anyhow::{Context, Error, Result, anyhow, bail};
use aws_config::BehaviorVersion;
use aws_sdk_cloudfront::{
Expand All @@ -13,7 +10,7 @@ use aws_sdk_cloudfront::{
};
use chrono::{DateTime, Utc};
use futures_util::stream::TryStreamExt;
use opentelemetry::{KeyValue, metrics::Histogram};
use opentelemetry::KeyValue;
use serde::Serialize;
use sqlx::Connection as _;
use std::{
Expand All @@ -29,31 +26,6 @@ use uuid::Uuid;
/// triggered invalidations
const MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS: i32 = 13;

#[derive(Debug)]
pub struct CdnMetrics {
invalidation_time: Histogram<f64>,
queue_time: Histogram<f64>,
}

impl CdnMetrics {
pub fn new(meter_provider: &AnyMeterProvider) -> Self {
let meter = meter_provider.meter("cdn");
const PREFIX: &str = "docsrs.cdn";
Self {
invalidation_time: meter
.f64_histogram(format!("{PREFIX}.invalidation_time"))
.with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec())
.with_unit("s")
.build(),
queue_time: meter
.f64_histogram(format!("{PREFIX}.queue_time"))
.with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec())
.with_unit("s")
.build(),
}
}
}

#[derive(Debug, EnumString)]
pub enum CdnKind {
#[strum(ascii_case_insensitive)]
Expand Down Expand Up @@ -567,57 +539,6 @@ pub(crate) async fn handle_queued_invalidation_requests(
Ok(())
}

#[instrument(skip(conn, config))]
pub(crate) async fn queue_crate_invalidation(
conn: &mut sqlx::PgConnection,
config: &Config,
name: &str,
) -> Result<()> {
if !config.cache_invalidatable_responses {
info!("full page cache disabled, skipping queueing invalidation");
return Ok(());
}

async fn add(
conn: &mut sqlx::PgConnection,
name: &str,
distribution_id: &str,
path_patterns: &[&str],
) -> Result<()> {
for pattern in path_patterns {
debug!(distribution_id, pattern, "enqueueing web CDN invalidation");
sqlx::query!(
"INSERT INTO cdn_invalidation_queue (crate, cdn_distribution_id, path_pattern)
VALUES ($1, $2, $3)",
name,
distribution_id,
pattern
)
.execute(&mut *conn)
.await?;
}
Ok(())
}

if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() {
add(
conn,
name,
distribution_id,
&[&format!("/{name}*"), &format!("/crate/{name}*")],
)
.await
.context("error enqueueing web CDN invalidation")?;
}
if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() {
add(conn, name, distribution_id, &[&format!("/rustdoc/{name}*")])
.await
.context("error enqueueing static CDN invalidation")?;
}

Ok(())
}

#[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)]
pub(crate) struct QueuedInvalidation {
pub krate: String,
Expand Down Expand Up @@ -683,14 +604,12 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution(

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::test::TestEnvironment;

use crate::{cdn::queue_crate_invalidation, test::TestEnvironment};
use aws_sdk_cloudfront::{Config, config::Credentials};
use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient};
use aws_smithy_types::body::SdkBody;
use std::time::Duration;

const DISTRIBUTION_ID_WEB: &str = "distribution_id_web";
const DISTRIBUTION_ID_STATIC: &str = "distribution_id_static";
Expand Down Expand Up @@ -817,7 +736,9 @@ mod tests {
.is_empty()
);

queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
let metrics = otel_metrics(&env);
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
.await?;

// invalidation paths are queued.
assert_eq!(
Expand Down Expand Up @@ -922,7 +843,9 @@ mod tests {
.is_empty()
);

queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
let metrics = otel_metrics(&env);
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
.await?;

// invalidation paths are queued.
assert_eq!(
Expand Down Expand Up @@ -1088,7 +1011,9 @@ mod tests {
.await?;

// queue an invalidation
queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
let metrics = otel_metrics(&env);
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
.await?;

let metrics = otel_metrics(&env);

Expand Down Expand Up @@ -1150,7 +1075,9 @@ mod tests {
.await?;

// queue an invalidation
queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
let metrics = otel_metrics(&env);
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
.await?;

let metrics = otel_metrics(&env);

Expand Down
Loading
Loading