In [None]:
import tensorflow as tf
import numpy as np
import csv
import time
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import Sequence
import os
import zipfile

try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

# Large Structured Data

When dealing with large amounts of structured text data we can also take steps to speed things up, though a few key differences narrow our toolkit:
<ul>
<li> Data manipulation (filtering out features, customized data cleanup, etc...) is far easier to do in a tabular format like a dataframe. If there is going to be a lot of that, and the data is really large, we can do the 'manual' prep separately, write the data to a file, and then read it in again as ready-to-use data. </li>
<li> As the data gets really large, most real life scenarios will either use big data approaches like Spark, or store structured data in a DB. That's the 'real' way to deal with large amounts of structured data, so there are not as many easy-to-use tools for this as we find with images. </li>
<li> Further to the two points above, if the dataset is a CSV, we can likely load it into memory in its entirety, as "too many rows to fit in memory" and "this data is stored in a CSV file" tend not to occur together all that often in a situation where there is actual infrastructure. </li>
<li> Really large amounts of text can be broken into multiple smaller files, then we load a file at a time, similar to how we deal with images. This is common with NLP text, much more so than structured data. </li>
<li> There is often an assumption that large structured CSV data arrives already split into training and validation sets. This makes sense, as 
</ul>

On the whole, dealing with large amounts of structured data tends to be less of a problem than dealing with large amounts of unstructured data in a non-big-data environment. Truly huge data goes to big data strategies, or at least a DB, while less huge data can simply fit in memory and be handled the same way as all our other CSV-based data. 
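
The "prep separately, write to a file, read it back" workflow from the list above can be sketched roughly as follows. This is a minimal illustration, not a fixed recipe: the helper name `prep_csv_in_chunks`, the column names, and the cleanup step are all made up for the example, and it assumes the cleanup can be done one chunk at a time.

```python
import os
import tempfile

import pandas as pd


def prep_csv_in_chunks(src_path, dst_path, keep_columns, chunksize=100_000):
    """Clean a large CSV chunk by chunk and write a ready-to-use copy."""
    first = True
    for chunk in pd.read_csv(src_path, chunksize=chunksize):
        # Example cleanup: keep selected columns and drop incomplete rows.
        chunk = chunk[keep_columns].dropna()
        # Write the header once, then append the remaining chunks.
        chunk.to_csv(dst_path, mode="w" if first else "a", header=first, index=False)
        first = False


# Tiny demonstration with made-up data.
tmp_dir = tempfile.mkdtemp()
raw_path = os.path.join(tmp_dir, "raw.csv")
clean_path = os.path.join(tmp_dir, "clean.csv")
pd.DataFrame({
    "id": [1, 2, 3, 4],
    "feature": [0.5, None, 1.5, 2.0],
    "label": [0, 1, 0, 1],
}).to_csv(raw_path, index=False)

prep_csv_in_chunks(raw_path, clean_path, keep_columns=["feature", "label"], chunksize=2)
clean_df = pd.read_csv(clean_path)
print(clean_df)
```

Only one chunk is held in memory at a time, so the raw file can be larger than RAM as long as the cleaned output is what we later load for training.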

## TensorFlow Datasets

TensorFlow Datasets are something that we used when loading image files from disk, as loading all of the data at once can be impossible for larger datasets. These datasets serve the same function as a regular dataframe for model training purposes, but they are designed to load large amounts of data from disk efficiently rather than to allow easy viewing and manipulation of the data. 

TensorFlow datasets let us set several options that control how the data is loaded, such as batching, shuffling, caching, and prefetching, which we can use to make a dataset that is more efficient for our purposes.
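
As a rough sketch of what those loading options look like in practice, the snippet below builds a small pipeline from made-up in-memory arrays. The data and sizes are purely illustrative; the calls themselves (`cache`, `shuffle`, `batch`, `prefetch`) are the standard `tf.data.Dataset` methods.

```python
import numpy as np
import tensorflow as tf

# Made-up data: 10 rows, 3 features, binary label.
features = np.arange(30, dtype=np.float32).reshape(10, 3)
labels = (np.arange(10) % 2).astype(np.float32)

ds = tf.data.Dataset.from_tensor_slices((features, labels))
ds = ds.cache()                          # keep elements in memory after the first pass
ds = ds.shuffle(buffer_size=10, seed=0)  # buffer >= dataset size gives a full shuffle
ds = ds.batch(4)                         # group rows into batches of 4
ds = ds.prefetch(tf.data.AUTOTUNE)       # prepare the next batch while the model trains

batch_shapes = [tuple(x.shape) for x, _ in ds]
print(batch_shapes)
```

The order matters: caching before shuffling means the cached data is reshuffled each epoch, and prefetching last lets the whole pipeline overlap with training.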

## Dataset for Structured CSV

The function below reads CSV data from disk and generates training and validation datasets that we can feed to our model. We also add batching, shuffle the training data, and use prefetch to make the data loading process more efficient.

In [8]:
# Define a function to load the CSV data and create a tf.data.Dataset object
def create_dataset(csv_path, batch_size=32, buffer_size=1024, validation_split=0.2, shuffle=True, start=None):
    # Load the CSV data
    with open(csv_path) as f:
        csv_reader = csv.reader(f)
        header = next(csv_reader)
        feature_names = header[start:-1]
        label_name = header[-1]
        features = []
        labels = []
        for row in csv_reader:
            features.append([float(x) for x in row[start:-1]])
            labels.append(float(row[-1]))
    features = np.array(features)
    labels = np.array(labels)

    # Split the data into training and validation sets
    split_idx = int(len(features) * (1.0 - validation_split))
    train_features, train_labels = features[:split_idx], labels[:split_idx]
    val_features, val_labels = features[split_idx:], labels[split_idx:]

    # Create a tf.data.Dataset object for the training data
    train_ds = tf.data.Dataset.from_tensor_slices((train_features, train_labels))
    train_ds = train_ds.cache()
    if shuffle:
        train_ds = train_ds.shuffle(buffer_size=buffer_size)
    # Prefetch so the next batch is prepared while the current one trains
    train_ds = train_ds.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

    # Create a tf.data.Dataset object for the validation data
    val_ds = tf.data.Dataset.from_tensor_slices((val_features, val_labels))
    val_ds = val_ds.cache()
    val_ds = val_ds.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

    return train_ds, val_ds

BASE_EPOCHS  = 20
VAL_SPLIT = 0.2
DIABETES_CSV_PATH = 'diabetes.csv'
BATCH_SIZE = 2048
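
Note that `create_dataset` above parses the whole file into NumPy arrays before building the dataset, so it only helps when the file fits in memory. A minimal sketch of a streaming alternative is shown here, assuming (as above) that the last column is the label and every other value is numeric. The helper name `iter_csv_batches` and the demo file are hypothetical.

```python
import csv
import os
import tempfile

import numpy as np


def iter_csv_batches(csv_path, batch_size=32, start=None):
    """Yield (features, labels) batches without loading the whole file."""
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        features, labels = [], []
        for row in reader:
            features.append([float(x) for x in row[start:-1]])
            labels.append(float(row[-1]))
            if len(features) == batch_size:
                yield np.array(features), np.array(labels)
                features, labels = [], []
        if features:  # final partial batch
            yield np.array(features), np.array(labels)


# Tiny demonstration file with made-up values.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv")
with open(demo_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["a", "b", "label"])
    for i in range(5):
        writer.writerow([i, i * 2, i % 2])

batches = list(iter_csv_batches(demo_path, batch_size=2))
print([x.shape for x, _ in batches])
```

A generator like this could then be wrapped with `tf.data.Dataset.from_generator` to get the same batching-from-disk behaviour inside a dataset, at the cost of re-parsing the file every epoch unless it is cached.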

In [9]:
if os.path.exists(DIABETES_CSV_PATH):
    # Use the local copy if one is already present
    d_path = DIABETES_CSV_PATH
else:
    url = 'https://jrssbcrsefilesnait.blob.core.windows.net/3950data1/diabetes.csv'
    # The file is a plain CSV, so no archive extraction is needed
    d_path = tf.keras.utils.get_file(origin=url)
print(d_path)

/Users/akeems/.keras/datasets/diabetes.csv


#### Simple and Small Example

We can test the dataset function on a small file. 

<b>Note:</b> with small examples, we won't really see any advantage in terms of speed as we can probably just load the data into memory without concern, no matter what. This starts to matter more when dealing with large files, where the disk access time can actually add up. 

In [10]:
# Load the CSV data and create the tf.data.Dataset objects
train_ds, val_ds = create_dataset(d_path, batch_size=BATCH_SIZE)

# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=["accuracy"])

# Fit the model to the data
start = time.time()
model.fit(train_ds, epochs=BASE_EPOCHS, validation_data=val_ds)
end = time.time()
print("DS Training time: {} seconds".format(end - start))

Epoch 1/20




Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
DS Training time: 2.965869188308716 seconds


In [11]:
# Time dataframe for comparison
df_small = pd.read_csv(d_path)
df_small_y = df_small["Outcome"]
df_small_X = df_small.drop(columns=["Outcome"])
width = df_small_X.shape[1]
# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(width,)),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model

model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=["accuracy"])
# Fit the model to the data
start = time.time()
model.fit(x=df_small_X, y=df_small_y, epochs=BASE_EPOCHS, validation_split=0.2, batch_size=BATCH_SIZE)
end = time.time()
print("DS Training time: {} seconds".format(end - start))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
DS Training time: 2.6833109855651855 seconds


#### Larger Example

We can download a larger file and try it out. We will also use the .cache() method to cache the data in memory, so that it doesn't have to be re-read on every epoch. This CSV file is roughly 150 MB, so it is large enough for loading the entire thing to be noticeable, but small enough to fit in memory. For most CSV data that we might encounter this is probably a good approach, since most systems can handle the memory demands of the CSV file sizes we are likely to see. 
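
Whether a file is "small enough to fit in memory" can be estimated before committing to a full load. The sketch below measures the in-memory size of a sample and scales it up. The sample here is random made-up data and `total_rows` is an assumed figure for illustration only; with a real file the sample would come from something like `pd.read_csv(path, nrows=1000)`.

```python
import numpy as np
import pandas as pd

# Made-up frame standing in for the first 1000 rows of a large CSV.
sample = pd.DataFrame(np.random.rand(1000, 30), columns=[f"f{i}" for i in range(30)])

# Average in-memory footprint of one row (30 float64 columns).
bytes_per_row = sample.memory_usage(index=False, deep=True).sum() / len(sample)

total_rows = 1_000_000  # assumed row count of the full file, for illustration only
estimated_mb = bytes_per_row * total_rows / 1e6
print(f"~{bytes_per_row:.0f} bytes/row, ~{estimated_mb:.0f} MB for {total_rows} rows")
```

The in-memory size is often different from the size on disk, since text digits are replaced by fixed-width numeric types, so this kind of estimate is usually more informative than the file size alone.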

In [12]:
# Download the file (get_file caches it, so repeat runs skip the download)
url = 'https://jrssbcrsefilesnait.blob.core.windows.net/3950data1/creditcard.csv'
# The file is a plain CSV despite the variable name, so no extraction is needed
zip_path = tf.keras.utils.get_file(origin=url)
print(zip_path)

/Users/akeems/.keras/datasets/creditcard.csv


In [13]:
#big_file = "/Users/akeems/.keras/datasets/creditcard.csv"
big_file = zip_path
# Load the CSV data and create the tf.data.Dataset objects
train_ds, val_ds = create_dataset(big_file, start=1, batch_size=BATCH_SIZE)

# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=["accuracy"])

# Fit the model to the data
# time the fit

start = time.time()
model.fit(train_ds, epochs=BASE_EPOCHS, validation_data=val_ds)
end = time.time()
print("Time to fit: ", end - start)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Time to fit:  250.69341111183167


#### Dataframe for Comparison

In [14]:
df_large = pd.read_csv(big_file)
df_large_y = df_large["Class"]
df_large_X = df_large.drop(columns=["Class"])
width = df_large_X.shape[1]
# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(width,)),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(),
                loss=tf.keras.losses.BinaryCrossentropy(),
                metrics=["accuracy"])

# Fit the model to the data
# time the fit
start = time.time()
model.fit(x=df_large_X, y=df_large_y, epochs=BASE_EPOCHS, validation_split=VAL_SPLIT, batch_size=BATCH_SIZE)
end = time.time()
print("Time to fit: ", end - start)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Time to fit:  267.8153579235077


### Dataframe to Dataset

If we have a dataframe we can convert it to a dataset using the from_tensor_slices() method. Manipulating the data in a dataframe is far easier, so we can prep in a df and then convert to a dataset. The function below creates a dataset from a dataframe, along with a few of the other things we commonly want to do in our data prep. 

In [15]:
def get_keras_dataset(df, target="target", val_split=0.2, batch_size=32):
    # Splitting the dataframe into training and validation sets
    train_df, val_df = train_test_split(df, test_size=val_split, random_state=42)

    # Extracting the target variable from the dataframes
    train_y = train_df.pop(target)
    val_y = val_df.pop(target)

    # Converting the target variable to categorical if necessary
    num_classes = len(train_y.unique())
    if num_classes > 2:
        train_y = to_categorical(train_y, num_classes)
        val_y = to_categorical(val_y, num_classes)

    # Creating a tf.data.Dataset for training and validation sets
    train_ds = tf.data.Dataset.from_tensor_slices((train_df.values, train_y))
    val_ds = tf.data.Dataset.from_tensor_slices((val_df.values, val_y))

    train_ds =train_ds.cache()
    val_ds = val_ds.cache()

    # Shuffling and batching the datasets
    train_ds = train_ds.shuffle(len(train_df)).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
    val_ds = val_ds.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

    return train_ds, val_ds

In [16]:
train_ds_df, val_ds_df = get_keras_dataset(df_large, target="Class", val_split=VAL_SPLIT, batch_size=BATCH_SIZE)

# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=["accuracy"])

# Fit the model to the data
# time the fit

start = time.time()
model.fit(train_ds_df, epochs=BASE_EPOCHS, validation_data=val_ds_df)
end = time.time()
print("Time to fit: ", end - start)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Time to fit:  348.6873939037323


### Polars DataFrame

We can also load the data with a Polars DataFrame. Polars is a DataFrame library written in Rust that is often considerably faster and more memory-efficient than Pandas. Polars dataframes aren't promised to be a one-to-one replacement for Pandas, and the syntax differs in places, but the two are similar enough that most Pandas workflows carry over with modest changes.

#### Polars Specifics

Polars offers several performance features, as that is its main selling point. Among them:
<ul>
<li> Low memory parameter - this will try to load the data in a way that uses less memory, but may be slower. </li>
<li> Lazy execution - Polars has options to work lazily, which means that it won't actually do work like loading data until it is needed. </li>
<li> Parallel execution - Polars can use multiple threads to do work, which can speed things up. </li>
</ul>

In [17]:
if IN_COLAB:
  !pip install polars
import polars as pl

In [18]:
# Read file at zip path into a polars dataframe
df_polar = pl.read_csv(zip_path, ignore_errors=True, low_memory=True)
df_polar.head()

Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64
0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,0.090794,-0.5516,-0.617801,-0.99139,-0.311169,1.468177,-0.470401,0.207971,0.025791,0.403993,0.251412,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,-0.166974,1.612727,1.065235,0.489095,-0.143772,0.635558,0.463917,-0.114805,-0.183361,-0.145783,-0.069083,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
1,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,0.207643,0.624501,0.066084,0.717293,-0.165946,2.345865,-2.890083,1.109969,-0.121359,-2.261857,0.52498,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
1,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,-0.054952,-0.226487,0.178228,0.507757,-0.287924,-0.631418,-1.059647,-0.684093,1.965775,-1.232622,-0.208038,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
2,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,0.753074,-0.822843,0.538196,1.345852,-1.11967,0.175121,-0.451449,-0.237033,-0.038195,0.803487,0.408542,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


### Using Polars

Polars doesn't have the same native support in TensorFlow as Pandas does, so we need to convert it to a Pandas dataframe or an array before feeding it into a model. One place Polars may be useful is in splitting a very large CSV into multiple smaller ones that can then be loaded one at a time. Something like the code below could be adapted to load a CSV into a Polars dataframe, do whatever data manipulation is needed, then write it out to several smaller CSV files. The make_csv_dataset function can natively read multiple CSV files. 

<b>Note:</b> If we were actually doing something like this, it is likely easier to do a train-validation-test split as we write the output into different subfolders. Manipulating data is easier in a dataframe than a dataset. 

In [None]:
# Create the output folders (needed both locally and in Colab)
os.makedirs("polar_out/train", exist_ok=True)
os.makedirs("polar_out/val", exist_ok=True)

In [19]:
split = 0.2
val_every = int(round(1 / split))  # every 5th file goes to validation
# Shuffle the rows. This takes a peak of roughly 2x memory.
# Note: older Polars releases name this parameter `frac` instead of `fraction`.
df_polar = df_polar.sample(fraction=1.0, shuffle=True)
for i, frame in enumerate(df_polar.iter_slices(n_rows=BATCH_SIZE)):
    if i % val_every == 0:
        fname = "polar_out/val/data_{}.csv".format(i)
    else:
        fname = "polar_out/train/data_{}.csv".format(i)
    frame.write_csv(fname)

#### Read Folder

We can create datasets from a folder of CSV files. This is useful if we have a large CSV file that we have split into multiple smaller ones. We can use any of the tuning options, like cache and batch size, to control the memory usage. 

In [20]:
split_ds = tf.data.experimental.make_csv_dataset(
            file_pattern = "polar_out/train/*.csv",
            batch_size=64, 
            label_name="Class",  # split the target column off as the label
            num_epochs=BASE_EPOCHS,
            num_parallel_reads=20,
            shuffle_buffer_size=10000)
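
To see what `make_csv_dataset` actually yields, the sketch below writes two small made-up CSV files to a temporary folder and iterates over the resulting dataset once. The file contents and folder are illustrative only; `label_name`, `num_epochs`, and `shuffle` are real parameters of `make_csv_dataset`.

```python
import os
import tempfile

import pandas as pd
import tensorflow as tf

# Write two small made-up CSV files into a temporary folder.
folder = tempfile.mkdtemp()
for i in range(2):
    pd.DataFrame({
        "V1": [0.1 * i, 0.2, 0.3],
        "Amount": [1.0, 2.0, 3.0],
        "Class": [0, 1, 0],
    }).to_csv(os.path.join(folder, f"data_{i}.csv"), index=False)

demo_ds = tf.data.experimental.make_csv_dataset(
    file_pattern=os.path.join(folder, "*.csv"),
    batch_size=4,
    label_name="Class",  # split this column off as the label
    num_epochs=1,        # a single pass over the files
    shuffle=False,
)

total_rows = 0
for batch_features, batch_labels in demo_ds:
    # batch_features is a dict mapping column name -> tensor of values
    total_rows += int(batch_labels.shape[0])
print(sorted(batch_features.keys()), total_rows)
```

Each batch is a `(features, labels)` pair where the features are a dictionary keyed by column name, so for a plain `Dense` model they would still need to be combined into one tensor, for example with a `map` step that stacks the columns.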

## Generators

We can also make a generator for the above files. This one loads each file as one batch, so it fits with the idea above on embedding all the processing into the step that writes the files. 

In [21]:
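class CSVGenerator(Sequence):
    def __init__(self, folder_path, shuffle=True):
        super().__init__()
        self.folder_path = folder_path
        self.shuffle = shuffle
        # Only pick up CSV files, in a stable order
        self.files = sorted(f for f in os.listdir(self.folder_path) if f.endswith(".csv"))
        self.indexes = np.arange(len(self.files))
        self.on_epoch_end()

    def __len__(self):
        # One batch per file
        return len(self.files)

    def __getitem__(self, index):
        # Look the file up through the (possibly shuffled) index array
        file_path = os.path.join(self.folder_path, self.files[self.indexes[index]])
        data = pd.read_csv(file_path)
        X = data.iloc[:, :-1].values
        y = data.iloc[:, -1].values
        return X, y

    def on_epoch_end(self):
        # Reshuffle the file order between epochs
        if self.shuffle:
            np.random.shuffle(self.indexes)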



In [22]:
train_generator = CSVGenerator("polar_out/train")
val_generator = CSVGenerator("polar_out/val")

In [23]:
# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=["accuracy"])

# Fit the model to the data
# time the fit

start = time.time()
model.fit(train_generator, epochs=BASE_EPOCHS, validation_data=val_generator, steps_per_epoch=len(train_generator))
end = time.time()
print("Time to fit: ", end - start)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Time to fit:  65.70329189300537
