In [1]:
import findspark

findspark.init()

In [2]:
from dotenv import load_dotenv

load_dotenv()

True

In [3]:
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, when
import os

In [4]:
postgresql_user = os.getenv('POSTGRESQL_USER')
postgresql_password = os.getenv('POSTGRESQL_PASSWORD')
spark_home = os.getenv('SPARK_HOME')

In [5]:
spark = SparkSession.builder \
    .config('spark.jars', f"{spark_home}/jars/postgresql-42.5.1.jar") \
    .master('local[*]') \
    .appName('pagila-queries') \
    .getOrCreate()

22/12/24 02:15:21 WARN Utils: Your hostname, archlinux resolves to a loopback address: 127.0.1.1; using 192.168.0.109 instead (on interface wlp0s20f3)
22/12/24 02:15:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/12/24 02:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
data_frame_reader = spark.read \
    .format('jdbc') \
    .option('driver', 'org.postgresql.Driver') \
    .option('url', 'jdbc:postgresql://localhost:5432/postgres') \
    .option('user', postgresql_user) \
    .option('password', postgresql_password)

In [7]:
def read_table(table_name: str) -> ps.sql.DataFrame:
    return data_frame_reader.option('dbtable', table_name).load()

In [8]:
actor = read_table('actor')
address = read_table('address')
category = read_table('category')
city = read_table('city')
country = read_table('country')
customer = read_table('customer')
film = read_table('film')
film_actor = read_table('film_actor')
film_category = read_table('film_category')
inventory = read_table('inventory')
language = read_table('language')
payment = read_table('payment')
rental = read_table('rental')
staff = read_table('staff')
store = read_table('store')

In [9]:
first = category \
    .join(film_category, on='category_id', how='left') \
    .groupby('category_id', 'name') \
    .agg(count('film_id').alias('number_of_films')) \
    .select('category_id', 'name', 'number_of_films') \
    .orderBy(col('number_of_films').desc())

In [10]:
first.show(truncate=False)

+-----------+-----------+---------------+
|category_id|name       |number_of_films|
+-----------+-----------+---------------+
|15         |Sports     |74             |
|9          |Foreign    |73             |
|8          |Family     |69             |
|6          |Documentary|68             |
|2          |Animation  |66             |
|1          |Action     |64             |
|13         |New        |63             |
|7          |Drama      |62             |
|10         |Games      |61             |
|14         |Sci-Fi     |61             |
|3          |Children   |60             |
|5          |Comedy     |58             |
|16         |Travel     |57             |
|4          |Classics   |57             |
|11         |Horror     |56             |
|12         |Music      |51             |
+-----------+-----------+---------------+



In [11]:
second = actor \
    .join(film_actor, on='actor_id') \
    .join(film, on='film_id') \
    .groupby('actor_id', 'first_name', 'last_name') \
    .agg(sum('rental_duration').alias('sum_of_rental_duration')) \
    .select('actor_id', 'first_name', 'last_name', 'sum_of_rental_duration') \
    .orderBy(col('sum_of_rental_duration').desc()) \
    .limit(10)

In [12]:
second.show(truncate=False)

+--------+----------+---------+----------------------+
|actor_id|first_name|last_name|sum_of_rental_duration|
+--------+----------+---------+----------------------+
|107     |GINA      |DEGENERES|209                   |
|102     |WALTER    |TORN     |201                   |
|198     |MARY      |KEITEL   |192                   |
|181     |MATTHEW   |CARREY   |190                   |
|65      |ANGELA    |HUDSON   |183                   |
|106     |GROUCHO   |DUNST    |183                   |
|23      |SANDRA    |KILMER   |181                   |
|60      |HENRY     |BERRY    |180                   |
|13      |UMA       |WOOD     |179                   |
|119     |WARREN    |JACKMAN  |178                   |
+--------+----------+---------+----------------------+



In [13]:
third = category \
    .join(film_category, on='category_id') \
    .join(film, on='film_id') \
    .groupby('category_id', 'name') \
    .agg(sum('replacement_cost').alias('sum_of_replacement_cost')) \
    .select('category_id', 'name', 'sum_of_replacement_cost') \
    .orderBy(col('sum_of_replacement_cost').desc()) \
    .limit(1)

In [14]:
third.show(truncate=False)

+-----------+------+-----------------------+
|category_id|name  |sum_of_replacement_cost|
+-----------+------+-----------------------+
|15         |Sports|1509.26                |
+-----------+------+-----------------------+



In [15]:
fourth = film \
    .join(inventory, on='film_id', how='leftanti') \
    .select('film_id', 'title')

In [16]:
fourth.show(truncate=False)

+-------+---------------------+
|film_id|title                |
+-------+---------------------+
|148    |CHOCOLATE DUCK       |
|108    |BUTCH PANTHER        |
|950    |VOLUME HOUSE         |
|642    |ORDER BETRAYED       |
|874    |TADPOLE PARK         |
|497    |KILL BROTHERHOOD     |
|332    |FRANKENSTEIN STRANGER|
|192    |CROSSING DIVORCE     |
|860    |SUICIDES SILENCE     |
|128    |CATCH AMISTAD        |
|671    |PERDITION FARGO      |
|325    |FLOATS GARDEN        |
|386    |GUMP DATE            |
|955    |WALLS ARTIST         |
|359    |GLADIATOR WESTWARD   |
|419    |HOCUS FRIDA          |
|41     |ARSENIC INDEPENDENCE |
|607    |MUPPET MILE          |
|318    |FIREHOUSE VIETNAM    |
|742    |ROOF CHAMPION        |
+-------+---------------------+
only showing top 20 rows



In [17]:
actors_with_number_of_children_films = actor \
    .join(film_actor, on='actor_id') \
    .join(film, on='film_id') \
    .join(film_category, on='film_id') \
    .join(category, on='category_id') \
    .where(col('name') == 'Children') \
    .groupby('actor_id', 'first_name', 'last_name') \
    .agg(count('film_id').alias('number_of_children_films')) \
    .select('actor_id', 'first_name', 'last_name', 'number_of_children_films')

In [18]:
top_3_distinct_numbers_of_children_films = actors_with_number_of_children_films \
    .select(col('number_of_children_films')) \
    .distinct() \
    .orderBy(col('number_of_children_films').desc()) \
    .limit(3)

In [19]:
fifth = actors_with_number_of_children_films \
    .join(top_3_distinct_numbers_of_children_films, on='number_of_children_films', how='semi') \
    .select('actor_id', 'first_name', 'last_name', 'number_of_children_films') \
    .orderBy(col('number_of_children_films').desc())

In [20]:
fifth.show(truncate=False)

+--------+----------+---------+------------------------+
|actor_id|first_name|last_name|number_of_children_films|
+--------+----------+---------+------------------------+
|17      |HELEN     |VOIGHT   |7                       |
|140     |WHOOPI    |HURT     |5                       |
|80      |RALPH     |CRUZ     |5                       |
|127     |KEVIN     |GARLAND  |5                       |
|66      |MARY      |TANDY    |5                       |
|81      |SCARLETT  |DAMON    |4                       |
|23      |SANDRA    |KILMER   |4                       |
|109     |SYLVESTER |DERN     |4                       |
|187     |RENEE     |BALL     |4                       |
|92      |KIRSTEN   |AKROYD   |4                       |
|101     |SUSAN     |DAVIS    |4                       |
|13      |UMA       |WOOD     |4                       |
|58      |CHRISTIAN |AKROYD   |4                       |
|93      |ELLEN     |PRESLEY  |4                       |
|173     |ALAN      |DREYFUSS |

In [21]:
count_filter = lambda condition: sum(when(condition, 1).otherwise(0))

In [22]:
sixth = city \
    .join(address, on='city_id') \
    .join(customer, on='address_id') \
    .groupby('city_id', 'city') \
    .agg(count_filter(col('activebool') == True).alias('number_of_active_customers'),
         count_filter(col('activebool') == False).alias('number_of_inactive_customers')) \
    .select('city_id', 'city', 'number_of_active_customers', 'number_of_inactive_customers') \
    .orderBy(col('number_of_inactive_customers').desc())

In [23]:
sixth.show(truncate=False)

+-------+-----------------------+--------------------------+----------------------------+
|city_id|city                   |number_of_active_customers|number_of_inactive_customers|
+-------+-----------------------+--------------------------+----------------------------+
|463    |Sasebo                 |1                         |0                           |
|148    |Duisburg               |1                         |0                           |
|471    |Shenzhen               |1                         |0                           |
|496    |Southport              |1                         |0                           |
|243    |Jodhpur                |1                         |0                           |
|392    |Paarl                  |1                         |0                           |
|540    |Tongliao               |1                         |0                           |
|31     |Arak                   |1                         |0                           |
|516    |T

In [24]:
seconds_in_hour = 3600
categories_with_rental_period_for_city = category \
    .join(film_category, on='category_id') \
    .join(film, on='film_id') \
    .join(inventory, on='film_id') \
    .join(rental, on='inventory_id') \
    .join(customer, on='customer_id') \
    .join(address, on='address_id') \
    .join(city, on='city_id') \
    .groupby('category_id', 'name', 'city_id', 'city') \
    .agg(sum((col('return_date').cast('long') - col('rental_date').cast('long')) / seconds_in_hour).alias('sum_of_rental_hours')) \
    .select('category_id', 'name', 'city', 'sum_of_rental_hours')

In [25]:
first_part = categories_with_rental_period_for_city \
    .where(col('city').like('%a%')) \
    .select('category_id', 'name', 'sum_of_rental_hours') \
    .limit(1)

In [26]:
second_part = categories_with_rental_period_for_city \
    .where(col('city').like('%-%')) \
    .select('category_id', 'name', 'sum_of_rental_hours') \
    .limit(1)

In [27]:
seventh = first_part.unionAll(second_part)

In [28]:
seventh.show(truncate=False)

+-----------+-----+-------------------+
|category_id|name |sum_of_rental_hours|
+-----------+-----+-------------------+
|10         |Games|208.25             |
|12         |Music|221.93333333333334 |
+-----------+-----+-------------------+

