In [17]:
from pyspark.sql import SparkSession, functions as F, types as T

In [18]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [19]:
print(spark.version)

3.4.1


In [20]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [21]:
film_category_joined = film_category_df.join(category_df, film_category_df.category_id == category_df.category_id)

category_film_count = film_category_joined.groupBy("name").agg(F.count("film_id").alias("film_count"))

sorted_category_film_count = category_film_count.orderBy(F.col("film_count").desc())

sorted_category_film_count.show()

+-----------+----------+
|       name|film_count|
+-----------+----------+
|     Sports|        74|
|    Foreign|        73|
|     Family|        69|
|Documentary|        68|
|  Animation|        66|
|     Action|        64|
|        New|        63|
|      Drama|        62|
|      Games|        61|
|     Sci-Fi|        61|
|   Children|        60|
|     Comedy|        58|
|     Travel|        57|
|   Classics|        57|
|     Horror|        56|
|      Music|        51|
+-----------+----------+


2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [22]:
film_actor_alias = film_actor_df.alias("fa")
film_alias = film_df.alias("f")
inventory_alias = inventory_df.alias("i")
rental_alias = rental_df.alias("r")
actor_alias = actor_df.alias("a")

film_actor_joined = film_actor_alias.join(film_alias, F.col("fa.film_id") == F.col("f.film_id"))

film_inventory_joined = film_actor_joined.join(inventory_alias, F.col("f.film_id") == F.col("i.film_id"))

film_rental_joined = film_inventory_joined.join(rental_alias, F.col("i.inventory_id") == F.col("r.inventory_id"))

actor_rental_count = film_rental_joined.groupBy("fa.actor_id").agg(F.count("r.rental_id").alias("rental_count"))

actor_info = actor_rental_count.join(actor_alias, F.col("fa.actor_id") == F.col("a.actor_id"))

top_actors = actor_info.select("a.first_name", "a.last_name", "rental_count").orderBy(F.col("rental_count").desc()).limit(10)

top_actors.show()

+----------+-----------+------------+
|first_name|  last_name|rental_count|
+----------+-----------+------------+
|      GINA|  DEGENERES|         753|
|   MATTHEW|     CARREY|         678|
|      MARY|     KEITEL|         674|
|    ANGELA|WITHERSPOON|         654|
|    WALTER|       TORN|         640|
|     HENRY|      BERRY|         612|
|     JAYNE|      NOLTE|         611|
|       VAL|     BOLGER|         605|
|    SANDRA|     KILMER|         604|
|      SEAN|    GUINESS|         599|
+----------+-----------+------------+


3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [23]:
film_category_alias = film_category_df.alias("fc")
category_alias = category_df.alias("c")
film_alias = film_df.alias("f")
inventory_alias = inventory_df.alias("i")
rental_alias = rental_df.alias("r")
payment_alias = payment_df.alias("p")

category_joined = film_category_alias.join(category_alias, F.col("fc.category_id") == F.col("c.category_id"))

film_inventory_joined = film_alias.join(inventory_alias, F.col("f.film_id") == F.col("i.film_id"))

film_rental_joined = film_inventory_joined.join(rental_alias, F.col("i.inventory_id") == F.col("r.inventory_id"))

film_payment_joined = film_rental_joined.join(payment_alias, F.col("r.rental_id") == F.col("p.rental_id"))

category_payment_sum = film_payment_joined.join(category_joined, F.col("i.film_id") == F.col("fc.film_id")) \
    .groupBy("c.name").agg(F.sum("p.amount").alias("total_spent"))

top_spending_category = category_payment_sum.orderBy(F.col("total_spent").desc()).limit(1)

top_spending_category.show()

                                                                                

+------+-----------------+
|  name|      total_spent|
+------+-----------------+
|Sports|5314.209999999847|
+------+-----------------+


4.
Вивести назви фільмів, яких не має в inventory.

In [24]:
film_alias = film_df.alias("f")
inventory_alias = inventory_df.alias("i")

film_inventory_joined = film_alias.join(inventory_alias, F.col("f.film_id") == F.col("i.film_id"), "left_outer")
films_not_in_inventory = film_inventory_joined.filter(F.col("i.inventory_id").isNull()).select("f.title")

films_not_in_inventory.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+


5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [25]:
film_category_alias = film_category_df.alias("fc")
category_alias = category_df.alias("c")
film_actor_alias = film_actor_df.alias("fa")
actor_alias = actor_df.alias("a")

children_films = film_category_alias.join(category_alias, F.col("fc.category_id") == F.col("c.category_id")) \
    .filter(F.col("c.name") == "Children").select("fc.film_id")

film_actor_joined = children_films.join(film_actor_alias, F.col("fc.film_id") == F.col("fa.film_id"))

actor_film_joined = film_actor_joined.join(actor_alias, F.col("fa.actor_id") == F.col("a.actor_id"))

actor_count = actor_film_joined.groupBy("a.actor_id", "a.first_name", "a.last_name") \
    .agg(F.count("fa.film_id").alias("film_count"))

top_actors = actor_count.orderBy(F.col("film_count").desc()).limit(3)

top_actors.show()

+--------+----------+---------+----------+
|actor_id|first_name|last_name|film_count|
+--------+----------+---------+----------+
|      17|     HELEN|   VOIGHT|         7|
|     127|     KEVIN|  GARLAND|         5|
|     140|    WHOOPI|     HURT|         5|
+--------+----------+---------+----------+


Stop Spark session:

In [26]:
spark.stop()