This repository was archived by the owner on May 5, 2025. It is now read-only.

Commit 76b90da

feat: new TA processor implementation

We make use of the TA timeseries functions to process testruns, persisting them to Timescale instead of Postgres. In test_results_processor we toggle which implementation to use based on should_use_timeseries.

1 parent 4458704
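
Note: the should_use_timeseries toggle lives in test_results_processor, one of the six changed files, but its diff is not reproduced below. The following is a minimal sketch of what such a dispatch might look like, assuming should_use_timeseries takes the repo id; legacy_processor_impl is a hypothetical stand-in for the pre-existing Postgres-backed path, not a name confirmed by this page.

    # Sketch only: dispatch between the old and new implementations based on
    # should_use_timeseries, per the commit message above.
    from services.test_analytics.ta_processor import ta_processor_impl

    def process_test_results(repoid, commitid, commit_yaml, argument):
        if should_use_timeseries(repoid):
            # New path: parse the upload and persist testruns to Timescale.
            return ta_processor_impl(repoid, commitid, commit_yaml, argument)
        # Old path: keep writing test results to Postgres.
        return legacy_processor_impl(repoid, commitid, commit_yaml, argument)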

File tree

6 files changed: +642 −1 lines changed
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, cast

import sentry_sdk
import test_results_parser
from shared.config import get_config
from shared.django_apps.core.models import Commit, Repository
from shared.django_apps.reports.models import ReportSession, UploadError
from shared.storage.base import BaseStorageService

from services.test_analytics.ta_timeseries import get_flaky_tests_set, insert_testrun
from services.yaml import UserYaml, read_yaml_field


@dataclass
class TAProcInfo:
    repository: Repository
    branch: str | None
    bucket_name: str
    user_yaml: UserYaml


def handle_file_not_found(upload: ReportSession):
    upload.state = "processed"
    upload.save()
    UploadError.objects.create(
        report_session=upload,
        error_code="file_not_in_storage",
        error_params={},
    )


def handle_parsing_error(upload: ReportSession, exc: Exception):
    sentry_sdk.capture_exception(exc, tags={"upload_state": upload.state})
    upload.state = "processed"
    upload.save()
    UploadError.objects.create(
        report_session=upload,
        error_code="unsupported_file_format",
        error_params={"error_message": str(exc)},
    )


def get_ta_processing_info(
    repoid: int,
    commitid: str,
    commit_yaml: dict[str, Any],
) -> TAProcInfo:
    repository = Repository.objects.get(repoid=repoid)

    commit = Commit.objects.get(repository=repository, commitid=commitid)
    branch = commit.branch
    if branch is None:
        raise ValueError("Branch is None")

    bucket_name = cast(
        str, get_config("services", "minio", "bucket", default="archive")
    )
    user_yaml: UserYaml = UserYaml(commit_yaml)
    return TAProcInfo(
        repository,
        branch,
        bucket_name,
        user_yaml,
    )


def should_delete_archive(user_yaml: UserYaml) -> bool:
    if get_config("services", "minio", "expire_raw_after_n_days"):
        return True
    return not read_yaml_field(user_yaml, ("codecov", "archive", "uploads"), _else=True)


def delete_archive(
    storage_service: BaseStorageService, upload: ReportSession, bucket_name: str
):
    archive_url = upload.storage_path
    if archive_url and not archive_url.startswith("http"):
        storage_service.delete_file(bucket_name, archive_url)


def insert_testruns_timeseries(
    repoid: int,
    commitid: str,
    branch: str | None,
    upload: ReportSession,
    parsing_infos: list[test_results_parser.ParsingInfo],
):
    flaky_test_set = get_flaky_tests_set(repoid)

    for parsing_info in parsing_infos:
        insert_testrun(
            timestamp=upload.created_at,
            repo_id=repoid,
            commit_sha=commitid,
            branch=branch,
            upload_id=upload.id,
            flags=upload.flag_names,
            parsing_info=parsing_info,
            flaky_test_ids=flaky_test_set,
        )
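
get_flaky_tests_set and insert_testrun come from services/test_analytics/ta_timeseries, whose diff is not shown on this page. Inferring only from the call sites above, their signatures would be roughly the following sketch; the exact parameter and return types are assumptions, not confirmed by the commit.

    # Inferred sketch, not the actual ta_timeseries diff: shapes implied by
    # the call sites in insert_testruns_timeseries above.
    from datetime import datetime

    import test_results_parser

    def get_flaky_tests_set(repo_id: int) -> set:
        # Returns the ids of tests currently considered flaky for this repo.
        ...

    def insert_testrun(
        timestamp: datetime,
        repo_id: int,
        commit_sha: str,
        branch: str | None,
        upload_id: int,
        flags: list[str] | None,
        parsing_info: test_results_parser.ParsingInfo,
        flaky_test_ids: set | None = None,
    ) -> None:
        # Persists the testruns from one ParsingInfo to the Timescale-backed
        # table, marking any whose id appears in flaky_test_ids.
        ...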
Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
import logging
from typing import Any

import shared.storage
from shared.django_apps.core.models import Commit
from shared.django_apps.reports.models import ReportSession
from shared.storage.exceptions import FileNotInStorageError
from test_results_parser import parse_raw_upload

from services.processing.types import UploadArguments
from services.test_analytics.ta_processing import (
    delete_archive,
    get_ta_processing_info,
    handle_file_not_found,
    handle_parsing_error,
    insert_testruns_timeseries,
    should_delete_archive,
)

log = logging.getLogger(__name__)


def ta_processor_impl(
    repoid: int,
    commitid: str,
    commit_yaml: dict[str, Any],
    argument: UploadArguments,
) -> bool:
    log.info(
        "Processing single TA argument",
        extra=dict(
            upload_id=argument.get("upload_id"),
            repoid=repoid,
            commitid=commitid,
        ),
    )

    upload_id = argument.get("upload_id")
    if upload_id is None:
        return False

    upload = ReportSession.objects.get(id=upload_id)
    if upload.state == "processed":
        # don't need to process again because the intermediate result should
        # already be in redis
        return False

    if upload.storage_path is None:
        handle_file_not_found(upload)
        return False

    ta_proc_info = get_ta_processing_info(repoid, commitid, commit_yaml)

    storage_service = shared.storage.get_appropriate_storage_service(
        ta_proc_info.repository.repoid
    )

    try:
        payload_bytes = storage_service.read_file(
            ta_proc_info.bucket_name, upload.storage_path
        )
    except FileNotInStorageError:
        handle_file_not_found(upload)
        return False

    try:
        parsing_infos, readable_file = parse_raw_upload(payload_bytes)
    except RuntimeError as exc:
        handle_parsing_error(upload, exc)
        return False

    branch = Commit.objects.get(id=upload.report.commit_id).branch

    insert_testruns_timeseries(repoid, commitid, branch, upload, parsing_infos)

    if should_delete_archive(ta_proc_info.user_yaml):
        delete_archive(storage_service, upload, ta_proc_info.bucket_name)
    else:
        storage_service.write_file(
            ta_proc_info.bucket_name, upload.storage_path, bytes(readable_file)
        )

    return True
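
Every early exit returns False (missing upload id, already-processed upload, missing or unparsable file), and True is returned only once the testruns have been persisted, so a caller can fan out over a batch of arguments and inspect the results. A hedged usage sketch; the batch wiring is assumed and not part of this diff:

    # Hypothetical caller: run the processor over a batch of upload arguments.
    results = [
        ta_processor_impl(repoid, commitid, commit_yaml, argument)
        for argument in arguments
    ]
    processed_any = any(results)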
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
import pytest
from shared.storage import get_appropriate_storage_service
from shared.storage.exceptions import BucketAlreadyExistsError


@pytest.fixture
def storage():
    storage_service = get_appropriate_storage_service()
    try:
        storage_service.create_root_storage("archive")
    except BucketAlreadyExistsError:
        pass
    return storage_service
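
The fixture creates the "archive" root bucket on whichever backend get_appropriate_storage_service returns, swallowing the error when the bucket already exists, so tests get a usable storage service regardless of prior state. A minimal hedged example of a test consuming it; the path and payload are made up for illustration:

    # Hypothetical test: write through the fixture-provided service, read back.
    def test_storage_roundtrip(storage):
        storage.write_file("archive", "test/hello.txt", b"hello")
        assert storage.read_file("archive", "test/hello.txt") == b"hello"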
