fix: issue1460 intermediate files existing (or not) (#1541)
* chore: add testcase for issue #1460

* fix expected results

* try force update of inventory after upload

Signed-off-by: vsoch <vsoch@users.noreply.github.com>

* run gls tests first

Signed-off-by: vsoch <vsoch@users.noreply.github.com>

* pushing test fix

The issue is that the blob object held by a GS remote object can go
stale, returning False for blob.exists() when the blob clearly exists.
To fix this, we do an additional self.update_blob() and then return the
exists check again. It is unclear whether this could be made more
efficient by refreshing only under certain conditions, but since we
likely cannot know exactly when the blob has gone stale, the sure way
is to always update.
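
In code, the fix is one last refresh before trusting a negative
answer; this is condensed from the changed exists() in the GS remote
(see the diff below), where self.update_blob() refreshes the cached
blob handle:

    def exists(self):
        if self.blob.exists():
            return True
        elif any(self.directory_entries()):
            return True

        # The blob object can get out of sync, one last try!
        self.update_blob()
        return self.blob.exists()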

Signed-off-by: vsoch <vsoch@users.noreply.github.com>

* wrong version of black

Signed-off-by: vsoch <vsoch@users.noreply.github.com>

* remove unused remote inventory, was just testing!

Signed-off-by: vsoch <vsoch@users.noreply.github.com>

Co-authored-by: Johannes Köster <johannes.koester@tu-dortmund.de>
Co-authored-by: vsoch <vsoch@users.noreply.github.com>
3 people committed Apr 2, 2022
1 parent 1112321 commit 1b3ede19159856a982de65e6293ab064c0987352
Showing 9 changed files with 141 additions and 46 deletions.
@@ -115,6 +115,15 @@ jobs:
        run: |
          echo CONTAINER_IMAGE=snakemake/snakemake:$GITHUB_SHA >> $GITHUB_ENV
+      # TODO reactivate in April (we have no free resources left this month)
+      - name: Test Google Life Sciences Executor
+        if: env.GCP_AVAILABLE
+        run: |
+          # activate conda env
+          export PATH="/usr/share/miniconda/bin:$PATH"
+          source activate snakemake
+          pytest -s -v -x tests/test_google_lifesciences.py
      # TODO reactivate in April (we have no free resources left this month)
      - name: Test Kubernetes execution
        if: env.GCP_AVAILABLE
@@ -139,15 +148,6 @@ jobs:
      # pytest -v -x -s tests/test_tibanna.py

-      # TODO reactivate in April (we have no free resources left this month)
-      - name: Test Google Life Sciences Executor
-        if: env.GCP_AVAILABLE
-        run: |
-          # activate conda env
-          export PATH="/usr/share/miniconda/bin:$PATH"
-          source activate snakemake
-          pytest -s -v -x tests/test_google_lifesciences.py
      - name: Test GA4GH TES executor
        run: |
          # activate conda env
@@ -33,7 +33,8 @@


class GoogleLifeSciencesExecutor(ClusterExecutor):
"""the GoogleLifeSciences executor uses Google Cloud Storage, and
"""
The GoogleLifeSciences executor uses Google Cloud Storage, and
Compute Engine paired with the Google Life Sciences API.
https://cloud.google.com/life-sciences/docs/quickstart
"""
@@ -125,7 +126,8 @@ def get_default_resources_args(self, default_resources=None):
)

    def _get_services(self):
-        """use the Google Discovery Build to generate API clients
+        """
+        Use the Google Discovery Build to generate API clients
        for Life Sciences, and use the google storage python client
        for storage.
        """
@@ -184,7 +186,8 @@ def build_request(http, *args, **kwargs):
self._bucket_service = storage.Client()

    def _get_bucket(self):
-        """get a connection to the storage bucket (self.bucket) and exit
+        """
+        Get a connection to the storage bucket (self.bucket) and exit
        if the name is taken or otherwise invalid.
        Parameters
@@ -223,7 +226,8 @@ def _get_bucket(self):
logger.debug("logs=%s" % self.gs_logs)

    def _set_location(self, location=None):
-        """The location is where the Google Life Sciences API is located.
+        """
+        The location is where the Google Life Sciences API is located.
        This can be meaningful if the requester has data residency
        requirements or multi-zone needs. To determine this value,
        we first use the locations API to determine locations available,
@@ -290,7 +294,8 @@ def _set_location(self, location=None):
)

    def shutdown(self):
-        """shutdown deletes build packages if the user didn't request to clean
+        """
+        Shutdown deletes build packages if the user didn't request to clean
        up the cache. At this point we've already cancelled running jobs.
        """
        from google.api_core import retry
@@ -334,7 +339,8 @@ def cancel(self):
self.shutdown()

    def get_available_machine_types(self):
-        """Using the regions available at self.regions, use the GCP API
+        """
+        Using the regions available at self.regions, use the GCP API
        to retrieve a lookup dictionary of all available machine types.
        """
        # Regular expression to determine if zone in region
@@ -374,7 +380,8 @@ def get_available_machine_types(self):
return machine_types

    def _add_gpu(self, gpu_count):
-        """Add a number of NVIDIA gpus to the current executor. This works
+        """
+        Add a number of NVIDIA gpus to the current executor. This works
        by way of adding nvidia_gpu to the job default resources, and also
        changing the default machine type prefix to be n1, which is
        the currently only supported instance type for using GPUs for LHS.
@@ -393,7 +400,8 @@ def _add_gpu(self, gpu_count):
self._machine_type_prefix = "n1"

    def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None):
-        """define a lookup dictionary for preemptible instance retries, which
+        """
+        Define a lookup dictionary for preemptible instance retries, which
        is supported by the Google Life Science API. The user can set a default
        for all steps, specify per step, or define a default for all steps
        that aren't individually customized.
@@ -418,7 +426,8 @@ def _set_preemptible_rules(self, preemption_default=None, preemptible_rules=None
rule.restart_times = restart_times

    def _generate_job_resources(self, job):
-        """given a particular job, generate the resources that it needs,
+        """
+        Given a particular job, generate the resources that it needs,
        including default regions and the virtual machine configuration
        """
        # Right now, do a best effort mapping of resources to instance types
@@ -563,7 +572,8 @@ def _generate_job_resources(self, job):
return resources

    def _get_accelerator(self, gpu_count, zone, gpu_model=None):
-        """Get an appropriate accelerator for a GPU given a zone selection.
+        """
+        Get an appropriate accelerator for a GPU given a zone selection.
        Currently Google offers NVIDIA Tesla T4 (likely the best),
        NVIDIA P100, and the same T4 for a graphical workstation. Since
        this isn't a graphical workstation use case, we choose the
@@ -620,7 +630,8 @@ def get_snakefile(self):
return self.workflow.main_snakefile.replace(self.workdir, "").strip(os.sep)

    def _set_workflow_sources(self):
-        """We only add files from the working directory that are config related
+        """
+        We only add files from the working directory that are config related
        (e.g., the Snakefile or a config.yml equivalent), or checked into git.
        """
        self.workflow_sources = []
@@ -640,7 +651,8 @@ def _set_workflow_sources(self):
)

    def _generate_build_source_package(self):
-        """in order for the instance to access the working directory in storage,
+        """
+        In order for the instance to access the working directory in storage,
        we need to upload it. This file is cleaned up at the end of the run.
        We do this, and then obtain from the instance and extract.
        """
@@ -687,7 +699,8 @@ def _generate_build_source_package(self):
return hash_tar

    def _upload_build_source_package(self, targz):
-        """given a .tar.gz created for a workflow, upload it to source/cache
+        """
+        Given a .tar.gz created for a workflow, upload it to source/cache
        of Google storage, only if the blob doesn't already exist.
        """
        from google.api_core import retry
@@ -728,7 +741,9 @@ def _generate_log_action(self, job):
return action

    def _generate_job_action(self, job):
-        """generate a single action to execute the job."""
+        """
+        Generate a single action to execute the job.
+        """
        exec_job = self.format_job_exec(job)

        # The full command to download the archive, extract, and run
@@ -764,7 +779,8 @@ def _get_jobname(self, job):
return "snakejob-%s-%s-%s" % (self.run_namespace, job.name, job.jobid)

    def _generate_pipeline_labels(self, job):
-        """generate basic labels to identify the job, namespace, and that
+        """
+        Generate basic labels to identify the job, namespace, and that
        snakemake is running the show!
        """
        jobname = self._get_jobname(job)
@@ -788,7 +804,8 @@ def _generate_environment(self):
return envvars

    def _generate_pipeline(self, job):
-        """based on the job details, generate a google Pipeline object
+        """
+        Based on the job details, generate a google Pipeline object
        to pass to pipelines.run. This includes actions, resources,
        environment, and timeout.
        """
@@ -864,7 +881,8 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
)

    def _job_was_successful(self, status):
-        """based on a status response (a [pipeline].projects.locations.operations.get
+        """
+        Based on a status response (a [pipeline].projects.locations.operations.get
        debug print the list of events, return True if all return codes 0
        and False otherwise (indication of failure). In that a nonzero exit
        status is found, we also debug print it for the user.
@@ -897,7 +915,8 @@ def _job_was_successful(self, status):
return success

    def _retry_request(self, request, timeout=2, attempts=3):
-        """The Google Python API client frequently has BrokenPipe errors. This
+        """
+        The Google Python API client frequently has BrokenPipe errors. This
        function takes a request, and executes it up to number of retry,
        each time with a 2* increase in timeout.
@@ -936,7 +955,8 @@ def _retry_request(self, request, timeout=2, attempts=3):
raise ex

    def _wait_for_jobs(self):
-        """wait for jobs to complete. This means requesting their status,
+        """
+        Wait for jobs to complete. This means requesting their status,
        and then marking them as finished when a "done" parameter
        shows up. Even for finished jobs, the status should still return
        """
@@ -206,8 +206,10 @@ def exists(self):
            return True
        elif any(self.directory_entries()):
            return True
-        else:
-            return False
+
+        # The blob object can get out of sync, one last try!
+        self.update_blob()
+        return self.blob.exists()

    @retry.Retry(predicate=google_cloud_retry_predicate)
    def mtime(self):
@@ -9,7 +9,9 @@

skip_on_windows = pytest.mark.skipif(ON_WINDOWS, reason="Unix stuff")
only_on_windows = pytest.mark.skipif(not ON_WINDOWS, reason="Windows stuff")
-needs_strace = pytest.mark.xfail(os.system("strace -o /dev/null true") != 0, reason="Missing strace")
+needs_strace = pytest.mark.xfail(
+    os.system("strace -o /dev/null true") != 0, reason="Missing strace"
+)


@pytest.fixture(autouse=True)
@@ -65,13 +65,10 @@ def test_allow_missing():
"4_{c}.b",
]
# replace product
assert (
expand(
["{a}_{b}_{C}.ab", "{b}_{c}.b"],
zip,
a="1 2".split(),
b="3 4".split(),
allow_missing=True,
)
== ["1_3_{C}.ab", "2_4_{C}.ab", "3_{c}.b", "4_{c}.b"]
)
assert expand(
["{a}_{b}_{C}.ab", "{b}_{c}.b"],
zip,
a="1 2".split(),
b="3 4".split(),
allow_missing=True,
) == ["1_3_{C}.ab", "2_4_{C}.ab", "3_{c}.b", "4_{c}.b"]
@@ -0,0 +1,29 @@
+rule all:
+    input:
+        "blob.txt",
+        "test.txt"
+
+
+rule intermediate:
+    input:
+        "preblob.txt",
+        "pretest.txt",
+    output:
+        "blob.txt",
+        "test.txt",
+    shell:
+        """
+        cp {input[0]} {output[0]}
+        cp {input[1]} {output[1]}
+        """
+
+
+rule create:
+    output:
+        "preblob.txt",
+        "pretest.txt"
+    shell:
+        '''
+        echo "test file" > {output[0]}
+        echo "test file" > {output[1]}
+        '''
Empty file.
@@ -20,23 +20,29 @@ def has_google_credentials():
)


-def cleanup_google_storage(prefix, bucket_name="snakemake-testing"):
+def cleanup_google_storage(prefix, bucket_name="snakemake-testing", restrict_to=None):
    """Given a storage prefix and a bucket, recursively delete files there
    This is intended to run after testing to ensure that
    the bucket is cleaned up.
    Arguments:
      prefix (str) : the "subfolder" or prefix for some files in the buckets
      bucket_name (str) : the name of the bucket, default snakemake-testing
+      restrict_to (list) : only delete files in these paths (None deletes all)
    """
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
-    blobs = bucket.list_blobs(prefix="source")
-    for blob in blobs:
-        blob.delete()
-    # Using API we get an exception about bucket deletion
-    shell("gsutil -m rm -r gs://{bucket.name}/* || true")
-    bucket.delete()
+    blobs = bucket.list_blobs(prefix=prefix)
+    for blob in blobs:
+        if restrict_to is None or f"{bucket_name}/{blob.name}" in restrict_to:
+            blob.delete()
+    if restrict_to is None:
+        # Using API we get an exception about bucket deletion
+        shell("gsutil -m rm -r gs://{bucket.name}/* || true")
+        bucket.delete()


def create_google_storage(bucket_name="snakemake-testing"):
@@ -126,3 +132,35 @@ def test_github_issue1396():
)
finally:
cleanup_google_storage(storage_prefix, bucket_name)


+def test_github_issue1460():
+    bucket_name = "snakemake-testing-%s" % next(tempfile._get_candidate_names())
+    create_google_storage(bucket_name)
+    storage_prefix = "test_github_issue1460"
+    prefix = "%s/%s" % (bucket_name, storage_prefix)
+    workdir = dpath("test_github_issue1460")
+    try:
+        run(
+            workdir,
+            default_remote_prefix=prefix,
+            google_lifesciences=True,
+            google_lifesciences_cache=False,
+        )
+        cleanup_google_storage(
+            storage_prefix,
+            bucket_name,
+            restrict_to=[
+                f"{prefix}/test.txt",
+                f"{prefix}/blob.txt",
+                f"{prefix}/pretest.txt",
+            ],
+        )
+        run(
+            workdir,
+            default_remote_prefix=prefix,
+            google_lifesciences=True,
+            google_lifesciences_cache=False,
+        )
+    finally:
+        cleanup_google_storage(storage_prefix, bucket_name)
@@ -72,4 +72,11 @@ def test_dicts_in_config():
),
file=f,
)
-    snakemake(path, workdir=tmpdir, config={"this_option": "does_not_break", "test": {'this_dict':'shoult_not_either'}})
+    snakemake(
+        path,
+        workdir=tmpdir,
+        config={
+            "this_option": "does_not_break",
+            "test": {"this_dict": "shoult_not_either"},
+        },
+    )
