Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions libmozevent/pulse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import asyncio
import fnmatch
import itertools
import json
from typing import List, Tuple
from typing import Dict, List, Tuple

import aioamqp
import structlog

logger = structlog.get_logger(__name__)

BusQueue = str
Queue = str
Route = str
PulseBinding = Tuple[Queue, List[Route]]
Expand Down Expand Up @@ -92,27 +95,26 @@ class PulseListener(object):

def __init__(
self,
output_queue_name,
queues_routes: List[PulseBinding],
queues_routes: Dict[BusQueue, List[PulseBinding]],
user,
password,
virtualhost="/",
):
self.queue_name = output_queue_name
self.queues_routes = queues_routes
self.user = user
self.password = password
self.virtualhost = virtualhost

def register(self, bus):
self.bus = bus
self.bus.add_queue(self.queue_name)
for name in self.queues_routes:
self.bus.add_queue(name)

async def connect(self):
protocol = await create_pulse_listener(
self.user,
self.password,
self.queues_routes,
itertools.chain(*self.queues_routes.values()),
self.got_message,
self.virtualhost,
)
Expand All @@ -137,9 +139,33 @@ async def run(self):
pulse = None
await asyncio.sleep(5)

def find_matching_queues(self, payload_exchange: str, payload_routes: List[bytes]):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd probably be better to use another of the solutions (especially the multiple channels one), but we can do it in a follow-up. Can you file it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""
Detect all the bus that match the current routing
"""

def _match(exchange, route):

# Exchanges must match exactly
if payload_exchange != exchange:
return False

# One of the pauload routes must match the current route
route = route.replace("#", "*").encode("utf-8")
return len(fnmatch.filter(payload_routes, route)) > 0

return {
bus_queue
for bus_queue, queue_routes in self.queues_routes.items()
for pulse_exchange, pulse_routes in queue_routes
for pulse_route in pulse_routes
if _match(pulse_exchange, pulse_route)
}

async def got_message(self, channel, body, envelope, properties):
"""
Generic Pulse consumer callback
Generic Pulse consumer callback that detects all matching bus queues
to automatically route the pulse messages
"""
assert isinstance(body, bytes), "Body is not in bytes"

Expand All @@ -156,7 +182,9 @@ async def got_message(self, channel, body, envelope, properties):

# Push the message in the message bus
logger.debug("Received a pulse message")
await self.bus.send(self.queue_name, {"routing": routing, "body": body})
routes = [routing["key"].encode("utf-8")] + routing["other_routes"]
for bus_queue in self.find_matching_queues(routing["exchange"], routes):
await self.bus.send(bus_queue, {"routing": routing, "body": body})

# Ack the message so it is removed from the broker's queue
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
70 changes: 70 additions & 0 deletions tests/test_pulse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-

from libmozevent.pulse import PulseListener


def test_matching_routes():
Comment thread
La0 marked this conversation as resolved.
"""
Test the queue detection for a pulse message
"""
config = {
"queue_A": [("exchange/x", ["#"])],
"queue_B": [("exchange/x", ["prefix.#"]), ("exchange/y", ["#-gecko-#"])],
}
pulse = PulseListener(config, "user", "password")

# Simple test case
assert pulse.find_matching_queues("exchange/x", [b"whatever"]) == {"queue_A"}

# Bad exchange
assert not pulse.find_matching_queues("exchange/YY", [b"whatever"])

# get A and B
assert pulse.find_matching_queues("exchange/x", [b"prefix.sometask.XYZ"]) == {
"queue_A",
"queue_B",
}

# Only gecko
assert pulse.find_matching_queues("exchange/y", [b"something-gecko-123"]) == {
"queue_B"
}


def test_bugbug_routes():
"""
Test the bugbug routes match our expected configuration
"""
config = {
"bugbug_firefox": [
("exchange/taskcluster-queue/v1/task-completed", ["#.gecko-level-1.#"]),
("exchange/taskcluster-queue/v1/task-failed", ["#.gecko-level-1.#"]),
],
"bugbug_community": [
(
"exchange/taskcluster-queue/v1/task-completed",
["route.project.relman.bugbug.test_select"],
)
],
}
pulse = PulseListener(config, "user", "password")

assert pulse.find_matching_queues(
"exchange/taskcluster-queue/v1/task-completed",
[
b"primary.fUAKaIdkSF6K1NlOgx7-LA.0.aws.i-0a45c84b1709af6a7.gecko-t.t-win10-64.gecko-level-1.RHY-YSgBQ7KlTAaQ5ZWP5g._",
b"route.tc-treeherder.v2.try.028980a035fb3e214f7645675a01a52234aad0fe.455891",
],
) == {"bugbug_firefox"}

assert pulse.find_matching_queues(
"exchange/taskcluster-queue/v1/task-completed",
[
b"primary.OhtlizLqT9ah2jVkUL-yvg.0.community-tc-workers-google.8155538221748661937.proj-relman.compute-large.-.OhtlizLqT9ah2jVkUL-yvg._",
b"route.notify.email.release-mgmt-analysis@mozilla.com.on-failed",
b"route.notify.irc-channel.#bugbug.on-failed",
b"route.index.project.relman.bugbug.test_select.latest",
b"route.index.project.relman.bugbug.test_select.diff.196676",
b"route.project.relman.bugbug.test_select",
],
) == {"bugbug_community"}