<a href="https://colab.research.google.com/github/sumitra14/PySpark_ETL/blob/main/PySpark_ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **PySpark ETL Project - Netflix TV Shows and Movies**


   **DataSource** - https://www.kaggle.com/datasets/victorsoeiro/netflix-tv-shows-and-movies

   **PySpark in Google Colab** - https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/


   **Tasks performed in this Notebook** -



*   Set up PySpark
*   Unzip the data file
*   Extract data from the CSV files into pandas DataFrames (pandas is used instead of Spark to parse the CSV files because it handles internal commas, i.e. commas inside data fields, better)
*   Drop unnecessary columns, then create Spark DataFrames from the pandas DataFrames
*   Clean the data using transformations
*   Explore the data to get insights
*   Load the cleaned data back to CSV files (these files can then be used by data analysts and data scientists to find more insights and do statistical analysis for predictions)
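
The point about internal commas can be illustrated without pandas or Spark. A minimal sketch, assuming a made-up one-record sample (not the Kaggle file): splitting naively on commas breaks a quoted description apart, while a quote-aware parser such as Python's built-in `csv` module keeps it as one field.

```python
import csv
import io

# Hypothetical sample: the description field contains commas inside quotes.
sample = 'id,title,description\ntm1,Taxi Driver,"A veteran, now a cab driver, works nights."\n'

# Naive splitting treats every comma as a delimiter and yields too many fields.
naive_fields = sample.splitlines()[1].split(",")

# A quote-aware parser keeps the quoted description as a single field.
rows = list(csv.reader(io.StringIO(sample)))
parsed_fields = rows[1]

print(len(naive_fields))   # 5
print(len(parsed_fields))  # 3
```

Spark's own CSV reader also exposes `quote`, `escape` and `multiLine` options that may handle such fields; this notebook keeps the pandas route.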









   

   
  
  




In [1]:
#Connecting the notebook to Google Drive, since the data is stored there.
#In the real world this could be any storage system, such as HDFS (a file system distributed across many networked nodes)
#or S3 (AWS object storage).

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
#Setting up PySpark in Colab
#The first task is to install Java.

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
#install Apache Spark 

!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [5]:
#Install the findspark library, which locates Spark on the system and makes it importable as a regular library.

!pip install -q findspark

In [6]:
#Setting the environment variables that enable us to run PySpark in the Colab environment.

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [7]:
#Reading Data from Drive

!unzip "/content/drive/MyDrive/netflix_tvshows_movies.zip"

Archive:  /content/drive/MyDrive/netflix_tvshows_movies.zip
  inflating: credits.csv             
  inflating: titles.csv              


In [8]:
#We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method

import findspark
findspark.init()
findspark.find()

'/content/spark-3.3.1-bin-hadoop3'

In [9]:
#Creating Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

**Extracting data from csv file to pandas dataframe**

In [10]:
import pandas as pd
Titlepd=pd.read_csv("/content/titles.csv")
Creditpd=pd.read_csv("/content/credits.csv")

In [11]:
#Dropping unnecessary columns

Titlepd.drop(["description","age_certification","imdb_id"], axis=1,inplace=True)
Creditpd.drop(["person_id"],axis=1,inplace=True)

In [13]:
#Importing data types supported by Spark DataFrames. These are used to provide the schema explicitly when creating a Spark DataFrame from
#a pandas DataFrame; otherwise Spark may infer the schema incorrectly, which can result in errors or merged column data

from pyspark.sql.types import StringType, StructField, StructType ,IntegerType,FloatType

In [14]:
#schema for Titlepd
schema1 = StructType([StructField("id", StringType(), True)\
                   ,StructField("title",StringType(), True)\
                   ,StructField("type", StringType(), True)\
                   ,StructField("release_year", IntegerType(), True)\
                   ,StructField("runtime", IntegerType(), True)\
                   ,StructField("genres", StringType(), True)\
                   ,StructField("production_countries", StringType(), True)\
                   ,StructField("season", FloatType(), True)\
                   ,StructField("imdb_score", FloatType(), True)\
                   ,StructField("imdb_votes", FloatType(), True)\
                   ,StructField("tmdb_popularity", FloatType(), True)\
                   ,StructField("tmdb_score", FloatType(), True)\
                   ])

In [15]:
#schema for Creditpd
schema2 = StructType([StructField("id", StringType(), True)\
                   ,StructField("name",StringType(), True)\
                   ,StructField("character", StringType(), True)\
                   ,StructField("role", StringType(), True)\
              
                   ])

In [16]:
#Creating Spark DataFrames from the pandas DataFrames

Titledf=spark.createDataFrame(Titlepd,schema=schema1)
Creditdf=spark.createDataFrame(Creditpd,schema=schema2)

In [17]:
#importing required Libraries
from pyspark.sql.functions import count,col,sum,isnan
import numpy as np

**Cleaning & Transforming data**

In [18]:
#Counting NaN (not a number) values: pandas represents empty or blank values as NaN
null_count=Titledf.select([sum(isnan(col(c)).cast("int")).alias(c) for c in Titledf.columns])
null_count.show()

+---+-----+----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+
| id|title|type|release_year|runtime|genres|production_countries|season|imdb_score|imdb_votes|tmdb_popularity|tmdb_score|
+---+-----+----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+
|  0|    1|   0|           0|      0|     0|                   0|  3744|       482|       498|             91|       311|
+---+-----+----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+



In [19]:
#Replacing NaN with None (null) so that Spark's null-based checks, such as isNull(), treat these values as missing
Titledf=Titledf.replace(np.nan,None)
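
The distinction between NaN and null matters because NaN is a regular floating-point value rather than a missing-value marker. A minimal plain-Python sketch (not Spark code; the `season` list is a made-up sample) of why a dedicated `isnan` test is needed and what converting NaN to `None` achieves:

```python
import math

nan = float("nan")

# NaN is an ordinary float value, so a null check does not see it.
print(nan is None)      # False
# NaN is not even equal to itself, so equality checks miss it too.
print(nan == nan)       # False
# A dedicated test is needed, which is the role isnan() plays in the cells here.
print(math.isnan(nan))  # True

# Hypothetical column values: convert NaN to None so null-based logic applies.
season = [1.0, nan, 3.0]
cleaned = [None if isinstance(v, float) and math.isnan(v) else v for v in season]
print(cleaned)          # [1.0, None, 3.0]
```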

In [20]:
#Checking whether any NaN values remain
null_count=Titledf.select([sum(isnan(col(c)).cast("int")).alias(c) for c in Titledf.columns])
null_count.show()

+---+-----+----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+
| id|title|type|release_year|runtime|genres|production_countries|season|imdb_score|imdb_votes|tmdb_popularity|tmdb_score|
+---+-----+----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+
|  0|    1|   0|           0|      0|     0|                   0|     0|         0|         0|              0|         0|
+---+-----+----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+



In [21]:
#Inspecting the remaining NaN value in title
Titledf.filter(isnan(col('title'))).show()

+---------+-----+-----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+
|       id|title| type|release_year|runtime|genres|production_countries|season|imdb_score|imdb_votes|tmdb_popularity|tmdb_score|
+---------+-----+-----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+
|tm1063792|  NaN|MOVIE|        2015|     11|    []|                  []|  null|      null|      null|           null|      null|
+---------+-----+-----+------------+-------+------+--------------------+------+----------+----------+---------------+----------+



In [22]:
#Removing the one remaining instance, which is the string 'NaN' and so was not replaced earlier
Titledf=Titledf.replace('NaN',None)

In [23]:
#Dropping rows without a title: title is the main column for the analysis, so rows that lack it can be eliminated.
Titledf=Titledf.dropna(subset="title")

In [24]:
#Rows that contain movies have 0 seasons
Titledf=Titledf.na.fill({'season':0})

In [25]:
#Keeping only rows that have at least one statistic available for the movie or show (rows where all four are null are dropped)
Titledf=Titledf.filter(~((col('imdb_score').isNull()) & (col('imdb_votes').isNull()) & (col('tmdb_popularity').isNull()) & (col('tmdb_score').isNull())))
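
The filter above keeps a row unless all four statistics are null. A minimal plain-Python sketch, using made-up rows rather than the real dataset, of that condition and its De Morgan equivalent ("at least one statistic is present"):

```python
# Hypothetical rows mirroring the four statistics columns used in the filter.
rows = [
    {"title": "A", "imdb_score": None, "imdb_votes": None, "tmdb_popularity": None, "tmdb_score": None},
    {"title": "B", "imdb_score": None, "imdb_votes": None, "tmdb_popularity": 0.6, "tmdb_score": None},
    {"title": "C", "imdb_score": 8.2, "imdb_votes": 808582.0, "tmdb_popularity": 40.965, "tmdb_score": 8.179},
]
stat_cols = ["imdb_score", "imdb_votes", "tmdb_popularity", "tmdb_score"]

# Form used in the notebook: NOT (all four statistics are null).
kept_not_all_null = [r["title"] for r in rows if not all(r[c] is None for c in stat_cols)]

# Equivalent form by De Morgan's law: at least one statistic is not null.
kept_any_present = [r["title"] for r in rows if any(r[c] is not None for c in stat_cols)]

print(kept_not_all_null)  # ['B', 'C']
print(kept_any_present)   # ['B', 'C']
```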

In [26]:
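#Checking the null count again
#n_nulls is used as the variable name to avoid shadowing the count function imported from pyspark.sql.functions
null_count=[]
for c in Titledf.columns:
  n_nulls=Titledf.filter(Titledf[c].isNull()).count()
  null_count.append((c,n_nulls))

for c, n_nulls in null_count:
  print("column ",c,"-",n_nulls)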

column  id - 0
column  title - 0
column  type - 0
column  release_year - 0
column  runtime - 0
column  genres - 0
column  production_countries - 0
column  season - 0
column  imdb_score - 466
column  imdb_votes - 482
column  tmdb_popularity - 75
column  tmdb_score - 295


**NOTE:** 

The null values remaining in the other columns can be ignored, as they are few in number and less relevant for the analysis.

Now we will start exploring our data using **DataFrame functions** and **Spark SQL**.

In [27]:
Titledf.show(n=5)

+--------+--------------------+-----+------------+-------+--------------------+--------------------+------+----------+----------+---------------+----------+
|      id|               title| type|release_year|runtime|              genres|production_countries|season|imdb_score|imdb_votes|tmdb_popularity|tmdb_score|
+--------+--------------------+-----+------------+-------+--------------------+--------------------+------+----------+----------+---------------+----------+
|ts300399|Five Came Back: T...| SHOW|        1945|     51|   ['documentation']|              ['US']|   1.0|      null|      null|            0.6|      null|
| tm84618|         Taxi Driver|MOVIE|        1976|    114|  ['drama', 'crime']|              ['US']|   0.0|       8.2|  808582.0|         40.965|     8.179|
|tm154986|         Deliverance|MOVIE|        1972|    109|['drama', 'action...|              ['US']|   0.0|       7.7|  107673.0|          10.01|       7.3|
|tm127384|Monty Python and ...|MOVIE|        1975|     91|

In [28]:
Titledf.groupBy('type').count().show()

+-----+-----+
| type|count|
+-----+-----+
| SHOW| 2102|
|MOVIE| 3732|
+-----+-----+



In [29]:
#Creating view to perform spark sql queries on data
Titledf.createOrReplaceTempView("titles")

In [30]:
#Total count of movies or shows produced by countries
spark.sql("select REPLACE(REPLACE(production_countries,'[',''),']','') as Country, count(*) as total_movie_or_shows from titles where production_countries!='[]' group by production_countries  order by total_movie_or_shows desc").show(n=10)

+-------+--------------------+
|Country|total_movie_or_shows|
+-------+--------------------+
|   'US'|                1957|
|   'IN'|                 596|
|   'JP'|                 264|
|   'KR'|                 222|
|   'GB'|                 218|
|   'ES'|                 161|
|   'FR'|                 125|
|   'CA'|                 107|
|   'MX'|                  98|
|   'BR'|                  90|
+-------+--------------------+
only showing top 10 rows
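
Because `production_countries` holds the text form of a Python list, the query groups on the whole string, so a co-production such as `['US', 'GB']` forms its own group instead of counting toward each country. One possible alternative, sketched in plain Python on made-up values (the notebook itself does not do this), parses each string with `ast.literal_eval` and counts countries individually:

```python
import ast
from collections import Counter

# Hypothetical values in the same string format as production_countries.
production_countries = ["['US']", "['US', 'GB']", "[]", "['IN']", "['GB']"]

counts = Counter()
for raw in production_countries:
    # literal_eval safely turns the string "['US', 'GB']" into a real list.
    for country in ast.literal_eval(raw):
        counts[country] += 1

# Each country is counted once per title it appears in; "[]" adds nothing.
print(dict(counts))
```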



In [31]:
#highest imdb score movie
spark.sql("select title,imdb_score,tmdb_score from titles where type='MOVIE' order by imdb_score desc, tmdb_score desc").show(n=1)

+-----+----------+----------+
|title|imdb_score|tmdb_score|
+-----+----------+----------+
|Major|       9.1|     8.188|
+-----+----------+----------+
only showing top 1 row



In [32]:
#highest imdb voted movie
spark.sql("select title,imdb_votes from titles where type='MOVIE' order by imdb_votes desc").show(n=1)

+---------+----------+
|    title|imdb_votes|
+---------+----------+
|Inception| 2294231.0|
+---------+----------+
only showing top 1 row



In [33]:
#highest imdb voted show
spark.sql("select title,imdb_votes from titles where type='SHOW' order by imdb_votes desc").show(n=1)

+------------+----------+
|       title|imdb_votes|
+------------+----------+
|Breaking Bad| 1775990.0|
+------------+----------+
only showing top 1 row



In [34]:
#most popular tmdb movie
spark.sql("select title,tmdb_popularity from titles where type='MOVIE' order by tmdb_popularity desc").show(n=1)

+-----------+---------------+
|      title|tmdb_popularity|
+-----------+---------------+
|Incantation|       2274.044|
+-----------+---------------+
only showing top 1 row



In [35]:
#most tmdb popular show
spark.sql("select title,tmdb_popularity from titles where type='SHOW' order by tmdb_popularity desc").show(n=1)

+---------------+---------------+
|          title|tmdb_popularity|
+---------------+---------------+
|Stranger Things|       2226.231|
+---------------+---------------+
only showing top 1 row



In [36]:
#top 5 tmdb scored movies
spark.sql("select title,tmdb_score from titles where type='MOVIE'  order by tmdb_score desc").show(n=5,truncate=False)

+------------------------------------------------------------+----------+
|title                                                       |tmdb_score|
+------------------------------------------------------------+----------+
|True: Friendship Day                                        |10.0      |
|Motu Patlu in the Game of Zones                             |10.0      |
|Kedibone                                                    |10.0      |
|Motu Patlu Kung Fu Kings 4 The Challenge of Kung Fu Brothers|10.0      |
|True: Rainbow Rescue                                        |10.0      |
+------------------------------------------------------------+----------+
only showing top 5 rows



In [37]:
#top 5 tmdb scored shows
spark.sql("select title,tmdb_score from titles where type='SHOW'  order by tmdb_score desc").show(n=5,truncate=False)

+-------------------------------+----------+
|title                          |tmdb_score|
+-------------------------------+----------+
|The Haunted House              |10.0      |
|Pink Zone                      |10.0      |
|Dharmakshetra                  |10.0      |
|Raja, Rasoi Aur Anya Kahaniyaan|10.0      |
|Little Baby Bum                |10.0      |
+-------------------------------+----------+
only showing top 5 rows



In [38]:
#Oldest movies present in the data
spark.sql("select title,type,release_year,imdb_score,tmdb_score from titles where type='MOVIE' order by release_year,imdb_score desc").show(n=5,truncate=False)

+---------------+-----+------------+----------+----------+
|title          |type |release_year|imdb_score|tmdb_score|
+---------------+-----+------------+----------+----------+
|White Christmas|MOVIE|1954        |7.5       |7.2       |
|The Blazing Sun|MOVIE|1954        |7.4       |7.0       |
|Dark Waters    |MOVIE|1956        |6.7       |5.9       |
|Cairo Station  |MOVIE|1958        |7.5       |7.3       |
|Ujala          |MOVIE|1959        |6.6       |6.0       |
+---------------+-----+------------+----------+----------+
only showing top 5 rows



In [39]:
#Oldest shows present in the data
spark.sql("select title,type,release_year,imdb_score,tmdb_score from titles where type='SHOW' and (imdb_score is not null or tmdb_score is not null) order by release_year,imdb_score desc").show(n=5,truncate=False)

+--------------------------------+----+------------+----------+----------+
|title                           |type|release_year|imdb_score|tmdb_score|
+--------------------------------+----+------------+----------+----------+
|Monty Python's Flying Circus    |SHOW|1969        |8.8       |8.306     |
|Monty Python's Fliegender Zirkus|SHOW|1972        |8.1       |7.0       |
|Danger Mouse                    |SHOW|1981        |7.4       |7.5       |
|Knight Rider                    |SHOW|1982        |6.9       |7.5       |
|Wheel of Fortune                |SHOW|1983        |6.7       |6.8       |
+--------------------------------+----+------------+----------+----------+
only showing top 5 rows



In [40]:
#most recent movies data
spark.sql("select title,release_year,imdb_score,tmdb_score from titles where type='MOVIE' order by release_year desc,imdb_score desc").show(n=5,truncate=False)

+--------------+------------+----------+----------+
|title         |release_year|imdb_score|tmdb_score|
+--------------+------------+----------+----------+
|Major         |2022        |9.1       |8.188     |
|Jana Gana Mana|2022        |8.3       |7.9       |
|Godse         |2022        |8.2       |7.0       |
|RRR           |2022        |8.0       |7.8       |
|Viraata Parvam|2022        |8.0       |5.0       |
+--------------+------------+----------+----------+
only showing top 5 rows



In [41]:
#most recent shows data
spark.sql("select title,type,release_year,imdb_score,tmdb_score from titles where type='SHOW' and (imdb_score is not null or tmdb_score is not null) order by release_year desc,imdb_score desc").show(n=5,truncate=False)

+----------------------+----+------------+----------+----------+
|title                 |type|release_year|imdb_score|tmdb_score|
+----------------------+----+------------+----------+----------+
|Twenty Five Twenty One|SHOW|2022        |8.7       |8.6       |
|Heartstopper          |SHOW|2022        |8.7       |9.027     |
|Alchemy of Souls      |SHOW|2022        |8.6       |8.0       |
|Our Blues             |SHOW|2022        |8.5       |8.8       |
|The Creature Cases    |SHOW|2022        |8.5       |7.0       |
+----------------------+----+------------+----------+----------+
only showing top 5 rows



In [42]:
#highest tmdb scored shows produced by India
spark.sql("select title,tmdb_score from titles where type='SHOW' and production_countries= '[\\'IN\\']' order by tmdb_score desc").show(n=5,truncate=False)

+-------------------------------+----------+
|title                          |tmdb_score|
+-------------------------------+----------+
|Raja, Rasoi Aur Anya Kahaniyaan|10.0      |
|Dharmakshetra                  |10.0      |
|Mai: A Mother's Rage           |9.2       |
|Masaba Masaba                  |9.0       |
|Cricket Fever: Mumbai Indians  |9.0       |
+-------------------------------+----------+
only showing top 5 rows



In [51]:
#highest imdb scored shows produced by India
spark.sql("select title,imdb_score,int(season) from titles where type='SHOW' and production_countries= '[\\'IN\\']' order by imdb_score desc").show(n=5,truncate=False)

+-------------------------------+----------+------+
|title                          |imdb_score|season|
+-------------------------------+----------+------+
|Kota Factory                   |9.1       |2     |
|Raja, Rasoi Aur Anya Kahaniyaan|8.9       |3     |
|Stories by Rabindranath Tagore |8.7       |1     |
|Daughters of Destiny           |8.6       |1     |
|Sacred Games                   |8.5       |2     |
+-------------------------------+----------+------+
only showing top 5 rows



In [44]:
#genres of the highest imdb scored movies produced by India
spark.sql("select title,REPLACE(REPLACE(genres,'[',''),']','') as genres_i,imdb_score from titles where type='MOVIE' and production_countries= '[\\'IN\\']' order by imdb_score desc").show(n=5,truncate=False)

+------------------------------------+--------------------------------+----------+
|title                               |genres_i                        |imdb_score|
+------------------------------------+--------------------------------+----------+
|Major                               |'action', 'drama'               |9.1       |
|C/o Kancharapalem                   |'drama'                         |8.9       |
|Anbe Sivam                          |'comedy', 'drama'               |8.7       |
|Chhota Bheem & Krishna in Mayanagari|'animation', 'action', 'fantasy'|8.7       |
|Chhota Bheem and the ShiNobi Secret |'animation'                     |8.6       |
+------------------------------------+--------------------------------+----------+
only showing top 5 rows



In [45]:
#5 longest movies present in data
spark.sql("select title,runtime from titles where type='MOVIE' order by runtime desc").show(n=5,truncate=False)

+---------------------------------+-------+
|title                            |runtime|
+---------------------------------+-------+
|Bonnie & Clyde                   |240    |
|A Lion in the House              |225    |
|Lagaan: Once Upon a Time in India|224    |
|Jodhaa Akbar                     |214    |
|Kabhi Khushi Kabhie Gham         |210    |
+---------------------------------+-------+
only showing top 5 rows



In [46]:
#5 shortest movies present in data
spark.sql("select title,runtime from titles where type='MOVIE' order by runtime").show(n=5,truncate=False)

+---------------------+-------+
|title                |runtime|
+---------------------+-------+
|Time to Dance        |2      |
|Silent               |3      |
|Sol Levante          |4      |
|Amsterdam to Anatolia|6      |
|Match                |7      |
+---------------------+-------+
only showing top 5 rows



In [49]:
#longest shows and the number of seasons in it
spark.sql("select title,runtime,season from titles where type='SHOW' order by runtime desc").show(n=10,truncate=False)

+----------------------+-------+------+
|title                 |runtime|season|
+----------------------+-------+------+
|A Lion in the House   |190    |1.0   |
|1994                  |178    |1.0   |
|Dead Set              |141    |1.0   |
|Maya and the Three    |131    |1.0   |
|The Yard              |108    |3.0   |
|Intersection          |107    |2.0   |
|The Gentlemen's League|98     |2.0   |
|On Children           |98     |1.0   |
|Hospital Playlist     |93     |2.0   |
|jeen-yuhs             |93     |1.0   |
+----------------------+-------+------+
only showing top 10 rows



In [48]:
#shortest shows produced by India and their genres
spark.sql("select title,REPLACE(REPLACE(genres,'[',''),']','') as genres_i,runtime from titles where type='SHOW' and production_countries= '[\\'IN\\']' order by runtime").show(n=5,truncate=False)

+--------------------------------+-------------------------------+-------+
|title                           |genres_i                       |runtime|
+--------------------------------+-------------------------------+-------+
|Little Singham                  |'animation', 'action'          |12     |
|Ladies Up                       |'reality', 'comedy'            |17     |
|Mighty Raju                     |'animation', 'action'          |20     |
|Taarak Mehta Kka Chhota Chashmah|'animation', 'comedy'          |22     |
|Chhota Bheem                    |'animation', 'action', 'comedy'|22     |
+--------------------------------+-------------------------------+-------+
only showing top 5 rows



**Now we will look at Credit Dataframe and will join it with title dataframe to find insights**

In [52]:
#Counting nan in Credit dataframe
null_count=Creditdf.select([sum(isnan(col(c)).cast("int")).alias(c) for c in Creditdf.columns])
null_count.show()

+---+----+---------+----+
| id|name|character|role|
+---+----+---------+----+
|  0|   0|     9774|   0|
+---+----+---------+----+



In [57]:
Creditdf.filter(isnan(col('character'))).show()

+--------+-----------------+---------+--------+
|      id|             name|character|    role|
+--------+-----------------+---------+--------+
| tm84618|  Martin Scorsese|      NaN|DIRECTOR|
|tm154986|     John Boorman|      NaN|DIRECTOR|
|tm127384|      Terry Jones|      NaN|DIRECTOR|
|tm127384|    Terry Gilliam|      NaN|DIRECTOR|
|tm120801|   Robert Aldrich|      NaN|DIRECTOR|
| tm70993|      Peter Brett|      NaN|   ACTOR|
| tm70993|      Terry Jones|      NaN|DIRECTOR|
| tm14873|       Don Siegel|      NaN|DIRECTOR|
|tm119281|      Arthur Penn|      NaN|DIRECTOR|
| tm98978|   Randal Kleiser|      NaN|DIRECTOR|
| tm44204|  J. Lee Thompson|      NaN|DIRECTOR|
| tm67378|   Richard Brooks|      NaN|DIRECTOR|
| tm69997|    Jeff Margolis|      NaN|DIRECTOR|
| tm16479|   Michael Curtiz|      NaN|DIRECTOR|
|tm135083|  Loutfi El Hakim|      NaN|   ACTOR|
|tm135083| F. El Demerdache|      NaN|   ACTOR|
|tm135083|    Said El Araby|      NaN|   ACTOR|
|tm135083|Hana Abdel Fattah|      NaN|  

In [60]:
#Replacing the string 'NaN' with None
Creditdf=Creditdf.replace('NaN',None)

In [61]:
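#Checking that no NaN values remain in the Credit dataframe
#n_nulls is used as the variable name to avoid shadowing the count function imported from pyspark.sql.functions
null_count=[]
for c in Creditdf.columns:
  n_nulls=Creditdf.filter(isnan(Creditdf[c])).count()
  null_count.append((c,n_nulls))

for c, n_nulls in null_count:
  print("column ",c,"-",n_nulls)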

column  id - 0
column  name - 0
column  character - 0
column  role - 0


In [62]:
#Filling the missing character values with "Unknown"
Creditdf=Creditdf.fillna("Unknown")

In [63]:
Creditdf.show(n=5)

+-------+---------------+--------------------+-----+
|     id|           name|           character| role|
+-------+---------------+--------------------+-----+
|tm84618| Robert De Niro|       Travis Bickle|ACTOR|
|tm84618|   Jodie Foster|       Iris Steensma|ACTOR|
|tm84618|  Albert Brooks|                 Tom|ACTOR|
|tm84618|  Harvey Keitel|Matthew 'Sport' H...|ACTOR|
|tm84618|Cybill Shepherd|               Betsy|ACTOR|
+-------+---------------+--------------------+-----+
only showing top 5 rows



In [64]:
Creditdf.createOrReplaceTempView("credit")

In [65]:
spark.sql("select count(*) as total_count, role from credit group by role").show()

+-----------+--------+
|total_count|    role|
+-----------+--------+
|       4550|DIRECTOR|
|      73251|   ACTOR|
+-----------+--------+



In [77]:
#Showing the cast and director of top 5 imdb scored movies/shows
#array function merges column data, and collect_list merges rows, as roles are stored in different rows of the same movie
spark.sql("select t.title,t.imdb_score,t.type, collect_list(array(c.character,c.role)) as Cast_dir from titles as t inner join credit as c on t.id=c.id group by t.id,t.title,t.imdb_score,t.type order by t.imdb_score desc").show(n=5)

+--------------------+----------+-----+--------------------+
|               title|imdb_score| type|            Cast_dir|
+--------------------+----------+-----+--------------------+
|        Breaking Bad|       9.5| SHOW|[[Walter White, A...|
|Avatar: The Last ...|       9.3| SHOW|[[Aang (voice), A...|
|          Our Planet|       9.3| SHOW|[[Self - Narrator...|
|          Reply 1988|       9.2| SHOW|[[Sung Deok-sun, ...|
|               Major|       9.1|MOVIE|[[Sandeep Unnikri...|
+--------------------+----------+-----+--------------------+
only showing top 5 rows
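
The roles of `array` and `collect_list` in the query above can be mimicked in plain Python. This is only an illustration on made-up rows, not how Spark executes the query: `array` packs two columns of one row into a single value, and `collect_list` gathers those values for every row in the same group.

```python
from collections import defaultdict

# Hypothetical joined rows: (title id, character, role), one row per credit.
joined = [
    ("tm84618", "Travis Bickle", "ACTOR"),
    ("tm84618", "Iris Steensma", "ACTOR"),
    ("tm84618", "Unknown", "DIRECTOR"),
    ("tm154986", "Unknown", "DIRECTOR"),
]

cast_dir = defaultdict(list)
for title_id, character, role in joined:
    # array(character, role): pack two columns of one row into one value.
    pair = [character, role]
    # collect_list(...): gather the values of all rows in the same group.
    cast_dir[title_id].append(pair)

print(cast_dir["tm84618"])
```

Spark does not guarantee the order of the elements that `collect_list` returns after a shuffle, so the order inside `Cast_dir` may differ between runs.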



In [78]:
#Showing the cast and director of top 5 imdb voted movies/shows
spark.sql("select t.title,t.imdb_votes,t.type, collect_list(array(c.character,c.role)) as Cast_dir from titles as t inner join credit as c on t.id=c.id group by t.id,t.title,t.imdb_votes,t.type order by t.imdb_votes desc").show(n=5)

+--------------------+----------+-----+--------------------+
|               title|imdb_votes| type|            Cast_dir|
+--------------------+----------+-----+--------------------+
|           Inception| 2294231.0|MOVIE|[[Dom Cobb, ACTOR...|
|        Forrest Gump| 2021343.0|MOVIE|[[Forrest Gump, A...|
|        Breaking Bad| 1775990.0| SHOW|[[Walter White, A...|
|The Dark Knight R...| 1669067.0|MOVIE|[[Bruce Wayne / B...|
|               Se7en| 1606270.0|MOVIE|[[Detective David...|
+--------------------+----------+-----+--------------------+
only showing top 5 rows



In [79]:
#Showing the cast and director of top 5 tmdb popular movies/shows
spark.sql("select t.title,t.tmdb_popularity,t.type, collect_list(array(c.character,c.role)) as Cast_dir from titles as t inner join credit as c on t.id=c.id group by t.id,t.title,t.tmdb_popularity,t.type order by t.tmdb_popularity desc").show(n=5)

+--------------------+---------------+-----+--------------------+
|               title|tmdb_popularity| type|            Cast_dir|
+--------------------+---------------+-----+--------------------+
|         Incantation|       2274.044|MOVIE|[[Li Ronan, ACTOR...|
|     Stranger Things|       2226.231| SHOW|[[Joyce Byers, AC...|
|       The Sea Beast|       1723.363|MOVIE|[[Jacob Holland (...|
|  Valley of the Dead|       1668.296|MOVIE|[[Matacuras, ACTO...|
|The Man from Toronto|       1439.906|MOVIE|[[Teddy Jackson, ...|
+--------------------+---------------+-----+--------------------+
only showing top 5 rows



**Loading Cleaned Data from spark dataframes to csv files**

In [75]:
Titledf.write.csv("/content/drive/MyDrive/CleanedTitle",header=True)

In [76]:
Creditdf.write.csv("/content/drive/MyDrive/CleanedCredit",header=True)