Set up the MELTv2.2.0 workflow to run on a dedicated Cromwell server

In [57]:
# --- Local paths -------------------------------------------------------------
# Working directory holding the per-cohort folders, JSON files and run script.
WRKDIR = 'your_working_dir'
# Local copy of the MELT pipeline (WDL file and blank JSON templates).
LOCALTOOLS = 'path_to/MELTv2.2.0-pipeline'

# --- Run identity ------------------------------------------------------------
# TASK prefixes the Cromwell VM name ("<TASK>-cromwell").
TASK = 'melt'
COHORT = 'cohort'
PROJECT_ID = 'gcp_project_ID'

# --- Cloud Storage locations -------------------------------------------------
# Workflow outputs, logs and the cromwell-execution directory live under BUCKET.
BUCKET = 'gs://your_bucket'
# Reference genome and MELT resource files.
GCRESOURCES = 'gs://your_resource_bucket'
# Per-sample CRAM / CRAI inputs for this cohort.
SAMPLE_BUCKET = 'gs://bucket/{}/hg38/crams'.format(COHORT)

(1) Set up a Cromwell server

In [64]:
# Shell commands that create and configure the Cromwell VM.  They are only
# printed here, to be copied into a local terminal.
task_slug = TASK.replace('_', '-')  # same substitution feeds both script calls

vm_setup_lines = [
    '#copy and run locally:',
    '#fire up the cromwell instance',
    'cd {}/verily-amp-pd-source/setup_cromwell_vm/'.format(WRKDIR),
    'chmod +x {}/verily-amp-pd-source/setup_cromwell_vm/*.sh'.format(WRKDIR),
    './create_cromwell_server.sh {}-cromwell {} n1-highmem-8'.format(task_slug, PROJECT_ID),
    './configure.sh {}-cromwell {} {}'.format(task_slug, PROJECT_ID, BUCKET),
]
print('\n'.join(vm_setup_lines))

#copy and run locally:
#fire up the cromwell instance
cd your_working_dir/verily-amp-pd-source/setup_cromwell_vm/
chmod +x your_working_dir/verily-amp-pd-source/setup_cromwell_vm/*.sh
./create_cromwell_server.sh melt-cromwell gcp_project_ID n1-highmem-8
./configure.sh melt-cromwell gcp_project_ID gs://your_bucket


In [65]:
# Print the ssh command for the Cromwell VM.
# The VM is created as "<TASK with '_' replaced by '-'>-cromwell", so the ssh
# target has to apply the same substitution; using TASK unchanged would point
# at a non-existent instance whenever TASK contains an underscore.
print('#When that is up, ssh to the instance')
print('gcloud --project {} compute ssh {}-cromwell'.format(PROJECT_ID,TASK.replace('_','-')))

#When that is up, ssh to the instance
gcloud --project gcp_project_ID compute ssh melt-cromwell


In [37]:
# Commands that bring Cromwell up with docker-compose; printed so they can be
# pasted into the SSH session on the VM.
ssh_session_lines = (
    '#And in that SSH session, run:',
    'cd /install',
    'docker-compose -f /install/workspace/config/docker-compose.yml up',
)
print('\n'.join(ssh_session_lines))

#And in that SSH session, run:
cd /install
docker-compose -f /install/workspace/config/docker-compose.yml up


In [66]:
# Command that forwards local port 8000 to port 8000 on the Cromwell VM.
tunnel_cmd = 'gcloud --project {} compute ssh {}-cromwell -- -L 8000:localhost:8000'.format(
    PROJECT_ID,
    TASK.replace('_', '-'),
)
tunnel_lines = [
    '#copy and run locally:',
    '#When cromwell is up, create an SSH tunnel from your workstation, if not already connected:',
    tunnel_cmd,
]
print('\n'.join(tunnel_lines))

#copy and run locally:
#When cromwell is up, create an SSH tunnel from your workstation, if not already connected:
gcloud --project gcp_project_ID compute ssh melt-cromwell -- -L 8000:localhost:8000


(2) Format input files

In [16]:
# Command to move into the cohort's working directory (printed, not executed).
cd_cohort_cmd = 'cd {}/{}'.format(WRKDIR, COHORT)
print(cd_cohort_cmd)

cd your_working_dir/cohort


In [17]:
# Command to create the folder that will receive the per-sample JSON files
# (printed, not executed).
mkdir_cmd = 'mkdir jsons'
print(mkdir_cmd)

mkdir jsons


In [38]:
import pandas as pd

# Sample list: one sample ID per line, read into a single 'SampleID' column.
sample_list_path = '{}/{}/{}.hg38.cram'.format(WRKDIR,COHORT,COHORT)

# dtype=str keeps every ID as text.  Without it, a list made only of numeric
# IDs (e.g. 25020484) is parsed as integers, which drops leading zeros and
# breaks the later sample_id.lower() call.
SampleInfor_df=pd.read_csv(sample_list_path, sep='\t', names=['SampleID'], dtype=str)
print(SampleInfor_df.shape)
print(SampleInfor_df.head())

(742, 1)
    SampleID
0   2020-159
1   25020484
2  NIHPD0657
3     NIHPD1
4   NIHPD103


In [39]:
import json
import os

json_label_template = '{}/templates/blank.meltv2.2.0.label.json'.format(LOCALTOOLS)
json_input_template = '{}/templates/blank.meltv2.2.0.input.json'.format(LOCALTOOLS)
json_options_template = '{}/templates/blank.meltv2.2.0.options.json'.format(LOCALTOOLS)


def _load_json(path):
    """Return the parsed content of the JSON file at `path`."""
    with open(path) as json_file:
        return json.load(json_file)


def _write_json(data, path):
    """Write `data` to `path` as 4-space-indented JSON, keeping key order."""
    with open(path, 'w') as json_outfile:
        json.dump(data,json_outfile,sort_keys=False,indent=4)


# The templates are the same for every sample, so read each one only once.
label_template = _load_json(json_label_template)
options_template = _load_json(json_options_template)
input_template = _load_json(json_input_template)

# Make sure the output folder exists; otherwise open(..., 'w') below fails
# when the manual "mkdir jsons" step has been skipped.
json_dir = '{}/{}/jsons'.format(WRKDIR,COHORT)
if not os.path.isdir(json_dir):
    os.makedirs(json_dir)

# For each sample, write three files: workflow labels, workflow options and
# workflow inputs.  Each one is the blank template with the per-sample keys
# filled in.
for sample_id in SampleInfor_df['SampleID']:
    json_label_outfile_name = '{}/{}/jsons/{}.meltv2.2.0.label.json'.format(WRKDIR,COHORT,sample_id)
    json_input_outfile_name = '{}/{}/jsons/{}.meltv2.2.0.input.json'.format(WRKDIR,COHORT,sample_id)
    json_options_outfile_name = '{}/{}/jsons/{}.meltv2.2.0.options.json'.format(WRKDIR,COHORT,sample_id)

    # Labels are written lower-cased.
    # NOTE(review): presumably because the label backend only accepts
    # lowercase values -- confirm.
    label_data = dict(label_template)
    label_data['cohort'] = COHORT.lower()
    label_data['sample'] = sample_id.lower()
    _write_json(label_data, json_label_outfile_name)

    # Options: where the final outputs and logs of this sample are copied.
    options_data = dict(options_template)
    options_data['final_workflow_outputs_dir'] = "{}/{}/hg38/sv-melt/work/{}".format(BUCKET,COHORT,sample_id)
    options_data['final_workflow_log_dir'] = "{}/{}/logs/melt/{}".format(BUCKET,COHORT,sample_id)
    options_data['final_call_logs_dir'] = "{}/{}/logs/melt/{}".format(BUCKET,COHORT,sample_id)
    _write_json(options_data, json_options_outfile_name)

    # Inputs: the sample's CRAM/CRAI plus the shared reference and MELT resources.
    input_data = dict(input_template)
    input_data['MELTSingleDelFlow.sample_name'] = sample_id
    input_data['MELTSingleDelFlow.input_cram'] = "{}/{}.cram".format(SAMPLE_BUCKET, sample_id)
    input_data['MELTSingleDelFlow.input_cram_crai'] = "{}/{}.cram.crai".format(SAMPLE_BUCKET, sample_id)
    input_data['MELTSingleDelFlow.ref_fasta'] = "{}/broad/Homo_sapiens_assembly38.fasta".format(GCRESOURCES)
    input_data['MELTSingleDelFlow.ref_fasta_index'] = "{}/broad/Homo_sapiens_assembly38.fasta.fai".format(GCRESOURCES)
    input_data['MELTSingleDelFlow.ref_dict'] = "{}/broad/Homo_sapiens_assembly38.dict".format(GCRESOURCES)
    input_data['MELTSingleDelFlow.region_bed'] = "{}/MELT-v2.2.0/MELT_DEL_subset.bed".format(GCRESOURCES)
    input_data['MELTSingleDelFlow.prior_files'] = "{}/MELT-v2.2.0/prior_files.tar.gz".format(GCRESOURCES)
    input_data['MELTSingleDelFlow.meltv2_docker'] = "us.gcr.io/nih-nia-lng-cbg/meltv2:2020-4"
    _write_json(input_data, json_input_outfile_name)

(3) Format the commands

In [40]:
def formatgcpcmd(this_sample):
    """Build the two-line shell snippet that submits one sample to Cromwell.

    The first line echoes the sample name (``echo -n``); the second line runs
    cromwell_client.py with the MELT WDL and the sample's input, options and
    label JSON files.

    WRKDIR, LOCALTOOLS and COHORT are read from the notebook's globals.

    Parameters:
        this_sample: sample identifier substituted for every {SAMPLE} field.

    Returns:
        The formatted command text (two lines, no trailing newline).
    """
    # Adjacent literals are joined by the parser into one template string.
    template = (
        'echo -n {SAMPLE} \n'
        'python2 {WRKDIR}/tools/verily-amp-pd-source/wdl_workflow_runner/cromwell_client.py '
        '--wdl {LOCALTOOLS}/MELTv2.2.0_Single_DEL.wdl '
        '--workflow-inputs {WRKDIR}/{COHORT}/jsons/{SAMPLE}.meltv2.2.0.input.json '
        '--workflow-options {WRKDIR}/{COHORT}/jsons/{SAMPLE}.meltv2.2.0.options.json '
        '--workflow-labels {WRKDIR}/{COHORT}/jsons/{SAMPLE}.meltv2.2.0.label.json'
    )
    substitutions = {
        'WRKDIR': WRKDIR,
        'LOCALTOOLS': LOCALTOOLS,
        'COHORT': COHORT,
        'SAMPLE': this_sample,
    }
    return template.format(**substitutions)

In [53]:
# One submission snippet per sample, in sample-list order.
cmds = list(map(formatgcpcmd, SampleInfor_df['SampleID']))

# Write the snippets to the run script, each followed by a newline.
temp_script_file = '{}/{}/run_MELTv2.2.0_Single_DEL.sh'.format(WRKDIR,COHORT)
with open(temp_script_file, 'w') as script_out:
    script_out.writelines("{}\n".format(snippet) for snippet in cmds)

(4) Run the run_MELTv2.2.0_Single_DEL.sh script

In [59]:
# Commands that make the generated script executable and start it in the
# background with its output redirected to a log file (printed, not executed).
log_redirect = ' > {}/{}/run_MELTv2.2.0_Single_Del.log &'.format(WRKDIR,COHORT)
launch_lines = [
    '#copy and run locally:',
    'chmod +x ' + temp_script_file,
    'nohup ' + temp_script_file + log_redirect,
]
print('\n'.join(launch_lines))

#copy and run locally:
chmod +x your_working_dir/cohort/run_MELTv2.2.0_Single_DEL.sh
nohup your_working_dir/cohort/run_MELTv2.2.0_Single_DEL.sh > your_working_dir/cohort/run_MELTv2.2.0_Single_Del.log &


(5) Monitor the jobs

In [77]:
# curl queries against the Cromwell REST API on localhost:8000, one per
# workflow status.  Printed for copy/paste; nothing is executed here.
status_queries = [
    'curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Running"',
    'curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Submitted"',
    'curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Failed"',
    'curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Succeeded"',
]
# The trailing "\n" leaves a blank line between the header and the queries.
print('#if tunnel established can check cromwell status\n')
for query in status_queries:
    print(query)

#if tunnel established can check cromwell status

curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Running"
curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Submitted"
curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Failed"
curl -X GET "http://localhost:8000/api/workflows/v1/query?status=Succeeded"


(6) Transfer result files to the cohort bucket

In [60]:
# Print the command that moves every *.gz result from the per-sample work
# directories up into the cohort's sv-melt folder.
# The source path must match 'final_workflow_outputs_dir' in the options JSON,
# which is built from COHORT unchanged; replacing '_' with '-' here would point
# at a different (non-existent) path for any cohort name with an underscore.
print('gsutil -mq mv {}/{}/hg38/sv-melt/work/**/*.gz {}/{}/hg38/sv-melt/'\
      .format(BUCKET,COHORT,BUCKET,COHORT))

gsutil -mq mv gs://your_bucket/cohort/hg38/sv-melt/work/**/*.gz gs://your_bucket/cohort/hg38/sv-melt/


(7) Clean up cromwell workspace

In [61]:
# Command that recursively removes the MELTSingleDelFlow execution directory
# from the bucket (printed, not executed).
cleanup_cmd = 'gsutil -mq rm -r {}/cromwell-execution/MELTSingleDelFlow'.format(BUCKET)
print(cleanup_cmd)

gsutil -mq rm -r gs://your_bucket/cromwell-execution/MELTSingleDelFlow


(8) Delete VM instance

In [63]:
# Command that deletes the Cromwell VM (printed, not executed).  The instance
# name uses TASK with underscores replaced by hyphens.
vm_task_slug = TASK.replace('_','-')
delete_vm_cmd = 'gcloud compute instances delete {}-cromwell --project {} --zone us-central1-a'.format(
    vm_task_slug,
    PROJECT_ID,
)
print(delete_vm_cmd)

gcloud compute instances delete melt-cromwell --project gcp_project_ID --zone us-central1-a
