# Transforming files from CVS to TFRecords
Here we are transforming our test and training data to TFRecords, so that we can use it for model training. TFRecords' binary format is more efficient to read than the text in the original csv file, so this helps our model be more efficient.

In [8]:
# Necessary imports

import pandas as pd
import numpy as np
import tensorflow as tf

In [3]:
import os, gc
import contextlib2
from object_detection.dataset_tools import tf_record_creation_util

from tqdm import tqdm
tqdm.pandas()

In [4]:
#!pip install object_detection

In [5]:
#!pip install contextlib2

In [7]:
# Loading in the train data

df_train = pd.read_csv('train_data_QUICK_START.csv')
print(df_train.shape)
df_train.head()

(335616, 416)


Unnamed: 0,sequence_id,sequence,experiment_type,dataset_name,reactivity_0001,reactivity_0002,reactivity_0003,reactivity_0004,reactivity_0005,reactivity_0006,...,reactivity_error_0197,reactivity_error_0198,reactivity_error_0199,reactivity_error_0200,reactivity_error_0201,reactivity_error_0202,reactivity_error_0203,reactivity_error_0204,reactivity_error_0205,reactivity_error_0206
0,0000d87cab97,GGGAACGACUCGAGUAGAGUCGAAAAAGAUCGCCACGCACUUACGA...,2A3_MaP,DasLabBigLib_OneMil_RFAM_windows_100mers_2A3,,,,,,,...,,,,,,,,,,
1,0000d87cab97,GGGAACGACUCGAGUAGAGUCGAAAAAGAUCGCCACGCACUUACGA...,DMS_MaP,DasLabBigLib_OneMil_RFAM_windows_100mers_DMS,,,,,,,...,,,,,,,,,,
2,0001ca9d21b0,GGGAACGACUCGAGUAGAGUCGAAAAGGUGGCCGGCAGAAUCGCGA...,2A3_MaP,DasLabBigLib_OneMil_OpenKnot_Round_2_train_2A3,,,,,,,...,,,,,,,,,,
3,0001ca9d21b0,GGGAACGACUCGAGUAGAGUCGAAAAGGUGGCCGGCAGAAUCGCGA...,DMS_MaP,DasLabBigLib_OneMil_OpenKnot_Round_2_train_DMS,,,,,,,...,,,,,,,,,,
4,00021f968267,GGGAACGACUCGAGUAGAGUCGAAAACAUUGUUAAUGCCUAUAUUA...,2A3_MaP,DasLabBigLib_OneMil_Replicates_from_previous_l...,,,,,,,...,,,,,,,,,,


In [9]:
# Creating a dictionary for encoding the RNA sequence letters into numbers 

train = df_train.sequence.to_numpy()
encoding_dict = {'A':1, 'C': 2, 'G': 3, 'U': 4}
encoding_dict

{'A': 1, 'C': 2, 'G': 3, 'U': 4}

In [10]:
# Kernel links we used in creating this notebook:
# https://www.kaggle.com/code/irohith/aslfr-preprocess-dataset/notebook
# https://www.kaggle.com/code/konstantinboyko/convert-original-csv-file-to-tfrecord

In [15]:
# Adding the 206 column names for reactivity and reactivity errors into lists, to later use the columns as a whole

lo_react_cols = df_train.filter(like='reactivity_0').columns
lo_error_cols = df_train.filter(like='reactivity_error_0').columns

lst_react_cols = lo_react_cols.to_list()
lst_error_cols = lo_error_cols.to_list()

In [16]:
print(lst_react_cols)
print(lst_error_cols)

['reactivity_0001', 'reactivity_0002', 'reactivity_0003', 'reactivity_0004', 'reactivity_0005', 'reactivity_0006', 'reactivity_0007', 'reactivity_0008', 'reactivity_0009', 'reactivity_0010', 'reactivity_0011', 'reactivity_0012', 'reactivity_0013', 'reactivity_0014', 'reactivity_0015', 'reactivity_0016', 'reactivity_0017', 'reactivity_0018', 'reactivity_0019', 'reactivity_0020', 'reactivity_0021', 'reactivity_0022', 'reactivity_0023', 'reactivity_0024', 'reactivity_0025', 'reactivity_0026', 'reactivity_0027', 'reactivity_0028', 'reactivity_0029', 'reactivity_0030', 'reactivity_0031', 'reactivity_0032', 'reactivity_0033', 'reactivity_0034', 'reactivity_0035', 'reactivity_0036', 'reactivity_0037', 'reactivity_0038', 'reactivity_0039', 'reactivity_0040', 'reactivity_0041', 'reactivity_0042', 'reactivity_0043', 'reactivity_0044', 'reactivity_0045', 'reactivity_0046', 'reactivity_0047', 'reactivity_0048', 'reactivity_0049', 'reactivity_0050', 'reactivity_0051', 'reactivity_0052', 'reactivity

In [17]:
# Deleting the now redundant columns
del lo_error_cols, lo_react_cols

In [22]:
# Adding all the reactivities and errors into two big columns
# Now there is one big list for each row including the columns of reactivities, the same for reactivity errors
df_train['reactivity'] = df_train[lst_react_cols].values.tolist()
df_train['error'] = df_train[lst_error_cols].values.tolist()

In [23]:
# Example of the 'reactivity' column
print(df_train['reactivity'][0])

[nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, 0.023, -0.039, 0.114, 0.019, 0.171, -0.162, 0.094, 0.0, 0.247, -0.44, 0.718, 0.361, 0.094, 0.38, 0.749, 0.468, 0.571, 2.759, 1.37, -0.23, -0.804, -0.002, 0.245, -0.023, -0.04, 0.076, -0.039, 0.133, -0.287, 0.227, 0.228, -0.153, 0.0, 0.057, 0.0, -0.134, 0.0, 0.0, 0.571, 0.571, 0.352, 0.751, 0.437, 0.19, 0.114, 0.076, 0.19, 0.076, 0.019, 0.247, 1.389, 0.571, 1.427, 1.269, 0.272, 0.037, 1.046, 1.484, 0.856, 0.0, -0.439, -0.096, 0.057, 0.305, -0.096, -0.02, -0.02, 0.375, 0.788, 0.266, -0.038, -0.345, -0.65, -1.223, -0.096, 0.057, 0.306, 0.241, 0.368, -0.478, -0.153, 1.087, 1.213, 0.693, 0.057, 0.457, 1.673, 0.328, 1.139, 0.633, 0.724, 1.201, 0.648, 1.201, 2.153, 0.36, 0.876, 0.298, 0.355, 0.074, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan

In [24]:
# Dropping the previous separate columns for reactivity/errors

df_train.drop(columns=lst_react_cols,inplace=True)
df_train.drop(columns=lst_error_cols,inplace=True)

In [25]:
# Getting indexes for all the columns

all_cols = df_train.columns

idx_seq_id          = all_cols.get_loc('sequence_id')
idx_sequence        = all_cols.get_loc('sequence')
idx_dataset_name    = all_cols.get_loc('dataset_name')
idx_reactivity      = all_cols.get_loc('reactivity')
idx_error           = all_cols.get_loc('error')

In [26]:
# Separating the DMS_Map and 2A3_Map rows into two separate datasets

df_DMS = df_train.loc[df_train.experiment_type=='DMS_MaP'].reset_index(drop=True)
df_2A3 = df_train.loc[df_train.experiment_type=='2A3_MaP'].reset_index(drop=True)

# Delete the original dataset, now we have two separate ones instead !
del df_train

# Garbage collector time ! Free up memory no longer in use
gc.collect()

0

In [29]:
# Functions for converting dataset values into TensorFlow features, to prepare data for storing it in a TFRecord file

def int64_feature(value):
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def int64_list_feature(value):
  return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

def bytes_feature(value):
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def bytes_list_feature(value):
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))

def float_feature(value):
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def float_list_feature(value):
  return tf.train.Feature(float_list=tf.train.FloatList(value=value))

def string_feature(value):
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value.encode()]))

def string_list_feature(value):
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=value.encode()))

In [30]:
# Function for creating a tf.train.Example message for writing it into a TFRecord file later

def serialize_row(row_2A3, row_DMS):
  
    # Check if the row indexes and sequence indexes are matching 
    assert row_2A3[idx_seq_id] == row_DMS[idx_seq_id] and row_2A3[idx_sequence] == row_DMS[idx_sequence]
    
    # Create a dictionary mapping the feature name to the tf.train.Example-compatible data type
    feature = {
        "id": string_feature(row_2A3[idx_seq_id]),
        "seq": float_list_feature([encoding_dict[char] for char in row_2A3[idx_sequence]]), 

        "dataset_name_2A3": string_feature(row_2A3[idx_dataset_name]),
        "reactivity_2A3": float_list_feature(row_2A3[idx_reactivity]),
        "error_2A3": float_list_feature(row_2A3[idx_error]),

        "dataset_name_DMS": string_feature(row_DMS[idx_dataset_name]),
        "reactivity_DMS": float_list_feature(row_DMS[idx_reactivity]),
        "error_DMS": float_list_feature(row_DMS[idx_error])
        }

    # Create a Features message using tf.train.Example
    features=tf.train.Features(feature=feature)

    serialized_row = tf.train.Example(features=features)
    return serialized_row

In [31]:
# Imports for creating TFRecords of TF examples
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function


def open_sharded_output_tfrecords(exit_stack, base_path, num_shards):
    # Opens all TFRecord shards for writing and adds them to an exit stack.

    # exit_stack: A context2.ExitStack used to automatically close the TFRecords opened in this function.
    # base_path: The base folder for all the TFRecord files
    # num_shards: The number of shards
  
    # Creating a proper filename to keep consistency in the folder
    tf_record_filename = ['{}/{:03d}.tfrecord'.format(base_path, idx) for idx in range(num_shards)]

    # options = tf.io.TFRecordOptions(compression_type="GZIP")
    tfrecords = [
        exit_stack.enter_context(tf.io.TFRecordWriter(file_name, options="GZIP"))
            for file_name in tf_record_filename 
    ]
    
    # Returns the list of TFRecords
    return tfrecords

In [37]:
# Setting the number of shards (we used 164) and a folder pathname for TFRecord files
num_shards=164
file_folder_path='tfrfile'

# Creating the TFRecord files
with contextlib2.ExitStack() as tf_record_close_stack:
    output_tfrecords = open_sharded_output_tfrecords(tf_record_close_stack, file_folder_path, num_shards)
    for index, (row_2A3, row_DMS) in enumerate(zip(df_2A3.itertuples(index=False), df_DMS.itertuples(index=False))):
        tf_example = serialize_row(row_2A3, row_DMS)
        shard_index = index % num_shards
        output_tfrecords[shard_index].write(tf_example.SerializeToString())



In [38]:
# Function for checking the insides of a tfrecord file 

def parse_tfrecord(tfrecord_file):
    raw_dataset = tf.data.TFRecordDataset(tfrecord_file, compression_type=None,)

    # Read only the first 5 records, since there are a lot of them there
    for raw_record in raw_dataset.take(5):  
        example = tf.train.Example()
        example.ParseFromString(raw_record.numpy())
        # Print the parsed example 
        print(example)  

# Test the function on 001.tfrecord
parse_tfrecord('tfrfile/001.tfrecord')

DataLossError: {{function_node __wrapped__IteratorGetNext_output_types_1_device_/job:localhost/replica:0/task:0/device:CPU:0}} corrupted record at 0 (Is this even a TFRecord file?) [Op:IteratorGetNext] name: 