# Recap train at scale

# 1) Explain concepts of incremental fit by chunks

<img src='https://wagon-public-datasets.s3.amazonaws.com/data-science-images/07-ML-OPS/train_by_chunk.png'>

# 2) Explain code solution for `main_local.train()`

```python
def train():
    """
    Train the model incrementally on the full (already preprocessed) dataset.

    The training CSV is read chunk by chunk (CHUNK_SIZE rows at a time) and the
    same model keeps being fitted on each successive chunk. Each chunk is
    validated against a single holdout validation set that is common to all
    chunks. Once the data is exhausted, the model is saved together with its
    training params and the mean of the per-chunk validation metrics.

    If the training file yields no chunk at all, nothing is trained or saved.
    """
    print("\n ⭐️ use case: train")

    # Validation set: load it once, it is shared by every chunk
    data_val_processed_path = os.path.abspath(os.path.join(
        LOCAL_DATA_PATH, "processed", f"val_processed_{VALIDATION_DATASET_SIZE}.csv"))

    data_val_processed = pd.read_csv(
        data_val_processed_path,
        header=None,
        dtype=DTYPES_PROCESSED_OPTIMIZED
        ).to_numpy()

    # Last column is used as the target, all the others as features
    X_val = data_val_processed[:, :-1]
    y_val = data_val_processed[:, -1]

    # Loop invariants: defined once, before the loop, so that they also exist
    # when the loop exits on its very first iteration
    path = os.path.abspath(os.path.join(
        LOCAL_DATA_PATH, "processed", f"train_processed_{DATASET_SIZE}.csv"))
    learning_rate = 0.001
    batch_size = 256

    # Iterate on the full training dataset chunk by chunk
    model = None
    chunk_id = 0
    metrics_val_list = []  # one validation metric per trained chunk

    while True:
        print(f"loading and training on preprocessed chunk n°{chunk_id}...")

        # Load one chunk of preprocessed data: skip the rows already consumed
        # by previous chunks and read at most CHUNK_SIZE rows
        try:
            data_processed_chunk = pd.read_csv(
                path,
                header=None,
                skiprows=(chunk_id * CHUNK_SIZE),
                nrows=CHUNK_SIZE,
                dtype=DTYPES_PROCESSED_OPTIMIZED,
                ).to_numpy()
        except pd.errors.EmptyDataError:
            break  # nothing left to read: end of data

        # Defensive: a chunk without rows carries nothing to train upon
        if len(data_processed_chunk) == 0:
            break

        X_train_chunk = data_processed_chunk[:, :-1]
        y_train_chunk = data_processed_chunk[:, -1]

        # The model is created on the first chunk only; later chunks keep
        # updating the weights of that same model
        if model is None:
            model = initialize_model(X_train_chunk)
            model = compile_model(model, learning_rate)

        model, history = train_model(model,
                                     X_train_chunk,
                                     y_train_chunk,
                                     batch_size,
                                     validation_data=(X_val, y_val))

        # Keep the lowest 'val_mae' recorded in the history returned for this chunk
        metrics_val_chunk = np.min(history.history['val_mae'])
        metrics_val_list.append(metrics_val_chunk)
        print(metrics_val_chunk)

        chunk_id += 1

    # No chunk could be loaded: there is no model and no metric to save
    if model is None:
        print("\n❌ no data to train on, nothing saved")
        return None

    # Save model and training params
    params = dict(
        learning_rate=learning_rate,
        batch_size=batch_size,
        incremental=True,
        chunk_size=CHUNK_SIZE)

    metrics_val_mean_all_chunks = np.mean(np.array(metrics_val_list))
    metrics = dict(mean_val=metrics_val_mean_all_chunks)

    save_model(model, params=params, metrics=metrics)

    print("✅ model trained and saved")
```

# 3) 💻 Tensorflow tricks to partial fit without manual chunks


**📚Resources📚**
- tf CSV guide: https://www.tensorflow.org/guide/data#consuming_csv_data
- tf CSV tuto: https://www.tensorflow.org/tutorials/load_data/csv
- tf Datasets: https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/data.ipynb#scrollTo=x5z5B11UjDTd

**Import packages**

In [61]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential, layers, regularizers
from tensorflow.keras.callbacks import EarlyStopping
import pandas as pd
import numpy as np
import os

In [62]:
import taxifare
# Show where the installed package is loaded from (sanity check of the environment)
taxifare.__file__

'/home/juan-garassino/code/juan-garassino/MLops-taxiFare/taxifare/__init__.py'

In [63]:
#from deepCab.ml_logic.params import LOCAL_DATA_PATH

In [64]:
# NOTE(review): absolute path specific to the author's machine. The commented-out
# variants build the path from LOCAL_DATA_PATH instead; mind the "10K" vs "10k"
# casing of the file name if you switch to them.
data_processed_path_small = "/home/juan-garassino/code/juan-garassino/MLops-taxiFare/taxifare/data/processed/train_processed_10k.csv"#os.path.join(LOCAL_DATA_PATH, "processed","train_processed_10K.csv")
#data_processed_path = os.path.join(LOCAL_DATA_PATH, "processed","train_processed_500K.csv")

In [65]:
data_processed_path_small

'/home/juan-garassino/code/juan-garassino/MLops-taxiFare/taxifare/data/processed/train_processed_10k.csv'

**Import model**

We'll copy-paste it below to make it more explicit.

In [66]:
# Mini-batch size used by the fits and datasets below.
# NOTE(review): train() above uses batch_size = 256, so 265 may be a typo — confirm
# (the cell outputs shown in this notebook were produced with 265).
BATCH_SIZE=265

In [67]:
def build_model():
    """
    Build and compile the dense regression network used throughout this recap.

    Architecture: three ReLU hidden layers (100, 50 and 10 units), each followed
    by batch normalization and 10% dropout, then a single linear output unit.
    The model is compiled with Adam (learning rate 0.001), mean squared error
    as the loss and MAE as the reported metric.

    Returns:
        The compiled Sequential model (not yet built: the input shape is
        inferred on the first call / fit).
    """
    # NOTE(review): only l2 is given; l1 falls back to the library default,
    # which depends on the installed Keras version — confirm this is intended.
    weight_penalty = regularizers.l1_l2(l2=0.005)

    network = Sequential([
        layers.BatchNormalization(),
        layers.Dense(100, activation="relu", kernel_regularizer=weight_penalty),
        layers.BatchNormalization(),
        layers.Dropout(rate=0.1),
        layers.Dense(50, activation="relu", kernel_regularizer=weight_penalty),
        layers.BatchNormalization(),
        layers.Dropout(rate=0.1),
        layers.Dense(10, activation="relu"),
        # momentum drives the moving statistics used in inference mode:
        # 0 keeps only the statistics of the last seen mini-batch ("short
        # memory"), values close to 1 average over a long history of batches.
        layers.BatchNormalization(momentum=0.99),
        layers.Dropout(rate=0.1),
        layers.Dense(1, activation="linear"),
    ])

    network.compile(
        loss="mean_squared_error",
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        metrics=["mae"],
    )

    return network


In [68]:
# Early stopping: watch the validation loss, tolerate 2 epochs without
# improvement, then roll the model back to its best weights.
es = EarlyStopping(
    monitor="val_loss",
    patience=2,
    restore_best_weights=True,
    verbose=0,
)

## 3.1) If the data fits in memory 😇

In [71]:
# Load the small processed dataset entirely in memory. No `header` argument is
# passed, so pandas takes the first line of the file as the column names.
df_small = pd.read_csv(data_processed_path_small)
df_small.head(3)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,56,57,58,59,60,61,62,63,64,65
0,0.142857,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,41.299999
1,0.142857,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
2,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.0


In [72]:
# Column '65' holds the target; the double brackets keep it 2-D, i.e. (n_samples, 1)
target = df_small[['65']].to_numpy()
# Every other column is a feature
features = df_small.drop(columns=['65']).to_numpy()

In [73]:
# features is a 2-D array: rows are samples, columns are features
n_samples, n_features = features.shape
print(features.shape)
print(target.shape)

(9765, 65)
(9765, 1)


### a) passing numpy arrays

In [74]:
model = build_model()

# Plain in-memory fit: 30% of the rows are held out for validation, and the
# early-stopping callback may end training before the 10 epochs are reached
model.fit(
    x=features,
    y=target,
    batch_size=BATCH_SIZE,
    validation_split=0.3,
    epochs=10,
    callbacks=[es],
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10


<keras.callbacks.History at 0x7f7345d9efd0>

### b) passing `datasets`

In [75]:
# Wrap the in-memory arrays in a tf.data pipeline: one (features, target) pair
# per row, then grouped into batches of BATCH_SIZE rows
ds = tf.data.Dataset.from_tensor_slices((features, target)).batch(BATCH_SIZE)

In [76]:
ds.element_spec

(TensorSpec(shape=(None, 65), dtype=tf.float64, name=None),
 TensorSpec(shape=(None, 1), dtype=tf.float64, name=None))

In [77]:
# First batch (not a single sample): the dataset is batched, so one element
# holds BATCH_SIZE rows of features and their targets
f1, t1 = next(iter(ds))
(f1.shape, t1.shape)

2022-08-22 17:35:09.069752: W tensorflow/core/data/root_dataset.cc:247] Optimization loop failed: CANCELLED: Operation was cancelled


(TensorShape([265, 65]), TensorShape([265, 1]))

In [78]:
# Fresh model fitted directly on the dataset: `ds` is already batched, so no
# batch_size is passed to fit
model = build_model()
model.fit(ds, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f7346b11700>

## 3.2) If the data is too large to fit in memory? 🧐

💡 Use `make_csv_dataset` helper

More info in this tutorial: https://www.tensorflow.org/tutorials/load_data/csv

In [79]:
# Stream the CSV from disk batch by batch instead of loading it in memory.
# The file starts with a header line (pd.read_csv above took the column names
# '0'..'65' from it), so header=True is required: with header=False that
# numeric header line would be parsed as a regular training row.
# make_csv_dataset shuffles rows by default, so batches differ between runs.
ds = tf.data.experimental.make_csv_dataset(
    data_processed_path_small, #data_processed_path,
    batch_size=BATCH_SIZE,
    header=True,
    column_names=list(df_small.columns),
    label_name='65',
    num_epochs=1,
    ignore_errors=True)

In [39]:
ds.element_spec

(OrderedDict([('0', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('1', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('2', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('3', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('4', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('5', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('6', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('7', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('8', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('9', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('10', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('11', TensorSpec(shape=(None,), dtype=tf.float32, name=None)),
              ('12', TensorSpec(shape=(None,), dtype=tf.float32, name=None

In [80]:
# Pull one batch: feat1 maps each column name to a tensor of that column's
# values, target1 holds the matching labels (column '65')
feat1, target1 = next(iter(ds))

In [84]:
feat1['0']

<tf.Tensor: shape=(265,), dtype=float32, numpy=
array([0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.14285715,
       0.14285715, 0.14285715, 0.        , 0.        , 0.        ,
       0.        , 0.5714286 , 0.        , 0.        , 0.14285715,
       0.5714286 , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.2857143 , 0.71428573, 0.        , 0.2857143 ,
       0.14285715, 0.        , 0.        , 0.2857143 , 0.        ,
       0.2857143 , 0.2857143 , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.14285715, 0.        , 0.        ,
       0.        , 0.14285715, 0.14285715, 0.5714286 , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.14285715, 0.        , 0.        , 0.        ,
       0.14285715, 0.        , 0.        , 0.        , 0.5714286 ,
       0.     

In [57]:
feat1['0']

<tf.Tensor: shape=(265,), dtype=float32, numpy=
array([0.        , 0.        , 0.14285715, 0.        , 0.        ,
       0.14285715, 0.        , 0.        , 0.        , 0.        ,
       0.5714286 , 0.        , 0.42857143, 0.5714286 , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.14285715, 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.14285715, 0.        ,
       0.71428573, 0.14285715, 0.        , 0.        , 0.14285715,
       0.        , 0.5714286 , 0.14285715, 0.14285715, 0.        ,
       0.        , 0.        , 0.        , 0.14285715, 0.        ,
       0.        , 0.        , 0.        , 0.5714286 , 0.        ,
       0.2857143 , 0.        , 0.5714286 , 0.        , 0.        ,
       0.42857143, 0.        , 0.        , 0.14285715, 0.14285715,
       0.        , 0.        , 0.        , 0.        , 0.14285715,
       0.14285715, 0.2857143 , 0.2857143 , 0.        , 0.        ,
       0.14285

In [58]:
feat1['1']

<tf.Tensor: shape=(265,), dtype=float32, numpy=
array([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 0., 1., 0., 0., 0., 0.,
       0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.,
       0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.,
       1., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
       1., 1., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.,
       0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.,
       1., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0

In [60]:
feat1['64']

<tf.Tensor: shape=(265,), dtype=float32, numpy=
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0

In [42]:
def stack(x, n_features=65):
    """
    Assemble a dict of per-column tensors into one 2-D feature tensor.

    Args:
        x: mapping from column name ('0', '1', ...) to a tensor holding that
            column's values for the batch.
        n_features: number of feature columns to stack, read from keys
            '0' .. str(n_features - 1). Defaults to 65, the number of feature
            columns of the dataset used in this notebook.

    Returns:
        A tensor in which columns are stacked along axis 1, in key order.
    """
    return tf.stack([x[f'{i}'] for i in range(n_features)], axis=1)

stack(feat1).shape

TensorShape([265, 65])

In [43]:
# Turn every (dict of columns, label) batch into a (stacked feature tensor, label) batch
ds = ds.map(lambda feats, label: (stack(feats), label))

In [88]:
# Feature matrix and labels of the single batch pulled earlier (feat1, target1)
x=stack(feat1)

y=target1

In [89]:
model = build_model()
# Fit on the streamed dataset `ds` (mapped to (features, label) batches above),
# not on a single in-memory batch: this is what lets training scale to files
# that do not fit in memory.
model.fit(ds, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f73bbc4bdf0>