In [20]:
import json
import os

import urllib3

from cluster import ClusterApi, BatchJobSpec, SecretVolume, PersistentClaimVolume, ConfigMapVolume, Container

# turn off warnings due to dev cluster cert
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [21]:
# Cluster namespace that every resource below is created in.
namespace = 'volume-project'
# Base name for this run. Every per-run resource name below is derived from it,
# so changing job_name alone is enough to get a fresh set of resource names.
job_name = "job8"
job_claim_name = "{}-claim".format(job_name)
job_secret_name = "{}-dukeds".format(job_name)

# Stage step: downloads the input files into the shared /data volume.
stage_job_config_map_name = "{}-stage-config".format(job_name)
stage_job_name = "{}-stage".format(job_name)

# Download-workflow step: fetches the CWL workflow file into /data.
download_workflow_job_name = "{}-down-wf".format(job_name)

# CWL step: runs the workflow against the staged inputs.
cwl_job_config_map_name = "{}-cwl-config".format(job_name)
cwl_job_name = "{}-cwl".format(job_name)

# Save step: uploads the workflow results to a project.
save_results_job_config_map_name = "{}-save-config".format(job_name)
save_results_job_name = "{}-save".format(job_name)
# Derived from job_name like every other name here; a hard-coded "results_from_job8"
# would silently keep pointing at the old project after job_name is changed.
save_results_project_name = "results_from_{}".format(job_name)

In [22]:
# Cluster API client built for `namespace`; every create/wait/delete call in the
# rest of the notebook goes through this object (presumably all resources are
# created in that namespace — confirm against ClusterApi).
api = ClusterApi(namespace)

In [23]:
# Create the persistent volume claim shared by every step of this run (staged
# inputs, the downloaded workflow file and the results all live under /data),
# and describe how job containers mount it.
claim_storage_size_in_g = 2
claim = api.create_persistent_volume_claim(
    job_claim_name,
    storage_size_in_g=claim_storage_size_in_g,
)
persistent_claim_volume = PersistentClaimVolume(
    "data",
    mount_path="/data",
    volume_claim_name=job_claim_name,
)

In [24]:
# Create a Secret holding the DukeDSClient keys and describe how containers mount
# it at /etc/ddsclient. The secret's "config" entry is presumably exposed as
# /etc/ddsclient/config, which is the path the stage/save jobs pass as
# DDSCLIENT_CONF.
# Keys are read from the environment rather than typed into this cell, so real
# credentials never get saved into the notebook file. When the variables are
# unset, the 'TODO' placeholders are used exactly as before.
agent_key = os.environ.get('DUKEDS_AGENT_KEY', 'TODO')
user_key = os.environ.get('DUKEDS_USER_KEY', 'TODO')
ddsclient_secret_config = json.dumps({"agent_key": agent_key, "user_key": user_key})
secret = api.create_secret(job_secret_name, {"config": ddsclient_secret_config})
ddsclient_secret_volume = SecretVolume("ddsclient", mount_path="/etc/ddsclient", secret_name=job_secret_name)

In [25]:
# Describe which files the stage job should download, publish that description
# as a config map, and mount it so download.py can read /etc/config/commands.
# TODO: make format more generic
# Each pair is (key, destination filename relative to /data). The keys are
# still placeholders — presumably remote DukeDS file ids; fill in before running.
files_to_stage = [
    ("TODO", "data.txt"),
    ("TODO", "SRR_1_first_100000.fastq.gz"),
]
config_data = {"files": [{"key": key, "dest": dest} for key, dest in files_to_stage]}
payload = dict(commands=json.dumps(config_data))
config_map = api.create_config_map(name=stage_job_config_map_name, data=payload)
stage_data_config_volume = ConfigMapVolume(
    "config",
    mount_path="/etc/config",
    config_map_name=stage_job_config_map_name,
    source_key="commands",
    source_path="commands",
)

In [26]:
# Launch the stage job. download.py reads its file list from the mounted config
# map and runs with /data (the shared claim) as its working directory,
# authenticating through the DukeDSClient config mounted from the secret.
stage_volumes = [
    persistent_claim_volume,
    ddsclient_secret_volume,
    stage_data_config_volume,
]
container = Container(
    name=stage_job_name,
    image_name="jbradley/duke-ds-staging",
    command="python",
    args=["/app/download.py", "/etc/config/commands"],
    working_dir="/data",
    env_dict={"DDSCLIENT_CONF": "/etc/ddsclient/config"},
    requested_cpu="100m",
    requested_memory="64Mi",
    volumes=stage_volumes,
)
job_spec = BatchJobSpec(stage_job_name, container=container)
job = api.create_job(stage_job_name, job_spec)

In [27]:
# Wait for the stage job to report completion, then delete the job and the
# config map it read its file list from. The PVC and secret are kept because
# later steps still mount them.
# NOTE(review): if wait_for_jobs raises, the job and config map are left behind.
api.wait_for_jobs(job_names=[stage_job_name])
api.delete_job(stage_job_name)
api.delete_config_map(stage_job_config_map_name)

Jobs complete: job8-stage


In [28]:
# Fetch the fastqc CWL workflow into the shared /data volume with a short wget
# job, wait for it, then delete the job. wget saves the file under the URL's last
# path segment, which is why the CWL step can refer to it simply as "fastqc.cwl".
# NOTE(review): the URL tracks the `master` branch, so reruns may fetch a
# different workflow; consider pinning a commit or tag for reproducibility.
workflow_url = "https://raw.githubusercontent.com/Duke-GCB/bespin-cwl/master/tools/fastqc.cwl"
container = Container(
    name=download_workflow_job_name,
    image_name="jbradley/duke-ds-staging",
    command="wget",
    args=[workflow_url],
    working_dir="/data",
    env_dict={},
    requested_cpu="100m",
    requested_memory="64Mi",
    volumes=[persistent_claim_volume],
)
job_spec = BatchJobSpec(download_workflow_job_name, container=container)
job = api.create_job(download_workflow_job_name, job_spec)

# Block on the download, then remove the finished job.
api.wait_for_jobs(job_names=[download_workflow_job_name])
api.delete_job(download_workflow_job_name)

Jobs complete: job8-down-wf


In [29]:
# Build the CWL job order (the workflow's inputs), publish it as a config map,
# and mount it so cwltool can read /etc/config/joborder.
input_fastq_path = "/data/SRR_1_first_100000.fastq.gz"  # written to /data by the stage job
config_data = {
    "input_fastq_file": {"class": "File", "path": input_fastq_path},
}
payload = dict(joborder=json.dumps(config_data))
config_map = api.create_config_map(name=cwl_job_config_map_name, data=payload)
joborder_volume = ConfigMapVolume(
    "config",
    mount_path="/etc/config",
    config_map_name=cwl_job_config_map_name,
    source_key="joborder",
    source_path="joborder",
)

In [30]:
# Run the fastqc CWL workflow with cwltool. --no-container makes cwltool run the
# tool directly inside this image instead of launching its own container. The job
# order comes from the mounted config map, and outputs go to /data/results, which
# the save step uploads afterwards.
# NOTE(review): fastqc is a Java tool; 64Mi may be too little memory — confirm.
container = Container(
    name=cwl_job_name, 
    image_name="jbradley/cwltool-fastqc", 
    command="cwltool", 
    args=["--no-container", "--outdir", "/data/results", "fastqc.cwl", "/etc/config/joborder"],
    working_dir="/data", 
    env_dict={},
    requested_cpu="100m",
    requested_memory="64Mi",
    volumes=[persistent_claim_volume, joborder_volume])
job_spec = BatchJobSpec(cwl_job_name, container=container)
job = api.create_job(cwl_job_name, job_spec)

In [31]:
# Wait for the CWL job to report completion, then delete the job and its
# job-order config map. The job is deleted first, matching the cleanup order
# used by the stage and save steps.
api.wait_for_jobs(job_names=[cwl_job_name])
api.delete_job(cwl_job_name)
api.delete_config_map(cwl_job_config_map_name)

Jobs complete: job8-cwl


In [32]:
# Describe what the save job should upload (the CWL results directory) and where
# (the results project), publish that as a config map, and mount it so upload.py
# can read /etc/config/commands.
# TODO: make format more generic
results_paths = ["/data/results"]
config_data = dict(destination=save_results_project_name, paths=results_paths)
payload = dict(commands=json.dumps(config_data))
config_map = api.create_config_map(name=save_results_job_config_map_name, data=payload)
save_results_config_volume = ConfigMapVolume(
    "config",
    mount_path="/etc/config",
    config_map_name=save_results_job_config_map_name,
    source_key="commands",
    source_path="commands",
)

In [35]:
# Launch the save job. upload.py reads the upload description from the mounted
# config map and authenticates through the DukeDSClient config mounted from the
# secret; the results it uploads live on the shared /data volume.
save_volumes = [ddsclient_secret_volume, persistent_claim_volume, save_results_config_volume]
ddsclient_env = {"DDSCLIENT_CONF": "/etc/ddsclient/config"}
container = Container(
    name=save_results_job_name,
    image_name="jbradley/duke-ds-staging",
    command="python",
    args=["/app/upload.py", "/etc/config/commands"],
    working_dir="/data",
    env_dict=ddsclient_env,
    requested_cpu="100m",
    requested_memory="64Mi",
    volumes=save_volumes,
)
job_spec = BatchJobSpec(save_results_job_name, container=container)
job = api.create_job(save_results_job_name, job_spec)

In [36]:
# Wait for the save (upload) job to report completion, then delete the job and
# the config map it read its upload description from.
api.wait_for_jobs(job_names=[save_results_job_name])
api.delete_job(save_results_job_name)
api.delete_config_map(save_results_job_config_map_name)

Jobs complete: job8-save


In [37]:
# All steps are done: delete the shared data volume claim (inputs, workflow file
# and results) and the DukeDSClient credentials secret. These are the last
# resources created by this run; jobs and config maps were removed after each step.
api.delete_persistent_volume_claim(job_claim_name)
api.delete_secret(job_secret_name)