In [1]:
#Importer la bibliotheque findspark pour ajouter pyspark à sys.path au runtime, 
#cela rend sparkContext disponible dans le code
import findspark

#initialiser findspark pour que pyspark soit importable
findspark.init()

#Importer la bibliotheque pyspark
import pyspark

#Point d'entrée principale pour utiliser les fonctionnalités de spark. 
#SparkContext represente la connection à un cluster Spark.
from pyspark.context import SparkContext

#Point d'entrée pour interagir avec les Datasets et les DataFrames.
from pyspark.sql.session import SparkSession

#fonctions de la bibliotheque sql de pyspark
#count: retourne le nombre d'element dans une collection
#when : Evalue une liste de condition et retourne les elements qui les satisfaient 
#avg  : retourne la moyenne des elements d'une collection
#translate: change une valeur données pour une colonne par une autre valeur
#isnull: retourne vrai si l'element a une valeur nulle
from pyspark.sql.functions import isnull, when, count, avg, translate, col

#StringIndexer: encode qui a des valeurs sous forme de chaine de caractere à une colonne de valeur
#sous forme d'indice entier.
#VectorAssembler: un transformateur qui combine une liste donnée de colonnes en une seule colonne vectorielle.
from pyspark.ml.feature import StringIndexer, VectorAssembler

#LinearRegression:L'interface pour travailler avec des modèles de régression linéaire.
#RandomForestRegressor: interface pour utiliser la regression de Random Forest
from pyspark.ml.regression import LinearRegression, RandomForestRegressor

#Pour evaluer le modele de regression
from pyspark.ml.evaluation import RegressionEvaluator

#Pour lancer une sequence d'instructions/ d'algorithmes selon un ordre defini.
from pyspark.ml import Pipeline

In [2]:
#Recuperer une SparkSession ou bien créer une nouvelle en se basant sur les options 
#passées au Builder si la session est inexistente.
spark=SparkSession.builder.master('local').appName('house_price_pred').getOrCreate()

In [3]:
#nullValue: pour spécifier la chaine de caractere qui represente la valeur nulle
#inferSchema: déduire automatiquement le schéma d'entrée à partir des données.
#header: utilise la première ligne comme noms de colonnes.
train_df = spark.read.csv('train.csv',header=True,inferSchema=True,nullValue="NA")
test_df  = spark.read.csv('test.csv',header=True,inferSchema=True,nullValue="NA")

In [4]:
def printColsNullNumber(this_df):
    null_columns=this_df.select([count(when(isnull(c), c)).alias(c) for c in this_df.columns])
    return null_columns.collect()
    
printColsNullNumber(train_df)

[Row(Id=0, MSSubClass=0, MSZoning=0, LotFrontage=259, LotArea=0, Street=0, Alley=1369, LotShape=0, LandContour=0, Utilities=0, LotConfig=0, LandSlope=0, Neighborhood=0, Condition1=0, Condition2=0, BldgType=0, HouseStyle=0, OverallQual=0, OverallCond=0, YearBuilt=0, YearRemodAdd=0, RoofStyle=0, RoofMatl=0, Exterior1st=0, Exterior2nd=0, MasVnrType=8, MasVnrArea=8, ExterQual=0, ExterCond=0, Foundation=0, BsmtQual=37, BsmtCond=37, BsmtExposure=38, BsmtFinType1=37, BsmtFinSF1=0, BsmtFinType2=38, BsmtFinSF2=0, BsmtUnfSF=0, TotalBsmtSF=0, Heating=0, HeatingQC=0, CentralAir=0, Electrical=1, 1stFlrSF=0, 2ndFlrSF=0, LowQualFinSF=0, GrLivArea=0, BsmtFullBath=0, BsmtHalfBath=0, FullBath=0, HalfBath=0, BedroomAbvGr=0, KitchenAbvGr=0, KitchenQual=0, TotRmsAbvGrd=0, Functional=0, Fireplaces=0, FireplaceQu=690, GarageType=81, GarageYrBlt=81, GarageFinish=81, GarageCars=0, GarageArea=0, GarageQual=81, GarageCond=81, PavedDrive=0, WoodDeckSF=0, OpenPorchSF=0, EnclosedPorch=0, 3SsnPorch=0, ScreenPorch=0,

In [5]:
printColsNullNumber(test_df)

[Row(Id=0, MSSubClass=0, MSZoning=4, LotFrontage=227, LotArea=0, Street=0, Alley=1352, LotShape=0, LandContour=0, Utilities=2, LotConfig=0, LandSlope=0, Neighborhood=0, Condition1=0, Condition2=0, BldgType=0, HouseStyle=0, OverallQual=0, OverallCond=0, YearBuilt=0, YearRemodAdd=0, RoofStyle=0, RoofMatl=0, Exterior1st=1, Exterior2nd=1, MasVnrType=16, MasVnrArea=15, ExterQual=0, ExterCond=0, Foundation=0, BsmtQual=44, BsmtCond=45, BsmtExposure=44, BsmtFinType1=42, BsmtFinSF1=1, BsmtFinType2=42, BsmtFinSF2=1, BsmtUnfSF=1, TotalBsmtSF=1, Heating=0, HeatingQC=0, CentralAir=0, Electrical=0, 1stFlrSF=0, 2ndFlrSF=0, LowQualFinSF=0, GrLivArea=0, BsmtFullBath=2, BsmtHalfBath=2, FullBath=0, HalfBath=0, BedroomAbvGr=0, KitchenAbvGr=0, KitchenQual=1, TotRmsAbvGrd=0, Functional=2, Fireplaces=0, FireplaceQu=730, GarageType=76, GarageYrBlt=78, GarageFinish=78, GarageCars=1, GarageArea=1, GarageQual=78, GarageCond=78, PavedDrive=0, WoodDeckSF=0, OpenPorchSF=0, EnclosedPorch=0, 3SsnPorch=0, ScreenPorch=

In [6]:
#supprimer les colonne qui ont un taux de valeur null <NA> elevé
#Alley=1369, FireplaceQu=690, PoolQC=1453, Fence=1179, MiscFeature=1406
high_null_cols = ['Alley', 'FireplaceQu', 'PoolQC', 'Fence', 'MiscFeature']
train_df=train_df.drop(*high_null_cols)
test_df=test_df.drop(*high_null_cols)

In [7]:
#remplacer les valeurs nulles des entiers par la moyennes de colonnes
def fill_with_mean(this_df):
    included=[item[0] for item in this_df.dtypes if item[1].startswith('int')]
    stats = this_df.agg(*(avg(c).alias(c) for c in this_df.columns if c in included))
    return this_df.na.fill(stats.first().asDict())

train_df=fill_with_mean(train_df)
test_df=fill_with_mean(test_df)

In [8]:
train_df.head(1)

[Row(Id=1, MSSubClass=60, MSZoning=u'RL', LotFrontage=65, LotArea=8450, Street=u'Pave', LotShape=u'Reg', LandContour=u'Lvl', Utilities=u'AllPub', LotConfig=u'Inside', LandSlope=u'Gtl', Neighborhood=u'CollgCr', Condition1=u'Norm', Condition2=u'Norm', BldgType=u'1Fam', HouseStyle=u'2Story', OverallQual=7, OverallCond=5, YearBuilt=2003, YearRemodAdd=2003, RoofStyle=u'Gable', RoofMatl=u'CompShg', Exterior1st=u'VinylSd', Exterior2nd=u'VinylSd', MasVnrType=u'BrkFace', MasVnrArea=196, ExterQual=u'Gd', ExterCond=u'TA', Foundation=u'PConc', BsmtQual=u'Gd', BsmtCond=u'TA', BsmtExposure=u'No', BsmtFinType1=u'GLQ', BsmtFinSF1=706, BsmtFinType2=u'Unf', BsmtFinSF2=0, BsmtUnfSF=150, TotalBsmtSF=856, Heating=u'GasA', HeatingQC=u'Ex', CentralAir=u'Y', Electrical=u'SBrkr', 1stFlrSF=856, 2ndFlrSF=854, LowQualFinSF=0, GrLivArea=1710, BsmtFullBath=1, BsmtHalfBath=0, FullBath=2, HalfBath=1, BedroomAbvGr=3, KitchenAbvGr=1, KitchenQual=u'Gd', TotRmsAbvGrd=8, Functional=u'Typ', Fireplaces=0, GarageType=u'Attch

In [9]:
#remplacer les valeurs nulles des chaines de caracteres par la chaine de caractere X
#BsmtQual=37, BsmtCond=37, BsmtExposure=38, BsmtFinType1=37 BsmtFinType2=38, GarageType=81, GarageYrBlt=81, GarageFinish=81,GarageQual=81, GarageCond=81, Electrical=1,MasVnrType=8,
def fill_with_string(this_df,replacement):
    cols=[item[0] for item in this_df.dtypes if item[1].startswith('string')]
    for col in cols:
        this_df=this_df.na.fill(replacement,col)
    return this_df

train_df=fill_with_string(train_df,'X')
test_df=fill_with_string(test_df,'X')

In [10]:
def convertStringToIndex(this_df):
    cols=[item[0] for item in this_df.dtypes if item[1].startswith('string')]
    indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(this_df) 
                for column in list(set(cols)) ]
    pipeline = Pipeline(stages=indexers)
    this_df = pipeline.fit(this_df).transform(this_df)
    return this_df

In [11]:
#Convertir les valeurs sous forme de chaines de caracteres en indice numerique
train_df=convertStringToIndex(train_df)
test_df=convertStringToIndex(test_df)

In [12]:
printColsNullNumber(train_df)

[Row(Id=0, MSSubClass=0, MSZoning=0, LotFrontage=0, LotArea=0, Street=0, LotShape=0, LandContour=0, Utilities=0, LotConfig=0, LandSlope=0, Neighborhood=0, Condition1=0, Condition2=0, BldgType=0, HouseStyle=0, OverallQual=0, OverallCond=0, YearBuilt=0, YearRemodAdd=0, RoofStyle=0, RoofMatl=0, Exterior1st=0, Exterior2nd=0, MasVnrType=0, MasVnrArea=0, ExterQual=0, ExterCond=0, Foundation=0, BsmtQual=0, BsmtCond=0, BsmtExposure=0, BsmtFinType1=0, BsmtFinSF1=0, BsmtFinType2=0, BsmtFinSF2=0, BsmtUnfSF=0, TotalBsmtSF=0, Heating=0, HeatingQC=0, CentralAir=0, Electrical=0, 1stFlrSF=0, 2ndFlrSF=0, LowQualFinSF=0, GrLivArea=0, BsmtFullBath=0, BsmtHalfBath=0, FullBath=0, HalfBath=0, BedroomAbvGr=0, KitchenAbvGr=0, KitchenQual=0, TotRmsAbvGrd=0, Functional=0, Fireplaces=0, GarageType=0, GarageYrBlt=0, GarageFinish=0, GarageCars=0, GarageArea=0, GarageQual=0, GarageCond=0, PavedDrive=0, WoodDeckSF=0, OpenPorchSF=0, EnclosedPorch=0, 3SsnPorch=0, ScreenPorch=0, PoolArea=0, MiscVal=0, MoSold=0, YrSold=

In [13]:
printColsNullNumber(test_df)

[Row(Id=0, MSSubClass=0, MSZoning=0, LotFrontage=0, LotArea=0, Street=0, LotShape=0, LandContour=0, Utilities=0, LotConfig=0, LandSlope=0, Neighborhood=0, Condition1=0, Condition2=0, BldgType=0, HouseStyle=0, OverallQual=0, OverallCond=0, YearBuilt=0, YearRemodAdd=0, RoofStyle=0, RoofMatl=0, Exterior1st=0, Exterior2nd=0, MasVnrType=0, MasVnrArea=0, ExterQual=0, ExterCond=0, Foundation=0, BsmtQual=0, BsmtCond=0, BsmtExposure=0, BsmtFinType1=0, BsmtFinSF1=0, BsmtFinType2=0, BsmtFinSF2=0, BsmtUnfSF=0, TotalBsmtSF=0, Heating=0, HeatingQC=0, CentralAir=0, Electrical=0, 1stFlrSF=0, 2ndFlrSF=0, LowQualFinSF=0, GrLivArea=0, BsmtFullBath=0, BsmtHalfBath=0, FullBath=0, HalfBath=0, BedroomAbvGr=0, KitchenAbvGr=0, KitchenQual=0, TotRmsAbvGrd=0, Functional=0, Fireplaces=0, GarageType=0, GarageYrBlt=0, GarageFinish=0, GarageCars=0, GarageArea=0, GarageQual=0, GarageCond=0, PavedDrive=0, WoodDeckSF=0, OpenPorchSF=0, EnclosedPorch=0, 3SsnPorch=0, ScreenPorch=0, PoolArea=0, MiscVal=0, MoSold=0, YrSold=

In [14]:
train_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = false)
 |-- LotFrontage: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = false)
 |-- LotShape: string (nullable = false)
 |-- LandContour: string (nullable = false)
 |-- Utilities: string (nullable = false)
 |-- LotConfig: string (nullable = false)
 |-- LandSlope: string (nullable = false)
 |-- Neighborhood: string (nullable = false)
 |-- Condition1: string (nullable = false)
 |-- Condition2: string (nullable = false)
 |-- BldgType: string (nullable = false)
 |-- HouseStyle: string (nullable = false)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = false)
 |-- RoofMatl: string (nullable = false)
 |-- Exterior1st: string (nullable = false)
 |-- Exterior2nd: string 

In [15]:
#valeur initiale de la liste des variables est toute les colonnes ayant un type entier ou double.
#(les colonnes qui ont deja un type numerique et les colonnes generées lors de l'etape precedente)
features=[item[0] for item in train_df.dtypes if item[1].startswith('int') or item[1].startswith('double')]

#On enleve la colonne "Id" et "SalePrice" de la liste.
features.remove('Id')
features.remove('SalePrice')

#fonction qui retourne une liste de colonnes qui ont une une correlation >= corrMin ou <= -corrMin
def getCorrelatedFeatures(features,corrMin):
    result=[]
    for feature in features:
        corr=train_df.corr(feature,'SalePrice')
        #print(feature,corr)
        if corr >=corrMin or corr<=-corrMin:
            result.append(feature)
    return result

features=getCorrelatedFeatures(features,0.30)
features

['LotFrontage',
 'OverallQual',
 'YearBuilt',
 'YearRemodAdd',
 'MasVnrArea',
 'BsmtFinSF1',
 'TotalBsmtSF',
 '1stFlrSF',
 '2ndFlrSF',
 'GrLivArea',
 'FullBath',
 'TotRmsAbvGrd',
 'Fireplaces',
 'GarageYrBlt',
 'GarageCars',
 'GarageArea',
 'WoodDeckSF',
 'OpenPorchSF',
 'MasVnrType_index',
 'BsmtQual_index',
 'Foundation_index',
 'HeatingQC_index',
 'KitchenQual_index',
 'ExterQual_index']

In [16]:
assembler = VectorAssembler(inputCols = features, outputCol= "features")
output = assembler.transform(train_df)

In [17]:
datas=output.select('features','SalePrice')

In [18]:
reg_lin=LinearRegression(labelCol='SalePrice')

In [19]:
fited_reg_lin_model=reg_lin.fit(datas)

In [20]:
lin_model_sum=fited_reg_lin_model.summary
print("r2: %f" % lin_model_sum.r2)
print("rmse: %f" % lin_model_sum.rootMeanSquaredError)
print("mae: %f" % lin_model_sum.meanAbsoluteError)

r2: 0.805361
rmse: 35036.389694
mae: 21552.717511


In [21]:
lin_model_sum.predictions.describe().show()

+-------+------------------+------------------+
|summary|         SalePrice|        prediction|
+-------+------------------+------------------+
|  count|              1460|              1460|
|   mean|180921.19589041095|180921.19589041092|
| stddev| 79442.50288288663|  71293.2065382377|
|    min|           34900.0|13693.190058062784|
|    max|          755000.0| 680334.9436308902|
+-------+------------------+------------------+



In [22]:
lin_model_sum.predictions.head(10)

[Row(features=DenseVector([65.0, 7.0, 2003.0, 2003.0, 196.0, 706.0, 856.0, 856.0, 854.0, 1710.0, 2.0, 8.0, 0.0, 2003.0, 2.0, 548.0, 0.0, 61.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0]), SalePrice=208500.0, prediction=220309.7598002758),
 Row(features=DenseVector([80.0, 6.0, 1976.0, 1976.0, 0.0, 978.0, 1262.0, 1262.0, 0.0, 1262.0, 2.0, 6.0, 1.0, 1976.0, 2.0, 460.0, 298.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0]), SalePrice=181500.0, prediction=184576.54656944296),
 Row(features=DenseVector([68.0, 7.0, 2001.0, 2002.0, 162.0, 486.0, 920.0, 920.0, 866.0, 1786.0, 2.0, 6.0, 1.0, 2001.0, 2.0, 608.0, 0.0, 42.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0]), SalePrice=223500.0, prediction=221606.30539146264),
 Row(features=DenseVector([60.0, 7.0, 1915.0, 1970.0, 0.0, 216.0, 756.0, 961.0, 756.0, 1717.0, 1.0, 7.0, 1.0, 1998.0, 3.0, 642.0, 0.0, 35.0, 0.0, 0.0, 2.0, 2.0, 1.0, 0.0]), SalePrice=140000.0, prediction=187767.33178367477),
 Row(features=DenseVector([84.0, 8.0, 2000.0, 2000.0, 350.0, 655.0, 1145.0, 1145.0, 1053.0, 2198

In [23]:
rnd_for=RandomForestRegressor(featuresCol="features",labelCol="SalePrice")

In [24]:
fited_rnd_for_model=rnd_for.fit(datas)

In [25]:
rf_output=fited_rnd_for_model.transform(datas)

In [26]:
#afficher les valeurs des metriques 'r2','rmse','mae' en creant un evaluateur de regression 
def printMetrics(toEvaluate):
    metrics=['r2','rmse','mae']
    for m in metrics:
        evaluator = RegressionEvaluator(labelCol="SalePrice", predictionCol="prediction", metricName=m)
        print (m,evaluator.evaluate(toEvaluate))

In [27]:
printMetrics(rf_output)

('r2', 0.8979327695864752)
('rmse', 25371.56762421738)
('mae', 16976.148391241644)


In [28]:
testAssembler = VectorAssembler(inputCols = features, outputCol= "features")

In [29]:
test_output = assembler.transform(test_df)

In [30]:
test_output=test_output.select('Id','features')

In [31]:
predictions=fited_rnd_for_model.transform(test_output)

In [32]:
predictions=predictions.select('Id',col('prediction').alias('SalePrice'))

In [33]:
predictions.show(n=10)

+----+------------------+
|  Id|         SalePrice|
+----+------------------+
|1461|122101.23339981663|
|1462|145536.06133842765|
|1463| 169104.3640318387|
|1464|178064.95122367036|
|1465|211042.95990434173|
|1466|179797.47703079114|
|1467|163819.51185783133|
|1468|172726.26755221578|
|1469|179654.62809513125|
|1470|122115.97458814182|
+----+------------------+
only showing top 10 rows



In [34]:
#sauvegarder le resultat sous format csv dans hdfs
predictions.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('./house_price_spark_ml_predictions.csv')

In [35]:
#sauvegarder le resultat sous format csv en local sans ajouter des indices
predictions.toPandas().to_csv('house_price_spark_ml_predictions.csv',index=False)