In [34]:
# Retrieve and set PROJECT_ID and REGION environment variables.
PROJECT = !(gcloud config get-value core/project)
PROJECT = PROJECT[0]
REGION = 'asia-northeast3'
# Google Cloud Storage bucket for artifact storage.
BUCKET = 'mlops-test-kay'
BUCKET_URI = 'gs://' + BUCKET

In [35]:
from google.cloud import aiplatform
from google.cloud import storage
from google.cloud import bigquery
import pandas as pd
import numpy as np
import argparse
from sklearn import preprocessing
import pickle

In [36]:
# Prebuilt Vertex AI TensorFlow 2.6 CPU containers: one for training, one for serving.
TRAIN_VERSION = "tf-cpu.2-6"
DEPLOY_VERSION = "tf2-cpu.2-6"

TRAIN_IMAGE = f"us-docker.pkg.dev/vertex-ai/training/{TRAIN_VERSION}:latest"
DEPLOY_IMAGE = f"us-docker.pkg.dev/vertex-ai/prediction/{DEPLOY_VERSION}:latest"

In [37]:
# Set SDK-wide defaults. `staging_bucket` is documented as a `gs://` URI, so pass
# BUCKET_URI rather than the bare bucket name.
aiplatform.init(project=PROJECT, location=REGION, staging_bucket=BUCKET_URI)

In [38]:
# Training and serving both use the same machine shape: n1-standard with 8 vCPUs.
MACHINE_TYPE = "n1-standard"
VCPU = "8"

TRAIN_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
DEPLOY_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"

print("Train machine type", TRAIN_COMPUTE)
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type n1-standard-8
Deploy machine type n1-standard-8


In [39]:
from datetime import datetime

# Timestamp suffix keeps job/model/endpoint display names unique per run.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# Training parameters
MODEL_NAME = 'airport_passenger'
# Derived from MODEL_NAME so the job name cannot drift from it (it was misspelled "airpot").
JOB_NAME = f"{MODEL_NAME}-{TIMESTAMP}"

EPOCHS = 400
BATCH_SIZE = 8
LEARNING_RATE = 0.001
DROPOUT = 0.05

# Passed to task.py as command-line flags; the names must match its argparse options.
CMDARGS = [
    f"--learning_rate={LEARNING_RATE}",
    f"--epochs={EPOCHS}",
    f"--batch_size={BATCH_SIZE}",
    f"--dropout={DROPOUT}",
]

In [40]:
# Ignore the warnings
import warnings
warnings.filterwarnings('always')
warnings.filterwarnings('ignore')

# Data manipulation, visualization and useful functions
import argparse
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn import preprocessing
import pickle

# gcp functions
from google.cloud import bigquery
from google.cloud import storage

# Keras and tensorflow
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential, Model, load_model
from keras.layers import Input, Dense, Activation, Flatten, Dropout
from keras.layers import SimpleRNN, LSTM, GRU
from keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras import layers


# Download data
def download_blob(bucket_name, source_blob_name, destination_file_name, storage_client=None):
    """Download one object from a GCS bucket to a local file.

    Args:
        bucket_name: Name of the GCS bucket, e.g. "your-bucket-name".
        source_blob_name: Object path inside the bucket, e.g. "dir/object.pkl".
        destination_file_name: Local path the object is written to.
        storage_client: Optional client to reuse across calls; a new
            `storage.Client()` is created when omitted.
    """
    if storage_client is None:
        storage_client = storage.Client()

    # `Client.bucket` and `Bucket.blob` only build local references. Unlike
    # `get_bucket`, they make no bucket-metadata request, so the download is the
    # only API call and no bucket-level read permission is required.
    blob = storage_client.bucket(bucket_name).blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

# Fetch the pre-split train/validation arrays from BUCKET and load them as
# float32 NumPy arrays.
DATA_PREFIX = "airport_passenger"
DATA_SUFFIX = "multi_scaled_nm_ns_7_lt21_tr_fm.pkl"


def _load_split(name):
    """Download `<DATA_PREFIX>/<name>_<DATA_SUFFIX>` from BUCKET and return it as float32."""
    filename = f"{name}_{DATA_SUFFIX}"
    download_blob(BUCKET, f"{DATA_PREFIX}/{filename}", filename)
    # NOTE(security): pickle.load can execute arbitrary code; only load files
    # from a bucket you control.
    with open(filename, 'rb') as f:
        return np.asarray(pickle.load(f)).astype(np.float32)


x_train = _load_split("X_train")
x_val = _load_split("X_val")
y_train = _load_split("Y_train")
y_val = _load_split("Y_val")

Downloaded storage object airport_passenger/X_train_multi_scaled_nm_ns_7_lt21_tr.pkl from bucket mlops-test-kay to local file X_train_multi_scaled_nm_ns_7_lt21_tr.pkl.
Downloaded storage object airport_passenger/X_val_multi_scaled_nm_ns_7_lt21_tr.pkl from bucket mlops-test-kay to local file X_val_multi_scaled_nm_ns_7_lt21_tr.pkl.
Downloaded storage object airport_passenger/Y_train_multi_scaled_nm_ns_7_lt21_tr.pkl from bucket mlops-test-kay to local file Y_train_multi_scaled_nm_ns_7_lt21_tr.pkl.
Downloaded storage object airport_passenger/Y_val_multi_scaled_nm_ns_7_lt21_tr.pkl from bucket mlops-test-kay to local file Y_val_multi_scaled_nm_ns_7_lt21_tr.pkl.


In [41]:
# Inspect the validation input shape (x_val is a float32 ndarray at this point).
x_val.shape

(15, 7, 420)

In [42]:
%%writefile task.py


# Ignore the warnings
import warnings
warnings.filterwarnings('always')
warnings.filterwarnings('ignore')

# Data manipulation, visualization and useful functions
import argparse
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn import preprocessing
import pickle

# gcp functions
from google.cloud import bigquery
from google.cloud import storage

# Keras and tensorflow
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential, Model, load_model
from keras.layers import Input, Dense, Activation, Flatten, Dropout
from keras.layers import SimpleRNN, LSTM, GRU
from keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras import layers


# Download data
def download_blob(bucket_name, source_blob_name, destination_file_name, storage_client=None):
    """Download one object from a GCS bucket to a local file.

    Args:
        bucket_name: Name of the GCS bucket, e.g. "your-bucket-name".
        source_blob_name: Object path inside the bucket, e.g. "dir/object.pkl".
        destination_file_name: Local path the object is written to.
        storage_client: Optional client to reuse across calls; a new
            `storage.Client()` is created when omitted.
    """
    if storage_client is None:
        storage_client = storage.Client()

    # `Client.bucket` and `Bucket.blob` only build local references. Unlike
    # `get_bucket`, they make no bucket-metadata request, so the download is the
    # only API call and no bucket-level read permission is required.
    blob = storage_client.bucket(bucket_name).blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

# Location of the pre-split training data. task.py runs standalone on Vertex AI,
# so the bucket is named here rather than taken from the notebook.
DATA_BUCKET = "mlops-test-kay"
DATA_PREFIX = "airport_passenger"
DATA_SUFFIX = "multi_scaled_nm_ns_7_lt21_tr_fm.pkl"


def _load_split(name):
    """Download `<DATA_PREFIX>/<name>_<DATA_SUFFIX>` from DATA_BUCKET and return it as float32."""
    filename = f"{name}_{DATA_SUFFIX}"
    download_blob(DATA_BUCKET, f"{DATA_PREFIX}/{filename}", filename)
    # NOTE(security): pickle.load can execute arbitrary code; only load files
    # from a bucket you control.
    with open(filename, 'rb') as f:
        return np.asarray(pickle.load(f)).astype(np.float32)


x_train = _load_split("X_train")
x_val = _load_split("X_val")
y_train = _load_split("Y_train")
y_val = _load_split("Y_val")



past = 7
n_steps = 7

# Read args
parser = argparse.ArgumentParser()
parser.add_argument('--learning_rate', dest='learning_rate',
                    default=0.001, type=float,
                    help='Learning rate')
parser.add_argument('--epochs', dest='epochs',
                    default=400, type=int,
                    help='Number of epochs.')
parser.add_argument('--batch_size', dest='batch_size',
                    default=8, type=int,
                    help='Batch size.')
parser.add_argument('--dropout', dest='dropout', 
                    default=0.05, type=float,
                    help='Dropput ratio')
args = parser.parse_args()

def build_model(X_train_multi_gru, dropout=None, learning_rate=None, n_outputs=None):
    """Build and compile a stacked-GRU regressor.

    Args:
        X_train_multi_gru: Array of training inputs. Only ``shape[1]`` and
            ``shape[2]`` are read; they become the GRU input shape.
            NOTE(review): presumably (samples, timesteps, features) — confirm.
        dropout: Dropout rate after each GRU layer; defaults to ``args.dropout``.
        learning_rate: Adam learning rate; defaults to ``args.learning_rate``.
        n_outputs: Units in the final Dense layer; defaults to ``n_steps``.

    Returns:
        A ``Sequential`` model compiled with Adam and mean-squared-error loss.
    """
    if dropout is None:
        dropout = args.dropout
    if learning_rate is None:
        learning_rate = args.learning_rate
    if n_outputs is None:
        n_outputs = n_steps

    # Take the input shape from the argument rather than from a global array,
    # so the model always matches the data it is built for.
    input_shape = (X_train_multi_gru.shape[1], X_train_multi_gru.shape[2])

    gru_units = (256, 768, 512, 64)
    model = Sequential()
    for i, units in enumerate(gru_units):
        is_last = i == len(gru_units) - 1
        # Only the first layer declares the input shape.
        layer_kwargs = {'input_shape': input_shape} if i == 0 else {}
        # Intermediate GRUs return full sequences to feed the next GRU; the last
        # one returns only its final output for the Dense head.
        model.add(GRU(units, return_sequences=not is_last,
                      kernel_initializer='he_normal', activation='relu',
                      **layer_kwargs))
        model.add(Dropout(dropout))
    model.add(Dense(n_outputs))
    model.compile(keras.optimizers.Adam(learning_rate=learning_rate), loss='mean_squared_error')
    return model

model = build_model(x_train)
# To stop early, pass e.g. callbacks=[EarlyStopping(monitor='val_loss', mode='min', verbose=1)].
model.fit(
    x_train, y_train,
    batch_size=args.batch_size,
    epochs=args.epochs,
    validation_data=(x_val, y_val),
    shuffle=True,
    verbose=1,
)

# Vertex AI custom training provides the model output location in AIP_MODEL_DIR.
# Outside Vertex it is unset, and saving to None would fail, so fall back to a
# local directory.
model_dir = os.getenv("AIP_MODEL_DIR") or "model"
tf.saved_model.save(model, model_dir)

Overwriting task.py


In [43]:
# Package task.py, run it in the prebuilt TF training container, and register
# the saved model with the prebuilt TF serving container.
# NOTE(review): task.py imports google-cloud-bigquery, pandas, matplotlib and
# scikit-learn. If the training image lacks any of them, pass requirements=[...].
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri=TRAIN_IMAGE,
    model_serving_container_image_uri=DEPLOY_IMAGE,
)

# Derived from MODEL_NAME so the display name cannot be misspelled ("airpot" before).
MODEL_DISPLAY_NAME = f"{MODEL_NAME}-{TIMESTAMP}"

# Start the training. By default `run` blocks until the pipeline finishes; the
# returned model is what the deploy cell below uses.
model = job.run(
    model_display_name=MODEL_DISPLAY_NAME,
    args=CMDARGS,
    replica_count=1,
    machine_type=TRAIN_COMPUTE,
)

Training script copied to:
gs://mlops-test-kay/aiplatform-2022-08-25-06:23:02.198-aiplatform_custom_trainer_script-0.1.tar.gz.
Training Output directory:
gs://mlops-test-kay/aiplatform-custom-training-2022-08-25-06:23:02.258 
View Training:
https://console.cloud.google.com/ai/platform/locations/asia-northeast3/training/4660548315165753344?project=392016637758
View backing custom job:
https://console.cloud.google.com/ai/platform/locations/asia-northeast3/training/4570194847641632768?project=392016637758
CustomTrainingJob projects/392016637758/locations/asia-northeast3/trainingPipelines/4660548315165753344 current state:
PipelineState.PIPELINE_STATE_RUNNING


KeyboardInterrupt: 

## Deploy the model

In [None]:
# Deploy the trained model to a new endpoint on a single, fixed-size replica.
DEPLOYED_NAME = "{}_deployed-{}".format(MODEL_NAME, TIMESTAMP)

endpoint = model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    machine_type=DEPLOY_COMPUTE,
    # min == max, so the replica count stays fixed at one.
    min_replica_count=1,
    max_replica_count=1,
    # Key "0" denotes the model deployed by this call; it receives all traffic.
    traffic_split={"0": 100},
)