# Loading datasets into a dataframe

last update: 2025-12-24

In [0]:
# Azure storage account and container holding the Airbnb dataset. Both values
# are interpolated into the Spark SAS configuration keys and the abfss:// path
# used by the following cells.
storage_account = "lab94290"  
container = "airbnb"

# Reading airbnb data from Parquet.

In [0]:
# NOTE(review): this SAS token is a credential stored in plain text in the
# notebook. Rotate it and load it from a secret store instead of a literal.
sas_token="sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D"
# Drop any leading '?' so only the bare query string is handed to the provider.
sas_token = sas_token.lstrip('?')
# Configure ABFS access for this storage account to use a fixed SAS token.
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token)

In [0]:
# abfss:// URI of the Parquet dataset inside the configured container.
path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/airbnb_1_12_parquet"

airbnb = spark.read.parquet(path)
# Preview 50 rows and print the schema; every column shown in the recorded
# schema output is a string, so later cells cast/parse what they need.
display(airbnb.limit(50))
airbnb.printSchema()

DataFrame[name: string, price: string, image: string, description: string, category: string, availability: string, discount: string, reviews: string, ratings: string, seller_info: string, breadcrumbs: string, location: string, lat: string, long: string, guests: string, pets_allowed: string, description_items: string, category_rating: string, house_rules: string, details: string, highlights: string, arrangement_details: string, amenities: string, images: string, available_dates: string, url: string, final_url: string, listing_title: string, property_id: string, listing_name: string, location_details: string, description_by_sections: string, description_html: string, location_details_html: string, is_supperhost: string, host_number_of_reviews: string, host_rating: string, hosts_year: string, host_response_rate: string, is_guest_favorite: string, travel_details: string, pricing_details: string, total_price: string, currency: string, cancellation_policy: string, property_number_of_reviews:

root
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- image: string (nullable = true)
 |-- description: string (nullable = true)
 |-- category: string (nullable = true)
 |-- availability: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- ratings: string (nullable = true)
 |-- seller_info: string (nullable = true)
 |-- breadcrumbs: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- guests: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- description_items: string (nullable = true)
 |-- category_rating: string (nullable = true)
 |-- house_rules: string (nullable = true)
 |-- details: string (nullable = true)
 |-- highlights: string (nullable = true)
 |-- arrangement_details: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- images: string (nullable = true)
 |-- availa

# Configuration

In [0]:

# =========================
# CONFIG
# =========================

# Similarity
# Feature lists compared between a target and its candidate neighbors.
# compute_knn picks the RATING list for mode="rating" and the PRICE list otherwise.
SIMILARITY_FEATURES_PRICE  = ["bedrooms", "beds", "bathrooms", "guests", "lat", "long"]
SIMILARITY_FEATURES_RATING = ["bedrooms", "beds", "bathrooms", "guests", "lat", "long", "price"]

# Per-feature multiplier applied to the squared difference inside
# weighted_distance_expr (features missing from this dict default to 1.0 there).
FEATURE_WEIGHTS = {
    "bedrooms": 1,
    "beds": 1,
    "bathrooms": 1,
    "guests": 1,
    "lat": 3,
    "long": 3,
    "price": 1,   
}

# Maximum number of neighbors kept per target (rank cut-off in compute_knn).
N_NEIGHBORS = 20

# Amenity logic
# Share of a target's neighbors (freq / total) at or above which an amenity is "must_have".
MUST_HAVE_THRESHOLD = 0.9
# Minimum number of neighbors that must have an amenity for it to be recommended.
MIN_SUPPORT = 3

# Default (for future combine)
# Not referenced by any code visible in this notebook section.
PRICE_WEIGHT = 0.5
RATING_WEIGHT = 0.5

# Base location for every Parquet output written by the pipeline cells.
OUTPUT_BASE = "dbfs:/FileStore/airbnb"

# Divisor used to bucket lat/long into lat_bin/lon_bin; only listings sharing
# both bins are compared in compute_knn. Presumably expressed in degrees — TODO confirm.
GRID_SIZE = 0.05

# Local analysis

In [0]:
# =========================
# IMPORTS
# =========================
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, ArrayType, StringType, DoubleType
from pyspark.sql.window import Window
import ast, re


# =========================
# STRUCTURAL FEATURES 
# =========================
def extract_structural_features_spark(df):
    """Derive integer ``bedrooms``, ``beds``, ``bathrooms`` and ``guests`` columns.

    Counts are read from ``description_items`` first and, where still missing,
    from ``details``. Both columns may be JSON-array strings; they are parsed
    into ``array<string>`` before the counts are extracted.

    Args:
        df: Spark DataFrame that may contain ``description_items`` and/or
            ``details`` columns.

    Returns:
        The DataFrame with the derived columns added. When ``details`` is
        present a helper column ``details_list`` (the parsed array) is added too.
    """

    def extract_value(items, keyword):
        """Return the first integer in the first item that mentions ``keyword``."""
        if not isinstance(items, list):
            return None
        # "bed" is a prefix of "bedroom": without the look-ahead the bed count
        # would be read from a "2 bedrooms" entry whenever it is listed first.
        pattern = re.escape(keyword)
        if keyword.strip() == "bed":
            pattern += r"(?!room)"
        for item in items:
            if not isinstance(item, str):
                continue
            lowered = item.lower()
            if re.search(pattern, lowered):
                m = re.search(r"(\d+)", lowered)
                if m:
                    return int(m.group(1))
        return None

    def as_string_list(frame, col_name):
        """Column expression yielding array<string> for a JSON-array string column."""
        raw = F.col(col_name)
        # Already an array: nothing to parse.
        if dict(frame.dtypes)[col_name].startswith("array"):
            return raw
        # Strings that are not a JSON array become NULL (no extraction possible).
        return F.when(
            raw.isNotNull() & raw.startswith("["),
            F.from_json(raw, "array<string>")
        ).otherwise(F.lit(None))

    extract_udf = F.udf(lambda x, k: extract_value(x, k), IntegerType())

    # description_items: parsed first, because a raw string handed to the UDF
    # is not a list and would always produce NULL.
    if "description_items" in df.columns:
        items = as_string_list(df, "description_items")
        df = df.withColumn("bedrooms", extract_udf(items, F.lit("bedroom")))
        df = df.withColumn("beds", extract_udf(items, F.lit(" bed")))
        df = df.withColumn("bathrooms", extract_udf(items, F.lit("bath")))

    # details (string list -> list)
    if "details" in df.columns:
        df = df.withColumn("details_list", as_string_list(df, "details"))

        df = df.withColumn("guests", extract_udf(F.col("details_list"), F.lit("guest")))

        # Fill missing structural values from details; a column that was never
        # created above is treated as entirely missing instead of failing.
        for name, keyword in (("bedrooms", "bedroom"), ("beds", " bed"), ("bathrooms", "bath")):
            from_details = extract_udf(F.col("details_list"), F.lit(keyword))
            if name in df.columns:
                df = df.withColumn(name, F.coalesce(F.col(name), from_details))
            else:
                df = df.withColumn(name, from_details)

    return df


In [0]:
# =========================
# PREP DF
# =========================
# Column derivations applied, in order, to the raw listings after the original
# string `price` column has been dropped.
derived_columns = [
    # nightly price parsed out of the pricing_details JSON
    ("price", F.get_json_object(F.col("pricing_details"), "$.price_per_night").cast("double")),
    ("ratings", F.col("ratings").cast("double")),
    ("property_id", F.col("property_id").cast("string")),
    # last comma-separated token of `location`
    ("country", F.trim(F.element_at(F.split(F.col("location"), ","), -1))),
    # grid cell indices used to limit KNN candidates
    ("lat_bin", F.floor(F.col("lat") / F.lit(GRID_SIZE))),
    ("lon_bin", F.floor(F.col("long") / F.lit(GRID_SIZE))),
]

df = airbnb.drop("price")
for column_name, expression in derived_columns:
    df = df.withColumn(column_name, expression)

df = extract_structural_features_spark(df)


# =========================
# DISTANCE
# =========================
def weighted_distance_expr(prefix_a, prefix_b, features):
    """Build a Column holding the weighted Euclidean distance between two aliased rows.

    Args:
        prefix_a: alias of the first DataFrame in the join.
        prefix_b: alias of the second DataFrame in the join.
        features: names of the columns to compare on both sides.

    Returns:
        ``sqrt(sum(weight * (a.f - b.f) ** 2))`` where the weight comes from
        FEATURE_WEIGHTS (1.0 when absent). A feature that is NULL on either
        side contributes 0 to the sum.
    """
    def squared_term(feature):
        left = F.col(f"{prefix_a}.{feature}")
        right = F.col(f"{prefix_b}.{feature}")
        # NULL-safe difference: a missing value adds nothing to the distance
        gap = F.when(left.isNull() | right.isNull(), F.lit(0.0)).otherwise(left - right)
        weight = float(FEATURE_WEIGHTS.get(feature, 1.0))
        return F.lit(weight) * gap ** 2

    return F.sqrt(sum([squared_term(feature) for feature in features]))

In [0]:

# =========================
# KNN
# =========================
def compute_knn(df, mode="price"):
    """Find up to N_NEIGHBORS nearest listings for every listing.

    mode:
      - "price"  : distance uses structural + location
      - "rating" : distance uses structural + location + price
      Any value other than "rating" is treated as "price".

    Candidates are other listings in the same country and the same
    (lat_bin, lon_bin) grid cell. Neighbors must have a price; in rating mode
    they must also have a rating.

    Returns:
        DataFrame with one row per (target, neighbor): target_id, neighbor_id,
        distance, similarity (= 1 / (1 + distance)), the target's price /
        ratings / amenities, and the neighbor's price, ratings, amenities,
        house_rules and pets_allowed.
    """
    a = df.alias("a")

    # neighbors must have price; for rating-mode also must have ratings
    b = df.filter(F.col("price").isNotNull())
    if mode == "rating":
        b = b.filter(F.col("ratings").isNotNull())
        features = SIMILARITY_FEATURES_RATING
    else:
        features = SIMILARITY_FEATURES_PRICE

    b = b.alias("b")

    joined = (
        a.join(
            b,
            (F.col("a.property_id") != F.col("b.property_id")) &
            (F.col("a.country") == F.col("b.country")) &
            (F.col("a.lat_bin") == F.col("b.lat_bin")) &
            (F.col("a.lon_bin") == F.col("b.lon_bin"))
        )
        .withColumn("distance", weighted_distance_expr("a", "b", features))
        .withColumn("similarity", 1 / (1 + F.col("distance")))
    )

    # Rank candidates per target. The neighbor id is a secondary sort key so
    # that equal distances are cut at N_NEIGHBORS the same way on every run;
    # ordering by distance alone leaves the choice among ties to Spark.
    w = (
        Window
        .partitionBy("a.property_id")
        .orderBy(F.col("distance"), F.col("b.property_id"))
    )

    knn = (
        joined
        .withColumn("rank", F.row_number().over(w))
        .filter(F.col("rank") <= N_NEIGHBORS)
        .select(
            F.col("a.property_id").alias("target_id"),
            F.col("b.property_id").alias("neighbor_id"),
            "distance",
            "similarity",

            # Baseline target fields (needed for gains / comparisons)
            F.col("a.price").alias("target_price"),
            F.col("a.ratings").alias("target_ratings"),
            F.col("a.amenities").alias("target_amenities"),

            # Neighbor fields
            F.col("b.price").alias("price"),
            F.col("b.ratings").alias("ratings"),
            F.col("b.amenities").alias("amenities"),
            F.col("b.house_rules").alias("house_rules"),
            F.col("b.pets_allowed").alias("pets_allowed")
        )
    )

    return knn


# =========================
# AMENITY EXTRACTION + CHECKIN/CHECKOUT
# =========================
def extract_amenities(val):
    """Return the lower-cased, stripped amenity names contained in ``val``.

    Args:
        val: a list of group dicts (``{"group_name": ..., "items": [{"name": ...}]}``)
            or a string encoding such a list, either as JSON or as a Python literal.

    Returns:
        list[str]: amenity names in encounter order. Groups whose name contains
        "not included" are skipped. Unparseable or malformed input yields ``[]``.
    """
    import json  # local import: the notebook's import cell only brings in ast/re

    if val is None:
        return []
    if isinstance(val, str):
        # JSON first: literal_eval rejects JSON's null/true/false, which used to
        # turn a perfectly valid amenity list into an empty one.
        try:
            val = json.loads(val)
        except (ValueError, RecursionError):
            try:
                val = ast.literal_eval(val)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return []
    if not isinstance(val, list):
        return []
    names = []
    for g in val:
        if not isinstance(g, dict):
            continue
        # group_name may be present but null
        group = str(g.get("group_name") or "").lower()
        if "not included" in group:
            continue
        items = g.get("items")
        if not isinstance(items, (list, tuple)):
            continue
        for it in items:
            if not isinstance(it, dict):
                continue
            raw_name = it.get("name")
            if raw_name is None:
                # str(None) would otherwise register an amenity called "none"
                continue
            n = str(raw_name).lower().strip()
            if n:
                names.append(n)
    return names

# Spark UDF wrapper around extract_amenities; yields array<string>.
extract_amenities_udf = F.udf(
    f=extract_amenities,
    returnType=ArrayType(StringType()),
)

def extract_hour(text, mode="in"):
    """Extract a 2-hour bucket start (0, 2, ..., 22) for check-in or check-out.

    Args:
        text: house-rules text; may hold several rules (e.g. a JSON-style list
            string) or a single phrase such as "3 pm".
        mode: "in" for check-in; any other value is treated as check-out.

    Returns:
        int | None: the even hour at or below the parsed hour, capped at 22;
        8 (check-in) or 15 (check-out) when the relevant rule says "flexible";
        None when ``text`` is not a string or no number is found.
    """
    if not isinstance(text, str):
        return None
    t = text.lower()

    checkin_kw = r"check[\s\-]?in\b"
    checkout_kw = r"check[\s\-]?out\b"
    own_kw, other_kw = (checkin_kw, checkout_kw) if mode == "in" else (checkout_kw, checkin_kw)

    # Pick the rule that talks about this mode; previously the first number in
    # the whole text was used, so check-out inherited the check-in time.
    scope = t
    for fragment in re.split(r'["\n;|\[\]]+', t):
        if re.search(own_kw, fragment):
            scope = fragment
            break

    # When both rules share one fragment, keep only the part around our keyword.
    own_hit = re.search(own_kw, scope)
    if own_hit:
        before, after = scope[:own_hit.start()], scope[own_hit.start():]
        if re.search(other_kw, before):
            before = ""
        other_after = re.search(other_kw, after)
        if other_after:
            after = after[:other_after.start()]
        scope = before + after

    if "flexible" in scope:
        return 8 if mode == "in" else 15

    # Prefer a number with an attached am/pm marker. A plain substring test for
    # "am"/"pm" also fires on words like "camera" and corrupts noon/midnight.
    timed = re.search(r"(\d{1,2})(?::\d{2})?\s*([ap])\.?m\b", scope)
    if timed:
        h = int(timed.group(1))
        if timed.group(2) == "p" and h < 12:
            h += 12
        elif timed.group(2) == "a" and h == 12:
            h = 0
    else:
        bare = re.search(r"(\d{1,2})", scope)
        if not bare:
            return None
        h = int(bare.group(1))

    return max(0, min(22, (h // 2) * 2))

def _checkin_hour(text):
    """extract_hour evaluated in check-in mode ("in")."""
    return extract_hour(text, "in")


def _checkout_hour(text):
    """extract_hour evaluated in check-out mode ("out")."""
    return extract_hour(text, "out")


# Integer-typed Spark UDFs for the two modes.
checkin_udf = F.udf(_checkin_hour, IntegerType())
checkout_udf = F.udf(_checkout_hour, IntegerType())



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

def is_empty_df(d):
    """Return True when ``d`` is None or a DataFrame without rows.

    Fetches at most one row via ``take(1)`` rather than going through ``d.rdd``,
    which converts the DataFrame to an RDD and is unavailable on clusters that
    do not expose the RDD API.
    """
    if d is None:
        return True
    return len(d.take(1)) == 0

def build_amenity_reports(neighbors, listing_amenities_flat, mode):
    """
    Build per-target amenity reports from a neighbor table.

    Args:
      - neighbors: one row per (target_id, neighbor_id); this function reads
        target_id, neighbor_id, target_price, target_ratings, target_amenities.
      - listing_amenities_flat: one row per (property_id, amenity) with the
        listing's price and ratings.
      - mode: "price" or "rating"; selects how `score` is computed.

    Returns:
      - recs_df: amenities to recommend (freq>=MIN_SUPPORT), score per mode
      - popular_df: MUST_HAVE amenities (percentage>=MUST_HAVE_THRESHOLD), show ONLY averages

    Both outputs exclude amenities already present in the target's own parsed
    amenity list. When `neighbors` is empty, two empty DataFrames with the
    output schemas are returned (and `mode` is not validated).

    Raises:
      - ValueError: mode is neither "price" nor "rating".
    """
    
    if is_empty_df(neighbors):
        rec_schema = (
            "target_id string, amenity string, freq long, total long, percentage double, "
            "avg_price_with double, avg_rating_with double, score double, category string"
        )
        pop_schema = (
            "target_id string, amenity string, freq long, total long, percentage double, "
            "avg_price_with double, avg_rating_with double, category string"
        )
        return (
            spark.createDataFrame([], rec_schema),
            spark.createDataFrame([], pop_schema),
        )

    # Parse TARGET amenities before the join, so the UDF runs once per neighbor row
    # NOTE(review): this list holds only names parsed from the raw amenities
    # value. If listing_amenities_flat also carries synthetic entries (check-in /
    # check-out slots, pets_allowed), those can never be recognised as already
    # owned by the target — confirm whether that is intended.
    neighbors = neighbors.withColumn(
        "target_amenity_list",
        extract_amenities_udf("target_amenities")
    )

    # Rename columns to avoid ambiguity
    listing_amenities_flat = (
        listing_amenities_flat
        .withColumnRenamed("price", "neighbor_price")
        .withColumnRenamed("ratings", "neighbor_ratings")
    )

    # Join neighbors with flat amenities
    # NOTE(review): the flat table is force-broadcast; verify it is small enough.
    joined = (
        neighbors
        .join(
            broadcast(listing_amenities_flat),
            neighbors.neighbor_id == listing_amenities_flat.property_id,
            "left"
        )
    )

    # Remove duplicates BEFORE groupBy, filter out NULL amenities
    joined_distinct = (
        joined
        .filter(F.col("amenity").isNotNull())  # drop unmatched left-join rows (NULL amenity)
        .select(
            "target_id", "amenity", "neighbor_id",
            "neighbor_price", "neighbor_ratings",
            "target_price", "target_ratings", "target_amenity_list"
        )
        .distinct()
    )

    # Total neighbors per target, counted from joined_distinct: only neighbors
    # with at least one amenity row contribute to the denominator.
    total = (
        joined_distinct
        .select("target_id", "neighbor_id")
        .distinct()
        .groupBy("target_id")
        .count()
        .withColumnRenamed("count", "total")
    )

    # Stats per amenity
    stats = (
        joined_distinct
        .groupBy("target_id", "amenity")
        .agg(
            # Count distinct neighbors WITH this amenity
            F.countDistinct("neighbor_id").alias("freq"),
            
            # Averages over neighbors WITH the amenity
            F.avg("neighbor_price").alias("avg_price_with"),
            
            # Ratings: ignore 0, NULL stays NULL
            F.avg(
                F.when(F.col("neighbor_ratings") > 0, F.col("neighbor_ratings"))
            ).alias("avg_rating_with"),
            
            # Stable target baselines
            F.first("target_price", ignorenulls=True).alias("target_price"),
            F.first("target_ratings", ignorenulls=True).alias("target_ratings"),
            F.first("target_amenity_list", ignorenulls=True).alias("target_amenity_list"),
        )
        .join(total, "target_id", "left")
        .withColumn("percentage", F.col("freq") / F.col("total"))
        .withColumn(
            "category",
            # must_have takes precedence over the low-support "uncertain" label
            F.when(F.col("percentage") >= MUST_HAVE_THRESHOLD, F.lit("must_have"))
             .when(F.col("freq") < MIN_SUPPORT, F.lit("uncertain"))
             .otherwise(F.lit("value"))
        )
    )

    # POPULAR / MUST_HAVE
    popular_df = (
        stats
        .filter(F.col("percentage") >= MUST_HAVE_THRESHOLD)
        .filter(~F.array_contains(F.col("target_amenity_list"), F.col("amenity")))
        .select(
            "target_id", "amenity", "freq", "total", "percentage",
            "avg_price_with", "avg_rating_with",
            "category"
        )
    )

    # RECOMMENDATIONS
    recs_df = (
        stats
        .filter(F.col("freq") >= MIN_SUPPORT)
        .filter(~F.array_contains(F.col("target_amenity_list"), F.col("amenity")))
    )

    # score = average among neighbors that have the amenity minus the target's own value
    if mode == "price":
        recs_df = recs_df.withColumn(
            "score",
            F.col("avg_price_with") - F.col("target_price")
        )
    elif mode == "rating":
        recs_df = recs_df.withColumn(
            "score",
            F.col("avg_rating_with") - F.col("target_ratings")
        )
    else:
        raise ValueError("mode must be 'price' or 'rating'")

    recs_df = recs_df.select(
        "target_id", "amenity", "freq", "total", "percentage",
        "avg_price_with", "avg_rating_with",
        "score",
        "category"
    )

    return recs_df, popular_df

In [0]:
# =========================
# RUN + SAVE
# =========================
# Price-mode neighbors: compute, co-locate rows by target, persist.
neighbors_price_target = f"{OUTPUT_BASE}/neighbors_price"
neighbors_price = compute_knn(df, mode="price").repartition("target_id")
neighbors_price.write.parquet(neighbors_price_target, mode="overwrite")

# Rating-mode neighbors: same steps with the rating feature set.
neighbors_rating_target = f"{OUTPUT_BASE}/neighbors_rating"
neighbors_rating = compute_knn(df, mode="rating").repartition("target_id")
neighbors_rating.write.parquet(neighbors_rating_target, mode="overwrite")


In [0]:
from pyspark.sql import functions as F

def build_listing_amenities_flat(listings_df):
    """Explode each listing into one row per amenity.

    The amenity list is the parsed ``amenities`` column extended with synthetic
    entries: a check-in slot, a check-out slot (both derived from
    ``house_rules``) and ``pets_allowed`` when that column equals True.

    Returns:
        DataFrame with columns property_id, amenity, price, ratings. Listings
        whose final amenity list is empty produce no rows.
    """
    amenity_list = F.col("amenity_list")

    def append_if(frame, condition, extra_entry):
        """Append ``extra_entry`` to amenity_list on rows where ``condition`` holds."""
        extended = F.concat(amenity_list, F.array(extra_entry))
        return frame.withColumn(
            "amenity_list",
            F.when(condition, extended).otherwise(amenity_list)
        )

    listings = (
        listings_df
        .withColumn("amenity_list", extract_amenities_udf("amenities"))
        .withColumn("checkin", checkin_udf("house_rules"))
        .withColumn("checkout", checkout_udf("house_rules"))
    )

    # Two-hour slot labels built from the bucketed hours
    checkin_slot = F.format_string(
        "checkin_%02d:00-%02d:00",
        F.col("checkin"), F.col("checkin") + 2
    )
    checkout_slot = F.format_string(
        "checkout_%02d:00-%02d:00",
        F.col("checkout"), F.col("checkout") + 2
    )

    listings = append_if(listings, F.col("checkin").isNotNull(), checkin_slot)
    listings = append_if(listings, F.col("checkout").isNotNull(), checkout_slot)
    listings = append_if(listings, F.col("pets_allowed") == True, F.lit("pets_allowed"))

    # Empty lists get a sentinel entry, which is filtered out again after explode
    has_entries = F.size("amenity_list") > 0
    listings = listings.withColumn(
        "amenity_list",
        F.when(has_entries, amenity_list).otherwise(F.array(F.lit("__NO_AMENITY__")))
    )

    exploded = listings.withColumn("amenity", F.explode("amenity_list"))
    real_amenities = exploded.filter(F.col("amenity") != "__NO_AMENITY__")

    return real_amenities.select(
        F.col("property_id"),
        "amenity",
        "price",
        "ratings"
    )


In [0]:
# Flatten the prepared listings into (property_id, amenity, price, ratings) rows
# and persist them; later cells read this table back from OUTPUT_BASE.
listing_amenities_flat = build_listing_amenities_flat(df)

listing_amenities_flat.write.mode("overwrite").parquet(
    f"{OUTPUT_BASE}/listing_amenities_flat"
)

In [0]:
# Price reports: reload the persisted price-mode neighbors, then derive and
# store the recommendation and must-have tables.
neighbors_price = spark.read.parquet(f"{OUTPUT_BASE}/neighbors_price")

recs_price, popular_price = build_amenity_reports(
    neighbors_price,
    listing_amenities_flat,
    "price",
)

recs_price.write.parquet(f"{OUTPUT_BASE}/recs_price_final", mode="overwrite")
popular_price.write.parquet(f"{OUTPUT_BASE}/popular_price_final", mode="overwrite")


In [0]:
# Rating reports: reload the persisted rating-mode neighbors, then derive and
# store the recommendation and must-have tables.
neighbors_rating = spark.read.parquet(f"{OUTPUT_BASE}/neighbors_rating")

recs_rating, popular_rating = build_amenity_reports(
    neighbors_rating,
    listing_amenities_flat,
    "rating",
)

recs_rating.write.parquet(f"{OUTPUT_BASE}/recs_rating_final", mode="overwrite")
popular_rating.write.parquet(f"{OUTPUT_BASE}/popular_rating_final", mode="overwrite")


In [0]:
# Reload the four persisted report tables from storage.
# OUTPUT_BASE repeats the value assigned in the configuration cell, which lets
# this cell run without re-executing the configuration.
OUTPUT_BASE = "dbfs:/FileStore/airbnb"
recs_rating = spark.read.parquet(f"{OUTPUT_BASE}/recs_rating_final")
popular_rating = spark.read.parquet(f"{OUTPUT_BASE}/popular_rating_final")

recs_price = spark.read.parquet(f"{OUTPUT_BASE}/recs_price_final")
popular_price = spark.read.parquet(f"{OUTPUT_BASE}/popular_price_final")


In [0]:
# Persist the prepared listings DataFrame.
# NOTE(review): this path is a literal outside OUTPUT_BASE, unlike every other
# output in the notebook — confirm the /mnt location is intended.
df.write.mode("overwrite").parquet("/mnt/airbnb_outputs/listings")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# =========================
# CONFIG
# =========================
# Inputs: tables written by the earlier pipeline cells under OUTPUT_BASE.
OUTPUT_BASE = "dbfs:/FileStore/airbnb"
NEIGHBORS_PRICE_PATH = f"{OUTPUT_BASE}/neighbors_price"
NEIGHBORS_RATING_PATH = f"{OUTPUT_BASE}/neighbors_rating"
LISTING_AMENITIES_FLAT_PATH = f"{OUTPUT_BASE}/listing_amenities_flat"
RECS_PRICE_PATH = f"{OUTPUT_BASE}/recs_price_final"
RECS_RATING_PATH = f"{OUTPUT_BASE}/recs_rating_final"

# Outputs: per-(property, amenity) effect with its variance.
OUT_PRICE_PATH = f"{OUTPUT_BASE}/amenity_price_effects.parquet"
OUT_RATING_PATH = f"{OUTPUT_BASE}/amenity_rating_effects.parquet"

# Variance substituted when the computed variance is NULL or <= 0.
VAR_FALLBACK = 1e6 

# NOTE(review): no Spark session is created in this cell; the message is informational only.
print("🚀 Spark session started")

# ======================================================
# CORE FUNCTION
# ======================================================

def build_effects_with_variance(
    neighbors_path,
    recs_path,
    amenities_path,
    mode="price"  # "price" | "rating"
):
    """Attach a variance estimate to every recommendation row.

    For each (target_id, amenity) pair present in the recommendations, the
    target's neighbors are split into those whose amenity set contains the
    amenity and those whose set does not. The variance is
    ``var_with / n_with + var_without / n_without``, where the var_* terms are
    ``F.variance`` of the value column and the n_* terms are distinct neighbor
    counts. A NULL or non-positive result is replaced by VAR_FALLBACK.

    Args:
        neighbors_path: Parquet path of the neighbor table (target_id, neighbor_id, ...).
        recs_path: Parquet path of the recommendations (target_id, amenity, ...).
        amenities_path: Parquet path of the flat table (property_id, amenity, price, ratings).
        mode: "price" uses log(neighbor_price) over neighbors with price > 0;
            any other value takes the rating branch of the value-column step,
            and only the exact string "rating" applies the rating filters.

    Returns:
        The recommendations DataFrame with an added ``variance`` column. In
        rating mode, rows whose target has a NULL or non-positive rating are dropped.

    NOTE(review): in price mode the variance is computed on log(price), while
    the `score` produced by build_amenity_reports is a difference of raw
    prices — confirm the consumer expects these different scales.
    """
    print(f"\n==============================")
    print(f"🔵 START build_effects_with_variance | mode={mode}")
    print(f"==============================")

    print("📥 Loading parquet files...")
    neighbors = spark.read.parquet(neighbors_path)
    recs = spark.read.parquet(recs_path)
    amenities = spark.read.parquet(amenities_path)

    # --------------------------------------------------
    # amenity set per property
    # --------------------------------------------------
    print("🧩 Building amenity sets per property...")
    amenity_sets = (
        amenities
        .groupBy("property_id")
        .agg(F.collect_set("amenity").alias("amenity_set"))
    )

    # --------------------------------------------------
    # property values
    # --------------------------------------------------
    print("💰 Preparing property values (price / rating)...")
    # One row per distinct (property_id, price, ratings) combination.
    property_values = (
        amenities
        .select("property_id", "price", "ratings")
        .distinct()
        .withColumnRenamed("price", "neighbor_price")
        .withColumnRenamed("ratings", "neighbor_ratings")
    )

    # --------------------------------------------------
    # relevant pairs
    # --------------------------------------------------
    print("🎯 Extracting relevant (target_id, amenity) pairs...")
    relevant_pairs = recs.select("target_id", "amenity").distinct()

    # --------------------------------------------------
    # universe
    # --------------------------------------------------
    print("🌍 Building universe (neighbors × relevant amenities)...")
    # Every neighbor of a target is paired with every recommended amenity of
    # that target; has_amenity is False when the neighbor has no amenity set.
    universe = (
        neighbors
        .join(amenity_sets, neighbors.neighbor_id == amenity_sets.property_id, "left")
        .join(property_values, neighbors.neighbor_id == property_values.property_id, "left")
        .join(relevant_pairs, on="target_id", how="inner")
        .withColumn(
            "has_amenity",
            F.coalesce(
                F.array_contains(F.col("amenity_set"), F.col("amenity")),
                F.lit(False)
            )
        )
    )

    # ======================================================
    # ✅ FIX FOR RATING 0/NULL 
    # ======================================================
    if mode == "rating":
        print("🧹 [RATING FIX] Filtering invalid neighbor ratings (NULL or <=0)...")
        universe = universe.filter(
            F.col("neighbor_ratings").isNotNull() & (F.col("neighbor_ratings") > 0)
        )

        print("🧹 [RATING FIX] Filtering targets with invalid ratings (NULL or <=0)...")
        # Target ratings are looked up in the flat amenities table, one row per property.
        target_ratings = (
            amenities
            .select(
                F.col("property_id").alias("target_id"),
                F.col("ratings").alias("target_ratings")
            )
            .dropDuplicates(["target_id"])
        )

        # Renamed so it cannot clash with the target_ratings column already in `neighbors`.
        target_ratings2 = target_ratings.withColumnRenamed("target_ratings", "target_ratings_base")

        universe = (
            universe
            .join(target_ratings2, on="target_id", how="left")
            .filter(F.col("target_ratings_base").isNotNull() & (F.col("target_ratings_base") > 0))
            .drop("target_ratings_base")
        )


    # --------------------------------------------------
    # value column
    # --------------------------------------------------
    print("📐 Selecting value column...")
    if mode == "price":
        # log() needs strictly positive prices
        universe = universe.filter(
        F.col("neighbor_price").isNotNull() & (F.col("neighbor_price") > 0)
        )
        value_col = F.log(F.col("neighbor_price"))
        print("   using neighbor_price (>0)")
    else:
        # now neighbor_ratings already valid (no need for when)
        value_col = F.col("neighbor_ratings")
        print("   using neighbor_ratings (>0, already filtered)")

    # --------------------------------------------------
    # WITH stats
    # --------------------------------------------------
    print("➕ Computing WITH stats...")
    with_stats = (
        universe
        .filter(F.col("has_amenity"))
        .groupBy("target_id", "amenity")
        .agg(
            F.countDistinct("neighbor_id").alias("n_with"),
            F.variance(value_col).alias("var_with")
        )
    )

    # --------------------------------------------------
    # WITHOUT stats
    # --------------------------------------------------
    print("➖ Computing WITHOUT stats...")
    without_stats = (
        universe
        .filter(~F.col("has_amenity"))
        .groupBy("target_id", "amenity")
        .agg(
            F.countDistinct("neighbor_id").alias("n_without"),
            F.variance(value_col).alias("var_without")
        )
    )

    # --------------------------------------------------
    # variance of effect
    # --------------------------------------------------
    print("📊 Computing variance of effect...")
    # Left join: pairs with no "without" group get a NULL variance here, which
    # is replaced by VAR_FALLBACK in the enrichment step below.
    variance_df = (
        with_stats
        .join(without_stats, ["target_id", "amenity"], "left")
        .withColumn(
            "variance",
            (F.col("var_with") / F.col("n_with")) +
            (F.col("var_without") / F.col("n_without"))
        )
        .select("target_id", "amenity", "variance")
    )

    # --------------------------------------------------
    # enrich recs
    # --------------------------------------------------
    print("🧠 Enriching recs with variance...")
    enriched = (
        recs
        .join(variance_df, ["target_id", "amenity"], "left")
        .withColumn(
            "variance",
            F.when(
                F.col("variance").isNull() | (F.col("variance") <= 0),
                F.lit(VAR_FALLBACK)
            ).otherwise(F.col("variance"))
        )
    )

    if mode == "rating":
        print("🧹 [RATING FIX] Dropping recs where target rating is NULL/0 (safe)...")
        # Same lookup as above, applied to the recommendation rows themselves.
        target_ratings_for_recs = (
            amenities
            .select(
                F.col("property_id").alias("target_id"),
                F.col("ratings").alias("target_ratings")
            )
            .dropDuplicates(["target_id"])
        )

        enriched = (
            enriched
            .join(target_ratings_for_recs, on="target_id", how="left")
            .filter(F.col("target_ratings").isNotNull() & (F.col("target_ratings") > 0))
            .drop("target_ratings")
        )

    print(f"✅ DONE build_effects_with_variance | mode={mode}")
    return enriched


# ======================================================
# PRICE
# ======================================================

print("\n💰 START PRICE PIPELINE")
price_enriched = build_effects_with_variance(
    NEIGHBORS_PRICE_PATH,
    RECS_PRICE_PATH,
    LISTING_AMENITIES_FLAT_PATH,
    "price",
)

# Output layout: target_id is exposed as property_id, score as delta_price.
price_columns = [
    F.col("target_id").alias("property_id"),
    "amenity",
    F.col("score").alias("delta_price"),
    "variance",
]
price_out = price_enriched.select(*price_columns)

print("💾 Writing price output...")
price_out.write.parquet(OUT_PRICE_PATH, mode="overwrite")
print("✅ Price output written")


# ======================================================
# RATING
# ======================================================

print("\n⭐ START RATING PIPELINE")
rating_enriched = build_effects_with_variance(
    NEIGHBORS_RATING_PATH,
    RECS_RATING_PATH,
    LISTING_AMENITIES_FLAT_PATH,
    "rating",
)

# Output layout: target_id is exposed as property_id, score as delta_rating.
rating_columns = [
    F.col("target_id").alias("property_id"),
    "amenity",
    F.col("score").alias("delta_rating"),
    "variance",
]
rating_out = rating_enriched.select(*rating_columns)

print("💾 Writing rating output...")
rating_out.write.parquet(OUT_RATING_PATH, mode="overwrite")
print("✅ Rating output written")

print("🛑 Spark session stopped")

🚀 Spark session started

💰 START PRICE PIPELINE

🔵 START build_effects_with_variance | mode=price
📥 Loading parquet files...
🧩 Building amenity sets per property...
💰 Preparing property values (price / rating)...
🎯 Extracting relevant (target_id, amenity) pairs...
🌍 Building universe (neighbors × relevant amenities)...
📐 Selecting value column...
   using neighbor_price (>0)
➕ Computing WITH stats...
➖ Computing WITHOUT stats...
📊 Computing variance of effect...
🧠 Enriching recs with variance...
✅ DONE build_effects_with_variance | mode=price
💾 Writing price output...
✅ Price output written

⭐ START RATING PIPELINE

🔵 START build_effects_with_variance | mode=rating
📥 Loading parquet files...
🧩 Building amenity sets per property...
💰 Preparing property values (price / rating)...
🎯 Extracting relevant (target_id, amenity) pairs...
🌍 Building universe (neighbors × relevant amenities)...
🧹 [RATING FIX] Filtering invalid neighbor ratings (NULL or <=0)...
🧹 [RATING FIX] Filtering targets wit

In [0]:
# ============================================================
# 1) Helpers: normalization + safe casting
# ============================================================

def normalize_amenity_expr(col):
    """
    Turn a raw amenity text column (format 2) into a format-1 style key, e.g.
      "private patio or balcony" -> "a_private_patio_or_balcony"

    The text is lower-cased and trimmed, then a fixed sequence of regex
    replacements is applied in order, and finally the 'a_' prefix is added.
    Intended for SINGLE amenities (format 2 has no __x__ combinations).
    """
    # (pattern, replacement) pairs; order matters
    substitutions = [
        (r"[/|,;:\(\)\[\]\{\}\.\!\?\+\"'`]", " "),   # punctuation / separators -> space
        (r"[-–—]", " "),                              # dashes -> space
        (r"[&]", " and "),                            # '&' -> 'and'
        (r"\s+", " "),                                # collapse whitespace
        (r"\s", "_"),                                 # spaces -> underscores
        (r"_+", "_"),                                 # squeeze repeated underscores
        (r"^_+|_+$", ""),                             # strip underscores at both ends
    ]

    normalized = F.lower(F.trim(col))
    for pattern, replacement in substitutions:
        normalized = F.regexp_replace(normalized, pattern, replacement)

    return F.concat(F.lit("a_"), normalized)


def is_valid_variance(col):
    """Boolean Column: true where ``col`` is non-null and strictly greater than 0."""
    is_present = col.isNotNull()
    is_positive = col > F.lit(0.0)
    return is_present & is_positive

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Input locations (adjust to your environment)
OUTPUT_BASE = "dbfs:/FileStore/airbnb"
FORMAT1_PATH = "dbfs:/airbnb/property_upgrade_recommendations"  
KNN_PRICE_PATH = f"{OUTPUT_BASE}/amenity_price_effects.parquet"
KNN_RATING_PATH = f"{OUTPUT_BASE}/amenity_rating_effects.parquet"    

# Load the prior table (format 1) and the two KNN effect tables (format 2)
df_format1 = spark.read.parquet(FORMAT1_PATH)
df_knn_price = spark.read.parquet(KNN_PRICE_PATH)
df_knn_rating = spark.read.parquet(KNN_RATING_PATH)

print("✅ Loaded inputs:")
for label, frame in (
    ("df_format1 columns:", df_format1),
    ("df_knn_price columns:", df_knn_price),
    ("df_knn_rating columns:", df_knn_rating),
):
    print(label, frame.columns)


# ============================================================
# 2) Parse FORMAT 1 (PRIOR): explode into long tables
# ============================================================

def build_prior_long(df_format1,
                     property_col="property_id",
                     host_col="host_id",
                     price_col="price_upgrades_all",
                     rating_col="rating_upgrades_all"):
    """
    Explode the FORMAT1 upgrade arrays into two long tables (price, rating).

    Each array element is read as a struct exposing ``feature``, ``coef_var``
    and a delta field (``price_delta_usd`` or ``rating_delta``). ``explode_outer``
    is used, so a row with an empty/NULL array still yields one output row.

    Returns:
        (prior_price_long, prior_rating_long), each with columns
        host_id, property_id, amenity_key, mu_prior, var_prior
        (var_prior is taken from ``coef_var``).
    """

    def explode_upgrades(array_col, delta_path):
        """One long table for the given array column and delta field path."""
        exploded = df_format1.select(
            F.col(host_col).alias("host_id"),
            F.col(property_col).alias("property_id"),
            F.explode_outer(F.col(array_col)).alias("it"),
        )
        return exploded.select(
            "host_id",
            "property_id",
            F.col("it.feature").alias("amenity_key"),
            F.col(delta_path).cast("double").alias("mu_prior"),
            F.col("it.coef_var").cast("double").alias("var_prior"),
        )

    prior_price_long = explode_upgrades(price_col, "it.price_delta_usd")
    prior_rating_long = explode_upgrades(rating_col, "it.rating_delta")

    return prior_price_long, prior_rating_long


# ============================================================
# 3) Parse FORMAT 2 (KNN measurement): already long, normalize amenity -> amenity_key
# ============================================================

def build_knn_long_price(df_knn_price,
                         property_col="property_id",
                         amenity_col="amenity",
                         delta_col="delta_price",
                         variance_col="variance"):
    """
    Project the KNN price measurements onto the common long schema.

    Output columns (in order):
      property_id, amenity_raw, mu_knn, var_knn, amenity_key

    mu_knn / var_knn are `delta_col` / `variance_col` cast to double;
    amenity_key is normalize_amenity_expr applied to the raw amenity value.
    """
    projected = df_knn_price.select(
        F.col(property_col).alias("property_id"),
        F.col(amenity_col).alias("amenity_raw"),
        F.col(delta_col).cast("double").alias("mu_knn"),
        F.col(variance_col).cast("double").alias("var_knn"),
    )

    # Derive the join key from the raw amenity label.
    amenity_key = normalize_amenity_expr(F.col("amenity_raw"))
    return projected.withColumn("amenity_key", amenity_key)


def build_knn_long_rating(df_knn_rating,
                          property_col="property_id",
                          amenity_col="amenity",
                          delta_col="delta_rating",
                          variance_col="variance"):
    """
    Project the KNN rating measurements onto the common long schema.

    Output columns (in order):
      property_id, amenity_raw, mu_knn, var_knn, amenity_key

    mu_knn / var_knn are `delta_col` / `variance_col` cast to double;
    amenity_key is normalize_amenity_expr applied to the raw amenity value.
    """
    projected = df_knn_rating.select(
        F.col(property_col).alias("property_id"),
        F.col(amenity_col).alias("amenity_raw"),
        F.col(delta_col).cast("double").alias("mu_knn"),
        F.col(variance_col).cast("double").alias("var_knn"),
    )

    # Derive the join key from the raw amenity label.
    amenity_key = normalize_amenity_expr(F.col("amenity_raw"))
    return projected.withColumn("amenity_key", amenity_key)


# ============================================================
# 4) Static Kalman / Bayesian fusion + LCB ranking score
# ============================================================

def fuse_prior_and_knn(prior_long, knn_long, model_name, k_lcb=1.0):
    """
    Fuse prior and KNN estimates per (property_id, amenity_key) with a static
    Kalman-style Bayesian update, and attach an LCB ranking score.

    prior:  N(mu_prior, var_prior)
    knn:    N(mu_knn,  var_knn)

    posterior (only when both sides are valid and the key is not a combo):
      K = var_prior / (var_prior + var_knn)
      mu_post  = mu_prior + K * (mu_knn - mu_prior)
      var_post = (1 - K) * var_prior

    Rules:
    - a side is valid only if its mean and variance are non-null, non-NaN,
      and the variance is > 0; otherwise it is treated as missing
    - combos (amenity_key containing "__x__") with a valid prior keep the
      prior even when a valid KNN row matches
    - if exactly one side is valid -> take that side
    - if neither side is valid -> mu_post / var_post are null and
      source_flag is "no_valid_data"

    Args:
      prior_long: DataFrame with host_id, property_id, amenity_key,
                  mu_prior, var_prior.
      knn_long:   DataFrame with property_id, amenity_key, mu_knn, var_knn,
                  amenity_raw.
      model_name: value written to the "model" column.
      k_lcb:      multiplier on sqrt(var_post) in the LCB score.

    Returns:
      DataFrame with columns host_id, property_id, amenity_key, mu_prior,
      var_prior, mu_knn, var_knn, amenity_raw, is_combo, mu_post, var_post,
      score_lcb_log, source_flag, fused_flag, model.

    Notes:
    - The join is a full outer join; host_id is taken from the prior side
      only, so rows that exist only in knn_long have a null host_id.
    - score_lcb_log is null unless mu_post > 0 and var_post > 0.
    - NOTE(review): the score subtracts a linear-scale standard deviation
      from a log-scale mean; confirm this is the intended ranking metric.
    """

    # Tag combination features; a null amenity_key yields a null flag here,
    # which is coalesced to False after the join.
    prior_prepared = prior_long.withColumn(
        "is_combo",
        F.instr(F.col("amenity_key"), "__x__") > 0
    )

    joined = (
        prior_prepared.alias("p")
        .join(
            knn_long.alias("k"),
            on=["property_id", "amenity_key"],
            how="full_outer"
        )
        .select(
            # host_id exists only on the prior side (knn doesn't have it)
            F.col("p.host_id").alias("host_id"),

            F.coalesce(F.col("p.property_id"), F.col("k.property_id")).alias("property_id"),
            F.coalesce(F.col("p.amenity_key"), F.col("k.amenity_key")).alias("amenity_key"),

            F.col("p.mu_prior").alias("mu_prior"),
            F.col("p.var_prior").alias("var_prior"),

            F.col("k.mu_knn").alias("mu_knn"),
            F.col("k.var_knn").alias("var_knn"),
            F.col("k.amenity_raw").alias("amenity_raw"),

            F.coalesce(F.col("p.is_combo"), F.lit(False)).alias("is_combo"),
        )
    )

    def _side_ok(mu_name, var_name):
        # Spark orders NaN above every other double, so `var > 0` alone would
        # accept a NaN variance and poison the posterior; reject NaN explicitly.
        mu, var = F.col(mu_name), F.col(var_name)
        return (
            mu.isNotNull() & var.isNotNull()
            & ~F.isnan(mu) & ~F.isnan(var)
            & (var > 0)
        )

    prior_ok = _side_ok("mu_prior", "var_prior")
    knn_ok = _side_ok("mu_knn", "var_knn")
    is_combo = F.col("is_combo")

    # Mutually exclusive cases; anything else has no valid data.
    can_fuse = prior_ok & knn_ok & (~is_combo)
    prior_only = prior_ok & (~knn_ok)
    knn_only = knn_ok & (~prior_ok)
    combo_prior = prior_ok & knn_ok & is_combo

    def _by_source(fused, prior, knn, combo, fallback):
        # Same branch order for every derived column so they stay in sync.
        return (
            F.when(can_fuse, fused)
            .when(prior_only, prior)
            .when(knn_only, knn)
            .when(combo_prior, combo)
            .otherwise(fallback)
        )

    null_double = F.lit(None).cast("double")

    # Kalman gain (static)
    K = F.col("var_prior") / (F.col("var_prior") + F.col("var_knn"))

    mu_post = _by_source(
        F.col("mu_prior") + K * (F.col("mu_knn") - F.col("mu_prior")),
        F.col("mu_prior"),
        F.col("mu_knn"),
        F.col("mu_prior"),   # combos prior-only
        null_double,
    )

    var_post = _by_source(
        (F.lit(1.0) - K) * F.col("var_prior"),
        F.col("var_prior"),
        F.col("var_knn"),
        F.col("var_prior"),  # combos prior-only
        null_double,
    )

    source_flag = _by_source(
        F.lit("prior+knn"),
        F.lit("prior_only"),
        F.lit("knn_only"),
        F.lit("prior_only_combo"),
        F.lit("no_valid_data"),
    )

    # Score = log(mu_post) - k * sqrt(var_post); defined only for positive
    # mu_post and var_post, null otherwise.
    score_lcb_log = F.when(
        (mu_post.isNotNull()) & (var_post.isNotNull()) &
        (mu_post > 0) & (var_post > 0),
        F.log(mu_post) - F.lit(float(k_lcb)) * F.sqrt(var_post)
    ).otherwise(null_double)

    out = (
        joined
        .withColumn("mu_post", mu_post)
        .withColumn("var_post", var_post)
        .withColumn("score_lcb_log", score_lcb_log)
        .withColumn("source_flag", source_flag)
        .withColumn("fused_flag", can_fuse)
        .withColumn("model", F.lit(model_name))
    )

    return out


# ============================================================
# 5) END-TO-END: build two parquet models (price + rating)
# ============================================================

# Step 1: explode the FORMAT 1 priors into long price / rating tables
# (host_id is carried along).
prior_price_long, prior_rating_long = build_prior_long(
    df_format1,
    property_col="property_id", host_col="host_id",
    price_col="price_upgrades_all", rating_col="rating_upgrades_all",
)

# Step 2: bring the KNN measurement tables onto the same long schema.
knn_price_long = build_knn_long_price(
    df_knn_price,
    property_col="property_id", amenity_col="amenity",
    delta_col="delta_price", variance_col="variance",
)
knn_rating_long = build_knn_long_rating(
    df_knn_rating,
    property_col="property_id", amenity_col="amenity",
    delta_col="delta_rating", variance_col="variance",
)

# Step 3: fuse prior + KNN into the posterior models.
bayes_price_model = fuse_prior_and_knn(
    prior_price_long, knn_price_long, model_name="price", k_lcb=1.0
)
bayes_rating_model = fuse_prior_and_knn(
    prior_rating_long, knn_rating_long, model_name="rating", k_lcb=1.0
)

# Step 4: persist both models as parquet, replacing any previous run.
bayes_price_model.write.parquet(f"{OUTPUT_BASE}/bayes_price_model", mode="overwrite")
bayes_rating_model.write.parquet(f"{OUTPUT_BASE}/bayes_rating_model", mode="overwrite")

print(
    "✅ Done. Wrote:",
    f" - {OUTPUT_BASE}/bayes_price_model",
    f" - {OUTPUT_BASE}/bayes_rating_model",
    sep="\n",
)


✅ Loaded inputs:
df_format1 columns: ['host_id', 'property_id', 'price_upgrades_all', 'rating_upgrades_all']
df_knn_price columns: ['property_id', 'amenity', 'delta_price', 'variance']
df_knn_rating columns: ['property_id', 'amenity', 'delta_rating', 'variance']
✅ Done. Wrote:
 - dbfs:/FileStore/airbnb/bayes_price_model
 - dbfs:/FileStore/airbnb/bayes_rating_model


In [0]:
# Reload the persisted posterior models from parquet so the following cells
# work from the stored copies.
OUTPUT_BASE = "dbfs:/FileStore/airbnb"

BAYES_PRICE_PATH, BAYES_RATING_PATH = (
    f"{OUTPUT_BASE}/bayes_price_model",
    f"{OUTPUT_BASE}/bayes_rating_model",
)

bayes_price_model, bayes_rating_model = (
    spark.read.parquet(BAYES_PRICE_PATH),
    spark.read.parquet(BAYES_RATING_PATH),
)

print("✅ Loaded outputs:")
print("bayes_price_model columns:", bayes_price_model.columns)
print("bayes_rating_model columns:", bayes_rating_model.columns)


✅ Loaded outputs:
bayes_price_model columns: ['host_id', 'property_id', 'amenity_key', 'mu_prior', 'var_prior', 'mu_knn', 'var_knn', 'amenity_raw', 'is_combo', 'mu_post', 'var_post', 'score_lcb_log', 'source_flag', 'fused_flag', 'model']
bayes_rating_model columns: ['host_id', 'property_id', 'amenity_key', 'mu_prior', 'var_prior', 'mu_knn', 'var_knn', 'amenity_raw', 'is_combo', 'mu_post', 'var_post', 'score_lcb_log', 'source_flag', 'fused_flag', 'model']


In [0]:
# Load the amenity cost inventory (header row, inferred column types)
# and preview the first few rows.
df_csv = spark.read.csv(
    path="dbfs:/data/amenity_inventory_with_ikea_price_embeddings",
    inferSchema=True,
    header=True,
)
display(df_csv.limit(5))

amenity_name,n_properties,amenity_norm,matched_amenity_norm,similarity_score,estimated_cost,min_price,max_price,n_products,match_confidence
kitchen,1817149,kitchen,kitchen,1.0000001192092896,455.0,305.0,705.0,7.0,HIGH
smoke alarm,1718954,smoke alarm,alarm clock,0.614111065864563,25.0,25.0,25.0,1.0,MANUAL_SAFETY
wifi,1715041,wifi,sonos wifi bookshelf speaker,0.4769666790962219,0.0,0.0,0.0,1.0,MANUAL_CONNECTIVITY
hot water,1650531,hot water,sink,0.4359509646892547,0.0,0.0,0.0,1.0,MANUAL_UTILITIES
essentials,1643562,essentials,starter kit,0.3661669492721557,,,,,NO_MATCH


In [0]:
from pyspark.sql import functions as F

# ============================================================
# Attach estimated_cost from prices CSV
# - Normalize prices CSV amenity -> amenity_key (a_...)
# - Join on amenity_key to both bayes models
# ============================================================

COSTS_CSV_PATH = "dbfs:/data/amenity_inventory_with_ikea_price_embeddings"

df_csv = spark.read.csv(COSTS_CSV_PATH, header=True, inferSchema=True)
print("✅ Costs CSV columns:", df_csv.columns)

CSV_AMENITY_COL = "amenity_name"

# One estimated_cost per normalized amenity_key.
# Several raw names can normalize to the same key; F.first() after a groupBy
# picks an arbitrary row (order is not guaranteed after a shuffle), which made
# the output differ between runs. max() is deterministic, ignores nulls, and
# keeps the highest (most conservative) cost estimate.
df_costs_norm = (
    df_csv
    .select(
        F.col(CSV_AMENITY_COL).alias("amenity_raw"),
        F.col("estimated_cost").cast("double").alias("estimated_cost"),
    )
    .filter(F.col("amenity_raw").isNotNull())
    .withColumn("amenity_key", normalize_amenity_expr(F.col("amenity_raw")))
    .groupBy("amenity_key")
    .agg(F.max("estimated_cost").alias("estimated_cost"))
)

# DataFrame.drop is a no-op when the column is absent, so the cell can be
# re-run safely without an explicit membership check.
# Left join: rows whose amenity_key has no cost entry keep a null estimated_cost.
bayes_price_model = (
    bayes_price_model
    .drop("estimated_cost")
    .join(df_costs_norm, on="amenity_key", how="left")
)
bayes_rating_model = (
    bayes_rating_model
    .drop("estimated_cost")
    .join(df_costs_norm, on="amenity_key", how="left")
)

print("✅ Added estimated_cost to both bayes models")

# Write final outputs (separate *_final paths, so the inputs read above are
# not overwritten).
bayes_price_model.write.mode("overwrite").parquet(f"{OUTPUT_BASE}/bayes_price_model_final")
bayes_rating_model.write.mode("overwrite").parquet(f"{OUTPUT_BASE}/bayes_rating_model_final")
print("✅ Wrote final models including estimated_cost")
