Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Remove redundant code + add type checker #234

Merged
merged 8 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agogosml/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ clean-test: ## remove test and coverage artifacts

lint: ## check python style
flake8 agogosml tests
mypy agogosml
isort --check-only --recursive agogosml

test:
Expand Down
1 change: 1 addition & 0 deletions agogosml/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Sphinx = "*"
twine = "*"
pylint = "*"
pytest-cov = "*"
mypy = "*"

[packages]
confluent-kafka = "~=0.11"
Expand Down
1,766 changes: 903 additions & 863 deletions agogosml/Pipfile.lock

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions agogosml/agogosml/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# -*- coding: utf-8 -*-

"""Top-level package for Agogosml."""

__author__ = """Rami Sayar"""
__email__ = 'rami.sayar@microsoft.com'
__version__ = '0.1.2'
Expand Down
1 change: 0 additions & 1 deletion agogosml/agogosml/common/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
# -*- coding: utf-8 -*-
22 changes: 8 additions & 14 deletions agogosml/agogosml/common/abstract_streaming_client.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
"""Abstract streaming client class"""

from abc import ABC
from abc import abstractmethod
from functools import lru_cache
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Type

from agogosml.utils.imports import find_implementations

Callback = Callable[[str], None]


class AbstractStreamingClient(ABC):
@abstractmethod
def __init__(self, config: dict):
"""
Abstract Streaming Client

:param config: Dictionary file with all the relevant parameters.
"""
pass

@abstractmethod
def send(self, *args, **kwargs):
"""Send method."""
def send(self, message: str):
pass

@abstractmethod
def stop(self, *args, **kwargs):
"""Stop method."""
def stop(self):
pass

@abstractmethod
def start_receiving(self, *args, **kwargs):
"""Start receiving messages from streaming client."""
def start_receiving(self, on_message_received: Callback):
pass


Expand All @@ -51,7 +45,7 @@ def find_streaming_clients() -> Dict[str, StreamingClientType]:
}


def create_streaming_client_from_config(config: dict) -> AbstractStreamingClient:
def create_streaming_client_from_config(config: Optional[dict]) -> AbstractStreamingClient:
config = config or {}
try:
client_config = config['config']
Expand Down
16 changes: 1 addition & 15 deletions agogosml/agogosml/common/broadcast_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
class BroadcastStreamingClient(AbstractStreamingClient):
def __init__(self, config: dict):
"""
Class to create a BroadcastStreamingClient instance.

Configuration keys:
CLIENTS

:param config: Dictionary file with all the relevant parameters.
"""

self.clients = [
Expand All @@ -19,20 +15,10 @@ def __init__(self, config: dict):
for conf in config.get('CLIENTS', [])
]

def start_receiving(self, on_message_received_callback):
"""
Receive messages from all the clients

:param on_message_received_callback: Callback function.
"""
def start_receiving(self, on_message_received):
raise NotImplementedError

def send(self, message):
"""
Send a message to all the clients

:param message: A string input to upload to event hub.
"""
success = True
for client in self.clients:
success &= client.send(message)
Expand Down
9 changes: 0 additions & 9 deletions agogosml/agogosml/common/eventhub_processor_events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""EventProcessor host class for Event Hub"""

from azure.eventprocessorhost import AbstractEventProcessor

from ..utils.logger import Logger
Expand All @@ -8,14 +6,7 @@


class EventProcessor(AbstractEventProcessor):
"""Example Implementation of AbstractEventProcessor."""

def __init__(self, params):
"""
Init Event processor.

:param params: List of params.
"""
super().__init__()
self.on_message_received_callback = params[0]
self._msg_counter = 0
Expand Down
22 changes: 3 additions & 19 deletions agogosml/agogosml/common/eventhub_streaming_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Event Hub streaming client"""

import asyncio

from azure.eventhub import EventData
Expand All @@ -19,8 +17,6 @@
class EventHubStreamingClient(AbstractStreamingClient):
def __init__(self, config):
"""
Class to create an EventHubStreamingClient instance.

Configuration keys:
AZURE_STORAGE_ACCESS_KEY
AZURE_STORAGE_ACCOUNT
Expand All @@ -31,8 +27,6 @@ def __init__(self, config):
EVENT_HUB_SAS_POLICY
LEASE_CONTAINER_NAME
TIMEOUT

:param config: Dictionary file with all the relevant parameters.
"""

self.message_callback = None
Expand Down Expand Up @@ -88,19 +82,14 @@ def __init__(self, config):
logger.error('Failed to init EH send client: %s', e)
raise

def start_receiving(self, on_message_received_callback):
"""
Receive messages from an EventHubStreamingClient.

:param on_message_received_callback: Callback function.
"""
def start_receiving(self, on_message_received):
loop = asyncio.get_event_loop()
try:
host = EventProcessorHost(
EventProcessor,
self.eph_client,
self.storage_manager,
ep_params=[on_message_received_callback],
ep_params=[on_message_received],
eph_options=self.eh_options,
loop=loop)

Expand All @@ -111,12 +100,7 @@ def start_receiving(self, on_message_received_callback):
finally:
loop.stop()

def send(self, message):
"""
Send a message to an EventHubStreamingClient.

:param message: A string input to upload to event hub.
"""
def send(self, message: str):
c-w marked this conversation as resolved.
Show resolved Hide resolved
try:
self.sender.send(EventData(body=message))
logger.info('Sent message: %s', message)
Expand Down
3 changes: 1 addition & 2 deletions agogosml/agogosml/common/flask_http_listener_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
""" Flask client to receive messages from customer app"""
import threading

from flask import Flask
Expand All @@ -17,7 +16,7 @@ def __init__(self, config: dict):
PORT
HOST
"""
self.port = int(config.get('PORT'))
self.port = int(config['PORT']) if 'PORT' in config else None
c-w marked this conversation as resolved.
Show resolved Hide resolved
self.host = config.get('HOST', DEFAULT_HOST)

def thread_flask(self):
Expand Down
11 changes: 1 addition & 10 deletions agogosml/agogosml/common/http_message_sender.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""HttpMessageSender."""

from agogosml.common.message_sender import MessageSender
from agogosml.utils.http_request import post_with_retries
from agogosml.utils.logger import Logger
Expand All @@ -8,8 +6,6 @@


class HttpMessageSender(MessageSender):
"""HttpMessageSender."""

def __init__(self, config: dict):
"""
Configuration keys:
Expand All @@ -29,7 +25,7 @@ def __init__(self, config: dict):
if not host:
raise ValueError('Host endpoint must be provided.')

if int(port) <= 0:
if not port or int(port) <= 0:
raise ValueError('Port cannot be 0 or less.')

if scheme not in ('http', 'https'):
Expand All @@ -42,11 +38,6 @@ def __init__(self, config: dict):
logger.info("server_address: %s", self.server_address)

def send(self, message):
"""
Sends messages to specified address via HTTP.

:param message: JSON formatted message.
"""
return_value = False
try:
status_code = post_with_retries(
Expand Down
40 changes: 6 additions & 34 deletions agogosml/agogosml/common/kafka_streaming_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Kafka streaming client"""

import datetime

from confluent_kafka import Consumer
Expand All @@ -17,17 +15,13 @@
class KafkaStreamingClient(AbstractStreamingClient):
def __init__(self, config):
"""
Class to create a KafkaStreamingClient instance.

Configuration keys:
APP_HOST
APP_PORT
KAFKA_ADDRESS
KAFKA_CONSUMER_GROUP
KAFKA_TOPIC
TIMEOUT

:param config: Dictionary file with all the relevant parameters.
"""

self.topic = config.get("KAFKA_TOPIC")
Expand Down Expand Up @@ -67,9 +61,6 @@ def create_kafka_config(user_config):
def delivery_report(self, err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush().

:param err: An error message.
:param msg: A string input to be uploaded to kafka.
"""

if err is not None:
Expand All @@ -78,12 +69,7 @@ def delivery_report(self, err, msg):
logger.info('Message delivered to %s [%s]',
msg.topic(), msg.partition())

def send(self, message: str, *args, **kwargs):
"""
Upload a message to a kafka topic.

:param message: A string input to upload to kafka.
"""
def send(self, message: str):
if not isinstance(message, str):
raise TypeError('str type expected for message')
try:
Expand All @@ -97,10 +83,10 @@ def send(self, message: str, *args, **kwargs):
logger.error('Error sending message to kafka: %s', e)
return False

def stop(self, *args, **kwargs):
def stop(self):
pass

def check_timeout(self, start):
def check_timeout(self, start: datetime.datetime):
"""
Checks how much time has elapsed since the kafka client started running.

Expand All @@ -112,11 +98,6 @@ def check_timeout(self, start):
raise KeyboardInterrupt

def handle_kafka_error(self, msg):
"""
Handle an error in kafka.

:param msg: Error message from kafka.
"""
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
logger.error('%% %s [%d] reached end at offset %d\n',
Expand All @@ -125,17 +106,8 @@ def handle_kafka_error(self, msg):
# Error
raise KafkaException(msg.error())

def start_receiving(self, on_message_received_callback):
"""
Receive messages from a kafka topic.

:param on_message_received_callback: Callback function.
"""
'''
TODO:
We are going to need documentation for Kafka
to ensure proper syntax is clear
'''
def start_receiving(self, on_message_received):
# TODO: We are going to need documentation for Kafka to ensure proper syntax is clear
try:
self.subscribe_to_topic()
start = datetime.datetime.now()
Expand All @@ -147,7 +119,7 @@ def start_receiving(self, on_message_received_callback):
# Poll messages from topic
msg = self.read_single_message()
if msg is not None:
on_message_received_callback(msg)
on_message_received(msg)
c-w marked this conversation as resolved.
Show resolved Hide resolved

except KeyboardInterrupt:
logger.info('Aborting listener...')
Expand Down
7 changes: 4 additions & 3 deletions agogosml/agogosml/common/listener_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Abstract listener client broker class"""

from abc import ABC
from abc import abstractmethod
from typing import Callable

Callback = Callable[[str], bool]


class ListenerClient(ABC):
Expand All @@ -11,7 +12,7 @@ def __init__(self, config: dict):
pass

@abstractmethod
def start(self, on_message_received):
def start(self, on_message_received: Callback):
pass

@abstractmethod
Expand Down
9 changes: 1 addition & 8 deletions agogosml/agogosml/common/message_sender.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
"""Abstract Message Sender class"""

from abc import ABC
from abc import abstractmethod


class MessageSender(ABC):
"""Message Sender class."""

@abstractmethod
def __init__(self, config: dict):
pass

@abstractmethod
def send(self, *args, **kwargs):
"""
Sends message.
"""
def send(self, message) -> bool:
pass
Loading