In [2]:
import pyspark as ps    # for the pyspark suite


In [3]:
spark = (ps.sql.SparkSession
         .builder
         .master('local[4]')
         .appName('lecture')
         .getOrCreate()
        )
sc = spark.sparkContext

In [4]:
spark

In [59]:
spark.sql("Create Database Imdb").show()

AnalysisException: u"Database 'imdb' already exists;"

In [60]:
df_movie_raw = spark.read.csv('data/title.basics.tsv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep="\t",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

In [6]:
df_movie_raw.show(5)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

In [7]:
df_movie_raw.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



In [61]:
df_movie_raw.createOrReplaceTempView('Imdb_All')


In [62]:
spark.sql("Desc Imdb_All").show()

+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|        tconst|   string|   null|
|     titleType|   string|   null|
|  primaryTitle|   string|   null|
| originalTitle|   string|   null|
|       isAdult|      int|   null|
|     startYear|   string|   null|
|       endYear|   string|   null|
|runtimeMinutes|   string|   null|
|        genres|   string|   null|
+--------------+---------+-------+



In [47]:
spark.sql("Select distinct(m.titleType) from Imdb_All m").show()

+------------+
|   titleType|
+------------+
|    tvSeries|
|tvMiniSeries|
|     tvMovie|
|   tvEpisode|
|       movie|
|   tvSpecial|
|       video|
|   videoGame|
|     tvShort|
|       short|
+------------+



In [49]:
pd_df_Movies = spark.sql("select * from Imdb_All m where m.titleType in ('tvMovie','movie')").toPandas()

In [55]:
pd_df_Movies.count()

tconst            667934
titleType         667934
primaryTitle      667934
originalTitle     667934
isAdult           667934
startYear         667934
endYear           667934
runtimeMinutes    667934
genres            667934
dtype: int64

In [63]:
df_ratings_raw = spark.read.csv('data/title.ratings.tsv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep="\t",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

Row(tconst=u'tt0000001', averageRating=5.6, numVotes=1593)

In [15]:
df_ratings_raw.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [64]:
df_ratings_raw.createOrReplaceTempView('Ratings')

In [65]:
spark.sql("Desc Ratings").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       tconst|   string|   null|
|averageRating|   double|   null|
|     numVotes|      int|   null|
+-------------+---------+-------+



In [66]:
spark.sql("Select * from Ratings").show(10)

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.6|    1593|
|tt0000002|          6.0|     195|
|tt0000003|          6.5|    1266|
|tt0000004|          6.1|     121|
|tt0000005|          6.1|    2029|
|tt0000006|          5.1|     111|
|tt0000007|          5.4|     632|
|tt0000008|          5.4|    1741|
|tt0000009|          5.3|      89|
|tt0000010|          6.9|    5746|
+---------+-------------+--------+
only showing top 10 rows



In [67]:
df_principal_crew_raw = spark.read.csv('data/title.principals.tsv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep="\t",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

In [22]:
df_principal_crew_raw.show(5)

+---------+--------+---------+---------------+--------------------+----------+
|   tconst|ordering|   nconst|       category|                 job|characters|
+---------+--------+---------+---------------+--------------------+----------+
|tt0000001|       1|nm1588970|           self|                  \N|  ["Self"]|
|tt0000001|       2|nm0005690|       director|                  \N|        \N|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|        \N|
|tt0000002|       1|nm0721526|       director|                  \N|        \N|
|tt0000002|       2|nm1335271|       composer|                  \N|        \N|
+---------+--------+---------+---------------+--------------------+----------+
only showing top 5 rows



In [23]:
df_principal_crew_raw.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [68]:
df_principal_crew_raw.createOrReplaceTempView('Principal_Crew')

In [69]:
spark.sql("Desc Principal_Crew").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|    tconst|   string|   null|
|  ordering|      int|   null|
|    nconst|   string|   null|
|  category|   string|   null|
|       job|   string|   null|
|characters|   string|   null|
+----------+---------+-------+



In [29]:
spark.sql("Select distinct(category) from Principal_Crew").show()

+-------------------+
|           category|
+-------------------+
|            actress|
|           producer|
|             writer|
|           composer|
|           director|
|               self|
|              actor|
|             editor|
|    cinematographer|
|      archive_sound|
|production_designer|
|    archive_footage|
+-------------------+



In [30]:
spark.sql("select * from Principal_Crew where tconst = 'tt9916880'").show()

+---------+--------+----------+--------+------------------+--------------------+
|   tconst|ordering|    nconst|category|               job|          characters|
+---------+--------+----------+--------+------------------+--------------------+
|tt9916880|      10| nm2676923| actress|                \N|["Sour Susan","Go...|
|tt9916880|       1| nm1483166|   actor|                \N|["Rude Ralph","Mi...|
|tt9916880|       2| nm0254176| actress|                \N|  ["Moody Margaret"]|
|tt9916880|       3| nm0286175|   actor|                \N|["Dad","Aerobic A...|
|tt9916880|       4|nm10535738| actress|                \N|    ["Horrid Henry"]|
|tt9916880|       5| nm0996406|director|principal director|                  \N|
|tt9916880|       6| nm1482639|  writer|                \N|                  \N|
|tt9916880|       7| nm2586970|  writer|             books|                  \N|
|tt9916880|       8| nm1594058|producer|          producer|                  \N|
|tt9916880|       9| nm10525

In [114]:
spark.sql("Select distinct(job) from Principal_Crew where category = 'writer' order by job").show(50, False)

+------------------------------------------------------------------+
|job                                                               |
+------------------------------------------------------------------+
|"A Box in Town" by                                                |
|"A Kidnapped Santa Claus" and other tales                         |
|"A Midsummer Night's Dream" sequence                              |
|"Alan A'Dale" sketch                                              |
|"Alien" characters                                                |
|"All in the Family" written by                                    |
|"And Afterwards at..." by                                         |
|"Archy and Mehitabel" stories                                     |
|"Barney and the Backyard Gang" and "Barney & Friends" developed by|
|"Battletoads" created by                                          |
|"Biohazard 2/Resident Evil 2" characters                          |
|"Blue's Clues" created by        

In [90]:
spark.sql("Select distinct(characters) from Principal_Crew where category = 'writer'").show(200, False)

+----------+
|characters|
+----------+
|\N        |
+----------+



In [70]:
df_crew_names = spark.read.csv('data/name.basics.tsv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep="\t",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

In [39]:
df_crew_names.printSchema()

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)



In [71]:
df_crew_names.createOrReplaceTempView('Crew_names')


In [72]:
spark.sql("Desc Crew_names").show()

+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|           nconst|   string|   null|
|      primaryName|   string|   null|
|        birthYear|   string|   null|
|        deathYear|   string|   null|
|primaryProfession|   string|   null|
|   knownForTitles|   string|   null|
+-----------------+---------+-------+



In [122]:
spark.sql("Show tables").show()

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
|        |        crew_names|       true|
|        |          imdb_all|       true|
|        |            movies|       true|
|        |movies_and_ratings|       true|
|        |    principal_crew|       true|
|        |           ratings|       true|
|        |       writer_info|       true|
|        |           writers|       true|
+--------+------------------+-----------+



In [75]:
spark.sql("Drop table movies").show()

++
||
++
++



In [80]:
df_movies = spark.sql("select * from Imdb_All m where m.titleType in ('tvMovie','movie')")

In [81]:
df_movies.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



In [82]:
df_movies.createOrReplaceTempView('Movies')

In [85]:
spark.sql("Select distinct(m.titleType) from Movies m").show()

+---------+
|titleType|
+---------+
|  tvMovie|
|    movie|
+---------+



In [91]:
df_writers = spark.sql("Select tconst, nconst, category, job from Principal_Crew where category = 'writer'")

In [92]:
df_writers.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)



In [93]:
df_writers.createOrReplaceTempView('Writers')

In [94]:
spark.sql("Desc Writers").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  tconst|   string|   null|
|  nconst|   string|   null|
|category|   string|   null|
|     job|   string|   null|
+--------+---------+-------+



In [96]:
spark.sql("Desc Movies").show()

+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|        tconst|   string|   null|
|     titleType|   string|   null|
|  primaryTitle|   string|   null|
| originalTitle|   string|   null|
|       isAdult|      int|   null|
|     startYear|   string|   null|
|       endYear|   string|   null|
|runtimeMinutes|   string|   null|
|        genres|   string|   null|
+--------------+---------+-------+



In [97]:
spark.sql("Desc ratings").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       tconst|   string|   null|
|averageRating|   double|   null|
|     numVotes|      int|   null|
+-------------+---------+-------+



In [98]:
df_movies_ratings = spark.sql("select m.tconst as movie_id, m.titleType as title_type, m.primaryTitle as primary_title, m.originalTitle as original_title, m.startYear as year, m.runtimeMinutes as runtime_mins, m.genres as genres, r.averageRating as avg_ratings, r.numVotes as votes from Movies m inner join ratings r on m.tconst = r.tconst")

In [99]:
df_movies_ratings.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title_type: string (nullable = true)
 |-- primary_title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- runtime_mins: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- avg_ratings: double (nullable = true)
 |-- votes: integer (nullable = true)



In [100]:
df_movies_ratings.take(5)

[Row(movie_id=u'tt0002588', title_type=u'movie', primary_title=u'Zigomar contre Nick Carter', original_title=u'Zigomar contre Nick Carter', year=u'1912', runtime_mins=u'48', genres=u'Crime,Thriller', avg_ratings=6.4, votes=14),
 Row(movie_id=u'tt0004272', title_type=u'movie', primary_title=u'Lucille Love: The Girl of Mystery', original_title=u'Lucille Love: The Girl of Mystery', year=u'1914', runtime_mins=u'300', genres=u'Action', avg_ratings=5.9, votes=8),
 Row(movie_id=u'tt0004336', title_type=u'movie', primary_title=u'The Million Dollar Mystery', original_title=u'The Million Dollar Mystery', year=u'1914', runtime_mins=u'\\N', genres=u'Adventure,Mystery', avg_ratings=5.6, votes=20),
 Row(movie_id=u'tt0005209', title_type=u'movie', primary_title=u'Don Quixote', original_title=u'Don Quixote', year=u'1915', runtime_mins=u'50', genres=u'Drama', avg_ratings=5.7, votes=9),
 Row(movie_id=u'tt0006204', title_type=u'movie', primary_title=u'The Valley of Decision', original_title=u'The Valley 

In [101]:
df_movies_ratings.createOrReplaceTempView('Movies_and_Ratings')

In [102]:
spark.sql("Desc Movies_and_Ratings").show()

+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|      movie_id|   string|   null|
|    title_type|   string|   null|
| primary_title|   string|   null|
|original_title|   string|   null|
|          year|   string|   null|
|  runtime_mins|   string|   null|
|        genres|   string|   null|
|   avg_ratings|   double|   null|
|         votes|      int|   null|
+--------------+---------+-------+



In [103]:
spark.sql("select * from Movies_and_Ratings").show(10)

+---------+----------+--------------------+--------------------+----+------------+-----------------+-----------+-----+
| movie_id|title_type|       primary_title|      original_title|year|runtime_mins|           genres|avg_ratings|votes|
+---------+----------+--------------------+--------------------+----+------------+-----------------+-----------+-----+
|tt0002588|     movie|Zigomar contre Ni...|Zigomar contre Ni...|1912|          48|   Crime,Thriller|        6.4|   14|
|tt0004272|     movie|Lucille Love: The...|Lucille Love: The...|1914|         300|           Action|        5.9|    8|
|tt0004336|     movie|The Million Dolla...|The Million Dolla...|1914|          \N|Adventure,Mystery|        5.6|   20|
|tt0005209|     movie|         Don Quixote|         Don Quixote|1915|          50|            Drama|        5.7|    9|
|tt0006204|     movie|The Valley of Dec...|The Valley of Dec...|1916|          50|            Drama|        6.1|    7|
|tt0006489|     movie|     The Captive God|     

In [104]:
spark.sql("Desc Writers").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  tconst|   string|   null|
|  nconst|   string|   null|
|category|   string|   null|
|     job|   string|   null|
+--------+---------+-------+



In [105]:
spark.sql("Desc crew_names").show()

+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|           nconst|   string|   null|
|      primaryName|   string|   null|
|        birthYear|   string|   null|
|        deathYear|   string|   null|
|primaryProfession|   string|   null|
|   knownForTitles|   string|   null|
+-----------------+---------+-------+



In [117]:
spark.sql("select job from Writers where job like 'novel %' OR job like 'Book %' OR job like 'based on %'order by job").show(200, False)

+-------------------------------------------------------+
|job                                                    |
+-------------------------------------------------------+
|Book "Heart Of The Blackhawks: The Pierre Pilote Story"|
|Book "Heart Of The Blackhawks: The Pierre Pilote Story"|
|Book Author                                            |
|Book by                                                |
|based on "A Christmas Carol" by                        |
|based on "A Film Star's Holiday" story by              |
|based on "A Giacometti Portrait" by                    |
|based on "A Song of Ice and Fire" by                   |
|based on "A Song of Ice and Fire" by                   |
|based on "A Song of Ice and Fire" by                   |
|based on "A Song of Ice and Fire" by                   |
|based on "A Song of Ice and Fire" by                   |
|based on "A Song of Ice and Fire" by                   |
|based on "A Song of Ice and Fire" by                   |
|based on "A S

In [119]:
df_writer_info = spark.sql("select w.tconst as movie_id, w.nconst as crew_id, w.category as category, c.primaryName as name, c.birthYear as birth_year, c.primaryProfession as profession, w.job as job_desc from Writers w inner join crew_names c on w.nconst = c.nconst")

In [120]:
df_writer_info.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- crew_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- name: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- profession: string (nullable = true)
 |-- job_desc: string (nullable = true)



In [121]:
df_writer_info.createOrReplaceTempView("Writer_Info")

In [124]:
spark.sql("select * from Writer_Info where name like '%Rowling%'").show()

+----------+---------+--------+------------+----------+--------------------+--------------------+
|  movie_id|  crew_id|category|        name|birth_year|          profession|            job_desc|
+----------+---------+--------+------------+----------+--------------------+--------------------+
| tt0241527|nm0746830|  writer|J.K. Rowling|      1965|writer,producer,s...|               novel|
| tt0295297|nm0746830|  writer|J.K. Rowling|      1965|writer,producer,s...|               novel|
| tt0304140|nm0746830|  writer|J.K. Rowling|      1965|writer,producer,s...|characters and un...|
| tt0304141|nm0746830|  writer|J.K. Rowling|      1965|writer,producer,s...|               novel|
| tt0304142|nm0746830|  writer|J.K. Rowling|      1965|writer,producer,s...|characters and un...|
| tt0330373|nm0746830|  writer|J.K. Rowling|      1965|writer,producer,s...|               novel|
| tt0373889|nm0746830|  writer|J.K. Rowling|      1965|writer,producer,s...|               novel|
| tt0388991|nm074683