In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType, DoubleType

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
print(spark.version)

3.5.0


In [4]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

In [5]:
category_schema = StructType([
    StructField('category_id', IntegerType(), False),
    StructField('name', StringType(), False),
    StructField('last_update', DateType(), False),
])

film_category_schema = StructType([
        StructField('film_id', IntegerType(), False),
        StructField('category_id', IntegerType(), False),
        StructField('last_update', DateType(), False),
])

film_actor_schema = StructType([
    StructField('actor_id', IntegerType(), False),
    StructField('film_id', IntegerType(), False),
    StructField('last_update', TimestampType(), False),
])

rental_schema = StructType([
    StructField('rental_id', IntegerType(), False),
    StructField('rental_date', TimestampType(), False),
    StructField('inventory_id', IntegerType(), False),
    StructField('customer_id', IntegerType(), False),
    StructField('return_date', TimestampType(), False),
    StructField('staff_id', IntegerType(), False),
    StructField('last_update', TimestampType(), False),
])

inventory_schema = StructType([
    StructField('inventory_id', IntegerType(), False),
    StructField('film_id', IntegerType(), False),
    StructField('store_id', IntegerType(), False),
    StructField('last_update', TimestampType(), False),
])

actor_schema = StructType([
    StructField('actor_id', IntegerType(), False),
    StructField('first_name', StringType(), False),
    StructField('last_name', StringType(), False),
    StructField('last_update', TimestampType(), False),
])

payment_schema = StructType([
    StructField('payment_id', IntegerType(), False),
    StructField('customer_id', IntegerType(), False),
    StructField('staff_id', IntegerType(), False),
    StructField('rental_id', IntegerType(), False),
    StructField('amount', DoubleType(), False),
    StructField('payment_date', TimestampType(), False),
])

film_schema = StructType(
    [
        StructField('film_id', IntegerType(), False),
        StructField('title', StringType(), False),
        StructField('description', StringType(), False),
        StructField('release_year', IntegerType(), False),
        StructField('language_id', IntegerType(), False),
        StructField('original_language_id', IntegerType(), True),
        StructField('rental_duration', IntegerType(), False),
        StructField('rental_rate', StringType(), False),
        StructField('length', IntegerType(), False),
        StructField('replacement_cost', IntegerType(), False),
        StructField('rating', IntegerType(), False),
        StructField('special_features', StringType(), False),
        StructField('fulltext', StringType(), False),
        StructField('last_update', DateType(), False),
    ]
)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [6]:
category_df = spark.read.csv('./data/category.csv', header=True, schema=category_schema)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, schema=film_category_schema)

inner_join_df = category_df.join(film_category_df, category_df.category_id == film_category_df.category_id, 'inner')
# inner_join_df.show()

grouped_df = inner_join_df.groupBy(film_category_df.category_id, category_df.name).count()
# grouped_df.show()

sorted_df = grouped_df.sort(F.desc("count"))
sorted_df.select('name', 'count').show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|     Sci-Fi|   61|
|      Games|   61|
|   Children|   60|
|     Comedy|   58|
|   Classics|   57|
|     Travel|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [11]:
join1_df = rental_df.join(inventory_df, inventory_df.inventory_id == rental_df.inventory_id, 'inner')
join2_df = join1_df.join(film_actor_df, film_actor_df.film_id == inventory_df.film_id, 'inner')

grouped_df = join2_df.groupBy(film_actor_df.actor_id).agg(film_actor_df.actor_id, F.count(rental_df.rental_id).alias('rental_count'))
grouped_df = grouped_df.orderBy(F.desc('rental_count')).limit(10).select('actor_id', 'rental_count')

top_actors_df = grouped_df.join(actor_df, actor_df.actor_id == grouped_df.actor_id).orderBy(
    F.desc(grouped_df.rental_count)).select(actor_df.actor_id, actor_df.first_name, actor_df.last_name, grouped_df.rental_count)

top_actors_df.show()

+--------+----------+-----------+------------+
|actor_id|first_name|  last_name|rental_count|
+--------+----------+-----------+------------+
|     107|      GINA|  DEGENERES|         753|
|     181|   MATTHEW|     CARREY|         678|
|     198|      MARY|     KEITEL|         674|
|     144|    ANGELA|WITHERSPOON|         654|
|     102|    WALTER|       TORN|         640|
|      60|     HENRY|      BERRY|         612|
|     150|     JAYNE|      NOLTE|         611|
|      37|       VAL|     BOLGER|         605|
|      23|    SANDRA|     KILMER|         604|
|      90|      SEAN|    GUINESS|         599|
+--------+----------+-----------+------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [16]:
join1_df = rental_df.join(payment_df, rental_df.rental_id == payment_df.rental_id, 'left')
join2_df = join1_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id, 'left')
join3_df = join2_df.join(film_category_df, inventory_df.film_id == film_category_df.film_id, 'left')
final_df = join3_df.join(category_df, film_category_df.category_id == category_df.category_id, 'left')

grouped_df = final_df.groupBy(category_df.name).agg(F.sum(payment_df.amount).alias('total_amount'))

sorted_df = grouped_df.orderBy(F.desc("total_amount"))
sorted_df.select('total_amount', 'name').orderBy(F.desc('total_amount')).show()

+------------------+-----------+
|      total_amount|       name|
+------------------+-----------+
| 5314.209999999848|     Sports|
|  4756.97999999987|     Sci-Fi|
| 4656.299999999864|  Animation|
| 4587.389999999876|      Drama|
| 4383.579999999895|     Comedy|
| 4375.849999999871|     Action|
| 4361.569999999897|        New|
| 4281.329999999893|      Games|
| 4270.669999999888|    Foreign|
|4226.0699999998815|     Family|
| 4217.519999999889|Documentary|
| 3722.539999999922|     Horror|
|3655.5499999999147|   Children|
|3639.5899999999156|   Classics|
| 3549.639999999925|     Travel|
| 3414.729999999933|      Music|
+------------------+-----------+



4.
Вивести назви фільмів, яких не має в inventory.

In [18]:
inventory_df = spark.read.csv('./data/inventory.csv', header=True, schema=inventory_schema)
film_df = spark.read.csv('./data/film.csv', header=True, schema=film_schema)

left_join_df = film_df.join(inventory_df, inventory_df.film_id == film_df.film_id, 'left')
# left_join_df.show()

filtered_df = left_join_df.filter(F.col('inventory_id').isNull())

filtered_df.select('title').distinct().show()

+--------------------+
|               title|
+--------------------+
|       RAINBOW SHOCK|
|           GUMP DATE|
|         HOCUS FRIDA|
|    TREASURE COMMAND|
| CHINATOWN GLADIATOR|
|        WALLS ARTIST|
|      ARGONAUTS TOWN|
|       PSYCHO SHRUNK|
|   FIREHOUSE VIETNAM|
|DELIVERANCE MULHO...|
|       ROOF CHAMPION|
|        TADPOLE PARK|
|         APOLLO TEEN|
|       HATE HANDICAP|
|       PEARL DESTINY|
|COMMANDMENTS EXPRESS|
|        VOLUME HOUSE|
|     CROWDS TELEMARK|
|   RAIDERS ANTITRUST|
|    KILL BROTHERHOOD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [19]:
join1_df = film_actor_df.join(actor_df, actor_df.actor_id == film_actor_df.actor_id, 'left')
join2_df = join1_df.join(film_category_df, film_actor_df.film_id == film_category_df.film_id, 'inner')
join3_df = join2_df.join(category_df, category_df.category_id == film_category_df.category_id, 'inner')

join3_df.filter(category_df.name == 'Children')

grouped_df = join3_df.groupBy(film_actor_df.actor_id, actor_df.first_name, actor_df.last_name).agg(
    F.sum(film_actor_df.actor_id).alias('appearance_count'))

sorted_df = grouped_df.orderBy(F.desc('appearance_count'))
sorted_df.select('first_name', 'last_name', 'appearance_count')

limited_df = sorted_df.limit(3)
limited_df.show()


+--------+----------+---------+----------------+
|actor_id|first_name|last_name|appearance_count|
+--------+----------+---------+----------------+
|     198|      MARY|   KEITEL|            7920|
|     181|   MATTHEW|   CARREY|            7059|
|     197|     REESE|     WEST|            6501|
+--------+----------+---------+----------------+



Stop Spark session:

In [20]:
spark.stop()