# VRS workflow

## Setup Dependencies

In [18]:
%%capture
!pip install seqrepo ga4gh.vrs[extras]==2.0.0a3 ga4gh.vrs
%pip install --upgrade --no-cache-dir terra-notebook-utils

In [2]:
from datetime import datetime
from firecloud import api as fapi
from ga4gh.core import ga4gh_identify
from ga4gh.vrs import models
from ga4gh.vrs.extras.vcf_annotation import VCFAnnotator
from pathlib import Path
from terra_notebook_utils import drs
from time import time 

import ast
import datetime
import glob
import io
import logging
import multiprocessing
import os 
import pandas as pd
import pickle
import pysam
import requests
import subprocess
import vcf

In [3]:
# store relevant variables

%env SEQREPO_ROOT=/home/jupyter/seqrepo
%env VCFTOOLS_DIR=/home/jupyter/vcftools
%env PERL5LIB=/home/jupyter/vcftools/src/perl/
%env VCFTOOLS=/home/jupyter/vcftools/src/cpp/vcftools
%env OUTPUT=/home/jupyter/output
%env SPLIT_DIR=/home/jupyter/split
%env INPUT_DIR=/home/jupyter/vcf
!mkdir $INPUT_DIR
!mkdir $SPLIT_DIR
!mkdir $OUTPUT

SEQREPO_DIR = os.environ["SEQREPO_ROOT"]+"/latest"

env: SEQREPO_ROOT=/home/jupyter/seqrepo
env: VCFTOOLS_DIR=/home/jupyter/vcftools
env: PERL5LIB=/home/jupyter/vcftools/src/perl/
env: VCFTOOLS=/home/jupyter/vcftools/src/cpp/vcftools
env: OUTPUT=/home/jupyter/output
env: SPLIT_DIR=/home/jupyter/split
env: INPUT_DIR=/home/jupyter/vcf
mkdir: cannot create directory ‘/home/jupyter/vcf’: File exists
mkdir: cannot create directory ‘/home/jupyter/split’: File exists
mkdir: cannot create directory ‘/home/jupyter/output’: File exists


In [4]:
# install vcftools and complete setup
# don't worry about the pyvcf error 

!bash ~/setup.sh

Collecting ga4gh.vrs==2.0.0a2 (from ga4gh.vrs[extras]==2.0.0a2->-r /home/jupyter/requirements.txt (line 2))
  Using cached ga4gh.vrs-2.0.0a2-py2.py3-none-any.whl.metadata (10 kB)
Collecting pyVCF (from -r /home/jupyter/requirements.txt (line 4))
  Using cached PyVCF-0.6.8.tar.gz (34 kB)
  Preparing metadata (setup.py) ... [?25lerror
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mpython setup.py egg_info[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m [31m[1 lines of output][0m
  [31m   [0m error in PyVCF setup command: use_2to3 is invalid.
  [31m   [0m [31m[end of output][0m
  
  [1;35mnote[0m: This error originates from a subprocess, and is likely not a problem with pip.
[1;31merror[0m: [1mmetadata-generation-failed[0m

[31m×[0m Encountered error while generating package metadata.
[31m╰─>[0m See above for output.

[1;35mnote[0m: This is an issue with the package mentioned above, not pip.
[1;36mhint

In [5]:
# helpful functions

def truncate(s, first_few, last_few):
    "truncate string printing only first_few and last_few characters"
    return f"{s[:first_few]}...{s[-last_few:]}"

## Get [1000G](https://anvil.terra.bio/#workspaces/anvil-datastorage/AnVIL_1000G_PRIMED-data-model/data) VCF Data for NA12878

In [19]:
# specify patient and chromosomes

chrs_of_interest = set("1")
patient = "NA12878"

In [6]:
# get metadata for filepaths
# openly sourced from https://anvil.terra.bio/#workspaces/anvil-datastorage/AnVIL_1000G_PRIMED-data-model/data

df = pd.read_csv(io.StringIO(fapi.get_entities_tsv("anvil-datastorage", \
                "AnVIL_1000G_PRIMED-data-model", "sequencing_file", model="flexible").text), sep='\t')
df.head()

Unnamed: 0,entity:sequencing_file_id,chromosome,file_path,file_type,md5sum,sequencing_dataset_id
0,00029d9f,4,gs://fc-2ee2ca2a-a140-48a1-b793-e27badb7945d/p...,PLINK2 pvar,9e193baa707fa07a007ed1f6595d8427,JPT_hg19_mega_hm3
1,000d7475,7,gs://fc-2ee2ca2a-a140-48a1-b793-e27badb7945d/p...,PLINK2 psam,1c0ce61414556e3f00b132a455d6132b,FIN_hg38_mega_hm3
2,00131101,14,gs://fc-2ee2ca2a-a140-48a1-b793-e27badb7945d/p...,PLINK2 psam,02fafde1613a0dae38ac0c0ef30da925,GIH_hg38_mega_hm3
3,0018383c,10,gs://fc-2ee2ca2a-a140-48a1-b793-e27badb7945d/p...,PLINK2 pgen,ad2cc5b44ad6d75d2fa1a420227214cd,ASW_hg38_hm3
4,00192ae2,11,gs://fc-2ee2ca2a-a140-48a1-b793-e27badb7945d/p...,PLINK2 pgen,fa2e3a6475bc43ab3bede56cf4b3f5f3,AMR_hg19_hm3


In [7]:
# get rid of gvcf data

df_vcf = df[df['file_type'].isin(['VCF', 'VCF index'])]
df_1kgp = df_vcf[df_vcf['file_path'].str.contains('1kGP')]

num_vcf_idx_files = sum(df_1kgp['file_type'] == 'VCF index')
num_vcf_files = sum(df_1kgp['file_type'] == 'VCF')
assert num_vcf_files == 23 and num_vcf_idx_files == 23, \
    f"check number of files, {num_vcf_files} vcfs and {num_vcf_idx_files} index files"

In [8]:
# load 1000G file if doesn't exist

df_chrs = df_1kgp[df_1kgp['chromosome'].isin(chrs_of_interest)]
uris = df_chrs['file_path']
file_names = [uri.split("/")[-1] for uri in uris]

for file_name, uri in zip(file_names, uris):
    if os.path.exists(f"{os.environ['INPUT_DIR']}/{file_name}"):
        print(f"{truncate(file_name, 35, 10)} already exists, not downloading")
    else:
        split_vcf_cmd = f"gsutil -u $GOOGLE_PROJECT cp {uri} $INPUT_DIR/"
        output = subprocess.run(split_vcf_cmd, shell=True, check=True)

1kGP_high_coverage_Illumina.chr1.fi...vcf.gz.tbi already exists, not downloading
1kGP_high_coverage_Illumina.chr1.fi...nel.vcf.gz already exists, not downloading


In [9]:
vcfs = df_chrs[df_chrs['file_type'] == 'VCF']
input_vcf = f"{os.environ['INPUT_DIR']}/{file_names[0]}"

print(input_vcf)
assert os.path.exists(input_vcf), "file doesn't exist"

/home/jupyter/vcf/1kGP_high_coverage_Illumina.chr1.filtered.SNV_INDEL_SV_phased_panel.vcf.gz.tbi


In [15]:
! ls -lh /home/jupyter/split/

total 565M
-rw-rw-r-- 1 jupyter users  167 Feb 23 06:16 NA12878-0-to-169407-hits.pkl
-rw-rw-r-- 1 jupyter users  48K Feb 22 23:00 NA12878.1000.vcf
-rw-rw-r-- 1 jupyter users  57K Feb 22 23:01 NA12878.1000.vcf.gz
-rw-rw-r-- 1 jupyter users 695K Feb 22 23:01 NA12878.1000-vrs-objects.pkl
-rw-rw-r-- 1 jupyter users  167 Feb 23 08:26 NA12878-169407-to-338815-hits.pkl
-rw-rw-r-- 1 jupyter users  16M Feb 22 01:38 NA12878.filtered.vcf
-rw-rw-r-- 1 jupyter users  21M Feb 23 00:06 NA12878.filtered.vcf.gz
-rw-rw-r-- 1 jupyter users 264M Feb 23 00:06 NA12878.filtered-vrs-objects.pkl
-rw-rw-r-- 1 jupyter users 264M Feb 22 01:06 NA12878.recode.vcf


In [16]:
! mv /home/jupyter/split/NA12878.filtered.vcf /home/jupyter/split/NA12878.chr1.filtered.vcf
! mv /home/jupyter/split/NA12878.recode.vcf /home/jupyter/split/NA12878.chr1.recode.vcf
! ls -lh /home/jupyter/split/

total 565M
-rw-rw-r-- 1 jupyter users  167 Feb 23 06:16 NA12878-0-to-169407-hits.pkl
-rw-rw-r-- 1 jupyter users  48K Feb 22 23:00 NA12878.1000.vcf
-rw-rw-r-- 1 jupyter users  57K Feb 22 23:01 NA12878.1000.vcf.gz
-rw-rw-r-- 1 jupyter users 695K Feb 22 23:01 NA12878.1000-vrs-objects.pkl
-rw-rw-r-- 1 jupyter users  167 Feb 23 08:26 NA12878-169407-to-338815-hits.pkl
-rw-rw-r-- 1 jupyter users  16M Feb 22 01:38 NA12878.chr1.filtered.vcf
-rw-rw-r-- 1 jupyter users 264M Feb 22 01:06 NA12878.chr1.recode.vcf
-rw-rw-r-- 1 jupyter users  21M Feb 23 00:06 NA12878.filtered.vcf.gz
-rw-rw-r-- 1 jupyter users 264M Feb 23 00:06 NA12878.filtered-vrs-objects.pkl


In [20]:
# get patient-level for single chr
patient_path_stem = f"{os.environ['SPLIT_DIR']}/{patient}" 

for c in chrs_of_interest:
    patient_vcf_path = f"{patient_path_stem}.chr{c}.recode.vcf"
    filtered_patient_vcf_path = f"{patient_path_stem}.chr{c}.filtered.vcf"

    # split vcf by patient
    if os.path.exists(patient_vcf_path):
        print(f"already split file: {patient_vcf_path}")
    else:
        split_vcf_cmd = f"$VCFTOOLS --recode --gzvcf {input_vcf} \
                    --out {patient_vcf_path} --indv {patient}"

        output = subprocess.run(split_vcf_cmd, shell=True, check=True)
        
    # filter to only relevant genotypes
    if os.path.exists(filtered_patient_vcf_path):
        print(f"already filtered file: {filtered_patient_vcf_path}")
    else:
        filter_genotypes_cmd = f'grep -v "0|0" $SPLIT_DIR/{patient}.recode.vcf' + \
                        f" > {filtered_patient_vcf_path}"
        subprocess.run(filter_genotypes_cmd, shell=True, check=True)

already split file: /home/jupyter/split/NA12878.chr1.recode.vcf
already filtered file: /home/jupyter/split/NA12878.chr1.filtered.vcf


In [12]:
# # OPTIONAL
# # filter to first num_lines

# num_lines = 1000
# head_vcf = f"{patient_path_stem}.{num_lines}.vcf"

# head_cmd = f"cat {filtered_patient_vcf_path} | head -n {num_lines} > {head_vcf}"
# output = subprocess.run(head_cmd, shell=True, check=True)

In [13]:
# !find ~ -name *1000.vcf

In [14]:
# get number of lines in 
! wc -l $SPLIT_DIR/NA12878.filtered.vcf

338926 /home/jupyter/split/NA12878.filtered.vcf


In [None]:
# checking my work, make sure it totals up to the final number

for term in ["0|1", "1|0", "1|1", "0|0", "#", "."]:
    subprocess.run(f"grep '{term}' $SPLIT_DIR/NA12878.recode.vcf | wc -l", \
                  shell=True, check=True)

In [None]:
# # create annotated vcf test file 
# def annotate_vcf(input_vcf, output_vcf, output_pkl, seqrepo_root_dir, require_validation=True, rle_seq_limit=50):
#     '''param stem: path of input vcf file'''
#     vcf_annotator = VCFAnnotator(seqrepo_root_dir=seqrepo_root_dir)
#     vcf_annotator.tlr.rle_seq_limit = rle_seq_limit
#     vcf_annotator.annotate(vcf_in=input_vcf, vcf_out=output_vcf, \
#         vrs_pickle_out=output_pkl, require_validation=require_validation)

# logger = logging.getLogger("ga4gh.vrs.extras.vcf_annotation")
# logger.setLevel(level=logging.INFO)

# curr_input_vcf = filtered_patient_vcf_path

# stem = curr_input_vcf.replace('.vcf', '')
# output_vcf = f"{stem}.vcf.gz"
# output_pkl = f"{stem}-vrs-objects.pkl"

# print("writing to...")
# print(output_vcf)
# print(output_pkl)

# t = time()
# annotate_vcf(curr_input_vcf, output_vcf, output_pkl, SEQREPO_DIR)
# elapsed_time = time()-t
# print(f"annotation: {(elapsed_time):.2f}s")

In [None]:
def unpickle(file_name):
    """Unpickle vrs objects to single dict"""
    with open(file_name, 'rb') as f:
        vrs_objects = pickle.load(f)
        for k, v in vrs_objects.items():
            vrs_objects[k] = ast.literal_eval(v)
    
    return vrs_objects

def meta_kb(id, recent=True, log=False):
    """Query metakb using vrs object"""
    # k, allele_dict = item
    
    # if translator is not None:
    #     if log: print(f"by {fmt}...")
    #     allele = models.Allele(**allele_dict)
    #     _id = translator.translate_to(allele, fmt)
    # else:
    #     if log: print("by vrs id...")
    #     _id = allele_dict["id"]
        
        
    if recent:
        if log: print("recent elasticbeanstalk api (VRS 2.0 models)")
        response = requests.get("http://metakb-dev-eb.us-east-2.elasticbeanstalk.com" \
                                f"/api/v2/search/studies?variation={id}&detail=false")
    else:
        if log: print("old api (VRS 1.3 models)")
        response = requests.get("https://dev-search.cancervariants.org" \
                                f"/api/v2/search?variation={id}&detail=false")
    
    response_json = response.json()
    
    if response_json['warnings'] == []:
        return (id, response_json)
    else:
        if log: print(response_json['warnings'])

def num_variants(input_vcf):
    # get total num_variants
    vcf_reader = pysam.VariantFile(open(input_vcf, 'r'))
    return sum(1 for _ in vcf_reader)

def parallelize(vrs_decorator, vrs_objects, worker_count=4, progress_interval=500, limit=None):
    """harvest data from service"""

    manager = multiprocessing.Manager()
    results = manager.list()

    with multiprocessing.Pool(worker_count) as pool:
        # call the function for each item in parallel
        c = 0
        print(datetime.now().isoformat(), c)

        for result in pool.imap(vrs_decorator, vrs_objects):
            c += 1
            if result:
                results.append(result)
            if c == limit:
                break
            elif c % progress_interval == 0:
                print(datetime.now().isoformat(), c)
    
    return results

def print_dict(d, indent=2):
    """pretty print object as json"""
    print(json.dumps(d, indent=indent))

def print_percent(a, b):
    "pretty print percentages"
    print(f"{a}/{b} = {(100.0*a/b):.1f}%")

In [None]:
# get total num_variants
curr_input_vcf = filtered_patient_vcf_path

vcf_reader = pysam.VariantFile(open(curr_input_vcf, 'r'))
num_variants = int(subprocess.run(f"grep -v '^#' {curr_input_vcf} | wc -l", \
                              stdout=subprocess.PIPE, shell=True, \
                              check=True, text=True).stdout)

print(f'num_vrs_objects to num_variants: {len(allele_dicts)}/{num_variants}={100*(len(allele_dicts)/num_variants):.2f}%')

In [None]:
with open("/home/jupyter/split/NA12878-0-to-169407-hits.pkl", 'rb') as file:
    thing = pickle.load(file)
    for t in thing:
        print(t)

In [None]:
id_start = num_variants
id_end = num_variants*2 # ids to process
progress_interval = 10000
metakb_output_pkl = f"{patient_path_stem}-{id_start}-to-{id_end}-hits.pkl"

print(f"writing to {metakb_output_pkl}")

stem = curr_input_vcf.replace('.vcf', '')
output_vcf = f"{stem}.vcf.gz"
output_pkl = f"{stem}-vrs-objects.pkl"
    
# get pickle totals
allele_dicts = unpickle(output_pkl)

# convert alleles to vrs ids
t = time()
vrs_ids = [ga4gh_identify(models.Allele(**allele_dict)) \
            for i, (_, allele_dict) in enumerate(allele_dicts.items()) \
            if i >= id_start and i < id_end]
print(f"{id_end-id_start} ids: {(time()-t):.2f} s")

# number of workers
worker_count = 4*os.cpu_count()

# ping metakb
print("pinging metakb...")
t = time()
hits = parallelize(meta_kb, vrs_ids, worker_count=worker_count, \
    progress_interval=progress_interval)
print(f"metakb: {(time()-t):.2f} s")

with open(metakb_output_pkl, 'wb') as file:
    pickle.dump(hits, file)

print("\nhits to ids queried...")
total = num_ids_limit if num_ids_limit else len(vrs_ids)
print_percent(len(hits), total)

## Split before annotate

In [None]:
! (seq 1 22; echo X; echo Y) | xargs -P 0 -I PATH $VCFTOOLS --recode --vcf "/home/jupyter/vcf/1KGP_haplotype_caller_NA12878.chr10.hc.vcf" --chr chrPATH --out $SPLIT_DIR/chrPATH

In [None]:
vcf_path = drs_vcfs[0]

! rm -r $SPLIT_DIR
split_vcf_cmd = f"(seq 1 22; echo X; echo Y) | \
               xargs -P 0 -I PATH $VCFTOOLS --recode --gzvcf {vcf_path} \
               --chr chrPATH --out $SPLIT_DIR/chrPATH"

output = subprocess.run(split_vcf_cmd, shell=True, check=True)
# output = subprocess.run(split_vcf_cmd, shell=True, check=True, \
#                         capture_output=True, text=True) 

# no chr prefix
# ! (seq 1 22; echo X; echo Y) | xargs -P 0 -I PATH ~/vcftools-vcftools-d511f46/src/cpp/vcftools --recode --vcf $VCF_PATH --chr PATH --out ~/split/chrPATH

In [None]:
# TODO: parse logs to get outputs on how many were filtered out
# get total num_variants

def get_num_variants(path):
    vcf_reader = pysam.VariantFile(open(path, 'r'))
    return sum(1 for record in vcf_reader)

split_vcf_paths = glob.glob(f"{os.environ.get('SPLIT_DIR')}/*.recode.vcf")
     
input_num_variants = get_num_variants(vcf_path[:-3])
split_num_variants = sum(get_num_variants(path) for path in split_vcf_paths)

print(f"{split_num_variants}/{input_num_variants} = ", \
      f"{100*split_num_variants/input_num_variants:.2f}% kept")

In [None]:
ls -l $SPLIT_DIR/*.recode.vcf | wc -l

In [None]:
# annotate each of them
# TODO: fix the outputs coming from this

! (ls -1 $SPLIT_DIR/*.recode.vcf | \
   xargs -P 0 -I PATH python3 -m ga4gh.vrs.extras.vcf_annotation \
   --vcf_in PATH --vcf_out PATH.vcf.gz --vrs_pickle_out PATH.pkl \
   --seqrepo_root_dir $SEQREPO_ROOT/latest \
   2> $SPLIT_DIR/chrPATH_log.txt)

# # GREGoR
# !python3 -m ga4gh.vrs.extras.vcf_annotation --vcf_in 1369747.merged.matefixed.sorted.markeddups.recal.g.vcf  --vcf_out 1369747.merged.matefixed.sorted.markeddups.recal.g.vcf.output.vcf.gz --vrs_pickle_out 1369747.merged.matefixed.sorted.markeddups.recal.g.vcf.vrs_objects.pkl  --seqrepo_root_dir ~/seqrepo/latest/

In [None]:
!ls -l $SPLIT_DIR/*.vcf.vcf.gz | wc -l
!ls -l $SPLIT_DIR/*.vcf.pkl | wc -l

# assert (!ls -l ~/split/*.vcf.vcf.gz | wc -l) == 24, "incorrect number of output vcf.gz files created"
# assert (!ls -l ~/split/*.vcf.pkl | wc -l) == 24, "incorrect number of outputted pickle files"

In [None]:
# join the files
!ls -1 $SPLIT_DIR/*.vcf.vcf.gz | xargs $PERL5LIB/vcf-concat > $OUTPUT/merged_output.vcf

In [None]:
!ls $OUTPUT_DIR

In [None]:
# TODO: remove the pair of them

### Random python annotate

In [None]:
import logging

logger = logging.getLogger("ga4gh.vrs.extras.vcf_annotation")
logger.setLevel(level=logging.INFO)

# create annotated vcf test file 
def annotate_vcf(path):
    '''param stem: path of input vcf file'''
    stem = path.replace(".vcf", "")
    
    input_vcf = path
    output_vcf = f"{stem}.output.vcf.gz"
    output_pkl = f"{stem}-vrs-objects.pkl"

    
    vcf_annotator = VCFAnnotator(seqrepo_root_dir="/home/jupyter/seqrepo/latest")
    vcf_annotator.annotate(vcf_in=input_vcf, vcf_out=output_vcf, vrs_pickle_out=output_pkl)
    # vcf_annotator.annotate(vcf_in=input_vcf, vrs_pickle_out=output_pkl)
    
# annotate_vcf("/home/jupyter/split", "chr1.recode")
successes = set()
for vcf_path in drs_vcfs:
    try:
        print("trying...", vcf_path)
        annotate_vcf(vcf_path)
        print("worked \n")
        successes.add(vcf_path)
    except Exception as e:
        print(e)
        print("unsucessful, see logs above \n")

print(f"total successes: {len(successes)}/{len(drs_vcfs)} \nList...")
for vcf_path in drs_vcfs:
    print(f"{vcf_path}: {'✓' if vcf_path in successes else 'x'}")

In [None]:
# annotate w vrs id asking for output vcf

import logging

logger = logging.getLogger("ga4gh.vrs.extras.vcf_annotation")
logger.setLevel(level=logging.INFO)

# create annotated vcf test file 
def annotate_vcf(path):
    '''param stem: path of input vcf file'''
    stem = path.replace(".vcf", "")
    
    input_vcf = path
    output_vcf = f"{stem}.output.vcf.gz"
    output_pkl = f"{stem}-vrs-objects.pkl"

    
    vcf_annotator = VCFAnnotator(seqrepo_root_dir="/home/jupyter/seqrepo/latest")
    vcf_annotator.annotate(vcf_in=input_vcf, vcf_out=output_vcf, vrs_pickle_out=output_pkl)
    # vcf_annotator.annotate(vcf_in=input_vcf, vrs_pickle_out=output_pkl)
    
# annotate_vcf("/home/jupyter/split", "chr1.recode")
successes = set()
for vcf_path in drs_vcfs:
    try:
        print("trying...", vcf_path)
        annotate_vcf(vcf_path)
        print("worked \n")
        successes.add(vcf_path)
    except Exception as e:
        print(e)
        print("unsucessful, see logs above \n")

print(f"total successes: {len(successes)}/{len(drs_vcfs)} \nList...")
for vcf_path in drs_vcfs:
    print(f"{vcf_path}: {'✓' if vcf_path in successes else 'x'}")

In [None]:
for vcf_path in drs_vcfs:
    if "HG02080vCHM13_20200921" in vcf_path:
        print(vcf_path)
    else:
        continue
#     if "chm13_hifi_HG007" in vcf_path:
#         print("trying...", vcf_path)
#         annotate_vcf(vcf_path)
#         print("worked \n")
    try:
        print("trying...", vcf_path)
        annotate_vcf(vcf_path)
        print("worked \n")
        successes.add(vcf_path)
    except Exception as e:
        print(e)
        print("unsucessful, see logs above \n")

In [None]:
# annotate w vrs id only pickle outputted

logger = logging.getLogger("ga4gh.vrs.extras.vcf_annotation")
# logger.setLevel(level=logging.ERROR)
logger.disabled = True

# create annotated vcf test file 
def annotate_vcf_pkl_only(path):
    '''param stem: path of input vcf file'''
    stem = path.replace(".vcf", "")
    
    input_vcf = path
    output_vcf = f"{stem}.output.vcf.gz"
    output_pkl = f"{stem}-vrs-objects.pkl"

    
    vcf_annotator = VCFAnnotator(seqrepo_root_dir="/home/jupyter/seqrepo/latest")
    vcf_annotator.annotate(vcf_in=input_vcf, vrs_pickle_out=output_pkl)
    
successes = set()
for i, vcf_path in enumerate(drs_vcfs):
    print("starting... \n")
    # annotate to output pkl
    try:
        print("trying...", vcf_path)
        annotate_vcf_pkl_only(vcf_path)
        print("worked \n")
        successes.add(vcf_path)
    except Exception as e:
        print(e)
        print("unsucessful, see logs above \n")
    
    # get pickle totals
    try:
        with open(output_pkl, 'rb') as f:
            vrs_objects = pickle.load(f)

        # get total num_variants
        vcf_reader = vcf.Reader(open(vcf_path, 'r'))
        num_variants = sum(1 for record in vcf_reader)

        # view details
        print(f'num_vrs_objects to num_varaints: {len(vrs_objects)}/{num_variants}={(len(vrs_objects)/num_variants):.2f}%')
    except:
        print("unable to get pickle totals, file may not exist")
    print()

print(f"total successes: {len(successes)}/{len(drs_vcfs)} \nList...")
for vcf_path in drs_vcfs:
    print(f"{vcf_path}: {'✓' if vcf_path in successes else 'x'}")

In [None]:
import vcf


# for input_vcf_file in ["/home/jupyter/vcf/long_read_sv_jasmine_Trios_IndividualCallsets_CHM13_HG005_Trio_HG006vCHM13_20200921_mm2_PBCCS_sniffles.s2l20.refined.nSVtypes.ism.vcf"]:
for input_vcf_file in ["/home/jupyter/vcf/long_read_minimap2_alignments_HG02080vCHM13_20200921_mm2_ONT_sniffles.s2l20.refined.nSVtypes.ism.vcf"]:
    output_vcf_file = "/home/jupyter/vcf/long_read.test.vcf"

    vcf_reader = vcf.Reader(open(input_vcf_file, 'r'))
    vcf_writer = vcf.Writer(open(output_vcf_file, 'w'), vcf_reader)

    for record in vcf_reader:
        record.INFO['VRS_ALLELE_ID'] = 'ga4gh:VA.xksahgfowdfdwofd,ga4gh:VA.xksahgfowdfdwofd'
        vcf_writer.write_record(record)

vcf_writer.close()

### show loaded files

In [None]:
# from pprint import pprint
# import pickle
# import ast
# import requests
# import datetime

# # log progress
# progress_interval = 50000

# # load pickled dict
# with open(output_pkl, 'rb') as f:
#     print(datetime.datetime.now().isoformat(), 'opened pickle')
#     vrs_objects = pickle.load(f)
#     c = 0
#     for k, v in vrs_objects.items():
#         vrs_objects[k] = ast.literal_eval(v)
#         c += 1
#         if c % progress_interval == 0:
#             print(datetime.datetime.now().isoformat(), c)

# # view details        
# print('number of vrs objects', len(vrs_objects))

In [None]:
pickle_paths = !ls -1 ~/split/*.vcf.pkl
pickle_paths

In [None]:
# get percent of loaded variants

# load pickled dict
# for vcf_path in drs_vcfs:

def unpickle_generator(file_name):
    """Unpickle vrs objects, yields (key,vrs_object)"""
    with open(file_name, 'rb') as f:
        vrs_objects = pickle.load(f)
        for k, v in vrs_objects.items():
            yield k, ast.literal_eval(v)
            
def unpickle(file_name):
    """Unpickle vrs objects to single dict"""
    with open(file_name, 'rb') as f:
        vrs_objects = pickle.load(f)
        for k, v in vrs_objects.items():
            vrs_objects[k] = ast.literal_eval(v)
    
    return vrs_objects

vrs_dicts = []

total_num_vrs_objs = 0

for path in pickle_paths:
    vrs_dict = unpickle(path)
    vrs_dicts.append(vrs_dict)

    # get total num_variants
    # TODO: reference the new merged file bc some might have been filtered out
    vcf_reader = vcf.Reader(open(path[:-4], 'r'))
    num_variants = sum(1 for record in vcf_reader)

#     num_vrs_objs = sum((1 for _ in vrs_objects))
    num_vrs_objs = len(vrs_dict)
    total_num_vrs_objs += num_vrs_objs

    # view details
    
    print(path.split("/")[-1], end=" ")
    if num_variants == 0: 
        print(f"no variants") 
    else:
        print(f'vrs_objects:variants = {num_vrs_objs}/{num_variants} = {(50*num_vrs_objs/num_variants):.1f}%')

total_variants = get_num_variants(vcf_path)
        
print(f"Totals: {total_num_vrs_objs}/{total_variants}", \
      f"= {(50*total_num_vrs_objs/total_variants):.2f}%")
        
# TODO on combining: have to think about this more bc large files will have to be held in memory

In [None]:
# error reporting from logs
num_val_errors = !(grep "raise ValidationError(err_msg)" $HOME/log.txt | wc -l)
num_val_errors = int(num_val_errors[0])
print(f"validations errors = {num_val_errors}, ie {50*num_val_errors/total_variants:.1f}%", \
      " if 2:1 VRS ID to variant")

num_invalid_files = !(grep "\[E::vcf_format\] Invalid BCF" $HOME/log.txt | wc -l)
num_invalid_files = int(num_invalid_files[0])
print(f"num invalid files: {num_invalid_files}")

# Query remote services

## MetaKB (cancervariants.org)

In [None]:
import json 

from biocommons.seqrepo import SeqRepo
from ga4gh.core import ga4gh_identify
from ga4gh.vrs import models
from ga4gh.vrs.dataproxy import SeqRepoDataProxy
from ga4gh.vrs.extras.translator import AlleleTranslator


def meta_kb(item: tuple, translator=None, fmt=None, recent=True):
    """Query metakb using vrs object"""
    k, allele_dict = item
    
    if translator is not None:
        print(f"by {fmt}...")
#         print(json.dumps(allele_dict, indent=2))
        
        allele = models.Allele(**allele_dict)
        _id = translator.translate_to(allele, fmt)
        
#         seq_ref = models.SequenceReference(**allele_dict["sequenceReference"])
#         location = models.SequenceLocation(**allele_dict["location"])
#         state = models.LiteralSequenceExpression(sequence=ins_seq)
#         allele = models.Allele(location=location, state=state)
#         allele = self._post_process_imported_allele(allele)


    else:
        print("by vrs id...")
        _id = allele_dict["id"]
        
        
    if recent:
        print("recent elasticbeanstalk api (VRS 2.0 models)")
        response = requests.get("http://metakb-dev-eb.us-east-2.elasticbeanstalk.com",
                                f"/api/v2/search/studies?variation={_id}&detail=false")
    else:
        print("old api (VRS 1.3 models)")
        response = requests.get("https://dev-search.cancervariants.org" \
                                f"/api/v2/search?variation={_id}&detail=false")
    
    response_json = response.json()
    
    if response_json['warnings'] == []:
        summary = {}
        summary['description'] = response_json['statements'][0]['description']
        return (k, _id, summary)

#########################
# setup translator
seqrepo_root_dir = f"{os.environ['SEQREPO_ROOT']}/latest"
data_proxy = SeqRepoDataProxy(SeqRepo(seqrepo_root_dir))
translator = AlleleTranslator(data_proxy)
        
for vrs_dict in vrs_dicts:
    hits = []
    
    for obj in vrs_dict.items(): 
        try:
            potential_hit = meta_kb(obj, translator, fmt="hgvs")
        except:
            continue
        if potential_hit:
            print(f"\n ~~~~~~~~ hit! {potential_hit} ~~~~~~~~~~ \n")
            hits.append(potential_hit)
    
    if len(vrs_dict) == 0:
        continue

    hit_rate = len(hits)/len(vrs_dict)    
    print(f"hit rate of VRS IDs: {len(hits)}/{len(vrs_dict)}={100*hit_rate:.1f}%" )
    if len(hits) > 0:
        print("first few hits")
        print(hits[:3])

In [None]:
for vrs_dict in vrs_dicts[:1]:
    hits = []
    
    for i, (k, v) in enumerate(vrs_dict.items()):
        print(v["id"])
        if i > 10: break

In [None]:
def meta_kb_by_sequence(item: tuple):
    """Query metakb"""
    k, _ = item
    

    response = requests.get(f"https://dev-search.cancervariants.org/api/v2/search?variation={}&detail=false")
    response_json = response.json()
    
    print(response_json['warnings'])
    if response_json['warnings'] == []:
        summary = {}
        summary['description'] = response_json['statements'][0]['description']
        return (k, _['_id'], summary)

print("trying old link")
for vrs_dict in vrs_dicts:
    hits = []
    
    for obj in vrs_dict.items():
        potential_hit = meta_kb_by_sequence(obj)
        if potential_hit:
            hits.append(potential_hit)
    
    if len(vrs_dict) == 0:
        print("no variants, skipping...")
        continue

    hit_rate = len(hits)/len(vrs_dict)    
    print(f"hit rate of VRS IDs: {len(hits)}/{len(vrs_dict)}={100*hit_rate:.1f}%" )
    if len(hits) > 0:
        print("first few hits")
        print(hits[:3])

In [None]:
def decorate(vrs_decorator, vrs_objects, limit=20):
    """harvest data from service"""

    # log progress
    progress_interval = 1000


    # number of workers
    worker_count = 12

    with multiprocessing.Pool(worker_count) as pool:
        # call the function for each item in parallel
        c = 0
        print(datetime.datetime.now().isoformat(), c)
        for result in pool.imap(vrs_decorator, vrs_objects.items()):
            c += 1
            if result:
                print(result[0], result[-1])
            if c == limit:
                break
            if c % progress_interval == 0:
                print(datetime.datetime.now().isoformat(), c)

    print(datetime.datetime.now().isoformat(), c)
            

In [None]:
decorate(vrs_decorator=meta_kb, vrs_objects=metakb_vrs_objects)

## ClinGen (clinicalgenome.org)

In [None]:
def clingen(item: tuple):
    """Query clingen (old version of normalizer)"""
    k, _ = item
    _id = _['_id'].split(':')[-1].split('.')[-1]
    response = requests.get(f"https://reg.genome.network/vrs-map/digest/vrs/{_id}")
    response_json = response.json()
    if response_json['status']['code'] == 200:
        iri_response = requests.get(response_json['data']['iri'])
        iri_response_json = iri_response.json()
        return (k, _['_id'], {'communityStandardTitle': iri_response_json['communityStandardTitle']})


> Note: at this time, there is a schema mismatch between vrs-python, metakb and clingen. We will use known identifiers. Normally the annotated identifiers from the variants of interest (vcf) would be used

In [None]:
decorate(vrs_decorator=clingen, vrs_objects=clingen_vrs_objects)