In [1]:
import os
from dotenv import load_dotenv
import pandas as pd
from sqlalchemy import create_engine, text, inspect

Load in the CSV data file to convert it to Postgres Table

In [2]:
# file path for data and confirmation of data load
csv_path = 'data/actor-films.csv'

df = pd.read_csv(csv_path)
print(f"Loaded {len(df)} rows from CSV.")

Loaded 169770 rows from CSV.


In [3]:
# create some custom functions for executing sql statements with sql alchemy or pandas

def query_sql(sql_engine, sql_statement):
     """Executes standard sql query with sql alchemy engine and sql command"""
     return pd.read_sql_query(sql_statement, sql_engine)

def execute_sql(sql_engine, sql_statement, message="Execution Succesful!"):
    """Executes sql action with sql alchemy engine and sql command (optional command success message)"""
    with sql_engine.connect() as connection:
        results = connection.execute(text(sql_statement))
        transaction = connection.commit()
        if transaction:
            print(message)
            transaction.close()
            print(f"Connection is closed: {transaction.closed}")
        if transaction is None:
            print(message)
            results.close()
            print(f"Connection is closed: {results.closed}")

def inspect_database(sql_engine):
    """Inspects and prints active table names"""
    database_inspector = inspect(sql_engine)
    tables = database_inspector.get_table_names()
    print("Tables in the database:", tables)


def check_connection(sql_engine):
    """Checks Postgres connection and prints confirmation message"""
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version();"))
        version = result.fetchone()
        print("Connected to:", version[0])

In [4]:
# Load .env file and get username and password
load_dotenv()

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")

In [5]:
# Postgres connection details from .env file
username = DB_USER
password = DB_PASSWORD
host = DB_HOST
port = DB_PORT
database = DB_NAME

# Create the SQLalchemy engine using credentials
engine = create_engine(f'postgresql://{username}:{password}@{host}:{port}/{database}')
check_connection(engine)

Connected to: PostgreSQL 14.15 (Debian 14.15-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit


In [6]:
# Check the database tables (in case of re-running this notebook)
inspect_database(engine)

Tables in the database: ['actors_scd', 'actor_films', 'actors']


In [7]:
# Drop table and types if they already exist
drop_data="""
DROP TABLE IF EXISTS actors_scd;
DROP TABLE IF EXISTS actor_films;
DROP TABLE IF EXISTS actors;
DROP TYPE IF EXISTS films;
DROP TYPE IF EXISTS quality_class;
"""
execute_sql(engine, drop_data)

Execution Succesful!
Connection is closed: True


In [8]:
inspect_database(engine)

Tables in the database: []


In [9]:
# Write DataFrame to Postgres table named 'actor_films'
df.to_sql('actor_films', engine, if_exists='replace', index=False)
print("Data loaded into Postgres successfully!")

Data loaded into Postgres successfully!


In [10]:
# Query back data from Postgres to test
query_sql(engine, "SELECT * FROM actor_films WHERE actor = 'Hugh Jackman' LIMIT 3;")

Unnamed: 0,actor,actorid,film,year,votes,rating,filmid
0,Hugh Jackman,nm0413168,Bad Education,2019,33654,7.1,tt8206668
1,Hugh Jackman,nm0413168,Missing Link,2019,22582,6.7,tt6348138
2,Hugh Jackman,nm0413168,The Front Runner,2018,11768,6.1,tt7074886


In [11]:
# SQL statements for creating two custom types in Postgres: quality_class as ENUM type, 
#                                                           films as Struct (Custom, user-defined Type)
# SQL statement for creating new table new 'actors' table
create_sql_types = """
-- Create two user-defined types: 'quality_class' as Enum and 'films' as custom type
CREATE TYPE quality_class AS ENUM ('star', 'good', 'average', 'bad');

CREATE TYPE films AS (
    film TEXT,
    votes INTEGER,
    rating REAL,
    year INTEGER,
    filmid TEXT
);
"""

create_sql_table = """
-- Create the 'actors table' to be constructed from the 'actors_film table'
CREATE TABLE actors (
    actor TEXT,
    actorid TEXT,
    quality_class quality_class,
    is_active BOOL,
    current_year SMALLINT,
    films films[],
    PRIMARY KEY(actorid, current_year)
);
"""
execute_sql(engine, create_sql_types, "Type 'quality_class' and 'films' Successfully created!")
execute_sql(engine, create_sql_table, "Table 'actors' Successfully created!")

Type 'quality_class' and 'films' Successfully created!
Connection is closed: True
Table 'actors' Successfully created!
Connection is closed: True


In [14]:
# Populate the 'actors' table using cumulative script with DO-block, anonymous code block function and FOR LOOP
populate_actors_table = """
-- Script STARTS for constructing the 'actors table'
-- Initiate variable 'start_year' as 1969 (one year before data starts in 1970 or MIN(year)) as starting value of FOR LOOP 
-- Initiate variable 'end_year' as 2021 (final year in data or MAX(year)) as ending value of FOR LOOP
-- Initiate variable 'current_year'
DO $$ 
DECLARE  
    start_year SMALLINT := 1969;  
    end_year SMALLINT := 2021;  
    current SMALLINT;  
BEGIN 
-- Start FOR LOOP from first year in the table to the final year - 1
FOR current IN start_year..end_year-1 LOOP 
    -- Insert into the new table data cumulatively created with FOR LOOP
    INSERT INTO actors
    -- CTE of previous year's data, starting from the start year 
    -- This table will initiate as empty table, since cumulative data starts from one year before data collection
    -- Culumative table script always joins the previous_year to the current_year
    WITH previous_year AS (
        SELECT * FROM actors
        WHERE current_year = current
    ), 
    -- CTE of current year's data, starting from the start year + 1
    current_year AS (
        SELECT actor,
               actorid,
               -- Aggregated array of composite data struct that stores data in array of 'film' type stucts for all films for the actor from the year
               ARRAY_AGG(
                   -- Composite array data struct of type 'film' that compresses each film from an actor into a single json-like data structure
                   ARRAY[ROW(
                       film,
                       votes,
                       rating,
                       year,
                       filmid
                   )::films] 
               ORDER BY rating DESC
               ) AS films,
               year AS current_year
        FROM actor_films
        WHERE year = current + 1
        GROUP BY actor, actorid, year
    ), 
    combined_years AS (
        -- Culumative table join for current and previous year that coalesces null data (key for first step)
        SELECT COALESCE(cy.actor, py.actor) AS actor,
               COALESCE(cy.actorid, py.actorid) AS actorid,    
               -- Boolean Tag if the actor had any films in the current year
               CASE
                 WHEN cy.current_year IS NOT NULL THEN TRUE
                 ELSE FALSE
               END AS is_active,
               -- Add one year to previous_year current year to get proper coalesce of combined current_year when actor is not active (cy.current_year is null)
               COALESCE(cy.current_year, py.current_year +1) AS current_year,
               -- Update the films array by setting films to the current year films when previous year is null (i.e. a new entry)
               -- or set combined the arrays of previous year films to the current year films for active actor
               -- or carry over previous year film array when actor becomes not active
               CASE 
                 WHEN py.films IS NULL THEN cy.films
                 WHEN cy.current_year IS NOT NULL THEN py.films || cy.films
                 ELSE py.films
               END        
        FROM current_year as cy
        FULL OUTER JOIN previous_year as py
        ON cy.actorid = py.actorid
    )
    -- Main statement of combined tables to commit to table from each culumative year
    SELECT 
        cy.actor,
        cy.actorid,
        -- Use Enum quality_class type stuct data to set the quality of actor based on overall flim rating
        CASE 
            WHEN avg_rating > 8 THEN 'star'
            WHEN avg_rating > 7 THEN 'good'
            WHEN avg_rating > 6 THEN 'average'
            ELSE 'bad'
        END::quality_class AS quality_class,
        cy.is_active,
        cy.current_year,
        cy.films
    -- Self-select FROM statement that uses the average film rating from all films up to the current year to determine quality class rating
    FROM (
        SELECT 
            actor,
            actorid,
            is_active,
            current_year,
            films,
            (SELECT AVG((f).rating) 
	         FROM unnest(films) AS f) AS avg_rating
        FROM combined_years
      ) AS cy;
END LOOP;  
END $$;
-- Script ENDS for constructing the 'actors table'
"""

execute_sql(engine, populate_actors_table)

Execution Succesful!
Connection is closed: True


In [15]:
# Testing the 'actors' table
query_sql(engine, "SELECT * FROM actors WHERE actor='Ralph Fiennes' ORDER BY current_year;")

Unnamed: 0,actor,actorid,quality_class,is_active,current_year,films
0,Ralph Fiennes,nm0000146,average,True,1992,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
1,Ralph Fiennes,nm0000146,good,True,1993,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
2,Ralph Fiennes,nm0000146,good,True,1994,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
3,Ralph Fiennes,nm0000146,good,True,1995,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
4,Ralph Fiennes,nm0000146,good,True,1996,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
5,Ralph Fiennes,nm0000146,good,True,1997,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
6,Ralph Fiennes,nm0000146,average,True,1998,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
7,Ralph Fiennes,nm0000146,average,True,1999,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
8,Ralph Fiennes,nm0000146,average,True,2000,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."
9,Ralph Fiennes,nm0000146,average,False,2001,"{{""(\""Wuthering Heights\"",12384,6.8,1992,tt010..."


In [16]:
create_scd_actors = """
--Create 'actors_scd table' with DDL
CREATE TABLE actors_scd
(
	actor text,
    actorid text, 
	quality_class quality_class,
	is_active boolean,
	start_date SMALLINT,
	end_date SMALLINT,
	current_year SMALLINT,
	PRIMARY KEY(actorid, start_date)
);
"""
execute_sql(engine, create_scd_actors, "Table 'actors_scd' Successfully created!")

Table 'actors_scd' Successfully created!
Connection is closed: True


In [17]:
populate_actors_scd_table = """
-- Script STARTS for populating 'actors_scd table'
INSERT INTO actors_scd
-- CTE that idenifies any change in quality class or active status
WITH actor_changes AS (
SELECT 
       actor,
       actorid,
	   current_year,
	   quality_class,
	   is_active,
       -- Use LAG to create boolean tag for status change in 'quality_class' or 'is_active dimensions'
       -- If LAG of 1 year differs from current_year, tagged as a changed or True (the previous and current years are not equal) 
       -- If they are the same, tagged as not changed or False
       -- Null values also tagged as True, for first time entries
       LAG(quality_class, 1) OVER(PARTITION BY actorid ORDER BY current_year) <> quality_class
	   OR
	   LAG(quality_class, 1) OVER (PARTITION BY actorid ORDER BY current_year) IS NULL
	   OR
	   LAG(is_active, 1) OVER(PARTITION BY actorid ORDER BY current_year) <> is_active
	   OR
	   LAG(is_active, 1) OVER (PARTITION BY actorid ORDER BY current_year) IS NULL AS changed
	 FROM actors
WHERE current_year <= 2020 
), 
-- CTE that marks each year with a change or no change. 
--- Addition of one on SUMS from year to year indicates a change, Equality of SUMS from year to year indicates no change
change_indicator AS (
SELECT 
       actor,
       actorid,
	   current_year,
	   quality_class,
	   is_active,
       -- Sum over the fully grouped partition by current_year and adds to the sum when the changed tag is True
       -- Year by year, the sum does not increase when all the group by elements are the same
	   SUM(CASE WHEN changed THEN 1 ELSE 0 END)
        OVER (PARTITION BY actorid ORDER BY current_year) as change_identifier
FROM actor_changes
GROUP BY actor, actorid, current_year, quality_class, is_active, changed
)
-- Creates the proper record with start and end date of the change as data is aggregated by the change indicator
-- All years that have the same sum in the change_indentifier means the status of quality_class or is_active did not change
-- MIN and MAX over this grouped partition gives the bounds for the SCD
SELECT 
      actor,
	  actorid,
	  quality_class, 
	  is_active,
	  MIN(current_year) AS start_date,
	  MAX(current_year) AS end_date,
	  2020 AS current_year
FROM change_indicator
GROUP BY actor, actorid, quality_class, is_active, change_identifier
ORDER BY actor, actorid, change_identifier;
-- Script ENDS for populating 'actors_scd table'
"""
execute_sql(engine, populate_actors_scd_table, "Table 'actors_scd' successfully populated!")

Table 'actors_scd' successfully populated!
Connection is closed: True


In [18]:
query_sql(engine, "SELECT * FROM actors_scd WHERE actor='Ralph Fiennes';")

Unnamed: 0,actor,actorid,quality_class,is_active,start_date,end_date,current_year
0,Ralph Fiennes,nm0000146,average,True,1992,1992,2020
1,Ralph Fiennes,nm0000146,good,True,1993,1997,2020
2,Ralph Fiennes,nm0000146,average,True,1998,2000,2020
3,Ralph Fiennes,nm0000146,average,False,2001,2001,2020
4,Ralph Fiennes,nm0000146,average,True,2002,2002,2020
5,Ralph Fiennes,nm0000146,average,False,2003,2004,2020
6,Ralph Fiennes,nm0000146,average,True,2005,2008,2020
7,Ralph Fiennes,nm0000146,average,False,2009,2009,2020
8,Ralph Fiennes,nm0000146,average,True,2010,2020,2020


In [None]:
# Back up SDC table as .csv file
table = pd.read_sql_table('actors_scd', engine)
table.to_csv('data/actors_scd.csv')