In [1]:
import time

import pandas as pd

from dotenv import load_dotenv
from pyspark.sql import SparkSession

In [2]:
load_dotenv()

True

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
df = (
    spark.read
    .option("escape", '\"')
    .option("multiLine", "true")
    .csv("datasets/union.csv", header=True, inferSchema=True)
)

df = df.drop("_c0")
df.createOrReplaceTempView("titles")

In [5]:
def movies_and_tv_shows():
    query = """
        SELECT type, COUNT(DISTINCT show_id) AS count
        FROM titles
        GROUP BY type
        ORDER BY count DESC
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_1 = movies_and_tv_shows()
print("Execution time (ns):", time.time_ns() - start)

df_1

Execution time (ns): 4388419500


Unnamed: 0,type,count
0,Movie,16481
1,TV Show,6517


In [6]:
def movies_and_tv_shows_by_platform():
    query = """
        SELECT platform, type, COUNT(DISTINCT show_id) AS count
        FROM titles
        GROUP BY platform, type
        ORDER BY platform, type
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_2 = movies_and_tv_shows_by_platform()
print("Execution time (ns):", time.time_ns() - start)

df_2

Execution time (ns): 2025363900


Unnamed: 0,platform,type,count
0,amazon_prime,Movie,7814
1,amazon_prime,TV Show,1854
2,disney,Movie,1052
3,disney,TV Show,398
4,hulu,Movie,1484
5,hulu,TV Show,1589
6,netflix,Movie,6131
7,netflix,TV Show,2676


In [7]:
def movies_and_tv_shows_trend():
    query = """
        SELECT type, release_year, COUNT(DISTINCT show_id) AS count
        FROM titles
        GROUP BY type, release_year
        ORDER BY type, release_year
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_3 = movies_and_tv_shows_trend()
print("Execution time (ns):", time.time_ns() - start)

df_3

Execution time (ns): 2502962200


Unnamed: 0,type,release_year,count
0,Movie,1920,3
1,Movie,1922,2
2,Movie,1923,2
3,Movie,1924,1
4,Movie,1925,8
...,...,...,...
169,TV Show,2017,571
170,TV Show,2018,711
171,TV Show,2019,808
172,TV Show,2020,853


In [8]:
def movies_and_tv_shows_trend_by_platform():
    query = """
        SELECT platform, type, release_year, COUNT(DISTINCT show_id) AS count
        FROM titles
        GROUP BY platform, type, release_year
        ORDER BY platform, type, release_year
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_4 = movies_and_tv_shows_trend_by_platform()
print("Execution time (ns):", time.time_ns() - start)

df_4

Execution time (ns): 2447554500


Unnamed: 0,platform,type,release_year,count
0,amazon_prime,Movie,1920,3
1,amazon_prime,Movie,1922,2
2,amazon_prime,Movie,1923,1
3,amazon_prime,Movie,1924,1
4,amazon_prime,Movie,1925,8
...,...,...,...,...
531,netflix,TV Show,2017,265
532,netflix,TV Show,2018,380
533,netflix,TV Show,2019,397
534,netflix,TV Show,2020,436


In [9]:
def movies_and_tv_shows_by_rating():
    query = """
        SELECT rating, COUNT(DISTINCT show_id) AS count
        FROM titles
        WHERE rating IS NOT NULL
            AND rating NOT LIKE '%min'
            AND rating NOT LIKE '%Season'
            AND rating NOT LIKE '%Seasons'
        GROUP BY rating
        ORDER BY count DESC
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_5 = movies_and_tv_shows_by_rating()
print("Execution time (ns):", time.time_ns() - start)

df_5

Execution time (ns): 1669582100


Unnamed: 0,rating,count
0,TV-MA,3675
1,TV-14,3138
2,R,2154
3,13+,2117
4,TV-PG,1654
5,16+,1547
6,ALL,1268
7,18+,1243
8,PG-13,1112
9,PG,881


In [10]:
def movies_and_tv_shows_by_duration():
    query = """
        SELECT
            CASE
                WHEN duration_min >= 0 AND duration_min <= 30 THEN 'Below 30 minutes'
                WHEN duration_min > 30 AND duration_min <= 60 THEN '30 - 60 minutes'
                WHEN duration_min > 60 AND duration_min <= 120 THEN '1 - 2 hours'
                WHEN duration_min > 120 then 'Above 2 hours'
            END AS duration,
            COUNT(DISTINCT show_id) AS count
        FROM titles
        WHERE duration_min IS NOT NULL
        GROUP BY
            CASE
                WHEN duration_min >= 0 AND duration_min <= 30 THEN 'Below 30 minutes'
                WHEN duration_min > 30 AND duration_min <= 60 THEN '30 - 60 minutes'
                WHEN duration_min > 60 AND duration_min <= 120 THEN '1 - 2 hours'
                WHEN duration_min > 120 then 'Above 2 hours'
            END
        ORDER BY duration
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_6 = movies_and_tv_shows_by_duration()
print("Execution time (ns):", time.time_ns() - start)

df_6

Execution time (ns): 1579648500


Unnamed: 0,duration,count
0,1 - 2 hours,11318
1,30 - 60 minutes,1468
2,Above 2 hours,2434
3,Below 30 minutes,779


In [11]:
def top_genres_by_platform_and_shows():
    query = """
        WITH base_table AS (
            SELECT platform, listed_in, COUNT(DISTINCT show_id) AS count
            FROM titles
            GROUP BY platform, listed_in
        ),
        group_ranked_table AS (
            SELECT
                platform,
                listed_in,
                count,
                ROW_NUMBER() OVER (PARTITION BY platform ORDER BY count DESC) AS rank
            FROM base_table
        )
        SELECT platform, listed_in, count
        FROM group_ranked_table
        WHERE rank <= 10
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_7 = top_genres_by_platform_and_shows()
print("Execution time (ns):", time.time_ns() - start)

df_7

Execution time (ns): 1927036400


Unnamed: 0,platform,listed_in,count
0,amazon_prime,Drama,3687
1,amazon_prime,Comedy,2099
2,amazon_prime,Action,1657
3,amazon_prime,Suspense,1501
4,amazon_prime,Kids,1085
5,amazon_prime,Documentary,993
6,amazon_prime,Special Interest,980
7,amazon_prime,Horror,875
8,amazon_prime,Romance,674
9,amazon_prime,Animation,547


In [12]:
def top_countries_by_shows():
    query = """
        SELECT country, COUNT(DISTINCT show_id) AS count
        FROM titles
        WHERE country IS NOT NULL
        GROUP BY country
        ORDER BY count DESC
        LIMIT 10
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_8 = top_countries_by_shows()
print("Execution time (ns):", time.time_ns() - start)

df_8

Execution time (ns): 1371905600


Unnamed: 0,country,count
0,United States,6304
1,India,1299
2,United Kingdom,1172
3,Canada,645
4,Japan,615
5,France,475
6,Germany,293
7,South Korea,260
8,Spain,258
9,Australia,225


In [13]:
def top_directors_by_shows():
    query = """
        SELECT director, COUNT(DISTINCT show_id) AS count
        FROM titles
        WHERE director IS NOT NULL
        GROUP BY director
        ORDER BY count DESC
        LIMIT 10
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_9 = top_directors_by_shows()
print("Execution time (ns):", time.time_ns() - start)

df_9

Execution time (ns): 1319856500


Unnamed: 0,director,count
0,Mark Knight,113
1,Cannis Holder,61
2,Jay Chapman,46
3,Moonbug Entertainment,37
4,Arthur van Merwijk,30
5,Manny Rodriguez,27
6,Jay Karas,22
7,John English,20
8,Rajiv Chilaka,19
9,"Raúl Campos, Jan Suter",18


In [14]:
def top_actors_by_shows():
    query = """
        SELECT cast, COUNT(DISTINCT show_id) AS count
        FROM titles
        WHERE cast IS NOT NULL
        GROUP BY cast
        ORDER BY count DESC
        LIMIT 10
    """

    return spark.sql(query).toPandas()

start = time.time_ns()
df_10 = top_actors_by_shows()
print("Execution time (ns):", time.time_ns() - start)

df_10

Execution time (ns): 1811310100


Unnamed: 0,cast,count
0,Anupam Kher,60
1,Maggie Binkley,56
2,Amitabh Bachchan,47
3,Shah Rukh Khan,46
4,Jim Cummings,44
5,Nassar,43
6,Akshay Kumar,41
7,Danny Trejo,39
8,Paresh Rawal,39
9,Naseeruddin Shah,39
