In [1]:
import os
import sys

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window

In [3]:
from pyspark.sql.functions import *

In [4]:
spark = SparkSession.builder \
    .appName("MyOldProject") \
    .master("local[4]") \
    .config("spark.driver.extraClassPath", r"C:\spark-3.5.7-bin-hadoop3\jars\postgresql-42.7.8.jar") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

# Подключение к Postgres в Docker
url = "jdbc:postgresql://localhost:5433/postgres"
properties = {"user": "postgres", "password": "123456", "driver": "org.postgresql.Driver"}
print(spark.sparkContext.uiWebUrl)

http://127.0.0.1:4040


In [18]:
# query 1
df_category = spark.read.jdbc(url=url, table="category", properties=properties)
df_film_category = spark.read.jdbc(url=url, table="film_category", properties=properties)
df_join = df_category.join(df_film_category, on = "category_id", how = "inner")
df_group = df_join.groupBy("name").count()
df_order = df_group.orderBy("count", ascending = False)
df_order.show()
df_order.explain()

+-----------+-----+
|       name|count|
+-----------+-----+
|      Drama|  152|
|      Music|  152|
|     Travel|  151|
|    Foreign|  150|
|      Games|  150|
|   Children|  150|
|     Action|  149|
|     Sci-Fi|  149|
|  Animation|  148|
|     Family|  147|
|   Classics|  147|
|        New|  147|
|     Sports|  145|
|Documentary|  145|
|     Comedy|  143|
|     Horror|  142|
+-----------+-----+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#3905L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#3905L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=18068]
      +- HashAggregate(keys=[name#3883], functions=[count(1)])
         +- Exchange hashpartitioning(name#3883, 200), ENSURE_REQUIREMENTS, [plan_id=18065]
            +- HashAggregate(keys=[name#3883], functions=[partial_count(1)])
               +- Project [name#3883]
                  +- SortMergeJoin [category_id#3882], [category_id#3889], Inner
                     :- Sort [category_i

In [6]:
# query 2
df_rental = spark.read.jdbc(url=url, table="rental", properties=properties)
df_inventory = spark.read.jdbc(url=url, table="inventory", properties=properties)
df_film = spark.read.jdbc(url=url, table="film", properties=properties)
df_film_actor = spark.read.jdbc(url=url, table="film_actor", properties=properties)
df_actor = spark.read.jdbc(url=url, table="actor", properties=properties)
df_actor1 = df_actor.withColumn("actor", F.concat(F.col("first_name"), F.lit(" "), F.col("last_name")))
df_join = df_rental.join(df_inventory, "inventory_id").join(df_film, "film_id").join(df_film_actor, "film_id").join(df_actor1, "actor_id")
df_group = df_join.groupBy("actor").count()
df_order = df_group.orderBy("count", ascending = False)
df_order.show(10)


+------------------+-----+
|             actor|count|
+------------------+-----+
|       SUSAN DAVIS|  825|
|    GINA DEGENERES|  753|
|    MATTHEW CARREY|  678|
|       MARY KEITEL|  674|
|ANGELA WITHERSPOON|  654|
|       WALTER TORN|  640|
|       HENRY BERRY|  612|
|       JAYNE NOLTE|  611|
|        VAL BOLGER|  605|
|     SANDRA KILMER|  604|
+------------------+-----+
only showing top 10 rows



In [7]:
# query 3
df_payment = spark.read.jdbc(url=url, table="payment", properties=properties)
df_join = df_category.join(df_film_category, "category_id").join(df_inventory, "film_id").join(df_rental, "inventory_id").join(df_payment, "rental_id")
df_group = df_join.groupBy("name").agg(F.sum("amount").alias("sum"))
df_order = df_group.orderBy("sum", ascending = False)
df_order.show(1)

+-------+--------+
|   name|     sum|
+-------+--------+
|Foreign|10507.67|
+-------+--------+
only showing top 1 row



In [8]:
# query 4
df_anti_join = df_film.join(df_inventory, on = "film_id", how = "leftanti")
df_anti_join.select("title").show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



In [9]:
# query 5
df_category = spark.read.jdbc(url=url, table="category", properties=properties)
df_film_category = spark.read.jdbc(url=url, table="film_category", properties=properties)
df_film = spark.read.jdbc(url=url, table="film", properties=properties)
df_film_actor = spark.read.jdbc(url=url, table="film_actor", properties=properties)
df_actor = spark.read.jdbc(url=url, table="actor", properties=properties)
df_join = df_category.join(df_film_category, "category_id").join(df_film, "film_id"). join(df_film_actor, "film_id").join(df_actor, "actor_id")
df_filter = df_join.filter(F.col("name") =="Children")
df_edit_column = df_filter.withColumn("actor", F.concat_ws(" ", "first_name", "last_name"))
df_group = df_edit_column.groupBy("actor").agg(F.count("film_id").alias("film_count"))
df_order = df_group.orderBy("film_count", ascending = False)

window = Window.partitionBy().orderBy(F.desc("film_count"))
df_window = df_order.withColumn("dense_rank", F.dense_rank().over(window))
df_top3 = df_window.filter(F.col("dense_rank") <= 3)
df_top3.show(30)
                                        

+----------------+----------+----------+
|           actor|film_count|dense_rank|
+----------------+----------+----------+
|    SIDNEY CROWE|         9|         1|
|    RICHARD PENN|         9|         1|
|    EWAN GOODING|         9|         1|
|      DAN HARRIS|         8|         2|
|       KIM ALLEN|         8|         2|
|      ALEC WAYNE|         8|         2|
|      MARY TANDY|         8|         2|
|    JANE JACKMAN|         8|         2|
|  RUSSELL TEMPLE|         8|         2|
|    SPENCER PECK|         8|         2|
|  MATTHEW CARREY|         8|         2|
|     SUSAN DAVIS|         8|         2|
|      JADA RYDER|         8|         2|
|   ANGELA HUDSON|         7|         3|
|    WARREN NOLTE|         7|         3|
|MINNIE ZELLWEGER|         7|         3|
|     GENE WILLIS|         7|         3|
|  AUDREY OLIVIER|         7|         3|
|  JULIANNE DENCH|         7|         3|
|      JAMES PITT|         7|         3|
|    KENNETH TORN|         7|         3|
|  DARYL WAHLBER

In [10]:
# query 6
df_customer = spark.read.jdbc(url=url, table="customer", properties=properties)
df_city = spark.read.jdbc(url=url, table="city", properties=properties)
df_address = spark.read.jdbc(url=url, table="address", properties=properties)
df_join = df_customer.join(df_address,"address_id").join(df_city, "city_id")
df_group = df_join.groupBy("city").agg(F.sum(F.when(F.col("active")==1, 1).otherwise(0)).alias("active_cnt"), \
                                       F.sum(F.when(F.col("active")==0, 1).otherwise(0)).alias("inactive_cnt"))
df_order = df_group.orderBy("inactive_cnt", ascending = False) 
df_order.show()

+------------------+----------+------------+
|              city|active_cnt|inactive_cnt|
+------------------+----------+------------+
|          Uluberia|         0|           1|
|         Najafabad|         0|           1|
|         Pingxiang|         0|           1|
|          Xiangfan|         0|           1|
|        Kumbakonam|         0|           1|
|       Szkesfehrvr|         0|           1|
|  Charlotte Amalie|         0|           1|
|            Kamyin|         0|           1|
|            Daxian|         0|           1|
|     Coatzacoalcos|         0|           1|
|           Wroclaw|         0|           1|
|            Ktahya|         0|           1|
|            Amroha|         0|           1|
|   Southend-on-Sea|         0|           1|
|           Bat Yam|         0|           1|
|          Fengshan|         1|           0|
|A Corua (La Corua)|         1|           0|
|           El Alto|         1|           0|
|              Linz|         1|           0|
|         

In [16]:
# query 7
df_rental = spark.read.jdbc(url=url, table="rental", properties=properties)
df_customer = spark.read.jdbc(url=url, table="customer", properties=properties)
df_address = spark.read.jdbc(url=url, table="address", properties=properties)
df_city = spark.read.jdbc(url=url, table="city", properties=properties)
df_inventory = spark.read.jdbc(url=url, table="inventory", properties=properties)
df_film = spark.read.jdbc(url=url, table="film", properties=properties)
df_film_category = spark.read.jdbc(url=url, table="film_category", properties=properties)
df_category = spark.read.jdbc(url=url, table="category", properties=properties)
df_join = df_rental.join(df_customer, "customer_id" \
                        ).join(df_address, "address_id" \
                              ).join(df_city, "city_id" \
                                ).join(df_inventory, "inventory_id" \
                                      ).join(df_film, "film_id" \
                                            ).join(df_film_category, "film_id" \
                                                  ).join(df_category, "category_id")
df_hours = df_join.withColumn("diff_hours", 
    (F.unix_timestamp("return_date") - F.unix_timestamp("rental_date")) / 3600
)

df_group = df_hours.groupBy("city", "name").agg(F.sum("diff_hours").alias("hours"))
df_filter = df_group.filter(F.col("city").ilike("A%") | F.col("city").ilike("%-%"))

window = Window.partitionBy("city").orderBy(F.desc("hours"))
df_window = df_filter.withColumn("rank", F.dense_rank().over(window))
df_filter = df_window.filter(F.col("rank") == 1)
df_rename = df_filter.withColumnRenamed("name", "category")
df_rename.show()
                            

+--------------------+--------+------------------+----+
|                city|category|             hours|rank|
+--------------------+--------+------------------+----+
|  A Corua (La Corua)|  Sci-Fi| 931.9166666666666|   1|
|                Abha|  Action|1184.9166666666667|   1|
|           Abu Dhabi|  Family| 920.2166666666667|   1|
|                Acua|  Action|1015.3333333333334|   1|
|               Adana|Classics|1012.8666666666667|   1|
|         Addis Abeba|   Music| 904.9666666666666|   1|
|                Aden|  Sports|1121.3166666666668|   1|
|               Adoni|  Comedy| 891.4000000000001|   1|
|          Ahmadnagar|  Family| 943.2666666666667|   1|
|            Akishima|Children|1430.7666666666667|   1|
|               Akron|  Family|1035.8333333333333|   1|
|         Alessandria|Children| 756.3666666666666|   1|
|Allappuzha (Allep...|Classics|             846.4|   1|
|             Allende|Classics|            1004.2|   1|
|     Almirante Brown| Foreign|1054.833333333333

In [64]:
spark.stop()