# PHASE 1 — INGESTION
Setup: imports and config

In [0]:
# Standard PySpark + Python imports used throughout the notebook.
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, IntegerType, FloatType, StringType, BooleanType, DateType, ArrayType
)
from pyspark.sql.functions import col, regexp_replace, to_date, when, trim, lit, udf, from_json, explode
import json

# Optional: set a stable path for Delta tables 
BASE_DELTA_PATH = "/tmp/delta/airbnb_paris"

# Print spark version (useful in the presentation)
print("Spark version:", spark.version)


Spark version: 4.0.0


Read CSV into a raw DataFrame 

In [0]:
# Path to the uploaded CSV in DBFS
csv_path = "/Volumes/workspace/default/new/Listings.csv"   

raw_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiLine", "true") \
    .option("escape", "\"") \
    .load(csv_path)

# Basic sanity checks
print("Rows:", raw_df.count())
raw_df.printSchema()
display(raw_df.limit(10))


Rows: 279712
root
 |-- listing_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: double (nullable = true)
 |-- host_acceptance_rate: double (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- amenities: string (nullable = true

listing_id,name,host_id,host_since,host_location,host_response_time,host_response_rate,host_acceptance_rate,host_is_superhost,host_total_listings_count,host_has_profile_pic,host_identity_verified,neighbourhood,district,city,latitude,longitude,property_type,room_type,accommodates,bedrooms,amenities,price,minimum_nights,maximum_nights,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,instant_bookable
281420,"Beautiful Flat in le Village Montmartre, Paris",1466919,2011-12-03,"Paris, Ile-de-France, France",,,,f,1,t,f,Buttes-Montmartre,,Paris,48.88668,2.33343,Entire apartment,Entire place,2,1,"[""Heating"", ""Kitchen"", ""Washer"", ""Wifi"", ""Long term stays allowed""]",53,2,1125,100,10,10,10,10,10,10,f
3705183,39 mÂ² Paris (Sacre CÅ“ur),10328771,2013-11-29,"Paris, Ile-de-France, France",,,,f,1,t,t,Buttes-Montmartre,,Paris,48.88617,2.34515,Entire apartment,Entire place,2,1,"[""Shampoo"", ""Heating"", ""Kitchen"", ""Essentials"", ""Washer"", ""Dryer"", ""Wifi"", ""Long term stays allowed""]",120,2,1125,100,10,10,10,10,10,10,f
4082273,"Lovely apartment with Terrace, 60m2",19252768,2014-07-31,"Paris, Ile-de-France, France",,,,f,1,t,f,Elysee,,Paris,48.88112,2.31712,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Washer"", ""Wifi"", ""Long term stays allowed""]",89,2,1125,100,10,10,10,10,10,10,f
4797344,Cosy studio (close to Eiffel tower),10668311,2013-12-17,"Paris, Ile-de-France, France",,,,f,1,t,t,Vaugirard,,Paris,48.84571,2.30584,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Wifi"", ""Long term stays allowed""]",58,2,1125,100,10,10,10,10,10,10,f
4823489,Close to Eiffel Tower - Beautiful flat : 2 rooms,24837558,2014-12-14,"Paris, Ile-de-France, France",,,,f,1,t,f,Passy,,Paris,48.855,2.26979,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Essentials"", ""Hair dryer"", ""Washer"", ""Dryer"", ""Bathtub"", ""Wifi"", ""Elevator"", ""Long term stays allowed"", ""Cable TV""]",60,2,1125,100,10,10,10,10,10,10,f
4898654,NEW - Charming apartment Le Marais,505535,2011-04-13,"Paris, Ile-de-France, France",,,,f,1,t,t,Temple,,Paris,48.86428,2.3537,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Essentials"", ""Washer"", ""Smoke alarm"", ""Wifi"", ""Long term stays allowed"", ""Cable TV""]",95,2,1125,100,10,10,10,10,10,10,f
6021700,2P - Entre Bastille et Republique,8053690,2013-08-09,"Paris, Ile-de-France, France",,,,f,1,t,t,Popincourt,,Paris,48.86384,2.37101,Entire apartment,Entire place,2,1,"[""Shampoo"", ""TV"", ""Kitchen"", ""Washer"", ""Smoke alarm"", ""Wifi"", ""Fire extinguisher"", ""Long term stays allowed""]",80,2,1125,100,10,10,10,10,10,10,f
6945740,57sqm btw. Bastille & PÃ¨re Lachaise,5924709,2013-04-14,"Paris, Ile-de-France, France",,,,f,1,t,t,Popincourt,,Paris,48.86043,2.37842,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Essentials"", ""Washer"", ""Dryer"", ""Smoke alarm"", ""Wifi"", ""Long term stays allowed""]",59,2,1125,100,10,10,10,10,10,10,f
7491966,Charming appartment near the Parc Buttes Chaumont,35812762,2015-06-14,"Paris, Ile-de-France, France",,,,f,1,t,t,Buttes-Chaumont,,Paris,48.87871,2.37489,Entire apartment,Entire place,2,1,"[""Paid parking off premises"", ""Shampoo"", ""Heating"", ""TV"", ""Iron"", ""Kitchen"", ""Hair dryer"", ""Essentials"", ""Washer"", ""Hot water"", ""Hangers"", ""Smoke alarm"", ""Wifi"", ""Long term stays allowed"", ""Dedicated workspace"", ""Host greets you"", ""Cable TV""]",80,2,1125,100,10,10,10,10,10,10,f
7849932,Bel appartement plein de charme !,20833291,2014-09-02,"Paris, Ile-de-France, France",,,,f,1,t,t,Opera,,Paris,48.8779,2.33122,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Iron"", ""Kitchen"", ""Essentials"", ""Hair dryer"", ""Washer"", ""Hangers"", ""Wifi"", ""Elevator"", ""Long term stays allowed"", ""Dedicated workspace"", ""Cable TV""]",90,2,1125,100,10,10,10,10,10,10,f


# PHASE 2 — CLEANING & PREPARATION
Overview of cleaning operations
## Cleaning & preparation plan

We will:
1. Standardize column names (trim / lower).
2. Normalize price field: remove currency symbols and thousands separators, cast to integer.
3. Convert boolean-like flags (`t`/`f`) into true booleans.
4. Parse dates (e.g., host_since).
5. Cast numeric columns (latitude, longitude, review scores).
6. Parse `amenities` from an encoded string to an array of strings.
7. Handle missing/invalid values with principled defaults or nulls.
8. Persist the cleaned dataset as a Delta table.


## Normalize column names & preview 

In [0]:
# Normalize column names: remove leading/trailing spaces and convert to snake_case-like style
def normalize_colname(c):
    return c.strip().lower().replace(" ", "_")

normalized_cols = [normalize_colname(c) for c in raw_df.columns]
raw_df = raw_df.toDF(*normalized_cols)

# Show first rows after renaming
raw_df.printSchema()
display(raw_df.limit(5))


root
 |-- listing_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: double (nullable = true)
 |-- host_acceptance_rate: double (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- amenities: string (nullable = true)
 |-- price:

listing_id,name,host_id,host_since,host_location,host_response_time,host_response_rate,host_acceptance_rate,host_is_superhost,host_total_listings_count,host_has_profile_pic,host_identity_verified,neighbourhood,district,city,latitude,longitude,property_type,room_type,accommodates,bedrooms,amenities,price,minimum_nights,maximum_nights,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,instant_bookable
281420,"Beautiful Flat in le Village Montmartre, Paris",1466919,2011-12-03,"Paris, Ile-de-France, France",,,,f,1,t,f,Buttes-Montmartre,,Paris,48.88668,2.33343,Entire apartment,Entire place,2,1,"[""Heating"", ""Kitchen"", ""Washer"", ""Wifi"", ""Long term stays allowed""]",53,2,1125,100,10,10,10,10,10,10,f
3705183,39 mÂ² Paris (Sacre CÅ“ur),10328771,2013-11-29,"Paris, Ile-de-France, France",,,,f,1,t,t,Buttes-Montmartre,,Paris,48.88617,2.34515,Entire apartment,Entire place,2,1,"[""Shampoo"", ""Heating"", ""Kitchen"", ""Essentials"", ""Washer"", ""Dryer"", ""Wifi"", ""Long term stays allowed""]",120,2,1125,100,10,10,10,10,10,10,f
4082273,"Lovely apartment with Terrace, 60m2",19252768,2014-07-31,"Paris, Ile-de-France, France",,,,f,1,t,f,Elysee,,Paris,48.88112,2.31712,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Washer"", ""Wifi"", ""Long term stays allowed""]",89,2,1125,100,10,10,10,10,10,10,f
4797344,Cosy studio (close to Eiffel tower),10668311,2013-12-17,"Paris, Ile-de-France, France",,,,f,1,t,t,Vaugirard,,Paris,48.84571,2.30584,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Wifi"", ""Long term stays allowed""]",58,2,1125,100,10,10,10,10,10,10,f
4823489,Close to Eiffel Tower - Beautiful flat : 2 rooms,24837558,2014-12-14,"Paris, Ile-de-France, France",,,,f,1,t,f,Passy,,Paris,48.855,2.26979,Entire apartment,Entire place,2,1,"[""Heating"", ""TV"", ""Kitchen"", ""Essentials"", ""Hair dryer"", ""Washer"", ""Dryer"", ""Bathtub"", ""Wifi"", ""Elevator"", ""Long term stays allowed"", ""Cable TV""]",60,2,1125,100,10,10,10,10,10,10,f


## Price cleaning

In [0]:
# Price often contains currency symbols or commas. Remove non-numeric characters and cast to integer.
clean_df = raw_df.withColumn(
    "price_clean",
    regexp_replace(col("price").cast("string"), "[^0-9]", "")  # remove anything not a digit
)

# Convert empty strings to null then to integer (to avoid cast errors)
clean_df = clean_df.withColumn("price_clean", when(col("price_clean") == "", None).otherwise(col("price_clean").cast(IntegerType())))


## Boolean fields cleaning 

In [0]:
# Standard boolean conversion for fields that show 't' / 'f' or 'True'/'False'
bool_cols = ["host_is_superhost", "host_has_profile_pic", "host_identity_verified", "instant_bookable"]

# If a column doesn't exist in the dataset, safely skip it
existing_bool_cols = [c for c in bool_cols if c in clean_df.columns]

for c in existing_bool_cols:
    clean_df = clean_df.withColumn(c + "_bool",
                                   when(col(c).rlike("(?i)^t$|(?i)^true$|(?i)^yes$"), True)
                                   .when(col(c).rlike("(?i)^f$|(?i)^false$|(?i)^no$"), False)
                                   .otherwise(None))




## Date parsing 

In [0]:
# Parse host_since into a DateType. The sample uses formats like '12/3/2011' or '11/29/2013'
# We'll attempt common US-style formats, and fallback to null if parse fails.
clean_df = clean_df.withColumn("host_since_parsed", to_date(col("host_since"), "M/d/yyyy"))

## Numeric casting 

In [0]:
# Cast geographic and numeric fields to float/int as appropriate
numeric_casts = {
    "latitude": FloatType(),
    "longitude": FloatType(),
    "review_scores_rating": FloatType(),
    "review_scores_accuracy": FloatType(),
    "review_scores_cleanliness": FloatType(),
    "review_scores_checkin": FloatType(),
    "review_scores_communication": FloatType(),
    "review_scores_location": FloatType(),
    "review_scores_value": FloatType(),
    "accommodates": IntegerType(),
    "bedrooms": FloatType(),   # bedrooms may contain decimals or be empty
    "minimum_nights": IntegerType(),
    "maximum_nights": IntegerType()
}

for colname, dtype in numeric_casts.items():
    if colname in clean_df.columns:
        clean_df = clean_df.withColumn(colname + "_num", col(colname).cast(dtype))


## Amenities parsing

In [0]:
from pyspark.sql.functions import col, regexp_replace, from_json, split
from pyspark.sql.types import ArrayType, StringType

if "amenities" in clean_df.columns:

    # STEP 1 — Clean corrupted characters (common in scraped Airbnb datasets)
    clean_df = clean_df.withColumn(
        "amenities_clean",
        regexp_replace(col("amenities"), "[âÃ¢Â]", "")
    )

    # STEP 2 — Ensure double quotes around items
    clean_df = clean_df.withColumn(
        "amenities_clean",
        regexp_replace(col("amenities_clean"), "\\'", "\"")
    )

    # STEP 3 — Try parsing as JSON array
    clean_df = clean_df.withColumn(
        "amenities_array",
        from_json(col("amenities_clean"), ArrayType(StringType()))
    )


    clean_df = clean_df.withColumn(
        "amenities_array",
        col("amenities_array").cast("array<string>")
    )

    
    clean_df = clean_df.withColumn(
        "amenities_array",
        split(
            regexp_replace(
                regexp_replace(col("amenities_clean"), r'^\[|\]$', ''),  # remove [ ]
                r'"\s*,\s*"', '","'  # normalize separators
            ),
            '","'
        )
    )
else:
    clean_df = clean_df.withColumn("amenities_array", F.array().cast(ArrayType(StringType())))


##  Column selection and final tidy-up 

In [0]:
# Construct a clean, well-named set of columns for the final table.
selected_cols = []

# Preserve original identifiers and text fields
for c in ["listing_id", "name", "host_id", "host_location", "neighbourhood", "district", "city", "property_type", "room_type"]:
    if c in clean_df.columns:
        selected_cols.append(col(c).alias(c))

# Add cleaned numeric and boolean columns with descriptive names
if "price_clean" in clean_df.columns:
    selected_cols.append(col("price_clean").alias("price"))

# Booleans
for c in existing_bool_cols:
    selected_cols.append(col(c + "_bool").alias(c))

# Dates and numeric fields (use parsed versions)
if "host_since_parsed" in clean_df.columns:
    selected_cols.append(col("host_since_parsed").alias("host_since"))

for orig, dtype in numeric_casts.items():
    new_col = orig + "_num"
    if new_col in clean_df.columns:
        # Rename bedroom_num -> bedrooms, latitude_num -> latitude etc.
        selected_cols.append(col(new_col).alias(orig))

# Amenities array
if "amenities_array" in clean_df.columns:
    selected_cols.append(col("amenities_array").alias("amenities"))

# Location
if "latitude" not in [c._jc.toString() if hasattr(c, "_jc") else None for c in selected_cols]: 
    if "latitude_num" in clean_df.columns: selected_cols.append(col("latitude_num").alias("latitude"))
    if "longitude_num" in clean_df.columns: selected_cols.append(col("longitude_num").alias("longitude"))

# Create final DataFrame
final_df = clean_df.select(*selected_cols)

# Quick validation
final_df.printSchema()
display(final_df.limit(10))


root
 |-- listing_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_location: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- host_is_superhost: boolean (nullable = true)
 |-- host_has_profile_pic: boolean (nullable = true)
 |-- host_identity_verified: boolean (nullable = true)
 |-- instant_bookable: boolean (nullable = true)
 |-- host_since: date (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- review_scores_rating: float (nullable = true)
 |-- review_scores_accuracy: float (nullable = true)
 |-- review_scores_cleanliness: float (nullable = true)
 |-- review_scores_checkin: float (nullable = true)
 |-- review_scores_communication: float (nullabl

listing_id,name,host_id,host_location,neighbourhood,district,city,property_type,room_type,price,host_is_superhost,host_has_profile_pic,host_identity_verified,instant_bookable,host_since,latitude,longitude,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,accommodates,bedrooms,minimum_nights,maximum_nights,amenities,latitude.1,longitude.1
281420,"Beautiful Flat in le Village Montmartre, Paris",1466919,"Paris, Ile-de-France, France",Buttes-Montmartre,,Paris,Entire apartment,Entire place,53,False,True,False,False,2011-12-03,48.88668,2.33343,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, Kitchen, Washer, Wifi, Long term stays allowed"")",48.88668,2.33343
3705183,39 mÂ² Paris (Sacre CÅ“ur),10328771,"Paris, Ile-de-France, France",Buttes-Montmartre,,Paris,Entire apartment,Entire place,120,False,True,True,False,2013-11-29,48.88617,2.34515,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Shampoo, Heating, Kitchen, Essentials, Washer, Dryer, Wifi, Long term stays allowed"")",48.88617,2.34515
4082273,"Lovely apartment with Terrace, 60m2",19252768,"Paris, Ile-de-France, France",Elysee,,Paris,Entire apartment,Entire place,89,False,True,False,False,2014-07-31,48.88112,2.31712,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Washer, Wifi, Long term stays allowed"")",48.88112,2.31712
4797344,Cosy studio (close to Eiffel tower),10668311,"Paris, Ile-de-France, France",Vaugirard,,Paris,Entire apartment,Entire place,58,False,True,True,False,2013-12-17,48.84571,2.30584,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Wifi, Long term stays allowed"")",48.84571,2.30584
4823489,Close to Eiffel Tower - Beautiful flat : 2 rooms,24837558,"Paris, Ile-de-France, France",Passy,,Paris,Entire apartment,Entire place,60,False,True,False,False,2014-12-14,48.855,2.26979,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Essentials, Hair dryer, Washer, Dryer, Bathtub, Wifi, Elevator, Long term stays allowed, Cable TV"")",48.855,2.26979
4898654,NEW - Charming apartment Le Marais,505535,"Paris, Ile-de-France, France",Temple,,Paris,Entire apartment,Entire place,95,False,True,True,False,2011-04-13,48.86428,2.3537,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Essentials, Washer, Smoke alarm, Wifi, Long term stays allowed, Cable TV"")",48.86428,2.3537
6021700,2P - Entre Bastille et Republique,8053690,"Paris, Ile-de-France, France",Popincourt,,Paris,Entire apartment,Entire place,80,False,True,True,False,2013-08-09,48.86384,2.37101,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Shampoo, TV, Kitchen, Washer, Smoke alarm, Wifi, Fire extinguisher, Long term stays allowed"")",48.86384,2.37101
6945740,57sqm btw. Bastille & PÃ¨re Lachaise,5924709,"Paris, Ile-de-France, France",Popincourt,,Paris,Entire apartment,Entire place,59,False,True,True,False,2013-04-14,48.86043,2.37842,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Essentials, Washer, Dryer, Smoke alarm, Wifi, Long term stays allowed"")",48.86043,2.37842
7491966,Charming appartment near the Parc Buttes Chaumont,35812762,"Paris, Ile-de-France, France",Buttes-Chaumont,,Paris,Entire apartment,Entire place,80,False,True,True,False,2015-06-14,48.87871,2.37489,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Paid parking off premises, Shampoo, Heating, TV, Iron, Kitchen, Hair dryer, Essentials, Washer, Hot water, Hangers, Smoke alarm, Wifi, Long term stays allowed, Dedicated workspace, Host greets you, Cable TV"")",48.87871,2.37489
7849932,Bel appartement plein de charme !,20833291,"Paris, Ile-de-France, France",Opera,,Paris,Entire apartment,Entire place,90,False,True,True,False,2014-09-02,48.8779,2.33122,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Iron, Kitchen, Essentials, Hair dryer, Washer, Hangers, Wifi, Elevator, Long term stays allowed, Dedicated workspace, Cable TV"")",48.8779,2.33122


# PHASE 2 (continued) — Data quality checks & imputations 

In [0]:
from pyspark.sql.functions import col, sum as Fsum, when, isnan
from pyspark.sql.types import IntegerType, FloatType, DoubleType

# -------------------------------------------------------------------
# STEP 1 — Automatically Deduplicate Column Names
# -------------------------------------------------------------------

clean_cols = []
seen = {}

for c in final_df.columns:
    if c not in seen:
        clean_cols.append(c)
        seen[c] = 1
    else:
        new_name = f"{c}_{seen[c]}"
        clean_cols.append(new_name)
        seen[c] += 1

final_df = final_df.toDF(*clean_cols)

# -------------------------------------------------------------------
# STEP 2 — Column-by-column Null Count (safe for all types)
# -------------------------------------------------------------------

# Identify numeric columns allowed for isnan()
numeric_types = (IntegerType, FloatType, DoubleType)
numeric_cols = [f.name for f in final_df.schema.fields if isinstance(f.dataType, numeric_types)]

# Build null expressions safely
null_exprs = []

for c in final_df.columns:
    if c in numeric_cols:
        # numeric column → use both isnan() and isNull()
        expr = Fsum(when(col(c).isNull() | isnan(col(c)), 1).otherwise(0)).alias(c)
    else:
        # non-numeric (boolean, string, date, array...) → only check isNull()
        expr = Fsum(when(col(c).isNull(), 1).otherwise(0)).alias(c)

    null_exprs.append(expr)

null_counts = final_df.select(null_exprs)
display(null_counts)

# -------------------------------------------------------------------
# STEP 3 — Summary Statistics for Numeric Columns
# -------------------------------------------------------------------

if numeric_cols:
    display(final_df.select(numeric_cols).describe())
else:
    print("No numeric columns found for summary statistics.")

# -------------------------------------------------------------------
# STEP 4 — Filter Out Rows with NULL Price (Optional)
# -------------------------------------------------------------------

if "price" in final_df.columns:
    final_filtered_df = final_df.filter(col("price").isNotNull())
    print("Rows after filtering null price:", final_filtered_df.count())
else:
    print("Column 'price' not present — verify cleaned schema.")


listing_id,name,host_id,host_location,neighbourhood,district,city,property_type,room_type,price,host_is_superhost,host_has_profile_pic,host_identity_verified,instant_bookable,host_since,latitude,longitude,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,accommodates,bedrooms,minimum_nights,maximum_nights,amenities,latitude_1,longitude_1
0,171,0,832,0,242700,0,0,0,0,165,165,165,0,165,0,0,91405,91713,91665,91771,91687,91775,91785,0,29435,0,0,0,0,0


summary,listing_id,host_id,price,latitude,longitude,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,accommodates,bedrooms,minimum_nights,maximum_nights,latitude_1,longitude_1
count,279712.0,279712.0,279712.0,279712.0,279712.0,188307.0,187999.0,188047.0,187941.0,188025.0,187937.0,187927.0,279712.0,250277.0,279712.0,279712.0,279712.0,279712.0
mean,26381955.48982525,108165773.08553085,608.7927368150097,18.76186188815415,12.595075454310177,93.40519470864068,9.565476412108575,9.312868591362797,9.701533992050695,9.698593272171252,9.633994370453928,9.335364263783278,3.2887362715936392,1.5155088162316153,8.050966708614576,27558.59666728635,18.76186188815415,12.595075454310177
stddev,14425758.688779045,110856993.21589696,3441.8266110430686,32.56034341916531,73.08130942902805,10.070437471484452,0.9908778930052512,1.1460716187764926,0.8674338353504043,0.886883739934306,0.8332338009173219,1.0426246686467469,2.133378540645778,1.1530795029726428,31.51894649771491,7282875.162315783,32.56034341916531,73.08130942902805
min,2577.0,1822.0,0.0,-34.2644,-99.33963,20.0,2.0,2.0,2.0,2.0,2.0,2.0,0.0,1.0,1.0,1.0,-34.2644,-99.33963
max,48343530.0,390187445.0,625216.0,48.90491,151.33981,100.0,10.0,10.0,10.0,10.0,10.0,10.0,16.0,50.0,9999.0,2147483647.0,48.90491,151.33981


Rows after filtering null price: 279712


## PHASE 2 — Persist cleaned dataset as Delta 

In [0]:
# Robust table registration: prefer saveAsTable(), fallback to CTAS from delta.`path`
from pyspark.sql.utils import AnalysisException

# delta_path should already point to the folder where you wrote Delta files:
# delta_path = "/Volumes/workspace/default/new/airbnb_paris_clean"
print("Attempting to register table from Delta path:", delta_path)

# Step 0: Double-check data exists at delta_path by attempting to read it
try:
    test_df = spark.read.format("delta").load(delta_path)
    print("Successfully read Delta files; sample row count:", test_df.limit(5).count())
except Exception as ex:
    # If read fails here, the write succeeded but files may be inaccessible; raise a clear error.
    print("ERROR: Unable to read Delta files from the path. Aborting registration.")
    print("Read exception:", type(ex).__name__, ex)
    raise

# Target fully qualified table name in Unity Catalog
target_table = "workspace.default.airbnb_paris_clean"

# Drop existing table if present — but be conservative: catch exceptions
try:
    spark.sql(f"DROP TABLE IF EXISTS {target_table}")
    print("Dropped existing table (if present):", target_table)
except Exception as ex:
    print("Warning: could not drop existing table (continuing). Exception:", type(ex).__name__, ex)

# Attempt 1: create managed table using DataFrame API (saveAsTable)
try:
    # Using overwrite to ensure idempotency
    test_df.write.mode("overwrite").saveAsTable(target_table)
    print("SUCCESS: Table created using DataFrame.saveAsTable() ->", target_table)
except Exception as save_ex:
    print("saveAsTable() failed with:", type(save_ex).__name__, save_ex)
    print("Attempting fallback: CREATE TABLE ... AS SELECT FROM delta.`<path>`")

    # Attempt 2: CTAS from delta.`path` (this will copy data into a new table)
    try:
        spark.sql(f"CREATE TABLE {target_table} AS SELECT * FROM delta.`{delta_path}`")
        print("SUCCESS: Table created via CTAS from delta path ->", target_table)
    except AnalysisException as ae:
        # Specific spark/permission errors
        print("CTAS failed with AnalysisException:", ae)
        raise
    except Exception as ctas_ex:
        print("CTAS failed with:", type(ctas_ex).__name__, ctas_ex)
        raise

# Final verification: show top rows and table metadata
try:
    print("\nPreview rows from registered table:")
    display(spark.table(target_table).limit(10))
except Exception as ex:
    print("Warning: could not preview the table after registration:", type(ex).__name__, ex)

# Show table location (if available) and detail
try:
    print("\nDescribe detail for table (may show storage location):")
    display(spark.sql(f"DESCRIBE DETAIL {target_table}"))
except Exception as ex:
    print("Note: DESCRIBE DETAIL failed or not permitted:", type(ex).__name__, ex)

print("\nRegistration attempt completed.")

Attempting to register table from Delta path: /Volumes/workspace/default/new/airbnb_paris_clean
Successfully read Delta files; sample row count: 5
Dropped existing table (if present): workspace.default.airbnb_paris_clean
SUCCESS: Table created using DataFrame.saveAsTable() -> workspace.default.airbnb_paris_clean

Preview rows from registered table:


listing_id,name,host_id,host_location,neighbourhood,district,city,property_type,room_type,price,host_is_superhost,host_has_profile_pic,host_identity_verified,instant_bookable,host_since,latitude,longitude,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,accommodates,bedrooms,minimum_nights,maximum_nights,amenities,latitude_1,longitude_1
281420,"Beautiful Flat in le Village Montmartre, Paris",1466919,"Paris, Ile-de-France, France",Buttes-Montmartre,,Paris,Entire apartment,Entire place,53,False,True,False,False,2011-12-03,48.88668,2.33343,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, Kitchen, Washer, Wifi, Long term stays allowed"")",48.88668,2.33343
3705183,39 mÂ² Paris (Sacre CÅ“ur),10328771,"Paris, Ile-de-France, France",Buttes-Montmartre,,Paris,Entire apartment,Entire place,120,False,True,True,False,2013-11-29,48.88617,2.34515,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Shampoo, Heating, Kitchen, Essentials, Washer, Dryer, Wifi, Long term stays allowed"")",48.88617,2.34515
4082273,"Lovely apartment with Terrace, 60m2",19252768,"Paris, Ile-de-France, France",Elysee,,Paris,Entire apartment,Entire place,89,False,True,False,False,2014-07-31,48.88112,2.31712,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Washer, Wifi, Long term stays allowed"")",48.88112,2.31712
4797344,Cosy studio (close to Eiffel tower),10668311,"Paris, Ile-de-France, France",Vaugirard,,Paris,Entire apartment,Entire place,58,False,True,True,False,2013-12-17,48.84571,2.30584,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Wifi, Long term stays allowed"")",48.84571,2.30584
4823489,Close to Eiffel Tower - Beautiful flat : 2 rooms,24837558,"Paris, Ile-de-France, France",Passy,,Paris,Entire apartment,Entire place,60,False,True,False,False,2014-12-14,48.855,2.26979,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Essentials, Hair dryer, Washer, Dryer, Bathtub, Wifi, Elevator, Long term stays allowed, Cable TV"")",48.855,2.26979
4898654,NEW - Charming apartment Le Marais,505535,"Paris, Ile-de-France, France",Temple,,Paris,Entire apartment,Entire place,95,False,True,True,False,2011-04-13,48.86428,2.3537,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Essentials, Washer, Smoke alarm, Wifi, Long term stays allowed, Cable TV"")",48.86428,2.3537
6021700,2P - Entre Bastille et Republique,8053690,"Paris, Ile-de-France, France",Popincourt,,Paris,Entire apartment,Entire place,80,False,True,True,False,2013-08-09,48.86384,2.37101,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Shampoo, TV, Kitchen, Washer, Smoke alarm, Wifi, Fire extinguisher, Long term stays allowed"")",48.86384,2.37101
6945740,57sqm btw. Bastille & PÃ¨re Lachaise,5924709,"Paris, Ile-de-France, France",Popincourt,,Paris,Entire apartment,Entire place,59,False,True,True,False,2013-04-14,48.86043,2.37842,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Kitchen, Essentials, Washer, Dryer, Smoke alarm, Wifi, Long term stays allowed"")",48.86043,2.37842
7491966,Charming appartment near the Parc Buttes Chaumont,35812762,"Paris, Ile-de-France, France",Buttes-Chaumont,,Paris,Entire apartment,Entire place,80,False,True,True,False,2015-06-14,48.87871,2.37489,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Paid parking off premises, Shampoo, Heating, TV, Iron, Kitchen, Hair dryer, Essentials, Washer, Hot water, Hangers, Smoke alarm, Wifi, Long term stays allowed, Dedicated workspace, Host greets you, Cable TV"")",48.87871,2.37489
7849932,Bel appartement plein de charme !,20833291,"Paris, Ile-de-France, France",Opera,,Paris,Entire apartment,Entire place,90,False,True,True,False,2014-09-02,48.8779,2.33122,100.0,10.0,10.0,10.0,10.0,10.0,10.0,2,1.0,2,1125,"List(""Heating, TV, Iron, Kitchen, Essentials, Hair dryer, Washer, Hangers, Wifi, Elevator, Long term stays allowed, Dedicated workspace, Cable TV"")",48.8779,2.33122



Describe detail for table (may show storage location):


format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,ecb933b6-afe8-4261-885b-b06aaa2880d9,workspace.default.airbnb_paris_clean,,,2025-12-07T09:04:09.259Z,2025-12-07T09:04:13.000Z,List(),List(),1,22432463,Map(delta.enableDeletionVectors -> true),3,7,"List(appendOnly, deletionVectors, invariants)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False



Registration attempt completed.


# PHASE 3 — SQL ANALYSIS (Databricks SQL queries)

## Query 1: Average price per district (SQL)

In [0]:
%sql
-- Query 1: Average price per district
SELECT
  district,
  COUNT(*) AS num_listings,
  ROUND(AVG(price), 2) AS avg_price,
  ROUND(MEDIAN(CAST(price AS DOUBLE)), 2) AS median_price
FROM airbnb_paris_clean
GROUP BY district
ORDER BY avg_price DESC;


district,num_listings,avg_price,median_price
,242700,679.85,179.0
Manhattan,16553,179.43,120.0
Brooklyn,14474,119.06,89.0
Staten Island,289,109.22,80.0
Queens,4704,99.68,69.0
Bronx,992,93.7,65.0


## Query 2: Review score vs price correlation (SQL)

In [0]:
%sql
-- Query 2: Average price by review score rating (rounded)
SELECT
  review_scores_rating,
  COUNT(*) AS n,
  ROUND(AVG(price), 2) AS avg_price
FROM airbnb_paris_clean
GROUP BY review_scores_rating
ORDER BY review_scores_rating DESC;


review_scores_rating,n,avg_price
100.0,57458,585.35
99.0,8555,518.67
98.0,13616,491.64
97.0,12425,437.7
96.0,12261,443.83
95.0,10950,453.47
94.0,7739,360.6
93.0,10995,431.65
92.0,5626,355.63
91.0,4772,340.01


## Query 3 (recommended): Superhost vs non-superhost (SQL)

In [0]:
%sql
-- Query 3: Superhost performance (average price and count)
SELECT
  CASE WHEN host_is_superhost THEN 'superhost' WHEN host_is_superhost IS NULL THEN 'unknown' ELSE 'non-superhost' END AS host_status,
  COUNT(*) AS listings,
  ROUND(AVG(price), 2) AS avg_price,
  ROUND(AVG(review_scores_rating), 2) AS avg_review_rating
FROM airbnb_paris_clean
GROUP BY host_status
ORDER BY listings DESC;

host_status,listings,avg_price,avg_review_rating
non-superhost,229294,596.73,92.26
superhost,50253,664.34,97.0
unknown,165,453.1,93.06


## Query 4: Most common amenities (SQL using explode)

In [0]:
%sql
-- Query 4: Top amenities (unpivot array into rows)
SELECT amen AS amenity, COUNT(*) AS freq
FROM (
  SELECT EXPLODE(amenities) AS amen
  FROM airbnb_paris_clean
) t
GROUP BY amen
ORDER BY freq DESC
LIMIT 50;


amenity,freq
Wifi,248800
Essentials,238335
Kitchen,220097
Long term stays allowed,215997
Hangers,198799
TV,186230
Iron,172925
Dedicated workspace,166048
Hair dryer,164318
Hot water,162383


# PHASE 4 — Notebook Analysis 

## Load table and quick summary (Code)

In [0]:
# Read the table we registered
df = spark.table("airbnb_paris_clean")

# Quick summary
display(df.select("district", "city", "room_type").limit(10))
display(df.describe(["price", "review_scores_rating"]))


district,city,room_type
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place
,Paris,Entire place


summary,price,review_scores_rating
count,279712.0,188307.0
mean,608.7927368150097,93.40519470864068
stddev,3441.8266110430686,10.070437471484452
min,0.0,20.0
max,625216.0,100.0


## Grouped analyses 

In [0]:
# Listings count per district
district_counts = df.groupBy("district").count().orderBy(F.desc("count"))
display(district_counts)

# Average price by room_type
avg_price_room = df.groupBy("room_type").agg(F.round(F.avg("price"),2).alias("avg_price"), F.count("*").alias("n"))
display(avg_price_room.orderBy(F.desc("avg_price")))


district,count
,242700
Manhattan,16553
Brooklyn,14474
Queens,4704
Bronx,992
Staten Island,289


room_type,avg_price,n
Hotel room,800.21,5857
Entire place,673.35,182005
Shared room,579.92,4862
Private room,462.44,86988


## Simple display visualisation

In [0]:
display(df.groupBy("district").agg(F.avg("price").alias("avg_price")).orderBy(F.desc("avg_price")))

district,avg_price
,679.8506674907293
Manhattan,179.43442276324532
Brooklyn,119.05948597485146
Staten Island,109.22491349480968
Queens,99.68473639455782
Bronx,93.69858870967742


# BONUS PartRegression – Predict Price from Features 

In [0]:
# Step 1: Import PySpark ML libraries
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

## Step 2: Prepare the data
Convert categorical columns (room_type, host_is_superhost) to numeric using StringIndexer.
Assemble features into a single vector column.

In [0]:
from pyspark.sql.functions import col

ml_df = ml_df.withColumn("host_is_superhost_int", col("host_is_superhost").cast("integer"))


In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

# Index only room_type
room_indexer = StringIndexer(inputCol="room_type", outputCol="room_type_index", handleInvalid="keep")

# Assemble features
feature_cols = ["review_scores_rating", "latitude", "longitude", "room_type_index", "host_is_superhost_int"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Pipeline
pipeline = Pipeline(stages=[room_indexer, assembler])
ml_ready_df = pipeline.fit(ml_df).transform(ml_df)

# Check
ml_ready_df.select("price", "features").show(5, truncate=False)


+-----+----------------------------------------------------+
|price|features                                            |
+-----+----------------------------------------------------+
|53   |[100.0,48.886680603027344,2.333430051803589,0.0,0.0]|
|120  |[100.0,48.88616943359375,2.3451499938964844,0.0,0.0]|
|89   |[100.0,48.88111877441406,2.31712007522583,0.0,0.0]  |
|58   |[100.0,48.84571075439453,2.305840015411377,0.0,0.0] |
|60   |[100.0,48.85499954223633,2.269789934158325,0.0,0.0] |
+-----+----------------------------------------------------+
only showing top 5 rows


In [0]:
# Train/Test Split
train_df, test_df = ml_ready_df.randomSplit([0.8, 0.2], seed=42)
print(f"Training rows: {train_df.count()}, Test rows: {test_df.count()}")


Training rows: 150442, Test rows: 37757


## Step 3 Fit Linear Regression Model

In [0]:

lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(train_df)

# Print coefficients
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)


Coefficients: [0.4346163592685861,-9.125707990484372,-0.998009258035509,-112.63085220833334,118.23551747426934]
Intercept: 635.0162790047349


## Step 4  Evaluate Model

In [0]:
predictions = lr_model.transform(test_df)
predictions.select("price", "prediction").show(10)

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE on test data: {rmse:.2f}")


+-----+------------------+
|price|        prediction|
+-----+------------------+
|    9|224.00859854912193|
|    9|169.47252083793472|
|    9|230.11721424304778|
|   10|-62.69445045465193|
|   10|279.36041221456696|
|   10| 267.1463594316727|
|   10|230.09695146255206|
|   11|-57.15051639390117|
|   12| 52.07374596132206|
|   12|223.82337152944848|
+-----+------------------+
only showing top 10 rows
RMSE on test data: 1564.59
