diff --git a/Cargo.lock b/Cargo.lock index debcfa991b0e4..15e76943541f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,6 +1476,25 @@ dependencies = [ "crossbeam-utils 0.8.16", ] +[[package]] +name = "config" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23738e11972c7643e4ec947840fc463b6a571afcd3e735bdfce7d03c7a784aca" +dependencies = [ + "async-trait", + "json5", + "lazy_static", + "nom", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml 0.5.11", + "yaml-rust", +] + [[package]] name = "console" version = "0.15.7" @@ -2387,6 +2406,12 @@ dependencies = [ "libloading 0.7.4", ] +[[package]] +name = "dlv-list" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" + [[package]] name = "doc-comment" version = "0.3.3" @@ -3911,6 +3936,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "json_comments" version = "0.2.1" @@ -5198,6 +5234,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-multimap" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" +dependencies = [ + "dlv-list", + "hashbrown 0.12.3", +] + [[package]] name = "os_info" version = "3.7.0" @@ -6535,6 +6581,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "ron" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a" +dependencies = [ + "base64 0.13.1", + "bitflags 1.3.2", + "serde", +] + [[package]] name = "rstest" version = "0.16.0" @@ -6573,6 +6630,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rust-ini" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df" +dependencies = [ + "cfg-if 1.0.0", + "ordered-multimap", +] + [[package]] name = "rust_hawktracer" version = "0.7.0" @@ -6635,6 +6702,16 @@ dependencies = [ "semver 0.9.0", ] +[[package]] +name = "rustc_version_runtime" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dd18cd2bae1820af0b6ad5e54f4a51d0f3fcc53b05f845675074efcc7af071d" +dependencies = [ + "rustc_version 0.4.0", + "semver 1.0.18", +] + [[package]] name = "rustix" version = "0.37.23" @@ -7032,9 +7109,9 @@ checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" [[package]] name = "sha2" -version = "0.10.6" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if 1.0.0", "cpufeatures", @@ -10893,7 +10970,7 @@ dependencies = [ "port_scanner", "regex", "reqwest", - "rustc_version_runtime", + "rustc_version_runtime 0.2.1", "serde", "serde_json", "test-case", @@ -11110,7 +11187,7 @@ dependencies = [ "rayon", "regex", "reqwest", - "rustc_version_runtime", + "rustc_version_runtime 0.2.1", "semver 1.0.18", "serde", "serde_json", @@ -11152,6 +11229,7 @@ dependencies = [ "turborepo-lockfiles", "turborepo-repository", "turborepo-scm", + "turborepo-telemetry", "turborepo-ui", "turborepo-vercel-api", "turborepo-vercel-api-mock", @@ -11245,6 +11323,34 @@ dependencies = [ "which", ] +[[package]] +name = "turborepo-telemetry" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "config", + "dirs-next", + "futures 0.3.28", + "hex", + "once_cell", + "reqwest", + "rustc_version_runtime 0.3.0", + "serde", + "serde_json", + "sha2", + "test-case", + "thiserror", + "tokio", + "tracing", + "turborepo-api-client", + "turborepo-ui", + "turborepo-vercel-api", + "turborepo-vercel-api-mock", + "url 2.4.1", + "uuid", +] + [[package]] name = "turborepo-ui" version = "0.1.0" diff --git a/crates/turborepo-api-client/src/lib.rs b/crates/turborepo-api-client/src/lib.rs index 6d1d11d45eb39..eb2e9d6ca071b 100644 --- a/crates/turborepo-api-client/src/lib.rs +++ b/crates/turborepo-api-client/src/lib.rs @@ -23,6 +23,7 @@ pub mod analytics; mod error; mod retry; pub mod spaces; +pub mod telemetry; lazy_static! { static ref AUTHORIZATION_REGEX: Regex = @@ -452,13 +453,7 @@ impl APIClient { let client = client_build.map_err(Error::TlsError)?; - let user_agent = format!( - "turbo {} {} {} {}", - version, - rustc_version_runtime::version(), - env::consts::OS, - env::consts::ARCH - ); + let user_agent = build_user_agent(version); Ok(APIClient { client, base_url: base_url.as_ref().to_string(), @@ -536,6 +531,49 @@ impl APIAuth { } } +// Anon Client +#[derive(Clone)] +pub struct AnonAPIClient { + client: reqwest::Client, + base_url: String, + user_agent: String, +} + +impl AnonAPIClient { + fn make_url(&self, endpoint: &str) -> String { + format!("{}{}", self.base_url, endpoint) + } + + pub fn new(base_url: impl AsRef, timeout: u64, version: &str) -> Result { + let client_build = if timeout != 0 { + reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(timeout)) + .build() + } else { + reqwest::Client::builder().build() + }; + + let client = client_build.map_err(Error::TlsError)?; + + let user_agent = build_user_agent(version); + Ok(AnonAPIClient { + client, + base_url: base_url.as_ref().to_string(), + user_agent, + }) + } +} + +fn build_user_agent(version: &str) -> String { + format!( + "turbo {} {} {} {}", + version, + rustc_version_runtime::version(), + env::consts::OS, + env::consts::ARCH + ) +} + #[cfg(test)] mod test { use anyhow::Result; diff --git a/crates/turborepo-api-client/src/telemetry.rs b/crates/turborepo-api-client/src/telemetry.rs new file mode 100644 index 0000000000000..06de7ab1c1ea6 --- /dev/null +++ b/crates/turborepo-api-client/src/telemetry.rs @@ -0,0 +1,43 @@ +use async_trait::async_trait; +use reqwest::Method; +use turborepo_vercel_api::TelemetryEvent; + +use crate::{retry, AnonAPIClient, Error}; + +const TELEMETRY_ENDPOINT: &str = "/api/turborepo/v1/events"; + +#[async_trait] +pub trait TelemetryClient { + async fn record_telemetry( + &self, + events: Vec, + telemetry_id: &str, + session_id: &str, + ) -> Result<(), Error>; +} + +#[async_trait] +impl TelemetryClient for AnonAPIClient { + async fn record_telemetry( + &self, + events: Vec, + telemetry_id: &str, + session_id: &str, + ) -> Result<(), Error> { + let url = self.make_url(TELEMETRY_ENDPOINT); + let telemetry_request = self + .client + .request(Method::POST, url) + .header("User-Agent", self.user_agent.clone()) + .header("Content-Type", "application/json") + .header("x-turbo-telemetry-id", telemetry_id) + .header("x-turbo-session-id", session_id) + .json(&events); + + retry::make_retryable_request(telemetry_request) + .await? + .error_for_status()?; + + Ok(()) + } +} diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 35b3969d1f855..fb6eaac94ddd9 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -125,6 +125,7 @@ turborepo-env = { workspace = true } turborepo-filewatch = { path = "../turborepo-filewatch" } turborepo-lockfiles = { workspace = true } turborepo-scm = { workspace = true } +turborepo-telemetry = { path = "../turborepo-telemetry" } turborepo-ui = { workspace = true } twox-hash = "1.6.3" wax = { workspace = true } diff --git a/crates/turborepo-lib/src/cli/mod.rs b/crates/turborepo-lib/src/cli/mod.rs index 07f6acd13ae92..3cadba8de507b 100644 --- a/crates/turborepo-lib/src/cli/mod.rs +++ b/crates/turborepo-lib/src/cli/mod.rs @@ -10,11 +10,21 @@ pub use error::Error; use serde::{Deserialize, Serialize}; use tracing::{debug, error}; use turbopath::AbsoluteSystemPathBuf; +use turborepo_api_client::AnonAPIClient; use turborepo_repository::inference::{RepoMode, RepoState}; +use turborepo_telemetry::{ + events::{ + command::{CodePath, CommandEventBuilder}, + PubEventBuilder, + }, + init_telemetry, TelemetryHandle, +}; use turborepo_ui::UI; use crate::{ - commands::{bin, daemon, generate, info, link, login, logout, prune, unlink, CommandBase}, + commands::{ + bin, daemon, generate, info, link, login, logout, prune, telemetry, unlink, CommandBase, + }, get_version, tracing::TurboSubscriber, Payload, @@ -200,6 +210,17 @@ pub enum DaemonCommand { Clean, } +#[derive(Subcommand, Copy, Clone, Debug, Serialize, PartialEq)] +#[serde(tag = "command")] +pub enum TelemetryCommand { + /// Enables anonymous telemetry + Enable, + /// Disables anonymous telemetry + Disable, + /// Reports the status of telemetry + Status, +} + #[derive(Copy, Clone, Debug, PartialEq, Serialize, ValueEnum)] pub enum LinkTarget { RemoteCache, @@ -346,6 +367,14 @@ pub enum Command { #[serde(skip)] command: Option>, }, + // TODO:[telemetry] Unhide this in `1.12` + /// Enable or disable anonymous telemetry + #[clap(hide = true)] + Telemetry { + #[clap(subcommand)] + #[serde(flatten)] + command: Option, + }, #[clap(hide = true)] Info { workspace: Option, @@ -686,6 +715,29 @@ pub async fn run( ui: UI, ) -> Result { let mut cli_args = Args::new(); + let version = get_version(); + + // track telemetry handle to close at the end of the run + let mut telemetry_handle: Option = None; + // TODO:[telemetry] Remove this check in `1.12` + if turborepo_telemetry::config::is_telemetry_internal_test() { + // initialize telemetry + match AnonAPIClient::new("https://telemetry.vercel.com", 250, version) { + Ok(anonymous_api_client) => { + let handle = init_telemetry(anonymous_api_client, ui); + match handle { + Ok(h) => telemetry_handle = Some(h), + Err(error) => { + debug!("failed to start telemetry: {:?}", error) + } + } + } + Err(error) => { + debug!("Failed to create AnonAPIClient: {:?}", error); + } + } + } + // If there is no command, we set the command to `Command::Run` with // `self.parsed_args.run_args` as arguments. let mut command = if let Some(command) = mem::take(&mut cli_args.command) { @@ -750,19 +802,19 @@ pub async fn run( AbsoluteSystemPathBuf::cwd()? }; - let version = get_version(); - cli_args.command = Some(command); cli_args.cwd = Some(repo_root.as_path().to_owned()); - match cli_args.command.as_ref().unwrap() { + let cli_result = match cli_args.command.as_ref().unwrap() { Command::Bin { .. } => { + CommandEventBuilder::new("bin").track_call(); bin::run()?; Ok(Payload::Rust(Ok(0))) } #[allow(unused_variables)] Command::Daemon { command, idle_time } => { + CommandEventBuilder::new("daemon").track_call(); let base = CommandBase::new(cli_args.clone(), repo_root, version, ui); match command { @@ -785,6 +837,8 @@ pub async fn run( args, command, } => { + let event = CommandEventBuilder::new("generate"); + event.track_call(); // build GeneratorCustomArgs struct let args = GeneratorCustomArgs { generator_name: generator_name.clone(), @@ -792,11 +846,20 @@ pub async fn run( root: root.clone(), args: args.clone(), }; - - generate::run(tag, command, &args)?; + let child_event = event.child(); + generate::run(tag, command, &args, child_event)?; + Ok(Payload::Rust(Ok(0))) + } + Command::Telemetry { command } => { + let event = CommandEventBuilder::new("telemetry"); + event.track_call(); + let mut base = CommandBase::new(cli_args.clone(), repo_root, version, ui); + let child_event = event.child(); + telemetry::configure(command, &mut base, child_event); Ok(Payload::Rust(Ok(0))) } Command::Info { workspace, json } => { + CommandEventBuilder::new("info").track_call(); let json = *json; let workspace = workspace.clone(); let mut base = CommandBase::new(cli_args, repo_root, version, ui); @@ -808,6 +871,7 @@ pub async fn run( no_gitignore, target, } => { + CommandEventBuilder::new("link").track_call(); if cli_args.test_run { println!("Link test run successful"); return Ok(Payload::Rust(Ok(0))); @@ -824,12 +888,14 @@ pub async fn run( Ok(Payload::Rust(Ok(0))) } Command::Logout { .. } => { + CommandEventBuilder::new("logout").track_call(); let mut base = CommandBase::new(cli_args, repo_root, version, ui); logout::logout(&mut base)?; Ok(Payload::Rust(Ok(0))) } Command::Login { sso_team } => { + CommandEventBuilder::new("login").track_call(); if cli_args.test_run { println!("Login test run successful"); return Ok(Payload::Rust(Ok(0))); @@ -848,6 +914,7 @@ pub async fn run( Ok(Payload::Rust(Ok(0))) } Command::Unlink { target } => { + CommandEventBuilder::new("unlink").track_call(); if cli_args.test_run { println!("Unlink test run successful"); return Ok(Payload::Rust(Ok(0))); @@ -861,6 +928,8 @@ pub async fn run( Ok(Payload::Rust(Ok(0))) } Command::Run(args) => { + let event = CommandEventBuilder::new("run"); + event.track_call(); // in the case of enabling the run stub, we want to be able to opt-in // to the rust codepath for running turbo if args.tasks.is_empty() { @@ -875,10 +944,18 @@ pub async fn run( let should_use_go = args.go_fallback || env::var("EXPERIMENTAL_RUST_CODEPATH").as_deref() == Ok("false"); + if should_use_go { + event.track_run_code_path(CodePath::Go); + // we have to clear the telemetry queue before we hand off to go + if telemetry_handle.is_some() { + let handle = telemetry_handle.take().unwrap(); + handle.close_with_timeout().await; + } Ok(Payload::Go(Box::new(base))) } else { use crate::commands::run; + event.track_run_code_path(CodePath::Rust); let exit_code = run::run(base).await?; Ok(Payload::Rust(Ok(exit_code))) } @@ -889,6 +966,7 @@ pub async fn run( docker, output_dir, } => { + CommandEventBuilder::new("prune").track_call(); let scope = scope_arg .as_ref() .or(scope.as_ref()) @@ -901,11 +979,18 @@ pub async fn run( Ok(Payload::Rust(Ok(0))) } Command::Completion { shell } => { + CommandEventBuilder::new("completion").track_call(); generate(*shell, &mut Args::command(), "turbo", &mut io::stdout()); - Ok(Payload::Rust(Ok(0))) } + }; + + match telemetry_handle { + Some(handle) => handle.close_with_timeout().await, + None => debug!("Skipping telemetry close - not initialized"), } + + cli_result } #[cfg(test)] diff --git a/crates/turborepo-lib/src/commands/generate.rs b/crates/turborepo-lib/src/commands/generate.rs index 42370442f3f6c..2269b87924cc2 100644 --- a/crates/turborepo-lib/src/commands/generate.rs +++ b/crates/turborepo-lib/src/commands/generate.rs @@ -5,6 +5,7 @@ use std::{ use thiserror::Error; use tracing::debug; +use turborepo_telemetry::events::command::CommandEventBuilder; use which::which; use crate::{ @@ -46,14 +47,18 @@ pub fn run( tag: &String, command: &Option>, args: &GeneratorCustomArgs, + telemetry: CommandEventBuilder, ) -> Result<(), Error> { + telemetry.track_generator_tag(tag); // check if a subcommand was passed if let Some(box GenerateCommand::Workspace(workspace_args)) = command { let raw_args = serde_json::to_string(&workspace_args)?; + telemetry.track_generator_option("workspace"); call_turbo_gen("workspace", tag, &raw_args)?; } else { // if no subcommand was passed, run the generate command as default let raw_args = serde_json::to_string(&args)?; + telemetry.track_generator_option("run"); call_turbo_gen("run", tag, &raw_args)?; } diff --git a/crates/turborepo-lib/src/commands/mod.rs b/crates/turborepo-lib/src/commands/mod.rs index 39281c099f08b..95034290a1d94 100644 --- a/crates/turborepo-lib/src/commands/mod.rs +++ b/crates/turborepo-lib/src/commands/mod.rs @@ -20,6 +20,7 @@ pub(crate) mod login; pub(crate) mod logout; pub(crate) mod prune; pub(crate) mod run; +pub(crate) mod telemetry; pub(crate) mod unlink; #[derive(Debug)] diff --git a/crates/turborepo-lib/src/commands/telemetry.rs b/crates/turborepo-lib/src/commands/telemetry.rs new file mode 100644 index 0000000000000..583697fabd7d9 --- /dev/null +++ b/crates/turborepo-lib/src/commands/telemetry.rs @@ -0,0 +1,77 @@ +use turborepo_telemetry::{config::TelemetryConfig, events::command::CommandEventBuilder}; +use turborepo_ui::{color, BOLD, BOLD_GREEN, BOLD_RED}; + +use super::CommandBase; +use crate::cli::TelemetryCommand; + +fn log_status(config: TelemetryConfig, base: &CommandBase) { + let status = config.is_enabled(); + match status { + true => { + println!( + "\nStatus: {}", + base.ui.apply(BOLD_GREEN.apply_to("Enabled")) + ); + println!("\nTurborepo telemetry is completely anonymous. Thank you for participating!"); + } + false => { + println!("\nStatus: {}", base.ui.apply(BOLD_RED.apply_to("Disabled"))); + println!( + "\nYou have opted-out of Turborepo anonymous telemetry. No data will be collected \ + from your machine." + ); + } + } + println!("Learn more: https://turbo.build/repo/docs/telemetry"); +} + +fn log_error(message: &str, error: &str, base: &CommandBase) { + println!( + "{}: {}", + color!(base.ui, BOLD_RED, "{}", message), + color!(base.ui, BOLD_RED, "{}", error) + ); +} + +pub fn configure( + command: &Option, + base: &mut CommandBase, + telemetry: CommandEventBuilder, +) { + let config = TelemetryConfig::new(); + let mut config = match config { + Ok(config) => config, + Err(e) => { + log_error("Failed to load telemetry config", &e.to_string(), base); + return; + } + }; + + match command { + Some(TelemetryCommand::Enable) => { + let result = config.enable(); + match result { + Ok(_) => { + println!("{}", color!(base.ui, BOLD, "{}", "Success!")); + log_status(config, base); + telemetry.track_telemetry_config(true); + } + Err(e) => log_error("Failed to enable telemetry", &e.to_string(), base), + } + } + Some(TelemetryCommand::Disable) => { + let result = config.disable(); + match result { + Ok(_) => { + println!("{}", color!(base.ui, BOLD, "{}", "Success!")); + log_status(config, base); + telemetry.track_telemetry_config(false); + } + Err(e) => log_error("Failed to disable telemetry", &e.to_string(), base), + } + } + _ => { + log_status(config, base); + } + } +} diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index 7cee9ea24e204..afc088e025fbc 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -22,6 +22,7 @@ use turborepo_repository::{ package_graph::{PackageGraph, WorkspaceName, ROOT_PKG_NAME}, package_manager::PackageManager, }; +use turborepo_telemetry::events::{task::PackageTaskEventBuilder, PubEventBuilder}; use turborepo_ui::{ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, UI}; use which::which; @@ -155,6 +156,7 @@ impl<'a> Visitor<'a> { task_id: info.clone(), })?; + let package_task_event = PackageTaskEventBuilder::new(info.package(), info.task()); let command = workspace_info .package_json .scripts @@ -163,10 +165,11 @@ impl<'a> Visitor<'a> { match command { Some(cmd) if info.package() == ROOT_PKG_NAME && turbo_regex().is_match(&cmd) => { + package_task_event.track_recursive_error(); return Err(Error::RecursiveTurbo { task_name: info.to_string(), command: cmd.to_string(), - }) + }); } _ => (), } @@ -190,14 +193,17 @@ impl<'a> Visitor<'a> { let dependency_set = engine.dependencies(&info).ok_or(Error::MissingDefinition)?; + let package_task_event_child = package_task_event.child(); let task_hash = self.task_hasher.calculate_task_hash( &info, task_definition, task_env_mode, workspace_info, dependency_set, + package_task_event_child, )?; + package_task_event.track_hash(&task_hash.to_string()); debug!("task {} hash is {}", info, task_hash); // We do this calculation earlier than we do in Go due to the `task_hasher` // being !Send. In the future we can look at doing this right before diff --git a/crates/turborepo-lib/src/task_hash.rs b/crates/turborepo-lib/src/task_hash.rs index 5880009cd7981..75e3c49cd8ca7 100644 --- a/crates/turborepo-lib/src/task_hash.rs +++ b/crates/turborepo-lib/src/task_hash.rs @@ -12,6 +12,7 @@ use turborepo_cache::CacheHitMetadata; use turborepo_env::{BySource, DetailedMap, EnvironmentVariableMap, ResolvedEnvMode}; use turborepo_repository::package_graph::{WorkspaceInfo, WorkspaceName}; use turborepo_scm::SCM; +use turborepo_telemetry::events::task::PackageTaskEventBuilder; use crate::{ engine::TaskNode, @@ -205,6 +206,7 @@ impl<'a> TaskHasher<'a> { task_env_mode: ResolvedEnvMode, workspace: &WorkspaceInfo, dependency_set: HashSet<&TaskNode>, + telemetry: PackageTaskEventBuilder, ) -> Result { let do_framework_inference = self.opts.run_opts.framework_inference; let is_monorepo = !self.opts.run_opts.single_package; @@ -226,6 +228,7 @@ impl<'a> TaskHasher<'a> { framework.slug(), framework.env_wildcards() ); + telemetry.track_framework(framework.slug()); let mut computed_wildcards = framework .env_wildcards() .iter() diff --git a/crates/turborepo-telemetry/Cargo.toml b/crates/turborepo-telemetry/Cargo.toml new file mode 100644 index 0000000000000..0b465f68925cf --- /dev/null +++ b/crates/turborepo-telemetry/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "turborepo-telemetry" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lints] +workspace = true + +[dev-dependencies] +serde_json = { workspace = true } +test-case = { workspace = true } +turborepo-vercel-api-mock = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true, features = ["serde"] } +config = "0.13.4" +dirs-next = "2.0.0" +futures = { workspace = true } +hex = "0.4.3" +once_cell = "1.18.0" +reqwest = { workspace = true, features = ["json"] } +rustc_version_runtime = "0.3.0" +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +sha2 = "0.10.8" +thiserror = { workspace = true } +tokio = { workspace = true, features = ["full", "time"] } +tracing = { workspace = true } +turborepo-api-client = { workspace = true } +turborepo-ui = { workspace = true } +turborepo-vercel-api = { workspace = true } +url = { workspace = true } +uuid = { version = "1.5.0", features = ["v4"] } diff --git a/crates/turborepo-telemetry/README.md b/crates/turborepo-telemetry/README.md new file mode 100644 index 0000000000000..c53c0dbb1b383 --- /dev/null +++ b/crates/turborepo-telemetry/README.md @@ -0,0 +1,10 @@ +# turborepo-telemetry + +## Overview + +This crate provides a way to optionally record anonymous usage data. +This information is used to shape the Turborepo roadmap and prioritize features. You can learn more, including how to opt-out if you'd not like to participate in this anonymous program, by visiting the [documentation](https://turbo.build/repo/docs/telemetry): + +## Events + +All recorded events can be found by browsing the [events](./src/events) directory. diff --git a/crates/turborepo-telemetry/src/config.rs b/crates/turborepo-telemetry/src/config.rs new file mode 100644 index 0000000000000..d863680e61165 --- /dev/null +++ b/crates/turborepo-telemetry/src/config.rs @@ -0,0 +1,280 @@ +use std::{env, fs, path::Path}; + +use chrono::{DateTime, Utc}; +pub use config::{Config, ConfigError, File, FileFormat}; +use hex; +use serde::{Deserialize, Serialize}; +use serde_json; +use sha2::{Digest, Sha256}; +use tracing::{debug, error}; +use turborepo_ui::{color, BOLD, GREY, UI, UNDERLINE}; +use uuid::Uuid; + +// Telemetry ships disabled by default until we can announce it publicly, this +// allows us to test it internally, and will be removed in 1.12 +// TODO:[telemetry] Remove this in `1.12` +static ENABLED_ENV_VAR: &str = "TURBO_TELEMETRY_ENABLED"; + +static DEBUG_ENV_VAR: &str = "TURBO_TELEMETRY_DEBUG"; +static DISABLED_ENV_VAR: &str = "TURBO_TELEMETRY_DISABLED"; +static DISABLED_MESSAGE_ENV_VAR: &str = "TURBO_TELEMETRY_MESSAGE_DISABLED"; +static DO_NOT_TRACK_ENV_VAR: &str = "DO_NOT_TRACK"; + +#[derive(Debug, Deserialize, Serialize)] +pub struct TelemetryConfigContents { + // whether or not telemetry is enabled + telemetry_enabled: bool, + // randomized and salted machine id - used for linking events together + telemetry_id: String, + // private salt used to anonymize event data (telemetry_id, task names, package names, etc.) - + // this is generated on first run and never leaves the machine + telemetry_salt: String, + + // when the alert was shown + #[serde(skip_serializing_if = "Option::is_none")] + telemetry_alerted: Option>, +} + +impl Default for TelemetryConfigContents { + fn default() -> Self { + let telemetry_salt = Uuid::new_v4().to_string(); + let raw_telemetry_id = Uuid::new_v4().to_string(); + let telemetry_id = one_way_hash_with_salt(&telemetry_salt, &raw_telemetry_id); + + TelemetryConfigContents { + telemetry_enabled: true, + telemetry_alerted: None, + telemetry_salt, + telemetry_id, + } + } +} + +#[derive(Debug)] +pub struct TelemetryConfig { + config_path: String, + config: TelemetryConfigContents, +} + +impl TelemetryConfig { + pub fn new() -> Result { + let file_path = &get_config_path()?; + debug!("Telemetry config path: {}", file_path); + if !Path::new(file_path).try_exists().unwrap_or(false) { + write_new_config()?; + } + + let mut settings = Config::builder(); + settings = settings.add_source(File::new(file_path, FileFormat::Json)); + let settings = settings.build(); + + // If this is a FileParse error, we assume something corrupted the file or + // its structure. In this case, because the telemetry config is intentionally + // isolated from other turborepo config, try to remove the entire config + // file and write a new one, otherwise return the error + let config = match settings { + Ok(settings) => settings.try_deserialize::()?, + Err(ConfigError::FileParse { .. }) => { + fs::remove_file(file_path).map_err(|e| ConfigError::Message(e.to_string()))?; + write_new_config()?; + return Err(settings.unwrap_err()); + } + // Propagate other errors + Err(err) => return Err(err), + }; + + let config = TelemetryConfig { + config_path: file_path.to_string(), + config, + }; + + Ok(config) + } + + fn write(&self) -> Result<(), ConfigError> { + let serialized = serde_json::to_string_pretty(&self.config) + .map_err(|e| ConfigError::Message(e.to_string()))?; + fs::write(&self.config_path, serialized) + .map_err(|e| ConfigError::Message(e.to_string()))?; + Ok(()) + } + + pub fn one_way_hash(input: &str) -> String { + match TelemetryConfig::new() { + Ok(config) => config.one_way_hash_with_config_salt(input), + Err(_) => TelemetryConfig::one_way_hash_with_tmp_salt(input), + } + } + + /// Obfuscate with the config salt - this is used for all sensitive event + /// data + fn one_way_hash_with_config_salt(&self, input: &str) -> String { + one_way_hash_with_salt(&self.config.telemetry_salt, input) + } + + /// Obfuscate with a temporary salt - this is used as a fallback when the + /// config salt is not available (e.g. config loading failed etc.) + /// + /// This is just as secure as the config salt, but it prevents us from + /// linking together events that include obfuscated data generated with + /// this method as each call will generate a new salt. + fn one_way_hash_with_tmp_salt(input: &str) -> String { + let tmp_salt = Uuid::new_v4().to_string(); + one_way_hash_with_salt(&tmp_salt, input) + } + + pub fn show_alert(&mut self, ui: UI) { + if !self.has_seen_alert() && self.is_enabled() && Self::is_telemetry_warning_enabled() { + println!( + "\n{}\n{}\n{}\n{}\n{}\n", + color!(ui, BOLD, "{}", "Attention:"), + color!( + ui, + GREY, + "{}", + "Turborepo now collects completely anonymous telemetry regarding usage." + ), + color!( + ui, + GREY, + "{}", + "This information is used to shape the Turborepo roadmap and prioritize \ + features." + ), + color!( + ui, + GREY, + "{}", + "You can learn more, including how to opt-out if you'd not like to \ + participate in this anonymous program, by visiting the following URL:" + ), + color!( + ui, + UNDERLINE, + "{}", + color!(ui, GREY, "{}", "https://turbo.build/repo/docs/telemetry") + ), + ); + + if let Err(err) = self.alert_shown() { + error!( + "Error saving seen alert event to telemetry config: {:?}", + err + ); + } + } + } + + // getters + pub fn has_seen_alert(&self) -> bool { + self.config.telemetry_alerted.is_some() + } + + pub fn is_enabled(&self) -> bool { + let do_not_track = env::var(DO_NOT_TRACK_ENV_VAR).unwrap_or("0".to_string()); + let turbo_telemetry_disabled = env::var(DISABLED_ENV_VAR).unwrap_or("0".to_string()); + + if do_not_track == "1" + || do_not_track == "true" + || turbo_telemetry_disabled == "1" + || turbo_telemetry_disabled == "true" + { + return false; + } + + self.config.telemetry_enabled + } + + pub fn is_telemetry_warning_enabled() -> bool { + let turbo_telemetry_msg_disabled = + env::var(DISABLED_MESSAGE_ENV_VAR).unwrap_or("0".to_string()); + turbo_telemetry_msg_disabled != "1" && turbo_telemetry_msg_disabled != "true" + } + + pub fn get_id(&self) -> &str { + &self.config.telemetry_id + } + + // setters + pub fn enable(&mut self) -> Result<&TelemetryConfigContents, ConfigError> { + self.config.telemetry_enabled = true; + self.write()?; + Ok(&self.config) + } + + pub fn disable(&mut self) -> Result<&TelemetryConfigContents, ConfigError> { + self.config.telemetry_enabled = false; + self.write()?; + Ok(&self.config) + } + + pub fn alert_shown(&mut self) -> Result<&TelemetryConfigContents, ConfigError> { + match self.has_seen_alert() { + true => Ok(&self.config), + false => { + self.config.telemetry_alerted = Some(Utc::now()); + self.write()?; + Ok(&self.config) + } + } + } +} + +fn get_config_path() -> Result { + if cfg!(test) { + let tmp_dir = env::temp_dir(); + let config_path = tmp_dir.join("test-telemetry.json"); + Ok(config_path.to_str().unwrap().to_string()) + } else { + let config_dir = dirs_next::config_dir().ok_or(ConfigError::Message( + "Could find telemetry config directory".to_string(), + ))?; + // stored as a sibling to the turbo global config + let config_path = config_dir.join("turborepo").join("telemetry.json"); + Ok(config_path.to_str().unwrap().to_string()) + } +} + +fn write_new_config() -> Result<(), ConfigError> { + let file_path = get_config_path()?; + let serialized = serde_json::to_string_pretty(&TelemetryConfigContents::default()) + .map_err(|e| ConfigError::Message(e.to_string()))?; + + // Extract the directory path from the file path + let dir_path = Path::new(&file_path).parent().ok_or_else(|| { + ConfigError::Message("Failed to extract directory path from file path".to_string()) + })?; + + // Create the directory if it doesn't exist + if !dir_path.try_exists().unwrap_or(false) { + fs::create_dir_all(dir_path).map_err(|e| { + ConfigError::Message(format!( + "Failed to create directory {}: {}", + dir_path.display(), + e + )) + })?; + } + + // Write the file + fs::write(&file_path, serialized).map_err(|e| ConfigError::Message(e.to_string()))?; + Ok(()) +} + +pub fn is_debug() -> bool { + let debug = env::var(DEBUG_ENV_VAR).unwrap_or("0".to_string()); + debug == "1" || debug == "true" +} + +fn one_way_hash_with_salt(salt: &str, input: &str) -> String { + let salted = format!("{}{}", salt, input); + let mut hasher = Sha256::new(); + hasher.update(salted.as_bytes()); + let generic = hasher.finalize(); + hex::encode(generic) +} + +// TODO:[telemetry] Remove this in `1.12` +pub fn is_telemetry_internal_test() -> bool { + env::var(ENABLED_ENV_VAR).unwrap_or("0".to_string()) == "1" +} diff --git a/crates/turborepo-telemetry/src/errors.rs b/crates/turborepo-telemetry/src/errors.rs new file mode 100644 index 0000000000000..2abcb7c966d6b --- /dev/null +++ b/crates/turborepo-telemetry/src/errors.rs @@ -0,0 +1,29 @@ +use std::backtrace::Backtrace; + +use reqwest::header::ToStrError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Error making HTTP request: {0}")] + ReqwestError(#[from] reqwest::Error), + #[error("skipping HTTP Request, too many failures have occurred.\nLast error: {0}")] + TooManyFailures(#[from] Box), + #[error("Unable to set up TLS.")] + TlsError(#[source] reqwest::Error), + #[error("Error parsing header: {0}")] + InvalidHeader(#[from] ToStrError), + #[error("Error parsing URL: {0}")] + InvalidUrl(#[from] url::ParseError), + #[error("unknown status {code}: {message}")] + UnknownStatus { + code: String, + message: String, + #[backtrace] + backtrace: Backtrace, + }, + #[error("Error making retryable request: {0}")] + RetryError(#[from] turborepo_api_client::Error), +} + +pub type Result = std::result::Result; diff --git a/crates/turborepo-telemetry/src/events/command.rs b/crates/turborepo-telemetry/src/events/command.rs new file mode 100644 index 0000000000000..15048a7ce970e --- /dev/null +++ b/crates/turborepo-telemetry/src/events/command.rs @@ -0,0 +1,111 @@ +use serde::{Deserialize, Serialize}; +use turborepo_vercel_api::{TelemetryCommandEvent, TelemetryEvent}; +use uuid::Uuid; + +use super::{Event, EventBuilder, EventType, PubEventBuilder}; +use crate::{config::TelemetryConfig, telem}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandEventBuilder { + id: String, + command: String, + parent: Option, +} + +impl EventBuilder for CommandEventBuilder { + fn get_id(&self) -> &String { + &self.id + } + + fn with_parent(mut self, parent_event: &CommandEventBuilder) -> Self { + self.parent = Some(parent_event.get_id().clone()); + self + } +} + +impl PubEventBuilder for CommandEventBuilder { + fn track(&self, event: Event) { + let val = match event.is_sensitive { + EventType::Sensitive => TelemetryConfig::one_way_hash(&event.value), + EventType::NonSensitive => event.value.to_string(), + }; + + telem(TelemetryEvent::Command(TelemetryCommandEvent { + id: self.id.clone(), + command: self.command.clone(), + parent: self.parent.clone(), + key: event.key, + value: val, + })); + } + + fn child(&self) -> Self { + Self::new(&self.command).with_parent(self) + } +} + +// events + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CodePath { + Go, + Rust, +} + +impl CommandEventBuilder { + pub fn new(command: &str) -> Self { + Self { + id: Uuid::new_v4().to_string(), + command: command.to_string(), + parent: None, + } + } + + pub fn track_call(&self) -> &Self { + self.track(Event { + key: "command".to_string(), + value: "called".to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_run_code_path(&self, path: CodePath) -> &Self { + self.track(Event { + key: "binary".to_string(), + value: match path { + CodePath::Go => "go".to_string(), + CodePath::Rust => "rust".to_string(), + }, + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_telemetry_config(&self, enabled: bool) -> &Self { + self.track(Event { + key: "action".to_string(), + value: if enabled { "enabled" } else { "disabled" }.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_generator_option(&self, option: &str) -> &Self { + self.track(Event { + key: "option".to_string(), + value: option.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_generator_tag(&self, tag: &str) -> &Self { + self.track(Event { + key: "tag".to_string(), + value: tag.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } +} diff --git a/crates/turborepo-telemetry/src/events/mod.rs b/crates/turborepo-telemetry/src/events/mod.rs new file mode 100644 index 0000000000000..4e8c14e550cf6 --- /dev/null +++ b/crates/turborepo-telemetry/src/events/mod.rs @@ -0,0 +1,44 @@ +use serde::{Deserialize, Serialize}; + +// all event builders and their event methods +pub mod command; +pub mod repo; +pub mod task; + +/// All possible telemetry events must be included in this enum. +/// These events must be added to the backend (telemetry.vercel.com) +/// before they can be tracked - invalid or unknown events will be +/// ignored. +pub use turborepo_vercel_api::TelemetryEvent; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum EventType { + Sensitive, + NonSensitive, +} + +/// Key-value pairs that are sent with each even - if the value is +/// sensitive, it will be hashed and anonymized before being sent +/// using the users private salt. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Event { + key: String, + value: String, + is_sensitive: EventType, +} + +/// Private trait that can be used for building telemetry events. +/// +/// Supports connecting events via a parent-child relationship +/// to aid in connecting events together. +trait EventBuilder { + fn get_id(&self) -> &String; + fn with_parent(self, parent_event: &T) -> Self; +} + +/// Public trait that can be used for building telemetry events. +pub trait PubEventBuilder { + fn track(&self, event: Event); + fn child(&self) -> Self; +} diff --git a/crates/turborepo-telemetry/src/events/repo.rs b/crates/turborepo-telemetry/src/events/repo.rs new file mode 100644 index 0000000000000..b894cf7d79501 --- /dev/null +++ b/crates/turborepo-telemetry/src/events/repo.rs @@ -0,0 +1,83 @@ +use serde::{Deserialize, Serialize}; +use turborepo_vercel_api::{TelemetryEvent, TelemetryRepoEvent}; +use uuid::Uuid; + +use super::{Event, EventBuilder, EventType, PubEventBuilder}; +use crate::{config::TelemetryConfig, telem}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RepoEventBuilder { + id: String, + repo: String, + parent: Option, +} + +impl EventBuilder for RepoEventBuilder { + fn get_id(&self) -> &String { + &self.id + } + + fn with_parent(mut self, parent_event: &RepoEventBuilder) -> Self { + self.parent = Some(parent_event.get_id().clone()); + self + } +} + +impl PubEventBuilder for RepoEventBuilder { + fn track(&self, event: Event) { + let val = match event.is_sensitive { + EventType::Sensitive => TelemetryConfig::one_way_hash(&event.value), + EventType::NonSensitive => event.value.to_string(), + }; + + telem(TelemetryEvent::Repo(TelemetryRepoEvent { + id: self.id.clone(), + repo: self.repo.clone(), + key: event.key, + value: val, + parent: self.parent.clone(), + })); + } + + fn child(&self) -> Self { + Self::new(&self.repo).with_parent(self) + } +} + +// events +impl RepoEventBuilder { + pub fn new(repo: &str) -> Self { + Self { + id: Uuid::new_v4().to_string(), + repo: TelemetryConfig::one_way_hash(repo), + parent: None, + } + } + + pub fn track_package_manager_name(&self, name: &str) -> &Self { + self.track(Event { + key: "package_manager_name".to_string(), + value: name.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_package_manager_version(&self, version: &str) -> &Self { + self.track(Event { + key: "package_manager_version".to_string(), + value: version.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_is_monorepo(&self, is_monorepo: bool) -> &Self { + self.track(Event { + key: "is_monorepo".to_string(), + value: is_monorepo.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } +} diff --git a/crates/turborepo-telemetry/src/events/task.rs b/crates/turborepo-telemetry/src/events/task.rs new file mode 100644 index 0000000000000..2542dcfad1645 --- /dev/null +++ b/crates/turborepo-telemetry/src/events/task.rs @@ -0,0 +1,112 @@ +use serde::{Deserialize, Serialize}; +use turborepo_vercel_api::{TelemetryEvent, TelemetryTaskEvent}; +use uuid::Uuid; + +use super::{Event, EventBuilder, EventType, PubEventBuilder}; +use crate::{config::TelemetryConfig, telem}; + +// task names that will be passed through to the API without obfuscation +const ALLOWLIST: [&str; 8] = [ + "build", + "test", + "lint", + "typecheck", + "checktypes", + "check-types", + "type-check", + "check", +]; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PackageTaskEventBuilder { + id: String, + package: String, + task: String, + parent: Option, +} + +impl EventBuilder for PackageTaskEventBuilder { + fn get_id(&self) -> &String { + &self.id + } + + fn with_parent(mut self, parent_event: &PackageTaskEventBuilder) -> Self { + self.parent = Some(parent_event.get_id().clone()); + self + } +} + +impl PubEventBuilder for PackageTaskEventBuilder { + fn track(&self, event: Event) { + let val = match event.is_sensitive { + EventType::Sensitive => TelemetryConfig::one_way_hash(&event.value), + EventType::NonSensitive => event.value.to_string(), + }; + + telem(TelemetryEvent::Task(TelemetryTaskEvent { + id: self.id.clone(), + package: self.package.clone(), + task: self.task.clone(), + parent: self.parent.clone(), + key: event.key, + value: val, + })); + } + + fn child(&self) -> Self { + Self::new(&self.package, &self.task).with_parent(self) + } +} + +impl PackageTaskEventBuilder { + pub fn new(package: &str, task: &str) -> Self { + // don't obfuscate the package in development mode + let package = if cfg!(debug_assertions) { + package.to_string() + } else { + TelemetryConfig::one_way_hash(package) + }; + + // don't obfuscate the task in development mode or if it's in the allowlist + let task = if cfg!(debug_assertions) || ALLOWLIST.contains(&task) { + task.to_string() + } else { + TelemetryConfig::one_way_hash(task) + }; + + Self { + id: Uuid::new_v4().to_string(), + parent: None, + package, + task, + } + } + + // event methods + pub fn track_recursive_error(&self) -> &Self { + self.track(Event { + key: "error".to_string(), + value: "recursive".to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_hash(&self, hash: &str) -> &Self { + self.track(Event { + key: "hash".to_string(), + value: hash.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } + + pub fn track_framework(&self, framework: &str) -> &Self { + self.track(Event { + key: "framework".to_string(), + value: framework.to_string(), + is_sensitive: EventType::NonSensitive, + }); + self + } +} diff --git a/crates/turborepo-telemetry/src/lib.rs b/crates/turborepo-telemetry/src/lib.rs new file mode 100644 index 0000000000000..b2426979cd550 --- /dev/null +++ b/crates/turborepo-telemetry/src/lib.rs @@ -0,0 +1,447 @@ +//! Turborepo's telemetry library. Handles sending anonymous telemetry events to +//! the Vercel API in the background. +//! +//! More detail is available at https://turbo.build/repo/docs/telemetry. + +#![feature(error_generic_member_access)] + +pub mod config; +pub mod errors; +pub mod events; + +use std::time::Duration; + +use config::{ConfigError, TelemetryConfig}; +use events::TelemetryEvent; +use futures::{stream::FuturesUnordered, StreamExt}; +use once_cell::sync::OnceCell; +use thiserror::Error; +use tokio::{ + select, + sync::{mpsc, oneshot}, + task::{JoinError, JoinHandle}, +}; +use tracing::{debug, error}; +use turborepo_api_client::telemetry; +use turborepo_ui::{color, BOLD, GREY, UI}; +use uuid::Uuid; + +const BUFFER_THRESHOLD: usize = 10; + +static EVENT_TIMEOUT: Duration = Duration::from_millis(1000); +static NO_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60); +static REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Debug, Error)] +pub enum Error { + #[error("Failed to initialize telemetry")] + InitError(#[from] ConfigError), + #[error("Failed to send telemetry event")] + SendError(#[from] mpsc::error::SendError), + #[error("Failed to record telemetry")] + Join(#[from] JoinError), + #[error("Telemetry already initialized")] + AlreadyInitialized(), +} + +pub type TelemetrySender = mpsc::UnboundedSender; + +/// The handle on the `Worker` tokio thread, along with a channel +/// to indicate to the thread that it should shut down. +pub struct TelemetryHandle { + exit_ch: oneshot::Receiver<()>, + handle: JoinHandle<()>, +} + +static SENDER_INSTANCE: OnceCell = OnceCell::new(); + +// A global instance of the TelemetrySender. +pub fn telem(event: events::TelemetryEvent) { + let sender = SENDER_INSTANCE.get(); + match sender { + Some(s) => { + let result = s.send(event); + if let Err(err) = result { + debug!("failed to send telemetry event. error: {}", err) + } + } + None => { + // If we're in debug mode - log a warning + // TODO:[telemetry] Remove the internal test check in `1.12` + if cfg!(debug_assertions) && !cfg!(test) && config::is_telemetry_internal_test() { + println!("\n[DEVELOPMENT ERROR] telemetry sender not initialized\n"); + } + debug!("telemetry sender not initialized"); + } + } +} + +fn init( + client: impl telemetry::TelemetryClient + Clone + Send + Sync + 'static, + ui: UI, +) -> Result<(TelemetryHandle, TelemetrySender), Box> { + let (tx, rx) = mpsc::unbounded_channel(); + let (cancel_tx, cancel_rx) = oneshot::channel(); + let mut config = TelemetryConfig::new()?; + config.show_alert(ui); + + let session_id = Uuid::new_v4(); + let worker = Worker { + rx, + buffer: Vec::new(), + senders: FuturesUnordered::new(), + exit_ch: cancel_tx, + client, + session_id: session_id.to_string(), + telemetry_id: config.get_id().to_string(), + enabled: config.is_enabled(), + ui, + }; + let handle = worker.start(); + + let telemetry_handle = TelemetryHandle { + exit_ch: cancel_rx, + handle, + }; + + // return + Ok((telemetry_handle, tx)) +} + +/// Starts the `Worker` on a separate tokio thread. Returns an `TelemetrySender` +/// and an `TelemetryHandle`. +/// +/// We have two different types because the TelemetrySender should be shared +/// across threads (i.e. Clone + Send), while the TelemetryHandle cannot be +/// shared since it contains the structs necessary to shut down the worker. +pub fn init_telemetry( + client: impl telemetry::TelemetryClient + Clone + Send + Sync + 'static, + ui: UI, +) -> Result> { + // make sure we're not already initialized + if SENDER_INSTANCE.get().is_some() { + debug!("telemetry already initialized"); + return Err(Box::new(Error::AlreadyInitialized())); + } + let (handle, sender) = init(client, ui)?; + SENDER_INSTANCE.set(sender).unwrap(); + Ok(handle) +} + +impl TelemetryHandle { + async fn close(self) -> Result<(), Error> { + drop(self.exit_ch); + self.handle.await?; + + Ok(()) + } + + /// Closes the handle with an explicit timeout. If the handle fails to close + /// within that timeout, it will log an error and drop the handle. + pub async fn close_with_timeout(self) { + if let Err(err) = tokio::time::timeout(EVENT_TIMEOUT, self.close()).await { + debug!("failed to close telemetry handle. error: {}", err) + } else { + debug!("telemetry handle closed") + } + } +} + +struct Worker { + rx: mpsc::UnboundedReceiver, + buffer: Vec, + senders: FuturesUnordered>, + // Used to cancel the worker + exit_ch: oneshot::Sender<()>, + client: C, + telemetry_id: String, + session_id: String, + enabled: bool, + ui: UI, +} + +impl Worker { + pub fn start(mut self) -> JoinHandle<()> { + tokio::spawn(async move { + let mut timeout = tokio::time::sleep(NO_TIMEOUT); + loop { + select! { + // We want the events to be prioritized over closing + biased; + event = self.rx.recv() => { + if let Some(event) = event { + self.buffer.push(event); + } else { + // There are no senders left so we can shut down + break; + } + if self.buffer.len() == BUFFER_THRESHOLD { + self.flush_events(); + timeout = tokio::time::sleep(NO_TIMEOUT); + } else { + timeout = tokio::time::sleep(EVENT_TIMEOUT); + } + } + _ = timeout => { + self.flush_events(); + timeout = tokio::time::sleep(NO_TIMEOUT); + } + _ = self.exit_ch.closed() => { + break; + } + } + } + self.flush_events(); + while let Some(result) = self.senders.next().await { + if let Err(err) = result { + debug!("failed to send telemetry event. error: {}", err) + } + } + }) + } + + pub fn flush_events(&mut self) { + if !self.buffer.is_empty() { + let events = std::mem::take(&mut self.buffer); + debug!( + "Starting telemetry event queue flush (num_events={:?})", + events.len() + ); + let handle = self.send_events(events); + if let Some(handle) = handle { + self.senders.push(handle); + } + debug!("Done telemetry event queue flush"); + } + } + + fn send_events(&self, events: Vec) -> Option> { + if !self.enabled { + return None; + } + + if config::is_debug() { + for event in &events { + let pretty_event = serde_json::to_string_pretty(&event) + .unwrap_or("Error serializing event".to_string()); + println!( + "\n{}\n{}\n", + color!(self.ui, BOLD, "{}", "[telemetry event]"), + color!(self.ui, GREY, "{}", pretty_event) + ); + } + } + + let client = self.client.clone(); + let session_id = self.session_id.clone(); + let telemetry_id = self.telemetry_id.clone(); + Some(tokio::spawn(async move { + if let Ok(Err(err)) = tokio::time::timeout( + REQUEST_TIMEOUT, + client.record_telemetry(events, telemetry_id.as_str(), session_id.as_str()), + ) + .await + { + debug!("failed to record cache usage telemetry. error: {}", err) + } + })) + } +} + +#[cfg(test)] +mod tests { + use std::{ + cell::RefCell, + sync::{Arc, Mutex}, + time::Duration, + }; + + use async_trait::async_trait; + use tokio::{ + select, + sync::{mpsc, mpsc::UnboundedReceiver}, + }; + use turborepo_api_client::telemetry::TelemetryClient; + use turborepo_ui::UI; + use turborepo_vercel_api::{TelemetryCommandEvent, TelemetryEvent}; + + use crate::init; + + #[derive(Clone)] + struct DummyClient { + // A vector that stores each batch of events + events: Arc>>>>, + tx: mpsc::UnboundedSender<()>, + } + + impl DummyClient { + pub fn events(&self) -> Vec> { + self.events.lock().unwrap().borrow().clone() + } + } + + #[async_trait] + impl TelemetryClient for DummyClient { + async fn record_telemetry( + &self, + events: Vec, + _telemetry_id: &str, + _session_id: &str, + ) -> Result<(), turborepo_api_client::Error> { + self.events.lock().unwrap().borrow_mut().push(events); + self.tx.send(()).unwrap(); + + Ok(()) + } + } + + // Asserts that we get the message after the timeout + async fn expect_timeout_then_message(rx: &mut UnboundedReceiver<()>) { + let timeout = tokio::time::sleep(std::time::Duration::from_millis(150)); + + select! { + _ = rx.recv() => { + panic!("Expected to wait out the flush timeout") + } + _ = timeout => { + } + } + + rx.recv().await; + } + + // Asserts that we get the message immediately before the timeout + async fn expected_immediate_message(rx: &mut UnboundedReceiver<()>) { + let timeout = tokio::time::sleep(std::time::Duration::from_millis(150)); + + select! { + _ = rx.recv() => { + } + _ = timeout => { + panic!("expected to not wait out the flush timeout") + } + } + } + + #[tokio::test] + async fn test_batching() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let client = DummyClient { + events: Default::default(), + tx, + }; + + let result = init(client.clone(), UI::new(false)); + assert!(result.is_ok()); + + let (telemetry_handle, telemetry_sender) = result.unwrap(); + + for _ in 0..2 { + telemetry_sender + .send(TelemetryEvent::Command(TelemetryCommandEvent { + id: "id".to_string(), + command: "command".to_string(), + key: "key".to_string(), + value: "value".to_string(), + parent: None, + })) + .unwrap(); + } + let found = client.events(); + // Should have no events since we haven't flushed yet + assert_eq!(found.len(), 0); + + expect_timeout_then_message(&mut rx).await; + let found = client.events(); + assert_eq!(found.len(), 1); + let payloads = &found[0]; + assert_eq!(payloads.len(), 2); + + drop(telemetry_handle); + } + + #[tokio::test] + async fn test_batching_across_two_batches() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let client = DummyClient { + events: Default::default(), + tx, + }; + + let result = init(client.clone(), UI::new(false)); + assert!(result.is_ok()); + + let (telemetry_handle, telemetry_sender) = result.unwrap(); + + for _ in 0..12 { + telemetry_sender + .send(TelemetryEvent::Command(TelemetryCommandEvent { + id: "id".to_string(), + command: "command".to_string(), + key: "key".to_string(), + value: "value".to_string(), + parent: None, + })) + .unwrap(); + } + + expected_immediate_message(&mut rx).await; + + let found = client.events(); + assert_eq!(found.len(), 1); + + let payloads = &found[0]; + assert_eq!(payloads.len(), 10); + + expect_timeout_then_message(&mut rx).await; + let found = client.events(); + assert_eq!(found.len(), 2); + + let payloads = &found[1]; + assert_eq!(payloads.len(), 2); + + drop(telemetry_handle); + } + + #[tokio::test] + async fn test_closing() { + let (tx, mut _rx) = mpsc::unbounded_channel(); + + let client = DummyClient { + events: Default::default(), + tx, + }; + + let result = init(client.clone(), UI::new(false)); + assert!(result.is_ok()); + + let (telemetry_handle, telemetry_sender) = result.unwrap(); + + for _ in 0..2 { + telemetry_sender + .send(TelemetryEvent::Command(TelemetryCommandEvent { + id: "id".to_string(), + command: "command".to_string(), + key: "key".to_string(), + value: "value".to_string(), + parent: None, + })) + .unwrap(); + } + drop(telemetry_sender); + + let found = client.events(); + assert!(found.is_empty()); + + tokio::time::timeout(Duration::from_millis(5), telemetry_handle.close()) + .await + .expect("timeout before close") + .expect("analytics worker panicked"); + let found = client.events(); + assert_eq!(found.len(), 1); + let payloads = &found[0]; + assert_eq!(payloads.len(), 2); + } +} diff --git a/crates/turborepo-vercel-api/src/lib.rs b/crates/turborepo-vercel-api/src/lib.rs index dd4a6d42c4c28..f0a4cba8e546c 100644 --- a/crates/turborepo-vercel-api/src/lib.rs +++ b/crates/turborepo-vercel-api/src/lib.rs @@ -158,6 +158,49 @@ impl AnalyticsEvent { } } +// telemetry events + +/// All possible telemetry events must be included in this enum. +/// +/// These events must be added to the backend (telemetry.vercel.com) +/// before they can be tracked - invalid or unknown events will be +/// ignored. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum TelemetryEvent { + Task(TelemetryTaskEvent), + Command(TelemetryCommandEvent), + Repo(TelemetryRepoEvent), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryCommandEvent { + pub id: String, + pub command: String, + pub key: String, + pub value: String, + pub parent: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryRepoEvent { + pub id: String, + pub repo: String, + pub key: String, + pub value: String, + pub parent: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryTaskEvent { + pub id: String, + pub package: String, + pub task: String, + pub key: String, + pub value: String, + pub parent: Option, +} + #[cfg(test)] mod tests { use test_case::test_case; diff --git a/turborepo-tests/integration/tests/command-bin.t b/turborepo-tests/integration/tests/command-bin.t index 76105408a212d..8d0bba6a9d968 100644 --- a/turborepo-tests/integration/tests/command-bin.t +++ b/turborepo-tests/integration/tests/command-bin.t @@ -5,4 +5,4 @@ Setup $ grep --quiet "Global turbo version: .*" out.log $ grep --quiet "No local turbo binary found at" out.log $ grep --quiet "Running command as global turbo" out.log - $ tail -n1 out.log | grep --quiet -E ".*[\/\\]target[\/\\]debug[\/\\]turbo(\.exe)?$" + $ grep --quiet -E ".*[\/\\]target[\/\\]debug[\/\\]turbo(\.exe)?$" out.log