diff --git a/.github/workflows/update_test_file_report.yml b/.github/workflows/update_test_file_report.yml index 2f2014dbb0..e0f16a773f 100644 --- a/.github/workflows/update_test_file_report.yml +++ b/.github/workflows/update_test_file_report.yml @@ -30,9 +30,10 @@ jobs: CLICKHOUSE_USERNAME: ${{ secrets.CLICKHOUSE_HUD_USER_USERNAME }} CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_HUD_USER_PASSWORD }} run: | - YESTERDAY=$(date -u -d "yesterday" +%Y-%m-%d) + now=$(date +%s) + TWO_DAYS_AGO=$((now - 2*86400)) python3 tools/torchci/test_insights/file_report_generator.py \ - --add-dates "$YESTERDAY" + --add-dates "$TWO_DAYS_AGO" "$now" - name: Generate regression notification env: diff --git a/tools/torchci/test_insights/daily_regression.py b/tools/torchci/test_insights/daily_regression.py index 557f4a67ce..e6a881379f 100644 --- a/tools/torchci/test_insights/daily_regression.py +++ b/tools/torchci/test_insights/daily_regression.py @@ -1,10 +1,8 @@ import datetime -import json -from collections import defaultdict -from functools import lru_cache from typing import Any from urllib.parse import quote +import requests from torchci.test_insights.file_report_generator import FileReportGenerator from torchci.test_insights.weekly_notification import send_to_aws_alerting_lambda @@ -88,44 +86,24 @@ class RegressionNotification: previous_regression_sha_key = ( "additional_info/weekly_file_report/regression_metadata.json.gz" ) + keys = [ + "cost", + "time", + "skipped", + "success", + "failure", + "flaky", + ] def __init__(self): self.file_report_generator = FileReportGenerator(dry_run=True) - @lru_cache - def _previous_regression_sha(self) -> str: - text = self.file_report_generator._fetch_from_s3( - "ossci-raw-job-status", - self.previous_regression_sha_key, - ) - try: - return json.loads(text)[0]["sha"] - except (json.JSONDecodeError, IndexError, KeyError): - return "" - - def upload_new_regression_sha(self, sha: str) -> None: - body = [{"sha": sha}] - self.file_report_generator.upload_to_s3( - body, - "ossci-raw-job-status", - self.previous_regression_sha_key, - ) - def gen_regression_for_team( self, team: dict[str, Any], prev_invoking_file_info: list[dict[str, Any]], curr_invoking_file_info: list[dict[str, Any]], - status_changes: list[dict[str, Any]], ) -> dict[str, Any]: - relevant_status_changes = [ - change for change in status_changes if team["condition"](change) - ] - # Aggregate status changes - aggregated_status_changes = defaultdict(int) - for change in relevant_status_changes: - aggregated_status_changes[change["status"]] += 1 - # Invoking_file_info diff relevant_curr_invoking_file_info = [ info for info in curr_invoking_file_info if team["condition"](info) @@ -141,40 +119,22 @@ def gen_regression_for_team( ] def _sum_invoking_file_info(data: list[dict[str, Any]]) -> dict[str, Any]: - info = { - "count": sum(item["count"] for item in data), - "cost": sum(item["cost"] for item in data), - "time": sum(item["time"] for item in data), - "skipped": sum(item["skipped"] for item in data), - } + info = {} + for key in self.keys: + info[key] = sum(item[key] for item in data) return info agg_prev_file_info = _sum_invoking_file_info(relevant_prev_invoking_file_info) agg_curr_file_info = _sum_invoking_file_info(relevant_curr_invoking_file_info) - invoking_file_info_diff = { - "count": { - "previous": agg_prev_file_info["count"], - "current": agg_curr_file_info["count"], - }, - "cost": { - "previous": agg_prev_file_info["cost"], - "current": agg_curr_file_info["cost"], - }, - "time": { - "previous": 
agg_prev_file_info["time"], - "current": agg_curr_file_info["time"], - }, - "skipped": { - "previous": agg_prev_file_info["skipped"], - "current": agg_curr_file_info["skipped"], - }, - } + invoking_file_info_diff = {} + for key in self.keys: + invoking_file_info_diff[key] = { + "previous": agg_prev_file_info[key], + "current": agg_curr_file_info[key], + } - return { - "status_changes": aggregated_status_changes, - "invoking_file_info": invoking_file_info_diff, - } + return invoking_file_info_diff def filter_thresholds(self, regression: dict[str, Any]) -> bool: def _exceeds_threshold(value: float, total: float) -> bool: @@ -184,36 +144,20 @@ def _exceeds_threshold(value: float, total: float) -> bool: return (value / total) >= percent_threshold - def _status_change_exceeds_threshold(field: str, total_field: str) -> bool: - return _exceeds_threshold( - regression["status_changes"].get(field, 0), - regression["invoking_file_info"][total_field]["previous"], - ) - def _diff_exceeds_threshold(field: str) -> bool: return _exceeds_threshold( - abs( - regression["invoking_file_info"][field]["current"] - - regression["invoking_file_info"][field]["previous"] - ), - regression["invoking_file_info"][field]["previous"], + abs(regression[field]["current"] - regression[field]["previous"]), + regression[field]["previous"], ) - return ( - _status_change_exceeds_threshold("removed", "count") - or _status_change_exceeds_threshold("added", "count") - or _status_change_exceeds_threshold("started_skipping", "skipped") - or _status_change_exceeds_threshold("stopped_skipping", "skipped") - or any( - _diff_exceeds_threshold(key) - for key in ["cost", "count", "skipped", "time"] - ) - ) + keys = self.keys.copy() + keys.remove("flaky") + return any(_diff_exceeds_threshold(key) for key in keys) def format_regression_string(self, team, regression: dict[str, Any]) -> str: def _get_change(field: str, additional_processing) -> str: - current = regression["invoking_file_info"][field]["current"] - previous = regression["invoking_file_info"][field]["previous"] + current = regression[field]["current"] + previous = regression[field]["previous"] change = current - previous percent_change = (change / previous) * 100 if previous != 0 else 0 percent_change = round(percent_change, 2) @@ -226,14 +170,12 @@ def _get_change(field: str, additional_processing) -> str: return ( f"Regression detected for Team:{team['team']}:\n" + f"Link: {team['link']}\n" - + f"New tests: {regression['status_changes'].get('added', 0)}\n" - + f"Removed tests: {regression['status_changes'].get('removed', 0)}\n" - + f"Started skipping: {regression['status_changes'].get('started_skipping', 0)}\n" - + f"Stopped skipping: {regression['status_changes'].get('stopped_skipping', 0)}\n" + f"Cost ($) change: {_get_change('cost', additional_processing=lambda x: round(x, 2))}\n" + f"Time (min) change: {_get_change('time', additional_processing=lambda x: round(x / 60, 2))}\n" - + f"Test count change: {_get_change('count', additional_processing=lambda x: round(x, 2))}\n" + f"\\# skipped change: {_get_change('skipped', additional_processing=lambda x: round(x, 2))}\n" + + f"\\# success change: {_get_change('success', additional_processing=lambda x: round(x, 2))}\n" + + f"\\# failure change: {_get_change('failure', additional_processing=lambda x: round(x, 2))}\n" + # + f"\\# flaky change: {_get_change('flaky', additional_processing=lambda x: round(x, 2))}\n" ) def generate_alert_json( @@ -265,50 +207,80 @@ def generate_alert_json( }, } + def get_representative_data_for_time( 
+ self, start_date, stop_date + ) -> list[dict[str, Any]]: + response = requests.get( + f"https://hud.pytorch.org/api/flaky-tests/fileReport?startDate={start_date}&endDate={stop_date}" + ) + + if response.status_code != 200: + raise RuntimeError( + f"Failed to fetch file report data: {response.status_code} {response.text}" + ) + data = response.json() + results = data["results"] + costInfo = data["costInfo"] + shas = data["shas"] + testOwnerLabels = data["testOwnerLabels"] + + for row in results: + costMatch = next((r for r in costInfo if r["label"] == row["label"]), None) + ownerLabels = next( + (r for r in testOwnerLabels if r["file"] == f"{row['file']}.py"), None + ) + commit = next((s for s in shas if s["sha"] == row["sha"]), None) + row["cost"] = ( + row["time"] * (costMatch["price_per_hour"] if costMatch else 0) + ) / 3600 + row["short_job_name"] = f"{row['workflow_name']} / {row['job_name']}" + row["labels"] = ownerLabels["owner_labels"] if ownerLabels else ["unknown"] + row["push_date"] = commit["push_date"] if commit else 0 + row["sha"] = commit["sha"] if commit else "unknown" + + # choose a commit with the median number of rows + if not results: + raise RuntimeError("No data found for the given time range.") + + # group by job name, file + grouped_data: dict[str, list[dict[str, Any]]] = {} + for row in results: + key = f"{row['short_job_name']}|{row['file']}" + if key not in grouped_data: + grouped_data[key] = [] + grouped_data[key].append(row) + + # get median for each job name, file + representative_data: list[dict[str, Any]] = [] + for key, rows in grouped_data.items(): + median_row = sorted( + rows, + key=lambda x: (x["failure"], x["flaky"], x["skipped"], x["success"]), + )[len(rows) // 2] + representative_data.append(median_row) + return representative_data + def determine_regressions(self) -> None: """ Determine regressions in the test data based on the provided filter. Returns a list of regression entries. 
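For reference, a minimal standalone sketch of the median-row selection performed in get_representative_data_for_time above: rows from the HUD fileReport endpoint are grouped by (job, file), and the middle row after sorting by its failure/flaky/skipped/success counts is kept so a single noisy commit does not dominate. The sample rows below are hypothetical; the field names match the endpoint's results.

    from typing import Any

    def pick_representative(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
        grouped: dict[str, list[dict[str, Any]]] = {}
        for row in rows:
            grouped.setdefault(f"{row['short_job_name']}|{row['file']}", []).append(row)
        representative = []
        for group in grouped.values():
            # keep the middle row once sorted by counts; damps one-off outlier commits
            ordered = sorted(
                group, key=lambda r: (r["failure"], r["flaky"], r["skipped"], r["success"])
            )
            representative.append(ordered[len(ordered) // 2])
        return representative

    rows = [
        {"short_job_name": "pull / test (default)", "file": "test_ops", "failure": 0, "flaky": 1, "skipped": 3, "success": 120},
        {"short_job_name": "pull / test (default)", "file": "test_ops", "failure": 2, "flaky": 0, "skipped": 3, "success": 118},
        {"short_job_name": "pull / test (default)", "file": "test_ops", "failure": 0, "flaky": 0, "skipped": 3, "success": 121},
    ]
    print(pick_representative(rows))  # -> keeps the (failure=0, flaky=1) middle row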
""" - previous_regression_sha = self._previous_regression_sha() - metadata = self.file_report_generator.fetch_existing_metadata() - curr_sha = metadata[-1] - - current_sha = curr_sha["sha"] - if previous_regression_sha == current_sha: - print(f"No new reports since last report: {previous_regression_sha}") - return - prev_sha = metadata[-2]["sha"] - - status_changes = self.file_report_generator.get_status_changes( - sha1=prev_sha, - sha2=current_sha, - sha2_push_date=curr_sha["push_date"], + # Choose 5 commits between 5 hours ago and 1d5h ago + current_sha = self.get_representative_data_for_time( + datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 29, + datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 5, ) - def _s3_to_json(bucket: str, key: str) -> Any: - text = self.file_report_generator._fetch_from_s3(bucket, key) - data = [] - for line in text.splitlines(): - data.append(json.loads(line)) - - return data - - previous_sha_invoking_file_info = _s3_to_json( - "ossci-raw-job-status", - f"additional_info/weekly_file_report/data_{prev_sha}.json.gz", - ) - current_sha_invoking_file_info = _s3_to_json( - "ossci-raw-job-status", - f"additional_info/weekly_file_report/data_{current_sha}.json.gz", + yesterday_sha = self.get_representative_data_for_time( + datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 53, + datetime.datetime.now(datetime.timezone.utc).timestamp() - 3600 * 29, ) for team in CONFIG: change = self.gen_regression_for_team( team=team, - prev_invoking_file_info=previous_sha_invoking_file_info, - curr_invoking_file_info=current_sha_invoking_file_info, - status_changes=status_changes, + prev_invoking_file_info=yesterday_sha, + curr_invoking_file_info=current_sha, ) if self.filter_thresholds(change): print(f"Regression detected for team: {team['team']}") @@ -322,7 +294,6 @@ def _s3_to_json(bucket: str, key: str) -> Any: send_to_aws_alerting_lambda(alert) else: print(f"No significant regression for team: {team['team']}") - self.upload_new_regression_sha(current_sha) if __name__ == "__main__": diff --git a/tools/torchci/test_insights/file_report_generator.py b/tools/torchci/test_insights/file_report_generator.py index c6f44e6025..6c6a6d9b31 100644 --- a/tools/torchci/test_insights/file_report_generator.py +++ b/tools/torchci/test_insights/file_report_generator.py @@ -18,33 +18,29 @@ Usage: python file_report_generator.py --add-dates python file_report_generator.py --add-shas - python file_report_generator.py --remove-sha """ import argparse -import concurrent.futures import gzip import io import json import logging -import re -import time +import subprocess import urllib.request -from collections import defaultdict from datetime import datetime, timezone from functools import lru_cache from pathlib import Path -from typing import Any, cast, Dict, List, Optional +from typing import Any, Dict, List try: import boto3 # type: ignore[import] + from botocore.exceptions import ClientError # type: ignore[import] except ImportError: # for unit tests without boto3 installed boto3 = None # type: ignore[assignment] -from torchci.clickhouse import query_clickhouse -from torchci.test_insights.ec2_pricing import get_price_for_label +from torchci.clickhouse import query_clickhouse, query_clickhouse_saved logger = logging.getLogger(__name__) @@ -76,144 +72,65 @@ def __init__(self, dry_run: bool = True): self.dry_run = dry_run @lru_cache - def load_test_owners(self) -> List[Dict[str, Any]]: - """Load the test owner labels JSON file from S3""" - S3_URL = 
"https://ossci-metrics.s3.us-east-1.amazonaws.com/test_owner_labels/test_owner_labels.json.gz" - logger.debug(f"Fetching test owner labels from S3: {S3_URL}") - with urllib.request.urlopen(S3_URL) as response: - compressed_data = response.read() - decompressed_data = gzip.decompress(compressed_data) - test_owners = [] - for line in decompressed_data.decode("utf-8").splitlines(): - if line.strip(): - test_owners.append(json.loads(line)) - return test_owners - - def get_runner_cost(self, runner_label: str) -> float: - """Get the cost per hour for a given runner""" - if runner_label.startswith("lf."): - runner_label = runner_label[3:] - cost = get_price_for_label(runner_label) - if cost is None: - return 0.0 - return cost - - def _get_first_suitable_sha(self, shas: list[dict[str, Any]]) -> Optional[str]: - """Get the first suitable SHA from a list of SHAs.""" - lens = [] - for sha in shas: - head_sha = sha["head_sha"] - test_data = self._get_invoking_file_test_data_for_sha(head_sha) - - has_no_job_name = False - for entry in test_data: - if "NoJobName" in entry.get("short_job_name", ""): - has_no_job_name = True - break - if has_no_job_name: - logger.debug(f"Has entries with no job name for {head_sha}") - continue - - lens.append((head_sha, len(test_data))) - del test_data - - if len(lens) > 1: - lens.sort(key=lambda x: x[1], reverse=True) - sha1, len1 = lens[0] - _, len2 = lens[1] - - if abs(len1 - len2) * 2 / (len1 + len2) < 0.1: - logger.debug(f"Using SHA {sha1} with {len1} entries") - return sha1 - return None - - def find_suitable_sha(self, date: str) -> Optional[str]: + def get_all_shas(self) -> List[Dict[str, Any]]: """ - Auto-select suitable SHA from PyTorch main branch for a given date. - Usage: - - Provide a date to select a SHA from that day - Criteria: - - SHA is from main branch - - Workflow jobs are successful (green) - - S3 test data is available - - All test entries have job names + Get all shas and commit dates between two dates on pytorch/pytorch main + branch. Date is in epoch timestamp format. """ - - logger.debug("Searching for suitable SHAs from PyTorch main branch...") + repo_root = Path(__file__).resolve().parent.parent.parent.parent + + commits = subprocess.check_output( + [ + "git", + "log", + "--pretty=format:%H %ct", + "origin/main", + ], + cwd=repo_root / ".." / "pytorch", + ).decode("utf-8") + return [ + {"sha": line.split(" ")[0], "push_date": int(line.split(" ")[1])} + for line in commits.splitlines() + ] + + def _get_status_counts_for_sha(self, sha: str) -> List[Dict[str, Any]]: + """ + Get status counts for a specific SHA using ClickHouse. 
+ """ + # get workflow ids first + + workflow_ids = query_clickhouse( + """ + select distinct id from default.workflow_run + where head_sha = {sha: String} + and head_branch = 'main' + and name in ('pull', 'trunk', 'periodic', 'inductor', 'slow') + """, + {"sha": sha}, + ) params = { - "start_date": date + " 00:00:00", - "stop_date": date + " 23:59:59", + "workflowIds": [int(row["id"]) for row in workflow_ids], + "shas": [sha], } - - # Single query with conditional logic using CASE expressions - query = """ - SELECT - w.head_sha, - toUnixTimestamp(w.head_commit.'timestamp') as push_date - FROM default.workflow_run w - WHERE w.head_branch = 'main' - AND w.repository.full_name = 'pytorch/pytorch' - AND w.name in ('pull', 'trunk', 'inductor', 'slow') - AND w.conclusion = 'success' - AND w.head_commit.'timestamp' >= {start_date: DateTime} - AND w.head_commit.'timestamp' <= {stop_date: DateTime} - GROUP BY - w.head_sha, w.head_commit.'timestamp' - HAVING count(distinct w.name) = 4 - ORDER BY - min(w.head_commit.'timestamp') DESC - """ - logger.debug(f"Querying ClickHouse for successful shas on {date}") - candidates = query_clickhouse(query, params) - - logger.debug(f"Found {len(candidates)} candidate SHAs") - - return self._get_first_suitable_sha(candidates) - - @lru_cache - def _get_workflow_jobs_for_sha(self, sha: str) -> List[Dict[str, Any]]: - """Get workflow runs for a specific SHA using ClickHouse.""" - query = """ - with workflow_ids as ( - SELECT - w.id, - FROM - default .workflow_run w - WHERE - w.head_branch = 'main' - AND w.repository.full_name = 'pytorch/pytorch' - AND w.name in ('pull', 'trunk', 'inductor', 'slow') - AND w.conclusion = 'success' - AND w.head_sha = {sha: String} + logger.debug( + f"Querying ClickHouse for status counts with SHA: {sha} and workflow IDs: {params['workflowIds']}" ) - SELECT - DISTINCT j.id as job_id, - j.name as job_name, - j.labels as job_labels, - j.run_id as workflow_id, - j.run_attempt, - j.workflow_name as workflow_name, - j.conclusion - FROM - default .workflow_job j - WHERE - j.run_id in (select id from workflow_ids) - """ - - params = {"sha": sha} + result = query_clickhouse_saved( + "tests/test_status_counts_on_commits_by_file", params + ) + for key in ["success", "flaky", "skipped", "failure"]: + for row in result: + row[key] = int(row[key]) - logger.debug(f"Querying ClickHouse for workflow runs with SHA: {sha}") - result = query_clickhouse(query, params) + logger.debug(f"Retrieved {len(result)} status count records for SHA: {sha}") + return result + def get_status_counts_for_sha(self, sha: str) -> List[Dict[str, Any]]: + result = self._get_status_counts_for_sha(sha) for row in result: - row["short_job_name"] = ( - f"{row.get('workflow_name')} / {self._parse_job_name(row.get('job_name', ''))}" - ) - row["runner_type"] = self._get_runner_label_from_job_info(row) - row["cost"] = self.get_runner_cost(row.get("runner_type", 0)) - row["frequency"] = self.get_frequency(row.get("workflow_name", 0)) - + row["sha"] = sha + row["frequency"] = self.get_frequency(row["workflow_name"]) return result @lru_cache @@ -241,40 +158,6 @@ def get_frequency(self, workflow_name: str) -> float: return int(row["count"]) return 1 - def _parse_job_name(self, job_name: str) -> str: - """ - Parse job name to remove shard information. 
- Example: 'linux-jammy-py3.10-clang18-asan / test (default, 1, 6, linux.4xlarge)' - becomes: 'linux-jammy-py3.10-clang18-asan / test (default)' - """ - if not job_name: - return "unknown" - - # Replace with just the first part in parentheses - # First extract the part before the comma if it exists - match = re.search(r"\(([^,]+),.*\)", job_name) - if match: - base_part = job_name[: job_name.find("(")] - first_param = match.group(1) - return f"{base_part}({first_param})" - - return job_name - - def _get_runner_label_from_job_info(self, job_info: Dict[str, Any]) -> str: - """ - Extract runner label from job information. - Tries multiple sources: runner_name, job_labels with 'self-hosted' patterns. - """ - # Then try to find runner info in job_labels - job_labels = job_info.get("job_labels", []) - for label in job_labels: - if label.startswith("lf."): - label = label[3:] - if get_price_for_label(label) is not None: - return label - - return "unknown" - def _get_local_cache_file_loc(self, bucket: str, key: str) -> Path: """Get the local cache file location for a given S3 bucket and key.""" return get_temp_dir() / f"cache_{bucket}_{key.replace('/', '_')}" @@ -304,298 +187,6 @@ def _fetch_from_s3(self, bucket: str, key: str) -> str: logger.debug(f"Failed to fetch from s3://{bucket}/{key}: {e}") raise e - def _fetch_invoking_file_summary_from_s3( - self, workflow_run_id: int, workflow_run_attempt: int - ) -> list[dict[str, Any]]: - """ - Use local cache for a specific workflow run. - """ - bucket = "ossci-raw-job-status" - key = f"additional_info/invoking_file_summary/{workflow_run_id}/{workflow_run_attempt}" - - start_time = time.time() - text_data = self._fetch_from_s3(bucket, key) - test_data = json.loads(text_data) - - data_as_list = [] - for build, entries in test_data.items(): - for config, entries in entries.items(): - for _, entry in entries.items(): - entry["run_id"] = workflow_run_id - entry["run_attempt"] = workflow_run_attempt - # TODO remove this later - entry["short_job_name"] = f"{build} / test ({config})" - data_as_list.append(entry) - - logger.debug( - f"Fetched {len(data_as_list)} test entries from {key}, took {time.time() - start_time:.2f} seconds" - ) - return data_as_list - - def _get_invoking_file_test_data_for_sha(self, sha: str) -> List[Dict[str, Any]]: - """ - Fetch all test data for a given SHA once and cache it. - Returns a flat list of test entries with job info embedded. 
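The removed _parse_job_name kept only the first parenthesized field; the new ClickHouse and TypeScript queries later in this diff normalize job names with a regexp_replace instead, dropping shard counters and the lf. runner prefix while keeping the config and runner label. A Python restatement of that pattern, for illustration:

    import re

    def normalize_job_name(name: str) -> str:
        # strip "1, 6, " style shard counters and an optional "lf." prefix,
        # keeping the first field and the runner label inside the parentheses
        return re.sub(r"(\([^,]+, )(?:[0-9]+, )*(?:lf\.)?([^)]+\))", r"\1\2", name)

    print(normalize_job_name("test (default, 1, 6, lf.linux.4xlarge)"))
    # -> "test (default, linux.4xlarge)"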
- """ - workflow_jobs = self._get_workflow_jobs_for_sha(sha) - - # Create job_info mapping from the returned job data - job_info = {} - workflow_runs = set() - - for job_data in workflow_jobs: - job_id = str(job_data.get("job_id")) - job_info[job_id] = { - "job_name": job_data.get("job_name", ""), - "short_job_name": job_data.get("short_job_name", ""), - "job_labels": job_data.get("job_labels", []), - "runner_type": job_data.get("runner_type", ""), - "cost_per_hour": job_data.get("cost", 0.0), - "workflow_name": job_data.get("workflow_name", ""), - "frequency": job_data.get("frequency", 0), - } - workflow_runs.add((job_data["workflow_id"], job_data["run_attempt"])) - - all_test_data = [] - - # Use threads instead of processes for IO-bound S3 fetching - results = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - futures = [ - executor.submit( - self._fetch_invoking_file_summary_from_s3, run_id, run_attempt - ) - for run_id, run_attempt in workflow_runs - ] - # Maintain order to match workflow_runs - results = [f.result() for f in futures] - - for test_data in results: - all_test_data.extend(test_data) - - # Create lookup table for workflow_id -> workflow_name to construct - # short_job_name - workflow_id_to_name = { - job["workflow_id"]: job["workflow_name"] for job in workflow_jobs - } - # Map short job name to full job data to get cost per hour and runner - # type. There will be duplicates but the only info we need is - # cost/runner which should be the same - job_name_to_job = {job["short_job_name"]: job for job in job_info.values()} - - # Embed workflow name and job info into each test entry - for entry in all_test_data: - run_id = entry.get("run_id") - workflow_name = workflow_id_to_name.get(run_id) - entry["short_job_name"] = ( - f"{workflow_name} / {entry.get('short_job_name', '')}" - ) - _job_info = job_name_to_job[entry["short_job_name"]] - entry["sha"] = sha - entry["push_date"] = self.get_push_date_for_sha(sha) - entry["labels"] = self.get_label_for_file(entry["file"]) - entry["cost"] = ( - _job_info.get("cost_per_hour", 0.0) / 3600.0 * entry.get("time", 0) - ) - entry["frequency"] = _job_info["frequency"] - - return all_test_data - - def get_label_for_file(self, file: str) -> List[str]: - for row in self.load_test_owners(): - if row["file"] == file: - return row["owner_labels"] - return [] - - def _fetch_status_changes_from_s3( - self, workflow_run_id: int, workflow_run_attempt: int - ) -> list[dict[str, Any]]: - """ - Use local cache for a specific workflow run. 
- """ - bucket = "ossci-raw-job-status" - key = f"additional_info/test_status/{workflow_run_id}/{workflow_run_attempt}" - - start_time = time.time() - text_data = self._fetch_from_s3(bucket, key) - - test_data = [] - for line in text_data.splitlines(): - data = json.loads(line) - data["run_id"] = workflow_run_id - test_data.append(data) - - logger.debug( - f"Fetched {len(test_data)} test entries from {key}, took {time.time() - start_time:.2f} seconds" - ) - return test_data - - def _get_status_changes_for_sha(self, sha: str) -> List[Dict[str, Any]]: - """ """ - workflow_jobs = self._get_workflow_jobs_for_sha(sha) - workflow_runs = set() - for job_data in workflow_jobs: - workflow_runs.add((job_data["workflow_id"], job_data["run_attempt"])) - - all_test_data = [] - - # Use threads instead of processes for IO-bound S3 fetching - results = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - futures = [ - executor.submit(self._fetch_status_changes_from_s3, run_id, run_attempt) - for run_id, run_attempt in workflow_runs - ] - # Maintain order to match workflow_runs - results = [f.result() for f in futures] - - for test_data in results: - all_test_data.extend(test_data) - - # Create lookup table for workflow_id -> workflow_name to construct - # short_job_name - workflow_id_to_name = { - job["workflow_id"]: job["workflow_name"] for job in workflow_jobs - } - - # Embed workflow name and job info into each test entry - for entry in all_test_data: - run_id = entry["run_id"] - workflow_name = workflow_id_to_name.get(run_id) - entry["short_job_name"] = ( - f"{workflow_name} / {entry.get('short_job_name', '')}" - ) - - return all_test_data - - def _check_status_change_already_exists(self, sha1: str, sha2: str) -> bool: - """ - Check if status changes between two SHAs already exist in S3. - """ - bucket = "ossci-raw-job-status" - key = f"additional_info/weekly_file_report/status_changes_{sha1}_{sha2}.json.gz" - url = f"https://{bucket}.s3.amazonaws.com/{key}" - try: - with urllib.request.urlopen(url) as response: - if response.status == 200: - logger.debug( - f"Status changes for {sha1} to {sha2} already exist in S3." - ) - return True - except Exception: - pass - return False - - def get_status_changes( - self, sha1: str, sha2: str, sha2_push_date: str - ) -> list[dict[str, Any]]: - """ - Compare test data between two pre-fetched datasets. - Returns a dictionary with file as keys and job diffs as values. 
- """ - - tests1 = self._get_status_changes_for_sha(sha1) - tests2 = self._get_status_changes_for_sha(sha2) - - # Group by key - map1 = {(v["short_job_name"], v["file"], v["name"]): v for v in tests1} - map2 = {(v["short_job_name"], v["file"], v["name"]): v for v in tests2} - - status_changes = [] - - for key in map1.keys() | map2.keys(): - status = None - - if key in map1 and key not in map2: - status = "removed" - elif key not in map1 and key in map2: - status = "added" - else: - skipped1 = map1[key]["status"] == "skipped" - skipped2 = map2[key]["status"] == "skipped" - if not skipped1 and skipped2: - status = "started_skipping" - elif skipped1 and not skipped2: - status = "stopped_skipping" - if status is not None: - status_changes.append( - { - "short_job_name": key[0], - "file": key[1], - "test_name": key[2], - "status": status, - "labels": self.get_label_for_file(key[1]), - "sha": sha2, - "push_date": sha2_push_date, - } - ) - - # Too large so truncate for now - just keep first 10 of each type - counts = defaultdict(list) - for entry in status_changes: - counts[(entry["short_job_name"], entry["file"], entry["status"])].append( - entry - ) - to_write = [] - for key, entries in counts.items(): - to_write.extend(entries[:10]) - logger.debug( - f"Found {len(status_changes)} status changes between {sha1} and {sha2}, truncated to {len(to_write)} for upload" - ) - - self.upload_to_s3( - to_write, - "ossci-raw-job-status", - f"additional_info/weekly_file_report/status_changes_{sha1}_{sha2}.json.gz", - ) - - return status_changes - - def get_data_for_sha(self, sha: str) -> Dict[str, Any]: - push_date = self.get_push_date_for_sha(sha) - invoking_file_test_data = self._get_invoking_file_test_data_for_sha(sha) - data = { - "sha": sha, - "push_date": push_date, - } - self.upload_to_s3( - invoking_file_test_data, - "ossci-raw-job-status", - f"additional_info/weekly_file_report/data_{sha}.json.gz", - ) - return data - - @lru_cache - def get_push_date_for_sha(self, sha: str) -> Optional[str]: - """ - Get the push date for a given SHA from ClickHouse push table. - Returns the date as an ISO string, or None if not found. 
- """ - query = """ - SELECT toUnixTimestamp(min(p.head_commit.timestamp)) as pushed_at - FROM default.push p - WHERE p.after = {sha: String} - AND p.ref = 'refs/heads/main' - AND p.repository.full_name = 'pytorch/pytorch' - """ - params = {"sha": sha} - result = query_clickhouse(query, params) - if result: - return result[0]["pushed_at"] - return None - - def fetch_existing_metadata(self) -> list[dict[str, Any]]: - """Fetch existing metadata from the reports directory""" - metadata_str = self._fetch_from_s3( - "ossci-raw-job-status", - "additional_info/weekly_file_report/commits_metadata.json.gz", - ) - metadata = [] - for line in metadata_str.splitlines(): - metadata.append(json.loads(line)) - - return metadata - @lru_cache def get_s3_resource(self): s3 = boto3.resource("s3") @@ -636,14 +227,29 @@ def upload_to_s3( ContentType="application/json", ) - def remove_key_from_s3(self, bucket: str, key: str) -> None: - """Remove a specific key from S3""" - html_url = f"https://{bucket}.s3.amazonaws.com/{key}" - if self.dry_run: - logger.info(f"Dry run: would remove from s3: {html_url}") - return - logger.info(f"Removing from s3: {html_url}") - self.get_s3_resource().Object(bucket, key).delete() + def check_if_s3_object_exists(self, sha): + obj = self.get_s3_resource().Object( + "ossci-raw-job-status", + f"additional_info/weekly_file_report2/data_{sha}.json.gz", + ) + try: + obj.load() + return True + except ClientError as e: + if e.response["Error"]["Code"] == "404": + return False + else: + raise + + def upload_for_sha(self, sha): + counts = self.get_status_counts_for_sha(sha) + if counts: + logger.info(f"Adding SHA {sha}: {len(counts)} test count records") + self.upload_to_s3( + counts, + "ossci-raw-job-status", + f"additional_info/weekly_file_report2/data_{sha}.json.gz", + ) def main() -> None: @@ -661,91 +267,52 @@ def main() -> None: "--add-shas", nargs="+", help="List of commit SHAs to compare in sequence" ) parser.add_argument( - "--add-dates", nargs="+", help="List of commit SHAs to compare in sequence" + "--add-dates", nargs=2, help="Add shas between two dates (epoch timestamps)" ) parser.add_argument( "--dry-run", action="store_true", help="Dry run without writing files" ) - parser.add_argument("--remove-sha", help="Remove a specific SHA from the report") - args = parser.parse_args() generator = FileReportGenerator(args.dry_run) - existing_metadata = generator.fetch_existing_metadata() + # commit data = list of {sha, push_date} + # test counts = list of {file, workflow_name, job_name, time, success, flaky, skipped, failure, labels (runner), sha} - _existing_dates = set( - datetime.fromtimestamp(entry["push_date"], timezone.utc).strftime("%Y-%m-%d") - for entry in existing_metadata - ) - _existing_shas = set(entry["sha"] for entry in existing_metadata) - - if args.remove_sha: - for i, entry in enumerate(existing_metadata): - if entry["sha"] == args.remove_sha: - logger.info(f"Removing SHA {args.remove_sha} from existing metadata") - generator.remove_key_from_s3( - "ossci-raw-job-status", - f"additional_info/weekly_file_report/data_{args.remove_sha}.json.gz", - ) - if i > 0: - prev_sha = existing_metadata[i - 1]["sha"] - generator.remove_key_from_s3( - "ossci-raw-job-status", - f"additional_info/weekly_file_report/status_changes_{prev_sha}_{args.remove_sha}.json.gz", - ) - if i < len(existing_metadata) - 1: - next_sha = existing_metadata[i + 1]["sha"] - generator.remove_key_from_s3( - "ossci-raw-job-status", - 
f"additional_info/weekly_file_report/status_changes_{args.remove_sha}_{next_sha}.json.gz", - ) - existing_metadata.pop(i) - break - - shas: list[str] = [] - for date in args.add_dates or []: - if date in _existing_dates: - logger.info(f"Date {date} already exists in metadata, skipping") - continue - sha = generator.find_suitable_sha(date) - if sha is None: - logger.info(f"No suitable SHA found for date {date}, skipping") - continue - logger.info(f"Found suitable SHA {sha} for date {date}") - shas.append(sha) - - for sha in args.add_shas or []: - shas.append(cast(str, sha)) - - logger.info(f"Adding SHAs: {shas}") - - # Load data to get dates/ordering - for sha in shas: - sha_data = generator.get_data_for_sha(sha) - if sha not in _existing_shas: - existing_metadata.append(sha_data) - - existing_metadata = sorted(existing_metadata, key=lambda x: x["push_date"]) - - logger.debug("Calculating diffs for all files and grouping by labels...") - for i in range(1, len(existing_metadata)): - if not generator._check_status_change_already_exists( - existing_metadata[i - 1]["sha"], - existing_metadata[i]["sha"], - ): - generator.get_status_changes( - existing_metadata[i - 1]["sha"], - existing_metadata[i]["sha"], - existing_metadata[i]["push_date"], - ) + # The client needs to put together: cost, short_job_name, frequency on their + # own if they want it. They can use the commit data to see which shas exist + # and choose which to compare + # Construct and upload the commit metadata everytime since it's cheap + commit_data = generator.get_all_shas() generator.upload_to_s3( - existing_metadata, + commit_data, "ossci-raw-job-status", - "additional_info/weekly_file_report/commits_metadata.json.gz", + "additional_info/weekly_file_report2/commits_metadata.json.gz", ) + shas_to_add = [] + + if args.add_dates: + start_date, stop_date = args.add_dates + start_date, stop_date = int(start_date), int(stop_date) + logger.info(f"Adding SHAs between dates: {start_date} to {stop_date}") + for commit in commit_data: + # if the commit is within the date range, add its SHA + if start_date <= commit["push_date"] <= stop_date: + shas_to_add.append(commit["sha"]) + + if args.add_shas: + shas = args.add_shas + logger.info(f"Adding SHAs: {shas}") + shas_to_add.extend(shas) + + for sha in shas_to_add: + try: + generator.upload_for_sha(sha) + except Exception as e: + logger.error(f"Failed to upload data for SHA {sha}: {e}") + if __name__ == "__main__": main() diff --git a/torchci/clickhouse_queries/tests/test_status_counts_on_commits_by_file/params.json b/torchci/clickhouse_queries/tests/test_status_counts_on_commits_by_file/params.json new file mode 100644 index 0000000000..e6ebb58d60 --- /dev/null +++ b/torchci/clickhouse_queries/tests/test_status_counts_on_commits_by_file/params.json @@ -0,0 +1,7 @@ +{ + "params": { + "shas": "Array(String)", + "workflowIds": "Array(Int64)" + }, + "tests": [] +} diff --git a/torchci/clickhouse_queries/tests/test_status_counts_on_commits_by_file/query.sql b/torchci/clickhouse_queries/tests/test_status_counts_on_commits_by_file/query.sql new file mode 100644 index 0000000000..8c6a4060c6 --- /dev/null +++ b/torchci/clickhouse_queries/tests/test_status_counts_on_commits_by_file/query.sql @@ -0,0 +1,75 @@ +with job as ( + select + distinct + id, + regexp_replace( + name, + '(\\([^,]+, )(?:[0-9]+, )*(?:lf\\.)?([^)]+\\))', + '\\1\\2' + ) AS name, + workflow_name, + labels + from + default .workflow_job + where + run_id in {workflowIds: Array(Int64) } +), +statuses as ( + SELECT + 
replaceAll(invoking_file, '.', '/') as invoking_file, + all_test_runs.name as name, + classname, + multiIf( + countIf( + failure_count = 0 + AND error_count = 0 + AND skipped_count = 0 + AND rerun_count = 0 + ) = count(*), + 'success', + sum(skipped_count) > 0, + 'skipped', + countIf( + failure_count = 0 + AND error_count = 0 + ) > 0, + 'flaky', + 'failure' + ) AS status, + sum(time) / count(distinct workflow_id, workflow_run_attempt) as time, + job.name AS job_name, + job.workflow_name as workflow_name, + arrayDistinct(arrayFlatten(groupArray(job.labels))) as labels + FROM + tests.all_test_runs + JOIN job ON job.id = all_test_runs.job_id + WHERE + job_id IN ( + SELECT + id + FROM + job + ) + GROUP BY + invoking_file, + name, + classname, + job.name, + job.workflow_name +) +select + invoking_file as file, + workflow_name, + job_name, + round(sum(time), 2) as time, + countIf(status = 'success') as success, + countIf(status = 'flaky') as flaky, + countIf(status = 'skipped') as skipped, + countIf(status = 'failure') as failure, + arrayDistinct(arrayFlatten(groupArray(labels))) as labels +from + statuses +group by + file, + workflow_name, + job_name diff --git a/torchci/pages/api/flaky-tests/fileReport.ts b/torchci/pages/api/flaky-tests/fileReport.ts new file mode 100644 index 0000000000..85377dae15 --- /dev/null +++ b/torchci/pages/api/flaky-tests/fileReport.ts @@ -0,0 +1,265 @@ +import { queryClickhouse } from "lib/clickhouse"; +import _ from "lodash"; +import type { NextApiRequest, NextApiResponse } from "next"; +import pLimit from "p-limit"; +import zlib from "zlib"; + +const SUPPORTED_WORKFLOWS = ["periodic", "pull", "trunk", "slow", "inductor"]; + +export default async function handler( + req: NextApiRequest, + res: NextApiResponse +) { + const startDate = req.query.startDate as string; + const endDate = req.query.endDate as string; + const data = await getInfo(parseInt(startDate), parseInt(endDate)); + res + .status(200) + .setHeader("Content-Encoding", "gzip") + .send(zlib.gzipSync(JSON.stringify(data))); +} + +const JOBS_ON_SHAS = ` +select distinct +regexp_replace( + name, + '(\\\\([^,]+, )(?:[0-9]+, )*(?:lf\\\\.)?([^)]+\\\\))', + '\\\\1\\\\2' +) AS name, head_sha as sha from default.workflow_job +where id in (select id from materialized_views.workflow_job_by_head_sha where head_sha in {shas: Array(String)}) +and workflow_name in {supported_workflows: Array(String)} +`; + +async function getInfo(startDate: number, endDate: number) { + // Get commits for each workflow in parallel + let allPyTorchCommits: { sha: string; push_date: number }[] = + await fetchJSONLines( + `https://ossci-raw-job-status.s3.us-east-1.amazonaws.com/additional_info/weekly_file_report2/commits_metadata.json.gz` + ); + + allPyTorchCommits = allPyTorchCommits.filter( + (c) => c.push_date >= startDate && c.push_date <= endDate + ); + + const shaToDate = new Map(allPyTorchCommits.map((c) => [c.sha, c.push_date])); + + const workflowsOnShas = await queryClickhouse(JOBS_ON_SHAS, { + shas: allPyTorchCommits.map((c) => c.sha), + supported_workflows: SUPPORTED_WORKFLOWS, + }); + + // Group by workflow + const shasByWorkflow = _(workflowsOnShas) + .groupBy((s) => s.name) + .values() + .sortBy((vals) => vals.length) + .value(); + + // Get up to 20 evenly distributed SHAs from the workflow with fewest options + const selectedShas = new Set(); + selectedShas.add( + workflowsOnShas.sort((row) => shaToDate.get(row.sha) || 0)[0].sha + ); + selectedShas.add( + workflowsOnShas.sort((row) => -(shaToDate.get(row.sha) || 0))[0].sha 
+ ); + + // Calculate intervals for coverage check + const allDates = allPyTorchCommits.map((c) => c.push_date); + const minDate = Math.min(...allDates); + const maxDate = Math.max(...allDates); + const numIntervals = 10; + const intervalSize = (maxDate - minDate) / numIntervals; + + // Helper to find largest gap and pick SHA in between + function fillGaps( + existingShas: string[], + allAvailableShas: string[], + targetCount: number + ): void { + const result = [...existingShas]; + const needed = targetCount - existingShas.length; + + // Sort existing SHAs by push_date + const sortedExisting = result + .map((sha) => ({ sha, push_date: shaToDate.get(sha) || 0 })) + .sort((a, b) => a.push_date - b.push_date); + + for (let i = 0; i < needed; i++) { + // Collect all gaps with their indices + const gaps: { gap: number; startIdx: number }[] = []; + for (let j = 0; j < sortedExisting.length - 1; j++) { + const gap = + sortedExisting[j + 1].push_date - sortedExisting[j].push_date; + gaps.push({ gap, startIdx: j }); + } + + // Sort gaps by size (descending) + gaps.sort((a, b) => b.gap - a.gap); + + // Try each gap in order until we find a candidate SHA + let candidateSha = null; + let gapStartIdx = -1; + + for (const { startIdx } of gaps) { + const startDate = sortedExisting[startIdx].push_date; + const endDate = sortedExisting[startIdx + 1].push_date; + const midDate = (startDate + endDate) / 2; + + // Find available SHA closest to midpoint + const candidate = allAvailableShas + .filter((sha) => { + const date = shaToDate.get(sha); + return ( + date && + date > startDate && + date < endDate && + !result.includes(sha) + ); + }) + .map((sha) => ({ + sha, + distance: Math.abs((shaToDate.get(sha) || 0) - midDate), + })) + .sort((a, b) => a.distance - b.distance)[0]; + + if (candidate) { + candidateSha = candidate; + gapStartIdx = startIdx; + break; + } + } + + if (!candidateSha) break; + + result.push(candidateSha.sha); + selectedShas.add(candidateSha.sha); + sortedExisting.splice(gapStartIdx + 1, 0, { + sha: candidateSha.sha, + push_date: shaToDate.get(candidateSha.sha) || 0, + }); + } + + // Second pass: Ensure each interval has at least one SHA + // Only do this if we haven't already exceeded a reasonable limit + const maxTotalShas = targetCount + 5; // Allow up to 5 extra for interval coverage + + for (let i = 0; i < numIntervals; i++) { + if (result.length >= maxTotalShas) break; + + const intervalStart = minDate + i * intervalSize; + const intervalEnd = minDate + (i + 1) * intervalSize; + + // Check if any selected SHA falls in this interval + const hasShaMInInterval = result.some((sha) => { + const date = shaToDate.get(sha); + return date && date >= intervalStart && date < intervalEnd; + }); + + // If no SHA in this interval, find the closest one and add it + if (!hasShaMInInterval) { + const intervalMid = (intervalStart + intervalEnd) / 2; + const closestSha = allAvailableShas + .filter((sha) => { + const date = shaToDate.get(sha); + return ( + date && + date >= intervalStart && + date < intervalEnd && + !result.includes(sha) + ); + }) + .map((sha) => ({ + sha, + distance: Math.abs((shaToDate.get(sha) || 0) - intervalMid), + })) + .sort((a, b) => a.distance - b.distance)[0]; + + if (closestSha) { + result.push(closestSha.sha); + selectedShas.add(closestSha.sha); + } + } + } + } + + // For each workflow, fill gaps up to 10 SHAs and ensure interval coverage + _(shasByWorkflow).forEach((vals) => { + const workflowShas = vals.map((v) => v.sha); + // Find which of the base SHAs this workflow has + 
const existingShas = Array.from(selectedShas).filter((sha) => + workflowShas.includes(sha) + ); + // Fill gaps to get to 10, then ensure all intervals have coverage + fillGaps(existingShas, workflowShas, 10); + }); + + const finalShas = Array.from(selectedShas); + + const limit = pLimit(10); // max 5 concurrent batch requests + + const results = await Promise.all( + finalShas.map((sha) => + limit(async () => { + try { + return await fetchJSONLines( + `https://ossci-raw-job-status.s3.us-east-1.amazonaws.com/additional_info/weekly_file_report2/data_${sha}.json.gz` + ).then((data) => + data.map((item) => ({ + ...item, + sha: sha, + })) + ); + } catch (error) { + // console.error(`Error fetching data for ${sha.sha}:`, error); + return []; + } + }) + ) + ).then((arrays) => arrays.flat()); + + const newShas = allPyTorchCommits.filter((s) => + results.some((r) => r.sha === s.sha) + ); + + async function fetchJSONLines(url: string): Promise { + const res = await fetch(url); + const text = await res.text(); + return text + .split("\n") + .filter((line) => line.trim() !== "") + .map((line) => JSON.parse(line)); + } + + const costInfo = await fetchJSONLines( + `https://ossci-metrics.s3.us-east-1.amazonaws.com/ec2_pricing.json.gz` + ); + + const testOwnerLabels = await fetchJSONLines( + `https://ossci-metrics.s3.us-east-1.amazonaws.com/test_owner_labels/test_owner_labels.json.gz` + ); + + results.forEach((item) => { + // find first label in item.labels that is in costInfo + let label = "unknown"; + for (let l of item.labels) { + if (l.startsWith("lf.")) { + l = l.slice(3); + } + const costData = costInfo.find((cost) => cost.label === l); + if (costData) { + label = l; + break; + } + } + item.label = label; + delete item.labels; + }); + + return { + results, + costInfo, + shas: newShas, + testOwnerLabels, + }; +} diff --git a/torchci/pages/api/flaky-tests/statusChanges.ts b/torchci/pages/api/flaky-tests/statusChanges.ts new file mode 100644 index 0000000000..4e7deb0793 --- /dev/null +++ b/torchci/pages/api/flaky-tests/statusChanges.ts @@ -0,0 +1,313 @@ +import { queryClickhouse } from "lib/clickhouse"; +import _ from "lodash"; +import type { NextApiRequest, NextApiResponse } from "next"; +import zlib from "zlib"; + +// Query for status changes between two SHAs for given job IDs and files +const QUERY = ` +with job as ( + select + id, + regexp_replace( + name, + '(\\\\([^,]+, )(?:[0-9]+, )*(?:lf\\\\.)?([^)]+\\\\))', + '\\\\1\\\\2' + ) AS name, + head_sha, + workflow_name + from + default .workflow_job + where + id in {jobIds: Array(Int64)} +), +statuses as ( + SELECT + replaceAll(invoking_file, '.', '/') as invoking_file, + all_test_runs.name as name, + classname, + multiIf( + countIf( + failure_count = 0 + AND error_count = 0 + AND skipped_count = 0 + AND rerun_count = 0 + ) = count(*), + 'success', + sum(skipped_count) > 0, + 'skipped', + countIf( + failure_count = 0 + AND error_count = 0 + ) > 0, + 'flaky', + 'failure' + ) AS status, + job.name AS job_name, + job.workflow_name AS workflow_name, + job.head_sha AS head_sha + FROM + tests.all_test_runs join job on all_test_runs.job_id = job.id + PREWHERE + job_id IN {jobIds: Array(Int64)} + and replaceAll(invoking_file, '.', '/') IN {files: Array(String)} + GROUP BY + invoking_file, + name, + classname, + job.name, + job.workflow_name, + job.head_sha +), +pivoted AS ( + SELECT + invoking_file, + name, + classname, + job_name, + workflow_name, + maxIf(status, head_sha = {sha1: String}) AS prev_status, + maxIf(status, head_sha = {sha2: String}) AS 
new_status + FROM + statuses + GROUP BY + invoking_file, + name, + classname, + job_name, + workflow_name +) +SELECT + name, + classname, + invoking_file, + workflow_name, + job_name, + prev_status, + new_status +FROM + pivoted +WHERE + prev_status != new_status +order by + job_name, + invoking_file, + classname, + name +LIMIT 200 +`; + +function getQueryForJobIds(fuzzy: boolean, before: boolean): string { + // Get query for job ids based on fuzzy or exact matching. Before indicates if we + // want jobs before the given sha (true) or after (false) when fuzzy is true. + if (!fuzzy) { + return ` +select + regexp_replace( + name, + '(\\\\([^,]+, )(?:[0-9]+, )*(?:lf\\\\.)?([^)]+\\\\))', + '\\\\1\\\\2' + ) AS name, + workflow_name, + id, + head_sha +from + default .workflow_job +where + head_sha in {shas: Array(String) } +`; + } + return ` +WITH ref AS ( + SELECT + head_commit.timestamp AS ref_ts + FROM + default .workflow_run + WHERE + head_commit.id = {sha :String } + LIMIT + 1 +), workflow_runs AS ( + select + distinct head_commit.timestamp AS commit_ts, + head_commit.id AS sha + from + default .workflow_run + where + head_commit.timestamp <= ( + select + ref_ts + from + ref + ) + and head_branch = 'main' + order by + head_commit.timestamp DESC + limit + 1000 +) +SELECT + workflow_name, + regexp_replace( + t.name, + '(\\\\([^,]+, )(?:[0-9]+, )*(?:lf\\\\.)?([^)]+\\\\))', + '\\\\1\\\\2' + ) as name, + argMax(t.id, commit_ts) AS id, + argMax(sha, commit_ts) AS head_sha +FROM + default .workflow_job t + join workflow_runs on t.head_sha = workflow_runs.sha +WHERE + concat( + workflow_name, + ' / ', + regexp_replace( + t.name, + '(\\\\([^,]+, )(?:[0-9]+, )*(?:lf\\\\.)?([^)]+\\\\))', + '\\\\1\\\\2' + ) + ) IN {names :Array(String) } -- input list of names + and t.id in ( + select + id + from + materialized_views.workflow_job_by_head_sha + where + head_sha in ( + select + sha + from + workflow_runs + ) + ) +GROUP BY + workflow_name, + regexp_replace( + t.name, + '(\\\\([^,]+, )(?:[0-9]+, )*(?:lf\\\\.)?([^)]+\\\\))', + '\\\\1\\\\2' + ) +`; +} + +export default async function handler( + req: NextApiRequest, + res: NextApiResponse +) { + // Support both GET and POST methods. POST is preferred for large lists of + // files/jobs. + let sha1: string; + let sha2: string; + let filesParam: string; + let jobsParam: string; + // Fuzzy = true means to match jobs before sha1 and after sha2 based on + // timestamps if a job in the job list doesn't exist for those SHAs (ex + // periodic. Not every sha runs periodic, so instead we find the closest jobs + // before (for the first sha) and after (for the second sha) to get the status + // for that test + let fuzzy: boolean; + + if (req.method === "POST") { + // Read from POST body + sha1 = req.body.sha1; + sha2 = req.body.sha2; + filesParam = JSON.stringify(req.body.files || []); + jobsParam = JSON.stringify(req.body.jobs || []); + fuzzy = req.body.fuzzy === true || req.body.fuzzy === "true"; + } else { + // Read from query parameters (GET) + sha1 = req.query.sha1 as string; + sha2 = req.query.sha2 as string; + filesParam = req.query.files as string; + jobsParam = req.query.jobs as string; + fuzzy = req.query.fuzzy === "true"; + } + + if (!sha1 || !sha2) { + res.status(400).json({ error: "sha1 and sha2 are required" }); + return; + } + + // Parse the files and jobs arrays + const files: string[] = filesParam ? JSON.parse(filesParam) : []; + const jobs: string[] = jobsParam ? 
JSON.parse(jobsParam) : []; + + const jobInfo = []; + if (fuzzy) { + // If it's fuzzy, we need to get the job ids before the first sha, and the + // job ids after the second sha. If there is nothing after the second sha, + // get the jobs before the + const before = await queryClickhouse(getQueryForJobIds(true, true), { + sha: sha1, + names: jobs, + }); + let after = await queryClickhouse(getQueryForJobIds(true, false), { + sha: sha2, + names: jobs, + }); + if (after.length === 0) { + after = await queryClickhouse(getQueryForJobIds(true, true), { + sha: sha2, + names: jobs, + }); + } + jobInfo.push(...before.map((job: any) => ({ ...job, before: true }))); + jobInfo.push(...after.map((job: any) => ({ ...job, before: false }))); + } else { + jobInfo.push( + ...(await queryClickhouse(getQueryForJobIds(false, false), { + shas: [sha1, sha2], + })) + ); + } + + const filteredJobs = _(jobInfo) + .filter((job) => { + const fullJobName = `${job.workflow_name} / ${job.name}`; + // If jobs list is provided, filter by it; otherwise include all + if (jobs.length > 0) { + return jobs.includes(fullJobName); + } + return true; + }) + .groupBy((j) => `${j.workflow_name} / ${j.name}`) + .values() + .sortBy((jobs) => `${jobs[0].workflow_name} / ${jobs[0].name}`) + .value(); + + const results = []; + + for (const jobs of filteredJobs) { + if (results.length >= 200) { + break; + } + const jobIds = jobs.map((j) => j.id); + const dummySha1 = fuzzy + ? jobs.find((j) => j.before)?.head_sha || sha1 + : sha1; + const dummySha2 = fuzzy + ? jobs.find((j) => !j.before)?.head_sha || sha2 + : sha2; + + let statusChanges = await queryClickhouse(QUERY, { + jobIds, + sha1: dummySha1, + sha2: dummySha2, + files, + }); + + // Apply file filter if specified + if (files.length > 0) { + statusChanges = statusChanges.filter((change: any) => { + return files.includes(change.invoking_file); + }); + } + + results.push(...statusChanges); + } + + res + .status(200) + .setHeader("Content-Encoding", "gzip") + .send(zlib.gzipSync(JSON.stringify(results))); +} diff --git a/torchci/pages/tests/fileReport.tsx b/torchci/pages/tests/fileReport.tsx index 91878c80bf..26b68e46a7 100644 --- a/torchci/pages/tests/fileReport.tsx +++ b/torchci/pages/tests/fileReport.tsx @@ -1,3 +1,8 @@ +import AddCircleOutlineIcon from "@mui/icons-material/AddCircleOutline"; +import CheckCircleIcon from "@mui/icons-material/CheckCircle"; +import ErrorIcon from "@mui/icons-material/Error"; +import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline"; +import WarningRoundedIcon from "@mui/icons-material/WarningRounded"; import { Box, Button, @@ -15,6 +20,9 @@ import { GridRenderCellParams, GridTreeNodeWithRender, } from "@mui/x-data-grid"; +import { AdapterDayjs } from "@mui/x-date-pickers/AdapterDayjs"; +import { DateTimePicker } from "@mui/x-date-pickers/DateTimePicker"; +import { LocalizationProvider } from "@mui/x-date-pickers/LocalizationProvider"; import CopyLink from "components/common/CopyLink"; import LoadingPage from "components/common/LoadingPage"; import RegexButton from "components/common/RegexButton"; @@ -22,16 +30,14 @@ import { durationDisplay } from "components/common/TimeUtils"; import dayjs from "dayjs"; import isoWeek from "dayjs/plugin/isoWeek"; import ReactECharts from "echarts-for-react"; -import { encodeParams } from "lib/GeneralUtils"; +import { encodeParams, fetcher } from "lib/GeneralUtils"; import _ from "lodash"; import { useRouter } from "next/router"; import { useEffect, useRef, useState } from "react"; +import 
useSWRImmutable from "swr/immutable"; dayjs.extend(isoWeek); -const S3_LOCATION = - "https://ossci-raw-job-status.s3.amazonaws.com/additional_info/weekly_file_report"; - function formatTimestamp(ts: number) { return new Date(ts * 1000).toLocaleDateString().slice(0, 10); } @@ -93,49 +99,107 @@ function matchLabel( } function Diffs({ + shas, data, setFileFilter, setJobFilter, }: { + shas: { sha: string; push_date: number }[]; data: { [key: string]: any }[]; setFileFilter: (v: string) => void; setJobFilter: (v: string) => void; }) { - const groupByOptions = {}; - // Compute diffs for every row (except the earliest) in each (file, short_job_name) group - const groupedDiff = _.groupBy(data, (d) => `${d.file}|||${d.short_job_name}`); - // Map from id (row) to diff object for every row (except the first in group) - const rowDiffs: Record = {}; - Object.entries(groupedDiff).forEach(([key, arr]) => { - // Sort by push_date ascending (oldest to newest) - const sorted = _.sortBy(arr, (d) => d.push_date); - for (let i = 1; i < sorted.length; ++i) { - const curr = sorted[i]; - const prev = sorted[i - 1]; - function diff(field: string) { - if (!curr || !prev) return null; - return (curr[field] || 0) - (prev[field] || 0); + // Get all unique commits sorted by push_date + const allCommits = shas.sort((a, b) => a.push_date - b.push_date); + + // State for selected commits (default to first and last) + const [firstCommitIndex, setFirstCommitIndex] = useState(0); + const [lastCommitIndex, setLastCommitIndex] = useState(allCommits.length - 1); + + // Update indices when data changes + useEffect(() => { + setFirstCommitIndex(0); + setLastCommitIndex(allCommits.length - 1); + }, [allCommits.length]); + + const firstCommit = allCommits[firstCommitIndex] || allCommits[0]; + const lastCommit = + allCommits[lastCommitIndex] || allCommits[allCommits.length - 1]; + + // Group data by (file, short_job_name) + const groupedData = _.groupBy(data, (d) => `${d.file}|||${d.short_job_name}`); + + // Create one row per (file, job) with diffs. If the data is missing for the + // specific shas, use interpolation based on nearest available data points. 
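+  // Note: strictly this is a nearest-neighbor fallback rather than interpolation:
+  // getInterpolatedValue prefers the first data point at or after the target
+  // commit, otherwise falls back to the last one before it, and defaults to 0.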
+ const diffRows = Object.entries(groupedData).map(([key, arr], index) => { + const [file, jobName] = key.split("|||"); + + // Helper to get interpolated value for a commit + const getInterpolatedValue = (targetCommit: any, field: string) => { + // Find exact match first + const exactMatch = arr.find((row) => row.sha === targetCommit.sha); + if (exactMatch) { + return exactMatch[field] || 0; } - rowDiffs[curr.id] = { - count_diff: diff("count"), - cost_diff: diff("cost"), - time_diff: diff("time"), - skipped_diff: diff("skipped"), - errors_diff: diff("errors"), - failures_diff: diff("failures"), - successes_diff: diff("successes"), - }; - } + + // Sort by push_date for interpolation + const sorted = _.sortBy(arr, "push_date"); + + // Find the closest existing values before and after + const before = sorted + .filter((row) => row.push_date <= targetCommit.push_date) + .sort((a, b) => b.push_date - a.push_date)[0]; + const after = sorted + .filter((row) => row.push_date > targetCommit.push_date) + .sort((a, b) => a.push_date - b.push_date)[0]; + + // If we have data on the target commit or later, use the first available + if (after) return after[field] || 0; + // Otherwise use the last available before + if (before) return before[field] || 0; + // Default to 0 if no data exists + return 0; + }; + + // Get interpolated values for first and last commits + const firstValues = { + time: getInterpolatedValue(firstCommit, "time"), + cost: getInterpolatedValue(firstCommit, "cost"), + count: getInterpolatedValue(firstCommit, "count"), + skipped: getInterpolatedValue(firstCommit, "skipped"), + success: getInterpolatedValue(firstCommit, "success"), + }; + + const lastValues = { + time: getInterpolatedValue(lastCommit, "time"), + cost: getInterpolatedValue(lastCommit, "cost"), + count: getInterpolatedValue(lastCommit, "count"), + skipped: getInterpolatedValue(lastCommit, "skipped"), + success: getInterpolatedValue(lastCommit, "success"), + frequency: getInterpolatedValue(lastCommit, "frequency"), + }; + + return { + id: index, + file, + short_job_name: jobName, + // Last commit values + time: lastValues.time, + cost: lastValues.cost, + skipped: lastValues.skipped, + count: lastValues.count, + success: lastValues.success, + frequency: lastValues.frequency, + // Deltas (last - first) + time_diff: lastValues.time - firstValues.time, + cost_diff: lastValues.cost - firstValues.cost, + skipped_diff: lastValues.skipped - firstValues.skipped, + count_diff: lastValues.count - firstValues.count, + success_diff: lastValues.success - firstValues.success, + }; }); const columns: GridColDef[] = [ - { field: "sha", headerName: "SHA", flex: 1 }, - { - field: "push_date", - headerName: "Push Date", - flex: 1, - renderCell: (params: any) => formatTimestamp(params.value), - }, { field: "file", headerName: "File", @@ -167,28 +231,6 @@ function Diffs({ ), }, - { - field: "count", - headerName: "Count", - flex: 1, - renderHeader: () => renderHeader("Count", "Number of tests"), - }, - { - field: "count_diff", - headerName: "Δ Count", - flex: 1, - cellClassName: (params: any) => { - const value = parseFloat(params.value); - const base = parseFloat(params.row?.count); - if (!isNaN(value) && base && Math.abs(value) / base > 0.2) { - return "highlight"; - } - if (Math.abs(value) > 20) { - return "highlight"; - } - return "change"; - }, - }, { field: "time", headerName: "Duration", @@ -241,6 +283,12 @@ function Diffs({ }, renderCell: roundedCostCell, }, + { field: "success", headerName: "Success", flex: 1 }, + { + field: 
"success_diff", + headerName: "Δ Success", + flex: 1, + }, // { field: "errors", headerName: "Errors", flex: 1 }, // { // field: "errors_diff", @@ -282,13 +330,6 @@ function Diffs({ "Estimated frequency of test runs for this file (# commits it is run on) in the last week" ), }, - // { field: "successes", headerName: "Successes", flex: 1 }, - // { - // field: "successes_diff", - // headerName: "Δ Successes", - // flex: 1, - // getCellClassName: () => "change", - // }, ]; const styling = { @@ -308,24 +349,50 @@ function Diffs({ File Test Counts This table displays test run statistics for each test file and job - combination. The Δ (delta) columns show the change in each metric - compared to the previous commit for the same file and job. Double click - on the file or job to filter by that value. Highlighted cells are large - changes (either by percent or absolute value) and may indicate - regressions or improvements. + combination, comparing two selected commits. The Δ (delta) columns show + the change in each metric. Values are interpolated if a file/job + combination does not exist on the exact commits (using the nearest + available data point). Double click on the file or job to filter by that + value. Highlighted cells are large changes (either by percent or + absolute value) and may indicate regressions or improvements. - Pricing is approximate and per commit. Some pricing data may be missing (ex mac, rocm), in those cases the cost will be 0. + + Compare: + + vs + + + ({ - ...row, - ...(rowDiffs[row.id] || {}), - }))} + rows={diffRows} sx={styling} columns={columns} initialState={{ @@ -339,314 +406,151 @@ function Diffs({ ); } -function Overview({ - data, - setFileFilter, - setJobFilter, - setLabelFilter, -}: { - data: { [key: string]: any }[]; - setFileFilter: (_: string) => void; - setJobFilter: (_: string) => void; - setLabelFilter: (_: string) => void; -}) { - const groupByOptions = { - file: { - headerName: "File", - field: "file", - buttonText: "Group by File", - onDoubleClick: (value: any) => setFileFilter(value), - onDoubleClickHelpText: "Double-click to filter by this file", - groupByKey: (v: any) => [v.file], +function CommitTimeline({ data }: { data: any[] }) { + const sortedData = [...data].sort( + (a, b) => new Date(a.push_date).getTime() - new Date(b.push_date).getTime() + ); + + const option = { + tooltip: { + trigger: "axis", + formatter: (params: any) => { + const p = params[0].data; + return `SHA: ${p.sha}
Date: ${new Date( + p.date * 1000 + ).toLocaleString()}`; + }, }, - job: { - headerName: "Job", - field: "short_job_name", - buttonText: "Group by Job", - onDoubleClick: (value: any) => setJobFilter(value), - onDoubleClickHelpText: "Double-click to filter by this job", - groupByKey: (v: any) => [v.short_job_name], + xAxis: { + type: "time", + name: "Push Date", }, - label: { - headerName: "Label", - field: "labels", - buttonText: "Group by Label", - onDoubleClick: (value: any) => setLabelFilter(value), - onDoubleClickHelpText: "Double-click to filter by this label", - groupByKey: (v: any) => v.labels, + yAxis: { + type: "value", + show: false, // optional, since commits are all on the same level }, - total: { - headerName: "Total", - field: "total", - buttonText: "Total", - onDoubleClick: () => {}, - onDoubleClickHelpText: "", - groupByKey: (_: any) => ["total"], + series: [ + { + type: "line", + data: sortedData.map((commit) => ({ + value: [new Date(commit.push_date * 1000), 1], + sha: commit.sha, + date: commit.push_date, + })), + symbolSize: 10, + lineStyle: { + color: "#1976d2", + }, + itemStyle: { + color: "#1976d2", + }, + showSymbol: true, // show dots at commits + }, + ], + grid: { + left: "10%", + right: "10%", + bottom: "20%", + top: "20%", }, }; - const [groupBy, setGroupBy] = useState("file"); - const columns: any[] = [ - { - field: groupByOptions[groupBy].field, - headerName: groupByOptions[groupBy].headerName, - flex: 4, - renderCell: (params: any) => ( - - groupByOptions[groupBy].onDoubleClick(params.value) - } - title={groupByOptions[groupBy].onDoubleClickHelpText} - > - {params.value} - - ), - renderHeader: () => - renderHeader( - groupByOptions[groupBy].headerName, - groupByOptions[groupBy].onDoubleClickHelpText - ), - }, - { - field: "count", - headerName: "Count", - flex: 1, - renderHeader: () => renderHeader("Count", "Number of tests"), - }, - { - field: "time", - headerName: "Duration", - flex: 1, - renderCell: renderTimeCell, - renderHeader: () => - renderHeader( - "Duration", - "Duration of the test(s) for one commit if run sequentially" - ), - }, - { - field: "cost", - headerName: "Cost ($)", - flex: 1, - renderCell: roundedCostCell, - renderHeader: () => - renderHeader( - "Cost ($)", - "Estimated cost of the test(s) for one commit" - ), - }, - { - field: "skipped", - headerName: "Skipped", - flex: 1, - renderHeader: () => renderHeader("Skipped", "Number of skipped tests"), - }, - // { - // field: "frequency", - // headerName: "Frequency", - // flex: 1, - // renderHeader: () => - // renderHeader("Frequency", "Frequency of test runs for this file"), - // }, - ]; - - const groupByTarget = _.reduce( - data, - (acc, row) => { - const keys = groupByOptions[groupBy].groupByKey(row) as string[]; - keys.forEach((key) => { - acc[key] = acc[key] || []; - acc[key].push(row); - }); - return acc; - }, - {} as Record - ); - - const groupedRows = _.map(groupByTarget, (rows, key) => { - // Sum - const summed = _.reduce( - rows, - (acc, row) => { - acc.count += row.count || 0; - acc.time += row.time || 0; - acc.cost += row.cost || 0; - acc.skipped += row.skipped || 0; - acc.frequency += row.frequency || 0; - return acc; - }, - { count: 0, time: 0, cost: 0, skipped: 0, frequency: 0 } - ); - - // Average across sha data points - const numShas = _.uniq(rows.map((r) => r.sha)).length; - return { - id: rows[0].id, - file: rows[0].file, - short_job_name: rows[0].short_job_name, - labels: key, - count: summed.count / numShas, - time: summed.time / numShas, - cost: summed.cost / numShas, - 
skipped: summed.skipped / numShas, - frequency: summed.frequency / numShas, - }; - }); return ( - - Overview - - This section provides an overview of the test statistics. Values are - summed within a commit, then averaged. + <> + + Commit Timeline: The timeline below visualizes the + sequence of commits included in this report. Each point represents a + commit, arranged chronologically from left to right. Hover over a point + to see the commit SHA and its push date. - - Pricing is approximate and per commit. Some pricing data may be missing - (ex mac, rocm), in those cases the cost will be 0. - - - - {Object.entries(groupByOptions).map(([key, setting]) => ( - - ))} - - - - - - - ); -} - -function CommitInfo({ data }: { data: any[] }) { - const commits = _.reduce( - data, - (acc, row) => { - const key = row.sha; - acc[key] = row.push_date; - return acc; - }, - {} as Record - ); - return ( - - Commits - - These are the commits that are included in the report. - - ({ - id: sha, - push_date: pushDate, - }))} - columns={[ - { field: "id", headerName: "SHA", flex: 1 }, - { - field: "push_date", - headerName: "Push Date", - flex: 1, - renderCell: (params: any) => formatTimestamp(params.value), - }, - ]} - initialState={{ - sorting: { - sortModel: [{ field: "push_date", sort: "asc" }], - }, - }} - /> - + + ); } function Graphs({ data }: { data: any[] }) { - // Map selector value to field and label - const groupByOptions = { - file: { - getGroupByField: (d: any) => d.file, - groupByButtonText: "Group by File", - }, - job: { - getGroupByField: (d: any) => d.short_job_name, - groupByButtonText: "Group by Job", - }, - filejob: { - getGroupByField: (d: any) => `${d.short_job_name} | ${d.file}`, - groupByButtonText: "Group by File + Job", - }, - total: { - getGroupByField: (_: any) => `total`, - groupByButtonText: "Total", - }, - }; + data = data.map((item) => { + return { ...item, push_date: item.push_date * 1000 }; + }); + const shasByDate: Record = data.reduce((acc, item) => { + acc[item.push_date] = item.sha; + return acc; + }, {} as Record); + const metricOptions = { - count: { label: "Count", field: "count" }, cost: { label: "Cost", field: "cost" }, duration: { label: "Duration", field: "time" }, skips: { label: "Skips", field: "skipped" }, + successes: { label: "Success", field: "success" }, }; - const [metric, setMetric] = useState("count"); - const [groupBy, setGroupBy] = useState("file"); + const [metric, setMetric] = useState("cost"); + + const fieldName = metricOptions[metric].field; + + const echartData = _.map( + _.groupBy(data, (row) => `${row.file} | ${row.short_job_name}`), + (rows) => { + // Sort each series by push_date to ensure lines go forward in time and + // strip out the unnecessary data + const lineData = rows + .sort((a, b) => a.push_date - b.push_date) + .map((r) => [r.push_date, Number(r[fieldName]) || 0]); - const chartData = _.map( - // Group by the sha and the option that is selected - _.groupBy(data, (d) => { - return [d.sha, groupByOptions[groupBy].getGroupByField(d)]; - }), - // Sum over each group - (rows, key) => { return { - push_date: rows[0].push_date, - key: key.split(",")[1], - [metricOptions[metric].field]: _.sumBy( - rows, - (d) => d[metricOptions[metric].field] - ), + name: `${rows[0].file} | ${rows[0].short_job_name}`, + type: "line", + data: lineData, + connectNulls: false, }; } ); - // Convert to series - const echartData = _.map(_.groupBy(chartData, "key"), (rows) => ({ - name: rows[0].key, - type: "line", - data: rows.map((r) => [r.push_date, 
r[metricOptions[metric].field]]), - })); + const option = { - tooltip: { trigger: "axis" }, + tooltip: { + trigger: "item", + formatter: (params: any) => { + const date = new Date(params.value[0]).toLocaleString(); + return `${params.seriesName}
<br/>Date: ${date}<br/>${metricOptions[metric].label}: ${params.value[1]}<br/>
${shasByDate[params.value[0]]}`; + }, + }, legend: { - type: "scroll", - orient: "vertical", - right: 10, - selector: [ - { - type: "all", - title: "All", - }, - { - type: "inverse", - title: "Inv", - }, - ], + show: false, }, xAxis: { type: "time", name: "Push Date" }, yAxis: { type: "value", name: metricOptions[metric].label }, series: echartData, + grid: { + left: "10%", + right: "5%", + bottom: "10%", + top: "5%", + containLabel: true, + }, + dataZoom: [ + { + type: "slider", // horizontal slider for x-axis + xAxisIndex: 0, + start: 0, + end: 100, + }, + { + type: "inside", // enable mouse wheel/pinch zoom for x-axis + xAxisIndex: 0, + }, + { + type: "slider", // vertical slider for y-axis + yAxisIndex: 0, + start: 0, + end: 100, + }, + { + type: "inside", // enable mouse wheel/pinch zoom for y-axis + yAxisIndex: 0, + }, + ], }; return ( @@ -669,21 +573,10 @@ function Graphs({ data }: { data: any[] }) { ))} - - {Object.entries(groupByOptions).map(([key, option]) => ( - - ))} - @@ -692,135 +585,224 @@ function Graphs({ data }: { data: any[] }) { ); } +function StatusIcon({ status }: { status: string }) { + let icon = null; + if (status === "failure") { + icon = ; + } else if (status === "flaky") { + icon = ; + } else if (status === "success") { + icon = ; + } else if (status === "skipped") { + icon = ; + } else if (status === "removed") { + icon = ; + } else if (status === "added") { + icon = ; + } + return icon; +} + +function useStatusChangeData( + uniqueFiles: string[], + uniqueJobs: string[], + sha1: string, + sha2: string +) { + // Sort for consistent cache keys and remove .py suffixes + const sortedFiles = [...uniqueFiles].sort(); + const sortedJobs = [...uniqueJobs].sort(); + + const swrKey = + sha1 && sha2 + ? `/api/flaky-tests/statusChanges:${sha1}:${sha2}:${JSON.stringify( + sortedFiles + )}:${JSON.stringify(sortedJobs)}` + : null; + + // Custom fetcher for POST requests to get around URL header length limits + const postFetcher = async () => { + const response = await fetch("/api/flaky-tests/statusChanges", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + sha1, + sha2, + files: sortedFiles, + jobs: sortedJobs, + fuzzy: true, // Enable fuzzy matching to find nearest jobs + }), + }); + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + return response.json(); + }; + + const { data, error, isLoading } = useSWRImmutable(swrKey, postFetcher); + + return { + data: data || [], + error, + isLoading, + }; +} + function TestStatus({ shas, - rowMatchesFilters, + data, }: { shas: { sha: string; push_date: number }[]; - rowMatchesFilters: (row: any) => boolean; + data: any[]; }) { - const options = shas; - const [selectedIndex, setSelectedIndex] = useState( - shas.length > 0 ? shas.length - 1 : 0 - ); + // Sort commits by date ascending (oldest first) + const allCommits = [...shas].sort((a, b) => a.push_date - b.push_date); + // State for selected commits (default to first and last) + const [firstCommitIndex, setFirstCommitIndex] = useState(0); + const [lastCommitIndex, setLastCommitIndex] = useState(allCommits.length - 1); + + // Update indices when data changes useEffect(() => { - if (shas.length > 0 && selectedIndex === 0) { - setSelectedIndex(shas.length - 1); - } - }, [shas, selectedIndex]); - let data = useData( - selectedIndex !== 0 - ? 
`${S3_LOCATION}/status_changes_${options[selectedIndex - 1].sha}_${ - options[selectedIndex].sha - }.json.gz` - : undefined + setFirstCommitIndex(0); + setLastCommitIndex(allCommits.length - 1); + }, [allCommits.length]); + + // Extract unique files and jobs from the filtered data + const uniqueFiles = _.uniq(data.map((d) => d.file)); + const uniqueJobs = _.uniq(data.map((d) => d.short_job_name)); + + // Fetch status changes from API + const sha1 = allCommits[firstCommitIndex]?.sha; + const sha2 = allCommits[lastCommitIndex]?.sha; + + const statusChangeResult = useStatusChangeData( + uniqueFiles, + uniqueJobs, + sha1, + sha2 ); - // Apply the same file/job/label filter to statusChangeData - data = data?.filter(rowMatchesFilters); + // Transform the data + const statusData = (statusChangeResult.data || []).map( + (row: any, index: number) => ({ + id: index, + prev_status: row.prev_status, + new_status: row.new_status, + file: row.invoking_file, + test_name: row.name, + short_job_name: `${row.workflow_name} / ${row.job_name}`, + classname: row.classname, + }) + ); const columns: any[] = [ - { field: "status", headerName: "Status", flex: 1 }, - { field: "file", headerName: "File", flex: 4 }, - { field: "test_name", headerName: "Test", flex: 4 }, - { field: "short_job_name", headerName: "Job", flex: 4 }, - { field: "sha", headerName: "SHA", flex: 1 }, { - field: "push_date", - headerName: "Push Date", - flex: 1, - renderCell: (params: any) => formatTimestamp(params.value), + field: "status", + headerName: "Status", + flex: 2, + valueGetter: (_value: any, row: any) => { + // Create a sortable string from prev_status and new_status + const prev = row.prev_status || "none"; + const next = row.new_status || "none"; + return `${prev} → ${next}`; + }, + renderCell: (params: any) => { + const prevStatus = params.row.prev_status || ""; + const newStatus = params.row.new_status || ""; + + const prevText = prevStatus === "" ? "none" : prevStatus; + const newText = newStatus === "" ? "none" : newStatus; + + return ( + + {prevStatus && } + + {prevText} + + + {newStatus && } + + {newText} + + + ); + }, }, + { field: "file", headerName: "File", flex: 2 }, + { field: "test_name", headerName: "Test", flex: 4 }, + { field: "short_job_name", headerName: "Job", flex: 4 }, ]; return ( - - setFirstCommitIndex(Number(e.target.value))} + sx={{ minWidth: 200 }} + > + {allCommits.map((commit, index) => ( + + {commit.sha.slice(0, 7)} ({formatTimestamp(commit.push_date)}) - ); - })} - - - - - ); -} - -// Custom hook to fetch real data from the local JSON file -function useData(link: string | undefined) { - const [data, setData] = useState([]); - - useEffect(() => { - if (!link) return; - - fetch(link) - .then((response) => - response.ok ? response.text() : Promise.reject("Failed to load") - ) - .then((text) => { - const final = []; - for (const line of text.split("\n")) { - if (line.trim()) { - final.push(JSON.parse(line)); - } - } - setData(final.map((item, index) => ({ ...item, id: index }))); - }); - }, [link]); - return data; -} - -function useWeeksData(commitMetadata: any[], headShaIndex: number) { - const [data, setData] = useState([]); + ))} + + vs + + - useEffect(() => { - if (headShaIndex == -1 || commitMetadata.length === 0) return; + {statusChangeResult.isLoading && ( + + Loading status changes... 
+ + )} - const shasToFetch = []; - for (let i = headShaIndex; i >= 0 && i > headShaIndex - 7; --i) { - shasToFetch.push(commitMetadata[i].sha); - } + {statusChangeResult.error && ( + + Error loading status changes: {statusChangeResult.error.message} + + )} - Promise.all( - shasToFetch.map((sha) => - fetch(`${S3_LOCATION}/data_${sha}.json.gz`) - .then((response) => - response.ok ? response.text() : Promise.reject("Failed to load") - ) - .then((text) => { - const final = []; - for (const line of text.split("\n")) { - if (line.trim()) { - final.push(JSON.parse(line)); - } - } - return final.map((item, index) => ({ ...item, id: index })); - }) - ) - ).then((allData) => { - // Flatten the array of arrays - setData(allData.flat()); - }); - }, [commitMetadata, headShaIndex]); - return data; + + + + + ); } export default function Page() { @@ -835,6 +817,12 @@ export default function Page() { const jobInputRef = useRef(null); const labelInputRef = useRef(null); const [baseUrl, setBaseUrl] = useState(""); + const [endDate, setEndDate] = useState( + dayjs().subtract(5, "hour") + ); + const [startDate, setStartDate] = useState( + dayjs().subtract(5, "hour").subtract(7, "day") + ); // Keep input fields in sync when filters are set programmatically useEffect(() => { @@ -854,21 +842,20 @@ export default function Page() { }, [labelFilter]); const router = useRouter(); - const commitMetadata = useData(`${S3_LOCATION}/commits_metadata.json.gz`); - const [headShaIndex, setHeadShaIndex] = useState( - commitMetadata.length - 1 - ); - - let data = useWeeksData(commitMetadata, headShaIndex).map((item, index) => ({ - ...item, - id: index, - })); - useEffect(() => { - if (headShaIndex == -1 && commitMetadata.length > 0) { - setHeadShaIndex(commitMetadata.length - 1); - } - }, [commitMetadata, headShaIndex]); + const { + data: rawData, + isLoading, + error, + } = useSWRImmutable( + startDate && endDate + ? `/api/flaky-tests/fileReport?${encodeParams({ + startDate: startDate.unix().toString(), + endDate: endDate.unix().toString(), + })}` + : null, + fetcher + ); useEffect(() => { // Sync filters from the router query params in one effect to avoid @@ -892,6 +879,40 @@ export default function Page() { ); }, [router.query]); + let { + results: data, + costInfo, + shas, + testOwnerLabels, + } = rawData || { + results: [], + costInfo: [], + shas: [], + testOwnerLabels: [], + }; + + data = data.map((row: any, index: number) => { + const match = costInfo.find((r: any) => r.label === row.label); + const ownerLabels = testOwnerLabels.find( + (r: any) => r.file === `${row.file}.py` + ); + const commit = shas.find((s: any) => s.sha === row.sha); + const { workflow_name, job_name, ...rest } = row; + return { + ...rest, + id: index, + cost: (row.time * (match?.price_per_hour ?? 0)) / 3600, + short_job_name: `${workflow_name} / ${job_name}`, + labels: ownerLabels ? ownerLabels.owner_labels : ["unknown"], + push_date: commit ? commit.push_date : 0, + sha: commit ? commit.sha : "unknown", + }; + }); + + shas.forEach((commit: any, index: number) => { + commit.id = index; + }); + if (!router.isReady) { return ; } @@ -912,7 +933,9 @@ export default function Page() { return fileMatch && jobMatch && labelMatch; } - data = data.filter(rowMatchesFilters); + const filteredData = data.filter(rowMatchesFilters); + data = filteredData; + return ( @@ -954,20 +977,23 @@ export default function Page() { Select a commit to view data from the week leading up to that commit. 
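The `Page` component above joins each raw result row with pricing, owner-label, and commit metadata before rendering, deriving dollar cost as runtime multiplied by the runner's hourly price and divided by 3600. A hedged sketch of that join, with simplified types standing in for the real API response shapes (the type and function names here are illustrative, not part of the diff):

```ts
type CostInfo = { label: string; price_per_hour: number };
type OwnerLabels = { file: string; owner_labels: string[] };
type Commit = { sha: string; push_date: number };

// Enrich one raw result row: derive dollar cost from runtime (seconds) and the
// runner's hourly price, attach owner labels (owner files are keyed with a
// ".py" suffix), and copy the commit's push date.
function enrichRow(
  row: { time: number; label: string; file: string; sha: string },
  costInfo: CostInfo[],
  testOwnerLabels: OwnerLabels[],
  shas: Commit[]
) {
  const price = costInfo.find((c) => c.label === row.label)?.price_per_hour ?? 0;
  const owners = testOwnerLabels.find((o) => o.file === `${row.file}.py`);
  const commit = shas.find((s) => s.sha === row.sha);
  return {
    ...row,
    cost: (row.time * price) / 3600, // seconds * ($/hour) / (seconds/hour)
    labels: owners ? owners.owner_labels : ["unknown"],
    push_date: commit ? commit.push_date : 0,
  };
}
```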
- + + Date Range: + + setStartDate(newValue)} + slotProps={{ textField: { size: "small" } }} + /> + setEndDate(newValue)} + slotProps={{ textField: { size: "small" } }} + /> + + - - { - setFileFilter(input); - setFileRegex(false); - }} - setJobFilter={(input) => { - setJobFilter(input); - setJobRegex(false); - }} - setLabelFilter={(input) => { - setLabelFilter(input); - setLabelRegex(false); - }} - /> + { setFileFilter(input); @@ -1099,16 +1111,10 @@ export default function Page() { Status Changes This table lists the tests that were added, removed, started skipping, - or stopped skipping. This will only show at most 50 entries per commit - pair due to file size. + or stopped skipping. This will only show at most 200 entries due to + API limits. - = 0 ? headShaIndex - 7 : 0, - headShaIndex + 1 - )} - rowMatchesFilters={rowMatchesFilters} - /> + );
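The status changes shown in the last table are requested with a POST body rather than query parameters, which keeps long file and job lists from overflowing URL length limits. A minimal sketch of that SWR pattern, reusing the endpoint path and `fuzzy` flag from the hunk; the hook name `useStatusChanges` and the exact cache-key format are assumptions:

```ts
import useSWRImmutable from "swr/immutable";

// Fetch status changes between two commits. Files and jobs are sorted so the
// SWR cache key is stable regardless of the order they were collected in.
function useStatusChanges(sha1: string, sha2: string, files: string[], jobs: string[]) {
  const sortedFiles = [...files].sort();
  const sortedJobs = [...jobs].sort();
  const key =
    sha1 && sha2
      ? ["/api/flaky-tests/statusChanges", sha1, sha2, ...sortedFiles, ...sortedJobs].join(":")
      : null; // a null key disables the fetch until both commits are chosen

  return useSWRImmutable(key, async () => {
    const res = await fetch("/api/flaky-tests/statusChanges", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ sha1, sha2, files: sortedFiles, jobs: sortedJobs, fuzzy: true }),
    });
    if (!res.ok) throw new Error(`HTTP error! status: ${res.status}`);
    return res.json();
  });
}
```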
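Similarly, the Graphs component earlier in this diff draws one line per `file | job` pair and enables slider and inside zoom on both axes. A sketch of those two pieces, under the assumption that `push_date` has already been converted to milliseconds and that the resulting pieces are fed into a standard ECharts option object (`buildSeries` is an illustrative helper, not from the diff):

```ts
import _ from "lodash";

type Point = { file: string; short_job_name: string; push_date: number; [k: string]: any };

// Build one ECharts line series per (file, job) pair, with points sorted by
// push date so each line always moves forward in time.
function buildSeries(rows: Point[], field: string) {
  return _.map(
    _.groupBy(rows, (r) => `${r.file} | ${r.short_job_name}`),
    (group, name) => ({
      name,
      type: "line" as const,
      data: _.sortBy(group, "push_date").map((r) => [r.push_date, Number(r[field]) || 0]),
    })
  );
}

// Slider plus mouse-wheel ("inside") zoom on both axes, mirroring the dataZoom
// block in the hunk above.
const dataZoom = [
  { type: "slider", xAxisIndex: 0, start: 0, end: 100 },
  { type: "inside", xAxisIndex: 0 },
  { type: "slider", yAxisIndex: 0, start: 0, end: 100 },
  { type: "inside", yAxisIndex: 0 },
];
```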