Skip to content

Commit be886c6

Browse files
committed
cdn: additional support for Fastly caching headers & invalidation
1 parent 2682ed4 commit be886c6

25 files changed

+1430
-230
lines changed

.sqlx/query-0107ab57a47a423721cc6257cf1572348bf76ecf16632fe625ebafa17f45738a.json

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-f550ed904fdb5d3ee6581fe1ad036c9b5b8db8765d5665042deb9ade67394d3c.json

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ chrono = { version = "0.4.11", default-features = false, features = ["clock", "s
109109
# Transitive dependencies we don't use directly but need to have specific versions of
110110
thread_local = "1.1.3"
111111
constant_time_eq = "0.4.2"
112+
fastly-api = "12.0.0"
112113

113114
[dev-dependencies]
114115
criterion = "0.7.0"

src/build_queue.rs

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::{
2-
BuildPackageSummary, Config, Context, Index, InstanceMetrics, RustwideBuilder, cdn,
2+
BuildPackageSummary, Config, Context, Index, InstanceMetrics, RustwideBuilder,
3+
cdn::{self, CdnMetrics},
34
db::{
4-
CrateId, Pool, delete_crate, delete_version, types::version::Version,
5+
CrateId, Pool, delete_crate, delete_version,
6+
types::{krate_name::KrateName, version::Version},
57
update_latest_version_id,
68
},
79
docbuilder::{BuilderMetrics, PackageKind},
@@ -62,6 +64,7 @@ pub struct AsyncBuildQueue {
6264
metrics: Arc<InstanceMetrics>,
6365
queue_metrics: BuildQueueMetrics,
6466
builder_metrics: Arc<BuilderMetrics>,
67+
cdn_metrics: Arc<CdnMetrics>,
6568
max_attempts: i32,
6669
}
6770

@@ -71,6 +74,7 @@ impl AsyncBuildQueue {
7174
metrics: Arc<InstanceMetrics>,
7275
config: Arc<Config>,
7376
storage: Arc<AsyncStorage>,
77+
cdn_metrics: Arc<CdnMetrics>,
7478
otel_meter_provider: &AnyMeterProvider,
7579
) -> Self {
7680
AsyncBuildQueue {
@@ -81,6 +85,7 @@ impl AsyncBuildQueue {
8185
storage,
8286
queue_metrics: BuildQueueMetrics::new(otel_meter_provider),
8387
builder_metrics: Arc::new(BuilderMetrics::new(otel_meter_provider)),
88+
cdn_metrics,
8489
}
8590
}
8691

@@ -259,6 +264,25 @@ impl AsyncBuildQueue {
259264

260265
/// Index methods.
261266
impl AsyncBuildQueue {
267+
async fn queue_crate_invalidation(&self, conn: &mut sqlx::PgConnection, krate: &str) {
268+
let krate = match krate
269+
.parse::<KrateName>()
270+
.with_context(|| format!("can't parse crate name '{}'", krate))
271+
{
272+
Ok(krate) => krate,
273+
Err(err) => {
274+
report_error(&err);
275+
return;
276+
}
277+
};
278+
279+
if let Err(err) =
280+
cdn::queue_crate_invalidation(conn, &self.config, &self.cdn_metrics, &krate).await
281+
{
282+
report_error(&err);
283+
}
284+
}
285+
262286
/// Updates registry index repository and adds new crates into build queue.
263287
///
264288
/// Returns the number of crates added
@@ -296,11 +320,8 @@ impl AsyncBuildQueue {
296320
),
297321
Err(err) => report_error(&err),
298322
}
299-
if let Err(err) =
300-
cdn::queue_crate_invalidation(&mut conn, &self.config, krate).await
301-
{
302-
report_error(&err);
303-
}
323+
324+
self.queue_crate_invalidation(&mut conn, krate).await;
304325
continue;
305326
}
306327

@@ -328,11 +349,9 @@ impl AsyncBuildQueue {
328349
),
329350
Err(err) => report_error(&err),
330351
}
331-
if let Err(err) =
332-
cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await
333-
{
334-
report_error(&err);
335-
}
352+
353+
self.queue_crate_invalidation(&mut conn, &release.name)
354+
.await;
336355
continue;
337356
}
338357

@@ -387,11 +406,8 @@ impl AsyncBuildQueue {
387406
report_error(&err);
388407
}
389408

390-
if let Err(err) =
391-
cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await
392-
{
393-
report_error(&err);
394-
}
409+
self.queue_crate_invalidation(&mut conn, &release.name)
410+
.await;
395411
}
396412
}
397413

@@ -581,13 +597,11 @@ impl BuildQueue {
581597

582598
self.inner.metrics.total_builds.inc();
583599
self.inner.builder_metrics.total_builds.add(1, &[]);
584-
if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation(
585-
&mut transaction,
586-
&self.inner.config,
587-
&to_process.name,
588-
)) {
589-
report_error(&err);
590-
}
600+
601+
self.runtime.block_on(
602+
self.inner
603+
.queue_crate_invalidation(&mut transaction, &to_process.name),
604+
);
591605

592606
let mut increase_attempt_count = || -> Result<()> {
593607
let attempt: i32 = self.runtime.block_on(
@@ -1119,7 +1133,7 @@ mod tests {
11191133
assert!(
11201134
env.runtime()
11211135
.block_on(async {
1122-
cdn::queued_or_active_crate_invalidations(
1136+
cdn::cloudfront::queued_or_active_crate_invalidations(
11231137
&mut *env.async_db().async_conn().await,
11241138
)
11251139
.await
@@ -1148,7 +1162,7 @@ mod tests {
11481162
env.runtime()
11491163
.block_on(async {
11501164
let mut conn = env.async_db().async_conn().await;
1151-
cdn::queued_or_active_crate_invalidations(&mut conn).await
1165+
cdn::cloudfront::queued_or_active_crate_invalidations(&mut conn).await
11521166
})
11531167
.unwrap()
11541168
};

src/cdn.rs renamed to src/cdn/cloudfront.rs

Lines changed: 17 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
use crate::{
2-
Config, InstanceMetrics,
3-
metrics::{CDN_INVALIDATION_HISTOGRAM_BUCKETS, otel::AnyMeterProvider},
4-
utils::report_error,
5-
};
1+
use super::CdnMetrics;
2+
use crate::{Config, InstanceMetrics, utils::report_error};
63
use anyhow::{Context, Error, Result, anyhow, bail};
74
use aws_config::BehaviorVersion;
85
use aws_sdk_cloudfront::{
@@ -13,7 +10,7 @@ use aws_sdk_cloudfront::{
1310
};
1411
use chrono::{DateTime, Utc};
1512
use futures_util::stream::TryStreamExt;
16-
use opentelemetry::{KeyValue, metrics::Histogram};
13+
use opentelemetry::KeyValue;
1714
use serde::Serialize;
1815
use sqlx::Connection as _;
1916
use std::{
@@ -29,31 +26,6 @@ use uuid::Uuid;
2926
/// triggered invalidations
3027
const MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS: i32 = 13;
3128

32-
#[derive(Debug)]
33-
pub struct CdnMetrics {
34-
invalidation_time: Histogram<f64>,
35-
queue_time: Histogram<f64>,
36-
}
37-
38-
impl CdnMetrics {
39-
pub fn new(meter_provider: &AnyMeterProvider) -> Self {
40-
let meter = meter_provider.meter("cdn");
41-
const PREFIX: &str = "docsrs.cdn";
42-
Self {
43-
invalidation_time: meter
44-
.f64_histogram(format!("{PREFIX}.invalidation_time"))
45-
.with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec())
46-
.with_unit("s")
47-
.build(),
48-
queue_time: meter
49-
.f64_histogram(format!("{PREFIX}.queue_time"))
50-
.with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec())
51-
.with_unit("s")
52-
.build(),
53-
}
54-
}
55-
}
56-
5729
#[derive(Debug, EnumString)]
5830
pub enum CdnKind {
5931
#[strum(ascii_case_insensitive)]
@@ -567,57 +539,6 @@ pub(crate) async fn handle_queued_invalidation_requests(
567539
Ok(())
568540
}
569541

570-
#[instrument(skip(conn, config))]
571-
pub(crate) async fn queue_crate_invalidation(
572-
conn: &mut sqlx::PgConnection,
573-
config: &Config,
574-
name: &str,
575-
) -> Result<()> {
576-
if !config.cache_invalidatable_responses {
577-
info!("full page cache disabled, skipping queueing invalidation");
578-
return Ok(());
579-
}
580-
581-
async fn add(
582-
conn: &mut sqlx::PgConnection,
583-
name: &str,
584-
distribution_id: &str,
585-
path_patterns: &[&str],
586-
) -> Result<()> {
587-
for pattern in path_patterns {
588-
debug!(distribution_id, pattern, "enqueueing web CDN invalidation");
589-
sqlx::query!(
590-
"INSERT INTO cdn_invalidation_queue (crate, cdn_distribution_id, path_pattern)
591-
VALUES ($1, $2, $3)",
592-
name,
593-
distribution_id,
594-
pattern
595-
)
596-
.execute(&mut *conn)
597-
.await?;
598-
}
599-
Ok(())
600-
}
601-
602-
if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() {
603-
add(
604-
conn,
605-
name,
606-
distribution_id,
607-
&[&format!("/{name}*"), &format!("/crate/{name}*")],
608-
)
609-
.await
610-
.context("error enqueueing web CDN invalidation")?;
611-
}
612-
if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() {
613-
add(conn, name, distribution_id, &[&format!("/rustdoc/{name}*")])
614-
.await
615-
.context("error enqueueing static CDN invalidation")?;
616-
}
617-
618-
Ok(())
619-
}
620-
621542
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)]
622543
pub(crate) struct QueuedInvalidation {
623544
pub krate: String,
@@ -683,14 +604,12 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution(
683604

684605
#[cfg(test)]
685606
mod tests {
686-
use std::time::Duration;
687-
688607
use super::*;
689-
use crate::test::TestEnvironment;
690-
608+
use crate::{cdn::queue_crate_invalidation, test::TestEnvironment};
691609
use aws_sdk_cloudfront::{Config, config::Credentials};
692610
use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient};
693611
use aws_smithy_types::body::SdkBody;
612+
use std::time::Duration;
694613

695614
const DISTRIBUTION_ID_WEB: &str = "distribution_id_web";
696615
const DISTRIBUTION_ID_STATIC: &str = "distribution_id_static";
@@ -817,7 +736,9 @@ mod tests {
817736
.is_empty()
818737
);
819738

820-
queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
739+
let metrics = otel_metrics(&env);
740+
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
741+
.await?;
821742

822743
// invalidation paths are queued.
823744
assert_eq!(
@@ -922,7 +843,9 @@ mod tests {
922843
.is_empty()
923844
);
924845

925-
queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
846+
let metrics = otel_metrics(&env);
847+
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
848+
.await?;
926849

927850
// invalidation paths are queued.
928851
assert_eq!(
@@ -1088,7 +1011,9 @@ mod tests {
10881011
.await?;
10891012

10901013
// queue an invalidation
1091-
queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
1014+
let metrics = otel_metrics(&env);
1015+
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
1016+
.await?;
10921017

10931018
let metrics = otel_metrics(&env);
10941019

@@ -1150,7 +1075,9 @@ mod tests {
11501075
.await?;
11511076

11521077
// queue an invalidation
1153-
queue_crate_invalidation(&mut conn, env.config(), "krate").await?;
1078+
let metrics = otel_metrics(&env);
1079+
queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap())
1080+
.await?;
11541081

11551082
let metrics = otel_metrics(&env);
11561083

0 commit comments

Comments
 (0)