In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

### Postgres Params

In [3]:
import os
from dotenv import load_dotenv

In [4]:
load_dotenv()

JAR_PATH=os.getenv("JAR_PATH")

DB_USER = os.getenv("POSTGRES_USER")
DB_PASSWORD = os.getenv("POSTGRES_PASSWORD")
DB_HOST = os.getenv("POSTGRES_HOST")
DB_PORT = os.getenv("POSTGRES_PORT")
DB_NAME = os.getenv("POSTGRES_DB")

In [5]:
jdbc_url =f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"


In [6]:
spark = SparkSession.builder\
    .appName('pagial Analysis').config('spark.jars',JAR_PATH).getOrCreate()

25/12/03 15:37:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:

def read_table(table_name:str):
    
    try:

        df = (spark.read
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", jdbc_url)
            .option("dbtable", f"public.\"{table_name}\"")
            .option("user", DB_USER)
            .option("password", DB_PASSWORD)
            .load()
             )

        df = df.withColumn('_source_table', F.lit(table_name))

        return df

    except Exception as e:
        print(f"Error reading table {table_name}: {str(e)}")
        return None


In [9]:
tables = [
    "actor",
    "address",
    "category",
    "city",
    "country",
    "customer",
    "film",
    "film_actor",
    "film_category",
    "inventory",
    "language",
    "payment",
    "rental",
    "staff",
    "store"
]

dataframes = {}
for table in tables:
    dataframes[table] = read_table(table)

#### - Output the number of movies in each category, sorted in descending order. 

In [9]:
film_category_df = dataframes['film_category']\
    .join(dataframes['category'], 'category_id', 'inner')


In [11]:
number_of_movies = film_category_df.groupBy('name').agg(F.count('*').alias('movies_count')).orderBy(F.desc('movies_count'))

In [12]:
number_of_movies.show()

+-----------+------------+
|       name|movies_count|
+-----------+------------+
|      Drama|         152|
|      Music|         152|
|     Travel|         151|
|    Foreign|         150|
|      Games|         150|
|   Children|         150|
|     Action|         149|
|     Sci-Fi|         149|
|  Animation|         148|
|     Family|         147|
|   Classics|         147|
|        New|         147|
|     Sports|         145|
|Documentary|         145|
|     Comedy|         143|
|     Horror|         142|
+-----------+------------+



#### - Output the 10 actors whose movies rented the most, sorted in descending order. 

In [14]:
rental_actor_df = dataframes['rental']\
    .join(dataframes['inventory'], 'inventory_id', 'inner')\
    .join(dataframes['film_actor'], 'film_id', 'inner')\
    .join(dataframes['actor'], 'actor_id', 'inner')

actories_df = rental_actor_df.groupBy('last_name','first_name')\
    .agg(F.count('*').alias('movies_count')).orderBy(F.desc('movies_count')).limit(10)

In [15]:
actories_df.show()

+-----------+----------+------------+
|  last_name|first_name|movies_count|
+-----------+----------+------------+
|      DAVIS|     SUSAN|         825|
|  DEGENERES|      GINA|         753|
|     CARREY|   MATTHEW|         678|
|     KEITEL|      MARY|         674|
|WITHERSPOON|    ANGELA|         654|
|       TORN|    WALTER|         640|
|      BERRY|     HENRY|         612|
|      NOLTE|     JAYNE|         611|
|     BOLGER|       VAL|         605|
|     KILMER|    SANDRA|         604|
+-----------+----------+------------+



#### - Output the category of movies on which the most money was spent. 

In [16]:
payment_category = dataframes['payment'].join(dataframes['rental'], 'rental_id', 'inner')\
                    .join(dataframes['inventory'], 'inventory_id', 'inner')\
                    .join(dataframes['film_category'], 'film_id', 'inner')\
                    .join(dataframes['category'], 'category_id', 'inner').select('name', 'amount')

most_spent_categories = payment_category.groupBy('name')\
    .agg(F.sum('amount').alias('total_money_spent')).orderBy(F.desc('total_money_spent')).limit(1)

most_spent_categories.show()

+-------+-----------------+
|   name|total_money_spent|
+-------+-----------------+
|Foreign|         10507.67|
+-------+-----------------+



#### - Output the names of movies that are not in the inventory. 

In [17]:
uni_movies_df = dataframes['film'].join(dataframes['inventory'], 'film_id', 'left_anti')\
    .select('film_id', 'title')
uni_movies_df.show()

+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|    148|      CHOCOLATE DUCK|
|    108|       BUTCH PANTHER|
|    950|        VOLUME HOUSE|
|    642|      ORDER BETRAYED|
|    874|        TADPOLE PARK|
|    497|    KILL BROTHERHOOD|
|    332|FRANKENSTEIN STRA...|
|    192|    CROSSING DIVORCE|
|    860|    SUICIDES SILENCE|
|    128|       CATCH AMISTAD|
|    671|     PERDITION FARGO|
|    325|       FLOATS GARDEN|
|    386|           GUMP DATE|
|    955|        WALLS ARTIST|
|    359|  GLADIATOR WESTWARD|
|    419|         HOCUS FRIDA|
|     41|ARSENIC INDEPENDENCE|
|    607|         MUPPET MILE|
|    318|   FIREHOUSE VIETNAM|
|    742|       ROOF CHAMPION|
+-------+--------------------+
only showing top 20 rows


#### - Output the top 3 actors who have appeared most in movies in the “Children” category. If several actors have the same number of movies, output all of them. 

In [19]:
movies_category_df = dataframes['category'].join(dataframes['film_category'],'category_id','inner')\
                    .join(dataframes['film_actor'], 'film_id','inner')\
                    .join(dataframes['actor'], 'actor_id','inner').filter(F.col('name') == 'Children')\
                    .select('actor_id','first_name', 'last_name')

film_count_df = movies_category_df.groupBy('actor_id','first_name','last_name')\
    .agg(F.count("*").alias("film_count"))

windowSpec = Window.orderBy(F.desc('film_count'))

rn_actors_df = film_count_df.withColumn('rn', F.dense_rank().over(windowSpec))\
    .filter(F.col('rn') <=3)
rn_actors_df.show()

25/11/28 17:15:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/28 17:15:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/28 17:15:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/28 17:15:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/28 17:15:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/28 17:15:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/28 1

+--------+----------+---------+----------+---+
|actor_id|first_name|last_name|film_count| rn|
+--------+----------+---------+----------+---+
|     105|    SIDNEY|    CROWE|         9|  1|
|     139|      EWAN|  GOODING|         9|  1|
|     133|   RICHARD|     PENN|         9|  1|
|      87|   SPENCER|     PECK|         8|  2|
|     145|       KIM|    ALLEN|         8|  2|
|      66|      MARY|    TANDY|         8|  2|
|      29|      ALEC|    WAYNE|         8|  2|
|      56|       DAN|   HARRIS|         8|  2|
|     149|   RUSSELL|   TEMPLE|         8|  2|
|     181|   MATTHEW|   CARREY|         8|  2|
|     131|      JANE|  JACKMAN|         8|  2|
|     142|      JADA|    RYDER|         8|  2|
|      84|     JAMES|     PITT|         7|  3|
|     108|    WARREN|    NOLTE|         7|  3|
|     123|  JULIANNE|    DENCH|         7|  3|
|      34|    AUDREY|  OLIVIER|         7|  3|
|      96|      GENE|   WILLIS|         7|  3|
|      65|    ANGELA|   HUDSON|         7|  3|
|      95|   

#### - Output cities with the number of active and inactive customers (active - customer.active = 1). Sort by the number of inactive customers in descending order. 

In [18]:
dataframes['customer'].printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- activebool: boolean (nullable = true)
 |-- create_date: date (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)
 |-- _source_table: string (nullable = false)



In [25]:

customer_status_df = dataframes['customer'].join(dataframes['address'], 'address_id', 'inner')\
                    .join(dataframes['city'], 'city_id', 'inner').select('city_id','city','active')

# i thought that will be cleaner if there is records with null values or missing

# fin_cus_status_df = customer_status_df.groupBy("city_id","city").agg(
#                 F.sum(F.when(F.col('active') == 1, 1).otherwise(0)).alias('active_count'),
#                 F.sum(F.when(F.col('active') == 0, 1).otherwise(0)).alias('inactive_count')
#             ).orderBy(F.desc('inactive_count'))

new_customer_status_df = customer_status_df.groupBy("city_id","city").agg(
        F.sum(F.col('active').cast('int')).alias('active_count'),
        (F.count(F.col('active')) - F.sum(F.col('active').cast('int'))).alias('inactive_count')
).orderBy(F.desc('inactive_count'))

new_customer_status_df.show()

+-------+----------------+------------+--------------+
|city_id|            city|active_count|inactive_count|
+-------+----------------+------------+--------------+
|    111|Charlotte Amalie|           0|             1|
|    495| Southend-on-Sea|           0|             1|
|    407|       Pingxiang|           0|             1|
|    259|          Kamyin|           0|             1|
|    554|        Uluberia|           0|             1|
|    512|     Szkesfehrvr|           0|             1|
|    125|   Coatzacoalcos|           0|             1|
|    283|      Kumbakonam|           0|             1|
|    356|       Najafabad|           0|             1|
|    578|        Xiangfan|           0|             1|
|    281|          Ktahya|           0|             1|
|     57|         Bat Yam|           0|             1|
|     24|          Amroha|           0|             1|
|    139|          Daxian|           0|             1|
|    577|         Wroclaw|           0|             1|
|    191| 

#### - Output the category of movies that have the highest number of total rental hours in the cities (customer.address_id in this city), and that start with the letter “a”. Do the same for cities with a “-” symbol.

In [10]:
category_rental_df = dataframes['rental'].withColumn(
     "hours_rented",
    F.timestamp_diff("HOUR", F.col("rental_date"), F.col("return_date")))\
                    .join(dataframes['customer'], 'customer_id', 'inner')\
                    .join(dataframes['address'], 'address_id', 'inner')\
                    .join(dataframes['city'], 'city_id', 'inner')\
                    .join(dataframes['inventory'],'inventory_id','inner')\
                    .join(dataframes['film_category'], 'film_id','inner')\
                    .join(dataframes['category'], 'category_id','inner')\
                    .select('rental_id','city', 'name','hours_rented')

total_rent_category_df = category_rental_df.groupBy('city','name').agg(F.sum('hours_rented').alias('total_hours'))

categories_start_a = total_rent_category_df.filter(F.col('name').ilike('a%'))
cities_have_dash = total_rent_category_df.filter(F.col('city').ilike("%-%"))

windowSpec_cities = Window.partitionBy('city').orderBy(F.desc('total_hours'))

rank_cities_a_df = categories_start_a.withColumn('rn', F.rank().over(windowSpec_cities)).filter(F.col('rn') == 1)\
    .orderBy(F.desc('total_hours'))
rank_cities_dash_df = cities_have_dash.withColumn('rn', F.rank().over(windowSpec_cities)).filter(F.col('rn')== 1)\
    .orderBy(F.desc('total_hours'))

fnal_ranked_df = rank_cities_a_df.withColumn('source', F.lit('starts_with_a_cities')) \
    .unionAll(rank_cities_dash_df.withColumn('source', F.lit('contains_dash_cities')))


In [11]:
fnal_ranked_df.show()

                                                                                

+--------------------+---------+-----------+---+--------------------+
|                city|     name|total_hours| rn|              source|
+--------------------+---------+-----------+---+--------------------+
|           Molodetno|   Action|       1574|  1|starts_with_a_cities|
|             Hodeida|   Action|       1501|  1|starts_with_a_cities|
|                Fuyu|Animation|       1426|  1|starts_with_a_cities|
|             Bijapur|   Action|       1389|  1|starts_with_a_cities|
|        Garden Grove|Animation|       1379|  1|starts_with_a_cities|
|              Yangor|Animation|       1328|  1|starts_with_a_cities|
|            Bhilwara|Animation|       1249|  1|starts_with_a_cities|
|       Richmond Hill|   Action|       1247|  1|starts_with_a_cities|
|            Erlangen|   Action|       1237|  1|starts_with_a_cities|
|          Santa Rosa|   Action|       1219|  1|starts_with_a_cities|
|          Cape Coral|Animation|       1216|  1|starts_with_a_cities|
|Kowloon and New K..