This file has the functions for
1. Reading cfDNA coordinate bed files from the "inputBedFileFolder" directory specified in the config file
2. Partitioning the data into training, validation and test sets according to the values "percentTest" and "percentValidation" specified in the config file. Note that this splitting happens in such a way that chromosomes are not shared between the training, validation and test sets. cfDNA fragments could have overlapping sequences (due to sequencing depth), so making sure chromosomes are not shared between the different sets ensures there is no data leakage.
3. Storing the output in HDF5 files (written with h5py) under separate datasets - trainingCoords, trainingLabels, validationCoords, validationLabels, testCoords and testLabels


In [None]:
import numpy as np
import pandas as pd

import sys
import importlib

import h5py

sys.path.insert(0,'/hpc/compgen/projects/fragclass/analysis/mvivekanandan/script/madhu_scripts')

import config
import sequenceUtils

import os

In [None]:
# Reload the project modules so that this cell works with their current contents.
importlib.reload(config)
importlib.reload(sequenceUtils)

# Gather every setting this file needs from the config module into one dictionary.
# A key that is missing from the config ends up as None because dict.get is used.
arguments = {
    "donorFile": config.filePaths.get("donorFile"),
    "inputBedFileFolder": config.filePaths.get("inputBedFileFolder"),
    "coordStoreDirectory": config.filePaths.get("coordStoreDirectory"),
    "snpFilePath": config.filePaths.get("snpFile"),
    "coordsStoreFilePath": config.filePaths.get("coordStoreFile"),
    "testPercent": config.dataCreationConfig.get("percentTest"),
    "validationPercent": config.dataCreationConfig.get("percentValidation"),
    "numColsToExtract": config.dataCreationConfig.get("numColsToExtract"),
}

In [None]:
def getLabelsForData(dataNumpy, label):
    """
    Build the label column for a set of samples that all share the same label.

    Args:
    dataNumpy (numpy array or pandas DataFrame) - the data for which labels need to be generated. Only the
        size of its first dimension (the number of samples) is used.
    label (integer) - 0 or 1. The output array is filled with this value.

    Output: a 2D float64 array of shape (number of rows in dataNumpy, 1) filled with the label.

    Raises: SystemExit(1), after printing a message, if the label is neither 0 nor 1.
    """
    if label not in (0, 1):
        print(f"Invalid label for data : {label}")
        raise SystemExit(1)

    # Only the row count matters. Reading shape[0] instead of unpacking (rows, columns)
    # avoids an unused variable and does not fail on inputs that are not exactly 2D.
    nrows = dataNumpy.shape[0]
    return np.full((nrows, 1), label, dtype=np.float64)

def addPercentagesFunction(chrom, percentage, file_level_chrom_percent_df):
    """
    Add one chromosome's "percentage of samples" value from a per-file dataframe to a running percentage.

    Args:
    chrom (string) - the chromosome whose value should be looked up in the "#chrom" column.
    percentage (integer or float) - the value to which the looked-up percentage is added.
    file_level_chrom_percent_df (dataframe) - must have the columns "#chrom" and "percentage of samples".

    Output (integer/float) - percentage plus the first matching "percentage of samples" value. If the
    chromosome does not occur in the dataframe, percentage is returned unchanged.
    """
    is_requested_chrom = file_level_chrom_percent_df["#chrom"] == chrom
    matching_percentages = file_level_chrom_percent_df.loc[is_requested_chrom, "percentage of samples"].values

    # Chromosome present in this file: add its share, otherwise leave the total as it is.
    if matching_percentages.size != 0:
        return percentage + matching_percentages[0]
    return percentage

def getChromosomePercentagesAverage(inputBedFilesDirectoryPath, columnNames):
    """
    Compute, for every chromosome, the percentage of fragments that lie on it, averaged over all files
    in the input directory.

    Args:
    inputBedFilesDirectoryPath (str or bytes) - directory which has all the cfDNA fragment bed files.
    columnNames - column names of the bed files - this is used for reading the bed files into a dataframe.
        It must contain "#chrom".

    Output - a dataframe with 24 rows (default 0..23 index) and the columns "#chrom" and
    "percentage of samples". "#chrom" holds the strings "1" to "22", "X" and "Y", in that order.
    "percentage of samples" is the percentage of a file's fragments on that chromosome, averaged over the files.

    Raises: Exception if the directory has no files, or if the averaged percentages do not round to 100.

    NOTE(review): every entry of the directory is read here, whereas fetchCoordinatesAndStore only processes
    files whose name contains "donor.frag.bed.gz" or "recipient.frag.bed.gz" - confirm this is intended.
    """
    # os.fsdecode accepts both str and bytes, so callers may pass either kind of path.
    directory = os.fsdecode(inputBedFilesDirectoryPath)

    list_chroms = [str(chrom) for chrom in range(1, 23)] + ["X", "Y"]
    percentage_sum = pd.Series(0.0, index=list_chroms)

    num_files = 0
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        num_files += 1
        cfdna_frag_df = pd.read_csv(filepath, sep = "\t", names = columnNames, skiprows=11)

        #If this string conversion is not done, for some files, #chrom till 14 are not strings. This creates problems while
        #matching to the string chromosomes in list_chroms
        chrom_as_str = cfdna_frag_df["#chrom"].map(str)

        #Transform from count to percentage. The total includes fragments on chromosomes outside list_chroms.
        chrom_percentages = chrom_as_str.value_counts() / len(cfdna_frag_df) * 100

        #Accumulate the sum of percentages over all files. Chromosomes absent from this file contribute 0.
        percentage_sum = percentage_sum + chrom_percentages.reindex(list_chroms, fill_value=0.0)

    if num_files == 0:
        raise Exception(f"No files found in the input bed files directory {directory}")

    #Take the average of the percentages sum over all files.
    all_samples_df = pd.DataFrame({
        "#chrom": list_chroms,
        "percentage of samples": (percentage_sum / num_files).to_numpy(),
    })

    #Check to see if all the percentages in the final all_samples_df add upto 100.
    all_samples_avg_sum = all_samples_df["percentage of samples"].sum()
    if(round(all_samples_avg_sum) != 100):
        raise Exception(f"********* Something is wrong !! The sum of percentages of all files combined({all_samples_avg_sum}) is not adding up to 100. \n After averaging, the all samples df is {all_samples_df.head(25)}")

    return all_samples_df

def getChromosomesCoveringPercentSamples(df, maxPercent):
    """
    Walk through the rows of df in order and collect chromosomes until their summed percentage
    exceeds maxPercent.

    Args:
    df -> dataframe with the columns "#chrom" and "percentage of samples".
    maxPercent -> the percentage that the collected chromosomes together have to exceed.

    Output - a tuple (end_index, chromosomes_list). end_index is the index label of the row at which the
    running sum first became greater than maxPercent; chromosomes_list holds the chromosomes of all rows
    up to and including that row.

    Raises: Exception if the running sum never becomes greater than maxPercent (including an empty df).
    """
    collected_chroms = []
    running_total = 0
    rows = zip(df.index, df["#chrom"], df["percentage of samples"])
    for row_label, chrom, chrom_percent in rows:
        running_total += chrom_percent
        collected_chroms.append(chrom)
        # Stop at the first row that pushes the running sum strictly above the target.
        if running_total > maxPercent:
            return (row_label, collected_chroms)

    raise Exception("Something is wrong, the inidividual percentages do not add upto the percentage requested")

def getChromosomeListsForTrainingValidationTest(inputBedFilesDirectoryPath, columnNames, trainPercent, validationPercent):
    """
    Decide which chromosomes go to the training, validation and test sets.

    The average percentage of samples per chromosome (over all bed files) is computed first. Chromosomes
    are then taken in the order of that dataframe: the training set gets chromosomes until trainPercent is
    exceeded, the validation set gets the following chromosomes until validationPercent is exceeded, and
    the test set gets every chromosome that is left.

    Args:
    inputBedFilesDirectoryPath -> The directory which has the bed files
    columnNames -> Column names inside bed files, this is used to read the bed file into a dataframe
    trainPercent -> Percentage of samples to be allotted to the train set
    validationPercent -> Percentage of samples to be allotted to the validation set.

    Output - a tuple of 3 lists of chromosomes: (training, validation, test).
    """
    chrom_share_df = getChromosomePercentagesAverage(inputBedFilesDirectoryPath, columnNames)

    # Training chromosomes come from the top of the dataframe.
    last_training_row, training_chromosomes = getChromosomesCoveringPercentSamples(chrom_share_df, trainPercent)

    # Validation chromosomes are picked from the rows that follow the training rows.
    rows_after_training = chrom_share_df.iloc[last_training_row + 1:]
    last_validation_row, validation_chromosomes = getChromosomesCoveringPercentSamples(rows_after_training, validationPercent)

    # Whatever comes after the validation rows is the test set.
    rows_after_validation = chrom_share_df.loc[last_validation_row + 1:]
    test_chromosomes = rows_after_validation["#chrom"].values.tolist()

    return (training_chromosomes, validation_chromosomes, test_chromosomes)

def getSampleDistributionPercents():
    """
    Convert the configured test/validation percentages into absolute training and validation percentages.

    The validation percentage in the arguments is relative to the non-test data. For example, with
    testPercent = 20 and validationPercent = 20, the validation set is 20% of the remaining 80%,
    i.e. 16% of all samples, and the training set is the other 64%.

    Output - a tuple (absolute training percentage, absolute validation percentage), both relative to the
    entire sample.
    """
    configured_test = arguments["testPercent"]
    configured_validation = arguments["validationPercent"]

    # Share of all samples that is left once the test set has been taken out.
    remaining_after_test = 100 - configured_test

    return (
        remaining_after_test * (100 - configured_validation)/100,
        remaining_after_test * configured_validation/100,
    )

def getTrainTestValidationData(cfdna_frag_df, train_chroms, validation_chroms, test_chroms, numColumnsToExtract=None):
    """
    Partition a dataframe of cfDNA coordinates into 3 subsets - train, validation and test, based on
    which chromosome each row belongs to.

    Args:
    cfdna_frag_df -> The dataframe that was created from reading the bed file with coordinates. It must
        have a "#chrom" column.
    train_chroms -> The list of chromosomes which should belong to the train set
    validation_chroms -> The list of chromosomes that should belong to the validation set
    test_chroms -> The list of chromosomes that should belong to the test set
    numColumnsToExtract -> Number of leading columns to keep in each subset. When None (the default),
        the value of arguments["numColsToExtract"] is used.

    Outputs:
    A tuple (training_df, validation_df, test_df) with the rows of each subset, restricted to the first
    numColumnsToExtract columns. cfdna_frag_df itself is not modified.
    """
    if numColumnsToExtract is None:
        numColumnsToExtract = arguments["numColsToExtract"]

    # For some files "#chrom" is not read as strings (e.g. 1 instead of "1"), while the chromosome
    # lists hold strings. Compare on the string form so that such rows are not silently dropped.
    chrom_as_str = cfdna_frag_df["#chrom"].map(str)

    def _rowsForChromosomes(chroms):
        # Rows on any of the given chromosomes, limited to the leading columns.
        wanted = [str(chrom) for chrom in chroms]
        return cfdna_frag_df.loc[chrom_as_str.isin(wanted)].iloc[:, 0:numColumnsToExtract]

    training_df = _rowsForChromosomes(train_chroms)
    validation_df = _rowsForChromosomes(validation_chroms)
    test_df = _rowsForChromosomes(test_chroms)

    return (training_df, validation_df, test_df)

In [None]:
def fetchCoordinatesAndStore():
    """
    This function does the following
    1. Determines which chromosomes belong to the training, validation and test sets, based on the files in the
       inputBedFileFolder directory from the config file.
    2. Iterates through the files in that directory whose name contains "donor.frag.bed.gz" or
       "recipient.frag.bed.gz" and reads each of them as a dataframe. Files with "donor" in the name get
       label 0, all others label 1.
    3. Splits each dataframe into training, validation and test dataframes and generates the labels for them.
    4. Writes, per input file, one HDF5 file into coordStoreDirectory with the datasets trainingCoords,
       trainingLabels, validationCoords, validationLabels, testCoords and testLabels.
    """
    #ColumnNames in the bed files for reading as a dataframe.
    columnNames  = ["#chrom", "start", "end", "read_id", "mapq", "cigar1", "cigar2"]

    inputBedFilesDirectory = os.fsencode(arguments["inputBedFileFolder"])
    trainingPercent, validationPercent = getSampleDistributionPercents()
    training_chromosomes, validation_chromosomes, test_chromosomes = getChromosomeListsForTrainingValidationTest(inputBedFilesDirectory, columnNames, trainingPercent, validationPercent)

    inputDirectoryPath = os.fsdecode(inputBedFilesDirectory)
    for filename in os.listdir(inputDirectoryPath):
        if not ("donor.frag.bed.gz" in filename or "recipient.frag.bed.gz" in filename):
            continue

        label = 0 if "donor" in filename else 1

        filepath = os.path.join(inputDirectoryPath, filename)
        # Force "#chrom" to be read as strings. Otherwise, for some files, chromosomes are not read as
        # strings and would not match the string chromosome lists, silently dropping those rows.
        cfdna_frag_df = pd.read_csv(filepath,
                    sep = "\t", names = columnNames, skiprows=11, dtype={"#chrom": str})

        train_data, validation_data, test_data = getTrainTestValidationData(cfdna_frag_df, training_chromosomes, validation_chromosomes, test_chromosomes)

        #Store the data into H5PY files as separate datasets.
        coordStoreFilePath = os.path.join(arguments["coordStoreDirectory"], filename.replace('.frag.bed.gz', '') + ".hdf5")

        # NOTE(review): astype(str).to_numpy() yields an object array of strings - confirm that the
        # installed h5py version stores it without an explicit string dtype.
        with h5py.File(coordStoreFilePath, 'w') as h5_file:
            for prefix, coords in (("training", train_data), ("validation", validation_data), ("test", test_data)):
                h5_file.create_dataset(prefix + "Coords", data=coords.astype(str).to_numpy(), compression = "gzip", compression_opts=9)
                h5_file.create_dataset(prefix + "Labels", data=getLabelsForData(coords, label), compression = "gzip", compression_opts=9)


In [None]:
# Run the whole pipeline only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    # Show the settings that were read from the config module before processing starts.
    print(f"Arguments is {arguments}")
    fetchCoordinatesAndStore()