# Recap train at scale

In [3]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [4]:
# First, check that you run this notebook with the correct taxifare-env kernel
import taxifare
taxifare.__file__

'/Users/bingobango/code/lewagon/data-recap-train-at-scale/taxifare/__init__.py'

In [10]:
# You should be able to load the following files
import os
from taxifare.params import *
data_processed_path_200k = os.path.join(LOCAL_DATA_PATH, "processed", "processed_2009-01-01_2015-01-01_200k.csv")
data_processed_path_all = os.path.join(LOCAL_DATA_PATH, "processed", "processed_2009-01-01_2015-01-01_all.csv")

<details>
    <summary markdown='span'>If files are missing</summary>

```bash
make reset_local_files_with_csv_solutions
```

</details>

# 1) Explain concepts of incremental fit by chunks

<img src='https://wagon-public-datasets.s3.amazonaws.com/data-science-images/07-ML-OPS/train_by_chunk.png'>
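
To make the idea concrete, here is a minimal sketch in plain Python (no TensorFlow), assuming a toy one-feature linear model. The helper `fit_on_chunk`, the toy data, and the learning-rate values are illustrative assumptions, not part of the `taxifare` package. The point to notice is that the weights `(w, b)` are carried over from one chunk to the next instead of being re-initialized.

```python
def fit_on_chunk(w, b, chunk, lr=0.3, n_steps=100):
    """Run a few gradient-descent steps on one chunk, starting from (w, b)."""
    n = len(chunk)
    for _ in range(n_steps):
        # Gradients of the mean squared error for the model y_hat = w * x + b
        grad_w = sum(2 * (w * x + b - y) * x for x, y in chunk) / n
        grad_b = sum(2 * (w * x + b - y) for x, y in chunk) / n
        w, b = w - lr * grad_w, b - lr * grad_b
    return w, b


# Toy dataset following y = 3x + 2, split into 4 chunks of 25 rows each
data = [(i / 100, 3 * (i / 100) + 2) for i in range(100)]
chunks = [data[k::4] for k in range(4)]

w, b = 0.0, 0.0  # weights are initialized once, before the first chunk
for chunk_id, chunk in enumerate(chunks):
    w, b = fit_on_chunk(w, b, chunk)  # the same weights keep being updated
    print(f"chunk n°{chunk_id}: w={w:.3f}, b={b:.3f}")
```

Only one chunk needs to be in memory at a time, yet the final weights have seen every row.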

# 2) Explain code solution for `main_local.train()`

```python
def train(min_date:str = '2009-01-01', max_date:str = '2015-01-01') -> None:
    """
    Incremental train on the (already preprocessed) dataset locally stored.
    - Loading data chunk-by-chunk
    - Updating the weights of the model for each chunk
    - Saving validation metrics for each chunk, and final model weights on local disk
    """
    # ...
```
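
The "loading data chunk-by-chunk" step can be sketched with the standard library alone. This is not the `taxifare` implementation (the body of `train()` is elided above): `iter_chunks` is a hypothetical helper, and an in-memory `io.StringIO` stands in for the processed CSV file.

```python
import csv
import io
from itertools import islice


def iter_chunks(file_obj, chunk_size):
    """Yield lists of at most `chunk_size` parsed rows, one chunk at a time."""
    reader = csv.reader(file_obj)
    while True:
        chunk = [[float(v) for v in row] for row in islice(reader, chunk_size)]
        if not chunk:
            break  # end of file reached
        yield chunk


# Stand-in for a processed CSV without header: 2 features + 1 target per row
fake_csv = io.StringIO("\n".join(f"{i},{i * 2},{i * 0.5}" for i in range(10)))

chunk_sizes = []
for chunk_id, chunk in enumerate(iter_chunks(fake_csv, chunk_size=4)):
    # In a real training loop, the model would be updated on `chunk` here
    chunk_sizes.append(len(chunk))
    print(f"chunk n°{chunk_id}: {len(chunk)} rows")
```

With pandas, `pd.read_csv(path, chunksize=...)` returns a comparable iterator that yields one DataFrame per chunk.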

Let's launch a chunk-by-chunk training run on 200k rows! (Set `DATA_SIZE='200k'` in `params.py`.)

In [5]:
from taxifare.interface.main_local import train
train()

2023-08-18 21:36:12.082131: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.

Loading TensorFlow...

✅ TensorFlow loaded (0.0s)

⭐️ Use case: train in batches
Training on preprocessed chunk n°0
✅ Model initialized
✅ Model compiled

Training model...
Restoring model weights from the end of the best epoch: 8.
Epoch 10: early stopping
✅ Model trained on 900 rows with min val MAE: 8.53
8.527969360351562
Training on preprocessed chunk n°1
✅ Model compiled

Training model...
Restoring model weights from the end of the best epoch: 1.
Epoch 3: early stopping
✅ Model trained on 900 rows with min val MAE: 9.59
9.591536521911621
Training on preprocessed chunk n°2
✅ Model compiled

Training model...
Restoring model weights from the end of the best epoch: 56.
Epoch 58: early stopping
✅ Model trained on 900 rows with min val MAE: 7.65
7.649648666381836
Training on preprocessed chunk n°3
✅ Model compiled

Training model...
Restoring model weights from the end of the best epoch: 1.
Epoch 3: early stopping
✅ Model trained on 900 rows with min val MAE: 6.14
6.135122299194336
Training on preprocessed chunk n°4
✅ Model compiled

Training model...
Rest

# 3) 💻 TensorFlow tricks for partial fitting without manual chunks


**📚Resources📚**
- tf CSV guide: https://www.tensorflow.org/guide/data#consuming_csv_data
- tf CSV tutorial: https://www.tensorflow.org/tutorials/load_data/csv
- tf Datasets: https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/data.ipynb#scrollTo=x5z5B11UjDTd

**Import packages**

In [11]:
import tensorflow as tf
from tensorflow import keras
from keras import Sequential, layers, regularizers
from keras.callbacks import EarlyStopping
import pandas as pd
import numpy as np


**Import model**

We'll copy-paste it below to make it more explicit.

In [7]:
def build_model():
    
    reg = regularizers.l1_l2(l2=0.005)

    model = Sequential()
    model.add(layers.Input(shape=(65,)))
    model.add(layers.Dense(100, activation="relu", kernel_regularizer=reg))
    model.add(layers.BatchNormalization(momentum=0.9))
    model.add(layers.Dropout(rate=0.1))
    model.add(layers.Dense(50, activation="relu"))
    # momentum=0 keeps only the statistics of the last seen minibatch for inference mode ("short memory").
    # Values close to 1 average the statistics over many past batches ("long memory");
    # exactly 1 would never update the moving statistics.
    model.add(layers.BatchNormalization(momentum=0.9))
    model.add(layers.Dropout(rate=0.1))
    model.add(layers.Dense(1, activation="linear"))
    
    optimizer = keras.optimizers.Adam(learning_rate= 0.001)
    model.compile(loss="mean_squared_error", optimizer=optimizer, metrics=["mae"])
    
    return model
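
The effect of `momentum` in `BatchNormalization` can be checked by hand. The sketch below assumes the documented Keras update rule `moving_mean = moving_mean * momentum + batch_mean * (1 - momentum)`. The helper `update_moving_mean` and the per-batch means are made up for illustration.

```python
def update_moving_mean(moving_mean, batch_mean, momentum):
    """One training-time update of the moving mean used at inference."""
    return moving_mean * momentum + batch_mean * (1 - momentum)


batch_means = [10.0, 20.0, 30.0]  # made-up per-batch feature means

final = {}
for momentum in (0.0, 0.9, 1.0):
    moving_mean = 0.0  # Keras initializes the moving mean to zero by default
    for batch_mean in batch_means:
        moving_mean = update_moving_mean(moving_mean, batch_mean, momentum)
    final[momentum] = moving_mean
    print(f"momentum={momentum}: moving_mean={moving_mean:.2f}")
```

With `momentum=0.0` the moving mean equals the last batch mean, with `0.9` it moves slowly toward recent batches, and with `1.0` it never leaves its initial value.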


In [8]:
es = EarlyStopping(monitor="val_loss",
                   patience=2,
                   restore_best_weights=True,
                   verbose=0)
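
As a rough illustration of what `patience=2` means, the following plain-Python sketch replays the stopping rule on a list of validation losses. It is a simplification (no `min_delta`, no weight handling), `early_stopping_epochs` is a hypothetical helper, and the loss values are invented.

```python
def early_stopping_epochs(val_losses, patience=2):
    """Return (best_epoch, stop_epoch), both 1-indexed; stop_epoch is None if never triggered."""
    best_loss = float("inf")
    best_epoch = None
    wait = 0  # number of consecutive epochs without improvement
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return best_epoch, epoch
    return best_epoch, None


# Invented validation losses: the best value is reached at epoch 8
losses = [9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 2.5, 2.1, 1.0]
result_long = early_stopping_epochs(losses)
result_short = early_stopping_epochs([5.0, 6.0, 7.0])
print(result_long, result_short)
```

Both cases follow the same pattern as the training log earlier in this notebook, where the best epoch is always two epochs before the early-stopping epoch.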

In [13]:
BATCH_SIZE=265

## 3.1) If data fit in memory 😇

In [15]:
df_small = pd.read_csv(data_processed_path_200k, header=None)
df_small

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,56,57,58,59,60,61,62,63,64,65
0,0.000000,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4.200000
1,0.142857,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,11.800000
2,0.000000,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,26.600000
3,0.142857,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,19.799999
4,0.000000,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.400000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
88376,0.000000,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,14.500000
88377,0.142857,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,9.500000
88378,0.142857,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,74.160004
88379,0.000000,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.500000


In [11]:
features = df_small.drop(columns=[65]).to_numpy()
target = df_small[[65]].to_numpy()

In [12]:
print(features.shape)
print(target.shape)
n_samples = features.shape[0]
n_features = features.shape[1]

(88381, 65)
(88381, 1)


### a) passing numpy arrays

In [13]:
model = build_model()

model.fit(x=features, y=target, batch_size=BATCH_SIZE, validation_split=0.3, callbacks=[es], epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x18132ae60>

### b) passing `datasets` iterators

In [14]:
ds = tf.data.Dataset.from_tensor_slices((features, target))
ds = ds.batch(BATCH_SIZE)  # Set batch size

In [16]:
ds.element_spec

(TensorSpec(shape=(None, 65), dtype=tf.float64, name=None),
 TensorSpec(shape=(None, 1), dtype=tf.float64, name=None))
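
The `None` in the first dimension is there because batches do not all have the same size: with the default `drop_remainder=False`, the last batch holds whatever rows are left over. A quick arithmetic check, using the row count printed earlier in this notebook:

```python
import math

n_samples = 88381   # rows in the 200k processed file, as printed earlier
batch_size = 265    # BATCH_SIZE used in this notebook

n_batches = math.ceil(n_samples / batch_size)
last_batch_size = n_samples - (n_batches - 1) * batch_size

print(n_batches, last_batch_size)
```

So one pass over `ds` yields 333 full batches of 265 rows followed by one final batch of 136 rows.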

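In [54]:
# Create the iterator once: calling iter(ds) again would restart from the first batch
batches = iter(ds)

# First batch: features f1, targets t1
f1, t1 = next(batches)
(f1.shape, t1.shape)

(TensorShape([265, 65]), TensorShape([265, 1]))

In [22]:
# Second batch
f2, t2 = next(batches)
(f2.shape, t2.shape)

(TensorShape([265, 65]), TensorShape([265, 1]))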

In [23]:
model = build_model()
model.fit(ds, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x181873a30>

## 3.2) If data is too large to fit in memory 🧐

💡 Use `make_csv_dataset` helper

More info in this tutorial: https://www.tensorflow.org/tutorials/load_data/csv

In [None]:
# The difference is that we don't need to read the whole CSV into memory at once
# make_csv_dataset streams the file from disk and yields batches of BATCH_SIZE rows on the fly
# We can then fit our model on these batches without RAM limitations

In [16]:
ds = tf.data.experimental.make_csv_dataset(
    data_processed_path_all,
    batch_size=BATCH_SIZE,
    header=False,
    column_names=list(df_small.columns),
    label_name=65,
    num_epochs=1,
    ignore_errors=True)


In [6]:
# ds.element_spec

We can now iterate over our dataset `ds` without ever loading the whole CSV into memory!

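In [17]:
# Create the iterator once and reuse it: each call to iter(ds) starts a new pass over the file
batches = iter(ds)
feat1, target1 = next(batches)

In [58]:
feat2, target2 = next(batches)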

In [7]:
# target1

In [8]:
# target2

Let's inspect the first element (feat1, target1)

👇 target1 is simply a 1D tensor that contains BATCH_SIZE prices

In [30]:
print('target1.shape: ', target1.shape)

target1.shape:  (265,)


👇 feat1 is a bit more complex: it's an ordered dict that contains N_FEAT=65 elements, each being a 1D vector of length BATCH_SIZE=265

In [9]:
# target1

In [74]:
type(feat1)

collections.OrderedDict

In [23]:
print(type(feat1))
print(len(feat1))
print(feat1[0].shape)

<class 'collections.OrderedDict'>
65
(265,)


Let's rearrange it into the (BATCH_SIZE, N_FEAT) tensor we are used to manipulating

In [24]:
def stack(x):
    return tf.stack([x[i] for i in range(65)], axis=1)

print(stack(feat1).shape)

(265, 65)
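
What `stack` does can be pictured with plain Python lists. The sketch below uses toy sizes (3 features, a batch of 2 rows) and a hypothetical helper `stack_columns`. It mirrors the column-to-row rearrangement only, not the TensorFlow implementation.

```python
# A batch in "one entry per column" form: each column holds one value per row
# (toy sizes: 3 features, batch of 2 rows)
feat_by_column = {0: [0.1, 0.2], 1: [1.1, 1.2], 2: [2.1, 2.2]}


def stack_columns(columns, n_features):
    """Turn {column: values} into a list of rows, similar to stacking along axis=1."""
    return [list(row) for row in zip(*(columns[i] for i in range(n_features)))]


rows = stack_columns(feat_by_column, n_features=3)
print(rows)
```

Each inner list is now one sample with all of its features, which is the layout the model's `Input(shape=(65,))` layer expects.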


We can now `map` our dataset iterator with this transformation

In [79]:
ds = ds.map(lambda x,y: (stack(x),y))

In [81]:
ds.element_spec

(TensorSpec(shape=(None, 65), dtype=tf.float32, name=None),
 TensorSpec(shape=(None,), dtype=tf.float32, name=None))

And use it directly to train our model on the **full dataset**!

Because batches are streamed from disk, the CSV can be much larger than the available RAM, even TB-sized!

In [42]:
model = build_model()
model.fit(ds, epochs=5)

Epoch 1/5
   1670/Unknown - 26s 15ms/step - loss: 35.8838 - mae: 3.4139

KeyboardInterrupt: 