<h1><center>SparkFlix Analytics: Exploring Movie Reviews with PySpark and SQL </center></h1>

# Sai Sanwariya Narayan

## The goals of this project are:
### - Use Data Frames in Spark for Processing Structured Data
### - Perform Basic DataFrame Transformations such as filtering rows and selecting columns of DataFrame
### - Create New Columns of DataFrame using `withColumn`
### - Use DF SQL Functions to transform a string into an Array
### - Filter on a DF Column that is an Array using `array_contains`
### - Perform Join operations on DataFrames 
### - Use groupBy, followed by count and sum DF transformation to calculate the count and the sum of a DF column (e.g., reviews) for each group (e.g., movie).
### - Perform sorting on a DataFrame column
### - Apply the above to find movies in a genre of your choice that have good reviews and a significant number of ratings (use 10 as the threshold for local mode, 100 as the threshold for cluster mode).
### - After completing all exercises in the notebook, convert the code to process the large reviews dataset and the large movies dataset, and find the Comedy movies with the top average rating among those with at least 100 reviews.

## Importing pyspark first.

In [1]:
import pyspark

### Once we import pyspark, we need to import `SparkContext`. Every Spark program needs a SparkContext object.
### In order to use Spark SQL on DataFrames, we also need to import `SparkSession` from `pyspark.sql`.

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import col, column
from pyspark.sql.functions import expr
from pyspark.sql.functions import split
from pyspark.sql import Row

## We then create a SparkSession variable (rather than a SparkContext) in order to use DataFrames.

In [3]:
ss=SparkSession.builder.master("local").appName("Lab 5 Top Reviews").getOrCreate()

In [4]:
import os
ss.sparkContext.setCheckpointDir(os.path.expanduser("~/scratch"))  # Spark does not expand "~" itself

In [5]:
rating_schema = StructType([
    StructField("UserID", IntegerType(), False),
    StructField("MovieID", IntegerType(), True),
    StructField("Rating", FloatType(), True),
    StructField("RatingID", IntegerType(), True),
])

In [6]:
ratings_DF = ss.read.csv("/storage/home/ratings_2.csv", schema= rating_schema, header=True, inferSchema=False)
# In cluster mode, change this to `header=False` because the cluster dataset has no header row.

In [7]:
movie_schema = StructType([
    StructField("MovieID", IntegerType(), False),
    StructField("MovieTitle", StringType(), True),
    StructField("Genres", StringType(), True),
])

In [8]:
movies_DF = ss.read.csv("/storage/home/movies_2.csv", schema=movie_schema, header=True, inferSchema=False)
# In cluster mode, change this to `header=False` because the cluster dataset has no header row.

In [9]:
movies_DF.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)



In [10]:
movies_DF.show(10)

+-------+--------------------+--------------------+
|MovieID|          MovieTitle|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [11]:
movies_genres_DF = movies_DF.select("MovieID","Genres")

In [12]:
movies_genres_rdd = movies_genres_DF.rdd

In [13]:
movies_genres_rdd.take(3)

[Row(MovieID=1, Genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(MovieID=2, Genres='Adventure|Children|Fantasy'),
 Row(MovieID=3, Genres='Comedy|Romance')]

In [14]:
movies_genres2_rdd = movies_genres_rdd.flatMap(lambda x: x['Genres'].split('|'))

In [15]:
movies_genres2_rdd.take(3)

['Adventure', 'Animation', 'Children']

In [16]:
movies_genres3_rdd = movies_genres2_rdd.map(lambda x: (x, 1))

In [17]:
movies_genres_count_rdd = movies_genres3_rdd.reduceByKey(lambda x, y: x+y)

In [18]:
movies_genres_count_rdd.take(10)

[('Adventure', 1117),
 ('Animation', 447),
 ('Children', 583),
 ('Comedy', 3315),
 ('Fantasy', 654),
 ('Romance', 1545),
 ('Drama', 4365),
 ('Action', 1545),
 ('Crime', 1100),
 ('Thriller', 1729)]

In [19]:
movies_genres_count_rdd.saveAsTextFile("/storage/home/MovieGenres_count.txt")

In [20]:
ratings_DF.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: float (nullable = true)
 |-- RatingID: integer (nullable = true)



In [21]:
ratings_DF.show(5)

+------+-------+------+----------+
|UserID|MovieID|Rating|  RatingID|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



# 2. DataFrames Transformations
Spark DataFrames provide higher-level transformations that are convenient for selecting rows, selecting columns, and creating new columns. These transformations are part of Spark SQL.

## 2.1 `where` DF Transformation for Filtering/Selecting Rows
Select rows from a DataFrame (DF) that satisfy a condition. This is similar to the `WHERE` clause in the SQL query language.
- One important difference (compared to SQL) is that the conditions below refer to a column as a column object, either with `col(...)` or by indexing the DataFrame (e.g., `movies_DF["MovieTitle"]`), rather than by a bare column name.
- The condition inside the `where` transformation can be an equality test, a `>` test, or a `<` test, as illustrated below.

# `show` DF action
The `show` DF action is similar to the `take` RDD action. It takes a number as a parameter, which is the number of rows to display from the start of the DF (20 by default). The rows are not randomly sampled.

In [22]:
movies_DF.where(movies_DF["MovieTitle"] == "Toy Story (1995)").show()

+-------+----------------+--------------------+
|MovieID|      MovieTitle|              Genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
+-------+----------------+--------------------+



In [23]:
ratings_DF.where(ratings_DF["Rating"] > 3).show(5)

+------+-------+------+----------+
|UserID|MovieID|Rating|  RatingID|
+------+-------+------+----------+
|     1|   1172|   4.0|1260759205|
|     1|   1339|   3.5|1260759125|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     2|     10|   4.0| 835355493|
+------+-------+------+----------+
only showing top 5 rows



# `count` DF action
The `count` action returns the total number of rows in the input DataFrame.

In [24]:
ratings_DF.filter(4 < ratings_DF["Rating"]).count()

22818

# Filtering DF Rows

In [25]:
review_5_count = ratings_DF.where(5 == col("Rating")).count()
print(review_5_count)

15095


# DataFrame Transformation for Selecting Columns

The DataFrame transformation `select` is similar to the projection operation in SQL: it returns a DataFrame that contains only the selected columns.

In [26]:
movies_DF.select("MovieTitle").show(5)

+--------------------+
|          MovieTitle|
+--------------------+
|    Toy Story (1995)|
|      Jumanji (1995)|
|Grumpier Old Men ...|
|Waiting to Exhale...|
|Father of the Bri...|
+--------------------+
only showing top 5 rows



In [27]:
movies_DF.select(col("MovieTitle")).show(5)

+--------------------+
|          MovieTitle|
+--------------------+
|    Toy Story (1995)|
|      Jumanji (1995)|
|Grumpier Old Men ...|
|Waiting to Exhale...|
|Father of the Bri...|
+--------------------+
only showing top 5 rows



# Selecting DF Columns

In [28]:
movie_rating_DF = ratings_DF.select(col("MovieID"),col("Rating"))

In [29]:
movie_rating_DF.show(5)

+-------+------+
|MovieID|Rating|
+-------+------+
|     31|   2.5|
|   1029|   3.0|
|   1061|   3.0|
|   1129|   2.0|
|   1172|   4.0|
+-------+------+
only showing top 5 rows



# Statistical Summary of Numerical Columns
DataFrame provides a `describe` method that provides a summary of basic statistical information (e.g., count, mean, standard deviation, min, max) for numerical columns.

In [30]:
ratings_DF.describe().show()

+-------+------------------+------------------+------------------+--------------------+
|summary|            UserID|           MovieID|            Rating|            RatingID|
+-------+------------------+------------------+------------------+--------------------+
|  count|            100004|            100004|            100004|              100004|
|   mean| 347.0113095476181|12548.664363425463| 3.543608255669773|1.1296390869392424E9|
| stddev|195.16383797819535|26369.198968815268|1.0580641091070326|1.9168582602710962E8|
|    min|                 1|                 1|               0.5|           789652009|
|    max|               671|            163949|               5.0|          1476640644|
+-------+------------------+------------------+------------------+--------------------+



## RDDs have a `histogram` method that computes the total number of rows in each "bucket".
The code below selects the Rating column from `ratings_DF`, converts it to an RDD, and maps each row to its rating value. The `histogram` action then counts the reviews in each of the 6 buckets defined by the boundaries `[0,1,2,3,4,5,6]`.

In [31]:
ratings_DF.select(col("Rating")).rdd.map(lambda row: row[0]).histogram([0,1,2,3,4,5,6])

([0, 1, 2, 3, 4, 5, 6], [1101, 5013, 11720, 30602, 36473, 15095])
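
The bucket boundaries deserve a closer look: per the PySpark documentation, each bucket is half-open (`lo <= x < hi`) except the last one, which is closed on both ends. That is why the last count above (15095) equals the number of 5-star reviews computed earlier. The following is a plain-Python sketch of that bucketing rule, not Spark's own implementation; `bucket_counts` and `sample_ratings` are hypothetical names with made-up data.

```python
from bisect import bisect_right

def bucket_counts(values, edges):
    """Count values per bucket: [lo, hi) for every bucket except the last, which is [lo, hi]."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        if v < edges[0] or v > edges[-1]:
            continue  # values outside the overall range are ignored
        # bisect_right gives the bucket index; clamp so v == edges[-1] falls in the last bucket
        idx = min(bisect_right(edges, v) - 1, len(counts) - 1)
        counts[idx] += 1
    return counts

sample_ratings = [0.5, 1.0, 2.5, 3.0, 3.5, 4.0, 5.0, 5.0]  # made-up ratings
counts = bucket_counts(sample_ratings, [0, 1, 2, 3, 4, 5, 6])
print(counts)  # [1, 1, 1, 2, 1, 2]
```

Note that a rating of exactly 3.0 lands in the `[3, 4)` bucket, not in `[2, 3)`.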

# Transforming the Genres Column into an Array of Genres
## We want to transform the Genres column, which represents all genres of a movie as a single string joined by "|", into an array so that we can later filter for movies of a genre more efficiently.
## This transformation can be done using the Spark SQL function `split` (which is different from Python's `split` method): its pattern is a regular expression, so `|` has to be escaped.

In [32]:
Splitted_Generes_DF = movies_DF.select(split(col("Genres"), r'\|'))  # raw string: the pattern is a regex
Splitted_Generes_DF.show(5)

+---------------------+
|split(Genres, \|, -1)|
+---------------------+
| [Adventure, Anima...|
| [Adventure, Child...|
|    [Comedy, Romance]|
| [Comedy, Drama, R...|
|             [Comedy]|
+---------------------+
only showing top 5 rows
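
The backslash in the pattern matters because the delimiter is interpreted as a regular expression, where an unescaped `|` means alternation. The sketch below uses Python's `re` module as a stand-in to illustrate the idea; Spark uses Java's regex engine, so edge-case behavior may differ. The sample string is illustrative.

```python
import re

genres = "Adventure|Animation|Children"  # sample value in the style of the Genres column

# Unescaped, "|" is an alternation of two empty patterns, so it matches between characters
unescaped = re.split("|", genres)

# Escaped, "\|" matches the literal pipe character
escaped = re.split(r"\|", genres)

print(escaped)         # ['Adventure', 'Animation', 'Children']
print(len(unescaped))  # far more than 3 pieces
```

Python's built-in `str.split("|")` treats its argument as a literal string, which is why the RDD code earlier needed no escaping.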



## Adding a Column to a DataFrame using withColumn

# `withColumn` DF Transformation

We often need to transform the content of one column into another column. For example, it is desirable to transform the column Genres in the movies DataFrame into an `Array` of the genres to which each movie belongs. We can do this using the DataFrame method `withColumn`.

### Creates a new column called "Genres_Array", whose values are arrays of genres for each movie, obtained by splitting the column value of "Genres" for each row (movie).

In [33]:
moviesG2_DF = movies_DF.withColumn("Genres_Array", split("Genres", r'\|'))  # raw string: the pattern is a regex

In [34]:
moviesG2_DF.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Genres_Array: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [35]:
moviesG2_DF.show(5)

+-------+--------------------+--------------------+--------------------+
|MovieID|          MovieTitle|              Genres|        Genres_Array|
+-------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      2|      Jumanji (1995)|Adventure|Childre...|[Adventure, Child...|
|      3|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|[Comedy, Drama, R...|
|      5|Father of the Bri...|              Comedy|            [Comedy]|
+-------+--------------------+--------------------+--------------------+
only showing top 5 rows



# Choosing Comedy as Genre

In [36]:
from pyspark.sql.functions import array_contains
movies_your_genre_DF = moviesG2_DF.filter(array_contains(moviesG2_DF["Genres_Array"], "Comedy" ))

In [37]:
movies_your_genre_DF.show(5)

+-------+--------------------+--------------------+--------------------+
|MovieID|          MovieTitle|              Genres|        Genres_Array|
+-------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      3|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|[Comedy, Drama, R...|
|      5|Father of the Bri...|              Comedy|            [Comedy]|
|      7|      Sabrina (1995)|      Comedy|Romance|   [Comedy, Romance]|
+-------+--------------------+--------------------+--------------------+
only showing top 5 rows
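
`array_contains` tests for an exact element of the array, which is stricter than searching the original string for a substring. Here is a plain-Python sketch of the same membership test, using the three genre strings shown by `take(3)` earlier; the list name `movies` and the "Child" comparison are illustrative assumptions.

```python
# (MovieID, Genres) pairs copied from the take(3) output above
movies = [
    (1, "Adventure|Animation|Children|Comedy|Fantasy"),
    (2, "Adventure|Children|Fantasy"),
    (3, "Comedy|Romance"),
]

# Exact element match, analogous to array_contains(Genres_Array, "Comedy")
comedy_ids = [mid for mid, genres in movies if "Comedy" in genres.split("|")]
print(comedy_ids)  # [1, 3]

# A substring test can over-match: "Child" is not a genre, yet it is found inside "Children"
substring_ids = [mid for mid, genres in movies if "Child" in genres]
exact_ids = [mid for mid, genres in movies if "Child" in genres.split("|")]
print(substring_ids, exact_ids)  # [1, 2] []
```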



# A DF-based approach to compute Average Movie Ratings and Total Count of Reviews for each movie.

# `groupBy` DF transformation
Takes a column name (string) as the parameter and groups the rows of the DF based on that column. All rows with the same value for the column are grouped together. A `groupBy` transformation is usually followed by an aggregation across all rows in the same group.

# `sum` DF transformation
Takes a column name (string) as the parameter. It is typically used after the `groupBy` DF transformation, where `sum` adds up the values of the input column over all rows in the same group.

# `count` DF transformation
Returns the number of rows in the DataFrame.  When `count` is used after `groupBy`, it returns a DataFrame with a column called "count" that contains the total number of rows for each group generated by the `groupBy`.
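
To make the grouping semantics concrete, here is a plain-Python sketch of what `groupBy` followed by `sum` and `count` computes, and how the two results combine into an average. It only illustrates the logic and does not reflect how Spark executes it; the rating pairs are made up.

```python
from collections import defaultdict

# Made-up (MovieID, Rating) pairs, not taken from the dataset
ratings = [(1, 4.0), (1, 5.0), (2, 3.0), (1, 3.0), (2, 4.0), (3, 2.5)]

rating_sum = defaultdict(float)  # plays the role of groupBy("MovieID").sum("Rating")
rating_count = defaultdict(int)  # plays the role of groupBy("MovieID").count()
for movie_id, rating in ratings:
    rating_sum[movie_id] += rating
    rating_count[movie_id] += 1

# Dividing the per-movie sum by the per-movie count gives the average rating
avg_rating = {m: rating_sum[m] / rating_count[m] for m in rating_sum}
print(avg_rating)  # {1: 4.0, 2: 3.5, 3: 2.5}
```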

In [38]:
Movie_RatingSum_DF = ratings_DF.groupBy("MovieID").sum("Rating")

In [39]:
Movie_RatingSum_DF.show(4)

+-------+-----------+
|MovieID|sum(Rating)|
+-------+-----------+
|   1580|      696.0|
|   2659|       12.0|
|   3794|       17.0|
|   3175|      228.0|
+-------+-----------+
only showing top 4 rows



# Calculating the total number of reviews for each movie.

In [40]:
Movie_RatingCount_DF = ratings_DF.groupBy(col("MovieID")).count()

In [41]:
Movie_RatingCount_DF.show(4)

+-------+-----+
|MovieID|count|
+-------+-----+
|   1580|  190|
|   2659|    3|
|   3794|    5|
|   3175|   65|
+-------+-----+
only showing top 4 rows



# Join Transformation on Two DataFrames

In [42]:
Movie_Rating_Sum_Count_DF = Movie_RatingSum_DF.join(Movie_RatingCount_DF, "MovieID", 'inner')

In [43]:
Movie_Rating_Sum_Count_DF.show(4)

+-------+-----------+-----+
|MovieID|sum(Rating)|count|
+-------+-----------+-----+
|   1580|      696.0|  190|
|   2659|       12.0|    3|
|   3794|       17.0|    5|
|   3175|      228.0|   65|
+-------+-----------+-----+
only showing top 4 rows
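
An inner join keeps only the keys that appear in both inputs. The sketch below mimics that rule with plain Python dictionaries; the values for MovieID 1580 and 2659 are copied from the output above, while MovieID 9999 is a hypothetical entry added to show a key being dropped.

```python
# Per-movie sums and counts keyed by MovieID
sums = {1580: 696.0, 2659: 12.0}
counts = {1580: 190, 2659: 3, 9999: 7}  # 9999 is hypothetical and has no matching sum

# Inner join on the key: keep only MovieIDs present on both sides
joined = {k: (sums[k], counts[k]) for k in sorted(sums.keys() & counts.keys())}
print(joined)  # {1580: (696.0, 190), 2659: (12.0, 3)}
```

Here both DataFrames come from the same `ratings_DF`, so no keys are lost. The same rule means that a later inner join with the movies DataFrame drops any movie that has no ratings.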



In [44]:
Movie_Rating_Count_Avg_DF = Movie_Rating_Sum_Count_DF.withColumn("AvgRating", (col("sum(Rating)") / col("count")) )

In [45]:
Movie_Rating_Count_Avg_DF.show(4)

+-------+-----------+-----+------------------+
|MovieID|sum(Rating)|count|         AvgRating|
+-------+-----------+-----+------------------+
|   1580|      696.0|  190| 3.663157894736842|
|   2659|       12.0|    3|               4.0|
|   3794|       17.0|    5|               3.4|
|   3175|      228.0|   65|3.5076923076923077|
+-------+-----------+-----+------------------+
only showing top 4 rows



## Next, we want to join `Movie_Rating_Count_Avg_DF` with `moviesG2_DF`

In [46]:
joined_DF = Movie_Rating_Count_Avg_DF.join(moviesG2_DF,'MovieID', 'inner')

In [47]:
moviesG2_DF.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Genres_Array: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [48]:
joined_DF.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- sum(Rating): double (nullable = true)
 |-- count: long (nullable = false)
 |-- AvgRating: double (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Genres_Array: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [49]:
joined_DF.show(4)

+-------+-----------+-----+------------------+--------------------+--------------------+--------------------+
|MovieID|sum(Rating)|count|         AvgRating|          MovieTitle|              Genres|        Genres_Array|
+-------+-----------+-----+------------------+--------------------+--------------------+--------------------+
|   1580|      696.0|  190| 3.663157894736842|Men in Black (a.k...|Action|Comedy|Sci-Fi|[Action, Comedy, ...|
|   2659|       12.0|    3|               4.0|It Came from Holl...|  Comedy|Documentary|[Comedy, Document...|
|   3794|       17.0|    5|               3.4| Chuck & Buck (2000)|        Comedy|Drama|     [Comedy, Drama]|
|   3175|      228.0|   65|3.5076923076923077| Galaxy Quest (1999)|Adventure|Comedy|...|[Adventure, Comed...|
+-------+-----------+-----+------------------+--------------------+--------------------+--------------------+
only showing top 4 rows



# Filter DataFrame on an Array Column of DataFrame Using `array_contains`

In [50]:
from pyspark.sql.functions import array_contains
SelectGenreAvgRating_DF = joined_DF.filter(array_contains('Genres_Array', \
                                               "Comedy")).select("MovieID","AvgRating","count","MovieTitle")

In [51]:
SelectGenreAvgRating_DF.show(5)

+-------+------------------+-----+--------------------+
|MovieID|         AvgRating|count|          MovieTitle|
+-------+------------------+-----+--------------------+
|   1580| 3.663157894736842|  190|Men in Black (a.k...|
|   2659|               4.0|    3|It Came from Holl...|
|   3794|               3.4|    5| Chuck & Buck (2000)|
|   3175|3.5076923076923077|   65| Galaxy Quest (1999)|
|    471| 3.877551020408163|   49|Hudsucker Proxy, ...|
+-------+------------------+-----+--------------------+
only showing top 5 rows



In [52]:
SelectGenreAvgRating_DF.count()

3307

In [53]:
SelectGenreAvgRating_DF.describe().show()

+-------+------------------+------------------+------------------+--------------------+
|summary|           MovieID|         AvgRating|             count|          MovieTitle|
+-------+------------------+------------------+------------------+--------------------+
|  count|              3307|              3307|              3307|                3307|
|   mean|30277.501663138795| 3.190353163648426|11.498639250075597|                null|
| stddev|  40111.1391405288|0.8910523473881353|23.795030821491128|                null|
|    min|                 1|               0.5|                 1|'Hellboy': The Se...|
|    max|            160567|               5.0|               341|À nous la liberté...|
+-------+------------------+------------------+------------------+--------------------+



In [54]:
SortedSelectGenreAvgRating_DF = SelectGenreAvgRating_DF.orderBy('AvgRating', ascending=False)

In [55]:
SortedSelectGenreAvgRating_DF.show(10)

+-------+---------+-----+--------------------+
|MovieID|AvgRating|count|          MovieTitle|
+-------+---------+-----+--------------------+
|   4796|      5.0|    1|Grass Is Greener,...|
| 140749|      5.0|    1| 29th and Gay (2005)|
|  26501|      5.0|    1|    Choose Me (1984)|
|  95313|      5.0|    1|Jack-Jack Attack ...|
|  61250|      5.0|    1|House Bunny, The ...|
|    183|      5.0|    1| Mute Witness (1994)|
|    876|      5.0|    1|Supercop 2 (Proje...|
|   8123|      5.0|    1|Sammy and Rosie G...|
|  91690|      5.0|    1|Friends with Kids...|
|   6342|      5.0|    1|    Trip, The (2002)|
+-------+---------+-----+--------------------+
only showing top 10 rows



### Use DataFrame method `where` or `filter` to find all movies (in your choice of genre) that have more than 10 reviews (change this to 100 for the cluster mode).

In [56]:
SortedFilteredSelectGenreAvgRating_DF = SortedSelectGenreAvgRating_DF.where(col("count") > 10)

In [57]:
SortedFilteredSelectGenreAvgRating_DF.show(5)

+-------+-----------------+-----+--------------------+
|MovieID|        AvgRating|count|          MovieTitle|
+-------+-----------------+-----+--------------------+
|   1948|4.458333333333333|   12|    Tom Jones (1963)|
|    969|             4.42|   50|African Queen, Th...|
|   3035|4.411764705882353|   17|Mister Roberts (1...|
|   1066|4.409090909090909|   11|Shall We Dance (1...|
|    905|             4.38|   25|It Happened One N...|
+-------+-----------------+-----+--------------------+
only showing top 5 rows
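
The threshold matters because a movie with a single 5.0 review would otherwise outrank widely reviewed movies, as the unfiltered top 10 above shows. A plain-Python sketch of the same filter-and-sort logic, using made-up rows and the hypothetical names `MIN_REVIEWS` and `ranked`:

```python
# Made-up (title, avg_rating, review_count) rows
movies = [("A", 5.0, 1), ("B", 4.4, 50), ("C", 4.5, 12), ("D", 4.8, 3)]

MIN_REVIEWS = 10  # local-mode threshold; the notebook uses 100 in cluster mode

# Keep movies with more than MIN_REVIEWS reviews, then sort by average rating, highest first
ranked = sorted(
    (m for m in movies if m[2] > MIN_REVIEWS),
    key=lambda m: m[1],
    reverse=True,
)
print(ranked)  # [('C', 4.5, 12), ('B', 4.4, 50)]
```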



## Cluster mode tests with Comedy genre.

In [58]:
output_path = "/storage/home/SortedFilteredComedyMovieAvgRating_local"
SortedFilteredSelectGenreAvgRating_DF.write.csv(output_path)

In [59]:
ss.stop()

- Choice of the genre for analysis 

     - Comedy
     
    _____

- Top five movies in the genre?
    - 1. Life Is Beautiful (La Vita è bella) (1997) 
    - 2. Pulp Fiction (1994)
    - 3. Monty Python and the Holy Grail (1975)
    - 4. Thin Man, The (1934)
    - 5. Sting, The (1973)
    
    _____

- Computation time in cluster mode:

    - real 	 1m 10.355s


