In [None]:
# !apt-get install openjdk-11-jdk-headless -qq > /dev/null
# !pip install pyspark==3.3.0 delta-spark==2.2.0 pyarrow

# Initialize Spark with Delta Lake

In [None]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("DeltaLakeETL") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "4g")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
print("Spark version:", spark.version)

Spark version: 3.3.0


## **Mount Google Drive**


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## **Data Ingestion**

In [None]:
airbnb_path = "/content/drive/MyDrive/Colab Notebooks/Airbnb_Open_Data.parquet"
airbnb = spark.read.parquet(airbnb_path, header=True, inferSchema=False)

In [None]:
airbnb.printSchema()
print(f"Original row count: {airbnb.count()}")

root
 |-- id: long (nullable = true)
 |-- NAME: string (nullable = true)
 |-- host id: long (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- host name: string (nullable = true)
 |-- neighbourhood group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- country: string (nullable = true)
 |-- country code: string (nullable = true)
 |-- instant_bookable: boolean (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- room type: string (nullable = true)
 |-- Construction year: double (nullable = true)
 |-- price: string (nullable = true)
 |-- service fee: string (nullable = true)
 |-- minimum nights: double (nullable = true)
 |-- number of reviews: double (nullable = true)
 |-- last review: string (nullable = true)
 |-- reviews per month: double (nullable = true)
 |-- review rate number: double (nullable = true)
 |-- calculated host listings count: d

## **🟫 Bronze Layer: Raw Data Storage**

In [None]:
from pyspark.sql.functions import *

# Clean column names
cleaned_columns = [col(c).alias(c.replace(' ', '_')) for c in airbnb.columns]
airbnb_cleaned = airbnb.select(*cleaned_columns)

# Write to Delta Lake
bronze_path = "/content/drive/MyDrive/Colab Notebooks/bronze"
(
    airbnb_cleaned
    .write
    .format("delta")
    .mode("overwrite")
    .save(bronze_path)
)

In [None]:
# Read back for validation
airbnb_bronze = spark.read.format("delta").load(bronze_path)
print(f"Bronze layer row count: {airbnb_bronze.count()}")

Bronze layer row count: 102599


In [None]:

airbnb_bronze.show()

+-------+--------------------+-----------+----------------------+---------+-------------------+------------------+--------+---------+-------------+------------+----------------+-------------------+---------------+-----------------+-------+-----------+--------------+-----------------+-----------+-----------------+------------------+------------------------------+----------------+--------------------+-------+
|     id|                NAME|    host_id|host_identity_verified|host_name|neighbourhood_group|     neighbourhood|     lat|     long|      country|country_code|instant_bookable|cancellation_policy|      room_type|Construction_year|  price|service_fee|minimum_nights|number_of_reviews|last_review|reviews_per_month|review_rate_number|calculated_host_listings_count|availability_365|         house_rules|license|
+-------+--------------------+-----------+----------------------+---------+-------------------+------------------+--------+---------+-------------+------------+----------------+-

## **⏹ Silver Layer: Cleaned & Validated Data**


In [None]:
# Keep 'host name', 'lat', and 'long' columns
columns_to_drop = ['license' , 'house_rules' , 'calculated host listings count' ,'neighbourhood' ,'neighbourhood group' ,'NAME']
airbnb_bronze = airbnb_bronze.drop(*columns_to_drop)

In [None]:
from pyspark.sql.functions import regexp_replace, to_date, lower, trim, coalesce, lit
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

# Define validation lists
VALID_COUNTRIES = ["United States", "US"]
VALID_ROOM_TYPES = ["Hotel room", "Shared room", "Private room", "Entire home/apt"]
VALID_CANCELLATION = ["flexible", "strict", "moderate"]

# Transformation pipeline
airbnb_silver = (
    airbnb_bronze
    .filter(col("country").isin(VALID_COUNTRIES))
    .filter(col("room_type").isin(VALID_ROOM_TYPES))
    .filter(col("cancellation_policy").isin(VALID_CANCELLATION))

    # Type conversions
    .withColumn("id", col("id").cast(IntegerType()))
    .withColumn("host_id", trim(col("host_id")).cast('long'))
    .withColumn("price", regexp_replace(col("price"), "[$,]", "").cast(DoubleType()))
    .withColumn("service_fee", regexp_replace(col("service_fee"), "[$,]", "").cast(DoubleType()))
    .withColumn("review_rate_number", coalesce(col("review_rate_number").cast(IntegerType()), lit(0)))
    .withColumn("availability_365", coalesce(col("availability_365").cast(IntegerType()), lit(0)))
    .withColumn("instant_bookable", lower(col("instant_bookable")) == "true")

    # Date conversion with error handling
    .withColumn("last_review",
                to_date(col("last_review"), "M/d/yyyy"))

    # Data quality checks
    .filter(col("review_rate_number").between(1, 5))
    .filter(col("availability_365").between(0, 365))
)

# Write to Delta Lake
silver_path = "/content/drive/MyDrive/Colab Notebooks/silver"
(
    airbnb_silver
    .write
    .format("delta")
    .mode("overwrite")
    .save(silver_path)
)

In [None]:

airbnb_silver.show()

+-------+-----------+----------------------+---------+-------------------+--------+---------+-------------+------------+----------------+-------------------+---------------+-----------------+------+-----------+--------------+-----------------+-----------+-----------------+------------------+------------------------------+----------------+
|     id|    host_id|host_identity_verified|host_name|neighbourhood_group|     lat|     long|      country|country_code|instant_bookable|cancellation_policy|      room_type|Construction_year| price|service_fee|minimum_nights|number_of_reviews|last_review|reviews_per_month|review_rate_number|calculated_host_listings_count|availability_365|
+-------+-----------+----------------------+---------+-------------------+--------+---------+-------------+------------+----------------+-------------------+---------------+-----------------+------+-----------+--------------+-----------------+-----------+-----------------+------------------+--------------------------

## **🟨 Gold Layer

In [None]:
from pyspark.sql.functions import *

# Read silver data
silver_data = spark.read.format("delta").load(silver_path)

# **1. Dim Date Table**

In [None]:
dim_date = (
    silver_data
    .select("last_review")
    .distinct()
    .filter(col("last_review").isNotNull())
    .select(
        col("last_review").alias("date_key"),
        year("last_review").alias("year"),
        month("last_review").alias("month"),
        dayofmonth("last_review").alias("day"),
        dayofweek("last_review").alias("day_of_week"),
        date_format("last_review", "E").alias("day_of_week_name"),
        weekofyear("last_review").alias("week_of_year"),
        quarter("last_review").alias("quarter")
    )
    .withColumn("date_id", monotonically_increasing_id())
)

# Write dim_date
dim_date_path = "/content/drive/MyDrive/Colab Notebooks/gold/dim_date"
dim_date.write.format("delta").mode("overwrite").save(dim_date_path)

In [None]:
dim_date.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2443 entries, 0 to 2442
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   date_key          2443 non-null   object
 1   year              2443 non-null   int32 
 2   month             2443 non-null   int32 
 3   day               2443 non-null   int32 
 4   day_of_week       2443 non-null   int32 
 5   day_of_week_name  2443 non-null   object
 6   week_of_year      2443 non-null   int32 
 7   quarter           2443 non-null   int32 
 8   date_id           2443 non-null   int64 
dtypes: int32(6), int64(1), object(2)
memory usage: 114.6+ KB


In [None]:
dim_date.show(5)

+----------+----+-----+---+-----------+----------------+------------+-------+-------+
|  date_key|year|month|day|day_of_week|day_of_week_name|week_of_year|quarter|date_id|
+----------+----+-----+---+-----------+----------------+------------+-------+-------+
|2019-06-04|2019|    6|  4|          3|             Tue|          23|      2|      0|
|2019-05-08|2019|    5|  8|          4|             Wed|          19|      2|      1|
|2018-05-28|2018|    5| 28|          2|             Mon|          22|      2|      2|
|2017-08-11|2017|    8| 11|          6|             Fri|          32|      3|      3|
|2018-08-10|2018|    8| 10|          6|             Fri|          32|      3|      4|
+----------+----+-----+---+-----------+----------------+------------+-------+-------+
only showing top 5 rows



# **2. Dim Host Table**

In [None]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Read the silver layer data
silver_data = spark.read.format('delta').load('/content/drive/MyDrive/Colab Notebooks/silver')

# Create dim_host table with additional columns
dim_host = (silver_data
    .select('host_name', 'host_identity_verified','host_id').distinct().filter(col('host_name').isNotNull())
)

# Add a surrogate key
dim_host = dim_host.withColumn('host_identity_key', monotonically_increasing_id())

# Select and reorder columns for readability
dim_host = dim_host.select(
    col('host_id'),
    col('host_identity_key'),
    col('host_name'),
    col('host_identity_verified')
)

# Show the first few rows and the schema
dim_host.show(5)
dim_host.printSchema()

# Save the dim_host table to the gold layer
dim_host.write.format('delta').mode('overwrite').save('/content/drive/MyDrive/Colab Notebooks/gold/dim_host')

+-----------+-----------------+---------+----------------------+
|    host_id|host_identity_key|host_name|host_identity_verified|
+-----------+-----------------+---------+----------------------+
|40348053102|                0|   Tucker|           unconfirmed|
| 9133292331|                1|   Martin|              verified|
|16135561331|                2|  Freddie|              verified|
|78440704783|                3|   Gianna|                  null|
|88013998883|                4|   Daniel|              verified|
+-----------+-----------------+---------+----------------------+
only showing top 5 rows

root
 |-- host_id: long (nullable = true)
 |-- host_identity_key: long (nullable = false)
 |-- host_name: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)



In [None]:
dim_host.count()

97657

In [None]:
dim_host.createOrReplaceTempView('dimhost')

spark.sql("""
SELECT host_identity_verified,
COUNT (*)
FROM dimhost
GROUP BY host_identity_verified
ORDER BY COUNT (*) DESC
""").show()

+----------------------+--------+
|host_identity_verified|count(1)|
+----------------------+--------+
|              verified|    9873|
|           unconfirmed|    9818|
|                  null|     230|
+----------------------+--------+



In [None]:
dim_host.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 97657 entries, 0 to 97656
Data columns (total 4 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   host_id                 97657 non-null  int64 
 1   host_identity_key       97657 non-null  int64 
 2   host_name               97657 non-null  object
 3   host_identity_verified  97397 non-null  object
dtypes: int64(2), object(2)
memory usage: 3.0+ MB


# **3. Dim GEO**

In [None]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Read the silver layer data
silver_data = spark.read.format('delta').load('/content/drive/MyDrive/Colab Notebooks/silver')

# Create dim_geo table with geographical columns
dim_geo = silver_data.select('long', 'lat', 'country').distinct().filter(col('country').isNotNull())

# Add a surrogate key
dim_geo = dim_geo.withColumn('geo_key', monotonically_increasing_id())

# Select and reorder columns for readability
dim_geo = dim_geo.select(
    col('geo_key'),
    col('long'),
    col('lat'),
    col('country')
)

# Show the first few rows and the schema
dim_geo.show(5)
dim_geo.printSchema()

# Save the dim_geo table to the gold layer
dim_geo.write.format('delta').mode('overwrite').save('/content/drive/MyDrive/Colab Notebooks/gold/dim_geo')

+-------+---------+--------+-------------+
|geo_key|     long|     lat|      country|
+-------+---------+--------+-------------+
|      0|-73.94314|40.68236|United States|
|      1|-73.94254|40.72212|United States|
|      2|-73.92357|40.69055|United States|
|      3|-73.96199|40.71312|United States|
|      4|-73.97447|40.78304|United States|
+-------+---------+--------+-------------+
only showing top 5 rows

root
 |-- geo_key: long (nullable = false)
 |-- long: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- country: string (nullable = true)



In [None]:
dim_geo.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 64561 entries, 0 to 64560
Data columns (total 4 columns):
 #   Column   Non-Null Count  Dtype  
---  ------   --------------  -----  
 0   geo_key  64561 non-null  int64  
 1   long     64560 non-null  float64
 2   lat      64560 non-null  float64
 3   country  64561 non-null  object 
dtypes: float64(2), int64(1), object(1)
memory usage: 2.0+ MB


# **4. Dim Room**

In [None]:
# from pyspark.sql.functions import col, monotonically_increasing_id

# # Create dim_room_type table
# dim_room_type = silver_data.select('room_type','cancellation_policy','host_id').distinct().filter(col('room_type').isNotNull())

# # Add a surrogate key
# dim_room_type = dim_room_type.withColumn('room_type_key', monotonically_increasing_id())

# # Select and reorder columns for readability
# dim_room_type = dim_room_type.select(
#     col('room_type_key'),
#     col('room_type').alias('room_type_name'),
#     col('cancellation_policy'),
#     col('host_id')
# )

# # Show the first few rows and the schema
# dim_room_type.show(5)
# dim_room_type.printSchema()

# # Save the dim_room_type table to the gold layer
# dim_room_type.write.format('delta').mode('overwrite').save('/content/drive/MyDrive/Colab Notebooks/gold/dim_room_type')

+-------------+---------------+-------------------+-----------+
|room_type_key| room_type_name|cancellation_policy|    host_id|
+-------------+---------------+-------------------+-----------+
|            0|   Private room|           flexible|80294758485|
|            1|Entire home/apt|           flexible|30424058431|
|            2|Entire home/apt|           flexible|91305743194|
|            3|   Private room|             strict|83631106404|
|            4|Entire home/apt|           moderate|13624468375|
+-------------+---------------+-------------------+-----------+
only showing top 5 rows

root
 |-- room_type_key: long (nullable = false)
 |-- room_type_name: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- host_id: long (nullable = true)



In [None]:
dim_room_type.show()

+-------------+---------------+-------------------+-----------+
|room_type_key| room_type_name|cancellation_policy|    host_id|
+-------------+---------------+-------------------+-----------+
|            0|   Private room|           flexible|80294758485|
|            1|Entire home/apt|           flexible|30424058431|
|            2|Entire home/apt|           flexible|91305743194|
|            3|   Private room|             strict|83631106404|
|            4|Entire home/apt|           moderate|13624468375|
|            5|Entire home/apt|             strict|58072189362|
|            6|   Private room|             strict|82587835441|
|            7|   Private room|           moderate|55835327316|
|            8|Entire home/apt|             strict|11609383313|
|            9|Entire home/apt|             strict|97157556455|
|           10|   Private room|           moderate|53844747475|
|           11|Entire home/apt|           flexible| 9985701676|
|           12|   Private room|         

## **5. Fact Listings (Fact Table)**

In [None]:
# Read Delta files as Spark DataFrames
def read_delta_as_spark(path):
    return spark.read.format('delta').load(path)

# Load all data as Spark DataFrames
silver_df_spark = read_delta_as_spark('/content/drive/MyDrive/Colab Notebooks/silver')
dim_date_spark = read_delta_as_spark('/content/drive/MyDrive/Colab Notebooks/gold/dim_date')
dim_host_spark = read_delta_as_spark('/content/drive/MyDrive/Colab Notebooks/gold/dim_host')
dim_geo_spark = read_delta_as_spark('/content/drive/MyDrive/Colab Notebooks/gold/dim_geo')
dim_room_spark = read_delta_as_spark('/content/drive/MyDrive/Colab Notebooks/gold/dim_room_type')


# Convert last_review_date in silver_df_spark
silver_df_spark = silver_df_spark.withColumn('last_review_date', silver_df_spark['last_review'].cast('date'))

# Define columns to keep from silver layer for the fact table (non-dimensional and join keys)
# These are the measures and foreign keys for the fact table
fact_cols_from_silver = [
    'id', 'host_id', 'instant_bookable', 'Construction_year', 'price',
    'service_fee', 'minimum_nights', 'number_of_reviews', 'reviews_per_month','cancellation_policy' ,
    'review_rate_number', 'availability_365', 'last_review_date', 'long', 'lat', 'country', 'room_type'
]

# Select initial columns from silver layer for the fact table
fact_df_spark = silver_df_spark.select(fact_cols_from_silver)

In [None]:
fact_df_spark.show()

+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+
|     id|    host_id|instant_bookable|Construction_year| price|service_fee|minimum_nights|number_of_reviews|reviews_per_month|cancellation_policy|review_rate_number|availability_365|last_review_date|     long|     lat|      country|      room_type|
+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+
|1001254|80014485718|           false|           2020.0| 966.0|      193.0|          10.0|              9.0|             0.21|             strict|                 4|             286|      2021-10-19|-73.97237|40.64749|United States|   Private room|
|100

In [None]:
# # Show the schema of the fact table
# df = fact_df_spark.dropDuplicates()

# # Show the total number of rows in the fact table
# print(f"Total row count in fact_df_spark: {df.count()}")

Total row count in fact_df_spark: 98049


# Join with dim_date

In [None]:
# Join with dim_date
fact_df_spark = fact_df_spark.alias('fact').join(
    dim_date_spark.alias('date'),
    col('fact.last_review_date') == col('date.date_key'),
    'inner'
).select('fact.*', 'date.date_id')


In [None]:
fact_df_spark.show()

+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+-------+
|     id|    host_id|instant_bookable|Construction_year| price|service_fee|minimum_nights|number_of_reviews|reviews_per_month|cancellation_policy|review_rate_number|availability_365|last_review_date|     long|     lat|      country|      room_type|date_id|
+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+-------+
|1001254|80014485718|           false|           2020.0| 966.0|      193.0|          10.0|              9.0|             0.21|             strict|                 4|             286|      2021-10-19|-73.97237|40.64749|United Stat

# Join with dim_host

In [None]:
# Join with dim_host
fact_df_spark = fact_df_spark.alias('fact').join(
    dim_host_spark.alias('host'),
    col('fact.host_id') == col('host.host_id'),
    'inner'
).select('fact.*', 'host.host_identity_key')


In [None]:
fact_df_spark.show()

+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+-------+-----------------+
|     id|    host_id|instant_bookable|Construction_year| price|service_fee|minimum_nights|number_of_reviews|reviews_per_month|cancellation_policy|review_rate_number|availability_365|last_review_date|     long|     lat|      country|      room_type|date_id|host_identity_key|
+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+-------+-----------------+
|1001254|80014485718|           false|           2020.0| 966.0|      193.0|          10.0|              9.0|             0.21|             strict|                 4|          

# Join with dim_geo

In [None]:
# Join with dim_geo
fact_df_spark = fact_df_spark.alias('fact').join(
    dim_geo_spark.alias('geo'),
    (col('fact.long') == col('geo.long')) & (col('fact.lat') == col('geo.lat')) & (col('fact.country') == col('geo.country')),
    'inner'
).select('fact.*', 'geo.geo_key')

print(f"Column count after joining with dim_geo: {len(fact_df_spark.columns)}")

Column count after joining with dim_geo: 20


In [None]:
fact_df_spark.show()

+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+-------+-----------------+-------+
|     id|    host_id|instant_bookable|Construction_year| price|service_fee|minimum_nights|number_of_reviews|reviews_per_month|cancellation_policy|review_rate_number|availability_365|last_review_date|     long|     lat|      country|      room_type|date_id|host_identity_key|geo_key|
+-------+-----------+----------------+-----------------+------+-----------+--------------+-----------------+-----------------+-------------------+------------------+----------------+----------------+---------+--------+-------------+---------------+-------+-----------------+-------+
|1001254|80014485718|           false|           2020.0| 966.0|      193.0|          10.0|              9.0|             0.21|             strict|     

# **Load Fact**

In [None]:
fact_df_spark.write.format('delta').mode('overwrite').save('/content/drive/MyDrive/Colab Notebooks/gold/fact_listings')