# Preprocessing des données et Feature engineering

In [1]:
import findspark
import pyspark
from pyspark import HiveContext
findspark.init()

In [2]:
from pyspark.sql.functions import col
from pyspark.sql.functions import count
from pyspark.sql.functions import isnull
from pyspark.sql.functions import split
from pyspark.sql.functions import sum
from pyspark.sql.functions import when

In [42]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Import").getOrCreate()

In [43]:
df = spark.read.parquet("LaForet_listings.parquet", header=True, inferSchema=True) 

df.limit(10).toPandas()

Unnamed: 0,lien,titre,surface,nb_pieces,nb_chambres,annee_constr,nb_features_int,features_int,features_ext,terrain,prix
0,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Paris 15,10 mÂ²,1 piÃ¨ce(s),,,2,Cuisine Kitchenet,,0,99â¯500Â â¬
1,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Paris 18,25 mÂ²,1 piÃ¨ce(s),,,4,Cuisine Ouverte\n \n ...,,0,243â¯000Â â¬
2,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Boulogne-Billancourt,12.5 mÂ²,1 piÃ¨ce(s),,Charges mensuelles copro : 64 â¬,4,Cuisine Kitchenet\n \n ...,,0,112â¯500Â â¬
3,https://www.laforet.com/agence-immobiliere/dij...,Appartement T3 Dijon,65.84 mÂ²,3 piÃ¨ce(s),2 chambre(s),Ann. const. : 2005,10,Cuisine Nue\n \n ...,,1,211â¯700Â â¬
4,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Paris 11,15 mÂ²,1 piÃ¨ce(s),,Soumis Ã la copropriÃ©tÃ©,2,Cuisine Kitchenet,,0,187â¯250Â â¬
5,https://www.laforet.com/agence-immobiliere/par...,Appartement T3 Paris 15,75 mÂ²,3 piÃ¨ce(s),2 chambre(s),Ann. const. : 1970,8,Cuisine IndÃ©pendante\n ...,,0,725â¯500Â â¬
6,https://www.laforet.com/agence-immobiliere/ron...,Appartement T2 Lille,29.34 mÂ²,2 piÃ¨ce(s),1 chambre(s),Soumis Ã la copropriÃ©tÃ©,4,Cuisine Kitchenet\n \n ...,,0,83â¯000Â â¬
7,https://www.laforet.com/agence-immobiliere/sar...,Maison T3 Sartrouville,57.41 mÂ²,3 piÃ¨ce(s),2 chambre(s),Ann. const. : 1948,8,Cuisine Nue\n \n ...,,1,336â¯000Â â¬
8,https://www.laforet.com/agence-immobiliere/tav...,Appartement T4 Taverny,76.44 mÂ²,4 piÃ¨ce(s),3 chambre(s),Ann. const. : 1970,4,1 terrasse(s)\n \n ...,,0,249â¯000Â â¬
9,https://www.laforet.com/agence-immobiliere/mou...,Maison T6 prÃ¨s de Bury,156 mÂ²,6 piÃ¨ce(s),3 chambre(s),,2,Cuisine AmÃ©nagÃ©e/Ã©quipÃ©e,,1,293â¯000Â â¬


In [44]:
print("Shape: (rows, columns) -", (df.count(), len(df.columns)))

Shape: (rows, columns) - (1100, 11)


In [45]:
# On regarde le nombre de valeurs manquantes par colonne
df.select([sum(when(df[col].isNull(), 1).otherwise(0)).alias(col) for col in df.columns]).show()


+----+-----+-------+---------+-----------+------------+---------------+------------+------------+-------+----+
|lien|titre|surface|nb_pieces|nb_chambres|annee_constr|nb_features_int|features_int|features_ext|terrain|prix|
+----+-----+-------+---------+-----------+------------+---------------+------------+------------+-------+----+
|   0|    0|      0|        2|         87|         162|              0|           0|           0|      0|   0|
+----+-----+-------+---------+-----------+------------+---------------+------------+------------+-------+----+



Notre base de données contient relativement peu de valeurs manquantes.

## Nettoyage des données

 Nettoyage et creation de nouvelles valeurs à partir de la colonne "titre"

In [46]:
## On extrait le premier mot dans une colonne "type" pour obtenir le type de logement
df = df.withColumn("type", split(col("titre"), " ")[0])

In [47]:
df.groupBy("type").count().sort("type").show()

+-----------+-----+
|       type|count|
+-----------+-----+
|Appartement|  687|
|       Loft|    1|
|     Maison|  412|
+-----------+-----+



In [484]:
#Possibilité de changer la valeur 'Loft'à 'appartement' car il n'y en a qu'1

#from pyspark.sql.functions import expr

#data = df.withColumn("type", expr("CASE WHEN type='Loft' THEN 'Appartement' ELSE type END"))


In [48]:
## On extrait egalement la ville du titre

# pour faire une fonction à la main qui peut faire des operation plus compliquées, on importe UDF (user defined function)
import re #pour les regex
from pyspark.sql.functions import udf #pour pouvoir appliquer notre fonction a un df
from pyspark.sql.types import StringType

# On definit la fonction de recherche de ville
def find_city_name(label: str) -> str:
    try:
        res = re.search(r'.*T\d+(.*)',label).group(1) #le nom de la ville est après 'Appartement T1' par exemple
        return res
    except:
        res= label.rsplit(None, 1)[-1] #sinon on prend le dernier element de la cell
        return res

# On cree un UDF a partir de notre fonction
find_city_name_udf = udf(find_city_name, StringType())

# On cree une nouvelle colonne avec le nom des villes
df = df.withColumn("villes", find_city_name_udf(df["titre"]))


In [49]:
df.limit(1).toPandas()

Unnamed: 0,lien,titre,surface,nb_pieces,nb_chambres,annee_constr,nb_features_int,features_int,features_ext,terrain,prix,type,villes
0,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Paris 15,10 mÂ²,1 piÃ¨ce(s),,,2,Cuisine Kitchenet,,0,99â¯500Â â¬,Appartement,Paris 15


In [50]:
#Les colonnes voulues sont ajoutées, mais on n'a pas encore des noms de villes nettoyés
from pyspark.sql.functions import regexp_replace 
from pyspark.sql.functions import trim

# On remplace la mention 'pres de'
df = df.withColumn("villes", regexp_replace(df["villes"], "prÃ¨s de", ""))

#on enleve les espaces en trop
df = df.withColumn("villes", trim(df["villes"]))



### Nettoyage du au problème d'encoding

In [51]:
df = df.select([regexp_replace(col, 'Ã[¨©]', 'e').alias(col) for col in df.columns]) 
df = df.select([regexp_replace(col, 'Ã´', 'o').alias(col) for col in df.columns])
df = df.select([regexp_replace(col, 'a', 'E').alias(col) for col in df.columns])
df = df.select([regexp_replace(col, '[Ã]', 'a').alias(col) for col in df.columns])  

### Nettoyage des colonnes 'surface', 'nb_pieces' 'nb_chambres' et 'prix'

In [52]:
#On enleve les mesures: 'mÂ²' et 'piÃ¨ce(s)' et les lettres polluantes dans le prix
from pyspark.sql.functions import regexp_extract

df = df.withColumn("surface", regexp_replace(df["surface"], "mÂ²", "")) 
df = df.withColumn("nb_pieces", regexp_extract(df["nb_pieces"], "^(\d+)", 1))
df = df.withColumn("nb_chambres", regexp_extract(df["nb_chambres"], "^(\d+)", 1))
df= df.withColumn("prix", regexp_replace(df["prix"], "[^\d]+", ""))


In [53]:
# on trim au cas ou 
df = df.withColumn("surface", trim(df["surface"]))
df = df.withColumn("nb_pieces", trim(df["nb_pieces"]))
df = df.withColumn("nb_chambres", trim(df["nb_chambres"]))
df = df.withColumn("prix", trim(df["prix"]))

In [14]:
df.limit(1).toPandas()

Unnamed: 0,lien,titre,surface,nb_pieces,nb_chambres,annee_constr,nb_features_int,features_int,features_ext,terrain,prix,type,villes
0,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Paris 15,10,1,,,2,Cuisine Kitchenet,,0,99500,Appartement,Paris 15


### Nettoyage de la colonne ann_constr
Les données de cette colonne peuvent correspondre à d'autres types d'information que l'année de construction, car sur le site, quand l'année de construction du bien n'est pas mentionnée, une autre information se place au meme endroit

On se debarasse donc de cette information.

In [54]:
#On ne garde que l'information souhaitée
df = df.withColumn("annee_constr", when(df["annee_constr"].rlike("^Ann. const."), df["annee_constr"]).otherwise(None))

#On ne garde que les digits
df= df.withColumn("annee_constr", regexp_replace(df["annee_constr"], "[^\d]+", ""))

### Traitement des valeurs manquantes
On commence par effectuer une transformation string/numeric aux colonnes ou c'est adapté.

On pourra par la suite faire une imputation de valeurs manquantes par la moyenne/médiane..

In [55]:
df.printSchema()

root
 |-- lien: string (nullable = true)
 |-- titre: string (nullable = true)
 |-- surface: string (nullable = true)
 |-- nb_pieces: string (nullable = true)
 |-- nb_chambres: string (nullable = true)
 |-- annee_constr: string (nullable = true)
 |-- nb_features_int: string (nullable = true)
 |-- features_int: string (nullable = true)
 |-- features_ext: string (nullable = true)
 |-- terrain: string (nullable = true)
 |-- prix: string (nullable = true)
 |-- type: string (nullable = true)
 |-- villes: string (nullable = true)



In [56]:
#On commence par transformer les types des colonnes en numerique
columns_to_num = ['surface','nb_pieces','nb_chambres','annee_constr', 'nb_features_int','terrain','prix']

df = df.select([col(c).cast("double").alias(c) if c in columns_to_num else c for c in df.columns])

In [57]:
#On regarde le nombre de valeurs manquantes par colonne
df.select([sum(when(df[col].isNull(), 1).otherwise(0)).alias(col) for col in df.columns]).show()

+----+-----+-------+---------+-----------+------------+---------------+------------+------------+-------+----+----+------+
|lien|titre|surface|nb_pieces|nb_chambres|annee_constr|nb_features_int|features_int|features_ext|terrain|prix|type|villes|
+----+-----+-------+---------+-----------+------------+---------------+------------+------------+-------+----+----+------+
|   0|    0|      0|        2|         90|         411|              0|           0|           0|      0|   0|   0|     0|
+----+-----+-------+---------+-----------+------------+---------------+------------+------------+-------+----+----+------+



In [58]:
exemple = df.filter(col('nb_pieces').isNull())
exemple.limit(2).toPandas()

Unnamed: 0,lien,titre,surface,nb_pieces,nb_chambres,annee_constr,nb_features_int,features_int,features_ext,terrain,prix,type,villes
0,https://www.laforet.com/agence-immobiliere/bri...,Maison Lavaudieu,90.0,,,,2.0,1 cave(s),,0.0,111000.0,Maison,Lavaudieu
1,https://www.laforet.com/agence-immobiliere/bri...,Maison Lavaudieu,90.0,,,,2.0,1 cave(s),,0.0,144000.0,Maison,Lavaudieu


In [59]:
#Colonne nb_pieces: on fait une imputation par la moyenne comme il n'y a que 2 valeurs manquantes

from pyspark.sql.functions import ceil, mean

# On calcule la moyenne 
mean_nb_pieces = df.agg(ceil(mean(df['nb_pieces']))).first()[0]

# On remplit les valeurs manquantes de la colonne avec la moyenne
df = df.na.fill({'nb_pieces': mean_nb_pieces})


In [60]:
#Colonne nb_chambres: on deduit le nombre de chambres en faisant une imputation par la moyenne groupée 
#en fonction du nombre de pieces des biens

from pyspark.sql.functions import coalesce

df2 = df.groupby("nb_pieces").agg(mean("nb_chambres").alias("mean_nb_chambres"))
df = df.join(df2, on="nb_pieces", how="left")
df = df.withColumn('nb_chambres', coalesce('nb_chambres', 'mean_nb_chambres'))
df = df.drop("mean_nb_chambres")

In [61]:
df.select([sum(when(df[col].isNull(), 1).otherwise(0)).alias(col) for col in df.columns]).show()

+---------+----+-----+-------+-----------+------------+---------------+------------+------------+-------+----+----+------+
|nb_pieces|lien|titre|surface|nb_chambres|annee_constr|nb_features_int|features_int|features_ext|terrain|prix|type|villes|
+---------+----+-----+-------+-----------+------------+---------------+------------+------------+-------+----+----+------+
|        0|   0|    0|      0|          0|         411|              0|           0|           0|      0|   0|   0|     0|
+---------+----+-----+-------+-----------+------------+---------------+------------+------------+-------+----+----+------+



On décide de ne pas remplacer les valeurs manquantes de la colonne 'annee_constr', car elles represetent un trop gros nombre d'observations et qu'aucune autre variable ne semble pouvoir instrumentaliser celle-ci (même en faisant des moyennes groupées, les valeurs remplies ne seront pas proche de la realité). On prefere plutot mettre des 0 afin de marquer les observations ou l'année n'est pas donnée

In [87]:
from pyspark.sql.functions import lit

df = df.withColumn("annee_constr", 
                   when(df["annee_constr"].isNull(), lit(0)).otherwise(df["annee_constr"]))


## Feature engineering

Les variables 'features_ext' et 'features_int' ne sont pas directement exploitables: elles représentent les aménagements exterieurs et interieurs des biens. Ces caractéristiques peuvent être différentes pour chaque logement, on décide donc de transformer un peu ces variables.

In [62]:
#on commence par nettoyer les colonnes features_ext et features_int
df = df.withColumn("features_ext", regexp_replace("features_ext", "\n                       \n                        ", ", "))
df = df.withColumn("features_int", regexp_replace("features_int", "\n                       \n                        ", ", "))

In [63]:
#On regarde les valeurs uniques dans la colonne features_ext et les occurences de ces valeurs
df.groupBy("features_ext").count().sort("features_ext").show(truncate=False)

+--------------------------------+-----+
|features_ext                    |count|
+--------------------------------+-----+
|                                |1009 |
|1 place(s) ext.                 |21   |
|1 place(s) int.                 |16   |
|1 place(s) int., 1 place(s) ext.|5    |
|1 place(s) int., 4 place(s) ext.|1    |
|2 place(s) ext.                 |1    |
|2 place(s) ext., Avec piscine   |2    |
|2 place(s) int.                 |2    |
|2 place(s) int., 2 place(s) ext.|1    |
|3 place(s) int., Avec piscine   |1    |
|31 place(s) ext.                |1    |
|4 place(s) ext.                 |1    |
|4 place(s) int.                 |1    |
|4 place(s) int., Avec piscine   |1    |
|Avec piscine                    |37   |
+--------------------------------+-----+



Pour la variable features_ext, on remarque que le grosse majorité des logements n'en a pas. Cette variable est reservée aux logements qui ont un espace exterieur, donc des logements plus chers. On peut creer des variables à partir de ces valeurs

In [64]:
from pyspark.sql.functions import when, lit

#on cree une variable "piscine"
df = df.withColumn("piscine", when(df["features_ext"].contains("piscine"), lit(1)).otherwise(lit(0)))

In [65]:
#on cree une fonction qui nous permet de récuperer le nombre de features specifiques que contient le bien

from pyspark.sql.types import IntegerType

def extract_amount(value, substring):
    if substring in value:
        return int(value.split(substring)[0].strip().split(" ")[-1]) #on extrait le chiffre precedant le feature qui nous interesse
    else:
        return 0

udf_extract_amount = udf(extract_amount, IntegerType())


In [66]:
#on applique la fonction à la variable "parking_int" que l'on veut creer a partir de la mention "place(s) int." dans la colonne "features_ext"

df = df.withColumn("parking_int", udf_extract_amount(df['features_ext'], lit("place(s) int.")))

In [67]:
#on verifie que la fonction marche bien
df.groupBy("parking_int").count().sort("parking_int").show(truncate=False) 

+-----------+-----+
|parking_int|count|
+-----------+-----+
|0          |1072 |
|1          |22   |
|2          |3    |
|3          |1    |
|4          |2    |
+-----------+-----+



In [68]:
#meme principe pour les places de parking exterieures

df = df.withColumn("parking_ext", udf_extract_amount(df['features_ext'], lit("place(s) ext.")))

In [69]:
#on peut creer une seule variable 'parking', qui est l'addition des 2
df = df.withColumn("place_parking", col("parking_int") + col("parking_ext"))
df = df.drop("parking_int").drop("parking_ext")


In [70]:
df.limit(1).toPandas()

Unnamed: 0,nb_pieces,lien,titre,surface,nb_chambres,annee_constr,nb_features_int,features_int,features_ext,terrain,prix,type,villes,piscine,place_parking
0,1.0,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Paris 15,10.0,1.0,,2.0,Cuisine Kitchenet,,0.0,99500.0,Appartement,Paris 15,0,0


On passe maintenant au traitement de la colonne features_int

In [73]:
features=df.groupBy("features_int").count().sort("features_int")
features.limit(100).toPandas()

Unnamed: 0,features_int,count
0,,140
1,1 balcon(s),6
2,"1 balcon(s), 1 cave(s)",3
3,"1 balcon(s), 1 cave(s), 2 garage(s)",1
4,"1 balcon(s), 1 cave(s), Avec sous-sol, 1 garag...",4
...,...,...
95,"Cuisine Amenagee/equipee, 1 balcon(s), 1 cave(s)",4
96,"Cuisine Amenagee/equipee, 1 balcon(s), 1 cave(...",3
97,"Cuisine Amenagee/equipee, 1 balcon(s), 1 terra...",1
98,"Cuisine Amenagee/equipee, 1 balcon(s), 1 terra...",1


On peut voir que la majorité des logements ont des aménagements interieurs mentionnés sur le site, un certain nombre de termes reviennent souvent, on peut donc en créer des variables

In [72]:
#creation des variables binaires

#on cree une variable "cuisine_amenagee"
df = df.withColumn("cuisine_amenagee", when(df["features_int"].contains("Cuisine Amenagee"), lit(1)).otherwise(lit(0)))

#on cree une variable "ascenseur"
df = df.withColumn("ascenseur", when(df["features_int"].contains("ascenseur"), lit(1)).otherwise(lit(0)))

#on cree une variable "sous_sol"
df = df.withColumn("sous_sol", when(df["features_int"].contains("sous-sol"), lit(1)).otherwise(lit(0)))

In [74]:
#on cree des variables numeriques

#on compte le nombre de balcons
df = df.withColumn("balcons", udf_extract_amount(df['features_int'], lit("balcon(s)")))

#on extrait le nombre de terrasses
df = df.withColumn("terrasses", udf_extract_amount(df['features_int'], lit("terrasse(s)")))

#on extrait le nombre de caves
df = df.withColumn("caves", udf_extract_amount(df['features_int'], lit("cave(s)")))

#on extrait le nombre de garages
df = df.withColumn("garages", udf_extract_amount(df['features_int'], lit("garage(s)")))


In [77]:
df.groupBy("garages").count().sort("garages").show(truncate=False) 


+-------+-----+
|garages|count|
+-------+-----+
|0      |878  |
|1      |198  |
|2      |22   |
|3      |1    |
|4      |1    |
+-------+-----+



In [89]:
df.limit(1).toPandas()

Unnamed: 0,nb_pieces,lien,titre,surface,nb_chambres,annee_constr,nb_features_int,features_int,features_ext,terrain,...,villes,piscine,place_parking,cuisine_amenagee,ascenseur,sous_sol,balcons,terrasses,caves,garages
0,1,https://www.laforet.com/agence-immobiliere/par...,Appartement T1 Paris 15,10,1,0,2,Cuisine Kitchenet,,0,...,Paris 15,0,0,0,0,0,0,0,0,0


On peut a present voir les fréquences et statistiques descriptives de notre base

In [88]:
# on change le type de certaine scolonne en float pour avoir une meilleur visualisation par la suite
from pyspark.sql.types import IntegerType

cols = ['prix', 'terrain', 'nb_features_int','annee_constr','nb_chambres','surface','nb_pieces']

for col in cols:
    df = df.withColumn(col, df[col].cast(IntegerType()))


In [90]:
import pandas as pd
pandas_df = df.toPandas()
#pandas_df.to_parquet('Clean_data.parquet')
pandas_df.to_csv("Clean_data.csv", index=False)
