Reduce watch service latency
There is often noticeable lag between when a file event happens and when
the build is triggered. There is also often latency when the user
presses enter to interrupt the watch. Both are due to the way that
SourceModificationWatch polls for results. The lag can be removed by having
the main thread block on an event queue that is asynchronously filled by
threads that wait for user input and file events. I made a similar
change in CloseWatch.
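
To illustrate the idea of blocking on a queue rather than polling, here is a minimal sketch. The object name `BlockingQueueSketch`, the `awaitEvent` helper and the string payload are hypothetical and only loosely mirror the `Cancelled`/`Triggered` events in the diff below; it is not the code from this commit.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BlockingQueueSketch {
  // Hypothetical event types, loosely mirroring the ones in EventMonitor.
  sealed trait Event
  case object Cancelled extends Event
  final case class Triggered(name: String) extends Event

  // Blocks the calling thread until some other thread enqueues an event.
  // No sleep/poll loop is needed, so the caller wakes up as soon as an event arrives.
  def awaitEvent(events: ArrayBlockingQueue[Event]): Event = events.take()

  def main(args: Array[String]): Unit = {
    val events = new ArrayBlockingQueue[Event](1)
    // A background producer standing in for the file event thread.
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(50)
        events.offer(Triggered("Foo.scala"))
        ()
      }
    })
    producer.setDaemon(true)
    producer.start()
    println(awaitEvent(events))
  }
}
```

With a polling loop the worst-case latency is roughly the sleep interval; with `take()` the main thread is woken by the queue itself.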

The new EventMonitor is where most of the logic lives. It is implemented
as a black box that effectively provides three methods:
1) watch() -- block the main thread until a user input or watched file
   event occurs, returning true if the build was triggered
2) state() -- a snapshot of the current watch state. This is primarily
   for legacy compatibility with sbt, which has a number of methods that
   take the state as input. In practice, all that sbt really needs most
   of the time is the count of the number of build triggers
3) close() -- shut down any threads or services started by the event
   monitor
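
A caller might drive these three methods roughly as follows. This is a sketch under assumptions: `Monitor`, `ScriptedMonitor` and `runWatch` are hypothetical names, and `count` stands in for reading the trigger count out of `state()`.

```scala
object WatchLoopSketch {
  // Hypothetical stand-in for the three-method interface described above.
  trait Monitor extends AutoCloseable {
    def watch(): Boolean // true if a build was triggered, false if the watch was stopped
    def count: Int       // stands in for state().count
    def close(): Unit
  }

  // A fake monitor that replays canned results instead of watching files.
  final class ScriptedMonitor(results: List[Boolean]) extends Monitor {
    private var remaining = results
    private var triggers = 0
    var closed = false
    def watch(): Boolean = remaining match {
      case head :: tail =>
        remaining = tail
        if (head) triggers += 1
        head
      case Nil => false
    }
    def count: Int = triggers
    def close(): Unit = closed = true
  }

  // Rebuild on every trigger, stop when watch() returns false, always close the monitor.
  def runWatch(monitor: Monitor)(build: () => Unit): Int =
    try {
      while (monitor.watch()) build()
      monitor.count
    } finally monitor.close()
}
```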

I implemented the EventMonitor as a black box so that it would be
straightforward to change the existing implementation or add new
implementations without breaking forward or backward compatibility
with sbt. In particular, I can envision adding a second
EventMonitor that uses a persistent file system cache for file events
instead of the ad-hoc cache that currently exists in the
EventMonitorImpl.eventThread.

At the moment, the one implementation is EventMonitorImpl. It spins up
two threads for monitoring user input and the file events. Both write
`EventMonitor.Events` to a concurrent queue that the main thread reads
from. User input events supersede file triggers. When the user hits
enter, the queue is cleared and filled with the exit event. Otherwise,
the event thread polls the watch service for events. When it receives a
file event, it adds the event to a cache of recent events. This cache is
used to prevent multiple builds from being triggered by the same file*.
The eventThread also detects when directories are created or deleted and
registers or unregisters the directory with the watch service as needed.
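
The way user input supersedes file triggers can be sketched with a queue of capacity one. The names `CancelSketch`, `cancel` and `offerTrigger` are hypothetical; the logic follows the description above but is a simplified illustration, not the commit's implementation.

```scala
import java.util.concurrent.ArrayBlockingQueue

object CancelSketch {
  sealed trait Event
  case object Cancelled extends Event
  final case class Triggered(name: String) extends Event

  // User input wins: keep clearing the queue until the exit event fits.
  def cancel(events: ArrayBlockingQueue[Event]): Unit =
    while (!events.offer(Cancelled)) events.clear()

  // File events never displace a pending cancellation, and are dropped
  // (offer returns false) when another trigger is already pending.
  def offerTrigger(events: ArrayBlockingQueue[Event], name: String): Boolean =
    events.peek() match {
      case Cancelled => false
      case _         => events.offer(Triggered(name)) // peek() is null when the queue is empty
    }
}
```

Because the queue holds at most one element, a burst of file events collapses into a single pending trigger.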

I also stopped registering all of the files. Only directories are
registered now. The registered files just waste memory.
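
As a rough illustration of registering only directories, the sketch below walks a base directory and registers each directory it finds with a plain `java.nio.file.WatchService`. `RegisterDirsSketch` and `registerDirectories` are hypothetical names, and the sketch uses the JDK service directly rather than the `sbt.io.WatchService` wrapper used in this commit.

```scala
import java.nio.file.{ Files, Path, WatchKey, WatchService }
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConverters._

object RegisterDirsSketch {
  // Registers `base` and every directory beneath it; regular files are skipped
  // because events for a file are delivered through its parent directory's key.
  def registerDirectories(base: Path, service: WatchService): Map[Path, WatchKey] = {
    val stream = Files.walk(base)
    try {
      stream.iterator.asScala
        .filter(Files.isDirectory(_))
        .map(d => d -> d.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY))
        .toMap
    } finally stream.close()
  }
}
```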

This commit also adds logging of watch events. Unfortunately, I can't
get the logging to actually work at the debug level due to
sbt/sbt#4097, but once that issue is fixed,
logging with the new EventMonitor should work.

I added a simple test that the anti-entropy works and made some small
adjustments to the existing tests to make them work with the new
implementation.

For backwards compatibility with older versions of sbt, I re-implement
SourceModificationWatch.watch to wrap an EventMonitor that we shut down
after each trigger. This is also the reason that I had to add two close
methods to EventMonitor -- one that shuts down the watch service, and
one that doesn't.

* When neovim, for example, saves a file, it moves the buffer into the
file location, which can trigger a delete and a create event on the file.
Without the anti-entropy timeout, there would be two builds triggered for
the same file save.
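
A minimal sketch of such an anti-entropy check, assuming a per-path deadline map similar in spirit to `recentEvents` in the diff below. The class name `Quarantine`, the `shouldTrigger` method and the use of `String` paths are hypothetical simplifications.

```scala
import scala.concurrent.duration._

object AntiEntropySketch {
  // Remembers recently triggered paths and suppresses repeats within `window`.
  final class Quarantine(window: FiniteDuration) {
    private var recent = Map.empty[String, Deadline]

    def shouldTrigger(path: String): Boolean = synchronized {
      // Forget entries whose quarantine has expired.
      recent = recent.filterNot(_._2.isOverdue())
      if (recent.contains(path)) false // e.g. the create that follows a delete on save
      else {
        recent += path -> window.fromNow
        true
      }
    }
  }
}
```

With a window of 40 milliseconds (the default mentioned in the diff), a delete followed immediately by a create for the same path would yield one trigger instead of two.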
eatkins committed Apr 13, 2018
1 parent bb12d4f commit 9d0bc17
Showing 5 changed files with 362 additions and 103 deletions.
194 changes: 194 additions & 0 deletions io/src/main/scala/sbt/internal/io/EventMonitor.scala
@@ -0,0 +1,194 @@
package sbt.internal.io

import java.nio.file.{ ClosedWatchServiceException, Files, Path, WatchKey }
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }

import sbt.io.WatchService

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.duration._

private[sbt] sealed trait EventMonitor extends AutoCloseable {

/** A snapshot of the WatchState that includes the number of build triggers and watch sources. */
def state(): WatchState

/** Block indefinitely until the monitor receives a file event or the user stops the watch. */
def watch(): Boolean

/** Cleans up any service and/or threads started by the monitor */
override def close(): Unit = close(closeService = true)
/*
* Workaround for the legacy implementation of SourceModificationWatch.watch
*/
private[io] def close(closeService: Boolean): Unit
}

object EventMonitor {
private sealed trait Event
private case object Cancelled extends Event
private case class Triggered(path: Path) extends Event

private class EventMonitorImpl private[EventMonitor] (
private[this] val service: WatchService,
private[this] val events: ArrayBlockingQueue[Event],
private[this] val eventThread: Looper with HasWatchState,
private[this] val userInputThread: Looper,
private[this] val logger: Logger)
extends EventMonitor {

override def state(): WatchState = eventThread.state()

override def watch(): Boolean = events.take() match {
case Cancelled => false
case Triggered(path) =>
logger.debug(s"Triggered watch event due to updated path: $path")
eventThread.incrementCount()
true
}

override def close(closeState: Boolean): Unit = {
if (closed.compareAndSet(false, true)) {
if (closeState) service.close()
userInputThread.close()
eventThread.close()
logger.debug("Closed EventMonitor")
}
}

private[this] val closed = new AtomicBoolean(false)
}

def apply(state: WatchState,
delay: FiniteDuration,
antiEntropy: FiniteDuration,
terminationCondition: => Boolean,
logger: Logger = NullLogger): EventMonitor = {
val events = new ArrayBlockingQueue[Event](1)
val eventThread = newEventsThread(delay, antiEntropy, state, events, logger)
val userInputThread = newUserInputThread(terminationCondition, events, logger)
new EventMonitorImpl(state.service, events, eventThread, userInputThread, logger)
}

private trait HasWatchState {
def state(): WatchState
def incrementCount(): Unit
}
private def newEventsThread(delay: FiniteDuration,
antiEntropy: FiniteDuration,
s: WatchState,
events: ArrayBlockingQueue[Event],
logger: Logger): Looper with HasWatchState = {
var recentEvents = Map.empty[Path, Deadline]
new Looper(s"watch-state-event-thread-${eventThreadId.incrementAndGet()}") with HasWatchState {
private[this] val lock = new Object
private[this] var count = s.count
private[this] var registered = s.registered
def incrementCount(): Unit = lock.synchronized { count += 1 }
def state(): WatchState = lock.synchronized(s.withCount(count).withRegistered(registered))
override def loop(): Unit = {
recentEvents = recentEvents.filterNot(_._2.isOverdue)
getFilesForKey(s.service.poll(delay)).foreach(maybeTrigger)
}
def getFilesForKey(key: WatchKey): Seq[Path] = key match {
case null => Nil
case k =>
val allEvents = k.pollEvents.asScala
.map(e => k.watchable.asInstanceOf[Path].resolve(e.context.asInstanceOf[Path]))
logger.debug(s"Received events:\n${allEvents.mkString("\n")}")
val (exist, notExist) = allEvents.partition(Files.exists(_))
val (updatedDirectories, updatedFiles) = exist.partition(Files.isDirectory(_))
val newFiles = updatedDirectories.flatMap(filesForNewDirectory)
lock.synchronized { registered --= notExist }
notExist.foreach(s.unregister)
updatedFiles ++ newFiles ++ notExist
}
/*
* Returns new files found in new directory and any subdirectories, assuming that there is
* a recursive source with a base that is parent to the directory.
*/
def filesForNewDirectory(dir: Path): Seq[Path] = {
lazy val recursive =
s.sources.exists(src => dir.startsWith(src.base.toPath) && src.recursive)
if (!registered.contains(dir) && recursive) {
val dirs = Files.walk(dir).iterator.asScala.filter(Files.isDirectory(_))
val newDirs = dirs.map(d => d -> s.register(d)).toIndexedSeq
lock.synchronized { registered ++= newDirs }
Files.walk(dir).iterator.asScala.toSeq
} else Nil
}
/*
* Triggers only if there is no pending Trigger and the file is not in an anti-entropy
* quarantine.
*/
def maybeTrigger(path: Path): Unit =
if (s.accept(path)) {
if (recentEvents.get(path).fold(false)(!_.isOverdue))
logger.debug(s"Ignoring watch event for $path due to anti-entropy constraint")
else
events.peek() match {
case Cancelled =>
logger.debug(s"Watch cancelled, not offering event for path $path")
case _ =>
recentEvents += path -> antiEntropy.fromNow
if (!events.offer(Triggered(path))) {
logger.debug(s"Event already pending, dropping event for path: $path")
}
}
}
}
}
// Shutup the compiler about unused arguments
@inline private[this] def ignoreArg(arg: => Any): Unit = if (true) () else { arg; () }
trait Logger {
def debug(msg: => Any): Unit = ignoreArg(msg)
}
object NullLogger extends Logger
private def newUserInputThread(terminationCondition: => Boolean,
events: ArrayBlockingQueue[Event],
logger: Logger): Looper =
new Looper(s"watch-state-user-input-${userInputId.incrementAndGet}") {
override final def loop(): Unit = {
if (terminationCondition) {
logger.debug("Received termination condition. Stopping watch...")
while (!events.offer(Cancelled)) {
events.clear()
}
} else {}
}
}

private abstract class Looper(name: String) extends Thread(name) with AutoCloseable {
private[this] var stopped = false
def isStopped: Boolean = this.synchronized(stopped)
def loop(): Unit
@tailrec
override final def run(): Unit = {
try {
if (!isStopped) {
loop()
}
} catch {
case (_: ClosedWatchServiceException | _: InterruptedException) =>
this.synchronized { stopped = true }
}
if (!isStopped) {
run()
}
}
def close(): Unit = this.synchronized {
if (!stopped) {
stopped = true
this.interrupt()
this.join(5000)
}
}
setDaemon(true)
start()
}
private val eventThreadId = new AtomicInteger(0)
private val userInputId = new AtomicInteger(0)

}
117 changes: 49 additions & 68 deletions io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala
@@ -3,76 +3,36 @@
*/
package sbt.internal.io

import java.nio.file.{ Files, Path, WatchEvent, WatchKey }
import java.nio.file.StandardWatchEventKinds._
import java.nio.file.{ WatchService => _, _ }

import sbt.io.{ DirectoryFilter, FileFilter, WatchService, AllPassFilter, NothingFilter }
import sbt.io._
import sbt.io.syntax._

import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import scala.collection.JavaConverters._
import scala.concurrent.duration._

private[sbt] object SourceModificationWatch {

/**
* Checks for modifications on the file system every `delay`,
* until changes are detected or `terminationCondition` evaluates to `true`.
* Uses default anti-entropy time of 40.milliseconds.
*/
@tailrec
@deprecated("This is superseded by EventMonitor.watch", "1.1.7")
def watch(delay: FiniteDuration, state: WatchState)(
terminationCondition: => Boolean
): (Boolean, WatchState) = {
if (state.count == 0) (true, state.withCount(1))
else {
val events =
state.pollEvents().map(expandEvent)

if (events.isEmpty) {
if (terminationCondition) {
(false, state)
} else {
Thread.sleep(delay.toMillis)
watch(delay, state)(terminationCondition)
}
} else {
val previousFiles = state.registered.keySet
val newFiles = state.sources.flatMap(_.getUnfilteredPaths()).toSet
val createdFiles = newFiles -- previousFiles
val deletedFiles = previousFiles -- newFiles

// We may have events that are not relevant (e.g., created an empty directory.)
// We filter out those changes, so that we don't trigger unnecessarily.
val filteredDeleted = deletedFiles.filter(p => state.sources.exists(_.accept(p, false)))
val filteredCreated = createdFiles.filter(p => state.sources.exists(_.accept(p, false)))
val filteredModified = events.collect {
case (p, ENTRY_MODIFY) if state.sources.exists(_.accept(p, false)) => p
}

// Register and remove _unfiltered_ files. This is correct because directories
// are likely to be filtered out (for instance), but we should still add them
// to the files that are watched.
// We don't increment count because we don't know yet if we'll trigger.
val newState = state ++ createdFiles -- deletedFiles

if (filteredCreated.nonEmpty || filteredDeleted.nonEmpty || filteredModified.nonEmpty) {
(true, newState.withCount(newState.count + 1))
} else {
Thread.sleep(delay.toMillis)
watch(delay, newState)(terminationCondition)
}
}
}
}

private def expandEvent(event: (Path, WatchEvent[_])): (Path, WatchEvent.Kind[Path]) = {
event match {
case (base, ev) =>
val fullPath = Option(ev.context().asInstanceOf[Path]) match {
case Some(path) => base.resolve(path)
case None => base
}
val kind = ev.kind().asInstanceOf[WatchEvent.Kind[Path]]
(fullPath, kind)
terminationCondition: => Boolean): (Boolean, WatchState) = {
if (state.count == 0) {
(true, state.withCount(1))
} else {
val eventMonitor = EventMonitor(state, delay, 40.milliseconds, {
Thread.sleep(10)
terminationCondition
})
try {
val triggered = eventMonitor.watch()
(triggered, eventMonitor.state())
} finally eventMonitor.close(closeService = false)
}
}
}
@@ -81,17 +41,22 @@ private[sbt] object SourceModificationWatch {
private[sbt] final class WatchState private (
val count: Int,
private[sbt] val sources: Seq[Source],
service: WatchService,
private[sbt] val service: WatchService,
private[sbt] val registered: Map[Path, WatchKey]
) {
) extends AutoCloseable {
def accept(p: Path): Boolean = sources.exists(_.accept(p))
def unregister(path: Path): Unit = service match {
case s: Unregisterable => s.unregister(path)
case _ =>
}

/** Removes all of `fs` from the watched paths. */
private[sbt] def --(fs: Iterable[Path]): WatchState = {
for {
f <- fs
wk <- registered.get(f)
if registered.values.count(_ == wk) <= 1
} service.unregister(wk.watchable().asInstanceOf[Path])
} unregister(wk.watchable().asInstanceOf[Path])
withRegistered(registered -- fs)
}

@@ -101,12 +66,11 @@ private[sbt] final class WatchState private (
fs.filter(Files.exists(_)).foldLeft(registered) {
case (ks, d) if Files.isDirectory(d) =>
if (ks.contains(d)) ks
else ks + (d -> service.register(d, WatchState.events: _*))

else ks + (d -> register(d))
case (ks, f) =>
val parent = f.getParent
if (ks.contains(parent)) ks + (f -> ks(parent))
else ks + (f -> service.register(parent, WatchState.events: _*))
if (!ks.contains(parent)) ks + (parent -> register(parent))
else ks
}
withRegistered(newKeys)
}
@@ -119,13 +83,21 @@ private[sbt] final class WatchState private (
}
}

/** register a path with the watch service */
private[sbt] def register(path: Path): WatchKey = service.register(path, WatchState.events: _*)

/** A new state, with a new `count`. */
private[sbt] def withCount(count: Int): WatchState =
new WatchState(count, sources, service, registered)

/** A new state, with new keys registered. */
private[sbt] def withRegistered(registered: Map[Path, WatchKey]): WatchState =
new WatchState(count, sources, service, registered)

/** Shuts down the EventMonitor and the watch service. */
override def close(): Unit = {
service.close()
}
}

/**
@@ -172,7 +144,7 @@ final class Source(
def withRecursive(recursive: Boolean): Source =
new Source(base, includeFilter, excludeFilter, recursive)

override def toString =
override def toString: String =
s"""Source(
| base = $base,
| includeFilter = $includeFilter,
@@ -202,9 +174,18 @@ private[sbt] object WatchState {
* @return An initial `WatchState`.
*/
def empty(service: WatchService, sources: Seq[Source]): WatchState = {
val initFiles = sources.flatMap(_.getUnfilteredPaths())
val initFiles = sources.flatMap {
case s if s.recursive =>
val base = s.base.toPath
if (Files.exists(base)) {
Files.walk(base).iterator.asScala.collect {
case d if Files.isDirectory(d) => d.toRealPath()
}
} else Seq(base)
case s => Seq(s.base.toPath)
}.sorted
assert(initFiles.nonEmpty)
val initState = new WatchState(0, sources, service, Map.empty) ++ initFiles
val initState = new WatchState(count = 1, sources, service, Map.empty) ++ initFiles
service.init()
initState
}