In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, min, avg, col, \
current_date, year, row_number, explode, split, floor, expr, count
from pyspark.sql.window import Window

['C:\\Users\\csiha\\AppData\\Local\\Programs\\PyCharm Professional\\plugins\\python\\helpers-pro\\jupyter_debug', 'C:\\Users\\csiha\\AppData\\Local\\Temp\\spark-c5a40620-0828-4f19-bfd9-d010aefbce4f\\userFiles-9c5c2496-aff9-4e36-9b0b-284e8a702a3b', 'C:\\Users\\csiha\\AppData\\Local\\Programs\\PyCharm Professional\\plugins\\python\\helpers\\pydev', 'C:\\Users\\csiha\\PycharmProjects\\Pyspark\\Task1', 'C:\\Users\\csiha\\PycharmProjects\\Pyspark\\Task1', 'C:\\Users\\csiha\\AppData\\Local\\Programs\\Python\\Python310\\python310.zip', 'C:\\Users\\csiha\\AppData\\Local\\Programs\\Python\\Python310\\DLLs', 'C:\\Users\\csiha\\AppData\\Local\\Programs\\Python\\Python310\\lib', 'C:\\Users\\csiha\\AppData\\Local\\Programs\\Python\\Python310', 'C:\\Users\\csiha\\.virtualenvs\\Task1', '', 'C:\\Users\\csiha\\.virtualenvs\\Task1\\lib\\site-packages', 'C:\\Users\\csiha\\.virtualenvs\\Task1\\lib\\site-packages\\win32', 'C:\\Users\\csiha\\.virtualenvs\\Task1\\lib\\site-packages\\win32\\lib', 'C:\\Users\\

In [None]:
# Start (or reuse) a local Spark session for the IMDb analysis and keep
# Spark's console logging down to warnings and errors.
spark = (
    SparkSession.builder
    .master('local')
    .appName('IMDb_analysis')
    .getOrCreate()
)
spark.sparkContext.setLogLevel('WARN')

In [2]:
# Input files: tab-separated IMDb extracts, resolved relative to the
# notebook's working directory.
NAME_DATA_PATH = './persons.tsv'
RATINGS_PATH = './ratings.tsv'
BASICS_PATH = './basics.tsv'
PRINCIPALS_PATH = './principals.tsv'

# Not read by any cell in this notebook; kept so the full dataset is listed.
CREW_PATH = './crew.tsv'
EPISODES_PATH = './episodes.tsv'
ORDERS_PATH = './orders.tsv'

In [3]:
def get_all_ratings(path=None):
    """Load the ratings table from a tab-separated file.

    Args:
        path: TSV file to read. Defaults to ``RATINGS_PATH``. Accepting an
            explicit path keeps calls like ``get_all_ratings(RATINGS_PATH)``
            working instead of raising TypeError.

    Returns:
        DataFrame built from the file's header row, with Spark-inferred types.
    """
    path = RATINGS_PATH if path is None else path
    return spark.read.option('sep', '\t').csv(path, header=True, inferSchema=True)

get_all_ratings()

DataFrame[tconst: string, averageRating: double, numVotes: int]

In [4]:
#Relevant rows from rating df

def get_relevant_rating(min_votes=100_000):
    """Return ratings rows whose ``numVotes`` is strictly greater than ``min_votes``.

    Args:
        min_votes: vote-count threshold (exclusive). Defaults to 100 000,
            the cut-off used throughout this notebook.
    """
    df_rating = get_all_ratings()
    return df_rating.filter(df_rating['numVotes'] > min_votes)
get_relevant_rating()

DataFrame[tconst: string, averageRating: double, numVotes: int]

In [5]:
#Movies

def get_movies(path=None):
    """Return the rows of the basics table whose ``titleType`` is ``'movie'``.

    Args:
        path: TSV file to read. Defaults to ``BASICS_PATH``. Accepting an
            explicit path keeps calls like ``get_movies(BASICS_PATH)`` working
            instead of raising TypeError.
    """
    path = BASICS_PATH if path is None else path
    df_basics = spark.read.option('sep', '\t').csv(path, header=True, inferSchema=True)
    return df_basics.filter(df_basics['titleType'] == 'movie')
get_movies()

DataFrame[tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string]

In [6]:
#Movies with more than 100 000 votes

def get_relevant_movies():
    """Inner-join the high-vote ratings with the movie rows of the basics table on ``tconst``."""
    return get_relevant_rating().join(get_movies(), on='tconst', how='inner')
get_relevant_movies()

DataFrame[tconst: string, averageRating: double, numVotes: int, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string]

In [7]:
#Actors from principal table

def get_actors(path=None):
    """Return the principals rows whose ``category`` is exactly ``'actor'``.

    Args:
        path: TSV file to read. Defaults to ``PRINCIPALS_PATH``. Accepting an
            explicit path keeps calls like ``get_actors(PRINCIPALS_PATH)``
            working instead of raising TypeError.
    """
    # NOTE(review): only the exact category 'actor' is kept; if the data also
    # uses other performer categories (e.g. 'actress'), those rows are excluded
    # - confirm this is intended.
    path = PRINCIPALS_PATH if path is None else path
    principal_df = spark.read.option('sep', '\t').csv(path, header=True, inferSchema=True)
    return principal_df.filter(principal_df['category'] == 'actor')
get_actors()

DataFrame[tconst: string, ordering: int, nconst: string, category: string, job: string, characters: string]

In [8]:
# Directors from principal table

def get_principal_directors(path=None):
    """Return the principals rows whose ``category`` is ``'director'``.

    Args:
        path: TSV file to read. Defaults to ``PRINCIPALS_PATH``. Accepting an
            explicit path keeps calls like
            ``get_principal_directors(PRINCIPALS_PATH)`` working instead of
            raising TypeError.
    """
    path = PRINCIPALS_PATH if path is None else path
    principal_dir_df = spark.read.option('sep', '\t').csv(path, header=True, inferSchema=True)
    return principal_dir_df.filter(principal_dir_df['category'] == 'director')
get_principal_directors()

DataFrame[tconst: string, ordering: int, nconst: string, category: string, job: string, characters: string]

In [8]:
#Get the best 100 movies of all time

def all_time_best_100movies(n=100):
    """Return the ``n`` best-rated movies that have more than 100 000 votes.

    Args:
        n: number of movies to keep (default 100).

    Returns:
        DataFrame[tconst, primaryTitle, numVotes, averageRating, startYear],
        highest averageRating first.
    """
    relevant_movies = get_relevant_movies()
    desired_columns_order = ['tconst', 'primaryTitle', 'numVotes', 'averageRating', 'startYear']
    # numVotes breaks rating ties, so the cut-off at `n` is the same on every run.
    return (relevant_movies
            .orderBy(col('averageRating').desc(), col('numVotes').desc())
            .limit(n)
            .select(desired_columns_order))
all_time_best_100movies()

DataFrame[tconst: string, primaryTitle: string, numVotes: int, averageRating: double, startYear: string]

In [None]:
#Display the best 100 movies

def show_all_time_best_100movies():
    """Print the complete all-time top-100 table.

    ``DataFrame.show()`` stops after 20 rows and truncates long strings by
    default, so both limits are lifted explicitly.
    """
    all_time_best_100_movies_df = all_time_best_100movies()
    all_time_best_100_movies_df.show(100, truncate=False)

# To display the DataFrame:
show_all_time_best_100movies()

In [None]:
#Get the best movies from the past 10 years

def get_best_movies_last10_years():
    """Show and return the 100 best-rated relevant movies from the last 10 calendar years.

    The window is ``[current year - 9, current year]`` (inclusive). Spark
    evaluates the current year from ``current_date()`` when the query runs.

    Returns:
        DataFrame[tconst, primaryTitle, numVotes, averageRating, startYear],
        highest averageRating first (ties broken by numVotes).
    """
    current_year = year(current_date())
    # startYear is read as a string; cast it so the range test is numeric
    # rather than relying on implicit string/int coercion. Non-numeric
    # placeholders become null (under Spark's default non-ANSI casting) and
    # drop out of the filter.
    start_year = col('startYear').cast('int')

    desired_columns_order = ['tconst', 'primaryTitle', 'numVotes', 'averageRating', 'startYear']
    best_last_10years = (get_relevant_movies()
                         .filter(start_year.between(current_year - 9, current_year))
                         .select(desired_columns_order)
                         .orderBy(col('averageRating').desc(), col('numVotes').desc())
                         .limit(100))
    # show() defaults to 20 rows; print all 100.
    best_last_10years.show(100, truncate=False)
    return best_last_10years
get_best_movies_last10_years();

In [None]:
#Get the best-rated popular (>100 000 votes) movies from the 60s

def get_popular_movies_60s():
    """Show and return the 100 best-rated relevant movies released 1960-1969.

    Sorting happens before ``limit``. Limiting first would keep an arbitrary
    100 rows and only then sort them, so the result would not be the top 100.

    Returns:
        DataFrame[tconst, primaryTitle, numVotes, averageRating, startYear],
        highest averageRating first (ties broken by numVotes).
    """
    year_range_60s = (1960, 1969)
    # startYear is read as a string; compare it numerically.
    start_year = col('startYear').cast('int')

    desired_columns_order = ['tconst', 'primaryTitle', 'numVotes', 'averageRating', 'startYear']
    best_films_60s = (get_relevant_movies()
                      .filter(start_year.between(*year_range_60s))
                      .select(desired_columns_order)
                      .orderBy(col('averageRating').desc(), col('numVotes').desc())
                      .limit(100))
    # show() defaults to 20 rows; print all 100.
    best_films_60s.show(100, truncate=False)
    return best_films_60s
get_popular_movies_60s();

In [None]:
#2nd task: top 10 movies per genre

def top10_films_by_genre():
    """Return the 10 best-rated relevant movies in every genre.

    ``genres`` is a comma-separated string. It is split and exploded, so a
    movie listed under several genres is ranked in each of them.

    Returns:
        DataFrame[tconst, averageRating, numVotes, primaryTitle, startYear, genre].
    """
    relevant_movies = get_relevant_movies()
    exploded_movies = relevant_movies.withColumn('genre', explode(split(col('genres'), ',')))

    # numVotes breaks rating ties so row_number() picks the same 10 on every run.
    window_spec = Window.partitionBy('genre').orderBy(col('averageRating').desc(),
                                                      col('numVotes').desc())
    ranked_movies = exploded_movies.withColumn('rank', row_number().over(window_spec))

    selected_columns = ['tconst', 'averageRating', 'numVotes', 'primaryTitle', 'startYear', 'genre']
    return ranked_movies.filter(col('rank') <= 10).select(selected_columns)

top10_films_by_genre()

In [None]:
#3rd task: top 10 movies per genre within each decade

def get_populat_movies_by_genre_and_decade():
    """Show and return the 10 best-rated relevant movies per (decade, genre).

    ``yearRange`` is a label such as ``'1990-1999'`` derived from ``startYear``.
    Movies whose ``startYear`` is not numeric get a null label and are dropped
    rather than being ranked in a bogus "null decade" group.

    Returns:
        DataFrame[tconst, averageRating, numVotes, primaryTitle, startYear,
        genre, yearRange], newest decade first.
    """
    relevant_movies = get_relevant_movies()
    exploded_movies = relevant_movies.withColumn('genre', explode(split(col('genres'), ',')))

    decade_label = expr(
        "concat(floor(CAST(startYear AS INT) / 10) * 10, '-', "
        "floor(CAST(startYear AS INT) / 10) * 10 + 9)"
    )
    year_range_movies = (exploded_movies
                         .withColumn('yearRange', decade_label)
                         .filter(col('yearRange').isNotNull()))

    # numVotes breaks rating ties so row_number() is deterministic.
    window_spec = Window.partitionBy('yearRange', 'genre').orderBy(col('averageRating').desc(),
                                                                   col('numVotes').desc())
    ranked_movies = year_range_movies.withColumn('rank', row_number().over(window_spec))

    selected_columns = ['tconst', 'averageRating', 'numVotes', 'primaryTitle', 'startYear', 'genre', 'yearRange']
    result_df = ranked_movies.filter(col('rank') <= 10).select(selected_columns)
    # Order within a decade too, so the printed table is grouped by genre.
    final_ordered_df = result_df.orderBy(col('yearRange').desc(), col('genre'),
                                         col('averageRating').desc())

    final_ordered_df.show()
    return final_ordered_df
get_populat_movies_by_genre_and_decade();

In [None]:
#Task 4

#Get the actors 
def get_names(NAME_DATA_PATH):
    """Return one row per person whose ``primaryProfession`` list contains ``'actor'``.

    Args:
        NAME_DATA_PATH: path of the persons TSV. It shadows the module
            constant of the same name.

    Returns:
        DataFrame[nconst, primaryName, deathYear], unique on ``nconst``.
    """
    df_names = spark.read.option('sep', '\t').csv(NAME_DATA_PATH,
                                                  header=True, inferSchema=True)
    # knownForTitles is deliberately not exploded. It is not part of the
    # output, and exploding it would repeat every actor once per known-for
    # title, multiplying rows in every later join/count on nconst.
    professions = df_names.withColumn('primaryProfession',
                                      explode(split(col('primaryProfession'), ',')))
    actors = professions.filter(col('primaryProfession') == 'actor')

    desired_columns = ["nconst", "primaryName", "deathYear"]
    return actors.select(desired_columns).dropDuplicates(['nconst'])

get_names(NAME_DATA_PATH)

In [None]:
def print_actors():
    """Print the first rows of the actors table produced by ``get_names``."""
    get_names(NAME_DATA_PATH).show()

print_actors()

In [None]:
# Get the directors

def get_directors(NAME_DATA_PATH):
    """Return the person rows whose ``primaryProfession`` list contains ``'director'``.

    Args:
        NAME_DATA_PATH: path of the persons TSV. It shadows the module
            constant of the same name.

    Returns:
        The persons-file columns with one row per ``nconst``.
        ``primaryProfession`` holds the single value ``'director'``, and
        ``knownForTitles`` keeps the file's original comma-separated string.
    """
    df_names = spark.read.option('sep', '\t').csv(NAME_DATA_PATH,
                                                  header=True, inferSchema=True)
    # Only the profession list is exploded. Exploding knownForTitles would
    # repeat each director once per known-for title and inflate every later
    # join on nconst; the duplicated rows then crowd the per-director
    # rankings.
    professions = df_names.withColumn('primaryProfession',
                                      explode(split(col('primaryProfession'), ',')))
    directors = professions.filter(col('primaryProfession') == 'director')
    return directors.dropDuplicates(['nconst'])
get_directors(NAME_DATA_PATH)

In [None]:
def print_directors():
    """Print the first rows of the directors table produced by ``get_directors``."""
    get_directors(NAME_DATA_PATH).show()

print_directors()

In [None]:
# Actor credits (principals with category 'actor') in the all-time top-100 movies

def join_movies_principals():
    """Return principal 'actor' rows joined on ``tconst`` with the all-time top-100 movies."""
    best_movies = all_time_best_100movies()
    # get_actors() reads PRINCIPALS_PATH on its own, so no argument is passed.
    principals = get_actors()

    movie_principal_df = principals.join(best_movies, on='tconst', how='inner')
    return movie_principal_df

join_movies_principals()


In [None]:
def show_join_movies_principals():
    """Print the first rows of the top-100-movies x actor-credits join.

    Uses ``join_movies_principals``, the function this notebook defines for
    that join; the name it replaces is not defined anywhere in the notebook.
    """
    mv_pr = join_movies_principals()
    mv_pr.show()
show_join_movies_principals()

In [None]:
# Find the actors who appear in at least 2 of the 100 best films

def join_movie_principal_df_actors():
    """Show and return actors credited in at least two of the all-time top-100 movies.

    Returns:
        DataFrame[nconst, primaryName, appearance_count] with
        ``appearance_count >= 2``.
    """
    movie_princ = join_movies_principals()
    actors = get_names(NAME_DATA_PATH)

    actor_movie_princ = actors.join(movie_princ, on='nconst', how='inner')
    # Count each (actor, movie) pair once. Deduplicating on tconst alone would
    # keep a single actor per movie, so most appearances would never be counted.
    actor_movie_pairs = actor_movie_princ.dropDuplicates(subset=['nconst', 'tconst'])

    # Group by nconst too, so different people who share a name are not merged.
    actor_counts = (actor_movie_pairs
                    .groupBy('nconst', 'primaryName')
                    .agg(count('*').alias('appearance_count')))
    actor_counts_filtered = actor_counts.filter(col('appearance_count') >= 2)

    actor_counts_filtered.show()
    return actor_counts_filtered

join_movie_principal_df_actors()

In [None]:
#Task 5:

def join_directors_principal():
    """Return principal 'director' credits joined with the persons-file directors on ``nconst``."""
    directors = get_directors(NAME_DATA_PATH)
    # get_principal_directors() reads PRINCIPALS_PATH on its own, so no argument is passed.
    principal_dir = get_principal_directors()
    return directors.join(principal_dir, on='nconst', how='inner')
join_directors_principal()
    

In [None]:
def get_movies_per_director():
    """Return one row per (director, movie) credit.

    Returns:
        DataFrame[tconst, primaryName, primaryTitle, startYear]. Exact
        duplicate rows are removed, so a repeated person row upstream cannot
        list the same film twice for a director.
    """
    dir_princ = join_directors_principal()
    # get_movies() reads BASICS_PATH on its own, so no argument is passed.
    movies = get_movies()
    movies_per_director = dir_princ.join(movies, on='tconst', how='inner')

    columns_to_keep = ['tconst', 'primaryName', 'primaryTitle', 'startYear']
    return movies_per_director.select(columns_to_keep).dropDuplicates()

get_movies_per_director()
    

In [None]:
def top5_movies_per_director():
    """Show and return each director's five best-rated movies.

    Returns:
        DataFrame[primaryName, primaryTitle, startYear, averageRating, numVotes],
        at most five rows per primaryName.
    """
    movies_per_director = get_movies_per_director()
    # [tconst: string, primaryName: string, primaryTitle: string, startYear: string]
    df_ratings = get_all_ratings()
    # [tconst: string, averageRating: double, numVotes: int]

    # Deduplicate (director, movie) before ranking. Otherwise duplicate rows
    # of one film would occupy several of the five slots.
    movies_with_ratings = (movies_per_director
                           .join(df_ratings, on='tconst', how='inner')
                           .dropDuplicates(['primaryName', 'tconst']))

    # Order by the joined frame's own columns; numVotes breaks rating ties so
    # the five picks are deterministic.
    window_spec = Window.partitionBy('primaryName').orderBy(col('averageRating').desc(),
                                                            col('numVotes').desc())
    ranked = movies_with_ratings.withColumn('rank', row_number().over(window_spec))

    needed_columns = ['primaryName', 'primaryTitle', 'startYear', 'averageRating', 'numVotes']
    # There is no deduplication on primaryTitle after ranking: that would drop
    # co-directed films from all but one director and merge distinct films
    # that share a title.
    top5 = ranked.filter(col('rank') <= 5).select(needed_columns)

    top5.show()
    return top5

top5_movies_per_director()