In [1]:
from pyspark.sql import SparkSession, functions as F, types as T

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
print(spark.version)

3.4.1


In [4]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [70]:
film_count_by_category = film_category_df.join(
    category_df,
    on="category_id"
).groupBy(
    F.col("category_id"),
    F.col("name"),
).agg(
    F.count("film_id").alias("film_count")
).orderBy(
    F.desc("film_count")
)

film_count_by_category.show()

+-----------+-----------+----------+
|category_id|       name|film_count|
+-----------+-----------+----------+
|         15|     Sports|        74|
|          9|    Foreign|        73|
|          8|     Family|        69|
|          6|Documentary|        68|
|          2|  Animation|        66|
|          1|     Action|        64|
|         13|        New|        63|
|          7|      Drama|        62|
|         14|     Sci-Fi|        61|
|         10|      Games|        61|
|          3|   Children|        60|
|          5|     Comedy|        58|
|          4|   Classics|        57|
|         16|     Travel|        57|
|         11|     Horror|        56|
|         12|      Music|        51|
+-----------+-----------+----------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [69]:
most_rented_actors = actor_df.join(
    film_actor_df,
    on="actor_id"
).join(
    inventory_df,
    on="film_id"
).join(
    rental_df,
    on="inventory_id"
).groupBy(
    F.col("first_name"),
    F.col("last_name")
).agg(
    F.count("rental_id").alias("rental_count")
).orderBy(
    F.desc("rental_count")
)

most_rented_actors.show()

+----------+-----------+------------+
|first_name|  last_name|rental_count|
+----------+-----------+------------+
|     SUSAN|      DAVIS|         825|
|      GINA|  DEGENERES|         753|
|   MATTHEW|     CARREY|         678|
|      MARY|     KEITEL|         674|
|    ANGELA|WITHERSPOON|         654|
|    WALTER|       TORN|         640|
|     HENRY|      BERRY|         612|
|     JAYNE|      NOLTE|         611|
|       VAL|     BOLGER|         605|
|    SANDRA|     KILMER|         604|
|      SEAN|    GUINESS|         599|
|    ANGELA|     HUDSON|         574|
|  SCARLETT|      DAMON|         572|
|      EWAN|    GOODING|         571|
|     KEVIN|    GARLAND|         565|
|    WARREN|      NOLTE|         564|
|     WOODY|    HOFFMAN|         560|
|   CAMERON|  ZELLWEGER|         560|
|      JADA|      RYDER|         560|
|     HELEN|     VOIGHT|         557|
+----------+-----------+------------+
only showing top 20 rows



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [67]:
most_profitable_films = category_df.join(
    film_category_df,
    on="category_id"
).join(
    inventory_df,
    on="film_id"
).join(
    rental_df,
    on="inventory_id"
).join(
    payment_df,
    on="rental_id"
).groupBy(
    category_df.name,
).agg(
    F.sum("amount").alias("payments_cost"),
    F.count("amount").alias("payments_count"),
).orderBy(
    F.desc("payments_cost")
)

most_profitable_films.show()

+-----------+------------------+--------------+
|       name|     payments_cost|payments_count|
+-----------+------------------+--------------+
|     Sports| 5314.209999999843|          1179|
|     Sci-Fi| 4756.979999999873|          1101|
|  Animation| 4656.299999999867|          1166|
|      Drama|4587.3899999998885|          1060|
|     Comedy| 4383.579999999895|           941|
|     Action| 4375.849999999871|          1112|
|        New| 4361.569999999893|           945|
|      Games| 4281.329999999897|           969|
|    Foreign| 4270.669999999886|          1033|
|     Family| 4226.069999999887|          1096|
|Documentary| 4217.519999999893|          1050|
|     Horror| 3722.539999999919|           846|
|   Children| 3655.549999999912|           945|
|   Classics| 3639.589999999916|           939|
|     Travel|3549.6399999999226|           837|
|      Music|3417.7199999999216|           830|
+-----------+------------------+--------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [66]:
missing_films = film_df.join(
    inventory_df,
    on="film_id",
    how="left"
).where(
    inventory_df.film_id.isNull(),  
).select(
    film_df.title,
).distinct()

missing_films.show()

+--------------------+
|               title|
+--------------------+
|       RAINBOW SHOCK|
|           GUMP DATE|
|         HOCUS FRIDA|
|    TREASURE COMMAND|
| CHINATOWN GLADIATOR|
|        WALLS ARTIST|
|      ARGONAUTS TOWN|
|       PSYCHO SHRUNK|
|   FIREHOUSE VIETNAM|
|DELIVERANCE MULHO...|
|       ROOF CHAMPION|
|        TADPOLE PARK|
|         APOLLO TEEN|
|       HATE HANDICAP|
|       PEARL DESTINY|
|COMMANDMENTS EXPRESS|
|        VOLUME HOUSE|
|     CROWDS TELEMARK|
|   RAIDERS ANTITRUST|
|    KILL BROTHERHOOD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [65]:
top3_children_actors = actor_df.join(
    film_actor_df,
    on="actor_id"
).join(
    film_category_df,
    on="film_id"
).join(
    category_df,
    on="category_id"
).where(
    F.col("name") == "Children",
).groupBy(
    F.col("first_name"),
    F.col("last_name"),
).agg(
    F.count("film_id").alias("film_count"),
).orderBy(
    F.desc("film_count"),
).limit(3)

top3_children_actors.show()

+----------+---------+----------+
|first_name|last_name|film_count|
+----------+---------+----------+
|     HELEN|   VOIGHT|         7|
|     SUSAN|    DAVIS|         6|
|     RALPH|     CRUZ|         5|
+----------+---------+----------+



Stop Spark session:

In [71]:
spark.stop()