From 310907d238a2517292ef9d1be8888099670cd385 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 13 Jun 2025 20:43:02 +0000 Subject: [PATCH] chore(pegboard): fix local cache bugs --- .../infra/client/manager/src/actor/mod.rs | 3 +- .../infra/client/manager/src/actor/setup.rs | 6 ++-- .../manager/src/image_download_handler.rs | 4 +-- .../edge/infra/client/manager/src/main.rs | 30 +++++++++++++++++-- .../infra/client/manager/src/utils/mod.rs | 29 ++++++++++++++---- .../manager/tests/container_lifecycle.rs | 5 +--- .../client/manager/tests/isolate_lifecycle.rs | 13 +++++--- 7 files changed, 69 insertions(+), 21 deletions(-) diff --git a/packages/edge/infra/client/manager/src/actor/mod.rs b/packages/edge/infra/client/manager/src/actor/mod.rs index e894ddc45e..419baa5b21 100644 --- a/packages/edge/infra/client/manager/src/actor/mod.rs +++ b/packages/edge/infra/client/manager/src/actor/mod.rs @@ -155,8 +155,7 @@ impl Actor { let (_, ports) = tokio::try_join!( async { self.download_image(&ctx).await?; - self.make_fs(&ctx).await?; - Result::<(), anyhow::Error>::Ok(()) + self.make_fs(&ctx).await }, async { let ports = self.bind_ports(ctx).await?; diff --git a/packages/edge/infra/client/manager/src/actor/setup.rs b/packages/edge/infra/client/manager/src/actor/setup.rs index c1f8b3545b..752ef406ab 100644 --- a/packages/edge/infra/client/manager/src/actor/setup.rs +++ b/packages/edge/infra/client/manager/src/actor/setup.rs @@ -391,7 +391,8 @@ impl Actor { .arg("add") .arg(netns_path.file_name().context("bad netns path")?) .output() - .await?; + .await + .context("failed to run `ip`")?; ensure!( cmd_out.status.success(), "failed `ip netns` command\n{}", @@ -413,7 +414,8 @@ impl Actor { .env("CNI_IFNAME", &ctx.config().cni.network_interface) .env("CAP_ARGS", cni_params_json) .output() - .await?; + .await + .context("failed to run `cnitool`")?; ensure!( cmd_out.status.success(), "failed `cnitool` command\n{}", diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs b/packages/edge/infra/client/manager/src/image_download_handler.rs index 0eda6e9fb3..6deed48840 100644 --- a/packages/edge/infra/client/manager/src/image_download_handler.rs +++ b/packages/edge/infra/client/manager/src/image_download_handler.rs @@ -230,11 +230,11 @@ impl ImageDownloadHandler { metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes); // Update state to signify download completed successfully - sqlx::query(indoc!( + let foo = sqlx::query(indoc!( " UPDATE images_cache SET - download_complete_ts = ?2 AND + download_complete_ts = ?2, size = ?3 WHERE image_id = ?1 ", diff --git a/packages/edge/infra/client/manager/src/main.rs b/packages/edge/infra/client/manager/src/main.rs index 9b99311628..7a2d7b6b52 100644 --- a/packages/edge/infra/client/manager/src/main.rs +++ b/packages/edge/infra/client/manager/src/main.rs @@ -16,7 +16,7 @@ use tokio::{ fs, runtime::{Builder, Runtime}, }; -use tracing_subscriber::prelude::*; +use tracing_subscriber::{prelude::*, EnvFilter}; use url::Url; mod actor; @@ -239,8 +239,34 @@ fn init_tracing() { tracing_subscriber::registry() .with( tracing_logfmt::builder() + .with_span_name(std::env::var("RUST_LOG_SPAN_NAME").map_or(false, |x| x == "1")) + .with_span_path(std::env::var("RUST_LOG_SPAN_PATH").map_or(false, |x| x == "1")) + .with_target(std::env::var("RUST_LOG_TARGET").map_or(false, |x| x == "1")) + .with_location(std::env::var("RUST_LOG_LOCATION").map_or(false, |x| x == "1")) + .with_module_path(std::env::var("RUST_LOG_MODULE_PATH").map_or(false, |x| x == "1")) + .with_ansi_color(std::env::var("RUST_LOG_ANSI_COLOR").map_or(false, |x| x == "1")) .layer() - .with_filter(tracing_subscriber::filter::LevelFilter::INFO), + .with_filter(env_filter("RUST_LOG")), ) .init(); } + +fn env_filter(env_var: &str) -> EnvFilter { + // Create env filter + let mut env_filter = EnvFilter::default() + // Default filter + .add_directive("info".parse().unwrap()) + // Disable verbose logs + .add_directive("tokio_cron_scheduler=warn".parse().unwrap()) + .add_directive("tokio=warn".parse().unwrap()) + .add_directive("hyper=warn".parse().unwrap()) + .add_directive("h2=warn".parse().unwrap()); + + if let Ok(filter) = std::env::var(env_var) { + for s in filter.split(',').filter(|x| !x.is_empty()) { + env_filter = env_filter.add_directive(s.parse().expect("invalid env filter")); + } + } + + env_filter +} diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs index 475ff557a2..3f3c98705b 100644 --- a/packages/edge/infra/client/manager/src/utils/mod.rs +++ b/packages/edge/infra/client/manager/src/utils/mod.rs @@ -378,15 +378,34 @@ pub async fn total_dir_size>(path: P) -> Result { ensure!(path.is_dir(), "path is not a directory: {}", path.display()); let mut total_size = 0; - let mut read_dir = fs::read_dir(path).await?; - - while let Some(entry) = read_dir.next_entry().await? { + let mut read_dir = fs::read_dir(path).await.context("failed to read dir")?; + + while let Some(entry) = read_dir.next_entry().await.transpose() { + let entry = match entry { + Ok(entry) => entry, + Err(err) => { + tracing::debug!(?err, "failed to read entry"); + continue; + } + }; let entry_path = entry.path(); if entry_path.is_dir() { - total_size += Box::pin(total_dir_size(entry_path)).await?; + match Box::pin(total_dir_size(entry_path)).await { + Ok(size) => total_size += size, + Err(err) => { + tracing::debug!(?err, p=?entry.path().display(), "failed to calculate size for directory"); + continue; + } + } } else { - total_size += fs::metadata(entry_path).await?.len(); + match fs::metadata(entry_path).await { + Ok(metadata) => total_size += metadata.len(), + Err(err) => { + tracing::debug!(?err, p=?entry.path().display(), "failed to get metadata for file"); + continue; + } + } } } diff --git a/packages/edge/infra/client/manager/tests/container_lifecycle.rs b/packages/edge/infra/client/manager/tests/container_lifecycle.rs index e6f716f7ea..cf94a6760d 100644 --- a/packages/edge/infra/client/manager/tests/container_lifecycle.rs +++ b/packages/edge/infra/client/manager/tests/container_lifecycle.rs @@ -83,9 +83,6 @@ async fn handle_connection( send_init_packet(&mut tx).await; start_echo_actor(&mut tx, actor_id).await; - start_echo_actor(&mut tx, Uuid::new_v4()).await; - - tokio::time::sleep(std::time::Duration::from_millis(10000)).await; } protocol::ToServer::Events(events) => { for event in events { @@ -188,7 +185,7 @@ async fn handle_connection( ); } - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(1000)).await; // Verify client state let actors = ctx.actors().read().await; diff --git a/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs b/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs index 56c2ef9ec4..6722765d86 100644 --- a/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs +++ b/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs @@ -40,8 +40,13 @@ async fn isolate_lifecycle() { // Init project directories let tmp_dir = tempfile::TempDir::new().unwrap(); - let config = init_client(&gen_tmp_dir_path, tmp_dir.path()).await; - tracing::info!(path=%tmp_dir.path().display(), "client dir"); + let path = tmp_dir.path(); + // let path = std::path::Path::new( + // "/home/rivet/rivet-ee/oss/packages/edge/infra/client/manager/tests/foo", + // ); + + let config = init_client(&gen_tmp_dir_path, &path).await; + tracing::info!(path=%path.display(), "client dir"); start_client(config, ctx_wrapper, close_rx.clone(), port).await; } @@ -121,7 +126,7 @@ async fn handle_connection( "actor not in client memory" ); - tokio::time::sleep(std::time::Duration::from_millis(250)) + tokio::time::sleep(std::time::Duration::from_millis(1000)) .await; tracing::info!("sending echo"); @@ -180,7 +185,7 @@ async fn handle_connection( ); } - tokio::time::sleep(Duration::from_millis(5)).await; + tokio::time::sleep(Duration::from_millis(50)).await; // Verify client state let actors = ctx.actors().read().await;