# Smart Renovations for Maximum Airbnb Revenue

### Budget-Constrained Optimization to Identify Renovations That Maximize Nightly Price


## Input

In [0]:
user_state = 'New York'            #fill your state with its full name, with capital letters (i.e. 'New York')


user_ratings = 4.55               #fill the average rating of your airbnb
user_rating_cleanliness = 4.6     #fill the average cleanliness rating of your airbnb
user_rating_accuracy = 4.7        #fill the average accuracy rating of your airbnb
user_rating_check_in = 4.9        #fill the average check in rating of your airbnb
user_rating_communication = 4.9   #fill the average communication rating of your airbnb
user_rating_location = 4.8        #fill the average location  rating of your airbnb
user_rating_value = 4.7           #fill the average value rating of your airbnb
user_guests = 4                   #fill the amount of guests allowed




#In the following fields fill as so:
#fill with 0 if you don't have this amenity  
#fill with 1 if you do have this amenity
#fill with 2 if you don't have this amenity but you can't have it (i.e a playground in an apartment) or 
# if you don't want to have it (i.e don't want to deal with a pool)

user_air_conditioning = 0   # Air conditioning
user_bbq_grill = 0          # BBQ grill
user_basement = 0           # BasemenAt
user_coffee_maker = 1       # Coffee maker
user_crib = 0               # Crib
user_ev_charger = 0         # EV charger
user_elevator = 1           # Elevator
user_fan = 0                # Fan
user_gaming = 0             # Gaming console
user_gym = 0                # Gym
user_heating = 1            # Heating
user_hot_tub_sauna = 0      # Hot tub/sauna
user_microwave = 0          # Microwave
user_outdoor_space = 1      # Outdoor space
user_parking = 1            # Parking
user_pool = 1               # Pool
user_refrigerator = 1       # Refrigerator
user_safety_equipment = 1   # Safety equipment
user_scenic_view = 1        # Scenic view
user_sound_system = 0       # Sound system
user_stove_oven = 1         # Stove/oven
user_tv = 0                 # TV
user_washer_dryer = 1       # Washer/dryer
user_wifi_internet = 1      # WiFi/internet
user_workspace = 0          # Workspace (desk & chair)
user_bikes = 0              # Bikes
user_playground = 0         # Playground



# In the following field, add your budget in USD
user_budget = 5000


## HIDDEN

Reading the prices

In [0]:

file_path_base_cost_map = "Aviv_Oded_Ori/base_cost_map.csv"

sas_token_data = "sp=rle&st=2026-01-25T10:55:58Z&se=2026-03-01T19:10:58Z&spr=https&sv=2024-11-04&sr=c&sig=jgt2r2TSHpDaCyEfTEgHAfkvEvy49xReFDS4Mg9KnOA%3D"
storage_account = "lab94290"
container_submissions = "submissions"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token_data)

csv_url = f"abfss://{container_submissions}@{storage_account}.dfs.core.windows.net/{file_path_base_cost_map}"

base_cost_map_df = spark.read.option("header", "true").csv(csv_url)


# 4. Convert to local dictionary: key='item', value='base_cost' (as float)
base_cost_map = {row['item']: float(row['base_cost']) for row in base_cost_map_df.select("item", "base_cost").collect()}
# print(base_cost_map)

Reading the state factors

In [0]:
sas_token_data = "sp=rle&st=2026-01-25T10:55:58Z&se=2026-03-01T19:10:58Z&spr=https&sv=2024-11-04&sr=c&sig=jgt2r2TSHpDaCyEfTEgHAfkvEvy49xReFDS4Mg9KnOA%3D"
storage_account = "lab94290"
container_submissions = "submissions"
file_path_state_factors = "Aviv_Oded_Ori/state_factors.csv"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token_data)

csv_url = f"abfss://{container_submissions}@{storage_account}.dfs.core.windows.net/{file_path_state_factors}"

state_factors_df = spark.read.option("header", "true").csv(csv_url)
# display(state_factors_df)

Multiplying the prices by the state factor (according to the user's input)

In [0]:
factor = float(state_factors_df.filter(f"State_Full = '{user_state}'").select('COL_Factor').collect()[0][0])
amenity_costs = {k: float(v) * factor for k, v in base_cost_map.items()}

Loading the model weights

In [0]:
import requests
import shutil
import os
from pyspark.ml import PipelineModel

# --- 1. CONFIGURATION ---
storage_account = "lab94290"
container = "submissions"
folder_path = "Aviv_Oded_Ori"
zip_filename = "airbnb_rf_model_v1.zip"

# Your SAS Token (Exactly as you provided)
sas_token = "sp=rle&st=2026-01-25T10:55:58Z&se=2026-03-01T19:10:58Z&spr=https&sv=2024-11-04&sr=c&sig=jgt2r2TSHpDaCyEfTEgHAfkvEvy49xReFDS4Mg9KnOA%3D"

# --- 2. DIRECT HTTP DOWNLOAD ---
# We use 'blob.core.windows.net' to treat this as a simple web download
# This avoids all Spark/Hadoop permission complexities.
download_url = f"https://{storage_account}.blob.core.windows.net/{container}/{folder_path}/{zip_filename}?{sas_token}"

local_zip_path = f"/tmp/{zip_filename}"
local_unzip_dir = f"/tmp/{zip_filename.replace('.zip', '')}"
final_dbfs_path = f"dbfs:/FileStore/models/{zip_filename.replace('.zip', '')}"

print(f"1. Attempting Direct HTTP Download...")
print(f"   URL: https://{storage_account}.blob.core.windows.net/{container}/{folder_path}/{zip_filename}?[HIDDEN_TOKEN]")

# Request the file
response = requests.get(download_url, stream=True)

if response.status_code == 200:
    print("   ✅ Connection established. Downloading bytes...")
    with open(local_zip_path, 'wb') as f:
        shutil.copyfileobj(response.raw, f)
    print("   ✅ Download complete.")

    # --- 3. UNZIP AND MOVE ---
    print(f"2. Unzipping to {local_unzip_dir}...")
    shutil.unpack_archive(local_zip_path, local_unzip_dir)

    print(f"3. Moving model to DBFS ({final_dbfs_path})...")
    dbutils.fs.rm(final_dbfs_path, recurse=True) # Clean up old versions
    dbutils.fs.cp(f"file:{local_unzip_dir}", final_dbfs_path, recurse=True)

    # --- 4. LOAD MODEL ---
    print("4. Loading model into Spark...")
    try:
        loaded_model = PipelineModel.load(final_dbfs_path)
        loaded = True
        print("✅ Success! Model loaded into variable 'loaded_model'")
    except Exception as e:
        print(f"❌ Download worked, but Spark could not load the model folder: {e}")
        loaded = False
        
else:
    print(f"❌ Download Failed. HTTP Status Code: {response.status_code}")
    print(f"   Reason: {response.text}")
    loaded = False

1. Attempting Direct HTTP Download...
   URL: https://lab94290.blob.core.windows.net/submissions/Aviv_Oded_Ori/airbnb_rf_model_v1.zip?[HIDDEN_TOKEN]
   ✅ Connection established. Downloading bytes...
   ✅ Download complete.
2. Unzipping to /tmp/airbnb_rf_model_v1...
3. Moving model to DBFS (dbfs:/FileStore/models/airbnb_rf_model_v1)...
4. Loading model into Spark...
✅ Success! Model loaded into variable 'loaded_model'


In [0]:
base_row_dict = {
    'state': user_state,
    'guests': user_guests,
    'Air Conditioning': user_air_conditioning,
    'BBQ Grill': user_bbq_grill,
    'Basement': user_basement,
    'Coffee Maker': user_coffee_maker,
    'Crib': user_crib,
    'EV Charger': user_ev_charger,
    'Elevator': user_elevator,
    'Fan': user_fan,
    'Gaming': user_gaming,
    'Gym': user_gym,
    'Heating': user_heating,
    'Hot Tub/Sauna': user_hot_tub_sauna,
    'Microwave': user_microwave,
    'Outdoor Space': user_outdoor_space,
    'Parking': user_parking,
    'Pool': user_pool,
    'Refrigerator': user_refrigerator,
    'Safety Equipment': user_safety_equipment,
    'Scenic View': user_scenic_view,
    'Sound System': user_sound_system,
    'Stove/Oven': user_stove_oven,
    'TV': user_tv,
    'Washer/Dryer': user_washer_dryer,
    'Wifi/Internet': user_wifi_internet,
    'Workspace': user_workspace,
    'bikes': user_bikes,
    'playground': user_playground,
    'rating_Cleanliness': user_rating_cleanliness,
    'rating_Accuracy': user_rating_accuracy,
    'rating_Check_in': user_rating_check_in,
    'rating_Communication': user_rating_communication,
    'rating_Location': user_rating_location,
    'rating_Value': user_rating_value,
    'rating_clean': user_ratings
}

not_check = ['state', 'guests', 'rating_Cleanliness', 'rating_Accuracy', 'rating_Check_in', 'rating_Communication', 'rating_Location', 'rating_Value', 'rating_clean']

forbidden = []
for k,v in base_row_dict.items():
    if k != 'state':
        base_row_dict[k] = float(v)
    if k not in not_check and v == 2:
        v = 0
        base_row_dict[k] = 0
        forbidden.append(k)
        
print(forbidden)

base_row_df = spark.createDataFrame([base_row_dict])
# display(base_row_df)

[]


Hill Climb + Beam Search

In [0]:
from pyspark.sql import functions as F
import heapq
import math


def optimize_amenities_beam(base_row_df, model, budget, amenity_costs, forbidden_amenities=None, B=3):
    """
    Optimizes rental property amenities using Beam Search with a Spark Model,
    respecting a list of forbidden upgrades.
    
    Parameters:
    - spark: Active SparkSession
    - base_row_df: A 1-row Spark DataFrame representing the property's current state.
    - model: Trained Spark ML PipelineModel.
    - budget: Float, maximum amount to spend.
    - amenity_costs: Dict { 'AmenityName': Price(float) }.
    - forbidden_amenities: List[str] of amenities that cannot be installed.
    - B: Int, Beam Width (hyperparameter).
    """

    # --- 1. SETUP & INITIAL SCAN ---
    base_row = base_row_df.collect()[0]
    base_features = base_row.asDict()
    
    # Handle the forbidden list (convert to set for faster lookup)
    forbidden_set = set(forbidden_amenities) if forbidden_amenities else set()
    
    # Get Baseline Price
    base_pred_log = model.transform(base_row_df).select("prediction").collect()[0][0]
    base_pred = math.expm1(base_pred_log)
    
    print(f"--- Starting Optimization (Beam Width={B}) ---")
    print(f"Initial Price: ${base_pred:.2f} | Budget: ${budget}")
    if forbidden_set:
        print(f"Excluding: {forbidden_set}")

    # Define the universe of valid upgrades:
    # 1. Must be in our cost dictionary
    # 2. Must NOT be in the forbidden list (NEW CHECK)
    # 3. Must NOT already exist in the property (value == 0)
    # 4. Must be affordable (Cost <= Budget)
    possible_upgrades = [
        am for am in amenity_costs.keys()
        if am in base_features          # Ensure column exists in data
        and am not in forbidden_set     # <--- NEW CONSTRAINT
        and base_features[am] == 0      # User doesn't have it yet
        and amenity_costs[am] <= budget # Affordable
    ]
    
    print(f"Identified {len(possible_upgrades)} valid potential upgrades.")

    # The "Beam" (List of candidate objects)
    current_beam = [{
        "added_amenities": set(),
        "cost": 0.0,
        "price": base_pred,
        "features": base_features
    }]

    best_solution = current_beam[0]
    step = 0
    
    while True:
        step += 1
        next_step_candidates = {} # Deduplication dict (Key: frozenset)
        
        batch_rows_data = []
        batch_metadata = [] 

        # --- 2. EXPANSION ---
        for parent_candidate in current_beam:
            
            # Find amenities we can add to THIS path
            valid_moves = [
                am for am in possible_upgrades
                if am not in parent_candidate["added_amenities"] 
                and (parent_candidate["cost"] + amenity_costs[am]) <= budget
            ]
            
            for move in valid_moves:
                # Create State Key (Deduplication)
                new_set = parent_candidate["added_amenities"].union({move})
                state_key = frozenset(new_set)
                
                if state_key in next_step_candidates:
                    continue
                
                # Construct new features
                new_features = parent_candidate["features"].copy()
                new_features[move] = 1.0
                new_features["_temp_id"] = len(batch_rows_data)
                
                batch_rows_data.append(new_features)
                batch_metadata.append({
                    "id": new_features["_temp_id"],
                    "amenities": new_set,
                    "cost": parent_candidate["cost"] + amenity_costs[move],
                    "features": new_features
                })
                
                next_step_candidates[state_key] = None 

        # --- 3. BATCH PREDICTION ---
        if not batch_rows_data:
            print("No further affordable improvements found.")
            break
            
        print(f"Step {step}: Evaluating {len(batch_rows_data)} unique combinations...")
        
        batch_df = spark.createDataFrame(batch_rows_data)
        predictions = model.transform(batch_df).select("_temp_id", "prediction").collect()
        pred_map = {row["_temp_id"]: row["prediction"] for row in predictions}

        # --- 4. SELECTION ---
        valid_candidates_list = []
        
        for meta in batch_metadata:
            pred_log = pred_map.get(meta["id"])
            if pred_log is None: continue 

            pred_price = math.expm1(pred_log)

            # Hill Climb Check: Only keep if price improves
            if pred_price > best_solution["price"]: 
                candidate_obj = {
                    "added_amenities": meta["amenities"],
                    "cost": meta["cost"],
                    "price": pred_price,
                    "features": meta["features"]
                }
                valid_candidates_list.append(candidate_obj)
                
                if pred_price > best_solution["price"]:
                    best_solution = candidate_obj

        # --- 5. PRUNING ---
        if not valid_candidates_list:
            print("No moves improved the price. Stopping.")
            break
            
        valid_candidates_list.sort(key=lambda x: x["price"], reverse=True)
        current_beam = valid_candidates_list[:B]
        
        print(f"   -> Top Choice: {list(current_beam[0]['added_amenities'])} "
              f"(Price: ${current_beam[0]['price']:.2f})")

    # --- 6. FINAL OUTPUT ---
    return {
        "final_amenities": list(best_solution["added_amenities"]),
        "original_price": base_pred,
        "final_predicted_price": best_solution["price"],
        "price_lift": best_solution["price"] - base_pred,
        "total_cost": best_solution["cost"],
        "roi_ratio": (best_solution["price"] - base_pred) / best_solution["cost"] if best_solution["cost"] > 0 else 0
    }

In [0]:
from pyspark.sql.functions import *

# --- GENERATE UI ---
def render_optimization_card(res):
    # Formatting
    roi_pct = res['roi_ratio'] * 100
    lift = res['price_lift']
    cost = res['total_cost']
    orig = res['original_price']
    new_p = res['final_predicted_price']
    
    # Generate Amenity Badges
    badges_html = ""
    for item in res['final_amenities']:
        badges_html += f"""
            <span style="background-color: #e3f2fd; color: #1565c0; 
                         padding: 5px 12px; border-radius: 15px; 
                         font-size: 14px; font-weight: 500; margin-right: 5px; 
                         display: inline-block; margin-bottom: 5px;">
                + {item}
            </span>
        """

    # HTML Template
    html_content = f"""
    <div style="font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif;
                max-width: 700px; background: white; border-radius: 12px; 
                box-shadow: 0 4px 20px rgba(0,0,0,0.08); border: 1px solid #e0e0e0; overflow: hidden; margin: 10px 0;">
        
        <div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; color: white;">
            <h2 style="margin: 0; font-size: 20px; font-weight: 600;">✨ Optimization Complete</h2>
            <p style="margin: 5px 0 0 0; opacity: 0.9; font-size: 14px;">Best upgrade path found for your budget</p>
        </div>

        <div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 20px; padding: 25px; border-bottom: 1px solid #f0f0f0;">
            <div style="text-align: center;">
                <div style="font-size: 12px; color: #666; text-transform: uppercase; letter-spacing: 1px; font-weight: bold;">ROI Ratio</div>
                <div style="font-size: 28px; font-weight: 700; color: #2e7d32; margin-top: 5px;">{roi_pct:.3f}%</div>
                <div style="font-size: 12px; color: #2e7d32;">Return Efficiency</div>
            </div>
            <div style="text-align: center; border-left: 1px solid #eee; border-right: 1px solid #eee;">
                <div style="font-size: 12px; color: #666; text-transform: uppercase; letter-spacing: 1px; font-weight: bold;">Price Lift</div>
                <div style="font-size: 28px; font-weight: 700; color: #1565c0; margin-top: 5px;">+${lift:.2f}</div>
                <div style="font-size: 12px; color: #666;">Per Night</div>
            </div>
            <div style="text-align: center;">
                <div style="font-size: 12px; color: #666; text-transform: uppercase; letter-spacing: 1px; font-weight: bold;">Total Cost</div>
                <div style="font-size: 28px; font-weight: 700; color: #424242; margin-top: 5px;">${cost:,.0f}</div>
                <div style="font-size: 12px; color: #666;">Investment</div>
            </div>
        </div>

        <div style="padding: 25px;">
            <div style="margin-bottom: 20px;">
                <div style="font-size: 14px; color: #444; font-weight: 600; margin-bottom: 10px;">Recommended Installations:</div>
                <div>{badges_html}</div>
            </div>
            
            <div style="background: #fafafa; padding: 15px; border-radius: 8px; display: flex; justify-content: space-between; align-items: center;">
                <div>
                    <span style="color: #666; font-size: 14px;">Original Price:</span>
                    <strong style="color: #333; margin-left: 5px;">${orig:.2f}</strong>
                </div>
                <div style="color: #999;">➜</div>
                <div>
                    <span style="color: #666; font-size: 14px;">New Price:</span>
                    <strong style="color: #333; margin-left: 5px;">${new_p:.2f}</strong>
                </div>
            </div>
        </div>
    </div>
    """
    displayHTML(html_content)

In [0]:
    output = optimize_amenities_beam(base_row_df, loaded_model, user_budget, amenity_costs, forbidden, B=20)

--- Starting Optimization (Beam Width=20) ---
Initial Price: $163.01 | Budget: $5000
Identified 11 valid potential upgrades.
Step 1: Evaluating 11 unique combinations...
   -> Top Choice: ['Sound System'] (Price: $167.85)
Step 2: Evaluating 10 unique combinations...
   -> Top Choice: ['Sound System', 'BBQ Grill'] (Price: $177.04)
Step 3: Evaluating 6 unique combinations...
   -> Top Choice: ['Sound System', 'Crib', 'BBQ Grill'] (Price: $179.70)
Step 4: Evaluating 6 unique combinations...
   -> Top Choice: ['Sound System', 'Crib', 'Microwave', 'BBQ Grill'] (Price: $181.59)
No further affordable improvements found.


## OUTPUT

In [0]:
if loaded:
    render_optimization_card(output)

## HIDDEN

In [0]:
if loaded:
    dbutils.notebook.exit("Stopping execution here")


###Reading and preprocessing the data from `Airbnb.com`

In [0]:
storage_account = "lab94290"  
container = "airbnb"

In [0]:
sas_token="sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D"
sas_token = sas_token.lstrip('?')
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token)

In [0]:
path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/airbnb_1_12_parquet"

airbnb = spark.read.parquet(path)

In [0]:
from pyspark.sql import functions as F


airbnb = (
    airbnb
    .withColumn("arr", F.split(F.col("location"), ","))   # split into array
    .withColumn("country", F.element_at(F.col("arr"), -1))
    .withColumn("state", F.element_at(F.col("arr"), -2))
    .drop("arr")
)

We are only interested in countries from the US

In [0]:
airbnb_usa = airbnb.filter("country == ' United States'")
airbnb_usa = airbnb_usa.select("name", "price", "ratings", "location", "lat", "long", "state", "amenities", 'guests','category_rating')
display(airbnb_usa.limit(100))

Extracting the state from the coordinates, as is much more accurate and reliable than using the raw data

In [0]:
%pip install reverse_geocoder

In [0]:
import reverse_geocoder as rg
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, StructType, StructField

# --- 1. DEFINE SCHEMA FOR EFFICIENT RETURN ---
# We define a structure so the function can return two values at once
location_schema = StructType([
    StructField("state", StringType(), True),
    StructField("city_state", StringType(), True)
])

# --- 2. DEFINE PYTHON FUNCTION ---
def get_geo_data_pure(lat, lon):
    # Handle Nulls
    if lat is None or lon is None:
        return None
    
    try:
        # mode=1 gives the single nearest result
        results = rg.search((lat, lon), mode=1)
        
        # Extract components
        city = results[0]['name']
        state = results[0]['admin1']
        
        # Return tuple matching the schema: (state, city_state)
        return (state, f"{city}, {state}")
    except:
        return None

# --- 3. REGISTER UDF ---
get_geo_udf = udf(get_geo_data_pure, location_schema)

# --- 4. APPLY TO DATAFRAME ---
# Step A: Cast coordinates to double
df_processed = airbnb_usa.withColumn("lat", col("lat").cast("double")) \
                         .withColumn("long", col("long").cast("double"))

# Step B: Call UDF once to create a temporary struct column
df_processed = df_processed.withColumn("geo_temp", get_geo_udf(col("lat"), col("long")))

# Step C: Unpack the struct into your two desired columns
airbnb_usa = df_processed.withColumn("state", col("geo_temp.state")) \
                               .withColumn("city_state", col("geo_temp.city_state")) \
                               .drop("geo_temp")

In [0]:
states_list = ['Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado', 'Connecticut', 'Delaware', 'Florida', 'Georgia', 'Hawaii', 'Idaho', 'Illinois', 'Indiana', 'Iowa', 'Kansas', 'Kentucky', 'Louisiana', 'Maine', 'Maryland', 'Massachusetts', 'Michigan', 'Minnesota', 'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada', 'New Hampshire', 'New Jersey', 'New Mexico', 'New York', 'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma', 'Oregon', 'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota', 'Tennessee', 'Texas', 'Utah', 'Vermont', 'Virginia', 'Washington', 'West Virginia', 'Wisconsin', 'Wyoming']

airbnb_usa = airbnb_usa.filter(col("state").isin(states_list))

### Loading the scraped data from `bestplaces.net`

In [0]:
sas_token_data = "sp=rle&st=2026-01-25T10:55:58Z&se=2026-03-01T19:10:58Z&spr=https&sv=2024-11-04&sr=c&sig=jgt2r2TSHpDaCyEfTEgHAfkvEvy49xReFDS4Mg9KnOA%3D"
storage_account = "lab94290"
container_submissions = "submissions"
file_path_state_factors = "Aviv_Oded_Ori/state_factors.csv"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token_data)

csv_url = f"abfss://{container_submissions}@{storage_account}.dfs.core.windows.net/{file_path_state_factors}"

state_factors_df = spark.read.option("header", "true").csv(csv_url)
# display(state_factors_df)

### Extracting amenities and categorizing them

In [0]:
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# 1. DEFINE THE SCHEMA
json_schema = ArrayType(
    StructType([
        StructField("group_name", StringType(), True),
        StructField("items", ArrayType(
            StructType([
                StructField("name", StringType(), True),
                StructField("value", StringType(), True)
            ])
        ), True)
    ])
)

# 2. OPTIMIZATION: Get only unique amenity strings first!
unique_amenities_df = airbnb_usa.select("amenities").dropDuplicates()

# 3. PARSE AND EXPLODE
distinct_amenities_df = (unique_amenities_df
    # Parse the JSON
    .withColumn("parsed_amenities", from_json(col("amenities"), json_schema))
    # Explode Groups (e.g., "Bathroom", "Kitchen")
    .withColumn("group", explode(col("parsed_amenities")))
    # Explode Items (e.g., "Shampoo", "Oven")
    .withColumn("item", explode(col("group.items")))
    # Select just the name
    .select(col("item.name").alias("amenity_name"))
    # Final cleanup to remove duplicates in the resulting names
    .distinct()
    .orderBy("amenity_name")
)


Categorization

In [0]:
from pyspark.sql.functions import udf, col, lower
from pyspark.sql.types import StringType

# 1. Define the Python Logic (No Pandas)
def map_amenity_logic(text):
    if text is None:
        return None
        
    # Normalize
    text = str(text).lower().strip()
    
    # --- EXCLUSION LIST (Simple Amenities) ---
    # These return None so we can filter them out later
    excludes = [
        'shampoo', 'conditioner', 'soap', 'body wash', 'shower gel', 
        'toilet paper', 'towel', 'linen', 'pillow', 'blanket', 'sheet', 
        'hanger', 'trash', 'paper towel', 'salt', 'pepper', 'oil', 'spice',
        'toothbrush', 'toothpaste', 'cotton', 'tissue', 'toiletries', 'boooks', 'toys', 'ski', 'housekeeping', 'butler'
    ]
    if any(x in text for x in excludes):
        return None 

    # --- MAPPING RULES ---
    
    # 1. Luxury / Water
    if 'pool' in text: return 'Pool'
    if any(x in text for x in ['hot tub', 'jacuzzi', 'sauna', 'steam room']): return 'Hot Tub/Sauna'
    
    # 2. Tech & Entertainment
    if any(x in text for x in ['tv', 'television', 'hdtv', 'netflix', 'roku', 'hbo', 'apple tv', 'cable', '4k']): return 'TV'
    if any(x in text for x in ['wifi','wi-fi', 'internet', 'ethernet', 'broadband']): return 'Wifi/Internet'
    if any(x in text for x in ['sound system', 'speaker', 'bluetooth', 'sound bar', 'sonos']): return 'Sound System'
    if any(x in text for x in ['game', 'console', 'ps4', 'xbox', 'nintendo', 'arcade']): return 'Gaming'
    
    # 3. Major Appliances
    if any(x in text for x in ['washer', 'dryer', 'laundry', 'washing machine']): return 'Washer/Dryer'
    if any(x in text for x in ['dishwasher']): return 'Dishwasher'
    if any(x in text for x in ['fridge', 'refrigerator', 'freezer']): return 'Refrigerator'
    if any(x in text for x in ['stove', 'oven', 'cooktop', 'range', 'cooker', 'hot plate']): return 'Stove/Oven'
    if any(x in text for x in ['Rice cooker', 'Rice maker', 'rice cooker', 'Rice maker']): return 'Rice Cooker'

    if any(x in text for x in ['microwave']): return 'Microwave'
    if any(x in text for x in ['coffee', 'espresso', 'keurig', 'nespresso']): return 'Coffee Maker'
    if any(x in text for x in ['park', 'parking']): return 'Parking'
    if any(x in text for x in ['cellar']): return 'Basement'

    
    # 4. Climate Control
    if any(x in text for x in ['air condition', 'a/c', 'ac unit', 'hvac', 'mini split', ' ac ']): return 'Air Conditioning'
    if any(x in text for x in ['heat', 'fireplace', 'wood stove', 'pellet stove']): return 'Heating'
    if any(x in text for x in ['fan', 'ceiling fan']): return 'Fan'
    
    # 5. Facilities
    if any(x in text for x in ['gym', 'fitness', 'exercise', 'weight', 'treadmill', 'yoga', 'peloton']): return 'Gym'
    if any(x in text for x in ['parking', 'garage', 'carport', 'driveway']): return 'Parking'
    if any(x in text for x in ['elevator', 'lift']): return 'Elevator'
    if any(x in text for x in ['desk', 'monitor', 'office', 'workspace']): return 'Workspace'
    
    # 6. Outdoor
    if any(x in text for x in ['bbq', 'grill', 'barbecue']): return 'BBQ Grill'
    if any(x in text for x in ['patio', 'balcony', 'terrace', 'deck', 'backyard', 'garden']): return 'Outdoor Space'
    if any(x in text for x in ['view', 'ocean', 'lake', 'mountain', 'waterfront']): return 'Scenic View'

    # 7. Safety (Optional - keep if you want to track safety features)
    if any(x in text for x in ['detector', 'alarm', 'extinguisher', 'aid kit']): return 'Safety Equipment'
    if any(x in text for x in ['EV charger', 'EV-charger', 'ev charger']): return 'EV Charger'
    if any(x in text for x in ['crib', 'Crib','paid crib']): return 'Crib'
    if any(x in text for x in ['bike', 'bikes','Bike', 'Bikes']): return 'bikes'
    if any(x in text for x in ['playground', 'Playground']): return 'playground'




    return "Other"

# 2. Register the UDF
# We specify StringType() as the return type
clean_amenity_udf = udf(map_amenity_logic, StringType())

In [0]:
sas_token="sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D"
sas_token = sas_token.lstrip('?')
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token)

In [0]:
from pyspark.sql import functions as F

if "id" not in airbnb_usa.columns:
    df_main = airbnb_usa.withColumn("listing_id_temp", F.monotonically_increasing_id())
    id_col = "listing_id_temp"
else:
    df_main = airbnb_usa
    id_col = "id"

# A. Parse JSON & Explode
# 1. Parse string -> Array of Groups
df_parsed = df_main.withColumn("parsed_amenities", F.from_json(F.col("amenities"), json_schema))

# 2. Explode Nested Structure (Groups -> Items)
df_exploded = df_parsed \
    .select(id_col, F.explode("parsed_amenities").alias("group")) \
    .select(id_col, F.explode("group.items").alias("item")) \
    .select(id_col, F.col("item.name").alias("raw_amenity_name"))

# B. Apply Categorization
df_mapped = df_exploded.withColumn("clean_category", clean_amenity_udf(F.col("raw_amenity_name")))

# C. Filter (Remove Nulls/Simples) & Pivot
# We pivot on 'clean_category' to create columns like 'TV', 'Pool'
df_features = df_mapped \
    .filter(F.col("clean_category").isNotNull()) \
    .groupBy(id_col) \
    .pivot("clean_category") \
    .agg(F.lit(1)) \
    .na.fill(0)

# --- 4. JOIN BACK (Optional) ---
# Join the feature vectors back to the original dataframe
# Using left join to keep all original rows, filling missing features with 0
final_df = df_main.join(df_features, on=id_col, how="left").na.fill(0)

# Display
# display(final_df.limit(100))

In [0]:
final_df = final_df.drop("Other")
# display(final_df.limit(100))

In [0]:
final_df = final_df.dropna(subset=['guests'])
final_df = final_df.withColumn("guests", col("guests").cast("float"))

In [0]:
from pyspark.sql.functions import from_json, col, map_from_arrays
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# 1. DEFINE SCHEMA
rating_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("value", StringType(), True)
    ])
)

# 2. PARSE AND EXTRACT (Updating final_df directly)
# First, create the map helper columns
final_df = final_df.withColumn("parsed_ratings", from_json(col("category_rating"), rating_schema)) \
                   .withColumn("ratings_map", map_from_arrays(col("parsed_ratings.name"), col("parsed_ratings.value")))

# 3. CREATE INDIVIDUAL COLUMNS
target_ratings = ["Cleanliness", "Accuracy", "Check-in", "Communication", "Location", "Value"]
new_rating_cols = []

for rating_name in target_ratings:
    # Define the new column name (e.g., rating_Cleanliness)
    col_name = f"rating_{rating_name.replace('-', '_')}"
    new_rating_cols.append(col_name)
    
    # Add it to final_df
    final_df = final_df.withColumn(
        col_name, 
        col("ratings_map").getItem(rating_name).cast("float")
    )

# 4. CLEAN UP
# Fill missing values with 0 and remove the temporary helper columns
final_df = final_df.fillna(0, subset=new_rating_cols) \
                   .drop("parsed_ratings", "ratings_map")

# 5. VERIFY
print("Columns added successfully!")
# display(final_df.limit(100))

### Training The Model (Random Forest)

In [0]:
import mlflow
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, log1p, expm1
from pyspark.ml.evaluation import RegressionEvaluator
import os

rf = RandomForestRegressor(
    featuresCol="features", 
    labelCol="log_label",   # Train on LOG price
    numTrees=100,           
    maxDepth=12,            # 12 is a sweet spot for speed vs accuracy
    seed=42
)

loaded = False

from pyspark.ml import PipelineModel

model_path = "/Workspace/Users/orihillel@campus.technion.ac.il/94290_Databricks/databricks-course/notebooks/airbnb_price_model_v1"
print(f"Loading model from {model_path}...")

if os.path.exists(model_path):
    try:
        model = PipelineModel.load(model_path)
        loaded = True
        print("Model loaded successfully!")
    except Exception as e:
        print(f"Model loading failed: {e}")
else:
    print("Model path does not exist. Model not loaded.")

In [0]:
import matplotlib.pyplot as plt
import numpy as np

# Check the data type and sample values from final_df
final_df_2 = final_df.limit(1000).filter(col('price') > 0)
price_data = [float(row['price']) for row in final_df_2.select("price").collect()]

# Calculate upper fence
q1 = np.percentile(price_data, 25)
q3 = np.percentile(price_data, 75)
iqr = q3 - q1
upper_fence = q3 + 1.5 * iqr
print("Upper fence value:", upper_fence)

plt.boxplot(price_data, vert=True)
plt.title("Price Distribution")
plt.xlabel("Price")
plt.axhline(upper_fence, color='red', linestyle='--', label=f'Upper Fence: {upper_fence:.2f}')
plt.legend()
plt.show()


In [0]:
# --- FIX 1: FILTER OUTLIERS ---
# Keep only listings > $0 and <= $2000
filtered_df = final_df.filter((col("price") > 0) & (col("price") <= 2000))

# Prepare Data
model_df = filtered_df \
    .withColumn("label", col("price").cast("float")) \
    .withColumn("log_label", log1p(col("price").cast("float"))) \
    .withColumn("rating_clean", col("ratings").cast("float")) \
    .fillna(0, subset=["rating_clean"]) \
    .dropna(subset=["label", "log_label"])

train_data, test_data = model_df.randomSplit([0.8, 0.2], seed=42)

# --- FIX 2: LOCATION (STATE ONLY) ---
# We removed City to make this run fast (minutes instead of hours)
state_indexer = StringIndexer(inputCol="State", outputCol="State_Index", handleInvalid="keep")
state_encoder = OneHotEncoder(inputCols=["State_Index"], outputCols=["State_Vec"])

# Define inputs
amenities = [
    'Air Conditioning', 'BBQ Grill', 'Basement', 'Coffee Maker', 'Crib', 
    'EV Charger', 'Elevator', 'Fan', 'Gaming', 'Gym', 'Heating', 
    'Hot Tub/Sauna', 'Microwave', 'Outdoor Space', 'Parking', 'Pool', 
    'Refrigerator', 'Safety Equipment', 'Scenic View', 'Sound System', 
    'Stove/Oven', 'TV', 'Washer/Dryer', 'Wifi/Internet', 'Workspace', 
    'bikes', 'playground', 'guests', 
    'rating_Cleanliness', 'rating_Accuracy', 'rating_Check_in', 
    'rating_Communication', 'rating_Location', 'rating_Value'
]

# Add 'State_Vec' (City is gone)
all_inputs = amenities + ['State_Vec', 'rating_clean']
all_inputs = [c for c in all_inputs if c and c.strip() != ""]

assembler = VectorAssembler(inputCols=all_inputs, outputCol="features", handleInvalid="skip")

# --- FIX 3: INCREASE MODEL POWER ---

# Pipeline (City stages removed)
pipeline = Pipeline(stages=[state_indexer, state_encoder, assembler, rf])

print("Training Optimized Model (State Only)...")
model = pipeline.fit(train_data)
print("Training Complete.")

# --- EVALUATE (Convert back to Dollars) ---
predictions = model.transform(test_data)

# Convert log predictions back to real dollars to calculate RMSE
predictions = predictions.withColumn("prediction_dollars", expm1("prediction"))

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction_dollars", metricName="rmse")
rmse = evaluator.evaluate(predictions)
r2_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction_dollars", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)

print(f"New RMSE: ${rmse:.2f}")
print(f"R-Squared: {r2:.2%}")
display(predictions.select("label", "prediction_dollars", "State"))

In [0]:
# --- Configuration ---
# Choose a unique path so it doesn't clash with others
# 'dbfs:/' is the protocol Spark uses to talk to the file system
SAVE_PATH = "dbfs:/FileStore/models/airbnb_rf_model_v1"

print(f"Saving model to: {SAVE_PATH} ...")

# --- Saving ---
# 'model' is the variable name of your trained model (e.g., from pipeline.fit or crossval.fit)
model.write().overwrite().save(SAVE_PATH)

print("✅ Model saved successfully!")

In [0]:
from pyspark.ml import PipelineModel
from pyspark.ml.tuning import CrossValidatorModel

# --- Configuration ---
# Must match the path used above
LOAD_PATH = "dbfs:/FileStore/models/airbnb_rf_model_v1"

print(f"Attempting to load model from: {LOAD_PATH} ...")

try:
    # --- Loading ---
    # option A: Use PipelineModel if you used pipeline.fit()
    loaded_model = PipelineModel.load(LOAD_PATH)
    
    # option B: Use CrossValidatorModel if you used crossval.fit()
    # loaded_model = CrossValidatorModel.load(LOAD_PATH)

    print("✅ Model loaded successfully!")
    print(f"Model type: {type(loaded_model)}")
    
    # Optional: Print a stage to prove it's real
    # print(loaded_model.stages[-1]) 

except Exception as e:
    print(f"❌ Error loading model: {e}")

### Loading the scraped data from `costco.com`

In [0]:
file_path_costco = "Aviv_Oded_Ori/costco.csv"

sas_token_data = "sp=rle&st=2026-01-25T10:55:58Z&se=2026-03-01T19:10:58Z&spr=https&sv=2024-11-04&sr=c&sig=jgt2r2TSHpDaCyEfTEgHAfkvEvy49xReFDS4Mg9KnOA%3D"
storage_account = "lab94290"
container_submissions = "submissions"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token_data)
csv_url = f"abfss://{container_submissions}@{storage_account}.dfs.core.windows.net/{file_path_costco}"

costco_df = spark.read.option("header", "true").csv(csv_url)
# display(costco_df)

In [0]:
sas_token="sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D"
sas_token = sas_token.lstrip('?')
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token)

In [0]:
#taking the median of the costco prices
from pyspark.sql.functions import col, expr
median_costs_df = costco_df.groupBy("Keyword") \
    .agg(
        expr("percentile_approx(Price, 0.5)").alias("Median_Cost")
    )

# 3. View the results
# display(median_costs_df)

### Hill climbing + Beam Search

Loading the aggregated prices from websites such as Costco, Fixr, Target, etc.

In [0]:

file_path_base_cost_map = "Aviv_Oded_Ori/base_cost_map.csv"

sas_token_data = "sp=rle&st=2026-01-25T10:55:58Z&se=2026-03-01T19:10:58Z&spr=https&sv=2024-11-04&sr=c&sig=jgt2r2TSHpDaCyEfTEgHAfkvEvy49xReFDS4Mg9KnOA%3D"
storage_account = "lab94290"
container_submissions = "submissions"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token_data)

csv_url = f"abfss://{container_submissions}@{storage_account}.dfs.core.windows.net/{file_path_base_cost_map}"

base_cost_map_df = spark.read.option("header", "true").csv(csv_url)


# 4. Convert to local dictionary: key='item', value='base_cost' (as float)
base_cost_map = {row['item']: float(row['base_cost']) for row in base_cost_map_df.select("item", "base_cost").collect()}
# print(base_cost_map)

In [0]:
factor = float(state_factors_df.filter(f"State_Full = '{user_state}'").select('COL_Factor').collect()[0][0])
amenity_costs = {k: float(v) * factor for k, v in base_cost_map.items()}

In [0]:
sas_token="sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D"
sas_token = sas_token.lstrip('?')
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token)

Read the input data, and create the row which will be inserted into the model

In [0]:
base_row_dict = {
    'state': user_state,
    'guests': user_guests,
    'Air Conditioning': user_air_conditioning,
    'BBQ Grill': user_bbq_grill,
    'Basement': user_basement,
    'Coffee Maker': user_coffee_maker,
    'Crib': user_crib,
    'EV Charger': user_ev_charger,
    'Elevator': user_elevator,
    'Fan': user_fan,
    'Gaming': user_gaming,
    'Gym': user_gym,
    'Heating': user_heating,
    'Hot Tub/Sauna': user_hot_tub_sauna,
    'Microwave': user_microwave,
    'Outdoor Space': user_outdoor_space,
    'Parking': user_parking,
    'Pool': user_pool,
    'Refrigerator': user_refrigerator,
    'Safety Equipment': user_safety_equipment,
    'Scenic View': user_scenic_view,
    'Sound System': user_sound_system,
    'Stove/Oven': user_stove_oven,
    'TV': user_tv,
    'Washer/Dryer': user_washer_dryer,
    'Wifi/Internet': user_wifi_internet,
    'Workspace': user_workspace,
    'bikes': user_bikes,
    'playground': user_playground,
    'rating_Cleanliness': user_rating_cleanliness,
    'rating_Accuracy': user_rating_accuracy,
    'rating_Check_in': user_rating_check_in,
    'rating_Communication': user_rating_communication,
    'rating_Location': user_rating_location,
    'rating_Value': user_rating_value,
    'rating_clean': user_ratings
}

not_check = ['state', 'guests', 'rating_Cleanliness', 'rating_Accuracy', 'rating_Check_in', 'rating_Communication', 'rating_Location', 'rating_Value', 'rating_clean']

forbidden = []
for k,v in base_row_dict.items():
    if k != 'state':
        base_row_dict[k] = float(v)
    if k not in not_check and v == 2:
        v = 0
        base_row_dict[k] = 0
        forbidden.append(k)
        
print(forbidden)

base_row_df = spark.createDataFrame([base_row_dict])
# display(base_row_df)

Beam Search

In [0]:
output = optimize_amenities_beam(base_row_df, model, user_budget, amenity_costs, forbidden, B=20)

In [0]:
render_optimization_card(output)