Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/edge/infra/client/manager/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ impl Actor {
let (_, ports) = tokio::try_join!(
async {
self.download_image(&ctx).await?;
self.make_fs(&ctx).await?;
Result::<(), anyhow::Error>::Ok(())
self.make_fs(&ctx).await
},
async {
let ports = self.bind_ports(ctx).await?;
Expand Down
6 changes: 4 additions & 2 deletions packages/edge/infra/client/manager/src/actor/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ impl Actor {
.arg("add")
.arg(netns_path.file_name().context("bad netns path")?)
.output()
.await?;
.await
.context("failed to run `ip`")?;
ensure!(
cmd_out.status.success(),
"failed `ip netns` command\n{}",
Expand All @@ -413,7 +414,8 @@ impl Actor {
.env("CNI_IFNAME", &ctx.config().cni.network_interface)
.env("CAP_ARGS", cni_params_json)
.output()
.await?;
.await
.context("failed to run `cnitool`")?;
ensure!(
cmd_out.status.success(),
"failed `cnitool` command\n{}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,11 @@ impl ImageDownloadHandler {
metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes);

// Update state to signify download completed successfully
sqlx::query(indoc!(
let foo = sqlx::query(indoc!(
"
UPDATE images_cache
SET
download_complete_ts = ?2 AND
download_complete_ts = ?2,
size = ?3
WHERE image_id = ?1
",
Expand Down
30 changes: 28 additions & 2 deletions packages/edge/infra/client/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::{
fs,
runtime::{Builder, Runtime},
};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;

mod actor;
Expand Down Expand Up @@ -239,8 +239,34 @@ fn init_tracing() {
tracing_subscriber::registry()
.with(
tracing_logfmt::builder()
.with_span_name(std::env::var("RUST_LOG_SPAN_NAME").map_or(false, |x| x == "1"))
.with_span_path(std::env::var("RUST_LOG_SPAN_PATH").map_or(false, |x| x == "1"))
.with_target(std::env::var("RUST_LOG_TARGET").map_or(false, |x| x == "1"))
.with_location(std::env::var("RUST_LOG_LOCATION").map_or(false, |x| x == "1"))
.with_module_path(std::env::var("RUST_LOG_MODULE_PATH").map_or(false, |x| x == "1"))
.with_ansi_color(std::env::var("RUST_LOG_ANSI_COLOR").map_or(false, |x| x == "1"))
.layer()
.with_filter(tracing_subscriber::filter::LevelFilter::INFO),
.with_filter(env_filter("RUST_LOG")),
)
.init();
}

fn env_filter(env_var: &str) -> EnvFilter {
// Create env filter
let mut env_filter = EnvFilter::default()
// Default filter
.add_directive("info".parse().unwrap())
// Disable verbose logs
.add_directive("tokio_cron_scheduler=warn".parse().unwrap())
.add_directive("tokio=warn".parse().unwrap())
.add_directive("hyper=warn".parse().unwrap())
.add_directive("h2=warn".parse().unwrap());

if let Ok(filter) = std::env::var(env_var) {
for s in filter.split(',').filter(|x| !x.is_empty()) {
env_filter = env_filter.add_directive(s.parse().expect("invalid env filter"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider using parse().map_err() instead of expect() to provide a more graceful error handling path for invalid filter directives

}
}

env_filter
}
29 changes: 24 additions & 5 deletions packages/edge/infra/client/manager/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,15 +378,34 @@ pub async fn total_dir_size<P: AsRef<Path>>(path: P) -> Result<u64> {
ensure!(path.is_dir(), "path is not a directory: {}", path.display());

let mut total_size = 0;
let mut read_dir = fs::read_dir(path).await?;

while let Some(entry) = read_dir.next_entry().await? {
let mut read_dir = fs::read_dir(path).await.context("failed to read dir")?;

while let Some(entry) = read_dir.next_entry().await.transpose() {
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
tracing::debug!(?err, "failed to read entry");
continue;
}
};
let entry_path = entry.path();

if entry_path.is_dir() {
total_size += Box::pin(total_dir_size(entry_path)).await?;
match Box::pin(total_dir_size(entry_path)).await {
Ok(size) => total_size += size,
Err(err) => {
tracing::debug!(?err, p=?entry.path().display(), "failed to calculate size for directory");
continue;
}
}
} else {
total_size += fs::metadata(entry_path).await?.len();
match fs::metadata(entry_path).await {
Ok(metadata) => total_size += metadata.len(),
Err(err) => {
tracing::debug!(?err, p=?entry.path().display(), "failed to get metadata for file");
continue;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ async fn handle_connection(
send_init_packet(&mut tx).await;

start_echo_actor(&mut tx, actor_id).await;
start_echo_actor(&mut tx, Uuid::new_v4()).await;

tokio::time::sleep(std::time::Duration::from_millis(10000)).await;
}
protocol::ToServer::Events(events) => {
for event in events {
Expand Down Expand Up @@ -188,7 +185,7 @@ async fn handle_connection(
);
}

tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::sleep(Duration::from_millis(1000)).await;

// Verify client state
let actors = ctx.actors().read().await;
Expand Down
13 changes: 9 additions & 4 deletions packages/edge/infra/client/manager/tests/isolate_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ async fn isolate_lifecycle() {

// Init project directories
let tmp_dir = tempfile::TempDir::new().unwrap();
let config = init_client(&gen_tmp_dir_path, tmp_dir.path()).await;
tracing::info!(path=%tmp_dir.path().display(), "client dir");
let path = tmp_dir.path();
// let path = std::path::Path::new(
// "/home/rivet/rivet-ee/oss/packages/edge/infra/client/manager/tests/foo",
// );

let config = init_client(&gen_tmp_dir_path, &path).await;
tracing::info!(path=%path.display(), "client dir");

start_client(config, ctx_wrapper, close_rx.clone(), port).await;
}
Expand Down Expand Up @@ -121,7 +126,7 @@ async fn handle_connection(
"actor not in client memory"
);

tokio::time::sleep(std::time::Duration::from_millis(250))
tokio::time::sleep(std::time::Duration::from_millis(1000))
.await;

tracing::info!("sending echo");
Expand Down Expand Up @@ -180,7 +185,7 @@ async fn handle_connection(
);
}

tokio::time::sleep(Duration::from_millis(5)).await;
tokio::time::sleep(Duration::from_millis(50)).await;

// Verify client state
let actors = ctx.actors().read().await;
Expand Down
Loading