In [12]:
import pyspark
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, count, sum, concat_ws
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import when
from pyspark import Row

In [4]:
spark = SparkSession.builder \
    .appName("DB_Connection_Check") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.19") \
    .getOrCreate()

JDBC_URL = "jdbc:postgresql://host.docker.internal:5432/postgres"
USER = os.getenv("DB_USER")
PASSWORD = os.getenv("DB_PASSWORD") 
DRIVER = "org.postgresql.Driver"

jdbc_properties = {
    "user": USER,
    "password": PASSWORD,
    "driver": DRIVER
}

In [5]:
try:
    actor = spark.read.jdbc(url=JDBC_URL, table="actor", properties=jdbc_properties)
    address = spark.read.jdbc(url=JDBC_URL, table="address", properties=jdbc_properties)
    category = spark.read.jdbc(url=JDBC_URL, table="category", properties=jdbc_properties)
    city = spark.read.jdbc(url=JDBC_URL, table="city", properties=jdbc_properties)
    country = spark.read.jdbc(url=JDBC_URL, table="country", properties=jdbc_properties)
    customer = spark.read.jdbc(url=JDBC_URL, table="customer", properties=jdbc_properties)
    film = spark.read.jdbc(url=JDBC_URL, table="film", properties=jdbc_properties)
    film_actor = spark.read.jdbc(url=JDBC_URL, table="film_actor", properties=jdbc_properties)
    film_category = spark.read.jdbc(url=JDBC_URL, table="film_category", properties=jdbc_properties)
    inventory = spark.read.jdbc(url=JDBC_URL, table="inventory", properties=jdbc_properties)
    language = spark.read.jdbc(url=JDBC_URL, table="language", properties=jdbc_properties)
    payment = spark.read.jdbc(url=JDBC_URL, table="payment", properties=jdbc_properties)
    rental = spark.read.jdbc(url=JDBC_URL, table="rental", properties=jdbc_properties)
    staff = spark.read.jdbc(url=JDBC_URL, table="staff", properties=jdbc_properties)
    store = spark.read.jdbc(url=JDBC_URL, table="store", properties=jdbc_properties)

except Exception as e:
    print(f"Error: {e}")
    spark.stop()
    raise e

In [10]:
# 1. Output the number of movies in each category, sorted descending.

df_joined = film_category.join(  
    category,
    film_category["category_id"] == category["category_id"], 
    "inner"
)

df_result = df_joined.groupBy(col("name").alias("category_name")) \
                     .agg(count(col("film_id")).alias("film_count"))

df_final = df_result.orderBy(col("film_count").desc())

df_final.show(truncate=False)

+-------------+----------+
|category_name|film_count|
+-------------+----------+
|Sports       |74        |
|Foreign      |73        |
|Family       |69        |
|Documentary  |68        |
|Animation    |66        |
|Action       |64        |
|New          |63        |
|Drama        |62        |
|Games        |61        |
|Sci-Fi       |61        |
|Children     |60        |
|Comedy       |58        |
|Travel       |57        |
|Classics     |57        |
|Horror       |56        |
|Music        |51        |
+-------------+----------+



In [13]:
# 2. Output the 10 actors whose movies rented the most, sorted in descending order.

df_joined_1 = rental.join(
    inventory,
    on="inventory_id", 
    how="inner"
)

df_joined_2 = df_joined_1.join(
    film_actor,
    on="film_id",
    how="inner"
)

df_final_join = df_joined_2.join(
    actor,
    on="actor_id",
    how="inner"
)

df_grouped = df_final_join.groupBy("actor_id", "first_name", "last_name") \
                          .agg(count(col("rental_id")).alias("rental_count")) 

df_sorted = df_grouped.orderBy(col("rental_count").desc())

df_top_10 = df_sorted.limit(10)

df_result = df_top_10.withColumn(
    "actor_name",
    concat_ws(" ", col("first_name"), col("last_name"))
).select("actor_name", "rental_count")

df_result.show(truncate=False)

+------------------+------------+
|actor_name        |rental_count|
+------------------+------------+
|GINA DEGENERES    |753         |
|MATTHEW CARREY    |678         |
|MARY KEITEL       |674         |
|ANGELA WITHERSPOON|654         |
|WALTER TORN       |640         |
|HENRY BERRY       |612         |
|JAYNE NOLTE       |611         |
|VAL BOLGER        |605         |
|SANDRA KILMER     |604         |
|SEAN GUINESS      |599         |
+------------------+------------+



In [15]:
# 3. Output the category of movies on which the most money was spent.

df_joined_1 = film.join(
    film_category,
    on="film_id", 
    how="inner"
)

df_final_join = df_joined_1.join(
    category,
    on="category_id",
    how="inner"
)

df_grouped = df_final_join.groupBy("name") \
                          .agg(sum(col("replacement_cost")).alias("total_replacement_cost"))

df_sorted = df_grouped.orderBy(col("total_replacement_cost").desc())

df_top_1 = df_sorted.limit(1)

df_top_1.show(truncate=False)

+------+----------------------+
|name  |total_replacement_cost|
+------+----------------------+
|Sports|1509.26               |
+------+----------------------+



In [16]:
# 4. Print the names of movies that are not in the inventory. 
# Write a query without using the IN operator.

df_result = film\
            .join(inventory, on="film_id", how="left_anti")\
            .select("film_id", "title")\
            .sort(col("title"))

df_result.show(truncate=False)

+-------+----------------------+
|film_id|title                 |
+-------+----------------------+
|14     |ALICE FANTASIA        |
|33     |APOLLO TEEN           |
|36     |ARGONAUTS TOWN        |
|38     |ARK RIDGEMONT         |
|41     |ARSENIC INDEPENDENCE  |
|87     |BOONDOCK BALLROOM     |
|108    |BUTCH PANTHER         |
|128    |CATCH AMISTAD         |
|144    |CHINATOWN GLADIATOR   |
|148    |CHOCOLATE DUCK        |
|171    |COMMANDMENTS EXPRESS  |
|192    |CROSSING DIVORCE      |
|195    |CROWDS TELEMARK       |
|198    |CRYSTAL BREAKING      |
|217    |DAZED PUNK            |
|221    |DELIVERANCE MULHOLLAND|
|318    |FIREHOUSE VIETNAM     |
|325    |FLOATS GARDEN         |
|332    |FRANKENSTEIN STRANGER |
|359    |GLADIATOR WESTWARD    |
+-------+----------------------+
only showing top 20 rows



In [18]:
# 5. Output the top 3 actors who have appeared the most in movies in the “Children” category. 
# If several actors have the same number of movies, output all of them.

window_spec = Window.orderBy(F.desc("amount"))

df_joined_and_aggregated = actor\
            .join(film_actor, on="actor_id")\
            .join(film_category, on="film_id")\
            .join(category, on="category_id")\
            .filter(col("name") == "Children")\
            .groupBy("actor_id", "first_name", "last_name")\
            .agg(
                F.count("film_id").alias("amount")
            )

df_result = df_joined_and_aggregated\
            .withColumn(
                    "rnk",
                    F.dense_rank().over(window_spec)
            )\
            .filter(F.col("rnk") <= 3)\
            .sort(F.col("rnk").asc(), F.col("amount").desc())\
            .select("rnk", "first_name", "last_name", "amount")

df_result.show()

+---+----------+---------+------+
|rnk|first_name|last_name|amount|
+---+----------+---------+------+
|  1|     HELEN|   VOIGHT|     7|
|  2|     KEVIN|  GARLAND|     5|
|  2|     RALPH|     CRUZ|     5|
|  2|      MARY|    TANDY|     5|
|  2|    WHOOPI|     HURT|     5|
|  3|  SCARLETT|    DAMON|     4|
|  3| SYLVESTER|     DERN|     4|
|  3|    SANDRA|   KILMER|     4|
|  3|     RENEE|     BALL|     4|
|  3|   KIRSTEN|   AKROYD|     4|
|  3|      ALAN| DREYFUSS|     4|
|  3|     SUSAN|    DAVIS|     4|
|  3|     JAYNE|    NOLTE|     4|
|  3|       UMA|     WOOD|     4|
|  3|      JANE|  JACKMAN|     4|
|  3| CHRISTIAN|   AKROYD|     4|
|  3|      JADA|    RYDER|     4|
|  3|     ELLEN|  PRESLEY|     4|
|  3|       VAL|   BOLGER|     4|
+---+----------+---------+------+



In [19]:
# 6. Output cities with the number of active and inactive customers (active - customer.active = 1). 
# Sort by the number of inactive customers in descending order.

df_joined = city\
            .join(address, on="city_id")\
            .join(customer, on="address_id")

df_result = df_joined\
            .groupBy("city")\
            .agg(
                F.sum(
                    F.when(col("active") == 0, 1).otherwise(0)
            ).alias("inactive_customers"),
                F.sum(
                    F.when(col("active") == 1, 1).otherwise(0)
            ).alias("active_customers")
            )\
            .sort(col("inactive_customers").desc())

df_result.show()

+------------------+------------------+----------------+
|              city|inactive_customers|active_customers|
+------------------+------------------+----------------+
|          Uluberia|                 1|               0|
|         Najafabad|                 1|               0|
|         Pingxiang|                 1|               0|
|          Xiangfan|                 1|               0|
|        Kumbakonam|                 1|               0|
|       Szkesfehrvr|                 1|               0|
|  Charlotte Amalie|                 1|               0|
|            Kamyin|                 1|               0|
|            Daxian|                 1|               0|
|     Coatzacoalcos|                 1|               0|
|           Wroclaw|                 1|               0|
|            Ktahya|                 1|               0|
|           Bat Yam|                 1|               0|
|   Southend-on-Sea|                 1|               0|
|            Amroha|           

In [20]:
# 7. Output the category of movies that have the highest number of total rental 
# hours in the city (customer.address_id in this city) and that start with the letter “a”. 
# Do the same for cities that have a “-” in them. Write everything in one query.

df_joined = (
    city
        .join(address, on="city_id")
        .join(customer, on="address_id")
        .join(rental, on="customer_id")
        .join(inventory, on="inventory_id")
        .join(film_category, on="film_id")
        .join(category, on="category_id")
        .select(
            F.col("name").alias("category_name"),
            F.col("city"),
            F.col("rental_date"),
            F.col("return_date")
        )
        .filter(F.col("return_date").isNotNull())
        .withColumn(
            "rental_seconds",
            F.unix_timestamp(F.col("return_date")) - F.unix_timestamp(F.col("rental_date"))
        )
)

df_agg = df_joined.groupBy("category_name").agg(
    F.sum(
        F.when(
            F.lower(F.col("city")).like("a%"),
            F.col("rental_seconds")
        )
    ).alias("total_seconds_a_city"),
    F.sum(
        F.when(
            col("city").contains("-"),
            F.col("rental_seconds")
        )
    ).alias("total_seconds_dash_city")
)

df_total_hours = df_agg.withColumn(
    "total_hours_a_city",
    F.round(F.col("total_seconds_a_city") / 3600, 2)
).withColumn(
    "total_hours_dash_city",
    F.round(F.col("total_seconds_dash_city") / 3600, 2)
).drop(
    "total_seconds_a_city",
    "total_seconds_dash_city"
).sort(F.col("category_name"))

window_spec_a = Window.orderBy(F.col("total_hours_a_city").desc())
window_spec_dash = Window.orderBy(F.col("total_hours_dash_city").desc())

df_ranked = df_total_hours\
        .withColumn(
            "rank_a",
            F.rank().over(window_spec_a)
        )\
        .withColumn(
            "rank_dash",
            F.rank().over(window_spec_dash)
        )

df_ranked\
    .filter(F.col("rank_a") == 1)\
    .select(F.col("category_name"), F.col("total_hours_a_city"))\
    .show()

df_ranked\
    .filter(F.col("rank_dash") == 1)\
    .select(F.col("category_name"), F.col("total_hours_dash_city"))\
    .show()

+-------------+------------------+
|category_name|total_hours_a_city|
+-------------+------------------+
|       Sports|          12360.35|
+-------------+------------------+

+-------------+---------------------+
|category_name|total_hours_dash_city|
+-------------+---------------------+
|      Foreign|              6472.15|
+-------------+---------------------+

