Skip to content

Commit

Permalink
improve tap
Browse files Browse the repository at this point in the history
  • Loading branch information
Yifan Zhang committed Dec 16, 2021
1 parent ccad562 commit 37865d3
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 3 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ confluent-kafka = {version = "1.3.*", optional = true}
pulsar-client = {version = "2.5.*", optional = true}
elasticsearch = {version = "7.15.*", optional = true}
pymongo = {version = "3.12.*", optional = true}
python-dotenv = "^0.19.2"
python-dotenv = {version = "^0.19.2", optional = true}

[tool.poetry.dev-dependencies]
tox = "^3.23.1"
Expand All @@ -36,6 +36,7 @@ kafka=["conflucent-kafka"]
pulsar=["pulsar-client"]
elastic=["elasticsearch"]
mongodb=["pymongo"]
cli=["python-dotenv"]

[tool.poetry.urls]
homepage="https://github.com/yifan/pipeline"
Expand Down
4 changes: 2 additions & 2 deletions src/pipeline/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def add_source_topic(self, name: str) -> None:
settings=settings, logger=self.logger
)
self.sources[name] = source
self.logger.info(f"Source {source} added for {name}")
self.logger.info(f"Source {str(source)} added for `{name}`")

def add_destination_topic(self, name: str) -> None:
"""Add a new :class:`DestinationTap` with a defined topic(queue) name
Expand All @@ -125,7 +125,7 @@ def add_destination_topic(self, name: str) -> None:
settings=settings, logger=self.logger
)
self.destinations[name] = destination
self.logger.info(f"Destination {destination} added for {name}")
self.logger.info(f"Destination {destination} added for `{name}`")

def source_of(self, name: str) -> SourceTap:
"""Return the :class:`SourceTap` of specified topic(queue) name"""
Expand Down
12 changes: 12 additions & 0 deletions src/pipeline/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ def __init__(
self.topic = settings.topic
self.logger = logger

def __str__(self) -> str:
return self.__repr__()

def __repr__(self) -> str:
return f"Source(topic=({self.topic}), settings=({self.settings}))"

@abstractmethod
def read(self) -> Iterator[MessageBase]:
"""receive message."""
Expand Down Expand Up @@ -101,6 +107,12 @@ def __init__(self, settings: DestinationSettings, logger: Logger = pipelineLogge
self.topic = settings.topic
self.logger = logger

def __str__(self) -> str:
return self.__repr__()

def __repr__(self) -> str:
return f"Destination(topic=({self.topic}), settings=({self.settings}))"

@abstractmethod
def write(self, message: MessageBase) -> int:
"""send message."""
Expand Down
11 changes: 11 additions & 0 deletions tests/test_taps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@


class TestTaps(TestCase):
def test_repr(self):
destination_and_settings_classes = DestinationTap.of(TapKind.MEM)
settings = destination_and_settings_classes.settings_class()
destination = destination_and_settings_classes.destination_class(settings)
assert str(destination).startswith("Destination")

source_and_settings_classes = SourceTap.of(TapKind.MEM)
settings = source_and_settings_classes.settings_class()
source = source_and_settings_classes.source_class(settings)
assert str(source).startswith("Source")

def test_file(self):
destination_and_settings_classes = DestinationTap.of(TapKind.FILE)
with tempfile.NamedTemporaryFile() as tmpfile:
Expand Down

0 comments on commit 37865d3

Please sign in to comment.