## 1. Load and Prepare the Data

In [23]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize Spark Session
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# Load data
movies = spark.read.csv('../data/intermediate/filtered_movies.tsv', sep='\t', header=True, inferSchema=True)
users = spark.read.csv('../data/intermediate/filtered_users.tsv', sep='\t', header=True, inferSchema=True)
user_ratings = spark.read.csv('../data/intermediate/filtered_user_ratings.tsv', sep='\t', header=True, inferSchema=True)


In [24]:
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", IntegerType(), True)
])

# Load the training data
train = spark.read.csv("../data/raw/u1.base", sep="\t", schema=schema, header=False)

# Load the test data
test = spark.read.csv("../data/raw/u1.test", sep="\t", schema=schema, header=False)


In [25]:
# check the data types of all columns
user_ratings.printSchema()

users.printSchema()

movies.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)

root
 |-- movie_id: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- release_date: integer (nullable = true)
 |-- genre_1: integer (nullable = true)
 |-- genre_2: integer (nullable = true)
 |-- genre_3: integer (nullable = true)
 |-- genre_4: integer (nullable = true)
 |-- genre_5: integer (nullable = true)
 |-- genre_6: integer (nullable = true)
 |-- genre_7: integer (nullable = true)
 |-- genre_8: integer (nullable = true)
 |-- genre_9: integer (nullable = true)
 |-- genre_10: integer (nullable = true)
 |-- genre_11: integer (nullable = true)
 |-- genre_12: integer (nullable = true)
 |-- genre_13

## 2. Model Creation and Evaluation

In [26]:
# Build the ALS Model with initial parameters
als = ALS(userCol="user_id", itemCol="movie_id", ratingCol="rating", coldStartStrategy="drop")

# Define a parameter grid to search over
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.maxIter, [5, 10]) \
    .addGrid(als.regParam, [0.01, 0.1]) \
    .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Setup CrossValidator
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # Adjust numFolds as needed

# Fit the model using cross-validation
cvModel = crossval.fit(user_ratings)  # Assuming 'user_ratings' is your training dataset

# Make predictions on the test data
predictions = cvModel.transform(test)  # Assuming 'test' is your test dataset

# Evaluate the model using Root-mean-square error
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

# Evaluate the model using Mean-square error
mse_evaluator = RegressionEvaluator(metricName="mse", labelCol="rating", predictionCol="prediction")
mse = mse_evaluator.evaluate(predictions)
print(f"Mean Squared Error (MSE) on test data = {mse}")

Root-mean-square error = 0.8279527668903118
Mean Squared Error (MSE) on test data = 0.685505784201323


## 3. Save the Model

In [None]:
# Define the path where you want to save the model
model_path = "../data/models/1.0-model"

# Save the model
cvModel.bestModel.save(model_path)