In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover,RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, IndexToString, StandardScaler
import shutil
import os

In [59]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.81.jar,xgboost4j-0.81.jar pyspark-shell'

In [58]:
spark.stop()

In [60]:
#open Spark Session
spark = SparkSession.builder.appName('prices_houses').master("local[*]").\
config("spark.jars", "xgboost4j-spark-0.81.jar,xgboost4j-0.81.jar").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.addPyFile("sparkxgb.zip")

In [61]:
#read source I remove last 10 entries for predictions 
data=spark.read.csv("train.csv", inferSchema=True,sep=',',header=True)
datatest=spark.read.csv("test.csv", schema=data.schema,sep=',',header=True)

In [62]:
len(data.columns),len(datatest.columns)

(81, 81)

In [63]:
columnListS = [item[0] for item in data.dtypes if item[1].startswith('string')]
columnListI = [item[0] for item in data.dtypes if item[1].startswith('int')]
columnListS_T = [item[0] for item in datatest.dtypes if item[1].startswith('string')]
columnListI_T = [item[0] for item in datatest.dtypes if item[1].startswith('int')]

In [64]:
datatest=datatest.fillna("NO_Value",subset=columnListS_T)
datatest=datatest.fillna(0,subset=columnListI_T)
data=data.fillna("NO_Value",subset=columnListS)

In [46]:
temp=data.groupby('SaleCondition').agg(sf.count(sf.lit(0)).alias("Num Of Records"))

In [9]:
num=temp.select('SaleCondition').count()

In [10]:
num

6

In [11]:
for item in columnListS:
    temp=data.groupby(sf.col(item)).count()
    num=temp.select(sf.col(item)).count()
    print(str(item)+" number of Categories: {}".format(num))

MSZoning number of Categories: 5
LotFrontage number of Categories: 111
Street number of Categories: 2
Alley number of Categories: 3
LotShape number of Categories: 4
LandContour number of Categories: 4
Utilities number of Categories: 2
LotConfig number of Categories: 5
LandSlope number of Categories: 3
Neighborhood number of Categories: 25
Condition1 number of Categories: 9
Condition2 number of Categories: 8
BldgType number of Categories: 5
HouseStyle number of Categories: 8
RoofStyle number of Categories: 6
RoofMatl number of Categories: 8
Exterior1st number of Categories: 15
Exterior2nd number of Categories: 16
MasVnrType number of Categories: 5
MasVnrArea number of Categories: 328
ExterQual number of Categories: 4
ExterCond number of Categories: 5
Foundation number of Categories: 6
BsmtQual number of Categories: 5
BsmtCond number of Categories: 5
BsmtExposure number of Categories: 5
BsmtFinType1 number of Categories: 7
BsmtFinType2 number of Categories: 7
Heating number of Categories

In [12]:
resumen=data.select(*columnListI).toPandas()

In [13]:
resumen.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Id,1460.0,730.5,421.610009,1.0,365.75,730.5,1095.25,1460.0
MSSubClass,1460.0,56.89726,42.300571,20.0,20.0,50.0,70.0,190.0
LotArea,1460.0,10516.828082,9981.264932,1300.0,7553.5,9478.5,11601.5,215245.0
OverallQual,1460.0,6.099315,1.382997,1.0,5.0,6.0,7.0,10.0
OverallCond,1460.0,5.575342,1.112799,1.0,5.0,5.0,6.0,9.0
YearBuilt,1460.0,1971.267808,30.202904,1872.0,1954.0,1973.0,2000.0,2010.0
YearRemodAdd,1460.0,1984.865753,20.645407,1950.0,1967.0,1994.0,2004.0,2010.0
BsmtFinSF1,1460.0,443.639726,456.098091,0.0,0.0,383.5,712.25,5644.0
BsmtFinSF2,1460.0,46.549315,161.319273,0.0,0.0,0.0,0.0,1474.0
BsmtUnfSF,1460.0,567.240411,441.866955,0.0,223.0,477.5,808.0,2336.0


In [65]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index",handleInvalid="skip").fit(data) for column in list(columnListS) ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(data).transform(data)

indexers2 = [StringIndexer(inputCol=column, outputCol=column+"_index",handleInvalid="skip").fit(datatest) for column in list(columnListS_T) ]
pipeline2 = Pipeline(stages=indexers2)
df_rt = pipeline2.fit(datatest).transform(datatest)

#df_r.show()

In [66]:
len(columnListI),len(columnListI_T)

(35, 35)

In [67]:
NumericalColumns=columnListI[1:34]
NumericalColumns2=columnListI_T[1:34]
                             


In [68]:
assembler=VectorAssembler(inputCols=NumericalColumns, outputCol="features") 
assembler2=VectorAssembler(inputCols=NumericalColumns2, outputCol="features") 


In [69]:
scaler=StandardScaler(withMean = True, withStd = True, inputCol="features",outputCol="features_standard")
scaler2=StandardScaler(withMean = True, withStd = True, inputCol="features",outputCol="features_standard")

In [70]:
data_assembed=assembler.transform(df_r)
data_test_assembed=assembler2.transform(df_rt)

In [71]:
data_scaled=scaler.fit(data_assembed).transform(data_assembed)
data_test_scaled=scaler2.fit(data_test_assembed).transform(data_test_assembed)

In [72]:
data_test_scaled.select('features_standard').show(n=1,truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_standard                                                                                                                                                                                                                                                                                                                                

In [73]:
columnListS_Cat=[item for item in df_r.columns if str(item).endswith('_index')]


In [74]:
assembler2=VectorAssembler(inputCols=columnListS_Cat, outputCol="features2")
assembler2t=VectorAssembler(inputCols=columnListS_Cat, outputCol="features2")


In [75]:
data_scaled=assembler2.transform(data_scaled)
data_scaled_test=assembler2t.transform(data_test_scaled)

In [76]:
Dataout=data_scaled.select('features_standard',"features2" ,"SalePrice")
Dataout_test=data_scaled_test.select('features_standard',"features2")

In [77]:
assembler3=VectorAssembler(inputCols=('features_standard',"features2"), outputCol="features")
assembler3t=VectorAssembler(inputCols=('features_standard',"features2"), outputCol="features")

In [78]:
Dataout=assembler3.transform(Dataout)
Dataout_test=assembler3t.transform(Dataout_test)

In [79]:
Dataout=Dataout.withColumn("label",sf.col("SalePrice"))
Dataout_test=Dataout.withColumn("label",sf.col("SalePrice"))

In [80]:
Dataout_test.show()

+--------------------+--------------------+---------+--------------------+------+
|   features_standard|           features2|SalePrice|            features| label|
+--------------------+--------------------+---------+--------------------+------+
|[0.07334983082099...|(46,[1,9,13,18,19...|   208500|(79,[0,1,2,3,4,5,...|208500|
|[-0.8722638821913...|(46,[1,7,9,10,16,...|   181500|(79,[0,1,2,3,4,5,...|181500|
|[0.07334983082099...|(46,[1,4,9,13,18,...|   223500|(79,[0,1,2,3,4,5,...|223500|
|[0.30975325907408...|(46,[1,4,7,9,13,1...|   140000|(79,[0,1,2,3,4,5,...|140000|
|[0.07334983082099...|(46,[1,4,7,9,13,1...|   250000|(79,[0,1,2,3,4,5,...|250000|
|[-0.1630535974320...|(46,[1,4,9,13,22,...|   143000|(79,[0,1,2,3,4,5,...|143000|
|[-0.8722638821913...|(46,[1,9,18,19,20...|   307000|(79,[0,1,2,3,4,5,...|307000|
|[0.07334983082099...|(46,[4,7,9,10,13,...|   200000|(79,[0,1,2,3,4,5,...|200000|
|[-0.1630535974320...|(46,[0,1,9,10,13,...|   129900|(79,[0,1,2,3,4,5,...|129900|
|[3.146594398111

In [81]:
train_data,test_data = Dataout.randomSplit([0.7,0.3])

In [84]:
from pyspark.ml.regression import LinearRegression

In [85]:
lr = LinearRegression()

In [86]:
lrModel = lr.fit(Dataout)

In [87]:
# Print the coefficients and intercept for linear regression
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [-10173.748005,4265.45239415,19430.0309676,6413.34511969,6590.79166769,337.029490434,4955.88562981,98.2206122003,-519.127973985,4665.61320442,8043.92792298,9945.13964055,-688.857814971,14115.704405,3231.95304063,295.19033042,482.579393307,-194.77290328,-5621.7406523,-2698.55077804,6915.38830542,1550.68825486,7853.28668429,1820.87695324,2577.85001772,337.32776641,-129.074362323,909.84754499,2938.20693125,-3612.85643738,479.824972358,-453.869594943,-897.667043538,-2030.72537526,130.404064812,-49266.7069153,-142.389616939,-713.718282841,-1515.08338496,-59549.8834116,947.377961337,7701.47166979,680.284199185,-3619.7787031,-7496.81793792,3037.37548905,666.686845053,5311.73783713,-17770.3201954,487.590756386,-264.010142102,-462.985018355,17.4884129379,11799.0791186,271.855589011,-2435.55063314,3958.19628031,-4675.41256809,3999.75640799,515.050941469,55.2770449333,-242.960518147,-891.077804726,-855.071924819,1957.31891136,5349.50868576,-5919.42681654,621.673459117,1781.9909417,3

In [88]:
test_results = lrModel.evaluate(Dataout_test)

In [89]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("R2: {}".format(test_results.r2))

RMSE: 31304.492102483888
MSE: 979971225.794476
R2: 0.8446164458293475


In [90]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [91]:
gbt = GBTRegressor(maxIter=30,maxBins=350,maxDepth=3)

In [92]:
model = gbt.fit(Dataout)

In [93]:
predictions = model.transform(Dataout_test)

In [94]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.963805


In [102]:
output=predictions.select('prediction')

In [104]:
output.write.csv('output.csv')

In [102]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [108]:
rf = RandomForestRegressor(maxBins=350,numTrees=250,subsamplingRate=1.0,maxDepth=3,minInfoGain=0.1)

In [109]:
modelrf = rf.fit(Dataout)

In [110]:
predictionsrf = modelrf.transform(Dataout_test)

In [111]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
rmse = evaluator.evaluate(predictionsrf)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.832183
