In [3]:
import pyspark
from pyspark import SparkContext

In [4]:
# Import the PySpark module
from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()
print(spark.version)

3.1.1


In [6]:
# File location and type
file_location_movies = "./ml-latest/movies.csv"
file_location_ratings = "./ml-latest/ratings.csv"
file_location_links = "./ml-latest/links.csv"
file_location_tags = "./ml-latest/tags.csv"

file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
movies_df = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", delimiter).load(file_location_movies)
ratings_df = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", delimiter).load(file_location_ratings)
links_df = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", delimiter).load(file_location_links)
tags_df = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", delimiter).load(file_location_tags)
                                            
display(movies_df.take(5))
display(ratings_df.take(5))
display(links_df.take(5))
display(tags_df.take(5))

movies_df.createOrReplaceTempView('movies')
ratings_df.createOrReplaceTempView('ratings')
links_df.createOrReplaceTempView('links')
tags_df.createOrReplaceTempView('tags')

movies_df.cache()
ratings_df.cache()
links_df.cache()
tags_df.cache()

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

[Row(userId=1, movieId=307, rating=3.5, timestamp=1256677221),
 Row(userId=1, movieId=481, rating=3.5, timestamp=1256677456),
 Row(userId=1, movieId=1091, rating=1.5, timestamp=1256677471),
 Row(userId=1, movieId=1257, rating=4.5, timestamp=1256677460),
 Row(userId=1, movieId=1449, rating=4.5, timestamp=1256677264)]

[Row(movieId=1, imdbId=114709, tmdbId=862),
 Row(movieId=2, imdbId=113497, tmdbId=8844),
 Row(movieId=3, imdbId=113228, tmdbId=15602),
 Row(movieId=4, imdbId=114885, tmdbId=31357),
 Row(movieId=5, imdbId=113041, tmdbId=11862)]

[Row(userId=14, movieId=110, tag='epic', timestamp='1443148538'),
 Row(userId=14, movieId=110, tag='Medieval', timestamp='1443148532'),
 Row(userId=14, movieId=260, tag='sci-fi', timestamp='1442169410'),
 Row(userId=14, movieId=260, tag='space action', timestamp='1442169421'),
 Row(userId=14, movieId=318, tag='imdb top 250', timestamp='1442615195')]

DataFrame[userId: int, movieId: int, tag: string, timestamp: string]

In [7]:
print(movies_df.dtypes)
print(ratings_df.dtypes)
print(links_df.dtypes)
print(tags_df.dtypes)

[('movieId', 'int'), ('title', 'string'), ('genres', 'string')]
[('userId', 'int'), ('movieId', 'int'), ('rating', 'double'), ('timestamp', 'int')]
[('movieId', 'int'), ('imdbId', 'int'), ('tmdbId', 'int')]
[('userId', 'int'), ('movieId', 'int'), ('tag', 'string'), ('timestamp', 'string')]


## Part 1: Spark SQL and OLAP

In [8]:
print(movies_df.columns)
print(ratings_df.columns)
print(links_df.columns)
print(tags_df.columns)

['movieId', 'title', 'genres']
['userId', 'movieId', 'rating', 'timestamp']
['movieId', 'imdbId', 'tmdbId']
['userId', 'movieId', 'tag', 'timestamp']


### Q1: The number of Users

In [9]:
%sql
select count(distinct userId)
from ratings

SyntaxError: invalid syntax (<ipython-input-9-0d5584a89956>, line 2)

In [None]:
ratings_df.select('userId').distinct().count()

### Q2: The number of Movies

In [None]:
%sql
select count(distinct movieId)
from movies

In [None]:
movies_df.select('movieId').distinct().count()

### Q3: How many movies are rated by users? List movies not rated before

In [None]:
%sql
select count(distinct movieId)
from ratings

In [None]:
# movies_df.select('movieId').subtract(ratings_df.select('movieId')).count()
ratings_df.select('movieId').distinct().count()

### Q4: List Movie Genres

In [None]:
%sql
select distinct genres
from movies

In [None]:
movies_df.select('genres').distinct().show()

### Q5: Movie for Each Category

In [None]:
%sql
select category, count(*) as category_count
from (
select explode(split(genres, '\\|')) as category
from movies
) as movies_category
group by movies_category.category
order by category_count desc

In [None]:
movies_df.select(split(col("genres"),"\|").alias("genres")).withColumn('genres', explode('genres')).groupby('genres').count().orderBy(col("count").desc()).show()

## Part2: Spark ALS based approach for training model

In [0]:
print(movies_df.dtypes)
print(ratings_df.dtypes)
print(links_df.dtypes)
print(tags_df.dtypes)
print(movies_df.columns)
print(ratings_df.columns)
print(links_df.columns)
print(tags_df.columns)

In [0]:
# Count the total number of ratings in the dataset
numerator = ratings_df.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = ratings_df.select("userId").distinct().count()
num_movies = ratings_df.select("movieId").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("numerator:",numerator,". num_users:",num_users,". num_movies:",num_movies,'. denominator:',denominator,'. sparsity:',sparsity)
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

In [0]:
%sql
select userId, count(*) as count
from ratings
group by userId
order by userId

userId,count
1,16
2,15
3,11
4,736
5,72
6,42
7,15
8,31
9,1
10,121


In [0]:
# Group data by userId, count ratings
ratings_df.groupBy("userId").count().show()

In [0]:
# %sql
# SELECT movieId, COUNT(movieId) 
# FROM ratings 
# GROUP BY ratings 
# ORDER BY COUNT(movieId) 
# DESC LIMIT 1;

In [0]:
# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings_df.groupBy("movieId").count().select(min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings_df.groupBy("movieId").count().select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings_df.groupBy("userId").count().select(min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
ratings_df.groupBy("userId").count().select(avg("count")).show()

In [0]:
# Use .printSchema() to see the datatypes of the ratings dataset
ratings_df.printSchema()

# # Tell Spark to convert the columns to the proper data types
# ratings_df = ratings_df.select(ratings_df.userId.cast("integer"), ratings_df.movieId.cast("integer"), ratings_df.rating.cast("double"))

# # Call .printSchema() again to confirm the columns are now in the correct format
# ratings_df.printSchema()

In [0]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create test and train set
(train, test) = ratings_df.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False)

# Confirm that a model called "als" was created
type(als)

In [0]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [5,10]) \
            .addGrid(als.regParam, [.01,.1]) \
            .build()
           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

In [0]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

In [0]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

pipeTrain.write().overwrite().save(outpath)
model_in = PipelineModel.load(outpath)

In [0]:
type(best_model)
best_model.save('/model_save_ALS')

In [0]:
model_path = "/model_save"
cv.write().save(model_path)

In [0]:
from pyspark.ml.recommendation import ALSModel

als_loaded = ALSModel.read().load('/model_save_ALS')
als_loaded

In [0]:
display(als_loaded.recommendForAllUsers(10))

In [0]:
# Print best_model
print(type(als_loaded))

In [0]:
predictions=als_loaded.transform(test)
predictions.show()

In [0]:
predictions = predictions.na.drop()
predictions.show()

In [0]:
rmse = evaluator.evaluate(predictions)
rmse

In [0]:
best_model = als_loaded

In [0]:
best_model.getBlockSize()

In [0]:
best_model.getUserCol()

In [0]:
display(best_model.userFactors.orderBy("id").collect())

id,features
1,"List(2.7106189727783203, 3.4204370975494385, 1.0696133375167847, 3.2918269634246826, 3.4723706245422363)"
2,"List(3.4761226177215576, 2.9496142864227295, 3.27455472946167, 1.988886833190918, 2.3487050533294678)"
3,"List(2.5094239711761475, 4.3667168617248535, 2.8416635990142822, 1.4065183401107788, 2.623562812805176)"
4,"List(0.6865320801734924, 3.232478380203247, 3.204557180404663, 2.056591033935547, 4.044508934020996)"
5,"List(1.0852680206298828, 3.326652765274048, 1.8209935426712036, 2.5497100353240967, 4.591876029968262)"
6,"List(3.298166275024414, 3.0859696865081787, 1.7481738328933716, 1.974731206893921, 4.442023277282715)"
7,"List(2.110003709793091, 4.042046070098877, 1.4678417444229126, 1.9046857357025146, 2.562107563018799)"
8,"List(3.227114677429199, 2.976378917694092, 2.6246206760406494, 1.1009538173675537, 3.4232897758483887)"
9,"List(0.0, 4.828293800354004, 0.37908509373664856, 1.7712457180023193, 4.1917243003845215)"
10,"List(3.031416654586792, 3.7143807411193848, 0.7452799081802368, 2.9069347381591797, 3.4425299167633057)"


In [0]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

In [0]:
# Generate top 10 movie recommendations for each user
userRecs = best_model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = best_model.recommendForAllItems(10)
display(userRecs.take(5))
display(movieRecs.take(5))

#Recommend moive to users with id: 575, 232.
you can choose some users to recommend the moives

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [(0,575),(1,232)]
schema = StructType([StructField("id", IntegerType(), True),StructField("userId", IntegerType(), True)])
 
user_ids = spark.createDataFrame(data=data2,schema=schema)
user_ids.printSchema()
user_ids.show(truncate=False)

In [0]:
movies_ids = best_model.recommendForUserSubset(user_ids,5)
display(movies_ids)

userId,recommendations
232,"List(List(175695, 16.653065), List(136880, 16.487339), List(187937, 15.228992), List(135757, 14.850873), List(135151, 13.785253))"
575,"List(List(175695, 24.52223), List(182521, 17.310308), List(136880, 11.708318), List(187937, 10.938843), List(135757, 10.121726))"


# Find the similar moives for moive with id: 463, 471
You can find the similar moives based on the ALS results

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [(0,463),(1,471)]
schema = StructType([StructField("id", IntegerType(), True),StructField("movieId", IntegerType(), True)])
 
movie_ids = spark.createDataFrame(data=data2,schema=schema)
movie_ids.printSchema()
movie_ids.show(truncate=False)

In [0]:
users_ids = best_model.recommendForItemSubset(movie_ids,5)
display(users_ids)

movieId,recommendations
463,"List(List(230692, 6.051877), List(165699, 6.040237), List(26576, 5.8531137), List(188265, 5.7603807), List(31812, 5.692924))"
471,"List(List(174669, 8.885668), List(195814, 7.672152), List(91645, 7.668568), List(187577, 7.557473), List(77814, 6.923845))"
