## Consumer-side programs

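Here we'll see three examples of the consumer-type programs provided by OpenMSIStream: a download directory that reconstructs files on your local filesystem, a (placeholder) stream processor, and a metadata reproducer that produces results to another topic.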

In [1]:
# imports
import pathlib, importlib, logging, datetime, json, platform
from threading import Thread
from openmsitoolbox.logging import OpenMSILogger
from openmsistream import DataFileDownloadDirectory, DataFileStreamProcessor, MetadataJSONReproducer

In [2]:
# Configure a logger (only needed when running in a Jupyter notebook like this)
logger = OpenMSILogger("OpenMSIConsumers", filelevel=None)
# NOTE(review): the reload presumably discards logging configuration left over from the
# notebook kernel so the OpenMSILogger's messages show up in cell output — confirm.
# The trailing semicolon stops Jupyter from displaying the reloaded module's repr, which
# is noise and exposes the absolute path of the local Python environment.
importlib.reload(logging);

<module 'logging' from '/usr/local/anaconda3/envs/openmsistream_short_course/lib/python3.9/logging/__init__.py'>

In [3]:
# Topic that every consumer program below reads its data files from
CONSUMER_TOPIC_NAME = "tutorial_data"

# Root directory of this repo, i.e. the parent of the notebook's current working directory
repo_root_dir = pathlib.Path(".").resolve().parent

### Consuming to the local filesystem

Read chunks of files from the topic, reassemble them, and write the reconstructed files to a location on your local filesystem.

In [4]:
def download_task(download_directory):
    """Run "reconstruct" for a given DataFileDownloadDirectory, then close it and log a
    summary of what was consumed once it has been shut down

    Args:
        download_directory (DataFileDownloadDirectory): the DataFileDownloadDirectory to run

    Raises:
        Any exception raised by ``download_directory.reconstruct()`` is propagated after
        the download directory has been closed.
    """
    # This call to "reconstruct" waits until the program is shut down. The try/finally
    # makes sure the download directory is still closed if reconstruct raises.
    try:
        (
            n_read,
            n_processed,
            n_complete_files,
            complete_filepaths,
        ) = download_directory.reconstruct()
    finally:
        download_directory.close()
    msg = f'{n_read} total message{"s were" if n_read != 1 else " was"} consumed'
    processed = (
        f'{n_processed} message{"s were" if n_processed != 1 else " was"} '
        "successfully processed"
    )
    # Branch on the count of reconstructed files (not the "recent files" list length)
    if n_complete_files > 0:
        msg += (
            f", {processed}, and "
            f'{n_complete_files} file{" was" if n_complete_files==1 else "s were"} '
            "successfully reconstructed"
        )
    else:
        msg += f" and {processed}"
    # Only print the "most recent files" header when there is something to list
    if complete_filepaths:
        msg += (
            f". Most recent completed files (up to {download_directory.N_RECENT_FILES}):\n\t"
        )
        msg += "\n\t".join(str(filepath) for filepath in complete_filepaths)
    else:
        msg += "."
    download_directory.logger.info(msg)

In [5]:
# Broker config file used by the consumer, and the directory that the
# DataFileDownloadDirectory below writes reconstructed files into
CONFIG_FILE_PATH = repo_root_dir.joinpath("config_files", "confluent_cloud_broker.config")
TEST_RECO_DIR = repo_root_dir.parent.joinpath("reconstructed_test_files")

In [6]:
# Build a DataFileDownloadDirectory that writes reconstructed files into TEST_RECO_DIR
dfdd = DataFileDownloadDirectory(TEST_RECO_DIR, CONFIG_FILE_PATH, CONSUMER_TOPIC_NAME, logger=logger)

# "reconstruct" blocks until the program is shut down, so drive it (via download_task)
# from a background thread and keep the notebook free for other cells
download_thread = Thread(target=download_task, kwargs={"download_directory": dfdd})
download_thread.start()

[OpenMSIConsumers 2024-01-11 18:57:07] Will reconstruct files from messages in the tutorial_data topic using 2 threads


#### While the download thread started above is running, any new files produced to the topic will be reconstructed on your local filesystem

In [7]:
# Manually shut down the download directory (if running from the command line this would
# be like typing "q" in the Terminal window)
dfdd.control_command_queue.put("q")
# Wait for download_task to return; it closes the directory and logs its summary only
# after "reconstruct" sees the shutdown command above. Send "q" before joining —
# joining first would presumably block until the program is shut down some other way.
download_thread.join()

[OpenMSIConsumers 2024-01-11 18:57:49] 7 total messages were consumed, 7 messages were successfully processed, and 6 files were successfully reconstructed. Most recent completed files (up to 50):
	frankenstein.txt
	peanut_and_sid.jpg
	sid_and_peanut.jpg
	open_MSI_logo.png
	the_picture_of_dorian_gray.txt
	monty_python_holy_grail.txt


### A dummy stream processor program

In [23]:
class PlaceholderStreamProcessor(DataFileStreamProcessor):
    """Stand-in stream processor: for every data file reconstructed from the topic it
    writes a small text file containing a processing timestamp to the output directory
    """

    def _process_downloaded_data_file(self, datafile, lock):
        """Write "<flattened relative path>_placeholder.txt" holding a processing timestamp.

        Args:
            datafile: the reconstructed data file (only ``relative_filepath`` is read)
            lock: lock held while the output file is being written

        Returns:
            None on success, or the exception that occurred (returned, not raised)
        """
        try:
            processed_at = datetime.datetime.now()
            # Flatten the relative path into one file name, e.g. "a/b.txt" -> "a_b_txt".
            # NOTE(review): distinct paths can collide ("a/b.txt" and "a_b/txt" both map
            # to "a_b_txt"), in which case the later file overwrites the earlier output.
            flattened_name = (
                str(datafile.relative_filepath.as_posix()).replace("/", "_").replace(".", "_")
            )
            placeholder_path = self._output_dir / (flattened_name + "_placeholder.txt")
            stamp = processed_at.strftime("%m/%d/%Y, %H:%M:%S")
            with lock, open(placeholder_path, "w") as placeholder_file:
                placeholder_file.write(f"Processing timestamp: {stamp}")
        except Exception as exc:  # errors are handed back to the caller rather than raised
            return exc
        return None

    @classmethod
    def run_from_command_line(cls, args=None):
        """Intentionally a no-op: this example is only driven from the notebook
        (stay tuned for the live coding tomorrow!)"""


In [24]:
def stream_processor_task(stream_processor):
    """Run "process_files_as_read" for the given stream processor, then close it and log
    a summary message once it gets shut down

    Args:
        stream_processor (openmsistream.DataFileStreamProcessor): The stream processor to run

    Raises:
        Any exception raised by ``process_files_as_read`` is propagated after the stream
        processor has been closed.
    """
    # This call to "process_files_as_read" hangs until the stream processor is shut down;
    # the try/finally makes sure the processor is still closed if it raises
    try:
        (
            n_m_r,  # The number of messages read
            n_m_p,  # The number of messages processed
            n_f_p,  # The number of files successfully processed
            p_fps,  # Paths to the most recently-processed files
        ) = stream_processor.process_files_as_read()
    finally:
        stream_processor.close()
    msg = f'{n_m_r} total message{"s were" if n_m_r != 1 else " was"} consumed'
    if n_f_p > 0:
        msg += (
            f', {n_m_p} message{"s were" if n_m_p != 1 else " was"} processed,'
            f' and {n_f_p} file{"s were" if n_f_p != 1 else " was"} successfully processed'
        )
    else:
        msg += (
            f' and {n_m_p} message{"s were" if n_m_p != 1 else " was"} '
            "successfully processed"
        )
    # Only print the "most recently processed files" header when there is something to list
    if p_fps:
        msg += (
            f". Up to {stream_processor.N_RECENT_FILES} most recently "
            "processed files:\n\t"
        )
        msg += "\n\t".join(str(fp) for fp in p_fps)
    else:
        msg += "."
    stream_processor.logger.info(msg)

In [25]:
# Directory the PlaceholderStreamProcessor writes its output (and log files) into
STREAM_PROCESSOR_OUTPUT_DIR = repo_root_dir.parent.joinpath("PlaceholderStreamProcessor_output")

In [26]:
# Instantiate the placeholder processor, pointing its output at STREAM_PROCESSOR_OUTPUT_DIR
psp = PlaceholderStreamProcessor(
    CONFIG_FILE_PATH, CONSUMER_TOPIC_NAME, output_dir=STREAM_PROCESSOR_OUTPUT_DIR, logger=logger
)

# "process_files_as_read" blocks until shutdown, so run it (via stream_processor_task)
# in a background thread
processor_thread = Thread(target=stream_processor_task, kwargs={"stream_processor": psp})
processor_thread.start()

[OpenMSIConsumers 2024-01-11 19:23:32] Log files and output will be in /Users/margareteminizer/Desktop/short_course/PlaceholderStreamProcessor_output
[OpenMSIConsumers 2024-01-11 19:23:32] Will process files from messages in the tutorial_data topic using 2 threads


#### Once the cell above has started the stream processor, you should see a placeholder output file appear in the output directory for each file in the topic. If more files are added to the topic, output will be created for them too.

In [22]:
# Manually shut down the stream processor (if running from the command line this would
# be like typing "q" in the Terminal window)
psp.control_command_queue.put("q")
# Wait for stream_processor_task to return; it closes the processor and logs its summary
# only after "process_files_as_read" sees the shutdown command above. Send "q" before
# joining — joining first would presumably block until some other shutdown occurs.
processor_thread.join()

[OpenMSIConsumers 2024-01-11 19:22:13] 7 total messages were consumed and 1 messages were successfully processed. Up to 50 most recently processed files:
	


### Extracting some simple metadata and producing to another topic

In [27]:
class SimpleMetadataReproducer(MetadataJSONReproducer):
    """Consumes DataFile messages from one topic and, for each reconstructed file,
    produces a JSON-formatted string of very simple metadata to another topic
    """

    def _get_metadata_dict_for_file(self, datafile):
        """Build the metadata dictionary that gets produced as JSON for one file.

        See docs here:
        https://openmsistream.readthedocs.io/en/latest/user_info/base_classes/metadata_json_reproducer.html
        for more information on writing custom MetadataJSONReproducers

        Args:
            datafile: the reconstructed data file (its ``relative_filepath`` and
                ``bytestring`` attributes are read)

        Returns:
            dict: the file's relative POSIX path, its size in bytes, the topic it was
            consumed from, the name of the OS it was consumed on (``platform.system()``),
            and a local "%m/%d/%Y, %H:%M:%S" extraction timestamp
        """
        posix_path = datafile.relative_filepath.as_posix()
        n_bytes = len(datafile.bytestring)
        # Keys are listed in the order they should appear in the produced JSON
        metadata = {
            "relative_filepath": posix_path,
            "size_in_bytes": n_bytes,
            "consumed_from": self.consumer_topic_name,
            "consumed_on": platform.system(),
            "metadata_extracted_at": datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
        }
        self.logger.debug(f"Producing JSON metadata message: {json.dumps(metadata)}")
        return metadata

In [28]:
def reproducer_task(reproducer):
    """Run "produce_processing_results_for_files_as_read" for a given
    MetadataJSONReproducer, then close it and log a summary message once it gets shut down

    Args:
        reproducer (MetadataJSONReproducer): the MetadataJSONReproducer to run

    Raises:
        Any exception raised by ``produce_processing_results_for_files_as_read`` is
        propagated after the reproducer has been closed.
    """
    # This call to "produce_processing_results_for_files_as_read" hangs until the program
    # is shut down; the try/finally makes sure the reproducer is still closed if it raises
    try:
        (
            n_m_r,  # number of messages read
            n_m_p,  # number of messages processed
            n_f_r,  # number of files read
            n_f_mp,  # number of files that had metadata produced
            m_p_fps,  # paths to files that had metadata produced (up to N_RECENT_FILES)
        ) = reproducer.produce_processing_results_for_files_as_read()
    finally:
        reproducer.close()
    # Create a log message stating what was processed during the run. Collecting one
    # clause per nonzero count and joining them means the message never ends in a
    # dangling ", " and is never empty.
    clauses = []
    if n_m_r > 0:
        clauses.append(f'{n_m_r} total message{"s were" if n_m_r!=1 else " was"} consumed')
    if n_m_p > 0:
        clauses.append(
            f'{n_m_p} message{"s were" if n_m_p!=1 else " was"} successfully processed'
        )
    if n_f_r > 0:
        clauses.append(f'{n_f_r} file{"s were" if n_f_r!=1 else " was"} fully read')
    if n_f_mp > 0:
        clauses.append(
            f'{n_f_mp} file{"s" if n_f_mp!=1 else ""} had json metadata produced '
            f'to the "{reproducer.producer_topic_name}" topic'
        )
    msg = (", ".join(clauses) if clauses else "No messages were consumed") + "."
    if m_p_fps:
        msg += f" Up to {reproducer.N_RECENT_FILES} most recent:\n\t"
        msg += "\n\t".join(str(fp) for fp in m_p_fps)
    reproducer.logger.info(msg)

In [29]:
# Broker config file the reproducer is created with
REPRODUCER_CONFIG_FILE_PATH = repo_root_dir.joinpath(
    "config_files", "confluent_cloud_broker_for_reproducer.config"
)

# Directory where the reproducer keeps its registry files
REPRODUCER_OUTPUT_DIR = repo_root_dir.parent.joinpath("SimpleMetadataReproducer_output")

# Topic that the JSON metadata messages get produced to
PRODUCER_TOPIC_NAME = "tutorial_metadata"

In [30]:
# Build the metadata reproducer: it consumes from CONSUMER_TOPIC_NAME and produces
# JSON metadata messages to PRODUCER_TOPIC_NAME
smdr = SimpleMetadataReproducer(
    REPRODUCER_CONFIG_FILE_PATH,
    CONSUMER_TOPIC_NAME,
    PRODUCER_TOPIC_NAME,
    output_dir=REPRODUCER_OUTPUT_DIR,
    logger=logger,
)

# "produce_processing_results_for_files_as_read" blocks until shutdown, so run it
# (via reproducer_task) in a background thread
reproducer_thread = Thread(target=reproducer_task, kwargs={"reproducer": smdr})
reproducer_thread.start()

[OpenMSIConsumers 2024-01-11 19:26:09] Log files and output will be in /Users/margareteminizer/Desktop/short_course/SimpleMetadataReproducer_output
[OpenMSIConsumers 2024-01-11 19:26:09] Will process files from messages in the tutorial_data topic using 2 threads and produce their processing results to the tutorial_metadata topic using 1 thread


#### Once the cell above has started the reproducer, you should see new messages added to the producer topic

In [31]:
# Manually shut down the reproducer (if running from the command line this would
# be like typing "q" in the Terminal window)
smdr.control_command_queue.put("q")
# Wait for reproducer_task to return; it closes the reproducer and logs its summary only
# after the shutdown command above is picked up. Send "q" before joining — joining
# first would presumably block until some other shutdown occurs. (The logged output shows
# the reproducer first finishes the messages it has already enqueued, so this may take a
# moment.)
reproducer_thread.join()

[OpenMSIConsumers 2024-01-11 19:28:23] Will quit after all currently enqueued messages are received.
[OpenMSIConsumers 2024-01-11 19:28:23] 7 total messages were consumed, 7 messages were successfully processed, 6 files were fully read, 6 files had json metadata produced to the "tutorial_metadata" topic. Up to 50 most recent:
	peanut_and_sid.jpg
	frankenstein.txt
	sid_and_peanut.jpg
	open_MSI_logo.png
	the_picture_of_dorian_gray.txt
	monty_python_holy_grail.txt
