## Notes

Использование JDBC для коннекта спарка и БД

---
Инкапсуляция импорта таблиц из БД в датафреймы с помощью функции *read_table_from_db*

---
Оптимизация запросов. 
1. Запросы я писал по аналогии моим запросам на SQL, по сути менялся только синтаксис. 
2. Использовал иногда broadcast, но только в случае, когда таблица заведомо небольшая. Вообще, в теории, тут много чего можно было в броадкаст засунуть т.к. в целом данных не много, но я решил этого не делать, чтобы при увелечении кол-ва данных +- все запросы все также работали.
3. Также ничего не кешировал; в случае, если все 7 запросов выпольнялись бы последовательно, это могло бы дать прирост производительности, но я расценивал каждый таск как независимую единицу, а в таком случае кеширование ничего абсолютно не давало.
4. Можно было бы еще для каждого запроса перед джоинами выбирать только нужные колонки, но на сколько я знаю Catalyst Optimizer достаточно умен, что сделать это самому пользуясь column pruning


---
spark.sql.shuffle.partitions = 50 

Установил такое количество, пользуясь рекомендацией, что кол-во партиций должно быть в 2-3 раза больше количества ядер ( у меня 16 )

# Setup

In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

jdbc_path = "/home/hello/.jdbc/postgresql-42.7.7.jar"

spark = SparkSession.builder \
    .appName("PagilaInNotebook") \
    .config("spark.jars", jdbc_path) \
    .getOrCreate()

load_dotenv()

url = "jdbc:postgresql://localhost:5432/pagila"

spark.conf.set("spark.sql.shuffle.partitions", "50")

25/07/04 09:34:05 WARN Utils: Your hostname, athena resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/07/04 09:34:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/07/04 09:34:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
def read_table_from_db(table_name):
    return spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .options(**{"user": os.getenv("DB_USER"), "password": os.getenv("PASSWORD"), "driver": "org.postgresql.Driver"}) \
    .load()

# Task 1

In [3]:
df_category = read_table_from_db("public.category")
df_film_category = read_table_from_db("public.film_category")

result_df = df_film_category.join(broadcast(df_category), on="category_id") \
    .groupBy("name") \
    .agg(count("name").alias("num_of_films")) \
    .orderBy(desc(col("num_of_films")))

result_df.show(truncate=False)

+-----------+------------+
|name       |num_of_films|
+-----------+------------+
|Drama      |152         |
|Music      |152         |
|Travel     |151         |
|Foreign    |150         |
|Children   |150         |
|Games      |150         |
|Sci-Fi     |149         |
|Action     |149         |
|Animation  |148         |
|Family     |147         |
|Classics   |147         |
|New        |147         |
|Sports     |145         |
|Documentary|145         |
|Comedy     |143         |
|Horror     |142         |
+-----------+------------+



# Task 2

In [4]:
from pyspark.sql.functions import sum as _sum  

df_rental = read_table_from_db("rental")
df_inventory = read_table_from_db("inventory")
df_film_actor = read_table_from_db("film_actor")
df_actor = read_table_from_db("actor")

film_count = df_rental.join(df_inventory, on="inventory_id") \
    .groupBy("film_id") \
    .count()


df_actor = df_actor.withColumn(
    "actor_name",
    concat_ws(" ", "first_name", "last_name")
)


res = df_film_actor.join(film_count, on="film_id", how="left") \
    .join(df_actor, on="actor_id") \
    .groupBy("actor_name") \
    .agg(_sum("count").alias("rent_count")) \
    .orderBy(desc(col("rent_count")))
    
res.show(10)


+------------------+----------+
|        actor_name|rent_count|
+------------------+----------+
|       SUSAN DAVIS|       825|
|    GINA DEGENERES|       753|
|    MATTHEW CARREY|       678|
|       MARY KEITEL|       674|
|ANGELA WITHERSPOON|       654|
|       WALTER TORN|       640|
|       HENRY BERRY|       612|
|       JAYNE NOLTE|       611|
|        VAL BOLGER|       605|
|     SANDRA KILMER|       604|
+------------------+----------+
only showing top 10 rows



# Task 3

In [5]:
df_rental = read_table_from_db("rental")
df_inventory = read_table_from_db("inventory")
df_category = read_table_from_db("category")
df_film_category = read_table_from_db("film_category")
df_payment = read_table_from_db("payment")


df_film_revenue = df_payment.join(df_rental , on="rental_id") \
    .join(df_inventory, on="inventory_id") \
    .groupBy("film_id") \
    .agg(_sum("amount").alias("spent_on_film"))
    
res = df_film_category.join(df_film_revenue, on="film_id") \
    .join(df_category, on="category_id") \
    .groupBy("name") \
    .agg(_sum("spent_on_film").alias("category_revenue")) \
    .orderBy(desc(col("category_revenue")))
    
res.show(1)


+-------+----------------+
|   name|category_revenue|
+-------+----------------+
|Foreign|        10507.67|
+-------+----------------+
only showing top 1 row



# Task 4

In [12]:
df_film = read_table_from_db("film")
df_inventory = read_table_from_db("inventory")

#Analogy to SQL query
res = df_film.join(df_inventory, on="film_id", how="left") \
    .filter(col("inventory_id").isNull()).dropDuplicates(["film_id"]).select(["film_id", "title"])
    
res.show()

#PySpark way using anitjoin
df_inventory_id = df_inventory.select("film_id").distinct()
res = df_film.join(df_inventory_id, on="film_id", how="left_anti").select("film_id", "title")

res.show()


+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|    148|      CHOCOLATE DUCK|
|    669|       PEARL DESTINY|
|    713|       RAINBOW SHOCK|
|    221|DELIVERANCE MULHO...|
|    495|    KENTUCKIAN GIANT|
|    712|   RAIDERS ANTITRUST|
|    801|       SISTER FREDDY|
|    943|   VILLAIN DESPERATE|
|    359|  GLADIATOR WESTWARD|
|    108|       BUTCH PANTHER|
|    950|        VOLUME HOUSE|
|     87|   BOONDOCK BALLROOM|
|    642|      ORDER BETRAYED|
|    171|COMMANDMENTS EXPRESS|
|     33|         APOLLO TEEN|
|    874|        TADPOLE PARK|
|    497|    KILL BROTHERHOOD|
|     14|      ALICE FANTASIA|
|    198|    CRYSTAL BREAKING|
|    332|FRANKENSTEIN STRA...|
+-------+--------------------+
only showing top 20 rows

+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|    148|      CHOCOLATE DUCK|
|    669|       PEARL DESTINY|
|    713|       RAINBOW SHOCK|
|    221|DELIVERANCE MULHO...|
|    495|    

# Task 5

In [13]:
from pyspark.sql.window import Window

df_category = read_table_from_db("category").alias("category")
df_film_category = read_table_from_db("film_category").alias("film_category")
df_film_actor = read_table_from_db("film_actor").alias("film_actor")
df_actor = read_table_from_db("actor").alias("actor")

df_actor = df_actor.withColumn(
    "actor_name",
    concat_ws(" ", "first_name", "last_name")
)

df_selected_actors = df_film_actor.join(df_film_category, on="film_id") \
    .join(df_category, on="category_id") \
    .join(df_actor, on="actor_id") \
    .filter(col("category.name") == "Children") \
    .groupBy("actor_name") \
    .agg(count(col("film_actor.film_id")).alias("cnt"))
    
windowSpec = Window.orderBy(desc(col("cnt")))

top_actors = df_selected_actors.withColumn("rank", rank().over(windowSpec)).filter(col("rank") <= 3).select("actor_name")

top_actors.show()


25/07/04 09:44:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------+
|  actor_name|
+------------+
|RICHARD PENN|
|EWAN GOODING|
|SIDNEY CROWE|
+------------+



# Task 6

In [8]:
df_city = read_table_from_db("city").alias("city")
df_address = read_table_from_db("address").alias("address")
df_customer = read_table_from_db("customer").alias("customer")

res = df_customer.join(df_address, on="address_id") \
    .join(df_city, on="city_id") \
    .groupBy(col("city.city")) \
    .agg(
        _sum(col("customer.active")).alias("active_cnt"), 
        count(col("customer.active")).alias("cnt")) \
    .withColumn("inactive_cnt", col("cnt") - col("active_cnt")) \
    .orderBy(desc(col("inactive_cnt"))) \
    .drop("cnt")
    
res.show()

+----------------+----------+------------+
|            city|active_cnt|inactive_cnt|
+----------------+----------+------------+
|          Kamyin|         0|           1|
|         Bat Yam|         0|           1|
|       Najafabad|         0|           1|
|         Wroclaw|         0|           1|
|          Amroha|         0|           1|
|Charlotte Amalie|         0|           1|
|          Daxian|         0|           1|
|       Pingxiang|         0|           1|
|   Coatzacoalcos|         0|           1|
|     Szkesfehrvr|         0|           1|
|        Uluberia|         0|           1|
|        Xiangfan|         0|           1|
|          Ktahya|         0|           1|
| Southend-on-Sea|         0|           1|
|      Kumbakonam|         0|           1|
|         Lincoln|         1|           0|
|     Addis Abeba|         1|           0|
|           Sucre|         1|           0|
|     Qinhuangdao|         1|           0|
|        Tambaram|         1|           0|
+----------

# Task 7

In [9]:
from pyspark.sql.functions import sum as _sum 

df_rental_joined = df_rental.join(df_inventory, on="inventory_id") \
    .join(df_customer, on="customer_id") \
    .join(df_address, on="address_id") \
    .join(df_city, on="city_id") \
    .join(df_film_category, on="film_id") \
    .join(df_category, on="category_id") \
    .filter((col("city").like("%-%")) | (col("city").like("A%"))) \
    .withColumn("city_group", 
                when(col("city").like("%-%"), lit("Dashed cities")) 
                .when(col("city").like("A%"), lit("A_cities"))
    ) \
    .withColumn("duration" , (unix_timestamp("return_date") - unix_timestamp("rental_date")) / 3600)
    

df_rental_summary = df_rental_joined \
    .groupBy("city_group", "category.name").agg(
        count("rental_id"),
        _sum("duration").alias("total_duration")
    )


windowSpec = Window.orderBy(desc(col("total_duration"))).partitionBy("city_group")
top_categories_in_selected_countries = df_rental_summary.withColumn("rank", row_number().over(windowSpec)) \
    .filter(col("rank") == 1).select(col("city_group"), col("name").alias("top_category"), col("total_duration"))
 
top_categories_in_selected_countries.show()



+-------------+------------+------------------+
|   city_group|top_category|    total_duration|
+-------------+------------+------------------+
|     A_cities|    Children| 24021.29999999999|
|Dashed cities|       Drama|14556.033333333326|
+-------------+------------+------------------+

