# **Data Preprocessing**

In [0]:
pip install langdetect

In [0]:
# PySpark SQL
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import col, count, when, isnan, trim, broadcast
from pyspark.sql.types import StringType, BooleanType

# PySpark ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec
from pyspark.ml.regression import LinearRegression

# Spark NLP
from sparknlp.base import DocumentAssembler
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import Tokenizer, BertForSequenceClassification

# Language detection
from langdetect import detect, DetectorFactory

# Utilities
from functools import reduce
import re

# Plotting
import matplotlib.pyplot as plt

Train data:
- a sample of the scraped Booking + Expedia reviews

Test data:
- the scraped Booking + Expedia reviews
- a sample of the original Booking dataset
- a sample of the original Airbnb dataset


### scraped booking dataset

In [0]:
import os

storage_account = "lab94290"
container = "submissions"
group = "itay_asaf_antal"
# Read the SAS token from the environment so the secret does not have to be pasted
# into (and committed with) the notebook. "..." is the original placeholder and is
# used only when the variable is unset or empty.
sas_token = os.environ.get("SUBMISSIONS_SAS_TOKEN") or "..."  # or change to your sas token
sas_token = sas_token.lstrip('?')

# Tell ABFS to use SAS auth for this account, with a fixed token provider.
acct = storage_account
spark.conf.set(f"fs.azure.account.auth.type.{acct}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{acct}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{acct}.dfs.core.windows.net", sas_token)

output_path = f"abfss://{container}@{acct}.dfs.core.windows.net/{group}/scraped_booking.csv"

# CSV with a header row and quoted, possibly multi-line fields; a quote inside a
# quoted field is escaped by another quote.
scraped_booking = (spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .option("quote", '"')
    .load(output_path)
)

print("number of rows is", scraped_booking.count())
display(scraped_booking.limit(10))

In [0]:
# Key each row by the lower-cased "name, city, country" string (the hotel_id used
# for joins later on), then keep only that key, the review text and the rating.
hotel_key = F.lower(F.concat_ws(", ", F.col("HotelName"), F.col("City"), F.col("Country")))

scraped_booking = (
    scraped_booking
    .withColumn("hotel_id", hotel_key)
    .select(
        "hotel_id",
        F.col("Review").alias("text_review"),
        F.col("Rating").alias("label"),
    )
)
display(scraped_booking.limit(10))

### scraped expedia dataset

In [0]:
import os

storage_account = "lab94290"
container = "submissions"
group = "itay_asaf_antal"
# Read the SAS token from the environment so the secret does not have to be pasted
# into (and committed with) the notebook. "..." is the original placeholder and is
# used only when the variable is unset or empty.
sas_token = os.environ.get("SUBMISSIONS_SAS_TOKEN") or "..."  # or change to your sas token
sas_token = sas_token.lstrip('?')

# Tell ABFS to use SAS auth for this account, with a fixed token provider.
acct = storage_account
spark.conf.set(f"fs.azure.account.auth.type.{acct}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{acct}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{acct}.dfs.core.windows.net", sas_token)

output_path = f"abfss://{container}@{acct}.dfs.core.windows.net/{group}/scraped_expedia.csv"

# CSV with a header row and quoted, possibly multi-line fields; a quote inside a
# quoted field is escaped by another quote.
scraped_expedia = (spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .option("quote", '"')
    .load(output_path)
)

print("number of rows is", scraped_expedia.count())
display(scraped_expedia.limit(10))

In [0]:
# Keep only the columns needed downstream.
scraped_expedia = scraped_expedia.select("Hotel Name", "City", "Country", "Rating", "Review")

# Build the lower-cased "name, city, country" hotel key. The column is referenced
# as "Hotel Name", exactly as selected above; the previous "Hotel name" spelling
# only resolved because Spark is case-insensitive by default and would break with
# spark.sql.caseSensitive=true.
scraped_expedia = scraped_expedia.withColumn(
    "hotel_id",
    F.concat_ws(", ", F.col("Hotel Name"), F.col("City"), F.col("Country"))
).withColumn("hotel_id", F.lower(F.col("hotel_id")))

scraped_expedia = scraped_expedia.select("hotel_id",
                F.col("Review").alias("text_review"),
                F.col("Rating").alias("label")
                )
display(scraped_expedia.limit(10))

### combined scraped dataset

In [0]:
# Stack Booking and Expedia reviews; columns are matched by name, not position.
df_scraped = scraped_booking.unionByName(scraped_expedia)
n_scraped_rows = df_scraped.count()
print("number of rows is", n_scraped_rows)
display(df_scraped.limit(10))

In [0]:
# Calculate Null and NaN counts for all columns in df_scraped.
# NaN only exists for floating-point columns, so isnan() is applied to those alone.
# Applying it to string columns relies on an implicit string->double cast, which
# can raise for non-numeric text when ANSI mode is enabled.
dtypes_scraped = dict(df_scraped.dtypes)
null_counts_df = df_scraped.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    if dtypes_scraped[c] in ("double", "float")
    else count(when(col(c).isNull(), c)).alias(c)
    for c in df_scraped.columns
])

# Display the results
print("Null/NaN counts for each column in df_scraped:")
null_counts_df.show()

In [0]:
# Inspect reviews whose trimmed text is shorter than 20 characters (the same
# threshold the English-cleaning step uses to drop reviews).
is_short_review = F.length(F.trim(F.col("text_review"))) < 20
df_short_reviews = df_scraped.filter(is_short_review)

print("number of rows is", df_short_reviews.count())
df_short_reviews.select("text_review").show(10, truncate=False)

In [0]:
# Set seed to ensure consistent language detection results
DetectorFactory.seed = 0

# DEFINE THE ENGLISH DETECTION UDF
@F.udf(returnType=StringType())
def detect_language(text):
    """
    Spark UDF: return the language code langdetect assigns to `text` (e.g. "en"),
    or "error" when detection is not possible (null, blank or featureless text).
    """
    if text is None or not text.strip():
        return "error"
    try:
        return detect(text)
    # Catch ordinary errors only (langdetect raises on text without detectable
    # features), so KeyboardInterrupt / SystemExit are no longer swallowed.
    except Exception:
        return "error"

# Characters to strip from reviews: everything except ASCII letters, digits,
# whitespace and the punctuation ! , . ? - ( )
strict_pattern = r"[^a-zA-Z0-9\s\!\,\.\?\-\(\)]"

def clean_and_filter_english_reviews(df, text_review_col):
    """
    Remove characters matched by `strict_pattern` from `text_review_col`, then keep
    only rows whose cleaned text is at least 20 characters long (after trimming)
    and is detected as English ("en").

    Rows with a null review are dropped. The cleaned text replaces the original
    column, which ends up as the last column of the returned DataFrame.
    """
    non_null = df.dropna(subset=[text_review_col])
    cleaned = non_null.withColumn(
        "cleaned_review", F.regexp_replace(F.col(text_review_col), strict_pattern, "")
    )
    long_enough = cleaned.filter(F.length(F.trim(F.col("cleaned_review"))) >= 20)
    english_only = (
        long_enough
        .withColumn("language", detect_language(F.col("cleaned_review")))
        .filter(F.col("language") == "en")
    )
    return (
        english_only
        .drop(text_review_col, "language")
        .withColumnRenamed("cleaned_review", text_review_col)
    )

df_scraped = clean_and_filter_english_reviews(df_scraped, text_review_col="text_review")

rows_after_cleaning = df_scraped.count()
print(f"after cleaning and filter en, row count is: {rows_after_cleaning}")
display(df_scraped.limit(10))

In [0]:
# Cast the rating to double, drop non-numeric ratings, round to the nearest
# integer value and keep only ratings within 1..10.
label_as_double = F.col("label").cast("double")
df_scraped = (
    df_scraped
    .withColumn("label_d", label_as_double)
    .where(F.col("label_d").isNotNull())
    .withColumn("label", F.round(F.col("label_d")))
    .where(F.col("label").between(1, 10))
    .drop("label_d")
)
print("after ensure label is double, row count is:", df_scraped.count())

In [0]:
# Remove rows that are identical across all columns.
df_scraped = df_scraped.distinct()
print("after drop duplicate, row count is:", df_scraped.count())

**For EDA:**

In [0]:
# Reproducible 30% sample (without replacement) for exploratory analysis, cached.
df_sample_scraped = df_scraped.sample(False, 0.3, 42).cache()
total = df_sample_scraped.count()
print("total amount of sample for eda:", total)

In [0]:
# Share of each rating in the EDA sample (percent), collected to pandas for plotting.
rating_counts = df_sample_scraped.groupBy("label").count()
df_counts = rating_counts.withColumn(
    "percent", F.col("count") / F.lit(total) * 100
).orderBy("label")

pdf = df_counts.toPandas()

In [0]:
fig, ax = plt.subplots()
bars = ax.bar(pdf["label"], pdf["percent"])

ax.set_xticks(range(1, 11))
ax.set_xlim(0.5, 10.5)
ax.set_xlabel("rating")
ax.set_ylabel("percent (%)")
ax.set_title("Rating Distribution of Full Reviews")

# Write each bar's percentage just above it.
for bar in bars:
    h = bar.get_height()
    ax.text(
        bar.get_x() + bar.get_width() / 2,
        h + 0.3,  # gap above the bar (change to 0.2/0.5 to taste)
        f"{h:.2f}%",
        ha="center",
        va="bottom",
        fontsize=9,
    )

fig.tight_layout()
plt.show()

### scraped booking dataset with real category scores

In [0]:
import os

storage_account = "lab94290"
container = "submissions"
group = "itay_asaf_antal"
# Read the SAS token from the environment so the secret does not have to be pasted
# into (and committed with) the notebook. "..." is the original placeholder and is
# used only when the variable is unset or empty.
sas_token = os.environ.get("SUBMISSIONS_SAS_TOKEN") or "..."  # or change to your sas token
sas_token = sas_token.lstrip('?')

# Tell ABFS to use SAS auth for this account, with a fixed token provider.
acct = storage_account
spark.conf.set(f"fs.azure.account.auth.type.{acct}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{acct}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{acct}.dfs.core.windows.net", sas_token)

output_path = f"abfss://{container}@{acct}.dfs.core.windows.net/{group}/scraped_booking_real_scores.csv"

# CSV with a header row and quoted, possibly multi-line fields; a quote inside a
# quoted field is escaped by another quote.
scraped_booking_real_scores = (spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .option("quote", '"')
    .load(output_path)
)

print("number of rows is", scraped_booking_real_scores.count())
display(scraped_booking_real_scores.limit(10))

In [0]:
# Key each hotel by the lower-cased "name, city, country" string, drop the raw
# name/location columns, lower-case all column names and keep one row per hotel.
scraped_booking_real_scores = (
    scraped_booking_real_scores
    .withColumn(
        "hotel_id",
        F.lower(F.concat_ws(", ", F.col("HotelName"), F.col("City"), F.col("Country"))),
    )
    .drop("HotelName", "City", "Country")
)
scraped_booking_real_scores = scraped_booking_real_scores.toDF(
    *map(str.lower, scraped_booking_real_scores.columns)
)
scraped_booking_real_scores = scraped_booking_real_scores.dropDuplicates(["hotel_id"])

print("number of rows is", scraped_booking_real_scores.count())
display(scraped_booking_real_scores.limit(10))

### df train

In [0]:
# Training rows: scraped reviews of hotels that are NOT in the real-scores dataset
# (left anti join against a broadcast list of those hotel ids).
known_score_hotels = broadcast(scraped_booking_real_scores.select("hotel_id").distinct())
df_train = df_scraped.join(known_score_hotels, on="hotel_id", how="left_anti")

print("number of rows is", df_train.count())
display(df_train.limit(10))

In [0]:
# Reproducible 70% sample of the training candidates, cached.
df_train = df_train.sample(fraction=0.7, withReplacement=False, seed=42).cache()
print("number of rows is", df_train.count())

In [0]:
# Sentiment pipeline: DocumentAssembler -> Tokenizer -> the pretrained BERT sequence
# classifier "finetunedmodel_hotel_sentiment_5k". It reads the column "text" and
# writes predictions to "class".
document = DocumentAssembler().setInputCol("text").setOutputCol("document")
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
clf = (
    BertForSequenceClassification.pretrained("finetunedmodel_hotel_sentiment_5k", "en")
    .setInputCols(["document", "token"])
    .setOutputCol("class")
)

pipeline = Pipeline(stages=[document, tokenizer, clf])

# No real training data is needed: fitting on a one-row dummy frame just turns
# the Pipeline into a PipelineModel that can transform reviews.
dummy = spark.createDataFrame([("ok",)], ["text"])
model = pipeline.fit(dummy)

def add_sentiment_to_review(df, text_col="text_review"):
    """
    Return `df` plus a 'sentiment' column predicted by the fitted `model`.

    The review text (`text_col`) is copied into the "text" column the pipeline
    reads. The first predicted class label is mapped to "negative" for LABEL_1 and
    "positive" for LABEL_0; any other value yields null. The result contains the
    original columns of `df` followed by 'sentiment'.
    """
    # NOTE(review): the LABEL_0/LABEL_1 -> positive/negative mapping depends on the
    # pretrained model's label order; confirm it against the model card.
    predictions = model.transform(df.withColumn("text", F.col(text_col)))
    first_label = F.col("class.result")[0]
    sentiment = (
        F.when(first_label == "LABEL_1", F.lit("negative"))
        .when(first_label == "LABEL_0", F.lit("positive"))
        .otherwise(F.lit(None))
    )
    return predictions.select(*df.columns, sentiment.alias("sentiment"))

In [0]:
# BERT inference is expensive and its output feeds several actions (the count and
# display here, and the filter in the next cell), so compute it once and cache it.
df_with_sentiment = add_sentiment_to_review(df_train, text_col="text_review").cache()

# A row is "mislabeled" when the predicted sentiment contradicts an extreme rating.
# coalesce(..., False) makes the condition never null. Without it, a null sentiment
# combined with label 1 or 10 evaluates to null, and `~condition_mismatch` would
# silently drop those rows even though they are not mismatches.
condition_mismatch = F.coalesce(
    ((F.col("sentiment") == "negative") & (F.col("label") == 10.0)) |
    ((F.col("sentiment") == "positive") & (F.col("label") == 1.0)),
    F.lit(False),
)

df_wrong_label = df_with_sentiment.filter(condition_mismatch).select("text_review", "label", "sentiment")
print("Drop wrong label, count drop rows is:", df_wrong_label.count())
print("Exapmles of wrong label:")
display(df_wrong_label.limit(50))

In [0]:
# Keep rows not flagged as mislabeled and drop the helper sentiment column.
df_train = (
    df_with_sentiment
    .where(~condition_mismatch)
    .drop("sentiment")
    .cache()
)
print("train df count row is:", df_train.count())
display(df_train.limit(10))

### df test

original booking dataset

In [0]:
import os

storage_account = "lab94290"
container = "booking"
# Read the SAS token from the environment instead of pasting the secret into the
# notebook; the original "..." placeholder is the fallback when it is unset/empty.
booking_sas_token = os.environ.get("BOOKING_SAS_TOKEN") or "..."  # or change to your sas token
booking_sas_token = booking_sas_token.lstrip('?')

# Tell ABFS to use SAS auth for this account, with a fixed token provider.
acct = storage_account
spark.conf.set(f"fs.azure.account.auth.type.{acct}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{acct}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{acct}.dfs.core.windows.net", booking_sas_token)

path = f"abfss://{container}@{acct}.dfs.core.windows.net/booking_1_9.parquet"
df_origin_booking = spark.read.parquet(path)
print("booking original df count row is:", df_origin_booking.count())
display(df_origin_booking.limit(10))

In [0]:
# Reproducible 3% sample of the original Booking data, cached.
df_sample_origin_booking = df_origin_booking.sample(False, 0.03, 42).cache()
print("sample of booking original df count row is:", df_sample_origin_booking.count())

In [0]:
# Keep the identification/location columns and the reviews array, and drop
# hotels without any top review.
booking_cols = ["city", "country", "hotel_id", "title", "top_reviews"]
df_sample_origin_booking = (
    df_sample_origin_booking
    .select(*booking_cols)
    .filter(F.size(F.col("top_reviews")) > 0)
)
print("row count is:", df_sample_origin_booking.count())

In [0]:
# A hotel_id that maps to two or more distinct titles is ambiguous; every row of
# such an id is removed.
names_per_hotel = (
    df_sample_origin_booking
    .groupBy("hotel_id")
    .agg(F.countDistinct("title").alias("name_count"))
)
bad_hotels_df = names_per_hotel.where(F.col("name_count") >= 2).select("hotel_id")

booking_df_cleaned = df_sample_origin_booking.join(bad_hotels_df, on="hotel_id", how="left_anti")

# Compare the number of distinct hotel ids before and after the removal.
original_ids, final_ids = (
    frame.select("hotel_id").distinct().count()
    for frame in (df_sample_origin_booking, booking_df_cleaned)
)

print(f"Removed {original_ids - final_ids} hotel IDs that had inconsistent names.")
print(f"Remaining unique hotel IDs: {final_ids}")

In [0]:
# Missing-value report. Array and struct columns are only checked for null; every
# other column also counts as missing when its value casts to a NaN double.
column_types = dict(booking_df_cleaned.dtypes)
missing_exprs = []
for c in booking_df_cleaned.columns:
    if column_types[c].startswith(("array", "struct")):
        is_missing = col(c).isNull()
    else:
        as_double = col(c).cast('double')
        is_missing = col(c).isNull() | (as_double.isNotNull() & isnan(as_double))
    missing_exprs.append(count(when(is_missing, c)).alias(c))
null_counts = booking_df_cleaned.select(missing_exprs)

print("Missing Values (Null/NaN) per column:")
null_counts.show()

initial_count = booking_df_cleaned.count()

# Drop rows with any null, then exact duplicates, reporting counts along the way.
booking_df_cleaned = booking_df_cleaned.dropna()
after_dropna_count = booking_df_cleaned.count()
print(f"Row count after dropping nulls: {after_dropna_count}")

booking_df_cleaned = booking_df_cleaned.dropDuplicates()
final_count = booking_df_cleaned.count()

print(f"Original Row Count: {initial_count}")
print(f"Duplicates Removed: {after_dropna_count - final_count}")
print(f"Final Row Count: {final_count}")

In [0]:
# Build the lower-cased "title, city, country" key (same format as the scraped
# data's hotel_id) and keep it with the id and the reviews array.
booking_df_cleaned = (
    booking_df_cleaned
    .withColumn(
        "hotel_name",
        F.lower(F.concat_ws(", ", F.col("title"), F.col("city"), F.col("country"))),
    )
    .select("hotel_id", "hotel_name", "top_reviews")
)

# top_reviews holds one record per review; explode so every review gets its own row.
df_exploded = booking_df_cleaned.withColumn("review_dict", F.explode(F.col("top_reviews")))

# The text key replaces the numeric id as hotel_id; review text is lower-cased.
df_extracted = df_exploded.select(
    F.col("hotel_name").alias("hotel_id"),
    F.lower(F.col("review_dict.review")).alias("text_review"),
)
print("row count is:", df_extracted.count())

In [0]:
# Same character cleaning + English-only filter as for the scraped reviews.
df_sample_origin_booking = clean_and_filter_english_reviews(
    df_extracted, text_review_col="text_review"
)
print("row count is:", df_sample_origin_booking.count())
display(df_sample_origin_booking.limit(10))

original airbnb dataset

In [0]:
import os

storage_account = "lab94290"
container = "airbnb"
# Read the SAS token from the environment instead of pasting the secret into the
# notebook; the original "..." placeholder is the fallback when it is unset/empty.
airbnb_sas_token = os.environ.get("AIRBNB_SAS_TOKEN") or "..."  # or change to your sas token
sas_token = airbnb_sas_token.lstrip('?')
# Tell ABFS to use SAS auth for this account, with a fixed token provider.
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token)
path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/airbnb_1_12_parquet"
df_origin_airbnb = spark.read.parquet(path)
print("airbnb original df count row is:", df_origin_airbnb.count())
display(df_origin_airbnb.limit(10))

In [0]:
# Reproducible 2% sample of the original Airbnb data, cached.
df_sample_origin_airbnb = df_origin_airbnb.sample(False, 0.02, 42).cache()
print("sample of airbnb original df count row is:", df_sample_origin_airbnb.count())

In [0]:
from pyspark.sql.types import ArrayType, StringType

# Keep name, reviews, location and id. `reviews` is parsed from a JSON string into
# an array of strings, and listings without any review are dropped.
review_array_type = ArrayType(StringType())
airbnb_df = (
    df_sample_origin_airbnb
    .select("name", "reviews", "location", "property_id")
    .withColumn("reviews", F.from_json(F.col("reviews"), review_array_type))
    .where(F.size(F.col("reviews")) > 0)
)

print("row count is:", airbnb_df.count())

In [0]:
# Property ids with two or more distinct listing names are ambiguous and removed.
bad_ids_df = (
    airbnb_df
    .groupBy("property_id")
    .agg(F.countDistinct("name").alias("unique_name_count"))
    .where(F.col("unique_name_count") >= 2)
    .select("property_id")
)

# Left anti join: keep only airbnb_df rows whose property_id is not in bad_ids_df.
df_airbnb_cleaned = airbnb_df.join(bad_ids_df, on="property_id", how="left_anti")

# Report row counts before and after the removal.
original_count, final_count = airbnb_df.count(), df_airbnb_cleaned.count()
removed_rows = original_count - final_count

print(f"Original record count: {original_count}")
print(f"Records removed: {removed_rows}")
print(f"Final record count: {final_count}")

display(df_airbnb_cleaned.limit(10))

In [0]:
from pyspark.sql.types import ArrayType, StructType, MapType, StringType, DoubleType, FloatType

def _missing_value_condition(field):
    """
    Boolean Column that is true when the value of schema `field` counts as missing.

    Every type counts nulls. In addition, arrays count when empty, strings when blank
    after trimming, and float/double columns when NaN. Other types (ints, structs,
    maps, ...) only count nulls, because they cannot be NaN and structs/maps cannot
    be cast to double.
    """
    column = F.col(field.name)
    data_type = field.dataType
    if isinstance(data_type, ArrayType):
        return column.isNull() | (F.size(column) == 0)
    if isinstance(data_type, StringType):
        return column.isNull() | (F.trim(column) == "")
    if isinstance(data_type, (DoubleType, FloatType)):
        return column.isNull() | F.isnan(column)
    return column.isNull()

# Each condition is built in a helper. The previous inline conditional expression
# parsed as `(isNull | size==0) if is_array else (...)`, so string and other
# columns never counted nulls.
null_summary = df_airbnb_cleaned.select([
    F.count(F.when(_missing_value_condition(field), field.name)).alias(field.name)
    for field in df_airbnb_cleaned.schema.fields
])

print("Missing values (Null/NaN/Empty) per column:")
null_summary.show(truncate=False)

In [0]:
import unicodedata
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# --- 1. DEFINE NORMALIZATION UDF ---
@F.udf(returnType=StringType())
def normalize_udf(text):
    """
    Spark UDF that strips accents: NFD-decompose `text` and drop every combining
    mark (Unicode category "Mn"), so e.g. "é" becomes "e". None is passed through.
    """
    if text is None:
        return None
    decomposed = unicodedata.normalize('NFD', text)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return "".join(kept)

# --- 2. MASTER COUNTRY LIST ---
# Lower-case country names in the normalized form produced by the extraction step
# (accents stripped, non-letters replaced by single spaces). Rows whose extracted
# country is not in this list (and is not empty) are dropped.
valid_countries = [
    "afghanistan", "albania", "algeria", "andorra", "angola", "antigua and barbuda", "argentina", "armenia", "australia",
    "austria", "azerbaijan", "bahamas", "bahrain", "bangladesh", "barbados", "belarus", "belgium", "belize", "benin",
    "bhutan", "bolivia", "bosnia and herzegovina", "botswana", "brazil", "brunei", "bulgaria", "burkina faso", "burundi",
    "cote d ivoire", "cabo verde", "cambodia", "cameroon", "canada", "central african republic", "chad", "chile", "china",
    "colombia", "comoros", "congo", "costa rica", "croatia", "cuba", "cyprus", "czechia", "czech republic",
    "democratic republic of the congo", "denmark", "djibouti", "dominica", "dominican republic", "ecuador", "egypt",
    "el salvador", "equatorial guinea", "eritrea", "estonia", "eswatini", "ethiopia", "fiji", "finland", "france", "gabon",
    "gambia", "georgia", "germany", "ghana", "greece", "grenada", "guatemala", "guinea", "guinea bissau", "guyana", "haiti",
    "honduras", "hungary", "iceland", "india", "indonesia", "iran", "iraq", "ireland", "israel", "italy", "jamaica", "japan",
    "jordan", "kazakhstan", "kenya", "kiribati", "kuwait", "kyrgyzstan", "laos", "latvia", "lebanon", "lesotho", "liberia",
    "libya", "liechtenstein", "lithuania", "luxembourg", "madagascar", "malawi", "malaysia", "maldives", "mali", "malta",
    "marshall islands", "mauritania", "mauritius", "mexico", "micronesia", "moldova", "monaco", "mongolia", "montenegro",
    "morocco", "mozambique", "myanmar", "namibia", "nauru", "nepal", "netherlands", "new zealand", "nicaragua", "niger",
    "nigeria", "north korea", "north macedonia", "norway", "oman", "pakistan", "palau", "palestine", "panama",
    "papua new guinea", "paraguay", "peru", "philippines", "poland", "portugal", "qatar", "romania", "russia", "rwanda",
    "saint kitts and nevis", "saint lucia", "saint vincent and the grenadines", "samoa", "san marino", "sao tome and principe",
    "saudi arabia", "senegal", "serbia", "seychelles", "sierra leone", "singapore", "slovakia", "slovenia", "solomon islands",
    "somalia", "south africa", "south korea", "south sudan", "spain", "sri lanka", "sudan", "suriname", "sweden",
    "switzerland", "syria", "tajikistan", "tanzania", "thailand", "timor leste", "togo", "tonga", "trinidad and tobago",
    "tunisia", "turkey", "turkmenistan", "tuvalu", "uganda", "ukraine", "united arab emirates", "united kingdom", "uk",
    "united states", "usa", "united states of america", "uruguay", "uzbekistan", "vanuatu", "venezuela", "vietnam",
    "yemen", "zambia", "zimbabwe",
    # Current official name ("Türkiye" normalizes to "turkiye") and commonly used
    # country/territory names that may appear as the last part of a location but
    # were missing above.
    "turkiye", "taiwan", "hong kong", "macau", "puerto rico", "kosovo",
]

# --- 3. EXTRACTION & FILTERING LOGIC ---
# Country = last comma-separated part of `location`, lower-cased, accent-stripped,
# every non-letter replaced by a space, whitespace runs collapsed and trimmed.
# Rows are kept when that value is a known country or when no country is present.
last_location_part = F.element_at(F.split(F.col("location"), ","), -1)
country_letters_only = F.regexp_replace(
    normalize_udf(F.lower(last_location_part)), r"[^a-z\s]", " "
)
country_clean = F.trim(F.regexp_replace(country_letters_only, r"\s+", " "))

df_airbnb_final = (
    df_airbnb_cleaned
    .withColumn("country", country_clean)
    .filter(
        F.col("country").isin(valid_countries)
        | (F.col("country") == "")
        | F.col("country").isNull()
    )
)

# --- 4. VERIFICATION ---
print(f"Extraction and filtering complete.")
df_airbnb_final.select("location", "country").show(10, truncate=False)

In [0]:
from pyspark.sql import functions as F

# Keep only the part of the listing name before the first middle dot ("·"),
# trimmed, and store it back in the `name` column.
first_name_part = F.trim(F.split(F.col("name"), "·").getItem(0))
df_airbnb_final = (
    df_airbnb_final
    .withColumn("property_name_extracted", first_name_part)
    .drop("name")
    .withColumnRenamed("property_name_extracted", "name")
)

# Verification
df_airbnb_final.select("name").show(5, truncate=False)

In [0]:
from pyspark.sql import functions as F

df_airbnb_final = df_airbnb_final.drop("location")

# One row per review: explode the parsed `reviews` array into `text_review`, and
# key each row by the lower-cased "name, country" string.
listing_key = F.lower(F.concat_ws(", ", F.col("name"), F.col("country")))
df_airbnb_exploded = (
    df_airbnb_final
    .withColumn("text_review", F.explode(F.col("reviews")))
    .withColumn("hotel_id", listing_key)
)

# Keep only the key and the review text.
df_airbnb_final = df_airbnb_exploded.select("hotel_id", "text_review")
print("row count is:", df_airbnb_final.count())
display(df_airbnb_final.limit(10))

In [0]:
# Reproducible 10% sample of the exploded Airbnb reviews.
df_airbnb_final = df_airbnb_final.sample(fraction=0.1, seed=42, withReplacement=False)
print("sample of airbnb final df count row is:", df_airbnb_final.count())

In [0]:
# Same character cleaning + English-only filter as for the other sources.
df_sample_origin_airbnb = clean_and_filter_english_reviews(
    df_airbnb_final, text_review_col="text_review"
)
print("row count is:", df_sample_origin_airbnb.count())
display(df_sample_origin_airbnb.limit(10))

**Union the DataFrames into df_test:**

In [0]:
# Scraped reviews join the test set without their rating column.
df_scraped_for_test = df_scraped.select(*[c for c in df_scraped.columns if c != "label"])

In [0]:
# Test set: scraped reviews + sampled original Booking + sampled original Airbnb,
# stacked by column name, de-duplicated and cached.
df_test = reduce(
    lambda left, right: left.unionByName(right),
    [df_scraped_for_test, df_sample_origin_booking, df_sample_origin_airbnb],
).dropDuplicates().cache()
print("number of rows is", df_test.count())
display(df_test.limit(10))

# **Model**

**For training the linear regression:**
- Split reviews into categories (each review can belong to multiple categories).
- Trim each review to the parts relevant to its category.
- Align the review score with the sentiment analysis.
- Build a linear regression model for each category from the (noisy) train set (X is the review embedding, Y is its score).

**For the final hotel category scores:**
- For each review, find all the categories it is about.
- Trim each review to the parts relevant to each category.
- Separately compute predictions with each category's linear regression model, average them per hotel_id, and save the result.

### **Split into Categories and Refine Review Score:**

**Find Review Categories:**

In [0]:
# Keyword lists per review category (all lower-case). Multi-word entries are phrases.
# Note: every list item needs its own comma; a missing comma makes Python silently
# concatenate adjacent string literals (e.g. "comfy" "bed" -> "comfybed").
categories_kw = {
  "cleanliness": [
    "clean","cleanliness","spotless","tidy","neat","hygienic","sanitary",
    "dirty","dusty","dust","grime","stain","stained","smell","odor","mould","mold","mildew",
    "housekeeping","clean towels","clean sheets","fresh linens","not cleaned",
    "filthy", "unclean", "smelly", "stinky", "musty", "damp", "smoke", "smoky",
    "bugs", "insects", "cockroaches", "ants", "hair", "hairs",
    "immaculate","pristine","sparkling","well kept","well-kept","sanitized","sanitised",
    "cleaned","not clean","not cleaned properly","deep clean","deep-clean","fresh smell",
    "reek","reeking","stink","stinky smell","sewage","drain smell",
    "sticky","sticky floor","greasy","grease","slimy","moldy","mouldy",
    "cobweb","cobwebs","bedbugs","bed bugs","mosquitoes","flies","spider","spiders",
    "vacuum","vacuumed","mop","mopped","trash","garbage","bin","bins"
  ],
  "comfort": [
    "comfortable","comfort","cozy","snug", "comfy",
    "bed","mattress","pillow","pillows","bedding","sheets","linens",
    "soft bed","hard bed","uncomfortable","lumpy",
    "noise","noisy","quiet","soundproof","thin walls",
    "temperature","hot","cold","heating","heater","air conditioning","ac",
    "spacious","cramped","small room","room size",
    "firm bed", "soft mattress", "hard mattress", "blanket", "blankets", "duvet", "sleep", "slept", "sleeping",
    "warm", "cool", "freezing", "aircon", "ventilation",
    "restful","relaxing","good sleep","sleep well","good night sleep",
    "king bed","queen bed","double bed","single bed","twin bed",
    "sofa bed","couch","sofa","couch bed","extra bed","rollaway","crib","cot",
    "squeaky","creaky","bed frame","springs",
    "blackout","blackout curtains","curtains","blinds",
    "stuffy","humid","humidity","draught","draft","drafty","draughty",
    "street noise","traffic noise","construction noise","earplugs"
  ],
  "facilities": [
    "facilities","amenities","equipment",
    "gym","fitness","pool","swimming pool","sauna","spa","jacuzzi","hot tub",
    "elevator","lift","lobby","lounge","terrace","garden","patio",
    "parking","car park","garage",
    "restaurant","bar","breakfast area","cafeteria", "breakfast",
    "laundry","washing machine","dryer",
    "kitchenette","kitchen","microwave","fridge","refrigerator","kettle",
    "tv","television","channels", "bathroom", "shower", "toilet",
    "water pressure", "hot water", "cold water", "coffee", "tea", "coffee machine",
    "dishwasher", "oven", "stove", "workspace", "desk", "balcony",
    "air conditioner","air conditioning unit","heater","radiator",
    "hairdryer","hair dryer","toiletries","soap","shampoo","conditioner","body wash",
    "towels","bath towel","bathrobe","robes","slippers",
    "bathtub","bath tub","sink","drain","bidet",
    "iron","ironing board","safe","minibar","mini bar",
    "usb","usb outlet","charger","charging","plug","socket","outlet",
    "vending machine","ice machine","water dispenser","water cooler",
    "kids club","playground","game room","games room",
    "meeting room","conference room","business center",
    "shuttle","airport shuttle","bike rental","bicycle"
  ],
  "staff": [
    "staff","service","team","personnel","receptionist","front desk","reception",
    "helpful","friendly","polite","welcoming","attentive","professional","kind",
    "rude","unfriendly","unhelpful","disrespectful","arrogant",
    "check-in","check in","check-out","checkout",
    "communication","responsive","response time",
    "host", "hosts", "manager", "owner", "support", "customer service", "greet", "greeted",
    "concierge","porter","bellboy","bellhop","doorman",
    "courteous","accommodating","helped","assisted","supportive","patient",
    "ignored","ignoring","impolite","unprofessional",
    "efficient","inefficient","slow","quick","prompt","delayed",
    "early check-in","early check in","late check-out","late check out",
    "upgrade","upgraded","reservation","booking issue","refund"
  ],
  "location": [
    "location","located","area","neighborhood","neighbourhood",
    "central","city center","city centre","downtown","in the center",
    "close to","near","nearby","walking distance","steps away",
    "transport","public transport","metro","subway","train","bus","station",
    "safe area","unsafe","sketchy", "close by",
    "convenient", "conveniently located", "distance",
    "far", "far from", "away from", "walkable",
    "near the beach","beach","seafront","waterfront",
    "old town","main square",
    "restaurants nearby","shops nearby","shopping","supermarket","grocery",
    "attractions","sights","tourist area",
    "airport","near the airport","port","harbor","harbour",
    "quiet area","noisy area","busy area",
    "hill","steep","remote","isolated"
  ],
  "free_wifi": [
    "wifi","wi-fi", "wi fi", "wireless","internet","connection","network","router",
    "password","speed","fast wifi","slow wifi","unstable","disconnect","drops",
    "no signal","coverage","bandwidth","streaming","zoom","video call","free wifi",
    "signal", "lag", "laggy", "buffering", "connect", "connected",
    "internet access","wifi signal","signal strength","strong signal","weak signal",
    "login","log in","sign in","portal",
    "download","upload","mbps","ping","latency",
    "ethernet","lan","wired internet","modem","hotspot"
  ]
}

In [0]:
def create_categories_column(df, text_review_col):
    """
    Add a 'categories' array column listing every category (key of `categories_kw`)
    with at least one keyword occurring in the review as a whole word or phrase.
    Matching is case-insensitive, and multi-word keywords match across any run of
    whitespace.

    Reviews that match no category are removed from the returned DataFrame.
    """
    txt = F.lower(F.col(text_review_col))

    cat_cols = []
    for ctg, kws in categories_kw.items():
        # Whole-word matching (\b...\b), consistent with the trimming step. A plain
        # substring test tags e.g. every review mentioning "hotel" as comfort
        # (keyword "hot") or "shopping" as free_wifi (keyword "ping").
        alternatives = [
            r"\s+".join(re.escape(word) for word in kw.lower().split())
            for kw in kws
            if kw.strip()
        ]
        if not alternatives:
            continue
        pattern = r"\b(?:" + "|".join(alternatives) + r")\b"
        # Column.rlike receives the Python string directly (no SQL-literal
        # unescaping), so the backslashes above reach the Java regex engine as-is.
        cat_cols.append(F.when(txt.rlike(pattern), F.lit(ctg)))

    df = df.withColumn("categories_raw", F.array(*cat_cols))
    df = df.withColumn(
        "categories",
        F.expr("filter(categories_raw, x -> x is not null)")
    ).drop("categories_raw")

    return df.filter(F.size("categories") > 0)

df_train = create_categories_column(df_train, text_review_col="text_review")

# One row per (review, category) pair.
df_train_long = df_train.select(
    F.col("text_review"),
    F.col("label"),
    F.explode(F.col("categories")).alias("category"),
)

In [0]:
preview = df_train_long.limit(20)
display(preview)

**Trim Review To Category Relevant Text:**

In [0]:
# Regex used to cut reviews into chunks before keeping only category-relevant ones.
# A review is split:
#   * after an ellipsis or sentence punctuation (. ! ?) followed by whitespace,
#   * around contrast connectives ("but", "however", "except for", ...),
#   * at " and " / commas, but only when the following phrase (up to the next
#     " and " / comma) contains a category keyword.

contrast_words = [
    "but", "however", "though", "although", "yet", "whereas", "while", "on the other hand",
    "even though", "even if", "still", "nevertheless", "nonetheless", "except", "except for",
    "apart from", "aside from","instead", "otherwise", "rather", "rather than", "despite",
    "in spite of", "regardless", "regardless of", "unfortunately", "sadly",
]
# Longest phrases first: in a regex alternation the first match wins, so "except"
# would otherwise beat "except for" (likewise "rather"/"rather than" and
# "regardless"/"regardless of"), leaving "for"/"than"/"of" at the start of the
# next chunk. Internal spaces accept any whitespace run.
contrast_regex = "|".join(
    r"\s+".join(re.escape(word) for word in phrase.split())
    for phrase in sorted(contrast_words, key=len, reverse=True)
)

# All category keywords. Regex metacharacters are escaped, and internal spaces
# accept any whitespace run.
kws_words = [kw for kws in categories_kw.values() for kw in kws]
kws_regex = "|".join(
    r"\s+".join(re.escape(word) for word in kw.split()) for kw in kws_words
)

split_regex = (
    r"(?<=\.{3})\s+"
    r"|(?<=[.!?])\s+"
    r"|\s+(?:" + contrast_regex +r")\s+"
    r"|(?:\s+and\s+|,\s*)"
      r"(?=(?:(?!\s+and\s+|,\s*).)*\b(?:" + kws_regex + r")\b)"
)

def _category_keyword_regex(kws):
    """Return one Java-compatible regex matching any keyword of `kws` as a whole word, or None if `kws` has none.

    Keywords are lower-cased (the review text is lower-cased before matching) and escaped; the words of a
    multi-word keyword are joined with `\\s+` so they match across any run of whitespace.
    """
    alternatives = [
        r"\s+".join(re.escape(word) for word in kw.lower().split())
        for kw in kws
        if kw.strip()
    ]
    if not alternatives:
        return None
    return r"\b(?:" + "|".join(alternatives) + r")\b"


def trim_review_to_category_relevant_text(df):
    """
    Split each review into chunks and keep, per category, only the chunks that mention that category.

    For every category in the global `categories_kw` mapping:
      1. keep the rows of `df` whose `category` column equals that category;
      2. lower-case `text_review` and split it with the global `split_regex` (sentence punctuation,
         contrast words, and "and"/commas that introduce a keyword-bearing phrase);
      3. drop blank chunks, and keep only chunks containing at least one of the category's keywords
         as a whole word;
      4. strip trailing sentence punctuation from each kept chunk and re-join them with ". ";
      5. drop rows where nothing was kept.

    Args:
        df: DataFrame with at least `text_review` and `category` columns; `hotel_id` and `label`
            are carried through when present.

    Returns:
        dict mapping category name -> DataFrame with columns hotel_id (if present), text_review,
        label (if present).
    """
    keep_cols = [c for c in ["hotel_id", "text_review", "label"] if c in df.columns]
    category_dfs = {}

    for ctg, kws in categories_kw.items():
        keyword_regex = _category_keyword_regex(kws)

        # The predicate is built with Column.rlike rather than an SQL expression string: Spark SQL
        # string literals process backslash escapes (e.g. `\s` -> `s`), which silently broke
        # multi-word keywords. F.filter invokes the lambda immediately, inside this iteration.
        if keyword_regex is None:
            hits = F.filter("_segments", lambda s: F.lit(False))
        else:
            hits = F.filter("_segments", lambda s: s.rlike(keyword_regex))

        category_dfs[ctg] = (
            df
            .filter(F.col("category") == ctg)
            .withColumn("_segments", F.split(F.lower("text_review"), split_regex))
            .withColumn("_segments", F.expr("filter(_segments, s -> trim(s) <> '')"))
            .withColumn("_hits", hits)
            .withColumn(
                "_hits",
                F.expr("transform(_hits, s -> regexp_replace(trim(s), '[.!?]+$', ''))")
            )
            .withColumn("text_review", F.concat_ws(". ", F.col("_hits")))
            .drop("_segments", "_hits")
            .filter(F.length("text_review") > 0)
            .select(*keep_cols)
        )

    return category_dfs


train_category_dfs = trim_review_to_category_relevant_text(df_train_long)

In [0]:
# Preview the trimmed "comfort" texts.
display(train_category_dfs["comfort"].limit(num=20))

In [0]:
# Drop very short texts: keep only rows whose trimmed text_review is at least 10 characters long.
long_enough = F.length(F.trim(F.col("text_review"))) >= 10
train_category_dfs = dict(
    (ctg, df_ctg.filter(long_enough)) for ctg, df_ctg in train_category_dfs.items()
)

# Row count per category (each count() runs a Spark job).
for ctg, df in train_category_dfs.items():
    row_count = df.count()
    print(f"{ctg}: {row_count}")

display(train_category_dfs['comfort'].limit(20))

**Find the sentiment of each review:**

In [0]:
# Attach a sentiment to every category-specific review text.
train_category_dfs = dict(
    (ctg, add_sentiment_to_review(df_ctg, text_col="text_review"))
    for ctg, df_ctg in train_category_dfs.items()
)

In [0]:
# Preview the "comfort" texts with their sentiment.
display(train_category_dfs["comfort"].limit(num=20))

**Update Review Score From Category-Specific Review Sentiment:**

In [0]:
# Adjust each category-specific label using the sentiment of its trimmed review text.
# A sentiment counts as "hard" when the text also contains one of the strong words below.
# Only labels that disagree with the sentiment are moved:
#   - negative text and label >= 6: -4 if hard, otherwise -2
#   - positive text and label <= 6: +4 if hard, otherwise +2
# All other rows keep their label; results are clamped to 1-10.

hard_neg_re = r"(terrible|awful|horrible|disgusting|unacceptable|worst|broken|ridiculous|appalling)"
hard_pos_re = r"(amazing|excellent|perfect|outstanding|fantastic|wonderful|exceptional|incredible)"


def adjust_review_score(df):
    """
    Reduce `df` to (text_review, label), with the label nudged toward the text's sentiment.

    Args:
        df: DataFrame with `text_review` (string), numeric `label`, and a `sentiment` column that is
            compared against the strings "negative" / "positive".

    Returns:
        DataFrame[text_review, label]. The label is the adjusted value clamped to [1.0, 10.0];
        rows whose label is null keep a null label.
    """
    text_lower = F.lower(F.col("text_review"))
    label = F.col("label")
    is_neg = F.col("sentiment") == "negative"
    is_pos = F.col("sentiment") == "positive"

    df_scored = (
        df
        .withColumn("is_hard_neg", is_neg & text_lower.rlike(hard_neg_re))
        .withColumn("is_hard_pos", is_pos & text_lower.rlike(hard_pos_re))

        # apply scoring rules (hard first, then regular); the first matching branch wins
        .withColumn(
            "rating_adj_raw",
            F.when(F.col("is_hard_neg") & (label >= 6), label - F.lit(4.0))
            .when(F.col("is_hard_pos") & (label <= 6), label + F.lit(4.0))
            .when(is_neg & (label >= 6), label - F.lit(2.0))
            .when(is_pos & (label <= 6), label + F.lit(2.0))
            .otherwise(label)
        )
        # Clamp to the valid range. F.greatest/F.least skip null inputs, so without the null guard
        # a null label would silently be turned into 1.0.
        .withColumn(
            "label",
            F.when(F.col("rating_adj_raw").isNull(), F.lit(None).cast("double"))
            .otherwise(F.least(F.lit(10.0), F.greatest(F.lit(1.0), F.col("rating_adj_raw"))))
        )
        .drop("rating_adj_raw")
    )
    return df_scored.select("text_review", "label")

In [0]:
# Apply the sentiment-based score adjustment to every category DataFrame.
train_dfs = {ctg: adjust_review_score(train_category_dfs[ctg]) for ctg in train_category_dfs}

In [0]:
# Preview the adjusted "comfort" training rows.
display(train_dfs["comfort"].limit(num=20))

**For EDA:**

In [0]:
# EDA: rerun the training preprocessing (categories -> per-category trimming -> sentiment ->
# score adjustment) on a reproducible 20% sample of df_train.
sample_df_train = create_categories_column(
    df_train.sample(withReplacement=False, fraction=0.2, seed=42),
    "text_review",
)
sample_df_train_long = sample_df_train.select(
    "hotel_id", "text_review", "label", F.explode("categories").alias("category")
)

sample_trimmed = trim_review_to_category_relevant_text(sample_df_train_long)
sample_with_sentiment = {
    name: add_sentiment_to_review(frame, text_col="text_review")
    for name, frame in sample_trimmed.items()
}
sample_train_category_dfs = {
    name: adjust_review_score(frame) for name, frame in sample_with_sentiment.items()
}

display(sample_train_category_dfs["comfort"].limit(10))

In [0]:
# One-row-per-category table of the average adjusted label on the EDA sample.
avg_dfs = [
    df_ctg
    .select("label")
    .withColumn("category", F.lit(ctg))
    .withColumn("stage", F.lit("before"))
    .groupBy("stage", "category")
    .agg(F.avg("label").alias("avg_label"))
    for ctg, df_ctg in sample_train_category_dfs.items()
]

avg_by_category = reduce(lambda left, right: left.unionByName(right), avg_dfs)
pdf = avg_by_category.orderBy("category").toPandas()

In [0]:
# Scatter of the average adjusted label per category (EDA sample), annotated with each value.
cats = pdf["category"].tolist()
vals = pdf["avg_label"].tolist()
x = list(range(len(cats)))

fig, ax = plt.subplots()
ax.scatter(x, vals, s=70)

ax.set_xticks(x)
ax.set_xticklabels(cats, fontsize=10)
# Explicit x-limits (an explicit limit overrides any autoscale margin, so no margins() call is needed).
ax.set_xlim(-0.5, len(x) - 1 + 0.5)

# Keep the 7-10 window, but widen it downwards when an average falls below 7: the score adjustment
# can move labels down to 1, and a fixed lower bound of 7 would silently clip such points.
y_low = min([7.0] + [v - 0.5 for v in vals if v is not None])
ax.set_ylim(y_low, 10)

for xi, yi in zip(x, vals):
    ax.text(
        xi, yi + 0.08,
        f"{yi:.2f}",
        ha="center", va="bottom", fontsize=9
    )

ax.set_ylabel("average rating")
ax.set_title("Average Rating per Category (train)")

fig.tight_layout()
plt.show()


**______________________________**

### **Linear Regression:**

In [0]:
def train_regression_linear_model(train_df):
    """
    Fit a text -> score regression pipeline on `train_df` and return the fitted PipelineModel.

    Stages, in order:
      1. RegexTokenizer: split `text_review` on non-word characters, keeping tokens of length >= 2.
      2. StopWordsRemover: drop stop words.
      3. Word2Vec: 50-dimensional embeddings (window 3, min count 2) written to `features`.
      4. LinearRegression: predicts `label` into `prediction` with an L2 (ridge) penalty of 0.05.

    train_df:
        text_review : string
        label       : double
    """
    stages = [
        RegexTokenizer(
            inputCol="text_review",
            outputCol="tokens",
            pattern="\\W+",
            minTokenLength=2,
        ),
        StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens"),
        Word2Vec(
            inputCol="filtered_tokens",
            outputCol="features",
            vectorSize=50,
            windowSize=3,
            minCount=2,
        ),
        LinearRegression(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction",
            regParam=0.05,        # light regularization, for realism
            elasticNetParam=0.0,  # pure L2 penalty (ridge-style)
        ),
    ]
    return Pipeline(stages=stages).fit(train_df)


def estimate_noise_sigma(model, train_df):
    """
    Return the standard deviation of the model's residuals (label - prediction) on `train_df`.

    The value is used as the noise level for predictions. Returns 0.0 when Spark yields a
    null standard deviation.
    """
    residual = F.col("label") - F.col("prediction")
    row = (
        model.transform(train_df)
        .select(F.stddev(residual).alias("sigma"))
        .first()
    )
    sigma = row["sigma"]
    if sigma is None:
        return 0.0
    return float(sigma)

In [0]:
# Train a separate linear regression model (with noise estimation) for each category.
# Each training DataFrame is cached while its model is fitted: Pipeline.fit and the residual pass in
# estimate_noise_sigma launch several Spark jobs over train_df, and without caching each job would
# recompute its whole upstream lineage (category trimming, sentiment tagging, score adjustment).
models = {}
for ctg, train_df in train_dfs.items():
    train_df = train_df.cache()
    try:
        model = train_regression_linear_model(train_df)
        sigma = estimate_noise_sigma(model, train_df)
    finally:
        train_df.unpersist()
    models[ctg] = (model, sigma)

### **Predictions:**

In [0]:
def predict_with_regression_linear_model(model, df, sigma, noise_scale=0.5, seed=None):
    """
    Score `df` with a fitted model and perturb each prediction with Gaussian noise.

    Each prediction becomes `prediction + sigma * noise_scale * N(0, 1)` and is then clamped
    to [1.0, 10.0].

    Args:
        model: fitted model whose transform() adds a `prediction` column.
        df: DataFrame containing the columns the model reads.
        sigma: residual standard deviation of the model (e.g. from estimate_noise_sigma).
        noise_scale: multiplier on sigma. 1.0 = full residual-sized noise, <1 = reduced noise,
            >1 = increased noise, 0 = no noise. The default 0.5 uses half the residual spread.
        seed: optional integer seed for the noise. None (the default) keeps the unseeded draw,
            which differs between runs; pass an int to make the noise repeatable.

    Returns:
        model.transform(df) with `prediction` replaced by the noisy, clamped value.
    """
    preds = model.transform(df)
    noise = F.randn() if seed is None else F.randn(seed)
    preds = preds.withColumn(
        "prediction",
        F.col("prediction") + F.lit(sigma * noise_scale) * noise
    )
    preds = preds.withColumn(
        "prediction",
        F.greatest(F.lit(1.0), F.least(F.col("prediction"), F.lit(10.0)))
    )
    return preds

In [0]:
# Minimum number of category-relevant test reviews a hotel needs before a score for that category is
# reported (used as a >= threshold when aggregating test predictions per hotel and category).
TOO_SMALL_REVIEWS_NUM = 20

In [0]:
# For each category with a trained model: keep hotels that have at least TOO_SMALL_REVIEWS_NUM
# category-relevant test reviews, predict a noisy score per review with that category's regression
# model, and aggregate per (hotel_id, category) into a mean score, the review count, and up to K
# example reviews whose predictions are closest to the hotel's mean.

df_test = create_categories_column(df_test, "text_review")
test_long = df_test.select(
    "hotel_id",
    "text_review",
    F.explode("categories").alias("category")
)
test_category_dfs = trim_review_to_category_relevant_text(test_long)

K = 3   # number of example reviews to keep per category
categories_scores = []

w_hotel = Window.partitionBy("hotel_id")
w_rank = Window.partitionBy("hotel_id").orderBy(F.col("abs_diff").asc())

for ctg, (model, sigma) in models.items():
    df_ctg = (
        test_category_dfs[ctg]
        .select("hotel_id", "text_review")
        .withColumn("number_reviews", F.count("*").over(w_hotel))
    )

    # Save hotel's category information only if there are enough reviews
    df_enough_reviews = df_ctg.filter(F.col("number_reviews") >= TOO_SMALL_REVIEWS_NUM)

    # If no hotel has enough reviews in this category, move to the next one.
    # head(1) stops after the first matching row instead of converting the plan to an RDD.
    if not df_enough_reviews.head(1):
        continue

    preds_ctg = predict_with_regression_linear_model(model, df_enough_reviews, sigma)

    # Rank each hotel's reviews by how close their prediction is to the hotel's mean prediction.
    # The score and the examples are aggregated in ONE pass over the same rows. The noise column is
    # nondeterministic, so consuming preds_ctg twice (aggregate + self-join) could give the score and
    # the example selection different noise draws.
    ranked = (
        preds_ctg
        .withColumn("mean", F.avg("prediction").over(w_hotel))
        .withColumn("abs_diff", F.abs(F.col("prediction") - F.col("mean")))
        .withColumn("rn", F.row_number().over(w_rank))
    )

    avg_preds = ranked.groupBy("hotel_id").agg(
        F.lit(ctg).alias("category"),
        F.max("number_reviews").alias("number_reviews"),
        F.round(F.avg("prediction"), 3).alias("score"),
        # save K reviews per hotel to "show" why they got their score;
        # collect_list skips the nulls produced for rn > K
        F.collect_list(F.when(F.col("rn") <= K, F.col("text_review"))).alias("example_reviews"),
    )

    categories_scores.append(avg_preds)

if not categories_scores:
    raise ValueError(
        f"No category has a hotel with at least {TOO_SMALL_REVIEWS_NUM} relevant test reviews; "
        "category_summary would be empty."
    )

# Union all categories into one table: many rows (hotel_id, category)
category_summary = reduce(lambda a, b: a.unionByName(b), categories_scores)

category_summary = category_summary.select(
    "hotel_id", "category", "score", "example_reviews", "number_reviews"
)

In [0]:
# Preview the per-(hotel, category) summary.
display(category_summary.limit(num=20))

In [0]:
# Restrict the predicted category summary to hotels that also have real (scraped) category scores.
hotels_id_with_real_category_scores = (
    scraped_booking_real_scores
    .select(F.col("hotel_id"))
    .dropDuplicates()
)
hotels_summary_for_tool = hotels_id_with_real_category_scores.join(
    category_summary, on=["hotel_id"], how="inner"
)

**Tool input:**

In [0]:
# Tool input: one row per hotel; `hotel_categories_score` is a JSON object mapping
# category -> {"score", "number_reviews", "examples"}.
category_payload = F.struct(
    F.col("score").alias("score"),
    F.col("number_reviews").alias("number_reviews"),
    F.col("example_reviews").alias("examples"),
)
category_entry = F.struct(F.col("category"), category_payload)

hotels_summary_as_json = (
    hotels_summary_for_tool
    .groupBy("hotel_id")
    .agg(
        F.to_json(F.map_from_entries(F.collect_list(category_entry)))
        .alias("hotel_categories_score")
    )
)

print("needs to be <= 120, row count is", hotels_summary_as_json.count())
display(hotels_summary_as_json)


In [0]:
# Quick sanity counts: distinct hotels and rows per category in the summary.
n_unique_hotels = category_summary.select("hotel_id").distinct().count()
print("unique hotel_id:", n_unique_hotels)
print("Rows per category:")
rows_per_category = category_summary.groupBy("category").count()
rows_per_category.show(truncate=False)

**For EDA:**

**Plot average real vs. predicted category scores for the same hotels:**

In [0]:
# Category columns present in the real (scraped) score table.
ctg_cols = "staff facilities cleanliness comfort location free_wifi".split()

In [0]:
# Average real (scraped) score per category in long format (category, avg, source="real"),
# ignoring null and 0 scores.
real_score_entries = F.array(*(
    F.struct(F.lit(c).alias("category"), F.col(c).cast("double").alias("score"))
    for c in ctg_cols
))
is_valid_score = F.col("score").isNotNull() & (F.col("score") != 0)

df_avg_real_long = (
    scraped_booking_real_scores
    .select(F.explode(real_score_entries).alias("x"))
    .select(F.col("x.category"), F.col("x.score"))
    .where(is_valid_score)
    .groupBy("category")
    .agg(F.avg("score").alias("avg"))
    .withColumn("source", F.lit("real"))
)
display(df_avg_real_long)

In [0]:
# Average predicted score per category (same hotels, same categories, null/0 ignored), source="pred".
df_avg_pred_long = (
    hotels_summary_for_tool
    .where(
        F.col("category").isin(ctg_cols)
        & F.col("score").isNotNull()
        & (F.col("score") != 0)
    )
    .groupBy("category")
    .agg(F.avg("score").alias("avg"))
    .withColumn("source", F.lit("pred"))
)
display(df_avg_pred_long)

In [0]:
# Stack real and predicted per-category averages (columns: category, avg, source), show them,
# and bring the small result to pandas for plotting.
df_compare = df_avg_real_long.unionByName(other=df_avg_pred_long)
display(df_compare)
pdf = df_compare.toPandas()

In [0]:
# Average real vs. predicted score per category, with each value printed above its point.
cats = sorted(pdf["category"].unique())
x_map = {c: i for i, c in enumerate(cats)}
pdf["x"] = pdf["category"].map(x_map)

real = pdf[pdf.source == "real"]
pred = pdf[pdf.source == "pred"]

fig, ax = plt.subplots()
ax.scatter(real["x"], real["avg"], marker="o", s=80, label="avg real")
ax.scatter(pred["x"], pred["avg"], marker="^", s=80, label="avg ours")

# add score labels (2 digits after decimal) slightly above each point
for subset in (real, pred):
    for _, r in subset.iterrows():
        ax.text(r["x"], r["avg"] + 0.03, f'{r["avg"]:.2f}', ha="center", va="bottom", fontsize=9)

ax.set_xticks(range(len(cats)))
ax.set_xticklabels(cats)
ax.set_ylabel("average")
ax.set_title("Average Rating per Category: real vs ours")
ax.legend()
# Keep the 7-10 window, but widen it downwards if any average is below 7 so no point is clipped.
y_low = min([7.0] + [v - 0.5 for v in pdf["avg"].tolist() if v is not None])
ax.set_ylim(y_low, 10)
fig.tight_layout()
plt.show()

In [0]:
# Average real vs. predicted score per category, without per-point value labels.
cats = sorted(pdf["category"].unique())
x_map = {c: i for i, c in enumerate(cats)}
pdf["x"] = pdf["category"].map(x_map)

real = pdf[pdf.source == "real"]
pred = pdf[pdf.source == "pred"]

fig, ax = plt.subplots()
ax.scatter(real["x"], real["avg"], marker="o", s=80, label="avg real")
ax.scatter(pred["x"], pred["avg"], marker="^", s=80, label="avg ours")

ax.set_xticks(range(len(cats)))
ax.set_xticklabels(cats)
ax.set_ylabel("average")
ax.set_title("Average Rating per Category: real vs ours")
ax.legend()
# Keep the 7-10 window, but widen it downwards if any average is below 7 so no point is clipped.
y_low = min([7.0] + [v - 0.5 for v in pdf["avg"].tolist() if v is not None])
ax.set_ylim(y_low, 10)
fig.tight_layout()
plt.show()

**Example predictions for each category:**

In [0]:
# Tiny reproducible sample (0.03%) of df_test, cached because it is reused below.
df_sample_test = df_test.sample(withReplacement=False, fraction=0.0003, seed=42)
df_sample_test.cache()
n_sample_rows = df_sample_test.count()
print("number of rows is", n_sample_rows)

In [0]:
# Show a few noisy per-review predictions for every category on the small test sample.
df_sample_test = create_categories_column(df_sample_test, "text_review")
sample_test_long = df_sample_test.select(
    "hotel_id",
    "text_review",
    F.explode("categories").alias("category")
)
sample_test_category_dfs = trim_review_to_category_relevant_text(sample_test_long)

for ctg, (model, sigma) in models.items():
    # (the unused per-hotel window that used to be defined here was removed)
    df_sample_ctg = sample_test_category_dfs[ctg].select("hotel_id", "text_review")

    preds_ctg = (
        predict_with_regression_linear_model(model, df_sample_ctg, sigma)
        .select("hotel_id", "text_review", "prediction")
    )
    print("example for prediction for category ", ctg, ":")
    display(preds_ctg.limit(10))

**Example category averages (tool input) for each category:**

In [0]:
# Up to 10 randomly chosen (hotel_id, category) rows per category. F.rand is seeded so the same rows
# come back on re-runs, as long as the input partitioning is unchanged.
w = Window.partitionBy("category").orderBy(F.rand(seed=42))

category_summary_10 = (
    category_summary
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") <= 10)
    .drop("rn")
)

display(category_summary_10)

---------------------------------------------------