In [1]:
from pyspark.sql import SparkSession, functions as F, types as T

In [3]:
spark = SparkSession.builder.master("spark://spark-master:7077").config("spark.jars.packages", 
                                                                        "org.apache.hadoop:hadoop-aws-2.7.3").appName("spark-hw").getOrCreate()

In [4]:
print(spark.version)

4.0.0


In [5]:
actor_df = spark.read.csv('../data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('../data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('../data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('../data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('../data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('../data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('../data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('../data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('../data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('../data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('../data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('../data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('../data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('../data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('../data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [6]:
# з'єднуємо таблиці film_category та category
film_with_categories_df = film_category_df.join(
    category_df,
    film_category_df.category_id == category_df.category_id,
    "inner"
)

# групуємо за назвою категорії та рахуємо кількість фільмів
films_per_category_df = film_with_categories_df.groupBy("name") \
    .agg(F.count("*").alias("film_count")) \
    .orderBy(F.col("film_count").desc())

# виводимо результат
films_per_category_df.show()

+-----------+----------+
|       name|film_count|
+-----------+----------+
|     Sports|        74|
|    Foreign|        73|
|     Family|        69|
|Documentary|        68|
|  Animation|        66|
|     Action|        64|
|        New|        63|
|      Drama|        62|
|      Games|        61|
|     Sci-Fi|        61|
|   Children|        60|
|     Comedy|        58|
|     Travel|        57|
|   Classics|        57|
|     Horror|        56|
|      Music|        51|
+-----------+----------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [7]:
# з'єднуємо акторів із їхніми фільмами
actor_films_df = actor_df.join(
    film_actor_df,
    actor_df.actor_id == film_actor_df.actor_id,
    "inner"
)

# додаємо інформацію про інвентар
actor_inventory_df = actor_films_df.join(
    inventory_df,
    film_actor_df.film_id == inventory_df.film_id,
    "inner"
)

# додаємо інформацію про прокати
actor_rentals_df = actor_inventory_df.join(
    rental_df,
    actor_inventory_df.inventory_id == rental_df.inventory_id,
    "inner"
)

# групуємо за актором і рахуємо кількість прокатів
top_actors_df = actor_rentals_df.groupBy("first_name", "last_name") \
    .agg(F.count("*").alias("rental_count")) \
    .orderBy(F.col("rental_count").desc()) \
    .limit(10)

# виводимо результат
top_actors_df.show()

+----------+-----------+------------+
|first_name|  last_name|rental_count|
+----------+-----------+------------+
|     SUSAN|      DAVIS|         825|
|      GINA|  DEGENERES|         753|
|   MATTHEW|     CARREY|         678|
|      MARY|     KEITEL|         674|
|    ANGELA|WITHERSPOON|         654|
|    WALTER|       TORN|         640|
|     HENRY|      BERRY|         612|
|     JAYNE|      NOLTE|         611|
|       VAL|     BOLGER|         605|
|    SANDRA|     KILMER|         604|
+----------+-----------+------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [10]:
# з'єднуємо таблиці
film_with_category_df = film_category_df \
    .join(category_df, on="category_id", how="inner") \
    .join(inventory_df, on="film_id", how="inner") \
    .join(rental_df, on="inventory_id", how="inner") \
    .join(payment_df, on="rental_id", how="inner")

# агрегуємо суму оплати по категоріях
category_revenue_df = film_with_category_df \
    .groupBy("name") \
    .agg(F.sum("amount").alias("total_revenue")) \
    .orderBy(F.desc("total_revenue"))

# виводимо категорію з найбільшим доходом
category_revenue_df.show(1)

+------+-----------------+
|  name|    total_revenue|
+------+-----------------+
|Sports|5314.209999999843|
+------+-----------------+
only showing top 1 row


4.
Вивести назви фільмів, яких не має в inventory.

In [11]:
# вибираємо лише ті фільми, які не зустрічаються в inventory
films_not_in_inventory = film_df.join(
    inventory_df,
    on="film_id",
    how="left_anti"  # залишає тільки ті, яких немає в inventory
)

# виводимо назви таких фільмів
films_not_in_inventory.select("title").show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows


5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [12]:
# об'єднуємо фільми з категоріями
film_with_category = film_df.join(film_category_df, on="film_id") \
                            .join(category_df, on="category_id")

# фільтруємо лише категорію "Children"
children_films = film_with_category.filter(film_with_category.name == "Children")

# додаємо акторів, які знімались у цих фільмах
children_actors = children_films.join(film_actor_df, on="film_id") \
                                .join(actor_df, on="actor_id")

# групуємо по акторам, рахуємо кількість фільмів
top_actors = children_actors.groupBy("actor_id", "first_name", "last_name") \
                            .count() \
                            .orderBy("count", ascending=False) \
                            .limit(3)

# виводимо результат
top_actors.show()

+--------+----------+---------+-----+
|actor_id|first_name|last_name|count|
+--------+----------+---------+-----+
|      17|     HELEN|   VOIGHT|    7|
|     127|     KEVIN|  GARLAND|    5|
|     140|    WHOOPI|     HURT|    5|
+--------+----------+---------+-----+



Stop Spark session:

In [13]:
spark.stop()