In [1]:
# Path of the input file and the reader format used to parse it.
file_location = "/FileStore/tables/kc_house_data.csv"
file_type = "csv"

# Reader settings for CSV input; other formats ignore them.
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# Build the reader with all options in one call, then load the file into a
# Spark DataFrame.
df = (
    spark.read
    .format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(df)

id,date,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view,condition,grade,sqft_above,sqft_basement,yr_built,yr_renovated,zipcode,lat,long,sqft_living15,sqft_lot15
7129300520,20141013T000000,221900.0,3,1.0,1180,5650,1.0,0,0,3,7,1180,0,1955,0,98178,47.5112,-122.257,1340,5650
6414100192,20141209T000000,538000.0,3,2.25,2570,7242,2.0,0,0,3,7,2170,400,1951,1991,98125,47.721,-122.319,1690,7639
5631500400,20150225T000000,180000.0,2,1.0,770,10000,1.0,0,0,3,6,770,0,1933,0,98028,47.7379,-122.233,2720,8062
2487200875,20141209T000000,604000.0,4,3.0,1960,5000,1.0,0,0,5,7,1050,910,1965,0,98136,47.5208,-122.393,1360,5000
1954400510,20150218T000000,510000.0,3,2.0,1680,8080,1.0,0,0,3,8,1680,0,1987,0,98074,47.6168,-122.045,1800,7503
7237550310,20140512T000000,1225000.0,4,4.5,5420,101930,1.0,0,0,3,11,3890,1530,2001,0,98053,47.6561,-122.005,4760,101930
1321400060,20140627T000000,257500.0,3,2.25,1715,6819,2.0,0,0,3,7,1715,0,1995,0,98003,47.3097,-122.327,2238,6819
2008000270,20150115T000000,291850.0,3,1.5,1060,9711,1.0,0,0,3,7,1060,0,1963,0,98198,47.4095,-122.315,1650,9711
2414600126,20150415T000000,229500.0,3,1.0,1780,7470,1.0,0,0,3,7,1050,730,1960,0,98146,47.5123,-122.337,1780,8113
3793500160,20150312T000000,323000.0,3,2.5,1890,6560,2.0,0,0,3,7,1890,0,2003,0,98038,47.3684,-122.031,2390,7570


In [2]:
# Convert the Spark DataFrame into a pandas DataFrame for the scikit-learn
# steps below; toPandas() collects every row onto the driver.
pandas_df = df.toPandas()

In [3]:
import numpy as np 
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.linear_model import LinearRegression
from src.preprocessing import categorical_transform,numerical_transform,FeatureSelector

In [4]:
# Columns fed to the categorical branch of the preprocessing pipeline.
cateforical_features = [
    'date',
    'waterfront',
    'view',
    'yr_renovated',
]

# Columns fed to the numerical branch of the preprocessing pipeline.
numerical_features = [
    'bedrooms',
    'bathrooms',
    'sqft_living',
    'sqft_lot',
    'floors',
    'condition',
    'grade',
    'sqft_basement',
    'yr_built',
]

In [5]:
# Categorical branch: select the categorical columns, run the project's
# categorical_transform step, then one-hot encode the result.
cat_pipeline = Pipeline(steps=[
    ('cat_selector', FeatureSelector(cateforical_features)),
    ('cat_transform', categorical_transform()),
    # handle_unknown='ignore' encodes a category that was not present at fit
    # time as an all-zero row instead of raising, so the fitted pipeline can
    # transform held-out rows that contain values unseen during training.
    ('one_hot_encoding', OneHotEncoder(handle_unknown='ignore')),
])

In [6]:
# Numerical branch: select the numeric columns, run the project's
# numerical_transform step, fill missing values with the column median, then
# standardise each column.
numerical_pipeline = Pipeline([
    ('num_selector', FeatureSelector(numerical_features)),
    ('num_transformer', numerical_transform()),
    ('imputer', SimpleImputer(strategy='median')),
    ('std_scaler', StandardScaler()),
])

In [7]:
# Apply both branches to the same input and join their outputs side by side.
full_pipeline = FeatureUnion([
    ('cat_pipe', cat_pipeline),
    ('num_pipe', numerical_pipeline),
])

In [8]:
# End-to-end estimator: combined preprocessing followed by a linear regression.
pipeline_model = Pipeline([
    ('full_transformation', full_pipeline),
    ('model', LinearRegression()),
])

In [9]:
import warnings
from sklearn.model_selection import train_test_split

# Suppress every warning from here on.
warnings.filterwarnings('ignore')

data = pandas_df

# The target is the price column, taken as a NumPy array; the features are
# all remaining columns.
y = data['price'].values
X = data.drop(columns='price')

# Hold out 20% of the rows for evaluation; the fixed seed keeps the split
# repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [10]:
# Show the names of all parameters exposed by the pipeline; nested steps are
# addressed as `step__param`, which is the form a hyperparameter search uses.
pipeline_model.get_params().keys()

In [11]:
# from sklearn.model_selection import GridSearchCV
# grid_params = {
#   'model__fit_intercept': [True,False],
#   'full_transformation__cat_pipe__cat_selector__feature_names':[cateforical_features],
#   'full_transformation__num_pipe__num_selector__feature_names':[numerical_features]
# }
# clf = GridSearchCV(pipeline_model, grid_params)
# clf.fit(X_train,y_train)

In [12]:
# Install the tracking (mlflow) and tuning (hyperopt) libraries with pip,
# invoked through the interpreter that runs this notebook (sys.executable).
# NOTE(review): no versions are pinned, and this presumably installs on the
# driver only — confirm the SparkTrials workers also have these packages.
import sys
!{sys.executable} -m pip install mlflow
!{sys.executable} -m pip install hyperopt

In [13]:
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK, SparkTrials
from hyperopt.pyll import scope
from sklearn.metrics import mean_squared_error
import mlflow

In [14]:
# Choose the MLflow experiment that runs are recorded under. The value is a
# workspace experiment path rather than the address of a tracking server, so it
# is passed to set_experiment; set_tracking_uri would instead redirect tracking
# to a local file store at that path.
mlflow.set_experiment("/Users/rohit.shah@citiustech.com/Research_Project/test_experiment")

In [15]:
def train(params):
  """Fit and score one hyperopt trial.

  A fresh clone of ``pipeline_model`` is configured with the sampled
  hyperparameters, fitted on the training split and scored on the test split
  with mean squared error. The run is recorded in MLflow.

  Args:
    params: dict sampled from the search space. Must contain
      ``model__fit_intercept`` and the two ``..._selector__feature_names``
      keys, named after the pipeline parameters they set.

  Returns:
    dict with ``loss`` (the test MSE; hyperopt minimises it, so lower error
    is better) and ``status`` set to ``STATUS_OK``.

  Raises:
    KeyError: if one of the expected keys is missing from ``params``.
  """
  # sklearn is already a dependency of this notebook; imported here so the
  # function carries everything it needs.
  from sklearn.base import clone

  def _as_name_list(value):
    # hyperopt hands sequences back as tuples, and the search space may wrap
    # the column list in an extra one-element sequence; normalise both cases
    # to a plain list of column names.
    if len(value) == 1 and isinstance(value[0], (list, tuple)):
      value = value[0]
    return list(value)

  cat_key = 'full_transformation__cat_pipe__cat_selector__feature_names'
  num_key = 'full_transformation__num_pipe__num_selector__feature_names'
  trial_params = {
    'model__fit_intercept': params['model__fit_intercept'],
    cat_key: _as_name_list(params[cat_key]),
    # Read from the numerical key, not the categorical one.
    num_key: _as_name_list(params[num_key]),
  }

  mlflow.sklearn.autolog()
  with mlflow.start_run(nested=True):
    # Clone so each trial starts from an unfitted estimator and the shared
    # pipeline_model object is never mutated by a trial.
    model = clone(pipeline_model).set_params(**trial_params)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    score = mean_squared_error(y_test, predictions)
    mlflow.log_metric('score', score)
  # MSE is an error measure: report it as-is so that minimising the loss
  # minimises the error (negating it would reward worse models).
  return {'loss': score, 'status': STATUS_OK}

In [16]:
# Search space handed to hyperopt. Keys follow the pipeline's `step__param`
# naming. hp.choice takes a list of candidate values, so each feature list —
# being one candidate — is wrapped in exactly one outer list; a second level
# of nesting would make the candidate a one-element wrapper rather than the
# column list itself.
space = {
    'model__fit_intercept': hp.choice('model__fit_intercept', [True, False]),
    'full_transformation__cat_pipe__cat_selector__feature_names': hp.choice(
        'full_transformation__cat_pipe__cat_selector__feature_names',
        [cateforical_features]),
    'full_transformation__num_pipe__num_selector__feature_names': hp.choice(
        'full_transformation__num_pipe__num_selector__feature_names',
        [numerical_features]),
}

# Suggestion algorithm used by the search.
tpe_algorithm = tpe.suggest

In [17]:
# best_hyperparameters = fmin(
#   fn=train,
#   space=space,
#   algo=tpe_algorithm,
#   max_evals=2)

In [18]:
# Trials are evaluated through Spark, at most two at the same time.
spark_trials = SparkTrials(parallelism=2)

# Search `space` with `train` as the objective, stopping after two evaluations.
# NOTE(review): for hp.choice entries the returned dict presumably holds the
# index of the chosen option rather than its value — confirm before using it.
best_hyperparameters = fmin(
    train,
    space,
    algo=tpe_algorithm,
    max_evals=2,
    trials=spark_trials,
)