In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import psycopg2
from hdfs import InsecureClient
import os
from datetime import datetime

In [2]:
# params
# pg config
pg_config = {'host': '127.0.0.1'\
    , 'port': '5432'\
    , 'database': 'pagila'\
    , 'user': 'pguser'\
    , 'password': 'secret'\
}

# common config
config = {
    'outputDir': '/bronze/pagila',\
    'hdfs_url': 'http://127.0.0.1:50070/',\
    'hdfs_user': 'user',\
    'path_to_driver': '/home/user/drivers/postgresql-42.2.23.jar',\
    'tables_to_extract': (\
        'actor',\
        'actor_info',\
        'address',\
        'category',\
        'city',\
        'country',\
        'customer',\
        'customer_list',\
        'film',\
        'film_actor',\
        'film_category',\
        'film_list',\
        'inventory',\
        'language',\
        'nicer_but_slower_film_list',\
        'payment',\
        'rental',\
        'sales_by_film_category',\
        'sales_by_store',\
        'staff',\
        'staff_list',\
        'store'\
    )\
}

current_date = datetime.now().strftime("%Y-%m-%d")

In [3]:
# sessions and clients init
client = InsecureClient(config['hdfs_url'], user=config['hdfs_user'])

spark = SparkSession\
    .builder.master('local')\
    .appName('homework-06')\
    .config('spark.driver.extraClassPath'
            , config['path_to_driver'])\
    .getOrCreate()



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# extraction process
with psycopg2.connect(**pg_config) as pg_connection:
    for table_name in config['tables_to_extract']:
        with pg_connection.cursor() as cursor:
            with client.write(
                os.path.join(config['outputDir'], table_name, current_date, table_name + '.csv'),
                overwrite=True
            ) as csv_file:
                cursor.copy_expert(f"COPY (select * from {table_name}) TO STDOUT WITH HEADER CSV", csv_file)

In [5]:
# data frames definition
for table_name in config['tables_to_extract']:
    globals()[f"{table_name}_df"] = spark.read.load(os.path.join(config['outputDir'], table_name, current_date, table_name + '.csv')
        , header="true"
        , inferSchema="true"
        , format="csv")


                                                                                

In [6]:
# 1. вывести количество фильмов в каждой категории, отсортировать по убыванию
task1_df = film_category_df\
    .join(category_df,'category_id')\
    .groupBy('category_id', 'name')\
    .count()\
    .select('name','count')\
    .orderBy('count', ascending=False)

task1_df.show()



+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|     Sci-Fi|   61|
|      Games|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



                                                                                

In [7]:
#2. вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.
task2_df=rental_df\
    .join(inventory_df,'inventory_id')\
    .join(film_df, 'film_id')\
    .join(film_actor_df, 'film_id')\
    .join(actor_df, 'actor_id')\
    .groupBy('actor_id','first_name','last_name')\
    .count()\
    .select('first_name','last_name','count')\
    .orderBy('count', ascending=False)\
    .limit(10)

#     .select('actor_id','first_name','last_name')\

task2_df.show()




+----------+-----------+-----+
|first_name|  last_name|count|
+----------+-----------+-----+
|      GINA|  DEGENERES|  753|
|   MATTHEW|     CARREY|  678|
|      MARY|     KEITEL|  674|
|    ANGELA|WITHERSPOON|  654|
|    WALTER|       TORN|  640|
|     HENRY|      BERRY|  612|
|     JAYNE|      NOLTE|  611|
|       VAL|     BOLGER|  605|
|    SANDRA|     KILMER|  604|
|      SEAN|    GUINESS|  599|
+----------+-----------+-----+



                                                                                

In [8]:
# 3. вывести категорию фильмов, на которую потратили больше всего денег.
task3_df = rental_df\
    .join(inventory_df,'inventory_id')\
    .join(payment_df,'rental_id')\
    .join(film_df,'film_id')\
    .join(film_category_df,'film_id')\
    .join(category_df,'category_id')\
    .groupBy('category_id','name')\
    .agg(F.sum('amount').alias('amt'))\
    .orderBy('amt',ascending=False)\
    .select('name')\
    .limit(1)

task3_df.show()



+------+
|  name|
+------+
|Sports|
+------+





In [9]:
# 4. вывести названия фильмов, которых нет в inventory
task4_df = film_df\
    .join(inventory_df,'film_id','leftouter')\
    .where('inventory_id is null')\
    .select('title')\
    .orderBy('title')

task4_df.show(n=100000)

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
|           GUMP DATE|
|       HATE HANDICAP|
|         HOCUS FRIDA|
|    KENTUCKIAN GIANT|
|    KILL BROTHERHOOD|
|         MUPPET MILE|
|      ORDER BETRAYED|
|       PEARL DESTINY|
|     PERDITION FARGO|
|       PSYCHO SHRUNK|
|   RAIDERS ANTITRUST|
|       RAINBOW SHOCK|
|       ROOF CHAMPION|
|       SISTER FREDDY|
|         SKY MIRACLE|
|    SUICIDES SILENCE|
|        TADPOLE PARK|
|    TREASURE COMMAND|
|   VILLAIN DESPERATE|
|        VOLUME HOUSE|
|          

In [10]:
# 5. вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех.
window = Window.orderBy(F.desc('count'))

task5_df = category_df\
    .join(film_category_df,'category_id')\
    .join(film_df,'film_id')\
    .join(film_actor_df,'film_id')\
    .join(actor_df,'actor_id')\
    .where("name='Children'")\
    .groupBy('actor_id', 'first_name', 'last_name')\
    .count()\
    .withColumn('actor_rank', F.dense_rank().over(window))\
    .where('actor_rank<=3')\
    .orderBy('actor_rank')\
    .select('first_name','last_name','count','actor_rank')

task5_df.show()

21/08/12 00:16:41 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----------+---------+-----+----------+
|first_name|last_name|count|actor_rank|
+----------+---------+-----+----------+
|     HELEN|   VOIGHT|    7|         1|
|    WHOOPI|     HURT|    5|         2|
|     RALPH|     CRUZ|    5|         2|
|     KEVIN|  GARLAND|    5|         2|
|      MARY|    TANDY|    5|         2|
|  SCARLETT|    DAMON|    4|         3|
|    SANDRA|   KILMER|    4|         3|
| SYLVESTER|     DERN|    4|         3|
|     RENEE|     BALL|    4|         3|
|   KIRSTEN|   AKROYD|    4|         3|
|     SUSAN|    DAVIS|    4|         3|
|       UMA|     WOOD|    4|         3|
| CHRISTIAN|   AKROYD|    4|         3|
|     ELLEN|  PRESLEY|    4|         3|
|      ALAN| DREYFUSS|    4|         3|
|     JAYNE|    NOLTE|    4|         3|
|      JANE|  JACKMAN|    4|         3|
|      JADA|    RYDER|    4|         3|
|       VAL|   BOLGER|    4|         3|
+----------+---------+-----+----------+



                                                                                

In [11]:
# 6. вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.

task6_df = city_df\
    .join(address_df,'city_id')\
    .join(customer_df,'address_id')\
    .withColumn('inactive_attr',F.when(customer_df.active!=1,F.lit(1)).otherwise(F.lit(0)))\
    .select('city_id','city','active','inactive_attr')\
    .groupBy('city_id','city')\
    .agg(F.sum('active').alias('active_cnt'),F.sum('inactive_attr').alias('inactive_cnt'))\
    .orderBy('inactive_cnt', ascending=False)

task6_df.show(n=100000)

                                                                                

+-------+--------------------+----------+------------+
|city_id|                city|active_cnt|inactive_cnt|
+-------+--------------------+----------+------------+
|    407|           Pingxiang|         0|           1|
|    554|            Uluberia|         0|           1|
|    111|    Charlotte Amalie|         0|           1|
|    512|         Szkesfehrvr|         0|           1|
|    578|            Xiangfan|         0|           1|
|    125|       Coatzacoalcos|         0|           1|
|    281|              Ktahya|         0|           1|
|     57|             Bat Yam|         0|           1|
|     24|              Amroha|         0|           1|
|    495|     Southend-on-Sea|         0|           1|
|    283|          Kumbakonam|         0|           1|
|    139|              Daxian|         0|           1|
|    259|              Kamyin|         0|           1|
|    577|             Wroclaw|         0|           1|
|    356|           Najafabad|         0|           1|
|     74| 

In [12]:
# 7. вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. 
# То же самое сделать для городов в которых есть символ “-”.

tmp_df = category_df\
    .join(film_category_df,'category_id')\
    .join(inventory_df,'film_id')\
    .join(rental_df,'inventory_id')\
    .join(customer_df,'customer_id')\
    .join(address_df,'address_id')\
    .join(city_df,'city_id')\
    .where(rental_df.return_date.isNotNull())\
    .select('city','name',F.datediff(rental_df.return_date,rental_df.rental_date).alias('rental_time'))

window = Window.orderBy(F.desc('rtime'))

a_cities_df = tmp_df\
    .where("lower(city) like 'a%'")\
    .groupBy('name')\
    .agg(F.sum('rental_time').alias('rtime'))\
    .withColumn('category_rank',F.rank().over(window))\
    .where('category_rank=1')\
    .select(F.lit('most popular category for A% cities:').alias('title'),'name')

minus_cities_df = tmp_df\
    .where("city like '%-%'")\
    .groupBy('name')\
    .agg(F.sum('rental_time').alias('rtime'))\
    .withColumn('category_rank',F.rank().over(window))\
    .where('category_rank=1')\
    .select(F.lit('most popular category for %-% cities:').alias('title'),'name')

task7_df = a_cities_df.unionAll(minus_cities_df)

tmp_df.unpersist()
a_cities_df.unpersist()
minus_cities_df.unpersist()

task7_df.show(truncate=False)

21/08/12 00:16:47 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/08/12 00:16:47 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+-------------------------------------+-------+
|title                                |name   |
+-------------------------------------+-------+
|most popular category for A% cities: |Sports |
|most popular category for %-% cities:|Foreign|
+-------------------------------------+-------+



                                                                                