adding google life sciences logging save to storage bucket
Signed-off-by: vsoch <vsochat@stanford.edu>
vsoch committed Sep 2, 2020
1 parent 73c1be9 commit 6a1fd02
Showing 2 changed files with 161 additions and 5 deletions.
44 changes: 39 additions & 5 deletions snakemake/executors/google_lifesciences.py
@@ -178,6 +178,7 @@ def _get_bucket(self):
self.gs_subdir = re.sub(
"^{}/".format(bucket_name), "", self.workflow.default_remote_prefix
)
+self.gs_logs = os.path.join(self.gs_subdir, "google-lifesciences-logs")

# Case 1: The bucket already exists
try:
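The new gs_logs prefix sits alongside the workflow's remote prefix: with a hypothetical default_remote_prefix of "my-bucket/source", gs_subdir is "source" and gs_logs becomes "source/google-lifesciences-logs".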
@@ -199,6 +200,7 @@

logger.debug("bucket=%s" % self.bucket.name)
logger.debug("subdir=%s" % self.gs_subdir)
logger.debug("logs=%s" % self.gs_logs)

def _set_location(self, location=None):
"""The location is where the Google Life Sciences API is located.
@@ -663,6 +665,30 @@ def _upload_build_source_package(self, targz):
if not blob.exists():
blob.upload_from_filename(targz, content_type="application/gzip")

+def _generate_log_action(self, job):
+    """generate an action to save the pipeline logs to storage.
+    """
+    # script should be changed to this when added to version control!
+    # https://raw.githubusercontent.com/snakemake/snakemake/master/snakemake/executors/google_lifesciences_helper.py
+
+    # Save logs from /google/logs/output to source/logs in bucket
+    commands = [
+        "/bin/bash",
+        "-c",
+        "wget -O /gls.py https://gist.githubusercontent.com/vsoch/f5a6a6d1894be1e67aa4156c5b40c8e9/raw/a4e9ddbeba20996ca62745fcd4d9ecd7bfa3b311/gls.py && chmod +x /gls.py && source activate snakemake || true && python /gls.py save %s /google/logs %s/%s"
+        % (self.bucket.name, self.gs_logs, job.name),
+    ]
+
+    # Always run the action to generate log output
+    action = {
+        "containerName": "snakelog-{}-{}".format(job.name, job.jobid),
+        "imageUri": self.container_image,
+        "commands": commands,
+        "labels": self._generate_pipeline_labels(job),
+        "alwaysRun": True,
+    }
+    return action

def _generate_job_action(self, job):
"""generate a single action to execute the job.
"""
@@ -676,13 +702,15 @@ def _generate_job_action(self, job):
# Now that we've parsed the job resource requirements, add to exec
exec_job += self.get_default_resources_args()

+# script should be changed to this when added to version control!
+# https://raw.githubusercontent.com/snakemake/snakemake/master/snakemake/executors/google_lifesciences_helper.py
# The full command to download the archive, extract, and run
# For snakemake bases, we must activate the conda environment, but
# for custom images we must allow this to fail (hence || true)
commands = [
"/bin/bash",
"-c",
"mkdir -p /workdir && cd /workdir && wget -O /download.py https://gist.githubusercontent.com/vsoch/84886ef6469bedeeb9a79a4eb7aec0d1/raw/181499f8f17163dcb2f89822079938cbfbd258cc/download.py && chmod +x /download.py && source activate snakemake || true && python /download.py download %s %s /tmp/workdir.tar.gz && tar -xzvf /tmp/workdir.tar.gz && %s"
"mkdir -p /workdir && cd /workdir && wget -O /download.py https://gist.githubusercontent.com/vsoch/f5a6a6d1894be1e67aa4156c5b40c8e9/raw/a4e9ddbeba20996ca62745fcd4d9ecd7bfa3b311/gls.py && chmod +x /download.py && source activate snakemake || true && python /download.py download %s %s /tmp/workdir.tar.gz && tar -xzvf /tmp/workdir.tar.gz && %s"
% (self.bucket.name, self.pipeline_package, exec_job),
]

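Note that the job action now fetches the same gls.py helper gist as the log action above, only saved locally as /download.py. With the hypothetical bucket "snakemake-testing" and a package path such as "source/cache/workdir.tar.gz" (illustrative), the rendered command ends in roughly

    python /download.py download snakemake-testing source/cache/workdir.tar.gz /tmp/workdir.tar.gz && tar -xzvf /tmp/workdir.tar.gz && <exec_job>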
@@ -730,13 +758,14 @@ def _generate_pipeline(self, job):
to pass to pipelines.run. This includes actions, resources,
environment, and timeout.
"""
-# Generate actions (one per job) and resources
+# Generate actions (one per job step), a log-saving action that always runs, and resources
resources = self._generate_job_resources(job)
action = self._generate_job_action(job)
+log_action = self._generate_log_action(job)

pipeline = {
# Ordered list of actions to execute
"actions": [action],
"actions": [action, log_action],
# resources required for execution
"resources": resources,
# Technical question - difference between resource and action environment
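The Life Sciences API executes a pipeline's actions in order, and the alwaysRun flag on the log action tells the API to run it even if an earlier action fails, so logs should reach the bucket for failed jobs as well.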
@@ -785,8 +814,13 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
"Get status with:\n"
"gcloud config set project {project}\n"
"gcloud beta lifesciences operations describe {location}/operations/{jobid}\n"
"gcloud beta lifesciences operations list".format(
project=self.project, jobid=jobid, location=self.location
"gcloud beta lifesciences operations list\n"
"Logs will be saved to: {bucket}/{logdir}\n".format(
project=self.project,
jobid=jobid,
location=self.location,
bucket=self.bucket.name,
logdir=self.gs_logs,
)
)

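With illustrative values (project "my-project", a location string like "projects/my-project/locations/us-central1", jobid "1234567890", and the hypothetical bucket above), this hint would print roughly:

    gcloud config set project my-project
    gcloud beta lifesciences operations describe projects/my-project/locations/us-central1/operations/1234567890
    gcloud beta lifesciences operations list
    Logs will be saved to: snakemake-testing/source/google-lifesciences-logs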
122 changes: 122 additions & 0 deletions snakemake/executors/google_lifesciences_helper.py
@@ -0,0 +1,122 @@
#!/usr/bin/env python

# This is a helper script for the Google Life Sciences instance to be able to:
# 1. Download a blob from storage, which is required at the onset of the
#    Snakemake workflow step to obtain the working directory:
#      gls.py download <bucket> <source> <destination>
# 2. Upload logs (or some specified directory of files) back to storage:
#      gls.py save <bucket> <source-dir> <destination-dir>
#      gls.py save <bucket> /google/logs/output source/logs

import argparse
import datetime

from google.cloud import storage
from glob import glob
import sys
import os


def download_blob(bucket_name, source_blob_name, destination_file_name):
"""Downloads a blob from the bucket."""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(source_blob_name)

blob.download_to_filename(destination_file_name)

print("Blob {} downloaded to {}.".format(source_blob_name, destination_file_name))

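# A minimal usage sketch (bucket and blob names are hypothetical):
#   download_blob("my-bucket", "source/cache/workdir.tar.gz", "/tmp/workdir.tar.gz")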

def save_files(bucket_name, source_path, destination_path):
"""given a directory path, save all files recursively to storage
"""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

# The bucket name and destination path should not have leading or trailing slashes
bucket_name = bucket_name.strip("/")
destination_path = destination_path.strip("/")

# These are full paths
filenames = get_source_files(source_path)

if not filenames:
    print("Did not find any filenames under %s" % source_path)
else:
    print("\nThe following files will be uploaded: %s" % "\n".join(filenames))

# Do the upload!
for filename in filenames:

# The relative path of the filename from the source path
relative_path = filename.replace(source_path, "", 1).strip("/")

# The path in storage includes relative path from destination_path
storage_path = os.path.join(destination_path, relative_path)
full_path = os.path.join(bucket_name, storage_path)
print(f"{filename} -> {full_path}")

# Get the blob
blob = bucket.blob(storage_path)
if not blob.exists():
print("Uploading %s to %s" % (filename, full_path))
blob.upload_from_filename(filename)

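# Note: blobs that already exist are skipped above, so re-running a pipeline
# will not overwrite logs saved by an earlier attempt.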

def get_source_files(source_path):
"""Given a directory, return a listing of files to upload
"""
filenames = []
if not os.path.exists(source_path):
print("%s does not exist!" % source_path)
sys.exit(0)

for x in os.walk(source_path):
for name in glob(os.path.join(x[0], "*")):
if not os.path.isdir(name):
filenames.append(name)
return filenames

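# For example (hypothetical paths), a /google/logs/output tree holding stdout
# and stderr files would return:
#   ["/google/logs/output/stdout", "/google/logs/output/stderr"]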

def add_ending_slash(filename):
"""Since we want to replace based on having an ending slash, ensure it's there
"""
if not filename.endswith("/"):
filename = "%s/" % filename
return filename


def blob_commands(args):
if args.command == "download":
download_blob(
args.bucket_name, args.source_blob_name, args.destination_file_name
)
elif args.command == "save":
save_files(args.bucket_name, args.source_path, args.destination_path)


def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter
)

subparsers = parser.add_subparsers(dest="command")

# Download file from storage
download_parser = subparsers.add_parser("download", help=download_blob.__doc__)
download_parser.add_argument("bucket_name", help="Your cloud storage bucket.")
download_parser.add_argument("source_blob_name")
download_parser.add_argument("destination_file_name")

# Save logs to storage
save_parser = subparsers.add_parser("save", help=save_files.__doc__)
save_parser.add_argument("bucket_name", help="Your cloud storage bucket.")
save_parser.add_argument("source_path")
save_parser.add_argument("destination_path")

args = parser.parse_args()
blob_commands(args)


if __name__ == "__main__":
main()
