In [0]:
# List the files under the ml-1m data directory and render the listing.
display(dbutils.fs.ls('/databricks-datasets/cs110x/ml-1m/data-001/'))

In [0]:
%sh
# Print the first 500 lines of movies.dat to inspect its raw layout.
head -n 500 /databricks-datasets/cs110x/ml-1m/data-001/movies.dat

In [0]:
%sh
# Print the first 500 lines of ratings.dat to inspect its raw layout.
head -n 500 /databricks-datasets/cs110x/ml-1m/data-001/ratings.dat

In [0]:
# List the files under the ml-20m data directory and render the listing.
display(dbutils.fs.ls('/databricks-datasets/cs110x/ml-20m/data-001/'))

In [0]:
%sh
# Print the first 500 lines of the ml-20m ratings.csv to inspect its raw layout.
head -n 500 /databricks-datasets/cs110x/ml-20m/data-001/ratings.csv

In [0]:
from pyspark.sql.types import *

# Column layout applied when movies.dat is loaded.
movies_schema = (
    StructType()
    .add('movieId', IntegerType())
    .add('title', StringType())
    .add('genres', StringType())
)

# Column layout applied when ratings.dat is loaded.
# Note the rating column is named 'ratings' (plural); the ALS and evaluator
# cells refer to it by that exact name.
ratings_schema = (
    StructType()
    .add('userId', IntegerType())
    .add('movieId', IntegerType())
    .add('ratings', FloatType())
)

In [0]:
# Load the movies file into a DataFrame using the explicit movies_schema.
file_location = "/databricks-datasets/cs110x/ml-1m/data-001/movies.dat"
file_type = "csv"

# CSV options
first_row_is_header = "false"
delimiter = "::"

# No "inferSchema" option here: an explicit schema is supplied through
# .schema(), so schema inference would never be used and setting it only
# misleads the reader about where the column types come from.
df_movies = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .schema(movies_schema)
    .load(file_location)
)

display(df_movies)

In [0]:
# Load the ratings file into a DataFrame using the explicit ratings_schema.
file_location = "/databricks-datasets/cs110x/ml-1m/data-001/ratings.dat"
file_type = "csv"

# CSV options
first_row_is_header = "false"
delimiter = "::"

# No "inferSchema" option here: an explicit schema is supplied through
# .schema(), so schema inference would never be used and setting it only
# misleads the reader about where the column types come from.
df_ratings = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .schema(ratings_schema)
    .load(file_location)
)

display(df_ratings)

In [0]:
# Split the ratings 80/20 into training and test sets.
# A fixed seed keeps the split (and therefore the reported RMSE) the same
# every time the notebook is re-run from a fresh kernel.
SPLIT_SEED = 42
(df_ratings_train, df_ratings_test) = df_ratings.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Sanity check: the two counts should add up to the size of df_ratings.
print(df_ratings_train.count())
print(df_ratings_test.count())

In [0]:
from pyspark.ml.recommendation import ALS

# Configure ALS on the (userId, movieId, ratings) columns and fit it on the
# training split.
# ALS starts from randomly initialised factors, so the seed is pinned to make
# the fitted model - and the RMSE computed from it - repeatable across runs.
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=6,
    userCol='userId',
    itemCol='movieId',
    ratingCol='ratings',
    seed=42,
)

model = als.fit(df_ratings_train)

In [0]:
# Score the held-out ratings, then discard rows without a usable prediction.
# dropna() removes both null and NaN predictions; it replaces the previous
# `prediction != float('nan')` comparison, which only worked because Spark SQL
# (unlike Python) treats NaN as equal to NaN.
df_predicted_ratings = model.transform(df_ratings_test).dropna(subset=['prediction'])
display(df_predicted_ratings)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-square error between the true 'ratings' column and the model's
# 'prediction' column on the filtered test predictions.
regr_eval = (
    RegressionEvaluator()
    .setMetricName('rmse')
    .setLabelCol('ratings')
    .setPredictionCol('prediction')
)
error = regr_eval.evaluate(df_predicted_ratings)
print(f'RMSE: {error}')

In [0]:
# Show any existing ratings recorded under user id 0
# (presumably to confirm the id is free before it is reused for a new user).
check_user_id = df_ratings.where(df_ratings.userId == 0)
check_user_id.show()

In [0]:
# User id under which the hand-entered ratings below are recorded.
my_user_id = 0

# Hand-entered preferences as (movie ID, rating) pairs.
# For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)"
# a five rating, you would add the following line:
#   (260, 5),
_my_movie_ratings = [
    (318, 3),   # Shawshank redemption
    (908, 4),   # North by Northwest (1959)
    (858, 5),   # Godfather, The (1972)
    (2019, 4),  # Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)
    (912, 4),   # Casablanca (1942)
    (1250, 5),  # Bridge on the River Kwai, The (1957)
    (2324, 5),  # Life Is Beautiful (La Vita è bella) (1997)
    (1233, 5),  # Boat, The (Das Boot) (1981)
    (593, 4),   # Silence of the Lambs, The (1991)
    (1262, 4),  # Great Escape, The (1963)
]

# Final rows have the shape (my_user_id, movie ID, rating).
my_rated_movies = [
    (my_user_id, movie_id, rating) for movie_id, rating in _my_movie_ratings
]

In [0]:
# Build the personal ratings DataFrame with the same schema as df_ratings, so
# the column is called 'ratings' (not 'rating') and the types are int/int/float
# rather than inferred longs. This keeps the later positional union with the
# training data from silently widening column types.
# Ratings are converted to float because the schema's FloatType field does not
# accept Python ints.
df_my_ratings = spark.createDataFrame(
    [(user_id, movie_id, float(rating)) for user_id, movie_id, rating in my_rated_movies],
    schema=ratings_schema,
)
display(df_my_ratings)

In [0]:
# Append the personal ratings to the training split. The union is positional,
# so columns are matched by order rather than by name.
train_row_count = df_ratings_train.count()
print(train_row_count)

df_ratings_train_with_my_pref = df_ratings_train.union(df_my_ratings)
print(df_ratings_train_with_my_pref.count())


In [0]:
from pyspark.ml.recommendation import ALS

# Re-fit ALS with the same hyper-parameters, this time on the training split
# extended with the personal ratings.
# ALS starts from randomly initialised factors, so the seed is pinned to make
# the resulting recommendations repeatable across runs.
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=6,
    userCol='userId',
    itemCol='movieId',
    ratingCol='ratings',
    seed=42,
)

my_model = als.fit(df_ratings_train_with_my_pref)

In [0]:
# Pull the movie id (middle element) out of every personal rating tuple.
my_rated_movieIds = [movie_id for (_user, movie_id, _rating) in my_rated_movies]
print(my_rated_movieIds)

In [0]:
# Keep only the movies whose id is not among the personally rated ones.
# The two printed counts are the catalogue size before and after the filter.
print(df_movies.count())

already_rated = df_movies['movieId'].isin(my_rated_movieIds)
df_not_rated_movies = df_movies.where(~already_rated)

print(df_not_rated_movies.count())

In [0]:
# Render the movies that remain after excluding the personally rated ones.
display(df_not_rated_movies)

In [0]:
from pyspark.sql.functions import lit

# Attach a constant 'userId' column holding my_user_id to every unrated movie,
# giving the model the (userId, movieId) pairs it needs to score.
df_not_rated_movies_with_user = df_not_rated_movies.select(
    '*',
    lit(my_user_id).alias('userId'),
)
display(df_not_rated_movies_with_user)

In [0]:
# Score every unrated movie for my_user_id with the personalised model.
# The next cell reads the resulting `prediction` column.
df_my_predicted_ratings = my_model.transform(df_not_rated_movies_with_user)
display(df_my_predicted_ratings)

In [0]:
from pyspark.sql.functions import desc

# Discard movies the model could not score. dropna() removes both null and NaN
# predictions; it replaces the previous `prediction != float('nan')`
# comparison, which only worked because Spark SQL (unlike Python) treats NaN
# as equal to NaN.
df_my_predicted_ratings = df_my_predicted_ratings.dropna(subset=['prediction'])

# Highest predicted rating first: these are the recommendations.
display(df_my_predicted_ratings.sort(desc('prediction')))