# House Price Prediction

In [2]:
import pandas as pd
import numpy as np

import google.cloud.aiplatform as aip
from google.cloud import storage
import gcsfs

from typing import NamedTuple

from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        Markdown)

from kfp.v2 import compiler


In [3]:
BUCKET_URI = 'gs://mle-gcp-kfp-v1'
PIPELINE_ROOT = "{}/pipeline_root/house_price".format(BUCKET_URI)

In [4]:
PROJECT_ID = "tiger-mle"
REGION = "us-east1"
ZONE = ""

In [5]:
! gcloud config set project $PROJECT_ID
! gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


In [6]:
!gcloud auth list

                  Credentialed Accounts
ACTIVE  ACCOUNT
*       378786916136-compute@developer.gserviceaccount.com

To set the active account, run:
    $ gcloud config set account `ACCOUNT`



In [7]:
project_id = 'tiger-mle'
file_uri = 'gs://vertex-ai-bucket-house-price-pred/data_base/housing.csv'

### Reading data from GCP Bucket

In [8]:
@component(packages_to_install=['gcsfs==2022.02.0','pandas==1.1.4','scikit-learn==1.0.1'])
def get_data(project_id: str,file_uri: str,house_dataset: Output[Dataset]):
    import gcsfs
    import pandas as pd
    from sklearn.model_selection import train_test_split as tts
    
    fs = gcsfs.GCSFileSystem()
    f = fs.open(file_uri)
    df = pd.read_csv(f)
    
    """train, test = tts(df, test_size=0.3)
    
    train.to_csv(dataset_train.path)
    test.to_csv(dataset_test.path)"""
    
    df.to_csv(house_dataset.path)
    #return df

### Preprocessing the data

In [9]:
@component(  packages_to_install = [
        "pandas==1.3.4",
        "xgboost==1.5.1",
        "scikit-learn==1.0.1",
        "numpy"
    ],
)
def preprocess(
        dataset: Input[Dataset],
        dataset_train: Output[Dataset],
        dataset_test: Output[Dataset]):
    import pandas as pd
    from sklearn.impute import SimpleImputer
    from sklearn.model_selection import train_test_split as tts
    from sklearn.model_selection import StratifiedShuffleSplit
    import numpy as np
    
    housing = pd.read_csv(dataset.path)
    print('ok')
    
    #train_set, test_set = tts(data, test_size=0.2, random_state=42)
    
    
    imputer = SimpleImputer(strategy="median")

    housing_num = housing.drop("ocean_proximity", axis=1)
    
    housing_num["income_cat"] = pd.cut(
        housing_num["median_income"],
        bins=[0.0, 1.5, 3.0, 4.5, 6.0, np.inf],
        labels=[1, 2, 3, 4, 5]
    )
    
    imputer.fit(housing_num)
    X = imputer.transform(housing_num)
    housing_tr = pd.DataFrame(X, index=housing_num.sort_index().index,columns=housing_num.columns)

    split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    for train_index, test_index in split.split(housing_tr, housing_tr["income_cat"]):
        strat_train_set = housing_tr.loc[train_index]
        strat_test_set = housing_tr.loc[test_index]
        
    strat_train_set.to_csv(dataset_train.path)
    strat_test_set.to_csv(dataset_test.path)
    print(strat_train_set.columns,strat_train_set.shape)
    print(strat_test_set.columns,strat_test_set.shape)

### Train a model

In [10]:
@component(
    packages_to_install=["scikit-learn==1.0.1","pandas","xgboost==1.5.1","joblib","google.cloud"])
def train_model(
    dataset_train: Input[Dataset],
    model_artifact: Output[Model],
    model: Output[Model]
    
):
    
    from sklearn.linear_model import LinearRegression
    from xgboost import XGBRegressor
    import pandas as pd
    import joblib
    from google.cloud import storage
    import os
    
    housing_train = pd.read_csv(dataset_train.path)
    
    housing_prepared = housing_train.drop(columns=['median_house_value']).values
    print("housing_prepared",housing_prepared)
    housing_labels = housing_train["median_house_value"].copy()
    print("housing_labels",housing_labels)
    #print('ok')
    
    #lin_reg = LinearRegression()
    xgmodel = XGBRegressor()
    print('start--')
    xgmodel.fit(housing_prepared,housing_labels)
    print('fit--')
    score = xgmodel.score(
        housing_prepared,
        housing_labels
    )
    print('save--')
    
    model_artifact.metadata["train_score"] = float(score)
    model_artifact.metadata["framework"] = "xgboost"

    print(model_artifact.path)
    
    xgmodel.save_model(model_artifact.path)
    #model.save(model.path)
    
    #save model
    print("Dump")
    filename = "house_model.joblib"
    joblib.dump(xgmodel,filename)
    
    print(model_artifact.path)
    print(model.path)
    
    print("gcp save")
    print(filename,type(filename))
    
    bucket = storage.Client().bucket('mle-gcp-kfp-v1')
    #des_bob = 'models'
    path = model.path
    path = path.replace('/gcs/mle-gcp-kfp-v1/','')
    #path = path.replace('model_save_gcp','')
    print('path--',path)
    blob = bucket.blob(path+str('.joblib'))
    print('blob--',blob)
    blob.upload_from_filename(filename)
    """
    #lient = storage.Client()
    #bucket = client.get_bucket('mle-gcp-tbk-1')
    bucket = storage.Client().bucket('gs://mle-gcp-kfp-v1')
    #ucket = client.get_bucket(model.path)
    #blob = bucket.blob('model')
    path = model_save_gcp.path
    path = path.replace('/gcs/mle-gcp-kfp-v1/','/')
    path = path.replace('model_save_gcp','')
    print(path)
    print('saving')
    #blob = bucket.blob('{}/{}'.format(path,filename))
    #blob = bucket.blob('{}'.format(path))
    #blob = blob.replace(', /','/')
    print(blob)
    blob.upload_from_filename(filename) """
    
    """
    model_directory = os.environ[model_save_gcp.path]
    print('1')
    storage_path = os.path.join(model_directory, filename)
    print('2')
    blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
    print('1')
    blob.upload_from_filename(filename) """
    
    """
    # Upload model artifact to Cloud Storage
    model_directory = os.environ['AIP_MODEL_DIR']
    storage_path = os.path.join(model_directory, artifact_filename)
    blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
    blob.upload_from_filename(local_path)
    """
    

In [11]:
from google import cloud

### Evaluation

In [12]:
@component(
    packages_to_install = [
        "pandas==1.3.4",
        "scikit-learn==1.0.1",
        "xgboost==1.5.1"
    ],
)
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
) -> NamedTuple("Outputs", [("deploy", str)]): 
    from xgboost import XGBRegressor
    import pandas as pd
    
    data = pd.read_csv(test_set.path)
    model = XGBRegressor()
    model.load_model(xgb_model.path)
    
    
    housing_prepared = data.drop(columns=['median_house_value']).values
    housing_labels = data["median_house_value"].copy()
    
    score = model.score(
        housing_prepared,
        housing_labels,
    )
    
    xgb_model.metadata["test_score"] = float(score)

    deploy = "true"
    #compare threshold or to previous

    return (deploy,)

### Deployment

In [42]:
@component(packages_to_install=["google-cloud-aiplatform==1.3.0"])
def deploying(
    model: Input[Model],
    project: str,
    region: str,):
  
    import logging
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)
    
    
    logging.basicConfig(level=logging.DEBUG)
    logging.debug(model)

    print('model--',model)

    import os
    path,file = os.path.split(model.uri)

    import datetime

      # datetime.datetime.now().strftime('%Y%m%d%H%M%S')
      # serving image https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers#xgboost
    deployed_model = aiplatform.Model.upload(
           display_name="xgboost-pipeline-1",
           artifact_uri = path,
           serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest"
      ) 

    """
    #end point
    print('create end point')
    endpoint = aiplatform.Endpoint.create(
        display_name='xgboost-pipeline-1', project=project, location=region,
    )
    print('ep name',endpoint.display_name)
    print('ep rname',endpoint.resource_name)
    
    print('Deploy model')
    deploy_model = aiplatform.Model(model_name='xgboost-pipeline-1')
    
    deply_model.deploy(
        endpoint = endpoint,
        deployed_model_display_name = "xgboost-pipeline-1",
        machine_type= "n1-standard-2",
        min_replica_count = 1,
        max_replica_count = 1,
        sync = True,
    )
    
    deployed_model.wait()
    
    print('dm name--',deployed_model.display_name)
    print('dm rname--',deployed_model.resource_name)
    
    """

### Building a Pipeline

In [43]:
@dsl.pipeline(
    name="house-price-pred",
    description="House prediction",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(project_id: str = 'tiger-mle',
                 file_uri: str = 'gs://vertex-ai-bucket-house-price-pred/data_base/housing.csv'):
    get_data_op = get_data(project_id,file_uri)
    pipe_preprocess = preprocess(get_data_op.outputs['house_dataset'])
    pipe_train_model = train_model(pipe_preprocess.outputs['dataset_train'])
    eval_op = eval_model(
        pipe_preprocess.outputs['dataset_test'],
        xgb_model=pipe_train_model.outputs["model_artifact"]
    )
    deploy_op = deploying(pipe_train_model.outputs["model_artifact"], 
                         "tiger-mle",
                         "us-east1")

### Complie

In [44]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(pipeline_func=pipeline, package_path="house_pipeline.json")

In [16]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Run the job

In [45]:
DISPLAY_NAME = "intro_" + TIMESTAMP

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="house_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
)

job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/378786916136/locations/us-central1/pipelineJobs/house-price-pred-20220517130001
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/378786916136/locations/us-central1/pipelineJobs/house-price-pred-20220517130001')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/house-price-pred-20220517130001?project=378786916136
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/378786916136/locations/us-central1/pipelineJobs/house-price-pred-20220517130001 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/378786916136/locations/us-central1/pipelineJobs/h