In [1]:
#Problem Description
#You have two files named movies_en.json and artists_en.json containing a small movie database in the JSON format. 
# You need to load them into Spark Data frames and perform analysis.


#Spark Imports and Get Context

#https://spark.apache.org/docs/latest/sql-getting-started.html
from pyspark import SparkContext, SparkConf, SQLContext
from os import getcwd

conf = SparkConf().setAppName('SparkS1')
sc = SparkContext(conf=conf).getOrCreate()
spark = SQLContext.getOrCreate(sc)

In [3]:
#Import movieDF and print 
movieJDF = spark.read.json('datasets/movie/movies_en_1_0.txt')
movieJDF.printSchema()
movieJDF.show(5)

root
 |-- actors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- country: string (nullable = true)
 |-- director: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- year_of_birth: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+--------------------+-------+--------------------+---------------+-------+--------------------+-------------+----+
|              actors|country|            director|          genre|     id|             summary|        title|year|
+--------------------+-------+--------------------+---------------+-------+--------------------+-------------+----+
|[{artist:15, John...|    USA

In [5]:
# Import artistDF and print
artistsJDF = spark.read.json('datasets/movie/artists_en_1_2.txt')
artistsJDF.printSchema()
artistsJDF.show(5)

root
 |-- first_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- year_of_birth: string (nullable = true)

+----------+--------+---------+-------------+
|first_name|      id|last_name|year_of_birth|
+----------+--------+---------+-------------+
|     Sofia|artist:1|  Coppola|         1971|
|   Kirsten|artist:2|    Dunst|         null|
|    Alfred|artist:3|Hitchcock|         1899|
|    Ridley|artist:4|    Scott|         1937|
| Sigourney|artist:5|   Weaver|         1949|
+----------+--------+---------+-------------+
only showing top 5 rows



In [6]:
#Assignment 2 - Write a query to group titles of American movies by year

from pyspark.sql.functions import *
		
AmericanMDF = movieJDF.filter(movieJDF.country == 'USA')
AmericanMDFL = AmericanMDF.withColumn('AmericalMovies', struct('title')).groupBy('year').agg(collect_list('AmericalMovies').alias('AmericalMovies'))
AmericanMDFL.show(5,truncate = False)

+----+----------------------------------------------+
|year|AmericalMovies                                |
+----+----------------------------------------------+
|1958|[{Vertigo}]                                   |
|1983|[{Star Wars: Episode VI - Return of the Jedi}]|
|1972|[{The Godfather}]                             |
|1979|[{Alien}]                                     |
|1988|[{Die Hard}, {Rain Man}]                      |
+----+----------------------------------------------+
only showing top 5 rows



In [8]:
#Assignment 3 : Write spark code to normalize the data frames created above and store the output as Parquet files

artistsJDF.write.parquet('datasets/artistp') #artistDF as parquet, requires hadoop

##Exploding array columns
		
movieJDF1 = movieJDF.withColumn('actors',explode(col('actors'))).select(col('actors.id').alias('actorid'),col('actors.role').alias('actorrole'),'country','director','genre','id','summary','title','year')
		
movieJDF2 = movieJDF1.select('actorid','actorrole','country',col('director.first_name').alias('director_fname'),col('director.id').alias('director_id'),col('director.last_name').alias('director_lname'),col('director.year_of_birth').alias('director_YOB'),'genre','id','summary','title','year')
movieJDF2.show()
movieJDF2.write.parquet('datasets/moviesp') #movieDF as parquet,requires hadoop

#Store link between Movie, Artist and Role played 
		
artistRoles = movieJDF.withColumn('actor',explode(col('actors'))).select('title','actor.id','actor.role')
artistRoleName = artistRoles.join(artistsJDF,'id','inner').select('title','id','role',concat(col('first_name'),lit(' '),col('last_name')).alias('artist_name')).sort('title')
artistRoleName.show()


artistRoleName.write.parquet('datasets/artistrolep') #artistRoleName as parquet,requires hadoop

+----------+--------------------+-------+--------------+-----------+--------------+------------+---------------+--------+--------------------+---------------+----+
|   actorid|           actorrole|country|director_fname|director_id|director_lname|director_YOB|          genre|      id|             summary|          title|year|
+----------+--------------------+-------+--------------+-----------+--------------+------------+---------------+--------+--------------------+---------------+----+
| artist:15|       John Ferguson|    USA|        Alfred|   artist:3|     Hitchcock|        1899|          Drama| movie:1|A retired San Fra...|        Vertigo|1958|
| artist:16|    Madeleine Elster|    USA|        Alfred|   artist:3|     Hitchcock|        1899|          Drama| movie:1|A retired San Fra...|        Vertigo|1958|
|  artist:5|              Ripley|    USA|        Ridley|   artist:4|         Scott|        1937|Science-Fiction| movie:2|The commercial ve...|          Alien|1979|
|artist:109| Ros

In [10]:
#Assignement4 - Write spark code to normalize the data frames created above and store the output as 3 Hive Table in Parquet files

spark.sql('create database sparkjson')


movieJDF2.write.saveAsTable('sparkjson.movie1',format='parquet')
artistsJDF.write.saveAsTable('sparkjson.artists',format='parquet')
artistRoleName.write.saveAsTable('sparkjson.artistsrole',format='parquet')

In [11]:
spark.sql('use sparkjson')

spark.sql('show tables').show()

+---------+-----------+-----------+
| database|  tableName|isTemporary|
+---------+-----------+-----------+
|sparkjson|    artists|      false|
|sparkjson|artistsrole|      false|
|sparkjson|     movie1|      false|
+---------+-----------+-----------+



In [12]:
#Assignment 5 - Execute a Spark SQL query on hive tables to list MovieID, Title, Year, Genre, Country, Director and actors by joining on the tables
#  created in assignment 4 and display the first 5 records in command prompt
	
assignment5 = spark.sql('''select m.id,title,year,genre,country,
        concat_ws(' ',director_fname, director_lname) as director,concat_ws(' ',first_name,last_name) as actor from 
        movie1 m inner join artists a on m.actorid = a.id''')

assignment5.show(10)

+-------+-------------+----+---------------+-------+----------------+------------------+
|     id|        title|year|          genre|country|        director|             actor|
+-------+-------------+----+---------------+-------+----------------+------------------+
|movie:1|      Vertigo|1958|          Drama|    USA|Alfred Hitchcock|     James Stewart|
|movie:1|      Vertigo|1958|          Drama|    USA|Alfred Hitchcock|         Kim Novak|
|movie:2|        Alien|1979|Science-Fiction|    USA|    Ridley Scott|  Sigourney Weaver|
|movie:3|      Titanic|1997|          Drama|    USA|   James Cameron|      Kate Winslet|
|movie:3|      Titanic|1997|          Drama|    USA|   James Cameron| Leonardo DiCaprio|
|movie:5|     Face/Off|1997|         Action|    USA|        John Woo|     John Travolta|
|movie:5|     Face/Off|1997|         Action|    USA|        John Woo|      Nicolas Cage|
|movie:6|Sleepy Hollow|1999|        Mystery|    USA|      Tim Burton|       Johnny Depp|
|movie:6|Sleepy Hollo