In [None]:
# Load u.data: tab-separated, no header row. Spark names the columns _c0.._c3,
# so rename them positionally to user_id, movie_id, rating, timestamp.
ratings = (
    spark.read
    .option("delimiter", "\t")
    .option("header", False)
    .csv("/FileStore/tables/u.data")
    .withColumnRenamed("_c0", "user_id")
    .withColumnRenamed("_c1", "movie_id")
    .withColumnRenamed("_c2", "rating")
    .withColumnRenamed("_c3", "timestamp")
)

In [None]:
# Load u.item (pipe-delimited, no header). Read it as ISO-8859-1 to fix the
# title encoding issue, then keep only the id and title fields.
movies = (
    spark.read
    .option("delimiter", "|")
    .option("header", False)
    .csv("/FileStore/tables/u.item", encoding="ISO-8859-1")
    .selectExpr("_c0 as movie_id", "_c1 as title")
)

In [None]:
# Peek at the first five rows of each raw table.
ratings.show(n=5)
movies.show(n=5)
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
# Convert types: the CSV reader produced strings, so cast ids to int and the
# rating to float. The timestamp column is not selected and is dropped here.
ratings = ratings.select(*[
    col(name).cast(dtype)
    for name, dtype in (("user_id", "int"), ("movie_id", "int"), ("rating", "float"))
])

In [None]:
# Split into training and test sets (seeded so the split is reproducible).
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)
# ALS has its own random seed, separate from the split's. Pin it as well so the
# RMSE and recommendations below reproduce across Restart & Run All.
als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",  # handles NaN predictions by dropping those rows
    implicitPrefs=False,
    rank=10,
    maxIter=10,
    regParam=0.1,
    seed=42,
)

In [None]:
model = als.fit(dataset=training)  # train the ALS factorization on the 80% split

In [None]:
predictions = model.transform(dataset=test)  # score the held-out 20% split

In [None]:
# Root-mean-squared error between the true rating and the ALS prediction.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [None]:
rmse = evaluator.evaluate(predictions)
print("RMSE: {:.4f}".format(rmse))
# Top 5 movie recommendations for every user.
user_recs = model.recommendForAllUsers(numItems=5)

In [None]:
# Show recommendations for a specific user (e.g., user_id = 100).
user_recs.filter(col("user_id") == 100).show(truncate=False)
# Raw line-per-row read of u.item (movie metadata). NOTE(review): movies_raw is
# not referenced again in the visible cells; confirm before removing.
movies_raw = spark.read.format("text").load("/FileStore/tables/u.item")

In [None]:
# u.item is pipe-delimited ("|") with no header row; each line contains:
# movie_id | title | release_date | video_release_date | IMDb_URL | 19 binary genre flags ("unknown" through "Western")

In [None]:
# Re-read u.item with every field named. Keep the same ISO-8859-1 encoding as the
# first load: Spark's CSV reader defaults to UTF-8, which garbles non-ASCII
# characters in titles, and those titles are joined into the exported recommendations.
# inferSchema=True should type movie_id and the genre flags as integers, which
# presumably matches the int movie_id used by ALS below. Verify with movies.printSchema().
movies = spark.read.option("delimiter", "|").csv(
    "/FileStore/tables/u.item",
    inferSchema=True,
    encoding="ISO-8859-1",
).toDF(
    "movie_id", "title", "release_date", "video_release_date", "IMDb_URL",
    "unknown", "Action", "Adventure", "Animation", "Children", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
)

In [None]:
from pyspark.sql.functions import array, concat_ws

In [None]:
# Genre flag columns combined into the "genres" text below: every genre column
# read from u.item except "unknown", in file order.
genre_columns = [
    "Action",
    "Adventure",
    "Animation",
    "Children",
    "Comedy",
    "Crime",
    "Documentary",
    "Drama",
    "Fantasy",
    "Film-Noir",
    "Horror",
    "Musical",
    "Mystery",
    "Romance",
    "Sci-Fi",
    "Thriller",
    "War",
    "Western",
]

In [None]:
# Build a space-separated string of the genre NAMES each movie is flagged with,
# e.g. "Action Comedy". Concatenating the raw 0/1 flag values instead yields
# strings like "0 1 0 ... 0". The tokenizer would then only ever see "0"/"1",
# and the TF-IDF features would carry no genre information.
# `when` without `otherwise` gives NULL for unflagged genres, and concat_ws
# skips NULLs, so only the flagged genre names survive.
from pyspark.sql.functions import lit, when

movies = movies.withColumn(
    "genres",
    concat_ws(
        " ",
        *[when(col(c) == 1, lit(c)) for c in genre_columns if c in movies.columns],
    ),
)

In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

In [None]:
# Split each movie's "genres" string into a "words" token array.
tokenizer = Tokenizer(outputCol="words", inputCol="genres")
words_data = tokenizer.transform(dataset=movies)

In [None]:
# Hash tokens into a 100-dimensional term-frequency vector. Hashing can map
# distinct tokens to the same bucket.
hashingTF = HashingTF(numFeatures=100, inputCol="words", outputCol="raw_features")
featurized_data = hashingTF.transform(dataset=words_data)

In [None]:
# Fit IDF weights over all movies, then rescale the term frequencies into TF-IDF "features".
idf = IDF(outputCol="features", inputCol="raw_features")
idf_model = idf.fit(dataset=featurized_data)
tfidf_data = idf_model.transform(dataset=featurized_data)

In [None]:
import os

In [None]:
# Ensure the local DBFS mount directory exists; no error if it is already there.
os.makedirs(name="/dbfs/FileStore/tables", exist_ok=True)

In [None]:
# NOTE: `final_recs` is not defined until the "Join with movie titles" cell
# further down. Exporting it here raised NameError on a fresh kernel; it only
# worked when cells were run out of order. The pandas CSV export to
# /dbfs/FileStore/tables/als_recommendations.csv runs after `final_recs` is built.

In [None]:
# Top 10 ALS recommendations per user, for export.
user_recommendations = model.recommendForAllUsers(numItems=10)

In [None]:
# Explode each user's recommendation array into one (user_id, movie_id, rating) row per item.
from pyspark.sql.functions import explode
flat_recs = (
    user_recommendations
    .select("user_id", explode("recommendations").alias("rec"))
    .select("user_id", col("rec.movie_id"), col("rec.rating"))
)

In [None]:
# Attach titles; a left join keeps recommendations even when no title matches.
final_recs = flat_recs.join(
    movies.select("movie_id", "title"),
    "movie_id",
    "left",
)

In [None]:
# Export as a single CSV via pandas (collects all rows to the driver) under the
# /dbfs FileStore path.
final_recs.toPandas().to_csv(
    path_or_buf="/dbfs/FileStore/tables/als_recommendations.csv",
    index=False,
)

In [None]:
# Save ALS recommendations as CSV using Spark (not Pandas); this writes a
# directory of part files and replaces any previous output.
(
    final_recs.write
    .mode("overwrite")
    .option("header", "true")
    .csv("dbfs:/FileStore/tables/als_recommendations")
)

In [None]:
# Read every part file of the directory written above back into one DataFrame.
merged_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("dbfs:/FileStore/tables/als_recommendations/")
)

In [None]:
# Save as a single CSV: coalesce to one partition so Spark emits one part file.
(
    merged_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("dbfs:/FileStore/als_recommendations_merged")
)

In [None]:
# List the single-partition output written above.
merged_files = dbutils.fs.ls("dbfs:/FileStore/als_recommendations_merged")
display(merged_files)
# Generate a downloadable link. Spark embeds a per-run task id in the part-file
# name, so look the name up rather than hardcoding one that breaks on the next run.
csv_parts = [
    f.name for f in merged_files
    if f.name.startswith("part-") and f.name.endswith(".csv")
]
if not csv_parts:
    raise FileNotFoundError(
        "No part-*.csv file found in dbfs:/FileStore/als_recommendations_merged"
    )
displayHTML(f"<a href='/files/als_recommendations_merged/{csv_parts[0]}'>Download CSV</a>")