Write and implement the following problem in PySpark
The movies.csv file contains following fields
a. Movieid
b. Title
c. Genre

The ratings.csv file contains the following fields
a. Userid
b. Movieid
c. Rating
d. Timestamp

A. Apply ALS algorithm for the above dataset. Display user id with title and genre
B. Find RMSE
C. Genres are seperaated by the pipe symbol. Display every unique genre along with count of their occurences
D. Display total rating of each movie and total rating of each genre
E. List out top 10 best performing movies. Also list 5 worst performing movies. Find out the average rating of all the movies and average rating of each user

In [14]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MovieRecommendation") \
    .getOrCreate()

# Load movies.csv
movies_df = spark.read.csv("movies.csv", header=True, inferSchema=True)
print("Movies Schema:")
movies_df.printSchema()
movies_df.show(5)

# Load ratings.csv
ratings_df = spark.read.csv("ratings.csv", header=True, inferSchema=True)
print("Ratings Schema:")
ratings_df.printSchema()
ratings_df.show(5)

# Build ALS model
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

# Split data into training and test set
(training, test) = ratings_df.randomSplit([0.8, 0.2])

# Train the ALS model
model = als.fit(training)

# Generate predictions
predictions = model.transform(test)

# Evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Display recommended movies for each user
userRecs = model.recommendForAllUsers(5)  # recommending 5 movies for each user
userRecs_df = userRecs.join(movies_df, userRecs.userId == movies_df.movieId).select(userRecs.userId, "title", "genres")
userRecs_df.show(truncate=False)

# Stop SparkSession
spark.stop()


Movies Schema:
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

Ratings Schema:
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4

In [18]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MovieRecommendation") \
    .getOrCreate()

# Load movies.csv
movies_schema = StructType([
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)
])
movies_df = spark.read.csv("movies.csv", header=True, schema=movies_schema)

# Load ratings.csv
ratings_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", IntegerType(), True)
])
ratings_df = spark.read.csv("ratings.csv", header=True, schema=ratings_schema)

# Build ALS model
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

# Split data into training and test set
(training, test) = ratings_df.randomSplit([0.8, 0.2])

# Train the ALS model
model = als.fit(training)

# Generate predictions
predictions = model.transform(test)

# Calculate RMSE
rmse = predictions.select("rating", "prediction").rdd.map(lambda x: (x[0] - x[1])**2).mean()**0.5
print("Root-mean-square error = " + str(rmse))

# Display recommended movies for each user
userRecs = model.recommendForAllUsers(5)  # recommending 5 movies for each user
userRecs_df = userRecs.join(movies_df, userRecs.userId == movies_df.movieId).select(userRecs.userId, "title", "genres")
userRecs_df.show(truncate=False)

# Stop SparkSession
spark.stop()


Root-mean-square error = 1.076523958923484
+------+-------------------------------------+-------------------------------------------+
|userId|title                                |genres                                     |
+------+-------------------------------------+-------------------------------------------+
|1     |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|
|2     |Jumanji (1995)                       |Adventure|Children|Fantasy                 |
|3     |Grumpier Old Men (1995)              |Comedy|Romance                             |
|4     |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |
|5     |Father of the Bride Part II (1995)   |Comedy                                     |
|6     |Heat (1995)                          |Action|Crime|Thriller                      |
|7     |Sabrina (1995)                       |Comedy|Romance                             |
|8     |Tom and Huck (1995)                  |A

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, count

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MovieGenresCount") \
    .getOrCreate()

# Load movies.csv
movies_df = spark.read.csv("movies.csv", header=True)

# Split genres by pipe symbol
movies_df = movies_df.withColumn("genres_split", split("genres", "\|"))

# Explode genres array
movies_df = movies_df.withColumn("genre", explode("genres_split"))

# Count occurrences of each genre
genre_counts = movies_df.groupBy("genre").agg(count("*").alias("count")).orderBy("count", ascending=False)

# Show the result
genre_counts.show(truncate=False)

# Stop SparkSession
spark.stop()




+------------------+-----+
|genre             |count|
+------------------+-----+
|Drama             |4361 |
|Comedy            |3756 |
|Thriller          |1894 |
|Action            |1828 |
|Romance           |1596 |
|Adventure         |1263 |
|Crime             |1199 |
|Sci-Fi            |980  |
|Horror            |978  |
|Fantasy           |779  |
|Children          |664  |
|Animation         |611  |
|Mystery           |573  |
|Documentary       |440  |
|War               |382  |
|Musical           |334  |
|Western           |167  |
|IMAX              |158  |
|Film-Noir         |87   |
|(no genres listed)|34   |
+------------------+-----+



In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, explode

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MovieRatingAndGenreRating") \
    .getOrCreate()

# Load movies.csv and ratings.csv
movies_df = spark.read.csv("movies.csv", header=True)
ratings_df = spark.read.csv("ratings.csv", header=True)

# Total rating of each movie
movie_ratings = ratings_df.groupBy("movieId").agg(sum("rating").alias("total_rating"))

# Total rating of each genre
genre_ratings = movies_df.join(ratings_df, "movieId") \
    .withColumn("genre", explode(split("genres", "\|"))) \
    .groupBy("genre").agg(sum("rating").alias("total_rating_per_genre"))

# Show results
print("Total rating of each movie:")
movie_ratings.show(truncate=False)

print("Total rating of each genre:")
genre_ratings.show(truncate=False)

# Stop SparkSession
spark.stop()




Total rating of each movie:
+-------+------------+
|movieId|total_rating|
+-------+------------+
|296    |1288.5      |
|1090   |251.0       |
|115713 |109.5       |
|3210   |146.0       |
|88140  |113.5       |
|829    |24.0        |
|2088   |45.0        |
|2294   |146.0       |
|4821   |15.5        |
|48738  |79.5        |
|3959   |29.0        |
|89864  |69.0        |
|2136   |34.5        |
|691    |10.0        |
|3606   |15.0        |
|121007 |4.0         |
|6731   |29.0        |
|27317  |22.5        |
|26082  |13.5        |
|100553 |9.0         |
+-------+------------+
only showing top 20 rows

Total rating of each genre:
+------------------+----------------------+
|genre             |total_rating_per_genre|
+------------------+----------------------+
|Crime             |61024.0               |
|Romance           |63552.0               |
|Thriller          |92415.5               |
|Adventure         |84771.5               |
|Drama             |153296.5              |
|War          

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MovieStats") \
    .getOrCreate()

# Load ratings.csv
ratings_df = spark.read.csv("ratings.csv", header=True)

# Calculate total rating of each movie
movie_ratings = ratings_df.groupBy("movieId").agg(avg("rating").alias("avg_rating"))

# Find top 10 best performing movies
top_10_best_movies = movie_ratings.orderBy(col("avg_rating").desc()).limit(10)
print("Top 10 best performing movies:")
top_10_best_movies.show(truncate=False)

# Find 5 worst performing movies
worst_movies = movie_ratings.orderBy(col("avg_rating")).limit(5)
print("5 worst performing movies:")
worst_movies.show(truncate=False)

# Calculate average rating of all movies
average_rating_all_movies = movie_ratings.agg(avg("avg_rating")).collect()[0][0]
print("Average rating of all movies:", average_rating_all_movies)

# Calculate average rating of each user
user_avg_rating = ratings_df.groupBy("userId").agg(avg("rating").alias("avg_rating_per_user"))
print("Average rating of each user:")
user_avg_rating.show(truncate=False)

# Stop SparkSession
spark.stop()




Top 10 best performing movies:
+-------+----------+
|movieId|avg_rating|
+-------+----------+
|170597 |5.0       |
|102217 |5.0       |
|139640 |5.0       |
|1349   |5.0       |
|140627 |5.0       |
|67618  |5.0       |
|147330 |5.0       |
|6402   |5.0       |
|149508 |5.0       |
|467    |5.0       |
+-------+----------+

5 worst performing movies:
+-------+----------+
|movieId|avg_rating|
+-------+----------+
|157172 |0.5       |
|3933   |0.5       |
|4580   |0.5       |
|151745 |0.5       |
|26696  |0.5       |
+-------+----------+

Average rating of all movies: 3.2624482748109664
Average rating of each user:
+------+-------------------+
|userId|avg_rating_per_user|
+------+-------------------+
|296   |4.166666666666667  |
|467   |3.409090909090909  |
|125   |3.859722222222222  |
|451   |3.7941176470588234 |
|7     |3.2302631578947367 |
|51    |3.7757660167130918 |
|124   |3.99               |
|447   |3.871794871794872  |
|591   |3.2777777777777777 |
|307   |2.6656410256410257 |
|4