# AirBNB New York Revenue Analysis  <font size=2>v2.0</font>
  <code style="color:#4a304f"><font color= #d689e6>bootcamp project</font></code>

### 💁‍♀️ Team

- [Gabrielle Rosa](https://github.com/gabxrosa)
- [Nayara Ramos](https://github.com/nay-ramos)

---

### 🗃️ Dataset

> <font size=2>[NY Airbnb open data - Kaggle.com](https://www.kaggle.com/datasets/dgomonov/new-york-city-airbnb-open-data) by Dgomonov **and** [NY Airbnb listing details - Inside Airbnb](https://insideairbnb.com/get-the-data/) by Inside Airbnb</font>

---

### ❓❓ The question:

1. What are the main factors (accommodation characteristics and host profile) that drive revenue from Airbnb listings in New York, and how can these insights be translated into effective strategies to increase the platform's revenue in the city?


---

### ⚙️ Methodology:
  1. Data Consolidation
  2. Data Selection
  3. Database normalization
  4. Understanding and cleaning the data
  5. Preparing dataframe for PowerBI


---

### 🔨 Tools

<img width=150 title="Databricks" alt="Databricks" src="https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Flogos-world.net%2Fwp-content%2Fuploads%2F2024%2F01%2FDatabricks-Logo.jpg&f=1&nofb=1&ipt=1bbae2ef5109af0db178c7b5f304ccea32bf5c8081ac43a065ca1e34cf8de7b4"> <img width=198 title="PowerBI" alt="PowerBI" src="https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fdatascientest.com%2Fes%2Ffiles%2F2020%2F10%2Fpower-bi-logo-1.jpg&f=1&nofb=1&ipt=099003c0f01745db97361ebe56ede7ec363ea05c5cf7a77759c528d984242c15">




In [0]:
# importing libs
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest

# spark session
spark = SparkSession.builder.appName("spk").getOrCreate()

# reading Airbnb listings data for New York without inferring the schema (all columns load as strings)
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .option("delimiter", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("multiLine", "true") \
    .csv('/Volumes/workspace/default/primeirovolume/airbnb_NYC_2025.csv')

# reading complementary Airbnb listings data for New York without inferring the schema
dfplus = spark.read \
        .option("header", "true") \
        .option("inferSchema", "false") \
        .option("delimiter", ",") \
        .option("quote", "\"") \
        .option("escape", "\"") \
        .option("multiLine", "true") \
        .csv('/Volumes/workspace/default/primeirovolume/Detailed Listings data_nyc_.csv')

In [0]:
# visualizing column names and types
df.printSchema()
dfplus.printSchema()

# verifying row counts
print("=== row counts ===")
print("df:", df.count(), "/ dfplus:", dfplus.count())

# renaming the neighborhood_highlights column
dfplus = dfplus.withColumnRenamed("NeighborhoodFALSEhighlights", "neighborhood_highlights")

### 1. Data Consolidation

Both source DataFrames contain the same number of records (36,403 rows each), which suggests a 1:1 correspondence that allows them to be integrated without data loss. The goal of this stage is to merge the primary DataFrame (`df`) with the supplementary DataFrame (`dfplus`) into a single, comprehensive dataset through a horizontal combination (`inner join`) that preserves all available attributes. To ensure data integrity, this process involves three key steps:
- **Verifying the join key (`id`)** to identify and correct any mismatches between the two sources.
- **Removing redundant columns** that are present in both DataFrames.
- **Performing an inner join** to create the final unified DataFrame (`df_bnb`).

In [0]:
'''
The left_anti join is used as a diagnostic tool to confirm that every
id in df has a corresponding match in dfplus, and vice versa.
'''

# ids in df with no match in dfplus (left anti join)
only_df = df.join(dfplus, on="id", how="left_anti")
# ids in dfplus with no match in df
only_dfplus = dfplus.join(df, on="id", how="left_anti")
only_df.display()
only_dfplus.display()
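A left anti join keeps only the left-side rows whose key has no match on the right. The plain-Python sketch below mirrors that logic with sets on a few hypothetical ids (the ids and the `left_anti` helper are illustrative, not part of the notebook), including one id written in scientific notation like those found in `dfplus`:

```python
def left_anti(left_ids, right_ids):
    """Return ids from left_ids that have no match in right_ids, preserving left order."""
    right_set = set(right_ids)
    return [i for i in left_ids if i not in right_set]

# hypothetical ids: dfplus stores the last listing's id in scientific notation
df_ids = ["1001", "1002", "565252455567607000"]
dfplus_ids = ["1001", "1002", "5,65252E+17"]

only_df = left_anti(df_ids, dfplus_ids)
only_dfplus = left_anti(dfplus_ids, df_ids)
print(only_df, only_dfplus)  # ['565252455567607000'] ['5,65252E+17']
```

Running the check in both directions is what exposes a formatting mismatch: each side reports the same listing under a different spelling of its id.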

**Findings:**

The anti-join revealed 7 ids that did not match. The investigation showed a formatting discrepancy:

**df:** uses the full number (e.g., 565252455567607000)

**dfplus:** uses scientific notation (e.g., 5,65252E+17)

Because the scientific notation keeps only six significant digits, the original ids cannot be recovered from `dfplus` itself. Since there are only 7 of them, the `id` column in `dfplus` will be kept as `string` and the mismatched values will be manually replaced with the full numbers, following `df`'s original format. After the correction, the anti-join is run again to confirm that there are zero mismatched rows.
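The sketch below illustrates why a manual mapping is needed: converting the scientific-notation string back to a number only restores the digits it kept. The regex, helper names and values are illustrative assumptions; in Spark, a similar pattern could presumably be passed to `f.col("id").rlike(...)` to list every affected id.

```python
import re

# pattern for ids written like "5,65252E+17" (comma used as the decimal separator)
SCI_ID = re.compile(r"^\d+,\d+E\+\d+$")

def is_scientific_id(raw_id):
    """True when the id looks like a spreadsheet-style scientific-notation value."""
    return bool(SCI_ID.match(raw_id))

def sci_id_to_int(raw_id):
    """Convert '5,65252E+17' to an int; only the digits kept in the notation survive."""
    return int(float(raw_id.replace(",", ".")))

full_id = "565252455567607000"
broken_id = "5,65252E+17"

recovered = sci_id_to_int(broken_id)
print(is_scientific_id(broken_id), is_scientific_id(full_id))  # True False
print(recovered)                  # 565252000000000000
print(recovered == int(full_id))  # False: the trailing digits are lost
```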

In [0]:
# ensuring the dfplus id column is a string
dfplus = dfplus.withColumn("id", f.col("id").cast("string"))

# replacing the 7 ids stored in scientific notation with the full numbers used in df
dfplus = dfplus.withColumn(
    "id",
    f.when(f.col("id") == "5,65252E+17", "565252455567607000")
    .when(f.col("id") == "6,68902E+17", "668902255426418000")
    .when(f.col("id") == "8,05806E+17", "805805963320996000")
    .when(f.col("id") == "8,29274E+17", "829274057349888000")
    .when(f.col("id") == "8,70213E+17", "870212853386092000")
    .when(f.col("id") == "8,94142E+17", "894142017712115000")
    .when(f.col("id") == "9,81184E+17", "981183733840544000")
    .otherwise(f.col("id"))
)

# re-running both anti joins: each should now return zero rows
only_df = df.join(dfplus, on="id", how="left_anti")
only_dfplus = dfplus.join(df, on="id", how="left_anti")
only_df.display()
only_dfplus.display()

In [0]:
'''
 Before merging, it's necessary to remove columns that exist in both DataFrames to avoid ambiguity.
 The intersection of both column sets is identified and these duplicate columns are dropped from dfplus.
'''

# identifying duplicate columns (ignoring id)
cols_df = df.columns
cols_dfplus = dfplus.columns

# columns intersection
dup_cols = set(cols_df).intersection(cols_dfplus)

# ignore id
dup_cols.remove("id")

# showing duplicate columns
print("Duplicate columns:", dup_cols)

# dropping duplicate columns
dfplus_no_dups = dfplus.drop(*dup_cols)

In [0]:
'''
With the join key validated and redundant columns removed, an inner join
 is performed to create the final unified DataFrame, df_bnb.
'''

# row counts before the join
print("df rows count:", df.count())
print("dfplus_no_dups rows count:", dfplus_no_dups.count())

# inner join df and dfplus_no_dups
# (sorting the inputs before a join does not carry over to the result, so the joined DataFrame is sorted instead)
df_bnb = df.join(dfplus_no_dups, on="id", how="inner").orderBy("id")
print("df_bnb rows count:", df_bnb.count())
df_bnb.display()

In [0]:
df_bnb.printSchema()

In [0]:
# mapping of columns to the types they should be cast to
cast_types = {
    'price': 'double',
    'minimum_nights': 'int',
    'maximum_nights': 'int',
    'availability_365': 'int',
    'number_of_reviews': 'int',
    'review_scores_rating': 'double',
    'review_scores_accuracy': 'double',
    'review_scores_cleanliness': 'double',
    'review_scores_checkin': 'double',
    'review_scores_communication': 'double',
    'review_scores_location': 'double',
    'review_scores_value': 'double',
    'number_of_reviews_ly': 'int',
    'estimated_occupancy_l365d': 'int',
    'number_of_reviews_ltm': 'int',
    'number_of_reviews_l30d': 'int',
    'last_review': 'date',
    'first_review': 'date',
    'bedrooms': 'int',
    'accommodates': 'int',
    'host_listings_count': 'int',
    'host_total_listings_count': 'int',
    'host_since': 'date',
    'host_response_time': 'string'
}
 
# casting types only on listed columns and keeping the other columns as they are
df_bnb = df_bnb.select([
    f.col(col).cast(cast_types[col]).alias(col) if col in cast_types 
    else f.col(col) 
    for col in df_bnb.columns
])

df_bnb.printSchema()

### 2. Data selection

The primary goal of this analysis is to identify strategies for increasing Airbnb's revenue. To achieve this, we must first select the most relevant and high-quality data. This section will follow three main steps:
- **Analyze the null value distribution** to understand data completeness.
- **Define and isolate active listings** to focus on the most relevant segment of the market.
- **Identify and remove columns** that are irrelevant to our analysis.



In [0]:
# function for null percentage analysis on a DataFrame
def mappingNullvalues(df):
    df_count = df.count()

    # counting nulls for every column in a single aggregation pass
    # (instead of one filter + count job per column)
    null_counts = df.select([
        f.sum(f.col(col).isNull().cast("int")).alias(col) for col in df.columns
    ]).first()

    null_info = []
    for col in df.columns:
        null_count = null_counts[col] or 0  # sum() returns null on an empty DataFrame
        null_percentage = (null_count / df_count) * 100 if df_count else 0.0
        null_info.append((col, null_count, round(null_percentage, 2)))

    # creating a new, sorted DataFrame to display the results clearly
    schema = ["column_name", "null_count", "null_percentage"]
    df_null_mapping = spark.createDataFrame(null_info, schema)

    print(f"Null value analysis for {df_count} total records:")
    display(df_null_mapping.orderBy(f.desc("null_percentage")))
  

In [0]:
# First, replace "N/A" strings with actual null values to ensure accurate counting
df_bnb = df_bnb.replace("N/A", None)

# calculating null counts and percentages now that "N/A" strings are real nulls
mappingNullvalues(df_bnb)
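The `replace` step matters because `isNull()` only matches real nulls, so a literal `"N/A"` string would otherwise be counted as present. A small plain-Python sketch of the effect, using hypothetical `host_response_rate` values and an illustrative `null_percentage` helper:

```python
def null_percentage(values, null_tokens=()):
    """Percentage of values that are None or one of null_tokens, rounded to 2 decimals."""
    if not values:
        return 0.0
    missing = sum(1 for v in values if v is None or v in null_tokens)
    return round(missing / len(values) * 100, 2)

# hypothetical host_response_rate values as read from the CSV (all strings)
host_response_rate = ["100%", "N/A", None, "90%", "N/A", "N/A"]

before = null_percentage(host_response_rate)                       # only real nulls
after = null_percentage(host_response_rate, null_tokens=("N/A",))  # "N/A" treated as null
print(before, after)  # 16.67 66.67
```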


#### Hypothesis on Missing Price

With **41.55%** of `price` data missing, we hypothesize that the absence of a price is an indicator of an inactive listing. Given that the dataset contains historical data going back to 2008, it is plausible that many of these records without a price correspond to rooms that are no longer on the market.

Since our goal is to inform revenue strategy for the upcoming year (2026), analyzing recent trends is more relevant than studying decade-old data.

**Next Step:** To define a timeframe that makes sense for this analysis, we first need to understand the relationship between price and listing activity. We will therefore investigate the characteristics of listings with `null` prices, using other activity indicators to test the hypothesis that they are inactive.

#### Hypothesis on Missing Review Data

A notable pattern is observed in the review-related columns, as many of them share a similar null rate of approximately **31%**. This is not necessarily a data quality issue, but rather an expected outcome for listings that have never received a review.

We hypothesize two primary reasons for a listing having no reviews:
1.  **New Listings:** The room was recently registered and has not yet had time to accumulate reviews.
2.  **Inactive Listings:** The room is old and no longer active, thus receiving no new reviews.

**Next Step:** To differentiate between these two cases and measure a listing's current activity level, we will use proxy metrics that are independent of reviews, such as `availability_365`, `number_of_reviews_ly`, and `estimated_occupancy_l365d`.


#### 2.1 Accommodation Analysis Based on Review Activity

**Hypothesis:** A significant portion of missing values in review-related columns (such as `review_scores_rating`, `last_review`, `reviews_per_month`, etc.) is concentrated in accommodations that have no recent reviews (`number_of_reviews_ltm == 0`).

**Rationale:** These accommodations can be considered "inactive" or potentially new, and it is plausible that they do not have enough recent data to populate fields related to ratings and review history. By segmenting the dataset based on this criterion, a more targeted cleaning and analysis strategy can be developed for each group.

**Methodology:** To validate this hypothesis, the dataset will be divided into two distinct subsets:
1. **Active Listings:** Listings with one or more reviews in the last 12 months (`number_of_reviews_ltm > 0`).

2. **Inactive/New Listings:** Listings with no reviews in the last 12 months (`number_of_reviews_ltm == 0`).

After splitting, a `null` value analysis will be performed on each subset. This will allow a direct comparison of data quality and confirm whether `nulls` are, in fact, disproportionately present in the "Inactive/New" group.

In [0]:
# df_reviews: rooms WITH reviews in the last 12 months
df_reviews = df_bnb.filter(f.col("number_of_reviews_ltm") > 0)

# df_without_reviews: rooms WITHOUT reviews in the last 12 months
df_without_reviews = df_bnb.filter(f.col("number_of_reviews_ltm") == 0)

In [0]:
# original dataframe row count
total_original = df_bnb.count()

# new dataframes row count
with_reviews_count = df_reviews.count()
without_reviews_count = df_without_reviews.count()

# data consistency verification
if total_original == (with_reviews_count + without_reviews_count):
    print("Verification was successful!")
else:
    print("The sum of the parts is NOT equal to the total.")
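This check is worth keeping because a Spark `filter` keeps a row only when its predicate evaluates to true, and a comparison against a null value evaluates to null. A row with a null `number_of_reviews_ltm` would therefore land in neither subset. The plain-Python sketch below emulates that three-valued logic with `None` standing in for SQL null; the values and the `sql_compare` helper are hypothetical.

```python
import operator

def sql_compare(value, op, other):
    """Emulate SQL three-valued logic: any comparison involving None yields None (unknown)."""
    if value is None or other is None:
        return None
    return op(value, other)

reviews_ltm = [0, 3, None, 12, 0]  # hypothetical number_of_reviews_ltm values

# a filter keeps a row only when the predicate is True (not False, not None)
with_reviews = [v for v in reviews_ltm if sql_compare(v, operator.gt, 0) is True]
without_reviews = [v for v in reviews_ltm if sql_compare(v, operator.eq, 0) is True]

missing = len(reviews_ltm) - len(with_reviews) - len(without_reviews)
print(with_reviews, without_reviews, missing)  # [3, 12] [0, 0] 1
```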

In [0]:
# calculating null counts and percentages for each column
mappingNullvalues(df_reviews)

In [0]:
# calculating null counts and percentages for each column
mappingNullvalues(df_without_reviews)

**Conclusion & New Hypothesis**

The analysis confirms our initial hypothesis. The null rate for review-related columns is higher in the inactive group.

An important secondary finding is that the null rate for `price` also dropped significantly, from **~40%** to **12%** in the active listings group. This suggests a strong association between recent activity and the commercial viability of a listing.

However, limiting our analysis to only the 11,059 listings with reviews in the last 12 months appears to be too restrictive. We observed that approximately 12,000 listings with valid price information are excluded by this filter, suggesting they could still be relevant for our revenue analysis despite not having a review in the past year. 

We hypothesize that expanding the timeframe to include a longer period of "recent activity" could provide a larger, still relevant, dataset. To test this, we will now analyze listings with a `last_review` date within the last 2.5 years (30 months).

In [0]:
# df_reviews_ranged: rooms WITH at least 1 review in the last 30 months (considering August 2025)

# getting the most recent last_review date
''' The window is relative to the most recent review date instead of a fixed date,
so the dashboard does not require manual updates. A relative window focuses on the
behavior of the most recent listings and can easily be changed if necessary.'''
max_last_review = df_bnb.agg(f.max("last_review")).first()[0]

# setting range to 2.5 years (30 months)
rev_range = 30

# calculating the lower bound date (max_last_review - time range)
lower_bound = f.add_months(f.lit(max_last_review), -rev_range)

# keeping rows where last_review is not null and falls on or after lower_bound
df_reviews_ranged = df_bnb.filter(
    (f.col("last_review").isNotNull()) &
    (f.col("last_review") >= lower_bound)
)

# calculating null counts and percentages for each column
mappingNullvalues(df_reviews_ranged)
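The relative window can be checked by hand. The plain-Python sketch below approximates `add_months` with a negative offset, assuming Spark's behavior of clamping the day to the last valid day of the target month; the `subtract_months` helper and all dates are hypothetical.

```python
import calendar
from datetime import date

def subtract_months(d, months):
    """Move a date back by `months`, clamping the day to the end of the target month."""
    total = d.year * 12 + (d.month - 1) - months
    year, month_index = divmod(total, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last_day))

max_last_review = date(2025, 8, 31)  # hypothetical most recent review date
lower_bound = subtract_months(max_last_review, 30)
print(lower_bound)  # 2023-02-28

# hypothetical last_review dates: the filter keeps those on or after the bound
reviews = [date(2023, 2, 27), date(2023, 2, 28), date(2024, 6, 1)]
recent = [r for r in reviews if r >= lower_bound]
```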

#### Conclusion & Next Hypothesis

The analysis of the 2.5-year window reveals a clear trade-off:
-   **12-Month Window:** Higher data quality (only 12% null prices) but a smaller dataset (~11k rows).
-   **30-Month Window:** A significantly larger dataset (~16k rows) but with a slight decrease in quality (18% null prices).

This confirms our suspicion that older records are more likely to have missing price information. The presence of a `price` appears to be a crucial indicator of a listing's overall data quality and commercial viability.

This leads to our next core hypothesis: that the dataset can be effectively segmented into "high-quality" and "low-quality" records based solely on the presence of a `price`.

**Next Step:** To test this, the next analysis will compare the overall data completeness of two groups:
1.  Listings where `price` is present.
2.  Listings where `price` is null.

The results of this comparison will inform the final strategic decision on which records to include in our `df_final` dataset.

#### 2.2 Price-Based Analysis

**Hypothesis:** Accommodations where `price` is not `null` are expected to represent higher-quality records and, therefore, _have fewer null values_ in other columns when compared to accommodations where `price` is missing.

**Rationale:** The absence of a price may indicate an incomplete, outdated, or invalid record. It is plausible that such records are also missing other important information, impacting overall data quality.

**Methodology:** To test this hypothesis, the dataframe will be segmented into two groups:
1. `price` is not null
2. `price` is null

A `null` value analysis will be performed on each group to compare the overall data completeness.

In [0]:
# df_with_price: Accommodations WITH price
df_with_price = df_bnb.filter(f.col("price").isNotNull())

# df_without_price: Accommodations WITHOUT price
df_without_price = df_bnb.filter(f.col("price").isNull())

# original dataframe row count
total_original = df_bnb.count()

# new dataframes row count
with_price_count = df_with_price.count()
without_price_count = df_without_price.count()

# checking data consistency
if total_original == (with_price_count + without_price_count):
    print("verification was successful!")
else:
    print("The sum of the parts is NOT equal to the total.")

In [0]:
# calculating null counts and percentages for each column
mappingNullvalues(df_with_price)

In [0]:
# calculating null counts and percentages for each column
mappingNullvalues(df_without_price)

#### Conclusion: Price as a Key Indicator of Data Quality

The analysis strongly confirms our hypothesis. The group of listings **with a price** (`df_with_price`) demonstrates significantly higher data quality across nearly all columns compared to the group without a price. For instance, the null rate for `bathrooms` drops from **98.19%** to **0.03%**, and for `host_response_rate` from **76.79%** to **13.71%**.

This investigation, combined with the previous analysis on review activity, has allowed us to identify the two primary characteristics of a high-quality, relevant listing for our analysis:

1.  **Commercial Viability:** The listing must have a `price`. The absence of a price has proven to be the strongest indicator of an incomplete or inactive record.
2.  **Recent Market Activity:** The listing should have recent engagement. As determined previously, filtering by recent reviews (e.g., within the last 12 or 30 months) is an effective proxy metric for this.

Therefore, relying on just one of these filters is suboptimal. The most robust strategy is to define our final analytical dataset, `df_final`, by selecting the records that satisfy **both** criteria. This will create a core dataset of listings that are both commercially viable and recently active, providing the most reliable foundation for our analysis.

#### 2.3 Defining the Final Analysis Dataset: `df_final`

The preceding analyses have established two primary criteria for identifying relevant listings: **Commercial Viability** (the presence of a `price`) and **Recent Market Activity** (a review within the last 2.5 years).

Rather than restricting the dataset to listings that meet both criteria, the strategic decision has been made to create a broader and more inclusive dataset for our analysis. This approach ensures that we do not discard any record that meets at least one of these relevance criteria.

This inclusive dataset suits our objectives of exploratory analysis, building comprehensive dashboards, and understanding the market as a whole. It allows us to analyze the full spectrum of listings that are commercially viable, recently active, or both.

Therefore, `df_final` will be constructed as the **union** of these two subsets:
1.  All listings that have a valid `price`.
2.  All listings that have a `last_review` within the last 2.5 years.

The resulting DataFrame will serve as the complete and final basis for our subsequent cleaning and normalization steps.
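The difference between the union chosen here and the stricter intersection can be sketched with sets of hypothetical listing ids. By inclusion-exclusion, listings that meet both criteria would be counted twice by a plain concatenation, which is the role `.distinct()` plays in the notebook's union.

```python
# hypothetical listing ids for each relevance criterion
recent_ids = {"a", "b", "c"}       # last_review within the last 2.5 years
priced_ids = {"b", "c", "d", "e"}  # price is not null

union_ids = recent_ids | priced_ids         # df_final: meets at least one criterion
intersection_ids = recent_ids & priced_ids  # stricter alternative: meets both

# inclusion-exclusion: overlapping listings are counted once
assert len(union_ids) == len(recent_ids) + len(priced_ids) - len(intersection_ids)
print(sorted(union_ids), sorted(intersection_ids))  # ['a', 'b', 'c', 'd', 'e'] ['b', 'c']
```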

In [0]:
# union of recently reviewed listings and listings with a price (duplicate rows removed)
df_final = df_reviews_ranged.union(df_with_price).distinct()

# checking data consistency
final_count = df_final.count()
id_final_count = df_final.select("id").distinct().count()

if final_count == id_final_count:
    print("dataframes union was successful!")
else:
    print("dataframe ids are not unique, union unsuccessful.")

In [0]:
# generating null counts and percentages for each column
mappingNullvalues(df_final)


#### 2.4 Removing unnecessary columns

Based on our analytical goal (understanding revenue drivers), the following columns have been identified as unnecessary. The primary criterion for removal is a lack of a clear and relevant relationship to successful ad behavior.

**Action Plan:** Instead of actively dropping these columns from the `df_final` DataFrame now, they will simply be **excluded** during the database normalization step. This is a more efficient approach, as we will only select the columns we need when creating the new tables.

Column | Motivation
--- | ---
`neighborhood_highlights` | When `neighborhood_overview` has data, `neighborhood_highlights` is true; otherwise it is false. This information is already available in the `neighborhood_overview` column.
`has_availability` | The data dictionary does not explain the column's behavior in a way that is consistent with `availability_365`.
`host_name` | Does not affect how appealing the host is to guests.
`calculated_host_listings_count` | The data dictionary does not explain how it is calculated.
`host_total_listings_count` | The data dictionary does not explain how it is calculated, and the listings can be counted directly.
`host_listings_count` | The data dictionary does not explain how it is calculated, and the listings can be counted directly.



### 3. Database normalization

![columns from both dataframes](/Volumes/workspace/default/primeirovolume/colunms_analysis.jpg)


The columns in our dataset can be grouped into four primary categories: **Host, Room, Review** and **Location** data. To improve data integrity, flexibility and storage efficiency, we will normalize this dataset into three primary tables, keeping the location attributes in `df_rooms`.

Additionally, the `amenities` column contains a list of features for each listing. To properly structure this one-to-many relationship, a fourth table, `df_amenities`, will be created.

df | type of data | Primary Key (PK) | Foreign Keys (FK)
---- | ----- | -----  | ----- 
df_rooms | room details | `id` | `host_id`(references `df_hosts`)
df_hosts | host related information | `host_id` | -
df_reviews | client perception related information | `room_id` | `room_id`(references `df_rooms`)
df_amenities | room amenities list |  `room_id`  | `room_id`(references `df_rooms`)

In [0]:
# creating table df_rooms
df_rooms = df_final.select(
            "id", "name","host_id", "neighbourhood_group", "neighbourhood", "latitude", "longitude",
            "room_type", "price", "minimum_nights", "maximum_nights", "availability_365",
            "neighborhood_overview", "license", "accommodates",
            "bathrooms", "bedrooms", "estimated_occupancy_l365d"
)

df_rooms.describe().display()

In [0]:
# creating table df_hosts and removing duplicates at the same time
df_hosts = df_final.select(
    "host_id", "host_since", "host_response_time", "host_response_rate",
    "host_acceptance_rate", "host_is_superhost", "host_identity_verified"
).dropDuplicates(["host_id"])

df_hosts.describe().display()

In [0]:
# creating table df_reviews
df_reviews = df_final.select("id", "number_of_reviews","last_review", "reviews_per_month",
            "number_of_reviews_ltm", "number_of_reviews_ly","first_review", "review_scores_rating",
            "review_scores_accuracy", "review_scores_cleanliness", "review_scores_checkin",
            "review_scores_communication", "review_scores_location", "review_scores_value"
            ).withColumnRenamed("id", "room_id")

df_reviews.describe().display()

In [0]:
# creating table df_amenities
df_amenities = df_final.select("id","amenities"
              ).withColumnRenamed("id", "room_id")

df_amenities.describe().display()
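As built above, `df_amenities` still holds the whole list in a single row per listing. If the one-to-many relationship were made explicit, each amenity would become its own row. The plain-Python sketch below assumes the raw value is a JSON array string such as `["Wifi", "Kitchen"]` (an assumption about the file format); the helper and rows are hypothetical. In Spark, this could presumably be done with `f.explode` over `f.from_json` with an `ArrayType(StringType())` schema.

```python
import json

def explode_amenities(rows):
    """Turn (room_id, amenities_json) pairs into one (room_id, amenity) pair per amenity."""
    exploded = []
    for room_id, raw in rows:
        if not raw:
            continue  # listings without an amenities value contribute no rows
        for amenity in json.loads(raw):
            exploded.append((room_id, amenity.strip()))
    return exploded

# hypothetical rows shaped like df_amenities (room_id, amenities)
rows = [
    ("1001", '["Wifi", "Kitchen", "Air conditioning"]'),
    ("1002", None),
    ("1003", "[]"),
]
pairs = explode_amenities(rows)
print(pairs)  # [('1001', 'Wifi'), ('1001', 'Kitchen'), ('1001', 'Air conditioning')]
```

Under this layout, each row would be identified by the pair (`room_id`, amenity) rather than by `room_id` alone.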

### 4. Understanding and cleaning data

1. Casting types
2. Treating null/empty fields


#### 4.1 df_rooms

##### 4.1.1 Understanding columns

In [0]:
# schema 
# checking the data types to plan for the casting step
df_rooms.printSchema()

# displaying descriptive statistics
df_rooms.describe().display()
df_rooms.display()

**Regarding data types:**

The `bathrooms` column could not be directly cast to an integer because it contains decimal values (e.g., "1.5"). This requires special handling, as a listing can have a "half-bath". Apart from `latitude` and `longitude`, which are still stored as strings, the other columns have the expected types based on the data dictionary.

**Observation and Next Steps Regarding Price Distribution:**

The descriptive statistics reveal an extremely wide range for the `price` column, with a minimum value likely near $3 and a maximum value exceeding $50,000. This discrepancy for a daily rate strongly suggests the presence of outliers or potential data entry errors.

Such extreme values can significantly distort measures like the mean and standard deviation, potentially leading to inaccurate conclusions about typical pricing.

These price outliers will be identified and handled using the robust Interquartile Range (IQR) method. This will provide a cleaner dataset focused on more representative pricing patterns.

##### 4.1.2 Type casting

First, we address the specific case of the `bathrooms` column, which requires special handling due to the presence of "half-bath" values.

Next, we cast the geographical coordinates to ensure they are in the correct numeric format for mapping visuals.

In [0]:
### BATHROOMS
'''
According to the data dictionary, this column should be numeric, representing the number of bathrooms in the property.
However, some values are decimals. After researching the Airbnb website, we found that the value
"0.5" could represent a bathroom without a bathing facility or without a toilet. With that information, we will create
a complementary column called "incomplete_bathroom" to record it in a way we can analyze more clearly.
'''
# counting incomplete bathrooms (values ending in ".5")
inc_bath_count = df_rooms.filter(df_rooms.bathrooms.rlike("\\.5$")).count()
print(f"there are {inc_bath_count} incomplete bathrooms")

df_rooms = df_rooms.withColumn(
            # flag column: 1 if the value ends in ".5", otherwise 0 (null values are flagged as 0)
            "incomplete_bathroom",
            f.when(
                f.col("bathrooms").isNotNull() &
                f.col("bathrooms").rlike("\\.5$"),
                1
            ).otherwise(0)
        ).withColumn("bathrooms",
            # keeping only the integer part (dropping ".5"), preserving null values, and casting to int
            f.when(
                  f.col("bathrooms").isNotNull(),
                  f.split(f.col("bathrooms"), "\\.").getItem(0).try_cast("int"))
            .otherwise(None)
        )

# verifying data consistency
# if the bathrooms column still has values ending with ".5", fail; otherwise, success
if df_rooms.filter(f.col("bathrooms").rlike("\\.5$")).count() > 0:
    print("- conversion failed")
else:
    print("- conversion was successful")

# counting flagged rows in the new column by summing its values
inc_bath_count2 = df_rooms.agg(f.sum("incomplete_bathroom")).collect()[0][0]

# comparing the two counts
if inc_bath_count2 != inc_bath_count:
    print("- incomplete_bathroom column creation failed.")
else:
    print("- incomplete_bathroom column creation was successful!")
df_rooms.display()
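The per-value logic of the cell above can be sketched in plain Python: the flag records a trailing ".5" and the count keeps only the integer part. The helper name and sample values are hypothetical, and `whole.isdigit()` only roughly mimics `try_cast("int")` (for instance, it rejects negative numbers).

```python
import re

HALF_BATH = re.compile(r"\.5$")  # same pattern as the notebook's rlike("\\.5$")

def split_bathrooms(raw):
    """Return (whole_bathrooms, incomplete_bathroom_flag) for a raw string value."""
    if raw is None:
        return None, 0  # null stays null; the flag defaults to 0
    incomplete = 1 if HALF_BATH.search(raw) else 0
    whole = raw.split(".")[0]
    # rough stand-in for try_cast("int"): non-integer text becomes None
    return (int(whole) if whole.isdigit() else None), incomplete

for raw in ["1.5", "2", "0.5", "3.0", None]:
    print(raw, split_bathrooms(raw))
```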

In [0]:
# latitude and longitude

# cast latitude and longitude from string to double
df_rooms = df_rooms.withColumn(
    "latitude",
    f.col("latitude").cast("double")
).withColumn(
    "longitude",
    f.col("longitude").cast("double")
)

print("\nCasting for latitude and longitude complete.")

# --- Verification for Lat/Lon ---
print("\nSchema after casting latitude and longitude:")
df_rooms.printSchema()

print("\nSample values:")
df_rooms.select("latitude", "longitude").show(5)

In [0]:
# calculate the count and percentage of null values in each column
mappingNullvalues(df_rooms)

##### 4.1.3 Removing Price Outliers (IQR Method)

To ensure our revenue analysis is based on plausible and representative data, we will remove price outliers using the Interquartile Range (IQR) method.

The IQR method identifies outliers as values falling below `Q1 - 1.5 * IQR` or above `Q3 + 1.5 * IQR`, where `IQR = Q3 - Q1`.
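To make the fences concrete, here is a plain-Python sketch on made-up prices. It uses a simple nearest-rank quantile, which is an assumption and may differ slightly from the values Spark's `approxQuantile` returns; the helper names are hypothetical. Missing prices are kept, as in the notebook's filter.

```python
import math

def nearest_rank_quantile(values, p):
    """Nearest-rank quantile: the smallest value covering at least a fraction p of the data."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))
    return ordered[rank - 1]

def iqr_bounds(values, k=1.5):
    """Return the (lower, upper) fences Q1 - k*IQR and Q3 + k*IQR."""
    q1 = nearest_rank_quantile(values, 0.25)
    q3 = nearest_rank_quantile(values, 0.75)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

def remove_price_outliers(prices, k=1.5):
    """Keep prices inside the fences, plus missing (None) prices."""
    known = [p for p in prices if p is not None]
    if not known:
        return list(prices)  # nothing to compute fences from
    lower, upper = iqr_bounds(known, k)
    return [p for p in prices if p is None or lower <= p <= upper]

# made-up nightly prices, including a $3 and a $50,000 listing
prices = [3, 80, 95, 110, 120, 150, 175, 200, None, 50000]
print(iqr_bounds([p for p in prices if p is not None]))  # (-25.0, 295.0)
print(remove_price_outliers(prices))  # [3, 80, 95, 110, 120, 150, 175, 200, None]
```

In this toy example the $3 price survives because the lower fence falls below zero. Whenever `Q1 < 1.5 * IQR`, the lower fence cannot remove any positive price, which is one reason a very low price may need separate handling.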


In [0]:
df_rooms.agg(f.min("price"), f.max("price")).show()

# --- 1. IQR Calculation ---
# calculate Q1 (25th percentile) and Q3 (75th percentile) for the 'price' 
quantiles = df_rooms.approxQuantile("price", [0.25, 0.75], 0.0)

# proceed only if quantiles were successfully calculated
if len(quantiles) == 2 :
    Q1 = quantiles[0]
    Q3 = quantiles[1]
    print(f"\nQ1 Calculated: {Q1}")
    print(f"Q3 Calculated: {Q3}")

    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    print(f"IQR: {IQR}")
    print(f"Calculated Lower Bound: {lower_bound}")
    print(f"Calculated Upper Bound: {upper_bound}")

    # --- 2. Filtering ---
    # filter the DataFrame, keeping rows within the IQR bounds OR where price is null
    df_rooms_no_outliers = df_rooms.filter(
        ((f.col("price") >= lower_bound) & (f.col("price") <= upper_bound)) |
        f.col("price").isNull()
    )

    count_after = df_rooms_no_outliers.count()
    print(f"\nRecord count after price outlier removal: {count_after}")
    print(f"Number of price outliers removed: {df_rooms.count() - count_after}")

    # --- 3. Verification ---
    print("\nPrice range after outlier removal:")
    df_rooms_no_outliers.agg(f.min("price"), f.max("price")).show()

    # overwriting the original df_rooms with the cleaned version
    df_rooms = df_rooms_no_outliers
    print("\ndf_rooms updated with outliers removed.")

else:
    print("\nWARNING: Could not calculate Q1/Q3 for 'price'. Outlier removal was skipped. df_rooms remains unchanged.")

# displaying a sample or schema to confirm
df_rooms.select("price").summary("min", "max").show()

In [0]:
# showing listings with price = $3
print(f"Listings with price = ${3}:")
df_low_price = df_rooms.filter(f.col("price") == 3)

df_low_price.select(
    "id", "name", "host_id", "neighbourhood", "room_type",
    "price", "minimum_nights", "accommodates"
).show(50, truncate=False) 

print(f"Total count of listings with price = $3: {df_low_price.count()}")

In [0]:
'''
To ensure the integrity and realism of our price analysis, this single anomalous record will be **removed** from the `df_rooms` dataset.
'''
# removing the anomalous record (keeping rows whose price is not 3, or is null)
df_rooms = df_rooms.filter(
                    (f.col("price")!= 3)|     
                    f.col("price").isNull()    
)

# displaying a sample or schema to confirm
df_rooms.select("price").summary("min", "max").show()

In [0]:
# getting number of records in df_rooms
total_records = df_rooms.count()

# getting the number of null values in the 'price' column
null_prices = df_rooms.filter(f.col("price").isNull()).count()

print(f"Records count in df_rooms: {total_records}")
print(f"Null values count in 'price': {null_prices}")

# showing null values percentage in 'price'
if total_records > 0:
    percentual_null_price = (null_prices / total_records) * 100
    print(f"Percentage of null values in the 'price' column: {percentual_null_price:.2f}%")

##### 4.1.4 Synchronizing Related Tables

Now that `df_rooms` has been filtered (e.g., price outliers removed), it represents our definitive set of valid listings for the analysis. However, the `df_hosts`, `df_reviews`, and `df_amenities` DataFrames were created earlier from a potentially larger dataset and may still contain records related to listings that are no longer present in the cleaned `df_rooms`.

To ensure consistency across our normalized tables, we will now filter `df_hosts`, `df_reviews`, and `df_amenities` to keep only the records that correspond to the valid `id`s and `host_id`s remaining in the cleaned `df_rooms`.

**Methodology:**
1.  Extract the distinct `id`s (room IDs) from the current `df_rooms`.
2.  Extract the distinct `host_id`s from the current `df_rooms`.
3.  Perform a **semi join** between `df_reviews` / `df_amenities` and the valid room IDs. A semi join keeps only the rows from the left DataFrame (`df_reviews`/`df_amenities`) that have a matching key in the right DataFrame (valid room IDs).
4.  Perform a semi join between `df_hosts` and the valid host IDs.
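The semi-join semantics in steps 3 and 4 can be illustrated without Spark. The minimal pure-Python sketch below is only an illustration: `semi_join` and the sample rows are hypothetical, not part of the project code. It keeps each left row whose key appears on the right, never duplicates a left row, and takes no columns from the right side, which is what distinguishes a semi join from an inner join.

```python
def semi_join(left_rows, right_keys, key):
    """Keep rows from left_rows whose value for `key` appears in right_keys.

    Unlike an inner join, a semi join never duplicates a left row and never
    adds columns from the right side; the right side acts only as a filter.
    """
    valid = set(right_keys)  # set lookup; duplicated keys collapse to one
    return [row for row in left_rows if row[key] in valid]


# hypothetical review rows; room 3 no longer exists in the cleaned df_rooms
reviews = [
    {"room_id": 1, "number_of_reviews": 10},
    {"room_id": 2, "number_of_reviews": 0},
    {"room_id": 3, "number_of_reviews": 4},
]
valid_room_ids = [1, 2, 2]  # a duplicated key does not duplicate the output row

synced = semi_join(reviews, valid_room_ids, key="room_id")
print([row["room_id"] for row in synced])  # [1, 2]
```

In PySpark the equivalent is `df.join(other, on=key, how="left_semi")`; `"semi"` and `"leftsemi"` are accepted aliases of the same join type.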



In [0]:
# --- Re-synchronize tables based on the FINAL df_rooms ---
print("Re-synchronizing related tables with the final df_rooms...")

# getting list of valid room IDs present in the final df_rooms
# renaming 'id' to 'room_id' to simplify the join with df_reviews/df_amenities
valid_room_ids = df_rooms.select(f.col("id").alias("room_id")).distinct()
print(f"Number of valid room IDs found in final df_rooms: {valid_room_ids.count()}")

# getting list of valid host IDs present in the final df_rooms
valid_host_ids = df_rooms.select("host_id").distinct()
print(f"Number of valid host IDs found in final df_rooms: {valid_host_ids.count()}")

# filtering df_reviews to keep only reviews for valid rooms
count_reviews_before = df_reviews.count()

# semi join: keeps the rows from df_reviews that have a match in valid_room_ids
df_reviews = df_reviews.join(valid_room_ids, on="room_id", how="semi")
count_reviews_after = df_reviews.count()
print(f"df_reviews synced: {count_reviews_before} -> {count_reviews_after} rows")

# filtering df_hosts to keep only hosts present in the final df_rooms
count_hosts_before = df_hosts.count()
df_hosts = df_hosts.join(valid_host_ids, on="host_id", how="semi")
count_hosts_after = df_hosts.count()
print(f"df_hosts synced: {count_hosts_before} -> {count_hosts_after} rows")

# filtering df_amenities (assuming it also uses 'room_id')
count_amenities_before = df_amenities.count()
df_amenities = df_amenities.join(valid_room_ids, on="room_id", how="semi")
count_amenities_after = df_amenities.count()
print(f"df_amenities synced: {count_amenities_before} -> {count_amenities_after} rows")

print("\nSynchronization complete.")

##### 4.1.5 Treating Null/Empty Fields

The null value analysis indicates that `neighborhood_overview`, `price`, `bathrooms`, and `bedrooms` are the primary columns with missing data in the `df_rooms` table. A differentiated strategy will be applied to handle these nulls.

**Imputation Strategy:**

-   **`price`**: **No imputation will be performed.** We hypothesize that null prices are linked to inactive listings. Therefore, the strategy will be to **filter the dataset to include only records with a valid price** for any subsequent revenue-focused analysis. This ensures that our financial calculations are based solely on commercially viable listings.

-   **`bathrooms` and `bedrooms`**: The nulls in these columns will be left as is for now. They can be addressed in a more detailed analysis later if required. 

-   **`neighborhood_overview`**: This is the only column where imputation will be performed at this time. Given its potential for contextual insights about the neighborhood, nulls will be replaced with `"Not Informed"`.

**Action Plan:**
Based on the strategy above, the only action to be executed now is the imputation of the `neighborhood_overview` column.


In [0]:
# treating the 'neighborhood_overview' column
df_rooms = df_rooms.na.fill(
    value="Not Informed",
    subset=["neighborhood_overview"]
)

# verification
after_count = df_rooms.filter(f.col('neighborhood_overview').isNull()).count()
print(f"Null count for 'neighborhood_overview' after imputation: {after_count}")
if after_count == 0:
    print("✅ Verification successful.")


#### 4.2 df_hosts
##### 4.2.1 Understanding columns

In [0]:
# schema 
# checking the data types to plan for the casting step
df_hosts.printSchema()

# checking for duplicate host_ids
# sanity check confirming that .dropDuplicates() during creation worked (expected output: 0)
duplicate_count = df_hosts.count() - df_hosts.select("host_id").distinct().count()
print(f"Number of duplicate host_ids: {duplicate_count}")

# displaying descriptive statistics
df_hosts.describe().display()

df_hosts.display()


**Schema Analysis & Casting Plan**

The `printSchema()` output and the uniqueness check confirm that the `df_hosts` DataFrame is correctly structured with a unique `host_id` for each record. The `host_id` (string) and `host_since` (date) columns already have appropriate data types.

However, the analysis reveals that several other columns representing numerical or boolean data are currently stored as strings. To enable accurate calculations and logical filtering, the following transformations will be performed in the next step, **"4.2.2 Casting Types"**:

-   **`host_response_rate`** and **`host_acceptance_rate`**: These will be converted from string percentages (e.g., "95%") to a numeric `double` type (e.g., 0.95).
-   **`host_is_superhost`** and **`host_identity_verified`**: These will be converted from string flags (e.g., "t", "f") to a `boolean` type (True/False).
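The two conversion rules can be sketched in plain Python to make the expected behavior explicit. `parse_rate` and `parse_flag` are hypothetical helpers, not part of the notebook; the assumption that unparseable strings become null mirrors Spark's `cast` when ANSI mode is off (with ANSI mode on, Spark raises an error instead).

```python
def parse_rate(value):
    """Convert a percentage string such as "95%" to a fraction (0.95).

    Returns None for missing or unparseable input (assumed to match the
    non-ANSI Spark cast used in 4.2.2).
    """
    if value is None:
        return None
    try:
        return float(value.replace("%", "")) / 100
    except ValueError:
        return None


def parse_flag(value):
    """Map the Airbnb flags "t"/"f" to True/False; anything else to None."""
    return {"t": True, "f": False}.get(value)


print(parse_rate("95%"), parse_rate("100%"), parse_rate("N/A"))  # 0.95 1.0 None
print(parse_flag("t"), parse_flag("f"), parse_flag(""))          # True False None
```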


In [0]:
# calculating the count and percentage of null values in each column
mappingNullvalues(df_hosts)

**Decision**

Based on the analysis above, all columns in the `df_hosts` table have a very low percentage of null values. Therefore, all columns are considered valuable and **no columns will be removed**.


##### 4.2.2 Casting types

In [0]:
# converting "host_response_rate" from string "100%" to the number 1.0
df_hosts = df_hosts.withColumn(
    "host_response_rate",
    f.regexp_replace(
        f.col("host_response_rate"),
        "%",
        ""
    ).cast("double")/100   
)

# converting "host_acceptance_rate" from string "100%" to the number 1.0
df_hosts = df_hosts.withColumn(
    "host_acceptance_rate",
     f.regexp_replace(
            f.col("host_acceptance_rate"),
            "%",
            ""
        ).cast("double")/100
)

# converting "host_is_superhost" from string "t" or "f" to boolean True or False
df_hosts = df_hosts.withColumn(
    "host_is_superhost",
    f.when(f.col("host_is_superhost") == "t", True)
     .when(f.col("host_is_superhost") == "f", False)
     .otherwise(None)
)

# converting "host_identity_verified" from string "t" or "f" to boolean True or False
df_hosts = df_hosts.withColumn(
    "host_identity_verified",
    f.when(f.col("host_identity_verified") == "t", True)
     .when(f.col("host_identity_verified") == "f", False)
     .otherwise(None)
)

display(df_hosts)
df_hosts.printSchema()

In [0]:
# data statistics
display(
    df_hosts.select("host_response_rate", "host_acceptance_rate")
    .summary("count", "mean", "stddev", "min", "50%", "max")
)

**Insight: Analysis of Host Response and Acceptance Rates**

The summary statistics for `host_response_rate` and `host_acceptance_rate` reveal a significant difference between the mean (average) and the median (50th percentile), which points to a skewed distribution of the data.

**Key Observations:**
-   For `host_response_rate`, the **median is 1.0 (100%)**, while the **mean is significantly lower at approximately 0.87 (87%)**.
-   A similar pattern is observed for `host_acceptance_rate`, with a **median of 0.94 (94%)** and a lower **mean of approximately 0.76 (76%)**.

**Interpretation:**
This discrepancy confirms that the data for both metrics is **negatively skewed (left-skewed)**. A large number of hosts are highly responsive, with at least 50% of them having a perfect 100% response rate. The lower mean is being pulled down by a subset of hosts with very low response and acceptance rates. These outliers have a disproportionate effect on the average.
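A small numeric example (hypothetical values, not taken from the dataset) shows how a short left tail pulls the mean below the median while the median stays at the top of the scale:

```python
from statistics import mean, median

# hypothetical response rates: most hosts at 100%, a small tail of low values
rates = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9, 0.5, 0.2, 0.0]

print(f"median = {median(rates):.2f}")  # median = 1.00
print(f"mean   = {mean(rates):.2f}")    # mean   = 0.76
```

Four of the ten values sit below 1.0, yet they are enough to move the mean by almost a quarter of the scale, while the median is unaffected because more than half of the hosts are at 100%.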

**Business Implication & Next Step:**
This suggests there may be two distinct segments of host behavior: a majority of highly engaged hosts and a "long tail" of less responsive ones. The latter could represent an opportunity for engagement or a potential risk to the guest experience.

A **histogram** or a **box plot** of both columns would be a useful next step: it would show the shape of each distribution and make the gap between the mean and the median visible at a glance.

##### 4.2.3 Treating null/empty fields

After the type casting, the next step is to handle the remaining null values in the `df_hosts` table.

**Strategy: Imputation over Deletion**

The primary goal is to treat all null values **without deleting any rows**. This is a critical decision based on the following reason:

-   **Referential Integrity:** The `df_hosts` DataFrame is a supplementary table to `df_rooms`. Deleting host records from this table would create rooms without an associated host, which would break the referential integrity of our final dataset and lead to data loss in the dashboard.

**Action Plan & Implementation**

Based on the nature of each column, the following imputation strategy will be applied:

-   **Categorical Columns (`host_response_time`):** Null values will be replaced with the string `"Not Informed"`. This preserves the record while clearly marking the data as missing.

-   **Numerical Rate Columns (`host_response_rate`, `host_acceptance_rate`):** Null values will be filled with `0`. This is a conservative choice reflecting a lack of recorded activity. We cannot assume an average value, as the host may be new or have insufficient data, and imputing a mean could falsely inflate their performance.

-   **Boolean Columns (`host_is_superhost`, `host_identity_verified`):** Null values will be treated as `False`. This assumes that if a positive status (like being a Superhost) is not explicitly recorded as `True`, it does not apply.
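The column-to-fill-value mapping above can be written down as a single dictionary. The sketch below applies it to a plain Python record; `HOST_FILL_VALUES` and `impute_host` are illustrative names, and the record is hypothetical.

```python
# fill values per column, following the strategy above
HOST_FILL_VALUES = {
    "host_response_time": "Not Informed",  # categorical: mark as missing
    "host_response_rate": 0.0,             # rates: conservative 0
    "host_acceptance_rate": 0.0,
    "host_is_superhost": False,            # booleans: not True unless recorded
    "host_identity_verified": False,
}


def impute_host(record):
    """Return a copy of `record` with nulls replaced; no record is dropped."""
    filled = dict(record)
    for column, fill_value in HOST_FILL_VALUES.items():
        if filled.get(column) is None:
            filled[column] = fill_value
    return filled


# hypothetical host with no recorded activity
new_host = {"host_id": "42", "host_response_rate": None, "host_is_superhost": None}
print(impute_host(new_host))
```

In PySpark, the same mapping could be passed in one call, `df_hosts.na.fill(HOST_FILL_VALUES)`, because `DataFrame.na.fill` accepts a dict from column name to replacement value (when a dict is given, `subset` is ignored).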

In [0]:
print("Original data with nulls:")
# showing some rows that we know will be treated
df_hosts.filter(f.col("host_response_rate").isNull()).show(3, truncate=False)

# applying the .na.fill() transformations and overwriting the df_hosts DataFrame
df_hosts = df_hosts.na.fill(
                    value=0,
                    subset=["host_response_rate", "host_acceptance_rate"]
                ).na.fill(
                    value=False,
                    subset=["host_is_superhost", "host_identity_verified"]
                ).na.fill(
                    value="Not Informed",
                    subset=["host_response_time"]
                )

print("\nData after imputation (showing the rows that were treated):")

# Verification: Filter by one of the imputed values to see the result
df_hosts.filter(f.col("host_response_time") == "Not Informed").show(3, truncate=False)


In [0]:
# null mapping
mappingNullvalues(df_hosts)

In [0]:
# data statistics
df_hosts.select("host_response_rate", "host_acceptance_rate").summary("count", "mean", "stddev", "min", "50%", "max").show()


**Impact of Null Imputation on Host Rate Metrics**


After imputing `null` values with `0` for the `host_response_rate` and `host_acceptance_rate` columns, the summary statistics were recalculated.

**Key Finding:**
A significant drop was observed in the central tendency metrics. The median `host_acceptance_rate` fell from **0.94 to 0.67**, and the mean for both rates also decreased considerably.

**Interpretation:**
This confirms that a substantial share of hosts had no recorded rate information. By filling those nulls with `0`, the statistics now describe the entire host population instead of only the subset of hosts with recorded rates. Note that the drop is a direct consequence of the imputation choice: `0` is an assumed value for hosts who, likely due to inactivity or a short interaction history, had no recorded rate, not a measured one. The new figures should therefore be read as a conservative lower bound, while the previous, higher figures were optimistic because they only represented the more active hosts.

##### Important Considerations for Dashboard

The imputation choices made here have a direct impact on statistical analysis and must be considered when building visualizations:

-   **Impact on Averages:** Filling null rates with `0` **lowers the mean (average)** for these metrics. When presenting these insights, prioritize the **median** as the primary measure of central tendency, since it is less sensitive to these `0` values than the mean (although, as the drop from 0.94 to 0.67 above shows, it still shifts when the imputed share is large). The mean should only be displayed with a clear note explaining this imputation choice.

-   **Impact on Proportions:** Similarly, treating nulls as `False` for boolean columns increases the total count of "Non-Superhosts" and "Unverified" hosts. When displaying proportions (e.g., in pie charts or percentages), this context is essential to avoid misinterpreting the overall distribution of host types.



#### 4.3 df_reviews
##### 4.3.1 Understanding columns

In [0]:
# schema 
# checking the data types to plan for the casting step
df_reviews.printSchema()

# checking for duplicate room_ids
# sanity checking (expected output is 0)
duplicate_count = df_reviews.count() - df_reviews.select("room_id").distinct().count()
print(f"Number of duplicate room_ids: {duplicate_count}")

# displaying descriptive statistics
df_reviews.describe().display()

df_reviews.display()

**Schema Analysis & Casting Plan**


The `printSchema()` output and the uniqueness check confirm that the `df_reviews` DataFrame is correctly structured, with a unique `room_id` for each record.

The analysis reveals that most columns already have the correct data types (`integer`, `date`, `double`). However, the `reviews_per_month` column, which should be numeric, is currently a `string`. The `.describe()` output also highlights a significant number of null values in the review-related columns (e.g., 24,259 total records vs. ~17,940 in `review_scores_rating`), which will be addressed in the "Treating null/empty fields" step.

For now, the only action required in the 'Casting Types' step is the following:

-   **`reviews_per_month`**: This will be converted from `string` to a numeric `double` type.

In [0]:
# calculating the count and percentage of null values in each column
mappingNullvalues(df_reviews)

**Analysis of Null Values**

The null value analysis for the `df_reviews` DataFrame reveals a consistent pattern: all `review_scores_...` columns, along with `last_review`, `first_review`, and `reviews_per_month`, share a similar null rate of approximately **26%**.

**Hypothesis:** We hypothesize that these nulls are not random errors, but are an expected and direct consequence of listings that have never received a review.

**Decision & Action Plan:**
Based on this understanding, no columns will be removed. Instead, the null values will be imputed in step **"4.3.3 Treating null/empty fields"**, after the type casting in 4.3.2.

Since the `number_of_reviews` column is fully populated (no nulls), we can test whether the nulls in the other review-related columns (scores, rates, dates) correspond to listings with zero reviews. If they do, imputing the numeric fields (the review scores and `reviews_per_month`) with `0` is a logical and accurate way to represent the absence of review activity, while the date columns are left as null.

##### 4.3.2 Casting types

In [0]:
# converting "reviews_per_month" from string to double
df_reviews = df_reviews.withColumn("reviews_per_month",
               f.col("reviews_per_month").cast("double")
)
df_reviews.printSchema()


In [0]:
# data statistics
df_reviews.select("number_of_reviews","reviews_per_month","number_of_reviews_ltm", "number_of_reviews_ly").summary("count", "mean", "stddev", "min", "50%", "max").show()

df_reviews.select("review_scores_rating","review_scores_accuracy","review_scores_cleanliness").summary("count", "mean", "stddev", "min", "50%", "max").show()

df_reviews.select("review_scores_checkin","review_scores_communication","review_scores_location","review_scores_value").summary("count", "mean", "stddev", "min", "50%", "max").show()

**Key Insights from Statistical Analysis**

The statistical summary of the review-related columns reveals two distinct and important data distributions, highlighting why the **median** is a more representative measure of central tendency than the mean for this dataset.

**1. Review Counts (Right-Skewed Distribution):**
For metrics like `number_of_reviews` and `number_of_reviews_ltm`, the **mean is significantly higher than the median** (e.g., a mean of 36 total reviews vs. a median of 6). This indicates a classic right-skewed distribution, often called a "long tail": a few "superstar" listings have an extremely high number of reviews, while most listings have very few. The median therefore better reflects a "typical" listing, because it is not pulled up by the highly popular outliers. Notably, the median of `0` for reviews in the last 12 months shows that at least half of the listings had no recent review activity.

**2. Review Scores (Left-Skewed Distribution):**
Conversely, for all `review_scores_...` columns, the **median is consistently higher than the mean** (e.g., a median rating of 4.86 vs. a mean of 4.72). This indicates a left-skewed distribution, where most data points are clustered at the high end of the scale. It confirms that the typical guest experience is overwhelmingly positive. The mean is slightly pulled down by a small number of very low scores (outliers), making the median a more accurate representation of the consistently high satisfaction level.

##### 4.3.3 Treating null/empty fields

**Hypothesis and Strategy**

As established in the previous analysis, our primary hypothesis is that the null values in `reviews_per_month` and all `review_scores_...` columns are a direct result of the listing having zero reviews.

To test this and treat the nulls simultaneously, we will perform a conditional imputation. For every row where the `number_of_reviews` column is `0`, we will fill the corresponding null values in all review-score and rate columns with the integer `0`. A value of `0` is a logical imputation, as it accurately reflects the absence of any review activity or score.

**Verification**

After this operation, we will re-run the null value analysis. If our hypothesis is correct, the null counts for these target columns should drop to zero, confirming that all missing review data was indeed linked to listings with no reviews.
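The conditional imputation and the verification step can be sketched in plain Python. This is an illustration under simplifying assumptions (only two of the target columns, hypothetical listings); it follows the same rule as the `f.when(...).otherwise(...)` expression in the next cell, so a listing with reviews but a missing score keeps its null and still shows up in the re-run null count.

```python
SCORE_COLUMNS = ["reviews_per_month", "review_scores_rating"]  # subset for brevity


def impute_unreviewed(row, columns=SCORE_COLUMNS):
    """Set the given columns to 0 when the listing has zero reviews.

    Rows with at least one review keep their original values, including nulls.
    """
    if row["number_of_reviews"] == 0:
        return {**row, **{c: 0 for c in columns}}
    return dict(row)


def count_nulls(rows, column):
    """Count rows whose value for `column` is None."""
    return sum(1 for r in rows if r[column] is None)


# hypothetical listings: the last one has reviews but a missing score
listings = [
    {"number_of_reviews": 0, "reviews_per_month": None, "review_scores_rating": None},
    {"number_of_reviews": 12, "reviews_per_month": 1.5, "review_scores_rating": 4.9},
    {"number_of_reviews": 3, "reviews_per_month": 0.4, "review_scores_rating": None},
]

treated = [impute_unreviewed(r) for r in listings]
print(count_nulls(listings, "review_scores_rating"), "->",
      count_nulls(treated, "review_scores_rating"))  # 2 -> 1
```

A remaining null after this step, like the third listing here, is exactly the kind of record that would explain a small residual null rate that the zero-reviews hypothesis does not cover.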


In [0]:
# list of columns to receive the conditional fill
columns_to_fill = [
    "reviews_per_month",
    "review_scores_rating",
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value"
]

# looping through the columns and applying the conditional fill
for col_name in columns_to_fill:
    df_reviews = df_reviews.withColumn(
        col_name,
        f.when(
            f.col("number_of_reviews") == 0, 0
        ).otherwise(f.col(col_name))
    )

# analyzing nulls after imputation
mappingNullvalues(df_reviews)

In [0]:
# data statistics
df_reviews.select("number_of_reviews","reviews_per_month","number_of_reviews_ltm", "number_of_reviews_ly").summary("count", "mean", "stddev", "min", "50%", "max").show()

df_reviews.select("review_scores_rating","review_scores_accuracy","review_scores_cleanliness").summary("count", "mean", "stddev", "min", "50%", "max").show()

df_reviews.select("review_scores_checkin","review_scores_communication","review_scores_location","review_scores_value").summary("count", "mean", "stddev", "min", "50%", "max").show()

##### Final Conclusions and Dashboarding Recommendations

**1. Hypothesis Confirmation**

The conditional imputation was highly successful. The null counts for `reviews_per_month` and all `review_scores_...` columns dropped from ~26% to nearly zero (a residual ~0.09% remains, indicating a separate minor data quality issue). This confirms our primary hypothesis: the vast majority of missing review data was directly linked to listings with `number_of_reviews == 0`.

**2. Impact on Statistical Metrics**

The imputation of `0` for these records has, as expected, significantly impacted the summary statistics:

-   **The Mean:** A notable drop was observed in the mean for all imputed columns (e.g., the mean `review_scores_rating` fell from ~4.72 to ~3.49). This is expected rather than an error: the new mean describes the **entire listing population**, counting listings with zero reviews as `0`, whereas the previous mean was biased toward the subset of listings that already had reviews.
-   **The Median:** In contrast, the median for the review scores remained high (e.g., the median `review_scores_rating` only shifted from ~4.86 to ~4.73). Because only about a quarter of the listings received an imputed `0`, the median still falls among genuine scores, which shows that the **"typical" listing that *does* get reviewed still scores very highly**.
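The effect described in the two bullets can be reproduced with a handful of hypothetical ratings: adding a block of zeros that is smaller than half of the data drags the mean down sharply while moving the median only slightly.

```python
from statistics import mean, median

# hypothetical ratings of reviewed listings
reviewed = [4.9, 4.8, 4.7, 4.6, 4.5, 4.9, 4.8, 4.7]
# three unreviewed listings (about 27% of the total) imputed with 0
all_listings = reviewed + [0, 0, 0]

for label, scores in [("reviewed only", reviewed), ("with zeros", all_listings)]:
    print(f"{label:>13}: mean={mean(scores):.2f} median={median(scores):.2f}")
```

With 3 zeros out of 11 values, the median lands on the 6th smallest value, which is still a genuine rating; it would only collapse to 0 if more than half of the listings were imputed.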

**3. Recommendations for Dashboard Representation**

-   **Prioritize the Median for KPIs:** The median is the most robust metric for representing a "typical" score, as it is much less affected than the mean by the large number of `0` values.
-   **Contextualize the Mean:** Label the mean clearly to avoid misinterpretation (e.g., "Overall Average Score, including listings with 0 reviews"), as it represents the entire population.
-   **Visualize the Bimodal Distribution:** Use a **histogram** to clearly show the two distinct groups: a large spike at `0` (the imputed listings) and another at the high end of the scale (4.5-5.0).
-   **Implement a User-Driven Filter:** In Power BI, provide a **slicer** that allows users to toggle between viewing "All Listings" and only "Actively Reviewed Listings."
-   **Handle Null Dates:** Leave the remaining nulls in `last_review` and `first_review` as `NULL`. In the dashboard, handle this by creating a "Not Reviewed" category or by filtering them out of time-based analyses.
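For the histogram recommendation, the bucketing can be sketched as below. The scores and bin edges are hypothetical, and Power BI performs this binning itself; the sketch only illustrates the expected shape, a spike at 0 and a cluster at 4.5-5.0.

```python
def bin_counts(values, edges):
    """Count values per half-open bin [edges[i], edges[i+1]).

    The last bin also includes its right edge, so a perfect 5.0 is counted;
    values outside the edges are ignored.
    """
    counts = [0] * (len(edges) - 1)
    for v in values:
        for i in range(len(edges) - 1):
            last = i == len(edges) - 2
            if edges[i] <= v < edges[i + 1] or (last and v == edges[-1]):
                counts[i] += 1
                break
    return counts


# hypothetical scores: three imputed zeros plus mostly high ratings
scores = [0, 0, 0, 3.2, 4.6, 4.8, 4.9, 5.0, 5.0]
print(bin_counts(scores, [0, 1, 2, 3, 4, 4.5, 5.0]))  # [3, 0, 0, 1, 0, 5]
```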

#### 4.4 df_amenities

This table contains a list of additional room amenities and can be used to investigate the pattern of successful rooms in New York.

To facilitate analysis, we decided to create a table with binary (1/0) columns, each representing whether the listing has a given amenity or not. This allows for simpler correlation analysis.
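A minimal pure-Python sketch of that 1/0 table, assuming each room's amenities are already a list (as `amenities_array` is after parsing); `one_hot_amenities` and the sample rooms are hypothetical. With free-text amenities, the vocabulary, and therefore the number of columns, grows with every new spelling a host types.

```python
def one_hot_amenities(rooms):
    """Turn {room_id: [amenity, ...]} into one 0/1 row per room.

    Columns are the sorted union of all amenities seen; a missing list
    (None) is treated as "no amenities".
    """
    vocabulary = sorted({a for items in rooms.values() if items for a in items})
    table = []
    for room_id, items in rooms.items():
        present = set(items or [])
        row = {"room_id": room_id}
        row.update({a: int(a in present) for a in vocabulary})
        table.append(row)
    return vocabulary, table


# hypothetical rooms
vocab, rows = one_hot_amenities({
    101: ["Wifi", "Kitchen"],
    102: ["Wifi"],
    103: None,
})
print(vocab)  # ['Kitchen', 'Wifi']
for row in rows:
    print(row)
```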

In [0]:
# transforming strings to arrays
df_amenities = df_amenities.withColumn("amenities_array",
               f.from_json(f.col("amenities"),
               t.ArrayType(t.StringType()))
)
df_amenities.printSchema()

In [0]:

# counting distinct values in amenities column
def extractDistinctValuesArray(df, array_column):
    # exploding array column to get individual values
    distinct_values = (df
                    .select(f.explode(f.col(array_column)).alias("value")) 
                    .filter(f.col("value").isNotNull())
                    .select("value")
                    .distinct()
    )
    # creating a list of distinct values getting them from df with collect()
    amenities_distinct = []
    for row in distinct_values.collect():
        amenities_distinct.append(row['value'])
    
    return amenities_distinct

amenities_distinct = extractDistinctValuesArray(df_amenities, "amenities_array")

# printing the list and its size
print(amenities_distinct)
print(f"Distinct amenities count: {len(amenities_distinct)}")


Looking at the count and the values, we notice that amenity entries are not restricted to a predefined list: hosts can type free text, which makes manual analysis difficult.

We'd like to know **how many amenities each room has** and **classify them into groups**. To do this, we'll use Spark's `MLlib` library.
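To give an intuition for what the pipeline's feature stages (`Tokenizer`, `StopWordsRemover`, `HashingTF`) do to one amenity string, here is a rough pure-Python approximation. It lowercases and splits on whitespace, drops stop words, and counts each remaining token in a hashed bucket. Spark's `HashingTF` uses MurmurHash3, whereas this sketch substitutes `zlib.crc32`, so the bucket indices will not match Spark's; the stop-word set is a subset of the one used in the notebook, and `hashed_term_frequencies` is a hypothetical helper.

```python
import zlib
from collections import Counter

STOPWORDS = {"with", "and", "or", "the", "a", "an", "in", "on",
             "at", "to", "for", "of", "by", "via"}


def hashed_term_frequencies(text, num_features=1500, stopwords=STOPWORDS):
    """Return {bucket_index: count} for the tokens of `text`.

    NOTE: crc32 stands in for Spark's MurmurHash3, so indices differ from Spark.
    """
    tokens = [w for w in text.lower().split() if w not in stopwords]
    buckets = Counter(zlib.crc32(w.encode("utf-8")) % num_features for w in tokens)
    return dict(buckets)


vector = hashed_term_frequencies("Free parking on premises")
print(sum(vector.values()))  # 3 tokens remain after removing "on"
```

Because "Free street parking" and "Free parking on premises" share the "free" and "parking" buckets, K-means can place them close together, which is the basis for the grouping below.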

In [0]:
# importing necessary libraries
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

In [0]:
'''
We'll train a model to analyze the amenities data and organize it into groups.
The idea is to end up with 5 to 10 groups to build columns from.

    1 - Tokenize the amenities and create 20 clusters. The model may
        produce several clusters that map to the same category, so
        starting with more clusters than needed gives us some leeway.

    2 - Check the quality of the clustering. If it's good,
        we'll define the groups based on it.

    3 - Create one column per group and fill it with
        '1' when the listing has amenities in that group and '0' when it doesn't.
'''

# creating schema for DataFrame
schema = t.StructType([t.StructField("amenity", t.StringType(), True)])

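# creating a DataFrame with one row per distinct amenity, taken from a 5,000-listing sample
# (amenities that only appear outside this sample will not be assigned to any group later)
df_sample = df_amenities.select("amenities_array").limit(5000)
# distinct() keeps each amenity string once, which makes the clustering quality easier to inspect
df_exploded_distinct = df_sample.select(f.explode("amenities_array").alias("amenity")).distinct()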

In [0]:


# creating optimized pipeline for amenities clustering
def creatingClusteringPipeline(groups_num=20):
    # 1. Tokenization
    tokenizer = Tokenizer(inputCol="amenity", outputCol="words")

    # 2. stop words to remove
    amen_stopwords = [
        'with', 'and', 'or', 'the', 'a', 'an', 'in', 'on', 'at', 'to', 
        'for', 'of', 'by', 'via', '&', '–', 'available', 'specific', 'wih'
    ]

    stopwords_remover = StopWordsRemover(
        inputCol="words",
        outputCol="filtered_words",
        stopWords=amen_stopwords
    )
    
    # 3. hashing words into a fixed-size feature vector (1500 features)
    hashing_tf = HashingTF(
        inputCol="filtered_words",
        outputCol="features",
        numFeatures=1500,            
    )
    
    # 4. dataframe clustering with groups_num criteria
    kmeans = KMeans(
        k=groups_num,
        featuresCol="features",
        predictionCol="group",
        seed=42,              # keeping the same seed for reproducibility
        maxIter=30,           # raising the number of iterations for better convergence
        tol=1e-6
        )
    
    pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, kmeans])
    return pipeline

# Creating and training pipeline
pipeline = creatingClusteringPipeline(groups_num=20)
model = pipeline.fit(df_exploded_distinct)

In [0]:
# creating a function to suggest group names based on the most frequent words
def suggestGroupNaming(amenities_list):
    # keywords by categories
    categories = {
        'TV & Streaming': ['tv', 'hdtv', 'netflix', 'hbo', 'disney', 'hulu', 'prime', 'video', 'streaming','chromecast', 'roku', 'apple'],
        'Gaming & Entertainment': ['game', 'console', 'xbox', 'playstation', 'nintendo', 'switch', 'ps4', 'ps5', 'dvd', 'bluray'],
        'Internet & Connectivity': ['wifi', 'internet', 'mbps', 'ethernet', 'broadband'],
        'Climate Control': ['ac', 'air', 'heater', 'fan', 'ventilation'],
        'Kitchen Appliances': ['refrigerator', 'fridge', 'stove','oven','microwave', 'keurig', 'toaster','kettle','dishwasher', 'cooktop', 'induction'],
        'Coffee & Beverage': ['coffee', 'espresso', 'maker', 'french', 'press', 'nespresso'],
        'Bathroom Amenities': ['shampoo', 'hair', 'conditioner', 'soap', 'body', 'shower', 'bidet', 'towel', 'hairdryer'],
        'Laundry Facilities': ['washer', 'dryer', 'laundry', 'iron', 'laundromat', 'drying'],
        'Parking & Transportation': ['parking', 'garage', 'valet', 'street', 'free'],
        'Security & Access': ['keypad', 'lock', 'security', 'alarm', 'camera', 'safe', 'detector'],
        'Fitness': ['gym', 'exercise', 'weights', 'treadmill', 'yoga', 'elliptical', 'equipment', 'workout'],
        'Family & Kids': ['high chair', 'crib', 'children', 'toys', 'baby', 'monitor'],
        'Outdoor Spaces': ['patio', 'balcony', 'terrace', 'garden', 'outdoor', 'waterfront', 'view'],
        'Outdoor recreation': ['grill', 'firepit', 'fire', 'bbq', 'pool', 'swimming', 'hot', 'tub', 'sauna', 'jacuzzi'],
        'Sound Systems': ['sound', 'system', 'speaker', 'bluetooth', 'sonos', 'bose', 'jbl', 'marshall', 'audio'],
    }

    # counting matches by category
    categories_count = {}
    full_text = ' '.join(amenities_list).lower()
    
    for cat, keywords in categories.items():
        matches = sum(1 for word in keywords if word in full_text)
        if matches > 0:
            categories_count[cat] = matches
    
    if categories_count:
        main_category = max(categories_count.items(), key=lambda x: x[1])[0]
        return main_category
    else:
        # if no category is found, return a generic name
        return f"Generic group ({len(amenities_list)} amenities)"

In [0]:
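# analyzing clusters to create groups by looking at the most frequent words
def namingGroups(model, df_distinct, num_groups=None):
    # 1. applying the model to assign each amenity to a cluster
    df_with_groups = model.transform(df_distinct)
    # defaulting to the k of the KMeans stage (the last pipeline stage)
    if num_groups is None:
        num_groups = len(model.stages[-1].clusterCenters())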
    
    # 2. analyzing and naming clusters based on the most frequent words
    named_groups = {}
    
    for group_id in range(num_groups):
        grouped_amenities = df_with_groups.filter(f.col("group") == group_id
                                                  ).select("amenity"
                                                  ).collect()
        
        amenities_list = [row['amenity'] for row in grouped_amenities]
        
        # analyzing patterns
        group_name = suggestGroupNaming(amenities_list)
        
        named_groups[group_id] = {
            'name': group_name,
            'amenities': amenities_list,
            'qtd': len(amenities_list)
        }
        
        # print for visualization
        print(f"{group_id}: {group_name}")
        print(f"{len(amenities_list)} amenities")
        print(f"ex.: {amenities_list[:10]}")
        print()
    
    return named_groups

# calling the naming function (num_groups must match the k used in creatingClusteringPipeline)
named_groups = namingGroups(model, df_exploded_distinct, num_groups=20)

In [0]:

'''
Two groups are poorly classified, so we drop them and keep the groups with more accurate data.
'''

remove_groups = [11, 13]
named_groups = {
    group_id: group_info
    for group_id, group_info in named_groups.items()
    if group_id not in remove_groups
}

# creating a new column with amenities list size
df_amenities = df_amenities.withColumn('amenities_count',
                            f.size(f.col('amenities_array'))
                        )

# 1. Extracting unique group names and creating the mapping
group_names = list(set([info['name'] for info in named_groups.values()]))

# 2. Associating each amenity with its group
amenity_to_group = {}
for group_id, group_info in named_groups.items():
    group_name = group_info['name']
    for amenity in group_info['amenities']:
        if amenity not in amenity_to_group:
            amenity_to_group[amenity] = set()

        amenity_to_group[amenity].add(group_name) 
    
# 3. Creating a function to check if any amenity belongs to the target group
def checkAmenityGroup(amenities_list, target_group):
    if amenities_list is None:
        return 0
    
    for amenity in amenities_list:
        if amenity in amenity_to_group and target_group in amenity_to_group[amenity]:
            return 1
    return 0

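# 4. creating one UDF per group
def makeGroupUdf(target_group):
    # target_group is bound as a function argument, so each UDF keeps its own group;
    # using f.udf so no separate `udf` import is needed
    return f.udf(lambda amenities: checkAmenityGroup(amenities, target_group), t.IntegerType())

for group_name in group_names:
    # creating the UDF for the target group
    check_group_udf = makeGroupUdf(group_name)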

    # adding a column for the target group
    column_name = group_name.replace(" & ", "_"
                                     ).replace(" ", "_"
                                     ).replace("-", "_")

    df_amenities = df_amenities.withColumn(column_name,
                                check_group_udf(f.col("amenities_array"))
        )
    
# 5. Selecting the final columns
col_name = [group_name.replace(" & ", "_"
                        ).replace(" ", "_"
                        ).replace("-", "_") 
                        for group_name in group_names
                    ]
    
final_columns = ["room_id"] + col_name + ["amenities_count"]
    
df_amen_final = df_amenities.select(final_columns)
    
total_rows = df_amen_final.count()  # counting once, outside the loop
for group_name in group_names:
    group_col = group_name.replace(" & ", "_").replace(" ", "_").replace("-", "_")
    count_ones = df_amen_final.filter(f.col(group_col) == 1).count()
    percentage = (count_ones / total_rows) * 100 if total_rows else 0.0
    print(f"   {group_col}: {count_ones}/{total_rows} rooms ({percentage:.1f}%)")

df_amen_final.display()

### 5. Saving DataFrames to Unity Catalog

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS workspace.projeto_airbnb;

In [0]:
# define catalog and schema names
catalog_name = "workspace"
schema_name = "projeto_airbnb"

# save each DataFrame as a table in Delta format (default)
df_rooms.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{catalog_name}.{schema_name}.rooms")
df_hosts.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{catalog_name}.{schema_name}.hosts")
df_reviews.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{catalog_name}.{schema_name}.reviews")
df_amen_final.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{catalog_name}.{schema_name}.amenities")

print("All tables saved successfully in Unity Catalog!")