Add a new command to check backfills completion on last pushes #736

Merged · 4 commits · May 6, 2022
2 changes: 2 additions & 0 deletions mozci/console/application.py
@@ -4,12 +4,14 @@

from cleo import Application

from mozci.console.commands.check_backfills import CheckBackfillsCommand
from mozci.console.commands.decision import DecisionCommand
from mozci.console.commands.push import PushCommands


def cli():
    application = Application()
    application.add(CheckBackfillsCommand())
    application.add(PushCommands())
    application.add(DecisionCommand())
    application.run()
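
Worth noting: cleo derives the command name, "check-backfills", from the signature block of the new command's docstring (shown in the next file), so the application.add() call above is all the wiring needed. A minimal sketch of driving it programmatically, assuming the package's console entry point simply calls cli(), with illustrative argument values:

import sys

from mozci.console.application import cli

# Equivalent to running `mozci check-backfills --branch=autoland --nb-pushes=50`
# from a shell; cleo reads sys.argv and dispatches to CheckBackfillsCommand.
sys.argv = ["mozci", "check-backfills", "--branch=autoland", "--nb-pushes=50"]
cli()
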
167 changes: 167 additions & 0 deletions mozci/console/commands/check_backfills.py
@@ -0,0 +1,167 @@
# -*- coding: utf-8 -*-
import os
from collections import namedtuple
from itertools import groupby

import requests
from cleo import Command
from loguru import logger

from mozci import config
from mozci.push import make_push_objects
from mozci.util.taskcluster import (
    COMMUNITY_TASKCLUSTER_ROOT_URL,
    find_task_id,
    index_current_task,
    list_dependent_tasks,
    list_indexed_tasks,
    notify_matrix,
)

BackfillTask = namedtuple("BackfillTask", ["task_id", "th_symbol", "state"])

NOTIFICATION_BACKFILL_GROUP_COMPLETED = "Backfill tasks associated with the Treeherder symbol {th_symbol} for push {push.branch}/{push.rev} are all completed."


class CheckBackfillsCommand(Command):
    """
    Check if backfills on last pushes are finished and notify Sheriffs when they are.

    check-backfills
        {--branch=autoland : Branch the pushes belong to (e.g. autoland, try, etc.).}
        {--nb-pushes=100 : Number of recent pushes to retrieve for the check.}
        {--environment=testing : Environment in which the analysis is running (testing, production, ...)}

    The command will execute the following workflow:
      1. Retrieve the last <--nb-pushes> Pushes on branch <--branch>
      => Then, for each Push:
        2. Check if it has any associated actions triggered by Treeherder, using Taskcluster indexing
        3. Find potential backfill tasks
        4. Group them by backfill group
        => For each backfill group on the Push:
          5. Check if all backfill tasks in this group are completed
          6. If so (and if the notification wasn't already sent), send a notification on Matrix announcing that this backfill group is completed
          7. Add the current task to a dedicated index to avoid sending the same notification multiple times
    """

    def handle(self) -> None:
        branch = self.option("branch")
        environment = self.option("environment")
        matrix_room = config.get("matrix-room-id")
        current_task_id = os.environ.get("TASK_ID")

        try:
            nb_pushes = int(self.option("nb-pushes"))
        except ValueError:
            self.line("<error>Provided --nb-pushes should be an int.</error>")
            exit(1)

        self.line("<comment>Loading pushes...</comment>")
        self.pushes = make_push_objects(nb=nb_pushes, branch=branch)
        nb_pushes = len(self.pushes)

        for index, push in enumerate(self.pushes, start=1):
            self.line(
                f"<comment>Processing push {index}/{nb_pushes}: {push.push_uuid}</comment>"
            )
            backfill_tasks = []

            try:
                # Materialize the generator so HTTP errors surface here, where
                # they can be caught, rather than during iteration below
                indexed_tasks = list(
                    list_indexed_tasks(
                        f"gecko.v2.{push.branch}.revision.{push.rev}.taskgraph.actions"
                    )
                )
            except requests.exceptions.HTTPError as e:
                self.line(
                    f"<error>Couldn't fetch indexed tasks on push {push.push_uuid}: {e}</error>"
                )
                continue

            for indexed_task in indexed_tasks:
                task_id = indexed_task["taskId"]
                try:
                    # Materialized for the same reason as above
                    children_tasks = list(list_dependent_tasks(task_id))
                except requests.exceptions.HTTPError as e:
                    self.line(
                        f"<error>Couldn't fetch dependent tasks of indexed task {task_id} on push {push.push_uuid}: {e}</error>"
                    )
                    continue

                for child_task in children_tasks:
                    task_action = (
                        child_task.get("task", {}).get("tags", {}).get("action", "")
                    )
                    # We look for the Treeherder symbol because Sheriffs are only
                    # interested in backfill tasks carrying the '-bk' suffix in Treeherder
                    th_symbol = (
                        child_task.get("task", {})
                        .get("extra", {})
                        .get("treeherder", {})
                        .get("symbol", "")
                    )
                    status = child_task.get("status", {})
                    if task_action == "backfill-task" and th_symbol.endswith("-bk"):
                        assert status.get(
                            "taskId"
                        ), "Missing taskId attribute in backfill task status"
                        assert status.get(
                            "state"
                        ), "Missing state attribute in backfill task status"
                        backfill_tasks.append(
                            BackfillTask(status["taskId"], th_symbol, status["state"])
                        )
                    else:
                        logger.debug(
                            f"Skipping non-backfill task {status.get('taskId')}"
                        )

            def group_key(task):
                return task.th_symbol

            # itertools.groupby only merges consecutive items, so sort the
            # backfill tasks by their Treeherder symbol before grouping them
            backfill_tasks = sorted(backfill_tasks, key=group_key)
            for th_symbol, value in groupby(backfill_tasks, group_key):
                tasks = list(value)
                if all(task.state == "completed" for task in tasks):
                    index_path = f"project.mozci.check-backfill.{environment}.{push.branch}.{push.rev}.{th_symbol}"
                    try:
                        find_task_id(
                            index_path, root_url=COMMUNITY_TASKCLUSTER_ROOT_URL
                        )
                    except requests.exceptions.HTTPError:
                        pass
                    else:
                        logger.debug(
                            f"A notification was already sent for the backfill tasks associated with the Treeherder symbol {th_symbol}."
                        )
                        continue

                    notification = NOTIFICATION_BACKFILL_GROUP_COMPLETED.format(
                        th_symbol=th_symbol,
                        push=push,
                    )

                    if not matrix_room:
                        self.line(
                            f"<comment>A notification should be sent for the backfill tasks associated with the Treeherder symbol {th_symbol} but no Matrix room was provided in the secret.</comment>"
                        )
                        logger.debug(f"The notification: {notification}")
                        continue

                    # Send a notification to the Matrix channel defined in the secret
                    notify_matrix(
                        room=matrix_room,
                        body=notification,
                    )

                    if not current_task_id:
                        self.line(
                            f"<comment>The current task should be indexed in {index_path} but the TASK_ID environment variable isn't set.</comment>"
                        )
                        continue

                    # Populate the index with the current task to prevent sending
                    # the same notification again
                    index_current_task(
                        index_path,
                        root_url=COMMUNITY_TASKCLUSTER_ROOT_URL,
                    )
59 changes: 59 additions & 0 deletions mozci/util/taskcluster.py
@@ -3,7 +3,9 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import datetime
import os
from urllib.parse import urlencode

import markdown2
import taskcluster
@@ -81,6 +83,48 @@ def find_task_id(index_path, use_proxy=False, root_url=PRODUCTION_TASKCLUSTER_ROOT_URL):
    return response.json()["taskId"]


def index_current_task(
    index_path,
    rank=0,
    expires=None,
    data=None,
    root_url=PRODUCTION_TASKCLUSTER_ROOT_URL,
):
    # Default to None to avoid sharing a mutable dict between calls
    if data is None:
        data = {}
    if expires is None:
        expires = datetime.datetime.now() + datetime.timedelta(days=365)

    response = _do_request(
        get_index_url(index_path, root_url=root_url),
        data={
            "data": data,
            "expires": expires,
            "rank": rank,
            "taskId": os.environ["TASK_ID"],
        },
    )
    return response.json()
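
For illustration, this is roughly how check-backfills consumes the helper above once a backfill group is complete. The namespace segments are placeholders, and TASK_ID is faked here since the helper reads it from the environment:

import os

from mozci.util.taskcluster import (
    COMMUNITY_TASKCLUSTER_ROOT_URL,
    index_current_task,
)

# Taskcluster sets TASK_ID for running tasks; fake it so the sketch is
# self-contained outside a worker.
os.environ.setdefault("TASK_ID", "abcdef0123456789")

# Placeholder <environment>.<branch>.<revision>.<symbol> segments.
index_current_task(
    "project.mozci.check-backfill.testing.autoland.0123456789ab.Wd1-bk",
    root_url=COMMUNITY_TASKCLUSTER_ROOT_URL,
)
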


def get_indexed_tasks_url(namespace, root_url=PRODUCTION_TASKCLUSTER_ROOT_URL):
    return liburls.api(
        root_url,
        "index",
        "v1",
        f"tasks/{namespace}",
    )


def list_indexed_tasks(namespace, root_url=PRODUCTION_TASKCLUSTER_ROOT_URL):
    url = get_indexed_tasks_url(namespace, root_url=root_url)
    # False marks the first page (requested without a continuation token);
    # the loop stops once the API stops returning a continuationToken
    token = False
    while token is not None:
        extra_params = "?" + urlencode({"continuationToken": token}) if token else ""
        results = _do_request(url + extra_params).json()
        yield from results.get("tasks", [])
        token = results.get("continuationToken")


def get_task_url(task_id):
    return liburls.api(
        PRODUCTION_TASKCLUSTER_ROOT_URL, "queue", "v1", f"task/{task_id}"
@@ -91,6 +135,21 @@ def get_task(task_id, use_proxy=False):
    return queue.task(task_id)


def get_dependent_tasks_url(task_id, root_url=PRODUCTION_TASKCLUSTER_ROOT_URL):
    return liburls.api(root_url, "queue", "v1", f"task/{task_id}/dependents")


def list_dependent_tasks(task_id, root_url=PRODUCTION_TASKCLUSTER_ROOT_URL):
    url = get_dependent_tasks_url(task_id, root_url=root_url)
    # Same pagination scheme as list_indexed_tasks: False means "first page",
    # None means the API returned no further continuationToken
    token = False
    while token is not None:
        extra_params = "?" + urlencode({"continuationToken": token}) if token else ""
        results = _do_request(url + extra_params).json()
        yield from results.get("tasks", [])
        token = results.get("continuationToken")


def create_task(task_id, task):
    options = taskcluster.optionsFromEnvironment()
    options["rootUrl"] = PRODUCTION_TASKCLUSTER_ROOT_URL
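
Both listing helpers are generators, so pagination stays transparent to callers. A hedged usage sketch tying them together, mirroring what check-backfills does per push (branch and revision are placeholders):

from mozci.util.taskcluster import list_dependent_tasks, list_indexed_tasks

# Placeholder branch/revision; check-backfills builds this namespace per push.
namespace = "gecko.v2.autoland.revision.0123456789ab.taskgraph.actions"

# Each generator keeps requesting pages with the continuation token until the
# API stops returning one, so callers simply iterate.
for indexed_task in list_indexed_tasks(namespace):
    for child in list_dependent_tasks(indexed_task["taskId"]):
        action = child.get("task", {}).get("tags", {}).get("action", "")
        if action == "backfill-task":
            status = child.get("status", {})
            print(status.get("taskId"), status.get("state"))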