In [1]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.2.1-py3-none-any.whl.metadata (25 kB)
Downloading python_dotenv-1.2.1-py3-none-any.whl (21 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.2.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.appName("PracticeThroughPostgres").getOrCreate()
spark

In [5]:
from dotenv import dotenv_values

config = dotenv_values(".env")

jdbc_url = f"jdbc:postgresql://host.docker.internal:5432/{config['POSTGRES_DB_NAME']}"

db_properties = {
    "user": config["POSTGRES_USER"],
    "password": config["POSTGRES_PASSWORD"],
    "driver": "org.postgresql.Driver"
}

In [6]:
TABLE_NAME="actor"

try:
    df_postgres = spark.read.jdbc(
        url=jdbc_url,
        table=TABLE_NAME,
        properties=db_properties
    )
    df_postgres.printSchema()
    df_postgres.show(5)

except Exception as e:
    print(f"An error by connection or by reading the data: {e}")

root
 |-- actor_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- last_update: timestamp (nullable = true)

+--------+----------+------------+--------------------+
|actor_id|first_name|   last_name|         last_update|
+--------+----------+------------+--------------------+
|       1|  Penelope|     Guiness|2013-05-26 14:47:...|
|       2|      Nick|    Wahlberg|2013-05-26 14:47:...|
|       3|        Ed|       Chase|2013-05-26 14:47:...|
|       4|  Jennifer|       Davis|2013-05-26 14:47:...|
|       5|    Johnny|Lollobrigida|2013-05-26 14:47:...|
+--------+----------+------------+--------------------+
only showing top 5 rows



In [7]:
del df_postgres

In [8]:
table_list = [
    "actor", "film_actor", "film_category", "category", "inventory", 
    "rental", "payment", "customer", "address", "city", "film"
]

dataframes = {}

for table_name in table_list:
    df = spark.read.jdbc(
        url=jdbc_url,
        table=table_name,
        properties=db_properties
    )
    dataframes[f"df_{table_name}"] = df
    print(f"DataFrame: df_{table_name} ({df.count()} rows)")

DataFrame: df_actor (200 rows)
DataFrame: df_film_actor (5462 rows)
DataFrame: df_film_category (1000 rows)
DataFrame: df_category (16 rows)
DataFrame: df_inventory (4581 rows)
DataFrame: df_rental (16044 rows)
DataFrame: df_payment (14596 rows)
DataFrame: df_customer (599 rows)
DataFrame: df_address (603 rows)
DataFrame: df_city (600 rows)
DataFrame: df_film (1000 rows)


In [12]:
# -- Output the number of movies in each category, sorted descending.
df_category_counts = (
    dataframes["df_film_category"].alias("fc")
    .join(dataframes["df_category"].alias("c"), F.col("fc.category_id") == F.col("c.category_id"), "inner")
    .groupBy(F.col("c.name"))
    .agg(
        F.count(F.col("fc.film_id")).alias("number_of_movies")
    )
    .orderBy(F.desc("number_of_movies"))
)

df_category_counts.show(truncate=False)

+-----------+----------------+
|name       |number_of_movies|
+-----------+----------------+
|Sports     |74              |
|Foreign    |73              |
|Family     |69              |
|Documentary|68              |
|Animation  |66              |
|Action     |64              |
|New        |63              |
|Drama      |62              |
|Games      |61              |
|Sci-Fi     |61              |
|Children   |60              |
|Comedy     |58              |
|Travel     |57              |
|Classics   |57              |
|Horror     |56              |
|Music      |51              |
+-----------+----------------+



In [11]:
# -- Output the 10 actors whose movies rented the most, sorted in descending order.
df_actor_names = dataframes["df_actor"].select(
    F.col("actor_id"),
    F.concat(F.col("first_name"), F.lit(" "), F.col("last_name")).alias("actor_name")
)

df_top_actors = (
    df_actor_names.alias("an")
    .join(dataframes["df_film_actor"].alias("fa"), ["actor_id"], "inner")
    .join(dataframes["df_inventory"].alias("i"), ["film_id"], "inner")
    .join(dataframes["df_rental"].alias("r"), ["inventory_id"], "inner")
    .groupBy(F.col("an.actor_name"))
    .agg(
        F.count(F.col("r.rental_id")).alias("number_of_rentals")
    )
    .orderBy(F.desc("number_of_rentals"))
    .limit(10)
)

df_top_actors.show(truncate=False)

+------------------+-----------------+
|actor_name        |number_of_rentals|
+------------------+-----------------+
|Susan Davis       |825              |
|Gina Degeneres    |753              |
|Matthew Carrey    |678              |
|Mary Keitel       |674              |
|Angela Witherspoon|654              |
|Walter Torn       |640              |
|Henry Berry       |612              |
|Jayne Nolte       |611              |
|Val Bolger        |605              |
|Sandra Kilmer     |604              |
+------------------+-----------------+



In [13]:
# -- Output the category of movies on which the most money was spent.
df_top_category_money = (
    dataframes["df_category"].alias("c")
    .join(dataframes["df_film_category"].alias("fc"), ["category_id"], "inner")
    .join(dataframes["df_inventory"].alias("i"), ["film_id"], "inner")
    .join(dataframes["df_rental"].alias("r"), ["inventory_id"], "inner")
    .join(dataframes["df_payment"].alias("p"), ["rental_id"], "inner")
    .groupBy(F.col("c.name").alias("category_name"))
    .agg(
        F.sum(F.col("p.amount")).alias("money_per_category")
    )
    .orderBy(F.desc("money_per_category"))
    .limit(1)
)

df_top_category_money.show(truncate=False)

+-------------+------------------+
|category_name|money_per_category|
+-------------+------------------+
|Sports       |4892.19           |
+-------------+------------------+



In [17]:
# -- Print the names of movies that are not in the inventory.
df_not_in_inventory = (
    dataframes["df_film"].alias("f")
    .join(dataframes["df_inventory"].alias("i"), ["film_id"], "left_anti")
    .select(F.col("f.title"))
)

df_not_in_inventory.show(df_not_in_inventory.count(), truncate=False)

+----------------------+
|title                 |
+----------------------+
|Chocolate Duck        |
|Butch Panther         |
|Volume House          |
|Order Betrayed        |
|Tadpole Park          |
|Kill Brotherhood      |
|Frankenstein Stranger |
|Crossing Divorce      |
|Suicides Silence      |
|Catch Amistad         |
|Perdition Fargo       |
|Floats Garden         |
|Gump Date             |
|Walls Artist          |
|Gladiator Westward    |
|Hocus Frida           |
|Arsenic Independence  |
|Muppet Mile           |
|Firehouse Vietnam     |
|Roof Champion         |
|Dazed Punk            |
|Pearl Destiny         |
|Rainbow Shock         |
|Kentuckian Giant      |
|Boondock Ballroom     |
|Commandments Express  |
|Hate Handicap         |
|Ark Ridgemont         |
|Crowds Telemark       |
|Deliverance Mulholland|
|Raiders Antitrust     |
|Sister Freddy         |
|Villain Desperate     |
|Apollo Teen           |
|Alice Fantasia        |
|Crystal Breaking      |
|Treasure Command      |


In [18]:
# -- Output the top 3 actors who have appeared the most in movies in the “Children” category. 
# -- If several actors have the same number of movies, output all of them.

from pyspark.sql import Window

df_children_actors = (
    dataframes["df_actor"].alias("a")
    .join(dataframes["df_film_actor"].alias("fa"), ["actor_id"], "inner")
    .join(dataframes["df_film_category"].alias("fc"), ["film_id"], "inner")
    .join(dataframes["df_category"].alias("c"), ["category_id"], "inner")
    .filter(F.col("c.name") == "Children")
    .groupBy(F.concat(F.col("a.first_name"), F.lit(" "), F.col("a.last_name")).alias("actor_name"))
    .agg(
        F.count(F.col("fc.film_id")).alias("number_of_movies")
    )
)

window_spec = Window.orderBy(F.desc("number_of_movies"))

df_top_children_actors = (
    df_children_actors
    .withColumn("actor_rank", F.dense_rank().over(window_spec))
    .filter(F.col("actor_rank") <= 3)
    .orderBy("actor_rank", F.desc("number_of_movies"))
)

df_top_children_actors.show(truncate=False)

+-------------+----------------+----------+
|actor_name   |number_of_movies|actor_rank|
+-------------+----------------+----------+
|Helen Voight |7               |1         |
|Susan Davis  |6               |2         |
|Kevin Garland|5               |3         |
|Ralph Cruz   |5               |3         |
|Mary Tandy   |5               |3         |
|Whoopi Hurt  |5               |3         |
+-------------+----------------+----------+



In [23]:
# -- Output cities with the number of active and inactive customers (active - customer.active = 1). 
# -- Sort by the number of inactive customers in descending order.

df_city_activity = (
    dataframes["df_city"].alias("ct")
    .join(dataframes["df_address"].alias("ad"), ["city_id"], "inner")
    .join(dataframes["df_customer"].alias("cm"), ["address_id"], "inner")
    .groupBy(F.col("ct.city"))
    .agg(
        F.sum(F.col("cm.active")).alias("active"),
        F.sum(F.when(F.col("cm.active") == 1, 0).otherwise(1)).alias("inactive")
    )
    .orderBy(F.desc("inactive"))
)

df_city_activity.show(truncate=False)

+------------------+------+--------+
|city              |active|inactive|
+------------------+------+--------+
|Uluberia          |0     |1       |
|Najafabad         |0     |1       |
|Pingxiang         |0     |1       |
|Xiangfan          |0     |1       |
|Kumbakonam        |0     |1       |
|Szkesfehrvr       |0     |1       |
|Charlotte Amalie  |0     |1       |
|Kamyin            |0     |1       |
|Daxian            |0     |1       |
|Coatzacoalcos     |0     |1       |
|Wroclaw           |0     |1       |
|Ktahya            |0     |1       |
|Bat Yam           |0     |1       |
|Southend-on-Sea   |0     |1       |
|Amroha            |0     |1       |
|A Corua (La Corua)|1     |0       |
|Fengshan          |1     |0       |
|Chisinau          |1     |0       |
|Linz              |1     |0       |
|Udaipur           |1     |0       |
+------------------+------+--------+
only showing top 20 rows



In [21]:
# -- Output the category of movies that have the highest number of total rental hours 
# -- in the city (customer.address_id in this city) and that start with the letter “a”. 
# -- Do the same for cities that have a “-” in them. 
# -- Write everything in one query.

df_duration_agg = (
    dataframes["df_category"].alias("c")
    .join(dataframes["df_film_category"].alias("fc"), ["category_id"], "inner")
    .join(dataframes["df_inventory"].alias("i"), ["film_id"], "inner")
    .join(dataframes["df_rental"].alias("r"), ["inventory_id"], "inner")
    .join(dataframes["df_customer"].alias("cm"), ["customer_id"], "inner")
    .join(dataframes["df_address"].alias("ad"), ["address_id"], "inner")
    .join(dataframes["df_city"].alias("ct"), ["city_id"], "inner")
    .filter(
        (F.col("ct.city").rlike("(?i)^a.*")) | (F.col("ct.city").contains("-"))
    )
    .groupBy(F.col("ct.city"), F.col("c.name").alias("category_name"))
    .agg(
        F.sum((F.col("r.return_date").cast("long") - F.col("r.rental_date").cast("long"))).alias("duration_seconds")
    )
)

window_spec = Window.partitionBy("city").orderBy(F.desc("duration_seconds"))

df_max_duration = (
    df_duration_agg
    .withColumn("max_duration", F.max("duration_seconds").over(Window.partitionBy("city")))
    .filter(F.col("duration_seconds") == F.col("max_duration"))
    .select("category_name", "city", "duration_seconds")
    .orderBy("city")
)

df_max_duration.show(df_max_duration.count(), truncate=False)

+-------------+-----------------------+----------------+
|category_name|city                   |duration_seconds|
+-------------+-----------------------+----------------+
|Comedy       |A Corua (La Corua)     |2133900         |
|Sci-Fi       |Abha                   |1981560         |
|Sci-Fi       |Abu Dhabi              |1783140         |
|Drama        |Acua                   |1989180         |
|Comedy       |Adana                  |1919400         |
|Family       |Addis Abeba            |1904760         |
|New          |Aden                   |2862840         |
|Children     |Adoni                  |1473060         |
|Children     |Ahmadnagar             |2116560         |
|Children     |Akishima               |2349540         |
|Sports       |Akron                  |1927920         |
|Comedy       |Alessandria            |1369800         |
|New          |Allappuzha (Alleppey)  |2003220         |
|Travel       |Allende                |2468940         |
|Sports       |Almirante Brown 

In [27]:
spark.stop()

In [31]:
%reset -f

In [32]:
%who

Interactive namespace is empty.
