In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import *


In [30]:
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder\
    .config('spark.driver.extraClassPath', '/home/pikarchu/drivers/postgresql-42.3.1.jar')\
    .master('local')\
    .appName('homework_6')\
    .getOrCreate()

In [3]:
spark

In [4]:
pg_url = 'jdbc:postgresql://192.168.0.110:5432/postgres'
pg_creds = {'user' : 'pguser', 'password' : 'secret'}

In [7]:
#First tasks
tb_film_category = spark.read.jdbc(pg_url, table='film_category', properties=pg_creds)
tb_category = spark.read.jdbc(pg_url, table='category', properties=pg_creds)

In [9]:
#tb_category.head(5)

In [31]:
first_result = tb_film_category.join(tb_category , tb_film_category.category_id == tb_category.category_id, 'inner')\
    .groupby(tb_category.name)\
    .count()\
    .orderBy(desc('count'))

In [32]:
first_result.show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



In [33]:
#Second Query

In [34]:
tb_actor = spark.read.jdbc(pg_url, table='actor', properties=pg_creds)
tb_film_actor = spark.read.jdbc(pg_url, table='film_actor', properties=pg_creds)
tb_inventory = spark.read.jdbc(pg_url, table='inventory', properties=pg_creds)
tb_rental = spark.read.jdbc(pg_url, table='rental', properties=pg_creds)

In [38]:
second_result = tb_actor.join(tb_film_actor, tb_film_actor.actor_id == tb_actor.actor_id , 'inner')\
        .join(tb_inventory, tb_inventory.film_id == tb_film_actor.film_id , 'inner')\
        .join(tb_rental, tb_rental.inventory_id ==    tb_inventory.inventory_id   , 'inner')\
        .select(concat(col('first_name'), lit(' '), col('last_name')).alias('Actor_Name'))\
        .groupby('Actor_Name')\
        .count()\
        .orderBy(desc('count'))

In [40]:
second_result.show(10)

+------------------+-----+
|        Actor_Name|count|
+------------------+-----+
|       SUSAN DAVIS|  825|
|    GINA DEGENERES|  753|
|    MATTHEW CARREY|  678|
|       MARY KEITEL|  674|
|ANGELA WITHERSPOON|  654|
|       WALTER TORN|  640|
|       HENRY BERRY|  612|
|       JAYNE NOLTE|  611|
|        VAL BOLGER|  605|
|     SANDRA KILMER|  604|
+------------------+-----+
only showing top 10 rows



In [60]:
#Third Query

In [41]:
tb_payment =spark.read.jdbc(pg_url, table='payment', properties=pg_creds)

In [56]:
third_result= tb_category.join(tb_film_category,tb_film_category.category_id == tb_category.category_id , 'inner' )\
        .join(tb_inventory,tb_inventory.film_id  == tb_film_category.film_id  , 'inner' )\
        .join(tb_rental,tb_rental.inventory_id  == tb_inventory.inventory_id  , 'inner' )\
        .join(tb_payment,tb_payment.rental_id   == tb_rental.rental_id   , 'inner' )\
        .groupby(tb_category.name)\
        .agg(sum(tb_payment.amount))\
        .orderBy(desc('sum(amount)'))


In [59]:
third_result.show(1)

+------+-----------+
|  name|sum(amount)|
+------+-----------+
|Sports|    5314.21|
+------+-----------+
only showing top 1 row



In [61]:
#Fourth Query

In [62]:
tb_film = spark.read.jdbc(pg_url, table='film', properties=pg_creds)

In [71]:
fourth_query = tb_film.join(tb_inventory, tb_inventory.film_id == tb_film.film_id, 'left_anti')

In [73]:
fourth_query.select('title').show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



In [72]:
fourth_query.count()

42

In [74]:
#fifth Query

In [88]:
windowSpec  = Window.partitionBy().orderBy(desc("count"))

In [91]:
fifth_query = tb_actor.join(tb_film_actor, tb_film_actor.actor_id == tb_actor.actor_id, 'inner')\
.join(tb_film_category, tb_film_category.film_id==tb_film_actor.film_id , 'inner')\
.join(tb_category, [tb_category.category_id ==tb_film_category.category_id , tb_category.name == 'Children'] , 'inner')\
.select(concat(col('first_name'), lit(' '), col('last_name')).alias('Actor_Name'))\
.groupby('Actor_Name')\
.count()\
.withColumn("rank",rank().over(windowSpec)) \
.where(col('rank') <= 3)




In [92]:
fifth_query.show()

+-------------+-----+----+
|   Actor_Name|count|rank|
+-------------+-----+----+
| HELEN VOIGHT|    7|   1|
|  SUSAN DAVIS|    6|   2|
|   MARY TANDY|    5|   3|
|   RALPH CRUZ|    5|   3|
|  WHOOPI HURT|    5|   3|
|KEVIN GARLAND|    5|   3|
+-------------+-----+----+



In [93]:
#Sixth Query

In [96]:
tb_customer = spark.read.jdbc(pg_url, table='customer', properties=pg_creds)
tb_address = spark.read.jdbc(pg_url, table='address', properties=pg_creds)
tb_city = spark.read.jdbc(pg_url, table='city', properties=pg_creds)

In [104]:
Sixth_query = tb_city.join(tb_address, tb_address.city_id == tb_city.city_id, 'inner')\
    .join(tb_customer, tb_customer.address_id ==tb_address.address_id, 'inner')\
    .withColumn("not active", when(tb_customer.active == 1, 0).otherwise(1))\
    .groupby('city')\
    .agg({'active':'sum', 'not active':'sum'})\
    .sort(desc('sum(not active)'))


In [105]:
Sixth_query.show()

+------------------+-----------+---------------+
|              city|sum(active)|sum(not active)|
+------------------+-----------+---------------+
|         Pingxiang|          0|              1|
|       Szkesfehrvr|          0|              1|
|  Charlotte Amalie|          0|              1|
|         Najafabad|          0|              1|
|           Wroclaw|          0|              1|
|            Ktahya|          0|              1|
|           Bat Yam|          0|              1|
|   Southend-on-Sea|          0|              1|
|            Amroha|          0|              1|
|            Kamyin|          0|              1|
|          Xiangfan|          0|              1|
|            Daxian|          0|              1|
|          Uluberia|          0|              1|
|     Coatzacoalcos|          0|              1|
|        Kumbakonam|          0|              1|
|A Corua (La Corua)|          1|              0|
|          Fengshan|          1|              0|
|          Chisinau|

In [106]:
#seventh Query

In [129]:
windowSpec = Window.partitionBy(["city"]).orderBy(desc("sum(rental_duration)"))

In [130]:
seventh_query_1 = tb_film_category.join(tb_category, tb_film_category.category_id == tb_category.category_id, 'inner')\
    .join(tb_film, tb_film_category.film_id ==tb_film.film_id, 'inner')\
    .join(tb_inventory, tb_film.film_id ==tb_inventory.film_id, 'inner')\
    .join(tb_rental, tb_inventory.inventory_id ==tb_rental.inventory_id, 'inner')\
    .join(tb_customer, tb_rental.customer_id ==tb_customer.customer_id, 'inner')\
    .join(tb_address, tb_customer.address_id ==tb_address.address_id, 'inner')\
    .join(tb_city, [tb_address.city_id ==tb_city.city_id, ], 'inner')\
    .where(lower(tb_city.city).like('a%'))\
    .groupby([tb_city.city,tb_category.name])\
    .agg({'rental_duration':'sum'})\
    .withColumn("rnm", row_number().over(windowSpec))\
    .where(col('rnm') == 1)


In [134]:
seventh_query_2 = tb_film_category.join(tb_category, tb_film_category.category_id == tb_category.category_id, 'inner')\
    .join(tb_film, tb_film_category.film_id ==tb_film.film_id, 'inner')\
    .join(tb_inventory, tb_film.film_id ==tb_inventory.film_id, 'inner')\
    .join(tb_rental, tb_inventory.inventory_id ==tb_rental.inventory_id, 'inner')\
    .join(tb_customer, tb_rental.customer_id ==tb_customer.customer_id, 'inner')\
    .join(tb_address, tb_customer.address_id ==tb_address.address_id, 'inner')\
    .join(tb_city, [tb_address.city_id ==tb_city.city_id, ], 'inner')\
    .where((lower(tb_city.city).like('a%')) | (tb_city.city.like("%-%")))\
    .groupby([tb_city.city,tb_category.name])\
    .agg({'rental_duration':'sum'})\
    .withColumn("rnm", row_number().over(windowSpec))\
    .where(col('rnm') == 1)

In [135]:
seventh_query_1.show()

+--------------------+-----------+--------------------+---+
|                city|       name|sum(rental_duration)|rnm|
+--------------------+-----------+--------------------+---+
|  A Corua (La Corua)|     Comedy|                  20|  1|
|                Abha|     Sci-Fi|                  24|  1|
|           Abu Dhabi|     Sci-Fi|                  21|  1|
|                Acua|     Family|                  17|  1|
|               Adana|Documentary|                  18|  1|
|         Addis Abeba|  Animation|                  15|  1|
|                Aden|        New|                  27|  1|
|               Adoni|     Horror|                  17|  1|
|          Ahmadnagar|     Sci-Fi|                  25|  1|
|            Akishima|   Children|                  33|  1|
|               Akron|     Horror|                  21|  1|
|         Alessandria|  Animation|                  13|  1|
|Allappuzha (Allep...|      Games|                  20|  1|
|             Allende|     Travel|      