In [1]:
import findspark
findspark.init()

In [35]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, avg, min, round, col, count, sum, dense_rank, desc, when

In [3]:
import os
from dotenv import load_dotenv

load_dotenv()

True

In [4]:
spark = SparkSession.builder.master('local[*]').appName('task5').config('spark.jars', 'C:\spark_folder\spark-3.5.0-bin-hadoop3\jars\postgresql-42.7.1.jar').getOrCreate()

In [5]:
jdbc_url = 'jdbc:postgresql://localhost:5432/Task3'
properties = {'user': os.getenv('DB_USER'), 'password': os.getenv('DB_PASSWORD')}

reader = spark.read.format('jdbc').option('url', jdbc_url).option('driver', 'org.postgresql.Driver').option('user', properties['user']).option('password', properties['password'])

In [6]:
df_actor = reader.option('dbtable', 'actor').load()
df_address = reader.option('dbtable', 'address').load()
df_category = reader.option('dbtable', 'category').load()
df_city = reader.option('dbtable', 'city').load()
df_country = reader.option('dbtable', 'country').load()
df_customer = reader.option('dbtable', 'customer').load()
df_film = reader.option('dbtable', 'film').load()
df_film_actor = reader.option('dbtable', 'film_actor').load()
df_film_category = reader.option('dbtable', 'film_category').load()
df_inventory = reader.option('dbtable', 'inventory').load()
df_language = reader.option('dbtable', 'language').load()
df_payment = reader.option('dbtable', 'payment').load()
df_rental = reader.option('dbtable', 'rental').load()
df_staff = reader.option('dbtable', 'staff').load()
df_store = reader.option('dbtable', 'store').load()

In [7]:
df_actor.show()

+--------+----------+------------+-------------------+
|actor_id|first_name|   last_name|        last_update|
+--------+----------+------------+-------------------+
|       1|  PENELOPE|     GUINESS|2022-02-15 12:34:33|
|       2|      NICK|    WAHLBERG|2022-02-15 12:34:33|
|       3|        ED|       CHASE|2022-02-15 12:34:33|
|       4|  JENNIFER|       DAVIS|2022-02-15 12:34:33|
|       5|    JOHNNY|LOLLOBRIGIDA|2022-02-15 12:34:33|
|       6|     BETTE|   NICHOLSON|2022-02-15 12:34:33|
|       7|     GRACE|      MOSTEL|2022-02-15 12:34:33|
|       8|   MATTHEW|   JOHANSSON|2022-02-15 12:34:33|
|       9|       JOE|       SWANK|2022-02-15 12:34:33|
|      10| CHRISTIAN|       GABLE|2022-02-15 12:34:33|
|      11|      ZERO|        CAGE|2022-02-15 12:34:33|
|      12|      KARL|       BERRY|2022-02-15 12:34:33|
|      13|       UMA|        WOOD|2022-02-15 12:34:33|
|      14|    VIVIEN|      BERGEN|2022-02-15 12:34:33|
|      15|      CUBA|     OLIVIER|2022-02-15 12:34:33|
|      16|

In [8]:
#  Вывести количество фильмов в каждой категории, отсортировать по убыванию.
query_1 = df_film_category.join(df_category, df_film_category.category_id == df_category.category_id, 'inner') \
    .groupBy(df_category.name).agg(count(df_film_category.film_id).alias('amount_in_each_category')) \
    .orderBy(col('amount_in_each_category').desc()).show()

+-----------+-----------------------+
|       name|amount_in_each_category|
+-----------+-----------------------+
|     Sports|                     74|
|    Foreign|                     73|
|     Family|                     69|
|Documentary|                     68|
|  Animation|                     66|
|     Action|                     64|
|        New|                     63|
|      Drama|                     62|
|      Games|                     61|
|     Sci-Fi|                     61|
|   Children|                     60|
|     Comedy|                     58|
|     Travel|                     57|
|   Classics|                     57|
|     Horror|                     56|
|      Music|                     51|
+-----------+-----------------------+



In [9]:
# Вывести 10 актеров, чьи фильмы больше всего арендовали, отсортировать по убыванию.
query_2 = df_rental.join(df_inventory, df_rental.inventory_id == df_inventory.inventory_id, 'left') \
    .join(df_film_actor, df_inventory.film_id == df_film_actor.film_id, 'inner') \
    .join(df_actor, df_film_actor.actor_id == df_actor.actor_id, 'left') \
    .groupBy(df_actor.actor_id, df_actor.first_name, df_actor.last_name) \
    .agg(count(df_rental.rental_id).alias('amount_of_rental_films')) \
    .orderBy(col('amount_of_rental_films').desc()) \
    .select(df_actor.first_name, df_actor.last_name, col('amount_of_rental_films')) \
    .show(10, truncate=False)

+----------+-----------+----------------------+
|first_name|last_name  |amount_of_rental_films|
+----------+-----------+----------------------+
|GINA      |DEGENERES  |753                   |
|MATTHEW   |CARREY     |678                   |
|MARY      |KEITEL     |674                   |
|ANGELA    |WITHERSPOON|654                   |
|WALTER    |TORN       |640                   |
|HENRY     |BERRY      |612                   |
|JAYNE     |NOLTE      |611                   |
|VAL       |BOLGER     |605                   |
|SANDRA    |KILMER     |604                   |
|SEAN      |GUINESS    |599                   |
+----------+-----------+----------------------+
only showing top 10 rows



In [10]:
# Вывести категорию фильмов, на которую потратили больше всего денег.
query_3 = df_film.join(df_film_category, df_film.film_id == df_film_category.film_id, 'left') \
    .join(df_category, df_film_category.category_id == df_category.category_id, 'inner') \
    .groupBy(df_category.name) \
    .agg(sum(df_film.replacement_cost).alias('most_expensive_category')) \
    .orderBy(col('most_expensive_category').desc()) \
    .show(1)

+------+-----------------------+
|  name|most_expensive_category|
+------+-----------------------+
|Sports|                1509.26|
+------+-----------------------+
only showing top 1 row



In [11]:
# Вывести названия фильмов, которых нет в inventory. Написать запрос без использования оператора IN.
query_4 = df_film.join(df_inventory, df_film.film_id == df_inventory.film_id, 'left_anti') \
    .select(df_film.title).show(100, truncate=False)

+----------------------+
|title                 |
+----------------------+
|CHOCOLATE DUCK        |
|BUTCH PANTHER         |
|VOLUME HOUSE          |
|ORDER BETRAYED        |
|TADPOLE PARK          |
|KILL BROTHERHOOD      |
|FRANKENSTEIN STRANGER |
|CROSSING DIVORCE      |
|SUICIDES SILENCE      |
|CATCH AMISTAD         |
|PERDITION FARGO       |
|FLOATS GARDEN         |
|GUMP DATE             |
|WALLS ARTIST          |
|GLADIATOR WESTWARD    |
|HOCUS FRIDA           |
|ARSENIC INDEPENDENCE  |
|MUPPET MILE           |
|FIREHOUSE VIETNAM     |
|ROOF CHAMPION         |
|DAZED PUNK            |
|PEARL DESTINY         |
|RAINBOW SHOCK         |
|KENTUCKIAN GIANT      |
|BOONDOCK BALLROOM     |
|COMMANDMENTS EXPRESS  |
|HATE HANDICAP         |
|ARK RIDGEMONT         |
|CROWDS TELEMARK       |
|DELIVERANCE MULHOLLAND|
|RAIDERS ANTITRUST     |
|SISTER FREDDY         |
|VILLAIN DESPERATE     |
|APOLLO TEEN           |
|ALICE FANTASIA        |
|CRYSTAL BREAKING      |
|TREASURE COMMAND      |


In [12]:
from pyspark.sql.window import Window

In [13]:
# Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”.
# --Если у нескольких актеров одинаковое кол-во фильмов, вывести всех.

windowSpec = Window.orderBy(desc('film_count'))

query_5 = df_category.join(df_film_category, df_category.category_id == df_film_category.category_id, 'inner') \
    .join(df_film_actor, df_film_category.film_id == df_film_actor.film_id, 'inner') \
    .join(df_actor, df_film_actor.actor_id == df_actor.actor_id, 'left') \
    .filter(df_category.name == 'Children') \
    .groupBy(df_actor.actor_id, df_actor.first_name, df_actor.last_name) \
    .agg(count(df_film_actor.film_id).alias('film_count')) \
    .withColumn('rank', dense_rank().over(windowSpec)) \
    .filter('rank <= 3') \
    .select('first_name', 'last_name', 'rank') \
    .show(truncate=False)

+----------+---------+----+
|first_name|last_name|rank|
+----------+---------+----+
|HELEN     |VOIGHT   |1   |
|KEVIN     |GARLAND  |2   |
|RALPH     |CRUZ     |2   |
|MARY      |TANDY    |2   |
|WHOOPI    |HURT     |2   |
|SCARLETT  |DAMON    |3   |
|SYLVESTER |DERN     |3   |
|SANDRA    |KILMER   |3   |
|RENEE     |BALL     |3   |
|KIRSTEN   |AKROYD   |3   |
|ALAN      |DREYFUSS |3   |
|SUSAN     |DAVIS    |3   |
|JAYNE     |NOLTE    |3   |
|UMA       |WOOD     |3   |
|JANE      |JACKMAN  |3   |
|CHRISTIAN |AKROYD   |3   |
|JADA      |RYDER    |3   |
|ELLEN     |PRESLEY  |3   |
|VAL       |BOLGER   |3   |
+----------+---------+----+



In [33]:
# Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). 
# --Отсортировать по количеству неактивных клиентов по убыванию.

query_6 = df_customer.join(df_address, df_customer.address_id == df_address.address_id, 'inner') \
    .join(df_city, df_address.city_id == df_city.city_id) \
    .groupBy(df_city.city) \
    .agg(sum(when(df_customer.active == 0, 1).otherwise(0)).alias('amount_of_inactive'), 
         sum(when(df_customer.active == 1, 1).otherwise(0)).alias('amount_of_active')) \
    .orderBy(col('amount_of_inactive').desc(), col('amount_of_active').desc()).show()

+------------------+------------------+----------------+
|              city|amount_of_inactive|amount_of_active|
+------------------+------------------+----------------+
|          Uluberia|                 1|               0|
|         Najafabad|                 1|               0|
|         Pingxiang|                 1|               0|
|          Xiangfan|                 1|               0|
|        Kumbakonam|                 1|               0|
|       Szkesfehrvr|                 1|               0|
|  Charlotte Amalie|                 1|               0|
|            Kamyin|                 1|               0|
|            Daxian|                 1|               0|
|     Coatzacoalcos|                 1|               0|
|           Wroclaw|                 1|               0|
|            Ktahya|                 1|               0|
|            Amroha|                 1|               0|
|   Southend-on-Sea|                 1|               0|
|           Bat Yam|           

In [47]:
# --7. Вывести категорию фильмов, у которой самое большое кол-во часов 
# --суммарной аренды в городах (customer.address_id в этом city), 
# --и которые начинаются на букву “a”. 
# --То же самое сделать для городов в которых есть символ “-”. 
# --Написать все в одном запросе.

query_7_1 = df_category.join(df_film_category, df_category.category_id == df_film_category.category_id, 'inner') \
    .join(df_inventory, df_film_category.film_id == df_inventory.film_id, 'inner') \
    .join(df_rental, df_inventory.inventory_id == df_rental.inventory_id, 'inner') \
    .join(df_customer, df_rental.customer_id == df_customer.customer_id, 'inner') \
    .join(df_address, df_customer.address_id == df_address.address_id, 'inner') \
    .join(df_city, df_address.city_id == df_city.city_id, 'inner') \
    .where(df_city.city.like('%-%')) \
    .groupBy(df_category.name) \
    .agg(sum(df_rental.return_date - df_rental.rental_date).alias('sum_hours')) \
    .orderBy(col('sum_hours').desc()).limit(1)


query_7_2 = df_category.join(df_film_category, df_category.category_id == df_film_category.category_id, 'inner') \
    .join(df_inventory, df_film_category.film_id == df_inventory.film_id, 'inner') \
    .join(df_rental, df_inventory.inventory_id == df_rental.inventory_id, 'inner') \
    .join(df_customer, df_rental.customer_id == df_customer.customer_id, 'inner') \
    .join(df_address, df_customer.address_id == df_address.address_id, 'inner') \
    .join(df_city, df_address.city_id == df_city.city_id, 'inner') \
    .where(df_city.city.like('a%')) \
    .groupBy(df_category.name) \
    .agg(sum(df_rental.return_date - df_rental.rental_date).alias('sum_hours')) \
    .orderBy(col('sum_hours').desc()).limit(1)

query_7 = query_7_1.union(query_7_2).show(truncate=False)

+-------+-------------------------------------+
|name   |sum_hours                            |
+-------+-------------------------------------+
|Foreign|INTERVAL '269 16:09:00' DAY TO SECOND|
|Action |INTERVAL '60 23:41:00' DAY TO SECOND |
+-------+-------------------------------------+

