In [None]:
# activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.1.1'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad

In [None]:
#import packages
# The raw csv files are fetched from remote storage and loaded as Spark DataFrames (this will save us some download time and HDD space on our laptops)
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
import pandas as pd
# we are going to use this to time our queries.
import time

# Build (or reuse) the SparkSession for this notebook under the app name "SparkSQL".
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [None]:
# Fetch the names CSV from the S3 bucket and load it as a Spark DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/filterNames.csv"
# Register the remote file with Spark; SparkFiles.get resolves its local copy.
spark.sparkContext.addFile(path=url)
names_df = (
    spark.read
    .options(sep=",", header=True)
    .csv(SparkFiles.get("filterNames.csv"))
)

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.functions import hour
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import when

**QUESTION 1**

In [None]:
# Preview the first 20 rows of the names dataset.
names_df.show(n=20)

+---------+------------------+---------+---------+--------------------+--------------------+---------+
|   nconst|       primaryname|birthyear|deathyear|   primaryprofession|      knownfortitles| knownFor|
+---------+------------------+---------+---------+--------------------+--------------------+---------+
|nm0458594|     Roger Kleiber|     null|     null|visual_effects,ed...|tt0405022,tt19579...|tt0497467|
|nm0458601|        Mark Kleid|     null|     null|               actor|tt0105041,tt01120...|tt0277615|
|nm0458616|      Mark Kleiman|     null|     null|editor,producer,m...|tt0196091,tt01256...|tt0972846|
|nm0458617|   Michael Kleiman|     1969|     null|camera_department...|tt0913354,tt04311...|tt0431197|
|nm0458618|      Naum Kleiman|     1937|     null|director,actor,wr...|tt3066262,tt64875...|tt3066262|
|nm0458648|     Andreas Klein|     null|     null|producer,executiv...|tt0311429,tt03139...|tt0311429|
|nm0458656|      Arthur Klein|     null|     null|editorial_departm...|tt

In [None]:
# Register names_df as the temp view 'profession' so it can be queried through spark.sql.
names_df.createOrReplaceTempView(name='profession')

In [None]:
 #using sql in Spark just requires you to call spark.sql(<sql>) and showing the data.
#here we are peeking at the data using SparkSQL
pri_prof= spark.sql("select nconst,primaryname,primaryprofession,birthyear,deathyear  from profession where primaryprofession like '%actor%' or primaryprofession like '%actress%'")

In [None]:
# Register the actors/actresses DataFrame (pri_prof) as the temp view 'birth'.
pri_prof.createOrReplaceTempView(name='birth')

In [None]:
# Keep only actors/actresses that have both a birth year and a death year
# (i.e. people recorded as deceased), then preview the first 20 rows.
birth_year = spark.sql(
    sqlQuery="select nconst,primaryname,primaryprofession,birthyear,deathyear  from birth where birthyear >0 and  deathyear >0"
)
birth_year.show(n=20)

+---------+------------------+--------------------+---------+---------+
|   nconst|       primaryname|   primaryprofession|birthyear|deathyear|
+---------+------------------+--------------------+---------+---------+
|nm0459186|    Maria Klejdysz|             actress|     1927|     2009|
|nm0459258|   Jaromír Klempír|composer,soundtra...|     1944|     2016|
|nm0459299|   Ladislav Klepal|               actor|     1937|     2002|
|nm0459585|   Matthiew Klinck|producer,actor,ci...|     1978|     2016|
|nm0459991|       Enn Klooren|               actor|     1940|     2011|
|nm0460294|       Harry Klynn|actor,writer,dire...|     1940|     2018|
|nm0460554|      Nikita Knatz|art_department,ac...|     1940|     2010|
|nm0461841|   Dominic Knutton|               actor|     1970|     2007|
|nm0458957|       Robin Klein|casting_director,...|     1960|     2004|
|nm0459585|   Matthiew Klinck|producer,actor,ci...|     1978|     2016|
|nm0460104|      Claude Klotz|        writer,actor|     1932|   

In [None]:
# Age at death for deceased actors/actresses: deathyear - birthyear, stored as an integer.
actors_age = birth_year.withColumn(
    "age",
    (birth_year['deathyear'] - birth_year['birthyear']).cast(IntegerType()),
)
actors_age.show(n=20)

+---------+------------------+--------------------+---------+---------+---+
|   nconst|       primaryname|   primaryprofession|birthyear|deathyear|age|
+---------+------------------+--------------------+---------+---------+---+
|nm0459186|    Maria Klejdysz|             actress|     1927|     2009| 82|
|nm0459258|   Jaromír Klempír|composer,soundtra...|     1944|     2016| 72|
|nm0459299|   Ladislav Klepal|               actor|     1937|     2002| 65|
|nm0459585|   Matthiew Klinck|producer,actor,ci...|     1978|     2016| 38|
|nm0459991|       Enn Klooren|               actor|     1940|     2011| 71|
|nm0460294|       Harry Klynn|actor,writer,dire...|     1940|     2018| 78|
|nm0460554|      Nikita Knatz|art_department,ac...|     1940|     2010| 70|
|nm0461841|   Dominic Knutton|               actor|     1970|     2007| 37|
|nm0458957|       Robin Klein|casting_director,...|     1960|     2004| 44|
|nm0459585|   Matthiew Klinck|producer,actor,ci...|     1978|     2016| 38|
|nm0460104| 

In [None]:
from pyspark.sql.functions import when

In [None]:
# Label every actor/actress with a Status:
#   - a death year is present            -> "Dead" (regardless of birth year)
#   - only a birth year is present       -> "Alive"
#   - neither year is present            -> "unknown"
alive_dead = pri_prof.withColumn(
    "Status",
    when(pri_prof["deathyear"].isNotNull(), "Dead")
    .when(pri_prof["birthyear"].isNotNull(), "Alive")
    .otherwise("unknown"),
)
alive_dead.show(n=40)

+---------+-------------------+--------------------+---------+---------+-------+
|   nconst|        primaryname|   primaryprofession|birthyear|deathyear| Status|
+---------+-------------------+--------------------+---------+---------+-------+
|nm0458601|         Mark Kleid|               actor|     null|     null|unknown|
|nm0458618|       Naum Kleiman|director,actor,wr...|     1937|     null|  Alive|
|nm0458664|         Bart Klein|               actor|     null|     null|unknown|
|nm0458680|        Butch Klein| actor,miscellaneous|     null|     null|unknown|
|nm0458680|        Butch Klein| actor,miscellaneous|     null|     null|unknown|
|nm0458808|         Jeff Klein|art_department,actor|     null|     null|unknown|
|nm0458833|         Jona Klein|editorial_departm...|     null|     null|unknown|
|nm0458834|        Jonah Klein|actor,producer,wr...|     null|     null|unknown|
|nm0458842|       Jürgen Klein|        stunts,actor|     1968|     null|  Alive|
|nm0458866|       Leslie Kle

In [None]:
# Age taken strictly from the data (deathyear - birthyear) with no assumptions;
# the result is null whenever either year is missing.
actors_actress_age = alive_dead.withColumn(
    "age",
    (alive_dead['deathyear'] - alive_dead['birthyear']).cast(IntegerType()),
)

In [None]:
# Assume people with a birth year but no death year are still alive and compute
# their age relative to 2021; every other row keeps the data-derived `age`.
act_age = actors_actress_age.withColumn(
    "Actors/actresses age",
    when(
        actors_actress_age["birthyear"].isNotNull() & actors_actress_age["deathyear"].isNull(),
        (2021 - actors_actress_age["birthyear"]).cast(IntegerType()),
    ).otherwise(actors_actress_age["age"]),
)
act_age.show(n=40)

+---------+-------------------+--------------------+---------+---------+-------+----+--------------------+
|   nconst|        primaryname|   primaryprofession|birthyear|deathyear| Status| age|Actors/actresses age|
+---------+-------------------+--------------------+---------+---------+-------+----+--------------------+
|nm0458601|         Mark Kleid|               actor|     null|     null|unknown|null|                null|
|nm0458618|       Naum Kleiman|director,actor,wr...|     1937|     null|  Alive|null|                  84|
|nm0458664|         Bart Klein|               actor|     null|     null|unknown|null|                null|
|nm0458680|        Butch Klein| actor,miscellaneous|     null|     null|unknown|null|                null|
|nm0458680|        Butch Klein| actor,miscellaneous|     null|     null|unknown|null|                null|
|nm0458808|         Jeff Klein|art_department,actor|     null|     null|unknown|null|                null|
|nm0458833|         Jona Klein|editor

In [None]:
# Actors/actresses table: keep the four reporting columns and remove duplicate rows.
actors_table = act_age.select(
    ['nconst', 'primaryname', 'Actors/actresses age', 'Status']
).distinct()

In [None]:
# Preview the first 20 rows of the actors/actresses table.
actors_table.show(n=20)


+----------+--------------------+--------------------+-------+
|    nconst|         primaryname|Actors/actresses age| Status|
+----------+--------------------+--------------------+-------+
| nm0460785|Stephanie Knight ...|                null|unknown|
| nm7746098|Belén García del ...|                null|unknown|
| nm7749647|     Jamie Soricelli|                null|unknown|
| nm1445342|     Houda Echouafni|                null|unknown|
| nm1446060|      Hunter Parrish|                  34|  Alive|
| nm1446758|Loison Robert Ludwig|                null|unknown|
| nm4947226|        Erwan Catrix|                null|unknown|
| nm4939875| Svetoslav Slaveykov|                null|unknown|
| nm4944880|    Angeliki Kivotou|                null|unknown|
| nm4946616|        Camari Brown|                null|unknown|
| nm7743487|          Riley Wood|                null|unknown|
| nm1449191|    José Javier Arce|                null|unknown|
|nm10587537|      Sandile Mkhize|                null|u

**QUESTION 2**

In [None]:
# Fetch the titles CSV from the S3 bucket and load it as a Spark DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/titles_basic.csv"
# Register the remote file with Spark; SparkFiles.get resolves its local copy.
spark.sparkContext.addFile(path=url)
titles_df = (
    spark.read
    .options(sep=",", header=True)
    .csv(SparkFiles.get("titles_basic.csv"))
)

In [None]:
# Preview the first 20 rows of the titles dataset.
titles_df.show(n=20)

+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|_c0|   tconst|titletype|        primarytitle|       originaltitle|isadult|startyear|endyear|runtimeminutes|              genres|
+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|  0|tt0439997|    movie|           500 Almas|           500 Almas|      0|     2004|   null|         105.0|         Documentary|
|  1|tt0439999|tvSpecial|           80s Mania|           80s Mania|      0|     2001|   null|          50.0|               Music|
|  3|tt0440003|    movie|        A-1 Headline|         A1 tou tiao|      0|     2004|   null|          95.0|    Mystery,Thriller|
|  4|tt0440004|  tvMovie| AD/BC: A Rock Opera| AD/BC: A Rock Opera|      0|     2004|   null|          30.0|      Comedy,Musical|
|  5|tt0440008|  tvMovie|Abbamania: We Say...|Abbamania: We Say...|      0|     2004|   nu

In [None]:
# Runtime in whole minutes as text: cast to float, truncate to int, then to string
# (e.g. "105.0" becomes "105").
titles_df = titles_df.withColumn(
    "TimestampIntStr",
    titles_df["runtimeminutes"].cast("float").cast("int").cast("string"),
)

In [None]:
# Split the runtime (in minutes) into whole hours and leftover minutes, both as strings.
titles_df = (
    titles_df
    .withColumn(
        "timestamphour",
        f.floor(f.col("runtimeminutes").cast("float").cast("int") / 60).cast("string"),
    )
    .withColumn(
        "timestampminute",
        (f.col("runtimeminutes").cast("float").cast("int") % 60).cast("string"),
    )
)
titles_df.show(n=20)

+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+
|_c0|   tconst|titletype|        primarytitle|       originaltitle|isadult|startyear|endyear|runtimeminutes|              genres|TimestampIntStr|timestamphour|timestampminute|
+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+
|  0|tt0439997|    movie|           500 Almas|           500 Almas|      0|     2004|   null|         105.0|         Documentary|            105|            1|             45|
|  1|tt0439999|tvSpecial|           80s Mania|           80s Mania|      0|     2001|   null|          50.0|               Music|             50|            0|             50|
|  3|tt0440003|    movie|        A-1 Headline|         A1 tou tiao|      0|     2004|   null|          95.0|    Mystery,

In [None]:
# Join the hour and minute parts with ':' into a single "H:m" string column.
titles_df = titles_df.withColumn(
    "tsstring",
    f.concat_ws(
        ':',
        f.col("timestamphour").cast("string"),
        f.col("timestampminute").cast("string"),
    ),
)

In [None]:
from pyspark.sql import functions as f

In [None]:
#titles_df.show()

In [None]:
# Fetch the ratings CSV from the S3 bucket and load it as a Spark DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/ratings.csv"
# Register the remote file with Spark; SparkFiles.get resolves its local copy.
spark.sparkContext.addFile(path=url)
rating_df = (
    spark.read
    .options(sep=",", header=True)
    .csv(SparkFiles.get("ratings.csv"))
)

In [None]:
# Preview the first 20 rows of the ratings dataset.
rating_df.show(n=20)

+---------+-------------+--------+
|   tconst|averagerating|numvotes|
+---------+-------------+--------+
|tt0214461|          7.4|      74|
|tt0214659|          6.0|      21|
|tt0214878|          6.1|    1566|
|tt0215244|          3.0|       5|
|tt0215402|          8.1|       9|
|tt0215423|          8.1|      11|
|tt0215458|          5.6|     103|
|tt0215785|          5.3|     506|
|tt0215972|          5.5|      17|
|tt0216247|          8.1|      17|
|tt0216705|          7.0|     114|
|tt0217135|          5.9|     306|
|tt0217143|          6.5|      20|
|tt0217196|          7.4|      66|
|tt0217629|          7.0|    3009|
|tt0217769|          5.8|     369|
|tt0217824|          6.3|    1579|
|tt0217836|          2.5|     430|
|tt0218182|          6.3|    1593|
|tt0218440|          6.0|    1960|
+---------+-------------+--------+
only showing top 20 rows



In [None]:
# Register the enriched titles DataFrame as the temp view 'Film' for Spark SQL.
titles_df.createOrReplaceTempView(name='Film')

In [None]:
 #using sql in Spark just requires you to call spark.sql(<sql>) and showing the data.
#here we are peeking at the data using SparkSQL
film_doc= spark.sql("select tconst,titletype,originaltitle,runtimeminutes  from Film")

In [None]:
# Preview the first 20 rows of the film selection.
film_doc.show(n=20)

+---------+---------+--------------------+--------------+
|   tconst|titletype|       originaltitle|runtimeminutes|
+---------+---------+--------------------+--------------+
|tt0439997|    movie|           500 Almas|         105.0|
|tt0439999|tvSpecial|           80s Mania|          50.0|
|tt0440003|    movie|         A1 tou tiao|          95.0|
|tt0440004|  tvMovie| AD/BC: A Rock Opera|          30.0|
|tt0440008|  tvMovie|Abbamania: We Say...|          50.0|
|tt0440016|    movie|       Ah ma yau nan|          93.0|
|tt0440022|  tvMovie|        Al atardecer|          66.0|
|tt0440035|  tvMovie|      L'amour en pen|          52.0|
|tt0440067|    movie|      Bau lit do see|          99.0|
|tt0440078|  tvMovie|  The Band Aid Story|          95.0|
|tt0440084|  tvMovie|A Beachcombers Ch...|         120.0|
|tt0440149|    video|Blink 182: Punk P...|          61.0|
|tt0440154|  tvMovie|Boogie special: 5...|          26.0|
|tt0440155|  tvMovie|Boogie special: M...|          28.0|
|tt0440157|tvS

In [None]:
# Parse the "hour:minute" runtime string into a timestamp column.
# The pattern uses 'H' (hour-of-day, 0-23) rather than 'K' (hour-of-am-pm, 0-11),
# so runtimes of 12 hours or longer are no longer rejected by the parser.
# NOTE(review): runtimes of 24 hours or more still cannot be expressed as a
# time of day with this approach.
from pyspark.sql.functions import to_timestamp
df=titles_df.withColumn('date_time',to_timestamp('tsstring','H:m'))
df.show()

+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+--------+-------------------+
|_c0|   tconst|titletype|        primarytitle|       originaltitle|isadult|startyear|endyear|runtimeminutes|              genres|TimestampIntStr|timestamphour|timestampminute|tsstring|          date_time|
+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+--------+-------------------+
|  0|tt0439997|    movie|           500 Almas|           500 Almas|      0|     2004|   null|         105.0|         Documentary|            105|            1|             45|    1:45|1970-01-01 01:45:00|
|  1|tt0439999|tvSpecial|           80s Mania|           80s Mania|      0|     2001|   null|          50.0|               Music|             50|            0|             50|    0

In [None]:
# Keep only the time-of-day part of date_time, formatted as zero-padded "HH:mm".
from pyspark.sql.functions import date_format
film_table = df.withColumn(
    'runtime',
    date_format(f.col('date_time'), 'HH:mm'),
)

In [None]:
# Preview the first 20 rows of the film table with the formatted runtime.
film_table.show(n=20)

+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+--------+-------------------+-------+
|_c0|   tconst|titletype|        primarytitle|       originaltitle|isadult|startyear|endyear|runtimeminutes|              genres|TimestampIntStr|timestamphour|timestampminute|tsstring|          date_time|runtime|
+---+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+--------+-------------------+-------+
|  0|tt0439997|    movie|           500 Almas|           500 Almas|      0|     2004|   null|         105.0|         Documentary|            105|            1|             45|    1:45|1970-01-01 01:45:00|  01:45|
|  1|tt0439999|tvSpecial|           80s Mania|           80s Mania|      0|     2001|   null|          50.0|               Music|             50|   

In [None]:
# Inner-join films with their ratings on the shared title id column.
join_films = film_table.join(rating_df, on=['tconst'], how='inner')


In [None]:
# Preview the first 20 rows of the joined films/ratings table.
join_films.show(n=20)

+---------+---+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+--------+-------------------+-------+-------------+--------+
|   tconst|_c0|titletype|        primarytitle|       originaltitle|isadult|startyear|endyear|runtimeminutes|              genres|TimestampIntStr|timestamphour|timestampminute|tsstring|          date_time|runtime|averagerating|numvotes|
+---------+---+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+---------------+-------------+---------------+--------+-------------------+-------+-------------+--------+
|tt0439997|  0|    movie|           500 Almas|           500 Almas|      0|     2004|   null|         105.0|         Documentary|            105|            1|             45|    1:45|1970-01-01 01:45:00|  01:45|          7.2|      19|
|tt0440003|  3|    movie|        A-1 Headline|         A

In [None]:
# Register the joined films/ratings DataFrame as the temp view 'new_join_table'.
join_films.createOrReplaceTempView(name='new_join_table')

In [None]:
# Films table for question two: only the columns the question asks for.
films_table = spark.sql(
    sqlQuery="select tconst,titletype,originaltitle,averagerating,runtime,genres,isadult from new_join_table "
)

In [None]:
# Preview the first 20 rows of the films table.
films_table.show(n=20)

+---------+---------+--------------------+-------------+-------+--------------------+-------+
|   tconst|titletype|       originaltitle|averagerating|runtime|              genres|isadult|
+---------+---------+--------------------+-------------+-------+--------------------+-------+
|tt0439997|    movie|           500 Almas|          7.2|  01:45|         Documentary|      0|
|tt0440003|    movie|         A1 tou tiao|          5.9|  01:35|    Mystery,Thriller|      0|
|tt0440004|  tvMovie| AD/BC: A Rock Opera|          7.4|  00:30|      Comedy,Musical|      0|
|tt0440016|    movie|       Ah ma yau nan|          5.5|  01:33|        Comedy,Crime|      0|
|tt0440067|    movie|      Bau lit do see|          5.4|  01:39|              Action|      0|
|tt0440078|  tvMovie|  The Band Aid Story|          8.1|  01:35|         Documentary|      0|
|tt0440084|  tvMovie|A Beachcombers Ch...|          7.0|  02:00|Adventure,Comedy,...|      0|
|tt0440149|    video|Blink 182: Punk P...|          4.1|  01

**QUESTION 3**

In [None]:
 #using sql in Spark just requires you to call spark.sql(<sql>) and showing the data.
#here we are peeking at the data using SparkSQL
producer_names= spark.sql("select primaryname,knownFor  from profession where primaryprofession like '%producer%' ")

In [None]:
# Preview the first 20 producers and the title each is known for.
producer_names.show(n=20)

+------------------+---------+
|       primaryname| knownFor|
+------------------+---------+
|      Mark Kleiman|tt0972846|
|     Andreas Klein|tt0311429|
|Bonnie Sherr Klein|tt5492464|
| Ellyn Klein Golub|tt0268397|
|       Frank Klein|tt0297284|
|        Greg Klein|tt0343314|
|        Jeff Kline|tt0236908|
|       Jonah Klein|tt1098356|
|       Kalle Klein|tt3814852|
|       Laura Klein|tt2910274|
|       Linda Klein|tt4572514|
|         Lou Klein|tt2357423|
|  Michael L. Klein|tt0330419|
|    Nicholas Klein|tt0120753|
|       Peter Klein|tt1520498|
|     Richard Klein|tt0386245|
|       Robin Klein|tt2419934|
|    Seth Kleinberg|tt1194173|
|    Seth Kleinberg|tt0800080|
|      Thilo Kleine|tt0161860|
+------------------+---------+
only showing top 20 rows



In [None]:
# Build the producers table by inner-joining each producer's `knownFor` title id
# to the matching film's `tconst`.
# The films DataFrame built earlier in this notebook is named `films_table`; the
# name `films` is not defined in any cell shown, so the old call failed with
# NameError on a fresh kernel.
producer_table = producer_names.join(
    films_table,
    producer_names.knownFor == films_table.tconst,
    how='inner',
)


In [None]:
# Preview the first 20 rows of the producers table.
producer_table.show(n=20)

+--------------------+---------+---------+---------+--------------------+-------------+-------+--------------------+-------+
|         primaryname| knownFor|   tconst|titletype|       originaltitle|averagerating|runtime|              genres|isadult|
+--------------------+---------+---------+---------+--------------------+-------------+-------+--------------------+-------+
|         Jimmy Smits|tt0121766|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
|     Olivia McCallum|tt0121766|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
|       Leigh Ann Fan|tt0121766|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
| Eric D. Christensen|tt0121766|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
|         Paul Hilton|tt0121766|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|


**QUESTION 4**

In [None]:
# Fetch the principals CSV from the S3 bucket and load it as a Spark DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/filteredPrincipals.csv"
# Register the remote file with Spark; SparkFiles.get resolves its local copy.
spark.sparkContext.addFile(path=url)
principal_df = (
    spark.read
    .options(sep=",", header=True)
    .csv(SparkFiles.get("filteredPrincipals.csv"))
)

In [None]:
# Preview the first 20 rows of the principals dataset.
principal_df.show(n=20)

+----------+--------+---------+---------------+----+-----------------+
|    tconst|ordering|   nconst|       category| job|       characters|
+----------+--------+---------+---------------+----+-----------------+
|tt12189034|       6|nm5943320|          actor|null|      "[""Lee""]"|
|tt12189034|       6|nm5943320|          actor|null|      "[""Lee""]"|
|tt12189034|       6|nm5943320|          actor|null|      "[""Lee""]"|
|tt12189034|       6|nm5943320|          actor|null|      "[""Lee""]"|
|tt12189034|       8|nm7672770|          actor|null|"[""Detective""]"|
|tt12189034|       8|nm7672770|          actor|null|"[""Detective""]"|
|tt12189034|       8|nm7672770|          actor|null|"[""Detective""]"|
|tt12189034|       8|nm7672770|          actor|null|"[""Detective""]"|
|tt12190374|       3|nm0767339|cinematographer|null|             null|
|tt12190374|       3|nm0767339|cinematographer|null|             null|
|tt12190374|       3|nm0767339|cinematographer|null|             null|
|tt121

In [None]:
# Keep only the character, title id and person id columns, without duplicate rows.
uniq_char = principal_df.select(
    ['characters', 'tconst', 'nconst']
).distinct()

In [None]:
# Preview the first 20 distinct character rows.
uniq_char.show(n=20)

+--------------------+----------+----------+
|          characters|    tconst|    nconst|
+--------------------+----------+----------+
|                null|tt12198800|nm10990579|
|                null| tt9780330| nm1278480|
|"[""Oton Miyazawa...| tt0444902| nm0337751|
|                null| tt4991384| nm1333687|
|     "[""El cura""]"| tt3286888| nm0525599|
|                null|tt13648222| nm0055656|
|                null|tt11230846| nm0930782|
|        "[""Self""]"| tt6640526| nm8843903|
|                null| tt0938691| nm1951277|
|                null| tt0940852| nm2523449|
|       "[""Horse""]"| tt8982042| nm6378490|
|                null| tt0444401| nm0820715|
|        "[""Self""]"| tt5676108| nm5348537|
|                null| tt0455620| nm1031112|
|                null| tt3294878| nm6048763|
|                null| tt1734072| nm0765405|
|        "[""Riku""]"|tt13628470| nm1515534|
|   "[""Self - Winner| tt1366325| nm2233157|
|                null| tt1727380| nm3762381|
|         

In [None]:
# Inner-join the distinct characters with the films table on the title id.
# The films DataFrame built earlier in this notebook is named `films_table`; the
# name `films` is not defined in any cell shown, so the old call failed with
# NameError on a fresh kernel.
# The join condition is kept as an explicit equality, so the result still
# carries a `tconst` column from each side.
char_table1 = uniq_char.join(
    films_table,
    uniq_char.tconst == films_table.tconst,
    how='inner',
)

In [None]:
# Preview the first 20 rows of the characters/films join.
char_table1.show(n=20)


+--------------------+---------+---------+---------+---------+--------------------+-------------+-------+--------------------+-------+
|          characters|   tconst|   nconst|   tconst|titletype|       originaltitle|averagerating|runtime|              genres|isadult|
+--------------------+---------+---------+---------+---------+--------------------+-------------+-------+--------------------+-------+
|  "[""Mace Windu""]"|tt0121766|nm0000168|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
|                null|tt0121766|nm0059242|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
|                null|tt0121766|nm0123785|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
|                null|tt0121766|nm0000184|tt0121766|    movie|Star Wars: Episod...|          7.5|  02:20|Action,Adventure,...|      0|
|                null|tt0121766|nm0002354|tt0121766|   

In [None]:
# Attach actor/actress details to each character row by inner-joining on the person id.
characters = char_table1.join(
    pri_prof,
    on=char_table1["nconst"] == pri_prof["nconst"],
    how='inner',
)

In [None]:
# Preview the first 20 rows of the characters/films/actors join.
characters.show(n=20)

+--------------------+----------+---------+----------+---------+--------------------+-------------+-------+--------------------+-------+---------+-----------+--------------------+---------+---------+
|          characters|    tconst|   nconst|    tconst|titletype|       originaltitle|averagerating|runtime|              genres|isadult|   nconst|primaryname|   primaryprofession|birthyear|deathyear|
+--------------------+----------+---------+----------+---------+--------------------+-------------+-------+--------------------+-------+---------+-----------+--------------------+---------+---------+
|"[""Commissioner ...| tt1345836|nm0000198| tt1345836|    movie|The Dark Knight R...|          8.4|  02:44|    Action,Adventure|      0|nm0000198|Gary Oldman|actor,soundtrack,...|     1958|     null|
|"[""Commissioner ...| tt1345836|nm0000198| tt1345836|    movie|The Dark Knight R...|          8.4|  02:44|    Action,Adventure|      0|nm0000198|Gary Oldman|actor,soundtrack,...|     1958|     null|


In [None]:
# Keep the descriptive columns only (the id columns are dropped) and remove duplicate rows.
characters_unique = characters.select(
    [
        'characters',
        'titletype',
        'originaltitle',
        'averagerating',
        'runtime',
        'genres',
        'isadult',
        'primaryname',
        'primaryprofession',
        'birthyear',
        'deathyear',
    ]
).distinct()

In [None]:
# Show the first 20 distinct character rows with full, untruncated cell contents.
characters_unique.show(n=20, truncate=False)

+-------------------------------------------------------------------+---------+-----------------------------+-------------+-------+--------------------------+-------+-----------------------+-----------------------------------+---------+---------+
|characters                                                         |titletype|originaltitle                |averagerating|runtime|genres                    |isadult|primaryname            |primaryprofession                  |birthyear|deathyear|
+-------------------------------------------------------------------+---------+-----------------------------+-------------+-------+--------------------------+-------+-----------------------+-----------------------------------+---------+---------+
|"[""Self""]"                                                       |tvSpecial|Super Bowl XXXV Halftime Show|5.1          |null   |Music,Sport               |0      |Mary J. Blige          |soundtrack,actress,composer        |1971     |null     |
|null       

In [None]:
# Drop every row that has a null in any column of the character table.
# NOTE(review): because all columns are checked, rows with a null deathyear
# (people with no recorded death) are removed as well - confirm this is intended,
# or restrict the check with `subset=`.
characters_table = characters_unique.dropna(how='any')

In [None]:
# Final character table (characters linked to their film and actor/actress details):
# preview the first 20 rows.
characters_table.show(n=20)

+--------------------+---------+--------------------+-------------+-------+--------------------+-------+------------------+--------------------+---------+---------+
|          characters|titletype|       originaltitle|averagerating|runtime|              genres|isadult|       primaryname|   primaryprofession|birthyear|deathyear|
+--------------------+---------+--------------------+-------------+-------+--------------------+-------+------------------+--------------------+---------+---------+
|   "[""Jim Lahey""]"|  tvMovie|Trailer Park Boys...|          6.0|  01:14|              Comedy|      0|    John Dunsworth|actor,casting_dir...|     1946|     2017|
|"[""Shuichi Aizaw...|  tvMovie|Death Note Rewrit...|          7.5|  02:11|Animation,Mystery...|      0|    Keiji Fujiwara|actor,soundtrack,...|     1964|     2020|
| "[""Donata Fiok""]"|    movie|      Superprodukcja|          4.7|  01:32|              Comedy|      0|   Anna Przybylska|             actress|     1978|     2014|
|        "