From db215528209bd3f58df3cbbeeb1c209783921fcb Mon Sep 17 00:00:00 2001 From: Yifan Zhang Date: Mon, 7 Dec 2020 14:20:58 +0300 Subject: [PATCH] Add TapManager to use pipeline taps directly In some case user may want to use pipeline directly without worker logic. TapManager allows user to create source and destination taps. --- Makefile | 3 +- src/pipeline/__init__.py | 4 +- src/pipeline/helpers.py | 61 +++++++++++++++++++++++ src/pipeline/utils.py | 101 ++++++++++++++++----------------------- src/pipeline/worker.py | 2 +- 5 files changed, 106 insertions(+), 65 deletions(-) create mode 100644 src/pipeline/helpers.py diff --git a/Makefile b/Makefile index bd75cf6..9bc0517 100644 --- a/Makefile +++ b/Makefile @@ -2,8 +2,7 @@ venv: requirements.txt requirements.dev.txt rm -rf venv && \ python3 -m venv venv && \ . venv/bin/activate && \ - python3 -m pip install -r requirements.dev.txt &&\ - pyenv local 3.7.7 3.8.2 + python3 -m pip install -r requirements.dev.txt pytest: . venv/bin/activate; \ python3 -m pytest diff --git a/src/pipeline/__init__.py b/src/pipeline/__init__.py index 686908a..798ea54 100644 --- a/src/pipeline/__init__.py +++ b/src/pipeline/__init__.py @@ -7,7 +7,7 @@ SplitterConfig, Splitter, ) from .cache import CacheOf, CachedMessageClass -from .utils import parse_kind +from .utils import TapManager __all__ = [ @@ -23,7 +23,7 @@ 'SplitterConfig', 'Splitter', 'Message', - 'parse_kind', 'CacheOf', 'CachedMessageClass', + 'TapManager', ] diff --git a/src/pipeline/helpers.py b/src/pipeline/helpers.py new file mode 100644 index 0000000..f55d229 --- /dev/null +++ b/src/pipeline/helpers.py @@ -0,0 +1,61 @@ +import argparse +import os +import sys +import time + +from .cache import KindsOfCache +from .tap import KindsOfSource + + +def parse_kind(args): + kindParser = argparse.ArgumentParser(add_help=False) + kindParser.add_argument('--kind', type=str, default=os.environ.get('PIPELINE', None), + choices=KindsOfSource(), + help='pipeline kind, can be {}'.format(','.join(KindsOfSource()))) + kindParser.add_argument('--cacheKind', type=str, default=os.environ.get('CACHEKIND', None), + choices=KindsOfCache(), + help='cache kind, can be {}'.format(','.join(KindsOfCache()))) + known, extras = kindParser.parse_known_args(args) + if known.kind is None: + kindParser.print_help(sys.stderr) + return known, extras + + +class Timer: + def __init__(self): + self.startTime = time.perf_counter() + self.startProcessTime = time.process_time() + self.totalTime = 0.0 + self.totalProcessTime = 0.0 + self.timeCount = 0 + self.processTimeCount = 0 + + def elapsed_time(self): + t = time.perf_counter() - self.startTime + self.totalTime += t + self.timeCount += 1 + return t + + def average_time(self): + return self.totalTime / self.timeCount + + def process_time(self): + t = time.process_time() - self.startProcessTime + self.totalProcessTime += t + self.processTimeCount += 1 + return t + + def average_process_time(self): + return self.totalProcessTime / self.processTimeCount + + def start(self): + self.startTime = time.perf_counter() + self.startProcessTime = time.process_time() + + def log(self, logger): + elapsedTime = self.elapsed_time() + averageTime = self.average_time() + logger.info('Elapsed Time: %.2f, Average Time: %.2f', elapsedTime, averageTime) + processTime = self.process_time() + averageProcessTime = self.average_process_time() + logger.info('Process Time: %.2f, Average Process Time: %.2f', processTime, averageProcessTime) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index f55d229..809c41b 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -1,61 +1,42 @@ import argparse -import os -import sys -import time - -from .cache import KindsOfCache -from .tap import KindsOfSource - - -def parse_kind(args): - kindParser = argparse.ArgumentParser(add_help=False) - kindParser.add_argument('--kind', type=str, default=os.environ.get('PIPELINE', None), - choices=KindsOfSource(), - help='pipeline kind, can be {}'.format(','.join(KindsOfSource()))) - kindParser.add_argument('--cacheKind', type=str, default=os.environ.get('CACHEKIND', None), - choices=KindsOfCache(), - help='cache kind, can be {}'.format(','.join(KindsOfCache()))) - known, extras = kindParser.parse_known_args(args) - if known.kind is None: - kindParser.print_help(sys.stderr) - return known, extras - - -class Timer: - def __init__(self): - self.startTime = time.perf_counter() - self.startProcessTime = time.process_time() - self.totalTime = 0.0 - self.totalProcessTime = 0.0 - self.timeCount = 0 - self.processTimeCount = 0 - - def elapsed_time(self): - t = time.perf_counter() - self.startTime - self.totalTime += t - self.timeCount += 1 - return t - - def average_time(self): - return self.totalTime / self.timeCount - - def process_time(self): - t = time.process_time() - self.startProcessTime - self.totalProcessTime += t - self.processTimeCount += 1 - return t - - def average_process_time(self): - return self.totalProcessTime / self.processTimeCount - - def start(self): - self.startTime = time.perf_counter() - self.startProcessTime = time.process_time() - - def log(self, logger): - elapsedTime = self.elapsed_time() - averageTime = self.average_time() - logger.info('Elapsed Time: %.2f, Average Time: %.2f', elapsedTime, averageTime) - processTime = self.process_time() - averageProcessTime = self.average_process_time() - logger.info('Process Time: %.2f, Average Process Time: %.2f', processTime, averageProcessTime) + +from .tap import SourceOf, DestinationOf +from .helpers import parse_kind + + +class TapManager(object): + """ use TapManager to construct pipelines without worker logic + + >>> tapManager = TapManager('MEM') + """ + def __init__(self, kind=None, noInput=False, noOutput=False): + assert not (noInput and noOutput) + + parser = argparse.ArgumentParser('pipeline', conflict_handler='resolve') + known, extras = parse_kind(["--kind", kind]) + + if not noInput: + self.sources = {} + self.sourceClass = SourceOf(known.kind) + self.sourceClass.add_arguments(parser) + + if not noOutput: + self.destinations = {} + self.destinationClass = DestinationOf(known.kind) + self.destinationClass.add_arguments(parser) + + self.options = parser.parse_args(extras) + + def addSourceTopic(self, name): + self.options.in_topic = name + self.sources[name] = self.sourceClass(self.options) + + def addDestinationTopic(self, name): + self.options.out_topic = name + self.destinations[name] = self.destinationClass(self.options) + + def sourceOf(self, name): + return self.sources[name] + + def destinationOf(self, name): + return self.destinations[name] diff --git a/src/pipeline/worker.py b/src/pipeline/worker.py index 5561286..494c88c 100644 --- a/src/pipeline/worker.py +++ b/src/pipeline/worker.py @@ -11,7 +11,7 @@ from .message import Message from .monitor import Monitor from .tap import DestinationOf, SourceOf -from .utils import parse_kind, Timer +from .helpers import parse_kind, Timer logger = logging.getLogger('pipeline') logger.setLevel(logging.INFO)