# Joint Cohort Calling
For power users of CAVATICA with cohorts larger than 2,200, but also one in which any instance being used will not exceed 4TB in EBS storage
1. Set up CAVATICA credentials
1. Set up split Split by chr jobs and run
1. Tag split jobs so that they can be easily found and deleted later
1. Run Genotyper

## Set up imports and creds
Credential set up using developer token allows you to set up task jobs. By design, token is deleted after creds file is created. Creds file should disappear after session which is secure. If you're running locally, adjust accordingly

In [None]:
import sys
import os
import sevenbridges as sbg
from sevenbridges.errors import SbgError
from sevenbridges.http.error_handlers import rate_limit_sleeper, maintenance_sleeper
from getpass import getpass
import pdb

config_name =  "/home/jovyan/.sevenbridges/credentials"
try:
    os.mkdir("/home/jovyan/.sevenbridges")
except Exception as e:
    print(e)
config_file=open(config_name, 'w')

endpoint='api_endpoint = https://cavatica-api.sbgenomics.com/v2\n'
token = getpass('Enter your sbg token:')
config_file.write("[default]\n" + endpoint + "auth_token = " + token)
config_file.close()
config = sbg.Config(profile='default')
api = sbg.Api(config=config, error_handlers=[rate_limit_sleeper, maintenance_sleeper])
del token

## Set up split jobs
Need to have created a list of chromosomes to use for splitting and downstream joint calling. Recommend chr1-22,X,Y,M. **Also recommend at project level to turn on Spot Instances and Memoization**

In [None]:
project = 'd3b-bixu/kf-chd-all-joint-call-temp'
chr_list = 'split_vcf_chr_list.txt'
chr_obj = api.files.query(project=project, names=[chr_list])[0]
chr_array = chr_obj.content().rstrip('\n').split("\n")
split_by_chr = True
gvcf_files = api.files.query(project=project, tags=["PORTAL"]).all()

gvcf_set_list = []
gvcf_set_list.append([])
j=0
x = 1
n = 39 # set so that 60 tasks are made
# create set tasks - n per set
for gvcf in gvcf_files:
    if x > n:
        gvcf_set_list.append([])
        j += 1
        x=1
        print('Creating next set of ' + str(n), file=sys.stderr)
    gvcf_set_list[j].append(gvcf)
    x += 1


In [None]:
ct = 1
# Tinker with this based on expected input size a level of desired stacking
instance_type: "c5.12xlarge;ebs-gp2;1500"
app_name = project + "/split-vcf-mini-workflow"
for gvcf_set in gvcf_set_list:
    in_dict = {"chr_list": chr_obj, "chr_array": chr_array, "input_vcf": gvcf_set}
    task_name = "Split VCF by chr set: " + str(ct)
    task = api.tasks.create(app=app_name, name=task_name, inputs=in_dict, project=project, run=False, execution_settings = {"instance_type": [instance_type]})
    task.save()
    ct +=1



### Run the split tasks

In [None]:
tasks = api.tasks.query(project=project, status="DRAFT").all()
phrase = "Split VCF by chr set"
for task in tasks:
    if task.name.startswith(phrase):
        task.run()

## Tag outputs
Useful to organize inputs. Uses bulk get and bulk update - absolutely critical as per-file get and save will cause you to hit the api limit even if you have a turbo token real fast!

In [None]:
project = 'd3b-bixu/kf-chd-all-joint-call-temp'
chr_list = 'split_vcf_chr_list.txt'
chr_obj = api.files.query(project=project, names=[chr_list])[0]
chr_array = chr_obj.content().rstrip('\n').split("\n")

tasks = api.tasks.query(project=project, status="COMPLETED").all()
phrase = "Split VCF by chr set"
for task in tasks:
    if task.name.startswith(phrase):
        print(task.name)
        for outdir in task.outputs['split_vcfs']:
            split_set = api.files.bulk_get(outdir)
            update_set = []
            for bulk_file in split_set:
                try:
                    # update vcfs
                    parts = bulk_file.resource.name.split("_")
                    bulk_file.resource.tags.extend([parts[0], "INTERMEDIATE"])
                    update_set.append(bulk_file.resource)
                    
                except Exception as e:
                    print("{} {}".format(e, "\nFailed in VCF loop"))
                    pdb.set_trace()
                    hold = 1
            # update secondary files, just the intermediate tag to make it easy to find to delete
            secondary = [x.secondary_files[0] for x in outdir]
            bulk_index = api.files.bulk_get(secondary)
            for index_file in bulk_index:
                try:
                    # update indexes
                    index_file.resource.tags.append("INTERMEDIATE")
                    update_set.append(index_file.resource)
                    
                except Exception as e:
                    print("{} {}".format(e, "\nFailed in index loop"))
                    pdb.set_trace()
                    hold = 1
            try:
                api.files.bulk_update(update_set)
            except Exception as e:
                print("{} {}".format(e, "\nFailed in update"))
                pdb.set_trace()
                hold=1


## Set up joint call tasks
Adjust the variables at the start as-needed to fit your project/preferences

In [None]:
# Set up some static inputs here
cpus = 48
# split by chr is beefy - maxing out disk space to be safe
instance_type = "c5.12xlarge;ebs-gp2;4000"
reference_name = "Homo_sapiens_assembly38.fasta"
dbsnp_name = "Homo_sapiens_assembly38.dbsnp.vcf.gz"
sentieon_license="10.5.64.221:8990"
out_file_prefix = "KF-CHDALL_"
ref_dict={ "reference": api.files.query(project=project, names=[reference_name])[0], "dbSNP": api.files.query(project=project, names=[dbsnp_name])[0], "sentieon_license": sentieon_license, "cpu_per_job": cpus }
app_name = project + "/sentieon-gvcftyper"

for chrom in chr_array:
    task_name = "Sentieon GVCFtyper: " + chrom
    # convert collection object to list of objects, then sort to ensure each job has the samples in the same order
    input_gvcf_collection = api.files.query(project=project, tags=[chrom]).all()
    # list conversion might take 10 seconds or so
    input_gvcf_list = list(input_gvcf_collection)
    input_gvcf_list.sort(key=lambda x: x.name)
    in_dict = {}
    for key in ref_dict:
        in_dict[key] = ref_dict[key]
    in_dict['input_gvcf_files'] = input_gvcf_list
    in_dict['interval'] = chrom
    in_dict['output_file_name'] = out_file_prefix + chrom + ".vcf.gz"
    task = api.tasks.create(app=app_name, name=task_name, inputs=in_dict, project=project, run=False, execution_settings = {"instance_type": [instance_type]} )
    print("Created " + task.name)
        
    

### Run the GT Tasks

In [None]:
tasks = api.tasks.query(project=project, status="DRAFT").all()
phrase = "Sentieon GVCFtyper"
for task in tasks:
    if task.name.startswith(phrase):
        task.run()

## MISC
Not needed but might be useful to get some cost and run time info

In [None]:
project = 'd3b-bixu/kf-chd-all-joint-call-temp'
tasks = api.tasks.query(project=project).all()
phrase = "Split VCF by chr set"
gt_phrase = "Sentieon GVCFtyper:"
split_total = 0
gt_total = 0
gt_run_time = {}
split_run_time = {}
total_total = 0
for task in tasks:
    if task.price is not None:
        run_hours=(task.end_time - task.start_time).seconds/3600
        total_total += task.price.amount
        if task.name.startswith(phrase):
            split_total += task.price.amount
            if task.name not in split_run_time:
                split_run_time[task.name] = run_hours
            else:
                split_run_time[task.name] += run_hours
        elif task.name.startswith(gt_phrase):
            gt_total += task.price.amount
            if task.name not in split_run_time:
                gt_run_time[task.name] = run_hours
            else:
                gt_run_time[task.name] += run_hours

print("Total spent so far: {}, Split costs: {}, GT costs {}".format(total_total, split_total, gt_total))
with open("split_run_times.txt", 'w') as s:
    for task_name in split_run_time:
        print("{}\t{}".format(task_name, split_run_time[task_name]), file=s)
with open("gt_run_times.txt", 'w') as f:
    for task_name in gt_run_time:
        print("{}\t{}".format(task_name, gt_run_time[task_name]), file=f)