diff --git a/events/code_review_events/cli.py b/events/code_review_events/cli.py index fe6cc0b65..775b066e7 100644 --- a/events/code_review_events/cli.py +++ b/events/code_review_events/cli.py @@ -47,9 +47,11 @@ def main(): "events", required=("admins", "PHABRICATOR", "repositories"), existing=dict( + APP_CHANNEL="development", admins=["babadie@mozilla.com", "mcastelluccio@mozilla.com"], repositories=[], user_blacklist=[], + autoland_enabled=False, ), local_secrets=yaml.safe_load(args.configuration) if args.configuration diff --git a/events/code_review_events/workflow.py b/events/code_review_events/workflow.py index e2a652cb2..0e8a6b92a 100644 --- a/events/code_review_events/workflow.py +++ b/events/code_review_events/workflow.py @@ -29,6 +29,9 @@ logger = structlog.get_logger(__name__) +PULSE_TASK_GROUP_RESOLVED = "exchange/taskcluster-queue/v1/task-group-resolved" +PULSE_TASK_COMPLETED = "exchange/taskcluster-queue/v1/task-completed" + class CodeReview(PhabricatorActions): """ @@ -43,7 +46,7 @@ def __init__( community_config=None, user_blacklist=[], *args, - **kwargs + **kwargs, ): super().__init__(*args, **kwargs) self.publish = publish @@ -167,7 +170,10 @@ def is_blacklisted(self, revision: dict): return True def publish_results(self, payload): - assert self.publish is True, "Publication disabled" + if not self.publish: + logger.debug("Skipping Phabricator publication") + return + mode, build, extras = payload logger.debug("Publishing a Phabricator build update", mode=mode, build=build) @@ -219,7 +225,45 @@ def publish_results(self, payload): return True def parse_pulse(self, payload): - assert self.publish is True, "Publication disabled" + + routing = payload["routing"] + + # Process autoland payloads + if routing["exchange"] == PULSE_TASK_GROUP_RESOLVED: + try: + self.trigger_autoland(payload["body"]) + except Exception as e: + logger.warn( + "Autoland trigger failure", key=routing["key"], error=str(e) + ) + else: + logger.debug("Skipping pulse message", key=routing["key"]) + + def trigger_autoland(self, body: dict): + """ + Trigger a code review autoland ingestion task + If the task is an autoland decision task + """ + # Load first task in task group, check if it's an autoland + queue = taskcluster_config.get_service("queue") + task_group_id = body["taskGroupId"] + logger.info("Checking autoland task", task_group_id=task_group_id) + task = queue.task(task_group_id) + repo = task["payload"]["env"].get("GECKO_HEAD_REPOSITORY") + if repo != "https://hg.mozilla.org/integration/autoland": + logger.info("Not an autoland task", task=task_group_id) + return + + # Trigger the autoland ingestion task + env = taskcluster_config.secrets["APP_CHANNEL"] + hooks = taskcluster_config.get_service("hooks") + task = hooks.triggerHook( + "project-relman", + f"code-review-{env}", + {"AUTOLAND_TASK_GROUP_ID": task_group_id}, + ) + task_id = task["status"]["taskId"] + logger.info("Triggered a new autoland ingestion task", id=task_id) async def start_risk_analysis(self, build): """ @@ -314,25 +358,31 @@ def __init__(self, cache_root): self.webserver = WebServer(QUEUE_WEB_BUILDS) self.webserver.register(self.bus) - # Create pulse listener for unit test failures + # Create pulse listener + exchanges = [] + if taskcluster_config.secrets["autoland_enabled"]: + logger.info("Autoland ingestion is enabled") + exchanges += [ + # autoland ingestion + (PULSE_TASK_GROUP_RESOLVED, "#.gecko-level-3.#") + ] if publish: + # unit test failures + exchanges += [(PULSE_TASK_COMPLETED, ["*.*.gecko-level-3._"])] + + if exchanges: self.pulse = PulseListener( QUEUE_PULSE, - [ - ( - "exchange/taskcluster-queue/v1/task-completed", - ["*.*.gecko-level-3._"], - ) - ], + exchanges, taskcluster_config.secrets["pulse_user"], taskcluster_config.secrets["pulse_password"], ) # Manually register to set queue as redis self.pulse.bus = self.bus - self.bus.add_queue(QUEUE_PULSE, redis=True) else: self.pulse = None + self.bus.add_queue(QUEUE_PULSE, redis=True) else: self.webserver = None self.pulse = None @@ -386,16 +436,13 @@ def run(self): # Code review main workflow if self.workflow: - consumers.append(self.workflow.run()) - - # Publish results on Phabricator - if self.workflow.publish: - consumers += [ - self.bus.run( - self.workflow.publish_results, QUEUE_PHABRICATOR_RESULTS - ), - self.bus.run(self.workflow.parse_pulse, QUEUE_PULSE), - ] + consumers += [ + self.workflow.run(), + # Publish results on Phabricator + self.bus.run(self.workflow.publish_results, QUEUE_PHABRICATOR_RESULTS), + # Parse and redirect pulse messages + self.bus.run(self.workflow.parse_pulse, QUEUE_PULSE), + ] # Add mercurial task if self.mercurial: