In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
conf = SparkConf()
spark_context = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
spark = SparkSession.builder.master("spark://spark-master:7077").config("spark.jars.packages", 
                                                                        "org.apache.hadoop:hadoop-aws-2.7.3").appName("spark-hw").getOrCreate()

In [4]:
print(spark.version)

3.5.0


In [6]:
actor_df = spark.read.csv('/home/lec_14/data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('/home/lec_14/data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('/home/lec_14/data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('/home/lec_14/data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('/home/lec_14/data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('/home/lec_14/data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('/home/lec_14/data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('/home/lec_14/data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('/home/lec_14/data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('/home/lec_14/data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('/home/lec_14/data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('/home/lec_14/data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('/home/lec_14/data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('/home/lec_14/data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('/home/lec_14/data/store.csv', header=True, inferSchema=True)

1. Вивести кількість фільмів в кожній категорії. Результат відсортувати за спаданням.

In [47]:
film_with_category_id = film_df.join(film_category_df,"film_id")
film_with_category = film_with_category_id.join(category_df,"category_id")

#film_with_category.show()
category_film_count_df = film_with_category.groupBy("category_id","name") \
    .agg(F.count("film_id").alias("film_in_category_count")) \
    .orderBy(F.col("film_in_category_count").desc())
category_film_count_df.show()

+-----------+-----------+----------------------+
|category_id|       name|film_in_category_count|
+-----------+-----------+----------------------+
|         15|     Sports|                    74|
|          9|    Foreign|                    73|
|          8|     Family|                    69|
|          6|Documentary|                    68|
|          2|  Animation|                    66|
|          1|     Action|                    64|
|         13|        New|                    63|
|          7|      Drama|                    62|
|         14|     Sci-Fi|                    61|
|         10|      Games|                    61|
|          3|   Children|                    60|
|          5|     Comedy|                    58|
|          4|   Classics|                    57|
|         16|     Travel|                    57|
|         11|     Horror|                    56|
|         12|      Music|                    51|
+-----------+-----------+----------------------+



2. Вивести 10 акторів, чиї фільми брали на прокат найбільше. Результат відсортувати за спаданням.

In [48]:
film_actor = actor_df.join(film_actor_df,"actor_id")
film_actor_inventory = film_actor.join(inventory_df,"film_id")
film_actor_inventory_rental = film_actor_inventory.join(rental_df,"inventory_id")
actor_film_rate_df = film_actor_inventory_rental.groupBy("actor_id","first_name","last_name") \
    .agg(F.count("film_id").alias("count_of_rented_films")) \
    .orderBy(F.col("count_of_rented_films").desc())
actor_film_rate_df.show(10)

+--------+----------+-----------+---------------------+
|actor_id|first_name|  last_name|count_of_rented_films|
+--------+----------+-----------+---------------------+
|     107|      GINA|  DEGENERES|                  753|
|     181|   MATTHEW|     CARREY|                  678|
|     198|      MARY|     KEITEL|                  674|
|     144|    ANGELA|WITHERSPOON|                  654|
|     102|    WALTER|       TORN|                  640|
|      60|     HENRY|      BERRY|                  612|
|     150|     JAYNE|      NOLTE|                  611|
|      37|       VAL|     BOLGER|                  605|
|      23|    SANDRA|     KILMER|                  604|
|      90|      SEAN|    GUINESS|                  599|
+--------+----------+-----------+---------------------+
only showing top 10 rows



3. Вивести категорія фільмів, на яку було витрачено найбільше грошей в прокаті

In [58]:
film_category_id = film_df.join(film_category_df,"film_id")
film_category = film_category_id.join(category_df,"category_id")
film_category_inventory = film_category.join(inventory_df,"film_id")
film_category_inventory_rent = film_category_inventory.join(rental_df,"inventory_id")
film_category_inventory_rent_payment = film_category_inventory_rent.join(payment_df,"rental_id")
film_category_cost_df = film_category_inventory_rent_payment.groupBy("category_id","name") \
    .agg(F.sum("amount").alias("biggest_sum")) \
    .orderBy(F.col("biggest_sum").desc())
film_category_cost_df.show(1)

+-----------+------+-----------------+
|category_id|  name|      biggest_sum|
+-----------+------+-----------------+
|         15|Sports|5314.209999999843|
+-----------+------+-----------------+
only showing top 1 row



4. Вивести назви фільмів, яких не має в inventory.

In [75]:
films_not_in_inventory_df = film_df.join(inventory_df,"film_id",how="left")
films_not_in_inventory_df.filter(F.col("inventory_id").isNull()).select("title").distinct().show()



+--------------------+
|               title|
+--------------------+
|       RAINBOW SHOCK|
|           GUMP DATE|
|         HOCUS FRIDA|
|    TREASURE COMMAND|
| CHINATOWN GLADIATOR|
|        WALLS ARTIST|
|      ARGONAUTS TOWN|
|       PSYCHO SHRUNK|
|   FIREHOUSE VIETNAM|
|DELIVERANCE MULHO...|
|       ROOF CHAMPION|
|        TADPOLE PARK|
|         APOLLO TEEN|
|       HATE HANDICAP|
|       PEARL DESTINY|
|COMMANDMENTS EXPRESS|
|        VOLUME HOUSE|
|     CROWDS TELEMARK|
|   RAIDERS ANTITRUST|
|    KILL BROTHERHOOD|
+--------------------+
only showing top 20 rows



5. Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [82]:
film_actor_id = actor_df.join(film_actor_df,"actor_id")
film_actor = film_df.join(film_actor_id,"film_id")
film_actor_category_id = film_category_df.join(film_actor,"film_id")
film_actor_category = category_df.join(film_actor_category_id,"category_id").filter(F.col("name") == "Children")
actor_category_max_df = film_actor_category.groupBy("actor_id","first_name","last_name") \
    .agg(F.count("category_id").alias("count_of_appearences")) \
    .orderBy(F.col("count_of_appearences").desc())
#category_df.show()
actor_category_max_df.show(3)


+--------+----------+---------+--------------------+
|actor_id|first_name|last_name|count_of_appearences|
+--------+----------+---------+--------------------+
|      17|     HELEN|   VOIGHT|                   7|
|     127|     KEVIN|  GARLAND|                   5|
|      80|     RALPH|     CRUZ|                   5|
+--------+----------+---------+--------------------+
only showing top 3 rows

