From 1cdfb30484cf03a7248718f939fce790bc55b689 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Wed, 12 Nov 2025 22:24:15 +0100 Subject: [PATCH] temporary command to recompress broken index files in storage --- ...4d9943a38c16ca49ba014d8227df1b605bad9.json | 41 ++ Cargo.toml | 2 +- Justfile | 7 +- src/bin/cratesfyi.rs | 34 +- src/db/types/mod.rs | 4 +- src/storage/archive_index.rs | 6 +- src/storage/mod.rs | 432 +++++++++++++++++- 7 files changed, 507 insertions(+), 19 deletions(-) create mode 100644 .sqlx/query-29e5ac519965abe821ae15c21014d9943a38c16ca49ba014d8227df1b605bad9.json diff --git a/.sqlx/query-29e5ac519965abe821ae15c21014d9943a38c16ca49ba014d8227df1b605bad9.json b/.sqlx/query-29e5ac519965abe821ae15c21014d9943a38c16ca49ba014d8227df1b605bad9.json new file mode 100644 index 000000000..d2c498571 --- /dev/null +++ b/.sqlx/query-29e5ac519965abe821ae15c21014d9943a38c16ca49ba014d8227df1b605bad9.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n r.id,\n c.name,\n r.version as \"version: Version\",\n r.release_time\n FROM\n crates AS c\n INNER JOIN releases AS r ON r.crate_id = c.id\n WHERE\n r.archive_storage IS TRUE AND\n r.id >= $1 AND\n r.id <= $2\n ORDER BY\n r.id DESC\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "version: Version", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "release_time", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int4", + "Int4" + ] + }, + "nullable": [ + false, + false, + false, + true + ] + }, + "hash": "29e5ac519965abe821ae15c21014d9943a38c16ca49ba014d8227df1b605bad9" +} diff --git a/Cargo.toml b/Cargo.toml index 67b3a9b21..810f94018 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ bzip2 = "0.6.0" getrandom = "0.3.1" itertools = { version = "0.14.0" } hex = "0.4.3" -derive_more = { version = "2.0.0", features = ["display", "deref", "from", "into"] } +derive_more = { version = "2.0.0", features = ["display", "deref", "from", "into", "from_str"] } sysinfo = { version = "0.37.2", default-features = false, features = ["system"] } derive_builder = "0.20.2" diff --git a/Justfile b/Justfile index 14016ee71..a767a7979 100644 --- a/Justfile +++ b/Justfile @@ -11,8 +11,11 @@ sqlx-prepare ADDITIONAL_ARGS="": sqlx-check: just sqlx-prepare "--check" -lint: - cargo clippy --all-features --all-targets --workspace --locked -- -D warnings +lint *args: + cargo clippy --all-features --all-targets --workspace --locked {{ args }} -- -D warnings + +lint-fix: + just lint --fix --allow-dirty --allow-staged lint-js *args: deno run -A npm:eslint@9 static templates gui-tests eslint.config.js {{ args }} diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index 6337379f9..dc2a63fd4 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -2,7 +2,7 @@ use anyhow::{Context as _, Result, anyhow}; use clap::{Parser, Subcommand, ValueEnum}; use docs_rs::{ Config, Context, PackageKind, RustwideBuilder, - db::{self, CrateId, Overrides, add_path_into_database, types::version::Version}, + db::{self, CrateId, Overrides, ReleaseId, add_path_into_database, types::version::Version}, start_background_metrics_webserver, start_web_server, utils::{ ConfigName, get_config, get_crate_pattern_and_priority, list_crate_priorities, @@ -531,6 +531,16 @@ enum DatabaseSubcommand { /// Backfill GitHub/GitLab stats for crates. 
 BackfillRepositoryStats,

+    /// Recompress broken archive index files in storage.
+    RecompressArchiveIndexes {
+        #[arg(long)]
+        min_release_id: Option<ReleaseId>,
+        #[arg(long)]
+        max_release_id: Option<ReleaseId>,
+        #[arg(long)]
+        concurrency: Option<usize>,
+    },
+
    /// Updates info for a crate from the registry's API
    UpdateCrateRegistryFields {
        #[arg(name = "CRATE")]
@@ -619,6 +629,28 @@ impl DatabaseSubcommand {
                db::update_crate_data_in_database(&mut conn, &name, &registry_data).await
            })?,

+            Self::RecompressArchiveIndexes {
+                min_release_id,
+                max_release_id,
+                concurrency,
+            } => ctx.runtime.block_on(async move {
+                let mut conn = ctx.pool.get_async().await?;
+
+                let (checked, recompressed) = ctx
+                    .async_storage
+                    .recompress_index_files_in_bucket(
+                        &mut conn,
+                        min_release_id,
+                        max_release_id,
+                        concurrency.map(Into::into),
+                    )
+                    .await?;
+
+                println!("{} index files checked", checked);
+                println!("{} index files recompressed", recompressed);
+                Ok::<_, anyhow::Error>(())
+            })?,
+
            Self::AddDirectory { directory } => {
                ctx.runtime
                    .block_on(add_path_into_database(
diff --git a/src/db/types/mod.rs b/src/db/types/mod.rs
index d6a818413..54a2bf3be 100644
--- a/src/db/types/mod.rs
+++ b/src/db/types/mod.rs
@@ -1,4 +1,4 @@
-use derive_more::Display;
+use derive_more::{Display, FromStr};
 use serde::{Deserialize, Serialize};

 pub mod dependencies;
@@ -8,7 +8,7 @@ pub mod version;
 #[sqlx(transparent)]
 pub struct CrateId(pub i32);

-#[derive(Debug, Clone, Copy, Display, PartialEq, Eq, Hash, Serialize, sqlx::Type)]
+#[derive(Debug, Clone, Copy, Display, PartialEq, Eq, Hash, FromStr, Serialize, sqlx::Type)]
 #[sqlx(transparent)]
 pub struct ReleaseId(pub i32);

diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index ca0793ed7..498a2ec7a 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -1,5 +1,7 @@
-use crate::error::Result;
-use crate::storage::{FileRange, compression::CompressionAlgorithm};
+use crate::{
+    error::Result,
+    storage::{FileRange, compression::CompressionAlgorithm},
+};
 use anyhow::{Context as _, bail};
 use itertools::Itertools as _;
 use sqlx::{Acquire as _, QueryBuilder, Row as _, Sqlite};
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 6cf4bba09..76655714d 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -12,7 +12,7 @@ use self::{
 use crate::{
     Config, InstanceMetrics,
     db::{
-        BuildId, Pool,
+        BuildId, Pool, ReleaseId,
         file::{FileEntry, detect_mime},
         mimes,
         types::version::Version,
@@ -20,10 +20,10 @@ use crate::{
     error::Result,
     utils::spawn_blocking,
 };
-use anyhow::anyhow;
+use anyhow::{anyhow, bail};
 use chrono::{DateTime, Utc};
 use fn_error_context::context;
-use futures_util::stream::BoxStream;
+use futures_util::{TryStreamExt as _, stream::BoxStream};
 use mime::Mime;
 use path_slash::PathExt;
 use std::{
@@ -33,16 +33,22 @@ use std::{
     num::ParseIntError,
     ops::RangeInclusive,
     path::{Path, PathBuf},
-    sync::Arc,
+    sync::{
+        Arc,
+        atomic::{AtomicU64, Ordering},
+    },
 };
 use std::{iter, str::FromStr};
 use tokio::{
     io::{AsyncRead, AsyncWriteExt},
     runtime,
 };
-use tracing::{error, info_span, instrument, trace};
+use tracing::{error, info, info_span, instrument, trace, warn};
+use tracing_futures::Instrument as _;
 use walkdir::WalkDir;

+const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];
+
 type FileRange = RangeInclusive<u64>;

 #[derive(Debug, thiserror::Error)]
@@ -97,9 +103,8 @@ impl StreamingBlob {
         self
     }

+    /// consume the inner stream and materialize the full blob into memory.
     pub(crate) async fn materialize(mut self, max_size: usize) -> Result<Blob> {
-        self = self.decompress();
-
         let mut content = crate::utils::sized_buffer::SizedBuffer::new(max_size);
         content.reserve(self.content_length);
@@ -329,11 +334,13 @@ impl AsyncStorage {
         }
     }

+    /// get, decompress and materialize an object from storage
     #[instrument]
     pub(crate) async fn get(&self, path: &str, max_size: usize) -> Result<Blob> {
         self.get_stream(path).await?.materialize(max_size).await
     }

+    /// get a decompressing stream to an object in storage
     #[instrument]
     pub(crate) async fn get_stream(&self, path: &str) -> Result<StreamingBlob> {
         let blob = match &self.backend {
@@ -343,6 +350,7 @@ impl AsyncStorage {
         Ok(blob.decompress())
     }

+    /// get, decompress and materialize part of an object from storage
     #[instrument]
     pub(super) async fn get_range(
         &self,
@@ -357,6 +365,7 @@ impl AsyncStorage {
             .await
     }

+    /// get a decompressing stream to a range inside an object in storage
     #[instrument]
     pub(super) async fn get_range_stream(
         &self,
@@ -746,6 +755,148 @@ impl AsyncStorage {
         }
         Ok(())
     }
+
+    /// fix the broken zstd archives in our bucket
+    /// See https://github.com/rust-lang/docs.rs/pull/2988
+    /// Returns the number of index files checked and the number recompressed.
+    ///
+    /// Doesn't handle the local cache; when the remote files are fixed,
+    /// I'll just wipe it.
+    ///
+    /// We intentionally start with the latest releases; I'll probably first
+    /// find a release ID to check up to and then let the command run in the
+    /// background.
+    ///
+    /// So we start at release_id_max and go down to release_id_min.
+    pub async fn recompress_index_files_in_bucket(
+        &self,
+        conn: &mut sqlx::PgConnection,
+        min_release_id: Option<ReleaseId>,
+        max_release_id: Option<ReleaseId>,
+        concurrency: Option<usize>,
+    ) -> Result<(u64, u64)> {
+        let recompressed = Arc::new(AtomicU64::new(0));
+        let checked = Arc::new(AtomicU64::new(0));
+
+        let StorageBackend::S3(raw_storage) = &self.backend else {
+            bail!("only works with S3 backend");
+        };
+
+        sqlx::query!(
+            r#"
+            SELECT
+                r.id,
+                c.name,
+                r.version as "version: Version",
+                r.release_time
+            FROM
+                crates AS c
+                INNER JOIN releases AS r ON r.crate_id = c.id
+            WHERE
+                r.archive_storage IS TRUE AND
+                r.id >= $1 AND
+                r.id <= $2
+            ORDER BY
+                r.id DESC
+            "#,
+            min_release_id.unwrap_or(ReleaseId(0)) as _,
+            max_release_id.unwrap_or(ReleaseId(i32::MAX)) as _
+        )
+        .fetch(conn)
+        .err_into::<anyhow::Error>()
+        .try_for_each_concurrent(concurrency.unwrap_or_else(num_cpus::get), |row| {
+            let recompressed = recompressed.clone();
+            let checked = checked.clone();
+
+            let release_span = tracing::info_span!(
+                "recompress_release",
+                id = row.id,
+                name = &row.name,
+                version = %row.version,
+                release_time = row.release_time.map(|rt| rt.to_rfc3339()),
+            );
+
+            async move {
+                trace!("handling release");
+
+                for path in &[
+                    rustdoc_archive_path(&row.name, &row.version),
+                    source_archive_path(&row.name, &row.version),
+                ] {
+                    let path = format!("{path}.index");
+                    trace!(path, "checking path");
+
+                    let compressed_stream = match raw_storage.get_stream(&path, None).await {
+                        Ok(stream) => stream,
+                        Err(err) => {
+                            if matches!(err.downcast_ref(), Some(PathNotFoundError)) {
+                                trace!(path, "path not found, skipping");
+                                continue;
+                            }
+                            trace!(path, ?err, "error fetching stream");
+                            return Err(err);
+                        }
+                    };
+
+                    let alg = CompressionAlgorithm::default();
+
+                    if compressed_stream.compression != Some(alg) {
+                        trace!(path, "Archive index not compressed with zstd, skipping");
+                        continue;
+                    }
+
+                    info!(path, "checking archive");
+                    checked.fetch_add(1, Ordering::Relaxed);
+
+                    // download the compressed raw blob first.
+                    // This way we can first check if it's worth recompressing & re-uploading.
+                    let mut compressed_blob = compressed_stream.materialize(usize::MAX).await?;
+                    if compressed_blob
+                        .content
+                        .last_chunk::<{ ZSTD_EOF_BYTES.len() }>()
+                        == Some(&ZSTD_EOF_BYTES)
+                    {
+                        info!(path, "Archive already has correct zstd ending, skipping");
+                        continue;
+                    }
+
+                    warn!(path, "recompressing archive");
+                    recompressed.fetch_add(1, Ordering::Relaxed);
+
+                    let mut decompressed = Vec::new();
+                    {
+                        // old async-compression can read the broken zstd stream
+                        let mut reader = wrap_reader_for_decompression(
+                            io::Cursor::new(compressed_blob.content.clone()),
+                            alg,
+                        );
+
+                        tokio::io::copy(&mut reader, &mut decompressed).await?;
+                    }
+
+                    let mut buf = Vec::with_capacity(decompressed.len());
+                    compress_async(&mut io::Cursor::new(&decompressed), &mut buf, alg).await?;
+                    debug_assert_eq!(
+                        buf.last_chunk::<{ ZSTD_EOF_BYTES.len() }>(),
+                        Some(&ZSTD_EOF_BYTES)
+                    );
+                    compressed_blob.content = buf;
+                    compressed_blob.compression = Some(alg);
+
+                    // `.store_inner` just uploads what it gets, without any compression logic
+                    self.store_inner(vec![compressed_blob]).await?;
+                }
+                Ok(())
+            }
+            .instrument(release_span)
+        })
+        .await?;
+
+        Ok((
+            checked.load(Ordering::Relaxed),
+            recompressed.load(Ordering::Relaxed),
+        ))
+    }
 }

 impl std::fmt::Debug for AsyncStorage {
@@ -1011,10 +1162,82 @@ pub(crate) fn source_archive_path(name: &str, version: &Version) -> String {

 #[cfg(test)]
 mod test {
+    use crate::test::{TestEnvironment, V0_1};
+
     use super::*;
     use std::env;
     use test_case::test_case;

+    fn streaming_blob(
+        content: impl Into<Vec<u8>>,
+        alg: Option<CompressionAlgorithm>,
+    ) -> StreamingBlob {
+        let content = content.into();
+        StreamingBlob {
+            path: "some_path.db".into(),
+            mime: mime::APPLICATION_OCTET_STREAM,
+            date_updated: Utc::now(),
+            compression: alg,
+            content_length: content.len(),
+            content: Box::new(io::Cursor::new(content)),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_streaming_blob_uncompressed() -> Result<()> {
+        const CONTENT: &[u8] = b"Hello, world!";
+
+        // without decompression
+        {
+            let stream = streaming_blob(CONTENT, None);
+            let blob = stream.materialize(usize::MAX).await?;
+            assert_eq!(blob.content, CONTENT);
+            assert!(blob.compression.is_none());
+        }
+
+        // with decompression, does nothing
+        {
+            let stream = streaming_blob(CONTENT, None);
+            let blob = stream.decompress().materialize(usize::MAX).await?;
+            assert_eq!(blob.content, CONTENT);
+            assert!(blob.compression.is_none());
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_streaming_blob_zstd() -> Result<()> {
+        const CONTENT: &[u8] = b"Hello, world!";
+        let mut compressed_content = Vec::new();
+        let alg = CompressionAlgorithm::Zstd;
+        compress_async(
+            &mut io::Cursor::new(CONTENT.to_vec()),
+            &mut compressed_content,
+            alg,
+        )
+        .await?;
+
+        // without decompression
+        {
+            let stream = streaming_blob(compressed_content.clone(), Some(alg));
+            let blob = stream.materialize(usize::MAX).await?;
+            assert_eq!(blob.content, compressed_content);
+            assert_eq!(blob.content.last_chunk::<3>().unwrap(), &ZSTD_EOF_BYTES);
+            assert_eq!(blob.compression, Some(alg));
+        }
+
+        // with decompression
+        {
+            let stream = streaming_blob(compressed_content.clone(), Some(alg)).decompress();
+            let blob = stream.materialize(usize::MAX).await?;
+            assert_eq!(blob.content, CONTENT);
+            assert!(blob.compression.is_none());
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     #[test_case(CompressionAlgorithm::Zstd)]
     #[test_case(CompressionAlgorithm::Bzip2)]
@@ -1053,16 +1276,17 @@ mod test {
         }

         // try decompress via
storage API - let stream = StreamingBlob { + let blob = StreamingBlob { path: "some_path.db".into(), mime: mime::APPLICATION_OCTET_STREAM, date_updated: Utc::now(), compression: Some(alg), content_length: compressed_index_content.len(), content: Box::new(io::Cursor::new(compressed_index_content)), - }; - - let blob = stream.materialize(usize::MAX).await?; + } + .decompress() + .materialize(usize::MAX) + .await?; assert_eq!(blob.compression, None); assert_eq!(blob.content, CONTENT); @@ -1112,6 +1336,192 @@ mod test { let detected_mime = detect_mime(Path::new(&path)); assert_eq!(detected_mime, expected_mime); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_recompress_just_check() -> Result<()> { + let env = TestEnvironment::with_config( + TestEnvironment::base_config() + .storage_backend(StorageKind::S3) + .build()?, + ) + .await?; + + let storage = env.async_storage(); + + const KRATE: &str = "test_crate"; + let rid = env + .fake_release() + .await + .name(KRATE) + .version(V0_1) + .archive_storage(true) + .keywords(vec!["kw 1".into(), "kw 2".into()]) + .create() + .await?; + + // run the recompression logic + let mut conn = env.async_db().async_conn().await; + let (checked, recompressed) = storage + .recompress_index_files_in_bucket(&mut conn, None, None, None) + .await?; + assert_eq!(checked, 2); + assert_eq!(recompressed, 0); + + assert!( + storage + .get(&rustdoc_archive_path(KRATE, &V0_1), usize::MAX) + .await + .is_ok() + ); + assert!( + storage + .get(&source_archive_path(KRATE, &V0_1), usize::MAX) + .await + .is_ok() + ); + + // release-id-min = the target release id for the iterator + // (we start at the latest, and go down). + // So setting that "target" to rid.0 + 1 means we stop before we hit our only release. + let (checked, recompressed) = storage + .recompress_index_files_in_bucket(&mut conn, Some(ReleaseId(rid.0 + 1)), None, None) + .await?; + assert_eq!(checked, 0); + assert_eq!(recompressed, 0); + + // release-id-max = where we start iterating the releases + // (we start at the max, and go down). + // So setting that "start" to rid.0 - 1 means we start behind our only release + let (checked, recompressed) = storage + .recompress_index_files_in_bucket(&mut conn, None, Some(ReleaseId(rid.0 - 1)), None) + .await?; + assert_eq!(checked, 0); + assert_eq!(recompressed, 0); + + // setting min & max to the same value that is also our only release + // tests if we filter as inclusive range. + let (checked, recompressed) = storage + .recompress_index_files_in_bucket(&mut conn, Some(rid), Some(rid), None) + .await?; + assert_eq!(checked, 2); + assert_eq!(recompressed, 0); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_recompress_index_files_in_bucket() -> Result<()> { + use std::io::Cursor; + use tokio::io; + + let env = TestEnvironment::with_config( + TestEnvironment::base_config() + .storage_backend(StorageKind::S3) + .build()?, + ) + .await?; + + const CONTENT: &[u8] = b"Hello, world! Hello, world! Hello, world! Hello, world!"; + let alg = Some(CompressionAlgorithm::Zstd); + + use async_compression::tokio::write; + + let broken_archive = { + // broken compression implementation, `.shutdown` missing. 
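+ // (without a final `.shutdown().await` the encoder never ends the zstd frame,
+ // so the bytes built below are missing the trailing 3-byte ZSTD_EOF_BYTES block)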
+ let mut buf = Vec::new(); + let mut enc = write::ZstdEncoder::new(&mut buf); + io::copy(&mut Cursor::new(CONTENT), &mut enc).await?; + // check if it's really broken, EOF missing + assert_ne!(buf.last_chunk::<3>().unwrap(), &ZSTD_EOF_BYTES); + buf + }; + + const KRATE: &str = "test_crate"; + env.fake_release() + .await + .name(KRATE) + .version(V0_1) + .archive_storage(true) + .keywords(vec!["kw 1".into(), "kw 2".into()]) + .create() + .await?; + + let storage = env.async_storage(); + // delete everything in storage created by the fake_release above + for p in &["rustdoc/", "sources/"] { + storage.delete_prefix(p).await?; + } + + // use raw inner storage backend so we can fetch the compressed file without automatic + // decompression + let StorageBackend::S3(raw_storage) = &storage.backend else { + panic!("S3 backend set above"); + }; + + let index_path = format!("{}.index", rustdoc_archive_path(KRATE, &V0_1)); + + // upload as-is to the storage, into the place of an archive index. + // `.store_inner` doesn't compress + storage + .store_inner(vec![Blob { + path: index_path.clone(), + mime: mime::APPLICATION_OCTET_STREAM, + date_updated: Utc::now(), + content: broken_archive.clone(), + compression: alg, + }]) + .await?; + + // validate how the old compressed blob looks like, even though we just uploaded it + let old_compressed_blob = raw_storage + .get_stream(&index_path, None) + .await? + .materialize(usize::MAX) + .await?; + assert_eq!(old_compressed_blob.compression, alg); + + // try getting the decompressed broken blob via normal storage API. + // old async-compression can do this without choking. + assert_eq!( + CONTENT, + &storage.get(&index_path, usize::MAX).await?.content + ); + + // run the recompression logic + let mut conn = env.async_db().async_conn().await; + let (checked, recompressed) = storage + .recompress_index_files_in_bucket(&mut conn, None, None, None) + .await?; + assert_eq!(checked, 1); + assert_eq!(recompressed, 1); + + let new_compressed_blob = raw_storage + .get_stream(&index_path, None) + .await? + .materialize(usize::MAX) + .await?; + assert_eq!(new_compressed_blob.compression, alg); + + // after fixing, getting the decompressed blob via normal storage API still works + assert_eq!( + CONTENT, + &storage.get(&index_path, usize::MAX).await?.content + ); + + // after recompression the content length should be different, 3 bytes more for + // the zstd EOF + assert_eq!( + new_compressed_blob.content.len(), + old_compressed_blob.content.len() + ZSTD_EOF_BYTES.len() + ); + + assert_eq!( + [&old_compressed_blob.content[..], &ZSTD_EOF_BYTES].concat(), + new_compressed_blob.content + ); + + Ok(()) + } } /// Backend tests are a set of tests executed on all the supported storage backends. They ensure
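
For context, the failure mode this patch repairs can be reproduced in isolation. Below is a minimal sketch, not part of the patch itself; it assumes a scratch binary crate using tokio plus async-compression with the "zstd" and "tokio" features, mirroring the encoder usage in the test above. Flushing the tokio ZstdEncoder without shutting it down leaves the frame unterminated, while flushing and then calling `.shutdown().await` ends the stream with the 3-byte EOF block that `recompress_index_files_in_bucket` checks for.

    use async_compression::tokio::write::ZstdEncoder;
    use tokio::io::AsyncWriteExt as _;

    // same constant as in src/storage/mod.rs: an empty, final raw block
    const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let payload = b"Hello, world!";

        // broken: data is flushed out, but the encoder is never shut down,
        // so the zstd frame is never finished
        let mut broken = Vec::new();
        {
            let mut enc = ZstdEncoder::new(&mut broken);
            enc.write_all(payload).await?;
            enc.flush().await?;
            // missing: enc.shutdown().await?;
        }
        assert_ne!(broken.last_chunk::<3>(), Some(&ZSTD_EOF_BYTES));

        // fixed: shutdown finalizes the frame with the empty last block
        let mut fixed = Vec::new();
        {
            let mut enc = ZstdEncoder::new(&mut fixed);
            enc.write_all(payload).await?;
            enc.flush().await?;
            enc.shutdown().await?;
        }
        assert_eq!(fixed.last_chunk::<3>(), Some(&ZSTD_EOF_BYTES));

        Ok(())
    }

This also matches the test's length check: a repaired index is exactly ZSTD_EOF_BYTES.len() bytes longer than the broken upload, since only the end-of-frame block was missing.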