Merge pull request #58 from openmsi/polling_observer
Add option to use a fallback file watcher on Windows
eminizer committed Dec 12, 2023
2 parents ba2470b + d4d3782 commit 7ae4a12
Showing 6 changed files with 207 additions and 4 deletions.
@@ -40,9 +40,11 @@ Directory monitoring with watchdog

The upload directory uses a Python library called `watchdog <https://pypi.org/project/watchdog/>`_ to monitor the filesystem for events that would trigger files to be uploaded. The implementation is purposefully minimal to reflect the limitations of using pub/sub messaging systems to reconstruct directories on disparate machines: specifically, ``DataFileUploadDirectory`` programs are "append-only" in that they will never send messages to delete files.

For example, if you move a bunch of files to a new subdirectory while the program is running, the consumer side will get new messages for the files in the subdirectory, but any reconstructed files in the original location will remain. Any files that are modified will be produced again; deleting files does not send any new messages. Because many common editing programs (such as vim) use short-lived temporary files to maintain atomicity of their outputs, files are not be marked to be produced until they have existed unmodified for 3 seconds. Files that are created and deleted within those three seconds will not be produced. The default 3 second lag time is configurable from the command line using the ``--watchdog_lag_time`` argument on the command line.
For example, if you move a bunch of files to a new subdirectory while the program is running, the consumer side will get new messages for the files in the subdirectory, but any reconstructed files in the original location will remain. Any files that are modified will be produced again; deleting files does not send any new messages. Because many common editing programs (such as vim) use short-lived temporary files to maintain atomicity of their outputs, files are not marked to be produced until they have existed unmodified for 3 seconds (by default). Files that are created and deleted within those three seconds will not be produced. The default 3-second lag time is configurable from the command line using the ``--watchdog_lag_time`` argument.
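The lag-time behavior can be sketched in a few lines (a stdlib-only illustration of the idea, not OpenMSIStream's actual implementation; the ``LagTimeTracker`` class and its method names are hypothetical):

```python
import time


class LagTimeTracker:
    """Mark files as ready to produce only once they have gone
    unmodified for at least `lag_time` seconds (3 by default,
    mirroring the --watchdog_lag_time argument)."""

    def __init__(self, lag_time=3.0):
        self.lag_time = lag_time
        self._last_modified = {}  # filepath -> time of last observed change

    def on_modified(self, filepath, now=None):
        # record the most recent time the file was created or changed
        self._last_modified[filepath] = now if now is not None else time.time()

    def on_deleted(self, filepath):
        # deleted files are simply forgotten; no "delete" message is ever sent
        self._last_modified.pop(filepath, None)

    def files_ready(self, now=None):
        # a file is ready once it has sat unmodified for lag_time seconds
        now = now if now is not None else time.time()
        return [
            fp for fp, t in self._last_modified.items()
            if now - t >= self.lag_time
        ]
```

A file created and then deleted inside the lag window never appears in ``files_ready``, which is exactly why short-lived editor temp files are never produced.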

Users who would like to test a particular workflow's interaction with watchdog can use the `testing script <https://github.com/openmsi/openmsistream/blob/main/test/watchdog_testing.py>`_ in the repository to see what events get triggered when. Running that script pointed to a directory will log messages for each recognized watchdog event. Only events for files are relevant for OpenMSIStream.
On Windows platforms, watchdog monitors directories using ReadDirectoryChangesW by default. That notifier doesn't perform reliably on networked drives, or on any filesystem that isn't NTFS. In those cases, Microsoft recommends keeping periodic snapshots of directory contents and comparing against them to find changes in directory trees. Watchdog `implements a fallback solution to do this <https://github.com/gorakhargosh/watchdog?tab=readme-ov-file#about-using-watchdog-with-cifs>`_ (called a ``PollingObserver``), but it is more memory-intensive and less performant. To use that alternate observer in a ``DataFileUploadDirectory``, add the ``--use_polling_observer`` flag.

Users who would like to test a particular workflow's interaction with watchdog can use the `testing script <https://github.com/openmsi/openmsistream/blob/main/test/watchdog_testing.py>`_ in the repository to see what events get triggered when. There is also `an alternate version <https://github.com/openmsi/openmsistream/blob/main/test/watchdog_testing_polling_observer.py>`_ that uses the ``PollingObserver``. Running one of those scripts pointed to a directory will log messages for each recognized watchdog event. Only events for files are relevant for OpenMSIStream.
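The snapshot-and-compare strategy that a ``PollingObserver`` relies on can be sketched with the standard library alone (a simplified illustration of the approach, not watchdog's actual implementation; the function names are hypothetical):

```python
import os


def snapshot(dirpath):
    """Map every file path under dirpath to its last-modified time."""
    mtimes = {}
    for root, _dirs, files in os.walk(dirpath):
        for name in files:
            path = os.path.join(root, name)
            try:
                mtimes[path] = os.stat(path).st_mtime
            except OSError:
                pass  # file vanished between listing and stat
    return mtimes


def diff_snapshots(old, new):
    """Compare two snapshots and report created/deleted/modified files."""
    created = sorted(set(new) - set(old))
    deleted = sorted(set(old) - set(new))
    modified = sorted(p for p in set(old) & set(new) if old[p] != new[p])
    return created, deleted, modified
```

Because every poll walks the whole tree and keeps a full snapshot in memory, this approach costs more than OS-level notifications, which is why the ``PollingObserver`` is only a fallback.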

Restarting the program
----------------------
13 changes: 12 additions & 1 deletion openmsistream/data_file_io/actor/data_file_upload_directory.py
@@ -8,6 +8,7 @@
from threading import Lock
from queue import Queue
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from openmsitoolbox import Runnable
from openmsitoolbox.utilities.misc import populated_kwargs
from openmsitoolbox.utilities.exception_tracking_thread import ExceptionTrackingThread
@@ -68,6 +69,7 @@ def __init__(
upload_regex=RUN_CONST.DEFAULT_UPLOAD_REGEX,
datafile_type=UploadDataFile,
watchdog_lag_time=RUN_CONST.DEFAULT_WATCHDOG_LAG_TIME,
use_polling_observer=False,
**kwargs,
):
"""
@@ -96,7 +98,14 @@ def __init__(
self.__datafile_type = datafile_type
self.__wait_time = self.MIN_WAIT_TIME
self.__lock = Lock()
self.__observer = Observer(timeout=self.WATCHDOG_OBSERVER_TIMEOUT)
if use_polling_observer:
self.logger.debug(
"Using a PollingObserver instead of the default watchdog Observer"
)
observer_class = PollingObserver
else:
observer_class = Observer
self.__observer = observer_class(timeout=self.WATCHDOG_OBSERVER_TIMEOUT)
self.__event_handler = UploadDirectoryEventHandler(
upload_regex=upload_regex,
logs_subdir=self._logs_subdir,
@@ -586,6 +595,7 @@ def get_command_line_arguments(cls):
"update_seconds",
"upload_existing",
"watchdog_lag_time",
"use_polling_observer",
]
kwargs = {**superkwargs, "n_threads": RUN_CONST.N_DEFAULT_UPLOAD_THREADS}
return args, kwargs
@@ -609,6 +619,7 @@ def run_from_command_line(cls, args=None):
args.config,
upload_regex=args.upload_regex,
watchdog_lag_time=args.watchdog_lag_time,
use_polling_observer=args.use_polling_observer,
update_secs=args.update_seconds,
streamlevel=args.logger_stream_level,
filelevel=args.logger_file_level,
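The diff picks the observer class from the flag before constructing it once with shared keyword arguments. The pattern in isolation (with stand-in classes, since watchdog need not be installed to follow it; ``make_observer`` is a hypothetical helper, not part of OpenMSIStream):

```python
class Observer:
    """Stand-in for watchdog.observers.Observer."""

    def __init__(self, timeout=1.0):
        self.timeout = timeout


class PollingObserver(Observer):
    """Stand-in for watchdog.observers.polling.PollingObserver."""


def make_observer(use_polling_observer=False, timeout=1.0):
    # choose the class first, then instantiate exactly once, so the
    # construction arguments are never duplicated across branches
    observer_class = PollingObserver if use_polling_observer else Observer
    return observer_class(timeout=timeout)
```

Both watchdog observers share a constructor signature, which is what makes the single instantiation line possible.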
12 changes: 12 additions & 0 deletions openmsistream/utilities/argument_parsing.py
@@ -221,6 +221,18 @@ class variable, which is a dictionary. The keys are names of arguments, and the
),
},
],
"use_polling_observer": [
"optional",
{
"action": "store_true",
"help": (
"Add this flag to use a PollingObserver to trigger watchdog events "
"instead of the default Observer. This is recommended when reading "
"files from CIFS/SMB mounted directories, or if the default Observer "
"causes any problems in a deployment."
),
},
],
"consumer_group_id": [
"optional",
{"default": "create_new", "help": "ID to use for all consumers in the group"},
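The ``"action": "store_true"`` entry above makes ``--use_polling_observer`` a value-less flag that defaults to ``False``. A minimal illustration with a hypothetical stand-alone parser (OpenMSIStream builds its parsers from the argument dictionary shown above rather than calling ``add_argument`` directly):

```python
import argparse

# a hypothetical parser demonstrating only the store_true behavior
parser = argparse.ArgumentParser()
parser.add_argument(
    "--use_polling_observer",
    action="store_true",
    help="Use a PollingObserver instead of the default watchdog Observer",
)

print(parser.parse_args([]).use_polling_observer)  # False
print(parser.parse_args(["--use_polling_observer"]).use_polling_observer)  # True
```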
2 changes: 1 addition & 1 deletion openmsistream/version.py
@@ -1 +1 @@
__version__ = "1.7.5"
__version__ = "1.7.6"
151 changes: 151 additions & 0 deletions test/test_scripts/test_polling_observer.py
@@ -0,0 +1,151 @@
# imports
import pathlib, filecmp
from openmsistream.utilities.dataclass_table import DataclassTableReadOnly
from openmsistream.data_file_io.actor.file_registry.producer_file_registry import (
RegistryLineInProgress,
RegistryLineCompleted,
)
from openmsistream.data_file_io.actor.data_file_upload_directory import (
DataFileUploadDirectory,
)
from config import TEST_CONST # pylint: disable=import-error,wrong-import-order

# pylint: disable=import-error,wrong-import-order
from test_base_classes import (
TestWithKafkaTopics,
TestWithDataFileUploadDirectory,
TestWithDataFileDownloadDirectory,
TestWithEnvVars,
)


class TestPollingObserver(
TestWithKafkaTopics,
TestWithDataFileUploadDirectory,
TestWithDataFileDownloadDirectory,
TestWithEnvVars,
):
"""
Class for testing DataFileUploadDirectory with a PollingObserver instead of the
default Observer
"""

TOPIC_NAME = "test_polling_observer"

TOPICS = {TOPIC_NAME: {}}

def run_data_file_upload_directory(self, upload_file_dict, **create_kwargs):
"""
Called by the test method below to run an upload directory from start to finish
"""
# create the upload directory with the default config file
self.create_upload_directory(**create_kwargs)
# start it running in a new thread
self.start_upload_thread(topic_name=self.TOPIC_NAME)
try:
# put the test file(s) in the watched directory
for filepath, filedict in upload_file_dict.items():
rootdir = filedict["rootdir"] if "rootdir" in filedict else None
self.copy_file_to_watched_dir(filepath, filepath.relative_to(rootdir))
# put the "check" command into the input queue a couple times to test it
self.upload_directory.control_command_queue.put("c")
self.upload_directory.control_command_queue.put("check")
# shut down the upload thread
self.stop_upload_thread()
# make sure that the ProducerFileRegistry files were created
# and they list the file(s) as completely uploaded
log_subdir = self.watched_dir / DataFileUploadDirectory.LOG_SUBDIR_NAME
in_prog_filepath = log_subdir / f"upload_to_{self.TOPIC_NAME}_in_progress.csv"
completed_filepath = log_subdir / f"uploaded_to_{self.TOPIC_NAME}.csv"
self.assertTrue(in_prog_filepath.is_file())
in_prog_table = DataclassTableReadOnly(
RegistryLineInProgress,
filepath=in_prog_filepath,
logger=self.logger,
)
self.assertEqual(in_prog_table.obj_addresses_by_key_attr("filename"), {})
self.assertTrue(completed_filepath.is_file())
completed_table = DataclassTableReadOnly(
RegistryLineCompleted,
filepath=completed_filepath,
logger=self.logger,
)
addrs_by_fp = completed_table.obj_addresses_by_key_attr("rel_filepath")
for filepath, filedict in upload_file_dict.items():
if filedict["upload_expected"]:
rootdir = filedict["rootdir"] if "rootdir" in filedict else None
if rootdir:
rel_path = filepath.relative_to(rootdir)
else:
rel_path = pathlib.Path(filepath.name)
self.assertTrue(rel_path in addrs_by_fp)
except Exception as exc:
raise exc

def run_data_file_download_directory(self, download_file_dict, **other_create_kwargs):
"""
Called by the test method below to run a download directory from start to finish
"""
# make a list of relative filepaths we'll be waiting for
relevant_files = {}
for filepath, filedict in download_file_dict.items():
if filedict["download_expected"]:
rootdir = filedict["rootdir"] if "rootdir" in filedict else None
if rootdir:
rel_path = filepath.relative_to(rootdir)
else:
rel_path = pathlib.Path(filepath.name)
relevant_files[filepath] = rel_path
# create the download directory
self.create_download_directory(topic_name=self.TOPIC_NAME, **other_create_kwargs)
# start reconstruct in a separate thread so we can time it out
self.start_download_thread()
try:
# put the "check" command into the input queue a couple times
self.download_directory.control_command_queue.put("c")
self.download_directory.control_command_queue.put("check")
# wait for the timeout for the test file(s) to be completely reconstructed
self.wait_for_files_to_reconstruct(relevant_files.values())
# make sure the reconstructed file(s) exists with the same content as the original
for orig_fp, rel_fp in relevant_files.items():
reco_fp = self.reco_dir / rel_fp
self.assertTrue(reco_fp.is_file())
if not filecmp.cmp(orig_fp, reco_fp, shallow=False):
errmsg = (
"ERROR: files are not the same after reconstruction! "
"(This may also be due to the timeout being too short)"
)
raise RuntimeError(errmsg)
except Exception as exc:
raise exc

def test_polling_observer_kafka(self):
"""
Test the upload and download directories while applying regular expressions
"""
files_roots = {
TEST_CONST.TEST_DATA_FILE_PATH: {
"rootdir": TEST_CONST.TEST_DATA_FILE_ROOT_DIR_PATH,
"upload_expected": True,
"download_expected": True,
},
TEST_CONST.FAKE_PROD_CONFIG_FILE_PATH: {
"rootdir": TEST_CONST.TEST_DATA_DIR_PATH,
"upload_expected": True,
"download_expected": True,
},
TEST_CONST.TEST_METADATA_DICT_PICKLE_FILE: {
"rootdir": TEST_CONST.TEST_DATA_DIR_PATH,
"upload_expected": True,
"download_expected": True,
},
}
self.run_data_file_upload_directory(files_roots, use_polling_observer=True)
consumer_group_id = (
f"run_data_file_download_directory_polling_observer_{TEST_CONST.PY_VERSION}"
)
self.run_data_file_download_directory(
files_roots,
consumer_group_id=consumer_group_id,
)
self.success = True # pylint: disable=attribute-defined-outside-init
27 changes: 27 additions & 0 deletions test/watchdog_testing_polling_observer.py
@@ -0,0 +1,27 @@
"""Watchdog testing script from the PyPI page (https://pypi.org/project/watchdog/),
except using a PollingObserver instead of the default
"""

import sys
import time
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
path = sys.argv[1] if len(sys.argv) > 1 else "."
event_handler = LoggingEventHandler()
observer = PollingObserver()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
finally:
observer.stop()
observer.join()
