# Create Sentieon Joint Genotyping "Workflow"
This workflow consists of the following steps, mangaged in a more "manual" way using this notebook:
1. Batch run sharding of gVCFs, in sets of N/12
1. Move Sharded gVCFs into each of the subprojects(3)
1. Run Senteion GVCFTyper tool on each set of sharded samples
1. Copy/move results to main project
1. Profit?

In [1]:
import sevenbridges as sbg
from sevenbridges.errors import SbgError
from sevenbridges.http.error_handlers import rate_limit_sleeper, maintenance_sleeper
import sys
import re
import pdb
import concurrent.futures
from requests import request
import json
config = sbg.Config(profile='turbo')
api = sbg.Api(config=config, error_handlers=[rate_limit_sleeper, maintenance_sleeper])

## Patch metadata to files
Example input:

```text
File Name	Data Type	File Format	Experiment Strategy	Participants ID	Proband	Family Id	Sample External ID	Aliquot External ID
ea542aa1-9ee9-499d-a1d9-8d51bc818c21.g.vcf.gz	gVCF	gvcf	WGS	PT_SGZCGWED	Yes	FM_K8K528BS	CG0034-9301	CG0034-9301
20046342-f24b-43a6-a79a-ed5687801a87.g.vcf.gz	gVCF	gVCF	WGS	PT_MJXDMGZC	No	FM_C8TH0DP9	CG0006-3366	--
57521e6e-0add-4793-a28f-6a3703c3b7dd.g.vcf.gz	gVCF	gvcf	WGS	PT_VXWFWHG3	Yes	FM_ERZ72PM5	CG0000-3398	CG0000-3398
```

In [None]:
def patch_gvcf_metadata(line, meta_dict):
    try:
        info = line.rstrip('\n').split('\t')
        gvcf = api.files.get(info[0])
        gvcf.metadata = meta_dict[gvcf.name]
        gvcf.save()
    except Exception as e:
        print (e)
        exit(1)

In [None]:
metadata_file = open('/Users/brownm28/Documents/2022-May-6_KF-CHD_joint_cohort_call/gvcf_metadata.tsv')
gvcf_manifest = api.files.get('627535674d85bc2e028b5ff3').content().split('\n')
meta_dict = {}
head = next(metadata_file)
header = head.rstrip('\n').split('\t')
meta_keys = { 'experiment_strategy': header.index("Experiment Strategy"),
            'case_id': header.index("Participants ID"), 'Proband': header.index('Proband') }
for line in metadata_file:
    info = line.rstrip('\n').split('\t')
    meta_dict[info[0]] = {}
    for key in meta_keys:
        meta_dict[info[0]][key] = info[meta_keys[key]]
# pdb.set_trace()
with concurrent.futures.ThreadPoolExecutor(16) as executor:
    results = {executor.submit(patch_gvcf_metadata, gvcf_manifest[i], meta_dict): gvcf_manifest[i] for i in range(1, len(gvcf_manifest), 1)}
    


## Batch run GVCF Shard
Requires interval lists to have been created

### Set up files and refs

In [None]:
def get_batch_refs(api, project):
    ref_dict = {}
    bed_files = api.files.query(project=project, metadata={"data_type": "padded_bed"})
    ref_dict['shard_bed'] = bed_files
    ref_dict['cores'] = 16
    return ref_dict

In [None]:
project='kfdrc-harmonization/sd-preasa7s-cohort-joint-genotyping'
app = project + '/shard_vcf'

# read manifest directly into array
gvcf_manifest = api.files.get('627535674d85bc2e028b5ff3').content().split('\n')
gvcf_set_list = []
gvcf_set_list.append([])
j=0
x = 1
n = 300
# create batch tasks - 300 per batch
for i in range(1, len(gvcf_manifest) - 1, 1):
    # pdb.set_trace()
    if x > n:
        gvcf_set_list.append([])
        j += 1
        x=1
        sys.stderr.write('Creating next set of 300\n')
    info = gvcf_manifest[i].split('\t')
    gvcf_set_list[j].append(api.files.get(info[0]))
    x += 1
ref_dict = get_batch_refs(api, project)

    


### Set up batch tasks

In [None]:
batch_by = {'type': 'item'}
batch_input = 'input_vcf'

for i in range(1, len(gvcf_set_list), 1):
    task_name = "Batch Shard Set " + str(i)
    in_dict = {}
    for key in ref_dict:
        in_dict[key] = ref_dict[key]
    in_dict['input_vcf'] = gvcf_set_list[i]
    try:
        task = api.tasks.create(
            name=task_name, project=project, app=app, inputs=in_dict,
            batch_input=batch_input, batch_by=batch_by, run=False
        )
        task.save()
    except SbError:
        print('Could not create batch task: ' + task_name)


### Capture missed batch?!?!?

In [None]:
project='kfdrc-harmonization/sd-preasa7s-cohort-joint-genotyping'
app = project + '/shard_vcf'
missed_manifest = open('/Users/brownm28/Documents/2022-May-6_KF-CHD_joint_cohort_call/batch_missed.tsv')
ref_dict = get_batch_refs(api, project)
head = next(missed_manifest)
input_vcf = []
for line in missed_manifest:
    info = line.rstrip('\n').split('\t')
    try:
        input_vcf.append(api.files.get(info[0]))
    except Exception as e:
        print (e)
        print("Skipping file " + info[0] + " " + info[1] + " due to error")


In [None]:
batch_by = {'type': 'item'}
batch_input = 'input_vcf'

task_name="MISSED SHARD SET"
in_dict = {}
in_dict['input_vcf'] = input_vcf
for key in ref_dict:
    in_dict[key] = ref_dict[key]

task = api.tasks.create(
    name=task_name, project=project, app=app, inputs=in_dict,
    batch_input=batch_input, batch_by=batch_by, run=False
)
task.save()


### Copy metadata for missed batch

In [None]:
batch_task = api.tasks.get("ee0c7576-1d32-458c-a018-b3f67c5157da")
for task in batch_task.get_batch_children().all():
    metadata = task.inputs['input_vcf'].metadata
    for output in task.outputs['sharded_vcf']:
        vcf = api.files.get(output)
        tbi = api.files.get(output.secondary_files[0])
        for key in metadata:
            vcf.metadata[key] = metadata[key]
            tbi.metadata[key] = metadata[key]
        shard = int(re.match('.*-(\d+)\.g\.vcf\.gz', vcf.name).group(1))
        vcf.metadata['shard'] = shard
        tbi.metadata['shard'] = shard
        try:
            vcf.save()
            tbi.save()
        except Exception as e:
            print(e)
    print("Processed child task: " + task.name)


### Copy metadata to outputs
Note: Like, 70% worked

In [None]:
def copy_metadata(batch_task):
    try:
        print("Processing " + batch_task.name)
        for task in batch_task.get_batch_children().all():
            if task.status=="COMPLETED":
                metadata = task.inputs['input_vcf'].metadata
                for output in task.outputs['sharded_vcf']:
                    vcf = api.files.get(output)
                    tbi = api.files.get(output.secondary_files[0])
                    for key in metadata:
                        vcf.metadata[key] = metadata[key]
                        tbi.metadata[key] = metadata[key]
                    shard = int(re.match('.*-(\d+)\.g\.vcf\.gz', vcf.name).group(1))
                    vcf.metadata['shard'] = shard
                    tbi.metadata['shard'] = shard
                    try:
                        vcf.save()
                        tbi.save()
                    except Exception as e:
                        print(e)
            else:
                print ("task had failed, skipping: " + task.name)
    except Exception as e:
        print(e)
        print("Might be a single task, trying that\n")
        try:
            task=batch_task
            metadata = task.inputs['input_vcf'].metadata
            for output in task.outputs['sharded_vcf']:
                vcf = api.files.get(output)
                tbi = api.files.get(output.secondary_files[0])
                for key in metadata:
                    vcf.metadata[key] = metadata[key]
                    tbi.metadata[key] = metadata[key]
                shard = int(re.match('.*-(\d+)\.g\.vcf\.gz', vcf.name).group(1))
                vcf.metadata['shard'] = shard
                tbi.metadata['shard'] = shard
                vcf.save()
                tbi.save()
        except Exception as e:
            print(e)
            exit(1)

In [None]:
project='kfdrc-harmonization/sd-preasa7s-cohort-joint-genotyping'
tasks = api.tasks.query(project=project, status="COMPLETED").all()
phrase = 'Batch Shard Set'
batch_task_list = []
for task in tasks:
    if re.search(phrase, task.name):
        batch_task_list.append(task)

# with concurrent.futures.ThreadPoolExecutor(16) as executor:
#     results = {executor.submit(copy_metadata, batch_task): batch_task for batch_task in batch_task_list}
for batch_task in batch_task_list:
    copy_metadata(batch_task)
    

### Move files to sub-projects
First copy, then remove from parent project.
Note: Had weird duplicates after copy

In [None]:
def cp_files(shard_file, sp_cp):
    try:
        shard_file.copy(project=sp_cp, name=shard_file.name)
    except Exception as e:
        print (e)
        exit(1)

In [None]:
def rm_files(shard_file):
    try:
        shard_file.delete()
    except Exception as e:
        print (e)
        exit(1)

In [None]:
src_prj = 'kfdrc-harmonization/sd-preasa7s-cohort-joint-genotyping'

num_shards = 12
k = 1
for i in range(0, num_shards, 4):
    sp_cp = src_prj + "-" + str(k)
    for j in range(i, i+4, 1):
        shard_set = api.files.query(project=src_prj, metadata={'shard': j}).all()
        with concurrent.futures.ThreadPoolExecutor(16) as executor:
            results = {executor.submit(cp_files, shard_file, sp_cp): shard_file for shard_file in shard_set}
        sys.stderr.write('Finished copying set ' + str(j) + '\n')
    k += 1    


In [None]:
src_prj = 'kfdrc-harmonization/sd-preasa7s-cohort-joint-genotyping'
check = input("Type YASS if you are sure you want to delete files sharded files from " + src_prj + "\n")
if check=='YASS':
    num_shards = 12
    for i in range(0, num_shards, 1):
        shard_set = api.files.query(project=src_prj, metadata={'shard': j}).all()
        with concurrent.futures.ThreadPoolExecutor(16) as executor:
            results = {executor.submit(rm_files, shard_file): shard_file for shard_file in shard_set}
        sys.stderr.write('Finished deleting set ' + str(i) + '\n')
else:
    print ("Invalid input, skipping delete")


## Set up JG Calls
VCFs MUST be given in the same sorted order to each shard job.
Unfortunately, you may have to re-input gVCF in the GUI anyway, as every load I have tried (2,511 files) is incomplete

In [2]:
def get_jg_refs(api, project):
    ref_dict = {}
    ref_dict['reference'] = api.files.query(project=project, names=['Homo_sapiens_assembly38.fasta'])[0]
    ref_dict['dbSNP'] = api.files.query(project=project, names=['Homo_sapiens_assembly38.dbsnp.vcf.gz'])[0]
    ref_dict['cpu_per_job'] = 36
    ref_dict['emit_mode'] = 'variant'
    ref_dict['genotype_model'] = 'multinomial'
    ref_dict['sentieon_license'] = '10.5.59.108:8990'
    return ref_dict

In [3]:
shard_dict = {
    5: ["chr1:1-248956422", "chr2:1-16145119"],
    6: ["chr2:16146120-242193529", "chr3:1-90565295"],
    9: ["chr3:90565296-198295559", "chr4:1-190123121"],
    4: ["chr4:190173122-190214555", "chr5:1-181538259","chr6:1-61370554"],
    10: ["chr6:61370555-170805979", "chr7:1-62456779"],
    2: ["chr7:62506780-159345973", "chr8:1-145138636"],
    3: ["chr9:1-138394717", "chr10:1-124121200"],
    8: ["chr10:124121201-133797422", "chr11:1-135086622", "chr12:1-132223362"],
    0: ["chr12:132224363-133275309", "chr13:1-114364328", "chr14:1-107043718", "chr15:1-23226874"],
    1: ["chr15:23276875-101991189", "chr16:1-90338345", "chr17:1-83257441"],
    7: ["chr18:1-80373285", "chr19:1-58617616", "chr20:1-64444167", "chr21:1-46709983", "chr22:1-18659564"],
    11: ["chr22:18709565-50818468", "chrX:1-156040895", "chrY:1-57227415"]
}

src_prj = 'kfdrc-harmonization/sd-preasa7s-cohort-joint-genotyping'


### Set up Subproject 1

In [4]:
for i in range(0,4,1):
    cur = src_prj + "-1"
    app_name = cur + "/sentieon_gvcftyper"
    refs = get_jg_refs(api, cur)
    task_name = "Joint Call Shard Set: " + str(i)
    in_dict = {}
    for key in refs:
        in_dict[key] = refs[key]
    shard_list = []
    for shard in shard_dict[i]:
        shard_list.append("--shard " + shard)
    in_dict['advanced_driver_options'] = " ".join(shard_list)
    vcf_collection=api.files.query(project=cur, metadata={"shard": i}).all()
    vcf_list = list(vcf_collection)
    vcf_list.sort(key=lambda x: x.name)
    in_dict['input_vcfs'] = vcf_list
    in_dict['output_file_name'] = "KF-CHD_joint_genotyping_cohort_call-" + str(i) + ".vcf.gz"
    task = api.tasks.create(app=app_name, name=task_name, inputs=in_dict, project=cur, run=False)
    task.save()
    print("Created task for shard " + str(i))

Created task for shard 0
Created task for shard 1
Created task for shard 2
Created task for shard 3


In [None]:
vcf_collection=api.files.query(project=cur, metadata={"shard": i}).all()
vcf_list = list(vcf_collection)
pdb.set_trace()
hold=1

### Set up Subproject 2

In [13]:
for i in range(4,8,1):
    cur = src_prj + "-2"
    app_name = cur + "/sentieon_gvcftyper"
    refs = get_jg_refs(api, cur)
    task_name = "Joint Call Shard Set: " + str(i)
    in_dict = {}
    for key in refs:
        in_dict[key] = refs[key]
    shard_list = []
    for shard in shard_dict[i]:
        shard_list.append("--shard " + shard)
    in_dict['advanced_driver_options'] = " ".join(shard_list)
    vcf_collection=api.files.query(project=cur, metadata={"shard": i}).all()
    vcf_list = list(vcf_collection)
    vcf_list.sort(key=lambda x: x.name)
    in_dict['input_vcfs'] = vcf_list
    in_dict['output_file_name'] = "KF-CHD_joint_genotyping_cohort_call-" + str(i) + ".vcf.gz"
    task = api.tasks.create(app=app_name, name=task_name, inputs=in_dict, project=cur, run=False)
    task.save()
    print("Created task for shard " + str(i))

Created task for shard 4
Created task for shard 5
Created task for shard 6
Created task for shard 7


### Set up Subproject 3

In [14]:
for i in range(8,12,1):
    cur = src_prj + "-3"
    app_name = cur + "/sentieon_gvcftyper"
    refs = get_jg_refs(api, cur)
    task_name = "Joint Call Shard Set: " + str(i)
    in_dict = {}
    for key in refs:
        in_dict[key] = refs[key]
    shard_list = []
    for shard in shard_dict[i]:
        shard_list.append("--shard " + shard)
    in_dict['advanced_driver_options'] = " ".join(shard_list)
    vcf_collection=api.files.query(project=cur, metadata={"shard": i}).all()
    vcf_list = list(vcf_collection)
    vcf_list.sort(key=lambda x: x.name)
    in_dict['input_vcfs'] = vcf_list
    in_dict['output_file_name'] = "KF-CHD_joint_genotyping_cohort_call-" + str(i) + ".vcf.gz"
    task = api.tasks.create(app=app_name, name=task_name, inputs=in_dict, project=cur, run=False)
    task.save()
    print("Created task for shard " + str(i))

Created task for shard 8
Created task for shard 9
Created task for shard 10
Created task for shard 11
