From c50f207cfd6882ae23d7f5b78a8f4507f5e6b940 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Tue, 28 Oct 2025 08:46:14 +0100 Subject: [PATCH 1/2] Add migration from V1 filetracker --- Cargo.lock | 30 +++ Cargo.toml | 4 + README.md | 81 ++++++- src/config.rs | 5 + src/kvstorage/mod.rs | 11 +- src/main.rs | 280 ++++++++++++++++++++++++ src/migration/mod.rs | 380 ++++++++++++++++++++++++++++++--- src/migration/v1_filesystem.rs | 253 ++++++++++++++++++++++ src/routes/ft/delete_file.rs | 29 ++- src/routes/ft/put_file.rs | 13 +- tests/integration_test.rs | 1 + tests/metrics_test.rs | 2 + tests/migration_test.rs | 1 + 13 files changed, 1022 insertions(+), 68 deletions(-) create mode 100644 src/migration/v1_filesystem.rs diff --git a/Cargo.lock b/Cargo.lock index 4763cf8..821edec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2809,12 +2809,23 @@ dependencies = [ "serde_json", "sha2", "sqlx", + "tempfile", "tokio", "tower", "tower-http", "tracing", "tracing-subscriber", "urlencoding", + "walkdir", +] + +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", ] [[package]] @@ -3797,6 +3808,16 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3934,6 +3955,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.1", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index b5cd539..b1fce21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,7 @@ anyhow = "1.0.100" parking_lot = { version = "0.12.4", features = ["arc_lock", "send_guard"] } async-trait = "0.1.77" futures-util = "0.3.31" +walkdir = "2.5.0" + +[dev-dependencies] +tempfile = "3.15.0" diff --git a/README.md b/README.md index f463f90..211ecdf 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,8 @@ docker run -d \ | `CLEANER_INTERVAL` | `3600` | Cleaner run interval (seconds) | | `CLEANER_BATCH_SIZE` | `1000` | Cleaner batch size | | `CLEANER_MAX_DELETES` | `10000` | Max deletions per cleaner run | -| `FILETRACKER_URL` | - | Old Filetracker URL for live migration | +| `FILETRACKER_URL` | - | Old Filetracker URL for live migration (HTTP fallback) | +| `FILETRACKER_V1_DIR` | - | V1 Filetracker directory for filesystem-based migration | For PostgreSQL, use: ``` @@ -103,13 +104,15 @@ Environment variables override config file values. 
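The migration-related variables from the table above go in the same `.env` file that the migration examples below pass via `--env-file` (values here are illustrative):

```
# HTTP fallback to the old Filetracker server (V2 migration, or V1 live migration)
FILETRACKER_URL=http://old-filetracker:8000
# Mounted directory of a V1 Filetracker store (filesystem-based V1 migration)
FILETRACKER_V1_DIR=/filetracker
```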
## Migration -> **📖 Complete Migration Guide**: See [docs/migration.md](docs/migration.md) for comprehensive migration instructions from Filetracker v2.1+ -> -> _Note: Migration from Filetracker v1.x will be supported in a future release._ +> **📖 Complete Migration Guide**: See [docs/migration.md](docs/migration.md) for comprehensive migration instructions -### Quick Start: Offline Migration +s3dedup supports migration from both Filetracker V1 (filesystem-based) and V2 (HTTP-based) servers. -Migrate all files from old Filetracker while the proxy is offline: +### V2 Migration (Filetracker 2.1+) + +#### Offline Migration + +Migrate all files from Filetracker V2 via HTTP while the proxy is offline: ```bash docker run --rm \ @@ -121,7 +124,7 @@ docker run --rm \ --max-concurrency 10 ``` -### Quick Start: Live Migration (Zero Downtime) +#### Live Migration (Zero Downtime) Run the proxy while migrating in the background: @@ -139,11 +142,73 @@ docker run -d \ live-migrate --env --max-concurrency 10 ``` -During live migration: +During V2 live migration: - **GET**: Falls back to old Filetracker if file not found, migrates on-the-fly - **PUT**: Writes to both s3dedup and old Filetracker - **DELETE**: Deletes from both systems +### V1 Migration (Legacy Filetracker) + +V1 Filetracker stores files directly on the filesystem and serves them via a simple HTTP protocol. +The key difference from V2 is that V1 doesn't have a `/list/` endpoint for file discovery, so migration uses +filesystem walking. + +**Performance**: V1 migration uses chunked processing to handle millions of files efficiently without loading +all file paths into memory. The filesystem is scanned in chunks of 10,000 files, keeping memory usage constant +regardless of total file count. + +#### Offline Migration + +Migrate from V1 filesystem (requires access to `$FILETRACKER_DIR`): + +```bash +docker run --rm \ + --env-file .env \ + -v s3dedup-data:/app/data \ + -v /path/to/filetracker:/filetracker:ro \ + ghcr.io/sio2project/s3dedup:latest \ + migrate-v1 --env \ + --v1-directory /filetracker \ + --max-concurrency 10 +``` + +#### Live Migration + +Run the proxy while migrating from V1 in the background: + +```bash +# With both filesystem access and HTTP fallback +docker run -d \ + --name s3dedup \ + -p 8080:8080 \ + -v s3dedup-data:/app/data \ + -v /path/to/filetracker:/filetracker:ro \ + --env-file .env \ + ghcr.io/sio2project/s3dedup:latest \ + live-migrate-v1 --env \ + --v1-directory /filetracker \ + --filetracker-url http://old-filetracker-v1:8000 \ + --max-concurrency 10 + +# Or with HTTP fallback only (no filesystem access) +docker run -d \ + --name s3dedup \ + -p 8080:8080 \ + -v s3dedup-data:/app/data \ + --env-file .env \ + ghcr.io/sio2project/s3dedup:latest \ + live-migrate-v1 --env \ + --filetracker-url http://old-filetracker-v1:8000 \ + --max-concurrency 10 +``` + +During V1 live migration: +- **Background filesystem migration**: If `--v1-directory` is provided, filesystem is scanned in chunks to migrate all files + - Chunked processing handles millions of files with constant memory usage +- **HTTP fallback**: If `--filetracker-url` is provided, GET requests fall back to V1 server if file not found + - Automatically migrates files on first access +- **New requests**: Server accepts PUT/GET/DELETE requests normally during migration + For detailed migration strategies, performance tuning, troubleshooting, and rollback procedures, see the [Migration Guide](docs/migration.md). 
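To spot-check either live-migration mode, request through the proxy a path that so far exists only in the old store; per the fallback behaviour above, the first GET serves it from the old Filetracker and migrates it. A minimal sketch, assuming the proxy is published on `localhost:8080` as in the examples above and that `some/path.txt` is a hypothetical path present in the old store:

```bash
# First GET falls back to the old Filetracker and migrates the file on the fly
curl -sS -o /dev/null -w "%{http_code}\n" http://localhost:8080/ft/files/some/path.txt

# Health and metrics endpoints stay available while the background migration runs
curl http://localhost:8080/health
curl http://localhost:8080/metrics/json
```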
## API Endpoints diff --git a/src/config.rs b/src/config.rs index 9c2c157..efadb5c 100644 --- a/src/config.rs +++ b/src/config.rs
@@ -43,6 +43,10 @@ pub struct BucketConfig { /// Optional filetracker URL for live migration mode #[serde(default)] pub filetracker_url: Option<String>,
+
+ /// Optional V1 filetracker directory for filesystem-based migration
+ #[serde(default)]
+ pub filetracker_v1_dir: Option<String>,
} impl Config {
@@ -251,6 +255,7 @@ impl BucketConfig { .unwrap_or(10000), }, filetracker_url: std::env::var("FILETRACKER_URL").ok(),
+ filetracker_v1_dir: std::env::var("FILETRACKER_V1_DIR").ok(),
}) } } diff --git a/src/kvstorage/mod.rs b/src/kvstorage/mod.rs index 8c31f71..9ffaec5 100644 --- a/src/kvstorage/mod.rs +++ b/src/kvstorage/mod.rs
@@ -28,12 +28,14 @@ pub(crate) trait KVStorageTrait { self.set_ref_count(bucket, hash, cnt + 1).await }
- async fn decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result<()> {
+ async fn decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result<i64> {
let cnt = self.get_ref_count(bucket, hash).await?; if cnt == 0 {
- return Ok(());
+ return Ok(0);
}
- self.set_ref_count(bucket, hash, cnt - 1).await
+ let new_count = cnt - 1;
+ self.set_ref_count(bucket, hash, new_count).await?;
+ Ok(new_count as i64)
} async fn get_modified(&mut self, bucket: &str, path: &str) -> Result<i64>;
@@ -181,8 +183,9 @@ impl KVStorage { /** * Decrement the reference count for a hash. * If the reference count is already 0, do nothing.
+ * Returns the new reference count after decrementing.
*/
- pub async fn decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result<()> {
+ pub async fn decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result<i64> {
debug!( "Decrementing ref count for bucket: {}, hash: {}", bucket, hash diff --git a/src/main.rs b/src/main.rs index c2ae9b3..f4751e3 100644 --- a/src/main.rs +++ b/src/main.rs
@@ -62,6 +62,39 @@ enum Commands { #[arg(short, long, default_value = "10")] max_concurrency: usize, },
+ /// Migrate data from V1 filetracker filesystem to s3dedup
+ MigrateV1 {
+ /// Path to configuration file (optional if using environment variables)
+ #[arg(short, long)]
+ config: Option<String>,
+ /// Use environment variables for configuration instead of config file
+ #[arg(short, long)]
+ env: bool,
+ /// Path to V1 filetracker directory ($FILETRACKER_DIR)
+ #[arg(short = 'd', long)]
+ v1_directory: String,
+ /// Maximum number of concurrent migration workers
+ #[arg(short, long, default_value = "10")]
+ max_concurrency: usize,
+ },
+ /// Perform live migration from V1 filetracker while server is running
+ LiveMigrateV1 {
+ /// Path to configuration file (optional if using environment variables)
+ #[arg(short, long)]
+ config: Option<String>,
+ /// Use environment variables for configuration instead of config file
+ #[arg(short, long)]
+ env: bool,
+ /// Path to V1 filetracker directory for background migration
+ #[arg(short = 'd', long)]
+ v1_directory: Option<String>,
+ /// URL of the V1 filetracker HTTP server for fallback (optional, can be set via config/env)
+ #[arg(short = 'u', long)]
+ filetracker_url: Option<String>,
+ /// Maximum number of concurrent migration workers per bucket
+ #[arg(short, long, default_value = "10")]
+ max_concurrency: usize,
+ },
} async fn run_server(addr: SocketAddr, app: Router) {
@@ -216,6 +249,75 @@ async fn run_migrate( } }
+async fn run_migrate_v1(
+ config_path: Option<&str>,
+ use_env: bool,
+ v1_directory: &str,
+ max_concurrency: usize,
+) {
+ let config = if use_env {
+ config::Config::from_env().unwrap()
+ } else {
config::Config::new(config_path.unwrap_or("config.json")).unwrap() + }; + s3dedup::logging::setup(&config.logging).unwrap(); + + info!("Starting offline V1 filesystem migration to s3dedup"); + if use_env { + info!("Using environment variables for configuration"); + } else { + info!("Config file: {}", config_path.unwrap_or("config.json")); + } + info!("V1 directory: {}", v1_directory); + info!("Max concurrency: {}", max_concurrency); + + // For offline migration, we only migrate the first bucket + if config.buckets.is_empty() { + error!("No buckets configured"); + return; + } + + let bucket_config = &config.buckets[0]; + info!("Migrating to bucket: {}", bucket_config.name); + + // Initialize AppState + let app_state = match AppState::new(bucket_config).await { + Ok(state) => state, + Err(e) => { + error!("Failed to initialize app state: {}", e); + return; + } + }; + + // Setup KV storage + if let Err(e) = app_state.kvstorage.lock().await.setup().await { + error!("Failed to setup KV storage: {}", e); + return; + } + + // Run V1 filesystem migration + match s3dedup::migration::migrate_all_files_from_v1_fs(v1_directory, app_state, max_concurrency) + .await + { + Ok(stats) => { + info!("V1 migration completed successfully"); + info!("Total files: {}", stats.total_files); + info!("Migrated: {}", stats.migrated); + info!("Skipped: {}", stats.skipped); + info!("Failed: {}", stats.failed); + + if stats.failed > 0 { + warn!("{} files failed to migrate", stats.failed); + std::process::exit(1); + } + } + Err(e) => { + error!("V1 migration failed: {}", e); + std::process::exit(1); + } + } +} + async fn run_live_migrate(config_path: Option<&str>, use_env: bool, max_concurrency: usize) { let config = if use_env { config::Config::from_env().unwrap() @@ -397,6 +499,160 @@ async fn run_live_migrate(config_path: Option<&str>, use_env: bool, max_concurre } } +async fn run_live_migrate_v1( + config_path: Option<&str>, + use_env: bool, + v1_directory: Option<&str>, + filetracker_url: Option<&str>, + max_concurrency: usize, +) { + let config = if use_env { + config::Config::from_env().unwrap() + } else { + config::Config::new(config_path.unwrap_or("config.json")).unwrap() + }; + s3dedup::logging::setup(&config.logging).unwrap(); + let mut handles = vec![]; + + info!("Starting live migration from V1 filetracker to s3dedup"); + if use_env { + info!("Using environment variables for configuration"); + } else { + info!("Config file: {}", config_path.unwrap_or("config.json")); + } + info!("Max concurrency per bucket: {}", max_concurrency); + + for bucket in config.buckets.iter() { + // Determine V1 directory: CLI > config > env + let effective_v1_dir = v1_directory + .or(bucket.filetracker_v1_dir.as_deref()) + .map(|s| s.to_string()); + + // Determine filetracker URL: CLI > config > env + let effective_ft_url = filetracker_url + .or(bucket.filetracker_url.as_deref()) + .map(|s| s.to_string()); + + info!( + "Starting server with V1 migration for bucket: {} (v1_dir: {:?}, filetracker_url: {:?})", + bucket.name, effective_v1_dir, effective_ft_url + ); + + // Initialize AppState with filetracker client if URL is provided + let app_state = if let Some(ref ft_url) = effective_ft_url { + info!("Creating app state with V1 filetracker client for HTTP fallback"); + AppState::new_with_filetracker(bucket, ft_url.clone()) + .await + .unwrap() + } else { + AppState::new(bucket).await.unwrap() + }; + app_state.kvstorage.lock().await.setup().await.unwrap(); + + // Start cleaner for this bucket + let cleaner = Arc::new(Cleaner::new( + 
bucket.name.clone(), + app_state.kvstorage.clone(), + app_state.s3storage.clone(), + bucket.cleaner.clone(), + )); + cleaner.start(); + + // Start metrics updater task + let metrics_state = app_state.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + loop { + interval.tick().await; + if let Err(e) = metrics_state.update_storage_metrics().await { + warn!("Failed to update storage metrics: {}", e); + } + } + }); + + // Start background V1 filesystem migration worker if v1_directory is provided + if let Some(v1_dir) = effective_v1_dir { + // Set migration_active gauge to indicate migration is in progress + s3dedup::metrics::MIGRATION_ACTIVE.set(1); + + let migration_app_state = app_state.clone(); + tokio::spawn(async move { + match s3dedup::migration::migrate_all_files_from_v1_fs( + &v1_dir, + migration_app_state, + max_concurrency, + ) + .await + { + Ok(stats) => { + info!("Background V1 filesystem migration completed successfully"); + info!("Total files: {}", stats.total_files); + info!("Migrated: {}", stats.migrated); + info!("Skipped: {}", stats.skipped); + info!("Failed: {}", stats.failed); + + if stats.failed > 0 { + warn!("{} files failed to migrate", stats.failed); + } + } + Err(e) => { + error!("Background V1 filesystem migration failed: {}", e); + } + } + + // Reset migration_active gauge + s3dedup::metrics::MIGRATION_ACTIVE.set(0); + info!("Background V1 filesystem migration worker finished"); + }); + } + + // Create router with all endpoints + let app = Router::new() + .route("/ft/version", get(s3dedup::routes::ft::version::ft_version)) + .route( + "/ft/version/", + get(s3dedup::routes::ft::version::ft_version), + ) + .route( + "/ft/list/", + get(s3dedup::routes::ft::list_files::ft_list_files), + ) + .route( + "/ft/list/{*path}", + get(s3dedup::routes::ft::list_files::ft_list_files), + ) + .route( + "/ft/files/{*path}", + get(s3dedup::routes::ft::get_file::ft_get_file) + .head(s3dedup::routes::ft::get_file::ft_get_file) + .put(s3dedup::routes::ft::put_file::ft_put_file) + .delete(s3dedup::routes::ft::delete_file::ft_delete_file), + ) + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .route( + "/metrics/json", + get(s3dedup::routes::metrics::metrics_json_handler), + ) + .route("/health", get(s3dedup::routes::metrics::health_handler)) + .layer( + TraceLayer::new_for_http() + .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) + .on_response(DefaultOnResponse::new().level(Level::INFO)), + ) + .with_state(app_state); + + let address: SocketAddr = format!("{}:{}", bucket.address, bucket.port) + .parse() + .unwrap(); + let handle = tokio::spawn(run_server(address, app)); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } +} + #[tokio::main] async fn main() { let cli = Cli::parse(); @@ -420,5 +676,29 @@ async fn main() { } => { run_live_migrate(config.as_deref(), env, max_concurrency).await; } + Commands::MigrateV1 { + config, + env, + v1_directory, + max_concurrency, + } => { + run_migrate_v1(config.as_deref(), env, &v1_directory, max_concurrency).await; + } + Commands::LiveMigrateV1 { + config, + env, + v1_directory, + filetracker_url, + max_concurrency, + } => { + run_live_migrate_v1( + config.as_deref(), + env, + v1_directory.as_deref(), + filetracker_url.as_deref(), + max_concurrency, + ) + .await; + } } } diff --git a/src/migration/mod.rs b/src/migration/mod.rs index d72f6fa..64bc1f0 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -1,12 
+1,15 @@ use crate::AppState; use crate::filetracker_client::{FileMetadata, FiletrackerClient}; use crate::routes::ft::storage_helpers; -use anyhow::Result; +use anyhow::{Context, Result}; use futures_util::future::join_all; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::sync::Semaphore; use tracing::{error, info, warn}; +pub mod v1_filesystem; + pub struct MigrationStats { pub total_files: usize, pub migrated: usize, @@ -74,7 +77,14 @@ pub async fn migrate_all_files( let handle = tokio::spawn(async move { // Acquire semaphore permit - let _permit = semaphore.acquire().await.unwrap(); + let _permit = match semaphore.acquire().await { + Ok(permit) => permit, + Err(_) => { + error!("Semaphore closed unexpectedly for file: {}", path); + *failed.lock().await += 1; + return; + } + }; // Log progress every 100 files if file_idx.is_multiple_of(100) && file_idx > 0 { @@ -223,30 +233,28 @@ pub async fn migrate_single_file_from_metadata( .await?; if !old_hash.is_empty() && old_hash != digest { - // Decrement old reference count - app_state - .kvstorage - .lock() - .await - .decrement_ref_count(&app_state.bucket_name, &old_hash) - .await?; - - // Check if we should delete the old blob + // Decrement old reference count atomically and get new count let old_ref_count = app_state .kvstorage .lock() .await - .get_ref_count(&app_state.bucket_name, &old_hash) + .decrement_ref_count(&app_state.bucket_name, &old_hash) .await?; - if old_ref_count <= 0 { - let _ = app_state + // Delete blob if no longer referenced + if old_ref_count <= 0 + && let Err(e) = app_state .s3storage .lock() .await .delete_object(&old_hash) - .await; - } + .await + { + warn!( + "Failed to delete orphaned S3 object {} during migration: {}", + old_hash, e + ); + } } } @@ -366,30 +374,28 @@ async fn migrate_single_file( .await?; if !old_hash.is_empty() && old_hash != digest { - // Decrement old reference count - app_state - .kvstorage - .lock() - .await - .decrement_ref_count(&app_state.bucket_name, &old_hash) - .await?; - - // Check if we should delete the old blob + // Decrement old reference count atomically and get new count let old_ref_count = app_state .kvstorage .lock() .await - .get_ref_count(&app_state.bucket_name, &old_hash) + .decrement_ref_count(&app_state.bucket_name, &old_hash) .await?; - if old_ref_count <= 0 { - let _ = app_state + // Delete blob if no longer referenced + if old_ref_count <= 0 + && let Err(e) = app_state .s3storage .lock() .await .delete_object(&old_hash) - .await; - } + .await + { + warn!( + "Failed to delete orphaned S3 object {} during migration: {}", + old_hash, e + ); + } } } @@ -443,3 +449,317 @@ pub async fn live_migration_worker( crate::metrics::MIGRATION_ACTIVE.set(0); info!("Background migration worker finished, migration_active set to 0"); } + +/// Migrate all files from V1 filetracker filesystem to s3dedup +/// +/// This function uses chunked processing to avoid loading all file metadata into memory, +/// making it suitable for directories with millions of files. 
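+///
+/// A sketch of the expected call pattern (the directory path and concurrency are illustrative,
+/// and `app_state` is an already-initialized `Arc<AppState>`):
+///
+/// ```ignore
+/// let stats = migrate_all_files_from_v1_fs("/filetracker", app_state, 10).await?;
+/// info!(
+///     "migrated {}/{} files ({} skipped, {} failed)",
+///     stats.migrated, stats.total_files, stats.skipped, stats.failed
+/// );
+/// ```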
+pub async fn migrate_all_files_from_v1_fs(
+ v1_dir: &str,
+ app_state: Arc<AppState>,
+ max_concurrency: usize,
+) -> Result<MigrationStats> {
+ info!(
+ "Starting V1 filesystem migration from directory: {}",
+ v1_dir
+ );
+ info!("Max concurrency: {}", max_concurrency);
+ info!("Processing directory in chunks to handle large file counts efficiently");
+
+ // Track stats across all chunks using atomics to avoid async locks in sync context
+ let total_files = Arc::new(AtomicUsize::new(0));
+ let migrated = Arc::new(AtomicUsize::new(0));
+ let failed = Arc::new(AtomicUsize::new(0));
+ let skipped = Arc::new(AtomicUsize::new(0));
+ let semaphore = Arc::new(Semaphore::new(max_concurrency));
+
+ // Chunk size for filesystem walking: 10,000 files per chunk
+ // This keeps memory usage reasonable while still being efficient
+ let filesystem_chunk_size = 10_000;
+
+ // Task batch size: spawn tasks in smaller batches to avoid too many concurrent tasks
+ let task_batch_size = max_concurrency * 10;
+
+ // Create a channel to send chunks from blocking walker to async processor
+ let (chunk_tx, mut chunk_rx) =
+ tokio::sync::mpsc::unbounded_channel::<Vec<v1_filesystem::V1FileInfo>>();
+
+ let v1_dir_owned = v1_dir.to_string();
+
+ // Spawn the filesystem walker in a blocking task to avoid nested block_on
+ let walker_handle = tokio::task::spawn_blocking(move || {
+ v1_filesystem::walk_v1_directory_chunked(
+ &v1_dir_owned,
+ filesystem_chunk_size,
+ |file_chunk| {
+ // Send chunk to async processor
+ // If receiver is dropped, stop walking
+ if chunk_tx.send(file_chunk.to_vec()).is_err() {
+ anyhow::bail!("Chunk receiver dropped");
+ }
+ Ok(())
+ },
+ )
+ });
+
+ // Process chunks as they arrive
+ let mut chunk_count = 0;
+ while let Some(file_chunk) = chunk_rx.recv().await {
+ chunk_count += 1;
+ let chunk_size = file_chunk.len();
+ total_files.fetch_add(chunk_size, Ordering::Relaxed);
+ let total_so_far = total_files.load(Ordering::Relaxed);
+
+ info!(
+ "Processing filesystem chunk {} with {} files (total discovered: {})",
+ chunk_count, chunk_size, total_so_far
+ );
+
+ // Process this chunk in task batches
+ let total_batches = file_chunk.chunks(task_batch_size).len();
+ for (batch_idx, batch) in file_chunk.chunks(task_batch_size).enumerate() {
+ let mut handles = vec![];
+
+ for file_info in batch.iter() {
+ let app_state = app_state.clone();
+ let migrated = migrated.clone();
+ let failed = failed.clone();
+ let skipped = skipped.clone();
+ let semaphore = semaphore.clone();
+ let file_info = file_info.clone();
+
+ let handle = tokio::spawn(async move {
+ // Acquire semaphore permit
+ let _permit = match semaphore.acquire().await {
+ Ok(permit) => permit,
+ Err(_) => {
+ error!(
+ "Semaphore closed unexpectedly for file: {}",
+ file_info.relative_path
+ );
+ failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ return;
+ }
+ };
+
+ // Migrate the file
+ match migrate_single_file_from_v1_fs(app_state, &file_info).await {
+ Ok(true) => {
+ migrated.fetch_add(1, Ordering::Relaxed);
+ }
+ Ok(false) => {
+ skipped.fetch_add(1, Ordering::Relaxed);
+ }
+ Err(e) => {
+ error!("Failed to migrate file {}: {}", file_info.relative_path, e);
+ failed.fetch_add(1, Ordering::Relaxed);
+ }
+ }
+ });
+
+ handles.push(handle);
+ }
+
+ // Wait for this task batch to complete
+ let _ = join_all(handles).await;
+
+ // Log progress periodically
+ if batch_idx % 10 == 0 || batch_idx == total_batches - 1 {
+ let current_migrated = migrated.load(Ordering::Relaxed);
+ let current_failed = failed.load(Ordering::Relaxed);
+ let current_skipped =
skipped.load(Ordering::Relaxed);
+ info!(
+ "Progress: {} files discovered (migrated: {}, skipped: {}, failed: {})",
+ total_so_far, current_migrated, current_skipped, current_failed
+ );
+ }
+ }
+ }
+
+ // Wait for walker to complete
+ match walker_handle.await {
+ Ok(Ok(())) => {}
+ Ok(Err(e)) => {
+ error!("Filesystem walker failed: {}", e);
+ anyhow::bail!("Filesystem walker failed: {}", e);
+ }
+ Err(e) => {
+ error!("Walker task panicked: {}", e);
+ anyhow::bail!("Walker task panicked: {}", e);
+ }
+ }
+
+ let total_count = total_files.load(Ordering::Relaxed);
+ let migrated_count = migrated.load(Ordering::Relaxed);
+ let failed_count = failed.load(Ordering::Relaxed);
+ let skipped_count = skipped.load(Ordering::Relaxed);
+
+ info!("V1 filesystem migration complete:");
+ info!(" Total files: {}", total_count);
+ info!(" Migrated: {}", migrated_count);
+ info!(" Skipped: {}", skipped_count);
+ info!(" Failed: {}", failed_count);
+
+ Ok(MigrationStats {
+ total_files: total_count,
+ migrated: migrated_count,
+ failed: failed_count,
+ skipped: skipped_count,
+ })
+}
+
+/// Migrate a single file from V1 filetracker filesystem to s3dedup
+/// Returns Ok(true) if migrated, Ok(false) if skipped, Err if failed
+async fn migrate_single_file_from_v1_fs(
+ app_state: Arc<AppState>,
+ file_info: &v1_filesystem::V1FileInfo,
+) -> Result<bool> {
+ let path = &file_info.relative_path;
+
+ // Check if file already exists in s3dedup with same or newer version
+ let current_modified = app_state
+ .kvstorage
+ .lock()
+ .await
+ .get_modified(&app_state.bucket_name, path)
+ .await?;
+
+ if current_modified >= file_info.last_modified {
+ // File already exists with same or newer version, skip
+ return Ok(false);
+ }
+
+ // Move blocking operations (file I/O, SHA256, compression) to blocking thread pool
+ // to avoid blocking the async runtime
+ let file_info_clone = file_info.clone();
+ let (uncompressed_data, digest, compressed_data) = tokio::task::spawn_blocking(move || {
+ // Read file data from filesystem (blocking I/O)
+ let uncompressed_data = v1_filesystem::read_v1_file(&file_info_clone)?;
+
+ // Compute SHA256 hash (CPU-intensive)
+ let digest = storage_helpers::compute_sha256(&uncompressed_data);
+
+ // Compress data (CPU-intensive)
+ let compressed_data = storage_helpers::compress_gzip(&uncompressed_data)?;
+
+ Ok::<_, anyhow::Error>((uncompressed_data, digest, compressed_data))
+ })
+ .await
+ .context("Task panicked during file processing")?
+ .context("Failed to read and process V1 file")?; + + let logical_size = uncompressed_data.len(); + + // Acquire file lock + let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); + let locks_storage = &app_state.locks; + let lock = locks_storage.prepare_lock(lock_key).await; + let _guard = lock.acquire_exclusive().await; + + // Recheck if file was already migrated after acquiring lock (race condition protection) + let current_modified_after_lock = app_state + .kvstorage + .lock() + .await + .get_modified(&app_state.bucket_name, path) + .await?; + + if current_modified_after_lock >= file_info.last_modified { + // File was migrated by another concurrent task, skip + return Ok(false); + } + + // Check if blob already exists in S3 + let blob_exists = app_state + .s3storage + .lock() + .await + .object_exists(&digest) + .await?; + + // Store blob if it doesn't exist + if !blob_exists { + app_state + .s3storage + .lock() + .await + .put_object(&digest, compressed_data.clone()) + .await?; + } + + // Store logical size metadata + app_state + .kvstorage + .lock() + .await + .set_logical_size(&app_state.bucket_name, &digest, logical_size) + .await?; + + // Store compressed size metadata + app_state + .kvstorage + .lock() + .await + .set_compressed_size(&app_state.bucket_name, &digest, compressed_data.len()) + .await?; + + // Increment reference count + app_state + .kvstorage + .lock() + .await + .increment_ref_count(&app_state.bucket_name, &digest) + .await?; + + // Handle overwriting existing file + if current_modified > 0 { + let old_hash = app_state + .kvstorage + .lock() + .await + .get_ref_file(&app_state.bucket_name, path) + .await?; + + if !old_hash.is_empty() && old_hash != digest { + // Decrement old reference count atomically and get new count + let old_ref_count = app_state + .kvstorage + .lock() + .await + .decrement_ref_count(&app_state.bucket_name, &old_hash) + .await?; + + // Delete blob if no longer referenced + if old_ref_count <= 0 + && let Err(e) = app_state + .s3storage + .lock() + .await + .delete_object(&old_hash) + .await + { + warn!( + "Failed to delete orphaned S3 object {} during migration: {}", + old_hash, e + ); + } + } + } + + // Update file metadata + app_state + .kvstorage + .lock() + .await + .set_ref_file(&app_state.bucket_name, path, &digest) + .await?; + + app_state + .kvstorage + .lock() + .await + .set_modified(&app_state.bucket_name, path, file_info.last_modified) + .await?; + + Ok(true) +} diff --git a/src/migration/v1_filesystem.rs b/src/migration/v1_filesystem.rs new file mode 100644 index 0000000..7ee6627 --- /dev/null +++ b/src/migration/v1_filesystem.rs @@ -0,0 +1,253 @@ +use anyhow::{Context, Result}; +use std::fs; +use std::path::{Path, PathBuf}; +use tracing::{debug, warn}; +use walkdir::WalkDir; + +/// Information about a file in V1 filetracker filesystem +#[derive(Debug, Clone)] +pub struct V1FileInfo { + /// Relative path from filetracker root (e.g., "submissions/123/file.txt") + pub relative_path: String, + /// Absolute path to the file on filesystem + pub absolute_path: PathBuf, + /// Last modified timestamp (Unix timestamp) + pub last_modified: i64, + /// File size in bytes + pub size: u64, +} + +/// Walk V1 filetracker directory and return list of all files +/// +/// V1 filetracker stores files in $FILETRACKER_DIR/files/ +/// This function recursively scans that directory and returns metadata for all files. 
+///
+/// **Note**: For large directories with millions of files, use `walk_v1_directory_chunked` instead
+/// to avoid loading all file paths into memory at once.
+pub fn walk_v1_directory(v1_dir: &str) -> Result<Vec<V1FileInfo>> {
+ let mut files = Vec::new();
+ walk_v1_directory_chunked(v1_dir, usize::MAX, |chunk| {
+ files.extend_from_slice(chunk);
+ Ok(())
+ })?;
+ Ok(files)
+}
+
+/// Walk V1 filetracker directory and process files in chunks
+///
+/// This function processes the directory in chunks to avoid loading all file metadata into memory
+/// at once. This is essential for directories with millions of files.
+///
+/// The callback function is called with each chunk of files. If the callback returns an error,
+/// the walk stops and the error is propagated.
+///
+/// # Arguments
+/// * `v1_dir` - Path to V1 filetracker directory
+/// * `chunk_size` - Number of files to process per chunk
+/// * `callback` - Function called with each chunk of file metadata
+pub fn walk_v1_directory_chunked<F>(v1_dir: &str, chunk_size: usize, mut callback: F) -> Result<()>
+where
+ F: FnMut(&[V1FileInfo]) -> Result<()>,
+{
+ let files_dir = Path::new(v1_dir).join("files");
+
+ if !files_dir.exists() {
+ anyhow::bail!(
+ "V1 filetracker files directory does not exist: {}",
+ files_dir.display()
+ );
+ }
+
+ if !files_dir.is_dir() {
+ anyhow::bail!(
+ "V1 filetracker files path is not a directory: {}",
+ files_dir.display()
+ );
+ }
+
+ debug!(
+ "Walking V1 filetracker directory in chunks of {}: {}",
+ chunk_size,
+ files_dir.display()
+ );
+
+ // Cap capacity at a reasonable value to avoid overflow with usize::MAX
+ let capacity = chunk_size.min(10_000);
+ let mut chunk = Vec::with_capacity(capacity);
+ let mut total_files = 0;
+
+ for entry_result in WalkDir::new(&files_dir).follow_links(false).into_iter() {
+ // Propagate walkdir errors instead of silently skipping them
+ let entry = entry_result.context("Failed to read directory entry during V1 migration")?;
+
+ // Skip directories
+ if !entry.file_type().is_file() {
+ continue;
+ }
+
+ let absolute_path = entry.path().to_path_buf();
+
+ // Get relative path from files/ directory
+ let relative_path = absolute_path
+ .strip_prefix(&files_dir)
+ .context("Failed to strip prefix from file path")?
+ .to_str()
+ .context("File path contains invalid UTF-8")?
+ .to_string();
+
+ // Skip if relative path is empty (shouldn't happen, but be safe)
+ if relative_path.is_empty() {
+ warn!(
+ "Skipping file with empty relative path: {:?}",
+ absolute_path
+ );
+ continue;
+ }
+
+ // Get file metadata
+ let metadata = entry.metadata().context("Failed to get file metadata")?;
+
+ // Get last modified time
+ let modified_time = metadata
+ .modified()
+ .context("Failed to get file modification time")?;
+
+ let last_modified = modified_time
+ .duration_since(std::time::UNIX_EPOCH)
+ .context("File modification time is before Unix epoch")?
+ .as_secs() as i64;
+
+ let size = metadata.len();
+
+ chunk.push(V1FileInfo {
+ relative_path,
+ absolute_path,
+ last_modified,
+ size,
+ });
+
+ total_files += 1;
+
+ // Process chunk when it reaches the desired size
+ if chunk.len() >= chunk_size {
+ debug!(
+ "Processing chunk of {} files (total processed: {})",
+ chunk.len(),
+ total_files
+ );
+ callback(&chunk)?;
+ chunk.clear();
+ }
+ }
+
+ // Process remaining files
+ if !chunk.is_empty() {
+ debug!(
+ "Processing final chunk of {} files (total: {})",
+ chunk.len(),
+ total_files
+ );
+ callback(&chunk)?;
+ }
+
+ debug!("Finished walking {} files", total_files);
+
+ Ok(())
+}
+
+/// Read a file from V1 filetracker filesystem
+///
+/// V1 files are stored uncompressed on disk.
+pub fn read_v1_file(file_info: &V1FileInfo) -> Result<Vec<u8>> {
+ debug!("Reading V1 file: {}", file_info.relative_path);
+
+ let data = fs::read(&file_info.absolute_path).context(format!(
+ "Failed to read file: {}",
+ file_info.absolute_path.display()
+ ))?;
+
+ debug!(
+ "Read {} bytes from V1 file: {}",
+ data.len(),
+ file_info.relative_path
+ );
+
+ Ok(data)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::fs;
+ use tempfile::TempDir;
+
+ #[test]
+ fn test_walk_empty_directory() {
+ let temp_dir = TempDir::new().unwrap();
+ let v1_dir = temp_dir.path();
+
+ // Create files/ directory
+ fs::create_dir(v1_dir.join("files")).unwrap();
+
+ let files = walk_v1_directory(v1_dir.to_str().unwrap()).unwrap();
+ assert_eq!(files.len(), 0);
+ }
+
+ #[test]
+ fn test_walk_with_files() {
+ let temp_dir = TempDir::new().unwrap();
+ let v1_dir = temp_dir.path();
+
+ // Create files/ directory structure
+ let files_dir = v1_dir.join("files");
+ fs::create_dir(&files_dir).unwrap();
+ fs::create_dir(files_dir.join("dir1")).unwrap();
+ fs::create_dir(files_dir.join("dir1/dir2")).unwrap();
+
+ // Create test files
+ fs::write(files_dir.join("file1.txt"), b"content1").unwrap();
+ fs::write(files_dir.join("dir1/file2.txt"), b"content2").unwrap();
+ fs::write(files_dir.join("dir1/dir2/file3.txt"), b"content3").unwrap();
+
+ let files = walk_v1_directory(v1_dir.to_str().unwrap()).unwrap();
+ assert_eq!(files.len(), 3);
+
+ // Check relative paths are correct
+ let paths: Vec<String> = files.iter().map(|f| f.relative_path.clone()).collect();
+ assert!(paths.contains(&"file1.txt".to_string()));
+ assert!(paths.contains(&"dir1/file2.txt".to_string()));
+ assert!(paths.contains(&"dir1/dir2/file3.txt".to_string()));
+ }
+
+ #[test]
+ fn test_read_v1_file() {
+ let temp_dir = TempDir::new().unwrap();
+ let v1_dir = temp_dir.path();
+
+ // Create files/ directory
+ let files_dir = v1_dir.join("files");
+ fs::create_dir(&files_dir).unwrap();
+
+ // Create test file
+ let test_content = b"Hello, V1 Filetracker!";
+ fs::write(files_dir.join("test.txt"), test_content).unwrap();
+
+ // Walk directory to get file info
+ let files = walk_v1_directory(v1_dir.to_str().unwrap()).unwrap();
+ assert_eq!(files.len(), 1);
+
+ // Read file
+ let data = read_v1_file(&files[0]).unwrap();
+ assert_eq!(data, test_content);
+ }
+
+ #[test]
+ fn test_missing_files_directory() {
+ let temp_dir = TempDir::new().unwrap();
+ let v1_dir = temp_dir.path();
+
+ let result = walk_v1_directory(v1_dir.to_str().unwrap());
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("does not exist"));
+ }
+}
diff --git a/src/routes/ft/delete_file.rs b/src/routes/ft/delete_file.rs index 452cb78..c863bd1 100644 --- a/src/routes/ft/delete_file.rs +++ b/src/routes/ft/delete_file.rs
@@ -119,30 +119,27 @@
pub async fn ft_delete_file( .unwrap(); } - // 6. Decrement reference count - if let Err(e) = state + // 6. Decrement reference count atomically and get new count + let ref_count = match state .kvstorage .lock() .await .decrement_ref_count(&state.bucket_name, &hash) .await { - error!("Failed to decrement ref count: {}", e); - record_metrics("500"); - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body("Failed to decrement ref count".to_string()) - .unwrap(); - } + Ok(count) => count, + Err(e) => { + error!("Failed to decrement ref count: {}", e); + record_metrics("500"); + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body("Failed to decrement ref count".to_string()) + .unwrap(); + } + }; // 7. Check if we should delete the blob (ref count is now 0) - let ref_count = state - .kvstorage - .lock() - .await - .get_ref_count(&state.bucket_name, &hash) - .await; - if ref_count.is_ok() && ref_count.unwrap() <= 0 { + if ref_count <= 0 { debug!("Deleting blob with hash: {}", hash); // Delete blob from S3 if let Err(e) = state.s3storage.lock().await.delete_object(&hash).await { diff --git a/src/routes/ft/put_file.rs b/src/routes/ft/put_file.rs index 4b3b2b2..f2ed4d1 100644 --- a/src/routes/ft/put_file.rs +++ b/src/routes/ft/put_file.rs @@ -305,22 +305,15 @@ pub async fn ft_put_file( "Overwriting existing link {}. Old hash: {}, new hash: {}", path, old_hash, digest ); - // Decrement old reference count - let _ = state - .kvstorage - .lock() - .await - .decrement_ref_count(&state.bucket_name, &old_hash) - .await; - - // Check if we should delete the old blob + // Decrement old reference count atomically and get new count let old_ref_count_result = state .kvstorage .lock() .await - .get_ref_count(&state.bucket_name, &old_hash) + .decrement_ref_count(&state.bucket_name, &old_hash) .await; + // Delete old blob if no longer referenced if let Ok(old_ref_count) = old_ref_count_result && old_ref_count <= 0 { diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 310c782..f03d8cf 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -84,6 +84,7 @@ async fn create_test_app_with_state() -> (Router, Arc) { }), cleaner: s3dedup::cleaner::CleanerConfig::default(), filetracker_url: None, + filetracker_v1_dir: None, }; let kvstorage = KVStorage::new(&config).await.unwrap(); diff --git a/tests/metrics_test.rs b/tests/metrics_test.rs index bf393c6..b04ed88 100644 --- a/tests/metrics_test.rs +++ b/tests/metrics_test.rs @@ -42,6 +42,7 @@ async fn create_test_app_state() -> Arc { }), cleaner: Default::default(), filetracker_url: None, + filetracker_v1_dir: None, }; let app_state = AppState::new(&config).await.unwrap(); @@ -275,6 +276,7 @@ async fn test_migration_active_metric() { }), cleaner: Default::default(), filetracker_url: Some("http://localhost:8000".to_string()), + filetracker_v1_dir: None, }; let app_state = AppState::new_with_filetracker(&config, "http://localhost:8000".to_string()) diff --git a/tests/migration_test.rs b/tests/migration_test.rs index 4e5d88e..0348932 100644 --- a/tests/migration_test.rs +++ b/tests/migration_test.rs @@ -204,6 +204,7 @@ async fn create_test_app_state() -> Arc { }), cleaner: s3dedup::cleaner::CleanerConfig::default(), filetracker_url: None, + filetracker_v1_dir: None, }; let kvstorage = KVStorage::new(&config).await.unwrap(); From a090acc4a1322500dacecb7e54fca1fef4b6907b Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Tue, 28 Oct 2025 08:51:24 +0100 Subject: [PATCH 2/2] Format code 
--- src/migration/mod.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 64bc1f0..a8a19e5 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -249,12 +249,12 @@ pub async fn migrate_single_file_from_metadata( .await .delete_object(&old_hash) .await - { - warn!( - "Failed to delete orphaned S3 object {} during migration: {}", - old_hash, e - ); - } + { + warn!( + "Failed to delete orphaned S3 object {} during migration: {}", + old_hash, e + ); + } } } @@ -390,12 +390,12 @@ async fn migrate_single_file( .await .delete_object(&old_hash) .await - { - warn!( - "Failed to delete orphaned S3 object {} during migration: {}", - old_hash, e - ); - } + { + warn!( + "Failed to delete orphaned S3 object {} during migration: {}", + old_hash, e + ); + } } } @@ -737,12 +737,12 @@ async fn migrate_single_file_from_v1_fs( .await .delete_object(&old_hash) .await - { - warn!( - "Failed to delete orphaned S3 object {} during migration: {}", - old_hash, e - ); - } + { + warn!( + "Failed to delete orphaned S3 object {} during migration: {}", + old_hash, e + ); + } } }