In [None]:
from pyspark.sql import SparkSession, functions as F, types as T

In [None]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [None]:
print(spark.version)

In [None]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [None]:
# broadcast the small df to avoid shuffle
broadcast_category_df = F.broadcast(category_df)

nr_films_by_category = (
    film_category_df.join(broadcast_category_df, on='category_id', how='inner')
    .groupBy('name')
    .agg(F.count('film_id').alias('nr_films'))
    .orderBy(F.col('nr_films').desc())
    .select(F.col('name').alias('category_name'), 'nr_films')
)

nr_films_by_category.show()

2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [None]:
# column pruning
rental_df_pruned = rental_df.select('inventory_id')
inventory_df_pruned = inventory_df.select('inventory_id', 'film_id')
film_df_pruned = film_df.select('film_id', 'title')
actor_df_pruned = actor_df.select('actor_id', 'first_name', 'last_name')
film_actor_df_pruned = film_actor_df.select('actor_id', 'film_id')

# calculate rental_count
rental_count_df = (
    rental_df_pruned.join(inventory_df_pruned, on='inventory_id')
    .join(film_df_pruned, on='film_id')
    .groupBy('film_id', 'title')
    .agg(F.count('*').alias('rents_count'))
    .orderBy(F.col('rents_count').desc())
)

# calculate total number of film rentals per actor
result_df = (
    actor_df.join(film_actor_df_pruned, on='actor_id')
    .join(film_df_pruned, on='film_id')
    .join(rental_count_df, on='film_id')
    .groupBy('actor_id', F.concat(actor_df.first_name, F.lit(' '), actor_df.last_name).alias('actor_name'))
    .agg(F.sum('rents_count').alias('total_rents'))
    .orderBy(F.col('total_rents').desc())
    .limit(10)
)

result_df.show()

3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [None]:
# broadcast the small df to avoid shuffle
broadcast_category_df = F.broadcast(category_df)

# column pruning to minimize the size of joined df
payment_df_pruned = payment_df.select('payment_id', 'amount', 'rental_id')
rental_df_pruned = rental_df.select('rental_id', 'inventory_id')
inventory_df_pruned = inventory_df.select('inventory_id', 'film_id')
film_category_df_pruned = film_category_df.select('category_id', 'film_id')

most_spent_film_category = (
    payment_df_pruned.join(rental_df_pruned, on='rental_id')
    .join(inventory_df_pruned, on='inventory_id')
    .join(film_category_df_pruned, on='film_id')
    .join(broadcast_category_df, on='category_id')
    .groupby('name')
    .agg(F.sum('amount').alias('total_revenue'))
    .orderBy(F.col('total_revenue').desc())
    .limit(1)
)

most_spent_film_category.show()

4.
Вивести назви фільмів, яких не має в inventory.

In [None]:
# Select film IDs from the film df
film_ids_df = film_df.select('film_id')

# Select film IDs from the inventory df
inventory_film_ids_df = inventory_df.select('film_id')

# Perform the EXCEPT operation to get films not in inventory
films_not_in_inventory_df = film_ids_df.subtract(inventory_film_ids_df)

# Join the result with the film df to get the film titles
missing_inventory_films_df = films_not_in_inventory_df.join(film_df, on='film_id').select('title')

# Show the result
missing_inventory_films_df.show()

5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [28]:
# Create the children_films DataFrame
children_films_df = film_category_df.filter(film_category_df.category_id == 3).select('film_id')

# Determine top 3 actors by the number of children's films they've appeared in
top_3_children_actors_df = (
    film_actor_df.join(children_films_df, on='film_id')
    .groupBy('actor_id')
    .agg(F.count('actor_id').alias('nr_films'))
    .orderBy(F.col('nr_films').desc())
    .limit(3)
)

# Fetch actor names and their number of children's films
most_popular_children_actors_df = (
    top_3_children_actors_df.join(actor_df, on='actor_id')
    .select(F.concat(actor_df.first_name, F.lit(' '), actor_df.last_name).alias('actor_name'), 'nr_films')
)

most_popular_children_actors_df.show()

+------------+--------+
|  actor_name|nr_films|
+------------+--------+
|HELEN VOIGHT|       7|
|  RALPH CRUZ|       5|
| WHOOPI HURT|       5|
+------------+--------+


Stop Spark session:

In [29]:
spark.stop()