In [211]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, concat, lit, sum, max

In [212]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [213]:
print(spark.version)

3.4.1


In [214]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [215]:
joined_df = film_category_df.join(
    category_df, film_category_df.category_id == category_df.category_id
).select(
    film_category_df["film_id"], category_df["name"].alias("category")
)
result_df = joined_df.groupBy("category").agg(
    count("film_id").alias("total_films")
)
result_df = result_df.orderBy(
    desc("total_films")
)
result_df.show()

+-----------+-----------+
|   category|total_films|
+-----------+-----------+
|     Sports|         74|
|    Foreign|         73|
|     Family|         69|
|Documentary|         68|
|  Animation|         66|
|     Action|         64|
|        New|         63|
|      Drama|         62|
|      Games|         61|
|     Sci-Fi|         61|
|   Children|         60|
|     Comedy|         58|
|     Travel|         57|
|   Classics|         57|
|     Horror|         56|
|      Music|         51|
+-----------+-----------+


2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [216]:
top_rental_films_actors = rental_df.join(
    inventory_df, rental_df.inventory_id == inventory_df.inventory_id, "left"
).join(
    film_actor_df, inventory_df.film_id == film_actor_df.film_id, "left"
).select(
    inventory_df["film_id"], film_actor_df["actor_id"]
)
top_rental_films_actors = top_rental_films_actors.groupBy(
    inventory_df.film_id, film_actor_df.actor_id
).agg(
    count(inventory_df.film_id).alias("rental_count")
)
top_rental_films_actors = top_rental_films_actors.orderBy(
    desc("rental_count")
)

result_df = top_rental_films_actors.join(
    actor_df, top_rental_films_actors.actor_id == actor_df.actor_id
).select(
    concat(actor_df["first_name"], lit(" "), actor_df["last_name"]).alias("actor")
).limit(10)
result_df.show()

+----------------+
|           actor|
+----------------+
|   DEBBIE AKROYD|
|      RENEE BALL|
|  JENNIFER DAVIS|
|   ANGELA HUDSON|
|JEFF SILVERSTONE|
|   RUSSELL CLOSE|
|   LUCILLE TRACY|
|   SANDRA KILMER|
|  AUDREY OLIVIER|
| PENELOPE CRONYN|
+----------------+


3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [217]:
payment_by_category = payment_df.join(
    rental_df, payment_df.rental_id == rental_df.rental_id
).join(
    inventory_df, rental_df.inventory_id == inventory_df.inventory_id
).join(
    film_category_df, inventory_df.film_id == film_category_df.film_id
).join(
    category_df, film_category_df.category_id == category_df.category_id
).select(payment_df["amount"], category_df["name"])
payment_by_category = payment_by_category.groupBy(
    category_df["name"]
).agg(
    sum(payment_df["amount"]).alias("total")
)

max_payment_by_category = payment_by_category.agg(
    max("total").alias("max_total")
).collect()[0]["max_total"]

result_df = payment_by_category.select(
    payment_by_category["name"].alias("category")
).filter(
    payment_by_category["total"] == max_payment_by_category
)
result_df.show()

+--------+
|category|
+--------+
|  Sports|
+--------+


4.
Вивести назви фільмів, яких не має в inventory.

In [218]:
result_df = film_df.join(
    inventory_df, film_df.film_id == inventory_df.film_id, "left"
).select(
    film_df["title"]
).filter(
    inventory_df["film_id"].isNull()
)
result_df.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+


5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [219]:
children_category_films = film_category_df.join(
    category_df, film_category_df.category_id == category_df.category_id, "left"
).select(
    film_category_df["film_id"]
).filter(
    category_df["name"] == 'Children'
)

filmography_of_actor = children_category_films.join(
    film_actor_df, children_category_films.film_id == film_actor_df.film_id, "left"
).select(
    film_actor_df["actor_id"], children_category_films["film_id"]
).groupBy(
    film_actor_df.actor_id
).agg(
    count(children_category_films.film_id).alias("total_films")
)

result_df = filmography_of_actor.join(
    actor_df, filmography_of_actor.actor_id == actor_df.actor_id, "left"
).select(
    concat(actor_df["first_name"], lit(" "), actor_df["last_name"]).alias("actor")
).limit(3)

result_df.show()

+----------------+
|           actor|
+----------------+
|       EMILY DEE|
| MORGAN WILLIAMS|
|MINNIE ZELLWEGER|
+----------------+


Stop Spark session:

In [220]:
spark.stop()