# Setup

### Installazione e import

In [None]:
from dotenv import load_dotenv
import os
import shutil
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col,
    last,
    lit,
    split,
    regexp_extract,
    monotonically_increasing_id,
    count,
    when,
    concat_ws,
    min as spark_min,
    datediff,
    to_date,
    avg,
)
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from zipfile import ZipFile

Se la variabile JAVA_HOME non è impostata globalmente, scommentare la cella sotto per popolarla, dopo aver inserito il percorso alla JDK di propria scelta nel file [.env](./.env).

In [None]:
#load_dotenv()
#!echo $JAVA_HOME

In [None]:
!pyspark --version
!which python
!python --version

In [None]:
class Costanti:
    BASE_PATH = "./resources/dataset/archive"
    SALVATAGGI_PATH = "./resources/salvataggi"
    USA_SALVATAGGI = True
    DF_RATING_OTTIMIZZATO = "df_ratings_ottimizzato.parquet"
    DF_COMPLETO_BIAS = "Tutti_con_bias"
    DF_CAMPIONE_BIAS = "Campione_con_bias"
    DF_COMPLETO = "Tutti"
    DF_CAMPIONE = "Campione"

### Caricamento dati

In [None]:
try:
    spark.stop()
except: pass
try:
    sc.stop()
except: pass

# Memoria impostata a 6 GB + 6 GB per rientrare nei 12.7 GB di Colab
# Può essere personalizzata in base alle esigenze del proprio ambiente di esecuzione
spark = SparkSession.builder \
    .appName("Netflix Recommendation System") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "1000") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35") \
    .getOrCreate()

schema = StructType([
    StructField("MovieID", IntegerType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Rating", IntegerType(), True),
    StructField("Date", StringType(), True)
])

In [None]:
def persist_df(df, filename, path):
    """
    Salva il DataFrame pyspark in formato parquet

    Parametri:
        df (DataFrame): il DataFrame da salvare
        filename (str): il nome del file
        path (str): il percorso del dataframe da salvare

    Ritorna:
        None
    """
    df.write.mode("overwrite").parquet(f"{path}/{filename}")

def load_df(filename):
    """
    Carica il DataFrame pyspark da un file in formato parquet

    Parametri:
        filename (str): il nome del file

    Ritorna:
        il DataFrame pyspark in formato parquet
    """
    return spark.read.parquet(f"{Costanti.SALVATAGGI_PATH}/{filename}")

def checkFile(path, filename):
    """
    Verifica se il file esitse

    Parametri:
        path (str): il percorso del file
        filename (str): il nome del file

    Ritorna:
        True se il file esiste, False altrimenti
    """
    return os.path.exists(os.path.join(path, filename))


In [None]:
if (not checkFile(Costanti.BASE_PATH, '*')):
    with ZipFile("./resources/dataset/archive.zip", mode="r") as archive:
        archive.extractall("./resources/dataset/archive")

### Utilizzo dei file parquet

In [None]:
percorso_salvataggio_parquet = os.path.join(Costanti.SALVATAGGI_PATH, Costanti.DF_RATING_OTTIMIZZATO)

if Costanti.USA_SALVATAGGI and checkFile(Costanti.SALVATAGGI_PATH, Costanti.DF_RATING_OTTIMIZZATO):
    print(f" Caricamento dal file Parquet esistente: {percorso_salvataggio_parquet}")
    df_ratings = spark.read.parquet(percorso_salvataggio_parquet)
else:
    print("File Parquet non trovato. Salvataggio del file parquet")
    file_paths = [os.path.join(Costanti.BASE_PATH, f"combined_data_{i}.txt") for i in range(1, 5)]
    df_raw = spark.read.text(file_paths)

    df_ordered = df_raw.withColumn("order_id", monotonically_increasing_id())

    # Uso delle regex per trovare l'id del film
    df_with_id = df_ordered.withColumn(
        "MovieID_temp",
        regexp_extract(col("value"), r"(\d+):", 1).cast("integer")
    )

    window_spec = Window.orderBy("order_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    df_filled = df_with_id.withColumn(
        "MovieID",
        last(col("MovieID_temp"), ignorenulls=True).over(window_spec)
    )

    # Filtra le righe che non contengono ":" e crea colonne strutturate splittando sulla ","
    df_parsed = df_filled.filter(~col("value").contains(":")) \
        .withColumn("data", split(col("value"), ",")) \
        .select(
            col("MovieID"),
            col("data").getItem(0).cast("integer").alias("CustomerID"),
            col("data").getItem(1).cast("integer").alias("Rating"),
            col("data").getItem(2).cast("string").alias("Date")
        )
    df_ratings = df_parsed.cache()

    print(f" Trasformazione completata in formato parquet. Righe totali: {df_ratings.count()}")

    print(f" Salvataggio del DataFrame in formato Parquet su: {percorso_salvataggio_parquet}")
    # vedere se da cambiare
    df_ratings.write.mode("overwrite").parquet(percorso_salvataggio_parquet)

df_ratings.show()

In [None]:
# Nuovo percorso di salvataggio per il test set completo
percorso_probe_parquet = os.path.join(Costanti.SALVATAGGI_PATH, "df_probe_test_set.parquet")
percorso_probe_parquet_keys = os.path.join(Costanti.SALVATAGGI_PATH, "df_probe_keys.parquet")

# Controlliamo se il test set completo esiste già
if Costanti.USA_SALVATAGGI and (os.path.exists(percorso_probe_parquet) and os.path.exists(percorso_probe_parquet_keys)):
    print(f"Caricamento del test set completo dal file Parquet: {percorso_probe_parquet}")
    df_probe_test_set = spark.read.parquet(percorso_probe_parquet)
    df_probe_keys = spark.read.parquet(percorso_probe_parquet_keys)
else:
    print("File Parquet del test set non trovato. Lo ricostruiamo...")
    print("FASE A: Estrazione delle chiavi (MovieID, CustomerID) da probe.txt...")

    file_path = os.path.join(Costanti.BASE_PATH, "probe.txt")
    df_raw = spark.read.text(file_path)

    df_ordered = df_raw.withColumn("order_id", monotonically_increasing_id())

    df_with_id = df_ordered.withColumn(
        "MovieID_temp",
        regexp_extract(col("value"), r"(\d+):", 1).cast("integer")
    )

    window_spec = Window.orderBy("order_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    df_filled = df_with_id.withColumn("MovieID", last(col("MovieID_temp"), ignorenulls=True).over(window_spec))

    df_customers = df_filled.filter(~col("value").endswith(":"))

    # Questo DataFrame di probe contiene solo le chiavi, senza i ratings
    df_probe_keys = df_customers.withColumn("CustomerID", col("value").cast("integer")) \
                                .select("MovieID", "CustomerID")

    print("Chiavi del probe set estratte con successo :)")

    df_probe_test_set = df_ratings.join(
        df_probe_keys,
        on=["CustomerID", "MovieID"],
        how="inner"
    )

    df_probe_test_set.write.mode("overwrite").parquet(percorso_probe_parquet)
    df_probe_keys.write.mode("overwrite").parquet(percorso_probe_parquet_keys)

In [None]:
df_ratings = df_ratings.join(
  df_probe_keys,
  on=["CustomerID", "MovieID"],
  how="left_anti"
)

print("\nAnteprima del test set finale (df_probe_test_set):")
df_probe_test_set.show(5)

In [None]:
# shutil.rmtree("./salvataggi/df_ratings", ignore_errors=True)

In [None]:
#BASE_PATH = "/content/dataset/archive"

schema_movies = StructType([
    StructField("MovieId", IntegerType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Name", StringType(), True)
])

df_movies = (
    spark.read.format("csv")
        .option("header", "false")
        .schema(schema_movies)
        .load(f"{Costanti.BASE_PATH}/movie_titles.csv")
)
df_movies = df_movies.withColumnRenamed('MovieId', 'MovieID')

df_movies.show()

print("Ci sono", df_movies.count(), "film")

In [None]:
# if not os.path.exists("./salvataggi/df_ratings_ottimizzato.parquet") or not os.path.exists("./salvataggi/df_ratings"):
#     persist_df(df_ratings, "df_ratings", "./salvataggi")

# Addestramento sistema di raccomandazione (Alternating Least Squares)

### Utilizzo dei file parquet

In [None]:
spark = SparkSession.builder.appName("StratifiedSampling").getOrCreate()

In [None]:
df_per_campionamento = df_ratings
user_counts = df_per_campionamento.groupBy('CustomerID').agg(count('*').alias('user_rating_count'))
movie_counts = df_per_campionamento.groupBy('MovieID').agg(count('*').alias('movie_rating_count'))

df_per_campionamento = df_per_campionamento.join(user_counts, 'CustomerID').join(movie_counts, 'MovieID')

In [None]:
# Definizione degli strati
df_per_campionamento = df_per_campionamento.withColumn('strato_utente',
                       when(col('user_rating_count') <= 10, 'utente_poco_attivo')
                       .when(col('user_rating_count') <= 30, 'utente_medio_attivo')
                       .otherwise('utente_molto_attivo'))

df_per_campionamento = df_per_campionamento.withColumn('strato_film',
                       when(col('movie_rating_count') <= 20, 'film_nicchia')
                       .when(col('movie_rating_count') <= 60, 'film_popolare')
                       .otherwise('film_molto_popolare'))

# Unione degli strati
df_per_campionamento = df_per_campionamento.withColumn('strato_combinato', concat_ws('_', col('strato_utente'), col('strato_film')))

In [None]:
strati = df_per_campionamento.select('strato_combinato').distinct().collect()
fractions = {row['strato_combinato']: 0.05 for row in strati} # Esempio: 5% da ogni strato

In [None]:
df_campionato = df_per_campionamento.stat.sampleBy('strato_combinato', fractions=fractions, seed=42)

print(f"Dimensione del DataFrame originale 'df_ratings': {df_ratings.count()}")
print(f"Dimensione del campione stratificato 'df_campionato': {df_campionato.count()}")

print("Distribuzione del campione stratificato:")
df_campionato.groupBy('strato_combinato').count().show()

### Definizione del modello con ALS

In [None]:
als_on_residuals = ALS(
        maxIter=10, regParam=0.1,
        userCol="CustomerID", itemCol="MovieID", ratingCol="residual",
        rank=10, coldStartStrategy="drop", nonnegative=True
)

In [None]:
als_on_ratings = ALS(
        maxIter=10, regParam=0.1,
        userCol="CustomerID", itemCol="MovieID", ratingCol="Rating",
        rank=10, coldStartStrategy="drop", nonnegative=True
)

In [None]:
def calcola_bias(df_input: DataFrame):
    """
    Calcola la media globale (μ), il bias per item (b_i) e il bias per utente (b_x)
    da un DataFrame di rating in input.

    Parametri:
    df_input (Dataframe): DataFrame che deve contenere le colonne 'Rating', 'MovieID', 'CustomerID'.

    Ritorna:
    Una tupla contenente (mu, item_biases_df, user_biases_df).
    """

    print("Calcolo della media globale (μ)")
    mu = df_input.agg(avg("Rating")).collect()[0][0]

    print("Calcolo del bias per item (b_i)")
    item_biases_df = df_input.groupBy("MovieID").agg((avg(col("Rating")) - mu).alias("b_i"))

    df_with_item_bias = df_input.join(item_biases_df, "MovieID", "left")
    print("Calcolo del bias per utente (b_x)...")
    user_biases_df = df_with_item_bias.groupBy("CustomerID").agg(
        (avg(col("Rating") - lit(mu) - col("b_i"))).alias("b_x")
    )

    print(f"Calcolo dei bias completato. Valore della media: {mu:.4f}")

    return mu, item_biases_df, user_biases_df

### Produzione grafici di BellKor

In [None]:
(mu, item_biases_df, user_biases_df) = calcola_bias(df_per_campionamento)

In [None]:
def fit_model_with_bias(df_to_use, nome):
    """
    Esegue la fit di un modello ALS con termini di bias utilizzando la tecnica BellKor

    Parametri:
        df_to_us (DataFrame): DataFrame di training con le colonne CustomerID, MovieID, Rating
        nome (str): Nome del modello da usare

    Ritorna:
        DataFrame: Dati di training con i residuals

    """
    df_to_use = df_to_use.dropna(subset=["Rating"])
    save_path = f"{Costanti.SALVATAGGI_PATH}/modelli_als/{nome}_con_bias"

    df_with_biases = df_to_use \
        .join(item_biases_df, "MovieID", "left") \
        .join(user_biases_df, "CustomerID", "left")

    # Calcoliamo il residuo e gestiamo i null

    df_for_als_training = df_with_biases.withColumn(
        "residual",
        col("Rating") - lit(mu) - col("b_i") - col("b_x")
    ).fillna(0, subset=["residual", "b_i", "b_x"])

    if not os.path.exists(save_path):
        print(f"[{nome}] Addestramento")
        model = als_on_residuals.fit(df_for_als_training)
        model.write().overwrite().save(save_path)
    else:
        print(f"[{nome}] Caricamento del modello da {save_path}")
        model = ALSModel.load(save_path)

    print(f"[{nome}] Calcolo delle predizioni e RMSE")
    predictions_residual = model.transform(df_probe_test_set)

    predictions_with_biases = predictions_residual \
        .join(item_biases_df, "MovieID", "left") \
        .join(user_biases_df, "CustomerID", "left")

    final_predictions = predictions_with_biases.withColumn(
        "final_prediction",
        col("prediction") + lit(mu) + col("b_i") + col("b_x")
    )

    evaluator = RegressionEvaluator(
        metricName="rmse", labelCol="Rating", predictionCol="final_prediction"
    )
    rmse = evaluator.evaluate(final_predictions)
    print(f"RMSE ({nome}): {rmse:.4f}\n")
    return df_for_als_training

In [None]:
def fit_model_no_bias(df_to_use, nome):
    """
    Addestra un modello ALS standard sui rating originali.

    Parametri:
    df_to_use (DataFrame):  DataFrame di training con le colonne CustomerID, MovieID, Rating

    nome (str): Nome del modello da utilizzare

    Ritorna:
    DataFrame: Dataset di training utilizzato per l'addestramento
    """
    df_to_use = df_to_use.dropna(subset=["Rating"])
    save_path = f"{Costanti.SALVATAGGI_PATH}/modelli_als/{nome}"

    # Calcoliamo il residuo e gestiamo i null

    if not os.path.exists(save_path):
        print(f"Addestramento del modello per '{nome}'...")
        model = als_on_ratings.fit(df_to_use)
        model.write().overwrite().save(save_path)
    else:
        print(f"Caricamento del modello per '{nome}' da {save_path}...")
        model = ALSModel.load(save_path)

    predictions = model.transform(df_probe_test_set)
    evaluator = RegressionEvaluator(
        metricName="rmse", labelCol="Rating", predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)
    print(f"RMSE ({nome}): {rmse:.4f}")

In [None]:
def grafici_bellKor(df_to_use, nome):
    """
    Questa funzione applica i bias pre-calcolati, addestra un modello ALS sui residui,
    calcola l'RMSE e genera i grafici di BellKor per il DataFrame fornito.

    Parametri:
    df_to_use (DataFrame): il dataFrame del quale vogliamo generare i grafici
    nome (str): il nome del dataFrame del quale vogliamo generare i garfici

    Ritorna:
        None
    """

    print(f"[{nome}] Generazione dei grafici di BellKor...")

    df_movies_cleaned = df_movies \
        .withColumn("Year", col("Year").cast("integer")) \
        .withColumn("MovieID", col("MovieID").cast("integer")) \
        .withColumn("ReleaseDate", to_date(concat_ws("-", col("Year").cast("string"), lit("01"), lit("01"))))

    df_to_use = df_to_use.withColumn('Date', to_date(col('Date')))

    # GRAFICO 1: Rating by date
    w = Window.partitionBy()
    df_with_days = df_to_use.withColumn('days_from_start', datediff(col('Date'), spark_min('Date').over(w)))
    grouped_by_day_pd = df_with_days.groupBy('days_from_start').agg(avg('Rating').alias('mean_rating')).orderBy('days_from_start').toPandas()

    plt.figure(figsize=(8,6))
    plt.scatter(grouped_by_day_pd['days_from_start'], grouped_by_day_pd['mean_rating'], color='red', alpha=0.7, s=8)
    plt.title(f'Rating by date ({nome})')
    plt.xlabel('time (days)')
    plt.ylabel('mean score')
    plt.ylim(3.2, 3.9)
    plt.grid(True)
    plt.show()

    # GRAFICO 2: Rating by movie age
    df_joined = df_to_use.join(df_movies_cleaned.select('MovieID', 'ReleaseDate'), on='MovieID', how='inner')
    df_valid = df_joined.withColumn('movie_age_days', datediff(col('Date'), col('ReleaseDate'))).filter(col('movie_age_days').isNotNull() & (col('movie_age_days') >= 0))
    grouped_by_age_pd = df_valid.groupBy('movie_age_days').agg(avg('Rating').alias('mean_rating')).orderBy('movie_age_days').toPandas()

    plt.figure(figsize=(8,6))
    plt.scatter(grouped_by_age_pd['movie_age_days'], grouped_by_age_pd['mean_rating'], color='red', alpha=0.7, s=8)
    plt.title(f'Rating by movie age ({nome})')
    plt.xlabel('movie age (days)')
    plt.ylabel('mean score')
    plt.ylim(3.2, 3.9)
    plt.grid(True)
    plt.show()

    print(f"--- Fine analisi per '{nome}' ---")
    return df_to_use

### Grafici di BellKor


In [None]:
grafici_bellKor(df_per_campionamento, 'Tutti')

In [None]:
grafici_bellKor(df_campionato, 'Campione')

### Addestramento con bias

Dataset completo

In [None]:
df_biased = fit_model_with_bias(df_per_campionamento, 'Tutti')

Dataset campionato

In [None]:
df_biased = fit_model_with_bias(df_campionato, 'Campione')

### Addestramento senza bias

Dataset completo

In [None]:
fit_model_no_bias(df_per_campionamento, 'Tutti')

Dataset campionato

In [None]:
fit_model_no_bias(df_campionato, 'Campione')

# Raccomandazione

In [None]:
while(True):
  try:
    scelta = int(input("""Su quale DataFrame effettuare la raccomandazione?
                  1. Completo con bias
                  2. Campione con bias
                  3. Completo senza bias
                  4. Campione senza bias
                  > 
                  """))
  except ValueError:
    print("Inserire un valore numerico.")
  
  if scelta == 1:
    dataframe = Costanti.DF_COMPLETO_BIAS
    break
  elif scelta == 2:
    dataframe = Costanti.DF_CAMPIONE_BIAS
    break
  elif scelta == 3:
    dataframe = Costanti.DF_COMPLETO
    break
  elif scelta == 4:
    dataframe = Costanti.DF_CAMPIONE
    break
  else:
    print("Effettuare una scelta valida.")

save_path = f"{Costanti.SALVATAGGI_PATH}/modelli_als/{dataframe}"

print(f"Tentativo di caricare il modello dal percorso: '{save_path}'")

if os.path.exists(save_path):
  print("Percorso trovato! Caricamento del modello...")
  model = ALSModel.load(save_path)

  print("Modello caricato. Calcolo delle raccomandazioni...")
  model.recommendForAllUsers(10).show()
else:
  raise Exception(f"Modello non trovato nel percorso: {save_path}")