In [1]:
from pyspark.sql import SparkSession, functions as F, types as T

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
print(spark.version)

3.5.0


In [4]:
actor_df = spark.read.csv("./data/actor.csv", header=True, inferSchema=True)
address_df = spark.read.csv("./data/address.csv", header=True, inferSchema=True)
category_df = spark.read.csv("./data/category.csv", header=True, inferSchema=True)
city_df = spark.read.csv("./data/city.csv", header=True, inferSchema=True)
country_df = spark.read.csv("./data/country.csv", header=True, inferSchema=True)
customer_df = spark.read.csv("./data/customer.csv", header=True, inferSchema=True)
film_df = spark.read.csv("./data/film.csv", header=True, inferSchema=True)
film_actor_df = spark.read.csv("./data/film_actor.csv", header=True, inferSchema=True)
film_category_df = spark.read.csv(
    "./data/film_category.csv", header=True, inferSchema=True
)
inventory_df = spark.read.csv("./data/inventory.csv", header=True, inferSchema=True)
language_df = spark.read.csv("./data/language.csv", header=True, inferSchema=True)
payment_df = spark.read.csv("./data/payment.csv", header=True, inferSchema=True)
rental_df = spark.read.csv("./data/rental.csv", header=True, inferSchema=True)
staff_df = spark.read.csv("./data/staff.csv", header=True, inferSchema=True)
store_df = spark.read.csv("./data/store.csv", header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [5]:
category_films_df = category_df.join(
    film_category_df, [category_df.category_id == film_category_df.category_id]
).drop(film_category_df.category_id)

In [6]:
category_films_df.groupBy("category_id", "name").agg(
    F.count("*").alias("category_films_amount")
).select("name", "category_films_amount").show()

+-----------+---------------------+
|       name|category_films_amount|
+-----------+---------------------+
|     Horror|                   56|
|     Sci-Fi|                   61|
|     Action|                   64|
|   Classics|                   57|
|   Children|                   60|
|Documentary|                   68|
|     Family|                   69|
|     Sports|                   74|
|  Animation|                   66|
|      Games|                   61|
|    Foreign|                   73|
|     Comedy|                   58|
|      Drama|                   62|
|        New|                   63|
|     Travel|                   57|
|      Music|                   51|
+-----------+---------------------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [7]:
rental_inventory_df = rental_df.join(
    inventory_df, [rental_df.inventory_id == inventory_df.inventory_id]
).drop(rental_df.inventory_id)

In [8]:
rental_film_df = (
    rental_inventory_df.join(film_df, rental_inventory_df.film_id == film_df.film_id)
    .drop(rental_inventory_df.film_id)
    .select("rental_id", "title", "film_id", "rental_date")
)

In [9]:
rental_film_actors_df = rental_film_df.join(
    film_actor_df, rental_film_df.film_id == film_actor_df.film_id
).drop(film_actor_df.film_id)

In [10]:
rental_actors_df = rental_film_actors_df.join(
    actor_df, rental_film_actors_df.actor_id == actor_df.actor_id
).drop(actor_df.actor_id)

In [11]:
actor_rentls_amount_df = (
    rental_actors_df.groupBy("actor_id", "first_name", "last_name")
    .agg(F.count("*").alias("actor_rentals_amount"))
    .select("first_name", "last_name", "actor_rentals_amount")
    .sort(F.col("actor_rentals_amount").desc())
    .show(10)
)

+----------+-----------+--------------------+
|first_name|  last_name|actor_rentals_amount|
+----------+-----------+--------------------+
|      GINA|  DEGENERES|                 753|
|   MATTHEW|     CARREY|                 678|
|      MARY|     KEITEL|                 674|
|    ANGELA|WITHERSPOON|                 654|
|    WALTER|       TORN|                 640|
|     HENRY|      BERRY|                 612|
|     JAYNE|      NOLTE|                 611|
|       VAL|     BOLGER|                 605|
|    SANDRA|     KILMER|                 604|
|      SEAN|    GUINESS|                 599|
+----------+-----------+--------------------+
only showing top 10 rows



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [12]:
rental_film_category_df = rental_film_df.join(
    film_category_df, rental_film_df.film_id == film_category_df.film_id
).drop(film_category_df.film_id)

In [13]:
rental_film_category_with_names = rental_film_category_df.join(
    category_df, rental_film_category_df.category_id == category_df.category_id
).drop(category_df.category_id)

In [14]:
rental_category_payments_df = (
    rental_film_category_with_names.join(
        payment_df, rental_film_category_with_names.rental_id == payment_df.rental_id
    )
    .drop(payment_df.rental_id)
    .select("rental_id", "name", "amount", "title")
)

In [15]:
category_payments_amount_df = (
    rental_category_payments_df.groupBy("name")
    .agg(F.sum("amount").alias("category_moneys_amount"))
    .select("name")
    .sort(F.col("category_moneys_amount").desc())
    .show(1)
)

+------+
|  name|
+------+
|Sports|
+------+
only showing top 1 row



4.
Вивести назви фільмів, яких не має в inventory.

In [16]:
film_inventorys_df = (
    film_df.join(inventory_df, film_df.film_id == inventory_df.film_id, "left")
    .drop(inventory_df.film_id)
    .select(F.col("title").alias("film_title"))
    .filter(F.col("inventory_id").isNull() == True)
    .show()
)

+--------------------+
|          film_title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [17]:
film_categorys_df = (
    film_df.join(film_category_df, film_df.film_id == film_category_df.film_id)
    .drop(film_category_df.film_id)
    .select("film_id", "category_id", "title", "description")
)

In [18]:
films_with_categorys_df = film_categorys_df.join(
    category_df, film_categorys_df.category_id == category_df.category_id
).drop(category_df.category_id)

In [19]:
films_with_actors_df = films_with_categorys_df.join(
    film_actor_df, films_with_categorys_df.film_id == film_actor_df.film_id
).drop(film_actor_df.film_id)

In [20]:
films_data_full_df = (
    films_with_actors_df.join(
        actor_df, films_with_actors_df.actor_id == actor_df.actor_id
    )
    .drop(actor_df.actor_id)
    .select("title", "name", "actor_id", "first_name", "last_name", "film_id")
    .filter(F.col("name") == "Children")
)

In [21]:
actors_in_children_category_df = (
    films_data_full_df.groupBy("actor_id", "first_name", "last_name")
    .agg(F.count("*").alias("actor_films_amount"))
    .select("first_name", "last_name", "actor_films_amount")
)

In [22]:
actors_in_children_category_df.sort(F.col("actor_films_amount").desc()).show(3)

+----------+---------+------------------+
|first_name|last_name|actor_films_amount|
+----------+---------+------------------+
|     HELEN|   VOIGHT|                 7|
|     KEVIN|  GARLAND|                 5|
|     RALPH|     CRUZ|                 5|
+----------+---------+------------------+
only showing top 3 rows



Stop Spark session:

In [23]:
spark.stop()