From 28034b3bf53ff2dfdd26e3b64c44b09390d4803a Mon Sep 17 00:00:00 2001 From: Bastien Abadie Date: Tue, 3 Dec 2019 11:56:31 +0100 Subject: [PATCH 1/4] pulse: Support routing by exchange/topic, fixes #28. --- libmozevent/pulse.py | 70 ++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/libmozevent/pulse.py b/libmozevent/pulse.py index 5ddb4ee..2b22c15 100644 --- a/libmozevent/pulse.py +++ b/libmozevent/pulse.py @@ -5,13 +5,14 @@ import asyncio import json -from typing import List, Tuple +from typing import Dict, List, Tuple import aioamqp import structlog logger = structlog.get_logger(__name__) +BusQueue = str Queue = str Route = str PulseBinding = Tuple[Queue, List[Route]] @@ -92,13 +93,11 @@ class PulseListener(object): def __init__( self, - output_queue_name, - queues_routes: List[PulseBinding], + queues_routes: Dict[BusQueue, List[PulseBinding]], user, password, virtualhost="/", ): - self.queue_name = output_queue_name self.queues_routes = queues_routes self.user = user self.password = password @@ -106,16 +105,21 @@ def __init__( def register(self, bus): self.bus = bus - self.bus.add_queue(self.queue_name) + for name in self.queues_routes: + self.bus.add_queue(name) async def connect(self): - protocol = await create_pulse_listener( - self.user, - self.password, - self.queues_routes, - self.got_message, - self.virtualhost, - ) + listeners = [ + create_pulse_listener( + self.user, + self.password, + queues_routes, + self.build_callback(bus_queue), + self.virtualhost, + ) + for bus_queue, queues_routes in self.queues_routes.items() + ] + protocol = await asyncio.gather(*listeners) logger.info( "Worker starts consuming messages", queues_routes=self.queues_routes ) @@ -130,33 +134,37 @@ async def run(self): # Check pulse server is still connected # AmqpClosedConnection will be thrown otherwise - await pulse.ensure_open() + await asyncio.gather(p.ensure_open() for p in pulse) await asyncio.sleep(7) - except (aioamqp.AmqpClosedConnection, OSError) as e: + except (aioamqp.AmqpClosedConnection, OSError, RuntimeError) as e: logger.exception("Reconnecting pulse client in 5 seconds", error=str(e)) pulse = None await asyncio.sleep(5) - async def got_message(self, channel, body, envelope, properties): + def build_callback(self, bus_queue: BusQueue): """ - Generic Pulse consumer callback + Build a generic Pulse consumer callback, attached to an inner queue """ - assert isinstance(body, bytes), "Body is not in bytes" - # Build routing information to identify the payload source - routing = { - "exchange": envelope.exchange_name, - "key": envelope.routing_key, - "other_routes": properties.headers.get("CC", []), - } + async def got_message(channel, body, envelope, properties): + assert isinstance(body, bytes), "Body is not in bytes" + + # Build routing information to identify the payload source + routing = { + "exchange": envelope.exchange_name, + "key": envelope.routing_key, + "other_routes": properties.headers.get("CC", []), + } + + # Automatically decode json payloads + if properties.content_type == "application/json": + body = json.loads(body) - # Automatically decode json payloads - if properties.content_type == "application/json": - body = json.loads(body) + # Push the message in the message bus + logger.debug("Received a pulse message") + await self.bus.send(bus_queue, {"routing": routing, "body": body}) - # Push the message in the message bus - logger.debug("Received a pulse message") - await self.bus.send(self.queue_name, {"routing": routing, "body": body}) + # Ack the message so it is removed from the broker's queue + await channel.basic_client_ack(delivery_tag=envelope.delivery_tag) - # Ack the message so it is removed from the broker's queue - await channel.basic_client_ack(delivery_tag=envelope.delivery_tag) + return got_message From 794501eceb9aa8f12533c8f845160ead9911bd8f Mon Sep 17 00:00:00 2001 From: Bastien Abadie Date: Tue, 3 Dec 2019 12:39:59 +0100 Subject: [PATCH 2/4] pulse: Detect the internal queues that can route a message --- libmozevent/pulse.py | 84 +++++++++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 32 deletions(-) diff --git a/libmozevent/pulse.py b/libmozevent/pulse.py index 2b22c15..ffaef99 100644 --- a/libmozevent/pulse.py +++ b/libmozevent/pulse.py @@ -4,6 +4,8 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. import asyncio +import fnmatch +import itertools import json from typing import Dict, List, Tuple @@ -109,17 +111,13 @@ def register(self, bus): self.bus.add_queue(name) async def connect(self): - listeners = [ - create_pulse_listener( - self.user, - self.password, - queues_routes, - self.build_callback(bus_queue), - self.virtualhost, - ) - for bus_queue, queues_routes in self.queues_routes.items() - ] - protocol = await asyncio.gather(*listeners) + protocol = await create_pulse_listener( + self.user, + self.password, + itertools.chain(*self.queues_routes.values()), + self.got_message, + self.virtualhost, + ) logger.info( "Worker starts consuming messages", queues_routes=self.queues_routes ) @@ -134,37 +132,59 @@ async def run(self): # Check pulse server is still connected # AmqpClosedConnection will be thrown otherwise - await asyncio.gather(p.ensure_open() for p in pulse) + await pulse.ensure_open() await asyncio.sleep(7) - except (aioamqp.AmqpClosedConnection, OSError, RuntimeError) as e: + except (aioamqp.AmqpClosedConnection, OSError) as e: logger.exception("Reconnecting pulse client in 5 seconds", error=str(e)) pulse = None await asyncio.sleep(5) - def build_callback(self, bus_queue: BusQueue): + def find_matching_queues(self, payload_exchange: str, payload_routes: List[bytes]): """ - Build a generic Pulse consumer callback, attached to an inner queue + Detect all the bus that match the current routing """ - async def got_message(channel, body, envelope, properties): - assert isinstance(body, bytes), "Body is not in bytes" + def _match(exchange, route): - # Build routing information to identify the payload source - routing = { - "exchange": envelope.exchange_name, - "key": envelope.routing_key, - "other_routes": properties.headers.get("CC", []), - } + # Exchanges must match exactly + if payload_exchange != exchange: + return False - # Automatically decode json payloads - if properties.content_type == "application/json": - body = json.loads(body) + # One of the pauload routes must match the current route + route = route.replace("#", "*").encode("utf-8") + return len(fnmatch.filter(payload_routes, route)) > 0 - # Push the message in the message bus - logger.debug("Received a pulse message") - await self.bus.send(bus_queue, {"routing": routing, "body": body}) + return { + bus_queue + for bus_queue, queue_routes in self.queues_routes.items() + for pulse_exchange, pulse_routes in queue_routes + for pulse_route in pulse_routes + if _match(pulse_exchange, pulse_route) + } - # Ack the message so it is removed from the broker's queue - await channel.basic_client_ack(delivery_tag=envelope.delivery_tag) + async def got_message(self, channel, body, envelope, properties): + """ + Generic Pulse consumer callback that detects all matching bus queues + to automatically route the pulse messages + """ + assert isinstance(body, bytes), "Body is not in bytes" + + # Build routing information to identify the payload source + routing = { + "exchange": envelope.exchange_name, + "key": envelope.routing_key, + "other_routes": properties.headers.get("CC", []), + } + + # Automatically decode json payloads + if properties.content_type == "application/json": + body = json.loads(body) + + # Push the message in the message bus + logger.debug("Received a pulse message") + routes = [routing["key"].encode("utf-8")] + routing["other_routes"] + for bus_queue in self.find_matching_queues(routing["exchange"], routes): + await self.bus.send(bus_queue, {"routing": routing, "body": body}) - return got_message + # Ack the message so it is removed from the broker's queue + await channel.basic_client_ack(delivery_tag=envelope.delivery_tag) From 02531fc6ec8969ed27bfa88f0a241285561f6019 Mon Sep 17 00:00:00 2001 From: Bastien Abadie Date: Tue, 3 Dec 2019 12:47:08 +0100 Subject: [PATCH 3/4] Add unit test for matching mechanism --- tests/test_pulse.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 tests/test_pulse.py diff --git a/tests/test_pulse.py b/tests/test_pulse.py new file mode 100644 index 0000000..534b6ab --- /dev/null +++ b/tests/test_pulse.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +from libmozevent.pulse import PulseListener + + +def test_matching_routes(): + """ + Test the queue detection for a pulse message + """ + config = { + "queue_A": [("exchange/x", ["#"])], + "queue_B": [("exchange/x", ["prefix.#"]), ("exchange/y", ["#-gecko-#"])], + } + pulse = PulseListener(config, "user", "password") + + # Simple test case + assert pulse.find_matching_queues("exchange/x", [b"whatever"]) == {"queue_A"} + + # Bad exchange + assert not pulse.find_matching_queues("exchange/YY", [b"whatever"]) + + # get A and B + assert pulse.find_matching_queues("exchange/x", [b"prefix.sometask.XYZ"]) == { + "queue_A", + "queue_B", + } + + # Only gecko + assert pulse.find_matching_queues("exchange/y", [b"something-gecko-123"]) == { + "queue_B" + } From 033f77690bd6d10aa02fd0050f0fb948c76214d6 Mon Sep 17 00:00:00 2001 From: Bastien Abadie Date: Tue, 3 Dec 2019 13:04:38 +0100 Subject: [PATCH 4/4] pulse: Add bugbug test cases --- tests/test_pulse.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/test_pulse.py b/tests/test_pulse.py index 534b6ab..323b95b 100644 --- a/tests/test_pulse.py +++ b/tests/test_pulse.py @@ -29,3 +29,42 @@ def test_matching_routes(): assert pulse.find_matching_queues("exchange/y", [b"something-gecko-123"]) == { "queue_B" } + + +def test_bugbug_routes(): + """ + Test the bugbug routes match our expected configuration + """ + config = { + "bugbug_firefox": [ + ("exchange/taskcluster-queue/v1/task-completed", ["#.gecko-level-1.#"]), + ("exchange/taskcluster-queue/v1/task-failed", ["#.gecko-level-1.#"]), + ], + "bugbug_community": [ + ( + "exchange/taskcluster-queue/v1/task-completed", + ["route.project.relman.bugbug.test_select"], + ) + ], + } + pulse = PulseListener(config, "user", "password") + + assert pulse.find_matching_queues( + "exchange/taskcluster-queue/v1/task-completed", + [ + b"primary.fUAKaIdkSF6K1NlOgx7-LA.0.aws.i-0a45c84b1709af6a7.gecko-t.t-win10-64.gecko-level-1.RHY-YSgBQ7KlTAaQ5ZWP5g._", + b"route.tc-treeherder.v2.try.028980a035fb3e214f7645675a01a52234aad0fe.455891", + ], + ) == {"bugbug_firefox"} + + assert pulse.find_matching_queues( + "exchange/taskcluster-queue/v1/task-completed", + [ + b"primary.OhtlizLqT9ah2jVkUL-yvg.0.community-tc-workers-google.8155538221748661937.proj-relman.compute-large.-.OhtlizLqT9ah2jVkUL-yvg._", + b"route.notify.email.release-mgmt-analysis@mozilla.com.on-failed", + b"route.notify.irc-channel.#bugbug.on-failed", + b"route.index.project.relman.bugbug.test_select.latest", + b"route.index.project.relman.bugbug.test_select.diff.196676", + b"route.project.relman.bugbug.test_select", + ], + ) == {"bugbug_community"}