In [0]:
# Installs reverse_geocoder into the notebook environment (unpinned version).
# NOTE(review): no visible cell imports reverse_geocoder — presumably a project module
# (config / airbnb_data_loader) needs it; confirm. Consider `%pip install reverse_geocoder==<version>`
# so the install targets the kernel's environment and is reproducible.
!pip install reverse_geocoder 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Root folder holding one enriched OSM POI parquet file per continent.
osm_base_dir = "file:/Workspace/Users/nagham.omar@campus.technion.ac.il/VibeBnB/data/osm_pois"

# Continent display name -> parquet path. The file slug is the lower-cased display name with
# "/" and " " replaced by "_" (e.g. "Australia/Oceania" -> "australia_oceania_pois_enriched.parquet").
osm_paths = {
    name: f"{osm_base_dir}/{name.lower().replace('/', '_').replace(' ', '_')}_pois_enriched.parquet"
    for name in (
        "Africa",
        "Antarctica",
        "Asia",
        "Australia/Oceania",
        "Central America",
        "Europe",
        "North America",
        "South America",
    )
}
def build_continents_from_osm(spark: SparkSession, osm_paths: dict) -> dict:
    """
    Build {continent: [country_codes]} from OSM parquet files.

    For every continent in ``osm_paths`` the parquet file is read and the distinct, non-null
    values of its ``addr_cc`` column are collected in ascending order. A file without an
    ``addr_cc`` column maps to an empty list.

    Args:
        spark: active SparkSession used to read the parquet files.
        osm_paths: mapping of continent name -> parquet path.

    Returns:
        dict with the same keys as ``osm_paths``, each mapped to a sorted list of codes.
    """
    continents = {}

    for continent, path in osm_paths.items():
        df = spark.read.parquet(path)

        if "addr_cc" not in df.columns:
            continents[continent] = []
            continue

        # Collect through the DataFrame API rather than `.rdd.flatMap(...)`: RDD access is not
        # available under Spark Connect / Databricks shared-access clusters. The rows come back
        # in the orderBy order, so the resulting list is the same.
        rows = (
            df.select("addr_cc")
              .where(F.col("addr_cc").isNotNull())
              .distinct()
              .orderBy("addr_cc")
              .collect()
        )
        continents[continent] = [row["addr_cc"] for row in rows]

    return continents
continents = build_continents_from_osm(spark, osm_paths)

# Show, per continent, the country codes found in its OSM file.
for continent_name, country_codes in continents.items():
    print(continent_name, country_codes)

In [0]:
# data_join.py
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from config import *
from airbnb_data_loader import load_airbnb_data

def norm(c):
    """Normalize a text column into a join key.

    Steps: null -> "", remove the characters . , - ( ) [ ], collapse runs of whitespace into
    a single space, trim, lowercase.

    Args:
        c: a Spark Column holding text.

    Returns:
        A Spark Column with the normalized key ("" for null input).
    """
    # Punctuation is removed BEFORE trimming: trimming first left e.g. "Paris ." as "paris "
    # (trailing space), which silently failed to match "paris" on the other side of a join.
    stripped = F.regexp_replace(F.coalesce(c, F.lit("")), r"[\.\,\-\(\)\[\]]", "")
    collapsed = F.regexp_replace(stripped, r"\s+", " ")
    return F.lower(F.trim(collapsed))


def cities_airbnb_join(cities_df: DataFrame, airbnb_df: DataFrame) -> DataFrame:
    """Left-join Airbnb listings with city metadata on a normalized city name.

    Adds ``city_key``, ``region_key`` and ``country_key`` (``norm`` of ``addr_name``,
    ``addr_admin1`` and ``addr_cc``) to the listings, then left-joins the broadcast cities
    table on ``city_key == norm(city)``.

    Args:
        cities_df: city metadata; must contain a ``city`` column.
        airbnb_df: listings; must contain ``addr_name``, ``addr_admin1`` and ``addr_cc``.

    Returns:
        Every listing row with the matching city columns appended (null when unmatched).
        The helper key columns appear once, from the listing side.
    """
    listings = (
        airbnb_df
        .withColumn("city_key", norm(F.col("addr_name")))
        .withColumn("region_key", norm(F.col("addr_admin1")))
        .withColumn("country_key", norm(F.col("addr_cc")))
    )
    # The cities side only needs its city key, under a private name, so the result does not
    # carry duplicate city_key/region_key/country_key columns (duplicate names make later
    # column references ambiguous and make a parquet write fail).
    cities = cities_df.withColumn("_c_city_key", norm(F.col("city")))

    joined = listings.alias("a").join(
        F.broadcast(cities.alias("c")),
        # norm() maps null to "", so without the non-empty guard every listing with a missing
        # city name would match every city row whose name is missing as well.
        on=(F.col("a.city_key") == F.col("c._c_city_key")) & (F.col("a.city_key") != F.lit("")),
        how="left",
    )
    # NOTE(review): the match is on city name only, so a name shared by several city rows yields
    # one output row per matching city. region_key / country_key are computed but not used in the
    # condition — presumably because addr_cc holds codes while cities.country may hold names;
    # confirm before adding them to the join.
    return joined.drop("_c_city_key")


def _sanitize(c: str) -> str:
    """Make column-safe names for env features."""
    return "".join(ch if ch.isalnum() else "_" for ch in c)


def osm_join_one_continent(continent: str, airbnb_df: DataFrame, osm_df: DataFrame) -> DataFrame:
    """Compute per-listing environment (POI) counts from one continent's OSM POIs.

    Listings are limited to the country codes in the module-level ``continents[continent]`` and
    to rows with non-null coordinates. Every POI within ``R_M`` metres of a listing (haversine
    distance with Earth radius ``EARTH_R``) is counted under its trimmed, lower-cased ``poi_group``.

    Args:
        continent: key of the module-level ``continents`` dict.
        airbnb_df: listings with ``property_id``, ``lat``, ``long`` and ``addr_cc``.
        osm_df: POIs with ``lat``, ``lon`` and ``poi_group``.

    Returns:
        One row per in-scope listing with ``property_id``, ``lat``, ``lon``, ``addr_cc`` and an
        ``env_<group>`` count per POI group (0 when none is nearby). When ``ENV_COLS`` is defined,
        any of its columns the pivot did not produce are added as 0.
    """
    airbnb_scope = (
        airbnb_df
        .filter(F.col("addr_cc").isin(continents[continent]))
        .select("property_id", F.col("lat").cast("double").alias("lat"), F.col("long").cast("double").alias("lon"), "addr_cc")
        .filter(F.col("lat").isNotNull() & F.col("lon").isNotNull())
        .dropDuplicates(["property_id"])
    )

    osm_geo = (
        osm_df
        .select(F.col("lat").cast("double").alias("p_lat"), F.col("lon").cast("double").alias("p_lon"), F.lower(F.trim(F.col("poi_group"))).alias("group"))
        .filter(F.col("p_lat").isNotNull() & F.col("p_lon").isNotNull() & F.col("group").isNotNull() & (F.col("group") != ""))
    )

    # Coarse latitude buckets of height DELTA_LAT (assumed > 0). A POI within DELTA_LAT degrees of
    # a listing lies in the listing's bucket or an adjacent one (up to float rounding exactly on the
    # box edge). Exploding each listing over bucket-1..bucket+1 and equi-joining on the bucket keeps
    # the same candidates, each pair at most once, because the three buckets are distinct and a POI
    # has one bucket. Without an equality, Spark can only plan the box join below as a nested-loop /
    # cartesian product of all listings x all POIs of the continent.
    def lat_bucket(lat_col):
        return F.floor(lat_col / F.lit(DELTA_LAT)).cast("long")

    airbnb_buckets = airbnb_scope.withColumn(
        "lat_bucket",
        F.explode(F.array(*[lat_bucket(F.col("lat")) + F.lit(offset) for offset in (-1, 0, 1)])),
    )
    osm_buckets = osm_geo.withColumn("p_lat_bucket", lat_bucket(F.col("p_lat")))

    # Longitude half-width of the search box: R_M metres in degrees at the listing's latitude.
    # NOTE(review): this grows without bound toward the poles, and the box does not wrap across
    # ±180° longitude, so listings next to the antimeridian miss POIs on the other side.
    delta_lon = (R_M / (111000.0 * F.cos(F.radians(F.col("a.lat")))))

    cand = (
        airbnb_buckets.alias("a")
        .join(
            osm_buckets.alias("p"),
            (F.col("a.lat_bucket") == F.col("p.p_lat_bucket")) &
            (F.col("p.p_lat").between(F.col("a.lat") - DELTA_LAT, F.col("a.lat") + DELTA_LAT)) &
            (F.col("p.p_lon").between(F.col("a.lon") - delta_lon, F.col("a.lon") + delta_lon)),
            "inner"
        )
    )

    # Haversine great-circle distance in metres between listing and POI.
    dist = 2 * EARTH_R * F.asin(
        F.sqrt(
            F.pow(F.sin(F.radians(F.col("p.p_lat") - F.col("a.lat")) / 2), 2) +
            F.cos(F.radians(F.col("a.lat"))) * F.cos(F.radians(F.col("p.p_lat"))) *
            F.pow(F.sin(F.radians(F.col("p.p_lon") - F.col("a.lon")) / 2), 2)
        )
    )

    # Exact radius filter on top of the box pre-filter, then count POIs per (listing, group).
    per_group = (
        cand
        .withColumn("distance_m", dist)
        .filter(F.col("distance_m") <= R_M)
        .groupBy(F.col("a.property_id").alias("property_id"), F.col("p.group").alias("group"))
        .agg(F.count("*").cast("int").alias("n_places"))
    )

    # One column per POI group; (property_id, group) is unique, so first() is just the count.
    pivoted = per_group.groupBy("property_id").pivot("group").agg(F.first("n_places")).fillna(0)

    renamed = pivoted.select(
        "property_id",
        *[F.col(c).alias(f"env_{_sanitize(c)}") for c in pivoted.columns if c != "property_id"]
    )

    # Left join keeps listings that have no POI in range; their counts become 0.
    out = airbnb_scope.join(renamed, "property_id", "left")

    env_cols = [c for c in out.columns if c.startswith("env_")]
    out = out.fillna(0, subset=env_cols) if env_cols else out

    # Ensure fixed ENV_COLS exist
    out = out.select("*", *[F.lit(0).alias(c) for c in ENV_COLS if c not in out.columns]) if "ENV_COLS" in globals() else out

    return out


def osm_join_all_continents(spark: SparkSession, airbnb_df: DataFrame, osm_paths: dict) -> DataFrame:
    """Aggregate OSM environment features across all continents.

    Each ``osm_paths`` entry whose continent resolves against the module-level ``continents``
    dict is processed with ``osm_join_one_continent``; entries that do not resolve are skipped.

    Continent names are matched exactly first, then after normalization (lower-case, every
    non-alphanumeric character -> "_"), so "Australia/Oceania" and "australia_oceania" name the
    same continent. join_all passes lower-case slug keys while build_continents_from_osm is called
    with display-name keys; with an exact-only match none of them resolved (unless ``continents``
    is redefined elsewhere, e.g. via ``from config import *``).

    A listing whose country appears in several continents' OSM files gets one row per such file.
    These rows are merged per property_id by taking, for each env_* column, the maximum count.
    The previous ``dropDuplicates`` kept an arbitrary row, which was nondeterministic and could
    keep the smaller count.

    Returns:
        One row per property_id with ``property_id``, ``lat``, ``lon``, ``addr_cc`` and the env_*
        counts (missing -> 0). When no continent resolves: the deduplicated listings with
        ``property_id``, ``lat``, ``lon``, ``addr_cc`` only.
    """
    def continent_key(name) -> str:
        # Normalized continent label used only for matching.
        return _sanitize(str(name).strip().lower())

    by_key = {continent_key(name): name for name in continents}

    parts = []
    for cont, path in osm_paths.items():
        resolved = cont if cont in continents else by_key.get(continent_key(cont))
        if resolved is None:
            continue
        parts.append(osm_join_one_continent(resolved, airbnb_df, spark.read.parquet(path)))

    if not parts:
        return (
            airbnb_df
            .select("property_id", F.col("lat").cast("double").alias("lat"), F.col("long").cast("double").alias("lon"), "addr_cc")
            .dropDuplicates(["property_id"])
        )

    env_all = parts[0]
    for part in parts[1:]:
        env_all = env_all.unionByName(part, allowMissingColumns=True)

    env_cols = [c for c in env_all.columns if c.startswith("env_")]
    if not env_cols:
        return env_all.dropDuplicates(["property_id"])

    # Columns absent from a part are null after unionByName; max() ignores them.
    merged = env_all.groupBy("property_id").agg(
        F.first("lat", ignorenulls=True).alias("lat"),
        F.first("lon", ignorenulls=True).alias("lon"),
        F.first("addr_cc", ignorenulls=True).alias("addr_cc"),
        *[F.max(c).alias(c) for c in env_cols],
    )
    return merged.fillna(0, subset=env_cols)


def join_all(out_path: str, cities_path: str = "dbfs:/vibebnb/data/travel_cities.parquet", osm_base_dir: str = "file:/Workspace/Users/nagham.omar@campus.technion.ac.il/VibeBnB/data/osm_pois") -> DataFrame:
    """Main pipeline: listings + city metadata + OSM environment counts.

    Steps:
      1. load the listings (one row per property_id),
      2. left-join the cities parquet at ``cities_path``,
      3. compute env_* POI counts from the per-continent parquet files under ``osm_base_dir``,
         left-join them on property_id and fill missing counts with 0.

    ``out_path`` is currently unused: the parquet write at the end is commented out.

    Returns:
        The joined listings DataFrame.
    """
    spark = SparkSession.builder.getOrCreate()

    listings = load_airbnb_data(spark).dropDuplicates(["property_id"])
    cities = spark.read.parquet(cities_path)
    listings_with_cities = cities_airbnb_join(cities, listings)

    # Keys are lower-case slugs that double as the parquet file prefix.
    # NOTE(review): the module-level osm_paths uses display-name keys such as "Australia/Oceania".
    continent_slugs = (
        "africa", "antarctica", "asia", "australia_oceania",
        "central_america", "europe", "north_america", "south_america",
    )
    osm_paths = {slug: f"{osm_base_dir}/{slug}_pois_enriched.parquet" for slug in continent_slugs}

    env_features = osm_join_all_continents(spark, listings, osm_paths).drop("lat", "lon", "addr_cc")

    final_df = listings_with_cities.join(env_features, "property_id", "left")

    env_cols = [name for name in final_df.columns if name.startswith("env_")]
    if env_cols:
        final_df = final_df.fillna(0, subset=env_cols)

    # final_df.write.mode("overwrite").partitionBy("addr_cc").parquet(out_path)
    return final_df


# Build the joined listings table and preview it.
joined_out_path = "dbfs:/vibebnb/data/airbnb_joined_all.parquet"
final_df = join_all(out_path=joined_out_path)
display(final_df)


In [0]:
# embedding.py
from pyspark.sql import DataFrame
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    SQLTransformer, RegexTokenizer, StopWordsRemover,
    HashingTF, IDF, VectorAssembler, StandardScaler,
    Normalizer, BucketedRandomProjectionLSH
)
from config import TEXT_COLS, NUM_COLS, ENV_COLS


def _present_cols(df: DataFrame, cols):
    """Return columns that exist in the DataFrame."""
    return [c for c in cols if c in df.columns]


def embedding(df: DataFrame):
    """Build a text + numeric/environment embedding for Airbnb listings.

    Feature vector = [standard-scaled NUM_COLS + ENV_COLS values] ++ [TF-IDF of the concatenated
    TEXT_COLS], then L2-normalized into ``features_norm`` for similarity search. Only the listed
    columns that exist in ``df`` are used; null/NaN numeric values are treated as 0.0.

    Args:
        df: listings DataFrame.

    Returns:
        ``df``'s original columns plus ``features`` (assembled vector) and ``features_norm``
        (L2-normalized). The original columns are kept so the result can still be partitioned
        by ``addr_cc``, filtered by country and scored downstream.
    """
    from pyspark.sql import functions as F  # this cell does not import F itself

    def quoted(name: str) -> str:
        # Backtick-quote a column name for SQL / F.col (names may contain spaces or dots).
        return "`" + name.replace("`", "``") + "`"

    text_cols = _present_cols(df, TEXT_COLS)
    num_cols  = _present_cols(df, NUM_COLS)
    env_cols  = _present_cols(df, ENV_COLS)

    # Null/NaN numerics are replaced before assembling. With handleInvalid="keep" they become NaN
    # in the vector; one NaN makes StandardScaler's std for that slot NaN, which turns the scaled
    # slot into NaN across rows and, after L2 normalization, their whole features_norm vector.
    # The filled copies go into private columns so the returned original columns are untouched.
    filled_cols = []
    work = df
    for i, name in enumerate(num_cols + env_cols):
        value = F.col(quoted(name)).cast("double")
        filled = f"__emb_num_{i}"
        work = work.withColumn(filled, F.when(value.isNull() | F.isnan(value), F.lit(0.0)).otherwise(value))
        filled_cols.append(filled)

    # combine text columns ("''" when none is present, so the SQL stays valid)
    text_sql = " || ' ' || ".join(f"coalesce({quoted(c)}, '')" for c in text_cols) or "''"
    combine_text = SQLTransformer(statement=f"SELECT *, lower(trim({text_sql})) AS text_all FROM __THIS__")

    # text processing
    tokenizer = RegexTokenizer(inputCol="text_all", outputCol="tokens", pattern="\\W+", minTokenLength=2)
    stop_rm = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
    hash_tf = HashingTF(inputCol="tokens_clean", outputCol="tf", numFeatures=1 << 18)
    idf = IDF(inputCol="tf", outputCol="text_tfidf")

    stages = [combine_text, tokenizer, stop_rm, hash_tf, idf]
    final_inputs = ["text_tfidf"]

    # numeric + env features (skipped when none is present: VectorAssembler needs inputs)
    if filled_cols:
        num_env_assembler = VectorAssembler(inputCols=filled_cols, outputCol="num_env_vec", handleInvalid="keep")
        scaler = StandardScaler(inputCol="num_env_vec", outputCol="num_env_scaled", withStd=True, withMean=False)
        stages += [num_env_assembler, scaler]
        final_inputs = ["num_env_scaled"] + final_inputs

    # final feature vector
    stages.append(VectorAssembler(inputCols=final_inputs, outputCol="features", handleInvalid="keep"))

    model = Pipeline(stages=stages).fit(work)

    # Drop the pipeline's intermediate/private columns, keep the caller's columns + features.
    df_emb = model.transform(work).select(*[F.col(quoted(c)) for c in df.columns], "features")

    # L2 normalization for similarity search
    normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=2.0)
    return normalizer.transform(df_emb)


def build_lsh(df: DataFrame, bucket_length: float = 0.5, num_hash_tables: int = 3):
    """Fit a BucketedRandomProjectionLSH (Euclidean-distance LSH) model on ``features_norm``.

    Args:
        df: DataFrame with a ``features_norm`` vector column (as produced by ``embedding``).
        bucket_length: width of each hash bucket. Defaults to the previously hard-coded 0.5;
            for unit-length vectors pairwise L2 distances lie in [0, 2].
        num_hash_tables: number of hash tables. More tables raise ANN recall at the cost of
            communication and run time. Defaults to the previously hard-coded 3.

    Returns:
        The fitted LSH model; its hashes are written to the ``hashes`` column.
    """
    lsh = BucketedRandomProjectionLSH(
        inputCol="features_norm",
        outputCol="hashes",
        bucketLength=bucket_length,
        numHashTables=num_hash_tables,
    )
    return lsh.fit(df)

joined_df = spark.read.parquet("dbfs:/vibebnb/data/airbnb_joined_all.parquet")
final_df = embedding(joined_df)
model = build_lsh(final_df)
# Persist the embeddings and the LSH model (disabled).
# NOTE: partitionBy("addr_cc") requires an addr_cc column in final_df.
# final_df.write.mode("overwrite").partitionBy("addr_cc").parquet("dbfs:/vibebnb/data/airbnb_embedded.parquet")
# model.save("dbfs:/vibebnb/models/lsh_model")

In [0]:
# scoring.py
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def _flag01(name: str):
    """0.0/1.0 value of a flag column (boolean, number or "true"/"false"-style string); null -> 0.0."""
    # Casting through boolean makes booleans usable in arithmetic (boolean * double does not
    # resolve in Spark SQL); any non-zero number counts as 1.
    return F.coalesce(F.col(name).cast("boolean").cast("double"), F.lit(0.0))


def _add_price_score(df1: DataFrame, w) -> DataFrame:
    """Add PPG, PPG_min, PPG_max, PPGnorm and price_score (per-country min-max; 1.0 = cheapest)."""
    guests = F.col("guests")
    # Price per guest. guests <= 0 or null -> null PPG instead of dividing by zero
    # (null in default mode, an error under spark.sql.ansi.enabled).
    df1 = df1.withColumn("PPG", F.when(guests > 0, F.col("price_per_night") / guests))
    df1 = df1.withColumn("PPG_min", F.min("PPG").over(w)).withColumn("PPG_max", F.max("PPG").over(w))
    df1 = df1.withColumn(
        "PPGnorm",
        # Unknown PPG -> neutral 0.5 (the same prior the quality scores shrink toward). Mapping it
        # to 0.0 gave every listing without a usable price the best price_score (1.0).
        F.when(F.col("PPG").isNull(), F.lit(0.5))
         .when(F.col("PPG_max") == F.col("PPG_min"), F.lit(0.0))
         .otherwise((F.col("PPG") - F.col("PPG_min")) / (F.col("PPG_max") - F.col("PPG_min")))
    )
    return df1.withColumn("price_score", F.lit(1.0) - F.col("PPGnorm"))


def _add_property_quality(df1: DataFrame, w) -> DataFrame:
    """Add the listing-rating columns and property_quality."""
    df1 = df1.withColumn(
        "category_rating_avg",
        (F.col("rating_accuracy") + F.col("rating_cleanliness") + F.col("rating_checkin") +
         F.col("rating_communication") + F.col("rating_location") + F.col("rating_value")) / F.lit(6.0)
    )
    df1 = df1.withColumn("rating01", F.col("ratings") / F.lit(5.0)).withColumn("category_rating01", F.col("category_rating_avg") / F.lit(5.0))
    df1 = df1.withColumn("rating_score", F.col("rating01") * F.lit(0.6) + F.col("category_rating01") * F.lit(0.4))

    # Review-count confidence: log(1 + reviews) relative to the country maximum (in [0, 1] for non-negative counts).
    df1 = df1.withColumn("number_of_reviews_log", F.log(F.col("property_number_of_reviews") + F.lit(1.0))).withColumn("max_log_cc", F.max("number_of_reviews_log").over(w))
    df1 = df1.withColumn(
        "property_number_of_reviews_weight",
        F.when(F.col("max_log_cc").isNull() | (F.col("max_log_cc") <= 0) | F.col("number_of_reviews_log").isNull(), F.lit(0.0))
         .otherwise(F.col("number_of_reviews_log") / F.col("max_log_cc"))
    )
    weight = F.col("property_number_of_reviews_weight")
    df1 = df1.withColumn(
        "weighted_rating_score",
        # Shrink toward the 0.5 prior by confidence. A null rating_score also falls back to the
        # prior; otherwise null * weight (even weight 0) nulled the score of unrated listings.
        F.coalesce(F.col("rating_score"), F.lit(0.5)) * weight +
        F.lit(0.5) * (F.lit(1.0) - weight)
    )
    return df1.withColumn("property_quality", F.col("weighted_rating_score") * F.lit(0.8) + _flag01("is_guest_favorite") * F.lit(0.2))


def _add_host_quality(df1: DataFrame, w) -> DataFrame:
    """Add the host columns (h_i, e_i, r_i, T_i, their country maxima and derived terms) and host_quality."""
    df1 = df1.withColumn("h_i", F.col("host_rating") / F.lit(5.0)).withColumn("e_i", F.log(F.col("host_number_of_reviews") + F.lit(1.0))).withColumn("r_i", F.col("host_response_rate") / F.lit(100.0)).withColumn("T_i", F.col("hosts_year"))
    df1 = df1.withColumn("max_e_cc", F.max("e_i").over(w)).withColumn("max_T_cc", F.max("T_i").over(w))
    df1 = df1.withColumn(
        "enorm_i",
        F.when(F.col("max_e_cc").isNull() | (F.col("max_e_cc") <= 0) | F.col("e_i").isNull(), F.lit(0.0))
         .otherwise(F.col("e_i") / F.col("max_e_cc"))
    )
    # Missing host rating -> the 0.5 prior (neutral confidence term, zero tenure effect) instead of
    # nulling host_quality. The raw h_i column itself is left as-is.
    h_known = F.coalesce(F.col("h_i"), F.lit(0.5))
    df1 = df1.withColumn("hconf_i", F.col("enorm_i") * h_known + (F.lit(1.0) - F.col("enorm_i")) * F.lit(0.5))
    df1 = df1.withColumn(
        "Tnorm_i",
        F.when(F.col("max_T_cc").isNull() | (F.col("max_T_cc") <= 0) | F.col("T_i").isNull(), F.lit(0.0))
         .otherwise(F.col("T_i") / F.col("max_T_cc"))
    )
    df1 = df1.withColumn("TenureEffect_i", F.col("Tnorm_i") * (h_known - F.lit(0.5)))
    return df1.withColumn(
        "host_quality",
        F.lit(0.6) * F.col("hconf_i") +
        F.lit(0.1) * F.coalesce(F.col("r_i"), F.lit(0.0)) +  # unknown response rate contributes 0
        F.lit(0.1) * _flag01("is_supperhost") +
        F.lit(0.2) * F.col("TenureEffect_i")
    )


def airbnb_scores(df: DataFrame) -> DataFrame:
    """Compute price, property-quality and host-quality scores, normalized per country (addr_cc).

    Adds ``price_score``, ``property_quality`` and ``host_quality`` plus their intermediate
    columns. Missing inputs fall back to neutral values instead of propagating null:
      - unknown price per guest -> price_score 0.5,
      - unknown listing / host rating -> the 0.5 prior,
      - unknown flags / response rate -> 0.
    """
    w = Window.partitionBy("addr_cc")
    df1 = _add_price_score(df, w)
    df1 = _add_property_quality(df1, w)
    return _add_host_quality(df1, w)


def osm_scores(df: DataFrame, env_cols: list[str]) -> DataFrame:
    """Scale each environment POI count by its per-country (addr_cc) maximum.

    For every column c in ``env_cols`` two columns are appended:
      - ``{c}_max_cc``: max of c within the row's country,
      - ``{c}_norm``: c / that max, or 0.0 when c or the max is null or the max is 0.
    """
    by_country = Window.partitionBy("addr_cc")
    scored = df
    for col_name in env_cols:
        max_name = f"{col_name}_max_cc"
        count, country_max = F.col(col_name), F.col(max_name)
        undefined = country_max.isNull() | (country_max == 0) | count.isNull()
        scored = scored.withColumn(max_name, F.max(count).over(by_country))
        scored = scored.withColumn(f"{col_name}_norm", F.when(undefined, F.lit(0.0)).otherwise(count / country_max))
    return scored


def cities_scores(df: DataFrame) -> DataFrame:
    """Derive city attributes: monthly average temperatures and a budget rank.

    ``avg_temp_monthly`` is parsed as JSON into ``temp_map`` (month key "1".."12" -> {avg, max, min}).
    ``temp_avg_m01``..``temp_avg_m12`` hold each month's avg. ``budget_level`` maps to
    ``budget_rank`` (Budget=1, Mid-range=2, Luxury=3) and ``budget_score_raw`` = rank / 3 (0.0 when unknown).
    """
    temp_schema = "map<string, struct<avg:double, max:double, min:double>>"
    budget_ranks = F.create_map(
        F.lit("Budget"), F.lit(1),
        F.lit("Mid-range"), F.lit(2),
        F.lit("Luxury"), F.lit(3),
    )

    scored = df.withColumn("temp_map", F.from_json(F.col("avg_temp_monthly"), temp_schema))
    for month in range(1, 13):
        month_stats = F.col("temp_map").getItem(F.lit(str(month)))
        scored = scored.withColumn(f"temp_avg_m{month:02d}", month_stats.getField("avg"))

    scored = scored.withColumn("budget_rank", budget_ranks.getItem(F.col("budget_level")))
    budget_score = F.when(F.col("budget_rank").isNull(), F.lit(0.0)).otherwise(F.col("budget_rank") / F.lit(3.0))
    return scored.withColumn("budget_score_raw", budget_score)



def scoring_all(df: DataFrame, env_cols: list[str]=ENV_COLS) -> DataFrame:
    """Apply airbnb_scores, then osm_scores(env_cols), then cities_scores (no user weights)."""
    with_listing_scores = airbnb_scores(df)
    with_env_scores = osm_scores(with_listing_scores, env_cols)
    return cities_scores(with_env_scores)

embedded_df = spark.read.parquet("dbfs:/vibebnb/data/airbnb_embedded.parquet")
final_df = scoring_all(embedded_df)
# Persist the scored table (disabled).
# final_df.write.mode("overwrite").parquet("dbfs:/vibebnb/data/airbnb_scores.parquet")

In [0]:
# retrieve.py / rank.py
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.ml.feature import BucketedRandomProjectionLSH

def retrieve(target_id, country, df, lsh_model, n=50):
    """Approximate nearest neighbours of one listing among the listings of one country.

    Args:
        target_id: property_id of the query listing (looked up anywhere in ``df``).
        country: addr_cc value the candidates are restricted to.
        df: DataFrame with ``property_id``, ``addr_cc`` and ``features_norm``.
        lsh_model: fitted LSH model — presumably the one from build_lsh (input ``features_norm``).
        n: number of neighbours to return.

    Returns:
        None when ``target_id`` is not in ``df``; otherwise the ``n`` approximate nearest candidates
        with their distance in ``l2_dist``. The query listing is not filtered out of the candidates.
    """
    matches = (
        df.where(F.col("property_id") == target_id)
          .select("features_norm")
          .limit(1)
          .collect()
    )
    if len(matches) == 0:
        return None
    query_vector = matches[0]["features_norm"]
    same_country = df.where(F.col("addr_cc") == country)
    return lsh_model.approxNearestNeighbors(same_country, query_vector, n, distCol="l2_dist")


def _normalize_weights(w: dict):
    """Normalize positive weights to sum to 1."""
    if not w: return {}
    s = sum(v for v in w.values() if v is not None and v > 0)
    if s <= 0: return {}
    return {k: float(v) / s for k, v in w.items() if v is not None and v > 0}


def order(
    df: DataFrame,
    k: int,
    price_w: float = 0.0,
    property_w: float = 0.0,
    host_w: float = 0.0,
    env_weights: dict = None,        # {"env_food": 20, "env_nature": 10, ...} OR {"env_food_norm": 20, ...}
    temp_pref: float = None,         # preferred temperature
    temp_w: float = 0.0,
    travel_month: int = None,        # 1..12
    budget_pref: str = None,         # "Budget" | "Mid-range" | "Luxury"
    budget_w: float = 0.0,
    normalize_all_weights: bool = True,
    score_col: str = "final_score",
    temp_tolerance: float = 25.0,    # |temp - temp_pref| at which the temperature score reaches 0
) -> DataFrame:
    """Rank an already-filtered DF using precomputed score components + user weights.

    ``score_col`` = sum over the active components of (component value * weight). The components are:
      - price_score, property_quality and host_quality (from airbnb_scores),
      - the ``<env>_norm`` columns named in ``env_weights`` (from osm_scores),
      - a temperature match on cities_scores' ``temp_avg_mXX`` for ``travel_month``,
      - a budget-level match.
    A component is skipped when its weight is 0/None or its input column is absent.

    Args:
        df: candidate listings (already filtered).
        k: number of rows to return.
        price_w, property_w, host_w, temp_w, budget_w: component weights.
        env_weights: {env column (with or without the "_norm" suffix): weight}.
        temp_pref, travel_month: preferred temperature and month (1..12); both are required
            for the temperature component.
        budget_pref: "Budget", "Mid-range" or "Luxury"; compared with the listing's budget_level.
        normalize_all_weights: when True, all positive weights are rescaled to sum to 1 and
            None / non-positive weights are dropped; when False the weights are used as given.
        score_col: name of the output score column.
        temp_tolerance: temperature difference at which the temperature score reaches 0
            (previously fixed at 25.0); must be > 0.

    Returns:
        The top ``k`` rows by ``score_col`` (descending; ties broken by property_id when that
        column exists), with the score and helper columns added.
    """
    work = df

    # weights (optionally normalize everything together)
    env_weights = env_weights or {}
    raw_weights = {
        "price": price_w, "property": property_w, "host": host_w, "temp": temp_w, "budget": budget_w,
        **{f"env::{env_name}": env_w for env_name, env_w in env_weights.items()},
    }
    weights = _normalize_weights(raw_weights) if normalize_all_weights else raw_weights
    price_w = weights.get("price", 0.0)
    property_w = weights.get("property", 0.0)
    host_w = weights.get("host", 0.0)
    temp_w = weights.get("temp", 0.0)
    budget_w = weights.get("budget", 0.0)

    terms = []

    # base components from airbnb_scores()
    for col_name, weight in (("price_score", price_w), ("property_quality", property_w), ("host_quality", host_w)):
        if weight and col_name in work.columns:
            terms.append(F.coalesce(F.col(col_name), F.lit(0.0)) * F.lit(weight))

    # env components from osm_scores(): expects <env>_norm columns
    for env_col, raw_w in env_weights.items():
        # In normalized mode a weight that _normalize_weights dropped (None or <= 0) contributes
        # nothing. Falling back to raw_w let negative weights in at their raw, unnormalized scale.
        wv = weights.get(f"env::{env_col}", 0.0) if normalize_all_weights else raw_w
        norm_col = env_col if env_col.endswith("_norm") else f"{env_col}_norm"
        if wv and norm_col in work.columns:
            terms.append(F.coalesce(F.col(norm_col), F.lit(0.0)) * F.lit(wv))

    # temperature component from cities_scores(): uses temp_avg_m01..temp_avg_m12
    if temp_w and temp_pref is not None and travel_month is not None:
        m = int(travel_month)
        if 1 <= m <= 12:
            temp_col = f"temp_avg_m{m:02d}"
            if temp_col in work.columns:
                # 1.0 at the preferred temperature, falling linearly to 0.0 at temp_tolerance away.
                # greatest() skips nulls, so a listing without temperature data scores 0.0.
                work = work.withColumn(
                    "temp_score_raw",
                    F.greatest(
                        F.lit(0.0),
                        F.lit(1.0) - (F.abs(F.col(temp_col) - F.lit(float(temp_pref))) / F.lit(float(temp_tolerance))),
                    ),
                )
                terms.append(F.col("temp_score_raw") * F.lit(temp_w))

    # budget component, recomputed from budget_level (cities_scores' budget_score_raw is not used):
    # 1.0 for the preferred level, 0.5 one level away, 0.0 two levels away or when either is unknown.
    # Skipped when budget_level is absent, like the other components with missing inputs.
    if budget_w and budget_pref is not None and "budget_level" in work.columns:
        rank_map = F.create_map(F.lit("Budget"), F.lit(1), F.lit("Mid-range"), F.lit(2), F.lit("Luxury"), F.lit(3))
        work = work.withColumn("budget_rank_i", rank_map.getItem(F.col("budget_level"))).withColumn("budget_pref_i", rank_map.getItem(F.lit(budget_pref)))
        work = work.withColumn(
            "budget_score_raw_user",
            F.when(F.col("budget_rank_i").isNull() | F.col("budget_pref_i").isNull(), F.lit(0.0))
             .otherwise(F.greatest(F.lit(0.0), F.lit(1.0) - (F.abs(F.col("budget_rank_i") - F.col("budget_pref_i")) / F.lit(2.0))))
        )
        terms.append(F.col("budget_score_raw_user") * F.lit(budget_w))

    work = work.withColumn(score_col, sum(terms) if terms else F.lit(0.0))

    # Tie-break on property_id so the top-k is deterministic when scores tie (e.g. all weights 0).
    sort_keys = [F.col(score_col).desc()]
    if "property_id" in work.columns:
        sort_keys.append(F.col("property_id"))
    return work.orderBy(*sort_keys).limit(int(k))
