### E2E Pipeline

In [1]:
import logging
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras import callbacks, models
from tensorflow.keras.layers import (
    Concatenate,
    Dense,
    Discretization,
    Embedding,
    Flatten,
    Input,
    Lambda,
)
from tensorflow.keras.layers.experimental.preprocessing import HashedCrossing

logging.info(tf.version.VERSION)

CSV_COLUMNS = [
    "fare_amount",
    "pickup_datetime",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "passenger_count",
    "key",
]

LABEL_COLUMN = "fare_amount"
DEFAULTS = [[0.0], ["na"], [0.0], [0.0], [0.0], [0.0], [0.0], ["na"]]
UNWANTED_COLS = ["pickup_datetime", "key"]

INPUT_COLS = [
    c for c in CSV_COLUMNS if c != LABEL_COLUMN and c not in UNWANTED_COLS
]

def features_and_labels(row_data):
    for unwanted_col in UNWANTED_COLS:
        row_data.pop(unwanted_col)
    label = row_data.pop(LABEL_COLUMN)
    return row_data, label


def load_dataset(pattern, batch_size, num_repeat):
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        num_epochs=num_repeat,
        shuffle_buffer_size=1000000,
    )
    return dataset.map(features_and_labels)


def create_train_dataset(pattern, batch_size):
    dataset = load_dataset(pattern, batch_size, num_repeat=None)
    return dataset.prefetch(1)


def create_eval_dataset(pattern, batch_size):
    dataset = load_dataset(pattern, batch_size, num_repeat=1)
    return dataset.prefetch(1)


def euclidean(params):
    lon1, lat1, lon2, lat2 = params
    londiff = lon2 - lon1
    latdiff = lat2 - lat1
    return tf.sqrt(londiff * londiff + latdiff * latdiff)


def scale_longitude(lon_column):
    return (lon_column + 78) / 8.0


def scale_latitude(lat_column):
    return (lat_column - 37) / 8.0


def transform(inputs, nbuckets):
    transformed = {}

    transformed["scaled_plon"] = Lambda(scale_longitude, name="scale_plon")(
        inputs["pickup_longitude"]
    )
    transformed["scaled_dlon"] = Lambda(scale_longitude, name="scale_dlon")(
        inputs["dropoff_longitude"]
    )

    transformed["scaled_plat"] = Lambda(scale_latitude, name="scale_plat")(
        inputs["pickup_latitude"]
    )
    transformed["scaled_dlat"] = Lambda(scale_latitude, name="scale_dlat")(
        inputs["dropoff_latitude"]
    )

    transformed["euclidean_distance"] = Lambda(euclidean, name="euclidean")(
        [
            inputs["pickup_longitude"],
            inputs["pickup_latitude"],
            inputs["dropoff_longitude"],
            inputs["dropoff_latitude"],
        ]
    )

    latbuckets = np.linspace(start=0.0, stop=1.0, num=nbuckets).tolist()
    lonbuckets = np.linspace(start=0.0, stop=1.0, num=nbuckets).tolist()

    plon = Discretization(lonbuckets, name="plon_bkt")(
        transformed["scaled_plon"]
    )
    plat = Discretization(latbuckets, name="plat_bkt")(
        transformed["scaled_plat"]
    )
    dlon = Discretization(lonbuckets, name="dlon_bkt")(
        transformed["scaled_dlon"]
    )
    dlat = Discretization(latbuckets, name="dlat_bkt")(
        transformed["scaled_dlat"]
    )

    p_fc = HashedCrossing(num_bins=nbuckets * nbuckets, name="p_fc")(
        (plon, plat)
    )
    d_fc = HashedCrossing(num_bins=nbuckets * nbuckets, name="d_fc")(
        (dlon, dlat)
    )
    pd_fc = HashedCrossing(num_bins=nbuckets**4, name="pd_fc")((p_fc, d_fc))

    transformed["pd_embed"] = Flatten()(
        Embedding(input_dim=nbuckets**4, output_dim=10, name="pd_embed")(
            pd_fc
        )
    )

    transformed["passenger_count"] = inputs["passenger_count"]

    return transformed


def rmse(y_true, y_pred):
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))


def build_dnn_model(nbuckets, nnsize, lr):
    inputs = {
        colname: Input(name=colname, shape=(1,), dtype="float32")
        for colname in INPUT_COLS
    }

    transformed = transform(inputs, nbuckets)
    dnn_inputs = Concatenate()(transformed.values())

    x = dnn_inputs
    for layer, nodes in enumerate(nnsize):
        x = Dense(nodes, activation="relu", name=f"h{layer}")(x)
    output = Dense(1, name="fare")(x)

    model = models.Model(inputs, output)
    lr_optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    model.compile(optimizer=lr_optimizer, loss="mse", metrics=[rmse, "mse"])

    return model


def train_and_evaluate(hparams):
    batch_size = hparams["batch_size"]
    nbuckets = hparams["nbuckets"]
    lr = hparams["lr"]
    nnsize = [int(s) for s in hparams["nnsize"].split()]
    eval_data_path = hparams["eval_data_path"]
    num_evals = hparams["num_evals"]
    num_examples_to_train_on = hparams["num_examples_to_train_on"]
    output_dir = hparams["output_dir"]
    train_data_path = hparams["train_data_path"]

    model_export_path = os.path.join(output_dir, "savedmodel")
    checkpoint_path = os.path.join(output_dir, "checkpoints")
    tensorboard_path = os.path.join(output_dir, "tensorboard")

    if tf.io.gfile.exists(output_dir):
        tf.io.gfile.rmtree(output_dir)

    model = build_dnn_model(nbuckets, nnsize, lr)
    logging.info(model.summary())

    trainds = create_train_dataset(train_data_path, batch_size)
    evalds = create_eval_dataset(eval_data_path, batch_size)

    steps_per_epoch = num_examples_to_train_on // (batch_size * num_evals)

    checkpoint_cb = callbacks.ModelCheckpoint(
        checkpoint_path, save_weights_only=True, verbose=1
    )
    tensorboard_cb = callbacks.TensorBoard(tensorboard_path, histogram_freq=1)

    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=num_evals,
        steps_per_epoch=max(1, steps_per_epoch),
        verbose=2,
        callbacks=[checkpoint_cb, tensorboard_cb],
    )

    model.save(model_export_path)
    return history

2023-05-18 22:59:48.974084: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-18 22:59:49.017026: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-18 22:59:49.017935: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
hparams = dict(
    batch_size=32,
    nnsize="32 8",
    nbuckets=10,
    lr=0.001,
    num_evals=5,
    num_examples_to_train_on=100,
    output_dir='babyweight-trained',
    train_data_path='../datasets/taxi-train.csv',
    eval_data_path='../datasets/taxi-valid.csv'
)
train_and_evaluate(hparams)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 pickup_longitude (InputLayer)  [(None, 1)]          0           []                               
                                                                                                  
 dropoff_longitude (InputLayer)  [(None, 1)]         0           []                               
                                                                                                  
 pickup_latitude (InputLayer)   [(None, 1)]          0           []                               
                                                                                                  
 dropoff_latitude (InputLayer)  [(None, 1)]          0           []                               
                                                                                              

2023-05-18 22:59:51.496642: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
2023-05-18 22:59:51.497627: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
2023-05-18 22:59:53.284057: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{n


Epoch 1: saving model to babyweight-trained/checkpoints
1/1 - 3s - loss: 316.5316 - rmse: 17.7913 - mse: 316.5316 - val_loss: 248.1806 - val_rmse: 15.2128 - val_mse: 248.1806 - 3s/epoch - 3s/step
Epoch 2/5

Epoch 2: saving model to babyweight-trained/checkpoints
1/1 - 0s - loss: 424.0381 - rmse: 20.5922 - mse: 424.0381 - val_loss: 247.9235 - val_rmse: 15.2091 - val_mse: 247.9235 - 245ms/epoch - 245ms/step
Epoch 3/5

Epoch 3: saving model to babyweight-trained/checkpoints
1/1 - 0s - loss: 287.3563 - rmse: 16.9516 - mse: 287.3563 - val_loss: 247.6700 - val_rmse: 15.1065 - val_mse: 247.6700 - 263ms/epoch - 263ms/step
Epoch 4/5

Epoch 4: saving model to babyweight-trained/checkpoints
1/1 - 0s - loss: 115.9618 - rmse: 10.7686 - mse: 115.9618 - val_loss: 247.4210 - val_rmse: 15.1356 - val_mse: 247.4210 - 254ms/epoch - 254ms/step
Epoch 5/5

Epoch 5: saving model to babyweight-trained/checkpoints
1/1 - 0s - loss: 118.4053 - rmse: 10.8814 - mse: 118.4053 - val_loss: 247.1783 - val_rmse: 15.157

2023-05-18 22:59:55.548141: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'dropoff_longitude' with dtype float and shape [?,1]
	 [[{{node dropoff_longitude}}]]
2023-05-18 22:59:55.548292: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'pickup_longitude' with dtype float and shape [?,1]
	 [[{{node pickup_longitude}}]]
2023-05-18 22:59:55.592993: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'dropoff_longitude' with dtype float and sha

INFO:tensorflow:Assets written to: babyweight-trained/savedmodel/assets


INFO:tensorflow:Assets written to: babyweight-trained/savedmodel/assets


<keras.callbacks.History at 0x7f0c6afcfbe0>

In [3]:
%load_ext tensorboard

In [5]:
%tensorboard --host=0.0.0.0 --port=6006 --logdir babyweight-trained/tensorboard