In [2]:
import os
import configparser
import numpy as np
from datetime import datetime 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import udf, col, monotonically_increasing_id, collect_list
from pyspark.sql.types import TimestampType, IntegerType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek

In [3]:
config = configparser.ConfigParser()
config.read('settings.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config.get('KEYS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('KEYS', 'AWS_SECRET_ACCESS_KEY')
input_dir = config.get('FILES', 'INPUT_DIR')
output_dir = config.get('FILES', 'OUTPUT_DIR')
current_year = datetime.now().year
filter_by_region = config.get('FILTERS', 'region')

imdb_input = input_dir + "/imdb"
tmdb_input = input_dir + "/tmdb"

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [11]:
#Basic title
imdb_titles_df = spark.read.options(header=True, inferSchema=True, nullValue="\\N") \
        .csv(imdb_input + '/title.basics.tsv.gz', sep='\t')

In [6]:
imdb_movie_titles_df = imdb_titles_df.select(
    col("tconst").alias("title_id"),
    col("titleType").alias("title_type"),
    col("primaryTitle").alias("primary_title"),
    col("originalTitle").alias("original_title"),
    col("genres"),
    col("runtimeMinutes").alias("runtime"),
    col("isAdult").alias("is_adult"),
    col("startYear").alias("start_year"),
    col("endYear").alias("end_year")
)

In [7]:
imdb_movie_titles_df = imdb_movie_titles_df.filter((imdb_titles_df.titleType == 'movie') & (imdb_titles_df['isAdult'] == '0'))

In [13]:
imdb_title_akas_df = spark.read.options(header=True, inferSchema=True, nullValue="\\N") \
    .csv(imdb_input + '/title.akas.tsv.gz', sep='\t')

In [66]:
movies_titles = imdb_movie_titles_df \
    .join(imdb_title_akas_df, imdb_movie_titles_df.title_id == imdb_title_akas_df.titleId, "left") \
    .select(col("titleId").alias("imdb_title_id"),
            imdb_title_akas_df.title,
            "language",
            "region",
            "is_adult",
            "start_year",
            ).distinct()

In [67]:
movies_titles = movies_titles.filter((col("language") == "en") & col("region").isNotNull() & col("start_year").isNotNull())

In [68]:
filter_by_region = config.get('FILTERS', 'region')
filter_by_year = config.get('FILTERS', 'year')

condition = None

if filter_by_year and filter_by_region:
    condition = "start_year == {0} and region == {1}".format(filter_by_year, filter_by_region)

elif filter_by_region and not filter_by_year:
    condition = "region == {0}".format(filter_by_region)

elif filter_by_year and not filter_by_region:
    condition = "start_year == {0}".format(filter_by_year)

if condition is not None:
    movies_titles = movies_titles.filter(condition)
    
movies_titles.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- is_original_title: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- region: string (nullable = true)
 |-- is_adult: integer (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- runtime: string (nullable = true)



In [13]:
movies_titles.write.partitionBy("region", "start_year").format("parquet") \
        .save(output_dir + '/movies_titles', mode="overwrite")

In [12]:
#TMDB movies list
tmdb_movies_df = spark.read.options(inferSchema=True, nullValue="\\N").json(tmdb_input+'/movies_list', multiLine=True)

In [8]:
tmdb_movies = tmdb_movies_df.select("id", "imdb_id", "revenue", "budget", "overview", "popularity", "release_date", "status", "vote_average", "vote_count")

In [5]:
tmdb_movies_df= spark.read.parquet(output_dir + '/tmdb_movies')

In [9]:
tmdb_movies.printSchema()

root
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- budget: long (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)



In [13]:
movies_titles_df = spark.read.parquet(output_dir + '/movies_titles')

In [14]:
movies_titles_df.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- is_original_title: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- is_adult: integer (nullable = true)
 |-- runtime: string (nullable = true)
 |-- region: string (nullable = true)
 |-- start_year: integer (nullable = true)



In [22]:
#Movie details

imdb_movie_titles_df = imdb_titles_df.select(
    col("tconst").alias("imdb_title_id"),
    col("titleType").alias("title_type"),
    col("primaryTitle").alias("primary_title"),
    col("originalTitle").alias("original_title"),
    col("genres"),
    col("runtimeMinutes").alias("runtime"),
    col("isAdult").alias("is_adult"),
    col("startYear").alias("start_year"),
)

imdb_movie_titles_df = imdb_movie_titles_df.filter((imdb_movie_titles_df['title_type'] == 'movie') & (imdb_movie_titles_df['is_adult'] == '0'))

# movies_details query combining imdb & tmdb movies
movies_details = imdb_movie_titles_df \
    .join(imdb_title_akas_df, imdb_movie_titles_df.imdb_title_id == imdb_title_akas_df.titleId, "left")\
    .join(tmdb_movies_df, imdb_movie_titles_df.imdb_title_id == tmdb_movies_df.imdb_id, "left") \
    .select(
        "imdb_title_id",
        imdb_title_akas_df.title,
        imdb_movie_titles_df.original_title,
        imdb_movie_titles_df.primary_title,
        col("isOriginalTitle").alias("is_original_title"),
        imdb_title_akas_df.ordering,
        imdb_movie_titles_df.genres,
        imdb_title_akas_df.language,
        imdb_title_akas_df.region,
        tmdb_movies_df.overview,
        imdb_movie_titles_df.start_year,
        imdb_movie_titles_df.runtime,
        tmdb_movies_df.release_date
)

filter_by_region = config.get('FILTERS', 'region')
filter_by_year = config.get('FILTERS', 'year')

condition = None

if filter_by_year and filter_by_region:
    condition = "start_year == {0} and region == {1}".format(filter_by_year, filter_by_region)

elif filter_by_region and not filter_by_year:
    condition = "region == {0}".format(filter_by_region)

elif filter_by_year and not filter_by_region:
    condition = "start_year == {0}".format(filter_by_year)

if condition is not None:
    movies_details = movies_details.filter(condition)
    
movies_details.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- primary_title: string (nullable = true)
 |-- is_original_title: integer (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- language: string (nullable = true)
 |-- region: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- runtime: string (nullable = true)
 |-- release_date: string (nullable = true)



In [23]:
# movies_details table parquet
movies_details.write.partitionBy("region", "start_year").format("parquet").save(output_dir + '/movies_titles_details', mode="overwrite")

KeyboardInterrupt: 

In [21]:
#IMDB ratings DF
imdb_ratings_df = spark.read.options(header=True, inferSchema=True, nullValue="\\N").csv(imdb_input + '/title.ratings.tsv.gz', sep='\t')
imdb_ratings_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [31]:
movies_ratings = movies_titles_df.join(imdb_ratings_df, movies_titles_df.imdb_title_id == imdb_ratings_df.tconst, "left") \
        .join(tmdb_movies_df, movies_titles_df.imdb_title_id == tmdb_movies_df.imdb_id, "left") \
        .select(
        "imdb_title_id",
        movies_titles_df.title,
        "region",
        "language",
        "start_year",
        col("numVotes").alias("imdb_total_votes"),
        col("averageRating").alias("imdb_avg_rating"),
        col("vote_count").alias("tmdb_total_votes"),
        col("vote_average").alias("tmdb_avg_rating")
)

movies_ratings.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- region: string (nullable = true)
 |-- language: string (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- imdb_total_votes: integer (nullable = true)
 |-- imdb_avg_rating: double (nullable = true)
 |-- tmdb_total_votes: long (nullable = true)
 |-- tmdb_avg_rating: double (nullable = true)



In [33]:
#IMDB Ratings table write parquet
movies_ratings.write.partitionBy("region", "start_year").format("parquet").save(output_dir + '/movies_ratings', mode="overwrite")

In [5]:
imdb_names_df = spark.read.option("header", "true").csv(imdb_input + '/name.basics.tsv.gz', sep='\t')
imdb_names_df.printSchema()

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)



In [10]:
moviesTitlesMap = movies_details.select("imdb_title_id").distinct().rdd.map(lambda row: row['imdb_title_id']).collect()

In [18]:
distinct_titleIds = movies_details.select("imdb_title_id").distinct()

In [7]:
imdb_principals_df = spark.read.options(header=True, inferSchema=True, nullValue="\\N").csv(imdb_input + '/title.principals.tsv.gz', sep='\t')
imdb_principals_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [32]:
imdb_principal_crew = imdb_principals_df.select(
    col("tconst").alias("imdb_title_id"),
    "ordering",
    col("nconst").alias("imdb_name_id"),
    "category",
    "job",
    "characters"
).filter(imdb_principals_df['tconst'].isin(sampleIds))

In [None]:
imdb_principal_crew.write.partitionBy("imdb_title_id").format("parquet").save(output_dir + '/movies_principal_crew', mode="overwrite")

In [36]:
movieCrewNameIds = imdb_principal_crew.select("imdb_name_id").distinct().rdd.map(lambda row: row['imdb_name_id']).collect()

In [39]:
imdb_names_df = spark.read.options(header=True, inferSchema=True, nullValue="\\N").csv(imdb_input + '/name.basics.tsv.gz', sep='\t')
imdb_names_df.printSchema()

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- deathYear: integer (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)



In [42]:
imdb_crew_names = imdb_names_df.select(
    col("nconst").alias("imdb_name_id"),
    col("primaryName").alias("primary_name"),
    col("birthYear").alias("birth_year"),
    col("deathYear").alias("death_year"),
    col("primaryProfession").alias("primary_profession"),
    col("knownForTitles").alias("known_for_titles")
).filter(imdb_names_df['nconst'].isin(movieCrewNameIds))

In [10]:
#Movie finance
movies_finances = movies_titles_df\
        .join(tmdb_movies_df, movies_titles_df.imdb_title_id == tmdb_movies_df.imdb_id)\
        .select(
            col("imdb_title_id"),
            movies_titles_df.title,
            movies_titles_df.language,
            movies_titles_df.region,
            movies_titles_df.start_year,
            col("revenue"),
            col("budget")
        )

movies_finances.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language: string (nullable = true)
 |-- region: string (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- revenue: long (nullable = true)
 |-- budget: long (nullable = true)



In [11]:
movies_finances.write.partitionBy("region", "start_year").format("parquet").save(output_dir + '/movies_finances', mode="overwrite")

In [18]:
movies_ratings = spark.read.parquet(output_dir + "/movies_ratings")

In [19]:
movies_ratings.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language: string (nullable = true)
 |-- imdb_total_votes: integer (nullable = true)
 |-- imdb_avg_rating: double (nullable = true)
 |-- tmdb_total_votes: long (nullable = true)
 |-- tmdb_avg_rating: double (nullable = true)
 |-- region: string (nullable = true)
 |-- start_year: integer (nullable = true)



In [20]:
movies_ratings.createOrReplaceTempView("movies_ratings")

In [21]:
top_american_english_movies = spark.sql('''SELECT 
              imdb_title_id, 
              collect_list(title) as title, 
              imdb_total_votes, 
              imdb_avg_rating, 
              region, 
              language, 
              start_year 
            FROM movies_ratings 
            WHERE language IS NULL OR language = 'en' 
            GROUP BY 
              imdb_title_id, 
              imdb_total_votes, 
              imdb_avg_rating, 
              region, 
              language, 
              start_year 
            ORDER BY imdb_total_votes DESC''')

In [22]:
top_american_english_movies.show(10, False)

+-------------+------------------------------------------------------+----------------+---------------+------+--------+----------+
|imdb_title_id|title                                                 |imdb_total_votes|imdb_avg_rating|region|language|start_year|
+-------------+------------------------------------------------------+----------------+---------------+------+--------+----------+
|tt10872600   |[Spider-Man: No Way Home, Serenity Now]               |521999          |8.6            |US    |null    |2021      |
|tt1160419    |[Dune]                                                |512910          |8.1            |US    |null    |2021      |
|tt11286314   |[Don't Look Up]                                       |462155          |7.2            |US    |null    |2021      |
|tt12361974   |[The Snyder Cut, Zack Snyder's Justice League]        |364815          |8.1            |US    |null    |2021      |
|tt3480822    |[Blue Bayou, Black Widow]                             |341616       

In [19]:
movies_finances = spark.read.parquet(output_dir + '/movies_finances')
movies_finances.createOrReplaceTempView("movies_finances")

In [20]:
top_grossing_american_english_movies = spark.sql('''
    SELECT 
      imdb_title_id, 
      collect_list(title) as title, 
      revenue, 
      budget, 
      region, 
      language, 
      start_year 
    FROM 
      movies_finances 
    WHERE revenue IS NOT NULL AND (language IS NULL OR language = 'en') 
    GROUP BY 
      imdb_title_id, 
      revenue, 
      budget, 
      region, 
      language, 
      start_year 
    ORDER BY revenue DESC
''')

AnalysisException: "cannot resolve '`imdb_title_id`' given input columns: [movies_finances.title, movies_finances.language, movies_finances.start_year, movies_finances.region, movies_finances.revenue, movies_finances.imdb_id, movies_finances.budget]; line 14 pos 6;\n'Sort ['revenue DESC NULLS LAST], true\n+- 'Aggregate ['imdb_title_id, revenue#803L, budget#804L, region#805, language#802, start_year#806], ['imdb_title_id, collect_list(title#801, 0, 0) AS title#814, revenue#803L, budget#804L, region#805, language#802, start_year#806]\n   +- Filter (isnotnull(revenue#803L) && (isnull(language#802) || (language#802 = en)))\n      +- SubqueryAlias `movies_finances`\n         +- Relation[imdb_id#800,title#801,language#802,revenue#803L,budget#804L,region#805,start_year#806] parquet\n"

In [127]:
top_grossing_american_english_movies.show(10, False)

[]