In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import window,column,desc,col,instr,expr, pow,translate,lit
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp, from_unixtime
import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as f

spark = SparkSession \
    .builder \
    .appName("Foo") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.memory", "1g") \
    .getOrCreate()

You can monitor the progress of a job through the Spark web UI. The Spark UI is available on port 4040 of the driver node. If you are running in local mode, this will be http://localhost:4040

In [2]:
movie_data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", ",")\
.option("dateFormat", "MM/dd/YYYY HH:mm")\
.load("C:/Users/ramya/Desktop/Santa_Clara_University/Projects/TODO/Moviesallstreaming/MoviesOnStreamingPlatforms_updated.csv")

movie_data.head()

Row(ID=1, Title='Inception', Year='2010', Age='13+', IMDb=8.8, Rotten Tomatoes='87%', Netflix=1, Hulu=0, Prime Video=0, Disney+=0, Directors='Christopher Nolan', Genres='Action,Adventure,Sci-Fi,Thriller', Country='United States,United Kingdom', Language='English,Japanese,French', Runtime=148)

In [3]:
movie_data.sort("Title").explain()

== Physical Plan ==
*(1) Sort [Title#17 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Title#17 ASC NULLS FIRST, 200), true, [id=#32]
   +- FileScan csv [ID#16,Title#17,Year#18,Age#19,IMDb#20,Rotten Tomatoes#21,Netflix#22,Hulu#23,Prime Video#24,Disney+#25,Directors#26,Genres#27,Country#28,Language#29,Runtime#30] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/ramya/Desktop/Santa_Clara_University/Projects/TODO/Moviesallstre..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ID:int,Title:string,Year:string,Age:string,IMDb:double,Rotten Tomatoes:string,Netflix:int,...




**SCHEMA**

A schema is a StructType made up of a number of fields, StructFields, that have a name, type, a Boolean flag which specifies whether that column can contain missing or null values, and, finally, users can optionally specify associated metadata with that column. The metadata is a way of storing information about this column (Spark uses this in its machine learning library).

In [4]:
movie_data.schema

StructType(List(StructField(ID,IntegerType,true),StructField(Title,StringType,true),StructField(Year,StringType,true),StructField(Age,StringType,true),StructField(IMDb,DoubleType,true),StructField(Rotten Tomatoes,StringType,true),StructField(Netflix,IntegerType,true),StructField(Hulu,IntegerType,true),StructField(Prime Video,IntegerType,true),StructField(Disney+,IntegerType,true),StructField(Directors,StringType,true),StructField(Genres,StringType,true),StructField(Country,StringType,true),StructField(Language,StringType,true),StructField(Runtime,IntegerType,true)))

In [5]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [6]:
fill_cols_vals = {"IMDb": 0, "Age" : "all"}
movie_data.na.fill(fill_cols_vals)

DataFrame[ID: int, Title: string, Year: string, Age: string, IMDb: double, Rotten Tomatoes: string, Netflix: int, Hulu: int, Prime Video: int, Disney+: int, Directors: string, Genres: string, Country: string, Language: string, Runtime: int]

**MAX RUN TIME**

In [7]:
movie_data.select(max("Runtime")).take(1)

[Row(max(Runtime)=1256)]

**TOP MOST MOVIES PRODUCING YEARS OVERALL**

In [8]:
movie_data.groupBy("Year").count().withColumnRenamed("count", "Count_movies").\
sort(desc("Count_movies")).show(5)

+----+------------+
|Year|Count_movies|
+----+------------+
|2017|        1401|
|2018|        1284|
|2016|        1206|
|2015|        1065|
|2014|         986|
+----+------------+
only showing top 5 rows



**TOP MOST MOVIES PRODUCING YEARS FOR EACH PLATFORM**

In [9]:
movie_data.groupBy("Year").agg(sum("Netflix"),sum("Hulu"),sum("Prime Video"),sum("Disney+")).sort(desc("Year")).\
withColumnRenamed("sum(Netflix)","Movies_Netflix").show(8)

+----+--------------+---------+----------------+------------+
|Year|Movies_Netflix|sum(Hulu)|sum(Prime Video)|sum(Disney+)|
+----+--------------+---------+----------------+------------+
|2020|           104|        6|              31|           9|
|2019|           428|      104|             172|          23|
|2018|           560|      158|             624|          16|
|2017|           569|      124|             763|          22|
|2016|           444|       62|             730|          17|
|2015|           272|       61|             765|          10|
|2014|           174|       46|             783|          12|
|2013|           133|       45|             811|          12|
+----+--------------+---------+----------------+------------+
only showing top 8 rows



**AGE WISE AVERAGE RATING ON EACH PLATFORM**

In [10]:
movie_data.filter(col("Netflix") == 1).groupBy(["Age","Year"]).agg(round(mean("IMDb"),2)).\
withColumnRenamed("round(avg(IMDb), 2)","Mean_rating").sort(desc("Mean_rating")).show(5)

+---+----+-----------+
|Age|Year|Mean_rating|
+---+----+-----------+
|18+|1966|        8.8|
| 7+|1985|        8.5|
|13+|1968|        8.5|
|18+|1989|        8.5|
| 7+|1981|        8.4|
+---+----+-----------+
only showing top 5 rows



**Structured Streaming**


maxFilesPerTrigger option, which simply specifies the number of files we should read in at once. This is to make our demonstration more “streaming,” and in a production scenario this would probably be omitted

In [11]:
streamingDataFrame = spark.readStream\
    .schema(movie_data.schema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("C:/Users/ramya/Desktop/Santa_Clara_University/Projects/TODO/Moviesallstreaming/MoviesOnStreamingPlatforms_updated.csv")

In [12]:
streamingDataFrame.isStreaming

True

**WHICH MOVIE IS PRESENT IN ALL PLATFORMS**

In [13]:
movie_data=movie_data.withColumn("No_of_platforms",col("Netflix")+col("Hulu")+col("Prime Video")+ col("Disney+"))

NO movie is present in all 4 platforms but the below are present in 3 of them

select and selectExpr allow you to do the DataFrame equivalent of SQL queries on a table of data

In [14]:
movie_data.where(col("No_of_platforms")==3).select("Title","IMDb","Rotten Tomatoes").show()

+--------------------+----+---------------+
|               Title|IMDb|Rotten Tomatoes|
+--------------------+----+---------------+
|                 Amy| 7.8|            95%|
|          The Square| 8.1|           100%|
|       The Interview| 6.5|            52%|
|              Blame!| 6.7|            82%|
|           Evolution| 6.1|            43%|
|No Game No Life: ...| 7.5|           null|
|              Zapped| 5.1|           null|
|              Mother| 5.6|           null|
|             The Kid| 5.9|            45%|
|          Inside Out| 4.5|            25%|
+--------------------+----+---------------+



**EACH YEAR IMDB TOP MOST RATED MOVIES**

In [15]:
w = Window.partitionBy('Year')


movie_data.withColumn('max_R', f.max('IMDb').over(w))\
    .where(f.col('IMDb') == f.col('max_R'))\
    .drop('max_R')\
    .select("Year","Title","IMDb","Rotten Tomatoes").sort(desc("Year")).show(truncate=False)

+----+-------------------------------------------------------------------------------------------+----+---------------+
|Year|Title                                                                                      |IMDb|Rotten Tomatoes|
+----+-------------------------------------------------------------------------------------------+----+---------------+
|2020|Sufna                                                                                      |8.2 |null           |
|2019|Square One                                                                                 |9.3 |null           |
|2019|My Next Guest with David Letterman and Shah Rukh Khan                                      |9.3 |null           |
|2018|Operation Toussaint: Operation Underground Railroad and the Fight to End Modern Day Slavery|8.8 |null           |
|2017|Where's Daddy?                                                                             |9.1 |null           |
|2016|Natsamrat                         

**EACH YEAR IMDB TOP MOST RATED MOVIES IN NETFLIX**

In [16]:
movie_data.filter(col("Netflix") == 1).withColumn('max_R', f.max('IMDb').over(w))\
    .where(f.col('IMDb') == f.col('max_R'))\
    .drop('max_R')\
    .select("Year","Title","IMDb","Rotten Tomatoes").sort(desc("Year")).show(truncate=False)

+----+---------------------------------------------------------------------+----+---------------+
|Year|Title                                                                |IMDb|Rotten Tomatoes|
+----+---------------------------------------------------------------------+----+---------------+
|2020|A Secret Love                                                        |8.1 |null           |
|2019|My Next Guest with David Letterman and Shah Rukh Khan                |9.3 |null           |
|2018|Untamed Romania                                                      |8.7 |null           |
|2017|One Heart: The A.R. Rahman Concert Film                              |8.7 |null           |
|2016|Natsamrat                                                            |9.1 |null           |
|2015|Eh Janam Tumhare Lekhe                                               |8.7 |null           |
|2014|Punjab 1984                                                          |8.5 |null           |
|2013|Bo Burnham: Wh

**COUNT OF MOVIES IN EACH PLATFORM YEAR WISE AFTER 2000 having rating above 5**

In [17]:
movie_data.where(col("Year") > 2000).where(col("IMDb") >= 5).groupBy("Year").agg(sum("Netflix"),sum("Hulu")).sort(desc("Year")).\
withColumnRenamed("sum(Netflix)","Count_of_movies_Netflix").withColumnRenamed("sum(Hulu)","Count_of_movies_Hulu").show(5)

+----+-----------------------+--------------------+
|Year|Count_of_movies_Netflix|Count_of_movies_Hulu|
+----+-----------------------+--------------------+
|2020|                     82|                   4|
|2019|                    337|                  90|
|2018|                    468|                 132|
|2017|                    481|                 104|
|2016|                    370|                  51|
+----+-----------------------+--------------------+
only showing top 5 rows



**COUNT OF DISTINCT DIRECTORS IN EACH PLATFORM YEAR WISE AFTER 2000**

In [18]:
movie_data.where(col("Year") > 2000).select("Directors").distinct().count()

9213

**WHICH YEAR WERE HIGHEST COUNT OF MOVIES PRESENT IN ALL THE PLATFORMS TOGETHER**

In [19]:
movie_data.where(col("Year") > 2000).groupBy("Year").\
agg(sum("Netflix"),sum("Hulu"),sum("Prime Video"),sum("Disney+")).\
withColumn("Count_of_all", col("sum(Netflix)")+col("sum(Hulu)")+col("sum(Prime Video)")+col("sum(Disney+)")).\
sort(desc("Count_of_all")).select(col("Year"),col("Count_of_all")).show(2)

+----+------------+
|Year|Count_of_all|
+----+------------+
|2017|        1478|
|2018|        1358|
+----+------------+
only showing top 2 rows



**WHICH YEAR WERE HIGHEST RATED MOVIES PRESENT IN ALL THE PLATFORMS TOGETHER**

In [20]:
movie_data.where(col("Year") > 2000).groupBy("Year").\
agg(mean("IMDb")).\
withColumnRenamed("avg(IMDb)","Average_rating").\
sort(desc("Average_rating")).select(col("Year"),col("Average_rating")).show(2)

+----+-----------------+
|Year|   Average_rating|
+----+-----------------+
|2020|6.175000000000001|
|2002|6.065608465608464|
+----+-----------------+
only showing top 2 rows



**HOW DID THE DURATION OF MOVIES CHANGED ACROSS YEARS-FINDING DURATION OF TOP RATED MOVIES AND LEAST RATED MOVIES**

In [21]:
movie_h=movie_data.withColumn("High_or_low", when(col("IMDb")>=5, "Highly_rated").otherwise("Low_rated")).\
where(col("High_or_low")=="Highly_rated").groupBy("Year").agg(round(mean("Runtime"),2)).\
withColumnRenamed("round(avg(Runtime), 2)","Avg_rating_of_highly_rated_movies").sort(desc(("Year")))

In [22]:
movie_l=movie_data.withColumn("High_or_low", when(col("IMDb")>=5, "Highly_rated").otherwise("Low_rated")).\
where(col("High_or_low")=="Low_rated").groupBy("Year").agg(round(mean("Runtime"),2)).\
withColumnRenamed("round(avg(Runtime), 2)","Avg_rating_of_low_rated_movies").sort(desc(("Year")))

In [23]:
movie_h.join(movie_l, 'Year').sort(desc("Year")).show()

+----+---------------------------------+------------------------------+
|Year|Avg_rating_of_highly_rated_movies|Avg_rating_of_low_rated_movies|
+----+---------------------------------+------------------------------+
|2020|                            93.69|                         95.16|
|2019|                            94.54|                         89.04|
|2018|                            97.88|                         85.96|
|2017|                            96.01|                         89.42|
|2016|                            94.74|                         91.97|
|2015|                            94.21|                          89.1|
|2014|                            94.63|                         88.95|
|2013|                            92.36|                         88.97|
|2012|                            91.61|                         87.49|
|2011|                            94.53|                         86.86|
|2010|                            93.12|                        

**ENGLISH AND NON ENGLISH MOVIES AVERAGE RATING**

In [24]:
descripFilter = instr(lower(movie_data.Language), "english") >= 1
movie_data.where(descripFilter).agg(mean(col("IMDb"))).show()

+-----------------+
|        avg(IMDb)|
+-----------------+
|5.822641363881808|
+-----------------+



In [25]:
movie_data.where(~descripFilter).agg(mean(col("IMDb"))).show()

+-----------------+
|        avg(IMDb)|
+-----------------+
|6.260852713178291|
+-----------------+



**NUMBER OF MOVIES WHICH FOLLOW A CRITERIA**

In [26]:
descripFilter = instr(lower(movie_data.Language), "english") >= 1
countfilter= instr(lower(col("Country")), "united states") >= 1
genrefilter= instr(upper(col("Genres")),"ACTION")>= 1

In [27]:
movie_data.withColumn("Criteria", countfilter & (descripFilter | genrefilter))\
  .where("Criteria")\
  .count()

10303

**CORRELATION DURATION AND RATING**

In [28]:
from pyspark.sql.functions import corr
movie_data.stat.corr("Runtime", "IMDb")

0.22394511701216854

**HOW MANY UNIQUE GENRES**

In [29]:
movie_data.withColumn("Genres_all",split(("Genres"),",")).drop("Genres").select(explode("Genres_all")).distinct().count()

27

**HOW MANY UNIQUE LANGUAGES**

In [30]:
lang_count=movie_data.withColumn("Lang_all",split(("Language"),",")).drop("Language").select(explode("Lang_all")).distinct().count()

In [31]:
lang_count

178

**MOVIES PER LANGUAGE**

In [32]:
movie_data.withColumn("Lang_all",explode(split(("Language"),","))).drop("Language").groupBy('Lang_all').count().sort(desc("count")).show(5)

+--------+-----+
|Lang_all|count|
+--------+-----+
| English|13233|
| Spanish|  872|
|  French|  799|
|   Hindi|  731|
|  German|  483|
+--------+-----+
only showing top 5 rows



**TOP MOST GENRE BY COUNT PER LANGUAGE**

In [33]:
gen_lan=movie_data.withColumn("Lang_all",explode(split(("Language"),","))).withColumn("Genre_all",explode(split(("Genres"),","))).\
withColumn("Number", lit(1)).select("Lang_all","Genre_all","Number")

In [34]:
w = Window.partitionBy('Lang_all')

gen_lan.groupBy("Lang_all","Genre_all").count().withColumn('Max_count', f.max('count').over(w)).\
where(f.col('count') == f.col('Max_count')).sort(desc("Max_count")).select("Lang_all","Genre_all").\
show(5,truncate=False)

+--------+---------+
|Lang_all|Genre_all|
+--------+---------+
|English |Drama    |
|Hindi   |Drama    |
|French  |Drama    |
|Spanish |Drama    |
|German  |Drama    |
+--------+---------+
only showing top 5 rows



**OTHER THAN US,AUSTRALIA AND UK WHICH COUNTRIES HAVE ENGLISH MOVIES**

In [35]:
filter1=instr(lower(movie_data.Country), "united states") == 0
filter2=instr(lower(movie_data.Country), "kingdom") == 0
filter3=instr(lower(movie_data.Country), "australia") == 0
filter4=instr(lower(movie_data.Language), "english") >= 1

In [36]:
movie_data.withColumn("Criteria", filter4 & (filter1 & (filter2 & filter3)))\
  .where("Criteria").count()

1559

In [37]:
movie_data.withColumn("Criteria", filter4 & (filter1 & (filter2 & filter3)))\
  .where("Criteria").withColumn("Lang_all",explode(split(("Language"),","))).withColumn("Country_all",explode(split(("Country"),",")))\
.select("Lang_all","Country_all").filter(col("Lang_all")=="English").groupBy("Country_all").count().sort(desc("count")).show()

+------------+-----+
| Country_all|count|
+------------+-----+
|      Canada|  597|
|      France|  171|
|       India|  150|
|     Germany|  136|
|       Italy|  107|
|       Spain|   66|
|   Hong Kong|   52|
|     Ireland|   47|
|     Belgium|   42|
|South Africa|   37|
| Netherlands|   37|
|       China|   34|
|      Sweden|   32|
|       Japan|   32|
|     Denmark|   30|
| South Korea|   30|
|      Mexico|   26|
| New Zealand|   25|
|     Nigeria|   22|
| Philippines|   20|
+------------+-----+
only showing top 20 rows



**WHICH MOVIE INTO MANY LANGUAGES**

In [38]:
movie_data=movie_data.withColumn("Lang_all",size(split(("Language"),",")))

In [39]:
max_count=movie_data.agg({"Lang_all": "max"}).collect()[0][0]

In [40]:
movie_data.filter(f.col('Lang_all') == max_count).select("Title").show()

+-----+
|Title|
+-----+
| 2012|
+-----+



**HOW MANY MOVIES HAVE ENGLISH AS THEIR TRANSLATED LANGUAGE(not the first name in list)**

In [41]:
movie_data.withColumn("Lang_all",size(split(("Language"),","))).filter(col("Lang_all")>1).\
withColumn("xyz",(split(("Language"),","))).withColumn("abc",f.slice(f.col("xyz"), 2, max_count)).\
where(array_contains(col("abc"),"English")).count()

705

**REGEX REPLACE USAGE**

In [42]:
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
movie_data.select(regexp_replace(col("Title"), regex_string, "COLOR").alias("color_clean"),col("Title")).show(2)

+-----------+----------+
|color_clean|     Title|
+-----------+----------+
|  Inception| Inception|
| The Matrix|The Matrix|
+-----------+----------+
only showing top 2 rows



In [43]:
from pyspark.sql.functions import create_map
movie_data.select(create_map(col("Title"), col("Year")).alias("complex_map"))\
.selectExpr("explode(complex_map)").show(2)

+----------+-----+
|       key|value|
+----------+-----+
| Inception| 2010|
|The Matrix| 1999|
+----------+-----+
only showing top 2 rows



**WHICH LANGUAGE HAS SO MANY CRIME THRILLERS**

In [45]:
filter1=instr(lower(movie_data.Genres), "crime") >= 1
filter2=instr(lower(movie_data.Genres), "thriller") >=1

In [50]:
movie_data.withColumn("Criteria", filter1 | filter2).where("Criteria").\
withColumn("Lang_all",explode(split(("Language"),","))).select("Lang_all","Criteria").groupBy("Lang_all").agg(count("Criteria")).show(5)

+---------+---------------+
| Lang_all|count(Criteria)|
+---------+---------------+
|  English|           3518|
|Afrikaans|              8|
|    Xhosa|              2|
| Hawaiian|              1|
|  Swedish|             26|
+---------+---------------+
only showing top 5 rows



**TOP MOST GENRE BY AVERAGE RATING PER LANGUAGE**

In [60]:
gen_lan=movie_data.withColumn("Lang_all",explode(split(("Language"),","))).\
withColumn("Genre_all",explode(split(("Genres"),","))).select("Lang_all","Genre_all","IMDb").\
groupBy("Genre_all","Lang_all").agg(round(mean("IMDb"),2).alias("Rating"))

In [63]:
w = Window.partitionBy('Lang_all')

gen_lan.withColumn('Max_rating', f.max('Rating').over(w)).\
where(f.col('Rating') == f.col('Max_rating')).sort(desc("Max_rating")).select("Lang_all","Genre_all").\
show(5,truncate=False)

+---------+---------+
|Lang_all |Genre_all|
+---------+---------+
|Polish   |Sport    |
|Bosnian  |History  |
|Bosnian  |Family   |
|Xhosa    |News     |
|Afrikaans|Music    |
+---------+---------+
only showing top 5 rows



**WHAT ARE THE HIGH RATED KIDS(7+) MOVIES EVERY YEAR**

In [66]:
gen_lan=movie_data.filter(col("Age")=="7+")

w = Window.partitionBy('Year')

In [70]:
gen_lan.withColumn('Max_rating', f.max('IMDb').over(w)).\
where(f.col('IMDb') == f.col('Max_rating')).sort(desc("Max_rating")).select("Title","Year","IMDb").\
show(5,truncate=False)

+----------------------------------+----+----+
|Title                             |Year|IMDb|
+----------------------------------+----+----+
|Untamed Romania                   |2018|8.7 |
|Star Wars: The Empire Strikes Back|1980|8.7 |
|Slednecks 13                      |2010|8.6 |
|Fabulous Frogs                    |2014|8.6 |
|It's a Wonderful Life             |1946|8.6 |
+----------------------------------+----+----+
only showing top 5 rows



In [76]:
pivoted = movie_data.groupBy("Year").pivot("Genres").sum()

In [80]:
pivoted.where("Year > 2000").select("Year" ,"Thriller_sum(CAST(No_of_platforms AS BIGINT))").show()

+----+---------------------------------------------+
|Year|Thriller_sum(CAST(No_of_platforms AS BIGINT))|
+----+---------------------------------------------+
|2015|                                           18|
|2008|                                            3|
|2020|                                            5|
|2019|                                           17|
|2003|                                         null|
|2012|                                           10|
|2013|                                           12|
|2004|                                            3|
|2017|                                           14|
|2002|                                            2|
|2001|                                            2|
|2009|                                           11|
|2018|                                           23|
|2016|                                           36|
|2005|                                            1|
|2014|                                        