# Variant Analysis

This Jupyter notebook is designed for filtration of common variants and analysis as outlibed by the steps in my [github repository](https://github.com/Intro-Sci-Comp-UIowa/biol-4386-course-project-tvarovski). This notebook will be using spark distributred computing environment for faster computation.

## Installing Dependencies and Importing Libraries

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CLC_mutations").getOrCreate()

#!pip install scikit-allel[full]

In [None]:
import numpy as np
import pandas as pd
import functools
from pyspark.sql.functions import lit, udf, struct
from pyspark.sql.types import StringType
#import allel

## Loading Datasets

In [None]:
# If the vxf file is not converted into a a "normal" table do it here, but better 
# to do it via GATK's tools if possible

fields=['CHROM', 'POS', 'DP','AD', 'AF', 'REF', 'ALT','INFO', 'ID']

allel.vcf_to_csv('/content/SRR4047707_mutect.vcf', 'mutect_07.csv', fields=fields)
allel.vcf_to_csv('/content/SRR4047717_mutect.vcf', 'mutect_17.csv', fields=fields)
allel.vcf_to_csv('/content/SRR4047722_mutect.vcf', 'mutect_22.csv', fields=fields)
allel.vcf_to_csv('/content/SRR4047723_mutect.vcf', 'mutect_23.csv', fields=fields)

#allel.vcf_to_csv('.vcf', 'haplotype_07.csv', fields=fields)
#allel.vcf_to_csv('.vcf', 'haplotype_15.csv', fields=fields)
#allel.vcf_to_csv('.vcf', 'haplotype_17.csv', fields=fields)
#allel.vcf_to_csv('.vcf', 'haplotype_22.csv', fields=fields)
#allel.vcf_to_csv('.vcf', 'haplotype_23.csv', fields=fields)


Load csv files into Data Frames; insert DF objects to lists

In [None]:
df_mutect_07 = spark.read.csv('/content/drive/MyDrive/Sani/out07.tsv', header=True, inferSchema=True, sep='\t')
df_mutect_17 = spark.read.csv('/content/drive/MyDrive/Sani/out17.tsv', header=True, inferSchema=True, sep='\t')
df_mutect_22 = spark.read.csv('/content/drive/MyDrive/Sani/out22.tsv', header=True, inferSchema=True, sep='\t')
df_mutect_23 = spark.read.csv('/content/drive/MyDrive/Sani/out23.tsv', header=True, inferSchema=True, sep='\t')

df_haplotype_07 = spark.read.csv('/content/drive/MyDrive/Sani/haplotype_table_07.tsv', header=True, inferSchema=True, sep='\t')
df_haplotype_15 = spark.read.csv('/content/drive/MyDrive/Sani/haplotype_table_15.tsv', header=True, inferSchema=True, sep='\t')
df_haplotype_17 = spark.read.csv('/content/drive/MyDrive/Sani/haplotype_table_17.tsv', header=True, inferSchema=True, sep='\t')
df_haplotype_22 = spark.read.csv('/content/drive/MyDrive/Sani/haplotype_table_22.tsv', header=True, inferSchema=True, sep='\t')
df_haplotype_23 = spark.read.csv('/content/drive/MyDrive/Sani/haplotype_table_23.tsv', header=True, inferSchema=True, sep='\t')

In [50]:
df_common_somatic = spark.read.csv('/content/common_somatic_alfa.txt', header=True, inferSchema=True, sep='\t')

In [None]:
df_common_somatic.show()
df_common_somatic.describe().show()

In [None]:
mutect_df_list = [df_mutect_07, df_mutect_17, df_mutect_22, df_mutect_23]
haplotype_df_list = [df_haplotype_15,df_haplotype_07, df_haplotype_17, df_haplotype_22, df_haplotype_23]

In [None]:
#describing All Variant Stats to have an idea what we are dealing with
for i in mutect_df_list:
  #first show first few rows
  i.show()
  #describe and show basic stats
  i.describe().show()

for i in haplotype_df_list:
  i.show()
  i.describe().show()

## Preparing DataFrames for Filtering

Select and Rename DataFrame Columns

In [None]:
# cut out the normal columns (I think they are not relevant)
remove_normal_name = lambda col: 'h4.lib1' in col

# rename sample columns so that they are consistent across all samples
renaming_AF = lambda col: 'AF' in col
renaming_AD = lambda col: 'AD' in col

for i in range(len(mutect_df_list)):
  mutect_df_list[i] = mutect_df_list[i].drop(*filter(remove_normal_name, mutect_df_list[i].columns))
  mutect_df_list[i] = mutect_df_list[i].withColumnRenamed(*filter(renaming_AF, mutect_df_list[i].columns), 'AF')
  mutect_df_list[i] = mutect_df_list[i].withColumnRenamed(*filter(renaming_AD, mutect_df_list[i].columns), 'AD')

for i in range(len(haplotype_df_list)):
  haplotype_df_list[i] = haplotype_df_list[i].withColumnRenamed(*filter(renaming_AF, haplotype_df_list[i].columns), 'AF')
  haplotype_df_list[i] = haplotype_df_list[i].withColumnRenamed(*filter(renaming_AD, haplotype_df_list[i].columns), 'AD')

In [None]:
#describing All Variant Stats to check if renaming worked

for i in mutect_df_list:
  #first show first few rows
  i.show()

for i in haplotype_df_list:
  i.show()

## Quality and SNP Filtering

In [None]:
for i in range(len(mutect_df_list)):
  mutect_df_list[i] = mutect_df_list[i].filter(
               mutect_df_list[i].TYPE == "SNP").filter(
               (mutect_df_list[i].AF >= 0.45) & (mutect_df_list[i].AF <= 0.55) | (mutect_df_list[i].AF >= 0.90))

for i in range(len(haplotype_df_list)):
  haplotype_df_list[i] = haplotype_df_list[i].filter(
               haplotype_df_list[i].TYPE == "SNP").drop(haplotype_df_list[i].AF)

## Subtraction of Common Variants Between Samples

Maybe should do it by position only?

In [None]:
# Working on this, it will be usefull for the HaplotypeCaller Data
subtracted_df_list = []

for df in haplotype_df_list:
# this will subtract all common rows between normal/control and experimental
# sample. I might need to revist to filter variants by position only not all columns
  temp_df = df
  for df_2 in haplotype_df_list:
    if df != df_2:
      temp_df = temp_df.select('CHROM','POS','REF','ALT').subtract(df_2.select('CHROM','POS','REF','ALT')) #,'REF','ALT'
  subtracted_df_list.append(temp_df)

In [None]:
for i in subtracted_df_list:
  i.describe().show()

Take into account only positions that are common between HaplotypeCaller and Mutect2

In [53]:
intersect_df = []
for i in range(4):
  temp_df = mutect_df_list[i].select('CHROM','POS','REF','ALT').intersect(subtracted_df_list[i+1])
  intersect_df.append(temp_df)

In [None]:
SNPs_removed = []
for df in intersect_df:
  df = df.join(df_common_somatic, ["POS"], "leftanti")
  SNPs_removed.append(df)

for df in SNPs_removed:
  #df.show()
  df.describe().show()

## Preparing the Data for plotting

In [56]:
# Create a user define function (UDF) to work on multiple columns to extract 
# the mutation spectra information

#define a function with logic for the mutation type assignment
def findType(colA, colB):
  if ((colA == 'C') & (colB == 'T') | (colA == 'G') & (colB == 'A')):
    return('C_to_T')
  if ((colA == 'C') & (colB == 'A') | (colA == 'G') & (colB == 'T')):
    return('C_to_A')
  if ((colA == 'C') & (colB == 'G') | (colA == 'G') & (colB == 'C')):
    return('C_to_G')
  if ((colA == 'T') & (colB == 'C') | (colA == 'A') & (colB == 'G')):
    return('T_to_C')
  if ((colA == 'T') & (colB == 'G') | (colA == 'A') & (colB == 'C')):
    return('T_to_G')
  if ((colA == 'T') & (colB == 'A') | (colA == 'A') & (colB == 'T')):
    return('T_to_A')

#this function applies the findType udf onto a target dataframe
def applyType(df_list): 
  for i in range(len(df_list)):
    df_list[i] = df_list[i].withColumn('Type', createType(df_list[i].REF,df_list[i].ALT))
  return df_list


# apply pySpark's udf to the custom function 
createType = udf(findType, StringType())

#add a column containing the output of the udf
SNPs_removed = applyType(SNPs_removed)

In [None]:
#check if the adding columns worked
for i in intersect_df:
  i.show()

These columns now can be taken and used for plotting in excel (doing stacked percent bar charts in python is weirdly difficult)

In [None]:
# Count all occurences of each type of mutation and 
# display the resulting summary table for each DF
for i in SNPs_removed:
  i.groupBy("Type").count().show()