In [197]:
%pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import when
from collections import namedtuple
import re

#del list

class TableLoader:
    def __init__(self, spark, table_names):
        self.spark = spark
        self.table_names = table_names
        self.load_tables()

    def load_tables(self):
        for table_name in self.table_names:
            setattr(self, table_name, (
                self.spark.read
                .format("jdbc")
                .option("driver", "org.postgresql.Driver")
                .option("url", "jdbc:postgresql://pagila:5432/postgres")
                .option("user", "postgres")
                .option("password", "123456")
                .option("dbtable", table_name)
                .load()
            ))

file_path = 'pagila-insert-data.sql'
with open(file_path, 'r') as file:
    sql_content = file.read()


# Creating a List of tables
pattern = re.compile(r'INSERT INTO\s+public\.(\w+)', re.IGNORECASE)
matches = pattern.findall(sql_content)
unique_tables = sorted(set(matches))
tables_with_duplicates = ["payment" if table.startswith("payment") else table for table in unique_tables]
tables = list(set(tables_with_duplicates))

df = TableLoader(spark, tables)

spark = (
    SparkSession.builder
    .config("spark.jars", "/jars/postgresql-42.7.4.jar")
    .appName("pomogi_snachala_sebe")
    .getOrCreate()
)

Note: you may need to restart the kernel to use updated packages.


In [198]:
# Вывести количество фильмов в каждой категории, отсортировать по убыванию.

count_films_by_category = (
    df.category
    .join(df.film_category, df.category.category_id == df.film_category.category_id, how="left")
    .join(df.film, df.film_category.film_id == df.film.film_id, how="left")
    .groupBy(df.category.name).count()
    .sort("count", ascending=False)
)
count_films_by_category.show()

+-----------+-----+
|       name|count|
+-----------+-----+
|      Drama|  152|
|      Music|  152|
|     Travel|  151|
|    Foreign|  150|
|      Games|  150|
|   Children|  150|
|     Action|  149|
|     Sci-Fi|  149|
|  Animation|  148|
|     Family|  147|
|   Classics|  147|
|        New|  147|
|     Sports|  145|
|Documentary|  145|
|     Comedy|  143|
|     Horror|  142|
+-----------+-----+



In [199]:
# Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

most_rental_actors = (
    df.actor
    .join(df.film_actor, df.actor.actor_id == df.film_actor.actor_id, how="left")
    .join(df.film, df.film_actor.film_id == df.film.film_id, how="left")
    .join(df.inventory, df.film.film_id == df.inventory.film_id, how="left")
    .join(df.rental, df.inventory.inventory_id == df.rental.inventory_id, how="left")
    .groupBy(df.actor.actor_id, df.actor.first_name, df.actor.last_name)
    .count()
    .sort("count", ascending=False)
    .limit(10)
)
most_rental_actors.show()

+--------+----------+-----------+-----+
|actor_id|first_name|  last_name|count|
+--------+----------+-----------+-----+
|     107|      GINA|  DEGENERES|  753|
|     181|   MATTHEW|     CARREY|  680|
|     198|      MARY|     KEITEL|  675|
|     144|    ANGELA|WITHERSPOON|  654|
|     102|    WALTER|       TORN|  642|
|      60|     HENRY|      BERRY|  612|
|     150|     JAYNE|      NOLTE|  612|
|      23|    SANDRA|     KILMER|  605|
|      37|       VAL|     BOLGER|  605|
|      90|      SEAN|    GUINESS|  599|
+--------+----------+-----------+-----+



In [200]:
# Вывести категорию фильмов, на которую потратили больше всего денег.

most_expensive_category = (
    df.category
    .join(df.film_category, df.category.category_id == df.film_category.category_id, how="left")
    .join(df.film, df.film_category.film_id == df.film.film_id, how="left")
    .join(df.inventory, df.film.film_id == df.inventory.film_id, how="left")
    .join(df.rental, df.inventory.inventory_id == df.rental.inventory_id, how="left")
    .join(df.payment, df.rental.rental_id == df.payment.rental_id, how="left")
    .groupBy(df.category.category_id, df.category.name)
    .agg(F.sum(df.payment.amount).alias("total_spent"))
    .sort("total_spent", ascending=False)
    .limit(1)
)

most_expensive_category.show()

+-----------+-------+-----------+
|category_id|   name|total_spent|
+-----------+-------+-----------+
|          9|Foreign|   10507.67|
+-----------+-------+-----------+



In [201]:
# Вывести названия фильмов, которых нет в inventory.

not_in_inventory = (
    df.film
    .select("film_id", "title")
    .join(df.inventory, df.film.film_id == df.inventory.film_id, how="leftanti")
)

not_in_inventory.show(not_in_inventory.count(), truncate=False)

+-------+----------------------+
|film_id|title                 |
+-------+----------------------+
|148    |CHOCOLATE DUCK        |
|108    |BUTCH PANTHER         |
|950    |VOLUME HOUSE          |
|642    |ORDER BETRAYED        |
|874    |TADPOLE PARK          |
|497    |KILL BROTHERHOOD      |
|332    |FRANKENSTEIN STRANGER |
|192    |CROSSING DIVORCE      |
|860    |SUICIDES SILENCE      |
|128    |CATCH AMISTAD         |
|671    |PERDITION FARGO       |
|325    |FLOATS GARDEN         |
|386    |GUMP DATE             |
|955    |WALLS ARTIST          |
|359    |GLADIATOR WESTWARD    |
|419    |HOCUS FRIDA           |
|41     |ARSENIC INDEPENDENCE  |
|607    |MUPPET MILE           |
|318    |FIREHOUSE VIETNAM     |
|742    |ROOF CHAMPION         |
|217    |DAZED PUNK            |
|669    |PEARL DESTINY         |
|713    |RAINBOW SHOCK         |
|495    |KENTUCKIAN GIANT      |
|87     |BOONDOCK BALLROOM     |
|171    |COMMANDMENTS EXPRESS  |
|404    |HATE HANDICAP         |
|38     |A

In [202]:
# Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. 
# Если у нескольких актеров одинаковое кол-во фильмов, вывести всех.

actor_film_count = (
    df.actor
    .join(df.film_actor, df.actor.actor_id == df.film_actor.actor_id, how="left")
    .join(df.film, df.film_actor.film_id == df.film.film_id, how="left")
    .join(df.film_category, df.film.film_id == df.film_category.film_id, how="left")
    .join(df.category, df.film_category.category_id == df.category.category_id, how="left")
    .filter(df.category.name == "Children")
    .groupBy(df.actor.actor_id, df.actor.first_name, df.actor.last_name)
    .agg(F.count(df.category.category_id).alias("amount_of_children_films"))
)

window_spec = Window.orderBy(F.col("amount_of_children_films").desc())

ranked_actors = (
    actor_film_count
    .withColumn("rk", F.dense_rank().over(window_spec))
)

top_actors = ranked_actors.filter(F.col("rk") <= 3)

top_actors.select("actor_id", "first_name", "last_name", "amount_of_children_films").show(top_actors.count())

+--------+----------+---------+------------------------+
|actor_id|first_name|last_name|amount_of_children_films|
+--------+----------+---------+------------------------+
|     105|    SIDNEY|    CROWE|                       9|
|     139|      EWAN|  GOODING|                       9|
|     133|   RICHARD|     PENN|                       9|
|      87|   SPENCER|     PECK|                       8|
|     145|       KIM|    ALLEN|                       8|
|      66|      MARY|    TANDY|                       8|
|      29|      ALEC|    WAYNE|                       8|
|      56|       DAN|   HARRIS|                       8|
|     149|   RUSSELL|   TEMPLE|                       8|
|     181|   MATTHEW|   CARREY|                       8|
|     131|      JANE|  JACKMAN|                       8|
|     142|      JADA|    RYDER|                       8|
|      84|     JAMES|     PITT|                       7|
|     108|    WARREN|    NOLTE|                       7|
|     123|  JULIANNE|    DENCH|

In [203]:
# Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). 
# Отсортировать по количеству неактивных клиентов по убыванию.

city_customers = (
    df.city
    .join(df.address, df.city.city_id == df.address.city_id, how="left")
    .join(df.customer, df.address.address_id == df.customer.address_id, how="left")
    .groupBy(df.city.city_id, df.city.city)
    .agg(
        F.sum(when(df.customer.active == 1, 1).otherwise(0)).alias("active_customers"),
        F.sum(when(df.customer.active == 0, 1).otherwise(0)).alias("inactive_customers")
    )
    .sort("inactive_customers", ascending=False)
)

city_customers.show()


+-------+----------------+----------------+------------------+
|city_id|            city|active_customers|inactive_customers|
+-------+----------------+----------------+------------------+
|    577|         Wroclaw|               0|                 1|
|    578|        Xiangfan|               0|                 1|
|    111|Charlotte Amalie|               0|                 1|
|    259|          Kamyin|               0|                 1|
|    512|     Szkesfehrvr|               0|                 1|
|    139|          Daxian|               0|                 1|
|    283|      Kumbakonam|               0|                 1|
|     57|         Bat Yam|               0|                 1|
|    554|        Uluberia|               0|                 1|
|    495| Southend-on-Sea|               0|                 1|
|    356|       Najafabad|               0|                 1|
|     24|          Amroha|               0|                 1|
|    125|   Coatzacoalcos|               0|            

In [205]:
# Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), 
# и которые начинаются на букву “a”. Тоже самое сделать для городов в которых есть символ “-”.
sec_in_hour = 3600

rental_hours = (
    df.city
    .join(df.address, df.city.city_id == df.address.city_id, how="left")
    .join(df.customer, df.address.address_id == df.customer.address_id, how="left")
    .join(df.rental, df.customer.customer_id == df.rental.customer_id, how="left")
    .join(df.inventory, df.rental.inventory_id == df.inventory.inventory_id, how="left")
    .join(df.film, df.inventory.film_id == df.film.film_id, how="left")
    .join(df.film_category, df.film.film_id == df.film_category.film_id, how="left")
    .join(df.category, df.film_category.category_id == df.category.category_id, how="left")
    .groupBy(df.category.name.alias("category_name"), df.city.city.alias("city_name"))
    .agg(F.sum((F.unix_timestamp(df.rental.return_date) - F.unix_timestamp(df.rental.rental_date)) / sec_in_hour).alias("total_hours"))
)

category_hours_a = (
    rental_hours
    .filter(rental_hours.city_name.startswith("A"))
    .groupBy(rental_hours.category_name)
    .agg(F.sum(rental_hours.total_hours).alias("category_total_hours"))
)

category_hours_dash = (
    rental_hours
    .filter((rental_hours.city_name).contains("-"))
    .groupBy(rental_hours.category_name)
    .agg(F.sum(rental_hours.total_hours).alias("category_total_hours"))
)

max_category_a = (
    category_hours_a
    .orderBy(category_hours_a.category_total_hours.desc())
    .limit(1)
)

max_category_dash = (
    category_hours_dash
    .orderBy(category_hours_dash.category_total_hours.desc())
    .limit(1)
)

final_result = (
    max_category_a.withColumn("category_type", F.lit("City starting with 'A'"))
    .union(max_category_dash.withColumn("category_type", F.lit("City with '-'")))
)

final_result.show(truncate=False)

+-------------+--------------------+----------------------+
|category_name|category_total_hours|category_type         |
+-------------+--------------------+----------------------+
|Children     |24427.999999999993  |City starting with 'A'|
|Drama        |14556.033333333335  |City with '-'         |
+-------------+--------------------+----------------------+

