Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Subheadings to categorize changes are `added, changed, deprecated, removed, fixe
- Features passed to trunk build are now passed to cargo build (unless overridden by attributes in the HTML file)
- Fix [trunk/issues/330](https://github.com/thedodd/trunk/issues/330), to properly handle proxy endpoint with and without a slash at the end.
- Fix [trunk/issues/475](https://github.com/thedodd/trunk/issues/475) indeterminate trunk behaviour when the project defines several binary crates and index.html has no `<link rel="rust" data-bin=...>`
- Prevent buildup of change events and "endless" build loops

## 0.16.0
### added
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ impl Watch {
pub async fn run(self, config: Option<PathBuf>) -> Result<()> {
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
let cfg = ConfigOpts::rtc_watch(self.build, self.watch, config)?;
let mut system = WatchSystem::new(cfg, shutdown_tx.clone(), None).await?;
let system = WatchSystem::new(cfg, shutdown_tx.clone(), None).await?;

system.build().await.ok();
system.trigger_build().await;
let system_handle = tokio::spawn(system.run());
tokio::signal::ctrl_c()
.await
Expand Down
112 changes: 112 additions & 0 deletions src/debouncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use tokio::sync::{Mutex, Notify};

/// Debounces events as long as it is busy.
///
/// Instead of using a fixed time period to debounce events, it debounces events
/// as long as it is busy working on an event.
///
/// The idea is that a producer can push events to the debouncer, which take some time processing.
/// While processing, only the most recent event will be recorded, and executed after the
/// previous event finished.
///
/// It is intended for scenarios where it is not important to execute a task for all events, but
/// (ideally) as soon as possible, at least once after an event was published, but not process
/// for events that are obsolete due to succeeding events.
pub struct BusyDebouncer<T>
where
T: Send + 'static,
{
inner: Inner<T>,
}

struct Inner<T>
where
T: Send + 'static,
{
notify: Arc<Notify>,
data: Arc<Mutex<Option<T>>>,
}

impl<T> Default for Inner<T>
where
T: Send + 'static,
{
fn default() -> Self {
Self {
notify: Default::default(),
data: Default::default(),
}
}
}

impl<T> BusyDebouncer<T>
where
T: Send + 'static,
{
pub fn new<C, F>(context: C, handler: F) -> Self
where
C: Send + 'static,
T: Send + Sync + 'static,
for<'a> F: Fn(&'a mut C, T) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'a>>
+ Send
+ Sync
+ 'static,
{
let notify = Arc::new(Notify::new());
let data = Arc::new(Mutex::new(None));

{
let notify = notify.clone();
let data = data.clone();
tokio::spawn(async move {
let mut context = context;
loop {
notify.notified().await;
let next = data.lock().await.take();
match next {
Some(event) => {
handler(&mut context, event).await;
}
None => break,
}
}
});
}

Self {
inner: Inner { notify, data },
}
}

/// Push a new task to the debouncer.
///
/// This call will return immediately, and might spawn the event now, at a later time, or never.
pub async fn push(&self, event: T) {
self.inner.send(Some(event)).await;
}
}

impl<T> Inner<T>
where
T: Send + 'static,
{
async fn send(&self, msg: Option<T>) {
*self.data.lock().await = msg;
self.notify.notify_one();
}
}

impl<T> Drop for BusyDebouncer<T>
where
T: Send + 'static,
{
fn drop(&mut self) {
let mut dropping = Default::default();
std::mem::swap(&mut self.inner, &mut dropping);
tokio::spawn(async move { dropping.send(None).await });
}
}
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod build;
mod cmd;
mod common;
mod config;
mod debouncer;
mod hooks;
mod pipelines;
mod proxy;
Expand Down
9 changes: 7 additions & 2 deletions src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ impl ServeSystem {

/// Run the serve system.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn run(mut self) -> Result<()> {
pub async fn run(self) -> Result<()> {
// Perform an initial build
let mut build_done_rx = self.build_done_chan.subscribe();
self.watch.trigger_build().await;
let _build_res = build_done_rx.recv().await; // TODO: only open after a successful build.
drop(build_done_rx);

// Spawn the watcher & the server.
let _build_res = self.watch.build().await; // TODO: only open after a successful build.
let watch_handle = tokio::spawn(self.watch.run());
let server_handle = Self::spawn_server(
self.cfg.clone(),
Expand Down
38 changes: 23 additions & 15 deletions src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use tokio_stream::wrappers::BroadcastStream;

use crate::build::BuildSystem;
use crate::config::RtcWatch;
use crate::debouncer::BusyDebouncer;

/// Blacklisted path segments which are ignored by the watcher by default.
const BLACKLIST: [&str; 1] = [".git"];

/// A watch system wrapping a build system and a watcher.
pub struct WatchSystem {
/// The build system.
build: BuildSystem,
debouncer: BusyDebouncer<()>,
/// The current vector of paths to be ignored.
ignored_paths: Vec<PathBuf>,
/// A channel of FS watch events.
Expand All @@ -28,8 +28,6 @@ pub struct WatchSystem {
_watcher: RecommendedWatcher,
/// The application shutdown channel.
shutdown: BroadcastStream<()>,
/// Channel that is sent on whenever a build completes.
build_done_tx: Option<broadcast::Sender<()>>,
}

impl WatchSystem {
Expand All @@ -48,21 +46,36 @@ impl WatchSystem {

// Build dependencies.
let build = BuildSystem::new(cfg.build.clone(), Some(build_tx)).await?;

// Build debouncer, to only run when necessary
let debouncer = BusyDebouncer::new(build, move |build, ()| {
let mut build_done_tx = build_done_tx.clone();
Box::pin(async move {
let _res = build.build().await;

// TODO/NOTE: in the future, we will want to be able to pass along error info and
// other diagnostics info over the socket for use in an error overlay or console
// logging.
if let Some(tx) = build_done_tx.as_mut() {
let _ = tx.send(());
}
})
});

Ok(Self {
build,
debouncer,
ignored_paths: cfg.ignored_paths.clone(),
watch_rx,
build_rx,
_watcher,
shutdown: BroadcastStream::new(shutdown.subscribe()),
build_done_tx,
})
}

/// Run a build.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn build(&mut self) -> Result<()> {
self.build.build().await
pub async fn trigger_build(&self) {
self.debouncer.push(()).await;
}

/// Run the watch system, responding to events and triggering builds.
Expand Down Expand Up @@ -118,14 +131,9 @@ impl WatchSystem {
return; // Don't emit a notification as path is on the blacklist.
}

tracing::debug!("change detected in {:?}", ev_path);
let _res = self.build.build().await;
tracing::info!("change detected in {:?}", ev_path);

// TODO/NOTE: in the future, we will want to be able to pass along error info and other
// diagnostics info over the socket for use in an error overlay or console logging.
if let Some(tx) = self.build_done_tx.as_mut() {
let _ = tx.send(());
}
self.trigger_build().await;
}

fn update_ignore_list(&mut self, arg_path: PathBuf) {
Expand Down