Merge pull request #57 from openmsi/configurable_log_locations
Configurable log locations
eminizer committed Dec 11, 2023
2 parents 3e7425c + bf781a7 commit ba2470b
Showing 12 changed files with 27 additions and 5 deletions.
@@ -47,7 +47,7 @@ Users who would like to test a particular workflow's interaction with watchdog c
Restarting the program
----------------------

Using a ``DataFileUploadDirectory`` to Produce chunks of files to the broker is robust if the code crashes and can be restarted. A special subdirectory called "LOGS" is created in ``[directory_path]`` when the code starts up (any files added to the "LOGS" subdirectory will not be uploaded). That subdirectory will include the log file, as well as one file called "``upload_to_[name_of_topic]_in_progress.csv``" and one (or several) more called something like "``uploaded_to_[name_of_topic].csv``". The .csv files are special datatable files that list the chunks of each recognized file that still need to be uploaded and information about files that have been fully uploaded, respectively.
Using a ``DataFileUploadDirectory`` to Produce chunks of files to the broker is robust if the code crashes and can be restarted. By default, a special subdirectory called "LOGS" is created in ``[directory_path]`` when the code starts up (any files added to the "LOGS" subdirectory will not be uploaded). That subdirectory will include the log file, as well as one file called "``upload_to_[name_of_topic]_in_progress.csv``" and one (or several) more called something like "``uploaded_to_[name_of_topic].csv``". The .csv files are special datatable files that list the chunks of each recognized file that still need to be uploaded and information about files that have been fully uploaded, respectively. The location these files end up in is configurable using the "``--logger_file_path``" command line option.

The list of chunks uploaded for each file is updated atomically upon receipt of a positive acknowledgment callback from the broker: chunks are not listed as uploaded until they have been fully received and acknowledged by the broker and therefore guaranteed to exist on the topic. When ``DataFileUploadDirectory`` is restarted pointing to the same directory and topic, any files that did not get fully uploaded will have their missing chunks re-enqueued for upload if they still exist in the same location.
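As an illustration of the layout described above, the following is a minimal sketch (not part of OpenMSIStream itself; the watched directory and topic name are made up) of where the log and registry files land by default, and where they would go if ``--logger_file_path`` points somewhere else:

from pathlib import Path

# Hypothetical watched directory and topic name, for illustration only
directory_path = Path("/data/instrument_output")
topic = "my_topic"

# Default: a LOGS subdirectory inside the watched directory holds the log
# file and the registry .csv files (files placed in LOGS are never uploaded)
default_logs = directory_path / "LOGS"
registry_files = [
    default_logs / f"upload_to_{topic}_in_progress.csv",
    default_logs / f"uploaded_to_{topic}.csv",
]

# With --logger_file_path, the same files are written under the given
# location instead of [directory_path]/LOGS
custom_logs = Path("/var/log/openmsistream")

for path in registry_files:
    print(path)
print(f"with --logger_file_path: {custom_logs}")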

@@ -50,7 +50,7 @@ OpenMSIStream manually commits Consumer offsets to guarantee that every message
Restarting the program
----------------------

Using a ``GirderUploadStreamProcessor`` (and any other program whose underlying class inherits from :class:`~.DataFileStreamProcessor`) is robust if the code crashes and can be restarted. The output directory includes a ``LOGS`` subdirectory, which holds a log file as well as a file called "``consuming_from_[name_of_topic]_by_[consumer_group_ID]_in_progress.csv``" and one or several files with the pattern "``processed_from_[name_of_topic]_by_[consumer_group_ID]*.csv``". The .csv files are special datatable files (they can be read as :class:`~.utilities.DataclassTable` objects) that list the processing status of each recognized file and information about files that have been successfully uploaded, respectively. To decrease latency, there may be several files listing the processing results: one per thread, capped at 1,000 entries each. When the program is shut down normally or restarted, these files will be automatically concatenated. You can also concatenate them by hand when the program is not running.
Using a ``GirderUploadStreamProcessor`` (and any other program whose underlying class inherits from :class:`~.DataFileStreamProcessor`) is robust if the code crashes and can be restarted. By default, the output directory includes a ``LOGS`` subdirectory, which holds a log file as well as a file called "``consuming_from_[name_of_topic]_by_[consumer_group_ID]_in_progress.csv``" and one or several files with the pattern "``processed_from_[name_of_topic]_by_[consumer_group_ID]*.csv``". The .csv files are special datatable files (they can be read as :class:`~.utilities.DataclassTable` objects) that list the processing status of each recognized file and information about files that have been successfully uploaded, respectively. To decrease latency, there may be several files listing the processing results: one per thread, capped at 1,000 entries each. When the program is shut down normally or restarted, these files will be automatically concatenated. You can also concatenate them by hand when the program is not running. The location these files end up in is configurable using the "``--logger_file_path``" command line argument.

The status of each file is updated atomically upon receipt of each message. If any files fail to be uploaded during a run, or the program quits or crashes before all the messages for a file are received, a new run of ``GirderUploadStreamProcessor`` restarted with the same consumer group ID and configs will restart the consumers from the beginning of the topic and read only messages from those failed files until they catch up to where they would be otherwise. As long as all messages for the failed files still exist in the same topic, restarting will select and try uploading them again.
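The paragraph above notes that the per-thread ``processed_from_[name_of_topic]_by_[consumer_group_ID]*.csv`` files can also be concatenated by hand while the program is stopped. A minimal standard-library sketch of that (topic, group, and paths are hypothetical, and it assumes every per-thread file shares the same header line):

import glob
import os

# Hypothetical logs directory, topic, and consumer group ID
pattern = "output/LOGS/processed_from_my_topic_by_my_group*.csv"
out_path = "output/LOGS/combined_processed_from_my_topic_by_my_group.csv"

in_paths = [
    p for p in sorted(glob.glob(pattern))
    if os.path.abspath(p) != os.path.abspath(out_path)
]
header_written = False
with open(out_path, "w", encoding="utf-8") as out_file:
    for csv_path in in_paths:
        with open(csv_path, "r", encoding="utf-8") as in_file:
            lines = in_file.readlines()
        if not lines:
            continue
        if not header_written:
            out_file.write(lines[0])  # keep a single copy of the header row
            header_written = True
        out_file.writelines(lines[1:])  # append this thread's data rows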

@@ -40,7 +40,7 @@ Other options for running the code include:
#. Changing the number of parallel threads: add the ``--n_threads [threads]`` argument where ``[threads]`` is the desired number of parallel threads to use (and, also, the number of consumers used in the group). The default is 2 threads/consumers; increasing this number may give Kafka warnings or errors depending on how many consumers can be subscribed to a particular topic (generally you can use as many threads as there are partitions in the topic).
#. Changing the consumer group ID: add the ``--consumer_group_id [group_id]`` argument where ``[group_id]`` is the string to use for the Consumer group ID. The default creates a new ID every time, but if you would like to keep track of which messages have already been consumed you can choose a consistent group ID to use every time, and only messages whose offsets haven't been committed yet will be consumed. Please see the `documentation for Kafka Consumers here <https://docs.confluent.io/platform/current/clients/consumer.html>`_ for more details if consumer offset commits are unfamiliar to you.
#. Changing which files get processed: add the ``--download_regex [regex]`` argument where ``[regex]`` is a regular expression string that matches the relative filepath of every file you would like to process. Messages from files whose relative paths don't match the regex will be read from the topic but skipped.
#. Putting the logs and registry .csv files in a custom location: add the ``--output_dir [output_dir]`` argument, where ``[output_dir]`` is the path to a directory where the output should be saved (it will be created if it doesn't exist yet). By default the output is written to a directory called ``S3TransferStreamProcessor_output`` in the current working directory.
#. Putting the logs and registry .csv files in a custom location: add the ``--output_dir [output_dir]`` argument, where ``[output_dir]`` is the path to a directory where the output should be saved (it will be created if it doesn't exist yet). By default the output is written to a directory called ``S3TransferStreamProcessor_output`` in the current working directory. The logfile location is also independently configurable through the "``--logger_file_path``" command line argument.
#. Changing how files are downloaded: add the ``--mode [mode]`` argument, where ``[mode]`` is "``memory``", "``disk``", or "``both``". In "memory" mode (the default), files are reconstructed only in memory before being transferred, which is fastest. In "disk" mode, files are instead written to disk with the output directory as their root directory, which is useful if the reconstructed files are too large to hold more than one in memory at once. In "both" mode, files are written to disk and held in memory, which is useful to perform transfers quickly while still writing out local copies of files.

To see other optional command line arguments, run ``S3TransferStreamProcessor -h``.
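As a small illustration of the ``--download_regex`` option in the list above (outside of OpenMSIStream, with made-up relative paths), a regular expression selects which relative filepaths are processed, while messages from non-matching files are still read from the topic but skipped:

import re

# Hypothetical regex: process only .csv and .txt files under a "results" subdirectory
download_regex = re.compile(r"^results/.*\.(csv|txt)$")

relative_paths = [
    "results/run_01/summary.csv",  # matches -> processed
    "results/notes.txt",           # matches -> processed
    "calibration/setup.log",       # no match -> skipped
]
for rel_path in relative_paths:
    action = "process" if download_regex.match(rel_path) else "skip"
    print(f"{action}: {rel_path}")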
@@ -229,6 +229,7 @@ def run_from_command_line(cls, args=None):
update_secs=args.update_seconds,
streamlevel=args.logger_stream_level,
filelevel=args.logger_file_level,
logger_file=args.logger_file_path,
)
# start the reconstructor running
run_start = datetime.datetime.now()
7 changes: 7 additions & 0 deletions openmsistream/data_file_io/actor/data_file_stream_handler.py
@@ -45,6 +45,13 @@ def __init__(
self._output_dir.mkdir(parents=True)
# create a subdirectory for the logs
self._logs_subdir = self._output_dir / self.LOG_SUBDIR_NAME
if "logger_file" in kwargs and kwargs["logger_file"] is not None:
logger_file_arg = kwargs["logger_file"].resolve()
if "." in logger_file_arg.name:
self._logs_subdir = logger_file_arg.parent
else:
self._logs_subdir = logger_file_arg
kwargs["logger_file"] = self._logs_subdir
if not self._logs_subdir.is_dir():
self._logs_subdir.mkdir(parents=True)
# put the log file in the subdirectory
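The lines added above decide whether the ``logger_file`` keyword argument names a log file or a directory by checking for a "." in the final path component. A standalone sketch of that same rule, with hypothetical paths:

from pathlib import Path

def resolve_logs_dir(logger_file_arg, default_logs_dir):
    """Treat a path whose final component contains '.' as a log file
    (use its parent directory); treat anything else as the logs directory
    itself; fall back to the default when no argument is given."""
    if logger_file_arg is None:
        return Path(default_logs_dir)
    logger_file_arg = Path(logger_file_arg).resolve()
    if "." in logger_file_arg.name:
        return logger_file_arg.parent
    return logger_file_arg

# Hypothetical examples
print(resolve_logs_dir("/var/log/openmsistream/upload.log", "output/LOGS"))  # /var/log/openmsistream
print(resolve_logs_dir("/var/log/openmsistream", "output/LOGS"))             # /var/log/openmsistream
print(resolve_logs_dir(None, "output/LOGS"))                                 # output/LOGS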
@@ -75,6 +75,13 @@ def __init__(
"""
# create a subdirectory for the logs
self._logs_subdir = dirpath / self.LOG_SUBDIR_NAME
if "logger_file" in kwargs and kwargs["logger_file"] is not None:
logger_file_arg = kwargs["logger_file"].resolve()
if "." in logger_file_arg.name:
self._logs_subdir = logger_file_arg.parent
else:
self._logs_subdir = logger_file_arg
kwargs["logger_file"] = self._logs_subdir
if not self._logs_subdir.is_dir():
self._logs_subdir.mkdir(parents=True)
# put the log file in the subdirectory
@@ -605,6 +612,7 @@ def run_from_command_line(cls, args=None):
update_secs=args.update_seconds,
streamlevel=args.logger_stream_level,
filelevel=args.logger_file_level,
logger_file=args.logger_file_path,
)
# listen for new files in the directory and run uploads until shut down
run_start = datetime.datetime.now()
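The ``run_from_command_line`` hunks in this commit pass ``args.logger_file_path`` through to the class constructors as ``logger_file``. The argument parser itself is not shown in this diff, so the following is only an assumed sketch of how such a flag could be declared and forwarded; the real parser in the shared command-line tooling may differ:

import argparse
import pathlib

# Assumed declaration of the flag, for illustration only
parser = argparse.ArgumentParser()
parser.add_argument(
    "--logger_file_path",
    type=pathlib.Path,
    default=None,
    help="path to a log file, or to a directory for the log and registry files",
)
args = parser.parse_args(["--logger_file_path", "/var/log/openmsistream/upload.log"])

# The parsed value is forwarded to the constructor as the logger_file keyword
constructor_kwargs = {"logger_file": args.logger_file_path}
print(constructor_kwargs)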
1 change: 1 addition & 0 deletions openmsistream/data_file_io/entity/upload_data_file.py
@@ -321,6 +321,7 @@ def run_from_command_line(cls, args=None):
args.filepath,
streamlevel=args.logger_stream_level,
filelevel=args.logger_file_level,
logger_file=args.logger_file_path,
)
# chunk and upload the file
upload_file.upload_whole_file(
3 changes: 3 additions & 0 deletions openmsistream/girder/girder_upload_stream_processor.py
@@ -315,6 +315,9 @@ def run_from_command_line(cls, args=None):
n_threads=args.n_threads,
update_secs=args.update_seconds,
consumer_group_id=args.consumer_group_id,
streamlevel=args.logger_stream_level,
filelevel=args.logger_file_level,
logger_file=args.logger_file_path,
)
# start the processor running
msg = (
@@ -119,6 +119,7 @@ def run_from_command_line(cls, args=None):
update_secs=args.update_seconds,
streamlevel=args.logger_stream_level,
filelevel=args.logger_file_level,
logger_file=args.logger_file_path,
)
# cls.bucket_name = args.bucket_name
msg = (
1 change: 1 addition & 0 deletions openmsistream/s3_buckets/s3_transfer_stream_processor.py
@@ -125,6 +125,7 @@ def run_from_command_line(cls, args=None):
consumer_group_id=args.consumer_group_id,
streamlevel=args.logger_stream_level,
filelevel=args.logger_file_level,
logger_file=args.logger_file_path,
)
# cls.bucket_name = args.bucket_name
msg = (
2 changes: 1 addition & 1 deletion openmsistream/version.py
@@ -1 +1 @@
__version__ = "1.7.4"
__version__ = "1.7.5"
2 changes: 1 addition & 1 deletion setup.py
@@ -58,7 +58,7 @@
"matplotlib",
"methodtools",
"msgpack",
"openmsitoolbox>=1.1.1",
"openmsitoolbox>=1.2.1",
"watchdog>=3.0.0",
],
extras_require={
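The setup.py change above raises the openmsitoolbox floor from 1.1.1 to 1.2.1. A small optional sketch (not part of the package) for checking the installed version in an environment:

from importlib.metadata import PackageNotFoundError, version

REQUIRED = (1, 2, 1)

def as_tuple(version_string):
    # naive numeric comparison; packaging.version would be more robust
    return tuple(int(part) for part in version_string.split(".")[:3])

try:
    installed = version("openmsitoolbox")
except PackageNotFoundError:
    installed = None

if installed is None:
    print("openmsitoolbox is not installed")
elif as_tuple(installed) >= REQUIRED:
    print(f"openmsitoolbox {installed} satisfies >=1.2.1")
else:
    print(f"openmsitoolbox {installed} is older than the required 1.2.1")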
