Skip to content

Commit

Permalink
Significant changes
Browse files Browse the repository at this point in the history
  • Loading branch information
propensive committed Mar 23, 2024
1 parent e2081c9 commit 652d109
Showing 1 changed file with 71 additions and 67 deletions.
138 changes: 71 additions & 67 deletions src/core/watch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package surveillance
import turbulence.*
import rudiments.*
import vacuous.*
import digression.*
import parasite.*, threadModels.platform
import feudalism.*
import fulminate.*
Expand All @@ -35,30 +34,23 @@ case class WatchError()
extends Error(msg"the operating system's limit on the number of paths that can be watched has been exceeded")

extension [PathType: GenericPath](path: PathType)
def watch[ResultType](lambda: WatchSet => ResultType): ResultType raises WatchError =
def watch[ResultType](lambda: Watch => ResultType): ResultType raises WatchError =
val watchSet = Watch(List(path))
lambda(watchSet).also:
Watch.unregister(watchSet)
watchSet.unregister()

extension [PathType: GenericPath](paths: Iterable[PathType])
def watch[ResultType](lambda: WatchSet => ResultType): ResultType raises WatchError =
def watch[ResultType](lambda: Watch => ResultType): ResultType raises WatchError =
val watchSet = Watch(paths)

lambda(watchSet).also:
Watch.unregister(watchSet)
watchSet.unregister()

object Watch:
def apply[PathType: GenericPath](paths: Iterable[PathType]): WatchSet =
register:
paths.map(_.fullPath.s).map(jnf.Paths.get(_).nn).map: javaPath =>
if javaPath.toFile.nn.isDirectory then (javaPath, (_: Text) => true)
else (javaPath.getParent.nn, (_: Text) == javaPath.getFileName.nn.toString.tt)
.toMap

private case class WatchService(watchService: jnf.WatchService, pollLoop: Loop):
def stop(): Unit = pollLoop.stop()
val async: Optional[Async[Unit]] = safely(supervise(task("surveillance".tt)(pollLoop.run())))

private val watches: Mutex[scm.HashMap[jnf.WatchKey, Set[Watch]]] = Mutex(scm.HashMap())
private var serviceValue: Optional[WatchService] = Unset

private def service: WatchService = serviceValue.or:
Expand All @@ -67,73 +59,85 @@ object Watch:
WatchService(watchService, pollLoop(watchService)).tap: service =>
serviceValue = service

private def register(paths: Map[jnf.Path, Text => Boolean]): WatchSet =
val funnel = Funnel[WatchEvent]()
private val watches: Mutex[scm.HashMap[jnf.WatchKey, Set[Watch#PathWatch]]] = Mutex(scm.HashMap())

val watchSet =
paths.map: (path, filter) =>
val key = path.register(service.watchService, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE).nn
new Watch(key, path, funnel, filter).tap: watch =>
watches.mutate: map =>
map(key) = map.at(key).or(Set()) + watch
.to(Set)

WatchSet(funnel, watchSet)

private[surveillance] def unregister(watchSet: WatchSet): Unit =
watches.mutate: map =>
watchSet.watches.each: watch =>
map(watch.key) = map.at(watch.key).or(Set()) - watch

if map(watch.key).isEmpty then
watch.key.cancel()
map.remove(watch.key)

if map.isEmpty then synchronized:
serviceValue.let: service =>
service.stop()
serviceValue = Unset


private def put(watch: Watch, event: jnf.WatchEvent[?]): Unit =
(event.context.nn: @unchecked) match
case path: jnf.Path =>
val name = path.toString.tt
if watch.filter(name) then try event.kind match
case ENTRY_CREATE =>
if watch.base.resolve(path).nn.toFile.nn.isDirectory
then watch.funnel.put(WatchEvent.NewDirectory(watch.base.toString.show, name))
else watch.funnel.put(WatchEvent.NewFile(watch.base.toString.show, name))

case ENTRY_MODIFY =>
watch.funnel.put(WatchEvent.Modify(watch.base.toString.show, name))

case ENTRY_DELETE =>
watch.funnel.put(WatchEvent.Delete(watch.base.toString.show, name))

case _ =>
()

catch case err: Exception => ()
private def register(paths: Map[jnf.Path, Text => Boolean]): Watch =
new Watch().tap(_.watch(paths))

private def pollLoop(service: jnf.WatchService): Loop = loop:
service.take().nn match
case key: jnf.WatchKey =>
key.pollEvents().nn.iterator.nn.asScala.each: event =>
watches.read: ref =>
ref()(key)
.each(put(_, event))
.each(_.put(event))

key.reset()

class WatchSet(funnel: Funnel[WatchEvent], private[surveillance] val watches: Set[Watch]):
def apply[PathType: GenericPath](paths: Iterable[PathType]): Watch =
Watch.register:
paths.map(_.fullPath.s).map(jnf.Paths.get(_).nn).map: javaPath =>
if javaPath.toFile.nn.isDirectory then (javaPath, (_: Text) => true)
else (javaPath.getParent.nn, (_: Text) == javaPath.getFileName.nn.toString.tt)
.toMap

class Watch():
private val funnel: Funnel[WatchEvent] = Funnel()
private val watches: scm.HashSet[PathWatch] = scm.HashSet[PathWatch]()

private class PathWatch
(private[Watch] val key: jnf.WatchKey,
private[Watch] val base: jnf.Path,
val funnel: Funnel[WatchEvent],
val filter: Text => Boolean):

def put(event: jnf.WatchEvent[?]): Unit =
(event.context.nn: @unchecked) match
case path: jnf.Path =>
val name = path.toString.tt
if filter(name) then try event.kind match
case ENTRY_CREATE =>
if base.resolve(path).nn.toFile.nn.isDirectory
then funnel.put(WatchEvent.NewDirectory(base.toString.show, name))
else funnel.put(WatchEvent.NewFile(base.toString.show, name))

case ENTRY_MODIFY =>
funnel.put(WatchEvent.Modify(base.toString.show, name))

case ENTRY_DELETE =>
funnel.put(WatchEvent.Delete(base.toString.show, name))

case _ =>
()

catch case err: Exception => ()

def stream: LazyList[WatchEvent] = funnel.stream

def watch(paths: Map[jnf.Path, Text => Boolean]): Unit =
val watches2 = paths.map:
case (path, filter) =>
val key = path.register(Watch.service.watchService, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE).nn

new PathWatch(key, path, funnel, filter).tap: watch =>
Watch.watches.mutate: map =>
map(key) = map.at(key).or(Set()) + watch

synchronized(watches ++= watches2)

class Watch
(private[surveillance] val key: jnf.WatchKey,
private[surveillance] val base: jnf.Path,
val funnel: Funnel[WatchEvent],
val filter: Text => Boolean)
def unregister(): Unit =
Watch.watches.mutate: map =>
watches.each: watch =>
map(watch.key) = map.at(watch.key).or(Set()) - watch

if map(watch.key).isEmpty then
watch.key.cancel()
map.remove(watch.key)

if map.isEmpty then synchronized:
Watch.serviceValue.let: service =>
service.stop()
Watch.serviceValue = Unset

enum WatchEvent:
case NewFile(dir: Text, file: Text)
Expand Down

0 comments on commit 652d109

Please sign in to comment.