In [1]:
# module imports

from glob import glob
import pandas as pd
import re
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from random import sample
import scienceplots
import csv
from kneed import KneeLocator
from scipy.stats import entropy
from scipy.stats import spearmanr

In [2]:
# user input parameters
# NOTE(review): the paths below are absolute paths on one machine; edit them before running elsewhere.
# Folder paths end with '/' because they are joined to file names by plain string concatenation.

# boolean parameter to dictate use of print statements
debug = False

# git location
git_fpath = '/Users/sarahfisher/Documents/project/variability'

# path to folder containing data gathered in a single experiment
experiment_folder_fpath = '/Users/sarahfisher/Documents/project/data/20230404/CSV_FB_20230404/'

# results filepath
results_fpath = '/Users/sarahfisher/Documents/project/data/20230404/results/'

# temperature filepath
temp_fpath = '/Users/sarahfisher/Documents/project/data/20230404/Temp_df.csv'

# filepath extension for results related to temperature
temperature_ext_fpath = 'temperature_plots/'

# filepath extension for results related to bin selection
bin_selection_ext = 'bin_selection/'

# filepath extension for results related to bin validation
bin_validation_ext = 'bin_validation/'

# filepath extension for results related to mi calculation
mi_calc_ext= 'mi_calculation/'

# filepath extension for results related to pairwise mi calculation
pairwise_mi_ext = 'pairwise_mi_calculation/'

# filepath extension for results related to correlation
correlation_ext = 'intrawell_correlation/'

# generic file extension for .csv data files (replace unique identifiers with `*`)
# used as a glob pattern appended to `experiment_folder_fpath`
raw_csv_file_ext = 'Time_points_dfs_*.csv'

# searchable file extension for .csv data file identifiers (replace `*` with `(.+?)`)
# used as a regular expression; capture group 1 becomes the identifier of each file
raw_id_search_ext = 'Time_points_dfs_(.+?).csv'

# list of column names of features of interest on .csv files
feature_ref_list = ['mean_F_C2','mean_F_C3','area']

# list of renamed features of interest for clarity (same order as `feature_ref_list`)
feature_name_list = ['gfp','rfp','area']

# dictionary mapping each feature name to its corresponding proper name used in figures
proper_feature = {'gfp':'GFP', 'rfp':'RFP', 'area':'Area'}

# reference name of column describing the temperature from the temperature .csv file
temp_ref = 'T_CHIP(celcius)'

# reference name of column describing the timestep from .csv data files
timestep_ref = 'Time_interval'

# renamed column describing the timestep
timestep_name = 'timestep'

# reference name of column describing the number of cells from .csv data files
num_cells_ref = 'Number_cells'

# renamed column describing the number of cells
num_cells_name = 'ncells'

In [3]:
# function declaration

def append_row_csv(csv_fpath, row):
    """Append one row to a .csv file; the file is created if it does not exist.

    Parameters
    ----------
    csv_fpath : path of the .csv file to append to
    row : iterable of values written as a single csv record

    Returns
    -------
    None
    """
    # `newline=''` hands line-ending control to the csv module; without it,
    # platforms that translate newlines on write produce a blank line after
    # every appended row
    with open(csv_fpath, 'a', newline='') as f:
        csv.writer(f).writerow(row)

def find_data_files(data_fpath, file_extension):
    """Return the sorted list of paths matching a glob pattern in a folder.

    Parameters
    ----------
    data_fpath : path of the folder containing the data; it is joined to
        `file_extension` by string concatenation, so it must end with a
        path separator
    file_extension : glob pattern of the files of interest (e.g. with `*`)

    Returns
    -------
    list of matching file paths in ascending order
    """
    pattern = data_fpath + file_extension
    return sorted(glob(pattern))

def create_dataframe_dict(csv_files, id_search_ext):
    """Load several .csv files into a dictionary keyed by an id taken from each path.

    Parameters
    ----------
    csv_files : list of .csv file paths
    id_search_ext : regular expression searched in each path; its first
        capture group (e.g. `(.+?)`) is used as the identifier of the file

    Returns
    -------
    dictionary mapping each identifier to a dataframe with the file contents
    (files sharing an identifier overwrite one another, last one wins)
    """
    frames_by_id = {}
    for csv_path in csv_files:
        frame = pd.read_csv(csv_path)
        id_match = re.search(id_search_ext, csv_path)
        frames_by_id[id_match.group(1)] = frame
    return frames_by_id

def create_df_subset(df_dict, col_ref_list, col_name_list=None):
    """Stack a subset of columns from every dataframe in a dictionary.

    Parameters
    ----------
    df_dict : dictionary mapping an identifier to its dataframe
    col_ref_list : list of the column names of interest, as they appear in
        the input dataframes
    col_name_list : optional list of new names for those columns, in the same
        order as `col_ref_list`; defaults to `col_ref_list`

    Returns
    -------
    dataframe with the columns of interest, concatenated row-wise across all
    dataframes in `df_dict` (original row indices are kept, so they may
    repeat); an empty dataframe with the requested columns if `df_dict` is empty
    """
    # identity check: `== None` is evaluated element-wise by array-like inputs
    if col_name_list is None:
        col_name_list = col_ref_list

    subsets = []
    for df in df_dict.values():
        subset = pd.DataFrame()
        subset[col_name_list] = df[col_ref_list]
        subsets.append(subset)

    if not subsets:
        return pd.DataFrame(columns=col_name_list)

    # concatenate once: avoids re-copying the accumulated frame on every
    # iteration and avoids seeding the result with an empty object-dtype frame
    return pd.concat(subsets)

def create_list_from_dict(dictionary, ordered_keys):
    """Look up a sequence of keys in a dictionary.

    Parameters
    ----------
    dictionary : dictionary to read from
    ordered_keys : keys to look up, in the desired output order

    Returns
    -------
    list of the values of `dictionary`, ordered like `ordered_keys`
    (raises KeyError if a key is missing)
    """
    return [dictionary[key] for key in ordered_keys]

def scale(old_vals, new_vals):
    """Linearly rescale values onto the range spanned by another set of values.

    The minimum of `old_vals` is mapped to the minimum of `new_vals` and the
    maximum of `old_vals` to the maximum of `new_vals`.

    Parameters
    ----------
    old_vals : list of values to rescale
    new_vals : list of values whose [min, max] range is the target range

    Returns
    -------
    list of rescaled values, in the same order as `old_vals`; an empty list
    if `old_vals` is empty
    """
    if len(old_vals) == 0:
        return []

    # the extrema are loop-invariant, so compute them once
    old_min, old_max = min(old_vals), max(old_vals)
    new_min, new_max = min(new_vals), max(new_vals)
    old_range = old_max - old_min
    new_range = new_max - new_min

    # all inputs identical (or a single input): there is no spread to rescale,
    # so place every value at the start of the target range instead of
    # dividing by zero
    if old_range == 0:
        return [new_min for _ in old_vals]

    return [((val - old_min) / old_range) * new_range + new_min for val in old_vals]

def avg_plot(x, y, num):
    """Average x and y values over consecutive chunks of `num` points.

    Parameters
    ----------
    x : indexable set of x-values
    y : indexable set of y-values, read at the same positions as `x`
    num : number of consecutive points averaged into one output point;
        0 disables averaging

    Returns
    -------
    (x, y) unchanged if `num` is 0; otherwise two lists holding the mean of
    each chunk. A trailing chunk shorter than `num` is averaged over the
    points it does contain.
    """
    if num == 0:
        return x, y

    x_means, y_means = [], []
    x_total = 0
    y_total = 0
    count = 0
    for idx in range(len(x)):
        x_total = x_total + x[idx]
        y_total = y_total + y[idx]
        count += 1
        if count == num:
            # chunk is full: emit its mean and start a new chunk
            x_means.append(x_total / count)
            y_means.append(y_total / count)
            x_total = 0
            y_total = 0
            count = 0

    # leftover points that did not fill a whole chunk
    if count != 0:
        x_means.append(x_total / count)
        y_means.append(y_total / count)
    return x_means, y_means

In [4]:
# specialized function declaration

def create_ncells_list_dict():
    """Collect, per timestep, the cell count and the number of rows of every well.

    Requires the globals `timestep_list`, `experiment_dict`, `timestep_ref`
    and `num_cells_ref`.

    Returns
    -------
    ncells_list_dict : dictionary mapping each timestep to a list with, for
        each well, the value of the `num_cells_ref` column in the first row
        recorded at that timestep
    nmeasure_list_dict : dictionary mapping each timestep to a list with, for
        each well, the number of rows recorded at that timestep

    Raises IndexError if a well has no row for one of the timesteps.
    """
    ncells_by_t = {t: [] for t in timestep_list}
    nmeasure_by_t = {t: [] for t in timestep_list}
    for well_df in experiment_dict.values():
        for t in timestep_list:
            # rows of this well recorded at timestep `t`
            counts = list(well_df.loc[well_df[timestep_ref] == t, num_cells_ref])
            ncells_by_t[t].append(counts[0])
            nmeasure_by_t[t].append(len(counts))
    return ncells_by_t, nmeasure_by_t

def create_ncells_dicts():
    """Summarise the per-well cell counts of every timestep.

    Requires the globals `ncells_list_dict` and `nmeasure_list_dict`.

    Returns
    -------
    ncells_avg_dict : dictionary mapping each timestep to the mean number of cells
    ncells_std_dict : dictionary mapping each timestep to the standard
        deviation (`np.std`) of the number of cells
    nmeasure_min_dict : dictionary mapping each timestep to the smallest value
        found among both the per-well row counts and the per-well cell counts
    """
    avg_by_t, std_by_t, min_by_t = {}, {}, {}
    for t, well_counts in ncells_list_dict.items():
        avg_by_t[t] = sum(well_counts) / len(well_counts)
        std_by_t[t] = np.std(well_counts)
        smallest_measured = min(nmeasure_list_dict[t])
        min_by_t[t] = min(smallest_measured, min(well_counts))
    return avg_by_t, std_by_t, min_by_t

def create_bin_edges_dict():
    """Read the stored bin edges of every feature from the bin-selection results.

    Requires the globals `results_fpath` and `bin_selection_ext`, and the
    existence of `*_optimal_bins.csv` files in that results folder.

    Returns
    -------
    dictionary mapping each feature (the part of the file name preceding
    `_optimal_bins.csv`) to the first entry of the `bin_edges` column of its file.
    NOTE(review): the entry is returned exactly as pandas parsed it; if the
    edges were written as a list, this is presumably its string form - confirm
    against the cell that writes these files.
    """
    bins_folder = results_fpath + bin_selection_ext
    optimal_bins_files = find_data_files(bins_folder, '*_optimal_bins.csv')

    # the folder path is literal text, not a pattern: escape it so characters
    # such as '.' or '+' in the path cannot change the meaning of the regex
    id_pattern = re.escape(bins_folder) + r'(.+?)_optimal_bins\.csv'
    optimal_bins_dict = create_dataframe_dict(optimal_bins_files, id_pattern)

    bin_edges_dict = {}
    for feature, bins_df in optimal_bins_dict.items():
        # `create_dataframe_dict` already loaded each file into a dataframe,
        # so the column is read directly instead of calling `pd.read_csv` again
        bin_edges_dict[feature] = list(bins_df['bin_edges'])[0]
    return bin_edges_dict

def get_temp_secs():
    """Build one consecutive integer per recorded temperature.

    Requires the global `temp_series`.

    Returns
    -------
    list `[0, 1, ..., len(temp_series) - 1]`, used as the seconds over which
    the experiment was taken
    (assumes one temperature reading per second - TODO confirm)
    """
    return list(range(len(temp_series)))

def get_start_plot(min_cells=250):
    """Locate the point at which the experiment starts, on the temperature axis.

    Requires the globals `nmeasure_min_list`, `temp_series` and `timestep_list`.

    Parameters
    ----------
    min_cells : threshold that an entry of `nmeasure_min_list` must exceed
        (strictly) for the experiment to count as started; defaults to 250

    Returns
    -------
    integer position in `temp_series` corresponding to the first timestep
    whose entry in `nmeasure_min_list` exceeds `min_cells`; 0 if no timestep
    does
    """
    # first qualifying index; stopping at the first hit means a hit at index 0
    # is kept instead of being mistaken for "not found yet" and overwritten
    first_idx = next(
        (i for i, n_min in enumerate(nmeasure_min_list) if n_min > min_cells),
        0,
    )
    # number of temperature readings per timestep converts the index to a position
    readings_per_timestep = len(temp_series) / len(timestep_list)
    return int(first_idx * readings_per_timestep)

In [5]:
# `main()` function declaration

# intended use in coordination with ...
# requires: module imports and user input parameters specified above
# output: returns None, assigns every variable declared `global` inside `main()` (e.g. `well_fpath_list`, `experiment_dict`, `timestep_list`, `temp_series`, `feature_df`, `start_plot`)

def main():
    """Load the experiment data and derive the shared variables of the analysis.

    Requires the module imports, the user input parameters and the helper
    functions declared in the preceding cells.

    Returns None. Assigns the following global variables:
    `well_fpath_list`, `experiment_dict`, `well_id_list`, `timestep_list`,
    `temp_series`, `temp_secs`, `timestep_scaled`, `feature_df`,
    `ncells_list_dict`, `nmeasure_list_dict`, `ncells_avg_dict`,
    `ncells_std_dict`, `nmeasure_min_dict`, `ncells_avg_list`,
    `ncells_std_list`, `nmeasure_min_list`, `start_plot`.
    When `debug` is true, each variable is printed after it is assigned.
    """

    # list of filepaths of .csv files, each containing data from a single well of the experiment
    global well_fpath_list
    well_fpath_list = find_data_files(experiment_folder_fpath, raw_csv_file_ext)
    if debug:
        print('well_fpath_list:', type(well_fpath_list), '\n', well_fpath_list)

    # dictionary mapping each well number in the experiment to a dataframe containing its .csv file data contents
    global experiment_dict
    experiment_dict = create_dataframe_dict(well_fpath_list, raw_id_search_ext)
    if debug:
        print('experiment_dict:', type(experiment_dict), '\n', experiment_dict)

    # list of all well ids
    global well_id_list
    well_id_list = list(experiment_dict)
    if debug:
        print('well_id_list:', type(well_id_list), '\n', well_id_list)

    # sorted list of all timesteps, taken from the first well
    # (a bare `set` has no guaranteed iteration order, and every "through time"
    # list built below follows the order of this list, so it must be chronological)
    global timestep_list
    timestep_list = sorted(set(experiment_dict[well_id_list[0]][timestep_ref]))
    if debug:
        print('timestep_list:', type(timestep_list), '\n', timestep_list)

    # series of recorded temperatures
    global temp_series
    temp_series = pd.read_csv(temp_fpath)[temp_ref]
    if debug:
        print('temp_series:', type(temp_series), '\n', temp_series)

    # list of times at which temperatures were recorded
    global temp_secs
    temp_secs = get_temp_secs()
    if debug:
        print('temp_secs:', type(temp_secs), '\n', temp_secs)

    # list of recorded timesteps scaled to seconds
    global timestep_scaled
    timestep_scaled = scale(timestep_list, temp_secs)
    if debug:
        print('timestep_scaled:', type(timestep_scaled), '\n', timestep_scaled)

    # dataframe containing a specified subset of columns, contents concatenated across all wells of the experiment
    global feature_df
    feature_df = create_df_subset(experiment_dict, feature_ref_list, feature_name_list)
    if debug:
        print('feature_df:', type(feature_df), '\n', feature_df)

    # dictionaries mapping a timestep to a list of the number of cells in each well and number of cells measured at that timestep
    global ncells_list_dict
    global nmeasure_list_dict
    ncells_list_dict, nmeasure_list_dict = create_ncells_list_dict()
    if debug:
        print('ncells_list_dict:', type(ncells_list_dict), '\n', ncells_list_dict)
        print('nmeasure_list_dict:', type(nmeasure_list_dict), '\n', nmeasure_list_dict)

    # dictionaries mapping a timestep to the average, standard deviation and minimum measured number of cells at that timestep
    global ncells_avg_dict
    global ncells_std_dict
    global nmeasure_min_dict
    ncells_avg_dict, ncells_std_dict, nmeasure_min_dict = create_ncells_dicts()
    if debug:
        print('ncells_avg_dict:', type(ncells_avg_dict), ncells_avg_dict)
        print('ncells_std_dict:', type(ncells_std_dict), ncells_std_dict)
        print('nmeasure_min_dict:', type(nmeasure_min_dict), nmeasure_min_dict)

    # list of the average number of cells through time
    global ncells_avg_list
    ncells_avg_list = create_list_from_dict(ncells_avg_dict, timestep_list)
    if debug:
        print('ncells_avg_list:', type(ncells_avg_list), '\n', ncells_avg_list)

    # list of the standard deviation in number of cells through time
    global ncells_std_list
    ncells_std_list = create_list_from_dict(ncells_std_dict, timestep_list)
    if debug:
        print('ncells_std_list:', type(ncells_std_list), '\n', ncells_std_list)

    # list of the minimum number of measured cells through time
    global nmeasure_min_list
    nmeasure_min_list = create_list_from_dict(nmeasure_min_dict, timestep_list)
    if debug:
        print('nmeasure_min_list:', type(nmeasure_min_list), '\n', nmeasure_min_list)

    # second mark of the experiment at which the minimum number of cells measured first exceeds 250
    global start_plot
    start_plot = get_start_plot()
    if debug:
        print('start_plot:', type(start_plot), '\n', start_plot)

    return

In [6]:
# call to `main()` function
# run after the import, parameter and function-declaration cells; assigns the globals that `main()` declares

main()