Skip to content

Commit

Permalink
feat: new TA processor implementation
Browse files Browse the repository at this point in the history
We make use of the TA timeseries functions to process test runs,
persisting them to Timescale instead of Postgres.

In test_results_processor we toggle which implementation to use
based on impl_type.
  • Loading branch information
joseph-sentry committed Mar 3, 2025
1 parent b23aa8b commit 27a1def
Showing 6 changed files with 663 additions and 1 deletion.
103 changes: 103 additions & 0 deletions services/test_analytics/ta_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, cast

import sentry_sdk
import test_results_parser
from shared.config import get_config
from shared.django_apps.core.models import Commit, Repository
from shared.django_apps.reports.models import ReportSession, UploadError
from shared.storage.base import BaseStorageService

from services.test_analytics.ta_timeseries import get_flaky_tests_set, insert_testrun
from services.yaml import UserYaml, read_yaml_field


@dataclass
class TAProcInfo:
    """Per-upload context needed by the test-analytics processing pipeline."""

    # Repository the upload belongs to.
    repository: Repository
    # Branch of the commit being processed; get_ta_processing_info rejects
    # None, so downstream consumers can rely on this being set.
    branch: str | None
    # Storage bucket holding the raw upload (minio config, default "archive").
    bucket_name: str
    # Parsed user YAML; controls archive-deletion behavior after processing.
    user_yaml: UserYaml


def handle_file_not_found(upload: ReportSession):
    """Finalize *upload* whose raw file is missing from storage.

    The upload is marked processed so it is not retried, and a
    ``file_not_in_storage`` error record is attached for reporting.
    """
    upload.state = "processed"
    upload.save()

    UploadError.objects.create(
        report_session=upload,
        error_code="file_not_in_storage",
        error_params={},
    )


def handle_parsing_error(upload: ReportSession, exc: Exception):
    """Record *exc* as an unsupported-format error and finalize the upload."""
    # Capture before transitioning state so Sentry sees the state the
    # upload was in when parsing failed.
    sentry_sdk.capture_exception(exc, tags={"upload_state": upload.state})

    upload.state = "processed"
    upload.save()

    error_details = {"error_message": str(exc)}
    UploadError.objects.create(
        report_session=upload,
        error_code="unsupported_file_format",
        error_params=error_details,
    )


def get_ta_processing_info(
    repoid: int,
    commitid: str,
    commit_yaml: dict[str, Any],
) -> TAProcInfo:
    """Gather the repository, branch, bucket and yaml needed to process an upload.

    Raises:
        ValueError: when the commit has no branch set.
        (Repository.DoesNotExist / Commit.DoesNotExist propagate from the ORM.)
    """
    repo = Repository.objects.get(repoid=repoid)
    commit = Commit.objects.get(repository=repo, commitid=commitid)

    if commit.branch is None:
        raise ValueError("Branch is None")

    bucket = cast(
        str, get_config("services", "minio", "bucket", default="archive")
    )
    return TAProcInfo(
        repo,
        commit.branch,
        bucket,
        UserYaml(commit_yaml),
    )


def should_delete_archive(user_yaml: UserYaml) -> bool:
    """Decide whether the raw upload should be removed after processing.

    Deletion happens when a global raw-file expiry is configured, or when
    the user's yaml opts out of keeping raw uploads. ``or`` short-circuits,
    so the yaml field is only consulted when no expiry is configured.
    """
    return bool(
        get_config("services", "minio", "expire_raw_after_n_days")
    ) or not read_yaml_field(user_yaml, ("codecov", "archive", "uploads"), _else=True)


def delete_archive(
    storage_service: BaseStorageService, upload: ReportSession, bucket_name: str
):
    """Delete the upload's raw file from *bucket_name*, if it is deletable.

    Paths starting with "http" are external URLs we do not own, and an
    empty/absent path means there is nothing to delete — both are no-ops.
    """
    path = upload.storage_path
    if not path or path.startswith("http"):
        return
    storage_service.delete_file(bucket_name, path)


def insert_testruns_timeseries(
    repoid: int,
    commitid: str,
    branch: str | None,
    upload: ReportSession,
    parsing_infos: list[test_results_parser.ParsingInfo],
):
    """Persist every parsed testrun batch of *upload* into the timeseries store."""
    # Fetched once so each insert can cheaply tag known-flaky tests.
    known_flaky = get_flaky_tests_set(repoid)

    for info in parsing_infos:
        insert_testrun(
            timestamp=upload.created_at,
            repo_id=repoid,
            commit_sha=commitid,
            branch=branch,
            upload_id=upload.id,
            flags=upload.flag_names,
            parsing_info=info,
            flaky_test_ids=known_flaky,
        )
90 changes: 90 additions & 0 deletions services/test_analytics/ta_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
from typing import Any

import shared.storage
from shared.django_apps.core.models import Commit
from shared.django_apps.reports.models import ReportSession
from shared.storage.exceptions import FileNotInStorageError
from test_results_parser import parse_raw_upload

from services.processing.types import UploadArguments
from services.test_analytics.ta_processing import (
delete_archive,
get_ta_processing_info,
handle_file_not_found,
handle_parsing_error,
insert_testruns_timeseries,
should_delete_archive,
)

log = logging.getLogger(__name__)


def ta_processor_impl(
    repoid: int,
    commitid: str,
    commit_yaml: dict[str, Any],
    argument: UploadArguments,
    update_state: bool = False,
) -> bool:
    """Process a single test-analytics upload, persisting testruns to Timescale.

    Returns True when the upload was fully processed, False when it was
    skipped: missing upload id, already processed, or the raw file could
    not be found or parsed.

    When ``update_state`` is True, upload state transitions and error
    records are written to the database and the raw archive may be
    deleted or rewritten; when False, failures leave the upload untouched.
    """
    log.info(
        "Processing single TA argument",
        extra=dict(
            upload_id=argument.get("upload_id"),
            repoid=repoid,
            commitid=commitid,
        ),
    )

    upload_id = argument.get("upload_id")
    if upload_id is None:
        return False

    upload = ReportSession.objects.get(id=upload_id)
    if upload.state == "processed":
        # don't need to process again because the intermediate result should already be in redis
        return False

    if upload.storage_path is None:
        # No file was ever attached to this upload; record the error only
        # when we are allowed to mutate state.
        if update_state:
            handle_file_not_found(upload)
        return False

    ta_proc_info = get_ta_processing_info(repoid, commitid, commit_yaml)

    storage_service = shared.storage.get_appropriate_storage_service(
        ta_proc_info.repository.repoid
    )

    try:
        payload_bytes = storage_service.read_file(
            ta_proc_info.bucket_name, upload.storage_path
        )
    except FileNotInStorageError:
        if update_state:
            handle_file_not_found(upload)
        return False

    try:
        # parse_raw_upload returns the parsed testruns plus a normalized,
        # human-readable copy of the raw payload.
        parsing_infos, readable_file = parse_raw_upload(payload_bytes)
    except RuntimeError as exc:
        if update_state:
            handle_parsing_error(upload, exc)
        return False

    # NOTE(review): branch is re-fetched via the upload's report commit here,
    # even though ta_proc_info was resolved from (repoid, commitid) above —
    # presumably the same commit; confirm they cannot diverge.
    branch = Commit.objects.get(id=upload.report.commit_id).branch

    insert_testruns_timeseries(repoid, commitid, branch, upload, parsing_infos)

    if update_state:
        upload.state = "processed"
        upload.save()

        if should_delete_archive(ta_proc_info.user_yaml):
            delete_archive(storage_service, upload, ta_proc_info.bucket_name)
        else:
            # Keep the archive, but replace the raw payload with the
            # normalized readable version at the same storage path.
            storage_service.write_file(
                ta_proc_info.bucket_name, upload.storage_path, bytes(readable_file)
            )

    return True
13 changes: 13 additions & 0 deletions services/test_analytics/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
from shared.storage import get_appropriate_storage_service
from shared.storage.exceptions import BucketAlreadyExistsError


@pytest.fixture
def storage():
    """Storage service with the default "archive" bucket guaranteed to exist."""
    service = get_appropriate_storage_service()
    try:
        service.create_root_storage("archive")
    except BucketAlreadyExistsError:
        # The bucket can survive between test runs; reusing it is fine.
        pass
    return service
Loading
Oops, something went wrong.

0 comments on commit 27a1def

Please sign in to comment.