Skip to content

Commit

Permalink
refactor(watcher): replace Queue with flag (threading.Event)
Browse files Browse the repository at this point in the history
The current implementation stores all unprocessed filesystem change
events in an unbounded Queue. There can be a large number of such
events, so in some situations this can become a memory consumption
issue.

All we really care about is whether there has been *any* interesting
event since the last time we checked. All we need for that is a flag,
not a queue.
  • Loading branch information
dairiki committed Mar 10, 2023
1 parent 1d790bb commit e1bb0d6
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 140 deletions.
11 changes: 5 additions & 6 deletions lektor/cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# pylint: disable=import-outside-toplevel
import os
import sys
import time
import warnings
from contextlib import suppress

import click

Expand Down Expand Up @@ -147,14 +147,13 @@ def _build():
if not watch:
return sys.exit(0 if success else 1)

from lektor.watcher import watch
from lektor.watcher import Watcher

click.secho("Watching for file system changes", fg="cyan")
last_build = time.time()
for ts, _, _ in watch(env):
if ts > last_build:
with Watcher(env) as watcher, suppress(KeyboardInterrupt):
while True:
watcher.wait()
_build()
last_build = time.time()


@cli.command("clean")
Expand Down
11 changes: 4 additions & 7 deletions lektor/devserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def __init__(self, env, output_path, prune=True, verbosity=0, extra_flags=None):
self.output_path = output_path
self.prune = prune
self.verbosity = verbosity
self.last_build = time.time()
self.extra_flags = extra_flags

def build(self, update_source_info_first=False):
Expand All @@ -42,16 +41,14 @@ def build(self, update_source_info_first=False):
builder.prune()
except Exception:
traceback.print_exc()
else:
self.last_build = time.time()

def run(self):
with CliReporter(self.env, verbosity=self.verbosity):
self.build(update_source_info_first=True)
with Watcher(self.env, self.output_path) as watcher:
for ts, _, _ in watcher:
if self.last_build is None or ts > self.last_build:
self.build()
self.build(update_source_info_first=True)
while True:
watcher.wait()
self.build()


def browse_to_address(addr):
Expand Down
60 changes: 39 additions & 21 deletions lektor/watcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

import os
import queue
import threading
import time
from collections import OrderedDict
from contextlib import suppress
from itertools import zip_longest

import click
Expand All @@ -14,9 +17,14 @@


class EventHandler(FileSystemEventHandler):
def __init__(self):
def __init__(self, is_interesting):
super().__init__()
self.queue = queue.Queue()
self.is_interesting = is_interesting
self.event = threading.Event()

def _check(self, path):
if self.is_interesting(None, None, path):
self.event.set()

# Generally we only care about changes (modification, creation, deletion) to files
# within the monitored tree. Changes in directories do not directly affect Lektor
Expand All @@ -32,19 +40,18 @@ def __init__(self):

def on_created(self, event):
if not event.is_directory:
self.queue.put((time.time(), event.event_type, event.src_path))
self._check(event.src_path)

def on_deleted(self, event):
self.queue.put((time.time(), event.event_type, event.src_path))
self._check(event.src_path)

def on_modified(self, event):
if not event.is_directory:
self.queue.put((time.time(), event.event_type, event.src_path))
self._check(event.src_path)

def on_moved(self, event):
time_ = time.time()
for path in event.src_path, event.dest_path:
self.queue.put((time_, event.event_type, path))
self._check(event.src_path)
self._check(event.dest_path)


def _fullname(cls):
Expand All @@ -64,7 +71,7 @@ def __init__(
observer_classes=(Observer, PollingObserver),
observer_timeout=DEFAULT_OBSERVER_TIMEOUT, # testing
):
self.event_handler = EventHandler()
self.event_handler = EventHandler(self.is_interesting)
self.paths = paths
self.observer_classes = observer_classes
self.observer_timeout = observer_timeout
Expand Down Expand Up @@ -118,11 +125,24 @@ def is_interesting(self, time, event_type, path):
# pylint: disable=no-self-use
return True

def __iter__(self):
while 1:
time_, event_type, path = self.event_handler.queue.get()
if self.is_interesting(time_, event_type, path):
yield time_, event_type, path
@property
def event(self):
    """Return the ``threading.Event`` flag held by this watcher's event handler."""
    return self.event_handler.event

def wait(self, timeout: float | None = None) -> bool:
    """Wait for a watched filesystem change.

    This waits for a “new” non-ignored filesystem change. Here “new” means that
    the change happened since the last return from ``wait``.

    Waits a maximum of ``timeout`` seconds (or forever if ``timeout`` is ``None``).

    Returns ``True`` if a change occurred, ``False`` on timeout.
    """
    if not self.event.wait(timeout=timeout):
        return False
    # Reset the flag so that the next call blocks until a further change is seen.
    self.event.clear()
    return True


class Watcher(BasicWatcher):
Expand All @@ -148,9 +168,7 @@ def is_interesting(self, time, event_type, path):

def watch(env):
"""Returns a generator of file system events in the environment."""
with Watcher(env) as watcher:
try:
for event in watcher:
yield event
except KeyboardInterrupt:
pass
with Watcher(env) as watcher, suppress(KeyboardInterrupt):
while True:
watcher.wait()
yield time.time(), None, None

0 comments on commit e1bb0d6

Please sign in to comment.