In [None]:
import dxpy
import dxdata
import subprocess
import ast
import numpy as np
import pandas as pd
import glob 
import os

In [None]:
# Required dependencies 
pip install pybgen
pip install zstandard

In [None]:
#split string input of multiple SNPs
def split_input(string_input):
    if ':' in string_input:
        parts = string_input.split(':')
    elif '-' in string_input:
        parts = string_input.split('-')
    chrom = parts[0]
    pos = int(parts[1])
    ref = parts[2]
    alt = parts[3]
    if len(parts) > 4:
        rsnum = parts[4]
        return chrom,pos,ref,alt,rsnum
    else:
        return chrom, pos, ref, alt


In [None]:
# Get Whole Exome Sequencing Dosages (per chromosome)
def getWESPath(chrom):
    wes_path = '/mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, BGEN format - final release/ukb23159_c' +chrom[3:]+ '_b0_v1.bgen'
    return wes_path

In [None]:
# Get TOPMED imputed dosages (per chromosome)
def getImputedPath(chrom):
    imputed_path = '/mnt/project/Bulk/Imputation/Imputation from genotype (TOPmed)/ukb21007_c' + chrom[3:] + '_b0_v1.bgen'
    return imputed_path

In [None]:
# Return Sample Dataframe for Imputed dosages
def getSampleDF(chrom):
    sample_path = '/mnt/project/Bulk/Imputation/Imputation from genotype (TOPmed)/ukb21007_c' + chrom[3:] + '_b0_v1.sample'
    sample_df = pd.read_csv(sample_path, sep=' ')
    sample_df = sample_df.drop(index=0)
    sample_df = sample_df.drop(['missing', 'sex'], axis=1)
    sample_df = sample_df.reset_index(drop=True)
    sample_df = sample_df.rename(columns={"ID_1": "FID", "ID_2": "IID"}, errors="raise")
    return sample_df

In [None]:
# Return Sample Dataframe for whole exome sequencing dosages
def getWESSampleDF(chrom):
    sample_path = '/mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, BGEN format - final release/ukb23159_c' + chrom[3:] + '_b0_v1.sample'
    sample_df = pd.read_csv(sample_path, sep=' ')
    sample_df = sample_df.drop(index=0)
    sample_df = sample_df.drop(['missing', 'sex'], axis=1)
    sample_df = sample_df.reset_index(drop=True)
    sample_df = sample_df.rename(columns={"ID_1": "FID", "ID_2": "IID"}, errors="raise")
    return sample_df

In [None]:
# Dosage Extraction Function 
from pybgen import PyBGEN
def getVariantDosagesfromSnp(interestingSNPs):
    sample_df = None
    for snpOfChoice in interestingSNPs:
        chrom, pos, ref, alt, snp = split_input(snpOfChoice)
        imputed_path = getImputedPath(chrom)
        wes_path = getWESPath(chrom)
        chromNum = chrom[3:]
        if chromNum == 'X':
            chromNum = 23
        with PyBGEN(imputed_path) as bgen: # Change to wes_path if using whole exome sequencing data
            try:
                variant = bgen.get_specific_variant(chrom, pos, ref, alt) #Use chromNum if WES, chrom if imputed
            except ValueError:
                print(f"Variant {snp} not found. Skipping.")
                continue 
        if sample_df is None:
            sample_df = getWESSampleDF(chrom)
        chromosome = chrom 
        position = str(pos)
        temp_df = pd.DataFrame(variant[0][1], columns= [snp])
        dosage_df = pd.concat([sample_df, temp_df], axis=1)
        sample_df = dosage_df
    return dosage_df

In [None]:
from pybgen import PyBGEN
# Converts a list of chr, pos, rsnum to a list of variants with alleles found in the bgen file 
# Specifically looks for all variants with dosage information available in given location including multiallelic variants, can ignore if chr:pos:ref:alt:rsnum is known 

def getVariants(snplocs):
    newsnps = []
    foundsnps = []
    for snp in snplocs:
        chrom, pos, rsnum = snp.split(':')
        imputed_path = getImputedPath(chrom)
        wes_path = getWESPath(chrom)
        chromNum = chrom[3:]
        if chromNum == 'X':
            chromNum = 23
        with PyBGEN(imputed_path) as bgen: 
            val = bgen.iter_variants_in_region(chrom, pos, pos)
            for v in val:
                print(np.nansum(v[1]))
                ref = (str(v[0]).split('_')[1].split('/')[0])
                alt = (str(v[0]).split('_')[1].split('/')[1]).strip('>')
                combined = ':'.join([chrom, pos, ref, alt, rsnum])
                found = ':'.join([chrom, pos, rsnum])
                #print(combined)
                newsnps.append(combined)
                foundsnps.append(found)
    missedsnps = list(set(snplocs) - set(foundsnps))
    for bad in missedsnps:
        print(bad + ' not found')
        
    return newsnps
            

In [None]:
#ADRB1 Example

ADRB1_snps = ['chr10:114044277:A:G:rs1801252', 
             'chr10:114045297:G:C:rs1801253']

multipleSNPdosages = getVariantDosagesfromSnp(ADRB1_snps)
multipleSNPdosages