Skip to content

Commit

Permalink
Merge pull request #14 from jcollado/use-mypy
Browse files Browse the repository at this point in the history
Use mypy
  • Loading branch information
jcollado committed Jan 18, 2017
2 parents 8842f71 + ce3777b commit 62bb2ef
Show file tree
Hide file tree
Showing 16 changed files with 122 additions and 16 deletions.
1 change: 1 addition & 0 deletions .landscape.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ requirements:
ignore-paths:
- docs
- travis_pypi_setup.py
- stubs
python-targets:
- 2
- 3
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ python: 3.5
env:
- TOXENV=py35
- TOXENV=py27
- TOXENV=flake8
- TOXENV=mypy2
- TOXENV=mypy3
install: pip install -U tox
script: tox -e ${TOXENV}
deploy:
Expand Down
11 changes: 9 additions & 2 deletions rabbithole/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import blinker
import pika

from typing import Dict # noqa

LOGGER = logging.getLogger(__name__)


Expand All @@ -30,6 +32,7 @@ class Consumer(object):
"""

def __init__(self, server):
# type: (str) -> None
"""Configure queue."""
LOGGER.info('Connecting to %r...', server)
parameters = pika.ConnectionParameters(server)
Expand All @@ -45,9 +48,10 @@ def __init__(self, server):

self.channel = channel
self.queue_name = queue_name
self.signals = {}
self.signals = {} # type: Dict[str, blinker.Signal]

def __call__(self, exchange_name):
# type: (str) -> blinker.Signal
"""Create signal to send when a message from a exchange is received.
:param exchange_name: Exchange name to bind to the queue
Expand Down Expand Up @@ -75,6 +79,7 @@ def __call__(self, exchange_name):
return signal

def run(self):
# type: () -> None
"""Run ioloop and consume messages."""
logging.info('Waiting for messages...')
self.channel.start_consuming()
Expand All @@ -84,8 +89,10 @@ def message_received_cb(self, channel, method_frame, header_frame, body):
:param channel: Connection channel with AMQP server
:type channel: pika.channel.Channel
:param method_frame: AMPQ method related data
:param method_frame: AMQP method related data
:type method_frame: pika.spec.Deliver
:param header_frame: AMQP message related data
:type header_frame: pika.spec.BasicProperties
:param body: Message body
:type body: str
Expand Down
22 changes: 17 additions & 5 deletions rabbithole/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

import blinker

from typing import ( # noqa
Dict,
List,
Optional,
)

LOGGER = logging.getLogger(__name__)


Expand All @@ -35,25 +41,27 @@ class Batcher(object):
DEFAULT_TIME_LIMIT = 15

def __init__(self, size_limit=None, time_limit=None):
# type: (int, int) -> None
"""Initialize internal data structures."""
self.size_limit = size_limit or self.DEFAULT_SIZE_LIMIT
self.time_limit = time_limit or self.DEFAULT_TIME_LIMIT

self.batch = []
self.batch = [] # type: List[Dict[str, object]]
self.lock = threading.Lock()
self.timer = None
self.timer = None # type: Optional[threading.Timer]
self.batch_ready = blinker.Signal()

def message_received_cb(self, sender, payload):
# type: (object, Dict[str, object]) -> None
"""Handle message received event.
This callback is executed when message is received by the AMQP
consumer.
:param sender: The consumer who sent the message
:type sender: rabbithole.consumer.Consumer
:param payload: Records to send to the output
:type payload: list(dict(str))
:type sender: object
:param payload: Record to send to the output
:type payload: dict(str)
"""
# Use a lock to make sure that callback execution doesn't interleave
Expand All @@ -73,6 +81,7 @@ def message_received_cb(self, sender, payload):
self.cancel_timer()

def time_expired_cb(self):
# type: () -> None
"""Handle time expired event.
This callback is executed in a timer thread when the time limit for a
Expand All @@ -96,6 +105,7 @@ def time_expired_cb(self):
)

def queue_batch(self):
# type: () -> None
"""Queue batch before sending to the output.
A batch is queued either by the main thread when the size limit is
Expand All @@ -112,6 +122,7 @@ def queue_batch(self):
self.batch = []

def start_timer(self):
# type: () -> None
"""Start timer thread.
A timer thread is started to make sure that the batch will be sent to
Expand All @@ -133,6 +144,7 @@ def start_timer(self):
self.timer = timer

def cancel_timer(self):
# type: () -> None
"""Cancel timer thread.
A timer thread might be cancelled if the size limit for a batch is
Expand Down
22 changes: 18 additions & 4 deletions rabbithole/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
import six
import yaml

from typing import ( # noqa
Any,
List,
)

from rabbithole.amqp import Consumer
from rabbithole.sql import Database
from rabbithole.batcher import Batcher
Expand All @@ -27,6 +32,7 @@


def main(argv=None):
# type: (List[str]) -> int
"""Console script for rabbithole
:param argv: Command line arguments
Expand All @@ -37,8 +43,8 @@ def main(argv=None):
argv = sys.argv[1:]

args = parse_arguments(argv)
config = args.config
configure_logging(args.log_level)
config = args['config']
configure_logging(args['log_level'])
logging.debug('Configuration:\n%s', pformat(config))

namespace = {
Expand All @@ -64,6 +70,7 @@ def main(argv=None):


def create_block_instance(block):
# type: (Dict[str, Any]) -> object
"""Create block instance from its configuration
:param block: Block configuration
Expand Down Expand Up @@ -94,6 +101,7 @@ def create_block_instance(block):


def create_flow(flow, namespace, batcher_config):
# type: (List[Dict[str, Any]], Dict[str, Any], Dict[str, int]) -> None
"""Create flow by connecting block signals.
:param flow: Flow configuration
Expand Down Expand Up @@ -145,6 +153,7 @@ def create_flow(flow, namespace, batcher_config):


def run_input_blocks(namespace):
# type: (Dict[str, object]) -> List[threading.Thread]
"""Run inputs blocks and start receiving messages from them.
:param namespace: Block instances namespace
Expand All @@ -164,15 +173,19 @@ def run_input_blocks(namespace):


def parse_arguments(argv):
# type: (List[str]) -> Dict[str, Any]
"""Parse command line arguments.
:param argv: Command line arguments
:type argv: list(str)
:returns: Parsed arguments
:rtype: argparse.Namespace
"""
parser = argparse.ArgumentParser(description=__doc__)

def yaml_file(path):
# type: (str) -> str
"""Yaml file argument.
:param path: Path to the yaml file
Expand Down Expand Up @@ -206,12 +219,13 @@ def yaml_file(path):
'(%(default)s by default)'
.format(', '.join(log_levels[:-1]), log_levels[-1])))

args = parser.parse_args(argv)
args.log_level = getattr(logging, args.log_level.upper())
args = vars(parser.parse_args(argv))
args['log_level'] = getattr(logging, args['log_level'].upper())
return args


def configure_logging(log_level):
# type: (int) -> None
"""Configure logging based on command line argument.
:param log_level: Log level passed form the command line
Expand Down
3 changes: 3 additions & 0 deletions rabbithole/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ class Database(object):
"""

def __init__(self, url):
# type: (str) -> None
"""Create database engine."""
engine = create_engine(url)
self.connection = engine.connect()
LOGGER.debug('Connected to: %r', url)

def __call__(self, query):
# type: (str) -> partial
"""Return callback to use when a batch is ready.
:param query: The query to execute to insert the batch
Expand All @@ -41,6 +43,7 @@ def __call__(self, query):
return partial(self.batch_ready_cb, query=text(query))

def batch_ready_cb(self, sender, query, batch):
# type: (object, object, List[Dict[str, object]]) -> None
"""Execute insert query for the batch that is ready.
:param sender: The batcher who sent the batch_ready signal
Expand Down
2 changes: 2 additions & 0 deletions requirements/test_py2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-r test.txt
typing
1 change: 1 addition & 0 deletions requirements/test_py3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-r test.txt
14 changes: 14 additions & 0 deletions stubs/blinker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from typing import (
Optional,
)


class Signal(object):
def __init__(self) -> None:
pass

def connect(self, receiver: object, weak: bool) -> None:
pass

def send(self, sender: Optional[object], **kwargs: object) -> None:
pass
6 changes: 6 additions & 0 deletions stubs/mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class MagicMock(object):
pass


def patch():
pass
11 changes: 11 additions & 0 deletions stubs/pika.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class ConnectionParameters(object):
def __init__(self, server):
pass


class BlockingConnection(object):
def __init__(self, parameters):
pass

def channel(self):
pass
6 changes: 6 additions & 0 deletions stubs/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def create_engine(url):
pass


def text(query):
pass
2 changes: 2 additions & 0 deletions stubs/sqlalchemy/exc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class SQLAlchemyError(Exception):
pass
6 changes: 6 additions & 0 deletions stubs/yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def load(file_):
pass


class YAMLError(Exception):
pass
4 changes: 2 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import yaml

from mock import (
Mock,
MagicMock as Mock,
patch,
)
from six import StringIO
Expand Down Expand Up @@ -186,7 +186,7 @@ def test_config_file_load_success(self):
StringIO(yaml.dump(expected_value)))
args = parse_arguments(['some file'])

self.assertDictEqual(args.config, expected_value)
self.assertDictEqual(args['config'], expected_value)


class TestConfigureLogging(TestCase):
Expand Down
24 changes: 21 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
[tox]
envlist = py27, py35, flake8
envlist = py27, py35, flake8, mypy2, mypy3

[testenv:py27]
deps = -r{toxinidir}/requirements/test_py2.txt

[testenv:py35]
deps = -r{toxinidir}/requirements/test_py3.txt

[testenv:flake8]
basepython=python
deps=flake8
commands=flake8 rabbithole

[testenv:mypy2]
basepython=python3
setenv =
MYPYPATH = {toxinidir}/stubs
deps=mypy-lang
commands=mypy --py2 rabbithole tests

[testenv:mypy3]
basepython=python3
setenv =
MYPYPATH = {toxinidir}/stubs
deps=mypy-lang
commands=mypy rabbithole tests

[testenv]
passenv = TRAVIS TRAVIS_JOB_ID TRAVIS_BRANCH
deps = -r{toxinidir}/requirements/test.txt
setenv =
PYTHONPATH = {toxinidir}:{toxinidir}/rabbithole
commands =
Expand Down

0 comments on commit 62bb2ef

Please sign in to comment.