In [1]:
import pandas as pd
import vcf
import gzip
from itertools import chain

In [2]:


class VCFParser:
    def __init__(self, vcf_file, vcf_type="vardict", is_annotated=False):
        self.vcf_file = vcf_file
        self.vcf_type = vcf_type
        self.is_annotated = is_annotated
        self.df = None

    def parse_ann(self, ann_str):
        ann_list = []
        for ann in ann_str.split(","):
            ann_fields = ann.split("|")
            if len(ann_fields) >= 12:
                ann_dict = {
                    "Gene_Name": ann_fields[3],
                    "Feature_ID": ann_fields[6],
                    "Transcript_BioType": ann_fields[7],
                    "Annotation": ann_fields[1],
                    "HGVS.c": ann_fields[9],
                    "HGVS.p": ann_fields[10]
                }
                ann_list.append(ann_dict)
        return ann_list

    def parse_info(self, info_str):
        info_dict = {}
        for item in info_str.split(";"):
            key, value = item.split("=", 1)
            info_dict[key] = value
        return info_dict

    def parse_sample(self, sample_str, format_fields):
        sample_values = sample_str.split(":")
        sample_dict = dict(zip(format_fields, sample_values))
        return sample_dict

    def _read_vcf_lines(self):
        with gzip.open(self.vcf_file, "rt") if self.vcf_file.endswith(".gz") else open(self.vcf_file, "r") as file:
            lines = file.readlines()
        data_lines = []
        for line in lines:
            if not line.startswith("#"):
                data_lines.append(line.strip().split("\t"))
        return data_lines, lines

    def _get_header(self, lines):
        header = [h.strip("#") for h in lines if h.startswith("#CHROM")][0].strip().split("\t")
        return header

    def _get_num_samples(self, header):
        sample_names = header[9:]
        num_samples = len(sample_names)
        return num_samples
    def _get_new_cols(self, num_samples):
        cols_base = ["CHROM", "POS", "REF", "ALT", "FILTER"]
        cols_snvcall = ["STATUS", "TYPE", "MSI", "MSILEN"]
        cols_annotate = ["Gene_Name", "Feature_ID", "Transcript_BioType", "Annotation", "HGVS.c", "HGVS.p"]
        cols_paired = ["case_DP", "case_VD", "case_AF", "case_QUAL", "ctrl_DP", "ctrl_VD", "ctrl_AF", "ctrl_QUAL"]
        cols_single = ["DP", "VD", "AF", "QUAL"]

        if (self.is_annotated == True) and (num_samples == 2):
            new_cols = cols_base+cols_snvcall+cols_annotate+cols_paired
        elif (self.is_annotated == True) and (num_samples == 1):
            new_cols = cols_base+cols_snvcall+cols_annotate+cols_single
        elif  (self.is_annotated == False) and (num_samples == 2):
            new_cols = cols_base+cols_snvcall+cols_paired
        elif  (self.is_annotated == False) and (num_samples == 1):
            new_cols = cols_base+cols_snvcall+cols_single
        return new_cols
            
    def _process_vcf(self, df, num_samples):
        df["INFO"] = df["INFO"].apply(self.parse_info)

        df["STATUS"] = df["INFO"].apply(lambda x: x.get("STATUS", ""))
        df["TYPE"] = df["INFO"].apply(lambda x: x.get("TYPE", ""))
        df["MSI"] = df["INFO"].apply(lambda x: float(x.get("MSI", 0)))
        df["MSILEN"] = df["INFO"].apply(lambda x: float(x.get("MSILEN", 0)))
        format_fields = df["FORMAT"].iloc[0].split(":")
        
        if self.is_annotated == True:
            df["ANN"] = df["INFO"].apply(lambda x: x.get("ANN", ""))
            df["ANN"] = df["ANN"].apply(self.parse_ann)
            print(len(df))
            df = df.explode("ANN", ignore_index=True)
            print(len(df))
            df["Gene_Name"] = df["ANN"].apply(lambda x: x.get("Gene_Name"))
            df["Feature_ID"] = df["ANN"].apply(lambda x: x.get("Feature_ID"))
            df["Transcript_BioType"] = df["ANN"].apply(lambda x: x.get("Transcript_BioType"))
            df["Annotation"] = df["ANN"].apply(lambda x: x.get("Annotation"))
            df["HGVS.c"] = df["ANN"].apply(lambda x: x.get("HGVS.c"))
            df["HGVS.p"] = df["ANN"].apply(lambda x: x.get("HGVS.p"))

        if num_samples == 2:
            df = self._process_paired_samples(df, format_fields)
        elif num_samples == 1:
            df = self._process_single_sample(df, format_fields)
        
        new_cols = self._get_new_cols(num_samples)
        df = df[new_cols]
        
        return df

    def _process_paired_samples(self, df, format_fields):
        sample_names = df.columns[9:]
        df["case"] = df.apply(lambda row: self.parse_sample(row[sample_names[0]], format_fields), axis=1)
        df["ctrl"] = df.apply(lambda row: self.parse_sample(row[sample_names[1]], format_fields), axis=1)
        df["case_DP"] = df["case"].apply(lambda x: int(x.get("DP", 0)))
        df["case_VD"] = df["case"].apply(lambda x: int(x.get("VD", 0)))
        df["case_AF"] = df["case"].apply(lambda x: float(x.get("AF", 0)))
        df["case_QUAL"] = df["case"].apply(lambda x: float(x.get("QUAL", 0)))
        df["ctrl_DP"] = df["ctrl"].apply(lambda x: int(x.get("DP", 0)))
        df["ctrl_VD"] = df["ctrl"].apply(lambda x: int(x.get("VD", 0)))
        df["ctrl_AF"] = df["ctrl"].apply(lambda x: float(x.get("AF", 0)))
        df["ctrl_QUAL"] = df["ctrl"].apply(lambda x: float(x.get("QUAL", 0)))

        return df

    def _process_single_sample(self, df, format_fields):
        sample_name = df.columns[9]
        df["sample"] = df.apply(lambda row: self.parse_sample(row[sample_name], format_fields), axis=1)
        df["DP"] = df["sample"].apply(lambda x: int(x.get("DP", 0)))
        df["VD"] = df["sample"].apply(lambda x: int(x.get("VD", 0)))
        df["AF"] = df["sample"].apply(lambda x: float(x.get("AF", 0)))
        df["QUAL"] = df["INFO"].apply(lambda x: x.get("QUAL"))

        return df

    def vcf2dataframe(self):
        data_lines, lines = self._read_vcf_lines()
        header = self._get_header(lines)
        num_samples = self._get_num_samples(header)

        # Creating the DataFrame
        df = pd.DataFrame(data_lines, columns=header)

        if  self.vcf_type == "vardict":
            df = self._process_vcf(df, num_samples)
            # For non-annotated VCF, keep only the ID columns
        elif self.vcf_type == "dbsnp":
            cols_base = ["CHROM", "POS", "REF", "ALT", "ID"]
            df = df[cols_base]
            df["CHROM"] = "chr"+df["CHROM"].astype(str)
            
        else:
            print("vcf must be  format of vardict or dbsnp ")

        df["CHROM_POS"] = df["CHROM"] + ":" + df["POS"].astype(str)
        df["REF_ALT"] = df["REF"] + "_" + df["ALT"]
        df = df.set_index(["CHROM_POS", "REF_ALT"])

        return df

In [3]:
##test 
COAD_0009689008_vs_0012934312_vcf="/home/jovyan/work/10.data_CODA_ahslyy/05.SNVINDEL/COAD_0009689008_vs_0012934312.single_variant.vcf"
df_0009689008_vs_0012934312 = VCFParser(COAD_0009689008_vs_0012934312_vcf,is_annotated=False).vcf2dataframe()
COAD_0009689008_vs_0012934312_snpeff_vcf="/home/jovyan/work/10.data_CODA_ahslyy/05.SNVINDEL/COAD_0009689008_vs_0012934312.single_variant.snpeff.vcf"
df_0009689008_vs_0012934312_snpeff = VCFParser(COAD_0009689008_vs_0012934312_snpeff_vcf,is_annotated=True,vcf_type="vardict").vcf2dataframe()

1917
8025


In [4]:
##dbsnp input
vcf_dbsnp="/home/jovyan/work/00.database/14.dbSNP/All_20180418_in_WES.vcf.gz"
df_dbsnp = VCFParser(vcf_dbsnp,is_annotated=False,vcf_type="dbsnp").vcf2dataframe()

In [5]:
## run sample
samplename="0014215947"
sample_paired_vcf = "/home/jovyan/work/10.data_CODA_ahslyy/05.SNVINDEL/COAD_{0}.single_variant.snpeff.vcf".format(samplename)
df_paired = VCFParser(sample_paired_vcf,is_annotated=True,vcf_type="vardict").vcf2dataframe()

1236442
5051748


In [6]:
sample_normal_vcf="/home/jovyan/work/10.data_CODA_ahslyy/03.Result.SAKit2/{0}_N/results/08.SvInDelSnvCalling/COAD_{1}_N.single_variant.snpeff.vcf".format(samplename,samplename)
df_normal = VCFParser(sample_normal_vcf,is_annotated=True,vcf_type="vardict").vcf2dataframe()

909733
3755918


In [7]:
df_somatic = df_paired[df_paired["STATUS"]=="StrongSomatic"]
df_somatic_dpvdaf = df_somatic[(df_somatic["case_DP"] > 20) & 
                               (df_somatic["case_VD"] > 5) & 
                               (df_somatic["case_AF"] > 0.1) & 
                               (df_somatic["ctrl_DP"] > 20)]
df_somatic_dpvdaf_NM5 = df_somatic_dpvdaf[~(df_somatic_dpvdaf["FILTER"].str.contains("NM5.25") | 
                                            df_somatic_dpvdaf["FILTER"].str.contains("NM5.25"))]

print(len(df_paired))
print(len(df_somatic))
print(len(df_somatic_dpvdaf))
print(len(df_somatic_dpvdaf_NM5))

5051748
1045344
3213
2880


In [8]:
df_normal_dpvdaf = df_normal[(df_normal["DP"] > 10) & 
                              (df_normal["VD"] > 1) & 
                              (df_normal["AF"] > 0.01)]
normal_dpvdaf_index = df_normal_dpvdaf.index
df_somatic_dpvdaf_NM5_ctrlfiltered = df_somatic_dpvdaf_NM5.drop(normal_dpvdaf_index,errors="ignore")

In [9]:
dbsnp_index = df_dbsnp.index
df_somatic_dpvdaf_NM5_snp = df_somatic_dpvdaf_NM5.drop(dbsnp_index,errors="ignore")

In [10]:
df_somatic_dpvdaf_NM5_snpfiltered = df_somatic_dpvdaf_NM5_snp[df_somatic_dpvdaf_NM5_snp['HGVS.p'] != '']

In [11]:
df_somatic_dpvdaf_NM5_snpfiltered.to_csv("/home/jovyan/work/10.data_CODA_ahslyy/03.Result.SAKit2/{}_CA/results/08.SvInDelSnvCalling/COAD_{}_somatic_filtered.csv".format(samplename,samplename))

In [12]:
len(df_somatic_dpvdaf_NM5_snpfiltered)

136