Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK: Robust Metamist Integrations #784

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
105 changes: 81 additions & 24 deletions cpg_workflows/metamist.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
import traceback
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
from typing import Any, Callable, Optional

from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from cpg_utils import Path, to_path
from cpg_utils.config import get_config
Expand All @@ -21,7 +28,7 @@
from cpg_workflows.utils import exists
from metamist import models
from metamist.apis import AnalysisApi
from metamist.exceptions import ApiException
from metamist.exceptions import ApiException, ServiceException
from metamist.graphql import gql, query

GET_SEQUENCING_GROUPS_QUERY = gql(
Expand Down Expand Up @@ -259,6 +266,49 @@ def __init__(self) -> None:
self.default_dataset: str = get_config()['workflow']['dataset']
self.aapi = AnalysisApi()

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=3, min=8, max=30),
retry=retry_if_exception_type(ServiceException),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way for us to be alerted if too many of these exceptions are raised?

reraise=True,
)
def make_retry_aapi_call(self, api_func: Callable, **kwargv: Any):
"""
Make a generic API call to self.aapi with retries.
Retry only if ServiceException is thrown

TODO: How many retries?
e.g. try 3 times, wait 2^3: 8, 16, 24 seconds
"""
try:
return api_func(**kwargv)
except ServiceException:
# raise here so the retry occurs
logging.warning(
f'Retrying {api_func} ...',
)
raise

def make_aapi_call(self, api_func: Callable, **kwargv: Any):
"""
Make a generic API call to self.aapi.
This is a wrapper around retry of API call to handle exceptions and logging.
"""
try:
return self.make_retry_aapi_call(api_func, **kwargv)
except (ServiceException, ApiException) as e:
# Metamist API failed even after retries
# log the error and continue
traceback.print_exc()
logging.error(
f'Error: {e} Call {api_func} failed with payload:\n{str(kwargv)}',
)
# TODO: discuss should we catch all here as well?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

certainly a good idea.

# except Exception as e:
# # Other exceptions?

return None

def get_sgs_for_cohorts(self, cohort_ids: list[str]) -> dict[str, dict[str, Any]]:
"""
Retrieve the sequencing groups per dataset for a list of cohort IDs.
Expand Down Expand Up @@ -314,13 +364,15 @@ def update_analysis(self, analysis: Analysis, status: AnalysisStatus):
"""
Update "status" of an Analysis entry.
"""
try:
self.aapi.update_analysis(
analysis.id,
models.AnalysisUpdateModel(status=models.AnalysisStatus(status.value)),
)
except ApiException:
traceback.print_exc()
self.make_aapi_call(
self.aapi.update_analysis,
analysis_id=analysis.id,
analysis_update_model=models.AnalysisUpdateModel(
status=models.AnalysisStatus(status.value),
),
)
# Keeping this as is for compatibility with the existing code
# However this should only be set after the API call is successful
analysis.status = status

# NOTE: This isn't used anywhere.
Expand All @@ -335,16 +387,19 @@ def find_joint_calling_analysis(
metamist_proj = dataset or self.default_dataset
if get_config()['workflow']['access_level'] == 'test':
metamist_proj += '-test'
try:
data = self.aapi.get_latest_complete_analysis_for_type(
project=metamist_proj,
analysis_type=models.AnalysisType('joint-calling'),
)
except ApiException:

data = self.make_aapi_call(
self.aapi.get_latest_complete_analysis_for_type,
project=metamist_proj,
analysis_type=models.AnalysisType('joint-calling'),
)
if data is None:
return None

a = Analysis.parse(data)
if not a:
return None

assert a.type == AnalysisType.JOINT_CALLING, data
assert a.status == AnalysisStatus.COMPLETED, data
if a.sequencing_group_ids != set(sequencing_group_ids):
Expand Down Expand Up @@ -428,10 +483,15 @@ def create_analysis(
sequencing_group_ids=list(sequencing_group_ids),
meta=meta or {},
)
try:
aid = self.aapi.create_analysis(project=metamist_proj, analysis=am)
except ApiException:
traceback.print_exc()
aid = self.make_aapi_call(
self.aapi.create_analysis,
project=metamist_proj,
analysis=am,
)
if aid is None:
logging.error(
f'Failed to create Analysis(type={type_}, status={status}, output={str(output)}) in {metamist_proj}',
)
return None
else:
logging.info(
Expand Down Expand Up @@ -635,11 +695,8 @@ def parse_reads( # pylint: disable=too-many-return-statements
index_location = None
if reads_data[0].get('secondaryFiles'):
index_location = reads_data[0]['secondaryFiles'][0]['location']
if (
location.endswith('.cram')
and not index_location.endswith('.crai')
or location.endswith('.bai')
and not index_location.endswith('.bai')
if (location.endswith('.cram') and not index_location.endswith('.crai')) or (
location.endswith('.bai') and not index_location.endswith('.bai')
):
raise MetamistError(
f'{sequencing_group_id}: ERROR: expected the index file to have an extension '
Expand Down
27 changes: 13 additions & 14 deletions cpg_workflows/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@ def complete_analysis_job(
update_analysis_meta (Callable | None): function to update analysis meta
tolerate_missing (bool): if True, allow missing output
"""
import traceback

from cpg_utils import to_path
from metamist.apis import AnalysisApi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Gross that we used to do this here.

from metamist.exceptions import ApiException
from metamist.models import Analysis, AnalysisStatus

from .metamist import AnalysisStatus, get_metamist
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just need to keep an eye on this it case it creates any pickling issues. Let's try a test run later!


assert isinstance(output, str)
output_cloudpath = to_path(output)
Expand Down Expand Up @@ -76,22 +74,20 @@ def complete_analysis_job(
if not output_cloudpath.is_dir():
meta |= {'size': output_cloudpath.stat().st_size}

this_analysis = Analysis(
type=analysis_type,
status=AnalysisStatus('completed'),
a_id = get_metamist().create_analysis(
output=output,
type_=analysis_type,
status=AnalysisStatus('completed'),
sequencing_group_ids=sg_ids,
dataset=project_name,
meta=meta,
)
aapi = AnalysisApi()
try:
a_id = aapi.create_analysis(project=project_name, analysis=this_analysis)
except ApiException:
traceback.print_exc()
if a_id is None:
print(f'Creation of Analysis failed (type={analysis_type}, output={output}) in {project_name}')
# What Exception should we raise here?
raise
else:
print(f'Created Analysis(id={a_id}, type={analysis_type}, output={output}) in {project_name}')
return


class StatusReporterError(Exception):
Expand Down Expand Up @@ -158,7 +154,10 @@ def create_analysis(

# find all relevant SG IDs
sg_ids = target.get_sequencing_group_ids()
py_job = b.new_python_job(f'Register analysis output {output}', job_attr or {} | {'tool': 'metamist'})
py_job = b.new_python_job(
f'Register analysis output {output}',
job_attr or {} | {'tool': 'metamist'},
)
py_job.image(get_config()['workflow']['driver_image'])
py_job.call(
complete_analysis_job,
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ black
pytest
pytest_mock
mypy
tenacity
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
'bokeh',
'numpy',
'click',
'tenacity',
'toml',
],
extras_require={
Expand Down
Loading
Loading