### 5. Given a Movie dataset containing user ratings for movies, using PySpark SQL perform the following operations
### • Load a CSV file containing movie data
### • Create temporary views for movies and ratings.
### • Write queries to find the top 10 highest-rated movies with at least 10 ratings.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Program 5").getOrCreate()

#### • Load a CSV file containing movie data

In [9]:
movies_df = spark.read.csv("Datasets/Movies/movies.csv", header=True, inferSchema=True)

In [10]:
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [11]:
ratings_df = spark.read.csv("Datasets/Movies/ratings.csv", header=True, inferSchema=True)

In [12]:
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



#### • Create temporary views for movies and ratings.

In [13]:
movies_df.createOrReplaceTempView("movies")

In [14]:
ratings_df.createOrReplaceTempView("ratings")

#### • Write queries to find the top 10 highest-rated movies with at least 10 ratings.

In [31]:
query = """
        Select m.movieId, title, avg(rating) as Average_Rating, count(rating) as No_of_Ratings
        from movies as m join ratings as r
        on m.movieId = r.movieId
        group by m.movieId, title
        having No_of_Ratings>10
        order by Average_Rating desc
        limit 10
        """

In [32]:
top_10_movies = spark.sql(query)

In [29]:
top_10_movies.show()

+-------+--------------------+-----------------+-------------+
|movieId|               title|   Average_Rating|No_of_Ratings|
+-------+--------------------+-----------------+-------------+
|   1041|Secrets & Lies (1...|4.590909090909091|           11|
|   3451|Guess Who's Comin...|4.545454545454546|           11|
|   1178|Paths of Glory (1...|4.541666666666667|           12|
|   1104|Streetcar Named D...|            4.475|           20|
|   2360|Celebration, The ...|4.458333333333333|           12|
|   1217|          Ran (1985)|4.433333333333334|           15|
|    318|Shawshank Redempt...|4.429022082018927|          317|
|    951|His Girl Friday (...|4.392857142857143|           14|
|   3468| Hustler, The (1961)|4.333333333333333|           18|
|    922|Sunset Blvd. (a.k...|4.333333333333333|           27|
+-------+--------------------+-----------------+-------------+



In [30]:
spark.stop()