In [149]:
import pyspark.sql.types as t
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

In [37]:
spark = SparkSession.\
    builder.\
    appName("task4").\
    config("spark.jars", "postgresql-42.5.0.jar").\
    getOrCreate()

22/08/27 15:42:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [38]:
connector = spark.read\
    .format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/task3db")\
    .option("user", "postgres")\
    .option("password", "postgres")\
    .option("driver", "org.postgresql.Driver")

In [39]:
def table_reader(table_name:str):
    return connector.option("dbtable", table_name).load()

In [43]:
actor = table_reader('actor')
address = table_reader('address')
category = table_reader('category')
city = table_reader('city')
country = table_reader('country')
customer = table_reader('customer')
film  = table_reader('film')
film_actor = table_reader('film_actor')
film_category = table_reader('film_category')
inventory = table_reader('inventory')
language = table_reader('language')
payment = table_reader('payment')
rental = table_reader('rental')
staff = table_reader('staff')
store = table_reader('store')

In [52]:
#1

task1 = (
    category.join(film_category, on='category_id')
    .groupby('name')
    .count()
    .orderBy(-f.col('count').cast(t.FloatType()))
)
task1.show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



In [53]:
#2

task2 = (actor.join(film_actor, on="actor_id")
    .join(film, on="film_id")
    .withColumn("actor_name", f.concat_ws(' ', "first_name", "last_name"))
    .groupBy("actor_name")
    .agg(f.sum("rental_duration").alias("total"))
    .orderBy(-f.col("total"))
)
task2.show()

+------------------+-----+
|        actor_name|total|
+------------------+-----+
|       SUSAN DAVIS|  242|
|    GINA DEGENERES|  209|
|       WALTER TORN|  201|
|       MARY KEITEL|  192|
|    MATTHEW CARREY|  190|
|     ANGELA HUDSON|  183|
|     GROUCHO DUNST|  183|
|     SANDRA KILMER|  181|
|       HENRY BERRY|  180|
|          UMA WOOD|  179|
|    WARREN JACKMAN|  178|
|   NATALIE HOPKINS|  174|
|ANGELA WITHERSPOON|  174|
|        VAL BOLGER|  173|
|      SIDNEY CROWE|  172|
|        MARY TANDY|  172|
|   VIVIEN BASINGER|  172|
|     JULIA MCQUEEN|  172|
|      SEAN GUINESS|  171|
|    RUSSELL TEMPLE|  171|
+------------------+-----+
only showing top 20 rows



In [60]:
#3

task3 = (
    category.join(film_category, on='category_id')
    .join(film, on='film_id')
    .join(inventory, on='film_id')
    .join(rental, on='inventory_id')
    .join(payment, on='rental_id')
    .groupBy('name')
    .agg(f.sum('amount').alias('payment'))
    .orderBy(-f.col('payment'))
)
task3.show(1)

+------+-------+
|  name|payment|
+------+-------+
|Sports|5314.21|
+------+-------+
only showing top 1 row



In [69]:
#4

task4 = (
    film.join(inventory, on='film_id', how='left')
    .filter('inventory_id is null')
    .select('title')
    .orderBy('title')
)
task4.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



In [147]:
#5

task5_table = (
    actor
    .join(film_actor, on='actor_id')
    .join(film, on='film_id')
    .join(film_category, on='film_id')
    .join(category, on='category_id')
    .where(f.col('name') == 'Children')
    .withColumn("actor_name", f.concat_ws(' ', "first_name", "last_name"))
    .groupBy("actor_name")
    .count()
    .orderBy(-f.col('count'))
)

tk = (
    task5_table.select(f.col('count'))
    .distinct()
    .limit(3)
    .rdd.map(lambda row : row[0]).collect()
)

task5 = (task5_table.
         filter(f.col('count').
                isin(tk))
)
task5.show()

+-------------+-----+
|   actor_name|count|
+-------------+-----+
| HELEN VOIGHT|    7|
|  SUSAN DAVIS|    6|
|   MARY TANDY|    5|
|   RALPH CRUZ|    5|
|KEVIN GARLAND|    5|
|  WHOOPI HURT|    5|
+-------------+-----+



In [178]:
#6

task6 = (
    customer
    .join(address, on='address_id')
    .join(city, on='city_id')
    .groupBy('city')
    .agg(
        f.sum(f.col('active').cast(t.FloatType())).alias('active'),
        (f.count(f.col('active')) - f.sum(f.col('active'))).alias('inactive')
    )
    .orderBy(-f.col("inactive"))
)

task6.show()

+------------------+------+--------+
|              city|active|inactive|
+------------------+------+--------+
|         Pingxiang|   0.0|       1|
|       Szkesfehrvr|   0.0|       1|
|  Charlotte Amalie|   0.0|       1|
|         Najafabad|   0.0|       1|
|           Wroclaw|   0.0|       1|
|            Ktahya|   0.0|       1|
|            Amroha|   0.0|       1|
|   Southend-on-Sea|   0.0|       1|
|           Bat Yam|   0.0|       1|
|            Kamyin|   0.0|       1|
|          Xiangfan|   0.0|       1|
|            Daxian|   0.0|       1|
|          Uluberia|   0.0|       1|
|     Coatzacoalcos|   0.0|       1|
|        Kumbakonam|   0.0|       1|
|          Fengshan|   1.0|       0|
|A Corua (La Corua)|   1.0|       0|
|           El Alto|   1.0|       0|
|          Myingyan|   1.0|       0|
|              Linz|   1.0|       0|
+------------------+------+--------+
only showing top 20 rows



In [188]:
#7

task7_table = (
    city
    .join(address, on='city_id')
    .join(customer, on='address_id')
    .join(rental, on='customer_id')
    .join(inventory, on='inventory_id')
    .join(film, on='film_id')
    .join(film_category, on='film_id')
    .join(category, on='category_id')
    .filter('return_date is not null')
    .withColumn('time', f.col('return_date') - f.col('rental_date'))
    .select('name', 'time', 'city')
)

task7_1 = (task7_table
    .filter(f.col('city').like('A%') | f.col('city').like('a%'))
    .groupBy('name')
    .agg(f.sum('time').alias('total_time'))
    .orderBy(-f.col('total_time'))
    .limit(1)
)

task7_2 = (task7_table
    .filter(f.col('city').like('%-%'))
    .groupBy('name')
    .agg(f.sum('time').alias('total_time'))
    .orderBy(-f.col('total_time'))
    .limit(1)
)

task7 = task7_1.union(task7_2)
task7.show()

+-------+--------------------+
|   name|          total_time|
+-------+--------------------+
| Sports|INTERVAL '515 00:...|
|Foreign|INTERVAL '269 16:...|
+-------+--------------------+

