Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions events/code_review_events/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ def main():
"events",
required=("admins", "PHABRICATOR", "repositories"),
existing=dict(
APP_CHANNEL="development",
admins=["babadie@mozilla.com", "mcastelluccio@mozilla.com"],
repositories=[],
user_blacklist=[],
autoland_enabled=False,
),
local_secrets=yaml.safe_load(args.configuration)
if args.configuration
Expand Down
89 changes: 68 additions & 21 deletions events/code_review_events/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

logger = structlog.get_logger(__name__)

PULSE_TASK_GROUP_RESOLVED = "exchange/taskcluster-queue/v1/task-group-resolved"
PULSE_TASK_COMPLETED = "exchange/taskcluster-queue/v1/task-completed"


class CodeReview(PhabricatorActions):
"""
Expand All @@ -43,7 +46,7 @@ def __init__(
community_config=None,
user_blacklist=[],
*args,
**kwargs
**kwargs,
):
super().__init__(*args, **kwargs)
self.publish = publish
Expand Down Expand Up @@ -167,7 +170,10 @@ def is_blacklisted(self, revision: dict):
return True

def publish_results(self, payload):
assert self.publish is True, "Publication disabled"
if not self.publish:
Comment thread
La0 marked this conversation as resolved.
logger.debug("Skipping Phabricator publication")
return

mode, build, extras = payload
logger.debug("Publishing a Phabricator build update", mode=mode, build=build)

Expand Down Expand Up @@ -219,7 +225,45 @@ def publish_results(self, payload):
return True

def parse_pulse(self, payload):
assert self.publish is True, "Publication disabled"

routing = payload["routing"]

# Process autoland payloads
if routing["exchange"] == PULSE_TASK_GROUP_RESOLVED:
Comment thread
La0 marked this conversation as resolved.
try:
self.trigger_autoland(payload["body"])
Comment thread
La0 marked this conversation as resolved.
except Exception as e:
logger.warn(
"Autoland trigger failure", key=routing["key"], error=str(e)
)
else:
logger.debug("Skipping pulse message", key=routing["key"])

def trigger_autoland(self, body: dict):
"""
Trigger a code review autoland ingestion task
If the task is an autoland decision task
"""
# Load first task in task group, check if it's an autoland
queue = taskcluster_config.get_service("queue")
task_group_id = body["taskGroupId"]
logger.info("Checking autoland task", task_group_id=task_group_id)
task = queue.task(task_group_id)
Comment thread
La0 marked this conversation as resolved.
repo = task["payload"]["env"].get("GECKO_HEAD_REPOSITORY")
if repo != "https://hg.mozilla.org/integration/autoland":
logger.info("Not an autoland task", task=task_group_id)
return

# Trigger the autoland ingestion task
env = taskcluster_config.secrets["APP_CHANNEL"]
hooks = taskcluster_config.get_service("hooks")
task = hooks.triggerHook(
"project-relman",
f"code-review-{env}",
{"AUTOLAND_TASK_GROUP_ID": task_group_id},
)
task_id = task["status"]["taskId"]
logger.info("Triggered a new autoland ingestion task", id=task_id)

async def start_risk_analysis(self, build):
"""
Expand Down Expand Up @@ -314,25 +358,31 @@ def __init__(self, cache_root):
self.webserver = WebServer(QUEUE_WEB_BUILDS)
self.webserver.register(self.bus)

# Create pulse listener for unit test failures
# Create pulse listener
exchanges = []
if taskcluster_config.secrets["autoland_enabled"]:
logger.info("Autoland ingestion is enabled")
exchanges += [
# autoland ingestion
(PULSE_TASK_GROUP_RESOLVED, "#.gecko-level-3.#")
]
if publish:
# unit test failures
exchanges += [(PULSE_TASK_COMPLETED, ["*.*.gecko-level-3._"])]

if exchanges:
self.pulse = PulseListener(
QUEUE_PULSE,
[
(
"exchange/taskcluster-queue/v1/task-completed",
["*.*.gecko-level-3._"],
)
],
exchanges,
taskcluster_config.secrets["pulse_user"],
taskcluster_config.secrets["pulse_password"],
)

# Manually register to set queue as redis
self.pulse.bus = self.bus
self.bus.add_queue(QUEUE_PULSE, redis=True)
else:
self.pulse = None
self.bus.add_queue(QUEUE_PULSE, redis=True)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we only need this when we set pulse, as it was before?

else:
self.webserver = None
self.pulse = None
Expand Down Expand Up @@ -386,16 +436,13 @@ def run(self):

# Code review main workflow
if self.workflow:
consumers.append(self.workflow.run())

# Publish results on Phabricator
if self.workflow.publish:
consumers += [
self.bus.run(
self.workflow.publish_results, QUEUE_PHABRICATOR_RESULTS
),
self.bus.run(self.workflow.parse_pulse, QUEUE_PULSE),
]
consumers += [
self.workflow.run(),
# Publish results on Phabricator
self.bus.run(self.workflow.publish_results, QUEUE_PHABRICATOR_RESULTS),
# Parse and redirect pulse messages
self.bus.run(self.workflow.parse_pulse, QUEUE_PULSE),
]

# Add mercurial task
if self.mercurial:
Expand Down