-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
event_based_path_watcher.py
385 lines (311 loc) · 13.7 KB
/
event_based_path_watcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2024)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Declares the EventBasedPathWatcher class, which watches given paths in the file system.
How these classes work together
-------------------------------
- EventBasedPathWatcher : each instance of this is able to watch a single
file or directory at a given path so long as there's a browser interested in
it. This uses _MultiPathWatcher to watch paths.
- _MultiPathWatcher : singleton that watches multiple paths. It does this by
holding a watchdog.observers.Observer object, and manages several
_FolderEventHandler instances. This creates _FolderEventHandlers as needed,
if the required folder is not already being watched. And it also tells
existing _FolderEventHandlers which paths it should be watching for.
- _FolderEventHandler : event handler for when a folder is modified. You can
register paths in that folder that you're interested in. Then this object
listens to folder events, sees if registered paths changed, and fires
callbacks if so.
This module is lazy-loaded and used only if watchdog is installed.
"""
from __future__ import annotations
import os
import threading
from typing import Callable, Final, cast
from blinker import ANY, Signal
from watchdog import events
from watchdog.observers import Observer
from watchdog.observers.api import ObservedWatch
from streamlit.logger import get_logger
from streamlit.util import repr_
from streamlit.watcher import util
# Module-level logger shared by every class in this file.
_LOGGER: Final = get_logger(__name__)
class EventBasedPathWatcher:
    """Watches a single path on disk using watchdog.

    Each instance registers exactly one (path, callback) pair with the shared
    _MultiPathWatcher singleton, which owns the underlying watchdog Observer.
    """

    @staticmethod
    def close_all() -> None:
        """Shut down the shared _MultiPathWatcher singleton."""
        _MultiPathWatcher.get_singleton().close()
        _LOGGER.debug("Watcher closed")

    def __init__(
        self,
        path: str,
        on_changed: Callable[[str], None],
        *,  # keyword-only arguments:
        glob_pattern: str | None = None,
        allow_nonexistent: bool = False,
    ) -> None:
        """Start watching ``path`` and invoke ``on_changed`` when it changes.

        Parameters
        ----------
        path : str
            File or directory to watch. It is converted to an absolute path
            before being registered.
        on_changed : Callable[[str], None]
            Receiver invoked when a change to the path is detected.
        glob_pattern : str or None
            Optional glob restricting which files inside a watched directory
            are considered. Only meaningful when ``path`` is a directory.
        allow_nonexistent : bool
            When True, ``path`` need not exist yet, which makes it possible to
            wait for a file or directory to be created.
        """
        absolute_path = os.path.abspath(path)
        # Assignment order matters for repr_(), which reflects instance state.
        self._path = absolute_path
        self._on_changed = on_changed

        _MultiPathWatcher.get_singleton().watch_path(
            absolute_path,
            on_changed,
            glob_pattern=glob_pattern,
            allow_nonexistent=allow_nonexistent,
        )
        _LOGGER.debug("Watcher created for %s", absolute_path)

    def __repr__(self) -> str:
        return repr_(self)

    def close(self) -> None:
        """Unregister this watcher's path/callback pair from the singleton."""
        _MultiPathWatcher.get_singleton().stop_watching_path(
            self._path, self._on_changed
        )
class _MultiPathWatcher:
    """Watches multiple paths.

    Singleton that owns a single watchdog Observer and maps each watched
    folder to the _FolderEventHandler scheduled on it.
    """

    _singleton: _MultiPathWatcher | None = None

    @classmethod
    def get_singleton(cls) -> _MultiPathWatcher:
        """Return the singleton _MultiPathWatcher object.

        Instantiates one if necessary.
        """
        if cls._singleton is None:
            _LOGGER.debug("No singleton. Registering one.")
            # __init__ stores the new instance in _MultiPathWatcher._singleton.
            _MultiPathWatcher()

        return cast("_MultiPathWatcher", _MultiPathWatcher._singleton)

    # Don't allow constructor to be called more than once.
    def __new__(cls) -> _MultiPathWatcher:
        """Constructor.

        Raises
        ------
        RuntimeError
            If the singleton already exists; use get_singleton() instead.
        """
        if _MultiPathWatcher._singleton is not None:
            raise RuntimeError("Use .get_singleton() instead")
        return super().__new__(cls)

    def __init__(self) -> None:
        """Register this instance as the singleton and start the Observer."""
        _MultiPathWatcher._singleton = self

        # Map of folder_to_watch -> _FolderEventHandler.
        self._folder_handlers: dict[str, _FolderEventHandler] = {}

        # Used for mutation of _folder_handlers dict
        self._lock = threading.Lock()

        # The Observer object from the Watchdog module. Since this class is
        # only instantiated once, we only have a single Observer in Streamlit,
        # and it's in charge of watching all paths we're interested in.
        self._observer = Observer()
        self._observer.start()  # Start observer thread.

    def __repr__(self) -> str:
        return repr_(self)

    def watch_path(
        self,
        path: str,
        callback: Callable[[str], None],
        *,  # keyword-only arguments:
        glob_pattern: str | None = None,
        allow_nonexistent: bool = False,
    ) -> None:
        """Start watching a path.

        The path's parent folder is scheduled on the Observer (recursively)
        the first time any path inside it is watched; later paths in the same
        folder reuse that folder's handler.

        Parameters
        ----------
        path : str
            The path to watch.
        callback : Callable[[str], None]
            Receiver invoked with the path when it changes.
        glob_pattern : str or None
            Forwarded to the folder handler; filters directory contents.
        allow_nonexistent : bool
            Forwarded to the folder handler; permits a not-yet-existing path.

        Raises
        ------
        Exception
            Whatever scheduling the folder or registering the listener raises
            is re-raised. In that case no half-initialized folder handler is
            left registered or scheduled.
        """
        folder_path = os.path.abspath(os.path.dirname(path))

        with self._lock:
            folder_handler = self._folder_handlers.get(folder_path)

            if folder_handler is None:
                folder_handler = _FolderEventHandler()
                # Schedule *before* registering the handler. If schedule()
                # raises, we must not leave a handler whose ``watch`` is None
                # in _folder_handlers: later paths in this folder would reuse
                # it and silently never be observed.
                folder_handler.watch = self._observer.schedule(
                    folder_handler, folder_path, recursive=True
                )
                self._folder_handlers[folder_path] = folder_handler

            try:
                folder_handler.add_path_change_listener(
                    path,
                    callback,
                    glob_pattern=glob_pattern,
                    allow_nonexistent=allow_nonexistent,
                )
            except Exception:
                # Registering the listener raised (for example while computing
                # the path's initial MD5 / mtime). Don't keep an Observer watch
                # alive for a folder that has no listeners at all.
                if not folder_handler.is_watching_paths():
                    self._unschedule_folder(folder_path, folder_handler)
                raise

    def stop_watching_path(self, path: str, callback: Callable[[str], None]) -> None:
        """Stop watching a path.

        Once the folder's handler has no remaining listeners, it is
        unscheduled from the Observer and forgotten.
        """
        folder_path = os.path.abspath(os.path.dirname(path))

        with self._lock:
            folder_handler = self._folder_handlers.get(folder_path)

            if folder_handler is None:
                _LOGGER.debug(
                    "Cannot stop watching path, because it is already not being "
                    "watched. %s",
                    folder_path,
                )
                return

            folder_handler.remove_path_change_listener(path, callback)

            if not folder_handler.is_watching_paths():
                self._unschedule_folder(folder_path, folder_handler)

    def _unschedule_folder(
        self, folder_path: str, folder_handler: _FolderEventHandler
    ) -> None:
        """Unschedule a folder handler and drop it. Caller must hold self._lock."""
        # Guard against a handler that was never successfully scheduled.
        if folder_handler.watch is not None:
            self._observer.unschedule(folder_handler.watch)
        del self._folder_handlers[folder_path]

    def close(self) -> None:
        """Close this _MultiPathWatcher object forever.

        Drops all folder handlers and stops the Observer thread, waiting up to
        5 seconds for it to exit.
        """
        with self._lock:
            if self._folder_handlers:
                self._folder_handlers = {}
                _LOGGER.debug(
                    "Stopping observer thread even though there is a non-zero "
                    "number of event observers!"
                )
            else:
                _LOGGER.debug("Stopping observer thread")

            self._observer.stop()
            self._observer.join(timeout=5)
class WatchedPath:
    """Emits notifications when a single path is modified.

    Stores the last observed MD5 and modification time of one path, the
    options used to compute them, and a blinker Signal listeners connect to.
    """

    def __init__(
        self,
        md5: str,
        modification_time: float,
        *,  # keyword-only arguments:
        glob_pattern: str | None = None,
        allow_nonexistent: bool = False,
    ) -> None:
        # Last observed state of the path (assigned in this order on purpose:
        # repr_() reflects attribute insertion order).
        self.md5, self.modification_time = md5, modification_time
        # Options reused whenever the MD5 / mtime is recomputed.
        self.glob_pattern, self.allow_nonexistent = glob_pattern, allow_nonexistent
        # Receivers connect here; the signal is sent with the changed path.
        self.on_changed = Signal()

    def __repr__(self) -> str:
        return repr_(self)
class _FolderEventHandler(events.FileSystemEventHandler):
    """Listen to folder events. If certain paths change, fire a callback.

    The super class, FileSystemEventHandler, listens to changes to *folders*,
    but we need to listen to changes to *both* folders and files. I believe
    this is a limitation of the Mac FSEvents system API, and the watchdog
    library takes the lowest common denominator.

    So in this class we watch for folder events and then filter them based
    on whether or not we care for the path the event is about.
    """

    def __init__(self) -> None:
        super().__init__()
        # Watched path -> WatchedPath (baseline md5/mtime plus listeners).
        # Event lookups use os.path.abspath(), so keys are expected to be
        # absolute paths.
        self._watched_paths: dict[str, WatchedPath] = {}
        self._lock = threading.Lock()  # for watched_paths mutations
        # Assigned by _MultiPathWatcher to the value returned by
        # Observer.schedule(); None until then.
        self.watch: ObservedWatch | None = None

    def __repr__(self) -> str:
        return repr_(self)

    def add_path_change_listener(
        self,
        path: str,
        callback: Callable[[str], None],
        *,  # keyword-only arguments:
        glob_pattern: str | None = None,
        allow_nonexistent: bool = False,
    ) -> None:
        """Add a path to this object's event filter.

        The first listener for a path records the path's current MD5 and
        modification time as the baseline for later change detection. Later
        listeners for the same path reuse that entry, including its original
        glob_pattern / allow_nonexistent settings.
        """
        with self._lock:
            watched_path = self._watched_paths.get(path, None)
            if watched_path is None:
                md5 = util.calc_md5_with_blocking_retries(
                    path,
                    glob_pattern=glob_pattern,
                    allow_nonexistent=allow_nonexistent,
                )
                modification_time = util.path_modification_time(path, allow_nonexistent)
                watched_path = WatchedPath(
                    md5=md5,
                    modification_time=modification_time,
                    glob_pattern=glob_pattern,
                    allow_nonexistent=allow_nonexistent,
                )
                self._watched_paths[path] = watched_path

            # weak=False keeps the callback alive for as long as it is
            # connected, even if the caller holds no other reference.
            watched_path.on_changed.connect(callback, weak=False)

    def remove_path_change_listener(
        self, path: str, callback: Callable[[str], None]
    ) -> None:
        """Remove a path from this object's event filter.

        The path's entry is dropped once it has no receivers left. Unknown
        paths are ignored.
        """
        with self._lock:
            watched_path = self._watched_paths.get(path, None)
            if watched_path is None:
                return

            watched_path.on_changed.disconnect(callback)
            if not watched_path.on_changed.has_receivers_for(ANY):
                del self._watched_paths[path]

    def is_watching_paths(self) -> bool:
        """Return true if this object has 1+ paths in its event filter."""
        return bool(self._watched_paths)

    def handle_path_change_event(self, event: events.FileSystemEvent) -> None:
        """Handle when a path (corresponding to a file or dir) is changed.

        The events that can call this are modification, creation or moved
        events. Listeners are notified only when all of these hold:

        1. The affected path is being watched.
        2. Its modification time differs from the last one seen, or is 0.0.
        3. Its MD5 differs from the last one seen.
        """
        # Check for both modified and moved files, because many programs write
        # to a backup file then rename (i.e. move) it.
        if event.event_type == events.EVENT_TYPE_MODIFIED:
            changed_path = event.src_path
        elif event.event_type == events.EVENT_TYPE_MOVED:
            # Teach mypy that this event has a dest_path, because it can't infer
            # the desired subtype from the event_type check
            event = cast(events.FileSystemMovedEvent, event)

            _LOGGER.debug(
                "Move event: src %s; dest %s", event.src_path, event.dest_path
            )
            # Only the destination is checked, so moving a watched path away
            # is not reported by this handler.
            changed_path = event.dest_path
        # On OSX with VI, on save, the file is deleted, the swap file is
        # modified and then the original file is created hence why we
        # capture EVENT_TYPE_CREATED
        elif event.event_type == events.EVENT_TYPE_CREATED:
            changed_path = event.src_path
        else:
            _LOGGER.debug("Don't care about event type %s", event.event_type)
            return

        changed_path = os.path.abspath(changed_path)

        changed_path_info = self._watched_paths.get(changed_path, None)
        if changed_path_info is None:
            _LOGGER.debug(
                "Ignoring changed path %s.\nWatched_paths: %s",
                changed_path,
                self._watched_paths,
            )
            return

        modification_time = util.path_modification_time(
            changed_path, changed_path_info.allow_nonexistent
        )
        # We add modification_time != 0.0 check since on some file systems (s3fs/fuse)
        # modification_time is always 0.0 because of file system limitations.
        if (
            modification_time != 0.0
            and modification_time == changed_path_info.modification_time
        ):
            _LOGGER.debug("File/dir timestamp did not change: %s", changed_path)
            return

        changed_path_info.modification_time = modification_time

        # The timestamp moved; confirm the content actually changed before
        # notifying listeners.
        new_md5 = util.calc_md5_with_blocking_retries(
            changed_path,
            glob_pattern=changed_path_info.glob_pattern,
            allow_nonexistent=changed_path_info.allow_nonexistent,
        )
        if new_md5 == changed_path_info.md5:
            _LOGGER.debug("File/dir MD5 did not change: %s", changed_path)
            return

        _LOGGER.debug("File/dir MD5 changed: %s", changed_path)
        changed_path_info.md5 = new_md5
        changed_path_info.on_changed.send(changed_path)

    # FileSystemEventHandler hooks: every kind we care about goes through the
    # same filtering pipeline.
    def on_created(self, event: events.FileSystemEvent) -> None:
        self.handle_path_change_event(event)

    def on_modified(self, event: events.FileSystemEvent) -> None:
        self.handle_path_change_event(event)

    def on_moved(self, event: events.FileSystemEvent) -> None:
        self.handle_path_change_event(event)