From 776948b86f7af88108aead8a430d379079577101 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Fri, 24 Mar 2023 17:09:41 +0100 Subject: [PATCH 1/3] Prevent buildup of change events, debouncing while busy Changes from the file system already get debounced in a 1 second window. However, still each change that gets reported, will trigger a build. All changes happening during the build will get buffered and sequentially be executed. Depending on the length of the build, and the input from the user during this time, this can lead to a huge amount of changes, leading to a buildup that makes it look like trunk is "continuously" rebuilding. This change debounces change event while the build is active. It will record the request for a new build, and notify the build loop. Only when the build loop has started can another request be recorded. This results in a build starting as soon as possible when the first events emitted and ensures at most one more run after the build has finishes. No additional requests will be queued. --- CHANGELOG.md | 1 + src/cmd/watch.rs | 4 +-- src/debouncer.rs | 74 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/serve.rs | 9 ++++-- src/watch.rs | 37 ++++++++++++++---------- 6 files changed, 107 insertions(+), 19 deletions(-) create mode 100644 src/debouncer.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index db769873..8a6ffba2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ Subheadings to categorize changes are `added, changed, deprecated, removed, fixe - Features passed to trunk build are now passed to cargo build (unless overridden by attributes in the HTML file) - Fix [trunk/issues/330](https://github.com/thedodd/trunk/issues/330), to properly handle proxy endpoint with and without a slash at the end. - Fix [trunk/issues/475](https://github.com/thedodd/trunk/issues/475) indeterminate trunk behaviour when the project defines several binary crates and index.html has no `` +- Prevent buildup of change events and "endless" build loops ## 0.16.0 ### added diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index 8ff6a7c7..61897475 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -22,9 +22,9 @@ impl Watch { pub async fn run(self, config: Option) -> Result<()> { let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); let cfg = ConfigOpts::rtc_watch(self.build, self.watch, config)?; - let mut system = WatchSystem::new(cfg, shutdown_tx.clone(), None).await?; + let system = WatchSystem::new(cfg, shutdown_tx.clone(), None).await?; - system.build().await.ok(); + system.trigger_build(); let system_handle = tokio::spawn(system.run()); tokio::signal::ctrl_c() .await diff --git a/src/debouncer.rs b/src/debouncer.rs new file mode 100644 index 00000000..ee85e5b1 --- /dev/null +++ b/src/debouncer.rs @@ -0,0 +1,74 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use tokio::sync::Notify; + +/// Debounces events as long as it is busy. +/// +/// Instead of using a fixed time period to debounce events, it debounces events +/// as long as it is busy working on an event. +/// +/// The idea is that a producer can push events to the debouncer, which take some time processing. +/// While processing, only the most recent event will be recorded, and executed after the +/// previous event finished. +/// +/// It is intended for scenarios where it is not important to execute a task for all events, but +/// (ideally) as soon as possible, at least once after an event was published, but not process +/// for events that are obsolete due to succeeding events. +pub struct BusyDebouncer { + notify: Arc, + data: Arc>>, +} + +impl BusyDebouncer { + pub fn new(context: C, handler: F) -> Self + where + C: Send + 'static, + T: Send + Sync + 'static, + for<'a> F: Fn(&'a mut C, T) -> Pin + Send + Sync + 'a>> + + Send + + Sync + + 'static, + { + let notify = Arc::new(Notify::new()); + let data = Arc::new(Mutex::new(None)); + + { + let notify = notify.clone(); + let data = data.clone(); + tokio::spawn(async move { + let mut context = context; + loop { + notify.notified().await; + let next = data.lock().unwrap().take(); + match next { + Some(event) => { + handler(&mut context, event).await; + } + None => break, + } + } + }); + } + + Self { notify, data } + } + + /// Push a new task to the debouncer. + /// + /// This call will return immediately, and might spawn the event now, at a later time, or never. + pub fn push(&self, event: T) { + self.send(Some(event)); + } + + fn send(&self, msg: Option) { + *self.data.lock().unwrap() = msg; + self.notify.notify_one(); + } +} + +impl Drop for BusyDebouncer { + fn drop(&mut self) { + self.send(None); + } +} diff --git a/src/main.rs b/src/main.rs index c7bd49a1..9bf4ff5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ mod build; mod cmd; mod common; mod config; +mod debouncer; mod hooks; mod pipelines; mod proxy; diff --git a/src/serve.rs b/src/serve.rs index fa41e1c2..cd2f66d9 100644 --- a/src/serve.rs +++ b/src/serve.rs @@ -57,9 +57,14 @@ impl ServeSystem { /// Run the serve system. #[tracing::instrument(level = "trace", skip(self))] - pub async fn run(mut self) -> Result<()> { + pub async fn run(self) -> Result<()> { + // Perform an initial build + let mut build_done_rx = self.build_done_chan.subscribe(); + self.watch.trigger_build(); + let _build_res = build_done_rx.recv().await; // TODO: only open after a successful build. + drop(build_done_rx); + // Spawn the watcher & the server. - let _build_res = self.watch.build().await; // TODO: only open after a successful build. let watch_handle = tokio::spawn(self.watch.run()); let server_handle = Self::spawn_server( self.cfg.clone(), diff --git a/src/watch.rs b/src/watch.rs index 3b15946c..06617208 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -10,14 +10,14 @@ use tokio_stream::wrappers::BroadcastStream; use crate::build::BuildSystem; use crate::config::RtcWatch; +use crate::debouncer::BusyDebouncer; /// Blacklisted path segments which are ignored by the watcher by default. const BLACKLIST: [&str; 1] = [".git"]; /// A watch system wrapping a build system and a watcher. pub struct WatchSystem { - /// The build system. - build: BuildSystem, + debouncer: BusyDebouncer<()>, /// The current vector of paths to be ignored. ignored_paths: Vec, /// A channel of FS watch events. @@ -28,8 +28,6 @@ pub struct WatchSystem { _watcher: RecommendedWatcher, /// The application shutdown channel. shutdown: BroadcastStream<()>, - /// Channel that is sent on whenever a build completes. - build_done_tx: Option>, } impl WatchSystem { @@ -48,21 +46,35 @@ impl WatchSystem { // Build dependencies. let build = BuildSystem::new(cfg.build.clone(), Some(build_tx)).await?; + + // Build debouncer, to only run when necessary + let debouncer = BusyDebouncer::new(build, move |build, ()| { + let mut build_done_tx = build_done_tx.clone(); + Box::pin(async move { + let _res = build.build().await; + + // TODO/NOTE: in the future, we will want to be able to pass along error info and other + // diagnostics info over the socket for use in an error overlay or console logging. + if let Some(tx) = build_done_tx.as_mut() { + let _ = tx.send(()); + } + }) + }); + Ok(Self { - build, + debouncer, ignored_paths: cfg.ignored_paths.clone(), watch_rx, build_rx, _watcher, shutdown: BroadcastStream::new(shutdown.subscribe()), - build_done_tx, }) } /// Run a build. #[tracing::instrument(level = "trace", skip(self))] - pub async fn build(&mut self) -> Result<()> { - self.build.build().await + pub fn trigger_build(&self) { + self.debouncer.push(()); } /// Run the watch system, responding to events and triggering builds. @@ -118,14 +130,9 @@ impl WatchSystem { return; // Don't emit a notification as path is on the blacklist. } - tracing::debug!("change detected in {:?}", ev_path); - let _res = self.build.build().await; + tracing::info!("change detected in {:?}", ev_path); - // TODO/NOTE: in the future, we will want to be able to pass along error info and other - // diagnostics info over the socket for use in an error overlay or console logging. - if let Some(tx) = self.build_done_tx.as_mut() { - let _ = tx.send(()); - } + self.trigger_build(); } fn update_ignore_list(&mut self, arg_path: PathBuf) { From de96cbe46e474770e22b540132d5e962839a0f60 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 27 Mar 2023 08:53:51 +0200 Subject: [PATCH 2/3] Apply clippy feedback In order to get away from using .unwrap() on the mutex lock calls, this uses the async variant of a Mutex from Tokio, making it a bit more difficult when dropping. However, that also doesn't happen that frequently. --- src/cmd/watch.rs | 2 +- src/debouncer.rs | 61 ++++++++++++++++++++++++++++++++++++++---------- src/serve.rs | 2 +- src/watch.rs | 6 ++--- 4 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/cmd/watch.rs b/src/cmd/watch.rs index 61897475..9351743e 100644 --- a/src/cmd/watch.rs +++ b/src/cmd/watch.rs @@ -24,7 +24,7 @@ impl Watch { let cfg = ConfigOpts::rtc_watch(self.build, self.watch, config)?; let system = WatchSystem::new(cfg, shutdown_tx.clone(), None).await?; - system.trigger_build(); + system.trigger_build().await; let system_handle = tokio::spawn(system.run()); tokio::signal::ctrl_c() .await diff --git a/src/debouncer.rs b/src/debouncer.rs index ee85e5b1..029bd4ed 100644 --- a/src/debouncer.rs +++ b/src/debouncer.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use tokio::sync::Notify; +use std::sync::Arc; +use tokio::sync::{Mutex, Notify}; /// Debounces events as long as it is busy. /// @@ -15,12 +15,37 @@ use tokio::sync::Notify; /// It is intended for scenarios where it is not important to execute a task for all events, but /// (ideally) as soon as possible, at least once after an event was published, but not process /// for events that are obsolete due to succeeding events. -pub struct BusyDebouncer { +pub struct BusyDebouncer +where + T: Send + 'static, +{ + inner: Inner, +} + +struct Inner +where + T: Send + 'static, +{ notify: Arc, data: Arc>>, } -impl BusyDebouncer { +impl Default for Inner +where + T: Send + 'static, +{ + fn default() -> Self { + Self { + notify: Default::default(), + data: Default::default(), + } + } +} + +impl BusyDebouncer +where + T: Send + 'static, +{ pub fn new(context: C, handler: F) -> Self where C: Send + 'static, @@ -40,7 +65,7 @@ impl BusyDebouncer { let mut context = context; loop { notify.notified().await; - let next = data.lock().unwrap().take(); + let next = data.lock().await.take(); match next { Some(event) => { handler(&mut context, event).await; @@ -51,24 +76,36 @@ impl BusyDebouncer { }); } - Self { notify, data } + Self { + inner: Inner { notify, data }, + } } /// Push a new task to the debouncer. /// /// This call will return immediately, and might spawn the event now, at a later time, or never. - pub fn push(&self, event: T) { - self.send(Some(event)); + pub async fn push(&self, event: T) { + self.inner.send(Some(event)).await; } +} - fn send(&self, msg: Option) { - *self.data.lock().unwrap() = msg; +impl Inner +where + T: Send + 'static, +{ + async fn send(&self, msg: Option) { + *self.data.lock().await = msg; self.notify.notify_one(); } } -impl Drop for BusyDebouncer { +impl Drop for BusyDebouncer +where + T: Send + 'static, +{ fn drop(&mut self) { - self.send(None); + let mut dropping = Default::default(); + std::mem::swap(&mut self.inner, &mut dropping); + tokio::spawn(async move { dropping.send(None).await }); } } diff --git a/src/serve.rs b/src/serve.rs index cd2f66d9..02198a6f 100644 --- a/src/serve.rs +++ b/src/serve.rs @@ -60,7 +60,7 @@ impl ServeSystem { pub async fn run(self) -> Result<()> { // Perform an initial build let mut build_done_rx = self.build_done_chan.subscribe(); - self.watch.trigger_build(); + self.watch.trigger_build().await; let _build_res = build_done_rx.recv().await; // TODO: only open after a successful build. drop(build_done_rx); diff --git a/src/watch.rs b/src/watch.rs index 06617208..fb234c94 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -73,8 +73,8 @@ impl WatchSystem { /// Run a build. #[tracing::instrument(level = "trace", skip(self))] - pub fn trigger_build(&self) { - self.debouncer.push(()); + pub async fn trigger_build(&self) { + self.debouncer.push(()).await; } /// Run the watch system, responding to events and triggering builds. @@ -132,7 +132,7 @@ impl WatchSystem { tracing::info!("change detected in {:?}", ev_path); - self.trigger_build(); + self.trigger_build().await; } fn update_ignore_list(&mut self, arg_path: PathBuf) { From 805f56423598a5b6db639f0ff3ba1ceda762286c Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 27 Mar 2023 10:28:57 +0200 Subject: [PATCH 3/3] Try fixing lint issues --- src/debouncer.rs | 1 + src/watch.rs | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/debouncer.rs b/src/debouncer.rs index 029bd4ed..6d6d53b0 100644 --- a/src/debouncer.rs +++ b/src/debouncer.rs @@ -1,6 +1,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; + use tokio::sync::{Mutex, Notify}; /// Debounces events as long as it is busy. diff --git a/src/watch.rs b/src/watch.rs index fb234c94..6960c6a4 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -53,8 +53,9 @@ impl WatchSystem { Box::pin(async move { let _res = build.build().await; - // TODO/NOTE: in the future, we will want to be able to pass along error info and other - // diagnostics info over the socket for use in an error overlay or console logging. + // TODO/NOTE: in the future, we will want to be able to pass along error info and + // other diagnostics info over the socket for use in an error overlay or console + // logging. if let Some(tx) = build_done_tx.as_mut() { let _ = tx.send(()); }