In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.feature import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Load movies.csv data to spark SQL dataframe
df_movies = spark.read.csv("./dataset/movies.csv", header=True, inferSchema=True)

# Load ratings.csv data to spark SQL dataframe and drop timestamp column
df_ratings = spark.read.csv("./dataset/ratings.csv", header=True, inferSchema=True)
df_ratings = df_ratings.drop("timestamp")

In [3]:
genre_list = [
    "Action", "Adventure", "Animation", "Children's",
    "Comedy", "Crime", "Documentary", "Drama",
    "Fantasy", "Film-Noir", "Horror", "Musical",
    "Mystery", "Romance", "Sci-Fi", "Thriller",
    "War", "Western"
]

def add_genre_list(genre_str, genre_name):
    if genre_name in genre_str:
        return 1
    else:
        return 0

def udf_score(label_list):
    return udf(lambda l: add_genre_list(l, label_list))
    
slen = udf(add_genre_list, IntegerType())

# Break down each genre into a separate column, assign 1 to it if the movie contains the genre
# Otherwise, assign 0
for genre_name in genre_list:
    df_movies = df_movies.withColumn(genre_name, udf_score(genre_name)(col("genres")))

# Drop genre and title column since it is no longer needed
df_movies = df_movies.drop("genres")
df_movies = df_movies.drop("title")

# Join ratings and movies dataframe together by its movie ID
df = df_ratings.join(df_movies, "movieId")

In [4]:
column_list = ["movieId", "userId", "Action", "Adventure", "Animation", "Children's", "Comedy",
               "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
               "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]

# Cast values in all columns (except rating column) to integer
for col in column_list:
    df = df.withColumn(col, df[col].cast("int"))

# Rename rating column as label
df = df.withColumnRenamed("rating", "label")

In [5]:
# combines a list of columns into a single vector column
assembler = VectorAssembler(inputCols = column_list, outputCol="features")

# Index categorical features (all genre columns) which allows tree-based algorithms 
# to treat categorical features appropriately
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2)

model_list = ["Linear Regression", "Random Forest", "GBT"]

for model_name in model_list:
    if model_name == "Linear Regression":
        estimator = LinearRegression(maxIter=10, regParam=0.3)
        pipeline = Pipeline(stages=[assembler, estimator])
    elif model_name == "Random Forest":
        estimator = RandomForestRegressor(featuresCol="indexedFeatures")
        pipeline = Pipeline(stages=[assembler, featureIndexer, estimator])
    else:
        GBTRegressor(featuresCol="indexedFeatures", maxIter=20)
        pipeline = Pipeline(stages=[assembler, featureIndexer, estimator])

    # Split the data into 80% training data and 20% testing data
    df_train, df_test = df.randomSplit([0.8, 0.2])
    model = pipeline.fit(df_train)
    
    # Obtain prediction result from the model
    df_train_predictions = model.transform(df_train)
    df_test_predictions = model.transform(df_test)
    
    # Use Root Mean Squared Error as evaluation metric to get training and testing error
    lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="rmse")
    print(model_name)
    print("Train data RMSE = %g" % lr_evaluator.evaluate(df_train_predictions))
    print("Test data RMSE = %g" % lr_evaluator.evaluate(df_test_predictions))

Linear Regression
Train data RMSE = 1.0224
Test data RMSE = 1.02817
Random Forest
Train data RMSE = 1.01337
Test data RMSE = 1.00695
GBT
Train data RMSE = 1.01106
Test data RMSE = 1.01521
