In [1]:
df = spark.read.options(header=True,inferSchema=True)\
            .csv("hdfs://localhost:9000/user/local/hadoop_tmp/hdfs/data/boston/realboston_train.csv")

In [2]:
df.describe().toPandas()

Unnamed: 0,summary,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,...,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice
0,count,1460.0,1460.0,1460,1460.0,1460.0,1460,1460,1460,1460,...,1460.0,1460,1460,1460,1460.0,1460.0,1460.0,1460,1460,1460.0
1,mean,730.5,56.897260273972606,,70.04995836802665,10516.828082191782,,,,,...,2.758904109589041,,,,43.489041095890414,6.321917808219178,2007.8157534246573,,,180921.19589041092
2,stddev,421.6100093688479,42.30057099381045,,24.28475177448321,9981.26493237915,,,,,...,40.17730694453021,,,,496.1230244579441,2.7036262083595117,1.3280951205521143,,,79442.50288288663
3,min,1.0,20.0,C (all),100.0,1300.0,Grvl,Grvl,IR1,Bnk,...,0.0,Ex,GdPrv,Gar2,0.0,1.0,2006.0,COD,Abnorml,34900.0
4,max,1460.0,190.0,RM,,215245.0,Pave,Pave,Reg,Lvl,...,738.0,,,TenC,15500.0,12.0,2010.0,WD,Partial,755000.0


In [3]:
columnList = [item[0] for item in df.dtypes if not item[1].startswith('string')]
print(columnList)

['Id', 'MSSubClass', 'LotArea', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageCars', 'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold', 'SalePrice']


In [5]:
df_numberic = df.select(columnList)

In [6]:
import six
validColumn = []
for i in df_numberic.columns:
    if not( isinstance(df_numberic.select(i).take(1)[0][0], six.string_types)):
        if (df_numberic.stat.corr('SalePrice',i) > 0.5):
            validColumn.append(i)
            print( "Correlation to SalePrice for ", i, df_numberic.stat.corr('SalePrice',i))

Correlation to SalePrice for  OverallQual 0.7909816005838053
Correlation to SalePrice for  YearBuilt 0.522897332879497
Correlation to SalePrice for  YearRemodAdd 0.5071009671113869
Correlation to SalePrice for  TotalBsmtSF 0.6135805515591942
Correlation to SalePrice for  1stFlrSF 0.6058521846919153
Correlation to SalePrice for  GrLivArea 0.7086244776126517
Correlation to SalePrice for  FullBath 0.5606637627484453
Correlation to SalePrice for  TotRmsAbvGrd 0.5337231555820284
Correlation to SalePrice for  GarageCars 0.6404091972583519
Correlation to SalePrice for  GarageArea 0.6234314389183622
Correlation to SalePrice for  SalePrice 1.0


In [13]:
print(validColumn)

['OverallQual', 'YearBuilt', 'YearRemodAdd', 'TotalBsmtSF', '1stFlrSF', 'GrLivArea', 'FullBath', 'TotRmsAbvGrd', 'GarageCars', 'GarageArea', 'SalePrice']


In [7]:
df_valid = df_numberic.select(validColumn)

In [8]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['OverallQual', 'YearBuilt', 'YearRemodAdd', 'TotalBsmtSF', '1stFlrSF', 'GrLivArea', 'FullBath', 'TotRmsAbvGrd', 'GarageCars', 'GarageArea'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(df_valid)
vhouse_df = vhouse_df.select(['features', 'SalePrice'])
vhouse_df.show(5)

+--------------------+---------+
|            features|SalePrice|
+--------------------+---------+
|[7.0,2003.0,2003....|   208500|
|[6.0,1976.0,1976....|   181500|
|[7.0,2001.0,2002....|   223500|
|[7.0,1915.0,1970....|   140000|
|[8.0,2000.0,2000....|   250000|
+--------------------+---------+
only showing top 5 rows



In [9]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit , CrossValidator

In [11]:
regParam = [x / 100.0 for x in range(1, 11)]
elasticNetParam = [x / 10.0 for x in range(0, 10)]
maxIter = [10,20,30]

In [12]:
train_df, test_df = vhouse_df.randomSplit([0.8, 0.2], seed=12345)

In [13]:
lr = LinearRegression(featuresCol = 'features', labelCol='SalePrice')

In [14]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.maxIter, maxIter) \
    .addGrid(lr.regParam, regParam) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, elasticNetParam)\
    .build()

In [15]:
cv = CrossValidator(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(predictionCol="prediction", \
                                 labelCol="SalePrice",metricName="r2"),
                           # 80% of the data will be used for training, 20% for validation.
                           numFolds=4)

In [16]:
model_cv = cv.fit(train_df)
print("Max iteration  : %g " %model_cv.bestModel._java_obj.getMaxIter())
print("Reg  : %g " %model_cv.bestModel._java_obj.getRegParam())
print("Elastic net Param : %g " %model_cv.bestModel._java_obj.getElasticNetParam())
print("r2 : %g " % model_cv.bestModel.summary.r2)
print("RMSE : %g " % model_cv.bestModel.summary.rootMeanSquaredError)

Max iteration  : 20 
Reg  : 0.07 
Elastic net Param : 0.6 
r2 : 0.80306 
RMSE : 35962.4 


In [17]:
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(predictionCol="prediction", \
                                 labelCol="SalePrice",metricName="r2"),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [18]:
model_tvs = tvs.fit(train_df)
print("Max iteration  : %g " %model_tvs.bestModel._java_obj.getMaxIter())
print("Reg  : %g " %model_tvs.bestModel._java_obj.getRegParam())
print("Elastic net Param : %g " %model_tvs.bestModel._java_obj.getElasticNetParam())
print("r2 : %g " % model_tvs.bestModel.summary.r2)
print("RMSE : %g " % model_tvs.bestModel.summary.rootMeanSquaredError)

Max iteration  : 10 
Reg  : 0.1 
Elastic net Param : 0.9 
r2 : 0.799356 
RMSE : 36299 
