In [1]:
%%init_spark
# Configure Spark to use a local master
launcher.master = "local"

In [2]:
// init_spark creates several variables:
// sc: spark context
// spark: spark session

// Read data into DataFrames
val reader = spark.read.option("header",true).option("inferSchema","true")
val movies = reader.csv("db/IMDb movies.csv.gz")
movies.printSchema()
val names = reader.csv("db/IMDb names.csv.gz")
names.printSchema()
val ratings = reader.csv("db/IMDb ratings.csv.gz")
ratings.printSchema()
val title_principals = reader.csv("db/IMDb title_principals.csv.gz")

Intitializing Scala interpreter ...

Spark Web UI available at http://efd27ee5dea3:4040
SparkContext available as 'sc' (version = 3.1.2, master = local, app id = local-1628302983922)
SparkSession available as 'spark'


root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- date_published: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- director: string (nullable = true)
 |-- writer: string (nullable = true)
 |-- production_company: string (nullable = true)
 |-- actors: string (nullable = true)
 |-- description: string (nullable = true)
 |-- avg_vote: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- usa_gross_income: string (nullable = true)
 |-- worlwide_gross_income: string (nullable = true)
 |-- metascore: string (nullable = true)
 |-- reviews_from_users: string (nullable = true)
 |-- reviews_from_critics: string (nullable = true)

root
 |-- imdb_name_id: string (nullable = true)
 |-- name:

reader: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@4c1c7ebe
movies: org.apache.spark.sql.DataFrame = [imdb_title_id: string, title: string ... 20 more fields]
names: org.apache.spark.sql.DataFrame = [imdb_name_id: string, name: string ... 15 more fields]
ratings: org.apache.spark.sql.DataFrame = [imdb_title_id: string, weighted_average_vote: double ... 47 more fields]
title_principals: org.apache.spark.sql.DataFrame = [imdb_title_id: string, ordering: int ... 4 more fields]


In [3]:
// DataFrame queries
// Top highest voted action movies among by 45 years old females, with total vote greater than 1000
val q1 = ratings.join(movies, "imdb_title_id").filter($"genre".contains("Action")).filter($"total_votes" > 1000).
    select("title", "year", "genre", "females_45age_avg_vote").sort($"females_45age_avg_vote".desc)
q1.explain(true)
q1.show()

== Parsed Logical Plan ==
'Sort ['females_45age_avg_vote DESC NULLS LAST], true
+- Project [title#17, year#19, genre#21, females_45age_avg_vote#167]
   +- Filter (total_votes#128 > 1000)
      +- Filter Contains(genre#21, Action)
         +- Project [imdb_title_id#126, weighted_average_vote#127, total_votes#128, mean_vote#129, median_vote#130, votes_10#131, votes_9#132, votes_8#133, votes_7#134, votes_6#135, votes_5#136, votes_4#137, votes_3#138, votes_2#139, votes_1#140, allgenders_0age_avg_vote#141, allgenders_0age_votes#142, allgenders_18age_avg_vote#143, allgenders_18age_votes#144, allgenders_30age_avg_vote#145, allgenders_30age_votes#146, allgenders_45age_avg_vote#147, allgenders_45age_votes#148, males_allages_avg_vote#149, ... 46 more fields]
            +- Join Inner, (imdb_title_id#126 = imdb_title_id#16)
               :- Relation[imdb_title_id#126,weighted_average_vote#127,total_votes#128,mean_vote#129,median_vote#130,votes_10#131,votes_9#132,votes_8#133,votes_7#134,votes_6#1

q1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [title: string, year: string ... 2 more fields]


In [4]:
// SQL version
movies.createOrReplaceTempView("movies")
names.createOrReplaceTempView("names")
ratings.createOrReplaceTempView("ratings")
title_principals.createOrReplaceTempView("title_principals")

val q2 = sql("select title, year, genre, females_45age_avg_vote from movies join ratings using (imdb_title_id) where genre like '%Action%' and total_votes > 1000 order by females_45age_avg_vote desc")
q2.explain(true)
q2.show()

== Parsed Logical Plan ==
'Sort ['females_45age_avg_vote DESC NULLS LAST], true
+- 'Project ['title, 'year, 'genre, 'females_45age_avg_vote]
   +- 'Filter ('genre LIKE %Action% AND ('total_votes > 1000))
      +- 'Join UsingJoin(Inner,ArrayBuffer(imdb_title_id))
         :- 'UnresolvedRelation [movies], [], false
         +- 'UnresolvedRelation [ratings], [], false

== Analyzed Logical Plan ==
title: string, year: string, genre: string, females_45age_avg_vote: double
Sort [females_45age_avg_vote#167 DESC NULLS LAST], true
+- Project [title#17, year#19, genre#21, females_45age_avg_vote#167]
   +- Filter (genre#21 LIKE %Action% AND (total_votes#128 > 1000))
      +- Project [imdb_title_id#16, title#17, original_title#18, year#19, date_published#20, genre#21, duration#22, country#23, language#24, director#25, writer#26, production_company#27, actors#28, description#29, avg_vote#30, votes#31, budget#32, usa_gross_income#33, worlwide_gross_income#34, metascore#35, reviews_from_users#36, rev

q2: org.apache.spark.sql.DataFrame = [title: string, year: string ... 2 more fields]
