5 changes: 3 additions & 2 deletions .github/workflows/update_test_file_report.yml
@@ -30,9 +30,10 @@ jobs:
CLICKHOUSE_USERNAME: ${{ secrets.CLICKHOUSE_HUD_USER_USERNAME }}
CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_HUD_USER_PASSWORD }}
run: |
YESTERDAY=$(date -u -d "yesterday" +%Y-%m-%d)
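# Report on the window from two days ago up to now, passed as epoch seconds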
now=$(date +%s)
TWO_DAYS_AGO=$((now - 2*86400))
python3 tools/torchci/test_insights/file_report_generator.py \
--add-dates "$YESTERDAY"
--add-dates "$TWO_DAYS_AGO" "$now"

- name: Generate regression notification
env:
211 changes: 91 additions & 120 deletions tools/torchci/test_insights/daily_regression.py
@@ -1,10 +1,8 @@
import datetime
import json
from collections import defaultdict
from functools import lru_cache
from typing import Any
from urllib.parse import quote

import requests
from torchci.test_insights.file_report_generator import FileReportGenerator
from torchci.test_insights.weekly_notification import send_to_aws_alerting_lambda

@@ -88,44 +86,24 @@ class RegressionNotification:
previous_regression_sha_key = (
"additional_info/weekly_file_report/regression_metadata.json.gz"
)
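# Metrics that are summed per team and compared between the two time windows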
keys = [
"cost",
"time",
"skipped",
"success",
"failure",
"flaky",
]

def __init__(self):
self.file_report_generator = FileReportGenerator(dry_run=True)

@lru_cache
def _previous_regression_sha(self) -> str:
text = self.file_report_generator._fetch_from_s3(
"ossci-raw-job-status",
self.previous_regression_sha_key,
)
try:
return json.loads(text)[0]["sha"]
except (json.JSONDecodeError, IndexError, KeyError):
return ""

def upload_new_regression_sha(self, sha: str) -> None:
body = [{"sha": sha}]
self.file_report_generator.upload_to_s3(
body,
"ossci-raw-job-status",
self.previous_regression_sha_key,
)

def gen_regression_for_team(
self,
team: dict[str, Any],
prev_invoking_file_info: list[dict[str, Any]],
curr_invoking_file_info: list[dict[str, Any]],
status_changes: list[dict[str, Any]],
) -> dict[str, Any]:
relevant_status_changes = [
change for change in status_changes if team["condition"](change)
]
# Aggregate status changes
aggregated_status_changes = defaultdict(int)
for change in relevant_status_changes:
aggregated_status_changes[change["status"]] += 1

# Invoking_file_info diff
relevant_curr_invoking_file_info = [
info for info in curr_invoking_file_info if team["condition"](info)
@@ -141,40 +119,22 @@ def gen_regression_for_team(
]

def _sum_invoking_file_info(data: list[dict[str, Any]]) -> dict[str, Any]:
info = {
"count": sum(item["count"] for item in data),
"cost": sum(item["cost"] for item in data),
"time": sum(item["time"] for item in data),
"skipped": sum(item["skipped"] for item in data),
}
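# Sum each tracked metric across all files matching the team's condition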
info = {}
for key in self.keys:
info[key] = sum(item[key] for item in data)
return info

agg_prev_file_info = _sum_invoking_file_info(relevant_prev_invoking_file_info)
agg_curr_file_info = _sum_invoking_file_info(relevant_curr_invoking_file_info)

invoking_file_info_diff = {
"count": {
"previous": agg_prev_file_info["count"],
"current": agg_curr_file_info["count"],
},
"cost": {
"previous": agg_prev_file_info["cost"],
"current": agg_curr_file_info["cost"],
},
"time": {
"previous": agg_prev_file_info["time"],
"current": agg_curr_file_info["time"],
},
"skipped": {
"previous": agg_prev_file_info["skipped"],
"current": agg_curr_file_info["skipped"],
},
}
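# Map each metric to its previous and current totals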
invoking_file_info_diff = {}
for key in self.keys:
invoking_file_info_diff[key] = {
"previous": agg_prev_file_info[key],
"current": agg_curr_file_info[key],
}

return {
"status_changes": aggregated_status_changes,
"invoking_file_info": invoking_file_info_diff,
}
return invoking_file_info_diff

def filter_thresholds(self, regression: dict[str, Any]) -> bool:
def _exceeds_threshold(value: float, total: float) -> bool:
@@ -184,36 +144,20 @@ def _exceeds_threshold(value: float, total: float) -> bool:

return (value / total) >= percent_threshold

def _status_change_exceeds_threshold(field: str, total_field: str) -> bool:
return _exceeds_threshold(
regression["status_changes"].get(field, 0),
regression["invoking_file_info"][total_field]["previous"],
)

def _diff_exceeds_threshold(field: str) -> bool:
return _exceeds_threshold(
abs(
regression["invoking_file_info"][field]["current"]
- regression["invoking_file_info"][field]["previous"]
),
regression["invoking_file_info"][field]["previous"],
abs(regression[field]["current"] - regression[field]["previous"]),
regression[field]["previous"],
)

return (
_status_change_exceeds_threshold("removed", "count")
or _status_change_exceeds_threshold("added", "count")
or _status_change_exceeds_threshold("started_skipping", "skipped")
or _status_change_exceeds_threshold("stopped_skipping", "skipped")
or any(
_diff_exceeds_threshold(key)
for key in ["cost", "count", "skipped", "time"]
)
)
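# Check every tracked metric except "flaky", which is excluded from alerting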
keys = self.keys.copy()
keys.remove("flaky")
return any(_diff_exceeds_threshold(key) for key in keys)

def format_regression_string(self, team, regression: dict[str, Any]) -> str:
def _get_change(field: str, additional_processing) -> str:
current = regression["invoking_file_info"][field]["current"]
previous = regression["invoking_file_info"][field]["previous"]
current = regression[field]["current"]
previous = regression[field]["previous"]
change = current - previous
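# Percent change relative to previous; report 0 when previous is 0 to avoid dividing by zero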
percent_change = (change / previous) * 100 if previous != 0 else 0
percent_change = round(percent_change, 2)
@@ -226,14 +170,12 @@ def _get_change(field: str, additional_processing) -> str:
return (
f"Regression detected for Team:{team['team']}:\n"
+ f"Link: {team['link']}\n"
+ f"New tests: {regression['status_changes'].get('added', 0)}\n"
+ f"Removed tests: {regression['status_changes'].get('removed', 0)}\n"
+ f"Started skipping: {regression['status_changes'].get('started_skipping', 0)}\n"
+ f"Stopped skipping: {regression['status_changes'].get('stopped_skipping', 0)}\n"
+ f"Cost ($) change: {_get_change('cost', additional_processing=lambda x: round(x, 2))}\n"
+ f"Time (min) change: {_get_change('time', additional_processing=lambda x: round(x / 60, 2))}\n"
+ f"Test count change: {_get_change('count', additional_processing=lambda x: round(x, 2))}\n"
+ f"\\# skipped change: {_get_change('skipped', additional_processing=lambda x: round(x, 2))}\n"
+ f"\\# success change: {_get_change('success', additional_processing=lambda x: round(x, 2))}\n"
+ f"\\# failure change: {_get_change('failure', additional_processing=lambda x: round(x, 2))}\n"
# + f"\\# flaky change: {_get_change('flaky', additional_processing=lambda x: round(x, 2))}\n"
)

def generate_alert_json(
@@ -265,50 +207,80 @@ def generate_alert_json(
},
}

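# Fetch the file report for the given window and reduce it to one
# representative (median) row per (job name, file) pair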
def get_representative_data_for_time(
self, start_date, stop_date
) -> list[dict[str, Any]]:
response = requests.get(
f"https://hud.pytorch.org/api/flaky-tests/fileReport?startDate={start_date}&endDate={stop_date}"
)

if response.status_code != 200:
raise RuntimeError(
f"Failed to fetch file report data: {response.status_code} {response.text}"
)
data = response.json()
results = data["results"]
cost_info = data["costInfo"]
shas = data["shas"]
test_owner_labels = data["testOwnerLabels"]

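# Join cost, owner-label, and commit metadata onto each result row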
for row in results:
cost_match = next((r for r in cost_info if r["label"] == row["label"]), None)
owner_labels = next(
(r for r in test_owner_labels if r["file"] == f"{row['file']}.py"), None
)
commit = next((s for s in shas if s["sha"] == row["sha"]), None)
row["cost"] = (
row["time"] * (cost_match["price_per_hour"] if cost_match else 0)
) / 3600
row["short_job_name"] = f"{row['workflow_name']} / {row['job_name']}"
row["labels"] = owner_labels["owner_labels"] if owner_labels else ["unknown"]
row["push_date"] = commit["push_date"] if commit else 0
row["sha"] = commit["sha"] if commit else "unknown"

# reduce to one representative (median) row per job name + file
if not results:
raise RuntimeError("No data found for the given time range.")

# group by job name, file
grouped_data: dict[str, list[dict[str, Any]]] = {}
for row in results:
key = f"{row['short_job_name']}|{row['file']}"
if key not in grouped_data:
grouped_data[key] = []
grouped_data[key].append(row)

# get median for each job name, file
representative_data: list[dict[str, Any]] = []
for key, rows in grouped_data.items():
median_row = sorted(
rows,
key=lambda x: (x["failure"], x["flaky"], x["skipped"], x["success"]),
)[len(rows) // 2]
representative_data.append(median_row)
return representative_data

def determine_regressions(self) -> None:
"""
Determine regressions in the test data for each team and send an alert
when a team's changes exceed the configured thresholds.
"""
previous_regression_sha = self._previous_regression_sha()
metadata = self.file_report_generator.fetch_existing_metadata()
curr_sha = metadata[-1]

current_sha = curr_sha["sha"]
if previous_regression_sha == current_sha:
print(f"No new reports since last report: {previous_regression_sha}")
return
prev_sha = metadata[-2]["sha"]

status_changes = self.file_report_generator.get_status_changes(
sha1=prev_sha,
sha2=current_sha,
sha2_push_date=curr_sha["push_date"],
# Representative data for commits pushed between 1d5h ago and 5h ago
current_data = self.get_representative_data_for_time(
datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 29,
datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 5,
)

def _s3_to_json(bucket: str, key: str) -> Any:
text = self.file_report_generator._fetch_from_s3(bucket, key)
data = []
for line in text.splitlines():
data.append(json.loads(line))

return data

previous_sha_invoking_file_info = _s3_to_json(
"ossci-raw-job-status",
f"additional_info/weekly_file_report/data_{prev_sha}.json.gz",
)
current_sha_invoking_file_info = _s3_to_json(
"ossci-raw-job-status",
f"additional_info/weekly_file_report/data_{current_sha}.json.gz",
yesterday_data = self.get_representative_data_for_time(
datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 53,
datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 29,
)

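# Compare the two windows per team and alert when thresholds are exceeded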
for team in CONFIG:
change = self.gen_regression_for_team(
team=team,
prev_invoking_file_info=previous_sha_invoking_file_info,
curr_invoking_file_info=current_sha_invoking_file_info,
status_changes=status_changes,
prev_invoking_file_info=yesterday_data,
curr_invoking_file_info=current_data,
)
if self.filter_thresholds(change):
print(f"Regression detected for team: {team['team']}")
@@ -322,7 +294,6 @@ def _s3_to_json(bucket: str, key: str) -> Any:
send_to_aws_alerting_lambda(alert)
else:
print(f"No significant regression for team: {team['team']}")
self.upload_new_regression_sha(current_sha)


if __name__ == "__main__":