In [1]:
import pandas as pd
import numpy as np 
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.ensemble import RandomForestRegressor
from src.preprocessing import categorical_transform,numerical_transform,FeatureSelector
import warnings
warnings.filterwarnings('ignore')

pandas_df = pd.read_csv("gs://custom_data/kubeflow_kc_house_data.csv")

In [2]:
pandas_df.columns

Index(['id', 'date', 'price', 'bedrooms', 'bathrooms', 'sqft_living',
       'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'grade',
       'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated', 'zipcode',
       'lat', 'long', 'sqft_living15', 'sqft_lot15'],
      dtype='object')

## Run the pipeline locally

In [3]:
cateforical_features = ['date', 'waterfront', 'view', 'yr_renovated']
numerical_features = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors',
                      'condition', 'grade', 'sqft_basement', 'yr_built']

In [10]:
cat_pipeline = Pipeline(steps=[ ('cat_selector',FeatureSelector(cateforical_features)),
                                 ('cat_transform',categorical_transform()),
                                 ('one_hot_encoding',OneHotEncoder())
                              ])

numerical_pipeline = Pipeline(steps= [ ('num_selector', FeatureSelector(numerical_features)),
#                                       ('num_transformer', numerical_transform()),
                                      ('imputer', SimpleImputer(strategy = 'median')),
                                      ('std_scaler', StandardScaler()) 
                                     ])

In [11]:
full_pipeline = FeatureUnion(transformer_list=[
    ('cat_pipe',cat_pipeline),
    ('num_pipe',numerical_pipeline)
])

pipeline_model = Pipeline(steps=[
    ('full_transformation',full_pipeline),
    ('model',RandomForestRegressor())
])

In [12]:
from sklearn.model_selection import train_test_split

data =pandas_df
X = data.drop('price', axis = 1)
#You can covert the target variable to numpy 
y = data['price'].values

X_train, X_test, y_train, y_test = train_test_split( X, y , test_size = 0.2 , random_state = 42 )

In [13]:
pipeline_model.get_params().keys()

dict_keys(['memory', 'steps', 'verbose', 'full_transformation', 'model', 'full_transformation__n_jobs', 'full_transformation__transformer_list', 'full_transformation__transformer_weights', 'full_transformation__verbose', 'full_transformation__cat_pipe', 'full_transformation__num_pipe', 'full_transformation__cat_pipe__memory', 'full_transformation__cat_pipe__steps', 'full_transformation__cat_pipe__verbose', 'full_transformation__cat_pipe__cat_selector', 'full_transformation__cat_pipe__cat_transform', 'full_transformation__cat_pipe__one_hot_encoding', 'full_transformation__cat_pipe__cat_selector__feature_names', 'full_transformation__cat_pipe__cat_transform__use_date', 'full_transformation__cat_pipe__one_hot_encoding__categories', 'full_transformation__cat_pipe__one_hot_encoding__drop', 'full_transformation__cat_pipe__one_hot_encoding__dtype', 'full_transformation__cat_pipe__one_hot_encoding__handle_unknown', 'full_transformation__cat_pipe__one_hot_encoding__sparse', 'full_transformation

In [14]:
pipeline_model.set_params(model__random_state = 42,
                         model__max_depth=7,
                         model__n_jobs=-1,
                         model__n_estimators = 100)

Pipeline(steps=[('full_transformation',
                 FeatureUnion(transformer_list=[('cat_pipe',
                                                 Pipeline(steps=[('cat_selector',
                                                                  FeatureSelector(feature_names=['date',
                                                                                                 'waterfront',
                                                                                                 'view',
                                                                                                 'yr_renovated'])),
                                                                 ('cat_transform',
                                                                  categorical_transform()),
                                                                 ('one_hot_encoding',
                                                                  OneHotEncoder())])),
                                

In [15]:
pipeline_model.fit(X_train, y_train)

Pipeline(steps=[('full_transformation',
                 FeatureUnion(transformer_list=[('cat_pipe',
                                                 Pipeline(steps=[('cat_selector',
                                                                  FeatureSelector(feature_names=['date',
                                                                                                 'waterfront',
                                                                                                 'view',
                                                                                                 'yr_renovated'])),
                                                                 ('cat_transform',
                                                                  categorical_transform()),
                                                                 ('one_hot_encoding',
                                                                  OneHotEncoder())])),
                                

In [16]:
import warnings
warnings.filterwarnings('ignore')
pipeline_model.predict(X_test)

array([477923.25007546, 612002.71736556, 992598.90406656, ...,
       484834.35747353, 554510.30189209, 507092.73108525])

In [17]:
from sklearn.metrics import mean_squared_error
np.sqrt(mean_squared_error(pipeline_model.predict(X_test),y_test))

222951.62471218855

## Setup

In [18]:
!gsutil ls

gs://artifacts.qwiklabs-gcp-00-a49ce03b66ad.appspot.com/
gs://custom_data/
gs://qwiklabs-gcp-00-a49ce03b66ad-kubeflowpipelines-default/
gs://qwiklabs-gcp-00-a49ce03b66ad_cloudbuild/


In [21]:
REGION = 'us-central1'
ARTIFACT_STORE = 'gs://qwiklabs-gcp-00-a49ce03b66ad-kubeflowpipelines-default/'

PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
DATA_ROOT='{}/data'.format(ARTIFACT_STORE)
JOB_DIR_ROOT='{}/jobs'.format(ARTIFACT_STORE)

TRAINING_FILE_PATH="gs://custom_data/kubeflow_kc_house_data.csv"
VALIDATION_FILE_PATH="gs://custom_data/kubeflow_kc_house_data.csv"

## Prepare the hyperparameter tuning application

In [22]:
import os
TRAINING_APP_FOLDER = 'training_app'
os.makedirs(TRAINING_APP_FOLDER, exist_ok=True)

In [52]:
%%writefile {TRAINING_APP_FOLDER}/train.py

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.ensemble import RandomForestRegressor
from src.preprocessing import categorical_transform,numerical_transform,FeatureSelector
import warnings
warnings.filterwarnings('ignore')
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import os
import subprocess
import sys

import fire
import pickle
import numpy as np
import pandas as pd

import hypertune
def train_evaluate(job_dir,dataset_path,n_estimators,max_depth, hptune):
    # read data 
    pandas_df = pd.read_csv("gs://custom_data/kubeflow_kc_house_data.csv")
    
    # train test split
    data =pandas_df
    X = data.drop('price', axis = 1)
    y = data['price'].values
    X_train, X_test, y_train, y_test = train_test_split(X,y,test_size = 0.2,random_state = 42)
    
    cateforical_features = ['date', 'waterfront', 'view', 'yr_renovated']
    numerical_features = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 
                          'floors','condition', 'grade', 'sqft_basement', 'yr_built']
    
    cat_pipeline = Pipeline(steps=[ ('cat_selector',FeatureSelector(cateforical_features)),
                                 ('cat_transform',categorical_transform()),
                                 ('one_hot_encoding',OneHotEncoder())
                              ])

    numerical_pipeline = Pipeline(steps= [ ('num_selector', FeatureSelector(numerical_features)),
#                                           ('num_transformer', numerical_transform()),
                                          ('imputer', SimpleImputer(strategy = 'median')),
                                          ('std_scaler', StandardScaler()) 
                                         ])
    
    full_pipeline = FeatureUnion(transformer_list=[('cat_pipe',cat_pipeline),
                                                    ('num_pipe',numerical_pipeline)])

    pipeline_model = Pipeline(steps=[('full_transformation',full_pipeline),
                                    ('model',RandomForestRegressor())
                                    ])
    pipeline_model.set_params(model__random_state = 42,
                         model__max_depth=max_depth,
                         model__n_jobs=-1,
                         model__n_estimators = n_estimators)
    
    pipeline_model.fit(X_train, y_train)
    rmse = np.sqrt(mean_squared_error(pipeline_model.predict(X_test),y_test))
    
    if hptune:
        hpt = hypertune.HyperTune()
        hpt.report_hyperparameter_tuning_metric(hyperparameter_metric_tag='rmse',
                                                  metric_value=rmse)
    if not hptune:
        model_filename = 'model.pkl'
        with open(model_filename, 'wb') as model_file:
            pickle.dump(pipeline_model, model_file)
        gcs_model_path = "{}/{}".format(job_dir, model_filename)
        subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)
        print("Saved model in: {}".format(gcs_model_path))

if __name__ == "__main__":
    fire.Fire(train_evaluate)

Overwriting training_app/train.py


## package script in docker

In [53]:
%%writefile {TRAINING_APP_FOLDER}/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire cloudml-hypertune scikit-learn==0.20.4 pandas==0.24.2
WORKDIR /app
COPY train.py .
COPY src/ src/ 
ENTRYPOINT ["python", "train.py"]

Overwriting training_app/Dockerfile


## build docker image using cloud build

In [54]:
IMAGE_NAME='trainer_image'
IMAGE_TAG='latest'
IMAGE_URI='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, IMAGE_TAG)

In [55]:
! gcloud builds submit --tag $IMAGE_URI $TRAINING_APP_FOLDER

Creating temporary tarball archive of 5 file(s) totalling 6.4 KiB before compression.
Uploading tarball of [training_app] to [gs://qwiklabs-gcp-00-a49ce03b66ad_cloudbuild/source/1613189903.225468-6c63c95b9a864bbab4668472149da655.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-gcp-00-a49ce03b66ad/locations/global/builds/63931f38-c4ed-4f2a-8da0-fb484c7474db].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/63931f38-c4ed-4f2a-8da0-fb484c7474db?project=576958157824].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "63931f38-c4ed-4f2a-8da0-fb484c7474db"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-gcp-00-a49ce03b66ad_cloudbuild/source/1613189903.225468-6c63c95b9a864bbab4668472149da655.tgz#1613189903601852
Copying gs://qwiklabs-gcp-00-a49ce03b66ad_cloudbuild/source/1613189903.225468-6c63c95b9a864bbab4668472149da655.tgz#1613189903601852...
/ [1 files][  2.5 KiB/  2.5 KiB]                    

## Hyperparameter tuning on AI platforms

In [56]:
%%writefile {TRAINING_APP_FOLDER}/hptuning_config.yaml

trainingInput:
  hyperparameters:
    goal: MAXIMIZE
    maxTrials: 4
    maxParallelTrials: 4
    hyperparameterMetricTag: rmse
    enableTrialEarlyStopping: TRUE 
    params:
    - parameterName: n_estimators
      type: DISCRETE
      discreteValues: [
          200,
          500
          ]
    - parameterName: max_depth
      type: DISCRETE
      discreteValues: [
          7,
          9
          ]

Overwriting training_app/hptuning_config.yaml


In [57]:
import time
JOB_NAME = "JOB_{}".format(time.strftime("%Y%m%d_%H%M%S"))
JOB_DIR = "{}/{}".format(JOB_DIR_ROOT, JOB_NAME)
SCALE_TIER = "BASIC"

In [58]:
! gcloud ai-platform jobs submit training $JOB_NAME \
--region=$REGION \
--job-dir=$JOB_DIR \
--master-image-uri=$IMAGE_URI \
--scale-tier=$SCALE_TIER \
--config $TRAINING_APP_FOLDER/hptuning_config.yaml \
-- \
--dataset_path=$TRAINING_FILE_PATH \
--hptune

Job [JOB_20210213_042105] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe JOB_20210213_042105

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs JOB_20210213_042105
jobId: JOB_20210213_042105
state: QUEUED


In [59]:
!gcloud ai-platform jobs describe $JOB_NAME

createTime: '2021-02-13T04:21:07Z'
etag: xoznWDcLzAU=
jobId: JOB_20210213_042105
state: PREPARING
trainingInput:
  args:
  - --dataset_path=gs://custom_data/kubeflow_kc_house_data.csv
  - --hptune
  hyperparameters:
    enableTrialEarlyStopping: true
    goal: MAXIMIZE
    hyperparameterMetricTag: rmse
    maxParallelTrials: 4
    maxTrials: 4
    params:
    - discreteValues:
      - 200.0
      - 500.0
      parameterName: n_estimators
      type: DISCRETE
    - discreteValues:
      - 7.0
      - 9.0
      parameterName: max_depth
      type: DISCRETE
  jobDir: gs://qwiklabs-gcp-00-a49ce03b66ad-kubeflowpipelines-default//jobs/JOB_20210213_042105
  masterConfig:
    imageUri: gcr.io/qwiklabs-gcp-00-a49ce03b66ad/trainer_image:latest
  region: us-central1
trainingOutput:
  isHyperparameterTuningJob: true

View job in the Cloud Console at:
https://console.cloud.google.com/mlengine/jobs/JOB_20210213_042105?project=qwiklabs-gcp-00-a49ce03b66ad

View logs at:
https://console.cloud.google.c

In [60]:
JOB_NAME

'JOB_20210213_042105'

In [61]:
from googleapiclient import discovery
from googleapiclient import errors


In [64]:
ml = discovery.build('ml', 'v1')

job_id = 'projects/{}/jobs/{}'.format(PROJECT_ID, JOB_NAME)
request = ml.projects().jobs().get(name=job_id)

try:
    response = request.execute()
except errors.HttpError as err:
    print(err)
except:
    print("Unexpected error")
    
response

{'jobId': 'JOB_20210213_042105',
 'trainingInput': {'args': ['--dataset_path=gs://custom_data/kubeflow_kc_house_data.csv',
   '--hptune'],
  'hyperparameters': {'goal': 'MAXIMIZE',
   'params': [{'parameterName': 'n_estimators',
     'type': 'DISCRETE',
     'discreteValues': [200, 500]},
    {'parameterName': 'max_depth',
     'type': 'DISCRETE',
     'discreteValues': [7, 9]}],
   'maxTrials': 4,
   'maxParallelTrials': 4,
   'hyperparameterMetricTag': 'rmse',
   'enableTrialEarlyStopping': True},
  'region': 'us-central1',
  'jobDir': 'gs://qwiklabs-gcp-00-a49ce03b66ad-kubeflowpipelines-default//jobs/JOB_20210213_042105',
  'masterConfig': {'imageUri': 'gcr.io/qwiklabs-gcp-00-a49ce03b66ad/trainer_image:latest'}},
 'createTime': '2021-02-13T04:21:07Z',
 'startTime': '2021-02-13T04:21:09Z',
 'state': 'RUNNING',
 'trainingOutput': {'isHyperparameterTuningJob': True,
  'hyperparameterMetricTag': 'rmse'},
 'etag': 'in2TTNfKIuc='}

In [63]:
response['trainingOutput']['trials'][0]

KeyError: 'trials'