In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=ca080cff5ef90e0740bd34b477cdc0dd8b0252f152119af2d1f9972b319809cb
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession

In [None]:
# Create the notebook's Spark session. getOrCreate() hands back an already-running
# session when there is one, so re-running this cell does not start a second session.
# The placeholder "spark.some.config.option" setting from the Spark quick-start was
# dropped: it is not a real Spark option and had no effect.
spark = (
    SparkSession.builder
    .appName("movie analysis")
    .getOrCreate()
)

In [None]:
# Load the ratings CSV: the first line supplies the column names and the column
# types are inferred from the values.
ratings_reader = spark.read.option("header", True).option("inferSchema", True)
data = ratings_reader.csv("/content/ratings.csv")

# Preview the first rows
data.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Report the shape of the ratings DataFrame as a row count and a column count.
num_columns = len(data.columns)
num_rows = data.count()

# Print the shape
print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

Number of rows: 372362
Number of columns: 4


In [None]:
# Load the movies CSV (movieId, title, genres), taking column names from the header
# line and inferring the column types.
movies_reader = spark.read.option("header", True).option("inferSchema", True)
movie_names_data = movies_reader.csv("/content/movies.csv")

# Preview the first rows
movie_names_data.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [None]:
from pyspark.sql.functions import col, regexp_replace, split, trim

# Remove only the trailing "(YYYY)" release-year suffix from each title.
# Splitting on the first "(" instead would return an empty string for a title that
# begins with a parenthesis (such as "(500) Days of Summer (2009)") and would cut off
# everything after any earlier parenthesis inside the title.
# The pattern is anchored to the end of the string, so a title that has already been
# cleaned no longer matches and re-running this cell leaves it unchanged.
title_without_year = regexp_replace(movie_names_data["title"], r"\s*\(\d{4}\)\s*$", "")

# Remove leading and trailing spaces
movie_names_data = movie_names_data.withColumn("title", trim(title_without_year))

# Show the updated DataFrame
movie_names_data.show(truncate=False)

+-------+------------------------------+-------------------------------------------+
|movieId|title                         |genres                                     |
+-------+------------------------------+-------------------------------------------+
|1      |Toy Story                     |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji                       |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men              |Comedy|Romance                             |
|4      |Waiting to Exhale             |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II   |Comedy                                     |
|6      |Heat                          |Action|Crime|Thriller                      |
|7      |Sabrina                       |Comedy|Romance                             |
|8      |Tom and Huck                  |Adventure|Children                         |
|9      |Sudden Death                  |Action                   

In [None]:
# printSchema() displays the schema of a DataFrame: the name, data type and
# nullability of each column, which summarises the structure of the data.
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column of the ratings data.
data.describe().show()

+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|            372362|            372362|            372362|              372362|
|   mean|1314.7315622969047|20928.468514510074|3.5561026635370956| 1.208393017946203E9|
| stddev|  729.069041526375| 38735.09961594754|1.0494675280340557|2.3262484856479302E8|
|    min|                 1|                 1|               0.5|           789652009|
|    max|              2559|            208793|               5.0|          1574253766|
+-------+------------------+------------------+------------------+--------------------+



In [None]:
# Spark DataFrames are immutable: dropna() returns a new DataFrame without the rows
# that contain nulls, so the result has to be assigned or the call has no effect.
# Dropping nulls twice changes nothing, so this cell is safe to re-run.
data = data.dropna()


DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [None]:
# Flag, row by row, whether each column of the ratings data is null.
# show() only prints and returns None, so keep the DataFrame itself in null_check
# and display it as a separate step.
null_check = data.select([col(column).isNull().alias(column) for column in data.columns])
null_check.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
| false|  false| false|    false|
+------+-------+------+---------+
only showing top 20 rows



In [None]:
# Show each text column of the movie data next to a flag telling whether it is null.
# show() prints and returns None, so the DataFrame is bound to null_check first and
# displayed afterwards; after the loop null_check holds the "genres" check.
for column in ("title", "genres"):
    null_check = movie_names_data.select(column, col(column).isNull().alias("is_null"))
    null_check.show()


+--------------------+-------+
|               title|is_null|
+--------------------+-------+
|           Toy Story|  false|
|             Jumanji|  false|
|    Grumpier Old Men|  false|
|   Waiting to Exhale|  false|
|Father of the Bri...|  false|
|                Heat|  false|
|             Sabrina|  false|
|        Tom and Huck|  false|
|        Sudden Death|  false|
|           GoldenEye|  false|
|American Presiden...|  false|
|Dracula: Dead and...|  false|
|               Balto|  false|
|               Nixon|  false|
|    Cutthroat Island|  false|
|              Casino|  false|
|Sense and Sensibi...|  false|
|          Four Rooms|  false|
|Ace Ventura: When...|  false|
|         Money Train|  false|
+--------------------+-------+
only showing top 20 rows

+--------------------+-------+
|              genres|is_null|
+--------------------+-------+
|Adventure|Animati...|  false|
|Adventure|Childre...|  false|
|      Comedy|Romance|  false|
|Comedy|Drama|Romance|  false|
|            

In [None]:
from pyspark.sql.functions import col, sum

# Calculate the count of null values for each column: the isNull() flag is cast to
# 0/1 and added up, one aggregate per column.
# NOTE: `sum` is pyspark's column aggregate here and shadows Python's built-in sum
# from this cell onwards.
null_count_exprs = []
for column in movie_names_data.columns:
    is_null_as_int = col(column).isNull().cast("int")
    null_count_exprs.append(sum(is_null_as_int).alias(column))

null_counts = movie_names_data.select(null_count_exprs)

# Show the result
null_counts.show()


+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
|      0|    0|     0|
+-------+-----+------+



In [None]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import avg



# Attach each movie's title and genres to its rating rows by joining on movieId.
# No `how` argument is given, so this is Spark's default inner join: rating rows
# whose movieId does not appear in movie_names_data are dropped.
# NOTE(review): no averaging happens here, and neither plt nor avg is used in this cell.
combined_data = data.join(movie_names_data, on="movieId")



In [None]:
from pyspark.sql import functions as F

# Count the null entries in every column of the joined data.
# F.sum / F.col are referenced explicitly so this cell does not depend on an earlier
# cell having replaced Python's built-in sum with pyspark's aggregate of the same name.
null_counts = combined_data.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in combined_data.columns]
)

# Show the result
null_counts.show()
combined_data.show()

+-------+------+------+---------+-----+------+
|movieId|userId|rating|timestamp|title|genres|
+-------+------+------+---------+-----+------+
|      0|     0|     0|        0|    0|     0|
+-------+------+------+---------+-----+------+

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|    296|     1|   5.0|1147880044|        Pulp Fiction|Comedy|Crime|Dram...|
|    306|     1|   3.5|1147868817|   Three Colors: Red|               Drama|
|    307|     1|   5.0|1147868828|  Three Colors: Blue|               Drama|
|    665|     1|   5.0|1147878820|         Underground|    Comedy|Drama|War|
|    899|     1|   3.5|1147868510| Singin' in the Rain|Comedy|Musical|Ro...|
|   1088|     1|   4.0|1147868495|       Dirty Dancing|Drama|Musical|Rom...|
|   1175|     1|   3.5|1147868826|        Delicatessen|Comedy|Drama|Rom

In [None]:
# Split the joined ratings into training and testing sets with randomSplit().
# The weights [0.8, 0.2] give the relative sizes of the two sets (about 80% for
# training, 20% for testing); the seed makes the random split reproducible.
split_weights = [0.8, 0.2]
train_data, test_data = combined_data.randomSplit(split_weights, seed=123)

# Show the number of rows in the training and testing sets
train_count = train_data.count()
test_count = test_data.count()
print("Number of rows in train_data:", train_count)
print("Number of rows in test_data:", test_count)

Number of rows in train_data: 298007
Number of rows in test_data: 74355


In [None]:
from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" removes rows whose user or movie was not seen during
# training; with the default strategy ("nan") those rows receive a NaN prediction,
# and a single NaN turns any metric computed over the predictions into NaN too.
# This also matches the ALS configurations used in the later cells.
als = ALS(
    maxIter=15,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
)

# Fit the model on the (userId, movieId, rating) columns of the training set
model = als.fit(train_data)

In [None]:
# Make predictions on the test data using the trained model; transform() returns
# the test rows with an extra "prediction" column.
predictions = model.transform(test_data)

# Show the predictions, including original columns and the "prediction" column
predictions.show()

+-------+------+------+----------+---------+--------------------+----------+
|movieId|userId|rating| timestamp|    title|              genres|prediction|
+-------+------+------+----------+---------+--------------------+----------+
|      1|     4|   3.0|1573944252|Toy Story|Adventure|Animati...| 3.2754016|
|      1|    12|   4.0|1167582601|Toy Story|Adventure|Animati...| 3.8952022|
|      1|    47|   2.0| 855093790|Toy Story|Adventure|Animati...|   3.39954|
|      1|    50|   4.0|1402505313|Toy Story|Adventure|Animati...| 4.0608797|
|      1|    66|   3.0|1138600903|Toy Story|Adventure|Animati...|  4.164846|
|      1|    77|   4.0| 832021510|Toy Story|Adventure|Animati...| 3.6073601|
|      1|    96|   5.0|1443313447|Toy Story|Adventure|Animati...| 3.4178615|
|      1|    98|   5.0|1450047199|Toy Story|Adventure|Animati...| 4.4976377|
|      1|   111|   4.5|1167221538|Toy Story|Adventure|Animati...|  5.076212|
|      1|   112|   3.0| 864074073|Toy Story|Adventure|Animati...| 3.0467987|

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Train an ALS model on explicit ratings and measure its mean absolute error (MAE)
# on the test set. Rows for users/movies unseen in training are dropped from the
# predictions (coldStartStrategy='drop').
als_params = dict(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    rank=30,
    maxIter=4,
    regParam=0.1,
    implicitPrefs=False,
    coldStartStrategy='drop',
)
als = ALS(**als_params)
model = als.fit(train_data)
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='mae',
)
mae = evaluator.evaluate(predictions)
print(f'MAE (Test) = {mae}')

MAE (Test) = 0.6589723933762238


In [None]:
# Score the current predictions again, this time with root-mean-square error.
# Note that this rebinds `evaluator` to an RMSE evaluator.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.8476244537717983


In [None]:
# Keep only the test-set rows of user 12, reduced to the (movieId, userId) pair.
user_12_rows = test_data.where(test_data['userId'] == 12)
single_user = user_12_rows.select('movieId', 'userId')

In [None]:
# Preview the (movieId, userId) pairs selected for the single user
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|      1|    12|
|     10|    12|
|     16|    12|
|     25|    12|
|     29|    12|
|     31|    12|
|     39|    12|
|    160|    12|
|    168|    12|
|    185|    12|
|    231|    12|
|    235|    12|
|    260|    12|
|    293|    12|
|    319|    12|
|    329|    12|
|    343|    12|
|    413|    12|
|    435|    12|
|    489|    12|
+-------+------+
only showing top 20 rows



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

# ALS on explicit ratings, wrapped in a one-stage Pipeline.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    rank=30,
    maxIter=4,
    regParam=0.1,
    implicitPrefs=False,
    coldStartStrategy='drop',
)
pipeline = Pipeline(stages=[als])

# Fit the pipeline on the training data, then predict ratings for the test data
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

# Measure the mean absolute error of the predictions against the true ratings
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='mae',
)
mae = evaluator.evaluate(predictions)

print(f'MAE (Test) = {mae}')



MAE (Test) = 0.6589723933762238


In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Define the ALS model; rank, maxIter and regParam are supplied by the grid below
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',
    implicitPrefs=False
)

# Define a parameter grid for hyperparameter tuning.
# NOTE: every parameter lists a single value, so the grid holds exactly one
# combination; add more values per parameter to actually compare candidates.
paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.01])
             .addGrid(als.rank, [10])
             .addGrid(als.maxIter, [15])
             .build())

# Create a RegressionEvaluator (mean absolute error)
evaluator = RegressionEvaluator(metricName='mae', labelCol='rating', predictionCol='prediction')

# Create a 5-fold cross-validator
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)

# Fit the model with hyperparameter tuning on the training data
cvModel = crossval.fit(train_data)

# Get the best model from cross-validation
best_model = cvModel.bestModel

# Make predictions on the test data using the best model
predictions = best_model.transform(test_data)

# Evaluate the model using the RegressionEvaluator
mae = evaluator.evaluate(predictions)

# Report the winning hyperparameters. The fitted model exposes its rank directly;
# maxIter and regParam live on the estimator that produced it, reached through the
# Java parent object. The getX() accessors return the values -- regParam() without
# the "get" prefix returns the parameter descriptor, not the number.
best_als = best_model._java_obj.parent()

print ("**Best Model**")
print ("Rank: ", best_model.rank)
print (" MaxIter: ", str(best_als.getMaxIter()))
print (" RegParam:", best_als.getRegParam())
print (f"MAE (Test) = {mae}")


**Best Model**
Rank:  ALSModel: uid=ALS_a91cf54f1850, rank=10
 MaxIter:  15
 RegParam: ALS_a91cf54f1850__regParam


In [None]:
# Generate predictions with the best model and evaluate them using RMSE.
# A dedicated evaluator is required: the `evaluator` created for cross-validation is
# configured with metricName='mae', so reusing it would report MAE under the RMSE name.
predictions = best_model.transform(test_data)
rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = rmse_evaluator.evaluate(predictions)

In [None]:
# Print the value stored in rmse
print(f"RMSE = {rmse}")

RMSE = 0.7187118252467011


In [None]:
# Ask the best model for 10 recommended movies per user. Each result row holds a
# userId and a "recommendations" list of (movieId, predicted score) entries.
movie_recommendation = best_model.recommendForAllUsers(10)
movie_recommendation.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{180985, 9.27125...|
|     3|[{668, 5.528395},...|
|     5|[{3569, 6.920839}...|
|     6|[{117109, 6.66919...|
|     9|[{1475, 8.8236685...|
|    12|[{7139, 6.199474}...|
|    13|[{6783, 6.002256}...|
|    15|[{2570, 7.78936},...|
|    16|[{1809, 8.046873}...|
|    17|[{2275, 11.046055...|
|    19|[{33817, 6.203159...|
|    20|[{62511, 7.454759...|
|    22|[{2337, 12.541262...|
|    26|[{4234, 7.981208}...|
|    27|[{135137, 8.74462...|
|    28|[{2589, 7.5457616...|
|    31|[{179085, 4.34892...|
|    34|[{117109, 7.34928...|
|    35|[{179085, 6.86652...|
|    37|[{1913, 7.728309}...|
+------+--------------------+
only showing top 20 rows



In [None]:
# Predict a rating for every (movieId, userId) pair in single_user, then list the
# pairs from the highest predicted rating down.
recommendations = best_model.transform(single_user)
userRecommendations = recommendations.sort('prediction', ascending=False)
userRecommendations.show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   8951|    12|  5.216053|
|  63876|    12| 4.9161263|
|    858|    12|  4.733227|
|  40870|    12| 4.6417127|
|   3435|    12| 4.5486298|
|   1221|    12| 4.5133395|
|   5339|    12| 4.4996204|
|    260|    12|  4.498723|
|  50068|    12| 4.4418745|
|   4993|    12|  4.440641|
|   1230|    12| 4.4205956|
|    912|    12| 4.4042926|
|   6711|    12| 4.3622675|
|   1208|    12| 4.3530936|
|  44555|    12| 4.2455835|
|    551|    12| 4.2203355|
|   1212|    12|  4.216413|
|   2997|    12| 4.1889677|
|   1242|    12| 4.1619616|
|   6385|    12| 4.1511345|
+-------+------+----------+
only showing top 20 rows



In [None]:
import pandas as pd

# Convert the Spark recommendations to a pandas DataFrame for row-wise processing.
# The same name is rebound, so the conversion is guarded: if this cell is run a
# second time the variable already holds a pandas DataFrame, which has no toPandas().
if not isinstance(movie_recommendation, pd.DataFrame):
    movie_recommendation = movie_recommendation.toPandas()

In [None]:
# Flatten the per-user recommendation lists into one row per user, with the
# recommended movie IDs joined into a single comma-separated string.
row_positions = range(len(movie_recommendation))

# First column: the user ID of each row
user_list = [movie_recommendation.iloc[row, 0] for row in row_positions]

# Second column: the recommendation entries; keep only each entry's movieId
recommendations = [
    ", ".join(str(entry.asDict()["movieId"]) for entry in movie_recommendation.iloc[row, 1])
    for row in row_positions
]

#Convert results into a dataframe
recommendations_df = pd.DataFrame(data = zip(user_list, recommendations), columns=["user", "MovieID"])



In [None]:
# Show the first 10 rows of recommendations_df: each row is a user together with
# that user's recommended movie IDs as a comma-separated string.
recommendations_df.head(10)

Unnamed: 0,user,MovieID
0,1,"180985, 45028, 119, 118930, 2769, 3773, 961, 8..."
1,3,"668, 4783, 171495, 764, 3599, 154890, 127180, ..."
2,5,"3569, 52042, 451, 2925, 1503, 2769, 56607, 564..."
3,6,"117109, 4066, 764, 6286, 198185, 6254, 3163, 1..."
4,9,"1475, 3920, 2769, 7193, 52975, 1226, 179085, 5..."
5,12,"7139, 2589, 8327, 4305, 156387, 26578, 443, 48..."
6,13,"6783, 118930, 764, 7013, 2769, 105250, 48165, ..."
7,15,"2570, 47491, 1173, 1306, 7767, 2275, 2239, 230..."
8,16,"1809, 8951, 3342, 3773, 180985, 88235, 156387,..."
9,17,"2275, 1503, 6184, 7453, 4699, 44193, 4959, 700..."
