In [1]:
from pyspark.sql import SparkSession

# JDBC driver path
jdbc_driver_path = r"E:\innowiseHW\sparkhw\spark-pagila\postgresql-42.6.0.jar"

# Create Spark session once
spark = SparkSession.builder \
    .appName("Spark Pagila") \
    .config("spark.jars", jdbc_driver_path) \
    .getOrCreate()

# JDBC connection options (reuse this dictionary)
jdbc_url = "jdbc:postgresql://localhost:5433/pagila"
jdbc_options = {
    "url": jdbc_url,
    "user": "postgres",
    "password": "123456",
    "driver": "org.postgresql.Driver"
}

# Function to read a table
def read_table(table_name):
    return spark.read.format("jdbc") \
        .options(**jdbc_options, dbtable=table_name) \
        .load()


In [11]:
film_df = read_table('film')
film_df.show(10)

+-------+----------------+--------------------+------------+-----------+--------------------+---------------+-----------+------+----------------+------+--------------------+--------------------+--------------------+
|film_id|           title|         description|release_year|language_id|original_language_id|rental_duration|rental_rate|length|replacement_cost|rating|         last_update|    special_features|            fulltext|
+-------+----------------+--------------------+------------+-----------+--------------------+---------------+-----------+------+----------------+------+--------------------+--------------------+--------------------+
|      1|ACADEMY DINOSAUR|A Epic Drama of a...|        2012|          1|                null|              6|       0.99|    86|           20.99|    PG|2022-09-10 21:46:...|[Deleted Scenes, ...|'academi':1 'batt...|
|      2|  ACE GOLDFINGER|A Astounding Epis...|        2023|          1|                null|              3|       4.99|    48|        

(a) Number of movies in each category, sorted descending

In [3]:
from pyspark.sql.functions import col, count

film_category_df = read_table("film_category")
category_df = read_table("category")

movies_per_category = film_category_df \
    .join(category_df, 'category_id') \
    .groupby('name') \
    .agg(count('film_id').alias('num_of_movies')) \
    .orderBy(col('num_of_movies').desc())
    
movies_per_category.show()

+-----------+-------------+
|       name|num_of_movies|
+-----------+-------------+
|      Drama|          152|
|      Music|          152|
|     Travel|          151|
|    Foreign|          150|
|      Games|          150|
|   Children|          150|
|     Action|          149|
|     Sci-Fi|          149|
|  Animation|          148|
|     Family|          147|
|   Classics|          147|
|        New|          147|
|     Sports|          145|
|Documentary|          145|
|     Comedy|          143|
|     Horror|          142|
+-----------+-------------+



(b) Top 10 actors whose movies were rented the most

In [4]:
from pyspark.sql.functions import desc

rental_df = read_table("rental")
inventory_df = read_table("inventory")
film_actor_df = read_table("film_actor")
actor_df = read_table("actor")

actor_rentals = rental_df \
    .join(inventory_df, "inventory_id") \
    .join(film_actor_df, "film_id") \
    .join(actor_df, "actor_id") \
    .groupBy("first_name", "last_name") \
    .count() \
    .orderBy(desc("count")) \
    .limit(10)

actor_rentals.show()


+----------+-----------+-----+
|first_name|  last_name|count|
+----------+-----------+-----+
|     SUSAN|      DAVIS|  825|
|      GINA|  DEGENERES|  753|
|   MATTHEW|     CARREY|  678|
|      MARY|     KEITEL|  674|
|    ANGELA|WITHERSPOON|  654|
|    WALTER|       TORN|  640|
|     HENRY|      BERRY|  612|
|     JAYNE|      NOLTE|  611|
|       VAL|     BOLGER|  605|
|    SANDRA|     KILMER|  604|
+----------+-----------+-----+



(c) Category of movies on which the most money was spent

In [5]:
payment_df = read_table("payment")

category_revenue = payment_df \
    .join(rental_df, 'rental_id') \
    .join(inventory_df, 'inventory_id') \
    .join(film_category_df, 'film_id') \
    .join(category_df, 'category_id') \
    .groupBy('name') \
    .sum('amount') \
    .orderBy(desc("sum(amount)"))
    
category_revenue.show()

+-----------+-----------+
|       name|sum(amount)|
+-----------+-----------+
|    Foreign|   10507.67|
|   Children|   10437.05|
|  Animation|   10369.55|
|Documentary|   10307.29|
|     Action|   10289.60|
|      Music|   10188.81|
|     Sci-Fi|   10054.10|
|        New|    9915.24|
|     Sports|    9902.86|
|      Games|    9848.23|
|     Horror|    9807.69|
|     Travel|    9793.98|
|   Classics|    9708.77|
|     Family|    9703.84|
|      Drama|    9522.54|
|     Comedy|    9181.93|
+-----------+-----------+



(d) Names of movies that are not in the inventory

In [12]:

film_inventory_df = inventory_df.select("film_id").distinct()
films_not_in_inventory = film_df.join(film_inventory_df, "film_id", "left_anti")
films_not_in_inventory.select('title').show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



(e) Top 3 actors in “Children” category

In [13]:
children_category_id = category_df.filter(col('name') == 'Children').select('category_id').collect()[0][0]
top_actors_children = film_actor_df \
    .join(film_category_df.filter(col("category_id") == children_category_id), "film_id") \
    .join(actor_df, "actor_id") \
    .groupBy("first_name", "last_name") \
    .count() \
    .orderBy(desc("count")) \
    .limit(3)

top_actors_children.show()

+----------+---------+-----+
|first_name|last_name|count|
+----------+---------+-----+
|    SIDNEY|    CROWE|    9|
|   RICHARD|     PENN|    9|
|      EWAN|  GOODING|    9|
+----------+---------+-----+



(f) Cities with active/inactive customers

In [14]:
from pyspark.sql.functions import col, sum as spark_sum, desc

customer_df = read_table('customer')
address_df = read_table('address')
city_df = read_table('city')
customer_with_address = customer_df.join(address_df, 'address_id').join(city_df, 'city_id')

cities_activity = customer_with_address.groupBy("city") \
    .agg(
        spark_sum(col("active")).alias("active_customers"),
        spark_sum(1 - col("active")).alias("inactive_customers")
    ).orderBy(desc("inactive_customers"))

cities_activity.show()

+------------------+----------------+------------------+
|              city|active_customers|inactive_customers|
+------------------+----------------+------------------+
|          Uluberia|               0|                 1|
|         Najafabad|               0|                 1|
|         Pingxiang|               0|                 1|
|          Xiangfan|               0|                 1|
|        Kumbakonam|               0|                 1|
|       Szkesfehrvr|               0|                 1|
|  Charlotte Amalie|               0|                 1|
|            Kamyin|               0|                 1|
|            Daxian|               0|                 1|
|     Coatzacoalcos|               0|                 1|
|           Wroclaw|               0|                 1|
|            Ktahya|               0|                 1|
|           Bat Yam|               0|                 1|
|   Southend-on-Sea|               0|                 1|
|            Amroha|           

(g) Category with highest rental hours in cities (names start with “a” or contain “-”)

In [15]:
rental_info = rental_df \
    .join(inventory_df, "inventory_id") \
    .join(film_category_df, "film_id") \
    .join(category_df, "category_id") \
    .join(customer_with_address, "customer_id") \
    .withColumn("rental_hours", col("return_date").cast("long") - col("rental_date").cast("long"))


rental_info.filter(col("city").startswith("A")) \
    .groupBy("city", "name") \
    .agg(spark_sum("rental_hours").alias("total_hours")) \
    .orderBy("city", desc("total_hours")) \
    .show()


rental_info.filter(col("city").contains("-")) \
    .groupBy("city", "name") \
    .agg(spark_sum("rental_hours").alias("total_hours")) \
    .orderBy("city", desc("total_hours")) \
    .show()


+------------------+-----------+-----------+
|              city|       name|total_hours|
+------------------+-----------+-----------+
|A Corua (La Corua)|     Sci-Fi|    3354900|
|A Corua (La Corua)|     Family|    2914620|
|A Corua (La Corua)|   Children|    2829000|
|A Corua (La Corua)|      Drama|    2543520|
|A Corua (La Corua)|    Foreign|    2330520|
|A Corua (La Corua)|      Music|    2326800|
|A Corua (La Corua)|Documentary|    2166480|
|A Corua (La Corua)|      Games|    2072700|
|A Corua (La Corua)|  Animation|    1813980|
|A Corua (La Corua)|     Sports|    1638120|
|A Corua (La Corua)|        New|    1632720|
|A Corua (La Corua)|     Travel|    1544340|
|A Corua (La Corua)|   Classics|    1482120|
|A Corua (La Corua)|     Comedy|    1394880|
|A Corua (La Corua)|     Horror|    1279740|
|A Corua (La Corua)|     Action|     389220|
|              Abha|     Action|    4265700|
|              Abha|    Foreign|    3119040|
|              Abha|   Children|    2407020|
|         