In [1]:
import os

# Show where the notebook is running and which entries sit in that directory.
for label, value in (("CWD:", os.getcwd()), ("Fichiers:", os.listdir("."))):
    print(label, value)

CWD: C:\Users\ilyes\M2\Algorithmique\Projet
Fichiers: ['.ipynb_checkpoints', 'cities.csv', 'idf_ventes.csv', 'nettoyage_pyspark.ipynb', 'scrap.py']


In [2]:
from pyspark.sql import SparkSession

# Get the active Spark session, or create one registered under this
# application name if none exists yet.
session_builder = SparkSession.builder.appName("ImmoIDF-Nettoyage")
spark = session_builder.getOrCreate()

In [None]:
from pyspark.sql.functions import col, trim, count, when

# Columns the listings CSV is expected to expose.
EXPECTED_COLS = [
    "Ville", "Type", "Surface", "NbrPieces",
    "NbrChambres", "NbrSdb", "DPE", "Prix",
]

# Read the raw listings with a header row. No schema inference is requested,
# so the columns come back as strings and are cast explicitly later on.
annonces = spark.read.csv(
    "data/raw/idf_ventes.csv",
    header=True,
    encoding="utf-8",
)

# Strip leading/trailing whitespace from every column.
# Done as ONE projection: chaining withColumn() in a loop adds one projection
# per column to the query plan, which the PySpark documentation advises
# against in favour of a single select(). Column names and order are kept.
annonces = annonces.select([trim(col(c)).alias(c) for c in annonces.columns])

# --- Column inventory: what was read versus what is expected ---
print("=== Colonnes ===")
print(annonces.columns)

found_cols = annonces.columns
missing = [name for name in EXPECTED_COLS if name not in found_cols]
extra = [name for name in found_cols if name not in EXPECTED_COLS]
print("Missing:", missing)
print("Extra:", extra)

# --- Schema, preview and size ---
print("\n=== Schéma ===")
annonces.printSchema()

print("\n=== Aperçu ===")
annonces.show(n=10, truncate=False)

print("\n=== Nb lignes ===")
print(annonces.count())

# --- Null count per expected column ---
# Only real nulls are counted here; placeholder strings such as "-" are
# ordinary values at this stage and are not included.
print("\n=== Null counts (devrait être 0 partout si CSV bien formé) ===")
null_exprs = [
    count(when(col(name).isNull(), 1)).alias(f"null_{name}")
    for name in EXPECTED_COLS
]
annonces.select(null_exprs).show(truncate=False)

# --- Distinct labels of the two categorical columns ---
print("\n=== Distinct Type / DPE ===")
for categorical in ("Type", "DPE"):
    annonces.select(categorical).distinct().show(truncate=False)

=== Colonnes ===
['Ville', 'Type', 'Surface', 'NbrPieces', 'NbrChambres', 'NbrSdb', 'DPE', 'Prix']
Missing: []
Extra: []

=== Schéma ===
root
 |-- Ville: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Surface: string (nullable = true)
 |-- NbrPieces: string (nullable = true)
 |-- NbrChambres: string (nullable = true)
 |-- NbrSdb: string (nullable = true)
 |-- DPE: string (nullable = true)
 |-- Prix: string (nullable = true)


=== Aperçu ===
+------------------+------+-------+---------+-----------+------+------+------+
|Ville             |Type  |Surface|NbrPieces|NbrChambres|NbrSdb|DPE   |Prix  |
+------------------+------+-------+---------+-----------+------+------+------+
|Paris 15ème       |Maison|220    |8        |4          |3     |Vierge|250000|
|Meaux             |Maison|325    |10       |5          |-     |-     |749000|
|Misy-sur-Yonne    |Maison|128    |5        |4          |1     |E     |279000|
|Nanteuil-lès-Meaux|Maison|89     |5        |3          |-    

In [4]:
# In the DPE column only, turn the "-" placeholder into the "Vierge" label
# (presumably "-" means the rating was not provided -- confirm with the scraper).
annonces = annonces.na.replace("-", "Vierge", subset=["DPE"])

In [5]:
from pyspark.sql.functions import col

# Sanity check: list the DPE labels that remain...
annonces.select("DPE").distinct().show(truncate=False)

# ...and count the rows that still carry the "-" placeholder.
dash_rows = annonces.where(col("DPE") == "-")
print("Nb DPE='-':", dash_rows.count())

+------+
|DPE   |
+------+
|F     |
|E     |
|B     |
|D     |
|C     |
|Vierge|
|G     |
+------+

Nb DPE='-': 0


In [6]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType

num_cols = ["Surface", "NbrPieces", "NbrChambres", "NbrSdb"]

# For each numeric column: map the "missing" markers ("-", empty string,
# null) to null, then cast the column to double.
for name in num_cols:
    raw = col(name)
    is_missing = raw.isNull() | (raw == "") | (raw == "-")
    cleaned = when(is_missing, None).otherwise(raw)
    annonces = annonces.withColumn(name, cleaned.cast(DoubleType()))

In [7]:
from pyspark.sql.functions import avg, lit

# A single aggregation yields the mean of every numeric column.
mean_exprs = [avg(col(name)).alias(name) for name in num_cols]
agg_rows = annonces.agg(*mean_exprs).collect()
means_row = agg_rows[0]
means = {name: means_row[name] for name in num_cols}
print("Moyennes:", means)

# Imputation: a null cell takes its column mean, any other value is kept.
for name in num_cols:
    current = col(name)
    filled = when(current.isNull(), lit(means[name])).otherwise(current)
    annonces = annonces.withColumn(name, filled)

Moyennes: {'Surface': 91.27362204724409, 'NbrPieces': 5.5265225933202355, 'NbrChambres': 2.525879917184265, 'NbrSdb': 1.2905405405405406}


In [8]:
from pyspark.sql.functions import count

# Count the nulls still present in each numeric column.
remaining_exprs = [
    count(when(col(name).isNull(), 1)).alias(f"null_{name}")
    for name in num_cols
]
null_counts = annonces.select(remaining_exprs)
null_counts.show(truncate=False)

# If nulls remain (e.g. a fully empty column => mean = None), drop those rows.
annonces = annonces.na.drop(subset=num_cols)

print("Nb lignes après dropna:", annonces.count())

+------------+--------------+----------------+-----------+
|null_Surface|null_NbrPieces|null_NbrChambres|null_NbrSdb|
+------------+--------------+----------------+-----------+
|0           |0             |0               |0          |
+------------+--------------+----------------+-----------+

Nb lignes après dropna: 530


In [9]:
from pyspark.sql.functions import col

# Collect the distinct labels of each categorical column on the driver;
# they drive the creation of the indicator columns.
distinct_vals = {
    name: [row[name] for row in annonces.select(name).distinct().collect()]
    for name in ("DPE", "Type")
}
dpe_vals = distinct_vals["DPE"]
type_vals = distinct_vals["Type"]

print("DPE distinct:", sorted(dpe_vals))
print("Type distinct:", sorted(type_vals))

DPE distinct: ['B', 'C', 'D', 'E', 'F', 'G', 'Vierge']
Type distinct: ['Appartement', 'Maison']


In [10]:
from pyspark.sql.functions import when, lit
import re

def _safe_colname(prefix: str, value: str) -> str:
    # Nettoyage minimal pour faire un nom de colonne Spark valide
    v = str(value).strip()
    v = v.replace(" ", "_")
    v = re.sub(r"[^0-9A-Za-z_]", "_", v)  # remplace accents/symboles par _
    v = re.sub(r"_+", "_", v).strip("_")
    return f"{prefix}_{v}"

# One 0/1 indicator column per distinct label:
# DPE -> DPE_* columns, then Type -> Type_* columns.
# withColumn is used so that re-running the cell overwrites the indicator
# columns instead of duplicating them.
for source, labels in (("DPE", dpe_vals), ("Type", type_vals)):
    for label in labels:
        is_match = col(source) == lit(label)
        flag = when(is_match, 1).otherwise(0).cast("int")
        annonces = annonces.withColumn(_safe_colname(source, label), flag)

In [11]:
# Display the two categorical columns next to their indicator columns.
indicator_cols = [
    name for name in annonces.columns
    if name.startswith(("Type_", "DPE_"))
]
cols_to_show = ["Type", "DPE"] + indicator_cols
annonces.select(cols_to_show).show(10, truncate=False)

+------+------+-----+-----+-----+-----+-----+----------+-----+----------------+-----------+
|Type  |DPE   |DPE_F|DPE_E|DPE_B|DPE_D|DPE_C|DPE_Vierge|DPE_G|Type_Appartement|Type_Maison|
+------+------+-----+-----+-----+-----+-----+----------+-----+----------------+-----------+
|Maison|Vierge|0    |0    |0    |0    |0    |1         |0    |0               |1          |
|Maison|Vierge|0    |0    |0    |0    |0    |1         |0    |0               |1          |
|Maison|E     |0    |1    |0    |0    |0    |0         |0    |0               |1          |
|Maison|F     |1    |0    |0    |0    |0    |0         |0    |0               |1          |
|Maison|B     |0    |0    |1    |0    |0    |0         |0    |0               |1          |
|Maison|B     |0    |0    |1    |0    |0    |0         |0    |0               |1          |
|Maison|E     |0    |1    |0    |0    |0    |0         |0    |0               |1          |
|Maison|C     |0    |0    |0    |0    |1    |0         |0    |0               |1

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# (source column, index column, one-hot vector column) for each categorical.
categorical_specs = (
    ("DPE", "DPE_idx", "DPE_ohe"),
    ("Type", "Type_idx", "Type_ohe"),
)

# Stage 1: label -> numeric index, with handleInvalid="keep" on each indexer.
indexers = [
    StringIndexer(inputCol=source, outputCol=indexed, handleInvalid="keep")
    for source, indexed, _ in categorical_specs
]

# Stage 2: numeric index -> one-hot vector.
encoders = [
    OneHotEncoder(inputCol=indexed, outputCol=encoded)
    for _, indexed, encoded in categorical_specs
]

# Fit all indexers, then all encoders, on the listings and apply them.
pipe = Pipeline(stages=[*indexers, *encoders])
model = pipe.fit(annonces)
annonces_ohe = model.transform(annonces)

preview_cols = [name for spec in categorical_specs for name in spec]
annonces_ohe.select(*preview_cols).show(10, truncate=False)

+------+-------+-------------+------+--------+-------------+
|DPE   |DPE_idx|DPE_ohe      |Type  |Type_idx|Type_ohe     |
+------+-------+-------------+------+--------+-------------+
|Vierge|0.0    |(7,[0],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|Vierge|0.0    |(7,[0],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|E     |2.0    |(7,[2],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|F     |4.0    |(7,[4],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|B     |5.0    |(7,[5],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|B     |5.0    |(7,[5],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|E     |2.0    |(7,[2],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|C     |3.0    |(7,[3],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|G     |6.0    |(7,[6],[1.0])|Maison|1.0     |(2,[1],[1.0])|
|C     |3.0    |(7,[3],[1.0])|Maison|1.0     |(2,[1],[1.0])|
+------+-------+-------------+------+--------+-------------+
only showing top 10 rows



In [None]:
import os
from pyspark.sql import functions as F

# List the working directory.
# NOTE(review): the CSV below is read from data/processed/, not from the
# working directory -- confirm this listing is still the intended check.
print(os.listdir("."))

# Try ';' as separator first; if the header parses into a single column,
# read the file again with ','.
for sep in (";", ","):
    villes = spark.read.csv(
        "data/processed/cities.csv",
        header=True,
        encoding="utf-8",
        sep=sep,
    )
    if len(villes.columns) != 1:
        break

print("Colonnes villes:", villes.columns)
villes.show(5, truncate=False)
villes.printSchema()

['.ipynb_checkpoints', 'cities.csv', 'idf_ventes.csv', 'nettoyage_pyspark.ipynb', 'scrap.py']
Colonnes villes: ['insee_code', 'city_code', 'zip_code', 'label', 'latitude', 'longitude', 'department_name', 'department_number', 'region_name', 'region_geojson_name']
+----------+-------------------+--------+-------------------+------------+-----------+---------------+-----------------+-----------------------+-----------------------+
|insee_code|city_code          |zip_code|label              |latitude    |longitude  |department_name|department_number|region_name            |region_geojson_name    |
+----------+-------------------+--------+-------------------+------------+-----------+---------------+-----------------+-----------------------+-----------------------+
|25620     |ville du pont      |25650   |ville du pont      |46.999873398|6.498147193|doubs          |25               |bourgogne-franche-comté|Bourgogne-Franche-Comté|
|25624     |villers grelot     |25640   |villers grelot     |

In [14]:
import unicodedata
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

def strip_accents_py(s: str) -> str:
    """Return ``s`` without its combining marks (accents, cedillas, ...).

    The text is decomposed with Unicode NFKD normalisation, then every
    character with a non-zero combining class is dropped.

    Args:
        s: Text to clean, or ``None``.

    Returns:
        The cleaned text, or ``None`` when ``s`` is ``None``.
    """
    if s is None:
        return None
    decomposed = unicodedata.normalize("NFKD", s)
    kept = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(kept)

# Wrap the pure-Python helper as a Spark UDF returning a string column.
strip_accents = F.udf(f=strip_accents_py, returnType=StringType())

def add_city_key(df, colname: str, out: str = "Ville_key"):
    """Add a normalised city-name column to ``df``.

    The source column is trimmed, lower-cased and passed through the
    ``strip_accents`` UDF. Then, in order: numeric ordinals such as "15eme"
    or "1er" are removed (arrondissements), every run of characters outside
    ``[0-9a-z]`` becomes one space, whitespace runs collapse to one space,
    and the result is trimmed.

    Args:
        df: DataFrame holding the city names.
        colname: Name of the column containing the raw city name.
        out: Name of the column receiving the normalised key.

    Returns:
        ``df`` with the ``out`` column added (or replaced if it exists).
    """
    # (pattern, replacement) pairs, applied in this exact order.
    substitutions = (
        (r"\b\d+\s*(er|e|eme|ème)\b", ""),   # arrondissement ordinals
        (r"[^0-9a-z]+", " "),                # separators -> space
        (r"\s+", " "),                       # collapse whitespace
    )
    key = strip_accents(F.lower(F.trim(F.col(colname))))
    for pattern, replacement in substitutions:
        key = F.regexp_replace(key, pattern, replacement)
    return df.withColumn(out, F.trim(key))

In [15]:
# Department numbers kept for the city reference table.
IDF_DEPTS = ["75", "77", "78", "91", "92", "93", "94", "95"]

in_idf = F.col("department_number").isin(IDF_DEPTS)
villes_idf = villes.filter(in_idf)

# Key each city by its normalised label and keep numeric coordinates only.
villes_keyed = add_city_key(villes_idf, "label", "Ville_key")
villes_coords = villes_keyed.select(
    "Ville_key",
    F.col("latitude").cast("double").alias("latitude"),
    F.col("longitude").cast("double").alias("longitude"),
)

# Several rows can share a key; they are merged by averaging their
# coordinates so that each key appears once.
villes_k = villes_coords.groupBy("Ville_key").agg(
    F.avg("latitude").alias("latitude"),
    F.avg("longitude").alias("longitude"),
)

villes_k.show(5, truncate=False)

+-------------------+------------+-----------+
|Ville_key          |latitude    |longitude  |
+-------------------+------------+-----------+
|barbizon           |48.448347603|2.600809608|
|cergy              |49.039967587|2.051139021|
|coulommiers        |48.81234041 |3.091269785|
|reau               |48.60739655 |2.623860846|
|fontenay le vicomte|48.547225879|2.400248804|
+-------------------+------------+-----------+
only showing top 5 rows



In [16]:
# Normalise the listing city names with the same routine used for the
# reference table, so both sides of the join share one key format.
annonces_tmp = add_city_key(annonces_ohe, "Ville", "Ville_key")

# Manual remapping of a few keys onto a different reference key
# (presumably merged communes -- confirm against cities.csv).
city_key = F.col("Ville_key")
remapped_key = (
    F.when(city_key.isin("evry", "courcouronnes"), F.lit("evry courcouronnes"))
    .when(city_key == "le chesnay", F.lit("le chesnay rocquencourt"))
    .otherwise(city_key)
)
annonces_tmp = annonces_tmp.withColumn("Ville_key", remapped_key)

# Left join: every listing is kept, with null coordinates when no city matches.
annonces_geo = annonces_tmp.join(villes_k, on="Ville_key", how="left")

print("Nb lignes annonces_geo:", annonces_geo.count())

lacks_coords = F.col("latitude").isNull() | F.col("longitude").isNull()
print("Nb lignes sans coordonnées (avant suppression):",
      annonces_geo.filter(lacks_coords).count())

Nb lignes annonces_geo: 530
Nb lignes sans coordonnées (avant suppression): 27


In [17]:
# Discard the listings for which the join found no latitude or longitude.
annonces_geo = annonces_geo.na.drop(subset=["latitude", "longitude"])
print("Nb lignes après suppression lat/lon manquants:", annonces_geo.count())

Nb lignes après suppression lat/lon manquants: 503


In [18]:
from pyspark.sql.types import DoubleType, IntegerType

# Layout of the final table: measured features and target, 0/1 indicator
# columns, then the joined coordinates.
final_cols = [
    "Surface", "NbrPieces", "NbrChambres", "NbrSdb", "Prix",
    "DPE_B", "DPE_C", "DPE_D", "DPE_E", "DPE_F", "DPE_G", "DPE_Vierge",
    "Type_Appartement", "Type_Maison",
    "latitude", "longitude"
]

double_cols = ["Surface", "NbrPieces", "NbrChambres", "NbrSdb", "Prix", "latitude", "longitude"]
int_cols = [c for c in final_cols if c not in double_cols]

# A measured column (feature, target or coordinate) cannot be invented.
# Filling it with a constant 0 would silently corrupt the dataset, so a
# missing one is reported instead.
missing_required = [c for c in double_cols if c not in annonces_geo.columns]
if missing_required:
    raise ValueError(
        f"Colonnes requises absentes de annonces_geo: {missing_required}"
    )

# An indicator column is missing when its category never occurred in the
# data; a constant 0 is then the correct encoding.
for c in int_cols:
    if c not in annonces_geo.columns:
        annonces_geo = annonces_geo.withColumn(c, F.lit(0))

# Select and cast every final column in a single projection (rather than one
# withColumn per column), then drop the rows left with a null in any column.
annonces = annonces_geo.select(
    *[
        F.col(c).cast(DoubleType() if c in double_cols else IntegerType()).alias(c)
        for c in final_cols
    ]
).dropna()

print("Nb lignes:", annonces.count())
print("Nb colonnes:", len(annonces.columns))
annonces.printSchema()

Nb lignes: 503
Nb colonnes: 16
root
 |-- Surface: double (nullable = true)
 |-- NbrPieces: double (nullable = true)
 |-- NbrChambres: double (nullable = true)
 |-- NbrSdb: double (nullable = true)
 |-- Prix: double (nullable = true)
 |-- DPE_B: integer (nullable = false)
 |-- DPE_C: integer (nullable = false)
 |-- DPE_D: integer (nullable = false)
 |-- DPE_E: integer (nullable = false)
 |-- DPE_F: integer (nullable = false)
 |-- DPE_G: integer (nullable = false)
 |-- DPE_Vierge: integer (nullable = false)
 |-- Type_Appartement: integer (nullable = false)
 |-- Type_Maison: integer (nullable = false)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

