diff --git a/libmozevent/bus.py b/libmozevent/bus.py index df17e7f..a61ea21 100644 --- a/libmozevent/bus.py +++ b/libmozevent/bus.py @@ -6,7 +6,7 @@ import os import pickle from queue import Empty -from typing import Callable +from typing import Any, Callable import structlog @@ -27,15 +27,16 @@ def __init__(self): self.redis_enabled = "REDIS_URL" in os.environ logger.info("Redis support", enabled=self.redis_enabled and "yes" or "no") - def add_queue(self, name, mp=False, redis=False, maxsize=-1): + def add_queue( + self, name: str, mp: bool = False, redis: bool = False, maxsize: int = -1 + ): """ Create a new queue on the message bus * asyncio by default * multiprocessing when mp=True By default, there are no size limit enforced (maxsize=-1) """ - assert name not in self.queues, "Queue {} already setup".format(name) - assert isinstance(maxsize, int) + assert name not in self.queues, f"Queue {name} already setup" if self.redis_enabled and redis: self.queues[name] = RedisQueue(f"libmozevent:{name}") elif mp: @@ -43,11 +44,11 @@ def add_queue(self, name, mp=False, redis=False, maxsize=-1): else: self.queues[name] = asyncio.Queue(maxsize=maxsize) - async def send(self, name, payload): + async def send(self, name: str, payload: Any): """ Send a message on a specific queue """ - assert name in self.queues, "Missing queue {}".format(name) + assert name in self.queues, f"Missing queue {name}" queue = self.queues[name] if isinstance(queue, RedisQueue): @@ -63,12 +64,12 @@ async def send(self, name, payload): None, lambda: queue.put(payload) ) - async def receive(self, name): + async def receive(self, name: str): """ Wait for a message on a specific queue This is a blocking operation """ - assert name in self.queues, "Missing queue {}".format(name) + assert name in self.queues, f"Missing queue {name}" queue = self.queues[name] logger.debug("Wait for message on bus", queue=name, instance=queue) @@ -111,11 +112,11 @@ async def run( Optionally applies some conversions methods This is also the "ideal" usage between 2 queues """ - assert input_name in self.queues, "Missing queue {}".format(input_name) + assert input_name in self.queues, f"Missing queue {input_name}" for output_name in output_names: assert ( output_name is None or output_name in self.queues - ), "Missing queue {}".format(output_name) + ), f"Missing queue {output_name}" assert ( sequential is True or len(output_names) == 0