// PollingWatchService.scala
package sbt.io
import java.nio.file.StandardWatchEventKinds._
import java.nio.file.{
ClosedWatchServiceException,
Files,
WatchEvent,
WatchKey,
Watchable,
Path => JPath
}
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ArrayBlockingQueue
import java.util.{ List => JList }
import sbt.io.syntax._
import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.{ Duration, FiniteDuration }
/**
 * A `WatchService` that polls the filesystem every `delay`.
 *
 * A background thread periodically re-reads the last-modified time of every file below the
 * registered paths and compares the result with the previous snapshot. Differences are turned
 * into `ENTRY_CREATE`, `ENTRY_DELETE` and `ENTRY_MODIFY` events which are queued on the key of
 * the parent path, provided that parent is registered for that kind of event.
 *
 * @param delay the pause between two consecutive filesystem scans
 */
class PollingWatchService(delay: FiniteDuration) extends WatchService with Unregisterable {
  // Written by `close()` on the caller's thread and read by the polling loop, hence volatile.
  @volatile private var closed: Boolean = false
  private val thread: PollingThread = new PollingThread(delay)
  // `keys` and `watched` are mutated by register/unregister on the caller's thread while the
  // polling thread reads and iterates them, so both are backed by a concurrent map.
  private val keys: mutable.Map[JPath, PollingWatchKey] =
    scala.collection.concurrent.TrieMap.empty[JPath, PollingWatchKey]
  // Shortest string representation first; `null` consistently sorts before everything else.
  private val pathLengthOrdering: Ordering[JPath] =
    Ordering.by((p: JPath) => if (p == null) -1 else p.toString.length)
  private val watched: mutable.Map[JPath, Seq[WatchEvent.Kind[JPath]]] =
    scala.collection.concurrent.TrieMap.empty[JPath, Seq[WatchEvent.Kind[JPath]]]

  /** Marks the service as closed; the polling loop stops at its next iteration. */
  override def close(): Unit =
    closed = true

  /**
   * Starts the polling thread and blocks until its first scan has finished.
   *
   * @throws ClosedWatchServiceException if the service has been closed
   */
  override def init(): Unit = {
    ensureNotClosed()
    thread.start()
  }

  /**
   * Removes and returns the oldest key that has pending events, waiting up to `timeout` for
   * one to appear when none is available.
   *
   * A zero or negative timeout does not wait, a positive timeout below one millisecond waits
   * one millisecond, and `Duration.Inf` waits until a key is signalled.
   *
   * @param timeout the maximum time to wait for a key
   * @return the key, or `null` if none became available
   * @throws ClosedWatchServiceException if the service has been closed
   */
  override def poll(timeout: Duration): WatchKey = thread.withKeys { keys =>
    ensureNotClosed()
    if (keys.isEmpty) {
      timeout match {
        case finite: FiniteDuration =>
          // `wait(0)` would block forever and a negative argument throws, so only wait for
          // strictly positive timeouts and never round a positive timeout down to zero.
          if (finite > Duration.Zero) keys.wait(math.max(1L, finite.toMillis))
        case Duration.Inf => keys.wait()
        case _            => () // MinusInf / Undefined: `toMillis` would throw, so do not wait
      }
    }
    keys.headOption.map { k =>
      keys -= k
      k
    }.orNull
  }

  /**
   * Drains the events of every key that currently has pending events.
   *
   * @return the drained events, grouped by key
   * @throws ClosedWatchServiceException if the service has been closed
   */
  override def pollEvents(): Map[WatchKey, immutable.Seq[WatchEvent[JPath]]] =
    thread.withKeys { keys =>
      import scala.collection.JavaConverters._
      ensureNotClosed()
      val events = keys.map { k =>
        k -> k.pollEvents().asScala.map(_.asInstanceOf[WatchEvent[JPath]]).toIndexedSeq
      }
      keys.clear()
      events.toMap
    }

  /**
   * Registers `path` for the given kinds of events.
   *
   * @param path   the path to watch
   * @param events the kinds of events to report for entries of `path`
   * @return the key on which events for `path` are queued
   * @throws ClosedWatchServiceException if the service has been closed
   */
  override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = {
    ensureNotClosed()
    val key = new PollingWatchKey(path)
    keys += path -> key
    // Record the current modification times before the path becomes visible in `watched`.
    thread.setFileTimes(path)
    watched += path -> events
    key
  }

  /**
   * Stops reporting events for `path` and forgets its key.
   *
   * @throws ClosedWatchServiceException if the service has been closed
   */
  override def unregister(path: JPath): Unit = {
    ensureNotClosed()
    watched -= path
    // Events are only queued for paths present in `watched`, so the key is no longer needed.
    keys -= path
    ()
  }

  private def ensureNotClosed(): Unit =
    if (closed) throw new ClosedWatchServiceException

  /** The thread that scans the watched paths and queues the resulting events. */
  private class PollingThread(delay: FiniteDuration) extends Thread {
    private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[PollingWatchKey]
    private[this] val _initDone = new AtomicBoolean(false)
    // Dedicated monitor for `fileTimes`. Locking on the value of the var itself would lock on
    // a different object after every reassignment (and initially on the shared `Map.empty`).
    private[this] val fileTimesLock = new Object
    private[this] var fileTimes: Map[JPath, Long] = Map.empty

    /** Runs `f` on the set of keys with pending events while holding that set's monitor. */
    private[PollingWatchService] def withKeys[R](
        f: mutable.LinkedHashSet[PollingWatchKey] => R): R =
      _keysWithEvents.synchronized(f(_keysWithEvents))

    @deprecated("The initDone variable should not be accessed externally", "1.1.17")
    def initDone: Boolean = _initDone.get()
    @deprecated("The initDone variable should not be set externally", "1.1.17")
    def initDone_=(initDone: Boolean) = _initDone.set(initDone)
    @deprecated("Use withKeys instead of directly accessing keysWithEvents", "1.1.17")
    def keysWithEvents: mutable.LinkedHashSet[PollingWatchKey] = _keysWithEvents

    /** Flags initialisation as finished and wakes up any thread blocked in `start()`. */
    private[this] def signalInitDone(): Unit = _initDone.synchronized {
      _initDone.set(true)
      _initDone.notifyAll()
    }

    /** Scans the filesystem every `delay` until the enclosing service is closed. */
    override def run(): Unit =
      try {
        while (!closed) {
          populateEvents()
          signalInitDone()
          Thread.sleep(delay.toMillis)
        }
      } finally {
        // Also release `start()` when the loop never ran or the first scan threw; otherwise
        // the caller of `init()` would wait forever.
        signalInitDone()
      }

    /** Starts the thread and blocks until the first scan has been signalled as done. */
    override def start(): Unit = {
      super.start()
      _initDone.synchronized { while (!_initDone.get()) _initDone.wait() }
    }

    /** Adds the current modification times of everything below `path` to the snapshot. */
    private[PollingWatchService] def setFileTimes(path: JPath): Unit = {
      val entries = path.toFile.allPaths.get.map(f => f.toPath -> IO.getModifiedTimeOrZero(f))
      fileTimesLock.synchronized(fileTimes ++= entries)
    }

    /** Reads the modification time of every file below the currently watched paths. */
    def getFileTimes(): Map[JPath, Long] = {
      val results = mutable.Map.empty[JPath, Long]
      // Shorter paths are visited first, so a watched path that was already collected while
      // traversing a previously visited path is not traversed again.
      watched.toSeq.sortBy(_._1)(pathLengthOrdering).foreach {
        case (p, _) =>
          if (!results.contains(p))
            p.toFile.allPaths.get.foreach(f => results += f.toPath -> IO.getModifiedTimeOrZero(f))
      }
      results.toMap
    }

    /** Queues `ev` on the key registered for `path`, if any, and wakes up waiting pollers. */
    private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = _keysWithEvents.synchronized {
      keys.get(path).foreach { k =>
        _keysWithEvents += k
        k.offer(ev)
        _keysWithEvents.notifyAll()
      }
    }

    /**
     * Queues an event of the given `kind` for `path` on the key of its parent, provided the
     * parent is watched for that kind. Paths without a parent produce no event.
     */
    private def notifyParent(path: JPath, kind: WatchEvent.Kind[JPath]): Unit =
      Option(path.getParent).foreach { parent =>
        if (watched.getOrElse(parent, Seq.empty).contains(kind))
          addEvent(parent, new PollingWatchEvent(parent.relativize(path), kind))
      }

    /** Compares a fresh scan with the previous snapshot and queues the resulting events. */
    private def populateEvents(): Unit = {
      val (deletedFiles, createdFiles, modifiedFiles) = fileTimesLock.synchronized {
        val newFileTimes = getFileTimes()
        val newFiles = newFileTimes.keySet
        val oldFiles = fileTimes.keySet
        val deleted = (oldFiles -- newFiles).toSeq
        val created = (newFiles -- oldFiles).toSeq
        val modified = fileTimes.collect {
          case (p, oldTime) if newFileTimes.getOrElse(p, 0L) > oldTime => p
        }
        fileTimes = newFileTimes
        (deleted, created, modified)
      }
      // Emit every delete event before dropping any deleted path from `watched`, so that the
      // deletion of a child is still reported when its watched parent disappeared as well.
      deletedFiles.foreach(notifyParent(_, ENTRY_DELETE))
      deletedFiles.foreach(watched -= _)
      createdFiles.sorted(pathLengthOrdering).foreach(notifyParent(_, ENTRY_CREATE))
      modifiedFiles.foreach(notifyParent(_, ENTRY_MODIFY))
    }
  }

  /** The event appended to a key's events when its queue could not hold every event. */
  private object Overflow
      extends PollingWatchEvent(null, OVERFLOW.asInstanceOf[WatchEvent.Kind[JPath]])

  /** A key holding at most 256 pending events; further events are replaced by `Overflow`. */
  private class PollingWatchKey(override val watchable: Watchable) extends WatchKey {
    private[this] val events = new ArrayBlockingQueue[WatchEvent[_]](256)
    private[this] val hasOverflow = new AtomicBoolean(false)
    override def cancel(): Unit = ()
    override def isValid(): Boolean = true

    /** Drains the pending events, followed by `Overflow` if any event had to be dropped. */
    override def pollEvents(): JList[WatchEvent[_]] = this.synchronized {
      val evs = new java.util.ArrayList[WatchEvent[_]]()
      val overflow = hasOverflow.getAndSet(false)
      events.drainTo(evs)
      if (overflow) evs.add(Overflow)
      evs
    }
    override def reset(): Boolean = true

    /** Queues `ev`; once the queue is full, events are dropped until the next `pollEvents()`. */
    def offer(ev: WatchEvent[_]): Unit = this.synchronized {
      if (!hasOverflow.get && !events.offer(ev)) {
        hasOverflow.set(true)
      }
    }
  }
}
/**
 * A minimal `WatchEvent` implementation carrying a path and the kind of change.
 *
 * @param context the path the event refers to
 * @param kind    the kind of the event
 */
private class PollingWatchEvent(
    override val context: JPath,
    override val kind: WatchEvent.Kind[JPath]
) extends WatchEvent[JPath] {

  /** The event count, fixed at 1 for every instance. */
  override val count: Int = 1
}