In [1]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'valiant-ocean-401219'

In [2]:
REGION = 'us-east1'


In [3]:
from google.cloud import storage
from google.cloud import bigquery
import pandas as pd
from sklearn import datasets

In [4]:
gcs = storage.Client(project = PROJECT_ID)
bq = bigquery.Client(project = PROJECT_ID)

In [5]:
BUCKET = PROJECT_ID

In [7]:
if not gcs.lookup_bucket(BUCKET):
    bucketDef = gcs.bucket(BUCKET)
    bucket = gcs.create_bucket(bucketDef, project=PROJECT_ID, location=REGION)
    print(f'Created Bucket: {gcs.lookup_bucket(BUCKET).name}')
else:
    bucketDef = gcs.bucket(BUCKET)
    print(f'Bucket already exist: {bucketDef.name}')

Bucket already exist: valiant-ocean-401219


In [8]:
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

'21381076265-compute@developer.gserviceaccount.com'

In [9]:
!gcloud projects get-iam-policy $PROJECT_ID --filter="bindings.members:$SERVICE_ACCOUNT" --format='table(bindings.role)' --flatten="bindings[].members"

ROLE
roles/editor
roles/run.admin
roles/storage.objectAdmin


In [10]:
!pip install kfp -U -q

In [11]:
!pip install google-cloud-pipeline-components -U -q


In [12]:
from google.cloud import aiplatform
aiplatform.__version__

'1.33.1'

In [13]:
from google.cloud import bigquery
from google.cloud import storage

In [14]:
#loading data
bq = bigquery.Client(project = PROJECT_ID)
gcs = storage.Client(project = PROJECT_ID)

In [21]:
EXPERIMENT = '01'
SERIES = '01'

BQ_PROJECT = PROJECT_ID
BQ_DATASET = 'weather'
BQ_TABLE = 'weatherData_noNull_prepped'

In [16]:
datasets = list(bq.list_datasets())
for d in datasets:
    print(d.dataset_id)

weather


In [46]:
query = f"""
SELECT *
FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}` TABLESAMPLE SYSTEM (1 PERCENT)
#LIMIT 5
"""
bq.query(query = query).to_dataframe()

Unnamed: 0,Formatted_Date,Summary,Precip_Type,Temperature__C_,Apparent_Temperature__C_,Humidity,Wind_Speed__km_h_,Wind_Bearing__degrees_,Visibility__km_,Loud_Cover,Pressure__millibars_,Daily_Summary,row_ID,splits
0,2012-03-21 14:00:00+00:00,Dry,rain,20.000000,20.000000,0.21,14.4900,320,9.9820,0,1032.20,Clear throughout the day.,4944af1e-4627-44b4-ba42-a4e99ade82bc,TRAIN
1,2012-03-17 16:00:00+00:00,Dry,rain,20.000000,20.000000,0.24,12.8800,190,9.9820,0,1021.10,Partly cloudy in the morning.,b183c577-863c-4025-8436-6c50b50afc18,TEST
2,2012-03-17 10:00:00+00:00,Dry,rain,20.000000,20.000000,0.26,14.4900,200,9.9820,0,1025.20,Partly cloudy in the morning.,2f6f438e-2e75-464c-a1e9-fa827f3f874e,TRAIN
3,2012-08-20 08:00:00+00:00,Dry,rain,30.000000,28.677778,0.29,4.4919,355,9.9820,0,1022.68,Clear throughout the day.,aa31c8ae-7a6d-4e89-b05b-fce6d303e122,TRAIN
4,2007-07-22 11:00:00+00:00,Dry,rain,38.750000,36.622222,0.15,21.8799,230,9.9820,0,1008.74,Partly cloudy starting in the afternoon.,8051d38c-4391-4eee-addb-b3b84ce185f5,TRAIN
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95931,2009-04-28 12:00:00+00:00,Breezy and Partly Cloudy,rain,21.022222,21.022222,0.27,35.7742,140,10.3684,0,1011.27,Breezy starting in the morning continuing unti...,6a5beb8a-d4cc-4a82-a949-696d4a10e272,TRAIN
95932,2016-08-10 17:00:00+00:00,Breezy and Partly Cloudy,rain,19.772222,19.772222,0.64,31.4433,310,9.9820,0,1011.06,Partly cloudy throughout the day.,c215fb20-b164-4e98-bf33-a24ded6f3d91,TRAIN
95933,2015-05-10 13:00:00+00:00,Breezy and Partly Cloudy,rain,23.772222,23.772222,0.31,34.6633,311,16.1000,0,1018.48,Mostly cloudy until night and breezy starting ...,c28e9a77-283d-4828-91ab-411a6c89c03d,TEST
95934,2012-08-26 17:00:00+00:00,Breezy and Partly Cloudy,rain,23.772222,23.772222,0.42,33.0533,319,9.9820,0,1008.17,Breezy starting in the afternoon continuing un...,b30dc9f8-851e-41af-a01e-1a044369ebed,TRAIN


In [18]:
#data review
query = f"""
SELECT Precip_Type
FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}`
"""
df = bq.query(query = query).to_dataframe()

In [19]:
df['Precip_Type'].value_counts()

rain    85224
snow    10712
Name: Precip_Type, dtype: int64

Additional preprocessing

In [42]:
def assignUniqueNumber(df):
  map = {'rain': 0, 'snow': 1}
  labelLowered = df['Precip_Type'].values
  labelNum = []
  for label in labelLowered:
    labelNum.append(map[label])
  df['label'] = labelNum
  return df

In [43]:
df = assignUniqueNumber(df)
y = df.label
print('Shape of label tensor:', y.shape)

Shape of label tensor: (95936,)


In [44]:
df.head()
df.dropna()

Unnamed: 0,Precip_Type,label
0,snow,1
1,snow,1
2,snow,1
3,snow,1
4,snow,1
...,...,...
95931,rain,0
95932,rain,0
95933,rain,0
95934,rain,0


In [45]:
df.head()

Unnamed: 0,Precip_Type,label
0,snow,1
1,snow,1
2,snow,1
3,snow,1
4,snow,1


In [54]:
!pip install pandas-gbq

Collecting pandas-gbq
  Downloading pandas_gbq-0.19.2-py2.py3-none-any.whl (25 kB)
Collecting pydata-google-auth>=1.5.0 (from pandas-gbq)
  Obtaining dependency information for pydata-google-auth>=1.5.0 from https://files.pythonhosted.org/packages/28/6b/3320c9ddbfc572108917e8432a07e8bd1e40054d94b5ad40c755afdc1160/pydata_google_auth-1.8.2-py2.py3-none-any.whl.metadata
  Downloading pydata_google_auth-1.8.2-py2.py3-none-any.whl.metadata (3.2 kB)
Collecting google-api-core<3.0.0dev,>=2.10.2 (from pandas-gbq)
  Obtaining dependency information for google-api-core<3.0.0dev,>=2.10.2 from https://files.pythonhosted.org/packages/4d/ce/4fd62ea66b3508debc795e475336ce915929765870f0ad52328426ba016e/google_api_core-2.12.0-py3-none-any.whl.metadata
  Downloading google_api_core-2.12.0-py3-none-any.whl.metadata (2.7 kB)
Collecting google-auth>=2.13.0 (from pandas-gbq)
  Obtaining dependency information for google-auth>=2.13.0 from https://files.pythonhosted.org/packages/39/7c/2e4fa55a99f83ef9ef229ac5

In [55]:
from sklearn.preprocessing import OneHotEncoder, StandardScaler

In [56]:
from pandas.io import gbq

In [74]:
#data review
query = f"""
SELECT *
FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}`
"""
data = gbq.read_gbq(query, project_id=BQ_PROJECT)

In [75]:
# Drop the rows with missing values
data = data.dropna()

# One-hot encode categorical variables
encoder = OneHotEncoder(sparse=False)
categorical_data = encoder.fit_transform(data[categorical_columns])

# Get feature names
feature_names = encoder.categories_

# Create a DataFrame from the categorical data
categorical_df = pd.DataFrame(categorical_data, columns=feature_names)

# Concatenate the original DataFrame with the new `categorical_df`
data = pd.concat([data, categorical_df], axis=1)

# Normalize continuous variables
scaler = StandardScaler()
continuous_columns = ['Temperature__C_', 'Humidity', 'Apparent_Temperature__C_', 'Wind_Speed__km_h_', 'Wind_Bearing__degrees_', 'Pressure__millibars_']  
data[continuous_columns] = scaler.fit_transform(data[continuous_columns])



In [76]:
data.head()

Unnamed: 0,Formatted_Date,Summary,Precip_Type,Temperature__C_,Apparent_Temperature__C_,Humidity,Wind_Speed__km_h_,Wind_Bearing__degrees_,Visibility__km_,Loud_Cover,Pressure__millibars_,Daily_Summary,row_ID,splits,"(rain,)","(snow,)"
0,2012-03-21 14:00:00+00:00,Dry,rain,0.842059,0.852554,-2.681548,0.532471,1.233706,9.982,0,0.247705,Clear throughout the day.,4944af1e-4627-44b4-ba42-a4e99ade82bc,TRAIN,1.0,0.0
1,2012-03-17 16:00:00+00:00,Dry,rain,0.842059,0.852554,-2.52827,0.299835,0.023106,9.982,0,0.153057,Partly cloudy in the morning.,b183c577-863c-4025-8436-6c50b50afc18,TEST,1.0,0.0
2,2012-03-17 10:00:00+00:00,Dry,rain,0.842059,0.852554,-2.426085,0.532471,0.116229,9.982,0,0.188017,Partly cloudy in the morning.,2f6f438e-2e75-464c-a1e9-fa827f3f874e,TRAIN,1.0,0.0
3,2012-08-20 08:00:00+00:00,Dry,rain,1.886923,1.662218,-2.272808,-0.912197,1.559636,9.982,0,0.166529,Clear throughout the day.,aa31c8ae-7a6d-4e89-b05b-fce6d303e122,TRAIN,1.0,0.0
4,2007-07-22 11:00:00+00:00,Dry,rain,2.801179,2.403459,-2.988104,1.600268,0.395598,9.982,0,0.047665,Partly cloudy starting in the afternoon.,8051d38c-4391-4eee-addb-b3b84ce185f5,TRAIN,1.0,0.0


In [77]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 95936 entries, 0 to 95935
Data columns (total 16 columns):
 #   Column                    Non-Null Count  Dtype              
---  ------                    --------------  -----              
 0   Formatted_Date            95936 non-null  datetime64[ns, UTC]
 1   Summary                   95936 non-null  object             
 2   Precip_Type               95936 non-null  object             
 3   Temperature__C_           95936 non-null  float64            
 4   Apparent_Temperature__C_  95936 non-null  float64            
 5   Humidity                  95936 non-null  float64            
 6   Wind_Speed__km_h_         95936 non-null  float64            
 7   Wind_Bearing__degrees_    95936 non-null  float64            
 8   Visibility__km_           95936 non-null  float64            
 9   Loud_Cover                95936 non-null  Int64              
 10  Pressure__millibars_      95936 non-null  float64            
 11  Daily_Summary  

In [20]:
!pip install tensorflow==2.6.0


Collecting tensorflow==2.6.0
  Downloading tensorflow-2.6.0-cp39-cp39-manylinux2010_x86_64.whl (458.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m458.4/458.4 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting typing-extensions~=3.7.4 (from tensorflow==2.6.0)
  Downloading typing_extensions-3.7.4.3-py3-none-any.whl (22 kB)
Installing collected packages: typing-extensions, tensorflow
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 3.10.0.2
    Uninstalling typing-extensions-3.10.0.2:
      Successfully uninstalled typing-extensions-3.10.0.2
  Attempting uninstall: tensorflow
    Found existing installation: tensorflow 2.6.5
    Uninstalling tensorflow-2.6.5:
      Successfully uninstalled tensorflow-2.6.5
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
apache-beam 2.4

In [24]:
from google.cloud import aiplatform
from datetime import datetime
import pkg_resources
from IPython.display import Markdown as md
from google.cloud import bigquery
from google.cloud import storage
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np
import pandas as pd

In [25]:
aiplatform.init(project = PROJECT_ID, location = REGION)

In [26]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [27]:
URI = f"gs://{BUCKET}/{SERIES}/{EXPERIMENT}"
DIR = f"temp/{EXPERIMENT}"


In [28]:
!rm -rf {DIR}
!mkdir -p {DIR}

In [29]:
FRAMEWORK = 'tf'
TASK = 'classification'
MODEL_TYPE = 'dnn'
EXPERIMENT_NAME = f'experiment-{SERIES}-{EXPERIMENT}-{FRAMEWORK}-{TASK}-{MODEL_TYPE}'
RUN_NAME = f'run-{TIMESTAMP}'

Get Vertex AI Experiments Tensorboard Instance Name
Vertex AI Experiments has managed Tensorboard instances that you can track Tensorboard Experiments (a training run or hyperparameter tuning sweep).

The training job will show up as an experiment for the Tensorboard instance and have the same name as the training job ID.

This code checks to see if a Tensorboard Instance has been created in the project, retrieves it if so, creates it otherwise:

In [32]:
tb = aiplatform.Tensorboard.list(filter=f"labels.series={SERIES}")
if tb:
    tb = tb[0]
else: 
    tb = aiplatform.Tensorboard.create(display_name = SERIES, labels = {'series' : f'{SERIES}'})

In [33]:
tb.resource_name


'projects/21381076265/locations/us-east1/tensorboards/7569565811191316480'

In [34]:
aiplatform.init(experiment = EXPERIMENT_NAME, experiment_tensorboard = tb.resource_name)


In [35]:
SCRIPT_PATH = './code/train.py'

In [36]:
import os
from IPython.display import Markdown as md

In [37]:
if os.path.exists('code'):
    print('The code directory alredy exists')
else:
    print('Creating the code directory')
    os.makedirs('code')

Creating the code directory


In [38]:
%%writefile {SCRIPT_PATH}

# package import
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
import tensorflow as tf
from google.cloud import bigquery
from google.cloud import aiplatform
import argparse
import os
import sys

Writing ./code/train.py


In [39]:
%%writefile -a {SCRIPT_PATH}

# import argument to local variables
parser = argparse.ArgumentParser()
# the passed param, dest: a name for the param, default: if absent fetch this param from the OS, type: type to convert to, help: description of argument
parser.add_argument('--epochs', dest = 'epochs', default = 10, type = int, help = 'Number of Epochs')
parser.add_argument('--batch_size', dest = 'batch_size', default = 32, type = int, help = 'Batch Size')
parser.add_argument('--var_target', dest = 'var_target', type=str)
parser.add_argument('--var_omit', dest = 'var_omit', type=str, nargs='*')
parser.add_argument('--project_id', dest = 'project_id', type=str)
parser.add_argument('--bq_project', dest = 'bq_project', type=str)
parser.add_argument('--bq_dataset', dest = 'bq_dataset', type=str)
parser.add_argument('--bq_table', dest = 'bq_table', type=str)
parser.add_argument('--region', dest = 'region', type=str)
parser.add_argument('--experiment', dest = 'experiment', type=str)
parser.add_argument('--series', dest = 'series', type=str)
parser.add_argument('--experiment_name', dest = 'experiment_name', type=str)
parser.add_argument('--run_name', dest = 'run_name', type=str)
args = parser.parse_args()

Appending to ./code/train.py


In [40]:
%%writefile -a {SCRIPT_PATH}

# clients
bq = bigquery.Client(project = args.project_id)
aiplatform.init(project = args.project_id, location = args.region)

Appending to ./code/train.py


In [41]:
%%writefile -a {SCRIPT_PATH}

# Vertex AI Experiment run setup
if args.run_name in [run.name for run in aiplatform.ExperimentRun.list(experiment = args.experiment_name)]:
    expRun = aiplatform.ExperimentRun(run_name = args.run_name, experiment = args.experiment_name)
else:
    expRun = aiplatform.ExperimentRun.create(run_name = args.run_name, experiment = args.experiment_name)
expRun.log_params({'experiment': args.experiment, 'series': args.series, 'project_id': args.project_id})

Appending to ./code/train.py


In [65]:
%%writefile -a {SCRIPT_PATH}

# get schema from bigquery source
query = f"SELECT * FROM {args.bq_project}.{args.bq_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{args.bq_table}'"
schema = bq.query(query).to_dataframe()

# get number of classes from bigquery source
nclasses = bq.query(query = f'SELECT DISTINCT {args.var_target} FROM {args.bq_project}.{args.bq_dataset}.{args.bq_table} WHERE {args.var_target} is not null').to_dataframe()
nclasses = nclasses.shape[0]
expRun.log_params({'data_source': f'bq://{args.bq_project}.{args.bq_dataset}.{args.bq_table}', 'nclasses': nclasses, 'var_split': 'splits', 'var_target': args.var_target})

# Make a list of columns to omit
OMIT = args.var_omit + ['splits']

# use schema to prepare a list of columns to read from BigQuery
selected_fields = schema[~schema.column_name.isin(OMIT)].column_name.tolist()

# all the columns in this data source are either float64 or int64
output_types = [dtypes.float64 if x=='FLOAT64' else dtypes.int64 for x in schema[~schema.column_name.isin(OMIT)].data_type.tolist()]

Appending to ./code/train.py


In [66]:
%%writefile -a {SCRIPT_PATH}

# remap input data to Tensorflow inputs of features and target
def transTable(row_dict):
    target = row_dict.pop(args.var_target)
    target = tf.one_hot(tf.cast(target, tf.int64), nclasses)
    target = tf.cast(target, tf.float32)
    return(row_dict, target)

# function to setup a bigquery reader with Tensorflow I/O
def bq_reader(split):
    reader = BigQueryClient()

    training = reader.read_session(
        parent = f"projects/{args.project_id}",
        project_id = args.bq_project,
        table_id = args.bq_table,
        dataset_id = args.bq_dataset,
        selected_fields = selected_fields,
        output_types = output_types,
        row_restriction = f"splits='{split}'",
        requested_streams = 3
    )
    
    return training

# setup feed for train, validate and test
train = bq_reader('TRAIN').parallel_read_rows().prefetch(1).map(transTable).shuffle(args.batch_size*10).batch(args.batch_size)
validate = bq_reader('VALIDATE').parallel_read_rows().prefetch(1).map(transTable).batch(args.batch_size)
test = bq_reader('TEST').parallel_read_rows().prefetch(1).map(transTable).batch(args.batch_size)
expRun.log_params({'training.batch_size': args.batch_size, 'training.shuffle': 10*args.batch_size, 'training.prefetch': 1})

Appending to ./code/train.py


In [67]:
%%writefile -a {SCRIPT_PATH}

# Logistic Regression

# model input definitions
feature_columns = {header: tf.feature_column.numeric_column(header) for header in selected_fields if header != args.var_target}
feature_layer_inputs = {header: tf.keras.layers.Input(shape = (1,), name = header) for header in selected_fields if header != args.var_target}

# feature columns to a Dense Feature Layer
feature_layer_outputs = tf.keras.layers.DenseFeatures(feature_columns.values(), name = 'feature_layer')(feature_layer_inputs)

# batch normalization of inputs
normalized = tf.keras.layers.BatchNormalization(name = 'batch_normalization_layer')(feature_layer_outputs)

# logistic - using softmax activation to nclasses
logistic = tf.keras.layers.Dense(nclasses, activation = tf.nn.softmax, name = 'logistic')(normalized)

# the model
model = tf.keras.Model(
    inputs = feature_layer_inputs,
    outputs = logistic,
    name = args.experiment
)

# compile
model.compile(
    optimizer = tf.keras.optimizers.SGD(), #SGD or Adam
    loss = tf.keras.losses.CategoricalCrossentropy(),
    metrics = ['accuracy', tf.keras.metrics.AUC(curve = 'PR', name = 'auprc')]

Appending to ./code/train.py


In [68]:
%%writefile -a {SCRIPT_PATH}

# setup tensorboard logs and train
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'], histogram_freq=1)
history = model.fit(train, epochs = args.epochs, callbacks = [tensorboard_callback], validation_data = validate)
expRun.log_params({'training.epochs': history.params['epochs']})
for e in range(0, history.params['epochs']):
    expRun.log_time_series_metrics(
        {
            'train_loss': history.history['loss'][e],
            'train_accuracy': history.history['accuracy'][e],
            'train_auprc': history.history['auprc'][e],
            'val_loss': history.history['val_loss'][e],
            'val_accuracy': history.history['val_accuracy'][e],
            'val_auprc': history.history['val_auprc'][e]
        }
    )

Appending to ./code/train.py


In [69]:
%%writefile -a {SCRIPT_PATH}

# test evaluations:
loss, accuracy, auprc = model.evaluate(test)
expRun.log_metrics({'test_loss': loss, 'test_accuracy': accuracy, 'test_auprc': auprc})

# val evaluations:
loss, accuracy, auprc = model.evaluate(validate)
expRun.log_metrics({'val_loss': loss, 'val_accuracy': accuracy, 'val_auprc': auprc})

# training evaluations:
loss, accuracy, auprc = model.evaluate(train)
expRun.log_metrics({'train_loss': loss, 'train_accuracy': accuracy, 'train_auprc': auprc})

Appending to ./code/train.py


In [70]:
%%writefile -a {SCRIPT_PATH}

# output the model save files
model.save(os.getenv("AIP_MODEL_DIR"))
expRun.log_params({'model.save': os.getenv("AIP_MODEL_DIR")})
expRun.end_run()

Appending to ./code/train.py


In [78]:
with open(SCRIPT_PATH, 'r') as file:
    d = file.read()
md(f"```python\n\n{d}\n```")

```python


# package import
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
import tensorflow as tf
from google.cloud import bigquery
from google.cloud import aiplatform
import argparse
import os
import sys

# import argument to local variables
parser = argparse.ArgumentParser()
# the passed param, dest: a name for the param, default: if absent fetch this param from the OS, type: type to convert to, help: description of argument
parser.add_argument('--epochs', dest = 'epochs', default = 10, type = int, help = 'Number of Epochs')
parser.add_argument('--batch_size', dest = 'batch_size', default = 32, type = int, help = 'Batch Size')
parser.add_argument('--var_target', dest = 'var_target', type=str)
parser.add_argument('--var_omit', dest = 'var_omit', type=str, nargs='*')
parser.add_argument('--project_id', dest = 'project_id', type=str)
parser.add_argument('--bq_project', dest = 'bq_project', type=str)
parser.add_argument('--bq_dataset', dest = 'bq_dataset', type=str)
parser.add_argument('--bq_table', dest = 'bq_table', type=str)
parser.add_argument('--region', dest = 'region', type=str)
parser.add_argument('--experiment', dest = 'experiment', type=str)
parser.add_argument('--series', dest = 'series', type=str)
parser.add_argument('--experiment_name', dest = 'experiment_name', type=str)
parser.add_argument('--run_name', dest = 'run_name', type=str)
args = parser.parse_args()

# clients
bq = bigquery.Client(project = args.project_id)
aiplatform.init(project = args.project_id, location = args.region)

# Vertex AI Experiment run setup
if args.run_name in [run.name for run in aiplatform.ExperimentRun.list(experiment = args.experiment_name)]:
    expRun = aiplatform.ExperimentRun(run_name = args.run_name, experiment = args.experiment_name)
else:
    expRun = aiplatform.ExperimentRun.create(run_name = args.run_name, experiment = args.experiment_name)
expRun.log_params({'experiment': args.experiment, 'series': args.series, 'project_id': args.project_id})

# get schema from bigquery source
query = f"SELECT * FROM {args.bq_project}.{args.bq_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{args.bq_table}'"
schema = bq.query(query).to_dataframe()

# get number of classes from bigquery source
nclasses = bq.query(query = f'SELECT DISTINCT {args.var_target} FROM {args.bq_project}.{args.bq_dataset}.{args.bq_table} WHERE {args.var_target} is not null').to_dataframe()
nclasses = nclasses.shape[0]
expRun.log_params({'data_source': f'bq://{args.bq_project}.{args.bq_dataset}.{args.bq_table}', 'nclasses': nclasses, 'var_split': 'splits', 'var_target': args.var_target})

# Make a list of columns to omit
OMIT = args.var_omit + ['splits']

# use schema to prepare a list of columns to read from BigQuery
selected_fields = schema[~schema.column_name.isin(OMIT)].column_name.tolist()

# all the columns in this data source are either float64 or int64
output_types = [dtypes.float64 if x=='FLOAT64' else dtypes.int64 for x in schema[~schema.column_name.isin(OMIT)].data_type.tolist()]

# remap input data to Tensorflow inputs of features and target
def transTable(row_dict):
    target = row_dict.pop(args.var_target)
    target = tf.one_hot(tf.cast(target, tf.int64), nclasses)
    target = tf.cast(target, tf.float32)
    return(row_dict, target)

# function to setup a bigquery reader with Tensorflow I/O
def bq_reader(split):
    reader = BigQueryClient()

    training = reader.read_session(
        parent = f"projects/{args.project_id}",
        project_id = args.bq_project,
        table_id = args.bq_table,
        dataset_id = args.bq_dataset,
        selected_fields = selected_fields,
        output_types = output_types,
        row_restriction = f"splits='{split}'",
        requested_streams = 3
    )
    
    return training

# setup feed for train, validate and test
train = bq_reader('TRAIN').parallel_read_rows().prefetch(1).map(transTable).shuffle(args.batch_size*10).batch(args.batch_size)
validate = bq_reader('VALIDATE').parallel_read_rows().prefetch(1).map(transTable).batch(args.batch_size)
test = bq_reader('TEST').parallel_read_rows().prefetch(1).map(transTable).batch(args.batch_size)
expRun.log_params({'training.batch_size': args.batch_size, 'training.shuffle': 10*args.batch_size, 'training.prefetch': 1})

# Logistic Regression

# model input definitions
feature_columns = {header: tf.feature_column.numeric_column(header) for header in selected_fields if header != args.var_target}
feature_layer_inputs = {header: tf.keras.layers.Input(shape = (1,), name = header) for header in selected_fields if header != args.var_target}

# feature columns to a Dense Feature Layer
feature_layer_outputs = tf.keras.layers.DenseFeatures(feature_columns.values(), name = 'feature_layer')(feature_layer_inputs)

# batch normalization of inputs
normalized = tf.keras.layers.BatchNormalization(name = 'batch_normalization_layer')(feature_layer_outputs)

# logistic - using softmax activation to nclasses
logistic = tf.keras.layers.Dense(nclasses, activation = tf.nn.softmax, name = 'logistic')(normalized)

# the model
model = tf.keras.Model(
    inputs = feature_layer_inputs,
    outputs = logistic,
    name = args.experiment
)

# compile
model.compile(
    optimizer = tf.keras.optimizers.SGD(), #SGD or Adam
    loss = tf.keras.losses.CategoricalCrossentropy(),
    metrics = ['accuracy', tf.keras.metrics.AUC(curve = 'PR', name = 'auprc')]

# setup tensorboard logs and train
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'], histogram_freq=1)
history = model.fit(train, epochs = args.epochs, callbacks = [tensorboard_callback], validation_data = validate)
expRun.log_params({'training.epochs': history.params['epochs']})
for e in range(0, history.params['epochs']):
    expRun.log_time_series_metrics(
        {
            'train_loss': history.history['loss'][e],
            'train_accuracy': history.history['accuracy'][e],
            'train_auprc': history.history['auprc'][e],
            'val_loss': history.history['val_loss'][e],
            'val_accuracy': history.history['val_accuracy'][e],
            'val_auprc': history.history['val_auprc'][e]
        }
    )

# test evaluations:
loss, accuracy, auprc = model.evaluate(test)
expRun.log_metrics({'test_loss': loss, 'test_accuracy': accuracy, 'test_auprc': auprc})

# val evaluations:
loss, accuracy, auprc = model.evaluate(validate)
expRun.log_metrics({'val_loss': loss, 'val_accuracy': accuracy, 'val_auprc': auprc})

# training evaluations:
loss, accuracy, auprc = model.evaluate(train)
expRun.log_metrics({'train_loss': loss, 'train_accuracy': accuracy, 'train_auprc': auprc})

# output the model save files
model.save(os.getenv("AIP_MODEL_DIR"))
expRun.log_params({'model.save': os.getenv("AIP_MODEL_DIR")})
expRun.end_run()

```

In [79]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 95936 entries, 0 to 95935
Data columns (total 16 columns):
 #   Column                    Non-Null Count  Dtype              
---  ------                    --------------  -----              
 0   Formatted_Date            95936 non-null  datetime64[ns, UTC]
 1   Summary                   95936 non-null  object             
 2   Precip_Type               95936 non-null  object             
 3   Temperature__C_           95936 non-null  float64            
 4   Apparent_Temperature__C_  95936 non-null  float64            
 5   Humidity                  95936 non-null  float64            
 6   Wind_Speed__km_h_         95936 non-null  float64            
 7   Wind_Bearing__degrees_    95936 non-null  float64            
 8   Visibility__km_           95936 non-null  float64            
 9   Loud_Cover                95936 non-null  Int64              
 10  Pressure__millibars_      95936 non-null  float64            
 11  Daily_Summary  

In [81]:
# Resources
TRAIN_IMAGE = 'us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-7:latest'
DEPLOY_IMAGE ='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest'
TRAIN_COMPUTE = 'n1-standard-4'
DEPLOY_COMPUTE = 'n1-standard-4'

# Model Training
VAR_TARGET = 'Precip_Type'
VAR_OMIT = 'row_ID','Summary', 'Formatted_Date', 'Loud_Cover', 'Daily_Summary', 'splits'
EPOCHS = 1
BATCH_SIZE = 100

In [82]:
CMDARGS = [
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE),
    "--var_target=" + VAR_TARGET,
    "--var_omit=" + VAR_OMIT,
    "--project_id=" + PROJECT_ID,
    "--bq_project=" + BQ_PROJECT,
    "--bq_dataset=" + BQ_DATASET,
    "--bq_table=" + BQ_TABLE,
    "--region=" + REGION,
    "--experiment=" + EXPERIMENT,
    "--series=" + SERIES,
    "--experiment_name=" + EXPERIMENT_NAME,
    "--run_name=" + RUN_NAME
]

TypeError: can only concatenate str (not "tuple") to str