### Replace the local data path with a GCS path

In [2]:
%%writefile train_gcs.py


# The datetime module is used to work with dates as date objects.
import datetime
# The OS module in python provides functions for interacting with the operating system.
import os
# The shutil module in Python provides many functions of high-level operations on files and collections of files.
# This module helps in automating process of copying and removal of files and directories.
import shutil

# Here we'll import data processing libraries like Numpy, Pandas and Tensorflow
import numpy as np
import pandas as pd
import tensorflow as tf

# Import pyplot package from matplotlib library
from matplotlib import pyplot as plt
# Import keras package from tensorflow library
from tensorflow import keras

# Import the Sequential model class from tensorflow.keras.models
from tensorflow.keras.models import Sequential
# Import the Dense and DenseFeatures layer classes from tensorflow.keras.layers
from tensorflow.keras.layers import Dense, DenseFeatures
# Import the TensorBoard callback class from tensorflow.keras.callbacks
from tensorflow.keras.callbacks import TensorBoard
# CSV schema handed to the CSV reader in `create_dataset`: one
# (column name, default value) pair per column. Keeping both in a single table
# guarantees that `CSV_COLUMNS` and `DEFAULTS` always line up.
_CSV_SCHEMA = [
    ('fare_amount', 0.0),
    ('pickup_datetime', 'na'),
    ('pickup_longitude', 0.0),
    ('pickup_latitude', 0.0),
    ('dropoff_longitude', 0.0),
    ('dropoff_latitude', 0.0),
    ('passenger_count', 0.0),
    ('key', 'na'),
]
# Column names, in schema order.
CSV_COLUMNS = [name for name, _ in _CSV_SCHEMA]
# Per-column default values, each wrapped in its own single-element list.
DEFAULTS = [[default] for _, default in _CSV_SCHEMA]
# Column the model learns to predict.
LABEL_COLUMN = 'fare_amount'
# Columns that are parsed from the CSV but dropped before training.
UNWANTED_COLS = ['pickup_datetime', 'key']


def features_and_labels(row_data):
    """Split one parsed CSV record into model inputs and the target value.

    Args:
        row_data: Mapping of column name to value, as produced by the CSV
            reader. It is modified in place.

    Returns:
        A ``(features, label)`` tuple. ``features`` is the same ``row_data``
        object with the label column and every column listed in
        ``UNWANTED_COLS`` removed; ``label`` is the value that was stored
        under ``LABEL_COLUMN``.
    """
    # Take the target out first so it can never leak into the inputs.
    label = row_data.pop(LABEL_COLUMN)

    # Discard the columns that are parsed but not used as model inputs.
    for column in UNWANTED_COLS:
        row_data.pop(column)

    return row_data, label


def create_dataset(pattern, batch_size=1, mode='eval'):
    """Build a batched ``tf.data`` pipeline of ``(features, label)`` pairs.

    Args:
        pattern: Path or glob pattern of the CSV file(s) to read.
        batch_size: Number of CSV rows combined into each batch.
        mode: ``'train'`` additionally shuffles and repeats the batched
            dataset; any other value leaves the parsed dataset unchanged.

    Returns:
        A ``tf.data.Dataset`` whose elements are ``(features, label)`` tuples
        as produced by ``features_and_labels``.
    """
    # Parse the CSV files into batches keyed by column name. With its default
    # arguments make_csv_dataset already shuffles rows and repeats forever
    # (see its API documentation), so evaluation callers must bound the
    # dataset themselves, e.g. with `.take(...)`.
    dataset = tf.data.experimental.make_csv_dataset(
        pattern, batch_size, CSV_COLUMNS, DEFAULTS)

    # Dataset.map applies the split to every batch in the pipeline.
    dataset = dataset.map(features_and_labels)

    if mode == 'train':
        # The dataset is already batched at this point, so this shuffles whole
        # batches (buffer of 1000 batches) before repeating indefinitely.
        dataset = dataset.shuffle(buffer_size=1000).repeat()

    # Overlap input preparation with training. The previous `prefetch(1)` was
    # annotated "1=AUTOTUNE", but 1 is a fixed one-element buffer; use the real
    # AUTOTUNE constant so tf.data sizes the buffer dynamically.
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    return dataset
# Names of the CSV columns that are fed to the model as inputs.
INPUT_COLS = [
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
    'passenger_count',
]

# Create one numeric feature column per input, keyed by its column name.
# TODO 1
feature_columns = dict(
    zip(INPUT_COLS, map(tf.feature_column.numeric_column, INPUT_COLS)))
# Build a Keras DNN with the Sequential API, one layer at a time:
# feature-column input layer -> two ReLU hidden layers -> single linear output.
# TODO 2a
model = Sequential()
model.add(DenseFeatures(feature_columns=feature_columns.values()))
model.add(Dense(units=32, activation="relu", name="h1"))
model.add(Dense(units=8, activation="relu", name="h2"))
model.add(Dense(units=1, activation="linear", name="output"))
# TODO 2b
# Create a custom evaluation metric
def rmse(y_true, y_pred):
    """Return the root-mean-squared error between predictions and targets.

    Args:
        y_true: Tensor of target values.
        y_pred: Tensor of predicted values.

    Returns:
        A scalar tensor: the square root of the mean of the element-wise
        squared differences ``(y_pred - y_true) ** 2``.
    """
    squared_error = tf.square(y_pred - y_true)
    mean_squared_error = tf.reduce_mean(squared_error)
    return tf.sqrt(mean_squared_error)


# --- Training schedule -------------------------------------------------------
TRAIN_BATCH_SIZE = 1000
EVAL_BATCH_SIZE = 1000
# Total number of training rows to consume: 10000 rows seen 5 times over
# (the training dataset repeats, so it simply wraps around).
NUM_TRAIN_EXAMPLES = 10000 * 5
# Validation runs at the end of every epoch. To get 50 evaluations instead of
# 5 without consuming more data, the run is split into 50 shorter "epochs" and
# `steps_per_epoch` below is shrunk accordingly.
NUM_EVALS = 50  # how many times to evaluate
NUM_EVAL_EXAMPLES = 10000  # enough to get a reasonable sample
# Directory the TensorBoard callback writes its logs to.
LOGDIR = "./taxi_trained"

# Compile the Keras model: Adam optimizer, MSE loss, RMSE and MSE as metrics.
model.compile(optimizer="adam", loss="mse", metrics=[rmse, "mse"])

# --- Input pipelines ---------------------------------------------------------
trainds = create_dataset(
    pattern='gs://vertex_e2e_taxi_data/taxifare/data/taxi-train-000000000000.csv',
    batch_size=TRAIN_BATCH_SIZE,
    mode='train')

evalds = create_dataset(
    pattern='gs://vertex_e2e_taxi_data/taxifare/data/taxi-valid-000000000000.csv',
    batch_size=EVAL_BATCH_SIZE,
    mode='eval')
# Cap validation at NUM_EVAL_EXAMPLES rows' worth of batches.
evalds = evalds.take(NUM_EVAL_EXAMPLES // EVAL_BATCH_SIZE)

# TODO 3
# Batches per "epoch" so that NUM_EVALS epochs consume NUM_TRAIN_EXAMPLES rows.
steps_per_epoch = NUM_TRAIN_EXAMPLES // (TRAIN_BATCH_SIZE * NUM_EVALS)

# --- Train the sequential model ----------------------------------------------
history = model.fit(
    x=trainds,
    validation_data=evalds,
    epochs=NUM_EVALS,
    steps_per_epoch=steps_per_epoch,
    callbacks=[TensorBoard(LOGDIR)],
)



Writing train_gcs.py


In [3]:
from google.cloud import aiplatform

In [4]:
# Cloud Storage bucket URI; passed as `staging_bucket` when the training job
# is created.
bucket = "gs://vertex_e2e_example"

In [5]:
# Define a custom training job that runs train_gcs.py inside the tf-cpu.2-6
# training container image.
job = aiplatform.CustomTrainingJob(
    display_name="model on vertex - gcs data",  # template value: JOB_NAME
    script_path="train_gcs.py",
    container_uri="europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-6:latest",  # template value: TRAIN_IMAGE
    project='vf-grp-commercial-tst-explore',
    location='europe-west1',
    staging_bucket=bucket,
    # Template arguments intentionally left disabled:
    #   requirements=["google-cloud-bigquery>=2.20.0"],
    #   model_serving_container_image_uri=DEPLOY_IMAGE,
)

In [6]:
# Launch the training job on a single n1-standard-4 replica with no
# accelerators; whatever `job.run` returns is kept in `model`.
model = job.run(
    machine_type='n1-standard-4',  # template value: TRAIN_COMPUTE
    replica_count=1,
    accelerator_count=0,
    # Template arguments intentionally left disabled:
    #   dataset=dataset,
    #   model_display_name=MODEL_DISPLAY_NAME,
    #   bigquery_destination=f"bq://{PROJECT_ID}",
    #   args=CMDARGS,
)

INFO:google.cloud.aiplatform.utils.source_utils:Training script copied to:
gs://vertex_e2e_example/aiplatform-2021-11-01-17:50:23.036-aiplatform_custom_trainer_script-0.1.tar.gz.
INFO:google.cloud.aiplatform.training_jobs:Training Output directory:
gs://vertex_e2e_example/aiplatform-custom-training-2021-11-01-17:50:23.284 
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/europe-west1/training/4039816826681556992?project=387138108602
INFO:google.cloud.aiplatform.training_jobs:View backing custom job:
https://console.cloud.google.com/ai/platform/locations/europe-west1/training/8281996569432031232?project=387138108602
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/387138108602/locations/europe-west1/trainingPipelines/4039816826681556992 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/387138108602/locations/europe-west1/trainingPip

In [7]:
%%writefile --append train_gcs.py

# Export the trained model in SavedModel format to the directory named by the
# AIP_MODEL_DIR environment variable (presumably injected by the Vertex AI
# training service -- confirm). `os.environ[...]` raises KeyError when the
# variable is not set.
tf.saved_model.save(model, os.environ["AIP_MODEL_DIR"])

Appending to train_gcs.py


In [9]:
# Re-create the custom training job so it picks up the current train_gcs.py,
# again using the tf-cpu.2-6 training container image.
job = aiplatform.CustomTrainingJob(
    display_name="model on vertex - gcs data",  # template value: JOB_NAME
    script_path="train_gcs.py",
    container_uri="europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-6:latest",  # template value: TRAIN_IMAGE
    project='vf-grp-commercial-tst-explore',
    location='europe-west1',
    staging_bucket=bucket,
    # Template arguments intentionally left disabled:
    #   requirements=["google-cloud-bigquery>=2.20.0"],
    #   model_serving_container_image_uri=DEPLOY_IMAGE,
)

In [10]:
# Launch the re-created job on a single n1-standard-4 replica with no
# accelerators; whatever `job.run` returns is kept in `model`.
model = job.run(
    machine_type='n1-standard-4',  # template value: TRAIN_COMPUTE
    replica_count=1,
    accelerator_count=0,
    # Template arguments intentionally left disabled:
    #   dataset=dataset,
    #   model_display_name=MODEL_DISPLAY_NAME,
    #   bigquery_destination=f"bq://{PROJECT_ID}",
    #   args=CMDARGS,
)

INFO:google.cloud.aiplatform.utils.source_utils:Training script copied to:
gs://vertex_e2e_example/aiplatform-2021-11-01-18:04:55.134-aiplatform_custom_trainer_script-0.1.tar.gz.
INFO:google.cloud.aiplatform.training_jobs:Training Output directory:
gs://vertex_e2e_example/aiplatform-custom-training-2021-11-01-18:04:55.390 
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/europe-west1/training/5323131614249615360?project=387138108602
INFO:google.cloud.aiplatform.training_jobs:View backing custom job:
https://console.cloud.google.com/ai/platform/locations/europe-west1/training/1621172720551067648?project=387138108602
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/387138108602/locations/europe-west1/trainingPipelines/5323131614249615360 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/387138108602/locations/europe-west1/trainingPip