In [None]:
# Root of the curated zone and the `ds` partition this notebook analyses.
CUR = "s3a://airbnb-bucket-data/curated"
RUN_DS = "2025-08-25"

# Each table is read from its own ds=<RUN_DS> directory under the curated root.
listings = spark.read.parquet(f"{CUR}/dim_listing/ds={RUN_DS}")
reviews = spark.read.parquet(f"{CUR}/fact_reviews_stage/ds={RUN_DS}")
dim_date = spark.read.parquet(f"{CUR}/dim_date/ds={RUN_DS}")


In [None]:
# Print the schema of every loaded table, in load order.
for _frame in (listings, reviews, dim_date):
    _frame.printSchema()


                                                                                

+----------------------------------+-----+
|property_type                     |count|
+----------------------------------+-----+
|Private room in lighthouse        |2    |
|Private room in loft              |171  |
|Private room in earthen home      |3    |
|Entire chalet                     |4    |
|Earthen home                      |2    |
|Shared room in bus                |1    |
|Farm stay                         |4    |
|Entire rental unit                |39372|
|Shared room in hostel             |59   |
|Shared room                       |4    |
|Private room in condo             |3515 |
|Room in boutique hotel            |239  |
|Private room in religious building|4    |
|Room in bed and breakfast         |24   |
|Private room in casa particular   |86   |
|Entire guesthouse                 |228  |
|Private room in bungalow          |64   |
|Entire cabin                      |39   |
|Hut                               |2    |
|Private room in nature lodge      |2    |
+----------

In [None]:
# Listings whose bathroom count exceeds their bedroom count (first 10 rows).
(
    listings
    .filter(listings['bathrooms'] > listings['bedrooms'])
    .select('listing_id', 'bathrooms', 'bedrooms')
    .show(10, truncate=False)
)

[Stage 5:>                                                          (0 + 1) / 1]

+----------------------------------+-----+
|property_type                     |count|
+----------------------------------+-----+
|Entire rental unit                |39372|
|Private room in rental unit       |14837|
|Private room in home              |11835|
|Entire condo                      |8800 |
|Entire home                       |8756 |
|Private room in condo             |3515 |
|Entire serviced apartment         |1852 |
|Private room in townhouse         |1294 |
|Entire townhouse                  |1123 |
|Room in hotel                     |783  |
|Private room in bed and breakfast |486  |
|Entire loft                       |367  |
|Private room in guesthouse        |316  |
|Room in boutique hotel            |239  |
|Entire guesthouse                 |228  |
|Shared room in rental unit        |191  |
|Entire guest suite                |181  |
|Private room in guest suite       |178  |
|Private room in loft              |171  |
|Private room in serviced apartment|144  |
+----------

                                                                                

In [None]:
# Properties where the price is greater than 5,000. Collect the result as a Python list

# .collect() brings the matching rows to the driver as a Python list of Row
# objects, which is what the task asks for; .show() would only print them.
expensive_listings = (
    listings
    .filter(listings.price > 5000)
    .select('listing_id', 'price')
    .collect()
)

# Display only the first ten entries to keep the cell output short.
expensive_listings[:10]



[Stage 8:>                                                          (0 + 1) / 1]

+----------------------------------+-----+---------------------------+
|property_type                     |count|avg(review_scores_location)|
+----------------------------------+-----+---------------------------+
|Entire rental unit                |39372|4.72794867465818           |
|Private room in rental unit       |14837|4.732797964376552          |
|Private room in home              |11835|4.6990531037827195         |
|Entire condo                      |8800 |4.763906656266228          |
|Entire home                       |8756 |4.722269211782102          |
|Private room in condo             |3515 |4.7656638325703            |
|Entire serviced apartment         |1852 |4.7218975332068345         |
|Private room in townhouse         |1294 |4.75918656056587           |
|Entire townhouse                  |1123 |4.812793814432991          |
|Room in hotel                     |783  |4.649164619164618          |
|Private room in bed and breakfast |486  |4.7173976608187145         |
|Entir

                                                                                

In [None]:
# Listings that satisfy all three conditions at once:
# * price < 150
# * more than 20 reviews
# * review_scores_rating > 4.5

(
    listings
    .filter(
        (listings['price'] < 150)
        & (listings['number_of_reviews'] > 20)
        & (listings['review_scores_rating'] > 4.5)
    )
    .select('listing_id', 'price', 'number_of_reviews', 'review_scores_rating')
    .show(truncate=False)
)

                                                                                

In [None]:
# A list of properties with the following characteristics:
# * price < 150 OR more than one bathroom

# The second condition tests `bathrooms`, as the requirement above states,
# and the same column is displayed so the result can be checked by eye.
(
    listings
    .filter((listings.price < 150) | (listings.bathrooms > 1))
    .select('listing_id', 'price', 'bathrooms')
    .show(truncate=False)
)

StructField('listing_id', LongType(), True)
StructField('id', LongType(), True)
StructField('date', DateType(), True)
StructField('reviewer_id', IntegerType(), True)
StructField('reviewer_name', StringType(), True)
StructField('comments', StringType(), True)


In [None]:
# A list of properties with price < 150 OR more than one bathroom
#
# NOTE(review): `listings_with_price` is not defined in any cell visible in
# this notebook — confirm it is created before this cell runs.
# NOTE(review): the heading says "bathroom" but the filter tests `bedrooms` —
# confirm which one is intended.

(
    listings_with_price
    .filter(
        (listings_with_price['price_numeric'] < 150)
        | (listings_with_price['bedrooms'] > 1)
    )
    .select('name', 'price_numeric', 'bedrooms')
    .show(truncate=False)
)

In [None]:
# The highest listing price in this dataset

# Import the functions module under an alias instead of
# `from pyspark.sql.functions import max`, which would replace Python's
# built-in max() for every cell executed afterwards in this kernel.
from pyspark.sql import functions as F

(
    listings
    .select(F.max('price'))
    .show()
)

In [None]:
# Count the distinct host_id values present in the listings table.
(
    listings
    .select('host_id')
    .distinct()
    .count()
)

In [None]:
# The total number of listings and average review score per host

from pyspark.sql.functions import avg, count

# Keep the aggregated DataFrame in `host_stats`. DataFrame.show() only prints
# and returns None, so it must not terminate the assigned expression.
host_stats = (
    listings
    # Listings without a rating are dropped first, so `total_listings`
    # counts rated listings only.
    .filter(listings.review_scores_rating.isNotNull())
    .groupBy('host_id')
    .agg(
        count('host_id').alias('total_listings'),
        avg('review_scores_rating').alias('average_review_score'),
    )
)

host_stats.show(10)

In [None]:
# Ten listing_ids with the most rows in `reviews`, largest count first.

(
    reviews
    .groupBy('listing_id')
    .count()
    .orderBy('count', ascending=False)
    .limit(10)
    .show()
)

In [None]:
# Five neighbourhood_cleansed values with the most listings, largest first.

(
    listings
    .groupBy('neighbourhood_cleansed')
    .count()
    .orderBy('count', ascending=False)
    .limit(5)
    .show()
)

In [None]:
# The number of reviews per listing using the "reviews" dataset

# Assign the grouped DataFrame itself; DataFrame.show() returns None, so
# chaining it into the assignment would leave `reviews_per_listing` empty.
reviews_per_listing = (
    reviews
    .groupBy('listing_id')
    .count()
)

reviews_per_listing.show(10)

In [None]:
# The number of reviews per listing using the "reviews" dataset

# Bind the name to the grouped DataFrame, then print a preview separately:
# DataFrame.show() returns None and must not be the assigned value.
reviews_per_listing = (
    reviews
    .groupBy('listing_id')
    .count()
)

reviews_per_listing.show(10)

In [None]:
# The total number of listings and average review score per host

from pyspark.sql.functions import avg, count

# `host_stats` holds the aggregated DataFrame; the preview is printed in a
# separate statement because DataFrame.show() returns None.
host_stats = (
    listings
    # Listings without a rating are dropped first, so `total_listings`
    # counts rated listings only.
    .filter(listings.review_scores_rating.isNotNull())
    .groupBy('host_id')
    .agg(
        # Count on the grouping key: the other DataFrame cells address
        # listings by `listing_id`, not `id`, so `id` is not relied on here.
        count('host_id').alias('total_listings'),
        avg('review_scores_rating').alias('average_review_score'),
    )
)

host_stats.show(10)

In [None]:
# Rank listing_ids by their number of review rows and print the ten largest.

(
    reviews
    .groupBy('listing_id')
    .count()
    .orderBy('count', ascending=False)
    .limit(10)
    .show()
)

In [None]:
# A data frame with the following two columns:
# * Reviewer's name
# * Review's comment

# The inner join keeps only reviews whose listing_id exists in `listings`.
# Both requested columns come from `reviews`; they are referenced through
# that DataFrame so the selection is unambiguous after the join.
(
    listings
    .join(reviews, listings.listing_id == reviews.listing_id, 'inner')
    .select(reviews.reviewer_name, reviews.comments)
    .show(truncate=False)
)

In [None]:
# Listings without reviews

# A left-anti join keeps the rows of `listings` that have no matching
# listing_id in `reviews`. The DataFrame is assigned first and printed
# afterwards, because DataFrame.show() returns None.
listings_without_reviews = (
    listings
    .join(
        reviews,
        listings.listing_id == reviews.listing_id,
        how='left_anti',
    )
    .select('listing_id')
)

listings_without_reviews.show(truncate=False)

In [None]:
# Adding a new column to the listings DataFrame that computes a string category depending on its price.

# The category is defined in the following way:
# * price < 50 -> "Budget"
# * 50 <= price < 150 -> "Mid-range"
# * price >= 150 -> "Luxury"


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def categorize_price(price):
    """Map a price to its category label.

    Returns 'Budget' below 50, 'Mid-range' from 50 up to (not including) 150,
    and 'Luxury' from 150 upwards. A missing price (None), or a value that
    satisfies none of the comparisons (e.g. NaN), yields 'Unknown'.
    """
    if price is None:
        return 'Unknown'
    if price < 50:
        return 'Budget'
    if 50 <= price < 150:
        return 'Mid-range'
    if price >= 150:
        return 'Luxury'
    # Reached only when every comparison above is False.
    return 'Unknown'

# Wrap the Python function as a Spark UDF that returns a string column.
categorize_price_udf = udf(categorize_price, StringType())

# `listings_with_category` is the listings DataFrame (rows with a price only)
# plus the derived `price_category` column. It is kept separate from the
# summary below because DataFrame.show() returns None.
listings_with_category = (
    listings
    .filter(listings.price.isNotNull())
    .withColumn('price_category', categorize_price_udf(listings.price))
)

# Number of listings in each price category.
(
    listings_with_category
    .groupBy('price_category')
    .count()
    .show()
)

In [None]:

# Top five listings with the highest average review comment length.

# Expose both DataFrames to Spark SQL under the names used in the query.
reviews.createOrReplaceTempView("reviews")
listings.createOrReplaceTempView("listings")


# * The join key on the listings side is `listing_id`, the column the
#   DataFrame cells of this notebook use to address listings.
# * HAVING keeps only listings with at least five reviews.
# * LIMIT 5 restricts the output to the top five requested above.
sql_query = """
SELECT
  r.listing_id,
  AVG(LENGTH(r.comments)) AS average_comment_length,
  COUNT(r.id) AS reviews_count
FROM
  reviews r
JOIN
  listings l
  ON r.listing_id = l.listing_id
GROUP BY
  r.listing_id
HAVING
  COUNT(r.id) >= 5
ORDER BY
  average_comment_length DESC
LIMIT 5
"""

(
    spark
    .sql(sql_query)
    .show()
)

In [None]:
# Average number of days elapsed since the first review, averaged over the listings of each host.

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import PandasUDFType
import pandas as pd


# The UDF kind (Series -> scalar, i.e. grouped aggregate) is declared through
# the Python type hints; the explicit PandasUDFType argument is deprecated.
@pandas_udf(DoubleType())
def average_days_since_first_review_udf(first_review_series: pd.Series) -> float:
    """Return the mean number of whole days between today and each date.

    Args:
        first_review_series: the first-review dates of one group.

    Returns:
        The mean age in days, or None when the group is empty.
    """
    today = pd.to_datetime('today')
    # .dt.days keeps only the whole-day component of each time difference.
    listing_ages = (today - pd.to_datetime(first_review_series)).dt.days
    if listing_ages.empty:
        return None
    return listing_ages.mean()

# Apply the aggregate per host, ignoring listings that have no first review.
(
    listings
    .filter(listings['first_review'].isNotNull())
    .groupBy('host_id')
    .agg(
        average_days_since_first_review_udf(listings['first_review'])
        .alias('average_days_since_first_review_days')
    )
    .show()
)