In [1]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

In [2]:
sc.install_pypi_package("pandas")
sc.install_pypi_package("numpy")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1747094742883_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas
  Downloading pandas-2.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
Collecting tzdata>=2022.7
  Downloading tzdata-2025.2-py2.py3-none-any.whl (347 kB)
Collecting numpy>=1.22.4
  Downloading numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.5 MB)
Collecting python-dateutil>=2.8.2
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Installing collected packages: tzdata, python-dateutil, numpy, pandas
  Attempting uninstall: python-dateutil
    Found existing installation: python-dateutil 2.8.1
    Not uninstalling python-dateutil at /usr/lib/python3.9/site-packages, outside environment /mnt1/yarn/usercache/livy/appcache/application_1747094742883_0003/container_1747094742883_0003_01_000001/tmp/spark-95a6ea88-d45e-49e0-8c9d-cf8d4dbd1adb
    Can't uninstall 'python-dateutil'. No files were found to uninstall.
Successfully installed numpy-2.0.2 pandas-2.2.3 python-dateutil-2.9.0.post0 tzdata-2025.2


ERRO

In [3]:
import pandas as pd
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
spark = SparkSession.builder.appName("Movies").getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
import time

# 1) Measure load time for movies
start = time.perf_counter()
movies_filtered = (
    spark.read
         .option("header",    "true")
         .option("inferSchema","true")
         .csv("s3://recomendandoando/processed/movies_filtered/")
)
end = time.perf_counter()
print(f" Movies_filtered loaded in {end - start:.2f} s")

# 2) Measure load time for ratings
start = time.perf_counter()
ratings_filtered = (
    spark.read
         .option("header",    "true")
         .option("inferSchema","true")
         .csv("s3://recomendandoando/processed/ratings_filtered/")
)
end = time.perf_counter()
print(f" Ratings_filtered loaded in {end - start:.2f} s")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 Movies_filtered loaded in 8.04 s
 Ratings_filtered loaded in 25.55 s

### Select train test sets (77% in the training set, 23% in the test set)

In [6]:
total_users = ratings_filtered.select("userId").distinct().count()
total_movies = ratings_filtered.select("movieId").distinct().count()
percent_users_to_mask = 0.8
percent_movies_to_mask = 0.8

user_cutoff = int(total_users * (1 - percent_users_to_mask))
movie_cutoff = int(total_movies * (1 - percent_movies_to_mask))
print(user_cutoff, total_users)
print(movie_cutoff, total_movies)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

40189 200948
16808 84041

In [7]:
train_data = ratings_filtered.filter(~((col("userId") > user_cutoff) & (col("movieId") > movie_cutoff)))
test_data = ratings_filtered.filter((col("userId") > user_cutoff) & (col("movieId") > movie_cutoff))
print(train_data.count())
print(test_data.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

24579265
7322529

## Importar y configurar ALS

In [8]:
from pyspark.ml.recommendation import ALS

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# set a checkpoint directory so checkpoint() will actually write to disk
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

# cache & materialize train and test sets
train_data = train_data.cache()
test_data  = test_data.cache()

# optional: checkpoint test_data to fully truncate its lineage
test_data = test_data.checkpoint()

# force the cache/checkpoint to happen now
_ = train_data.count()
_ = test_data.count()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pow, avg, sqrt

# Evaluator for RMSE
def rmse_from_df(df, label="rating", pred="prediction"):
    mse = df.select(
        avg(pow(col(label) - col(pred), 2)).alias("mse")
    ).collect()[0]["mse"]
    return float(mse**0.5)

# Try different rank values
ranks = [10, 20, 30]
results = []

start_total = time.perf_counter()

for rank in ranks:
    als = ALS(
        rank=rank,
        maxIter=20,
        regParam=0.1,
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        coldStartStrategy="drop",  # drop NaNs from cold-start users/items
        nonnegative=True
    )

    model = als.fit(train_data)
    predictions = model.transform(test_data)
    rmse = rmse_from_df(predictions)
    results.append((rank, rmse))
    print(f"Rank = {rank}, RMSE = {rmse:.4f}")
    
total_elapsed = time.perf_counter() - start_total

best_rank, best_rmse = results[0]  # Initialize with the first element

for rank, rmse in results[1:]:
    if rmse < best_rmse:
        best_rank, best_rmse = rank, rmse

print(f"Best rank: {best_rank} with RMSE = {best_rmse:.4f}")
print(f"Total time for all ranks: {total_elapsed:.2f} s")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Rank = 10, RMSE = 0.8430
Rank = 20, RMSE = 0.8425
Rank = 30, RMSE = 0.8427
Best rank: 20 with RMSE = 0.8425
Total time for all ranks: 809.98 s

In [11]:
# Run best model
start_total = time.perf_counter()

als = ALS(
    rank=20,
    maxIter=20,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",  # drop NaNs from cold-start users/items
    nonnegative=True
)

best_model = als.fit(train_data)
best_predictions = best_model.transform(test_data)
best_rmse = rmse_from_df(best_predictions)
total_elapsed = time.perf_counter() - start_total
print(f"Best model RMSE: {best_rmse}")
print(f"Model execution time: {total_elapsed:.2f} s")
    

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Best model RMSE: 0.8424792920851977
Model execution time: 254.93 s

In [12]:
# Save Model
save_path = "s3://recomendandoando/models/als_rank20"
best_model.write().overwrite().save(save_path)
print(f"✅ Model saved to {save_path}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

? Model saved to s3://recomendandoando/models/als_rank20

In [13]:
# This is how it could be loaded later:
# from pyspark.ml.recommendation import ALSModel
# loaded_model = ALSModel.load("s3://recomendandoando/models/als_rank20")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…