Skip to content

Commit

Permalink
Add TapManager to use pipeline taps directly
Browse files Browse the repository at this point in the history
In some case user may want to use pipeline directly without worker
logic. TapManager allows user to create source and destination taps.
  • Loading branch information
Yifan Zhang committed Dec 7, 2020
1 parent 7a81879 commit db21552
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 65 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ venv: requirements.txt requirements.dev.txt
rm -rf venv && \
python3 -m venv venv && \
. venv/bin/activate && \
python3 -m pip install -r requirements.dev.txt &&\
pyenv local 3.7.7 3.8.2
python3 -m pip install -r requirements.dev.txt
pytest:
. venv/bin/activate; \
python3 -m pytest
Expand Down
4 changes: 2 additions & 2 deletions src/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
SplitterConfig, Splitter,
)
from .cache import CacheOf, CachedMessageClass
from .utils import parse_kind
from .utils import TapManager


__all__ = [
Expand All @@ -23,7 +23,7 @@
'SplitterConfig',
'Splitter',
'Message',
'parse_kind',
'CacheOf',
'CachedMessageClass',
'TapManager',
]
61 changes: 61 additions & 0 deletions src/pipeline/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import argparse
import os
import sys
import time

from .cache import KindsOfCache
from .tap import KindsOfSource


def parse_kind(args):
kindParser = argparse.ArgumentParser(add_help=False)
kindParser.add_argument('--kind', type=str, default=os.environ.get('PIPELINE', None),
choices=KindsOfSource(),
help='pipeline kind, can be {}'.format(','.join(KindsOfSource())))
kindParser.add_argument('--cacheKind', type=str, default=os.environ.get('CACHEKIND', None),
choices=KindsOfCache(),
help='cache kind, can be {}'.format(','.join(KindsOfCache())))
known, extras = kindParser.parse_known_args(args)
if known.kind is None:
kindParser.print_help(sys.stderr)
return known, extras


class Timer:
def __init__(self):
self.startTime = time.perf_counter()
self.startProcessTime = time.process_time()
self.totalTime = 0.0
self.totalProcessTime = 0.0
self.timeCount = 0
self.processTimeCount = 0

def elapsed_time(self):
t = time.perf_counter() - self.startTime
self.totalTime += t
self.timeCount += 1
return t

def average_time(self):
return self.totalTime / self.timeCount

def process_time(self):
t = time.process_time() - self.startProcessTime
self.totalProcessTime += t
self.processTimeCount += 1
return t

def average_process_time(self):
return self.totalProcessTime / self.processTimeCount

def start(self):
self.startTime = time.perf_counter()
self.startProcessTime = time.process_time()

def log(self, logger):
elapsedTime = self.elapsed_time()
averageTime = self.average_time()
logger.info('Elapsed Time: %.2f, Average Time: %.2f', elapsedTime, averageTime)
processTime = self.process_time()
averageProcessTime = self.average_process_time()
logger.info('Process Time: %.2f, Average Process Time: %.2f', processTime, averageProcessTime)
101 changes: 41 additions & 60 deletions src/pipeline/utils.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,42 @@
import argparse
import os
import sys
import time

from .cache import KindsOfCache
from .tap import KindsOfSource


def parse_kind(args):
kindParser = argparse.ArgumentParser(add_help=False)
kindParser.add_argument('--kind', type=str, default=os.environ.get('PIPELINE', None),
choices=KindsOfSource(),
help='pipeline kind, can be {}'.format(','.join(KindsOfSource())))
kindParser.add_argument('--cacheKind', type=str, default=os.environ.get('CACHEKIND', None),
choices=KindsOfCache(),
help='cache kind, can be {}'.format(','.join(KindsOfCache())))
known, extras = kindParser.parse_known_args(args)
if known.kind is None:
kindParser.print_help(sys.stderr)
return known, extras


class Timer:
def __init__(self):
self.startTime = time.perf_counter()
self.startProcessTime = time.process_time()
self.totalTime = 0.0
self.totalProcessTime = 0.0
self.timeCount = 0
self.processTimeCount = 0

def elapsed_time(self):
t = time.perf_counter() - self.startTime
self.totalTime += t
self.timeCount += 1
return t

def average_time(self):
return self.totalTime / self.timeCount

def process_time(self):
t = time.process_time() - self.startProcessTime
self.totalProcessTime += t
self.processTimeCount += 1
return t

def average_process_time(self):
return self.totalProcessTime / self.processTimeCount

def start(self):
self.startTime = time.perf_counter()
self.startProcessTime = time.process_time()

def log(self, logger):
elapsedTime = self.elapsed_time()
averageTime = self.average_time()
logger.info('Elapsed Time: %.2f, Average Time: %.2f', elapsedTime, averageTime)
processTime = self.process_time()
averageProcessTime = self.average_process_time()
logger.info('Process Time: %.2f, Average Process Time: %.2f', processTime, averageProcessTime)

from .tap import SourceOf, DestinationOf
from .helpers import parse_kind


class TapManager(object):
""" use TapManager to construct pipelines without worker logic
>>> tapManager = TapManager('MEM')
"""
def __init__(self, kind=None, noInput=False, noOutput=False):
assert not (noInput and noOutput)

parser = argparse.ArgumentParser('pipeline', conflict_handler='resolve')
known, extras = parse_kind(["--kind", kind])

if not noInput:
self.sources = {}
self.sourceClass = SourceOf(known.kind)
self.sourceClass.add_arguments(parser)

if not noOutput:
self.destinations = {}
self.destinationClass = DestinationOf(known.kind)
self.destinationClass.add_arguments(parser)

self.options = parser.parse_args(extras)

def addSourceTopic(self, name):
self.options.in_topic = name
self.sources[name] = self.sourceClass(self.options)

def addDestinationTopic(self, name):
self.options.out_topic = name
self.destinations[name] = self.destinationClass(self.options)

def sourceOf(self, name):
return self.sources[name]

def destinationOf(self, name):
return self.destinations[name]
2 changes: 1 addition & 1 deletion src/pipeline/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .message import Message
from .monitor import Monitor
from .tap import DestinationOf, SourceOf
from .utils import parse_kind, Timer
from .helpers import parse_kind, Timer

logger = logging.getLogger('pipeline')
logger.setLevel(logging.INFO)
Expand Down

0 comments on commit db21552

Please sign in to comment.