From 621394879eaeadac9d9b39a3d328baeeff3f315f Mon Sep 17 00:00:00 2001 From: Alexander Lyon Date: Fri, 3 May 2024 15:26:21 +0100 Subject: [PATCH] add support for upload speed / remaining in the cache upload step --- Cargo.lock | 85 +++++++--- crates/turborepo-api-client/Cargo.toml | 5 +- crates/turborepo-api-client/src/error.rs | 2 + crates/turborepo-api-client/src/lib.rs | 13 +- crates/turborepo-auth/src/auth/login.rs | 6 +- crates/turborepo-auth/src/auth/sso.rs | 6 +- crates/turborepo-auth/src/lib.rs | 6 +- crates/turborepo-cache/Cargo.toml | 4 + crates/turborepo-cache/src/async_cache.rs | 53 ++++-- crates/turborepo-cache/src/http.rs | 36 +++- crates/turborepo-cache/src/lib.rs | 1 + crates/turborepo-cache/src/multiplexer.rs | 15 +- crates/turborepo-cache/src/upload_progress.rs | 160 ++++++++++++++++++ crates/turborepo-lib/Cargo.toml | 1 + crates/turborepo-lib/src/config.rs | 5 +- crates/turborepo-lib/src/run/cache.rs | 15 +- crates/turborepo-lib/src/run/mod.rs | 65 ++++++- 17 files changed, 418 insertions(+), 60 deletions(-) create mode 100644 crates/turborepo-cache/src/upload_progress.rs diff --git a/Cargo.lock b/Cargo.lock index 905bfbec2558d..df1515a29cb89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2166,24 +2166,24 @@ checksum = "b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35" [[package]] name = "curl" -version = "0.4.44" +version = "0.4.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "509bd11746c7ac09ebd19f0b17782eae80aadee26237658a6b4808afb5c11a22" +checksum = "1e2161dd6eba090ff1594084e95fd67aeccf04382ffea77999ea94ed42ec67b6" dependencies = [ "curl-sys", "libc", "openssl-probe", "openssl-sys", "schannel", - "socket2 0.4.9", - "winapi", + "socket2 0.5.7", + "windows-sys 0.52.0", ] [[package]] name = "curl-sys" -version = "0.4.60+curl-7.88.1" +version = "0.4.72+curl-8.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "717abe2cb465a5da6ce06617388a3980c9a2844196734bec8ccb8e575250f13f" +checksum = "29cbdc8314c447d11e8fd156dcdd031d9e02a7a976163e396b548c03153bc9ea" dependencies = [ "cc", "libc", @@ -2192,7 +2192,7 @@ dependencies = [ "openssl-sys", "pkg-config", "vcpkg", - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -3471,6 +3471,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "human_format" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3b1f728c459d27b12448862017b96ad4767b1ec2ec5e6434e99f1577f085b8" + [[package]] name = "humantime" version = "2.1.0" @@ -3494,7 +3500,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.4", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -4220,9 +4226,9 @@ dependencies = [ [[package]] name = "libnghttp2-sys" -version = "0.1.7+1.45.0" +version = "0.1.10+1.61.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ed28aba195b38d5ff02b9170cbff627e336a20925e43b4945390401c5dc93f" +checksum = "959c25552127d2e1fa72f0e52548ec04fc386e827ba71a7bd01db46a447dc135" dependencies = [ "cc", "libc", @@ -4657,9 +4663,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "log", @@ -5516,18 +5522,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", @@ -6402,10 +6408,12 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.22.6", "winreg", @@ -7231,9 +7239,9 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] name = "similar" -version = "2.2.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" +checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640" dependencies = [ "bstr 0.2.17", "unicode-segmentation", @@ -7309,12 +7317,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -9468,9 +9476,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -9480,7 +9488,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.7", "tokio-macros", "tracing", "windows-sys 0.48.0", @@ -9498,9 +9506,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", @@ -9551,9 +9559,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" dependencies = [ "futures-core", "pin-project-lite", @@ -10937,6 +10945,7 @@ name = "turborepo-api-client" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "chrono", "http 0.2.11", "httpmock", @@ -10950,6 +10959,8 @@ dependencies = [ "test-case", "thiserror", "tokio", + "tokio-stream", + "tokio-util", "tracing", "turbopath", "turborepo-ci", @@ -10993,6 +11004,7 @@ version = "0.1.0" dependencies = [ "anyhow", "base64 0.21.4", + "bytes", "camino", "futures", "hmac", @@ -11000,6 +11012,7 @@ dependencies = [ "os_str_bytes", "path-clean 1.0.1", "petgraph", + "pin-project", "port_scanner", "reqwest", "serde", @@ -11010,6 +11023,8 @@ dependencies = [ "test-case", "thiserror", "tokio", + "tokio-stream", + "tokio-util", "tracing", "turbopath", "turborepo-analytics", @@ -11145,6 +11160,7 @@ dependencies = [ "globwatch", "go-parse-duration", "hex", + "human_format", "humantime", "ignore", "itertools 0.10.5", @@ -12054,6 +12070,19 @@ dependencies = [ "leb128", ] +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmer" version = "4.2.5" diff --git a/crates/turborepo-api-client/Cargo.toml b/crates/turborepo-api-client/Cargo.toml index c900b52ae2411..306ecb9656ede 100644 --- a/crates/turborepo-api-client/Cargo.toml +++ b/crates/turborepo-api-client/Cargo.toml @@ -21,15 +21,18 @@ workspace = true [dependencies] anyhow = { workspace = true } +bytes.workspace = true chrono = { workspace = true, features = ["serde"] } lazy_static = { workspace = true } regex = { workspace = true } -reqwest = { workspace = true, features = ["json"] } +reqwest = { workspace = true, features = ["json", "stream"] } rustc_version_runtime = "0.2.1" serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-stream = "0.1.15" +tokio-util = { version = "0.7.10", features = ["codec"] } tracing = { workspace = true } turbopath = { workspace = true } turborepo-ci = { workspace = true } diff --git a/crates/turborepo-api-client/src/error.rs b/crates/turborepo-api-client/src/error.rs index 5bd378db7b938..422d8d5762039 100644 --- a/crates/turborepo-api-client/src/error.rs +++ b/crates/turborepo-api-client/src/error.rs @@ -7,6 +7,8 @@ use crate::CachingStatus; #[derive(Debug, Error)] pub enum Error { + #[error("Error reading from disk: {0}")] + ReadError(#[from] std::io::Error), #[error("Error making HTTP request: {0}")] ReqwestError(#[from] reqwest::Error), #[error("skipping HTTP Request, too many failures have occurred.\nLast error: {0}")] diff --git a/crates/turborepo-api-client/src/lib.rs b/crates/turborepo-api-client/src/lib.rs index 1a659ec471c0d..e541fca9e7ed1 100644 --- a/crates/turborepo-api-client/src/lib.rs +++ b/crates/turborepo-api-client/src/lib.rs @@ -8,7 +8,7 @@ use std::{backtrace::Backtrace, env, future::Future, time::Duration}; use lazy_static::lazy_static; use regex::Regex; pub use reqwest::Response; -use reqwest::{Method, RequestBuilder, StatusCode}; +use reqwest::{Body, Method, RequestBuilder, StatusCode}; use serde::Deserialize; use turborepo_ci::{is_ci, Vendor}; use turborepo_vercel_api::{ @@ -26,6 +26,9 @@ mod retry; pub mod spaces; pub mod telemetry; +pub use bytes::Bytes; +pub use tokio_stream::Stream; + lazy_static! { static ref AUTHORIZATION_REGEX: Regex = Regex::new(r"(?i)(?:^|,) *authorization *(?:,|$)").unwrap(); @@ -74,7 +77,7 @@ pub trait CacheClient { fn put_artifact( &self, hash: &str, - artifact_body: &[u8], + artifact_body: impl tokio_stream::Stream> + Send + Sync + 'static, duration: u64, tag: Option<&str>, token: &str, @@ -358,7 +361,7 @@ impl CacheClient for APIClient { async fn put_artifact( &self, hash: &str, - artifact_body: &[u8], + artifact_body: impl tokio_stream::Stream> + Send + Sync + 'static, duration: u64, tag: Option<&str>, token: &str, @@ -382,13 +385,15 @@ impl CacheClient for APIClient { request_url = preflight_response.location.clone(); } + let stream = Body::wrap_stream(artifact_body); + let mut request_builder = self .cache_client .put(request_url) .header("Content-Type", "application/octet-stream") .header("x-artifact-duration", duration.to_string()) .header("User-Agent", self.user_agent.clone()) - .body(artifact_body.to_vec()); + .body(stream); if allow_auth { request_builder = request_builder.header("Authorization", format!("Bearer {}", token)); diff --git a/crates/turborepo-auth/src/auth/login.rs b/crates/turborepo-auth/src/auth/login.rs index b27cfcb534e12..8ba766cdd6f10 100644 --- a/crates/turborepo-auth/src/auth/login.rs +++ b/crates/turborepo-auth/src/auth/login.rs @@ -313,7 +313,11 @@ mod tests { async fn put_artifact( &self, _hash: &str, - _artifact_body: &[u8], + _artifact_body: impl turborepo_api_client::Stream< + Item = Result, + > + Send + + Sync + + 'static, _duration: u64, _tag: Option<&str>, _token: &str, diff --git a/crates/turborepo-auth/src/auth/sso.rs b/crates/turborepo-auth/src/auth/sso.rs index 7f626fe7fc9e8..ee9d707d04cd4 100644 --- a/crates/turborepo-auth/src/auth/sso.rs +++ b/crates/turborepo-auth/src/auth/sso.rs @@ -310,7 +310,11 @@ mod tests { async fn put_artifact( &self, _hash: &str, - _artifact_body: &[u8], + _artifact_body: impl turborepo_api_client::Stream< + Item = Result, + > + Send + + Sync + + 'static, _duration: u64, _tag: Option<&str>, _token: &str, diff --git a/crates/turborepo-auth/src/lib.rs b/crates/turborepo-auth/src/lib.rs index 6a157e9ae16b4..1959e68ce2d3c 100644 --- a/crates/turborepo-auth/src/lib.rs +++ b/crates/turborepo-auth/src/lib.rs @@ -420,7 +420,11 @@ mod tests { async fn put_artifact( &self, _hash: &str, - _artifact_body: &[u8], + _artifact_body: impl turborepo_api_client::Stream< + Item = Result, + > + Send + + Sync + + 'static, _duration: u64, _tag: Option<&str>, _token: &str, diff --git a/crates/turborepo-cache/Cargo.toml b/crates/turborepo-cache/Cargo.toml index 39db432bcf79d..0a79d1bd10f11 100644 --- a/crates/turborepo-cache/Cargo.toml +++ b/crates/turborepo-cache/Cargo.toml @@ -24,12 +24,14 @@ workspace = true [dependencies] base64 = "0.21.0" +bytes.workspace = true camino = { workspace = true } futures = { workspace = true } hmac = "0.12.1" os_str_bytes = "6.5.0" path-clean = { workspace = true } petgraph = "0.6.3" +pin-project = "1.1.5" reqwest = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } @@ -37,6 +39,8 @@ sha2 = { workspace = true } tar = "0.4.38" thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-stream = "0.1.15" +tokio-util = { version = "0.7.10", features = ["codec"] } tracing = { workspace = true } turbopath = { workspace = true } turborepo-analytics = { workspace = true } diff --git a/crates/turborepo-cache/src/async_cache.rs b/crates/turborepo-cache/src/async_cache.rs index 94c32a0ad8da6..f117d21aebc37 100644 --- a/crates/turborepo-cache/src/async_cache.rs +++ b/crates/turborepo-cache/src/async_cache.rs @@ -1,13 +1,15 @@ -use std::sync::{atomic::AtomicU8, Arc}; +use std::sync::{atomic::AtomicU8, Arc, Mutex}; use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::{mpsc, oneshot, Semaphore}; use tracing::{warn, Instrument, Level}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; use turborepo_api_client::{APIAuth, APIClient}; -use crate::{multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts}; +use crate::{ + http::UploadMap, multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts, +}; const WARNING_CUTOFF: u8 = 4; @@ -24,8 +26,11 @@ enum WorkerRequest { duration: u64, files: Vec, }, - Flush(tokio::sync::oneshot::Sender<()>), - Shutdown(tokio::sync::oneshot::Sender<()>), + Flush(oneshot::Sender<()>), + /// Shutdown the cache. The first oneshot notifies when shutdown starts and + /// allows the user to inspect the status of the uploads. The second + /// oneshot notifies when the shutdown is complete. + Shutdown(oneshot::Sender>>, oneshot::Sender<()>), } impl AsyncCache { @@ -95,8 +100,8 @@ impl AsyncCache { } drop(callback); } - WorkerRequest::Shutdown(callback) => { - shutdown_callback = Some(callback); + WorkerRequest::Shutdown(closing, done) => { + shutdown_callback = Some((closing, done)); break; } }; @@ -104,10 +109,18 @@ impl AsyncCache { // Drop write consumer to immediately notify callers that cache is shutting down drop(write_consumer); + let shutdown_callback = if let Some((closing, done)) = shutdown_callback { + closing.send(real_cache.requests().unwrap_or_default()).ok(); + Some(done) + } else { + None + }; + // wait for all writers to finish while let Some(worker) = workers.next().await { let _ = worker; } + if let Some(callback) = shutdown_callback { callback.send(()).ok(); } @@ -162,7 +175,7 @@ impl AsyncCache { // before checking the cache. #[tracing::instrument(skip_all)] pub async fn wait(&self) -> Result<(), CacheError> { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.writer_sender .send(WorkerRequest::Flush(tx)) .await @@ -172,14 +185,30 @@ impl AsyncCache { Ok(()) } + /// Shut down the cache, waiting for all workers to finish writing. + /// This function returns as soon as the shut down has started, + /// returning a channel through which workers can report on their + /// progress. #[tracing::instrument(skip_all)] - pub async fn shutdown(&self) -> Result<(), CacheError> { - let (tx, rx) = tokio::sync::oneshot::channel(); + pub async fn start_shutdown( + &self, + ) -> Result<(Arc>, oneshot::Receiver<()>), CacheError> { + let (closing_tx, closing_rx) = oneshot::channel::>>(); + let (closed_tx, closed_rx) = oneshot::channel::<()>(); self.writer_sender - .send(WorkerRequest::Shutdown(tx)) + .send(WorkerRequest::Shutdown(closing_tx, closed_tx)) .await .map_err(|_| CacheError::CacheShuttingDown)?; - rx.await.ok(); + Ok((closing_rx.await.unwrap(), closed_rx)) // todo + } + + /// Shut down the cache, waiting for all workers to finish writing. + /// This function returns only when the last worker is complete. + /// It is a convenience wrapper around `start_shutdown`. + #[tracing::instrument(skip_all)] + pub async fn shutdown(&self) -> Result<(), CacheError> { + let (_, closed_rx) = self.start_shutdown().await?; + closed_rx.await.ok(); Ok(()) } } diff --git a/crates/turborepo-cache/src/http.rs b/crates/turborepo-cache/src/http.rs index 510fa1e8ee795..fb208567f8fb7 100644 --- a/crates/turborepo-cache/src/http.rs +++ b/crates/turborepo-cache/src/http.rs @@ -1,5 +1,11 @@ -use std::{backtrace::Backtrace, io::Write}; +use std::{ + backtrace::Backtrace, + collections::HashMap, + io::{Cursor, Write}, + sync::{Arc, Mutex}, +}; +use tokio_stream::StreamExt; use tracing::debug; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; @@ -11,15 +17,19 @@ use turborepo_api_client::{ use crate::{ cache_archive::{CacheReader, CacheWriter}, signature_authentication::ArtifactSignatureAuthenticator, + upload_progress::{UploadProgress, UploadProgressQuery}, CacheError, CacheHitMetadata, CacheOpts, CacheSource, }; +pub type UploadMap = HashMap>; + pub struct HTTPCache { client: APIClient, signer_verifier: Option, repo_root: AbsoluteSystemPathBuf, api_auth: APIAuth, analytics_recorder: Option, + uploads: Arc>, } impl HTTPCache { @@ -53,6 +63,7 @@ impl HTTPCache { client, signer_verifier, repo_root, + uploads: Arc::new(Mutex::new(HashMap::new())), api_auth, analytics_recorder, } @@ -68,6 +79,7 @@ impl HTTPCache { ) -> Result<(), CacheError> { let mut artifact_body = Vec::new(); self.write(&mut artifact_body, anchor, files).await?; + let bytes = artifact_body.len(); let tag = self .signer_verifier @@ -75,13 +87,29 @@ impl HTTPCache { .map(|signer| signer.generate_tag(hash.as_bytes(), &artifact_body)) .transpose()?; + let stream = tokio_util::codec::FramedRead::new( + Cursor::new(artifact_body), + tokio_util::codec::BytesCodec::new(), + ) + .map(|res| { + res.map(|bytes| bytes.freeze()) + .map_err(turborepo_api_client::Error::from) + }); + + let (progress, query) = UploadProgress::<10, 100, _>::new(stream, Some(bytes)); + + { + let mut uploads = self.uploads.lock().unwrap(); + uploads.insert(hash.to_string(), query); + } + tracing::debug!("uploading {}", hash); match self .client .put_artifact( hash, - &artifact_body, + progress, duration, tag.as_deref(), &self.api_auth.token, @@ -237,6 +265,10 @@ impl HTTPCache { ))) } + pub fn requests(&self) -> Arc> { + self.uploads.clone() + } + #[tracing::instrument(skip_all)] pub(crate) fn restore_tar( root: &AbsoluteSystemPath, diff --git a/crates/turborepo-cache/src/lib.rs b/crates/turborepo-cache/src/lib.rs index 0f9fbd00de00d..efcafe5124f9b 100644 --- a/crates/turborepo-cache/src/lib.rs +++ b/crates/turborepo-cache/src/lib.rs @@ -19,6 +19,7 @@ mod multiplexer; pub mod signature_authentication; #[cfg(test)] mod test_cases; +mod upload_progress; use std::{backtrace, backtrace::Backtrace}; diff --git a/crates/turborepo-cache/src/multiplexer.rs b/crates/turborepo-cache/src/multiplexer.rs index 33a8bccb359e2..4a02954a5b437 100644 --- a/crates/turborepo-cache/src/multiplexer.rs +++ b/crates/turborepo-cache/src/multiplexer.rs @@ -1,11 +1,18 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; use tracing::{debug, warn}; use turbopath::{AbsoluteSystemPath, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; use turborepo_api_client::{APIAuth, APIClient}; -use crate::{fs::FSCache, http::HTTPCache, CacheError, CacheHitMetadata, CacheOpts}; +use crate::{ + fs::FSCache, + http::{HTTPCache, UploadMap}, + CacheError, CacheHitMetadata, CacheOpts, +}; pub struct CacheMultiplexer { // We use an `AtomicBool` instead of removing the cache because that would require @@ -82,6 +89,10 @@ impl CacheMultiplexer { } } + pub fn requests(&self) -> Option>> { + self.http.as_ref().map(|http| http.requests()) + } + #[tracing::instrument(skip_all)] pub async fn put( &self, diff --git a/crates/turborepo-cache/src/upload_progress.rs b/crates/turborepo-cache/src/upload_progress.rs new file mode 100644 index 0000000000000..8d8979698ad37 --- /dev/null +++ b/crates/turborepo-cache/src/upload_progress.rs @@ -0,0 +1,160 @@ +use std::{ + pin::Pin, + sync::{Arc, Mutex, Weak}, + task::{Context, Poll}, + time::Instant, +}; + +use futures::Stream; +use pin_project::pin_project; + +type State = Mutex<(usize, [(usize, usize); BUCKETS])>; + +/// Consists of a total file upload time and a ring buffer of bytes sent per +/// second over some time interval. +#[pin_project] +pub struct UploadProgress { + /// A pair of bucket generation and bytes uploaded in that bucket. + /// + /// We need to store the generation to ensure that we don't accidentally + /// read from an expired bucket if there is a gap in writing. + state: Arc>, + start: Instant, + #[pin] + inner: S, +} + +impl UploadProgress { + /// Create a new `UploadProgress` with the given stream and interval. + pub fn new(inner: S, size: Option) -> (Self, UploadProgressQuery) { + let state = Arc::new(Mutex::new((0, [(0, 0); BUCKETS]))); + let now = Instant::now(); + let query = UploadProgressQuery::new(now, Arc::downgrade(&state), size); + + ( + Self { + state, + start: now, + inner, + }, + query, + ) + } +} + +impl Stream + for UploadProgress +where + S::Item: ProgressLen, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.as_mut().project(); + match this.inner.poll_next(cx) { + Poll::Ready(Some(item)) => { + // same as `curr_gen_index` but we can't borrow `self` twice + let (curr_gen, index) = { + // usize fits 570 million years of milliseconds since start on 64 bit + let gen = (this.start.elapsed().as_millis() as usize) / INTERVAL; + (gen, gen % BUCKETS) + }; + let mut state = this.state.lock().unwrap(); + let (gen, value) = &mut state.1[index]; + if *gen != curr_gen { + *gen = curr_gen; + *value = item.len(); + } else { + *value += item.len(); + } + + state.0 += item.len(); + + Poll::Ready(Some(item)) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +trait ProgressLen { + fn len(&self) -> usize; +} + +impl ProgressLen for bytes::Bytes { + fn len(&self) -> usize { + self.len() + } +} + +impl ProgressLen for Result { + fn len(&self) -> usize { + match self { + Ok(t) => t.len(), + Err(_) => 0, + } + } +} + +#[derive(Clone)] +pub struct UploadProgressQuery { + start: Instant, + state: Weak>, + size: Option, +} + +impl UploadProgressQuery { + fn new(start: Instant, state: Weak>, size: Option) -> Self { + Self { start, state, size } + } + + // Note: this usize is since the upload started so, on 64 bit systems, it + // should be good for 584.5 million years. Downcasting is probably safe... + fn curr_gen(&self) -> usize { + let since = self.start.elapsed().as_millis() as usize; + since / self.interval_ms() + } + + pub const fn interval_ms(&self) -> usize { + INTERVAL + } + + /// Get the total number of bytes uploaded. + /// + /// Returns `None` if the `UploadProgress` has been dropped. + pub fn bytes(&self) -> Option { + self.state.upgrade().map(|s| s.lock().unwrap().0) + } + + pub fn size(&self) -> Option { + self.size + } + + pub fn done(&self) -> bool { + self.state.strong_count() == 0 + } + + /// Get the average bytes per second over the last `SIZE` intervals. + /// + /// Returns `None` if the `UploadProgress` has been dropped. + pub fn average_bps(&self) -> Option { + let curr_gen = self.curr_gen(); + let min_gen = curr_gen.saturating_sub(BUCKETS); + self.state.upgrade().map(|s| { + let s = s.lock().unwrap(); + let total_bytes = + s.1.iter() + .filter(|(gen, _)| *gen >= min_gen) + .map(|(_, bytes)| *bytes) + .sum::(); + + // buckets * interval = milliseconds, so we multiply by 1000 to get seconds + (total_bytes as f64 / (BUCKETS * INTERVAL) as f64) * 1000.0 + }) + } +} diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 041a1fd31e9a5..27eb0ca243c0b 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -61,6 +61,7 @@ globwalk = { version = "0.1.0", path = "../turborepo-globwalk" } globwatch = { path = "../turborepo-globwatch" } go-parse-duration = "0.1.1" hex = "0.4.3" +human_format = "1.1.0" humantime = "2.1.0" ignore = "0.4.22" itertools = { workspace = true } diff --git a/crates/turborepo-lib/src/config.rs b/crates/turborepo-lib/src/config.rs index cffd2428577c8..1ab014ac18639 100644 --- a/crates/turborepo-lib/src/config.rs +++ b/crates/turborepo-lib/src/config.rs @@ -322,7 +322,10 @@ fn get_env_var_config( turbo_mapping.insert(OsString::from("turbo_teamid"), "team_id"); turbo_mapping.insert(OsString::from("turbo_token"), "token"); turbo_mapping.insert(OsString::from("turbo_remote_cache_timeout"), "timeout"); - turbo_mapping.insert(OsString::from("turbo_api_timeout"), "api_timeout"); + turbo_mapping.insert( + OsString::from("turbo_remote_cache_upload_timeout"), + "upload_timeout", + ); turbo_mapping.insert(OsString::from("turbo_experimental_ui"), "experimental_ui"); turbo_mapping.insert(OsString::from("turbo_preflight"), "preflight"); diff --git a/crates/turborepo-lib/src/run/cache.rs b/crates/turborepo-lib/src/run/cache.rs index d4e50b9074783..4f49e9da3789d 100644 --- a/crates/turborepo-lib/src/run/cache.rs +++ b/crates/turborepo-lib/src/run/cache.rs @@ -1,10 +1,15 @@ -use std::{io::Write, sync::Arc, time::Duration}; +use std::{ + io::Write, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::sync::oneshot; use tracing::{debug, error}; use turbopath::{ AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf, }; -use turborepo_cache::{AsyncCache, CacheError, CacheHitMetadata, CacheSource}; +use turborepo_cache::{http::UploadMap, AsyncCache, CacheError, CacheHitMetadata, CacheSource}; use turborepo_repository::package_graph::PackageInfo; use turborepo_scm::SCM; use turborepo_telemetry::events::{task::PackageTaskEventBuilder, TrackedErrors}; @@ -120,9 +125,11 @@ impl RunCache { } } - pub async fn shutdown_cache(&self) { + pub async fn shutdown_cache( + &self, + ) -> Result<(Arc>, oneshot::Receiver<()>), CacheError> { // Ignore errors coming from cache already shutting down - self.cache.shutdown().await.ok(); + self.cache.start_shutdown().await } } diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 2e73d7a48cffd..a2a291b4e40cc 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -12,12 +12,12 @@ pub mod task_access; pub mod task_id; pub mod watch; -use std::{collections::HashSet, io::Write, sync::Arc}; +use std::{collections::HashSet, io::Write, sync::Arc, time::Duration}; pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache}; use chrono::{DateTime, Local}; use rayon::iter::ParallelBridge; -use tokio::task::JoinHandle; +use tokio::{select, task::JoinHandle}; use tracing::debug; use turbopath::AbsoluteSystemPathBuf; use turborepo_api_client::{APIAuth, APIClient}; @@ -161,7 +161,66 @@ impl Run { tokio::spawn(async move { let _guard = subscriber.listen().await; let spinner = turborepo_ui::start_spinner("...Finishing writing to cache..."); - run_cache.shutdown_cache().await; + if let Ok((status, closed)) = run_cache.shutdown_cache().await { + let fut = async { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // loop through hashmap, extract items that are still running, + // sum up bit per second + let (bytes_per_second, bytes_uploaded, bytes_total) = { + let status = status.lock().unwrap(); + let total_bps: f64 = status + .iter() + .filter_map(|(_hash, task)| task.average_bps()) + .sum(); + let bytes_uploaded: usize = + status.iter().filter_map(|(_hash, task)| task.bytes()).sum(); + let bytes_total: usize = status + .iter() + .filter(|(_hash, task)| !task.done()) + .filter_map(|(_hash, task)| task.size()) + .sum(); + (total_bps, bytes_uploaded, bytes_total) + }; + + if bytes_total == 0 { + continue; + } + + // convert to human readable + let mut formatter = human_format::Formatter::new(); + let formatter = formatter.with_decimals(2).with_separator(""); + let bytes_per_second = + formatter.with_units("B/s").format(bytes_per_second); + let bytes_remaining = formatter + .with_units("B") + .format(bytes_total.saturating_sub(bytes_uploaded) as f64); + + spinner.set_message(format!( + "...Finishing writing to cache... ({} remaining, {})", + bytes_remaining, bytes_per_second + )); + } + }; + + let interrupt = async { + if let Ok(fut) = crate::commands::run::get_signal() { + fut.await; + } else { + tracing::warn!("could not register ctrl-c handler"); + // wait forever + tokio::time::sleep(Duration::MAX).await; + } + }; + + select! { + _ = closed => {} + _ = fut => {} + _ = interrupt => {tracing::debug!("received interrupt, exiting");} + } + } else { + tracing::warn!("could not start shutdown, exiting"); + } spinner.finish_and_clear(); }); }