diff --git a/packages/edge/infra/client/manager/Cargo.toml b/packages/edge/infra/client/manager/Cargo.toml
index 7607d187d1..e23ac05b56 100644
--- a/packages/edge/infra/client/manager/Cargo.toml
+++ b/packages/edge/infra/client/manager/Cargo.toml
@@ -30,7 +30,6 @@ rand = "0.8"
 rand_chacha = "0.3.1"
 reqwest = { version = "0.12", default-features = false, features = ["stream", "rustls-tls", "json"] }
 rivet-logs.workspace = true
-scc = "2.3.4"
 serde = { version = "1.0.195", features = ["derive"] }
 serde_json = "1.0.111"
 serde_yaml = "0.9.34"
diff --git a/packages/edge/infra/client/manager/src/ctx.rs b/packages/edge/infra/client/manager/src/ctx.rs
index faaa8028dd..b8d7b3ff39 100644
--- a/packages/edge/infra/client/manager/src/ctx.rs
+++ b/packages/edge/infra/client/manager/src/ctx.rs
@@ -975,8 +975,7 @@ impl Ctx {
 		// Delete entries that aren't in our valid images table
 		let deleted = sqlx::query(indoc!(
 			"
-			DELETE
-			FROM images_cache
+			DELETE FROM images_cache
 			WHERE image_id NOT IN (
 				SELECT image_id FROM __valid_images
 			)
diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs b/packages/edge/infra/client/manager/src/image_download_handler.rs
index 0a8e051dbd..9ea90150c9 100644
--- a/packages/edge/infra/client/manager/src/image_download_handler.rs
+++ b/packages/edge/infra/client/manager/src/image_download_handler.rs
@@ -1,7 +1,9 @@
 use std::{
+	collections::{hash_map::Entry, HashMap},
 	hash::{DefaultHasher, Hasher},
 	io::ErrorKind,
 	result::Result::Ok,
+	sync::Arc,
 	time::Instant,
 };
 
@@ -10,10 +12,8 @@ use indoc::indoc;
 use pegboard::protocol;
 use rand::{prelude::SliceRandom, SeedableRng};
 use rand_chacha::ChaCha12Rng;
-use scc::hash_map::Entry;
 use sqlx::Acquire;
-use tokio::fs;
-use tokio::process::Command;
+use tokio::{fs, process::Command, sync::Mutex};
 use url::Url;
 use uuid::Uuid;
 
@@ -23,249 +23,302 @@ use crate::{metrics, pull_addr_handler::PullAddrHandler, utils, Ctx};
 /// it exists.
 pub struct ImageDownloadHandler {
 	pull_addr_handler: PullAddrHandler,
-	// This is not a Set because it uses SCC's entry locking capability to function.
-	downloads: scc::HashMap<Uuid, ()>,
+	downloads: Mutex<HashMap<Uuid, Arc<Mutex<bool>>>>,
 }
 
 impl ImageDownloadHandler {
	pub fn new() -> Self {
 		ImageDownloadHandler {
 			pull_addr_handler: PullAddrHandler::new(),
-			downloads: scc::HashMap::new(),
+			downloads: Mutex::new(HashMap::new()),
 		}
 	}
 
 	pub async fn download(&self, ctx: &Ctx, image_config: &protocol::Image) -> Result<()> {
 		metrics::IMAGE_DOWNLOAD_REQUEST_TOTAL.inc();
 
-		match self.downloads.entry_async(image_config.id).await {
-			// The image download started at some point in the current runtime and finished downloading
-			Entry::Occupied(_) => {
-				tracing::debug!(image_id=?image_config.id, "image already downloaded");
+		let mut downloads_guard = self.downloads.lock().await;
 
-				// Update LRU cache
-				sqlx::query(indoc!(
-					"
-					UPDATE images_cache
-					SET last_used_ts = ?2
-					WHERE image_id = ?1
-					",
-				))
-				.bind(image_config.id)
-				.bind(utils::now())
-				.execute(&mut *ctx.sql().await?)
-				.await?;
-			}
+		let entry_lock = match downloads_guard.entry(image_config.id) {
+			// The image download started at some point in the current runtime
+			Entry::Occupied(entry) => entry.get().clone(),
 			// The image is not currently being downloaded
 			Entry::Vacant(entry) => {
-				// Check database for image
-				let row = sqlx::query_as::<_, (i64,)>(indoc!(
-					"
-					SELECT 1
-					FROM images_cache
-					WHERE image_id = ?1 AND download_complete_ts IS NOT NULL
-					",
-				))
-				.bind(image_config.id)
-				.fetch_optional(&mut *ctx.sql().await?)
-				.await?;
-
-				// Image exists and is downloaded
-				if row.is_some() {
-					tracing::debug!(image_id=?image_config.id, "image already downloaded");
-					return Ok(());
-				}
+				let entry = entry.insert_entry(Arc::new(Mutex::new(false)));
 
-				// Image does not exist/wasn't fully downloaded and isn't currently downloading, continue
-				metrics::IMAGE_DOWNLOAD_CACHE_MISS_TOTAL.inc();
+				entry.get().clone()
+			}
+		};
 
-				let start_instant = Instant::now();
-				tracing::info!(image_id=?image_config.id, "downloading image");
+		drop(downloads_guard);
 
-				let image_path = ctx.image_path(image_config.id);
+		let mut entry_guard = entry_lock.lock().await;
 
-				// Clear any previous content and make image dir
-				match fs::remove_dir_all(&image_path).await {
-					Err(e) if e.kind() == ErrorKind::NotFound => {}
-					res => res.context("failed to delete image dir")?,
-				}
-				fs::create_dir(&image_path)
-					.await
-					.context("failed to create image dir")?;
-
-				// NOTE: Txn here so that we prune and insert the new image row at the same time. This ensures
-				// if another image is downloading concurrently that it will calculate the correct images
-				// dir size.
-				let mut conn = ctx.sql().await?;
-				let mut tx = conn.begin().await?;
-
-				let ((cache_count, images_dir_size), image_download_size) = tokio::try_join!(
-					async {
-						// Get total size of images directory. Note that it doesn't matter if this doesn't
-						// match the actual fs size because it should either be exactly at or below actual fs
-						// size. Also calculating fs size manually is expensive.
-						sqlx::query_as::<_, (i64, i64)>(indoc!(
-							"
-							SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache
-							",
-						))
-						.fetch_one(&mut *tx)
-						.await
-						.map_err(Into::<anyhow::Error>::into)
-					},
-					// NOTE: The image size here is somewhat misleading because its only the size of the
-					// downloaded archive and not the total disk usage after it is unpacked. However, this is
-					// good enough
-					self.fetch_image_download_size(ctx, image_config),
-				)?;
-
-				// Prune images
-				let (removed_count, removed_bytes) = if images_dir_size as u64 + image_download_size
-					> ctx.config().images.max_cache_size()
-				{
-					// Fetch as many images as it takes to clear up enough space for this new image.
-					// Ordered by LRU
-					let rows = sqlx::query_as::<_, (Uuid, i64)>(indoc!(
-						"
-						WITH
-							cumulative_sizes AS (
-								SELECT
-									ic.image_id,
-									ic.size,
-									ic.last_used_ts,
-									SUM(ic.size)
-									OVER (ORDER BY ic.last_used_ts ROWS UNBOUNDED PRECEDING)
-									AS running_total
-								FROM images_cache AS ic
-								LEFT JOIN actors AS a
-								-- Filter out images that are currently in use by actors
-								ON
-									ic.image_id = a.image_id AND
-									a.stop_ts IS NULL
-								WHERE
-									-- Filter out current image, will be upserted
-									ic.image_id != ?1 AND
-									a.image_id IS NULL
-								ORDER BY ic.last_used_ts
-							)
-						SELECT image_id, size
-						FROM cumulative_sizes
-						WHERE running_total - size < ?2
-						ORDER BY last_used_ts
-						",
-					))
-					.bind(image_config.id)
-					.bind(
-						(images_dir_size as u64)
-							.saturating_add(image_download_size)
-							.saturating_sub(ctx.config().images.max_cache_size()) as i64,
-					)
-					.fetch_all(&mut *tx)
-					.await?;
-
-					let rows_len = rows.len();
-
-					if rows.is_empty() {
-						tracing::error!(
-							image_id=?image_config.id,
-							"no inactive images to delete to make space for new image, downloading anyway",
-						);
-					} else {
-						tracing::debug!(count=?rows_len, "cache full, clearing LRU entries");
-					}
+		// The image was successfully downloaded before
+		if *entry_guard {
+			tracing::debug!(image_id=?image_config.id, "image already downloaded");
 
-					let mut total_removed_bytes = 0;
+			drop(entry_guard);
 
-					for (image_id, size) in rows {
-						total_removed_bytes += size;
+			// Update LRU cache
+			sqlx::query(indoc!(
+				"
+				UPDATE images_cache
+				SET last_used_ts = ?2
+				WHERE image_id = ?1
+				",
+			))
+			.bind(image_config.id)
+			.bind(utils::now())
+			.execute(&mut *ctx.sql().await?)
+			.await?;
 
-						// NOTE: The sql query does not return the current image id so there is no chance
-						// for a deadlock here
-						// Acquire lock on entry
-						let entry = self.downloads.entry_async(image_id).await;
+			return Ok(());
+		}
 
-						match fs::remove_dir_all(ctx.image_path(image_id)).await {
-							Err(e) if e.kind() == ErrorKind::NotFound => {}
-							res => res.context("failed to delete image dir")?,
-						}
+		// Check database for image
+		let row = sqlx::query_as::<_, (i64,)>(indoc!(
+			"
+			SELECT 1
+			FROM images_cache
+			WHERE image_id = ?1 AND download_complete_ts IS NOT NULL
+			",
+		))
+		.bind(image_config.id)
+		.fetch_optional(&mut *ctx.sql().await?)
+		.await?;
+
+		// Image exists and is downloaded
+		if row.is_some() {
+			tracing::debug!(image_id=?image_config.id, "image already downloaded");
+
+			*entry_guard = true;
+
+			return Ok(());
+		}
 
-						// Remove entry and release lock
-						if let Entry::Occupied(entry) = entry {
-							let _ = entry.remove();
-						}
-					}
+		// Image does not exist/wasn't fully downloaded and isn't currently downloading, continue
+		metrics::IMAGE_DOWNLOAD_CACHE_MISS_TOTAL.inc();
 
-					(rows_len as i64, total_removed_bytes as i64)
-				} else {
-					(0, 0)
-				};
+		let start_instant = Instant::now();
+		tracing::info!(image_id=?image_config.id, "downloading image");
 
-				metrics::IMAGE_CACHE_COUNT.set(cache_count + 1 - removed_count);
-				metrics::IMAGE_CACHE_SIZE
-					.set(images_dir_size + image_download_size as i64 - removed_bytes);
+		let image_path = ctx.image_path(image_config.id);
 
-				sqlx::query(indoc!(
-					"
-					INSERT OR REPLACE INTO images_cache (image_id, size, last_used_ts, download_complete_ts)
-					VALUES (?1, 0, ?2, NULL)
-					",
-				))
-				.bind(image_config.id)
-				.bind(utils::now())
-				.execute(&mut *tx)
-				.await?;
+		// Clear any previous content and make image dir
+		match fs::remove_dir_all(&image_path).await {
+			Err(e) if e.kind() == ErrorKind::NotFound => {}
+			res => res.context("failed to delete image dir")?,
+		}
+		fs::create_dir(&image_path)
+			.await
+			.context("failed to create image dir")?;
+
+		// NOTE: Txn here so that we prune and insert the new image row at the same time. This ensures
+		// if another image is downloading concurrently that it will calculate the correct images
+		// dir size.
+		let mut conn = ctx.sql().await?;
+		let mut tx = conn.begin().await?;
+
+		let ((cache_count, images_dir_size), image_download_size) = tokio::try_join!(
+			async {
+				// Get total size of images directory. Note that it doesn't matter if this doesn't
+				// match the actual fs size because it should either be exactly at or below actual fs
+				// size. Also calculating fs size manually is expensive.
+				sqlx::query_as::<_, (i64, i64)>(indoc!(
+					"
+					SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache
+					",
+				))
+				.fetch_one(&mut *tx)
+				.await
+				.map_err(Into::<anyhow::Error>::into)
+			},
+			// NOTE: The image size here is somewhat misleading because it's only the size of the
+			// downloaded archive and not the total disk usage after it is unpacked. However, this is
+			// good enough
+			self.fetch_image_download_size(ctx, image_config),
+		)?;
+
+		// Prune images
+		let (removed_count, removed_bytes) = if images_dir_size as u64 + image_download_size
+			> ctx.config().images.max_cache_size()
+		{
+			// Fetch as many images as it takes to clear up enough space for this new image.
+			// Ordered by LRU
+			let rows = sqlx::query_as::<_, (Uuid, i64)>(indoc!(
+				"
+				WITH
+					cumulative_sizes AS (
+						SELECT
+							ic.image_id,
+							ic.size,
+							ic.last_used_ts,
+							SUM(ic.size)
+							OVER (ORDER BY ic.last_used_ts ROWS UNBOUNDED PRECEDING)
+							AS running_total
+						FROM images_cache AS ic
+						LEFT JOIN actors AS a
+						-- Filter out images that are currently in use by actors
+						ON
+							ic.image_id = a.image_id AND
+							a.stop_ts IS NULL AND
+							a.exit_ts IS NULL
+						WHERE
+							-- Filter out current image, will be upserted
+							ic.image_id != ?1 AND
+							a.image_id IS NULL
+						ORDER BY ic.last_used_ts
+					)
+				DELETE FROM images_cache
+				WHERE image_id IN (
+					SELECT image_id
+					FROM cumulative_sizes
+					WHERE running_total - size < ?2
+					ORDER BY last_used_ts
+				)
+				RETURNING image_id, size
+				",
+			))
+			.bind(image_config.id)
+			.bind(
+				(images_dir_size as u64)
+					.saturating_add(image_download_size)
+					.saturating_sub(ctx.config().images.max_cache_size()) as i64,
+			)
+			.fetch_all(&mut *tx)
+			.await?;
 
-				tx.commit().await?;
+			let rows_len = rows.len();
 
-				// Release lock on sqlite pool
-				drop(conn);
+			if rows.is_empty() {
+				tracing::error!(
+					image_id=?image_config.id,
+					"no inactive images to delete to make space for new image, downloading anyway",
+				);
+			} else {
+				tracing::debug!(image_id=?image_config.id, count=?rows_len, "cache full, clearing LRU entries");
+			}
 
-				// Download image & compute size
-				//
-				// `image_size` is a slight over-estimate of the image size, since this is
-				// counting the number of bytes read from the tar. This is fine since
-				// over-estimating is safe for caching.
-				let download_start_instant = Instant::now();
-				let image_size = self.download_inner(ctx, image_config).await?;
-				let download_duration = download_start_instant.elapsed().as_secs_f64();
+			let mut total_removed_bytes = 0;
 
-				let convert_start_instant = Instant::now();
-				self.convert(ctx, image_config).await?;
-				let convert_duration = convert_start_instant.elapsed().as_secs_f64();
+			for (image_id, size) in rows {
+				total_removed_bytes += size;
 
-				// Update metrics after unpacking
-				metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes);
+				// NOTE: The sql query does not return the current image id so there is no chance
+				// for a deadlock here
+				// Acquire lock on entry
+				let mut downloads_guard = self.downloads.lock().await;
 
-				// Update state to signify download completed successfully
-				let foo = sqlx::query(indoc!(
-					"
-					UPDATE images_cache
-					SET
-						download_complete_ts = ?2,
-						size = ?3
-					WHERE image_id = ?1
-					",
-				))
-				.bind(image_config.id)
-				.bind(utils::now())
-				.bind(image_size as i64)
-				.execute(&mut *ctx.sql().await?)
-				.await?;
-
-				let total_duration = start_instant.elapsed().as_secs_f64();
-				crate::metrics::DOWNLOAD_IMAGE_DURATION.observe(total_duration);
-				tracing::info!(
-					total_duration,
-					download_duration,
-					convert_duration,
-					"image download completed"
-				);
+				match fs::remove_dir_all(ctx.image_path(image_id)).await {
+					Err(e) if e.kind() == ErrorKind::NotFound => {}
+					res => res.context("failed to delete image dir")?,
+				}
 
-				// The lock on entry is held until this point. After this any other parallel downloaders will
-				// continue with the image already downloaded
-				entry.insert_entry(());
+				// Remove entry and release lock
+				downloads_guard.remove(&image_id);
 			}
-		}
+
+			(rows_len as i64, total_removed_bytes as i64)
+		} else {
+			(0, 0)
+		};
+
+		metrics::IMAGE_CACHE_COUNT.set(cache_count + 1 - removed_count);
+		metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_download_size as i64 - removed_bytes);
+
+		sqlx::query(indoc!(
+			"
+			INSERT OR REPLACE INTO images_cache (image_id, size, last_used_ts, download_complete_ts)
+			VALUES (?1, 0, ?2, NULL)
+			",
+		))
+		.bind(image_config.id)
+		.bind(utils::now())
+		.execute(&mut *tx)
+		.await?;
+
+		tx.commit().await?;
+
+		// Release lock on sqlite pool
+		drop(conn);
+
+		// Download image & compute size
+		//
+		// `image_size` is a slight over-estimate of the image size, since this is
+		// counting the number of bytes read from the tar. This is fine since
+		// over-estimating is safe for caching.
+		let download_start_instant = Instant::now();
+		let image_size = self.download_inner(ctx, image_config).await?;
+		let download_duration = download_start_instant.elapsed().as_secs_f64();
+
+		let convert_start_instant = Instant::now();
+		self.convert(ctx, image_config).await?;
+		let convert_duration = convert_start_instant.elapsed().as_secs_f64();
+
+		// Update metrics after unpacking
+		metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes);
+
+		// Update state to signify download completed successfully
+		sqlx::query(indoc!(
+			"
+			UPDATE images_cache
+			SET
+				download_complete_ts = ?2,
+				size = ?3
+			WHERE image_id = ?1
+			",
+		))
+		.bind(image_config.id)
+		.bind(utils::now())
+		.bind(image_size as i64)
+		.execute(&mut *ctx.sql().await?)
+		.await?;
+
+		let total_duration = start_instant.elapsed().as_secs_f64();
+		crate::metrics::DOWNLOAD_IMAGE_DURATION.observe(total_duration);
+		tracing::info!(
+			total_duration,
+			download_duration,
+			convert_duration,
+			"image download completed"
+		);
+
+		// The lock on entry is held until this point. After this any other parallel downloaders will
+		// continue with the image already downloaded
+		*entry_guard = true;
 
 		Ok(())
 	}
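Reviewer note: the heart of this change is swapping scc's `entry_async` entry locking for a two-level `tokio::sync::Mutex` scheme. The sketch below distills just that pattern and is a minimal illustration rather than the crate's real API: `Downloads` stands in for `ImageDownloadHandler`, `or_insert_with` replaces the diff's explicit `Entry` match, and a sleep stands in for the actual fetch/unpack work.

```rust
use std::{collections::HashMap, sync::Arc, time::Duration};

use tokio::sync::Mutex;
use uuid::Uuid;

/// Image id -> per-image lock. The inner `bool` records whether the
/// download already completed during this runtime.
struct Downloads {
	inner: Mutex<HashMap<Uuid, Arc<Mutex<bool>>>>,
}

impl Downloads {
	async fn download(&self, image_id: Uuid) {
		// Lock the whole map only long enough to fetch-or-insert this
		// image's entry; downloads of unrelated images are never blocked.
		let entry_lock = {
			let mut map = self.inner.lock().await;
			map.entry(image_id)
				.or_insert_with(|| Arc::new(Mutex::new(false)))
				.clone()
		}; // map guard drops here

		// Hold only this image's lock for the duration of the download.
		// A concurrent caller for the same image parks on this line.
		let mut done = entry_lock.lock().await;
		if *done {
			return; // already downloaded, nothing to do
		}

		// ... the actual fetch/unpack would happen here ...
		tokio::time::sleep(Duration::from_millis(10)).await;

		// Flip the flag while still holding the entry lock; waiters wake,
		// observe `true`, and return without downloading again.
		*done = true;
	}
}

#[tokio::main]
async fn main() {
	let downloads = Arc::new(Downloads {
		inner: Mutex::new(HashMap::new()),
	});
	let id = Uuid::new_v4();

	// Two concurrent requests for the same image: one downloads, the other waits.
	tokio::join!(downloads.download(id), downloads.download(id));
}
```

The `bool` behind the per-image lock is what lets late arrivals tell "finished" apart from "in flight": while a download runs they block on `entry_lock.lock()`, and once it completes they observe `true` and return early, which is exactly the fast path the rewritten `download` takes before falling back to the `images_cache` table.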
diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs
index 01a8cee22d..845f9eebe0 100644
--- a/packages/edge/infra/client/manager/src/utils/mod.rs
+++ b/packages/edge/infra/client/manager/src/utils/mod.rs
@@ -15,8 +15,11 @@ use pegboard_config::Config;
 use sql::SqlitePoolExt;
 use sqlx::{
 	migrate::MigrateDatabase,
-	sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqlitePoolOptions, SqliteSynchronous},
-	Executor, Sqlite, SqlitePool,
+	sqlite::{
+		SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteLockingMode,
+		SqlitePoolOptions, SqliteSynchronous,
+	},
+	Sqlite, SqlitePool,
 };
 use tokio::{
 	fs,
@@ -111,24 +114,19 @@ pub async fn init_sqlite_db(config: &Config) -> Result<SqlitePool> {
 async fn build_sqlite_pool(db_url: &str) -> Result<SqlitePool> {
 	let opts = db_url
 		.parse::<SqliteConnectOptions>()?
-		// Set synchronous mode to NORMAL for performance and data safety balance
-		.synchronous(SqliteSynchronous::Normal)
 		// Set busy timeout to 5 seconds to avoid "database is locked" errors
 		.busy_timeout(Duration::from_secs(5))
 		// Enable foreign key constraint enforcement
 		.foreign_keys(true)
 		// Enable auto vacuuming and set it to incremental mode for gradual space reclaiming
-		.auto_vacuum(SqliteAutoVacuum::Incremental);
+		.auto_vacuum(SqliteAutoVacuum::Incremental)
+		// Set synchronous mode to NORMAL for performance and data safety balance
+		.synchronous(SqliteSynchronous::Normal)
+		// Increases write performance
+		.journal_mode(SqliteJournalMode::Wal)
+		.locking_mode(SqliteLockingMode::Normal);
 
 	let pool = SqlitePoolOptions::new()
-		.after_connect(|conn, _meta| {
-			Box::pin(async move {
-				// NOTE: sqlx doesn't seem to have a WAL2 option so we set it with a PRAGMA query
-				conn.execute("PRAGMA journal_mode = WAL2").await?;
-
-				Ok(())
-			})
-		})
 		// Open connection immediately on startup
 		.min_connections(1)
 		.connect_with(opts)
@@ -212,7 +210,7 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
 	CREATE TABLE IF NOT EXISTS actors (
 		actor_id BLOB NOT NULL, -- UUID
 		generation INTEGER NOT NULL,
-		config BLOB NOT NULL,
+		config BLOB NOT NULL, -- JSONB
 		start_ts INTEGER NOT NULL,
 		running_ts INTEGER,
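One more note on the `utils/mod.rs` hunk above: `WAL2` only exists in a separate SQLite branch, so the old `after_connect` hook's `PRAGMA journal_mode = WAL2` was a silent no-op on the mainline SQLite that ships with sqlx. Plain WAL is supported everywhere and has a typed option, which is why the hook can be dropped entirely. Below is a self-contained sketch of the resulting pool setup, assuming sqlx 0.7+ with the `sqlite` and `runtime-tokio` features; the `manager.db` path and the closing `PRAGMA journal_mode` sanity check are illustrative additions, not part of this diff.

```rust
use std::{str::FromStr, time::Duration};

use sqlx::sqlite::{
	SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteLockingMode,
	SqlitePoolOptions, SqliteSynchronous,
};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
	let opts = SqliteConnectOptions::from_str("sqlite://manager.db")?
		.create_if_missing(true)
		// Avoid "database is locked" errors under concurrent writers
		.busy_timeout(Duration::from_secs(5))
		.foreign_keys(true)
		.auto_vacuum(SqliteAutoVacuum::Incremental)
		// NORMAL is safe in WAL mode and much faster than FULL
		.synchronous(SqliteSynchronous::Normal)
		// WAL pairs with NORMAL locking; EXCLUSIVE would hold the file for a
		// single connection and starve the rest of the pool
		.journal_mode(SqliteJournalMode::Wal)
		.locking_mode(SqliteLockingMode::Normal);

	let pool = SqlitePoolOptions::new()
		.min_connections(1)
		.connect_with(opts)
		.await?;

	// Sanity check: confirm the journal mode actually took effect
	let (mode,): (String,) = sqlx::query_as("PRAGMA journal_mode")
		.fetch_one(&pool)
		.await?;
	assert_eq!(mode, "wal");
	Ok(())
}
```

Design-wise this mirrors the diff: every `PRAGMA` the old code issued per-connection is now encoded once in `SqliteConnectOptions`, so each pooled connection comes up configured identically with no `after_connect` callback to keep in sync.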