From dd170acf38ab32def95fb6aabc375b49d0154176 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 21 Nov 2023 17:08:05 +0800 Subject: [PATCH 1/9] Added CLI dependencies --- poetry.lock | 148 ++++++++++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 4 ++ 2 files changed, 150 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index 8173b83..140a532 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,19 @@ # This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand. +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + [[package]] name = "colorama" version = "0.4.6" @@ -75,6 +89,20 @@ files = [ [package.extras] toml = ["tomli"] +[[package]] +name = "exceptiongroup" +version = "1.1.3" +description = "Backport of PEP 654 (exception groups)" +optional = false +python-versions = ">=3.7" +files = [ + {file = "exceptiongroup-1.1.3-py3-none-any.whl", hash = "sha256:343280667a4585d195ca1cf9cef84a4e178c4b6cf2274caef9859782b567d5e3"}, + {file = "exceptiongroup-1.1.3.tar.gz", hash = "sha256:097acd85d473d75af5bb98e41b61ff7fe35efe6675e4f9370ec6ec5126d160e9"}, +] + +[package.extras] +test = ["pytest (>=6)"] + [[package]] name = "iniconfig" version = "2.0.0" @@ -86,6 +114,41 @@ files = [ {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, ] +[[package]] +name = "markdown-it-py" +version = "3.0.0" +description = "Python port of markdown-it. Markdown parsing, done right!" +optional = false +python-versions = ">=3.8" +files = [ + {file = "markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb"}, + {file = "markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1"}, +] + +[package.dependencies] +mdurl = ">=0.1,<1.0" + +[package.extras] +benchmarking = ["psutil", "pytest", "pytest-benchmark"] +code-style = ["pre-commit (>=3.0,<4.0)"] +compare = ["commonmark (>=0.9,<1.0)", "markdown (>=3.4,<4.0)", "mistletoe (>=1.0,<2.0)", "mistune (>=2.0,<3.0)", "panflute (>=2.3,<3.0)"] +linkify = ["linkify-it-py (>=1,<3)"] +plugins = ["mdit-py-plugins"] +profiling = ["gprof2dot"] +rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"] +testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"] + +[[package]] +name = "mdurl" +version = "0.1.2" +description = "Markdown URL utilities" +optional = false +python-versions = ">=3.7" +files = [ + {file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"}, + {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"}, +] + [[package]] name = "numpy" version = "1.26.2" @@ -157,6 +220,21 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "pygments" +version = "2.17.1" +description = "Pygments is a syntax highlighting package written in Python." +optional = false +python-versions = ">=3.7" +files = [ + {file = "pygments-2.17.1-py3-none-any.whl", hash = "sha256:1b37f1b1e1bff2af52ecaf28cc601e2ef7077000b227a0675da25aef85784bc4"}, + {file = "pygments-2.17.1.tar.gz", hash = "sha256:e45a0e74bf9c530f564ca81b8952343be986a29f6afe7f5ad95c5f06b7bdf5e8"}, +] + +[package.extras] +plugins = ["importlib-metadata"] +windows-terminal = ["colorama (>=0.4.6)"] + [[package]] name = "pytest" version = "7.4.3" @@ -170,13 +248,33 @@ files = [ [package.dependencies] colorama = {version = "*", markers = "sys_platform == \"win32\""} +exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} iniconfig = "*" packaging = "*" pluggy = ">=0.12,<2.0" +tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""} [package.extras] testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +[[package]] +name = "rich" +version = "13.7.0" +description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal" +optional = false +python-versions = ">=3.7.0" +files = [ + {file = "rich-13.7.0-py3-none-any.whl", hash = "sha256:6da14c108c4866ee9520bbffa71f6fe3962e193b7da68720583850cd4548e235"}, + {file = "rich-13.7.0.tar.gz", hash = "sha256:5cb5123b5cf9ee70584244246816e9114227e0b98ad9176eede6ad54bf5403fa"}, +] + +[package.dependencies] +markdown-it-py = ">=2.2.0" +pygments = ">=2.13.0,<3.0.0" + +[package.extras] +jupyter = ["ipywidgets (>=7.5.1,<9)"] + [[package]] name = "ruff" version = "0.1.5" @@ -203,6 +301,52 @@ files = [ {file = "ruff-0.1.5.tar.gz", hash = "sha256:5cbec0ef2ae1748fb194f420fb03fb2c25c3258c86129af7172ff8f198f125ab"}, ] +[[package]] +name = "shellingham" +version = "1.5.4" +description = "Tool to Detect Surrounding Shell" +optional = false +python-versions = ">=3.7" +files = [ + {file = "shellingham-1.5.4-py2.py3-none-any.whl", hash = "sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686"}, + {file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"}, +] + +[[package]] +name = "tomli" +version = "2.0.1" +description = "A lil' TOML parser" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, + {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, +] + +[[package]] +name = "typer" +version = "0.9.0" +description = "Typer, build great CLIs. Easy to code. Based on Python type hints." +optional = false +python-versions = ">=3.6" +files = [ + {file = "typer-0.9.0-py3-none-any.whl", hash = "sha256:5d96d986a21493606a358cae4461bd8cdf83cbf33a5aa950ae629ca3b51467ee"}, + {file = "typer-0.9.0.tar.gz", hash = "sha256:50922fd79aea2f4751a8e0408ff10d2662bd0c8bbfa84755a699f3bada2978b2"}, +] + +[package.dependencies] +click = ">=7.1.1,<9.0.0" +colorama = {version = ">=0.4.3,<0.5.0", optional = true, markers = "extra == \"all\""} +rich = {version = ">=10.11.0,<14.0.0", optional = true, markers = "extra == \"all\""} +shellingham = {version = ">=1.3.0,<2.0.0", optional = true, markers = "extra == \"all\""} +typing-extensions = ">=3.7.4.3" + +[package.extras] +all = ["colorama (>=0.4.3,<0.5.0)", "rich (>=10.11.0,<14.0.0)", "shellingham (>=1.3.0,<2.0.0)"] +dev = ["autoflake (>=1.3.1,<2.0.0)", "flake8 (>=3.8.3,<4.0.0)", "pre-commit (>=2.17.0,<3.0.0)"] +doc = ["cairosvg (>=2.5.2,<3.0.0)", "mdx-include (>=1.4.1,<2.0.0)", "mkdocs (>=1.1.2,<2.0.0)", "mkdocs-material (>=8.1.4,<9.0.0)", "pillow (>=9.3.0,<10.0.0)"] +test = ["black (>=22.3.0,<23.0.0)", "coverage (>=6.2,<7.0)", "isort (>=5.0.6,<6.0.0)", "mypy (==0.910)", "pytest (>=4.4.0,<8.0.0)", "pytest-cov (>=2.10.0,<5.0.0)", "pytest-sugar (>=0.9.4,<0.10.0)", "pytest-xdist (>=1.32.0,<4.0.0)", "rich (>=10.11.0,<14.0.0)", "shellingham (>=1.3.0,<2.0.0)"] + [[package]] name = "typing-extensions" version = "4.8.0" @@ -216,5 +360,5 @@ files = [ [metadata] lock-version = "2.0" -python-versions = "^3.11" -content-hash = "07e6ff0a0176752eab68a233e414e2009bd7b1cd732ca68167dd80f1b4b206aa" +python-versions = "^3.9" +content-hash = "a2958d3bed5f28eddd1a0fea932d032a0df6fa534edb29d0094f6a5f6b8bd12f" diff --git a/pyproject.toml b/pyproject.toml index 6b4a620..a4f44c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,10 +20,14 @@ classifiers = [ [tool.poetry.urls] "Bug Tracker" = "https://github.com/caffeine-addictt/thread/issues" +[tool.poetry.scripts] +thread = "thread.__main__:app" + [tool.poetry.dependencies] python = "^3.9" numpy = "^1.26.2" typing-extensions = "^4.8.0" +typer = {extras = ["all"], version = "^0.9.0"} [tool.poetry.group.dev.dependencies] From 1ad6f089f9d4c24cb428d8ff4d60167bc3031159 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 21 Nov 2023 17:08:37 +0800 Subject: [PATCH 2/9] Coverage --- .coverage | Bin 53248 -> 53248 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/.coverage b/.coverage index 5a7fa844d141697a2f082bb2aba6fe13c4bb5834..6088252f685d527e593c83cc3d9588f9b869c018 100644 GIT binary patch delta 81 zcmV-X0IvUlpaX!Q1F)WtLf!@jI0JX4?*{;Q_a6-a5&(cU5hyc&atSB_NC1H31V8`) nnI{2q7`V6Y006W%JpdU%jsOtw2>{6m00G(k1_Cg%;Ey0cD#{t7 delta 81 zcmV-X0IvUlpaX!Q1F)WtLcR?Qa0c#7-wy!r?mrp;Bme+yB2Z=k Date: Tue, 21 Nov 2023 17:09:04 +0800 Subject: [PATCH 3/9] CLI utils --- src/thread/cli/utils.py | 53 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 src/thread/cli/utils.py diff --git a/src/thread/cli/utils.py b/src/thread/cli/utils.py new file mode 100644 index 0000000..cc82a06 --- /dev/null +++ b/src/thread/cli/utils.py @@ -0,0 +1,53 @@ +# Verbose Command Processor # +import typer +import logging + + +# Verbose Options # +DebugOption = typer.Option( + False, '--debug', + help = 'Set verbosity level to DEBUG', + is_flag = True +) +VerboseOption = typer.Option( + False, '--verbose', '-v', + help = 'Set verbosity level to INFO', + is_flag = True +) +QuietOption = typer.Option( + False, '--quiet', '-q', + help = 'Set verbosity level to ERROR', + is_flag = True +) + + +# Helper functions # + + +# Processors # +def verbose_args_processor(debug: bool, verbose: bool, quiet: bool): + """Handles setting and raising exceptions for verbose""" + if verbose and quiet: + raise typer.BadParameter('--quiet cannot be used with --verbose') + + if verbose and debug: + raise typer.BadParameter('--debug cannot be used with --verbose') + + logging.getLogger('base').setLevel(( + (debug and logging.DEBUG) or + (verbose and logging.INFO) or + logging.ERROR + )) + +def kwargs_processor(ctx: typer.Context) -> dict: + """Processes overflow arguments into kwargs""" + kwargs = {} + length = len(ctx.args) // 2 + + for i in range(length): + kv_pair = ctx.args[i*2:i*2+2] + kwargs[kv_pair[0]] = kv_pair[1] + + return kwargs + + From 407a416d45e3ce26643201f668e2b241d03f6b21 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 21 Nov 2023 17:09:21 +0800 Subject: [PATCH 4/9] CLI stdout config file --- src/thread/config.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/thread/config.py diff --git a/src/thread/config.py b/src/thread/config.py new file mode 100644 index 0000000..647d396 --- /dev/null +++ b/src/thread/config.py @@ -0,0 +1,31 @@ +import logging +from colorama import init, Fore, Style + +init(autoreset = True) + + +# Stdout color config # +class ColorFormatter(logging.Formatter): + COLORS = { + 'DEBUG' : Fore.BLUE, + 'INFO' : Fore.GREEN, + 'WARNING' : Fore.YELLOW, + 'ERROR' : Fore.RED, + 'CRITICAL': Fore.RED + Style.BRIGHT + } + + def format(self, record): + color = self.COLORS.get(record.levelname, '') + if color: + record.levelname = color + Style.BRIGHT + f'{record.levelname:<9}|' + record.msg = color + Fore.WHITE + Style.NORMAL + record.msg + return logging.Formatter.format(self, record) + + +class ColorLogger(logging.Logger): + def __init__(self, name): + logging.Logger.__init__(self, name, logging.DEBUG) + color_formatter = ColorFormatter('%(levelname)s %(message)s' + Style.RESET_ALL) + console = logging.StreamHandler() + console.setFormatter(color_formatter) + self.addHandler(console) From b9822e9a64c77c8426c46203843255fd077d9e74 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 21 Nov 2023 17:09:40 +0800 Subject: [PATCH 5/9] CLI initializing --- src/thread/__main__.py | 6 ++++++ src/thread/cli/__init__.py | 11 +++++++++++ 2 files changed, 17 insertions(+) create mode 100644 src/thread/__main__.py create mode 100644 src/thread/cli/__init__.py diff --git a/src/thread/__main__.py b/src/thread/__main__.py new file mode 100644 index 0000000..5a54980 --- /dev/null +++ b/src/thread/__main__.py @@ -0,0 +1,6 @@ + +# To make CLI accessible with py/python/python3 -m thread ... +from .cli import app + +if __name__ == '__main__': + app(prog_name = 'thread') diff --git a/src/thread/cli/__init__.py b/src/thread/cli/__init__.py new file mode 100644 index 0000000..9c09510 --- /dev/null +++ b/src/thread/cli/__init__.py @@ -0,0 +1,11 @@ +""" +Import and config CLI commands +""" + +__version__ = '0.1.2' +from ..config import logging, ColorLogger +logging.setLoggerClass(ColorLogger) + + +# Import # +from .base import cli_base as app From a479053b1dc0a0ae0148a07299811c3884fd061d Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 21 Nov 2023 17:09:54 +0800 Subject: [PATCH 6/9] CLI base tool --- src/thread/cli/base.py | 164 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 src/thread/cli/base.py diff --git a/src/thread/cli/base.py b/src/thread/cli/base.py new file mode 100644 index 0000000..edeccf7 --- /dev/null +++ b/src/thread/cli/base.py @@ -0,0 +1,164 @@ +import os +import time +import json +import typer +import inspect +import importlib + +import logging +from typing import Union, Pattern, Required, Optional, Callable + +from . import __version__ +from .utils import DebugOption, VerboseOption, QuietOption, verbose_args_processor, kwargs_processor +logger = logging.getLogger('base') + + +cli_base = typer.Typer( + no_args_is_help = True, + context_settings = { + 'help_option_names': ['-h', '--help'] + } +) + + +def version_callback(value: bool): + if value: + typer.echo(f'v{__version__}') + raise typer.Exit() + + +@cli_base.callback(invoke_without_command = True) +def callback( + version: bool = typer.Option( + None, '--version', + callback = version_callback, + help = 'Get the current installed version', + is_eager = True + ), + + debug: bool = DebugOption, + verbose: bool = VerboseOption, + quiet: bool = QuietOption +): + """Thread CLI""" + verbose_args_processor(debug, verbose, quiet) + + + +@cli_base.command(context_settings = {'allow_extra_args': True}) +def process( + ctx: typer.Context, + func: str = typer.Argument(help = '(path.to.file:function_name) OR (lambda x: x)'), + dataset: str = typer.Argument(help = '(path/to/file.txt) OR ([ i for i in range(2) ])'), + + args: list[str] = typer.Option([], '--args', '-a', help = 'Arguments passed to each thread'), + threads: int = typer.Option(8, '--threads', '-t', help = 'Maximum number of threads (will scale down based on dataset size)'), + + daemon: bool = typer.Option(False, '--daemon', '-d', help = 'Threads to run in daemon mode'), + + debug: bool = DebugOption, + verbose: bool = VerboseOption, + quiet: bool = QuietOption +): + """ + Execute parallel processing\n + Kwargs can be parsed by adding overflow arguments in pairs\n + $ thread process ... k1 v1 k2 v2\n + => kwargs = {k1: v1, k2: v2}\n\n + + Single kwargs will be ignored\n + $ thread process ... a1\n + => kwargs = {} + """ + verbose_args_processor(debug, verbose, quiet) + kwargs = kwargs_processor(ctx) + logger.debug('Processed kwargs: %s' % kwargs) + + + # Loading function + f = None + try: + logger.info('Attempted to interpret function') + f = eval(func) # I know eval is bad practice, but I have yet to find a safer replacement + logger.debug(f'Evaluated function: %s' % f) + + if not inspect.isfunction(f): + logger.info('Invalid function') + except Exception: + logger.info('Failed to interpret function') + + if not f: + try: + logger.info('Attempting to fetch function file') + + fPath, fName = func.split(':') + f = importlib.import_module(fPath).__dict__[fName] + logger.debug(f'Evaluated function: %s' % f) + + if not inspect.isfunction(f): + logger.info('Not a function') + raise Exception('Not a function') + except Exception as e: + logger.warning('Failed to fetch function') + raise typer.BadParameter('Failed to fetch function') from e + + + + + # Loading dataset + ds: Union[list, tuple, set, None] = None + try: + logger.info('Attempting to interpret dataset') + ds = json.loads(dataset) + logger.debug(f'Evaluated dataset: %s' % ds) + + if not isinstance(ds, (list, tuple, set)): + logger.info('Invalid dataset literal') + ds = None + + except Exception: + logger.info('Failed to interpret dataset') + + if not ds: + try: + logger.info('Attempting to fetch data file') + if not os.path.isfile(dataset): + logger.info('Invalid file path') + raise Exception('Invalid file path') + + with open(dataset, 'r') as a: + ds = [ i.endswith('\n') and i[:-2] for i in a.readlines() ] + except Exception as e: + logger.warning('Failed to read dataset') + raise typer.BadParameter('Failed to read dataset') from e + + logger.info('Interpreted dataset') + + + # Setup + logger.debug('Importing module') + from ..thread import ParallelProcessing + logger.info('Spawning threads... [Expected: {tcount} threads]'.format(tcount=min(len(ds), threads))) + + newProcess = ParallelProcessing( + function = f, + dataset = list(ds), + args = args, + kwargs = kwargs, + daemon = daemon, + max_threads = threads + ) + + logger.info('Created parallel process') + logger.info('Starting parallel process') + + start_t = time.perf_counter() + newProcess.start() + + logger.info('Started parallel process') + logger.info('Waiting for parallel process to complete, this may take a while...') + + result = newProcess.get_return_values() + + logger.info(f'Completed in {(time.perf_counter() - start_t):.5f}s') + typer.echo(result) From 94c3660ed07259f09064a9b6e5f9df4ff3e29968 Mon Sep 17 00:00:00 2001 From: AlexNg Date: Thu, 7 Dec 2023 23:39:53 +0800 Subject: [PATCH 7/9] Added Progress bar --- src/thread/cli/base.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/thread/cli/base.py b/src/thread/cli/base.py index edeccf7..4148aeb 100644 --- a/src/thread/cli/base.py +++ b/src/thread/cli/base.py @@ -1,11 +1,12 @@ import os import time import json -import typer import inspect import importlib +import typer import logging +from rich.progress import Progress, TextColumn, BarColumn, TimeRemainingColumn, TimeElapsedColumn from typing import Union, Pattern, Required, Optional, Callable from . import __version__ @@ -109,8 +110,8 @@ def process( ds: Union[list, tuple, set, None] = None try: logger.info('Attempting to interpret dataset') - ds = json.loads(dataset) - logger.debug(f'Evaluated dataset: %s' % ds) + ds = eval(dataset) + logger.debug(f'Evaluated dataset: %s' % (str(ds)[:125] + '...' if len(str(ds)) > 125 else ds)) if not isinstance(ds, (list, tuple, set)): logger.info('Invalid dataset literal') @@ -158,7 +159,23 @@ def process( logger.info('Started parallel process') logger.info('Waiting for parallel process to complete, this may take a while...') + with Progress( + TextColumn('[bold blue]{task.description}', justify='right'), + BarColumn(bar_width=None), + '[progress.percentage]{task.percentage:>3.1f}%', + '•', + TimeRemainingColumn(), + TimeElapsedColumn(), + ) as progress: + percentage = 0 + job = progress.add_task('Working...', total = 100, fields = 'a') + + while percentage < 100: + percentage = round(sum(i.progress for i in newProcess._threads) / (len(newProcess._threads) or 8), 2) * 100 + progress.update(job, completed = percentage) + time.sleep(0.1) + result = newProcess.get_return_values() logger.info(f'Completed in {(time.perf_counter() - start_t):.5f}s') - typer.echo(result) + # typer.echo(result) From 655b0dfd7f210dea68352eaa68ccd8e4f059f994 Mon Sep 17 00:00:00 2001 From: AlexNg Date: Thu, 7 Dec 2023 23:40:08 +0800 Subject: [PATCH 8/9] Added Progress bar support --- src/thread/thread.py | 46 ++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/src/thread/thread.py b/src/thread/thread.py index c8345e8..b06d671 100644 --- a/src/thread/thread.py +++ b/src/thread/thread.py @@ -10,7 +10,7 @@ from typing import ( Any, List, Callable, Union, Optional, Literal, - Mapping, Sequence, Tuple + Mapping, Sequence, Tuple, TypedDict ) @@ -81,7 +81,7 @@ def __init__( :param **: These are arguments parsed to `thread.Thread` """ _target = self._wrap_target(target) - self.returned_values = None + self.returned_value = None self.status = 'Idle' self.hooks = [] @@ -312,6 +312,14 @@ def start(self) -> None: +class _ThreadWorker: + progress: float + thread: Thread + + def __init__(self, thread: Thread, progress: float = 0) -> None: + self.thread = thread + self.progress = progress + class ParallelProcessing: """ Multi-Threaded Parallel Processing @@ -320,15 +328,14 @@ class ParallelProcessing: Type-Safe and provides more functionality on top """ - _threads : List[Thread] + _threads : List[_ThreadWorker] _completed : int - _return_vales : Mapping[int, List[Data_Out]] status : ThreadStatus function : Callable[..., List[Data_Out]] dataset : Sequence[Data_In] max_threads : int - + overflow_args : Sequence[Overflow_In] overflow_kwargs: Mapping[str, Overflow_In] @@ -378,11 +385,12 @@ def _wrap_function( function: Callable[..., Data_Out] ) -> Callable[..., List[Data_Out]]: @wraps(function) - def wrapper(data_chunk: Sequence[Data_In], *args: Any, **kwargs: Any) -> List[Data_Out]: + def wrapper(index: int, data_chunk: Sequence[Data_In], *args: Any, **kwargs: Any) -> List[Data_Out]: computed: List[Data_Out] = [] - for data_entry in data_chunk: + for i, data_entry in enumerate(data_chunk): v = function(data_entry, *args, **kwargs) computed.append(v) + self._threads[index].progress = round(i/len(data_chunk), 2) self._completed += 1 if self._completed == len(self._threads): @@ -407,8 +415,8 @@ def results(self) -> Data_Out: raise exceptions.ThreadNotInitializedError() results: List[Data_Out] = [] - for thread in self._threads: - results += thread.result + for entry in self._threads: + results += entry.thread.result return results @@ -422,7 +430,7 @@ def is_alive(self) -> bool: """ if len(self._threads) == 0: raise exceptions.ThreadNotInitializedError() - return any(thread.is_alive() for thread in self._threads) + return any(entry.thread.is_alive() for entry in self._threads) def get_return_values(self) -> List[Data_Out]: @@ -434,9 +442,9 @@ def get_return_values(self) -> List[Data_Out]: :returns Any: The return value of the target function """ results: List[Data_Out] = [] - for thread in self._threads: - thread.join() - results += thread.result + for entry in self._threads: + entry.thread.join() + results += entry.thread.result return results @@ -459,8 +467,8 @@ def join(self) -> bool: if self.status == 'Idle': raise exceptions.ThreadNotRunningError() - for thread in self._threads: - thread.join() + for entry in self._threads: + entry.thread.join() return True @@ -473,8 +481,8 @@ def kill(self) -> None: ThreadNotInitializedError: If the thread is not initialized ThreadNotRunningError: If the thread is not running """ - for thread in self._threads: - thread.kill() + for entry in self._threads: + entry.thread.kill() def start(self) -> None: @@ -498,11 +506,11 @@ def start(self) -> None: for i, data_chunk in enumerate(numpy.array_split(self.dataset, max_threads)): chunk_thread = Thread( target = self.function, - args = [data_chunk.tolist(), *parsed_args, *self.overflow_args], + args = [i, data_chunk.tolist(), *parsed_args, *self.overflow_args], name = name_format and name_format % i or None, **self.overflow_kwargs ) - self._threads.append(chunk_thread) + self._threads.append(_ThreadWorker(chunk_thread, 0)) chunk_thread.start() From a97f232c813972e2811d5b95d7e000800f0ef627 Mon Sep 17 00:00:00 2001 From: AlexNg Date: Fri, 8 Dec 2023 10:16:33 +0800 Subject: [PATCH 9/9] Add output vectors --- src/thread/cli/base.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/src/thread/cli/base.py b/src/thread/cli/base.py index 4148aeb..27328b4 100644 --- a/src/thread/cli/base.py +++ b/src/thread/cli/base.py @@ -56,6 +56,9 @@ def process( threads: int = typer.Option(8, '--threads', '-t', help = 'Maximum number of threads (will scale down based on dataset size)'), daemon: bool = typer.Option(False, '--daemon', '-d', help = 'Threads to run in daemon mode'), + output: str = typer.Option('./output.json', '--output', '-o', help = 'Output file location'), + fileout: bool = typer.Option(True, '--fileout', is_flag = True, help = 'Weather to write output to a file'), + stdout: bool = typer.Option(False, '--stdout', is_flag = True, help = 'Weather to print the output'), debug: bool = DebugOption, verbose: bool = VerboseOption, @@ -76,6 +79,16 @@ def process( logger.debug('Processed kwargs: %s' % kwargs) + # Verify output + if not fileout and not stdout: + raise typer.BadParameter('No output method specified') + + if fileout and not os.path.exists('/'.join(output.split('/')[:-1])): + raise typer.BadParameter('Output file directory does not exist') + + + + # Loading function f = None try: @@ -160,15 +173,16 @@ def process( logger.info('Waiting for parallel process to complete, this may take a while...') with Progress( - TextColumn('[bold blue]{task.description}', justify='right'), - BarColumn(bar_width=None), + TextColumn('[bold blue]{task.description}', justify = 'right'), + BarColumn(bar_width = None), '[progress.percentage]{task.percentage:>3.1f}%', '•', TimeRemainingColumn(), - TimeElapsedColumn(), + '•', + TimeElapsedColumn() ) as progress: percentage = 0 - job = progress.add_task('Working...', total = 100, fields = 'a') + job = progress.add_task('Working...', total = 100) while percentage < 100: percentage = round(sum(i.progress for i in newProcess._threads) / (len(newProcess._threads) or 8), 2) * 100 @@ -178,4 +192,15 @@ def process( result = newProcess.get_return_values() logger.info(f'Completed in {(time.perf_counter() - start_t):.5f}s') - # typer.echo(result) + if fileout: + logger.info(f'Writing file to {output}...') + try: + with open(output, 'w') as f: + json.dump(result, f, indent = 2) + logger.info(f'Wrote to file') + except Exception as e: + logger.error('Failed to write to file') + logger.debug(str(e)) + + if stdout: + typer.echo(result)