# Before start

In [2]:
import findspark

findspark.init()

In [3]:
from pyspark.sql import SparkSession

# local[1] means start locally with 1 core
spark = SparkSession.builder \
    .config("spark.jars", "/usr/local/postgresql-42.5.2.jar") \
    .master("local[1]") \
    .appName("spark_framework") \
    .getOrCreate()

spark.sparkContext.setLogLevel(logLevel="ERROR")

23/02/09 14:12:17 WARN Utils: Your hostname, katerina-ubuntu resolves to a loopback address: 127.0.1.1; using 10.202.48.114 instead (on interface wlp2s0)
23/02/09 14:12:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/02/09 14:12:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
import os
from dotenv import load_dotenv

load_dotenv(".env")

postgresql_user = os.getenv('POSTGRESQL_USER')
postgresql_password = os.getenv('POSTGRESQL_PASSWORD')
spark_home = os.getenv('SPARK_HOME')
postgres_port = os.getenv('CONTAINER_PORT')

In [5]:
jdbc_connector = spark.read \
    .format('jdbc') \
    .option('driver', 'org.postgresql.Driver') \
    .option('url', f'jdbc:postgresql://localhost:{postgres_port}/postgres') \
    .option('user', postgresql_user) \
    .option('password', postgresql_password)

In [6]:
def load_tables(tables_names: list[str]):
    tables = {}
    for name in tables_names:
        tables[name] = jdbc_connector.option('dbtable', name).load()
    return tables

In [24]:
tables_names = ['actor', 'film', 'category',
                'film_category', 'film_actor',
                'inventory', 'customer',
                'address', 'city',
                'rental']
tables = load_tables(tables_names)

# Вывести количество фильмов в каждой категории, отсортировать по убыванию.

In [25]:
from pyspark.sql.functions import count, col

result = tables['film_category'] \
    .join(tables['category'], on='category_id', how='left') \
    .groupby('category_id', 'name') \
    .agg(count('film_id').alias('number_of_films')) \
    .select('category_id', 'name', 'number_of_films') \
    .orderBy(col('number_of_films').desc())

In [26]:
result.show()

+-----------+-----------+---------------+
|category_id|       name|number_of_films|
+-----------+-----------+---------------+
|         15|     Sports|             74|
|          9|    Foreign|             73|
|          8|     Family|             69|
|          6|Documentary|             68|
|          2|  Animation|             66|
|          1|     Action|             64|
|         13|        New|             63|
|          7|      Drama|             62|
|         10|      Games|             61|
|         14|     Sci-Fi|             61|
|          3|   Children|             60|
|          5|     Comedy|             58|
|         16|     Travel|             57|
|          4|   Classics|             57|
|         11|     Horror|             56|
|         12|      Music|             51|
+-----------+-----------+---------------+



# Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

In [27]:
# кто ж знал что будет перекликаться с питоновским sum()..
from pyspark.sql.functions import sum as _sum

result = tables['film']\
    .join(tables['film_actor'], on='film_id', how='left')\
    .join(tables['actor'], on='actor_id')\
    .groupby('actor_id', 'first_name', 'last_name')\
    .agg(_sum('rental_duration').alias('rental_duration'))\
    .select('first_name', 'last_name', 'rental_duration')\
    .orderBy(col('rental_duration').desc())

In [28]:
result.show()

+----------+-----------+---------------+
|first_name|  last_name|rental_duration|
+----------+-----------+---------------+
|      GINA|  DEGENERES|            209|
|    WALTER|       TORN|            201|
|      MARY|     KEITEL|            192|
|   MATTHEW|     CARREY|            190|
|    ANGELA|     HUDSON|            183|
|   GROUCHO|      DUNST|            183|
|    SANDRA|     KILMER|            181|
|     HENRY|      BERRY|            180|
|       UMA|       WOOD|            179|
|    WARREN|    JACKMAN|            178|
|   NATALIE|    HOPKINS|            174|
|    ANGELA|WITHERSPOON|            174|
|       VAL|     BOLGER|            173|
|     JULIA|    MCQUEEN|            172|
|      MARY|      TANDY|            172|
|    SIDNEY|      CROWE|            172|
|    VIVIEN|   BASINGER|            172|
|   RUSSELL|     TEMPLE|            171|
|      SEAN|    GUINESS|            171|
|    HARVEY|       HOPE|            169|
+----------+-----------+---------------+
only showing top

# Вывести категорию фильмов, на которую потратили больше всего денег.
А как читать из view?

In [29]:
result = tables['category'] \
    .join(tables['film_category'], on='category_id') \
    .join(tables['film'], on='film_id') \
    .groupby('category_id', 'name') \
    .agg(_sum('replacement_cost').alias('sum_of_replacement_cost')) \
    .select('category_id', 'name', 'sum_of_replacement_cost') \
    .orderBy(col('sum_of_replacement_cost').desc()) \
    .limit(1)

In [30]:
result.show()

+-----------+------+-----------------------+
|category_id|  name|sum_of_replacement_cost|
+-----------+------+-----------------------+
|         15|Sports|                1509.26|
+-----------+------+-----------------------+



 # Вывести названия фильмов, которых нет в inventory.

In [31]:
result = tables['film']\
    .join(tables['inventory'], on='film_id', how='leftanti')\
    .select('film_id', 'title')

In [32]:
result.show()

+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|    148|      CHOCOLATE DUCK|
|    108|       BUTCH PANTHER|
|    950|        VOLUME HOUSE|
|    642|      ORDER BETRAYED|
|    874|        TADPOLE PARK|
|    497|    KILL BROTHERHOOD|
|    332|FRANKENSTEIN STRA...|
|    192|    CROSSING DIVORCE|
|    860|    SUICIDES SILENCE|
|    128|       CATCH AMISTAD|
|    671|     PERDITION FARGO|
|    325|       FLOATS GARDEN|
|    386|           GUMP DATE|
|    955|        WALLS ARTIST|
|    359|  GLADIATOR WESTWARD|
|    419|         HOCUS FRIDA|
|     41|ARSENIC INDEPENDENCE|
|    607|         MUPPET MILE|
|    318|   FIREHOUSE VIETNAM|
|    742|       ROOF CHAMPION|
+-------+--------------------+
only showing top 20 rows



# Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех..

In [33]:
children_movies = tables['category']\
    .filter(col('name') == 'Children')\
    .join(tables['film_category'], on='category_id')\
    .join(tables['film'], on='film_id')\
    .select('film_id')

In [34]:
result = children_movies\
    .join(tables['film_actor'], on='film_id')\
    .groupby('actor_id')\
    .agg(count('film_id').alias('film_count'))\
    .orderBy(col('film_count').desc())\
    .join(tables['actor'], on='actor_id')\
    .select('actor_id', 'first_name', 'last_name', 'film_count')\
    .limit(3)

In [35]:
result.show()

+--------+----------+---------+----------+
|actor_id|first_name|last_name|film_count|
+--------+----------+---------+----------+
|     148|     EMILY|      DEE|         3|
|     137|    MORGAN| WILLIAMS|         2|
|      85|    MINNIE|ZELLWEGER|         1|
+--------+----------+---------+----------+



# Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.

In [36]:
from pyspark.sql.functions import when

count_filter = lambda condition: _sum(when(condition, 1).otherwise(0))
result = tables['address']\
    .join(tables['city'], on='city_id')\
    .join(tables['customer'], on='address_id')\
    .groupby('city')\
    .agg(count_filter(col('activebool') == True).alias('num_of_active_customers'),
         count_filter(col('activebool') == False).alias('num_of_inactive_customers')) \
    .orderBy(col('num_of_active_customers').desc())\
    .select('city', 'num_of_active_customers', 'num_of_inactive_customers')

In [37]:
result.show()

+------------------+-----------------------+-------------------------+
|              city|num_of_active_customers|num_of_inactive_customers|
+------------------+-----------------------+-------------------------+
|            London|                      2|                        0|
|            Aurora|                      2|                        0|
|A Corua (La Corua)|                      1|                        0|
|          Fengshan|                      1|                        0|
|              Linz|                      1|                        0|
|          Myingyan|                      1|                        0|
|          Chisinau|                      1|                        0|
|       Sultanbeyli|                      1|                        0|
|           Udaipur|                      1|                        0|
|         Mit Ghamr|                      1|                        0|
|           El Alto|                      1|                        0|
|     

# Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. Тоже самое сделать для городов в которых есть символ “-”.

In [54]:
from pyspark.sql.functions import coalesce, current_date, datediff

result = tables['city']\
    .filter(col('city').like('%A') | col('city').like('%-%'))\
    .join(tables['address'], on='city_id')\
    .join(tables['customer'], on='address_id')\
    .join(tables['rental'], on='customer_id')\
    .select('inventory_id', datediff(coalesce(col('return_date'), current_date()), col('rental_date')).alias('rental_duration'))\
    .join(tables['inventory'], on='inventory_id')\
    .groupby('film_id')\
    .agg(_sum(col('rental_duration')).alias('sum_rental_duration'))\
    .join(tables['film_category'], on='film_id')\
    .groupby('category_id')\
    .agg(_sum(col('sum_rental_duration')).alias('sum_rental_duration'))\
    .join(tables['category'], on='category_id')\
    .orderBy(col('sum_rental_duration').desc())\
    .limit(1)

In [55]:
result.show()

                                                                                

+-----------+-------------------+------+-------------------+
|category_id|sum_rental_duration|  name|        last_update|
+-----------+-------------------+------+-------------------+
|         15|                613|Sports|2022-02-15 12:46:27|
+-----------+-------------------+------+-------------------+

