# Step 5 - PySpark & MLlib for Step 3

### Installation

Spark requires Java version 8 or 11 to work.

In [None]:
%%capture 
%conda install -c conda-forge openjdk=11 -y

Configure Spark to work with our environment's Java version (11).

In [1]:
import os
import shutil
from pathlib import Path

# Locate the `java` executable on PATH without spawning a shell, so the lookup
# also works when the path contains spaces and can be checked for failure.
java_executable = shutil.which("java")

if java_executable is not None:
    # The binary lives at <JAVA_HOME>/bin/java, so JAVA_HOME is two levels up.
    java_home_path = str(Path(java_executable).parent.parent)
    os.environ["JAVA_HOME"] = java_home_path
    print(f"JAVA_HOME is set to: {os.environ['JAVA_HOME']}")
else:
    # Do not overwrite JAVA_HOME with an empty value when java is missing;
    # keep whatever the environment already provides and say so.
    java_home_path = os.environ.get("JAVA_HOME", "")
    print(
        "No `java` executable found on PATH; JAVA_HOME left unchanged "
        f"({java_home_path or 'not set'}). Install a JDK before starting Spark."
    )


JAVA_HOME is set to: /opt/anaconda3/envs/DS-101-Final


In [None]:
%%capture 
%pip install pyspark plotly

### Create Spark session

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, concat, first, array, udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import BisectingKMeans
from pyspark.sql.functions import min as spark_min

# Stop a session left over from a previous run of this cell so the builder
# below does not silently hand back a stale session.
try:
    spark.stop()
except NameError:
    # First run on a fresh kernel: `spark` has not been defined yet.
    pass
except Exception as exc:
    # Best effort only: a session that cannot be stopped cleanly must not
    # prevent creating a new one, but the failure should be visible.
    print(f"Ignoring error while stopping the previous Spark session: {exc}")

spark = SparkSession.builder.appName("HotelClustering").getOrCreate()
print("Create Spark session")

25/03/12 12:59:07 WARN Utils: Your hostname, Yoavs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.122 instead (on interface en0)
25/03/12 12:59:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/12 12:59:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Create Spark session


### Data Loading and Filtering

- Load the CSV file.
- Select the top 150 hotels (by number of records).
- Select the top 40 checkin dates.

In [4]:
file_path = "../data/hotels_data_changed.csv"

# How many hotels / check-in dates to keep (ranked by number of records).
TOP_HOTELS = 150
TOP_CHECKIN_DATES = 40

df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(file_path)
)

# Rank hotels by record count. The secondary sort key breaks ties so that the
# selected set is the same every time this lazy plan is re-evaluated.
top150_hotels = (
    df.groupBy("Hotel Name")
    .agg(count("*").alias("cnt"))
    .orderBy(col("cnt").desc(), col("Hotel Name").asc())
    .limit(TOP_HOTELS)
)

# Keep only the rows that belong to the selected hotels.
df_top150 = df.join(top150_hotels.select("Hotel Name"), on="Hotel Name", how="inner")

# Rank check-in dates (within the selected hotels) by record count, again with
# a deterministic tie-break: this DataFrame is evaluated both in the join
# below and when it is collected later in the notebook.
top40_dates = (
    df_top150.groupBy("Checkin Date")
    .agg(count("*").alias("cnt"))
    .orderBy(col("cnt").desc(), col("Checkin Date").asc())
    .limit(TOP_CHECKIN_DATES)
)

# Keep only the rows that fall on the selected check-in dates.
df_top150_dates = df_top150.join(top40_dates.select("Checkin Date"), on="Checkin Date", how="inner")

df_top150_dates.head()


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/yoavgal/code/DS-101-Final/data/hotels_data_changed.csv.

25/03/12 17:55:15 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1010861 ms exceeds timeout 120000 ms
25/03/12 17:55:15 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/12 17:55:17 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

### Convert to 160-dims vector

In [None]:
# Lowest discount price per (hotel, check-in date, discount code).
min_price_expr = spark_min("Discount Price").alias("minDiscountPrice")
df_grouped = (
    df_top150_dates
    .groupBy("Hotel Name", "Checkin Date", "Discount Code")
    .agg(min_price_expr)
)

# Key that identifies one (check-in date, discount code) combination,
# formatted as "<Checkin Date>_<Discount Code>".
date_code_expr = concat(col("Checkin Date"), lit("_"), col("Discount Code"))
df_grouped = df_grouped.withColumn("date_code", date_code_expr)

# One row per hotel, one column per date_code; nulls are replaced by -1.
df_pivot = (
    df_grouped
    .groupBy("Hotel Name")
    .pivot("date_code")
    .agg(first("minDiscountPrice"))
    .fillna(-1)
)

df_pivot.head(3)

We might find ourselves in a situation where the pivoted table does not contain a column for every one of the 160 date + discount-code combinations.
So we make sure that all of them exist.

In [None]:
# Bring the selected check-in dates to the driver as a plain Python list.
top40_list = [row["Checkin Date"] for row in top40_dates.collect()]
discount_codes = [1, 2, 3, 4]

# Expected column names, date-major then discount code (format: "YYYY-MM-DD_1", etc.)
expected_cols = [f"{date}_{code}" for date in top40_list for code in discount_codes]

# Columns the pivot actually produced.
existing_cols = df_pivot.columns
present_cols = set(existing_cols)

# Single projection in the desired order: keep a column when it exists,
# otherwise emit a constant -1 column under the expected name.
ordered_selection = ["Hotel Name"] + [
    col_name if col_name in present_cols else lit(-1).alias(col_name)
    for col_name in expected_cols
]
df_pivot = df_pivot.select(ordered_selection)
df_pivot.head(3)

### Normalization and Save to CSV

Normalize the 160 price columns row-by-row (scaling valid prices to a 0–100 range, leaving missing values as -1).

In [None]:
# Pack the expected price columns into one array column so that a single UDF
# call receives a hotel's whole price row.
price_columns = [col(name) for name in expected_cols]
df_pivot = df_pivot.withColumn("prices_array", array(*price_columns))

# Define a UDF for normalizing the prices for each hotel (ignoring -1 values)
def normalize_prices(prices):
    """Min-max scale one hotel's prices to integers in the range 0-100.

    Entries equal to -1, as well as None entries, are treated as missing:
    they are excluded from the min/max computation and come back as -1.

    Args:
        prices: Sequence of numeric prices for one hotel, with -1 marking a
            missing value. May be None or empty.

    Returns:
        None when ``prices`` is None; otherwise a list of ints with the same
        length as ``prices``. Valid prices are mapped to
        round((p - min) / (max - min) * 100); when all valid prices are equal
        they map to 0. The result always contains plain ints, so it matches an
        integer-array UDF return type even when the input holds floats.
    """
    if prices is None:
        return None

    def is_missing(p):
        return p is None or p == -1

    valid_prices = [p for p in prices if not is_missing(p)]
    if not valid_prices:
        # Every entry is missing: return integer sentinels instead of echoing
        # the input, whose elements may be floats.
        return [-1] * len(prices)

    min_price = min(valid_prices)
    max_price = max(valid_prices)
    price_range = max_price - min_price
    if price_range == 0:
        # No spread to scale over; all valid prices collapse to 0.
        return [-1 if is_missing(p) else 0 for p in prices]

    return [
        -1 if is_missing(p) else int(round(((p - min_price) / price_range) * 100))
        for p in prices
    ]

# Wrap the Python function as a Spark UDF that returns an array of integers.
normalize_udf = udf(normalize_prices, ArrayType(IntegerType()))

# Apply the normalization UDF to create a new column with normalized prices.
df_pivot = df_pivot.withColumn("norm_prices_array", normalize_udf("prices_array"))

# Replace the original price columns with the normalized values in a single
# projection. Calling withColumn once per price column in a loop stacks one
# projection per call and produces a very large query plan; one select gives
# the same columns in the same order.
array_position = {name: i for i, name in enumerate(expected_cols)}
df_pivot = df_pivot.select(
    [
        col("norm_prices_array")[array_position[name]].alias(name)
        if name in array_position
        else name
        for name in df_pivot.columns
    ]
)

# Drop the helper array columns; only "Hotel Name" and the price columns remain.
df_final = df_pivot.drop("prices_array", "norm_prices_array")


**Save to CSV**

In [None]:
# Output location for the normalized feature table. Spark writes a folder of
# part files at this path, replacing any previous output.
pyspark_hotels_clustering_data = "../data/pyspark_hotels_clustering_data.csv"
csv_writer = df_final.write.mode("overwrite").option("header", "true")
csv_writer.csv(pyspark_hotels_clustering_data)


### Clustering

#### Read CSV and assemble features

In [None]:
from pyspark.ml.feature import VectorAssembler

# Folder of CSV part files written by the "Save to CSV" cell.
pyspark_hotels_clustering_data = "../data/pyspark_hotels_clustering_data.csv"

# Load every part file in the folder, with a header row and inferred types.
csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
df_loaded = csv_reader.csv(pyspark_hotels_clustering_data)

# Everything except the hotel identifier is a feature column.
feature_cols = [name for name in df_loaded.columns if name != "Hotel Name"]

# Combine the feature columns into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
df_features = assembler.transform(df_loaded)


#### Clustering with MLlib (BisectingKMeans)

In [None]:
from pyspark.ml.clustering import BisectingKMeans

# Fixed seed so that the cluster assignments are reproducible across runs.
CLUSTERING_SEED = 42

# Set the number of clusters (adjust k as needed)
bkmeans = BisectingKMeans(
    featuresCol="features",
    predictionCol="cluster",
    k=4,
    seed=CLUSTERING_SEED,
)

# Train the model
model = bkmeans.fit(df_features)

# Add the cluster assignments to the DataFrame
df_clustered = model.transform(df_features)

# Show the hotel names along with their cluster assignments
df_clustered.select("Hotel Name", "cluster").show(truncate=False)


#### Visualization with PCA and Plotly

Since the features are 160-dimensional, we use PCA to reduce them to 2 dimensions for visualization. Then, we convert the Spark DataFrame to a Pandas DataFrame and use Plotly Express to create a scatter plot.

In [None]:
from pyspark.ml.feature import PCA
# Project the "features" vectors onto their first two principal components.
pca = PCA(inputCol="features", outputCol="pcaFeatures", k=2)
pca_model = pca.fit(df_clustered)
df_pca = pca_model.transform(df_clustered)

# Bring only the columns needed for plotting to the driver as a pandas frame.
plot_columns = ["Hotel Name", "cluster", "pcaFeatures"]
pandas_df = df_pca.select(*plot_columns).toPandas()

# Split the PCA features into two separate columns for plotting
for component_index, component_name in enumerate(("pca1", "pca2")):
    pandas_df[component_name] = pandas_df["pcaFeatures"].apply(
        lambda vec, idx=component_index: vec[idx]
    )


In [None]:
import plotly.express as px

# Cluster ids are labels, not quantities. Plotly Express uses a continuous
# colour scale for numeric columns, so plot a copy whose "cluster" column is
# cast to strings to get one discrete colour and legend entry per cluster.
cluster_ids = sorted(pandas_df["cluster"].unique())
plot_df = pandas_df.assign(cluster=pandas_df["cluster"].astype(str))

fig = px.scatter(
    plot_df,
    x="pca1",
    y="pca2",
    color="cluster",
    # Keep the legend in numeric cluster order rather than order of appearance.
    category_orders={"cluster": [str(cluster_id) for cluster_id in cluster_ids]},
    hover_data=["Hotel Name"],
    title="Hotel Clusters Visualization (PCA Reduced)"
)
fig.show()


#### Stop the spark session

In [None]:
# Shut down the Spark session created in the "Create Spark session" cell.
spark.stop()