In [65]:
import pandas as pd
import os
import threading
import time
import pandas as pd
import numpy as np
import tensorflow as tf

In [66]:
def load_data(file_name):
    loaded_data = []
    with open(file_name, 'r') as file:
        for line in file:
            # Split the line by tabs
            parts = line.strip().split('\t')
            if len(parts) == 10 and parts[0][3:]!='X' and parts[0][3:]!='Y':
                # Extract the required data
                #print("**",parts[0][3:],int(parts[0][3:])*1_000_000_000)
                chr_value = int(parts[0][3:])*1_000_000_000
                start_pos = int(parts[1])
                end_pos = int(parts[2])
                feature1 = float(parts[6])
                feature2 = float(parts[7])
                feature3 = float(parts[8])
                feature4 = int(parts[9])

                # Create a data point as a tuple
                data_point = (chr_value, start_pos, end_pos, feature1, feature2, feature3, feature4)
                loaded_data.append(data_point)
    return loaded_data
            
def save_as_tfrecords_multithreaded(path, original_data, columns=["sequence"], group_by_col="Label"):
    """Provided data gets splitted in to groups and processed concurrently.
    The outcome of this is a file per group.

    Args:
      path: Location where files should be stored
      original_data: dataframe which should be converted into files
      columns: a  list of columns which should be stored as sequences (Default value = ["sequence"])
      group_by_col: a column name by which split data into groups (Default value = "Label")
    Returns:

    """
    os.makedirs(path, exist_ok=True)
    threading_start = time.time()
    coord = tf.train.Coordinator()
    threads = []
    data = original_data.groupby(group_by_col)
    for group_id in data.groups:
        if isinstance(group_id, str):
            group_name = group_id.replace(".", "_").replace("-", "_")
        elif isinstance(group_id, int):
            group_name = str(group_id)
        else:
            group_name = "_".join([str(e) for e in group_id])
        filename = os.path.join(path, group_name)
        args = (filename, data.get_group(group_id), columns)
        t = threading.Thread(target=save_as_tfrecords, args=args)
        t.start()
        threads.append(t)
    coord.join(threads)
    print("Completed all threads in {} seconds".format(time.time() - threading_start))

def save_as_tfrecords(filename, data, columns=["sequence"], extension="tfrecords"):
    """Processes a dataframe and stores data into tfrecord file

    Args:
      filename: the absolute path of the tfrecords file where data should be stored
      data: dataframe containing data will be converted into tfrecord
      columns: list of columns that should be stored as varying-length sequences (Default value = ["sequence"])
      extension: file extension
    Returns:

    """
    try:
        filename = "{}.{}".format(filename, extension)
        with tf.io.TFRecordWriter(filename) as writer:
            for index, row in data.iterrows():
                feature = {
                    'label': to_int_feature([row[0]])
                }
                for col_name in columns:
                    value = row[col_name]
                    if isinstance(value, int):
                        feature[col_name] = to_int_feature([value])
                    elif isinstance(value, float):
                        feature[col_name] = to_float_feature([value])
                    elif not isinstance(value, (list,)) and not isinstance (value, int) and ((value.dtype == np.float32) or (value.dtype == np.float64)):
                        feature[col_name] = to_float_feature(value)
                    else:
                        feature[col_name] = to_int_feature(value)
                        feature['length_' + col_name]:  to_int_feature([len(value)])

                example = tf.train.Example(features=tf.train.Features(feature=feature))
                writer.write(example.SerializeToString())

        print("Data was stored in {}".format(filename))
    except Exception as e:
        print("Something went wrong went writting in to tfrecords file")
        print("Error is ", str(e))

def to_int_feature(data):
    """
    Converts int list to tf Feature
    Args:
        data: int list to be stored in tf record

    Returns:
        tf Feature that is used in building tfrecord
    """
    return tf.train.Feature(int64_list=tf.train.Int64List(value=data))

def to_float_feature(data):
    """
    Converts float list to tf Feature
    Args:
        data: float list to be stored in tf record

    Returns:
        tf Feature that is used in building tfrecord
    """
    return tf.train.Feature(float_list=tf.train.FloatList(value=data))



In [69]:
file_names = ["rep1.bed","rep2.bed","rep3.bed","rep4.bed","rep5.bed","rep6.bed"]
df_list = []
for file_name in file_names:
    loaded_data = load_data("/workspaces/Chip_Seq_GAN_Peaks/input/"+file_name)
    df = pd.DataFrame(loaded_data, columns=['chromosome','start','end','feature1','feature2','feature3','feature4'])
    df["replica_id"] = file_name
    df_list.append(df)

for df in df_list:
    save_as_tfrecords_multithreaded("/workspaces/Chip_Seq_GAN_Peaks/data", 
    df,['chromosome', 'start', 'end', 'feature1', 'feature2', 'feature3', 'feature4'],"replica_id")

Data was stored in /workspaces/Chip_Seq_GAN_Peaks/data/rep1_bed.tfrecords
Completed all threads in 6.013890027999878 seconds
Data was stored in /workspaces/Chip_Seq_GAN_Peaks/data/rep2_bed.tfrecords
Completed all threads in 4.0127623081207275 seconds
Data was stored in /workspaces/Chip_Seq_GAN_Peaks/data/rep3_bed.tfrecords
Completed all threads in 4.00920295715332 seconds
Data was stored in /workspaces/Chip_Seq_GAN_Peaks/data/rep4_bed.tfrecords
Completed all threads in 4.008511781692505 seconds
Data was stored in /workspaces/Chip_Seq_GAN_Peaks/data/rep5_bed.tfrecords
Completed all threads in 6.013692855834961 seconds
Data was stored in /workspaces/Chip_Seq_GAN_Peaks/data/rep6_bed.tfrecords
Completed all threads in 5.011901617050171 seconds


In [70]:
df_list[0]

Unnamed: 0,chromosome,start,end,feature1,feature2,feature3,feature4,replica_id
0,1000000000,225474842,225475404,482.85414,-1.0,4.94451,275,rep1.bed
1,6000000000,53171796,53172335,479.69350,-1.0,4.94451,249,rep1.bed
2,18000000000,47749024,47749618,468.12633,-1.0,4.94451,294,rep1.bed
3,19000000000,33777938,33778481,466.57779,-1.0,4.94451,248,rep1.bed
4,10000000000,75235848,75236367,466.44577,-1.0,4.94451,244,rep1.bed
...,...,...,...,...,...,...,...,...
38095,6000000000,108498102,108498572,5.44968,-1.0,0.15573,235,rep1.bed
38096,2000000000,96676628,96677098,5.44936,-1.0,0.15576,235,rep1.bed
38097,3000000000,125585789,125586259,5.44907,-1.0,0.15574,235,rep1.bed
38098,1000000000,14924607,14925077,5.44882,-1.0,0.15568,235,rep1.bed
