diff --git a/Cargo.lock b/Cargo.lock index 7415a0d408..0c7a24b069 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,7 +192,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "220044e6a1bb31ddee4e3db724d29767f352de47445a6cd75e1a173142136c83" dependencies = [ "nom 7.1.3", - "vte", + "vte 0.10.1", ] [[package]] @@ -12424,7 +12424,6 @@ version = "25.4.2" dependencies = [ "anyhow", "async-posthog", - "base64 0.22.1", "clap", "ctrlc", "deno-embed", @@ -13176,6 +13175,8 @@ dependencies = [ "anyhow", "assert_cmd", "async-stream", + "base64 0.22.1", + "chrono", "clap", "console", "const_format", @@ -13201,6 +13202,7 @@ dependencies = [ "serde", "serde_json", "sha1", + "strip-ansi-escapes", "strum 0.24.1", "tar", "tempfile", @@ -14836,6 +14838,15 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strip-ansi-escapes" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8f8038e7e7969abb3f1b7c2a811225e9296da208539e0f79c5251d6cac0025" +dependencies = [ + "vte 0.14.1", +] + [[package]] name = "strsim" version = "0.10.0" @@ -17317,6 +17328,15 @@ dependencies = [ "vte_generate_state_changes", ] +[[package]] +name = "vte" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231fdcd7ef3037e8330d8e17e61011a2c244126acc0a982f4040ac3f9f0bc077" +dependencies = [ + "memchr", +] + [[package]] name = "vte_generate_state_changes" version = "0.1.2" diff --git a/cloud/packages/ci-manager/src/executors/docker.ts b/cloud/packages/ci-manager/src/executors/docker.ts index b707f40940..670ff49190 100644 --- a/cloud/packages/ci-manager/src/executors/docker.ts +++ b/cloud/packages/ci-manager/src/executors/docker.ts @@ -38,6 +38,11 @@ export async function runDockerBuild( `Starting kaniko with args: docker ${kanikoArgs.join(" ")}`, ); + buildStore.updateStatus(buildId, { + type: "running", + data: { docker: {} } + }); + return new Promise((resolve, reject) => { const 
dockerProcess = spawn("docker", kanikoArgs, { stdio: ["pipe", "pipe", "pipe"] diff --git a/cloud/packages/ci-manager/src/executors/rivet.ts b/cloud/packages/ci-manager/src/executors/rivet.ts index 021f6c36da..5e8d5c7655 100644 --- a/cloud/packages/ci-manager/src/executors/rivet.ts +++ b/cloud/packages/ci-manager/src/executors/rivet.ts @@ -78,6 +78,13 @@ export async function runRivetBuild( const actorId = createResponse.actor.id; buildStore.addLog(buildId, `Created Rivet actor: ${actorId}`); + buildStore.updateStatus(buildId, { + type: "running", + data: { + rivet: { actorId } + } + }); + await pollActorStatus( buildStore, client, diff --git a/cloud/packages/ci-manager/src/kaniko-runner.ts b/cloud/packages/ci-manager/src/kaniko-runner.ts index 1de7535642..cb95147454 100644 --- a/cloud/packages/ci-manager/src/kaniko-runner.ts +++ b/cloud/packages/ci-manager/src/kaniko-runner.ts @@ -17,7 +17,12 @@ export async function runKanikoBuild( await mkdir(dirname(build.contextPath!), { recursive: true }); - buildStore.updateStatus(buildId, { type: "running", data: {} }); + buildStore.updateStatus(buildId, { + type: "running", + data: { + noRunner: {} + } + }); const executionMode = process.env.KANIKO_EXECUTION_MODE || "docker"; buildStore.addLog(buildId, `Using execution mode: ${executionMode}`); diff --git a/cloud/packages/ci-manager/src/types.ts b/cloud/packages/ci-manager/src/types.ts index 1e5966c722..d56c6b9d4b 100644 --- a/cloud/packages/ci-manager/src/types.ts +++ b/cloud/packages/ci-manager/src/types.ts @@ -1,9 +1,25 @@ import { z } from "zod"; import { NO_SEP_CHAR_REGEX, UNIT_SEP_CHAR } from "./common"; +export const RunnerSchema = z.union([ + z.object({ + rivet: z.object({ + actorId: z.string(), + }) + }), + z.object({ + docker: z.object({}) + }), + z.object({ + noRunner: z.object({}) + }) +]); + +export type Runner = z.infer; + export const StatusSchema = z.discriminatedUnion("type", [ z.object({ type: z.literal("starting"), data: z.object({}) }), - z.object({ 
type: z.literal("running"), data: z.object({}) }), + z.object({ type: z.literal("running"), data: RunnerSchema }), z.object({ type: z.literal("finishing"), data: z.object({}) }), z.object({ type: z.literal("converting"), data: z.object({}) }), z.object({ type: z.literal("uploading"), data: z.object({}) }), diff --git a/packages/toolchain/cli/Cargo.toml b/packages/toolchain/cli/Cargo.toml index 1b27f05d9b..9330a42d24 100644 --- a/packages/toolchain/cli/Cargo.toml +++ b/packages/toolchain/cli/Cargo.toml @@ -16,7 +16,6 @@ path = "src/main.rs" [dependencies] anyhow = "1.0" async-posthog.workspace = true -base64 = "0.22.1" clap = { version = "4.5.9", features = ["derive"] } ctrlc = "3.4.5" deno-embed.workspace = true diff --git a/packages/toolchain/cli/src/commands/actor/create.rs b/packages/toolchain/cli/src/commands/actor/create.rs index 0e5273b079..8549073437 100644 --- a/packages/toolchain/cli/src/commands/actor/create.rs +++ b/packages/toolchain/cli/src/commands/actor/create.rs @@ -103,7 +103,7 @@ pub struct Opts { /// Specify which log stream to display #[clap(long)] - log_stream: Option, + log_stream: Option, /// Deploy the build before creating the actor #[clap(long)] @@ -299,17 +299,18 @@ impl Opts { // Tail logs if self.logs { - crate::util::actor::logs::tail( + toolchain::util::actor::logs::tail( &ctx, - crate::util::actor::logs::TailOpts { + toolchain::util::actor::logs::TailOpts { environment: &env, actor_id: response.actor.id, stream: self .log_stream .clone() - .unwrap_or(crate::util::actor::logs::LogStream::All), + .unwrap_or(toolchain::util::actor::logs::LogStream::All), follow: true, - timestamps: true, + print_type: toolchain::util::actor::logs::PrintType::PrintWithTime, + exit_on_ctrl_c: true, }, ) .await?; diff --git a/packages/toolchain/cli/src/commands/actor/logs.rs b/packages/toolchain/cli/src/commands/actor/logs.rs index abb45665e8..d502cf8a79 100644 --- a/packages/toolchain/cli/src/commands/actor/logs.rs +++ 
b/packages/toolchain/cli/src/commands/actor/logs.rs @@ -16,7 +16,7 @@ pub struct Opts { /// Specify which log stream to display (stdout, stderr, or all) #[clap(long, short = 's')] - stream: Option, + stream: Option, /// Disable timestamp display in logs #[clap(long)] @@ -36,17 +36,23 @@ impl Opts { let actor_id = Uuid::parse_str(&self.id).map_err(|_| errors::UserError::new("invalid id uuid"))?; - crate::util::actor::logs::tail( + let print_type = if self.no_timestamps { + toolchain::util::actor::logs::PrintType::Print + } else { + toolchain::util::actor::logs::PrintType::PrintWithTime + }; + toolchain::util::actor::logs::tail( &ctx, - crate::util::actor::logs::TailOpts { + toolchain::util::actor::logs::TailOpts { environment: &env, actor_id, stream: self .stream .clone() - .unwrap_or(crate::util::actor::logs::LogStream::All), + .unwrap_or(toolchain::util::actor::logs::LogStream::All), follow: !self.no_follow, - timestamps: !self.no_timestamps, + print_type, + exit_on_ctrl_c: true }, ) .await?; diff --git a/packages/toolchain/cli/src/util/mod.rs b/packages/toolchain/cli/src/util/mod.rs index 3feafda32a..f0aee4910e 100644 --- a/packages/toolchain/cli/src/util/mod.rs +++ b/packages/toolchain/cli/src/util/mod.rs @@ -1,4 +1,3 @@ -pub mod actor; pub mod deploy; pub mod env; pub mod login; diff --git a/packages/toolchain/toolchain/Cargo.toml b/packages/toolchain/toolchain/Cargo.toml index 89edd3f718..28e797d29d 100644 --- a/packages/toolchain/toolchain/Cargo.toml +++ b/packages/toolchain/toolchain/Cargo.toml @@ -9,6 +9,8 @@ edition.workspace = true [dependencies] anyhow = "1.0" async-stream = "0.3.3" +base64 = "0.22.1" +chrono = "0.4" clap = { version = "4.5", features = ["derive"] } console = "0.15" const_format = "0.2.32" @@ -34,6 +36,7 @@ schemars = "0.8.21" serde = { version = "1.0", features = ["derive", "rc"] } serde_json = { version = "1.0", features = ["raw_value"] } sha1 = "0.10.6" +strip-ansi-escapes = "0.2.1" strum = { version = "0.24", features = ["derive"] } 
tar = "0.4.40" tempfile = "3.13.0" diff --git a/packages/toolchain/cli/src/util/actor/logs.rs b/packages/toolchain/toolchain/src/util/actor/logs.rs similarity index 77% rename from packages/toolchain/cli/src/util/actor/logs.rs rename to packages/toolchain/toolchain/src/util/actor/logs.rs index 269571636d..f2c6023dd8 100644 --- a/packages/toolchain/cli/src/util/actor/logs.rs +++ b/packages/toolchain/toolchain/src/util/actor/logs.rs @@ -1,10 +1,14 @@ use anyhow::*; use base64::{engine::general_purpose::STANDARD, Engine}; use clap::ValueEnum; +use chrono::{DateTime, Utc}; use std::time::Duration; use tokio::signal; use tokio::sync::watch; -use toolchain::rivet_api::{apis, models}; +use crate::{ + rivet_api::{apis, models}, + ToolchainCtx +}; use uuid::Uuid; #[derive(ValueEnum, Clone)] @@ -17,23 +21,36 @@ pub enum LogStream { StdErr, } +pub enum PrintType { + /// Callback that is called when a new line is fetched. + /// The first argument is the timestamp of the line, the second argument is the decoded line. + Custom(fn(DateTime, String)), + /// Prints the line to stdout. + Print, + /// Prints with timestamp + PrintWithTime, +} + pub struct TailOpts<'a> { + pub print_type: PrintType, pub environment: &'a str, pub actor_id: Uuid, pub stream: LogStream, pub follow: bool, - pub timestamps: bool, + pub exit_on_ctrl_c: bool, } /// Reads the logs of an actor. -pub async fn tail(ctx: &toolchain::ToolchainCtx, opts: TailOpts<'_>) -> Result<()> { +pub async fn tail(ctx: &ToolchainCtx, opts: TailOpts<'_>) -> Result<()> { let (stdout_fetched_tx, stdout_fetched_rx) = watch::channel(false); let (stderr_fetched_tx, stderr_fetched_rx) = watch::channel(false); + let exit_on_ctrl_c = opts.exit_on_ctrl_c; + tokio::select! 
{ result = tail_streams(ctx, &opts, stdout_fetched_tx, stderr_fetched_tx) => result, result = poll_actor_state(ctx, &opts, stdout_fetched_rx, stderr_fetched_rx) => result, - _ = signal::ctrl_c() => { + _ = signal::ctrl_c(), if exit_on_ctrl_c => { Ok(()) } } @@ -41,7 +58,7 @@ pub async fn tail(ctx: &toolchain::ToolchainCtx, opts: TailOpts<'_>) -> Result<( /// Reads the streams of an actor's logs. async fn tail_streams( - ctx: &toolchain::ToolchainCtx, + ctx: &ToolchainCtx, opts: &TailOpts<'_>, stdout_fetched_tx: watch::Sender, stderr_fetched_tx: watch::Sender, @@ -66,7 +83,7 @@ async fn tail_streams( /// Reads a specific stream of an actor's log. async fn tail_stream( - ctx: &toolchain::ToolchainCtx, + ctx: &ToolchainCtx, opts: &TailOpts<'_>, stream: models::ActorsQueryLogStream, log_fetched_tx: watch::Sender, @@ -111,6 +128,10 @@ async fn tail_stream( } for (ts, line) in res.timestamps.iter().zip(res.lines.iter()) { + let Result::Ok(ts) = ts.parse::>() else { + eprintln!("Failed to parse timestamp: {ts} for line {line}"); + continue; + }; let decoded_line = match STANDARD.decode(line) { Result::Ok(bytes) => String::from_utf8_lossy(&bytes).to_string(), Err(_) => { @@ -119,10 +140,17 @@ async fn tail_stream( } }; - if opts.timestamps { - println!("{ts} {decoded_line}"); - } else { - println!("{decoded_line}"); + + match &opts.print_type { + PrintType::Custom(callback) => { + (callback)(ts, decoded_line); + } + PrintType::Print => { + println!("{decoded_line}"); + } + PrintType::PrintWithTime => { + println!("{ts} {decoded_line}"); + } } } @@ -138,7 +166,7 @@ async fn tail_stream( /// /// Using this in a `tokio::select` will make all other tasks cancel when the actor finishes. 
async fn poll_actor_state( - ctx: &toolchain::ToolchainCtx, + ctx: &ToolchainCtx, opts: &TailOpts<'_>, mut stdout_fetched_rx: watch::Receiver, mut stderr_fetched_rx: watch::Receiver, @@ -171,7 +199,12 @@ async fn poll_actor_state( .map_err(|err| anyhow!("Failed to poll actor: {err}"))?; if res.actor.destroyed_at.is_some() { - println!("Actor finished"); + match opts.print_type { + PrintType::Custom(_cb) => {} + _ => { + println!("Actor finished"); + } + } return Ok(()); } } diff --git a/packages/toolchain/cli/src/util/actor/mod.rs b/packages/toolchain/toolchain/src/util/actor/mod.rs similarity index 100% rename from packages/toolchain/cli/src/util/actor/mod.rs rename to packages/toolchain/toolchain/src/util/actor/mod.rs diff --git a/packages/toolchain/toolchain/src/util/docker/build_remote.rs b/packages/toolchain/toolchain/src/util/docker/build_remote.rs index 055b399799..5129932548 100644 --- a/packages/toolchain/toolchain/src/util/docker/build_remote.rs +++ b/packages/toolchain/toolchain/src/util/docker/build_remote.rs @@ -505,7 +505,7 @@ async fn upload_build_context( } async fn poll_build_status( - _ctx: &ToolchainCtx, + ctx: &ToolchainCtx, task: task::TaskCtx, build_id: &str, ci_manager_endpoint: &str, @@ -514,11 +514,16 @@ async fn poll_build_status( // Poll build status until completion task.log("[Remote Build] Polling build status..."); - let max_attempts = 900; // 30 minutes - let interval = Duration::from_secs(2); + let max_timeout = Duration::from_secs(30 * 60); + let poll_interval = Duration::from_secs(2); + let start_time = std::time::Instant::now(); + let mut previous_status = String::new(); + loop { + if start_time.elapsed() >= max_timeout { + bail!("Build polling timeout after 30 minutes"); + } - for attempt in 0..max_attempts { - tokio::time::sleep(interval).await; + tokio::time::sleep(poll_interval).await; let response = reqwest::Client::new() .get(&format!("{}/builds/{}", server_url, build_id)) @@ -539,7 +544,10 @@ async fn poll_build_status( 
.and_then(|t| t.as_str()) .unwrap_or("unknown"); - task.log(format!("[Remote Build] Build status: {}", status)); + let is_status_changed = previous_status != status; + if is_status_changed { + previous_status = status.to_string(); + } match status { "success" => { @@ -562,33 +570,138 @@ async fn poll_build_status( .unwrap_or("Unknown error"); bail!("Remote build failed: {}", reason); } + "running" => { + task.log(format!("[Remote Build] Build status: {}", status)); + + // If we know its running, and have its actor ID, + // we can stop polling and start streaming logs + // until the actor exists. + let runner = build_info + .get("status") + .and_then(|s| s.get("data")) + .and_then(|d| d.get("rivet")); + if let Some(runner) = runner { + let actor_id_raw = runner + .get("actorId") + .and_then(|a| a.as_str()) + .context("Rivet runner missing actorId")?; + + let actor_id = Uuid::parse_str(&actor_id_raw) + .context("Failed to parse actor ID from rivet runner")?; + + task.log("[Remote Build] Streaming build logs from actor."); + + match crate::util::actor::logs::tail( + &ctx, + crate::util::actor::logs::TailOpts { + environment: CI_ENVIRONMENT_ID, + actor_id, + stream: crate::util::actor::logs::LogStream::All, + follow: true, + print_type: crate::util::actor::logs::PrintType::Custom(handle_build_log_line), + exit_on_ctrl_c: false + }, + ) + .await { + Ok(_) => { + task.log("[Remote Build] Build logs streaming completed."); + } + Err(e) => { + task.log(format!( + "[Remote Build] Failed to stream build logs: {}", + e + )); + } + } + } + } _ => { + if is_status_changed { + task.log(format!("[Remote Build] Build status: {}", status)); + } // Continue polling for other statuses (pending, running, etc.) 
/// Condenses one remote build log line into the text shown to the user.
///
/// Returns `Some(text)` when the line should be printed and `None` when it
/// should be dropped.
///
/// * Lines that do not start with a well-formed `INFO[...]` prefix are not
///   treated as Kaniko logger output and are passed through verbatim, prefixed
///   with `"> "`. This includes a truncated prefix that has no closing `]`,
///   which previously caused an out-of-bounds slice panic.
/// * For `INFO[...]` lines the prefix is stripped and the message trimmed.
///   - If the first word contains at least one letter and all of its letters
///     are uppercase (e.g. `RUN`, `COPY`), the message is assumed to be a
///     Dockerfile instruction and is returned as-is.
///   - A few known progress messages are replaced by friendlier wording.
///   - Everything else (including an empty message) is dropped.
fn transform_log_line(line: String) -> Option<String> {
    // (prefix of the Kaniko message, text shown to the user instead)
    const REWRITES: [(&str, &str); 3] = [
        ("Unpacking rootfs", "Initializing image filesystem..."),
        (
            "Taking snapshot of full filesystem",
            "Taking snapshot of filesystem...",
        ),
        ("Uploading tar file", "Exporting built image..."),
    ];

    let Some(rest) = line.strip_prefix("INFO[") else {
        return Some(format!("> {line}"));
    };

    // `split_once` avoids manual index arithmetic: a missing `]` simply means
    // this is not a complete Kaniko prefix, so the raw line is forwarded.
    let Some((_, message)) = rest.split_once(']') else {
        return Some(format!("> {line}"));
    };
    let message = message.trim();

    // `Iterator::all` is vacuously true on an empty iterator, so explicitly
    // require at least one letter before calling the word "uppercase".
    let first_word = message.split_whitespace().next().unwrap_or("");
    let mut letters = first_word
        .chars()
        .filter(|c| c.is_alphabetic())
        .peekable();
    let is_docker_instruction = letters.peek().is_some() && letters.all(char::is_uppercase);

    if is_docker_instruction {
        return Some(message.to_string());
    }

    REWRITES
        .iter()
        .find(|(prefix, _)| message.starts_with(*prefix))
        .map(|(_, replacement)| replacement.to_string())
}
+ // (See https://github.com/luser/strip-ansi-escapes/issues/20) + if line.contains('\x1b') { + return strip_ansi_escapes::strip_str(line).to_string() } - bail!("timed out polling status") + line.to_string() } async fn _get_build_by_tags( diff --git a/packages/toolchain/toolchain/src/util/mod.rs b/packages/toolchain/toolchain/src/util/mod.rs index 022fb26915..5c9e48faef 100644 --- a/packages/toolchain/toolchain/src/util/mod.rs +++ b/packages/toolchain/toolchain/src/util/mod.rs @@ -1,3 +1,4 @@ +pub mod actor; pub mod api; pub mod build; pub mod cmd;