diff --git a/packages/edge/infra/client/manager/src/actor/setup.rs b/packages/edge/infra/client/manager/src/actor/setup.rs index 00d06ebefa..b793e08c7b 100644 --- a/packages/edge/infra/client/manager/src/actor/setup.rs +++ b/packages/edge/infra/client/manager/src/actor/setup.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + hash::Hasher, path::{Path, PathBuf}, process::Stdio, result::Result::{Err, Ok}, @@ -10,7 +11,7 @@ use futures_util::StreamExt; use indoc::indoc; use pegboard::protocol; use pegboard_config::isolate_runner::actor as actor_config; -use rand::Rng; +use rand::{seq::SliceRandom, Rng}; use serde_json::json; use tokio::{ fs::{self, File}, @@ -86,220 +87,156 @@ impl Actor { let actor_path = ctx.actor_path(self.actor_id, self.generation); let fs_path = actor_path.join("fs"); - // Log the primary and fallback URLs we're attempting to download from - let primary_url = format!("{}/{}", self.config.image.artifact_url_stub, self.config.image.id); - let fallback_url = self.config.image.fallback_artifact_url.as_deref().map(|url| format!("{}/{}", url, self.config.image.id)); - + // Get addresses using the shared utility function + let addresses = crate::utils::get_image_addresses( + ctx, + self.config.image.id, + &self.config.image.artifact_url_stub, + self.config.image.fallback_artifact_url.as_deref() + ).await?; + + // Log the URLs we're attempting to download from tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, + actor_id=?self.actor_id, + generation=?self.generation, image_id=?self.config.image.id, - primary_url=%primary_url, - fallback_url=?fallback_url, + addresses=?addresses, "initiating image download" ); - - let mut stream = utils::fetch_image_stream( - ctx, - self.config.image.id, - &self.config.image.artifact_url_stub, - self.config.image.fallback_artifact_url.as_deref(), - ) - .await?; - - match self.config.image.kind { - protocol::ImageKind::DockerImage => { - let docker_image_path = fs_path.join("docker-image.tar"); - - match self.config.image.compression { - protocol::ImageCompression::None => { - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "saving uncompressed docker image to file", - ); - - let mut output_file = File::create(&docker_image_path).await?; - // Write from stream to file - while let Some(chunk) = stream.next().await { - output_file.write_all(&chunk?).await?; - } - } - protocol::ImageCompression::Lz4 => { - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "downloading and decompressing docker image", - ); + // Try each URL until one succeeds + let mut last_error = None; + for url in &addresses { + tracing::info!(actor_id=?self.actor_id, generation=?self.generation, ?url, "attempting download"); + + // Build the shell command based on image kind and compression + // Using shell commands with native Unix pipes improves performance by: + // 1. Reducing overhead of passing data through Rust + // 2. Letting the OS handle data transfer between processes efficiently + // 3. Avoiding unnecessary buffer copies in memory + let shell_cmd = match (self.config.image.kind, self.config.image.compression) { + // Docker image, no compression + (protocol::ImageKind::DockerImage, protocol::ImageCompression::None) => { + let docker_image_path = fs_path.join("docker-image.tar"); + tracing::info!( + actor_id=?self.actor_id, + generation=?self.generation, + "downloading uncompressed docker image using curl" + ); - // Spawn the lz4 process - let mut lz4_child = Command::new("lz4") - .arg("-d") - .arg("-") - .arg(&docker_image_path) - .stdin(Stdio::piped()) - .spawn()?; - - // Take the stdin of lz4 - let mut lz4_stdin = lz4_child.stdin.take().context("lz4 stdin")?; - - tokio::try_join!( - // Pipe the response body to lz4 stdin - async move { - while let Some(chunk) = stream.next().await { - let data = chunk?; - lz4_stdin.write_all(&data).await?; - } - lz4_stdin.shutdown().await?; + // Use curl to download directly to file + format!( + "curl -sSfL '{}' -o '{}'", + url, + docker_image_path.display() + ) + } - anyhow::Ok(()) - }, - // Wait for child process - async { - let cmd_out = lz4_child.wait_with_output().await?; - ensure!( - cmd_out.status.success(), - "failed `lz4` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? - ); + // Docker image with LZ4 compression + (protocol::ImageKind::DockerImage, protocol::ImageCompression::Lz4) => { + let docker_image_path = fs_path.join("docker-image.tar"); + tracing::info!( + actor_id=?self.actor_id, + generation=?self.generation, + "downloading and decompressing docker image using curl | lz4" + ); - Ok(()) - }, - )?; - } + // Use curl piped to lz4 for decompression + format!( + "curl -sSfL '{}' | lz4 -d - '{}'", + url, + docker_image_path.display() + ) } - } - protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript => { - match self.config.image.compression { - protocol::ImageCompression::None => { - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "downloading and unarchiving uncompressed artifact", - ); - // Spawn the tar process - let mut tar_child = Command::new("tar") - .arg("-x") - .arg("-C") - .arg(&fs_path) - .stdin(Stdio::piped()) - .spawn()?; - - // Take the stdin of tar process - let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?; - - tokio::try_join!( - // Pipe the response body to tar stdin - async move { - while let Some(chunk) = stream.next().await { - let data = chunk?; - tar_stdin.write_all(&data).await?; - } - tar_stdin.shutdown().await?; + // OCI Bundle or JavaScript with no compression + ( + protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript, + protocol::ImageCompression::None, + ) => { + tracing::info!( + actor_id=?self.actor_id, + generation=?self.generation, + "downloading and unarchiving uncompressed artifact using curl | tar" + ); - anyhow::Ok(()) - }, - // Wait for child process - async { - let cmd_out = tar_child.wait_with_output().await?; - ensure!( - cmd_out.status.success(), - "failed `tar` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? - ); + // Use curl piped to tar for extraction + format!( + "curl -sSfL '{}' | tar -x -C '{}'", + url, + fs_path.display() + ) + } - Ok(()) - }, - )?; - } - protocol::ImageCompression::Lz4 => { - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "downloading, decompressing, and unarchiving artifact", - ); + // OCI Bundle or JavaScript with LZ4 compression + ( + protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript, + protocol::ImageCompression::Lz4, + ) => { + tracing::info!( + actor_id=?self.actor_id, + generation=?self.generation, + "downloading, decompressing, and unarchiving artifact using curl | lz4 | tar" + ); - // Spawn the lz4 process - let mut lz4_child = Command::new("lz4") - .arg("-d") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn()?; - - // Spawn the tar process - let mut tar_child = Command::new("tar") - .arg("-x") - .arg("-C") - .arg(&fs_path) - .stdin(Stdio::piped()) - .spawn()?; - - // Take the stdin of lz4 and tar processes - let mut lz4_stdin = lz4_child.stdin.take().context("lz4 stdin")?; - let mut lz4_stdout = lz4_child.stdout.take().context("lz4 stdout")?; - let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?; - - tokio::try_join!( - // Pipe the response body to lz4 stdin - async move { - while let Some(chunk) = stream.next().await { - let data = chunk?; - lz4_stdin.write_all(&data).await?; - } - lz4_stdin.shutdown().await?; + // Use curl piped to lz4 for decompression, then to tar for extraction + format!( + "curl -sSfL '{}' | lz4 -d | tar -x -C '{}'", + url, + fs_path.display() + ) + } + }; - anyhow::Ok(()) - }, - // Pipe lz4 stdout to tar stdin - async move { - // Large buffer size (instead of system page size) reduces system - // calls - let mut buffer = [0; 65536]; - loop { - let n = lz4_stdout.read(&mut buffer).await?; - if n == 0 { - break; - } - tar_stdin.write_all(&buffer[..n]).await?; - } - tar_stdin.shutdown().await?; + // Execute the shell command + // Use curl's built-in error handling to fail silently and let us try the next URL + let cmd_result = Command::new("sh").arg("-c").arg(&shell_cmd).output().await; - anyhow::Ok(()) - }, - // Wait for child processes - async { - let cmd_out = lz4_child.wait_with_output().await?; - ensure!( - cmd_out.status.success(), - "failed `lz4` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? - ); + match cmd_result { + Ok(output) if output.status.success() => { + tracing::info!( + actor_id=?self.actor_id, + generation=?self.generation, + ?url, + "successfully downloaded image" + ); - Ok(()) - }, - async { - let cmd_out = tar_child.wait_with_output().await?; - ensure!( - cmd_out.status.success(), - "failed `tar` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? - ); + let duration = timer.elapsed().as_secs_f64(); + crate::metrics::SETUP_DOWNLOAD_IMAGE_DURATION.observe(duration); + tracing::info!(actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, "artifact download completed"); - Ok(()) - }, - )?; - } + return Ok(()); + } + Ok(output) => { + // Command ran but failed + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!( + actor_id=?self.actor_id, + generation=?self.generation, + ?url, + status=?output.status, + stderr=%stderr, + "failed to download image" + ); + last_error = Some(anyhow!("download failed: {}", stderr)); + } + Err(e) => { + // Failed to execute command + tracing::warn!( + actor_id=?self.actor_id, + generation=?self.generation, + ?url, + error=?e, + "failed to execute download command" + ); + last_error = Some(anyhow!("download command failed: {}", e)); } } } - let duration = timer.elapsed().as_secs_f64(); - crate::metrics::SETUP_DOWNLOAD_IMAGE_DURATION.observe(duration); - tracing::info!(actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, "artifact download completed"); - - Ok(()) + // If we get here, all URLs failed + Err(last_error + .unwrap_or_else(|| anyhow!("failed to download image from any available URL"))) } pub async fn setup_oci_bundle( @@ -432,7 +369,7 @@ impl Actor { // Parallelize file writes for better performance // Prepare content for all files before writing let config_json = serde_json::to_vec(&config)?; - + // resolv.conf content // See also rivet-actor.conflist in packages/services/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh let resolv_conf = indoc!( @@ -446,7 +383,7 @@ impl Actor { options attempts:2 " ); - + // hosts file content let hosts_content = indoc!( " @@ -454,7 +391,7 @@ impl Actor { ::1 localhost ip6-localhost ip6-loopback " ); - + // Write all files in parallel tracing::info!( actor_id=?self.actor_id, @@ -470,7 +407,7 @@ impl Actor { let duration = timer.elapsed().as_secs_f64(); crate::metrics::SETUP_OCI_BUNDLE_DURATION.observe(duration); tracing::info!( - actor_id=?self.actor_id, + actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, "OCI bundle setup completed" @@ -486,7 +423,7 @@ impl Actor { ) -> Result<()> { let timer = std::time::Instant::now(); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "setting up isolate environment"); - + let actor_path = ctx.actor_path(self.actor_id, self.generation); tracing::info!( @@ -530,7 +467,7 @@ impl Actor { let duration = timer.elapsed().as_secs_f64(); crate::metrics::SETUP_ISOLATE_DURATION.observe(duration); tracing::info!( - actor_id=?self.actor_id, + actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, "isolate setup completed" @@ -630,7 +567,7 @@ impl Actor { let duration = timer.elapsed().as_secs_f64(); crate::metrics::SETUP_CNI_NETWORK_DURATION.observe(duration); tracing::info!( - actor_id=?self.actor_id, + actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, "cni network setup completed" @@ -645,7 +582,7 @@ impl Actor { ) -> Result> { let timer = std::time::Instant::now(); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "binding ports"); - + let (mut gg_ports, mut host_ports): (Vec<_>, Vec<_>) = self .config .ports @@ -653,9 +590,9 @@ impl Actor { .partition(|(_, port)| matches!(port.routing, protocol::PortRouting::GameGuard)); tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - gg_ports_count=gg_ports.len(), + actor_id=?self.actor_id, + generation=?self.generation, + gg_ports_count=gg_ports.len(), host_ports_count=host_ports.len(), "partitioned ports for binding" ); @@ -685,7 +622,7 @@ impl Actor { generation=?self.generation, "sorting ports" ); - + // The SQL query returns a list of TCP ports then UDP ports. We sort the input ports here to match // that order. gg_ports.sort_by_key(|(_, port)| port.protocol); @@ -699,7 +636,7 @@ impl Actor { generation=?self.generation, "mapping proxied ports" ); - + let proxied_ports = gg_ports .iter() @@ -740,7 +677,7 @@ impl Actor { let duration = timer.elapsed().as_secs_f64(); crate::metrics::SETUP_BIND_PORTS_DURATION.observe(duration); tracing::info!( - actor_id=?self.actor_id, + actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, ports_count=proxied_ports.len(), diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs index d3c078a784..c5166a8cee 100644 --- a/packages/edge/infra/client/manager/src/utils/mod.rs +++ b/packages/edge/infra/client/manager/src/utils/mod.rs @@ -15,6 +15,7 @@ use notify::{ use pegboard::protocol; use pegboard_config::Config; use rand::{prelude::SliceRandom, SeedableRng}; +use rand_chacha::ChaCha12Rng; use sql::SqlitePoolExt; use sqlx::{ migrate::MigrateDatabase, @@ -259,6 +260,50 @@ pub fn now() -> i64 { .expect("now doesn't fit in i64") } +/// Generates a list of address URLs for a given build ID, with deterministic shuffling. +/// +/// This function accepts a build ID and returns an array of URLs, including both +/// the seeded shuffling and the fallback address (if provided). +pub async fn get_image_addresses( + ctx: &Ctx, + image_id: Uuid, + image_artifact_url_stub: &str, + image_fallback_artifact_url: Option<&str>, +) -> Result> { + // Get hash from image id + let mut hasher = DefaultHasher::new(); + hasher.write(image_id.as_bytes()); + let hash = hasher.finish(); + + let mut rng = ChaCha12Rng::seed_from_u64(hash); + + // Shuffle based on hash + let mut addresses = ctx + .pull_addr_handler + .addresses(ctx.config()) + .await? + .iter() + .map(|addr| { + Ok(Url::parse(&format!("{addr}{}", image_artifact_url_stub)) + .context("failed to build artifact url")? + .to_string()) + }) + .collect::>>()?; + addresses.shuffle(&mut rng); + + // Add fallback url to the end if one is set + if let Some(fallback_artifact_url) = image_fallback_artifact_url { + addresses.push(fallback_artifact_url.to_string()); + } + + ensure!( + !addresses.is_empty(), + "no artifact urls available (no pull addresses nor fallback)" + ); + + Ok(addresses) +} + /// Creates an async file watcher. fn async_watcher() -> Result<(RecommendedWatcher, Receiver>)> { let (tx, rx) = channel(1); @@ -315,36 +360,7 @@ pub async fn fetch_image_stream( image_artifact_url_stub: &str, image_fallback_artifact_url: Option<&str>, ) -> Result>> { - // Get hash from image id - let mut hasher = DefaultHasher::new(); - hasher.write(image_id.as_bytes()); - let hash = hasher.finish(); - - let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(hash); - - // Shuffle based on hash - let mut addresses = ctx - .pull_addr_handler - .addresses(ctx.config()) - .await? - .iter() - .map(|addr| { - Ok(Url::parse(&format!("{addr}{}", image_artifact_url_stub)) - .context("failed to build artifact url")? - .to_string()) - }) - .collect::>>()?; - addresses.shuffle(&mut rng); - - // Add fallback url to the end if one is set - if let Some(fallback_artifact_url) = image_fallback_artifact_url { - addresses.push(fallback_artifact_url.to_string()); - } - - ensure!( - !addresses.is_empty(), - "no artifact urls available (no pull addresses nor fallback)" - ); + let addresses = get_image_addresses(ctx, image_id, image_artifact_url_stub, image_fallback_artifact_url).await?; let mut iter = addresses.into_iter(); while let Some(artifact_url) = iter.next() {