In [1]:
# importing libraries
import pandas as pd
import numpy as np

# pass in column names for each CSV as the column name is not given in the file and read them using pandas.
# You can check the column names from the readme file

# reading users file:
u_cols = ['user_id', 'age', 'sex', 'occupation', 'zip_code']
users_df = pd.read_csv('/mnt/d/Study/Analytics Vidhya/AV-Recommendation Engine/ml-100k/u.user', sep='|', names=u_cols,encoding='latin-1')

# reading ratings file:
r_cols = ['user_id', 'movie_id', 'rating', 'unix_timestamp']
ratings_df = pd.read_csv('/mnt/d/Study/Analytics Vidhya/AV-Recommendation Engine/ml-100k/u.data', sep='\t', names=r_cols,encoding='latin-1')

# reading items file:
i_cols = ['movie id', 'movie title' ,'release date','video release date', 'IMDb URL', 'unknown', 'Action', 'Adventure',
'Animation', 'Children\'s', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy',
'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']
items_df = pd.read_csv('/mnt/d/Study/Analytics Vidhya/AV-Recommendation Engine/ml-100k/u.item', sep='|', names=i_cols,
encoding='latin-1')



# After loading the dataset, we should look at the content of each file (users, ratings, items).

# Looking at the user file
print("\nUser Data :")
print("shape : ", users_df.shape)
print(users_df.head())

# We have 943 users in the dataset and each user has 5 features, i.e. user_ID, age, sex, occupation and zip_code. Now let’s look at the ratings file.

# Ratings Data
print("\nRatings Data :")
print("shape : ", ratings_df.shape)
print(ratings_df.head())

# We have 100k ratings for different user and movie combinations. Now finally examine the items file.

# Item Data
print("\nItem Data :")
print("shape : ", items_df.shape)
print(items_df.head())


User Data :
shape :  (943, 5)
   user_id  age sex  occupation zip_code
0        1   24   M  technician    85711
1        2   53   F       other    94043
2        3   23   M      writer    32067
3        4   24   M  technician    43537
4        5   33   F       other    15213

Ratings Data :
shape :  (100000, 4)
   user_id  movie_id  rating  unix_timestamp
0      196       242       3       881250949
1      186       302       3       891717742
2       22       377       1       878887116
3      244        51       2       880606923
4      166       346       1       886397596

Item Data :
shape :  (1682, 24)
   movie id        movie title release date  video release date  \
0         1   Toy Story (1995)  01-Jan-1995                 NaN   
1         2   GoldenEye (1995)  01-Jan-1995                 NaN   
2         3  Four Rooms (1995)  01-Jan-1995                 NaN   
3         4  Get Shorty (1995)  01-Jan-1995                 NaN   
4         5     Copycat (1995)  01-Jan-1995     

In [2]:
from pyspark import SparkContext
sc = SparkContext("local", "App Name")

In [3]:
from pyspark.sql import SQLContext

sql = SQLContext(sc)
#sc.stop()

In [4]:
sc.setCheckpointDir('checkpoint/')

In [5]:
ratings = sql.createDataFrame(ratings_df)

In [6]:
# Look at the column names
print(ratings.columns)

# Look at the first few rows of data
print(ratings.show())

['user_id', 'movie_id', 'rating', 'unix_timestamp']
+-------+--------+------+--------------+
|user_id|movie_id|rating|unix_timestamp|
+-------+--------+------+--------------+
|    196|     242|     3|     881250949|
|    186|     302|     3|     891717742|
|     22|     377|     1|     878887116|
|    244|      51|     2|     880606923|
|    166|     346|     1|     886397596|
|    298|     474|     4|     884182806|
|    115|     265|     2|     881171488|
|    253|     465|     5|     891628467|
|    305|     451|     3|     886324817|
|      6|      86|     3|     883603013|
|     62|     257|     2|     879372434|
|    286|    1014|     5|     879781125|
|    200|     222|     5|     876042340|
|    210|      40|     3|     891035994|
|    224|      29|     3|     888104457|
|    303|     785|     3|     879485318|
|    122|     387|     5|     879270459|
|    194|     274|     2|     879539794|
|    291|    1042|     4|     874834944|
|    234|    1184|     2|     892079237|
+----

In [7]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = ratings.select("user_id").distinct().count()
num_movies = ratings.select("movie_id").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  93.70% empty.


In [8]:
# Import the requisite packages
from pyspark.sql.functions import col

# View the ratings dataset
ratings.show()

# Filter to show only userIds less than 100
ratings.filter(col("user_id") < 100).show()

# Group data by userId, count ratings
ratings.groupBy("user_id").count().show()

+-------+--------+------+--------------+
|user_id|movie_id|rating|unix_timestamp|
+-------+--------+------+--------------+
|    196|     242|     3|     881250949|
|    186|     302|     3|     891717742|
|     22|     377|     1|     878887116|
|    244|      51|     2|     880606923|
|    166|     346|     1|     886397596|
|    298|     474|     4|     884182806|
|    115|     265|     2|     881171488|
|    253|     465|     5|     891628467|
|    305|     451|     3|     886324817|
|      6|      86|     3|     883603013|
|     62|     257|     2|     879372434|
|    286|    1014|     5|     879781125|
|    200|     222|     5|     876042340|
|    210|      40|     3|     891035994|
|    224|      29|     3|     888104457|
|    303|     785|     3|     879485318|
|    122|     387|     5|     879270459|
|    194|     274|     2|     879539794|
|    291|    1042|     4|     874834944|
|    234|    1184|     2|     892079237|
+-------+--------+------+--------------+
only showing top

In [None]:
# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings.groupBy("movie_id").count().select(min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings.groupBy("movie_id").count().select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings.groupBy("user_id").count().select(min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
ratings.groupBy("user_id").count().select(avg("count")).show()

In [9]:
# Use .printSchema() to see the datatypes of the ratings dataset
ratings.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- movie_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- unix_timestamp: long (nullable = true)



In [10]:

# Tell Spark to convert the columns to the proper data types
ratings = ratings.select(ratings.user_id.cast("integer"), ratings.movie_id.cast("integer"), ratings.rating.cast("double"))

# Call .printSchema() again to confirm the columns are now in the correct format
ratings.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: double (nullable = true)



In [13]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="user_id", itemCol="movie_id", ratingCol="rating", nonnegative = True,coldStartStrategy= "drop", implicitPrefs = False)

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

In [12]:

#als = ALS(userCol="user_id", itemCol="movie_id", ratingCol="rating",rank =25, maxIter = 100, regParam = 0.5, nonnegative = True,coldStartStrategy= "drop", implicitPrefs = False)

#model=als.fit(train)

In [15]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, ]) \
            .addGrid(als.maxIter, [50]) \
            .addGrid(als.regParam, [.05]) \
            .build()
           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol
="prediction") 
print ("Num models to be tested: ", len(param_grid))




Num models to be tested:  2


In [16]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

CrossValidator_32fd7436c0e7


In [17]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

In [18]:
#Extract best model from the cv model above
best_model = model.bestModel

In [22]:
best_model.getPredictionCol

<bound method HasPredictionCol.getPredictionCol of ALSModel: uid=ALS_b78242419c46, rank=50>

In [23]:
test_predictions = best_model.transform(test)

In [24]:
# View the predictions 
test_predictions.show()

# Calculate and print the RMSE of test_predictions
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

+-------+--------+------+----------+
|user_id|movie_id|rating|prediction|
+-------+--------+------+----------+
|    601|     148|   3.0|   2.61434|
|    330|     148|   4.0|  4.318637|
|    727|     148|   2.0| 3.5162623|
|    190|     148|   4.0| 3.2469242|
|    297|     148|   3.0| 2.3736668|
|    178|     148|   4.0| 3.6291234|
|    328|     148|   3.0| 2.9313915|
|    923|     148|   4.0| 3.9039102|
|    552|     148|   3.0| 2.5907874|
|    423|     148|   3.0| 2.5433755|
|    757|     148|   4.0| 2.7531908|
|     51|     148|   3.0| 2.9523456|
|    438|     148|   5.0|  4.086958|
|    706|     148|   4.0| 2.8821194|
|    361|     148|   1.0| 2.4361029|
|    181|     148|   2.0| 1.9416219|
|    249|     148|   3.0| 3.2219837|
|    893|     148|   3.0| 3.2738438|
|    203|     148|   3.0| 3.3258357|
|    690|     148|   3.0| 2.7949307|
+-------+--------+------+----------+
only showing top 20 rows

0.9406700180002333
