In [2]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.recommendation import ALS
from pyspark.ml.recommendation import ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
import time, os, socket, platform, psutil, random

In [4]:
spark = SparkSession.builder \
    .appName("Recommendation System with Checkpoint") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "8") \
    .config("spark.driver.memory", "8g") \
    .config("spark.master", "local[6]") \
    .getOrCreate()


In [7]:
path = os.getcwd()

MOVIES = "path_to_dataset"
RATINGS = "path_to_dataset"

checkpoint_dir = f"{path}\\sparkcheckpoints"
save_dir = f"{path}\\models"              
result_dir = f"{path}\\results"            

if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)
spark.sparkContext.setCheckpointDir(checkpoint_dir)

if not os.path.exists(save_dir):
    os.makedirs(save_dir)
spark.sparkContext.setCheckpointDir(save_dir)

if not os.path.exists(result_dir):
    os.makedirs(result_dir)
spark.sparkContext.setCheckpointDir(result_dir)

In [8]:
MOVIE_DF = spark.read.csv(MOVIES, header=True, inferSchema=True)
RATINGS_DF = spark.read.option("header", True) \
    .option("delimiter", ",") \
    .option("inferSchema", True) \
    .csv(RATINGS)
RATINGS_DF = RATINGS_DF.dropna()

In [9]:
RATINGS_DF = RATINGS_DF.select(
    col("userId").cast(IntegerType()).alias("userId"),
    col("movieId").cast(IntegerType()).alias("movieId"),
    col("rating").cast(FloatType()).alias("rating"),
    col("timestamp")
)
train_data, test_data = RATINGS_DF.randomSplit([0.7, 0.3], seed = 5003)
print("Number of rows in train_data:", train_data.count())
print("Number of rows in test_data:", test_data.count())


MOVIE_DF.show()
RATINGS_DF.show()


Number of rows in train_data: 16834500
Number of rows in test_data: 7219264
+---+----+--------------------+
|  1|2003|     Dinosaur Planet|
+---+----+--------------------+
|  2|2004|Isle of Man TT 20...|
|  3|1997|           Character|
|  4|1994|Paula Abdul's Get...|
|  5|2004|The Rise and Fall...|
|  6|1997|                Sick|
|  7|1992|               8 Man|
|  8|2004|What the #$*! Do ...|
|  9|1991|Class of Nuke 'Em...|
| 10|2001|             Fighter|
| 11|1999|Full Frame: Docum...|
| 12|1947|My Favorite Brunette|
| 13|2003|Lord of the Rings...|
| 14|1982|  Nature: Antarctica|
| 15|1988|Neil Diamond: Gre...|
| 16|1996|           Screamers|
| 17|2005|           7 Seconds|
| 18|1994|    Immortal Beloved|
| 19|2000|By Dawn's Early L...|
| 20|1972|     Seeta Aur Geeta|
| 21|2002|   Strange Relations|
+---+----+--------------------+
only showing top 20 rows

+-------+-------+------+----------+
| userId|movieId|rating| timestamp|
+-------+-------+------+----------+
|1488844|      1|   3.

In [7]:
def train_als_model(train_data, train_rank, train_max_iter, train_reg_param, test_data):        
    modelcpoint_dir = os.path.join(checkpoint_dir, f"rank{train_rank}_iter{train_max_iter}_reg{train_reg_param}")
    if not os.path.exists(modelcpoint_dir):
        os.makedirs(modelcpoint_dir)

    spark.sparkContext.setCheckpointDir(modelcpoint_dir)

    als = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        seed=5003,
        nonnegative=True,
        implicitPrefs=False,
        coldStartStrategy="drop",
        checkpointInterval=5
    )

    evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

    start_time = time.time()

    model = als.setRank(train_rank).setMaxIter(train_max_iter).setRegParam(train_reg_param).fit(train_data)

    end_time = time.time()

    predictions = model.transform(test_data)
 
    rmse = evaluator_rmse.evaluate(predictions)
    
    trainingtime = end_time - start_time

    fpath = os.path.join(result_dir, f"model_rank{train_rank}_iter{train_max_iter}_reg{train_reg_param}.txt")
    with open(fpath, "w") as file:
        file.write(f"Rank: {train_rank}\n")
        file.write(f"Max Iterations: {train_max_iter}\n")
        file.write(f"Regularization Parameter: {train_reg_param}\n")
        file.write(f"RMSE: {rmse}\n")
        file.write(f"Training Duration: {trainingtime} seconds\n")

    modelsave_path = os.path.join(save_dir, f"ALS_model_rank{train_rank}_iter{train_max_iter}_reg{train_reg_param}")
    model.save(modelsave_path)
    
    print(f"Model saved at: {modelsave_path}")

    print(f"Model trained and results saved: RMSE: {rmse}, Rank: {train_rank}, Max Iterations: {train_max_iter}")
    
    print(f"Training duration: {trainingtime} seconds\n")
    
    return train_rank, train_max_iter, train_reg_param, rmse, trainingtime

In [12]:
# Parametreler
ranks = [10,50]
iterations = [200]
lambdas = [0.01,0.1]

# Döngü ile her parametre kombinasyonu için modeli eğitip kaydetme
for rank in ranks:
    for iteration in iterations:
        for lambda_ in lambdas:
            try:
                print(f"Training model with rank={rank}, iteration={iteration}, lambda={lambda_}")
                model_name = f"ALS_model_rank{rank}_iter{iteration}_lambda{lambda_}"
                rmse = train_als_model(train_data,rank,iteration,lambda_, test_data)
            except Exception as e:
                # Eğer model eğitilemezse hata mesajı ver ve bir sonraki kombinasyona geç
                print(f"Model with rank={rank}, iteration={iteration}, lambda={lambda_} failed: {e}")
                continue

Training model with rank=10, iteration=200, lambda=0.01
Model saved at: c:\Users\Orhan Gazi Barak\Desktop\BigDataFinal\models\ALS_model_rank10_iter200_reg0.01
Model trained and results saved: RMSE: 0.7280467607665773, Rank: 10, Max Iterations: 200
Training duration: 582.2378866672516 seconds

Training model with rank=10, iteration=200, lambda=0.1
Model saved at: c:\Users\Orhan Gazi Barak\Desktop\BigDataFinal\models\ALS_model_rank10_iter200_reg0.1
Model trained and results saved: RMSE: 0.7902014129770596, Rank: 10, Max Iterations: 200
Training duration: 492.2477331161499 seconds

Training model with rank=50, iteration=200, lambda=0.01
Model saved at: c:\Users\Orhan Gazi Barak\Desktop\BigDataFinal\models\ALS_model_rank50_iter200_reg0.01
Model trained and results saved: RMSE: 0.5031821565551075, Rank: 50, Max Iterations: 200
Training duration: 4094.216643810272 seconds

Training model with rank=50, iteration=200, lambda=0.1
Model saved at: c:\Users\Orhan Gazi Barak\Desktop\BigDataFinal\mo

In [None]:
best_model_regression = 0
best_model_iteration = 0
best_model_rank = 0
best_model_var = None

def best_models(top_n=18):
    results = []

    global best_model_rank
    global best_model_iteration
    global best_model_regression

    for file_name in os.listdir(result_dir):
        if file_name.endswith(".txt"):
            file_path = os.path.join(result_dir, file_name)
            with open(file_path, "r") as file:
                content = file.readlines()
                rank = int(content[0].strip().split(": ")[1])
                max_iter = int(content[1].strip().split(": ")[1])
                reg_param = float(content[2].strip().split(": ")[1])
                rmse = float(content[3].strip().split(": ")[1])
                mse = float(rmse**2)
                results.append((rank, max_iter, reg_param, rmse, mse))
    
    results.sort(key=lambda x: x[3])

    print(f"Top {top_n} Models with Lowest RMSE:")
    for i, (rank, max_iter, reg_param, rmse, mse) in enumerate(results[:top_n], 1):
        print(f"Model {i}: Rank={rank}, MaxIter={max_iter}, RegParam={reg_param}, RMSE={rmse}, MSE={mse}")
    
    best_model = results[0]
    print(f"\nBest Model: Rank={best_model[0]}, MaxIter={best_model[1]}, RegParam={best_model[2]}, RMSE={best_model[3]}, MSE={best_model[4]}")
    
    best_model_rank = best_model[0]
    best_model_iteration = best_model[1]
    best_model_regression = best_model[2]

def cosine_similarity(v1, v2):
    cosine_similarity_value = float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))) 
    print(f"Cosine Similarity Value: {cosine_similarity_value}")
    return cosine_similarity_value

def get_random_movie(test_data):    
    row = test_data.count()
    random_index = random.randint(0, row-1)
    return test_data.collect()[random_index]["movieId"]

def load_best_model(rank, iteration, regression):
    global best_model_var

    my_cwd = os.getcwd()
    models = f"{my_cwd}\\models"
    best_dir_model_name = f"ALS_model_rank{rank}_iter{iteration}_reg{regression}"
    loaded_model_path = f"{models}\\{best_dir_model_name}"

    loaded_model = ALSModel.load(loaded_model_path)
    predictions = loaded_model.transform(test_data)
    predictions.show()

    best_model_var = loaded_model
    

def calculate_cosine_similarity(model):
    user_factors = model.userFactors
    item_factors = model.itemFactors

    random_movie = get_random_movie(test_data)

    print(f"Our random movie is: {random_movie}")

    product_features = item_factors.filter(item_factors.id == random_movie).collect()[0]['features']
    user_similarities = user_factors.rdd.map(
        lambda row: (row['id'], cosine_similarity(row['features'], product_features))
        ).map(lambda x: (x[0],float(x[1]))).toDF(["userId", "similarity"])
    
    top_users = user_similarities.orderBy("similarity", ascending=False).limit(10)
    top_users.select("userId").show()

best_models()

print("-"*50)

load_best_model(best_model_rank, best_model_iteration, best_model_regression)

print("-"*50)

calculate_cosine_similarity(best_model_var)

Top 18 Models with Lowest RMSE:
Model 1: Rank=200, MaxIter=50, RegParam=0.01, RMSE=0.26759923303580746, MSE=0.07160934952135238
Model 2: Rank=200, MaxIter=200, RegParam=0.01, RMSE=0.27153081178234084, MSE=0.07372898174717701
Model 3: Rank=200, MaxIter=10, RegParam=0.01, RMSE=0.33133095507589366, MSE=0.10978020179150387
Model 4: Rank=50, MaxIter=200, RegParam=0.01, RMSE=0.5031821565551075, MSE=0.2531922826754487
Model 5: Rank=50, MaxIter=50, RegParam=0.01, RMSE=0.5033630995841964, MSE=0.25337441002300964
Model 6: Rank=50, MaxIter=10, RegParam=0.01, RMSE=0.555789187929548, MSE=0.30890162141938643
Model 7: Rank=10, MaxIter=200, RegParam=0.01, RMSE=0.7280467607665773, MSE=0.5300520858627058
Model 8: Rank=10, MaxIter=50, RegParam=0.01, RMSE=0.7322324239314713, MSE=0.5361643226565579
Model 9: Rank=200, MaxIter=200, RegParam=0.1, RMSE=0.7362854887990632, MSE=0.5421163210160754
Model 10: Rank=200, MaxIter=50, RegParam=0.1, RMSE=0.7389190980848643, MSE=0.5460014335145493
Model 11: Rank=50, MaxI