In [None]:
import pandas as pd
import numpy as np
import altair as alt
from natsort import natsorted
from scipy.stats import gaussian_kde
from pathlib import Path

In [None]:
# Notebook configuration: input and output locations used by the batch run.
# NOTE(review): `file` is not read by the batch main() (which loops over the
# files found under scores_path) - confirm whether it is still needed.
file = '../Data/20250423_BARD1_snvscores_IGVFupload.tsv'
# Directory that get_files() searches for score tables.
scores_path = '/Users/ivan/Downloads/all_sge'
# Prefix for every saved figure. It is joined to the file name by plain string
# concatenation, so the trailing '/' is required.
output_folder = '/Users/ivan/Desktop/batch_sge_figs/'
# Extension (including the dot) appended to each figure file name.
output_format = '.png'

In [None]:
def get_files(path):
    """Return the entries directly under `path` whose names end in 'tsv', sorted.

    Parameters
    ----------
    path : str or Path
        Directory to search (not recursive).

    Returns
    -------
    list of Path
        Matching paths in ascending order.
    """
    return sorted(Path(path).glob('*tsv'))

In [None]:
def histogram(file, output_folder, output_format):
    """Plot a stacked histogram of SGE scores for one score file, colored by consequence.

    Parameters
    ----------
    file : str or Path
        Tab-separated score table containing 'exon', 'consequence' and 'score' columns.
    output_folder : str
        Prefix prepended to the output file name by string concatenation
        (so it should end with a path separator).
    output_format : str
        Extension, including the dot, appended to the output file name.

    Returns
    -------
    str
        The gene name, taken as the text before the first underscore of the
        first row's 'exon' value. The chart is also saved and displayed.
    """
    df = pd.read_csv(file, sep = '\t')
    df = df[['exon', 'consequence', 'score']]
    df = df.rename(columns = {'score': 'snv_score', 'consequence': 'Consequence'})

    # Positional lookup: the first row regardless of how the index is labelled.
    gene_name = df['exon'].iloc[0].split('_')[0]

    # Collapse raw consequence terms into display labels. Order matters: each
    # rule sees the labels written by the rules above it. na = False keeps rows
    # with a missing consequence from breaking the boolean mask.
    df.loc[df['Consequence'].str.contains('missense', na = False), 'Consequence'] = 'Missense'
    df.loc[df['Consequence'] == 'synonymous_variant', 'Consequence'] = 'Synonymous'
    df.loc[df['Consequence'] == 'intron_variant', 'Consequence'] = 'Intron'
    df.loc[df['Consequence'] == 'stop_gained', 'Consequence'] = 'Stop Gained'
    df.loc[df['Consequence'] == 'stop_lost', 'Consequence'] = 'Stop Lost'
    df.loc[df['Consequence'].str.contains('splic', na = False), 'Consequence'] = 'Splice'
    df.loc[df['Consequence'].str.contains('UTR', na = False), 'Consequence'] = 'UTR Variant'
    df.loc[df['Consequence'] == 'start_lost', 'Consequence'] = 'Start Lost'

    alt.data_transformers.disable_max_rows() #gets rid of max data length problem

    chart_title = 'Distribution of ' + gene_name + ' SGE Scores ' + '(n = ' + str(len(df)) + ')'
    bins = 50 #number of bins

    # Legend order. The last entry must match the 'UTR Variant' label assigned
    # above (it was previously 'UTR', which never matched any row).
    legend_order = ["Intron", "Missense", "Synonymous", "Stop Gained", "Splice", "Start Lost", 'Stop Lost', 'UTR Variant']

    #Builds histogram
    chart = alt.Chart(df).mark_bar().encode(
            alt.X('snv_score', axis = alt.Axis(title = 'SGE Score', labelFontSize = 16, titleFontSize = 20), bin = alt.Bin(maxbins = bins)),
            alt.Y('count()', axis = alt.Axis(title = 'Number of Variants', labelFontSize = 16, titleFontSize = 20)),
            color = alt.Color('Consequence:N', scale = alt.Scale(scheme = 'category10'), sort = legend_order, legend = alt.Legend(titleFontSize = 16, labelFontSize = 14))
    ).properties(
        width = 800,
        height = 400,
        title = alt.TitleParams(text = chart_title, fontSize = 22)
    ).interactive()

    file_name = output_folder + gene_name + '_SGE_histogram' + output_format

    chart.save(file_name)
    chart.display()

    return gene_name
    

In [None]:
def ridgeline_plot(file, gene_name, output_folder, output_format):
    """Draw a ridgeline plot of SGE score densities, one row per consequence class.

    Each row shows a Gaussian KDE of the scores in that class, with a tick mark
    for every individual variant. Classes with fewer than two scores, or whose
    scores are all identical, cannot be given a KDE; they are drawn with ticks
    only instead of aborting the whole plot.

    Parameters
    ----------
    file : str or Path
        Tab-separated score table containing 'exon', 'consequence' and 'score' columns.
    gene_name : str
        Used in the output file name.
    output_folder : str
        Prefix prepended to the output file name by string concatenation.
    output_format : str
        Extension, including the dot, appended to the output file name.

    Returns
    -------
    None
        The chart is saved and shown as a side effect.
    """

    def read_scores(file): #reads SGE scores
        df = pd.read_csv(file, sep = '\t')
        df = df.rename(columns = {'score': 'snv_score', 'consequence': 'Consequence'})
        df = df[['exon','Consequence','snv_score']]
        kept = ['missense_variant', 'synonymous_variant', 'intron_variant', 'stop_gained', 'start_lost', 'splicing_variant', 'splice_site_variant', 'UTR_variant']

        # .copy() so the relabeling in prep_data writes to an independent frame
        # rather than to a filtered slice of the full table.
        return df.loc[df['Consequence'].isin(kept)].copy()

    def prep_data(df): #Renames categories to be nicer
        # Order matters: each rule sees the labels written by the rules above it.
        df.loc[df['Consequence'].str.contains('missense'), 'Consequence'] = 'Missense'
        df.loc[df['Consequence'] == 'synonymous_variant', 'Consequence'] = 'Syn.'
        df.loc[df['Consequence'] == 'intron_variant', 'Consequence'] = 'Intron'
        df.loc[df['Consequence'] == 'stop_gained', 'Consequence'] = 'Stop'
        df.loc[df['Consequence'].str.contains('splic'), 'Consequence'] = 'Splice'
        df.loc[df['Consequence'].str.contains('UTR'), 'Consequence'] = 'UTR'
        df.loc[df['Consequence'].str.contains('start'), 'Consequence'] = 'Start Lost'

        return df

    def compute_density(df, category):
        """Return a 200-point KDE curve for one category, or None if a KDE cannot be estimated."""
        values = df['snv_score'].dropna().values

        # gaussian_kde needs at least two points with non-zero spread.
        if len(values) < 2 or values.min() == values.max():
            return None

        density = gaussian_kde(values)
        xs = np.linspace(values.min(), values.max(), 200)
        ys = density(xs)

        return pd.DataFrame({'SGE_score': xs, 'density': ys, 'Consequence': category})

    def make_plot(df,rawdf): #creates plot

        df = pd.concat([df,rawdf], ignore_index = True) #concatenates the density data and raw data so that the ticks and density plots share a dataframe (required for graphing)

        #Base creates the density plots using the density data
        base = alt.Chart(df).mark_area(
            interpolate = 'monotone',
            line = True
        ).encode(
            x = alt.X('SGE_score:Q', axis = alt.Axis(title = 'SGE Score', titleFontSize = 20, labelFontSize = 16)),
            y = alt.Y('density:Q', axis = None),
            color = alt.Color('Consequence:N', legend = None)
        )

        #ticks creates the tick marks using the raw data
        ticks = alt.Chart(df).mark_tick(
            color = 'black',
            thickness = 0.5,
            size = 5
        ).encode(
            x = alt.X('snv_score:Q', title = ''),
            y = alt.value(77.5)
        )

        #Plots are layered
        combined_plot = alt.layer(ticks, base).properties(
            width = 1000,
            height = 75
        )

        facet_order = ["Intron", "Missense", "Syn.", "Stop", "Splice", "Start Lost", 'UTR']

        #plots are faceted by consequence of the variant to yield the final plot (faceting requires that all data is in the same dataframe)
        plot = combined_plot.facet(facet = alt.Facet(
            row = alt.Row('Consequence:N', title = 'Consequence', sort = facet_order)),
            spacing = 2,
        ).properties(
            title = '',
            bounds = 'flush'
        ).configure_facet(
            spacing= 1
        ).configure_header(
            titleFontSize = 20,
            labelFontSize = 16
        ).configure_title(
            anchor='start'
        ).configure_axis(
            grid=False
        ).configure_view(
            stroke = None
        )

        file_name = output_folder + gene_name + '_SGE_RidgelinePlot' + output_format
        plot.save(file_name)
        plot.show()

    def main():
        alt.data_transformers.disable_max_rows()
        data = read_scores(file)
        relabeled_data = prep_data(data)

        density_frames = []
        for category in relabeled_data['Consequence'].unique():
            curve = compute_density(relabeled_data[relabeled_data['Consequence'] == category], category)
            if curve is not None:
                density_frames.append(curve)

        if not density_frames:
            # Nothing to draw a ridge from; skip this plot instead of raising
            # inside pd.concat and stopping the rest of the batch.
            print('Skipping ridgeline plot for ' + gene_name + ': no consequence class has enough scores for a density estimate')
            return

        make_plot(pd.concat(density_frames), relabeled_data)

    main()

In [None]:
def corr_matrix(file, gene_name, output_folder, output_format):
    """Draw a heatmap of pairwise replicate correlations for each SGE target region.

    For every target, the value from `Series.corr` is computed between each
    pair of same-day replicate columns (D05 and D13, replicates R1-R3),
    rounded to 3 decimals, and plotted as target x replicate-pair.

    Parameters
    ----------
    file : str or Path
        Tab-separated table with a 'target' column and the six
        '<day>_R<n>_lib1' replicate columns.
    gene_name : str
        Used in the chart title and output file name.
    output_folder : str
        Prefix prepended to the output file name by string concatenation.
    output_format : str
        Extension, including the dot, appended to the output file name.

    Returns
    -------
    None
        The chart is saved and displayed as a side effect.
    """
    raw_columns = ['D05_R1_lib1', 'D05_R2_lib1', 'D05_R3_lib1',
                   'D13_R1_lib1', 'D13_R2_lib1', 'D13_R3_lib1']

    # 'D05_R1_lib1' -> 'D05 R1', and so on for the other replicate columns.
    display_names = {raw: raw.replace('_lib1', '').replace('_', ' ') for raw in raw_columns}

    # Within-day replicate pairs, in the order they appear on the x axis data.
    replicate_pairs = [(day + ' R' + first, day + ' R' + second)
                       for day in ('D05', 'D13')
                       for first, second in (('1', '2'), ('1', '3'), ('2', '3'))]

    def load_counts(score_file):
        """Read the replicate columns and reduce 'target' to the text after '_X'."""
        counts = pd.read_csv(score_file, sep = '\t')
        counts = counts[['target'] + raw_columns]
        counts = counts.rename(columns = display_names)
        counts['target'] = counts['target'].str.extract(r'_X(.*)$')

        return counts

    def correlate_by_target(counts):
        """Return [target, 'A vs B', r] for every target and replicate pair."""
        rows = []
        for region, region_counts in counts.groupby('target'):
            for first, second in replicate_pairs:
                r_value = region_counts[first].corr(region_counts[second])
                rows.append([region, first + ' vs ' + second, r_value])

        return rows

    def to_long_frame(rows):
        """Turn the row lists into a long-format frame with r rounded to 3 decimals."""
        return pd.DataFrame({
            'Targets': [row[0] for row in rows],
            'Tests': [row[1] for row in rows],
            'r_correlation': [round(row[2], 3) for row in rows],
        })

    def draw_heatmap(long_frame):
        """Build, save and display the heatmap."""
        target_order = natsorted(set(long_frame['Targets'].tolist()))
        heading = 'Correlation of Replicates' + ' (' + gene_name + ')'

        x_axis = alt.X('Tests:N', axis = alt.Axis(title = '', titleFontSize = 28, labelFontSize = 22, labelLimit = 300, labelAngle = 45))
        y_axis = alt.Y('Targets', axis = alt.Axis(title = 'SGE Target Region', titleFontSize = 28, labelFontSize = 22), sort = target_order)
        fill = alt.Color('r_correlation:Q', scale = alt.Scale(domain = [.2, 1]), legend = alt.Legend(title = "Pearson's r", titleFontSize = 24,labelFontSize = 22))

        graph = alt.Chart(long_frame, title = alt.TitleParams(text = heading, fontSize = 32)).mark_rect().encode(
            x = x_axis,
            y = y_axis,
            color = fill,
            tooltip = [alt.Tooltip('r_correlation', title = "Pearson's r: ")]
        ).properties(
            width = 800,
            height = 700
        )

        graph.save(output_folder + gene_name + '_SGE_CorrelationMatrix' + output_format)
        graph.display()

    draw_heatmap(to_long_frame(correlate_by_target(load_counts(file))))


In [None]:
def stacked_bars(file, gene_name, output_folder, output_format):
    """Draw per-exon stacked bars of functional class percentages, faceted by consequence.

    Only missense, synonymous and stop-gained variants are used. For each
    (exon, consequence) group the percentages of 'functionally_abnormal' and
    'indeterminate' calls are computed; everything else, including any other
    or missing value, is counted as 'Functional'.

    Parameters
    ----------
    file : str or Path
        Tab-separated table with 'exon', 'consequence', 'score' and
        'functional_consequence' columns.
    gene_name : str
        Used in the output file name.
    output_folder : str
        Prefix prepended to the output file name by string concatenation.
    output_format : str
        Extension, including the dot, appended to the output file name.

    Returns
    -------
    None
        The chart is saved and shown as a side effect.
    """

    def read_data(path): #Reads data
        # Uses the argument it is given (previously it ignored `path` and read
        # the enclosing `file` variable).
        df = pd.read_csv(path, sep = '\t') #Reads SGE score file

        df = df.rename(columns = {'consequence': 'Consequence', 'score': 'snv_score'}) #Renames to harmonize old code with new column names

        df = df[['exon','Consequence','snv_score', 'functional_consequence']] #pulls out relevant columns in the dataframe

        filtered_consequences = ['missense_variant','synonymous_variant','stop_gained'] #Focusing on these variant types only

        df = df.loc[df['Consequence'].isin(filtered_consequences)] #Filters for desired variant types

        return df.reset_index(drop = True) #Resets index

    def prep_data(df): #renames VEP consequence categories to be more human friendly
        # After read_data only these three exact terms remain, so a direct
        # value replacement covers every row.
        labels = {
            'missense_variant': 'Missense',
            'synonymous_variant': 'Synonymous',
            'stop_gained': 'Stop',
        }
        df['Consequence'] = df['Consequence'].replace(labels)

        return df

    def rewrite_targets(df): #Rewrites SGE target to get exon number only
        # expand = False returns a Series; rows without an '_X<digits>' match
        # get NaN and are left out by the groupby in exon_stats.
        df['target'] = df['exon'].str.extract(r'_X(\d+)', expand = False)

        return df

    def exon_stats(df): #Generates summary dataframe with % of variants in each functional class for each exon
        records = [] #One dict per (exon, consequence, functional class)

        for (exon, var_type), group_df in df.groupby(['target','Consequence']):
            calls = group_df['functional_consequence']
            total = len(group_df)

            non = int((calls == 'functionally_abnormal').sum()) #number of nonfunctional variants
            inter = int((calls == 'indeterminate').sum()) #number of indeterminate variants

            non_per = (non / total) * 100 #Gets % non-functional
            inter_per = (inter / total) * 100 #Gets % indeterminate
            same_per = (100 - (non_per + inter_per)) #Remainder is reported as functional

            for function_type, percent in (('Nonfunctional', non_per),
                                           ('Indeterminate', inter_per),
                                           ('Functional', same_per)):
                records.append({'Exon': exon, 'Consequence': var_type, 'Function Type': function_type, 'Percent': percent})

        # Built once from the records; explicit columns keep the frame well
        # formed even when there were no groups.
        return pd.DataFrame(records, columns = ['Exon', 'Consequence', 'Function Type', 'Percent'])

    def draw_chart(df): #Creates the stacked bar chart
        exons = natsorted(set(df['Exon'].tolist())) #Gets sorted list of exons

        #Builds stacked bar chart
        chart = alt.Chart(df).mark_bar().encode(
                x = alt.X('Exon:O', axis = alt.Axis(labelAngle = 0, labelFontSize = 16, titleFontSize = 20), sort = exons),
                y = alt.Y('Percent',axis = alt.Axis(labelFontSize = 16, titleFontSize = 20)),
                tooltip = [alt.Tooltip('Function Type', title = 'Functional Class: '),
                            alt.Tooltip('Percent', title = 'Percent: ')],
                color = alt.Color('Function Type', title = 'Functional Class', legend = alt.Legend(titleFontSize = 18, labelFontSize = 16))
        ).properties(
            width =400,
            height = 500
        ).facet(facet = alt.Facet('Consequence',
            sort = ['Synonymous', 'Missense', 'Stop']
                    )
        )

        chart = chart.configure_header(
            titleFontSize = 20,
            labelFontSize = 16
        )

        file_name = output_folder + gene_name + '_SGE_StackedBars' + output_format
        chart.save(file_name)
        chart.show()

    def main():
        data = read_data(file)
        reannotated = prep_data(data)
        with_targets = rewrite_targets(reannotated)
        all_stats = exon_stats(with_targets)
        draw_chart(all_stats)

    main()

In [None]:
def strip_plot(file, gene_name,output_folder,output_format):
    """Draw a strip plot of SGE scores with one row of ticks per consequence class.

    Parameters
    ----------
    file : str or Path
        Tab-separated score table containing 'consequence' and 'score' columns.
    gene_name : str
        Used in the chart title and output file name.
    output_folder : str
        Prefix prepended to the output file name by string concatenation.
    output_format : str
        Extension, including the dot, appended to the output file name.

    Returns
    -------
    None
        The chart is saved and displayed as a side effect.
    """

    def read_data(file):
        df = pd.read_csv(file, sep = '\t')
        df = df.rename(columns = {'consequence': 'Consequence'})

        # Order matters: each rule sees the labels written by the rules above
        # it. na = False keeps rows with a missing consequence from breaking
        # the boolean mask.
        df.loc[df['Consequence'].str.contains('missense', na = False), 'Consequence'] = 'Missense'
        df.loc[df['Consequence'] == 'synonymous_variant', 'Consequence'] = 'Synonymous'
        df.loc[df['Consequence'] == 'intron_variant', 'Consequence'] = 'Intron'
        df.loc[df['Consequence'] == 'stop_gained', 'Consequence'] = 'Stop Gained'
        df.loc[df['Consequence'] == 'stop_lost', 'Consequence'] = 'Stop Lost'
        df.loc[df['Consequence'].str.contains('site', na = False), 'Consequence'] = 'Canonical Splice'
        df.loc[df['Consequence'].str.contains('ing_var', na = False), 'Consequence'] = 'Splice Region'
        df.loc[df['Consequence'].str.contains('UTR', na = False), 'Consequence'] = 'UTR Variant'
        df.loc[df['Consequence'] == 'start_lost', 'Consequence'] = 'Start Lost'

        return df

    def draw_chart(df):
        # Row/color order, also used as the whitelist of classes to plot. The
        # last entry must match the 'UTR Variant' label assigned in read_data;
        # it was previously 'UTR', which silently filtered those rows out.
        # Threshold rule overlays (at x = -0.089 and x = -0.077) were already
        # disabled in the original and are omitted here.
        category_order = ["Intron", "Missense", "Synonymous", "Stop Gained", "Splice", "Start Lost", 'Stop Lost', 'Splice Region','Canonical Splice', 'UTR Variant']
        df = df.loc[df['Consequence'].isin(category_order)]

        chart_title = 'Distribution of ' + gene_name + ' SGE Scores' #used to build title

        plot = alt.Chart(df).mark_tick(opacity = 1).encode(
            x = alt.X('score:Q',
                      axis = alt.Axis(title = '',
                                      titleFontSize = 20,
                                      labelFontSize = 24)
                     ),
            y = alt.Y('Consequence:N',
                      sort = category_order,
                      axis = alt.Axis(title = '',
                                      labelFontSize = 24)
                     ),
            color = alt.Color('Consequence:N',
                              legend = None,
                              sort = category_order,
                              scale = alt.Scale(scheme = 'category10')
                             )
        ).properties(
            width = 800,
            height = 400,
            title = alt.TitleParams(chart_title, fontSize = 22)
        )

        plot = plot.configure_axis(
            grid = False
        )

        file_name = output_folder + gene_name + '_SGE_StripPlot' + output_format
        plot.save(file_name)
        plot.display()

    def main():
        data = read_data(file)
        draw_chart(data)

    main()

In [None]:
def main():
    """Generate every figure type for each score file found under `scores_path`.

    The histogram runs first because it returns the gene name that the
    remaining plotting functions take as an argument.
    """
    remaining_plots = (ridgeline_plot, corr_matrix, stacked_bars, strip_plot)

    for score_file in get_files(scores_path):
        gene_name = histogram(score_file, output_folder, output_format)

        for make_figure in remaining_plots:
            make_figure(score_file, gene_name, output_folder, output_format)


In [None]:
# Run the batch: produce all figures for every score file found under scores_path.
main()