Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overflows #155

Merged
merged 3 commits into from May 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.sbt
Expand Up @@ -51,6 +51,9 @@ val io = (project in file("io"))
// method this(sbt.io.PollingWatchService,sbt.io.PollingWatchService#PollingThread,java.nio.file.Watchable,java.util.List)Unit in class sbt.io.PollingWatchService#PollingWatchKey does not have a correspondent in current version
exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.this"),

// This is a private class
exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.events"),

// moved JavaMilli to sbt.io
exclude[MissingClassProblem]("sbt.internal.io.JavaMilli$"),
exclude[MissingClassProblem]("sbt.internal.io.JavaMilli"),
Expand Down
70 changes: 65 additions & 5 deletions io/src/main/scala/sbt/internal/io/EventMonitor.scala
@@ -1,13 +1,17 @@
package sbt.internal.io

import java.nio.file.{ ClosedWatchServiceException, Files, Path, WatchKey }
import java.io.IOException
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }

import sbt.io.WatchService

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -135,8 +139,13 @@ private[sbt] object EventMonitor {
k.reset()
events
}
val allEvents = rawEvents.flatMap { e =>
Option(e.context).map(c => k.watchable.asInstanceOf[Path].resolve(c.asInstanceOf[Path]))
val keyPath = k.watchable.asInstanceOf[Path]
val allEvents = rawEvents.flatMap {
case e if e.kind.equals(OVERFLOW) =>
handleOverflow(k)
case e if !e.kind.equals(OVERFLOW) && e.context != null =>
Some(keyPath.resolve(e.context.asInstanceOf[Path]))
case _ => None
}
logger.debug(s"Received events:\n${allEvents.mkString("\n")}")
val (exist, notExist) = allEvents.partition(Files.exists(_))
Expand All @@ -146,11 +155,62 @@ private[sbt] object EventMonitor {
notExist.foreach(s.unregister)
updatedFiles ++ newFiles ++ notExist
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eatkins Thanks for the descriptive commit message on this commit.

/*
* In the case of an overflow, we must poll the file system to find out if there are added
* or removed directories. When there are new directories, we also want to return file
* events for the files that are found therein. Because an overflow is likely to occur while
* a directory is still being modified, we poll repeatedly until we get the same list of
* files consecutively. We will not trigger for any files that are updated while the WatchKey
* is in the OVERFLOW state. There is no good way to fix this without caching mtimes for
* all of the files, which I don't think is worth doing at this juncture.
*/
private def handleOverflow(key: WatchKey): Vector[Path] = lock.synchronized {
val allFiles = new mutable.HashSet[Path]
def getNewFiles(): Unit = {
allFiles.clear()
val path = key.watchable.asInstanceOf[Path]
Files.walkFileTree(
path,
new FileVisitor[Path] {
override def preVisitDirectory(dir: Path,
attrs: BasicFileAttributes): FileVisitResult = {
allFiles += dir
if (!registered.contains(dir)) registered += dir -> s.register(dir)
FileVisitResult.CONTINUE
}
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
allFiles += file
FileVisitResult.CONTINUE
}
override def visitFileFailed(file: Path, exc: IOException): FileVisitResult =
FileVisitResult.SKIP_SUBTREE
override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult =
FileVisitResult.CONTINUE
}
)
()
}

var oldFiles = mutable.Set.empty[Path]
do {
oldFiles = allFiles
getNewFiles()
} while (oldFiles != allFiles)
registered --= registered.collect {
case (d, k) if !Files.exists(d) =>
k.reset()
k.cancel()
d
}
allFiles.toVector
}

/*
* Returns new files found in new directory and any subdirectories, assuming that there is
* a recursive source with a base that is parent to the directory.
*/
def filesForNewDirectory(dir: Path): Iterator[Path] = {
private def filesForNewDirectory(dir: Path): Iterator[Path] = {
lazy val recursive =
s.sources.exists(src => dir.startsWith(src.base.toPath) && src.recursive)
if (!registered.contains(dir) && recursive) {
Expand All @@ -164,7 +224,7 @@ private[sbt] object EventMonitor {
* Triggers only if there is no pending Trigger and the file is not in an anti-entropy
* quarantine.
*/
def maybeTrigger(path: Path): Unit =
private def maybeTrigger(path: Path): Unit =
if (s.accept(path)) {
if (recentEvents.get(path).fold(false)(!_.isOverdue))
logger.debug(s"Ignoring watch event for $path due to anti-entropy constraint")
Expand Down
10 changes: 6 additions & 4 deletions io/src/main/scala/sbt/io/MacOSXWatchService.scala
Expand Up @@ -145,7 +145,7 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE

override def isValid: Boolean = valid.get

override def pollEvents(): JList[WatchEvent[_]] = {
override def pollEvents(): JList[WatchEvent[_]] = this.synchronized {
val result = new mutable.ArrayBuffer[WatchEvent[_]](events.size).asJava
events.drainTo(result)
val overflowCount = overflow.getAndSet(0)
Expand All @@ -167,8 +167,10 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE
private val overflow = new AtomicInteger()
private val valid = new AtomicBoolean(true)

@inline def addEvent(event: Event[JPath]): Unit = if (!events.offer(event)) {
overflow.incrementAndGet()
()
@inline def addEvent(event: Event[JPath]): Unit = this.synchronized {
if (!events.offer(event)) {
overflow.incrementAndGet()
()
}
}
}
37 changes: 32 additions & 5 deletions io/src/main/scala/sbt/io/Path.scala
Expand Up @@ -3,11 +3,20 @@
*/
package sbt.io

import java.io.File
import java.io.{ File, IOException }
import java.net.URL

import scala.collection.mutable
import java.nio.file.attribute._
import java.nio.file.{ Path => NioPath, LinkOption, FileSystem, Files }
import java.nio.file.{
FileSystem,
FileVisitResult,
FileVisitor,
Files,
LinkOption,
Path => NioPath
}

import scala.collection.JavaConverters._

final class RichFile(val asFile: File) extends AnyVal with RichNioPath {
Expand Down Expand Up @@ -439,9 +448,27 @@ private class DescendantOrSelfPathFinder(val parent: PathFinder, val filter: Fil
}

private def handleFileDescendant(file: File, fileSet: mutable.Set[File]): Unit = {
handleFile(file, fileSet)
for (childDirectory <- IO.wrapNull(file listFiles DirectoryFilter))
handleFileDescendant(new File(file, childDirectory.getName), fileSet)
Files.walkFileTree(
file.toPath,
new FileVisitor[NioPath] {
override def preVisitDirectory(dir: NioPath,
attrs: BasicFileAttributes): FileVisitResult = {
val file = dir.toFile
if (filter.accept(file)) fileSet += file
FileVisitResult.CONTINUE
}
override def visitFile(file: NioPath, attrs: BasicFileAttributes): FileVisitResult = {
val ioFile = file.toFile
if (filter.accept(ioFile)) fileSet += ioFile
FileVisitResult.CONTINUE
}
override def visitFileFailed(file: NioPath, exc: IOException): FileVisitResult =
FileVisitResult.SKIP_SUBTREE
override def postVisitDirectory(dir: NioPath, exc: IOException): FileVisitResult =
FileVisitResult.CONTINUE
}
)
()
}
}

Expand Down
48 changes: 29 additions & 19 deletions io/src/main/scala/sbt/io/PollingWatchService.scala
Expand Up @@ -9,7 +9,9 @@ import java.nio.file.{
Watchable,
Path => JPath
}
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ArrayBlockingQueue
import java.util.{ List => JList }

import sbt.io.syntax._
Expand Down Expand Up @@ -63,7 +65,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg

override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = {
ensureNotClosed()
val key = new PollingWatchKey(path, new java.util.ArrayList[WatchEvent[_]])
val key = new PollingWatchKey(path)
keys += path -> key
thread.setFileTimes(path)
watched += path -> events
Expand All @@ -80,19 +82,20 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
if (closed) throw new ClosedWatchServiceException

private class PollingThread(delay: FiniteDuration) extends Thread {
private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[WatchKey]
private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[PollingWatchKey]
private[this] val _initDone = new AtomicBoolean(false)
private[this] var fileTimes: Map[JPath, Long] = Map.empty

private[PollingWatchService] def withKeys[R](f: mutable.LinkedHashSet[WatchKey] => R): R =
private[PollingWatchService] def withKeys[R](
f: mutable.LinkedHashSet[PollingWatchKey] => R): R =
_keysWithEvents.synchronized(f(_keysWithEvents))

@deprecated("The initDone variable should not be accessed externally", "1.1.17")
def initDone: Boolean = _initDone.get()
@deprecated("The initDone variable should not be set externally", "1.1.17")
def initDone_=(initDone: Boolean) = _initDone.set(initDone)
@deprecated("Use withKeys instead of directly accessing keysWithEvents", "1.1.17")
def keysWithEvents: mutable.LinkedHashSet[WatchKey] = _keysWithEvents
def keysWithEvents: mutable.LinkedHashSet[PollingWatchKey] = _keysWithEvents

override def run(): Unit =
while (!closed) {
Expand Down Expand Up @@ -124,7 +127,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = _keysWithEvents.synchronized {
keys.get(path).foreach { k =>
_keysWithEvents += k
k.events.add(ev)
k.offer(ev)
_keysWithEvents.notifyAll()
}
}
Expand Down Expand Up @@ -173,30 +176,37 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
}
}

modifiedFiles.foreach {
case file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY)
addEvent(parent, ev)
}
modifiedFiles.foreach { file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY)
addEvent(parent, ev)
}
}
}

}

private class PollingWatchKey(
override val watchable: Watchable,
val events: JList[WatchEvent[_]]
) extends WatchKey {
private object Overflow
extends PollingWatchEvent(null, OVERFLOW.asInstanceOf[WatchEvent.Kind[JPath]])
private class PollingWatchKey(override val watchable: Watchable) extends WatchKey {
private[this] val events = new ArrayBlockingQueue[WatchEvent[_]](256)
private[this] val hasOverflow = new AtomicBoolean(false)
override def cancel(): Unit = ()
override def isValid(): Boolean = true
override def pollEvents(): java.util.List[WatchEvent[_]] = {
val evs = new java.util.ArrayList[WatchEvent[_]](events)
events.clear()
override def pollEvents(): JList[WatchEvent[_]] = this.synchronized {
val evs = new java.util.ArrayList[WatchEvent[_]]()
val overflow = hasOverflow.getAndSet(false)
events.drainTo(evs)
if (overflow) evs.add(Overflow)
evs
}
override def reset(): Boolean = true
def offer(ev: WatchEvent[_]): Unit = this.synchronized {
if (!hasOverflow.get && !events.offer(ev)) {
hasOverflow.set(true)
}
}
}

}
Expand Down
Expand Up @@ -13,6 +13,6 @@ object DefaultWatchServiceSpec {

class DefaultWatchServiceSpec
extends SourceModificationWatchSpec(
if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService,
_ => if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService,
DefaultWatchServiceSpec.pollDelay
)
Expand Up @@ -5,5 +5,5 @@ import sbt.io.PollingWatchService
import scala.concurrent.duration._

class PollingWatchServiceSpec
extends SourceModificationWatchSpec(new PollingWatchService(5.milliseconds),
extends SourceModificationWatchSpec((d: FiniteDuration) => new PollingWatchService(d),
DefaultWatchServiceSpec.pollDelay)