In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = (SparkSession.builder
         .config("spark.driver.extraClassPath", "/usr/local/postgresql-42.5.4.jar")
         .master("local")
         .appName("PySpark_Postgres_test")
         .getOrCreate()
)

In [2]:
import pyspark.pandas as ps


jbdc_url = "jdbc:postgresql://localhost:5432/postgres?user=postgres&password=123456"

df_category_ps = ps.read_sql_table("category", con=jbdc_url)

df_film_category_ps = ps.read_sql_table("film_category", con=jbdc_url)
df_film_ps = ps.read_sql_table("film", con=jbdc_url)
df_film_actor_ps = ps.read_sql_table("film_actor", con=jbdc_url)

df_actor_ps = ps.read_sql_table("actor", con=jbdc_url)

df_address_ps = ps.read_sql_table("address", con=jbdc_url)
df_city_ps = ps.read_sql_table("city", con=jbdc_url)

df_inventory_ps = ps.read_sql_table("inventory", con=jbdc_url)
df_rental_ps = ps.read_sql_table("rental", con=jbdc_url)
df_customer_ps = ps.read_sql_table("customer", con=jbdc_url)
df_payment_ps = ps.read_sql_table("payment", con=jbdc_url)



In [3]:
# task 1 

(
    df_category_ps
        .merge(df_film_category_ps, on=["category_id"],
                how="left", 
                suffixes=("_c", "_fc"))
        .rename(columns={"name": "category_name"})
        .groupby("category_name")
        .agg(films_count=("film_id", "count"))
        .sort_values("films_count", ascending=False)
)

Unnamed: 0_level_0,films_count
category_name,Unnamed: 1_level_1
Sports,74
Foreign,73
Family,69
Documentary,68
Animation,66
Action,64
New,63
Drama,62
Games,61
Sci-Fi,61


In [4]:
# task 2

(
    df_actor_ps
    .merge(df_film_actor_ps, how="inner", on="actor_id", suffixes=("_a", "_fa"))
    .merge(df_inventory_ps, how="inner", on="film_id")
    .merge(df_rental_ps, how="inner", on="inventory_id")
    .groupby(["first_name", "last_name"])
    .agg(rental_amount=("rental_id", "count"))
    .sort_values("rental_amount", ascending=False)
).head(5)

Unnamed: 0_level_0,Unnamed: 1_level_0,rental_amount
first_name,last_name,Unnamed: 2_level_1
SUSAN,DAVIS,825
GINA,DEGENERES,753
MATTHEW,CARREY,678
MARY,KEITEL,674
ANGELA,WITHERSPOON,654


In [5]:
# task 3

(
    df_category_ps
        .merge(df_film_category_ps, on=["category_id"],
                how="inner", 
                suffixes=("_c", "_fc"))
        .merge(df_inventory_ps, on=["film_id"],
                how="inner", 
                )
        .merge(df_rental_ps, on=["inventory_id"],
                how="inner", 
                )
        .merge(df_payment_ps, on=["rental_id"],
                how="inner", 
                )
        .rename(columns={"name": "category_name"})
        .groupby("category_name")
        .agg(payment_amount=("amount", "sum"))
        .nlargest(1, "payment_amount")        
        
)

Unnamed: 0_level_0,payment_amount
category_name,Unnamed: 1_level_1
Sports,5314.21


In [6]:
# task 4

merged_films_with_inventory = df_film_ps.merge(
    df_inventory_ps, how="left", on="film_id"
)

merged_films_with_inventory.loc[merged_films_with_inventory["inventory_id"].isnull()][
    "title"
]


0               CHOCOLATE DUCK
237              BUTCH PANTHER
256               VOLUME HOUSE
276             ORDER BETRAYED
324               TADPOLE PARK
347           KILL BROTHERHOOD
435      FRANKENSTEIN STRANGER
466           CROSSING DIVORCE
576           SUICIDES SILENCE
667              CATCH AMISTAD
881            PERDITION FARGO
927              FLOATS GARDEN
928                  GUMP DATE
934               WALLS ARTIST
1257        GLADIATOR WESTWARD
1548               HOCUS FRIDA
1654      ARSENIC INDEPENDENCE
1697               MUPPET MILE
1933         FIREHOUSE VIETNAM
2148             ROOF CHAMPION
2189                DAZED PUNK
2291             PEARL DESTINY
2292             RAINBOW SHOCK
2362          KENTUCKIAN GIANT
2608         BOONDOCK BALLROOM
2617      COMMANDMENTS EXPRESS
2940             HATE HANDICAP
2992             ARK RIDGEMONT
3361           CROWDS TELEMARK
3489    DELIVERANCE MULHOLLAND
3556         RAIDERS ANTITRUST
3584             SISTER FREDDY
3585    

In [7]:
# task 5

ps.set_option("compute.ops_on_diff_frames", True)

df_grouped_by_filming_amount = (
    df_film_category_ps.merge(
        (df_category_ps[(df_category_ps["name"] == "Children")]),
        on=["category_id"],
    )
    .merge(
        df_film_actor_ps,
        on=["film_id"],
    )
    .merge(df_actor_ps, on=["actor_id"], suffixes=("", "_a"))
    .groupby(["actor_id"])
    .agg(filming_amount=("actor_id", "count"))
)
df_grouped_by_filming_amount["rank"] = df_grouped_by_filming_amount[
    "filming_amount"
].rank(method="min", ascending=False)
df_grouped_by_filming_amount.loc[df_grouped_by_filming_amount["rank"] <= 3].drop(
    columns=["rank"]
).reset_index().merge(df_actor_ps, how="inner", on="actor_id")[
    ["first_name", "last_name", "filming_amount"]
].sort_values(
    "filming_amount", ascending=False
)


Unnamed: 0,first_name,last_name,filming_amount
2,HELEN,VOIGHT,7
0,WHOOPI,HURT,5
1,KEVIN,GARLAND,5
3,RALPH,CRUZ,5
4,MARY,TANDY,5


In [8]:
# task 6

df_merged_customers_with_addresess = (
df_city_ps
        .merge(df_address_ps, how="left", on=["city_id"],
                )
        .merge(df_customer_ps, how="left", on=["address_id"],
                )
)
df_merged_customers_with_addresess["is_active"] = df_merged_customers_with_addresess["active"].apply(lambda x: 1 if x == 1 else 0)
df_merged_customers_with_addresess["is_non_active"] = df_merged_customers_with_addresess["active"].apply(lambda x: 0 if x == 1 else 1)

(df_merged_customers_with_addresess
 .groupby(["city_id", "city"])
 .agg(active_users=("is_active", "sum"), non_active_users=("is_non_active", "sum"))
 .sort_values("non_active_users", ascending=False)
 .head()
)


Unnamed: 0_level_0,Unnamed: 1_level_0,active_users,non_active_users
city_id,city,Unnamed: 2_level_1,Unnamed: 3_level_1
300,Lethbridge,0,2
576,Woodridge,0,2
111,Charlotte Amalie,0,1
495,Southend-on-Sea,0,1
407,Pingxiang,0,1


In [9]:
# task 7

merged_tables = (
    df_city_ps.loc[(df_city_ps["city"].str.lower().str.contains("-"))]
    .merge(df_address_ps, on=["city_id"], suffixes=("_city", "_address"))
    .merge(df_customer_ps, on=["address_id"], suffixes=("", "_customer"))
    .merge(df_rental_ps, on=["customer_id"], suffixes=("", "_rental"))
    .merge(df_inventory_ps, on=["inventory_id"], suffixes=("", "_inventory"))
    .merge(
        df_film_ps.loc[(df_film_ps["title"].str.lower().str.startswith("a"))],
        on=["film_id"],
        suffixes=("", "_film"),
    )
    .merge(df_film_category_ps, on=["film_id"], suffixes=("", "_film_category"))
    .merge(df_category_ps, on=["category_id"], suffixes=("", "_category"))
)

merged_tables["time_diff"] = ps.to_timedelta(
    merged_tables["return_date"] - merged_tables["rental_date"], unit="seconds"
)
merged_tables = merged_tables.groupby(["name"]).agg(time_diff_sum=("time_diff", "sum"))
merged_tables["rank"] = merged_tables["time_diff_sum"].rank(
    method="min", ascending=False
)
merged_tables.loc[merged_tables["rank"] <= 1].drop(columns=["rank"]).sort_values(
    "time_diff_sum", ascending=False
)



Unnamed: 0_level_0,time_diff_sum
name,Unnamed: 1_level_1
Action,27 days 10:44:00
