In [7]:
from operator import concat

from pyspark.sql import SparkSession, functions as F, types as T

In [8]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [9]:
print(spark.version)

3.4.1


In [10]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

In [11]:
# Домашнє завдання на тему Spark SQL

# Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.
# 
# - Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
# - Можете створювати стільки нових клітинок, скільки вам необхідно.
# - Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
# - код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.
# 
# **Увага!**
# Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [12]:
#кількість фільмів в кожній категорії
count_of_films_df = film_category_df.groupBy("category_id").agg(F.count("film_id").alias("films_count"))

#результат
count_of_films_df.join(category_df, on="category_id", how="left").select("name", "films_count").orderBy("films_count", ascending=False).show()

+-----------+-----------+
|       name|films_count|
+-----------+-----------+
|     Sports|         74|
|    Foreign|         73|
|     Family|         69|
|Documentary|         68|
|  Animation|         66|
|     Action|         64|
|        New|         63|
|      Drama|         62|
|      Games|         61|
|     Sci-Fi|         61|
|   Children|         60|
|     Comedy|         58|
|     Travel|         57|
|   Classics|         57|
|     Horror|         56|
|      Music|         51|
+-----------+-----------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [13]:
#кількість оренд по кожній копії
rent_count_df = rental_df.groupBy("inventory_id").count()

#кількіст оренд по кожному фільму 
film_rent_stat_df = inventory_df.join(rent_count_df, on="inventory_id", how="left").groupBy("film_id").agg( F.sum("count").alias("rent_count"))

#топ 10 акторів по кількості аренд всіх фільмів де вони знімались. від більшого до меншого
top_ten_actor_id_df = film_rent_stat_df.join(film_actor_df, on="film_id", how="left").groupBy("actor_id").agg(F.sum("rent_count").alias("rent_count_films_by_actors")).orderBy("rent_count_films_by_actors", ascending=False).limit(10)

#результат
top_ten_actor_id_df.join(actor_df, on="actor_id", how="left").withColumn("actor_name", F.concat_ws(" ", "first_name", "last_name")).select("actor_name").show()


+------------------+
|        actor_name|
+------------------+
|    GINA DEGENERES|
|    MATTHEW CARREY|
|       MARY KEITEL|
|ANGELA WITHERSPOON|
|       WALTER TORN|
|       HENRY BERRY|
|       JAYNE NOLTE|
|        VAL BOLGER|
|     SANDRA KILMER|
|      SEAN GUINESS|
+------------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [14]:

rental_df.join(payment_df, on="rental_id", how="inner") \
         .join(inventory_df, on="inventory_id", how="inner") \
         .join(film_category_df, on="film_id", how="inner") \
                .groupBy("category_id").agg(F.sum("amount").alias("s_amount")) \
         .join(category_df, on="category_id", how="inner") \
                .orderBy("s_amount", ascending=False) \
                .limit(1).select("name") \
         .show()


+------+
|  name|
+------+
|Sports|
+------+



4.
Вивести назви фільмів, яких не має в inventory.

In [15]:
film_df.join(inventory_df, on="film_id", how="left")\
       .filter(inventory_df["film_id"].isNull())\
       .select("title")\
       .show()


+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [16]:
id_category_children = category_df.filter(F.col("name") == "Children").first()["category_id"]

all_films_id_category_children_df = film_category_df.filter( F.col("category_id") == id_category_children )\
                                                    .join(film_actor_df, on="film_id", how="inner")\
                                                    .groupBy("actor_id").agg(F.count("film_id").alias("count_films"))\
                                                    .orderBy("count_films", ascending=False).limit(3)\
                                                    .join(actor_df, on="actor_id", how="inner")\
                                                    .select(F.concat_ws(' ', "first_name", "last_name")).show()

+-----------------------------------+
|concat_ws( , first_name, last_name)|
+-----------------------------------+
|                       HELEN VOIGHT|
|                         RALPH CRUZ|
|                        WHOOPI HURT|
+-----------------------------------+



Stop Spark session:

In [17]:
spark.stop()