In [32]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder \
    .config("spark.driver.extraClassPath", os.environ["JDBC_DRIVER_PATH"]) \
    .appName("Load PostgreSQL table into PySpark DataFrame") \
    .getOrCreate()

In [3]:
jdbc_url = f"jdbc:postgresql://postgres:5432/{os.environ['POSTGRES_DB']}"
connection_properties = {
    "user": os.environ["POSTGRES_USER"],
    "password": os.environ["POSTGRES_PASSWORD"]
}

Из демо PostgreSQL базы данных загрузите таблицы в PySpark DataFrame.

In [4]:
table_list = spark.read.jdbc(url=jdbc_url, table="pg_tables", properties=connection_properties) \
    .filter("schemaname = 'public' and tablename <> 'spatial_ref_sys'") \
    .select("tablename") \
    .rdd \
    .map(lambda x: x[0]) \
    .collect()

table_list

['film_actor',
 'address',
 'city',
 'actor',
 'category',
 'country',
 'customer',
 'film',
 'film_category',
 'inventory',
 'language',
 'store',
 'payment_p2022_07',
 'rental',
 'staff',
 'payment',
 'payment_p2022_03',
 'payment_p2022_04',
 'payment_p2022_05',
 'payment_p2022_06',
 'payment_p2022_01',
 'payment_p2022_02']

In [5]:
for table_name in table_list:
    table_df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    print(f"Table {table_name} contains {table_df.count()} rows.")

Table film_actor contains 5462 rows.
Table address contains 603 rows.
Table city contains 600 rows.
Table actor contains 200 rows.
Table category contains 16 rows.
Table country contains 109 rows.
Table customer contains 599 rows.
Table film contains 1000 rows.
Table film_category contains 1000 rows.
Table inventory contains 4581 rows.
Table language contains 6 rows.
Table store contains 2 rows.
Table payment_p2022_07 contains 2334 rows.
Table rental contains 16044 rows.
Table staff contains 2 rows.
Table payment contains 16049 rows.
Table payment_p2022_03 contains 2713 rows.
Table payment_p2022_04 contains 2547 rows.
Table payment_p2022_05 contains 2677 rows.
Table payment_p2022_06 contains 2654 rows.
Table payment_p2022_01 contains 723 rows.
Table payment_p2022_02 contains 2401 rows.


In [6]:
table_df_dict = {}

for table_name in table_list:
    table_df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    table_df_dict[table_name] = table_df


In [7]:
table_df_dict

{'film_actor': DataFrame[actor_id: int, film_id: int, last_update: timestamp],
 'address': DataFrame[address_id: int, address: string, address2: string, district: string, city_id: int, postal_code: string, phone: string, last_update: timestamp],
 'city': DataFrame[city_id: int, city: string, country_id: int, last_update: timestamp],
 'actor': DataFrame[actor_id: int, first_name: string, last_name: string, last_update: timestamp],
 'category': DataFrame[category_id: int, name: string, last_update: timestamp],
 'country': DataFrame[country_id: int, country: string, last_update: timestamp],
 'customer': DataFrame[customer_id: int, store_id: int, first_name: string, last_name: string, email: string, address_id: int, activebool: boolean, create_date: date, last_update: timestamp, active: int],
 'film': DataFrame[film_id: int, title: string, description: string, release_year: int, language_id: int, original_language_id: int, rental_duration: smallint, rental_rate: decimal(4,2), length: small

In [8]:
film_actor_df = table_df_dict["film_actor"]
address_df = table_df_dict["address"]
city_df = table_df_dict["city"]
actor_df = table_df_dict["actor"]
category_df = table_df_dict["category"]
country_df = table_df_dict["country"]
customer_df = table_df_dict["customer"]
film_df = table_df_dict["film"]
film_category_df = table_df_dict["film_category"]
inventory_df = table_df_dict["inventory"]
language_df = table_df_dict["language"]
store_df = table_df_dict["store"]
payment_p2022_07_df = table_df_dict["payment_p2022_07"]
rental_df = table_df_dict["rental"]
staff_df = table_df_dict["staff"]
payment_df = table_df_dict["payment"]
payment_p2022_03_df = table_df_dict["payment_p2022_03"]
payment_p2022_04_df = table_df_dict["payment_p2022_04"]
payment_p2022_05_df = table_df_dict["payment_p2022_05"]
payment_p2022_06_df = table_df_dict["payment_p2022_06"]
payment_p2022_01_df = table_df_dict["payment_p2022_01"]
payment_p2022_02_df = table_df_dict["payment_p2022_02"]

Используя эти данные (без SQL) напишите PySpark код для того, чтобы:

1. Вывести количество фильмов в каждой категории, отсортировать по убыванию.


In [9]:
result_1 = category_df.join(film_category_df, "category_id") \
                      .join(film_category_df, "film_id") \
                      .groupBy("name") \
                      .agg(count("*").alias("count_films")) \
                      .orderBy("count_films", ascending=False)

result_1.show()


+-----------+-----------+
|       name|count_films|
+-----------+-----------+
|     Sports|         74|
|    Foreign|         73|
|     Family|         69|
|Documentary|         68|
|  Animation|         66|
|     Action|         64|
|        New|         63|
|      Drama|         62|
|      Games|         61|
|     Sci-Fi|         61|
|   Children|         60|
|     Comedy|         58|
|     Travel|         57|
|   Classics|         57|
|     Horror|         56|
|      Music|         51|
+-----------+-----------+




2. Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.


In [10]:
result_2 = actor_df.join(film_actor_df, "actor_id") \
    .join(inventory_df, "film_id") \
    .join(rental_df, "inventory_id") \
    .groupBy("actor_id", "first_name", "last_name") \
    .agg(count("*").alias("rental_count")) \
    .orderBy("rental_count", ascending=False) \
    .limit(10)

result_2.show()

+--------+----------+-----------+------------+
|actor_id|first_name|  last_name|rental_count|
+--------+----------+-----------+------------+
|     107|      GINA|  DEGENERES|         753|
|     181|   MATTHEW|     CARREY|         678|
|     198|      MARY|     KEITEL|         674|
|     144|    ANGELA|WITHERSPOON|         654|
|     102|    WALTER|       TORN|         640|
|      60|     HENRY|      BERRY|         612|
|     150|     JAYNE|      NOLTE|         611|
|      37|       VAL|     BOLGER|         605|
|      23|    SANDRA|     KILMER|         604|
|      90|      SEAN|    GUINESS|         599|
+--------+----------+-----------+------------+



3. Вывести категорию фильмов, на которую потратили больше всего денег.



In [11]:
result_3 = payment_df.join(rental_df, "rental_id") \
                     .join(inventory_df, "inventory_id") \
                     .join(film_df, "film_id") \
                     .join(film_category_df, "film_id") \
                     .join(category_df, "category_id") \
                     .groupBy("name") \
                     .agg(sum("amount").alias("total_spent")) \
                     .orderBy("total_spent", ascending=False) \
                     .limit(1)
                          
result_3.show()

+------+-----------+
|  name|total_spent|
+------+-----------+
|Sports|    5314.21|
+------+-----------+



4. Вывести названия фильмов, которых нет в inventory.


In [63]:
result_4 = film_df.join(inventory_df, "film_id", "leftanti").select("title")

result_4.show(result_4.count(), truncate=False)

+----------------------+
|title                 |
+----------------------+
|CHOCOLATE DUCK        |
|BUTCH PANTHER         |
|VOLUME HOUSE          |
|ORDER BETRAYED        |
|TADPOLE PARK          |
|KILL BROTHERHOOD      |
|FRANKENSTEIN STRANGER |
|CROSSING DIVORCE      |
|SUICIDES SILENCE      |
|CATCH AMISTAD         |
|PERDITION FARGO       |
|FLOATS GARDEN         |
|GUMP DATE             |
|WALLS ARTIST          |
|GLADIATOR WESTWARD    |
|HOCUS FRIDA           |
|ARSENIC INDEPENDENCE  |
|MUPPET MILE           |
|FIREHOUSE VIETNAM     |
|ROOF CHAMPION         |
|DAZED PUNK            |
|PEARL DESTINY         |
|RAINBOW SHOCK         |
|KENTUCKIAN GIANT      |
|BOONDOCK BALLROOM     |
|COMMANDMENTS EXPRESS  |
|HATE HANDICAP         |
|ARK RIDGEMONT         |
|CROWDS TELEMARK       |
|DELIVERANCE MULHOLLAND|
|RAIDERS ANTITRUST     |
|SISTER FREDDY         |
|VILLAIN DESPERATE     |
|APOLLO TEEN           |
|ALICE FANTASIA        |
|CRYSTAL BREAKING      |
|TREASURE COMMAND      |


5. Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех..


In [34]:
joined_data = actor_df \
    .join(film_actor_df, actor_df.actor_id == film_actor_df.actor_id) \
    .join(film_category_df, film_actor_df.film_id == film_category_df.film_id) \
    .join(category_df, film_category_df.category_id == category_df.category_id)

filtered_data = joined_data \
    .filter(category_df.name == "Children")

result_5 = filtered_data \
    .groupBy(concat(actor_df.first_name, lit(" "), actor_df.last_name).alias("actor_name")) \
    .agg(count("*").alias("film_count"), rank().over(Window.orderBy(count("*").desc())).alias("rank")) \
    .filter(col("rank") <= 3) \
    .orderBy(col("rank"))

result_5.show()

+-------------+----------+----+
|   actor_name|film_count|rank|
+-------------+----------+----+
| HELEN VOIGHT|         7|   1|
|  SUSAN DAVIS|         6|   2|
|   MARY TANDY|         5|   3|
|   RALPH CRUZ|         5|   3|
|KEVIN GARLAND|         5|   3|
|  WHOOPI HURT|         5|   3|
+-------------+----------+----+




6. Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.


In [35]:
joined_data = customer_df.join(address_df, "address_id").join(city_df, "city_id")

result_6 = joined_data.groupBy("city") \
                   .agg(count(when(customer_df.active == 1, 1)).alias("active_customers"), \
                        count(when(customer_df.active == 0, 1)).alias("inactive_customers")) \
                   .orderBy("inactive_customers", ascending=False)

result_6.show()
f"Result has {result_6.count()} rows"

+------------------+----------------+------------------+
|              city|active_customers|inactive_customers|
+------------------+----------------+------------------+
|         Pingxiang|               0|                 1|
|       Szkesfehrvr|               0|                 1|
|  Charlotte Amalie|               0|                 1|
|         Najafabad|               0|                 1|
|           Wroclaw|               0|                 1|
|            Ktahya|               0|                 1|
|            Amroha|               0|                 1|
|   Southend-on-Sea|               0|                 1|
|           Bat Yam|               0|                 1|
|            Kamyin|               0|                 1|
|          Xiangfan|               0|                 1|
|            Daxian|               0|                 1|
|          Uluberia|               0|                 1|
|     Coatzacoalcos|               0|                 1|
|        Kumbakonam|           

'Result has 597 rows'


7. Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. Тоже самое сделать для городов в которых есть символ “-”.


In [62]:
joined_data = (
    film_df
    .join(inventory_df, film_df.film_id == inventory_df.film_id)
    .join(rental_df, inventory_df.inventory_id == rental_df.inventory_id)
    .join(customer_df, rental_df.customer_id == customer_df.customer_id)
    .join(address_df, customer_df.address_id == address_df.address_id)
    .join(city_df, address_df.city_id == city_df.city_id)
    .join(country_df, city_df.country_id == country_df.country_id)
    .join(film_category_df, film_df.film_id == film_category_df.film_id)
    .join(category_df, film_category_df.category_id == category_df.category_id)
)

filtered_data = (
    joined_data.filter(category_df.name.like('A%') | city_df.city.like('%-%'))
)

grouped_data = (
    filtered_data
    .groupBy(category_df.name, city_df.city)
    .agg(sum(datediff(rental_df.return_date, rental_df.rental_date)).alias("total_rental_duration"))
)

result_7 = (
    grouped_data
    .orderBy(desc("total_rental_duration"))
    .limit(1)
)

result_7.show(truncate=False)

+------+----------+---------------------+
|name  |city      |total_rental_duration|
+------+----------+---------------------+
|Action|Avellaneda|46                   |
+------+----------+---------------------+

