Handle overflows in EventMonitor
While working on a related project, I realized that it's essential to
handle the OVERFLOW case for a WatchEvent. The EventMonitor relies on
subdirectory creation events to create a new watch key for each
subdirectory. If those events are missed due to overflow, the
EventMonitor will not be monitoring the newly created subdirectories. To
fix this, when the EventMonitor detects an overflow, it will now poll
the directory of the watch key repeatedly until it stabilizes. Once that
happens, it will register all of the newly found directories (if any)
with the watch service and will create events for all of the files in
said directories.
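
The "poll until it stabilizes" idea can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: `StablePoll`, `snapshot`, `pollUntilStable` and the `attemptsLeft` bound are hypothetical names introduced here, and the sketch assumes it is acceptable to give up after a fixed number of attempts.

```scala
import java.nio.file.{ Files, Path }
import scala.annotation.tailrec
import scala.collection.JavaConverters._

object StablePoll {
  // Take an immutable snapshot of every path currently under `root`.
  // Note: Files.walk may throw if a directory disappears mid-walk.
  def snapshot(root: Path): Set[Path] = {
    val stream = Files.walk(root)
    try stream.iterator().asScala.toSet
    finally stream.close()
  }

  // Re-take snapshots until two consecutive ones are equal,
  // or until the (assumed) attempt budget runs out.
  @tailrec
  def pollUntilStable[A](take: () => Set[A], previous: Set[A], attemptsLeft: Int): Set[A] = {
    val current = take()
    if (current == previous || attemptsLeft <= 1) current
    else pollUntilStable(take, current, attemptsLeft - 1)
  }
}
```

For a directory this might be called as `StablePoll.pollUntilStable(() => StablePoll.snapshot(dir), Set.empty, 10)`. Each pass produces a fresh immutable set, so the comparison is between two distinct snapshots rather than two references to one mutable collection.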

It is still possible to miss a trigger: if a burst of file changes in a
watched directory causes an overflow, a valid source file that is touched
after the overflow but before the EventMonitor handles it will not be
reported. I am not particularly worried about this.

I also discovered that the PollingWatchService was using an unbounded
list of events. This seemed risky so I switched to using an
ArrayBlockingQueue of size 256, which is what the MacOSXWatchService
uses as well. Since the queue is now bounded, I added overflow events to
the PollingWatchService as well.
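
A bounded event buffer with an overflow flag might look roughly like the sketch below. `BoundedEvents` is a hypothetical stand-in written for illustration; the actual PollingWatchKey appends an OVERFLOW WatchEvent to the drained list, whereas this sketch simply returns a boolean alongside the events.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{ ArrayList => JArrayList }
import scala.collection.JavaConverters._

// Hypothetical stand-in for a watch key's event buffer.
final class BoundedEvents[A](capacity: Int) {
  private[this] val events = new ArrayBlockingQueue[A](capacity)
  private[this] val overflowed = new AtomicBoolean(false)

  // Queue an event; if the queue is full, drop it and remember the loss.
  def offer(ev: A): Unit = this.synchronized {
    if (!events.offer(ev)) overflowed.set(true)
  }

  // Drain pending events and report whether any were dropped since the
  // previous drain. The flag is reset as part of the same critical section.
  def poll(): (List[A], Boolean) = this.synchronized {
    val drained = new JArrayList[A]()
    events.drainTo(drained)
    (drained.asScala.toList, overflowed.getAndSet(false))
  }
}
```

Both methods hold the same monitor, so a consumer never sees a drained batch without the overflow flag that belongs to it. A caller that receives `true` would then need to rescan the directory, since it cannot know which events were lost.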

I also added synchronization for a few methods in PollingWatchKey and
MacOSXWatchKey. This is more consistent with the AbstractWatchKey in the
JDK. The other methods return constant values, so synchronizing them
would be pointless.

I ran a number of Travis builds against the content of this commit and
none of them failed.
eatkins committed May 18, 2018
1 parent caf64f4 commit 2699a84
Showing 7 changed files with 143 additions and 53 deletions.
3 changes: 3 additions & 0 deletions build.sbt
@@ -51,6 +51,9 @@ val io = (project in file("io"))
// method this(sbt.io.PollingWatchService,sbt.io.PollingWatchService#PollingThread,java.nio.file.Watchable,java.util.List)Unit in class sbt.io.PollingWatchService#PollingWatchKey does not have a correspondent in current version
exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.this"),

// This is a private class
exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.events"),

// moved JavaMilli to sbt.io
exclude[MissingClassProblem]("sbt.internal.io.JavaMilli$"),
exclude[MissingClassProblem]("sbt.internal.io.JavaMilli"),
70 changes: 65 additions & 5 deletions io/src/main/scala/sbt/internal/io/EventMonitor.scala
@@ -1,13 +1,17 @@
package sbt.internal.io

import java.nio.file.{ ClosedWatchServiceException, Files, Path, WatchKey }
import java.io.IOException
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }

import sbt.io.WatchService

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration._

/**
@@ -135,8 +139,13 @@ private[sbt] object EventMonitor {
k.reset()
events
}
val allEvents = rawEvents.flatMap { e =>
Option(e.context).map(c => k.watchable.asInstanceOf[Path].resolve(c.asInstanceOf[Path]))
val keyPath = k.watchable.asInstanceOf[Path]
val allEvents = rawEvents.flatMap {
case e if e.kind.equals(OVERFLOW) =>
handleOverflow(k)
case e if !e.kind.equals(OVERFLOW) && e.context != null =>
Some(keyPath.resolve(e.context.asInstanceOf[Path]))
case _ => None
}
logger.debug(s"Received events:\n${allEvents.mkString("\n")}")
val (exist, notExist) = allEvents.partition(Files.exists(_))
@@ -146,11 +155,62 @@ private[sbt] object EventMonitor {
notExist.foreach(s.unregister)
updatedFiles ++ newFiles ++ notExist
}

/*
* In the case of an overflow, we must poll the file system to find out if there are added
* or removed directories. When there are new directories, we also want to return file
* events for the files that are found therein. Because an overflow is likely to occur while
* a directory is still being modified, we poll repeatedly until we get the same list of
* files consecutively. We will not trigger for any files that are updated while the WatchKey
* is in the OVERFLOW state. There is no good way to fix this without caching mtimes for
* all of the files, which I don't think is worth doing at this juncture.
*/
private def handleOverflow(key: WatchKey): Vector[Path] = lock.synchronized {
val allFiles = new mutable.HashSet[Path]
def getNewFiles(): Unit = {
allFiles.clear()
val path = key.watchable.asInstanceOf[Path]
Files.walkFileTree(
path,
new FileVisitor[Path] {
override def preVisitDirectory(dir: Path,
attrs: BasicFileAttributes): FileVisitResult = {
allFiles += dir
if (!registered.contains(dir)) registered += dir -> s.register(dir)
FileVisitResult.CONTINUE
}
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
allFiles += file
FileVisitResult.CONTINUE
}
override def visitFileFailed(file: Path, exc: IOException): FileVisitResult =
FileVisitResult.SKIP_SUBTREE
override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult =
FileVisitResult.CONTINUE
}
)
()
}

var oldFiles = mutable.Set.empty[Path]
do {
oldFiles = allFiles
getNewFiles()
} while (oldFiles != allFiles)
registered --= registered.collect {
case (d, k) if !Files.exists(d) =>
k.reset()
k.cancel()
d
}
allFiles.toVector
}

/*
* Returns new files found in new directory and any subdirectories, assuming that there is
* a recursive source with a base that is parent to the directory.
*/
def filesForNewDirectory(dir: Path): Iterator[Path] = {
private def filesForNewDirectory(dir: Path): Iterator[Path] = {
lazy val recursive =
s.sources.exists(src => dir.startsWith(src.base.toPath) && src.recursive)
if (!registered.contains(dir) && recursive) {
@@ -164,7 +224,7 @@ private[sbt] object EventMonitor {
* Triggers only if there is no pending Trigger and the file is not in an anti-entropy
* quarantine.
*/
def maybeTrigger(path: Path): Unit =
private def maybeTrigger(path: Path): Unit =
if (s.accept(path)) {
if (recentEvents.get(path).fold(false)(!_.isOverdue))
logger.debug(s"Ignoring watch event for $path due to anti-entropy constraint")
10 changes: 6 additions & 4 deletions io/src/main/scala/sbt/io/MacOSXWatchService.scala
@@ -145,7 +145,7 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE

override def isValid: Boolean = valid.get

override def pollEvents(): JList[WatchEvent[_]] = {
override def pollEvents(): JList[WatchEvent[_]] = this.synchronized {
val result = new mutable.ArrayBuffer[WatchEvent[_]](events.size).asJava
events.drainTo(result)
val overflowCount = overflow.getAndSet(0)
@@ -167,8 +167,10 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE
private val overflow = new AtomicInteger()
private val valid = new AtomicBoolean(true)

@inline def addEvent(event: Event[JPath]): Unit = if (!events.offer(event)) {
overflow.incrementAndGet()
()
@inline def addEvent(event: Event[JPath]): Unit = this.synchronized {
if (!events.offer(event)) {
overflow.incrementAndGet()
()
}
}
}
48 changes: 29 additions & 19 deletions io/src/main/scala/sbt/io/PollingWatchService.scala
@@ -9,7 +9,9 @@ import java.nio.file.{
Watchable,
Path => JPath
}
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ArrayBlockingQueue
import java.util.{ List => JList }

import sbt.io.syntax._
@@ -63,7 +65,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg

override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = {
ensureNotClosed()
val key = new PollingWatchKey(path, new java.util.ArrayList[WatchEvent[_]])
val key = new PollingWatchKey(path)
keys += path -> key
thread.setFileTimes(path)
watched += path -> events
@@ -80,19 +82,20 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
if (closed) throw new ClosedWatchServiceException

private class PollingThread(delay: FiniteDuration) extends Thread {
private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[WatchKey]
private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[PollingWatchKey]
private[this] val _initDone = new AtomicBoolean(false)
private[this] var fileTimes: Map[JPath, Long] = Map.empty

private[PollingWatchService] def withKeys[R](f: mutable.LinkedHashSet[WatchKey] => R): R =
private[PollingWatchService] def withKeys[R](
f: mutable.LinkedHashSet[PollingWatchKey] => R): R =
_keysWithEvents.synchronized(f(_keysWithEvents))

@deprecated("The initDone variable should not be accessed externally", "1.1.17")
def initDone: Boolean = _initDone.get()
@deprecated("The initDone variable should not be set externally", "1.1.17")
def initDone_=(initDone: Boolean) = _initDone.set(initDone)
@deprecated("Use withKeys instead of directly accessing keysWithEvents", "1.1.17")
def keysWithEvents: mutable.LinkedHashSet[WatchKey] = _keysWithEvents
def keysWithEvents: mutable.LinkedHashSet[PollingWatchKey] = _keysWithEvents

override def run(): Unit =
while (!closed) {
@@ -124,7 +127,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = _keysWithEvents.synchronized {
keys.get(path).foreach { k =>
_keysWithEvents += k
k.events.add(ev)
k.offer(ev)
_keysWithEvents.notifyAll()
}
}
@@ -173,30 +176,37 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
}
}

modifiedFiles.foreach {
case file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY)
addEvent(parent, ev)
}
modifiedFiles.foreach { file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY)
addEvent(parent, ev)
}
}
}

}

private class PollingWatchKey(
override val watchable: Watchable,
val events: JList[WatchEvent[_]]
) extends WatchKey {
private object Overflow
extends PollingWatchEvent(null, OVERFLOW.asInstanceOf[WatchEvent.Kind[JPath]])
private class PollingWatchKey(override val watchable: Watchable) extends WatchKey {
private[this] val events = new ArrayBlockingQueue[WatchEvent[_]](256)
private[this] val hasOverflow = new AtomicBoolean(false)
override def cancel(): Unit = ()
override def isValid(): Boolean = true
override def pollEvents(): java.util.List[WatchEvent[_]] = {
val evs = new java.util.ArrayList[WatchEvent[_]](events)
events.clear()
override def pollEvents(): JList[WatchEvent[_]] = this.synchronized {
val evs = new java.util.ArrayList[WatchEvent[_]]()
val overflow = hasOverflow.getAndSet(false)
events.drainTo(evs)
if (overflow) evs.add(Overflow)
evs
}
override def reset(): Boolean = true
def offer(ev: WatchEvent[_]): Unit = this.synchronized {
if (!hasOverflow.get && !events.offer(ev)) {
hasOverflow.set(true)
}
}
}

}
@@ -13,6 +13,6 @@ object DefaultWatchServiceSpec {

class DefaultWatchServiceSpec
extends SourceModificationWatchSpec(
if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService,
_ => if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService,
DefaultWatchServiceSpec.pollDelay
)
@@ -5,5 +5,5 @@ import sbt.io.PollingWatchService
import scala.concurrent.duration._

class PollingWatchServiceSpec
extends SourceModificationWatchSpec(new PollingWatchService(5.milliseconds),
extends SourceModificationWatchSpec((d: FiniteDuration) => new PollingWatchService(d),
DefaultWatchServiceSpec.pollDelay)
61 changes: 38 additions & 23 deletions io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala
@@ -1,7 +1,7 @@
package sbt.internal.io

import java.io.IOException
import java.nio.file.{ ClosedWatchServiceException, Paths }
import java.nio.file.{ ClosedWatchServiceException, Files, Paths }

import org.scalatest.{ Assertion, FlatSpec, Matchers }
import sbt.io.syntax._
@@ -11,10 +11,11 @@ import scala.annotation.tailrec
import scala.concurrent.duration._

abstract class SourceModificationWatchSpec(
getService: => WatchService,
getServiceWithPollDelay: FiniteDuration => WatchService,
pollDelay: FiniteDuration
) extends FlatSpec
with Matchers {
def getService = getServiceWithPollDelay(10.milliseconds)
val maxWait = 2 * pollDelay
it should "detect modified files" in IO.withTemporaryDirectory { dir =>
val parentDir = dir / "src" / "watchme"
@@ -348,28 +349,42 @@ abstract class SourceModificationWatchSpec(
} finally monitor.close()
}

it should "reset keys" in IO.withTemporaryDirectory { dir =>
val parentDir = dir / "src" / "watchme"
val file = parentDir / "Foo.scala"

writeNewFile(file, "foo")
// Longer timeout because there are many file system operations
val deadline = 5.seconds.fromNow
val monitor = defaultMonitor(getService, parentDir, tc = () => deadline.isOverdue)
try {
val n = 1000
val triggered0 = watchTest(monitor) {
(0 to n).foreach(i => IO.write(parentDir / s"Foo$i.scala", s"foo$i"))
}
assert(triggered0)
assert(IO.read(file) == s"foo")
it should "handle rapid creation of many subdirectories and files" in IO.withTemporaryDirectory {
dir =>
val parentDir = dir / "src" / "watchme"
Files.createDirectories(parentDir.toPath)
val subdirCount = 2000
val subdirFileCount = 4
var files = Seq.empty[File]

// Longer timeout because there are many file system operations. This can be very expensive
// especially in the PollingWatchSpec since both the PollingWatchService and the EventMonitor
// overflow handler are hammering the file system. To minimize the conflicts, we set a long
// interval between polls in the PollingWatchService using getServiceWithPollDelay.
val deadline = 20.seconds.fromNow
val monitor =
defaultMonitor(getServiceWithPollDelay(1.second), parentDir, tc = () => deadline.isOverdue)
try {
val triggered0 = watchTest(monitor) {
val subdirs =
(1 to subdirCount).map(i =>
Files.createDirectories(parentDir.toPath.resolve(s"subdir-$i")))
files = subdirs.flatMap { subdir =>
subdir.toFile +: (1 to subdirFileCount).map { j =>
Files.write(subdir.resolve(s"file-$j.scala"), s"foo".getBytes).toFile
}
}
}
val lastFile = files.last
assert(triggered0)
assert(IO.read(lastFile) == s"foo")

val triggered1 = watchTest(monitor) {
IO.write(file, "baz")
}
assert(triggered1)
assert(IO.read(file) == "baz")
} finally monitor.close()
val triggered1 = watchTest(monitor) {
IO.write(lastFile, "baz")
}
assert(triggered1)
assert(IO.read(lastFile) == "baz")
} finally monitor.close()
}

"WatchService.poll" should "throw a `ClosedWatchServiceException` if used after `close`" in {