# Create Refreshable Spark Instance

# Environment Setup

In [7]:
# Nuclear Spark Reset - Complete Clean Start
# Stops Spark driver JVM(s) left over from earlier runs, purges PySpark and
# py4j from the interpreter's module cache, and builds a brand-new local
# SparkSession bound to the global name `spark`.

import subprocess
import time
import os
import sys

# Pattern passed to `pkill -f`, which matches it against full command lines.
# PySpark starts its local driver JVM through spark-submit, so that JVM's
# command line should contain this main-class name (check with
# `ps aux | grep SparkSubmit` if your setup differs). Set this to "java" to get
# the old behaviour of killing EVERY Java process on the machine, including
# unrelated IDEs and services.
JVM_KILL_PATTERN = "org.apache.spark.deploy.SparkSubmit"

print("=" * 60)
print("NUCLEAR SPARK RESET - COMPLETE SYSTEM CLEANUP")
print("=" * 60)

# Step 1: Kill leftover Spark JVM processes
print("\nSTEP 1: TERMINATING SPARK JVM PROCESSES")
print("-" * 40)
try:
    kill_result = subprocess.run(['pkill', '-f', JVM_KILL_PATTERN], capture_output=True)
except FileNotFoundError:
    # pkill is not installed (e.g. on Windows), so nothing was terminated.
    print("STATUS: pkill not available - JVM cleanup skipped")
else:
    # pkill exit status: 0 = at least one process matched, 1 = none matched,
    # anything else = usage or runtime error.
    if kill_result.returncode == 0:
        time.sleep(5)  # Give the JVM time to exit and release its ports (e.g. the 4040 UI)
        print("STATUS: Spark JVM processes terminated")
    elif kill_result.returncode == 1:
        print("STATUS: No running Spark JVM found")
    else:
        error_text = kill_result.stderr.decode(errors="replace").strip()
        print(f"STATUS: pkill failed (exit {kill_result.returncode}): {error_text}")

# Step 2: Clear ALL PySpark imports from memory
print("\nSTEP 2: CLEARING PYSPARK FROM MEMORY")
print("-" * 40)
# Snapshot the names first: sys.modules must not change size while being iterated.
modules_to_remove = [
    module_name for module_name in sys.modules
    if 'pyspark' in module_name or 'py4j' in module_name
]
for module_name in modules_to_remove:
    sys.modules.pop(module_name, None)

print(f"STATUS: Removed {len(modules_to_remove)} Spark-related modules from memory")

# Step 3: Point both driver and workers at this kernel's interpreter
print("\nSTEP 3: CONFIGURING CLEAN ENVIRONMENT")
print("-" * 40)
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
print("STATUS: Environment variables configured")

# Step 4: Re-import PySpark (fresh module objects) and create the session
print("\nSTEP 4: CREATING FRESH SPARK SESSION")
print("-" * 40)
from pyspark.sql import SparkSession
from pyspark import SparkContext

# A timestamped app name makes each restart distinguishable in the Spark UI.
spark = SparkSession.builder \
    .appName(f"BerlinAirbnb_{int(time.time())}") \
    .master("local[1]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# Verify it works
print("\nRESULTS:")
print("-" * 40)
print("STATUS: Nuclear reset successful")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
print(f"App ID: {spark.sparkContext.applicationId}")
print("=" * 60)
print("SPARK SESSION READY FOR USE")
print("=" * 60)

NUCLEAR SPARK RESET - COMPLETE SYSTEM CLEANUP

STEP 1: TERMINATING ALL JAVA PROCESSES
----------------------------------------
STATUS: All Java processes terminated

STEP 2: CLEARING PYSPARK FROM MEMORY
----------------------------------------
STATUS: Removed 98 Spark-related modules from memory

STEP 3: CONFIGURING CLEAN ENVIRONMENT
----------------------------------------
STATUS: Environment variables configured

STEP 4: CREATING FRESH SPARK SESSION
----------------------------------------


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/11 17:09:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable



RESULTS:
----------------------------------------
STATUS: Nuclear reset successful
Spark UI: http://192.168.1.30:4040
App ID: local-1754921377579
SPARK SESSION READY FOR USE


# Download Airbnb Data

In [8]:
# Get All Berlin Links (Including Archived Data)
# Click "show archived data" for Berlin and extract every Berlin data URL.
# Produces the global `berlin_links` (de-duplicated, first-seen order).

import requests
import re
import time

print("=" * 60)
print("GETTING ALL BERLIN LINKS FROM INSIDE AIRBNB")
print("=" * 60)

try:
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.chrome.options import Options

    # Set up headless Chrome
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get("https://insideairbnb.com/get-the-data/")

        print("Looking for Berlin archived data button...")
        time.sleep(2)

        # Click the specific Berlin archived data button (via JS so overlays can't block it)
        berlin_button = driver.find_element(By.CSS_SELECTOR, 'a.showArchivedData[data-city="Berlin"]')
        driver.execute_script("arguments[0].click();", berlin_button)
        time.sleep(3)  # Wait for content to load

        page_source = driver.page_source
    finally:
        # Always shut the headless browser down, even when the page or button
        # lookup above raises; otherwise a Chrome/chromedriver process is leaked.
        driver.quit()

    # Only URLs under the Berlin data tree are of interest
    berlin_patterns = [
        r'https?://data\.insideairbnb\.com/germany/be/berlin/[^\s<>"\']*'
    ]

    # dict.fromkeys de-duplicates while preserving first-seen order
    berlin_links = list(dict.fromkeys(
        match
        for pattern in berlin_patterns
        for match in re.findall(pattern, page_source, re.IGNORECASE)
    ))

    print(f"Found {len(berlin_links)} Berlin links:")
    print("-" * 60)

    for link in sorted(berlin_links):
        print(link)

except ImportError:
    print("ERROR: Selenium not installed. Install with: pip install selenium")
except Exception as e:
    print(f"ERROR: {e}")

print("\n" + "=" * 60)
print("COMPLETE")
print("=" * 60)

GETTING ALL BERLIN LINKS FROM INSIDE AIRBNB
Looking for Berlin archived data button...
Found 28 Berlin links:
------------------------------------------------------------
https://data.insideairbnb.com/germany/be/berlin/2024-09-18/data/calendar.csv.gz
https://data.insideairbnb.com/germany/be/berlin/2024-09-18/data/listings.csv.gz
https://data.insideairbnb.com/germany/be/berlin/2024-09-18/data/reviews.csv.gz
https://data.insideairbnb.com/germany/be/berlin/2024-09-18/visualisations/listings.csv
https://data.insideairbnb.com/germany/be/berlin/2024-09-18/visualisations/neighbourhoods.csv
https://data.insideairbnb.com/germany/be/berlin/2024-09-18/visualisations/neighbourhoods.geojson
https://data.insideairbnb.com/germany/be/berlin/2024-09-18/visualisations/reviews.csv
https://data.insideairbnb.com/germany/be/berlin/2024-12-21/data/calendar.csv.gz
https://data.insideairbnb.com/germany/be/berlin/2024-12-21/data/listings.csv.gz
https://data.insideairbnb.com/germany/be/berlin/2024-12-21/data/rev

Select the files of interest

# File Selection

In [9]:
# Filter Berlin Links - Only .csv.gz Files
# Narrow `berlin_links` (from the scraping cell) down to the compressed data
# files and list them, sorted, with their snapshot date.

banner = "=" * 60
print(banner)
print("FILTERING FOR .CSV.GZ FILES ONLY")
print(banner)

# A NameError here means the scraping cell has not been run yet.
try:
    csv_gz_links = list(filter(lambda url: url.endswith('.csv.gz'), berlin_links))

    print(f"Found {len(csv_gz_links)} .csv.gz files:")
    print("-" * 60)

    for link in sorted(csv_gz_links):
        segments = link.split('/')
        filename = segments[-1]
        # URLs look like .../berlin/<date>/data/<file>.csv.gz, so the date is
        # the third segment from the end.
        if len(segments) >= 3:
            date = segments[-3]
        else:
            date = 'unknown'
        print(f"{date} - {filename}")
        print(f"  {link}")

except NameError:
    print("ERROR: Run the previous cell first to get berlin_links")

print("\n" + banner)
print("CSV.GZ FILTERING COMPLETE")
print(banner)

FILTERING FOR .CSV.GZ FILES ONLY
Found 12 .csv.gz files:
------------------------------------------------------------
2024-09-18 - calendar.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2024-09-18/data/calendar.csv.gz
2024-09-18 - listings.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2024-09-18/data/listings.csv.gz
2024-09-18 - reviews.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2024-09-18/data/reviews.csv.gz
2024-12-21 - calendar.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2024-12-21/data/calendar.csv.gz
2024-12-21 - listings.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2024-12-21/data/listings.csv.gz
2024-12-21 - reviews.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2024-12-21/data/reviews.csv.gz
2025-03-15 - calendar.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2025-03-15/data/calendar.csv.gz
2025-03-15 - listings.csv.gz
  https://data.insideairbnb.com/germany/be/berlin/2025-03-15/data/listings.csv.g

# File Download

In [10]:
# Download CSV.GZ Files with Date Prefix
# Download the selected .csv.gz files (up to MAX_DOWNLOADS) with error handling.
# Produces the global `downloaded_files`: {"<date>_<file>.csv.gz": local_path}.

import os
import requests
import time

print("=" * 60)
print("DOWNLOADING .CSV.GZ FILES")
print("=" * 60)

# Create download folder
download_folder = "../data/csv_gz_files"
os.makedirs(download_folder, exist_ok=True)

# How many entries of csv_gz_links to fetch; None downloads every link.
MAX_DOWNLOADS = 2
# (connect, read) timeout in seconds so a stalled server cannot hang the cell forever.
REQUEST_TIMEOUT = (10, 120)
# Pause after each successful download to be polite to the server.
POLITE_DELAY_SECONDS = 5

# Dictionary to store downloaded files
downloaded_files = {}

for link in csv_gz_links[:MAX_DOWNLOADS]:
    part_path = None
    try:
        # URLs look like .../berlin/<date>/data/<file>.csv.gz
        date = link.split('/')[-3]
        filename = link.split('/')[-1]
        new_filename = f"{date}_{filename}"  # Date prefix keeps snapshots apart
        file_path = os.path.join(download_folder, new_filename)
        part_path = file_path + ".part"

        print(f"Downloading: {new_filename}")

        # The context manager closes the streamed connection even on error.
        with requests.get(link, stream=True, timeout=REQUEST_TIMEOUT) as response:
            response.raise_for_status()  # Raise exception for HTTP errors
            with open(part_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Expose the file under its final name only once it is complete, so an
        # interrupted transfer never leaves a truncated archive for the unzip step.
        os.replace(part_path, file_path)

        downloaded_files[new_filename] = file_path

        print(f"✓ Downloaded: {new_filename}")
        time.sleep(POLITE_DELAY_SECONDS)

    except requests.exceptions.RequestException as e:
        print(f"✗ Failed to download {link}: {e}")
    except Exception as e:
        print(f"✗ Error with {link}: {e}")
    finally:
        # Remove any half-written temp file left by a failed transfer.
        if part_path is not None and os.path.exists(part_path):
            os.remove(part_path)

print(f"\n{len(downloaded_files)} files downloaded successfully:")
for filename, path in downloaded_files.items():
    print(f"  {filename}: {path}")

print("\n" + "=" * 60)
print("DOWNLOAD COMPLETE")
print("=" * 60)

DOWNLOADING .CSV.GZ FILES
Downloading: 2025-06-20_listings.csv.gz
✓ Downloaded: 2025-06-20_listings.csv.gz
Downloading: 2025-06-20_calendar.csv.gz
✓ Downloaded: 2025-06-20_calendar.csv.gz

2 files downloaded successfully:
  2025-06-20_listings.csv.gz: ../data/csv_gz_files/2025-06-20_listings.csv.gz
  2025-06-20_calendar.csv.gz: ../data/csv_gz_files/2025-06-20_calendar.csv.gz

DOWNLOAD COMPLETE


# UNZIP

In [11]:
# Unzip CSV.GZ Files to Raw Data Folder
# Decompress every file in `downloaded_files` into the raw data folder.
# Produces the global `unzipped_files`: {"<name>.csv": local_path}.

import gzip
import shutil
import os

print("=" * 60)
print("UNZIPPING CSV.GZ FILES")
print("=" * 60)

# Create raw data folder
raw_folder = "../data/raw"
os.makedirs(raw_folder, exist_ok=True)

# Dictionary to store unzipped files
unzipped_files = {}

for filename, gz_path in downloaded_files.items():
    try:
        # Strip only a trailing ".gz"; str.replace('.gz', '') would also delete
        # ".gz" appearing anywhere else in the name.
        csv_filename = filename[:-len(".gz")] if filename.endswith(".gz") else filename
        csv_path = os.path.join(raw_folder, csv_filename)
        tmp_path = csv_path + ".part"

        print(f"Unzipping: {filename} → {csv_filename}")

        try:
            with gzip.open(gz_path, 'rb') as f_in, open(tmp_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            # Publish under the final name only after decompression finished, so a
            # corrupt archive never leaves a truncated CSV for the loading step.
            os.replace(tmp_path, csv_path)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

        unzipped_files[csv_filename] = csv_path
        print(f"✓ Unzipped: {csv_filename}")

    except Exception as e:
        print(f"✗ Failed to unzip {filename}: {e}")

print("\n" + "=" * 60)
print(f"UNZIP COMPLETE FOR {list(unzipped_files.keys())}")
print("=" * 60)

UNZIPPING CSV.GZ FILES
Unzipping: 2025-06-20_listings.csv.gz → 2025-06-20_listings.csv
✓ Unzipped: 2025-06-20_listings.csv
Unzipping: 2025-06-20_calendar.csv.gz → 2025-06-20_calendar.csv
✓ Unzipped: 2025-06-20_calendar.csv

UNZIP COMPLETE FOR ['2025-06-20_listings.csv', '2025-06-20_calendar.csv']


# Data Loading - LISTING

In [12]:
# Fetch csv with name listing using spark with proper parsing
# Reads the first unzipped "listings" CSV into `listing_df` and prints its shape,
# columns, inferred dtypes and a preview.
from pyspark.sql import SparkSession
print("=" * 60)
print("FETCHING FIRST LISTING CSV WITH SPARK (CORRECTED PARSING)")
print("=" * 60)

# Pick the first unzipped file whose name contains "listings". Fail with a
# readable message instead of the opaque `KeyError: None` a bare lookup gives.
listing_csv_name = next((f for f in unzipped_files if "listings" in f), None)
if listing_csv_name is None:
    raise KeyError(
        f"No 'listings' CSV among unzipped files {list(unzipped_files)}; "
        "run the download and unzip cells first"
    )
listing_csv_path = unzipped_files[listing_csv_name]

# Quoted, escaped, multi-line fields (e.g. descriptions) need these options to parse correctly
listing_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("multiLine", "true") \
    .option("ignoreLeadingWhiteSpace", "true") \
    .option("ignoreTrailingWhiteSpace", "true") \
    .csv(listing_csv_path)

# Get all columns first (before using all_columns)
all_columns = listing_df.columns

# How many rows/columns
print("Number of rows and columns in the listing DataFrame:")
num_rows = listing_df.count()
num_columns = len(all_columns)
print(f"Rows: {num_rows}, Columns: {num_columns}")
print("=" * 60)

print("All columns in the listing DataFrame:")
print(all_columns)
print("=" * 60)

# print data types for all columns
print("Data types of each column:")
print(listing_df.dtypes)

# Show first 5 rows across ALL columns; show() truncates each value to 20 chars,
# but the table is still very wide.
print("First 5 rows")
listing_df.show(5)
print("=" * 60)

FETCHING FIRST LISTING CSV WITH SPARK (CORRECTED PARSING)
Number of rows and columns in the listing DataFrame:
Rows: 14187, Columns: 79
All columns in the listing DataFrame:
['id', 'listing_url', 'scrape_id', 'last_scraped', 'source', 'name', 'description', 'neighborhood_overview', 'picture_url', 'host_id', 'host_url', 'host_name', 'host_since', 'host_location', 'host_about', 'host_response_time', 'host_response_rate', 'host_acceptance_rate', 'host_is_superhost', 'host_thumbnail_url', 'host_picture_url', 'host_neighbourhood', 'host_listings_count', 'host_total_listings_count', 'host_verifications', 'host_has_profile_pic', 'host_identity_verified', 'neighbourhood', 'neighbourhood_cleansed', 'neighbourhood_group_cleansed', 'latitude', 'longitude', 'property_type', 'room_type', 'accommodates', 'bathrooms', 'bathrooms_text', 'bedrooms', 'beds', 'amenities', 'price', 'minimum_nights', 'maximum_nights', 'minimum_minimum_nights', 'maximum_minimum_nights', 'minimum_maximum_nights', 'maximum_ma

25/08/11 17:10:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----+--------------------+--------------+------------+---------------+--------------------+--------------------+---------------------+--------------------+-------+--------------------+--------------+----------+--------------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+---------------+----------------------+----------------------------+--------+---------+------------------+---------------+------------+---------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+----------------+---------------------+-----

# Pre-processing

## Column Selection

In [None]:
from pyspark.sql.functions import col, sum as spark_sum, when

# Columns kept for analysis, grouped by theme and concatenated in this order.
_identifier_fields = ["id", "host_id", "latitude", "longitude"]      # core ids and location
_neighbourhood_fields = ["neighbourhood_group_cleansed"]
_review_fields = [
    "review_scores_rating", "review_scores_accuracy", "review_scores_cleanliness",
    "review_scores_checkin", "review_scores_communication", "review_scores_location",
    "review_scores_value", "reviews_per_month", "number_of_reviews",
]
# Pricing and stay limits; availability_30/60/90/365 are intentionally left out.
_pricing_fields = ["price", "minimum_nights", "maximum_nights"]
_property_fields = ["property_type", "amenities", "accommodates", "beds"]

selected_columns = (
    _identifier_fields
    + _neighbourhood_fields
    + _review_fields
    + _pricing_fields
    + _property_fields
)

listing_df = listing_df.select(*selected_columns)

## Non-null Filtering

In [14]:
# Filter to keep only rows where critical columns are not null
critical_columns = ["price", "latitude", "longitude", "neighbourhood_group_cleansed"]

print(f"\nRows before filtering critical column: {listing_df.count()}")

# Keep rows where ALL critical columns are present, as the message below states.
# The filter is driven by critical_columns so the list and the rule cannot drift
# apart. dropna(how="any") also drops NaN values in numeric columns.
listing_df = listing_df.dropna(how="any", subset=critical_columns)

print(f"Rows after filtering (no nulls in critical columns): {listing_df.count()}")


Rows before filtering critical column: 14187
Rows after filtering (no nulls in critical columns): 9183


## Enforce Schema

In [None]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, LongType

print("=" * 60)
print("ENFORCING DATA TYPES")
print("=" * 60)

# Show current (inferred) schema before conversion; price arrives as a string
# because of its "$" and "," characters.
print("Current schema:")
print(listing_df.dtypes)

# Clean price column first: strip "$" and thousands separators so e.g.
# "$1,234.00" casts to 1234.0. Raw string avoids the invalid "\$" escape warning.
listing_df = listing_df.withColumn("price", regexp_replace(col("price"), r"[\$,]", ""))

# Target Spark type for each selected column, applied in this order.
# NOTE: FloatType is single precision; use DoubleType if exact prices matter.
target_types = {
    "id": LongType(),
    "host_id": LongType(),
    "latitude": DoubleType(),
    "longitude": DoubleType(),
    "neighbourhood_group_cleansed": StringType(),
    "review_scores_rating": FloatType(),
    "review_scores_accuracy": FloatType(),
    "review_scores_cleanliness": FloatType(),
    "review_scores_checkin": FloatType(),
    "review_scores_communication": FloatType(),
    "review_scores_location": FloatType(),
    "review_scores_value": FloatType(),
    "reviews_per_month": FloatType(),
    "number_of_reviews": IntegerType(),
    "price": FloatType(),
    "minimum_nights": IntegerType(),
    "maximum_nights": IntegerType(),
    "property_type": StringType(),
    "amenities": StringType(),
    "accommodates": IntegerType(),
    "beds": IntegerType(),
}

# Cast columns to their target types (withColumn replaces each in place)
for column_name, spark_type in target_types.items():
    listing_df = listing_df.withColumn(column_name, col(column_name).cast(spark_type))

print("\nSchema after data type conversion:")
print(listing_df.dtypes)

ENFORCING DATA TYPES
Current schema:
[('id', 'bigint'), ('host_id', 'int'), ('latitude', 'double'), ('longitude', 'double'), ('neighbourhood_group_cleansed', 'string'), ('review_scores_rating', 'double'), ('review_scores_accuracy', 'double'), ('review_scores_cleanliness', 'double'), ('review_scores_checkin', 'double'), ('review_scores_communication', 'double'), ('review_scores_location', 'double'), ('review_scores_value', 'double'), ('reviews_per_month', 'double'), ('number_of_reviews', 'int'), ('price', 'string'), ('minimum_nights', 'int'), ('maximum_nights', 'int'), ('property_type', 'string'), ('room_type', 'string'), ('amenities', 'string'), ('accommodates', 'int'), ('beds', 'int')]

Schema after data type conversion:
[('id', 'bigint'), ('host_id', 'bigint'), ('latitude', 'double'), ('longitude', 'double'), ('neighbourhood_group_cleansed', 'string'), ('review_scores_rating', 'float'), ('review_scores_accuracy', 'float'), ('review_scores_cleanliness', 'float'), ('review_scores_check

## Transformations

### Renaming

In [16]:
# Renaming: give the id and review-score columns shorter, clearer names.
# Renames are applied in the order listed.
rename_map = {
    "id": "listing_id",
    "review_scores_rating": "rating_score",
    "review_scores_accuracy": "accuracy_score",
    "review_scores_cleanliness": "cleanliness_score",
    "review_scores_checkin": "checkin_score",
    "review_scores_communication": "communication_score",
    "review_scores_location": "location_score",
    "review_scores_value": "value_score",
}

listing_df_renamed = listing_df
for old_name, new_name in rename_map.items():
    listing_df_renamed = listing_df_renamed.withColumnRenamed(old_name, new_name)

### Cleaning

In [17]:
# Row-level validity rules: invalid prices and out-of-area coordinates drop the
# row; out-of-range review scores are nulled out.
listing_df_clean = listing_df_renamed

# Thresholds (inclusive ranges for `between`)
MIN_PRICE = 10                   # rows need price strictly above this; presumably filters placeholder prices — confirm
BERLIN_LAT_RANGE = (52.3, 52.7)  # approximate Berlin latitude bounds
BERLIN_LON_RANGE = (13.0, 13.8)  # approximate Berlin longitude bounds
REVIEW_SCORE_RANGE = (0, 5)      # valid review score scale

# Drop non-positive AND very cheap prices (anything <= MIN_PRICE); null prices
# also fail the comparison and are dropped.
listing_df_clean = listing_df_clean.filter(col("price") > MIN_PRICE)

# Berlin coordinate bounds validation (null coordinates are dropped too)
listing_df_clean = listing_df_clean.filter(
    col("latitude").between(*BERLIN_LAT_RANGE)
    & col("longitude").between(*BERLIN_LON_RANGE)
)

# Review scores outside the valid range become null (row is kept)
review_cols = ["rating_score", "accuracy_score", "cleanliness_score",
               "checkin_score", "communication_score", "location_score", "value_score"]

low_score, high_score = REVIEW_SCORE_RANGE
for col_name in review_cols:
    listing_df_clean = listing_df_clean.withColumn(
        col_name,
        when((col(col_name) >= low_score) & (col(col_name) <= high_score), col(col_name))
        .otherwise(None)
    )

### Extend Columns 

In [19]:
listing_df_grouped = listing_df_clean

# Derive three property-type groupings from the free-text property_type column.
# `when` chains stop at the first match, so more specific patterns must come
# before broader ones (e.g. "houseboat" / "treehouse" before "house").
from pyspark.sql.functions import when, col, regexp_extract

# Level 1: Broad Property Categories
listing_df_grouped = listing_df_grouped.withColumn("property_category",
    when(col("property_type").rlike("(?i)entire"), "Entire Property")
    .when(col("property_type").rlike("(?i)private room"), "Private Room")
    .when(col("property_type").rlike("(?i)shared room"), "Shared Room")
    .when(col("property_type").rlike("(?i)room in"), "Hotel/Hostel Room")
    .otherwise("Other")
)

# Level 2: Property Type (Commercial vs Unique vs Residential)
# Unique/Alternative is tested before Residential so houseboat, treehouse and
# tiny home are not swallowed by the "house"/"home" patterns. "rv" is a whole
# word so it cannot match inside words like "serviced".
# NOTE(review): labels such as "rental unit" match none of these patterns and
# fall through to "Other"; confirm that is intended.
listing_df_grouped = listing_df_grouped.withColumn("accommodation_type",
    when(col("property_type").rlike(r"(?i)hotel|hostel|aparthotel"), "Commercial")
    .when(col("property_type").rlike(r"(?i)boat|treehouse|cave|dome|tiny|camper|\brv\b"), "Unique/Alternative")
    .when(col("property_type").rlike(r"(?i)home|house|apartment|condo|villa|cabin|cottage|loft"), "Residential")
    .otherwise("Other")
)

# Level 3: Specific Property Subtype
# Watercraft and Unique are tested before House; otherwise "Houseboat",
# "Treehouse" and "Tiny home" match "house"/"home" first and those branches
# are unreachable.
listing_df_grouped = listing_df_grouped.withColumn("property_subtype",
    when(col("property_type").rlike("(?i)condo"), "Condo/Apartment")
    .when(col("property_type").rlike("(?i)boat"), "Watercraft")
    .when(col("property_type").rlike("(?i)treehouse|cave|dome|tiny"), "Unique")
    .when(col("property_type").rlike("(?i)house|home"), "House")
    .when(col("property_type").rlike("(?i)villa|mansion"), "Luxury")
    .when(col("property_type").rlike("(?i)hotel"), "Hotel")
    .when(col("property_type").rlike("(?i)hostel"), "Hostel")
    .otherwise("Standard")
)

In [23]:
# Publish the engineered frame under its final name and report its row count.
# The bare expression on the last line is what Jupyter displays.
listing_df_final = listing_df_grouped
final_row_count = listing_df_final.count()
final_row_count

9180