Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions libmozevent/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
import pickle
from queue import Empty
from typing import Callable
from typing import Any, Callable

import structlog

Expand All @@ -27,27 +27,28 @@ def __init__(self):
self.redis_enabled = "REDIS_URL" in os.environ
logger.info("Redis support", enabled=self.redis_enabled and "yes" or "no")

def add_queue(self, name, mp=False, redis=False, maxsize=-1):
def add_queue(
self, name: str, mp: bool = False, redis: bool = False, maxsize: int = -1
):
"""
Create a new queue on the message bus
* asyncio by default
* multiprocessing when mp=True
By default, there are no size limit enforced (maxsize=-1)
"""
assert name not in self.queues, "Queue {} already setup".format(name)
assert isinstance(maxsize, int)
assert name not in self.queues, f"Queue {name} already setup"
if self.redis_enabled and redis:
self.queues[name] = RedisQueue(f"libmozevent:{name}")
elif mp:
self.queues[name] = multiprocessing.Queue(maxsize=maxsize)
else:
self.queues[name] = asyncio.Queue(maxsize=maxsize)

async def send(self, name, payload):
async def send(self, name: str, payload: Any):
"""
Send a message on a specific queue
"""
assert name in self.queues, "Missing queue {}".format(name)
assert name in self.queues, f"Missing queue {name}"
queue = self.queues[name]

if isinstance(queue, RedisQueue):
Expand All @@ -63,12 +64,12 @@ async def send(self, name, payload):
None, lambda: queue.put(payload)
)

async def receive(self, name):
async def receive(self, name: str):
"""
Wait for a message on a specific queue
This is a blocking operation
"""
assert name in self.queues, "Missing queue {}".format(name)
assert name in self.queues, f"Missing queue {name}"
queue = self.queues[name]

logger.debug("Wait for message on bus", queue=name, instance=queue)
Expand Down Expand Up @@ -111,11 +112,11 @@ async def run(
Optionally applies some conversions methods
This is also the "ideal" usage between 2 queues
"""
assert input_name in self.queues, "Missing queue {}".format(input_name)
assert input_name in self.queues, f"Missing queue {input_name}"
for output_name in output_names:
assert (
output_name is None or output_name in self.queues
), "Missing queue {}".format(output_name)
), f"Missing queue {output_name}"

assert (
sequential is True or len(output_names) == 0
Expand Down