In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F

# Create the Spark session for this analysis, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Exam Analysis")
    .getOrCreate()
)

# Step 1: Load data, coerce column types, and add a year column.
# The CSV is read with header=True and no schema inference, so every column
# arrives as a string. ALS (used below) only accepts numeric user/item/rating
# columns, so the three columns it consumes are cast explicitly here.
df = spark.read.csv("exam.csv", header=True)
# NOTE(review): assumes exam.csv has user_id / item_id / rating columns and
# that the ids are integer-valued; values that cannot be parsed become null.
# If the ids are arbitrary strings, index them (e.g. StringIndexer) instead.
df = (
    df.withColumn("user_id", col("user_id").cast("int"))
    .withColumn("item_id", col("item_id").cast("int"))
    .withColumn("rating", col("rating").cast("float"))
)
# Parse the timestamp strings into a timestamp column.
# NOTE(review): to_timestamp is called without a format, so the file is
# presumed to hold date-time strings Spark can parse by default - confirm.
df = df.withColumn("timestamp", F.to_timestamp("timestamp"))
# Derive the calendar year from the parsed timestamp and append it.
df = df.withColumn("year", year(col("timestamp")))

# Step 2: Fit an ALS recommender on the raw data and report its test-set MSE.
# 80/20 split with a fixed seed so the partition is repeatable.
train, test = df.randomSplit([0.8, 0.2], seed=123)

# ALS estimator; coldStartStrategy="drop" removes rows whose prediction is NaN
# (users/items unseen during training) so they do not poison the metric.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    coldStartStrategy="drop",
)

# Mean-squared-error evaluator comparing "prediction" against "rating".
evaluator = RegressionEvaluator(
    metricName="mse",
    labelCol="rating",
    predictionCol="prediction",
)

model = als.fit(train)
predictions = model.transform(test)
mse_before = evaluator.evaluate(predictions)
print("MSE before entity resolution:", mse_before)

# Step 3: Entity Resolution
# Assume df1 and df2 are two dataframes representing the same entities but with
# slightly different values. For demonstration purposes, the same dataframe is
# used for both sides here.
df1 = df.alias("df1")
df2 = df.alias("df2")

# Example entity resolution logic (replace with your own logic).
# - Columns are referenced through their alias-qualified names: df1.user_id
#   and df2.user_id are the same underlying column of `df`, so comparing them
#   via attribute access does not express a real self-join condition.
# - Records are matched on (user_id, item_id); matching on user_id alone pairs
#   every row of a user with every other row of that user.
# - item_id is kept in the output because the ALS step that consumes
#   resolved_df needs it as its item column.
# NOTE(review): if `df` holds duplicate (user_id, item_id) pairs, this
# self-join still multiplies them - deduplicate as part of the real logic.
same_entity = (
    (F.col("df1.user_id") == F.col("df2.user_id"))
    & (F.col("df1.item_id") == F.col("df2.item_id"))
)
resolved_df = df1.join(df2, same_entity).select(
    F.col("df1.user_id").alias("user_id"),
    F.col("df1.item_id").alias("item_id"),
    # Prefer the left-hand rating; fall back to the right-hand one when null.
    F.coalesce(F.col("df1.rating"), F.col("df2.rating")).alias("rating"),
)

# Re-run the same ALS estimator on the resolved data and score it with the
# same evaluator, using an identical 80/20 split ratio and seed.
resolved_splits = resolved_df.randomSplit([0.8, 0.2], seed=123)
train_resolved, test_resolved = resolved_splits

model_resolved = als.fit(train_resolved)
predictions_resolved = model_resolved.transform(test_resolved)
mse_after = evaluator.evaluate(predictions_resolved)
print("MSE after entity resolution:", mse_after)

# Report the change in MSE; a positive value means the error went down.
mse_delta = mse_before - mse_after
print("MSE improvement after entity resolution:", mse_delta)

# Shut the Spark session down now that all results have been printed.
spark.stop()
