In [156]:
import json
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from kfp.components import OutputPath

In [157]:
KFP_HOST_NAME='https://2401d996fdc4badc-dot-us-central2.pipelines.googleusercontent.com'
client = kfp.Client(host=KFP_HOST_NAME)

In [158]:
# import numpy as np
# import pandas as pd
# import matplotlib.pyplot as plt
# %matplotlib inline

# # preprocessing
# from sklearn.preprocessing import LabelEncoder, StandardScaler,OneHotEncoder
# from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, StratifiedKFold
# from sklearn.pipeline import make_pipeline,Pipeline
# from sklearn.compose import make_column_transformer
# from sklearn.model_selection import cross_validate,ShuffleSplit

# from sklearn import metrics


# # models

# from sklearn.linear_model import LinearRegression, SGDRegressor, RidgeCV
# from sklearn.svm import SVR, LinearSVR
# from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor 
# from sklearn.model_selection import cross_val_predict as cvp
# from sklearn import metrics
# from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
# from sklearn.neural_network import MLPRegressor
# from sklearn.tree import DecisionTreeRegressor

In [159]:
#Data Preprocessing
def data_preprocessing(raw_data_path: str, prep_data_path: str, bucket: str) -> str:

    import pandas as pd
    import numpy as np
    from sklearn.preprocessing import LabelEncoder, StandardScaler,OneHotEncoder
    from sklearn.pipeline import make_pipeline,Pipeline
    from sklearn.compose import make_column_transformer
    from google.cloud import storage    
    from io import BytesIO

    # client = storage.Client()
    # file_name = raw_data
    # bucket = client.get_bucket(bucket_name)
    # blob = bucket.get_blob(raw_data)
    # content = blob.download_as_string()
    #df = pd.read_csv(BytesIO(content))
    df = pd.read_csv(bucket + raw_data_path)


    #deleting unwanted columns
    drop_columns = ['id','url', 'region', 'region_url','model','title_status', 'title_status','county', 'vin', 'description','size', 'image_url', 'lat','long','state','paint_color','cylinders']
    df = df.drop(columns=drop_columns)
    #deleting rows with nan values
    df = df.dropna()
    #reformatting/cleaning numeric columns
    df['price'] = df['price'].astype(int)
    df['year'] = df['year'].astype(int)
    df['odometer'] = df['odometer'].astype(int)
    df['odometer'] = df['odometer'] // 5000
    df = df[df['year'] > 110]
    df = df[(df['price']>1000) & (df['price']<50000)]

    #reformatting/cleaning categorical columns
    df['manufacturer'] = df['manufacturer'].astype(str)
    df['condition'] = df['condition'].astype(str)
    # df['cylinders'] = df['cylinders'].astype(str)
    df['fuel'] = df['fuel'].astype(str)
    df['transmission'] = df['transmission'].astype(str)
    df['drive'] = df['drive'].astype(str)
    df['type'] = df['type'].astype(str)
    df=df[df['transmission']!='other']
    df=df.reset_index()

    #label encode columns

    lab_cat_columns=['condition','transmission']

    for col in lab_cat_columns:
        if col in df.columns:
            le = LabelEncoder()
            le.fit(list(df[col].astype(str).values))
            df[col] = le.transform(list(df[col].astype(str).values))
            
    #Creating pipeline

    numerical_features=['year', 'odometer']
    one_hot_cat_columns=['manufacturer','fuel','drive','type']


    categoric_transformer = make_pipeline(OneHotEncoder(sparse=False,handle_unknown='ignore'))

    # Creating a pipeline with mean imputer for numerical data 
    numeric_transformer =  make_pipeline(StandardScaler())  

    #Creating label transformer

    # label_transformer=make_pipeline(LabelEncoder())

    # Combining both pipelines such that each pipeline works on the columns it was meant for
    preprocessor = make_column_transformer((categoric_transformer,one_hot_cat_columns),
                                            (numeric_transformer,numerical_features))
    #                                           (label_transformer,lab_cat_columns))

    pipe=Pipeline(steps = [('prep',preprocessor)])
    results=pipe.fit_transform(df)
    results=pd.DataFrame(data=results, columns=list(pd.get_dummies(df[one_hot_cat_columns]).columns)+numerical_features )

    final_df=results
    # final_df['year']=df['year']
    # final_df['odometer']=df['odometer']
    final_df['condition']=df['condition']
    final_df['transmission']=df['transmission']
    final_df['price']=df['price']

    final_df.to_csv(bucket + prep_data_path)
    return prep_data_path

In [160]:
data_preprocessing_op = comp.create_component_from_func(
    data_preprocessing, output_component_file='data_preprocessing.yaml', packages_to_install=['pandas','scikit-learn', 'fsspec', 'gcsfs', 'google-cloud-storage'])

In [161]:
# def _read_and_split_data (path, bucket):
#     #read csv file from bucket
#     import pandas as pd
#     import numpy as np
#     df = pd.read_csv(bucket + path)

#     #Seperating dataset and target variable
#     target_name = 'price'
#     df_target = df[target_name]
#     df = df.drop([target_name], axis=1)
#     #Train test split
#     train, test, target, target_test = train_test_split(df, df_target, test_size=0.2, random_state=0)
#     return [train, test, target, target_test]

In [162]:
# linreg = LinearRegression()
# linreg.fit(train, target)
# score=linreg.score(train, target)
# y_pred=linreg.predict(test)
# # # values_predictions = pd.DataFrame({'Actual': target_test, 'Predicted': predictions})
# # print('Mean Absolute Error:', metrics.mean_absolute_error(target_test, y_pred))
# # print('Mean Squared Error:', metrics.mean_squared_error(target_test, y_pred))
# # print('Root Mean Squared Error:', np.sqrt(metrics.mean_squared_error(target_test, y_pred)))
# print('R^2 on the train set', score)
# print('R^2 on the test set', metrics.r2_score(target_test, y_pred))


In [163]:
# decision_tree = DecisionTreeRegressor()
# decision_tree.fit(train, target)
# score=decision_tree.score(train, target)
# y_pred=decision_tree.predict(test)
# print('R^2 on the train set', score)
# print('R^2 on the test set', metrics.r2_score(target_test, y_pred))



In [164]:
def rf_model_training(prep_data_path: str, bucket: str, bucket_name: str, model_path: str) ->str:
    
    import pandas as pd
    import numpy as np
    from google.cloud import storage  
    from sklearn.model_selection import train_test_split, ShuffleSplit, GridSearchCV, StratifiedKFold
    import _pickle as cPickle 
    from sklearn import metrics
    from sklearn.ensemble import RandomForestRegressor  

    df = pd.read_csv(bucket + prep_data_path)

    #Seperating dataset and target variable
    target_name = 'price'
    df_target = df[target_name]
    df = df.drop([target_name], axis=1)
    #Train test split
    train, test, target, target_test = train_test_split(df, df_target, test_size=0.2, random_state=0)
    #return [train, test, target, target_test]

    #read preprocessed data
    #train, test, target, target_test = _read_and_split_data(prep_data_path, bucket)
    #Tuning RF Parameters
    rf_param_grid = {'n_estimators': [100,500],
                'max_features': [0.2,0.7]
                }
    rf_GS = GridSearchCV(RandomForestRegressor(n_jobs=-1), param_grid=rf_param_grid,
                    cv=ShuffleSplit(n_splits=3,random_state=1), verbose=False, pre_dispatch='2*n_jobs')

    rf_GS.fit(train, target)


    score=rf_GS.score(train, target)
    y_pred=rf_GS.predict(test)
    #print('R^2 on the train set', score)
    print('R2 score', metrics.r2_score(target_test, y_pred))

    temp_model_path='/tmp/rf_model.pickle'
    with open(temp_model_path, 'wb') as f:
        cPickle.dump(rf_GS, f, -1)
    
    # parse = urlparse(url=tuned_model_path, allow_fragments=False)
    
    # if parse.path[0] =='/':
    #     model_path = parse.path[1:]
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    model = bucket.blob(model_path)
    model.upload_from_filename(temp_model_path)
    return model_path

In [165]:
rf_model_training_op = comp.create_component_from_func(
    rf_model_training, output_component_file='rf_model_training.yaml', packages_to_install=['pandas','scikit-learn', 'fsspec', 'gcsfs', 'google-cloud-storage'])

In [166]:
@dsl.pipeline(
  name='Used car value',
  description='estimating the price of used cars'
)

def used_car_pipeline(raw_data_path, prep_data_path, bucket, bucket_name, model_path, disable_cache): 
    
    print(raw_data_path, prep_data_path, bucket)
    data_preprocessing_task = data_preprocessing_op(raw_data_path, prep_data_path, bucket)    
    rf_model_training_task = rf_model_training_op(data_preprocessing_task.output, bucket, bucket_name, model_path)

    if disable_cache:
        data_preprocessing_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
        rf_model_training_task.execution_options.caching_strategy.max_cache_staleness = "P0D"

arguments = {
    'raw_data_path': '/data/raw_vehicles.csv',
    'prep_data_path': '/data/prep_vehicles.csv',
    'bucket': 'gs://de-3',
    'bucket_name': 'de-3',
    'model_path': '/models/rf_model.pickle',
    'disable_cache': False
}

# Create a pipeline run
client.create_run_from_pipeline_func(used_car_pipeline, arguments= arguments)

{{pipelineparam:op=;name=raw_data_path}} {{pipelineparam:op=;name=prep_data_path}} {{pipelineparam:op=;name=bucket}}


RunPipelineResult(run_id=43ca8eb0-448b-4041-a2ca-2aa7b8cfacba)