In [3]:
from pyspark.sql import SparkSession, functions as F, types as T

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-hw")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

print("Spark version:", spark.version)
print("Spark master:", spark.sparkContext.master)


Spark version: 3.5.1
Spark master: local[*]


26/01/25 17:02:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
print(spark.version)

3.5.1


In [5]:
print("Spark version:", spark.version)
print("Spark master:", spark.sparkContext.master)
spark.range(5).show()

Spark version: 3.5.1
Spark master: local[*]
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [6]:
actor_df = spark.read.csv('../data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('../data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('../data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('../data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('../data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('../data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('../data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('../data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('../data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('../data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('../data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('../data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('../data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('../data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('../data/store.csv', header=True, inferSchema=True)

In [7]:
import os, glob
print("cwd:", os.getcwd())
print("actor.csv exists:", os.path.exists("../data/actor.csv"))
print("sample files:", glob.glob("../data/*.csv")[:5])


cwd: /mnt/c/my_homework/DATA-ENGINEERING/lec14/notebooks
actor.csv exists: True
sample files: ['../data/actor.csv', '../data/address.csv', '../data/category.csv', '../data/city.csv', '../data/country.csv']


In [8]:
actor_df.printSchema()
actor_df.count()
actor_df.show(5, truncate=False)

root
 |-- actor_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- last_update: timestamp (nullable = true)

+--------+----------+------------+-------------------+
|actor_id|first_name|last_name   |last_update        |
+--------+----------+------------+-------------------+
|1       |PENELOPE  |GUINESS     |2022-02-15 10:34:33|
|2       |NICK      |WAHLBERG    |2022-02-15 10:34:33|
|3       |ED        |CHASE       |2022-02-15 10:34:33|
|4       |JENNIFER  |DAVIS       |2022-02-15 10:34:33|
|5       |JOHNNY    |LOLLOBRIGIDA|2022-02-15 10:34:33|
+--------+----------+------------+-------------------+
only showing top 5 rows



In [9]:
film_df.printSchema()
film_df.show(5, truncate=False)

root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- language_id: integer (nullable = true)
 |-- original_language_id: string (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- length: integer (nullable = true)
 |-- replacement_cost: double (nullable = true)
 |-- rating: string (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- special_features: string (nullable = true)
 |-- fulltext: string (nullable = true)

+-------+----------------+---------------------------------------------------------------------------------------------------------------------+------------+-----------+--------------------+---------------+-----------+------+----------------+------+--------------------------+----------------------------------+------------------------------------------------------------------------

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [10]:
# тут має бути розвʼязок задачі
(
    film_category_df.alias("fc")
    .join(category_df.alias("c"), F.col("fc.category_id") == F.col("c.category_id"), "inner")
    .groupBy(F.col("c.name").alias("category"))
    .agg(F.count(F.col("fc.film_id")).alias("films_cnt"))
    .orderBy(F.col("films_cnt").desc(), F.col("category").asc())
    .show(truncate=False)
)


+-----------+---------+
|category   |films_cnt|
+-----------+---------+
|Sports     |74       |
|Foreign    |73       |
|Family     |69       |
|Documentary|68       |
|Animation  |66       |
|Action     |64       |
|New        |63       |
|Drama      |62       |
|Games      |61       |
|Sci-Fi     |61       |
|Children   |60       |
|Comedy     |58       |
|Classics   |57       |
|Travel     |57       |
|Horror     |56       |
|Music      |51       |
+-----------+---------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [11]:
# тут має бути розвʼязок задачі
(
    actor_df.alias("a")
    .join(film_actor_df.alias("fa"), F.col("a.actor_id") == F.col("fa.actor_id"), "inner")
    .join(inventory_df.alias("i"), F.col("fa.film_id") == F.col("i.film_id"), "inner")
    .join(rental_df.alias("r"), F.col("i.inventory_id") == F.col("r.inventory_id"), "inner")
    .groupBy(
        F.col("a.actor_id").alias("actor_id"),
        F.col("a.first_name").alias("first_name"),
        F.col("a.last_name").alias("last_name"),
    )
    .agg(F.count(F.col("r.rental_id")).alias("rentals_cnt"))
    .orderBy(F.col("rentals_cnt").desc(), F.col("actor_id").asc())
    .limit(10)
    .show(truncate=False)
)


+--------+----------+-----------+-----------+
|actor_id|first_name|last_name  |rentals_cnt|
+--------+----------+-----------+-----------+
|107     |GINA      |DEGENERES  |753        |
|181     |MATTHEW   |CARREY     |678        |
|198     |MARY      |KEITEL     |674        |
|144     |ANGELA    |WITHERSPOON|654        |
|102     |WALTER    |TORN       |640        |
|60      |HENRY     |BERRY      |612        |
|150     |JAYNE     |NOLTE      |611        |
|37      |VAL       |BOLGER     |605        |
|23      |SANDRA    |KILMER     |604        |
|90      |SEAN      |GUINESS    |599        |
+--------+----------+-----------+-----------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [12]:
# тут має бути розвʼязок задачі
(
    payment_df.alias("p")
    .join(rental_df.alias("r"), F.col("p.rental_id") == F.col("r.rental_id"), "inner")
    .join(inventory_df.alias("i"), F.col("r.inventory_id") == F.col("i.inventory_id"), "inner")
    .join(film_category_df.alias("fc"), F.col("i.film_id") == F.col("fc.film_id"), "inner")
    .join(category_df.alias("c"), F.col("fc.category_id") == F.col("c.category_id"), "inner")
    .groupBy(F.col("c.name").alias("category"))
    .agg(F.round(F.sum(F.col("p.amount")), 2).alias("total_amount"))
    .orderBy(F.col("total_amount").desc(), F.col("category").asc())
    .limit(1)
    .show(truncate=False)
)


+--------+------------+
|category|total_amount|
+--------+------------+
|Sports  |5314.21     |
+--------+------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [13]:
# тут має бути розвʼязок задачі
(
    film_df.alias("f")
    .join(inventory_df.select("film_id").distinct().alias("i"), F.col("f.film_id") == F.col("i.film_id"), "left_anti")
    .select(F.col("f.film_id"), F.col("f.title"))
    .orderBy(F.col("f.film_id").asc())
    .show(truncate=False)
)


+-------+----------------------+
|film_id|title                 |
+-------+----------------------+
|14     |ALICE FANTASIA        |
|33     |APOLLO TEEN           |
|36     |ARGONAUTS TOWN        |
|38     |ARK RIDGEMONT         |
|41     |ARSENIC INDEPENDENCE  |
|87     |BOONDOCK BALLROOM     |
|108    |BUTCH PANTHER         |
|128    |CATCH AMISTAD         |
|144    |CHINATOWN GLADIATOR   |
|148    |CHOCOLATE DUCK        |
|171    |COMMANDMENTS EXPRESS  |
|192    |CROSSING DIVORCE      |
|195    |CROWDS TELEMARK       |
|198    |CRYSTAL BREAKING      |
|217    |DAZED PUNK            |
|221    |DELIVERANCE MULHOLLAND|
|318    |FIREHOUSE VIETNAM     |
|325    |FLOATS GARDEN         |
|332    |FRANKENSTEIN STRANGER |
|359    |GLADIATOR WESTWARD    |
+-------+----------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [14]:
# тут має бути розвʼязок задачі
children_category_df = category_df.filter(F.col("name") == F.lit("Children")).select("category_id")

(
    film_category_df.alias("fc")
    .join(children_category_df.alias("cc"), F.col("fc.category_id") == F.col("cc.category_id"), "inner")
    .join(film_actor_df.alias("fa"), F.col("fc.film_id") == F.col("fa.film_id"), "inner")
    .join(actor_df.alias("a"), F.col("fa.actor_id") == F.col("a.actor_id"), "inner")
    .groupBy(
        F.col("a.actor_id").alias("actor_id"),
        F.col("a.first_name").alias("first_name"),
        F.col("a.last_name").alias("last_name"),
    )
    .agg(F.count(F.col("fa.film_id")).alias("appearances_cnt"))
    .orderBy(F.col("appearances_cnt").desc(), F.col("actor_id").asc())
    .limit(3)
    .show(truncate=False)
)


+--------+----------+---------+---------------+
|actor_id|first_name|last_name|appearances_cnt|
+--------+----------+---------+---------------+
|17      |HELEN     |VOIGHT   |7              |
|66      |MARY      |TANDY    |5              |
|80      |RALPH     |CRUZ     |5              |
+--------+----------+---------+---------------+



Stop Spark session:

In [None]:
spark.stop()