# Individual Assignment 3: Spark MLlib demo notebook

In [1]:
# You are highly recommended to select the "PySpark" kernel instead of python kernel,
# Otherwise you need to modify this cell to get pyspark working.

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

# sc = spark.sparkContext
from pyspark.sql import *


print(f'num executors: {sc.getConf().get("spark.executor.instances")}')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/03 00:45:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/04/03 00:45:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


num executors: None


## Load our data into a DataFrame

In [2]:
from pyspark.sql.types import IntegerType, StringType, TimestampType, StructField, StructType

schema = StructType([StructField('timestamp', TimestampType(), True),
                       StructField('user_id', IntegerType(), True),
                       StructField('movie_id', StringType(), True),
                       StructField('rating', IntegerType(), True)]
                   )

In [3]:
# reads our data using the schema defined above
df = spark.read.format("csv")\
        .option("header", "true")\
        .schema(schema)\
        .load("ratings.csv")

In [4]:
# build two dictionaries to serve as inverted indices
movie_id_to_idx_map = df.select("movie_id")\
                        .rdd\
                        .distinct()\
                        .map(lambda r: r["movie_id"])\
                        .zipWithIndex()\
                        .collectAsMap()
inverted_index_movie_id = {v: k for k, v in movie_id_to_idx_map.items()}

                                                                                

In [5]:
from pyspark.sql.functions import udf

# Use this UDF to map string movie_id fields to integers
def map_movie_id_to_idx(movie_id):
    return movie_id_to_idx_map[movie_id]

mapMovieIdsUDF = udf(lambda x: map_movie_id_to_idx(x), IntegerType())

ratings_df = df.withColumn("movie_idx", mapMovieIdsUDF("movie_id"))
train, test = ratings_df.randomSplit([0.8, 0.2])

## Define and train a baseline ALS model

In [41]:
from pyspark.ml.recommendation import ALS
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="movie_idx", ratingCol="rating",
          coldStartStrategy="drop")



In [42]:
baseline_model = als.fit(train)

                                                                                

## Evaluate our baseline model

In [43]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data
predictions = baseline_model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

                                                                                

Root-mean-square error = 8.283882218554766


In [29]:
# Generate top 20 movie recommendations for each user
# userRecs = baseline_model.recommendForAllUsers(20)



## Let's try to improve on this model with some manual parameter tuning

In [44]:
# First, what if we just made a change ourselves?
man_tune_als = ALS(rank=50, maxIter=5, regParam=0.01, userCol="user_id", itemCol="movie_idx", ratingCol="rating",
          coldStartStrategy="drop")

In [45]:
manually_tuned_model = man_tune_als.fit(train)

                                                                                

In [46]:
# Evaluate the model by computing the RMSE on the test data
predictions = manually_tuned_model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

[Stage 434:>                                                        (0 + 1) / 1]

Root-mean-square error = 4.017565583817099


                                                                                

## What about MLlib's Automated Hyper-parameter Tuning?

In [7]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# First, what if we just made a change ourselves?
cv_als = ALS(maxIter=5, userCol="user_id", itemCol="movie_idx", ratingCol="rating", coldStartStrategy="drop")

paramGrid = ParamGridBuilder() \
    .addGrid(cv_als.rank, [50, 75]) \
    .addGrid(cv_als.regParam, [0.1, 0.01]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

crossval = CrossValidator(estimator=cv_als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

22/04/03 00:46:53 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/04/03 00:46:53 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/04/03 00:46:54 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK

In [None]:
predictions = cvModel.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))