Release 1.1.0
christiam committed May 11, 2023
1 parent 338016f commit f35e19a
Showing 27 changed files with 273 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -43,3 +43,4 @@ clouseau*
submit-and-wait-for-results.sh
aws-docker-login.txt
elb-run-report.csv
aws-credentials
4 changes: 2 additions & 2 deletions CITATION.cff
@@ -1,8 +1,8 @@
cff-version: "1.2.0"
message: "If you use this software, please cite it using these metadata."
title: ElasticBLAST
version: "1.0.0"
date-released: 2022-12-05
version: "1.1.0"
date-released: 2023-05-11
license: "NCBI Public Domain"
repository-code: "https://github.com/ncbi/elastic-blast/"
url: "https://blast.ncbi.nlm.nih.gov/doc/elastic-blast/"
2 changes: 2 additions & 0 deletions bin/blast-tuner.py
@@ -106,8 +106,10 @@ def main():
mt_mode = mt_mode,
query = query_data)
conf[CFG_BLAST][CFG_BLAST_PROGRAM] = args.program
task = ElbSupportedPrograms().get_task(args.program, args.options)
conf[CFG_BLAST][CFG_BLAST_BATCH_LEN] = str(get_batch_length(cloud_provider = cloud_provider,
program = args.program,
task = task,
mt_mode = mt_mode,
num_cpus = num_cpus))

4 changes: 4 additions & 0 deletions bin/elastic-blast
@@ -40,6 +40,7 @@ from elastic_blast.util import validate_installation, check_positive_int, config
from elastic_blast.util import ElbSupportedPrograms, clean_up
from elastic_blast import constants
from elastic_blast.gcp import check_prerequisites
from elastic_blast.aws import check_auxiliary_versions
from elastic_blast.config import configure
from elastic_blast.elb_config import ElasticBlastConfig
from elastic_blast.constants import ElbCommand
@@ -73,8 +74,11 @@ def main():
config_logging(args)
cfg = configure(args)
logging.info(f"ElasticBLAST {args.subcommand} {VERSION}")
logging.info(f'python version: {":".join(sys.version.split())}')
if CFG_CP_GCP_PROJECT in cfg[CFG_CLOUD_PROVIDER]:
check_prerequisites()
else:
check_auxiliary_versions()
task = ElbCommand(args.subcommand.lower())
cfg = ElasticBlastConfig(cfg, args.dry_run, task=task)
logging.debug(pformat(cfg.asdict()))
2 changes: 1 addition & 1 deletion docker-blast/Makefile
@@ -30,7 +30,7 @@ GCP_IMG?=gcr.io/ncbi-sandbox-blast/${IMG}
AWS_SERVER?=public.ecr.aws/i6v3i0i9
AWS_IMG?=${AWS_SERVER}/elasticblast-elb
AWS_REGION?=us-east-1
VERSION?=1.1.3
VERSION?=1.2.0

ifeq (, $(shell which vmtouch 2>/dev/null))
NOVMTOUCH?=--no-vmtouch
16 changes: 8 additions & 8 deletions requirements/base.txt
@@ -1,11 +1,11 @@
wheel==0.37.1
setuptools==65.5.1
importlib-resources==5.4.0
importlib-metadata==4.11.1
pex==2.1.117
boto3==1.26.20
botocore==1.29.20
wheel==0.40.0
setuptools==67.6.1
importlib-resources==5.10.2
importlib-metadata==6.0.0
pex==2.1.134
boto3==1.26.122
botocore==1.29.122
awslimitchecker==12.0.0
tenacity==8.1.0
tenacity==8.2.2
dataclasses-json==0.5.7
types-pkg-resources==0.1.3
13 changes: 7 additions & 6 deletions requirements/test.txt
@@ -1,13 +1,14 @@
-r base.txt

pytest==7.2.0
pytest==7.3.1
pytest-cov==4.0.0
pytest-mock==3.10.0
teamcity-messages==1.32
mypy==0.991
mypy==1.2.0
pylint==2.7.4
tox==3.27.1
yamllint==1.28.0
moto==4.0.11
tox==4.4.12
virtualenv==20.21.0
yamllint==1.31.0
moto==4.1.8
docker==6.0.1
cfn-lint==0.72.1
cfn-lint==0.77.3
18 changes: 16 additions & 2 deletions src/elastic_blast/aws.py
@@ -51,7 +51,7 @@
from .util import convert_labels_to_aws_tags, convert_disk_size_to_gb
from .util import convert_memory_to_mb, UserReportError
from .util import ElbSupportedPrograms, get_usage_reporting, sanitize_aws_batch_job_name
from .util import get_resubmission_error_msg
from .util import get_resubmission_error_msg, safe_exec
from .constants import BLASTDB_ERROR, CLUSTER_ERROR, ELB_QUERY_LENGTH, PERMISSIONS_ERROR
from .constants import ELB_QUERY_BATCH_DIR, ELB_METADATA_DIR
from .constants import ELB_DOCKER_IMAGE_AWS, INPUT_ERROR, ELB_QS_DOCKER_IMAGE_AWS
@@ -70,6 +70,7 @@
from .base import DBSource
from .elb_config import ElasticBlastConfig, sanitize_aws_tag
from .elasticblast import ElasticBlast
from . import VERSION


CF_TEMPLATE = os.path.join(os.path.dirname(__file__), 'templates', 'elastic-blast-cf.yaml')
@@ -146,6 +147,17 @@ def to_list(self) -> List[str]:
return id_list


def check_auxiliary_versions():
"""Check version of auxiliary tools: awscli. No errors will be reported
if the tools are not accessible."""
try:
p = safe_exec('aws --version')
except:
logging.debug('Could not check awscli version')
else:
logging.debug(f'{":".join(p.stdout.decode().split())}')


class ElasticBlastAws(ElasticBlast):
""" Implementation of core ElasticBLAST functionality in AWS.
Uses a CloudFormation template and AWS Batch for its main operation.
@@ -852,7 +864,9 @@ def is_int(value: str):
{'name': 'BLAST_USAGE_REPORT',
'value': 'true'},
{'name': 'BLAST_ELB_BATCH_NUM',
'value': str(i)}]
'value': str(i)},
{'name': 'BLAST_ELB_VERSION',
'value': VERSION}]
else:
overrides['environment'] = [{'name': 'BLAST_USAGE_REPORT',
'value': 'false'}]
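Note on the aws.py hunk above: the AWS Batch container overrides now also export the ElasticBLAST version to each job, next to the existing usage-reporting variables. A minimal, hypothetical sketch of how a container-side script could read these values (the reporting code itself is not part of this commit):

```python
# Hypothetical container-side sketch, not part of this commit: the Batch job
# environment now carries BLAST_ELB_VERSION alongside the usage-reporting flags.
import os

elb_version = os.environ.get('BLAST_ELB_VERSION', 'unknown')
usage_report = os.environ.get('BLAST_USAGE_REPORT', 'false') == 'true'
batch_num = int(os.environ.get('BLAST_ELB_BATCH_NUM', '-1'))

if usage_report:
    print(f'usage report: ElasticBLAST {elb_version}, query batch {batch_num}')
```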
9 changes: 7 additions & 2 deletions src/elastic_blast/aws_traits.py
@@ -31,6 +31,7 @@
from .util import UserReportError, check_aws_region_for_invalid_characters
from .base import InstanceProperties, PositiveInteger, MemoryStr
from .constants import ELB_DFLT_AWS_REGION, INPUT_ERROR, PERMISSIONS_ERROR
from .constants import DEPENDENCY_ERROR


def create_aws_config(region: Optional[str] = None) -> Config:
@@ -90,12 +91,13 @@ def get_machine_properties(instance_type: str, boto_cfg: Config = None) -> Insta

def get_instance_type_offerings(region: str) -> List[str]:
"""Get a list of instance types offered in an AWS region"""
ec2 = boto3.client('ec2')
boto_cfg = create_aws_config(region)
ec2 = boto3.client('ec2', config=boto_cfg)
try:
current = ec2.describe_instance_type_offerings(LocationType='region', Filters=[{'Name': 'location', 'Values': [region]}])
instance_types = current['InstanceTypeOfferings']
while 'NextToken' in current:
current = ec2.describe_instance_type_offerings(LocationType='regioon', Filters=[{'Name': 'location', 'Values': [region]}], NextToken=current['NextToken'])
current = ec2.describe_instance_type_offerings(LocationType='region', Filters=[{'Name': 'location', 'Values': [region]}], NextToken=current['NextToken'])
instance_types += current['InstanceTypeOfferings']
except ClientError as err:
logging.debug(err)
@@ -104,6 +106,9 @@ def get_instance_type_offerings(region: str) -> List[str]:
logging.debug(err)
raise UserReportError(returncode=PERMISSIONS_ERROR, message=str(err))

if not instance_types:
raise UserReportError(returncode=DEPENDENCY_ERROR,
message=f'Could not get instance types available in region: {region}')
return [it['InstanceType'] for it in instance_types]


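Note on the aws_traits.py hunk above: besides fixing the 'regioon' typo in the paginated call and scoping the EC2 client to the requested region, the function now raises DEPENDENCY_ERROR when no instance types come back. As a design aside, boto3 also ships a built-in paginator that removes the manual NextToken loop; a hedged sketch of that alternative (placeholder region, not the code in this commit):

```python
# Hedged alternative sketch, not the code in this commit: boto3's paginator
# walks all pages of describe_instance_type_offerings, so no manual
# NextToken handling is needed.
import boto3
from typing import List

def list_instance_types(region: str = 'us-east-1') -> List[str]:
    ec2 = boto3.client('ec2', region_name=region)
    pages = ec2.get_paginator('describe_instance_type_offerings').paginate(
        LocationType='region',
        Filters=[{'Name': 'location', 'Values': [region]}])
    return [o['InstanceType'] for page in pages for o in page['InstanceTypeOfferings']]
```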
9 changes: 7 additions & 2 deletions src/elastic_blast/constants.py
@@ -204,7 +204,7 @@ def __str__(self):
ELB_DFLT_AWS_REGION = 'us-east-1'
ELB_UNKNOWN_GCP_PROJECT = 'elb-unknown-gcp-project'

ELB_DOCKER_VERSION = '1.1.3'
ELB_DOCKER_VERSION = '1.2.0' # ElasticBLAST 1.1.0 uses BLAST+ 2.14.0
ELB_QS_DOCKER_VERSION = '0.1.4'
ELB_JANITOR_DOCKER_VERSION = '0.3.0'
ELB_JOB_SUBMIT_DOCKER_VERSION = '3.0.0'
@@ -226,6 +226,11 @@ def __str__(self):
ELB_DFLT_AWS_PROVISIONED_IOPS = '2000'
ELB_DFLT_AWS_SPOT_BID_PERCENTAGE = '100'

# This value is assigned to gke_version. It should be set to None to use the
# default k8s version in GKE, otherwise it should be set to a specific version
# supported by GKE (e.g.: 1.25)
ELB_DFLT_GCP_K8S_VERSION = '1.25'

# Config sections
CFG_CLOUD_PROVIDER = 'cloud-provider'
CFG_CLUSTER = 'cluster'
@@ -239,7 +244,7 @@
CFG_CP_GCP_ZONE = 'gcp-zone'
CFG_CP_GCP_NETWORK = 'gcp-network'
CFG_CP_GCP_SUBNETWORK = 'gcp-subnetwork'
CFG_CP_GCP_GKE_VERSION = 'gke-version'
CFG_CP_GCP_K8S_VERSION = 'gke-version'
CFG_CP_AWS_REGION = 'aws-region'
CFG_CP_AWS_KEY_PAIR = 'aws-key-pair'
CFG_CP_AWS_VPC = 'aws-vpc'
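Note on the constants.py hunk above: the Python constant is renamed from CFG_CP_GCP_GKE_VERSION to CFG_CP_GCP_K8S_VERSION, but its value, the user-facing configuration key 'gke-version', is unchanged, and a default of 1.25 is introduced. A small toy sketch (not from this commit) of how the key resolves:

```python
# Toy sketch, not from this commit: only the constant's name changed; the key
# read from an ElasticBLAST configuration file is still 'gke-version'.
from configparser import ConfigParser

CFG_CLOUD_PROVIDER = 'cloud-provider'
CFG_CP_GCP_K8S_VERSION = 'gke-version'   # formerly CFG_CP_GCP_GKE_VERSION
ELB_DFLT_GCP_K8S_VERSION = '1.25'        # new default k8s/GKE version

cfg = ConfigParser()
cfg.read_string('[cloud-provider]\ngke-version = 1.25\n')
print(cfg[CFG_CLOUD_PROVIDER].get(CFG_CP_GCP_K8S_VERSION, ELB_DFLT_GCP_K8S_VERSION))  # -> 1.25
```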
15 changes: 10 additions & 5 deletions src/elastic_blast/elb_config.py
@@ -57,7 +57,7 @@
from .constants import CFG_CLOUD_PROVIDER
from .constants import CFG_CP_GCP_PROJECT, CFG_CP_GCP_REGION, CFG_CP_GCP_ZONE
from .constants import CFG_CP_GCP_NETWORK, CFG_CP_GCP_SUBNETWORK
from .constants import CFG_CP_GCP_GKE_VERSION
from .constants import CFG_CP_GCP_K8S_VERSION
from .constants import CFG_CP_AWS_REGION, CFG_CP_AWS_VPC, CFG_CP_AWS_SUBNET
from .constants import CFG_CP_AWS_JOB_ROLE, CFG_CP_AWS_BATCH_SERVICE_ROLE
from .constants import CFG_CP_AWS_INSTANCE_ROLE, CFG_CP_AWS_SPOT_FLEET_ROLE
@@ -86,6 +86,7 @@
from .constants import BLASTDB_ERROR, ELB_UNKNOWN, ELB_JANITOR_SCHEDULE
from .constants import ELB_DFLT_GCP_REGION, ELB_DFLT_GCP_ZONE
from .constants import ELB_DFLT_AWS_REGION, ELB_UNKNOWN_GCP_PROJECT
from .constants import ELB_DFLT_GCP_K8S_VERSION
from .util import validate_gcp_string, check_aws_region_for_invalid_characters
from .util import validate_gke_cluster_name, ElbSupportedPrograms
from .util import get_query_batch_size, get_gcp_project
@@ -225,8 +226,7 @@ class GCPConfig(CloudProviderBaseConfig, ConfigParserToDataclassMapper):
network: Optional[str] = None
subnet: Optional[str] = None
user: Optional[str] = None
# gke_version should be set to None to use the default GKE, otherwise use a specific version supported by GKE (e.g.: 1.25)
gke_version: Optional[str] = None
gke_version: Optional[str] = ELB_DFLT_GCP_K8S_VERSION
# if True, GCP project will be passed to gsutil calls that download files
# from GCS and users will be charged for the downloads.
requester_pays: bool = False
@@ -240,7 +240,7 @@ class GCPConfig(CloudProviderBaseConfig, ConfigParserToDataclassMapper):
'user': None,
'network': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_NETWORK),
'subnet': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_SUBNETWORK),
'gke_version': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_GKE_VERSION),
'gke_version': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_K8S_VERSION),
'requester_pays': None}

def __post_init__(self):
@@ -723,8 +723,11 @@ def __init__(self, *args, **kwargs):
# set batch length
if self.blast and not self.cluster.dry_run:
if self.blast.batch_len == ELB_NOT_INITIALIZED_NUM:
blast_task = ElbSupportedPrograms().get_task(self.blast.program,
self.blast.options)
self.blast.batch_len = get_batch_length(self.cloud_provider.cloud,
self.blast.program,
blast_task,
mt_mode,
self.cluster.num_cpus,
self.blast.db_metadata)
@@ -806,6 +809,7 @@ def _init_from_parameters(self,
program: Optional[str] = None,
db: Optional[str] = None,
queries: Optional[str] = None,
options: str = '',
dry_run: Optional[bool] = None,
cluster_name: Optional[str] = None,
machine_type: str = ''):
@@ -842,7 +846,8 @@ def _init_from_parameters(self,
raise ValueError('BLAST queries are missing')
self.blast = BlastConfig(program = BLASTProgram(program),
db = db,
queries_arg = queries)
queries_arg = queries,
options = options)

self.timeouts = TimeoutsConfig()
self.appstate = AppState()
10 changes: 5 additions & 5 deletions src/elastic_blast/filehelper.py
@@ -33,7 +33,7 @@
"""

import subprocess, os, io, gzip, tarfile, re, tempfile, shutil, sys
import logging
import logging, shlex
import urllib.request
from string import digits
from random import sample
@@ -83,12 +83,12 @@ def harvest_query_splitting_results(bucket_name: str, dry_run: bool = False, bot
for line in qlist:
query_batches.append(line.strip())
else:
raise NotImplementedError(f'Harvesting query splitting results from {bucket} not supported')
raise NotImplementedError(f'Harvesting query splitting results from {bucket_name} not supported')

return QuerySplittingResults(query_length=qlen, query_batches=query_batches)


@retry(reraise=True, stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
@retry(reraise=True, stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) # type: ignore
def upload_file_to_gcs(filename: str, gcs_location: str, dry_run: bool = False) -> None:
""" Function to copy the filename provided to GCS """
cmd = f'gsutil -qm cp {filename} {gcs_location}'
@@ -484,8 +484,8 @@ def open_for_read(fname: str, gcp_prj: Optional[str] = None):
mode = 'rb' if binary else 'rt'
if fname.startswith(ELB_GCS_PREFIX):
prj = f'-u {gcp_prj}' if gcp_prj else ''
cmd = f'gsutil {prj} cat {fname}'
proc = subprocess.Popen(cmd.split(),
cmd = f'gsutil {prj} cat ' + shlex.quote(fname)
proc = subprocess.Popen(shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=not binary)
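Note on the filehelper.py hunk above: building the gsutil command with shlex.quote/shlex.split instead of a bare str.split() keeps an object name containing spaces (or other shell-significant characters) as a single argument. A minimal illustration with a hypothetical object name, not part of this commit:

```python
# Minimal illustration with a hypothetical object name, not part of this commit.
import shlex

fname = 'gs://example-bucket/queries/batch 001.fa'

naive = f'gsutil cat {fname}'.split()
quoted = shlex.split('gsutil cat ' + shlex.quote(fname))

print(naive)   # ['gsutil', 'cat', 'gs://example-bucket/queries/batch', '001.fa']  (two broken args)
print(quoted)  # ['gsutil', 'cat', 'gs://example-bucket/queries/batch 001.fa']     (one intact arg)
```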
17 changes: 12 additions & 5 deletions src/elastic_blast/gcp.py
@@ -63,6 +63,7 @@
from .elb_config import ElasticBlastConfig
from .elasticblast import ElasticBlast
from .gcp_traits import enable_gcp_api
from . import VERSION

class ElasticBlastGcp(ElasticBlast):
""" Implementation of core ElasticBLAST functionality in GCP. """
@@ -422,6 +423,7 @@ def job_substitutions(self) -> Dict[str, str]:
'ELB_DOCKER_IMAGE': ELB_DOCKER_IMAGE_GCP,
'ELB_TIMEFMT': '%s%N', # timestamp in nanoseconds
'BLAST_ELB_JOB_ID': uuid.uuid4().hex,
'BLAST_ELB_VERSION': VERSION,
'BLAST_USAGE_REPORT': str(usage_reporting).lower(),
'K8S_JOB_GET_BLASTDB' : K8S_JOB_GET_BLASTDB,
'K8S_JOB_LOAD_BLASTDB_INTO_RAM' : K8S_JOB_LOAD_BLASTDB_INTO_RAM,
@@ -547,7 +549,7 @@ def delete_disk(name: str, cfg: ElasticBlastConfig) -> None:
safe_exec(cmd)


@retry(reraise=True, stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
@retry(reraise=True, stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) # type: ignore
def _get_pd_id(cfg: ElasticBlastConfig) -> List[str]:
""" Try to get the GCP persistent disk ID from elastic-blast records"""
retval = list()
@@ -600,7 +602,7 @@ def delete_cluster_with_cleanup(cfg: ElasticBlastConfig) -> None:
try_kubernetes = True
pds = []
try:
pds = _get_pd_id(cfg)
pds = _get_pd_id(cfg) # type: ignore
except Exception as e:
logging.error(f'Unable to read disk id from GS: {e}')
else:
@@ -915,22 +917,27 @@ def check_prerequisites() -> None:
If execution of one of these tools is unsuccessful
it will throw UserReportError exception."""
try:
safe_exec('gcloud --version')
p = safe_exec('gcloud --version')
except SafeExecError as e:
message = f"Required pre-requisite 'gcloud' doesn't work, check installation of GCP SDK.\nDetails: {e.message}"
raise UserReportError(DEPENDENCY_ERROR, message)
logging.debug(f'{":".join(p.stdout.decode().split())}')

try:
# client=true prevents kubectl from addressing server which can be down at the moment
safe_exec('kubectl version --client=true')
p = safe_exec('kubectl version --client=true')
except SafeExecError as e:
message = f"Required pre-requisite 'kubectl' doesn't work, check Kubernetes installation.\nDetails: {e.message}"
raise UserReportError(DEPENDENCY_ERROR, message)
logging.debug(f'{":".join(p.stdout.decode().split())}')

# Check we have gsutil available
try:
safe_exec('gsutil --version')
p = safe_exec('gsutil --version')
except SafeExecError as e:
message = f"Required pre-requisite 'gsutil' doesn't work, check installation of GCP SDK.\nDetails: {e.message}\nNote: this is because your query is located on GS, you may try another location"
raise UserReportError(DEPENDENCY_ERROR, message)
logging.debug(f'{":".join(p.stdout.decode().split())}')


def remove_split_query(cfg: ElasticBlastConfig) -> None:
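Note on the gcp.py hunk above: check_prerequisites() now logs the output of each prerequisite tool, and the same whitespace-to-colon transform is used for the awscli check added in aws.py. A tiny illustration (made-up version numbers, not part of this commit) of what that transform does to multi-line output:

```python
# Tiny illustration with made-up version numbers, not part of this commit:
# splitting on whitespace and re-joining with ':' collapses multi-line tool
# output into a single, compact log line.
output = 'Google Cloud SDK 430.0.0\nbq 2.0.91\ngsutil 5.23\n'
print(':'.join(output.split()))   # Google:Cloud:SDK:430.0.0:bq:2.0.91:gsutil:5.23
```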
2 changes: 1 addition & 1 deletion src/elastic_blast/janitor.py
@@ -56,7 +56,7 @@ def copy_to_results_bucket_if_not_present(filename: str, bucket: str):
if bucket.startswith(ELB_S3_PREFIX):
copy_file_to_s3(bucket, Path(filename))
else:
upload_file_to_gcs(filename, bucket)
upload_file_to_gcs(filename, bucket) # type: ignore


def janitor(elb: ElasticBlast) -> None:
4 changes: 2 additions & 2 deletions src/elastic_blast/kubernetes.py
@@ -117,7 +117,7 @@ def get_persistent_disks(k8s_ctx: str, dry_run: bool = False) -> List[str]:
return list()


@retry( stop=(stop_after_delay(ELB_K8S_JOB_SUBMISSION_TIMEOUT) | stop_after_attempt(ELB_K8S_JOB_SUBMISSION_MAX_RETRIES)), wait=wait_random(min=ELB_K8S_JOB_SUBMISSION_MIN_WAIT, max=ELB_K8S_JOB_SUBMISSION_MAX_WAIT))
@retry( stop=(stop_after_delay(ELB_K8S_JOB_SUBMISSION_TIMEOUT) | stop_after_attempt(ELB_K8S_JOB_SUBMISSION_MAX_RETRIES)), wait=wait_random(min=ELB_K8S_JOB_SUBMISSION_MIN_WAIT, max=ELB_K8S_JOB_SUBMISSION_MAX_WAIT)) # type: ignore
def submit_jobs_with_retries(k8s_ctx: str, path: pathlib.Path, dry_run=False) -> List[str]:
""" Retry kubernetes job submissions with the parameters specified in the decorator """
return submit_jobs(k8s_ctx, path, dry_run)
@@ -146,7 +146,7 @@ def submit_jobs(k8s_ctx: str, path: pathlib.Path, dry_run=False) -> List[str]:
elif num_files > K8S_MAX_JOBS_PER_DIR:
files = os.listdir(str(path))
for i, f in enumerate(sorted(files, key=lambda x: int(os.path.splitext(x)[0].split('_')[1]))):
retval += submit_jobs_with_retries(k8s_ctx, pathlib.Path(os.path.join(path, f)), dry_run)
retval += submit_jobs_with_retries(k8s_ctx, pathlib.Path(os.path.join(path, f)), dry_run) # type: ignore
perc_done = i / num_files * 100.
if i % 50 == 0:
logging.debug(f'Submitted job file # {i} of {num_files} {perc_done:.2f}% done')
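Note on the kubernetes.py hunk above: the tenacity decorator combines two stop conditions (overall delay and attempt count) and stops as soon as either is met, with a random wait between attempts; the added '# type: ignore' comments presumably accommodate the stricter mypy pinned in requirements/test.txt. A hedged, self-contained sketch of the same retry shape with placeholder limits:

```python
# Hedged sketch with placeholder limits, not the real ELB_K8S_JOB_SUBMISSION_*
# values: tenacity retries until EITHER 60 s have elapsed OR 5 attempts were
# made, waiting a random 1-3 s between attempts.
from tenacity import retry, stop_after_attempt, stop_after_delay, wait_random

@retry(stop=(stop_after_delay(60) | stop_after_attempt(5)),
       wait=wait_random(min=1, max=3))
def flaky_submit() -> None:
    """Stand-in for submit_jobs(); always fails, so the stop conditions get exercised."""
    raise RuntimeError('transient kubectl failure')
```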