diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md new file mode 100644 index 0000000..a84bac1 --- /dev/null +++ b/DEVELOPMENT.md @@ -0,0 +1,444 @@ +# Development Guide + +This guide covers building, testing, and contributing to s3dedup. + +## Building from Source + +```bash +# Build binary +cargo build + +# Build for release +cargo build --release + +# Run in development mode +cargo run -- server --config config.json + +# Format code +cargo fmt + +# Run linter +cargo clippy + +# Check for compilation errors without building +cargo check +``` + +## Testing + +### Quick Start + +```bash +# Run all unit tests (no external dependencies) +cargo test --lib + +# Run all tests (requires PostgreSQL + MinIO) +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" +cargo test +``` + +### Unit Tests + +Run all unit tests without external dependencies: + +```bash +# Run all unit tests +cargo test --lib + +# Run specific test +cargo test --lib test_hash_key_deterministic + +# Run with output +cargo test --lib -- --nocapture +``` + +### Integration Tests + +Integration tests require external services. Run specific tests with appropriate setup: + +```bash +# Run all integration tests (requires PostgreSQL + MinIO) +cargo test --test integration_test + +# Run specific integration test (requires PostgreSQL + MinIO) +cargo test --test integration_test test_put_and_get_file -- --nocapture +``` + +### PostgreSQL Lock Tests + +The PostgreSQL advisory locks implementation requires a running PostgreSQL instance. 
+ +#### Setup PostgreSQL for Testing + +**Option 1: Docker (Recommended)** + +```bash +# Start PostgreSQL container +docker run -d \ + --name postgres-test \ + -e POSTGRES_PASSWORD=postgres \ + -e POSTGRES_DB=s3dedup_test \ + -p 5432:5432 \ + postgres:15 +``` + +**Option 2: Local PostgreSQL Installation** + +```bash +# Create test database (requires PostgreSQL installed locally) +psql -U postgres -c "CREATE DATABASE s3dedup_test;" +``` + +#### Run PostgreSQL Lock Tests + +Once PostgreSQL is running: + +```bash +# Set DATABASE_URL environment variable +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" + +# Run all PostgreSQL lock tests +cargo test --test postgres_locks_test -- --nocapture + +# Run specific PostgreSQL lock test +cargo test --test postgres_locks_test test_exclusive_lock_mutual_exclusion -- --nocapture + +# Run with debug logging +RUST_LOG=debug cargo test --test postgres_locks_test -- --nocapture +``` + +#### PostgreSQL Lock Tests Overview + +The `postgres_locks_test.rs` suite validates PostgreSQL advisory lock functionality: + +- **test_postgres_locks_creation**: Verifies lock system initialization +- **test_exclusive_lock_mutual_exclusion**: Ensures exclusive locks block concurrent acquisitions +- **test_shared_locks_concurrent**: Verifies multiple shared locks can coexist +- **test_exclusive_blocks_shared**: Ensures exclusive locks block shared locks +- **test_different_keys_independent**: Verifies different lock keys don't interfere +- **test_lock_release_on_guard_drop**: Ensures locks release when guard is dropped + +**Lock Release Mechanism**: Both memory and PostgreSQL locks must call `.release().await` before being dropped. For memory locks, this explicitly drops the Tokio RwLock guard. For PostgreSQL locks, this calls the PostgreSQL advisory unlock function. 
+ +### Migration Tests + +Run migration tests with PostgreSQL: + +```bash +# Set DATABASE_URL environment variable +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" + +# Run all migration tests +cargo test --test migration_test -- --nocapture + +# Run specific migration test +cargo test --test migration_test test_offline_migration_empty -- --nocapture +``` + +### Cleaner Tests + +Run background cleaner tests with PostgreSQL: + +```bash +# Set DATABASE_URL environment variable +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" + +# Run all cleaner tests +cargo test --test cleaner_test -- --nocapture +``` + +### Full Integration Testing Setup + +For complete integration testing with all services: + +```bash +# Option 1: Using Docker Compose (recommended for CI/CD) +docker-compose up -d + +# Run all tests +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" +cargo test + +# Cleanup +docker-compose down +``` + +**Option 2: Manual Setup** + +```bash +# Terminal 1: Start PostgreSQL +docker run -d \ + --name postgres-test \ + -e POSTGRES_PASSWORD=postgres \ + -e POSTGRES_DB=s3dedup_test \ + -p 5432:5432 \ + postgres:15 + +# Terminal 2: Start MinIO +docker run -d \ + --name minio-test \ + -p 9000:9000 \ + -e MINIO_ROOT_USER=minioadmin \ + -e MINIO_ROOT_PASSWORD=minioadmin \ + minio/minio:latest server /data + +# Terminal 3: Run tests +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" +cargo test + +# Cleanup +docker stop postgres-test minio-test +docker rm postgres-test minio-test +``` + +### Test Categories + +| Test Type | Command | Dependencies | Time | +|-----------|---------|--------------|------| +| Unit tests | `cargo test --lib` | None | ~1-2s | +| PostgreSQL lock tests | `cargo test --test postgres_locks_test` | PostgreSQL | ~5-10s | +| Migration tests | `cargo test --test migration_test` | PostgreSQL | ~10-20s | +| Integration tests | `cargo test 
--test integration_test` | PostgreSQL + MinIO | ~20-30s | +| Cleaner tests | `cargo test --test cleaner_test` | PostgreSQL | ~5-10s | +| All tests | `cargo test` | PostgreSQL + MinIO | ~30-50s | + +### Environment Variables for Testing + +```bash +# PostgreSQL connection (required for integration/lock/migration tests) +DATABASE_URL=postgres://postgres:postgres@localhost:5432/s3dedup_test + +# Logging during tests +RUST_LOG=debug # Show debug logs +RUST_LOG=s3dedup=debug # Only s3dedup crate logs +RUST_BACKTRACE=1 # Enable backtraces for panics +``` + +### Troubleshooting Tests + +**PostgreSQL Connection Refused** + +```bash +# Verify PostgreSQL is running +docker ps | grep postgres + +# Check connection manually +psql -U postgres -h localhost -d s3dedup_test -c "SELECT 1;" +``` + +**Database Already Exists Error** + +```bash +# Drop and recreate test database +dropdb -U postgres s3dedup_test || true +createdb -U postgres s3dedup_test +``` + +**Test Hangs or Times Out** + +```bash +# Run with timeout +timeout 30 cargo test --test postgres_locks_test test_exclusive_lock_mutual_exclusion -- --nocapture + +# Check PostgreSQL locks status +psql -U postgres -d s3dedup_test -c "SELECT * FROM pg_locks WHERE locktype = 'advisory';" +``` + +## Connection Pool Sizing + +The `POSTGRES_MAX_CONNECTIONS` setting controls the maximum number of concurrent database connections from a single s3dedup instance. This **single pool** is shared between KV storage operations and lock management. 
+ +### How to Choose Pool Size + +``` +Pool Size = (Concurrent Requests × 1.5) + Lock Overhead +``` + +### General Guidelines + +| Deployment | Concurrency | Recommended Pool Size | Notes | +|------------|-------------|----------------------|-------| +| **Low** | 1-5 concurrent requests | 10 | Default, suitable for development/testing | +| **Medium** | 5-20 concurrent requests | 20-30 | Small production deployments | +| **High** | 20-100 concurrent requests | 50-100 | Large production deployments | +| **Very High** | 100+ concurrent requests | 100-200 | Use multiple instances with load balancing | + +### Factors to Consider + +1. **Number of s3dedup Instances** + - If you have N instances, each needs its own pool + - Total connections = N instances × pool_size + - PostgreSQL must have enough capacity for all instances + - Example: 3 instances × 30 pool_size = 90 connections needed + +2. **Lock Contention** + - File operations acquire locks (1 connection per lock) + - Concurrent uploads/downloads increase lock pressure + - Add 20% overhead for lock operations + - Example: 20 concurrent requests → pool_size = (20 × 1.5) + overhead ≈ 35 + +3. **Database Configuration** + - Check PostgreSQL `max_connections` setting + - Reserve connections for maintenance, monitoring, backups + - Example: PostgreSQL with 200 max_connections: + - Reserve 10 for maintenance + - If 3 s3dedup instances: (200 - 10) / 3 ≈ 63 per instance + +4. 
**Memory Usage Per Connection** + - Each connection uses ~5-10 MB of memory + - Pool size 50 = ~250-500 MB per instance + - Monitor actual usage and adjust accordingly + +### Example Configurations + +**Development (1 instance, low throughput):** +```json +"postgres": { + "pool_size": 10 +} +``` + +**Production (3 instances, medium throughput):** +```json +"postgres": { + "pool_size": 30 +} +``` +With PostgreSQL `max_connections = 100`: +- 3 × 30 = 90 connections (10 reserved) + +**High-Availability (5 instances, high throughput with PostgreSQL max_connections = 200):** +```json +"postgres": { + "pool_size": 35 +} +``` +- 5 × 35 = 175 connections (25 reserved for other operations) + +### Monitoring and Tuning + +Monitor these metrics to optimize pool size: + +1. **Connection Utilization**: Check if connections are frequently exhausted + ```sql + SELECT count(*) FROM pg_stat_activity WHERE datname = 's3dedup'; + ``` + +2. **Lock Wait Times**: Monitor if operations wait for available connections +3. 
**Memory Usage**: Watch instance memory as pool size increases + +**Scaling Strategy:** + +- **Start Conservative**: Begin with pool_size = 10-20 +- **Monitor Usage**: Track connection utilization over 1-2 weeks +- **Increase Gradually**: Increment by 10-20 when you see high utilization +- **Scale Horizontally**: Instead of very large pools (>100), use more instances with moderate pools + +## Lock Implementation Details + +### Memory Locks + +Memory-based locks use Tokio RwLock for efficient single-instance deployments: + +- **Shared Locks**: Multiple readers can hold the lock simultaneously +- **Exclusive Locks**: Only one writer can hold the lock +- **Release**: Explicitly dropping the Tokio guard via `.release().await` +- **Cleanup**: Lock entries are cleaned up from the HashMap when no references remain + +### PostgreSQL Locks + +PostgreSQL advisory locks provide distributed locking for multi-instance deployments: + +- **Session-Scoped**: Locks are tied to a database connection +- **Shared vs Exclusive**: Identical semantics to memory locks but database-enforced +- **Explicit Release**: Must call `.release().await` to unlock before connection returns to pool +- **Key Hashing**: Lock keys are hashed to 64-bit integers for PostgreSQL's lock API +- **Atomic Release**: Both lock release and connection return to pool happen atomically + +### Lock Guard Pattern + +Both implementations follow the same pattern: + +```rust +// Acquire lock +let guard = lock.acquire_exclusive().await?; + +// ... do work ... + +// Explicitly release +guard.release().await?; // Calls drop in memory locks, pg_advisory_unlock in PostgreSQL +``` + +This unified pattern ensures consistent behavior across both backends. 
+ +## Architecture + +For detailed architecture documentation, see: + +- **[docs/deduplication.md](docs/deduplication.md)** - How content-based deduplication works, data flows, and performance characteristics +- **[docs/migration.md](docs/migration.md)** - Migration strategies and procedures + +## Key Components + +### Locks Module (`src/locks/`) + +- **`mod.rs`**: Trait definitions and enums +- **`memory.rs`**: In-memory lock implementation using Tokio RwLock +- **`postgres.rs`**: PostgreSQL advisory lock implementation + +### Routes Module (`src/routes/ft/`) + +- **`get_file.rs`**: GET endpoint with shared locks +- **`put_file.rs`**: PUT endpoint with exclusive locks +- **`delete_file.rs`**: DELETE endpoint with exclusive locks +- **`utils.rs`**: Shared utilities for Filetracker protocol + +### Storage Module (`src/kvstorage/`) + +- **`mod.rs`**: Trait definitions +- **`sqlite.rs`**: SQLite backend implementation +- **`postgres.rs`**: PostgreSQL backend implementation + +## Code Style + +Follow Rust conventions: + +```bash +# Format code +cargo fmt + +# Check linter warnings +cargo clippy + +# Fix common issues +cargo clippy --fix +``` + +## Contributing + +When making changes: + +1. Write tests for new functionality +2. Ensure all existing tests pass: `cargo test` +3. Run clippy: `cargo clippy` +4. Format code: `cargo fmt` +5. Document public APIs with doc comments +6. Update relevant documentation files + +## Performance Considerations + +### Lock Contention + +- Use shortest critical sections possible +- Release locks explicitly with `.release().await` to avoid holding connections +- PostgreSQL connection pool size should account for lock overhead + +### Database Pool Sizing + +See [Connection Pool Sizing](#connection-pool-sizing) above for detailed pool sizing guidance. 
+ +### Lock Performance + +- Memory locks are faster (no network round-trip) +- PostgreSQL locks have slight overhead but enable distributed coordination +- Choose based on deployment model (single-instance vs multi-instance HA) diff --git a/README.md b/README.md index 67f9c57..4ce92e7 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ S3 deduplication proxy server with Filetracker protocol compatibility. - **Distributed Locking**: PostgreSQL advisory locks for distributed, high-availability deployments - **Migration Support**: Offline and live migration from old Filetracker instances - **Auto Cleanup**: Background cleaner removes unreferenced S3 objects -- **Multi-bucket**: Run multiple independent buckets on different ports +- **Single-instance per bucket**: Each instance handles exactly one bucket; scale horizontally with multiple instances ## Quick Start with Docker @@ -93,7 +93,7 @@ POSTGRES_MAX_CONNECTIONS=10 ### Distributed Locking (PostgreSQL Advisory Locks) -For high-availability deployments with multiple s3dedup instances, enable PostgreSQL-based distributed locks: +For distributed locking across multiple instances in high-availability setups, enable PostgreSQL-based advisory locks: ``` LOCKS_TYPE=postgres @@ -105,109 +105,22 @@ POSTGRES_DB=s3dedup POSTGRES_MAX_CONNECTIONS=10 ``` -**Benefits of PostgreSQL Locks**: -- **Distributed Locking**: Multiple s3dedup instances can safely coordinate file operations -- **High Availability**: If one instance fails, others can continue with the same locks -- **Load Balancing**: Multiple instances can share the same database for coordinated access -- **Atomic Operations**: Prevents race conditions in concurrent file operations +**When to Use**: +- **Single-instance deployments**: Use default memory-based locking (LOCKS_TYPE=memory) +- **Multi-instance HA deployments**: Use PostgreSQL-based locking for coordinated access -**How It Works**: -- Uses PostgreSQL's built-in advisory locks (`pg_advisory_lock`, 
`pg_advisory_lock_shared`) -- Lock keys are hashed to 64-bit integers for PostgreSQL's lock API -- Shared locks allow concurrent reads; exclusive locks ensure serialized writes -- Automatic lock release when guard is dropped (via background cleanup tasks) - -**Note**: PostgreSQL locks require the same PostgreSQL instance used for KV storage. Connection pool is shared between both uses. +**Note**: PostgreSQL locks share the connection pool with KV storage. Ensure sufficient pool size for concurrent operations. See [DEVELOPMENT.md](DEVELOPMENT.md) for implementation details. ### Connection Pool Sizing -The `POSTGRES_MAX_CONNECTIONS` setting controls the maximum number of concurrent database connections from a single s3dedup instance. This **single pool** is shared between KV storage operations and lock management. - -**How to Choose Pool Size:** - -``` -Pool Size = (Concurrent Requests × 1.5) + Lock Overhead -``` - -**General Guidelines:** - -| Deployment | Concurrency | Recommended Pool Size | Notes | -|------------|-------------|----------------------|-------| -| **Low** | 1-5 concurrent requests | 10 | Default, suitable for development/testing | -| **Medium** | 5-20 concurrent requests | 20-30 | Small production deployments | -| **High** | 20-100 concurrent requests | 50-100 | Large production deployments | -| **Very High** | 100+ concurrent requests | 100-200 | Use multiple instances with load balancing | - -**Factors to Consider:** - -1. **Number of s3dedup Instances** - - If you have N instances, each needs its own pool - - Total connections = N instances × pool_size - - PostgreSQL must have enough capacity for all instances - - Example: 3 instances × 30 pool_size = 90 connections needed - -2. **Lock Contention** - - File operations acquire locks (1 connection per lock) - - Concurrent uploads/downloads increase lock pressure - - Add 20% overhead for lock operations - - Example: 20 concurrent requests → pool_size = (20 × 1.5) + overhead ≈ 35 - -3. 
**Database Configuration** - - Check PostgreSQL `max_connections` setting - - Reserve connections for maintenance, monitoring, backups - - Example: PostgreSQL with 200 max_connections: - - Reserve 10 for maintenance - - If 3 s3dedup instances: (200 - 10) / 3 ≈ 63 per instance - -4. **Memory Usage Per Connection** - - Each connection uses ~5-10 MB of memory - - Pool size 50 = ~250-500 MB per instance - - Monitor actual usage and adjust accordingly - -**Example Configurations:** - -**Development (1 instance, low throughput):** -```json -"postgres": { - "pool_size": 10 -} -``` - -**Production (3 instances, medium throughput):** -```json -"postgres": { - "pool_size": 30 -} -``` -With PostgreSQL `max_connections = 100`: -- 3 × 30 = 90 connections (10 reserved) - -**High-Availability (5 instances, high throughput with PostgreSQL max_connections = 200):** -```json -"postgres": { - "pool_size": 35 -} -``` -- 5 × 35 = 175 connections (25 reserved for other operations) +The `POSTGRES_MAX_CONNECTIONS` setting controls the maximum number of concurrent database connections. This pool is shared between KV storage operations and lock management. -**Monitoring and Tuning:** +**Quick Start Recommendations:** +- **Development**: `POSTGRES_MAX_CONNECTIONS=10` +- **Small Production (1-3 instances)**: `POSTGRES_MAX_CONNECTIONS=20-30` +- **Large Production (5+ instances)**: `POSTGRES_MAX_CONNECTIONS=50-100` -Monitor these metrics to optimize pool size: - -1. **Connection Utilization**: Check if connections are frequently exhausted - ```sql - SELECT count(*) FROM pg_stat_activity WHERE datname = 's3dedup'; - ``` - -2. **Lock Wait Times**: Monitor if operations wait for available connections -3. 
**Memory Usage**: Watch instance memory as pool size increases - -**Scaling Strategy:** - -- **Start Conservative**: Begin with pool_size = 10-20 -- **Monitor Usage**: Track connection utilization over 1-2 weeks -- **Increase Gradually**: Increment by 10-20 when you see high utilization -- **Scale Horizontally**: Instead of very large pools (>100), use more instances with moderate pools +For detailed pool sizing guidance, monitoring strategies, and tuning considerations, see [DEVELOPMENT.md](DEVELOPMENT.md#connection-pool-sizing). ### Config File @@ -224,6 +137,47 @@ docker run -d \ Environment variables override config file values. +## Deployment and Scaling + +### Single-Instance per Bucket Architecture + +s3dedup follows a **single-bucket-per-instance** design pattern, consistent with 12-factor application principles: + +- **One Instance = One Bucket**: Each s3dedup instance manages exactly one S3 bucket and serves one Filetracker endpoint +- **Horizontal Scaling**: For multiple buckets, run multiple s3dedup instances (one per bucket) +- **Simplified Configuration**: Cleaner config files, easier to reason about, better for container orchestration + +### High-Availability Deployments + +For a single bucket with high availability, run multiple instances with PostgreSQL locks and shared database: + +```bash +# All instances share the same PostgreSQL database and use PostgreSQL locks +docker run -d \ + --name s3dedup-ha-1 \ + -p 8001:8080 \ + -e BUCKET_NAME=files \ + -e LISTEN_PORT=8080 \ + -e KVSTORAGE_TYPE=postgres \ + -e LOCKS_TYPE=postgres \ + -e POSTGRES_HOST=postgres-db \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=password \ + -e POSTGRES_DB=s3dedup \ + -e S3_ENDPOINT=http://minio:9000 \ + -e S3_ACCESS_KEY=minioadmin \ + -e S3_SECRET_KEY=minioadmin \ + ghcr.io/sio2project/s3dedup:latest server --env + +# Repeat for instances 2, 3, etc., on different ports +``` + +**Benefits of HA Setup**: +- **Load Balancing**: Requests can be distributed across 
multiple instances +- **Fault Tolerance**: If one instance fails, others continue serving requests +- **Coordinated Access**: PostgreSQL locks ensure safe concurrent file operations +- **Shared Metadata**: Single database prevents data inconsistency + ## Migration > **📖 Complete Migration Guide**: See [docs/migration.md](docs/migration.md) for comprehensive migration instructions @@ -344,27 +298,43 @@ Compatible with Filetracker protocol v2: - `PUT /ft/files/{path}` - Upload file - `DELETE /ft/files/{path}` - Delete file -## Building from Source -```bash -# Build binary -cargo build --release +## Testing + +For comprehensive testing guide, see **[DEVELOPMENT.md](DEVELOPMENT.md)**. -# Build Docker image -docker build -t s3dedup:1.0.0-dev . +Quick start: + +```bash +# Run unit tests (no external dependencies) +cargo test --lib -# Run tests +# Run all tests (requires PostgreSQL + MinIO) +docker-compose up -d +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" cargo test +docker-compose down ``` ## Development +See **[DEVELOPMENT.md](DEVELOPMENT.md)** for detailed development instructions including: + +- Building from source +- Running tests with different configurations +- PostgreSQL advisory lock implementation details +- Contributing guidelines +- Performance considerations + +Quick start: + ```bash -# Run with Docker Compose (includes MinIO) +# Run with Docker Compose (includes PostgreSQL + MinIO) docker-compose up -# Run locally -cargo run -- server --config config.json +# In another terminal, run tests +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/s3dedup_test" +cargo test ``` ## Architecture @@ -378,10 +348,13 @@ cargo run -- server --config config.json - PostgreSQL locks: Distributed coordination, suitable for multi-instance HA setups - **Cleaner**: Background worker that removes unreferenced S3 objects -For detailed architecture documentation, see [docs/deduplication.md](docs/deduplication.md). 
+For detailed architecture documentation, see: +- [docs/deduplication.md](docs/deduplication.md) - Deduplication architecture and performance +- [DEVELOPMENT.md](DEVELOPMENT.md) - Lock implementation details and code architecture ## Documentation +- **[Development Guide](DEVELOPMENT.md)** - Building, testing, lock implementation details, and contributing - **[Migration Guide](docs/migration.md)** - Migrating from Filetracker v2.1+ (offline and live migration strategies) - **[Deduplication Architecture](docs/deduplication.md)** - How content-based deduplication works, data flows, and performance characteristics diff --git a/config.json b/config.json index d514db5..63949d7 100644 --- a/config.json +++ b/config.json @@ -3,30 +3,28 @@ "level": "debug", "json": false }, - "buckets": [ - { - "name": "bucket1", - "address": "0.0.0.0", - "port": 3000, - "kvstorage_type": "sqlite", - "sqlite": { - "path": "db/kv.db", - "pool_size": 10 - }, - "locks_type": "memory", - "s3storage_type": "minio", - "minio": { - "endpoint": "http://localhost:9000", - "access_key": "minioadmin", - "secret_key": "minioadmin", - "force_path_style": true - }, - "cleaner": { - "enabled": false, - "interval_seconds": 3600, - "batch_size": 1000, - "max_deletes_per_run": 10000 - } + "kvstorage_type": "sqlite", + "sqlite": { + "path": "db/kv.db", + "pool_size": 10 + }, + "locks_type": "memory", + "bucket": { + "name": "default", + "address": "0.0.0.0", + "port": 8080, + "s3storage_type": "minio", + "minio": { + "endpoint": "http://localhost:9000", + "access_key": "minioadmin", + "secret_key": "minioadmin", + "force_path_style": true + }, + "cleaner": { + "enabled": false, + "interval_seconds": 3600, + "batch_size": 1000, + "max_deletes_per_run": 10000 } - ] + } } \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index efadb5c..bb06970 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,14 +13,6 @@ pub use crate::s3storage::minio::MinIOConfig; #[derive(Debug, serde::Deserialize, 
Clone)] pub struct Config { pub logging: LoggingConfig, - pub buckets: Vec, -} - -#[derive(Debug, serde::Deserialize, Clone)] -pub struct BucketConfig { - pub name: String, - pub address: String, - pub port: u16, pub kvstorage_type: KVStorageType, @@ -32,6 +24,15 @@ pub struct BucketConfig { pub locks_type: LocksType, + pub bucket: BucketConfig, +} + +#[derive(Debug, serde::Deserialize, Clone)] +pub struct BucketConfig { + pub name: String, + pub address: String, + pub port: u16, + pub s3storage_type: S3StorageType, #[serde(default)] @@ -54,10 +55,8 @@ impl Config { let config_str = std::fs::read_to_string(path)?; let mut config: Config = serde_json::from_str(config_str.as_str())?; - // Apply environment variable overrides for the first bucket - if !config.buckets.is_empty() { - config.buckets[0].apply_env_overrides(); - } + // Apply environment variable overrides + config.apply_env_overrides(); Ok(config) } @@ -72,30 +71,69 @@ impl Config { .unwrap_or(false), }; + let kvstorage_type_str = + std::env::var("KVSTORAGE_TYPE").unwrap_or_else(|_| "sqlite".to_string()); + let kvstorage_type = match kvstorage_type_str.as_str() { + "postgres" => KVStorageType::Postgres, + "sqlite" => KVStorageType::SQLite, + _ => bail!("Invalid KVSTORAGE_TYPE: {}", kvstorage_type_str), + }; + + let sqlite = if matches!(kvstorage_type, KVStorageType::SQLite) { + Some(SQLiteConfig { + path: std::env::var("SQLITE_PATH") + .unwrap_or_else(|_| "/app/data/kv.db".to_string()), + pool_size: std::env::var("SQLITE_MAX_CONNECTIONS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(10), + }) + } else { + None + }; + + let postgres = if matches!(kvstorage_type, KVStorageType::Postgres) { + Some(PostgresConfig { + host: std::env::var("POSTGRES_HOST")?, + port: std::env::var("POSTGRES_PORT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(5432), + user: std::env::var("POSTGRES_USER")?, + password: std::env::var("POSTGRES_PASSWORD")?, + dbname: std::env::var("POSTGRES_DB")?, + pool_size: 
std::env::var("POSTGRES_MAX_CONNECTIONS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(10), + }) + } else { + None + }; + + let locks_type = { + let locks_type_str = + std::env::var("LOCKS_TYPE").unwrap_or_else(|_| "memory".to_string()); + match locks_type_str.as_str() { + "memory" => LocksType::Memory, + _ => LocksType::Memory, // Default to memory + } + }; + let bucket = BucketConfig::from_env()?; Ok(Config { logging, - buckets: vec![bucket], + kvstorage_type, + postgres, + sqlite, + locks_type, + bucket, }) } -} -impl BucketConfig { - /// Apply environment variable overrides to this bucket config + /// Apply environment variable overrides to this config fn apply_env_overrides(&mut self) { - if let Ok(val) = std::env::var("BUCKET_NAME") { - self.name = val; - } - if let Ok(val) = std::env::var("LISTEN_ADDRESS") { - self.address = val; - } - if let Ok(val) = std::env::var("LISTEN_PORT") - && let Ok(port) = val.parse() - { - self.port = port; - } - // SQLite overrides if let Some(ref mut sqlite) = self.sqlite { if let Ok(val) = std::env::var("SQLITE_PATH") { @@ -134,6 +172,26 @@ impl BucketConfig { } } + // Apply bucket-specific overrides + self.bucket.apply_env_overrides(); + } +} + +impl BucketConfig { + /// Apply environment variable overrides to this bucket config + fn apply_env_overrides(&mut self) { + if let Ok(val) = std::env::var("BUCKET_NAME") { + self.name = val; + } + if let Ok(val) = std::env::var("LISTEN_ADDRESS") { + self.address = val; + } + if let Ok(val) = std::env::var("LISTEN_PORT") + && let Ok(port) = val.parse() + { + self.port = port; + } + // MinIO/S3 overrides if let Some(ref mut minio) = self.minio { if let Ok(val) = std::env::var("S3_ENDPOINT") { @@ -155,46 +213,6 @@ impl BucketConfig { /// Create a bucket config from environment variables only fn from_env() -> Result { - let kvstorage_type_str = - std::env::var("KVSTORAGE_TYPE").unwrap_or_else(|_| "sqlite".to_string()); - let kvstorage_type = match kvstorage_type_str.as_str() { - 
"postgres" => KVStorageType::Postgres, - "sqlite" => KVStorageType::SQLite, - _ => bail!("Invalid KVSTORAGE_TYPE: {}", kvstorage_type_str), - }; - - let sqlite = if matches!(kvstorage_type, KVStorageType::SQLite) { - Some(SQLiteConfig { - path: std::env::var("SQLITE_PATH") - .unwrap_or_else(|_| "/app/data/kv.db".to_string()), - pool_size: std::env::var("SQLITE_MAX_CONNECTIONS") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(10), - }) - } else { - None - }; - - let postgres = if matches!(kvstorage_type, KVStorageType::Postgres) { - Some(PostgresConfig { - host: std::env::var("POSTGRES_HOST")?, - port: std::env::var("POSTGRES_PORT") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(5432), - user: std::env::var("POSTGRES_USER")?, - password: std::env::var("POSTGRES_PASSWORD")?, - dbname: std::env::var("POSTGRES_DB")?, - pool_size: std::env::var("POSTGRES_MAX_CONNECTIONS") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(10), - }) - } else { - None - }; - let s3storage_type_str = std::env::var("S3STORAGE_TYPE").unwrap_or_else(|_| "minio".to_string()); let s3storage_type = match s3storage_type_str.as_str() { @@ -223,17 +241,6 @@ impl BucketConfig { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(8080), - kvstorage_type, - sqlite, - postgres, - locks_type: { - let locks_type_str = - std::env::var("LOCKS_TYPE").unwrap_or_else(|_| "memory".to_string()); - match locks_type_str.as_str() { - "memory" => LocksType::Memory, - _ => LocksType::Memory, // Default to memory - } - }, s3storage_type, minio, cleaner: CleanerConfig { diff --git a/src/kvstorage/mod.rs b/src/kvstorage/mod.rs index 9ffaec5..8715187 100644 --- a/src/kvstorage/mod.rs +++ b/src/kvstorage/mod.rs @@ -1,4 +1,4 @@ -use crate::config::BucketConfig; +use crate::config::Config; use anyhow::Result; use serde::Deserialize; use tracing::{debug, info}; @@ -16,13 +16,37 @@ pub enum KVStorageType { } pub(crate) trait KVStorageTrait { - async fn new(config: &BucketConfig) -> Result> + async fn new(config: 
&Config) -> Result<Box<Self>> where Self: Sized; async fn setup(&mut self) -> Result<()>; async fn get_ref_count(&mut self, bucket: &str, hash: &str) -> Result<i32>; async fn set_ref_count(&mut self, bucket: &str, hash: &str, ref_cnt: i32) -> Result<()>; + /// Atomically increment the reference count (database-level atomic operation) + /// Prefer this over get_ref_count + set_ref_count to avoid race conditions + async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result<i32> { + // Default implementation: non-atomic. Database implementations should override. + let cnt = self.get_ref_count(bucket, hash).await?; + self.set_ref_count(bucket, hash, cnt + 1).await?; + Ok(cnt + 1) + } + + /// Atomically decrement the reference count (database-level atomic operation) + /// Prefer this over get_ref_count + set_ref_count to avoid race conditions + /// If the reference count is already 0, do nothing and return 0. + /// Returns the new reference count after decrementing. + async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result<i32> { + // Default implementation: non-atomic. Database implementations should override. 
+ let cnt = self.get_ref_count(bucket, hash).await?; + if cnt == 0 { + return Ok(0); + } + let new_count = cnt - 1; + self.set_ref_count(bucket, hash, new_count).await?; + Ok(new_count) + } + async fn increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result<()> { let cnt = self.get_ref_count(bucket, hash).await?; self.set_ref_count(bucket, hash, cnt + 1).await @@ -106,6 +130,13 @@ pub(crate) trait KVStorageTrait { /// Get deduplicated bytes saved (sum of (refcount - 1) * logical_size for all blobs) async fn get_deduplicated_bytes_saved(&mut self, bucket: &str) -> Result; + + /// Get connection pool statistics (active connections, idle connections) + /// Returns (active, idle) + fn get_pool_stats(&self) -> (u32, u32) { + // Default implementation for non-database backends + (0, 0) + } } #[derive(Clone)] @@ -115,7 +146,7 @@ pub enum KVStorage { } impl KVStorage { - pub async fn new(config: &BucketConfig) -> Result> { + pub async fn new(config: &Config) -> Result> { match config.kvstorage_type { KVStorageType::Postgres => { info!("Using Postgres as KV storage"); @@ -166,6 +197,39 @@ impl KVStorage { } } + /** + * Atomically increment the reference count (database-level atomic operation). + * Returns the new reference count after incrementing. + * Prefer this over increment_ref_count to avoid race conditions. + */ + pub async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result { + debug!( + "Atomically incrementing ref count for bucket: {}, hash: {}", + bucket, hash + ); + match self { + KVStorage::Postgres(storage) => storage.atomic_increment_ref_count(bucket, hash).await, + KVStorage::SQLite(storage) => storage.atomic_increment_ref_count(bucket, hash).await, + } + } + + /** + * Atomically decrement the reference count (database-level atomic operation). + * If the reference count is already 0, do nothing and return 0. + * Returns the new reference count after decrementing. 
+ * Prefer this over decrement_ref_count to avoid race conditions. + */ + pub async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result { + debug!( + "Atomically decrementing ref count for bucket: {}, hash: {}", + bucket, hash + ); + match self { + KVStorage::Postgres(storage) => storage.atomic_decrement_ref_count(bucket, hash).await, + KVStorage::SQLite(storage) => storage.atomic_decrement_ref_count(bucket, hash).await, + } + } + /** * Increment the reference count for a hash. */ @@ -449,4 +513,11 @@ impl KVStorage { KVStorage::SQLite(storage) => storage.get_deduplicated_bytes_saved(bucket).await, } } + + pub fn get_pool_stats(&self) -> (u32, u32) { + match self { + KVStorage::Postgres(storage) => storage.get_pool_stats(), + KVStorage::SQLite(storage) => storage.get_pool_stats(), + } + } } diff --git a/src/kvstorage/postgres.rs b/src/kvstorage/postgres.rs index 9fdfaaf..30a7477 100644 --- a/src/kvstorage/postgres.rs +++ b/src/kvstorage/postgres.rs @@ -1,9 +1,10 @@ -use crate::config::BucketConfig; +use crate::config::Config; use crate::kvstorage::KVStorageTrait; -use anyhow::Result; +use anyhow::{Context, Result}; use serde::Deserialize; use sqlx::PgPool; use sqlx::postgres::PgPoolOptions; +use std::time::Duration; use tracing::debug; #[derive(Debug, Clone, Deserialize)] @@ -31,20 +32,37 @@ impl Postgres { } impl KVStorageTrait for Postgres { - async fn new(config: &BucketConfig) -> Result> { + async fn new(config: &Config) -> Result> { let pg_config = config.postgres.as_ref().unwrap(); let db_url = format!( - "postgres://{}:{}@{}:{}/{}", + "postgres://{}:{}@{}:{}/{}?connect_timeout=10", pg_config.user, pg_config.password, pg_config.host, pg_config.port, pg_config.dbname ); - debug!("Connecting to Postgres database: {}", db_url); + debug!( + "Connecting to Postgres: postgres://{}:****@{}:{}/{}", + pg_config.user, pg_config.host, pg_config.port, pg_config.dbname + ); + let pool = PgPoolOptions::new() .max_connections(pg_config.pool_size) + 
.acquire_timeout(Duration::from_secs(30)) + .idle_timeout(Some(Duration::from_secs(600))) + .max_lifetime(Some(Duration::from_secs(1800))) .connect(&db_url) - .await?; + .await + .context("Failed to connect to PostgreSQL")?; + + // Validate connection works + sqlx::query("SELECT 1") + .execute(&pool) + .await + .context("PostgreSQL connection validation failed")?; + + debug!("Successfully validated PostgreSQL connection"); + Ok(Box::new(Postgres { pool, - bucket: config.name.clone(), + bucket: config.bucket.name.clone(), })) } async fn setup(&mut self) -> Result<()> { @@ -141,6 +159,48 @@ impl KVStorageTrait for Postgres { Ok(()) } + async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result { + let table = self.table_name("refcount"); + // PostgreSQL: atomic increment using INSERT...ON CONFLICT...DO UPDATE...RETURNING + let query = format!( + "INSERT INTO {} (bucket, hash, refcount) VALUES ($1, $2, 1) + ON CONFLICT (bucket, hash) DO UPDATE SET refcount = refcount + 1 + RETURNING refcount", + table + ); + let (count,): (i32,) = sqlx::query_as(&query) + .bind(bucket) + .bind(hash) + .fetch_one(&self.pool) + .await?; + Ok(count) + } + + async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result { + let table = self.table_name("refcount"); + // PostgreSQL: atomic decrement using UPDATE...RETURNING with GREATEST to prevent negative + let query = format!( + "UPDATE {} SET refcount = GREATEST(0, refcount - 1) + WHERE bucket = $1 AND hash = $2 + RETURNING refcount", + table + ); + let result = sqlx::query_as::<_, (i32,)>(&query) + .bind(bucket) + .bind(hash) + .fetch_optional(&self.pool) + .await; + + match result { + Ok(Some((count,))) => Ok(count), + Ok(None) => { + // Row not found, return 0 + Ok(0) + } + Err(e) => Err(e.into()), + } + } + async fn get_modified(&mut self, bucket: &str, path: &str) -> Result { let table = self.table_name("modified"); let query = format!( @@ -473,4 +533,11 @@ impl KVStorageTrait for 
Postgres { .await?; Ok(total) } + + fn get_pool_stats(&self) -> (u32, u32) { + let total_connections = self.pool.size(); + let idle_connections = self.pool.num_idle() as u32; + let active_connections = total_connections.saturating_sub(idle_connections); + (active_connections, idle_connections) + } } diff --git a/src/kvstorage/sqlite.rs b/src/kvstorage/sqlite.rs index 51c273f..d335086 100644 --- a/src/kvstorage/sqlite.rs +++ b/src/kvstorage/sqlite.rs @@ -1,9 +1,10 @@ -use crate::config::BucketConfig; +use crate::config::Config; use crate::kvstorage::KVStorageTrait; -use anyhow::Result; +use anyhow::{Context, Result}; use serde::Deserialize; use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; use std::path::Path; +use std::time::Duration; use tracing::debug; #[derive(Debug, Clone, Deserialize)] @@ -18,21 +19,32 @@ pub struct SQLite { } impl KVStorageTrait for SQLite { - async fn new(config: &BucketConfig) -> Result> { + async fn new(config: &Config) -> Result> { let sqlite_config = config.sqlite.as_ref().unwrap(); - if !Path::new(&sqlite_config.path).exists() { - std::fs::File::create(&sqlite_config.path)?; + // Ensure parent directory exists + if let Some(parent) = Path::new(&sqlite_config.path).parent() { + std::fs::create_dir_all(parent)?; } + // SQLite with mode=rwc creates file atomically if needed let db_url = format!("sqlite://{}?mode=rwc", sqlite_config.path); debug!("Connecting to SQLite database: {}", db_url); let pool = SqlitePoolOptions::new() .max_connections(sqlite_config.pool_size) - .acquire_timeout(std::time::Duration::from_secs(30)) + .acquire_timeout(Duration::from_secs(30)) .connect(&db_url) - .await?; + .await + .context("Failed to connect to SQLite")?; + + // Validate connection works + sqlx::query("SELECT 1") + .execute(&pool) + .await + .context("SQLite connection validation failed")?; + + debug!("Successfully validated SQLite connection"); // Enable WAL mode for better concurrency sqlx::query("PRAGMA journal_mode=WAL") @@ -102,6 +114,62 @@ 
impl KVStorageTrait for SQLite {
         Ok(())
     }
 
+    async fn atomic_increment_ref_count(&mut self, bucket: &str, hash: &str) -> Result<i32> {
+        // SQLite atomic increment as a single upsert (requires SQLite 3.35+ for
+        // RETURNING; the libsqlite3 bundled with sqlx satisfies this).
+        // NOTE: sqlx prepares ONE statement per query, so a multi-statement
+        // string (INSERT; UPDATE; SELECT) cannot execute, and the ?1/?2
+        // placeholders take exactly two binds. The upsert also makes a brand-new
+        // key yield 1 (INSERT-then-UPDATE would have yielded 2).
+        let result: Result<(i32,), sqlx::Error> = sqlx::query_as(
+            "INSERT INTO refcount (bucket, hash, refcount) VALUES (?1, ?2, 1)
+             ON CONFLICT(bucket, hash) DO UPDATE SET refcount = refcount + 1
+             RETURNING refcount",
+        )
+        .bind(bucket)
+        .bind(hash)
+        .fetch_one(&self.pool)
+        .await;
+
+        match result {
+            Ok((count,)) => Ok(count),
+            Err(_) => {
+                // Fallback to explicit increment if RETURNING/upsert is unsupported
+                let cnt = self.get_ref_count(bucket, hash).await?;
+                self.set_ref_count(bucket, hash, cnt + 1).await?;
+                Ok(cnt + 1)
+            }
+        }
+    }
+
+    async fn atomic_decrement_ref_count(&mut self, bucket: &str, hash: &str) -> Result<i32> {
+        // Single-statement atomic decrement (SQLite 3.35+): MAX(0, refcount - 1)
+        // clamps at zero; RETURNING reads the new value; a missing row means 0.
+        let result: Result<Option<(i32,)>, sqlx::Error> = sqlx::query_as(
+            "UPDATE refcount SET refcount = MAX(0, refcount - 1)
+             WHERE bucket = ?1 AND hash = ?2
+             RETURNING refcount",
+        )
+        .bind(bucket)
+        .bind(hash)
+        .fetch_optional(&self.pool)
+        .await;
+
+        match result {
+            Ok(Some((count,))) => Ok(count),
+            Ok(None) => Ok(0),
+            Err(_) => {
+                // Fallback to explicit decrement if RETURNING is unsupported
+                let cnt = self.get_ref_count(bucket, hash).await?;
+                if cnt == 0 {
+                    return Ok(0);
+                }
+                self.set_ref_count(bucket, hash, cnt - 1).await?;
+                Ok(cnt - 1)
+            }
+        }
+    }
+
     async fn get_modified(&mut self, bucket: &str, path: &str) -> Result<i64> {
         let result: Result<(i64,), sqlx::Error> =
             sqlx::query_as("SELECT modified FROM modified WHERE bucket = ?1 AND path = ?2")
@@ -367,4 +435,11 @@ impl KVStorageTrait for SQLite {
             .await?;
Ok(total.unwrap_or(0)) } + + fn get_pool_stats(&self) -> (u32, u32) { + let total_connections = self.pool.size(); + let idle_connections = self.pool.num_idle() as u32; + let active_connections = total_connections.saturating_sub(idle_connections); + (active_connections, idle_connections) + } } diff --git a/src/lib.rs b/src/lib.rs index 13a580d..b432496 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,13 +38,13 @@ pub struct AppState { } impl AppState { - pub async fn new(config: &config::BucketConfig) -> Result> { + pub async fn new(config: &config::Config) -> Result> { let kvstorage = kvstorage::KVStorage::new(config).await?; let locks = locks::LocksStorage::new_with_config(config.locks_type, config).await?; - let s3storage = s3storage::S3Storage::new(config).await?; + let s3storage = s3storage::S3Storage::new(&config.bucket).await?; let metrics = Arc::new(metrics::Metrics::new()); Ok(Arc::new(Self { - bucket_name: config.name.clone(), + bucket_name: config.bucket.name.clone(), kvstorage: Arc::new(Mutex::new(kvstorage)), locks, s3storage: Arc::new(Mutex::new(s3storage)), @@ -54,12 +54,12 @@ impl AppState { } pub async fn new_with_filetracker( - config: &config::BucketConfig, + config: &config::Config, filetracker_url: String, ) -> Result> { let kvstorage = kvstorage::KVStorage::new(config).await?; let locks = locks::LocksStorage::new_with_config(config.locks_type, config).await?; - let s3storage = s3storage::S3Storage::new(config).await?; + let s3storage = s3storage::S3Storage::new(&config.bucket).await?; let filetracker_client = filetracker_client::FiletrackerClient::new(filetracker_url); let metrics = Arc::new(metrics::Metrics::new()); @@ -67,7 +67,7 @@ impl AppState { metrics::MIGRATION_ACTIVE.set(1); Ok(Arc::new(Self { - bucket_name: config.name.clone(), + bucket_name: config.bucket.name.clone(), kvstorage: Arc::new(Mutex::new(kvstorage)), locks, s3storage: Arc::new(Mutex::new(s3storage)), @@ -80,6 +80,11 @@ impl AppState { pub async fn 
update_storage_metrics(&self) -> Result<()> { let mut kv = self.kvstorage.lock().await; + // Update database connection pool metrics + let (active_conns, idle_conns) = kv.get_pool_stats(); + metrics::DB_CONNECTIONS_ACTIVE.set(active_conns as i64); + metrics::DB_CONNECTIONS_IDLE.set(idle_conns as i64); + // Get all stats let total_files = kv.get_total_files(&self.bucket_name).await?; let total_blobs = kv.get_total_blobs(&self.bucket_name).await?; diff --git a/src/locks/memory.rs b/src/locks/memory.rs index 74c5659..9546748 100644 --- a/src/locks/memory.rs +++ b/src/locks/memory.rs @@ -18,17 +18,40 @@ struct LockedKey<'a> { key: String, } -impl<'a> SharedLockGuard<'a> for RwLockReadGuard<'a, ()> {} -impl<'a> ExclusiveLockGuard<'a> for RwLockWriteGuard<'a, ()> {} +impl<'a> SharedLockGuard<'a> for RwLockReadGuard<'a, ()> { + fn release( + self: Box, + ) -> std::pin::Pin> + Send + 'a>> { + Box::pin(async move { + drop(self); // Explicitly drop the guard to release the read lock + Ok(()) + }) + } +} + +impl<'a> ExclusiveLockGuard<'a> for RwLockWriteGuard<'a, ()> { + fn release( + self: Box, + ) -> std::pin::Pin> + Send + 'a>> { + Box::pin(async move { + drop(self); // Explicitly drop the guard to release the write lock + Ok(()) + }) + } +} #[async_trait] impl<'a> Lock for LockedKey<'a> { - async fn acquire_shared<'b>(&'b self) -> Box + Send + 'b> { - Box::new(self.lock.read().await) + async fn acquire_shared<'b>( + &'b self, + ) -> anyhow::Result + Send + 'b>> { + Ok(Box::new(self.lock.read().await)) } - async fn acquire_exclusive<'b>(&'b self) -> Box + Send + 'b> { - Box::new(self.lock.write().await) + async fn acquire_exclusive<'b>( + &'b self, + ) -> anyhow::Result + Send + 'b>> { + Ok(Box::new(self.lock.write().await)) } } @@ -90,7 +113,7 @@ mod tests { async fn assert_locks_compile() { let memory = MemoryLocks::new(); let lock = memory.prepare_lock("1".into()).await; - let _guard = lock.acquire_exclusive().await; + let _guard = 
lock.acquire_exclusive().await.unwrap(); } #[tokio::test] @@ -103,9 +126,10 @@ mod tests { let tx = tx.clone(); tokio::spawn(async move { let lock = memory.prepare_lock("key1".into()).await; - let _guard = lock.acquire_shared().await; + let guard = lock.acquire_shared().await.unwrap(); tx.send(format!("acquired_{}", i)).await.unwrap(); sleep(Duration::from_millis(50)).await; + let _ = guard.release().await; tx.send(format!("released_{}", i)).await.unwrap(); }); } @@ -131,9 +155,10 @@ mod tests { let tx1 = tx.clone(); tokio::spawn(async move { let lock = memory1.prepare_lock("key1".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); tx1.send("task1_acquired").await.unwrap(); sleep(Duration::from_millis(100)).await; + let _ = guard.release().await; tx1.send("task1_released").await.unwrap(); }); @@ -143,8 +168,9 @@ mod tests { let tx2 = tx.clone(); tokio::spawn(async move { let lock = memory2.prepare_lock("key1".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); tx2.send("task2_acquired").await.unwrap(); + let _ = guard.release().await; }); drop(tx); @@ -171,9 +197,10 @@ mod tests { let tx1 = tx.clone(); tokio::spawn(async move { let lock = memory1.prepare_lock("key1".into()).await; - let _guard = lock.acquire_shared().await; + let guard = lock.acquire_shared().await.unwrap(); tx1.send("shared_acquired").await.unwrap(); sleep(Duration::from_millis(100)).await; + let _ = guard.release().await; tx1.send("shared_released").await.unwrap(); }); @@ -183,8 +210,9 @@ mod tests { let tx2 = tx.clone(); tokio::spawn(async move { let lock = memory2.prepare_lock("key1".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); tx2.send("exclusive_acquired").await.unwrap(); + let _ = guard.release().await; }); drop(tx); @@ -208,7 +236,8 @@ mod tests { { let lock = 
memory.prepare_lock("cleanup_key".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); + let _ = guard.release().await; } sleep(Duration::from_millis(50)).await; @@ -229,9 +258,10 @@ mod tests { let tx1 = tx.clone(); tokio::spawn(async move { let lock = memory1.prepare_lock("key1".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); tx1.send("key1_acquired").await.unwrap(); sleep(Duration::from_millis(100)).await; + let _ = guard.release().await; tx1.send("key1_released").await.unwrap(); }); @@ -241,8 +271,9 @@ mod tests { let tx2 = tx.clone(); tokio::spawn(async move { let lock = memory2.prepare_lock("key2".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); tx2.send("key2_acquired").await.unwrap(); + let _ = guard.release().await; }); drop(tx); @@ -265,9 +296,10 @@ mod tests { let tx1 = tx.clone(); tokio::spawn(async move { let lock = memory1.prepare_lock("shared_key".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); tx1.send("task1_acquired").await.unwrap(); sleep(Duration::from_millis(50)).await; + let _ = guard.release().await; tx1.send("task1_released").await.unwrap(); }); @@ -277,9 +309,10 @@ mod tests { let tx2 = tx.clone(); tokio::spawn(async move { let lock = memory2.prepare_lock("shared_key".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock.acquire_exclusive().await.unwrap(); tx2.send("task2_acquired").await.unwrap(); sleep(Duration::from_millis(100)).await; + let _ = guard.release().await; tx2.send("task2_released").await.unwrap(); }); @@ -289,8 +322,9 @@ mod tests { let tx3 = tx.clone(); tokio::spawn(async move { let lock = memory3.prepare_lock("shared_key".into()).await; - let _guard = lock.acquire_exclusive().await; + let guard = 
lock.acquire_exclusive().await.unwrap(); tx3.send("task3_acquired").await.unwrap(); + let _ = guard.release().await; }); drop(tx); diff --git a/src/locks/mod.rs b/src/locks/mod.rs index 8bac476..6096a7b 100644 --- a/src/locks/mod.rs +++ b/src/locks/mod.rs @@ -28,16 +28,38 @@ pub enum LocksType { Postgres, } +use std::future::Future; +use std::pin::Pin; + #[must_use = "droping temporary lock makes no sense"] -pub trait SharedLockGuard<'a> {} +pub trait SharedLockGuard<'a> { + /// Release the lock explicitly before the guard is dropped. + /// For PostgreSQL locks, this unlocks the advisory lock in the database. + /// For memory locks, this drops the Tokio RwLock guard. + fn release(self: Box) -> Pin> + Send + 'a>> { + Box::pin(async { Ok(()) }) + } +} + #[must_use = "droping temporary lock makes no sense"] -pub trait ExclusiveLockGuard<'a> {} +pub trait ExclusiveLockGuard<'a> { + /// Release the lock explicitly before the guard is dropped. + /// For PostgreSQL locks, this unlocks the advisory lock in the database. + /// For memory locks, this drops the Tokio RwLock guard. 
+ fn release(self: Box) -> Pin> + Send + 'a>> { + Box::pin(async { Ok(()) }) + } +} #[async_trait] #[must_use = "preparing temporary lock makes no sense"] pub trait Lock { - async fn acquire_shared<'a>(&'a self) -> Box + Send + 'a>; - async fn acquire_exclusive<'a>(&'a self) -> Box + Send + 'a>; + async fn acquire_shared<'a>( + &'a self, + ) -> anyhow::Result + Send + 'a>>; + async fn acquire_exclusive<'a>( + &'a self, + ) -> anyhow::Result + Send + 'a>>; } #[async_trait] @@ -69,7 +91,7 @@ impl LocksStorage { pub async fn new_with_config( lock_type: LocksType, - bucket_config: &crate::config::BucketConfig, + config: &crate::config::Config, ) -> anyhow::Result> { match lock_type { LocksType::Memory => { @@ -78,7 +100,7 @@ impl LocksStorage { } LocksType::Postgres => { info!("Using PostgreSQL as locks storage"); - let pg_locks = postgres::PostgresLocks::new_with_config(bucket_config).await?; + let pg_locks = postgres::PostgresLocks::new_with_config(config).await?; Ok(Box::new(LocksStorage::Postgres(pg_locks))) } } diff --git a/src/locks/postgres.rs b/src/locks/postgres.rs index ba0ece3..c7549f8 100644 --- a/src/locks/postgres.rs +++ b/src/locks/postgres.rs @@ -1,10 +1,11 @@ -use crate::config::BucketConfig; +use crate::config::Config; use crate::locks::{ExclusiveLockGuard, Lock, LockStorage, SharedLockGuard}; -use anyhow::Result; +use anyhow::{Context, Result}; use async_trait::async_trait; use serde::Deserialize; use sqlx::{PgPool, postgres::PgPoolOptions}; use std::sync::Arc; +use std::time::Duration; use tracing::debug; #[derive(Debug, Clone, Deserialize)] @@ -37,7 +38,7 @@ impl PostgresLocks { impl PostgresLocks { /// Create a new PostgreSQL locks instance with configuration - pub async fn new_with_config(config: &BucketConfig) -> Result> { + pub async fn new_with_config(config: &Config) -> Result> { let pg_config = config.postgres.as_ref().ok_or_else(|| { anyhow::anyhow!( "PostgreSQL locks require PostgreSQL configuration, but none was provided" @@ -45,16 +46,31 
@@ impl PostgresLocks { })?; let db_url = format!( - "postgres://{}:{}@{}:{}/{}", + "postgres://{}:{}@{}:{}/{}?connect_timeout=10", pg_config.user, pg_config.password, pg_config.host, pg_config.port, pg_config.dbname ); - debug!("Connecting to Postgres for locks: {}", db_url); + debug!( + "Connecting to Postgres for locks: postgres://{}:****@{}:{}/{}", + pg_config.user, pg_config.host, pg_config.port, pg_config.dbname + ); let pool = PgPoolOptions::new() .max_connections(pg_config.pool_size) + .acquire_timeout(Duration::from_secs(30)) + .idle_timeout(Some(Duration::from_secs(600))) + .max_lifetime(Some(Duration::from_secs(1800))) .connect(&db_url) - .await?; + .await + .context("Failed to connect to PostgreSQL for locks")?; + + // Validate connection works + sqlx::query("SELECT 1") + .execute(&pool) + .await + .context("PostgreSQL locks connection validation failed")?; + + debug!("Successfully validated PostgreSQL locks connection"); Ok(Box::new(PostgresLocks { pool: Arc::new(pool), @@ -86,110 +102,124 @@ struct PostgresLock { #[async_trait] impl Lock for PostgresLock { - async fn acquire_shared<'a>(&'a self) -> Box + Send + 'a> { + async fn acquire_shared<'a>(&'a self) -> Result + Send + 'a>> { // Get connection from pool let mut conn = self .pool .acquire() .await - .expect("Failed to acquire connection for shared lock"); + .context("Failed to acquire connection for shared lock")?; - // Acquire shared advisory lock (returns void, so we use query instead of query_scalar) + // Acquire shared advisory lock + // WARNING: Advisory locks are SESSION-SCOPED and persist when connections return to the pool! + // We MUST explicitly unlock using pg_advisory_unlock_shared before the connection returns. 
sqlx::query("SELECT pg_advisory_lock_shared($1)") .bind(self.key_hash) .execute(&mut *conn) .await - .expect("Failed to acquire shared lock"); + .context("Failed to acquire shared lock")?; debug!("Acquired shared lock for key: {}", self.key); - Box::new(PostgresSharedLockGuard { + // Return a guard that requires explicit async release + Ok(Box::new(PostgresSharedLockGuard { key: self.key.clone(), key_hash: self.key_hash, - pool: self.pool.clone(), - }) + conn: Some(conn), + })) } - async fn acquire_exclusive<'a>(&'a self) -> Box + Send + 'a> { + async fn acquire_exclusive<'a>( + &'a self, + ) -> Result + Send + 'a>> { // Get connection from pool let mut conn = self .pool .acquire() .await - .expect("Failed to acquire connection for exclusive lock"); + .context("Failed to acquire connection for exclusive lock")?; - // Acquire exclusive advisory lock (returns void, so we use query instead of query_scalar) + // Acquire exclusive advisory lock + // WARNING: Advisory locks are SESSION-SCOPED and persist when connections return to the pool! + // We MUST explicitly unlock using pg_advisory_unlock before the connection returns. 
sqlx::query("SELECT pg_advisory_lock($1)") .bind(self.key_hash) .execute(&mut *conn) .await - .expect("Failed to acquire exclusive lock"); + .context("Failed to acquire exclusive lock")?; debug!("Acquired exclusive lock for key: {}", self.key); - Box::new(PostgresExclusiveLockGuard { + // Return a guard that requires explicit async release + Ok(Box::new(PostgresExclusiveLockGuard { key: self.key.clone(), key_hash: self.key_hash, - pool: self.pool.clone(), - }) + conn: Some(conn), + })) } } -struct PostgresSharedLockGuard { - #[allow(dead_code)] +/// Wrapper around a shared advisory lock that requires explicit async release +pub struct PostgresSharedLockGuard { key: String, key_hash: i64, - pool: Arc, + conn: Option>, } -impl Drop for PostgresSharedLockGuard { - fn drop(&mut self) { - // Release lock when guard is dropped - let key_hash = self.key_hash; - let pool = self.pool.clone(); - // Spawn background task to release lock - // Note: We can't await in Drop, so we spawn a background task - tokio::spawn(async move { - if let Err(e) = sqlx::query("SELECT pg_advisory_unlock_shared($1)") - .bind(key_hash) - .execute(&*pool) - .await - { - tracing::warn!("Failed to release shared lock: {}", e); +impl<'a> SharedLockGuard<'a> for PostgresSharedLockGuard { + fn release( + mut self: Box, + ) -> std::pin::Pin> + Send + 'a>> { + Box::pin(async move { + if let Some(mut conn) = self.conn.take() { + sqlx::query("SELECT pg_advisory_unlock_shared($1)") + .bind(self.key_hash) + .execute(&mut *conn) + .await + .context("Failed to release shared advisory lock")?; + debug!("Released shared lock for key: {}", self.key); } - }); + Ok(()) + }) } } -impl<'a> SharedLockGuard<'a> for PostgresSharedLockGuard {} +impl Drop for PostgresSharedLockGuard { + fn drop(&mut self) { + // Drop is called after release() removes the connection, so this is OK + } +} -struct PostgresExclusiveLockGuard { - #[allow(dead_code)] +/// Wrapper around an exclusive advisory lock that requires explicit async 
release +pub struct PostgresExclusiveLockGuard { key: String, key_hash: i64, - pool: Arc, + conn: Option>, } -impl Drop for PostgresExclusiveLockGuard { - fn drop(&mut self) { - // Release lock when guard is dropped - let key_hash = self.key_hash; - let pool = self.pool.clone(); - // Spawn background task to release lock - // Note: We can't await in Drop, so we spawn a background task - tokio::spawn(async move { - if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)") - .bind(key_hash) - .execute(&*pool) - .await - { - tracing::warn!("Failed to release exclusive lock: {}", e); +impl<'a> ExclusiveLockGuard<'a> for PostgresExclusiveLockGuard { + fn release( + mut self: Box, + ) -> std::pin::Pin> + Send + 'a>> { + Box::pin(async move { + if let Some(mut conn) = self.conn.take() { + sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(self.key_hash) + .execute(&mut *conn) + .await + .context("Failed to release exclusive advisory lock")?; + debug!("Released exclusive lock for key: {}", self.key); } - }); + Ok(()) + }) } } -impl<'a> ExclusiveLockGuard<'a> for PostgresExclusiveLockGuard {} +impl Drop for PostgresExclusiveLockGuard { + fn drop(&mut self) { + // Drop is called after release() removes the connection, so this is OK + } +} #[cfg(test)] mod tests { diff --git a/src/main.rs b/src/main.rs index f4751e3..30959bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use axum::Router; use axum::routing::get; use clap::{Parser, Subcommand}; @@ -97,82 +98,96 @@ enum Commands { }, } -async fn run_server(addr: SocketAddr, app: Router) { - let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - axum::serve(listener, app).await.unwrap(); +async fn run_server(addr: SocketAddr, app: Router) -> anyhow::Result<()> { + let listener = tokio::net::TcpListener::bind(addr) + .await + .context("Failed to bind TCP listener")?; + axum::serve(listener, app).await.context("Server error")?; + Ok(()) +} + +/// Create the router with all 
Filetracker routes +fn create_router(app_state: Arc) -> Router { + Router::new() + .route("/ft/version", get(ft_version)) + .route("/ft/version/", get(ft_version)) + .route("/ft/list/", get(ft_list_files)) + .route("/ft/list/{*path}", get(ft_list_files)) + .route( + "/ft/files/{*path}", + get(ft_get_file) + .head(ft_get_file) + .put(ft_put_file) + .delete(ft_delete_file), + ) + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .route( + "/metrics/json", + get(s3dedup::routes::metrics::metrics_json_handler), + ) + .route("/health", get(s3dedup::routes::metrics::health_handler)) + .layer( + TraceLayer::new_for_http() + .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) + .on_response(DefaultOnResponse::new().level(Level::INFO)), + ) + .with_state(app_state) +} + +/// Start background tasks (cleaner and metrics updater) +fn start_background_tasks(app_state: Arc, bucket_config: &config::BucketConfig) { + // Start cleaner + let cleaner = Arc::new(Cleaner::new( + bucket_config.name.clone(), + app_state.kvstorage.clone(), + app_state.s3storage.clone(), + bucket_config.cleaner.clone(), + )); + cleaner.start(); + + // Start metrics updater task + let metrics_state = app_state.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + loop { + interval.tick().await; + if let Err(e) = metrics_state.update_storage_metrics().await { + warn!("Failed to update storage metrics: {}", e); + } + } + }); } -async fn run_s3dedup_server(config_path: Option<&str>, use_env: bool) { +async fn run_s3dedup_server(config_path: Option<&str>, use_env: bool) -> anyhow::Result<()> { let config = if use_env { - config::Config::from_env().unwrap() + config::Config::from_env().context("Failed to load configuration from environment")? 
} else { - config::Config::new(config_path.unwrap_or("config.json")).unwrap() + config::Config::new(config_path.unwrap_or("config.json")) + .context("Failed to load configuration from file")? }; - s3dedup::logging::setup(&config.logging).unwrap(); - let mut handles = vec![]; - - for bucket in config.buckets.iter() { - info!("Starting server for bucket: {}", bucket.name); - - let app_state = AppState::new(bucket).await.unwrap(); - app_state.kvstorage.lock().await.setup().await.unwrap(); - - // Start cleaner for this bucket - let cleaner = Arc::new(Cleaner::new( - bucket.name.clone(), - app_state.kvstorage.clone(), - app_state.s3storage.clone(), - bucket.cleaner.clone(), - )); - cleaner.start(); - - // Start metrics updater task - let metrics_state = app_state.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); - loop { - interval.tick().await; - if let Err(e) = metrics_state.update_storage_metrics().await { - warn!("Failed to update storage metrics: {}", e); - } - } - }); + s3dedup::logging::setup(&config.logging).context("Failed to setup logging")?; - let app = Router::new() - .route("/ft/version", get(ft_version)) - .route("/ft/version/", get(ft_version)) - .route("/ft/list/", get(ft_list_files)) - .route("/ft/list/{*path}", get(ft_list_files)) - .route( - "/ft/files/{*path}", - get(ft_get_file) - .head(ft_get_file) - .put(ft_put_file) - .delete(ft_delete_file), - ) - .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) - .route( - "/metrics/json", - get(s3dedup::routes::metrics::metrics_json_handler), - ) - .route("/health", get(s3dedup::routes::metrics::health_handler)) - .layer( - // Logging middleware - TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) - .on_response(DefaultOnResponse::new().level(Level::INFO)), - ) - .with_state(app_state); - let address: SocketAddr = format!("{}:{}", bucket.address, bucket.port) - .parse() - .unwrap(); - let 
handle = tokio::spawn(run_server(address, app)); - handles.push(handle); - } + info!("Starting server for bucket: {}", config.bucket.name); - for handle in handles { - handle.await.unwrap(); - } + let app_state = AppState::new(&config) + .await + .context("Failed to initialize app state")?; + app_state + .kvstorage + .lock() + .await + .setup() + .await + .context("Failed to setup KV storage")?; + + start_background_tasks(app_state.clone(), &config.bucket); + + let app = create_router(app_state); + let address: SocketAddr = format!("{}:{}", config.bucket.address, config.bucket.port) + .parse() + .context("Failed to parse socket address")?; + + run_server(address, app).await } async fn run_migrate( @@ -180,13 +195,14 @@ async fn run_migrate( use_env: bool, filetracker_url: &str, max_concurrency: usize, -) { +) -> anyhow::Result<()> { let config = if use_env { - config::Config::from_env().unwrap() + config::Config::from_env().context("Failed to load configuration from environment")? } else { - config::Config::new(config_path.unwrap_or("config.json")).unwrap() + config::Config::new(config_path.unwrap_or("config.json")) + .context("Failed to load configuration from file")? 
}; - s3dedup::logging::setup(&config.logging).unwrap(); + s3dedup::logging::setup(&config.logging).context("Failed to setup logging")?; info!("Starting offline migration from old filetracker to s3dedup"); if use_env { @@ -195,31 +211,22 @@ async fn run_migrate( info!("Config file: {}", config_path.unwrap_or("config.json")); } info!("Filetracker URL: {}", filetracker_url); + info!("Bucket: {}", config.bucket.name); info!("Max concurrency: {}", max_concurrency); - // For offline migration, we only migrate the first bucket - if config.buckets.is_empty() { - error!("No buckets configured"); - return; - } - - let bucket_config = &config.buckets[0]; - info!("Migrating to bucket: {}", bucket_config.name); - // Initialize AppState - let app_state = match AppState::new(bucket_config).await { - Ok(state) => state, - Err(e) => { - error!("Failed to initialize app state: {}", e); - return; - } - }; + let app_state = AppState::new(&config) + .await + .context("Failed to initialize app state")?; // Setup KV storage - if let Err(e) = app_state.kvstorage.lock().await.setup().await { - error!("Failed to setup KV storage: {}", e); - return; - } + app_state + .kvstorage + .lock() + .await + .setup() + .await + .context("Failed to setup KV storage")?; // Initialize filetracker client let filetracker_client = Arc::new(s3dedup::filetracker_client::FiletrackerClient::new( @@ -227,26 +234,23 @@ async fn run_migrate( )); // Run migration with specified concurrency - match s3dedup::migration::migrate_all_files(filetracker_client, app_state, max_concurrency) - .await - { - Ok(stats) => { - info!("Migration completed successfully"); - info!("Total files: {}", stats.total_files); - info!("Migrated: {}", stats.migrated); - info!("Skipped: {}", stats.skipped); - info!("Failed: {}", stats.failed); - - if stats.failed > 0 { - warn!("{} files failed to migrate", stats.failed); - std::process::exit(1); - } - } - Err(e) => { - error!("Migration failed: {}", e); - std::process::exit(1); - } + let 
stats = + s3dedup::migration::migrate_all_files(filetracker_client, app_state, max_concurrency) + .await + .context("Migration failed")?; + + info!("Migration completed successfully"); + info!("Total files: {}", stats.total_files); + info!("Migrated: {}", stats.migrated); + info!("Skipped: {}", stats.skipped); + info!("Failed: {}", stats.failed); + + if stats.failed > 0 { + warn!("{} files failed to migrate", stats.failed); + std::process::exit(1); } + + Ok(()) } async fn run_migrate_v1( @@ -254,13 +258,14 @@ async fn run_migrate_v1( use_env: bool, v1_directory: &str, max_concurrency: usize, -) { +) -> anyhow::Result<()> { let config = if use_env { - config::Config::from_env().unwrap() + config::Config::from_env().context("Failed to load configuration from environment")? } else { - config::Config::new(config_path.unwrap_or("config.json")).unwrap() + config::Config::new(config_path.unwrap_or("config.json")) + .context("Failed to load configuration from file")? }; - s3dedup::logging::setup(&config.logging).unwrap(); + s3dedup::logging::setup(&config.logging).context("Failed to setup logging")?; info!("Starting offline V1 filesystem migration to s3dedup"); if use_env { @@ -269,63 +274,55 @@ async fn run_migrate_v1( info!("Config file: {}", config_path.unwrap_or("config.json")); } info!("V1 directory: {}", v1_directory); + info!("Bucket: {}", config.bucket.name); info!("Max concurrency: {}", max_concurrency); - // For offline migration, we only migrate the first bucket - if config.buckets.is_empty() { - error!("No buckets configured"); - return; - } - - let bucket_config = &config.buckets[0]; - info!("Migrating to bucket: {}", bucket_config.name); - // Initialize AppState - let app_state = match AppState::new(bucket_config).await { - Ok(state) => state, - Err(e) => { - error!("Failed to initialize app state: {}", e); - return; - } - }; + let app_state = AppState::new(&config) + .await + .context("Failed to initialize app state")?; // Setup KV storage - if let Err(e) = 
app_state.kvstorage.lock().await.setup().await { - error!("Failed to setup KV storage: {}", e); - return; - } + app_state + .kvstorage + .lock() + .await + .setup() + .await + .context("Failed to setup KV storage")?; // Run V1 filesystem migration - match s3dedup::migration::migrate_all_files_from_v1_fs(v1_directory, app_state, max_concurrency) - .await - { - Ok(stats) => { - info!("V1 migration completed successfully"); - info!("Total files: {}", stats.total_files); - info!("Migrated: {}", stats.migrated); - info!("Skipped: {}", stats.skipped); - info!("Failed: {}", stats.failed); - - if stats.failed > 0 { - warn!("{} files failed to migrate", stats.failed); - std::process::exit(1); - } - } - Err(e) => { - error!("V1 migration failed: {}", e); - std::process::exit(1); - } + let stats = + s3dedup::migration::migrate_all_files_from_v1_fs(v1_directory, app_state, max_concurrency) + .await + .context("V1 migration failed")?; + + info!("V1 migration completed successfully"); + info!("Total files: {}", stats.total_files); + info!("Migrated: {}", stats.migrated); + info!("Skipped: {}", stats.skipped); + info!("Failed: {}", stats.failed); + + if stats.failed > 0 { + warn!("{} files failed to migrate", stats.failed); + std::process::exit(1); } + + Ok(()) } -async fn run_live_migrate(config_path: Option<&str>, use_env: bool, max_concurrency: usize) { +async fn run_live_migrate( + config_path: Option<&str>, + use_env: bool, + max_concurrency: usize, +) -> anyhow::Result<()> { let config = if use_env { - config::Config::from_env().unwrap() + config::Config::from_env().context("Failed to load configuration from environment")? } else { - config::Config::new(config_path.unwrap_or("config.json")).unwrap() + config::Config::new(config_path.unwrap_or("config.json")) + .context("Failed to load configuration from file")? 
}; - s3dedup::logging::setup(&config.logging).unwrap(); - let mut handles = vec![]; + s3dedup::logging::setup(&config.logging).context("Failed to setup logging")?; info!("Starting live migration from old filetracker to s3dedup"); if use_env { @@ -333,117 +330,53 @@ async fn run_live_migrate(config_path: Option<&str>, use_env: bool, max_concurre } else { info!("Config file: {}", config_path.unwrap_or("config.json")); } - info!("Max concurrency per bucket: {}", max_concurrency); - - for bucket in config.buckets.iter() { - // Check if this bucket has a filetracker URL configured - let filetracker_url = match &bucket.filetracker_url { - Some(url) => url.clone(), - None => { - info!( - "Bucket '{}' has no filetracker_url configured, starting in normal mode", - bucket.name - ); - // Start server without migration for this bucket - let app_state = AppState::new(bucket).await.unwrap(); - app_state.kvstorage.lock().await.setup().await.unwrap(); - - let cleaner = Arc::new(s3dedup::cleaner::Cleaner::new( - bucket.name.clone(), - app_state.kvstorage.clone(), - app_state.s3storage.clone(), - bucket.cleaner.clone(), - )); - cleaner.start(); - - // Start metrics updater task - let metrics_state = app_state.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); - loop { - interval.tick().await; - if let Err(e) = metrics_state.update_storage_metrics().await { - warn!("Failed to update storage metrics: {}", e); - } - } - }); - - let app = Router::new() - .route("/ft/version", get(s3dedup::routes::ft::version::ft_version)) - .route( - "/ft/version/", - get(s3dedup::routes::ft::version::ft_version), - ) - .route( - "/ft/list/", - get(s3dedup::routes::ft::list_files::ft_list_files), - ) - .route( - "/ft/list/{*path}", - get(s3dedup::routes::ft::list_files::ft_list_files), - ) - .route( - "/ft/files/{*path}", - get(s3dedup::routes::ft::get_file::ft_get_file) - .head(s3dedup::routes::ft::get_file::ft_get_file) - 
.put(s3dedup::routes::ft::put_file::ft_put_file) - .delete(s3dedup::routes::ft::delete_file::ft_delete_file), - ) - .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) - .route( - "/metrics/json", - get(s3dedup::routes::metrics::metrics_json_handler), - ) - .layer( - TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) - .on_response(DefaultOnResponse::new().level(Level::INFO)), - ) - .with_state(app_state); - - let address: SocketAddr = format!("{}:{}", bucket.address, bucket.port) - .parse() - .unwrap(); - let handle = tokio::spawn(run_server(address, app)); - handles.push(handle); - continue; - } - }; + info!("Bucket: {}", config.bucket.name); + info!("Max concurrency: {}", max_concurrency); + // Check if this bucket has a filetracker URL configured + let filetracker_url = &config.bucket.filetracker_url; + + if filetracker_url.is_none() { + info!( + "Bucket '{}' has no filetracker_url configured, starting in normal server mode (no migration)", + config.bucket.name + ); + } else { info!( "Starting server with live migration for bucket: {} (filetracker: {})", - bucket.name, filetracker_url + config.bucket.name, + filetracker_url.as_ref().unwrap() ); + } - let app_state = AppState::new_with_filetracker(bucket, filetracker_url) + // Initialize AppState + let app_state = if let Some(url) = filetracker_url { + AppState::new_with_filetracker(&config, url.clone()) .await - .unwrap(); - app_state.kvstorage.lock().await.setup().await.unwrap(); - - // Start cleaner for this bucket - let cleaner = Arc::new(s3dedup::cleaner::Cleaner::new( - bucket.name.clone(), - app_state.kvstorage.clone(), - app_state.s3storage.clone(), - bucket.cleaner.clone(), - )); - cleaner.start(); - - // Start metrics updater task - let metrics_state = app_state.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); - loop { - interval.tick().await; - if let Err(e) = 
metrics_state.update_storage_metrics().await { - warn!("Failed to update storage metrics: {}", e); - } - } - }); + .context("Failed to initialize app state with filetracker")? + } else { + AppState::new(&config) + .await + .context("Failed to initialize app state")? + }; + + app_state + .kvstorage + .lock() + .await + .setup() + .await + .context("Failed to setup KV storage")?; - // Start background migration worker + start_background_tasks(app_state.clone(), &config.bucket); + + // Start background migration worker if filetracker URL is configured + if filetracker_url.is_some() { let migration_app_state = app_state.clone(); - let migration_client = app_state.filetracker_client.clone().unwrap(); + let migration_client = app_state + .filetracker_client + .clone() + .context("Filetracker client not available")?; tokio::spawn(async move { s3dedup::migration::live_migration_worker( migration_client, @@ -452,51 +385,14 @@ async fn run_live_migrate(config_path: Option<&str>, use_env: bool, max_concurre ) .await; }); - - let app = Router::new() - .route("/ft/version", get(s3dedup::routes::ft::version::ft_version)) - .route( - "/ft/version/", - get(s3dedup::routes::ft::version::ft_version), - ) - .route( - "/ft/list/", - get(s3dedup::routes::ft::list_files::ft_list_files), - ) - .route( - "/ft/list/{*path}", - get(s3dedup::routes::ft::list_files::ft_list_files), - ) - .route( - "/ft/files/{*path}", - get(s3dedup::routes::ft::get_file::ft_get_file) - .head(s3dedup::routes::ft::get_file::ft_get_file) - .put(s3dedup::routes::ft::put_file::ft_put_file) - .delete(s3dedup::routes::ft::delete_file::ft_delete_file), - ) - .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) - .route( - "/metrics/json", - get(s3dedup::routes::metrics::metrics_json_handler), - ) - .route("/health", get(s3dedup::routes::metrics::health_handler)) - .layer( - TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) - 
.on_response(DefaultOnResponse::new().level(Level::INFO)), - ) - .with_state(app_state); - - let address: SocketAddr = format!("{}:{}", bucket.address, bucket.port) - .parse() - .unwrap(); - let handle = tokio::spawn(run_server(address, app)); - handles.push(handle); } - for handle in handles { - handle.await.unwrap(); - } + let app = create_router(app_state); + let address: SocketAddr = format!("{}:{}", config.bucket.address, config.bucket.port) + .parse() + .context("Failed to parse socket address")?; + + run_server(address, app).await } async fn run_live_migrate_v1( @@ -505,14 +401,14 @@ async fn run_live_migrate_v1( v1_directory: Option<&str>, filetracker_url: Option<&str>, max_concurrency: usize, -) { +) -> anyhow::Result<()> { let config = if use_env { - config::Config::from_env().unwrap() + config::Config::from_env().context("Failed to load configuration from environment")? } else { - config::Config::new(config_path.unwrap_or("config.json")).unwrap() + config::Config::new(config_path.unwrap_or("config.json")) + .context("Failed to load configuration from file")? 
}; - s3dedup::logging::setup(&config.logging).unwrap(); - let mut handles = vec![]; + s3dedup::logging::setup(&config.logging).context("Failed to setup logging")?; info!("Starting live migration from V1 filetracker to s3dedup"); if use_env { @@ -520,146 +416,96 @@ async fn run_live_migrate_v1( } else { info!("Config file: {}", config_path.unwrap_or("config.json")); } - info!("Max concurrency per bucket: {}", max_concurrency); + info!("Bucket: {}", config.bucket.name); + info!("Max concurrency: {}", max_concurrency); - for bucket in config.buckets.iter() { - // Determine V1 directory: CLI > config > env - let effective_v1_dir = v1_directory - .or(bucket.filetracker_v1_dir.as_deref()) - .map(|s| s.to_string()); + // Determine V1 directory: CLI > config > env + let effective_v1_dir = v1_directory + .or(config.bucket.filetracker_v1_dir.as_deref()) + .map(|s| s.to_string()); + + // Determine filetracker URL: CLI > config > env + let effective_ft_url = filetracker_url + .or(config.bucket.filetracker_url.as_deref()) + .map(|s| s.to_string()); + + info!( + "V1 migration configuration: v1_dir: {:?}, filetracker_url: {:?}", + effective_v1_dir, effective_ft_url + ); + + // Initialize AppState with filetracker client if URL is provided + let app_state = if let Some(ref ft_url) = effective_ft_url { + info!("Creating app state with V1 filetracker client for HTTP fallback"); + AppState::new_with_filetracker(&config, ft_url.clone()) + .await + .context("Failed to initialize app state with filetracker")? + } else { + AppState::new(&config) + .await + .context("Failed to initialize app state")? 
+ }; + app_state + .kvstorage + .lock() + .await + .setup() + .await + .context("Failed to setup KV storage")?; - // Determine filetracker URL: CLI > config > env - let effective_ft_url = filetracker_url - .or(bucket.filetracker_url.as_deref()) - .map(|s| s.to_string()); + start_background_tasks(app_state.clone(), &config.bucket); - info!( - "Starting server with V1 migration for bucket: {} (v1_dir: {:?}, filetracker_url: {:?})", - bucket.name, effective_v1_dir, effective_ft_url - ); + // Start background V1 filesystem migration worker if v1_directory is provided + if let Some(v1_dir) = effective_v1_dir { + // Set migration_active gauge to indicate migration is in progress + s3dedup::metrics::MIGRATION_ACTIVE.set(1); - // Initialize AppState with filetracker client if URL is provided - let app_state = if let Some(ref ft_url) = effective_ft_url { - info!("Creating app state with V1 filetracker client for HTTP fallback"); - AppState::new_with_filetracker(bucket, ft_url.clone()) - .await - .unwrap() - } else { - AppState::new(bucket).await.unwrap() - }; - app_state.kvstorage.lock().await.setup().await.unwrap(); - - // Start cleaner for this bucket - let cleaner = Arc::new(Cleaner::new( - bucket.name.clone(), - app_state.kvstorage.clone(), - app_state.s3storage.clone(), - bucket.cleaner.clone(), - )); - cleaner.start(); - - // Start metrics updater task - let metrics_state = app_state.clone(); + let migration_app_state = app_state.clone(); tokio::spawn(async move { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); - loop { - interval.tick().await; - if let Err(e) = metrics_state.update_storage_metrics().await { - warn!("Failed to update storage metrics: {}", e); - } - } - }); - - // Start background V1 filesystem migration worker if v1_directory is provided - if let Some(v1_dir) = effective_v1_dir { - // Set migration_active gauge to indicate migration is in progress - s3dedup::metrics::MIGRATION_ACTIVE.set(1); - - let 
migration_app_state = app_state.clone(); - tokio::spawn(async move { - match s3dedup::migration::migrate_all_files_from_v1_fs( - &v1_dir, - migration_app_state, - max_concurrency, - ) - .await - { - Ok(stats) => { - info!("Background V1 filesystem migration completed successfully"); - info!("Total files: {}", stats.total_files); - info!("Migrated: {}", stats.migrated); - info!("Skipped: {}", stats.skipped); - info!("Failed: {}", stats.failed); - - if stats.failed > 0 { - warn!("{} files failed to migrate", stats.failed); - } - } - Err(e) => { - error!("Background V1 filesystem migration failed: {}", e); + match s3dedup::migration::migrate_all_files_from_v1_fs( + &v1_dir, + migration_app_state, + max_concurrency, + ) + .await + { + Ok(stats) => { + info!("Background V1 filesystem migration completed successfully"); + info!("Total files: {}", stats.total_files); + info!("Migrated: {}", stats.migrated); + info!("Skipped: {}", stats.skipped); + info!("Failed: {}", stats.failed); + + if stats.failed > 0 { + warn!("{} files failed to migrate", stats.failed); } } + Err(e) => { + error!("Background V1 filesystem migration failed: {}", e); + } + } - // Reset migration_active gauge - s3dedup::metrics::MIGRATION_ACTIVE.set(0); - info!("Background V1 filesystem migration worker finished"); - }); - } - - // Create router with all endpoints - let app = Router::new() - .route("/ft/version", get(s3dedup::routes::ft::version::ft_version)) - .route( - "/ft/version/", - get(s3dedup::routes::ft::version::ft_version), - ) - .route( - "/ft/list/", - get(s3dedup::routes::ft::list_files::ft_list_files), - ) - .route( - "/ft/list/{*path}", - get(s3dedup::routes::ft::list_files::ft_list_files), - ) - .route( - "/ft/files/{*path}", - get(s3dedup::routes::ft::get_file::ft_get_file) - .head(s3dedup::routes::ft::get_file::ft_get_file) - .put(s3dedup::routes::ft::put_file::ft_put_file) - .delete(s3dedup::routes::ft::delete_file::ft_delete_file), - ) - .route("/metrics", 
get(s3dedup::routes::metrics::metrics_handler)) - .route( - "/metrics/json", - get(s3dedup::routes::metrics::metrics_json_handler), - ) - .route("/health", get(s3dedup::routes::metrics::health_handler)) - .layer( - TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) - .on_response(DefaultOnResponse::new().level(Level::INFO)), - ) - .with_state(app_state); - - let address: SocketAddr = format!("{}:{}", bucket.address, bucket.port) - .parse() - .unwrap(); - let handle = tokio::spawn(run_server(address, app)); - handles.push(handle); + // Reset migration_active gauge + s3dedup::metrics::MIGRATION_ACTIVE.set(0); + info!("Background V1 filesystem migration worker finished"); + }); } - for handle in handles { - handle.await.unwrap(); - } + let app = create_router(app_state); + let address: SocketAddr = format!("{}:{}", config.bucket.address, config.bucket.port) + .parse() + .context("Failed to parse socket address")?; + + run_server(address, app).await } #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); match cli.command { Commands::Server { config, env } => { - run_s3dedup_server(config.as_deref(), env).await; + run_s3dedup_server(config.as_deref(), env).await?; } Commands::Migrate { config, @@ -667,14 +513,14 @@ async fn main() { filetracker_url, max_concurrency, } => { - run_migrate(config.as_deref(), env, &filetracker_url, max_concurrency).await; + run_migrate(config.as_deref(), env, &filetracker_url, max_concurrency).await?; } Commands::LiveMigrate { config, env, max_concurrency, } => { - run_live_migrate(config.as_deref(), env, max_concurrency).await; + run_live_migrate(config.as_deref(), env, max_concurrency).await?; } Commands::MigrateV1 { config, @@ -682,7 +528,7 @@ async fn main() { v1_directory, max_concurrency, } => { - run_migrate_v1(config.as_deref(), env, &v1_directory, max_concurrency).await; + run_migrate_v1(config.as_deref(), env, &v1_directory, max_concurrency).await?; 
} Commands::LiveMigrateV1 { config, @@ -698,7 +544,9 @@ async fn main() { filetracker_url.as_deref(), max_concurrency, ) - .await; + .await?; } } + + Ok(()) } diff --git a/src/metrics.rs b/src/metrics.rs index 22c3813..fda6ced 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -159,6 +159,33 @@ lazy_static! { "Server uptime in seconds" ) .unwrap(); + + // Database connection pool metrics + pub static ref DB_CONNECTIONS_ACTIVE: IntGauge = register_int_gauge!( + "s3dedup_db_connections_active", + "Number of active database connections" + ) + .unwrap(); + + pub static ref DB_CONNECTIONS_IDLE: IntGauge = register_int_gauge!( + "s3dedup_db_connections_idle", + "Number of idle connections in the pool" + ) + .unwrap(); + + pub static ref DB_CONNECTION_ACQUIRE_DURATION_SECONDS: HistogramVec = register_histogram_vec!( + "s3dedup_db_connection_acquire_duration_seconds", + "Time to acquire a database connection", + &["storage_type"] + ) + .unwrap(); + + pub static ref DB_CONNECTION_ACQUIRE_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_db_connection_acquire_errors_total", + "Total failed database connection acquisitions", + &["storage_type", "reason"] + ) + .unwrap(); } #[derive(Clone)] diff --git a/src/migration/mod.rs b/src/migration/mod.rs index a8a19e5..c132c2d 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -174,7 +174,10 @@ pub async fn migrate_single_file_from_metadata( let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); let locks = &app_state.locks; let lock = locks.prepare_lock(lock_key).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock + .acquire_exclusive() + .await + .context("Failed to acquire exclusive lock for migration")?; // Recheck if file was already migrated after acquiring lock (race condition protection) let current_modified_after_lock = app_state @@ -186,6 +189,7 @@ pub async fn migrate_single_file_from_metadata( if current_modified_after_lock >= file_metadata.last_modified { 
// File was migrated by another concurrent task, skip + let _ = guard.release().await; return Ok(()); } @@ -272,6 +276,8 @@ pub async fn migrate_single_file_from_metadata( .await .set_modified(&app_state.bucket_name, path, file_metadata.last_modified) .await?; + + let _ = guard.release().await; Ok(()) } @@ -315,7 +321,10 @@ async fn migrate_single_file( let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); let locks_storage = &app_state.locks; let lock = locks_storage.prepare_lock(lock_key).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock + .acquire_exclusive() + .await + .context("Failed to acquire exclusive lock for migration")?; // Recheck if file was already migrated after acquiring lock (race condition protection) let current_modified_after_lock = app_state @@ -327,6 +336,7 @@ async fn migrate_single_file( if current_modified_after_lock >= file_metadata.last_modified { // File was migrated by another concurrent task, skip + let _ = guard.release().await; return Ok(false); } @@ -414,6 +424,7 @@ async fn migrate_single_file( .set_modified(&app_state.bucket_name, path, file_metadata.last_modified) .await?; + let _ = guard.release().await; Ok(true) } @@ -654,7 +665,10 @@ async fn migrate_single_file_from_v1_fs( let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); let locks_storage = &app_state.locks; let lock = locks_storage.prepare_lock(lock_key).await; - let _guard = lock.acquire_exclusive().await; + let guard = lock + .acquire_exclusive() + .await + .context("Failed to acquire exclusive lock for migration")?; // Recheck if file was already migrated after acquiring lock (race condition protection) let current_modified_after_lock = app_state @@ -666,6 +680,7 @@ async fn migrate_single_file_from_v1_fs( if current_modified_after_lock >= file_info.last_modified { // File was migrated by another concurrent task, skip + let _ = guard.release().await; return Ok(false); } @@ -761,5 +776,6 @@ async fn 
migrate_single_file_from_v1_fs( .set_modified(&app_state.bucket_name, path, file_info.last_modified) .await?; + let _ = guard.release().await; Ok(true) } diff --git a/src/routes/ft/delete_file.rs b/src/routes/ft/delete_file.rs index c863bd1..096bc89 100644 --- a/src/routes/ft/delete_file.rs +++ b/src/routes/ft/delete_file.rs @@ -51,7 +51,17 @@ pub async fn ft_delete_file( let lock_key = locks::file_lock(&state.bucket_name, path); let locks_storage = &state.locks; let lock = locks_storage.prepare_lock(lock_key).await; - let _guard = lock.acquire_exclusive().await; + let guard = match lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + error!("Failed to acquire exclusive lock: {}", e); + record_metrics("500"); + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body("Failed to acquire lock".to_string()) + .unwrap(); + } + }; // 3. Check if file exists let current_modified = state @@ -63,6 +73,7 @@ pub async fn ft_delete_file( if current_modified.is_err() { error!("Failed to get current modified"); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to get current modified".to_string()) @@ -74,6 +85,7 @@ pub async fn ft_delete_file( if current_modified == 0 { debug!("File {} not found", path); record_metrics("404"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::NOT_FOUND) .body("File not found".to_string()) @@ -87,6 +99,7 @@ pub async fn ft_delete_file( path, timestamp, current_modified ); record_metrics("200"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::OK) .body("".to_string()) @@ -103,6 +116,7 @@ pub async fn ft_delete_file( if hash.is_err() { error!("Failed to get ref file"); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to get ref file".to_string()) @@ -113,6 +127,7 @@ pub async 
fn ft_delete_file( if hash.is_empty() { error!("File {} has no hash reference", path); record_metrics("404"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::NOT_FOUND) .body("File has no hash reference".to_string()) @@ -131,6 +146,7 @@ pub async fn ft_delete_file( Err(e) => { error!("Failed to decrement ref count: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to decrement ref count".to_string()) @@ -166,6 +182,7 @@ pub async fn ft_delete_file( { error!("Failed to delete ref file: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to delete ref file".to_string()) @@ -181,6 +198,7 @@ pub async fn ft_delete_file( { error!("Failed to delete modified time: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to delete modified time".to_string()) @@ -189,6 +207,9 @@ pub async fn ft_delete_file( debug!("Deleted file {}", path); + // Release lock early since all critical metadata operations are complete + let _ = guard.release().await; + // 9. 
Dual-delete from filetracker if in live migration mode if let Some(filetracker_client) = &state.filetracker_client { debug!("Live migration mode: also deleting from filetracker"); diff --git a/src/routes/ft/get_file.rs b/src/routes/ft/get_file.rs index 506a756..3cb2bff 100644 --- a/src/routes/ft/get_file.rs +++ b/src/routes/ft/get_file.rs @@ -21,7 +21,23 @@ pub async fn ft_get_file( let lock_key = locks::file_lock(&state.bucket_name, path); let locks_storage = &state.locks; let lock = locks_storage.prepare_lock(lock_key).await; - let guard = lock.acquire_shared().await; + let guard = match lock.acquire_shared().await { + Ok(g) => g, + Err(e) => { + error!("Failed to acquire shared lock: {}", e); + metrics::HTTP_REQUESTS_TOTAL + .with_label_values(&["GET", "/ft/files", "500"]) + .inc(); + metrics::HTTP_REQUEST_DURATION_SECONDS + .with_label_values(&["GET", "/ft/files"]) + .observe(start.elapsed().as_secs_f64()); + + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::empty()) + .unwrap(); + } + }; // 2. 
Check if file exists and get metadata let modified_time = state @@ -39,6 +55,7 @@ pub async fn ft_get_file( .with_label_values(&["GET", "/ft/files"]) .observe(start.elapsed().as_secs_f64()); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::empty()) @@ -61,9 +78,9 @@ pub async fn ft_get_file( .with_label_values(&[&state.bucket_name]) .inc(); - // Drop the shared lock before migration to avoid deadlock + // Release the shared lock before migration to avoid deadlock // (migration needs exclusive lock on the same key) - drop(guard); + let _ = guard.release().await; // Migrate the file on-the-fly using migration logic let result = crate::migration::migrate_single_file_from_metadata( @@ -152,6 +169,7 @@ pub async fn ft_get_file( .with_label_values(&["GET", "/ft/files"]) .observe(start.elapsed().as_secs_f64()); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::empty()) @@ -168,6 +186,7 @@ pub async fn ft_get_file( .with_label_values(&["GET", "/ft/files"]) .observe(start.elapsed().as_secs_f64()); + let _ = guard.release().await; return Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::empty()) @@ -190,6 +209,7 @@ pub async fn ft_get_file( .with_label_values(&["GET", "/ft/files"]) .observe(start.elapsed().as_secs_f64()); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::empty()) @@ -208,6 +228,7 @@ pub async fn ft_get_file( .with_label_values(&["GET", "/ft/files"]) .observe(start.elapsed().as_secs_f64()); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::empty()) @@ -215,7 +236,7 @@ pub async fn ft_get_file( } let blob_data = blob_data.unwrap(); - drop(guard); + let _ = guard.release().await; // 6. 
Record metrics metrics::HTTP_REQUESTS_TOTAL diff --git a/src/routes/ft/put_file.rs b/src/routes/ft/put_file.rs index f2ed4d1..c7368f0 100644 --- a/src/routes/ft/put_file.rs +++ b/src/routes/ft/put_file.rs @@ -94,7 +94,17 @@ pub async fn ft_put_file( let lock_key = locks::file_lock(&state.bucket_name, path); let locks_storage = &state.locks; let lock = locks_storage.prepare_lock(lock_key).await; - let _guard = lock.acquire_exclusive().await; + let guard = match lock.acquire_exclusive().await { + Ok(g) => g, + Err(e) => { + error!("Failed to acquire exclusive lock: {}", e); + record_metrics("500"); + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body("Failed to acquire lock".to_string()) + .unwrap(); + } + }; // 5. Check existing version (matching original logic) let current_modified = state @@ -106,6 +116,7 @@ pub async fn ft_put_file( if current_modified.is_err() { error!("Failed to get current modified"); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to get current modified".to_string()) @@ -120,6 +131,7 @@ pub async fn ft_put_file( path, timestamp, current_modified ); record_metrics("200"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::OK) .header("Content-Type", "text/plain") @@ -145,6 +157,7 @@ pub async fn ft_put_file( Err(e) => { error!("Failed to decompress gzip data: {}", e); record_metrics("400"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::BAD_REQUEST) .body("Failed to decompress gzip data".to_string()) @@ -167,6 +180,7 @@ pub async fn ft_put_file( Err(e) => { error!("Failed to compress data: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to compress data".to_string()) @@ -188,6 +202,7 @@ pub async fn ft_put_file( Err(e) => { error!("Failed to check object 
existence: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to check object existence".to_string()) @@ -219,6 +234,7 @@ pub async fn ft_put_file( { error!("Failed to store object in S3: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to store object".to_string()) @@ -235,6 +251,7 @@ pub async fn ft_put_file( { error!("Failed to store compressed size: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to store compressed size".to_string()) @@ -252,6 +269,7 @@ pub async fn ft_put_file( { error!("Failed to store logical size: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to store logical size".to_string()) @@ -290,6 +308,7 @@ pub async fn ft_put_file( { error!("Failed to increment ref count: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to increment ref count".to_string()) @@ -332,6 +351,7 @@ pub async fn ft_put_file( { error!("Failed to set ref file: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to set ref file".to_string()) @@ -347,6 +367,7 @@ pub async fn ft_put_file( { error!("Failed to set modified time: {}", e); record_metrics("500"); + let _ = guard.release().await; return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Failed to set modified time".to_string()) @@ -355,6 +376,9 @@ pub async fn ft_put_file( debug!("Created link {}.", path); + // Release lock early since all critical metadata operations are complete + let _ = 
guard.release().await; + // 9. Dual-write to filetracker if in live migration mode if let Some(filetracker_client) = &state.filetracker_client { debug!("Live migration mode: also writing to filetracker"); diff --git a/tests/cleaner_test.rs b/tests/cleaner_test.rs index 17f49a3..e800c06 100644 --- a/tests/cleaner_test.rs +++ b/tests/cleaner_test.rs @@ -1,5 +1,5 @@ use s3dedup::cleaner::{Cleaner, CleanerConfig}; -use s3dedup::config::BucketConfig; +use s3dedup::config::Config; use s3dedup::kvstorage::KVStorage; use s3dedup::s3storage::S3Storage; use std::sync::Arc; @@ -7,29 +7,35 @@ use tokio::sync::Mutex; // Helper to check if MinIO is available async fn is_minio_available() -> bool { - let bucket_config = create_test_bucket_config("health_check"); - S3Storage::new(&bucket_config).await.is_ok() + let config = create_test_config("health_check"); + S3Storage::new(&config.bucket).await.is_ok() } -// Helper to create test bucket config -fn create_test_bucket_config(bucket_name: &str) -> BucketConfig { +// Helper to create test config +fn create_test_config(bucket_name: &str) -> Config { let config_str = format!( r#"{{ - "name": "{}", - "address": "0.0.0.0", - "port": 3000, + "logging": {{ + "level": "info", + "json": false + }}, "kvstorage_type": "sqlite", "sqlite": {{ "path": ":memory:", "pool_size": 5 }}, "locks_type": "memory", - "s3storage_type": "minio", - "minio": {{ - "endpoint": "http://localhost:9000", - "access_key": "minioadmin", - "secret_key": "minioadmin", - "force_path_style": true + "bucket": {{ + "name": "{}", + "address": "0.0.0.0", + "port": 3000, + "s3storage_type": "minio", + "minio": {{ + "endpoint": "http://localhost:9000", + "access_key": "minioadmin", + "secret_key": "minioadmin", + "force_path_style": true + }} }} }}"#, bucket_name @@ -46,10 +52,10 @@ async fn setup_test_env( Arc>>, String, ) { - let bucket_config = create_test_bucket_config(bucket_name); + let config = create_test_config(bucket_name); - let kvstorage = 
KVStorage::new(&bucket_config).await.unwrap(); - let s3storage = S3Storage::new(&bucket_config).await.unwrap(); + let kvstorage = KVStorage::new(&config).await.unwrap(); + let s3storage = S3Storage::new(&config.bucket).await.unwrap(); let kvstorage = Arc::new(Mutex::new(kvstorage)); let s3storage = Arc::new(Mutex::new(s3storage)); diff --git a/tests/integration_test.rs b/tests/integration_test.rs index f47e323..25251f9 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -16,7 +16,9 @@ async fn create_test_app() -> Router { // Helper to create test app with access to app state for S3 verification async fn create_test_app_with_state() -> (Router, Arc) { use s3dedup::AppState; - use s3dedup::config::{BucketConfig, KVStorageType, MinIOConfig, PostgresConfig, SQLiteConfig}; + use s3dedup::config::{ + BucketConfig, Config, KVStorageType, MinIOConfig, PostgresConfig, SQLiteConfig, + }; use s3dedup::kvstorage::KVStorage; use s3dedup::locks::LocksStorage; use s3dedup::s3storage::S3Storage; @@ -67,14 +69,10 @@ async fn create_test_app_with_state() -> (Router, Arc) { ) }; - let config = BucketConfig { + let bucket_config = BucketConfig { name: test_bucket.clone(), address: "127.0.0.1".to_string(), port: 3001, - kvstorage_type, - sqlite: sqlite_config, - postgres: postgres_config, - locks_type: s3dedup::locks::LocksType::Memory, s3storage_type: s3dedup::s3storage::S3StorageType::MinIO, minio: Some(MinIOConfig { endpoint: "http://localhost:9000".to_string(), @@ -87,14 +85,26 @@ async fn create_test_app_with_state() -> (Router, Arc) { filetracker_v1_dir: None, }; + let config = Config { + logging: s3dedup::logging::LoggingConfig { + level: "info".to_string(), + json: false, + }, + kvstorage_type, + sqlite: sqlite_config, + postgres: postgres_config, + locks_type: s3dedup::locks::LocksType::Memory, + bucket: bucket_config, + }; + let kvstorage = KVStorage::new(&config).await.unwrap(); let locks = LocksStorage::new_with_config(config.locks_type, &config) 
.await .unwrap(); - let s3storage = S3Storage::new(&config).await.unwrap(); + let s3storage = S3Storage::new(&config.bucket).await.unwrap(); let app_state = Arc::new(AppState { - bucket_name: config.name, + bucket_name: config.bucket.name.clone(), kvstorage: Arc::new(Mutex::new(kvstorage)), locks, s3storage: Arc::new(Mutex::new(s3storage)), diff --git a/tests/metrics_test.rs b/tests/metrics_test.rs index b04ed88..3ea8974 100644 --- a/tests/metrics_test.rs +++ b/tests/metrics_test.rs @@ -3,7 +3,7 @@ use axum::body::Body; use axum::http::{Request, StatusCode}; use axum::routing::get; use s3dedup::AppState; -use s3dedup::config::BucketConfig; +use s3dedup::config::{BucketConfig, Config}; use std::sync::Arc; use tower::util::ServiceExt; @@ -22,17 +22,10 @@ async fn create_test_app_state() -> Arc { std::fs::create_dir_all("db").ok(); - let config = BucketConfig { + let bucket_config = BucketConfig { name: test_bucket, address: "127.0.0.1".to_string(), port: 3000, - kvstorage_type: s3dedup::config::KVStorageType::SQLite, - sqlite: Some(s3dedup::config::SQLiteConfig { - path: format!("db/test-metrics-{}.db", unique_id), - pool_size: 10, - }), - postgres: None, - locks_type: s3dedup::config::LocksType::Memory, s3storage_type: s3dedup::config::S3StorageType::MinIO, minio: Some(s3dedup::config::MinIOConfig { endpoint: "http://localhost:9000".to_string(), @@ -45,6 +38,21 @@ async fn create_test_app_state() -> Arc { filetracker_v1_dir: None, }; + let config = Config { + logging: s3dedup::logging::LoggingConfig { + level: "info".to_string(), + json: false, + }, + kvstorage_type: s3dedup::config::KVStorageType::SQLite, + sqlite: Some(s3dedup::config::SQLiteConfig { + path: format!("db/test-metrics-{}.db", unique_id), + pool_size: 10, + }), + postgres: None, + locks_type: s3dedup::config::LocksType::Memory, + bucket: bucket_config, + }; + let app_state = AppState::new(&config).await.unwrap(); app_state.kvstorage.lock().await.setup().await.unwrap(); @@ -256,17 +264,10 @@ async fn 
test_migration_active_metric() { std::fs::create_dir_all("db").ok(); // Create app with filetracker client (migration mode) - let config = BucketConfig { + let bucket_config = BucketConfig { name: test_bucket, address: "127.0.0.1".to_string(), port: 3000, - kvstorage_type: s3dedup::config::KVStorageType::SQLite, - sqlite: Some(s3dedup::config::SQLiteConfig { - path: format!("db/test-migration-{}.db", unique_id), - pool_size: 10, - }), - postgres: None, - locks_type: s3dedup::config::LocksType::Memory, s3storage_type: s3dedup::config::S3StorageType::MinIO, minio: Some(s3dedup::config::MinIOConfig { endpoint: "http://localhost:9000".to_string(), @@ -279,6 +280,21 @@ async fn test_migration_active_metric() { filetracker_v1_dir: None, }; + let config = Config { + logging: s3dedup::logging::LoggingConfig { + level: "info".to_string(), + json: false, + }, + kvstorage_type: s3dedup::config::KVStorageType::SQLite, + sqlite: Some(s3dedup::config::SQLiteConfig { + path: format!("db/test-migration-{}.db", unique_id), + pool_size: 10, + }), + postgres: None, + locks_type: s3dedup::config::LocksType::Memory, + bucket: bucket_config, + }; + let app_state = AppState::new_with_filetracker(&config, "http://localhost:8000".to_string()) .await .unwrap(); diff --git a/tests/migration_test.rs b/tests/migration_test.rs index 4d36ce9..9ff11ec 100644 --- a/tests/migration_test.rs +++ b/tests/migration_test.rs @@ -3,7 +3,7 @@ use axum::body::Body; use axum::http::{Response, StatusCode}; use axum::routing::get; use s3dedup::AppState; -use s3dedup::config::BucketConfig; +use s3dedup::config::{BucketConfig, Config}; use s3dedup::filetracker_client::FiletrackerClient; use s3dedup::migration::migrate_all_files; use std::collections::HashMap; @@ -184,17 +184,10 @@ async fn create_test_app_state() -> Arc { let test_db = format!("db/test_migration_{}.db", unique_id); let test_bucket = format!("test-migration-{}", unique_id.to_lowercase()); - let config = BucketConfig { + let bucket_config = 
BucketConfig { name: test_bucket.clone(), address: "127.0.0.1".to_string(), port: 3001, - kvstorage_type: KVStorageType::SQLite, - sqlite: Some(SQLiteConfig { - path: test_db.clone(), - pool_size: 50, - }), - postgres: None, - locks_type: s3dedup::locks::LocksType::Memory, s3storage_type: s3dedup::s3storage::S3StorageType::MinIO, minio: Some(MinIOConfig { endpoint: "http://localhost:9000".to_string(), @@ -207,14 +200,29 @@ async fn create_test_app_state() -> Arc { filetracker_v1_dir: None, }; + let config = Config { + logging: s3dedup::logging::LoggingConfig { + level: "info".to_string(), + json: false, + }, + kvstorage_type: KVStorageType::SQLite, + sqlite: Some(SQLiteConfig { + path: test_db.clone(), + pool_size: 50, + }), + postgres: None, + locks_type: s3dedup::locks::LocksType::Memory, + bucket: bucket_config, + }; + let kvstorage = KVStorage::new(&config).await.unwrap(); let locks = LocksStorage::new_with_config(config.locks_type, &config) .await .unwrap(); - let s3storage = S3Storage::new(&config).await.unwrap(); + let s3storage = S3Storage::new(&config.bucket).await.unwrap(); let app_state = Arc::new(AppState { - bucket_name: config.name, + bucket_name: config.bucket.name.clone(), kvstorage: Arc::new(tokio::sync::Mutex::new(kvstorage)), locks, s3storage: Arc::new(tokio::sync::Mutex::new(s3storage)), diff --git a/tests/postgres_locks_test.rs b/tests/postgres_locks_test.rs index c3bac40..18a162f 100644 --- a/tests/postgres_locks_test.rs +++ b/tests/postgres_locks_test.rs @@ -7,20 +7,37 @@ mod postgres_locks_tests { //! //! NOTE: These tests require a running PostgreSQL instance with the DATABASE_URL environment variable set. //! If DATABASE_URL is not set, the tests are skipped. 
- use s3dedup::config::{BucketConfig, KVStorageType, MinIOConfig, PostgresConfig}; + use s3dedup::config::{BucketConfig, Config, KVStorageType, MinIOConfig, PostgresConfig}; use s3dedup::locks::{LocksStorage, LocksType}; use std::sync::Arc; - fn get_postgres_config() -> Option { + fn get_postgres_config() -> Option { // Only run PostgreSQL tests if DATABASE_URL is set if std::env::var("DATABASE_URL").is_err() { return None; } - Some(BucketConfig { + let bucket_config = BucketConfig { name: "test-postgres-locks".to_string(), address: "127.0.0.1".to_string(), port: 3001, + s3storage_type: s3dedup::s3storage::S3StorageType::MinIO, + minio: Some(MinIOConfig { + endpoint: "http://localhost:9000".to_string(), + access_key: "minioadmin".to_string(), + secret_key: "minioadmin".to_string(), + force_path_style: true, + }), + cleaner: s3dedup::cleaner::CleanerConfig::default(), + filetracker_url: None, + filetracker_v1_dir: None, + }; + + Some(Config { + logging: s3dedup::logging::LoggingConfig { + level: "info".to_string(), + json: false, + }, kvstorage_type: KVStorageType::Postgres, sqlite: None, postgres: Some(PostgresConfig { @@ -32,16 +49,7 @@ mod postgres_locks_tests { pool_size: 10, }), locks_type: LocksType::Postgres, - s3storage_type: s3dedup::s3storage::S3StorageType::MinIO, - minio: Some(MinIOConfig { - endpoint: "http://localhost:9000".to_string(), - access_key: "minioadmin".to_string(), - secret_key: "minioadmin".to_string(), - force_path_style: true, - }), - cleaner: s3dedup::cleaner::CleanerConfig::default(), - filetracker_url: None, - filetracker_v1_dir: None, + bucket: bucket_config, }) } @@ -86,7 +94,10 @@ mod postgres_locks_tests { // First exclusive lock should acquire successfully let lock1 = locks.prepare_lock(lock_key.clone()).await; - let guard1 = lock1.acquire_exclusive().await; + let guard1 = lock1 + .acquire_exclusive() + .await + .expect("Should acquire first exclusive lock"); // Spawn a task to try to acquire the same lock let locks_for_task = 
locks.clone(); @@ -95,8 +106,12 @@ mod postgres_locks_tests { let task = tokio::spawn(async move { // This should block until guard1 is released let lock2 = locks_for_task.prepare_lock(lock_key_clone).await; - let _guard2 = lock2.acquire_exclusive().await; + let guard2 = lock2 + .acquire_exclusive() + .await + .expect("Should acquire second exclusive lock"); // If we get here, the lock was acquired (after guard1 was dropped) + let _ = guard2.release().await; true }); @@ -109,8 +124,8 @@ mod postgres_locks_tests { "Lock should be held and task should be waiting" ); - // Drop the first lock - drop(guard1); + // Release the first lock explicitly (required for PostgreSQL locks) + let _ = guard1.release().await; // Now the task should be able to acquire the lock let result = tokio::time::timeout(std::time::Duration::from_secs(5), task) @@ -145,12 +160,18 @@ mod postgres_locks_tests { let lock1 = locks.prepare_lock(lock_key.clone()).await; let lock2 = locks.prepare_lock(lock_key.clone()).await; - let guard1 = lock1.acquire_shared().await; - let guard2 = lock2.acquire_shared().await; + let guard1 = lock1 + .acquire_shared() + .await + .expect("Should acquire shared lock"); + let guard2 = lock2 + .acquire_shared() + .await + .expect("Should acquire shared lock"); // Both guards are held - this should not deadlock - drop(guard1); - drop(guard2); + let _ = guard1.release().await; + let _ = guard2.release().await; } #[tokio::test] @@ -175,7 +196,10 @@ mod postgres_locks_tests { // Acquire an exclusive lock let lock1 = locks.prepare_lock(lock_key.clone()).await; - let guard1 = lock1.acquire_exclusive().await; + let guard1 = lock1 + .acquire_exclusive() + .await + .expect("Should acquire exclusive lock"); // Try to acquire a shared lock in another task let locks_clone = locks.clone(); @@ -183,7 +207,11 @@ mod postgres_locks_tests { let task = tokio::spawn(async move { let lock2 = locks_clone.prepare_lock(lock_key_clone).await; - let _guard2 = lock2.acquire_shared().await; + let 
guard2 = lock2 + .acquire_shared() + .await + .expect("Should acquire shared lock"); + let _ = guard2.release().await; true }); @@ -196,8 +224,8 @@ mod postgres_locks_tests { "Shared lock should be blocked by exclusive lock" ); - // Drop the exclusive lock - drop(guard1); + // Release the exclusive lock explicitly + let _ = guard1.release().await; // Now the task should be able to acquire the shared lock let result = tokio::time::timeout(std::time::Duration::from_secs(5), task) @@ -235,14 +263,20 @@ mod postgres_locks_tests { let lock1 = locks.prepare_lock(lock_key1).await; let lock2 = locks.prepare_lock(lock_key2).await; - let guard1 = lock1.acquire_exclusive().await; + let guard1 = lock1 + .acquire_exclusive() + .await + .expect("Should acquire exclusive lock"); // Should be able to acquire exclusive lock on different key immediately - let guard2 = lock2.acquire_exclusive().await; + let guard2 = lock2 + .acquire_exclusive() + .await + .expect("Should acquire exclusive lock"); // Both locks should be held independently - drop(guard1); - drop(guard2); + let _ = guard1.release().await; + let _ = guard2.release().await; } #[tokio::test] @@ -268,18 +302,25 @@ mod postgres_locks_tests { // Acquire and release lock in a scope { let lock1 = locks.prepare_lock(lock_key.clone()).await; - let _guard1 = lock1.acquire_exclusive().await; - // Guard is dropped here + let guard1 = lock1 + .acquire_exclusive() + .await + .expect("Should acquire lock"); + // Explicitly release before scope ends + let _ = guard1.release().await; } - // Give time for the background task to release the lock - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Give time for the connection to be returned to the pool + tokio::time::sleep(std::time::Duration::from_millis(50)).await; // Should be able to acquire the lock immediately let lock2 = locks.prepare_lock(lock_key.clone()).await; - let guard2 = lock2.acquire_exclusive().await; + let guard2 = lock2 + .acquire_exclusive() + .await 
+ .expect("Should acquire lock after release"); // If we get here, the lock was successfully released and reacquired - drop(guard2); + let _ = guard2.release().await; } }