## Setup and Helper Functions

In [0]:
## DQ Helper Function 

import pyspark.sql.functions as F

## ANSI Colours
# Terminal escape sequences used to colour the PASSED / FAILURE messages below:
# bright green, bright red, and reset-to-default.
C_GREEN, C_RED, C_END = "\033[92m", "\033[91m", "\033[0m"

# NULL Checker
def null_checker(view_name, field_name):
    """Stop the pipeline if `field_name` in `view_name` contains any NULLs.

    Args:
        view_name: Name of a table or temporary view readable via `spark.read.table`.
        field_name: Column to inspect.

    Raises:
        Exception: if at least one NULL is found; the message names the column.
    """
    print(f"Checking for nulls in {view_name}.{field_name}")

    df = spark.read.table(view_name)
    null_count = df.filter(F.col(field_name).isNull()).count()

    if null_count > 0:
        print(f"{C_RED}FAILURE: Found {null_count} nulls in {field_name}{C_END}")
        # Crashes pipeline. This must be an f-string so the column name is interpolated
        # rather than printed as the literal text "{field_name}".
        raise Exception(f"Pipeline Stopped: Null Values Detected in {field_name}")

    print(f"{C_GREEN}PASSED: No nulls found in {field_name}!{C_END}")

# Duplicate Checker
def duplicate_checker(view_name, field_name):
    """Stop the pipeline if `field_name` in `view_name` has repeated values.

    The reported count is the number of surplus rows: total rows minus distinct
    values of `field_name`. Because `distinct()` keeps a single NULL, several
    NULL rows also count as duplicates.

    Args:
        view_name: Name of a table or temporary view readable via `spark.read.table`.
        field_name: Column that is expected to be unique.

    Raises:
        Exception: if any duplicates are found; the message names the column.
    """
    print(f"Checking for duplicates in {view_name}.{field_name}")

    df = spark.read.table(view_name)
    # Calculate duplicate count
    total_count = df.count()
    distinct_count = df.select(field_name).distinct().count()
    duplicate_count = total_count - distinct_count

    if duplicate_count > 0:
        print(f"{C_RED}FAILURE: Found {duplicate_count} duplicates in {field_name}{C_END}")
        # Crashes pipeline. This must be an f-string so the column name is interpolated.
        raise Exception(f"Pipeline Stopped: Duplicate Values Detected in {field_name}")

    print(f"{C_GREEN}PASSED: No duplicates found in {field_name}!{C_END}")

# Regex Pattern Checker
def regex_checker(view_name, field_name, pattern):
    """Stop the pipeline if any value of `field_name` in `view_name` fails `pattern`.

    Args:
        view_name: Name of a table or temporary view readable via `spark.read.table`.
        field_name: Column whose values are tested with Spark's `rlike`.
        pattern: Regular expression (Java regex syntax) each value must match.
            Anchor it with ^...$ when the whole value must match.

    Raises:
        Exception: if one or more values do not match; the message names the column.

    Note:
        NULL values are not counted as invalid. `rlike` on NULL yields NULL, and
        `filter` drops rows whose condition is NULL. Pair this with `null_checker`
        when NULLs must also be rejected.
    """
    print(f"Checking for invalid values in {view_name}.{field_name}")

    df = spark.read.table(view_name)
    # Count values that do not match the pattern
    invalid_count = df.filter(~F.col(field_name).rlike(pattern)).count()

    if invalid_count > 0:
        print(f"{C_RED}FAILURE: {invalid_count} number of invalid values found in {field_name}{C_END}")
        # Crashes pipeline. The pattern itself is fine; the data is what failed,
        # so name the offending column like the other checkers do.
        raise Exception(f"Pipeline Stopped: Values Not Matching Regex Pattern Detected in {field_name}")

    print(f"{C_GREEN}PASSED: All values in {field_name} follow the regex pattern{C_END}")

    


## title_ratings

#### 1. Transformation

In [0]:
%sql
-- Staging view for title ratings:
--   * normalise the title key (trim + lower-case) and cast the numeric measures;
--   * bucket average_rating into coarse rating bands.
CREATE OR REPLACE TEMPORARY VIEW staging_title_ratings AS

WITH

-- Raw ratings with a normalised key and typed measures
ratings_typed AS (
  SELECT
    LOWER(TRIM(tconst))           AS id_number,
    CAST(averageRating AS DOUBLE) AS average_rating,
    CAST(numVotes AS INT)         AS num_votes
  FROM imdb.raw.title_ratings
)

SELECT
  r.*,
  -- Bands cover [0,3), [3,5), [5,7) and [7,10]; NULL is 'unknown',
  -- and anything below 0 or above 10 is 'error'.
  CASE
    WHEN r.average_rating IS NULL THEN 'unknown'
    WHEN r.average_rating < 0     THEN 'error'
    WHEN r.average_rating < 3     THEN '0-3'
    WHEN r.average_rating < 5     THEN '3-5'
    WHEN r.average_rating < 7     THEN '5-7'
    WHEN r.average_rating <= 10   THEN '7-10'
    ELSE 'error'
  END AS rating_band
FROM ratings_typed AS r;

SELECT * FROM staging_title_ratings;

#### 2. Validation Checks

In [0]:
# Universal checks: the title key must be present (no NULLs) and unique
for dq_check in (null_checker, duplicate_checker):
    dq_check(view_name="staging_title_ratings", field_name="id_number")

# Table-specific check: id_number must look like a title key, e.g. "tt0000001"
id_number_pattern = r"^tt\d+$"
regex_checker(view_name="staging_title_ratings", field_name="id_number", pattern=id_number_pattern)


#### 3. Write table

In [0]:
%sql
-- Persist the validated staging view as a table, then preview it.
CREATE OR REPLACE TABLE imdb.staging.title_ratings AS
SELECT *
FROM staging_title_ratings;

SELECT *
FROM imdb.staging.title_ratings
LIMIT 1000;

## name_basics


#### 1. Transformation

In [0]:
%sql
-- Staging view for people (name_basics): snake_case names, comma-separated
-- list columns turned into arrays, and rows without a name dropped.
CREATE OR REPLACE TEMPORARY VIEW staging_name_basics AS

SELECT
  nb.nconst      AS person_id,
  nb.primaryName AS primary_name,
  nb.birthYear   AS birth_year,
  nb.deathYear   AS death_year,

  -- Convert merged, comma-separated strings to arrays (trimmed and lower-cased first)
  SPLIT(LOWER(TRIM(nb.primaryProfession)), ',') AS primary_profession,
  SPLIT(LOWER(TRIM(nb.knownForTitles)), ',')    AS known_for_titles

FROM imdb.raw.name_basics AS nb
WHERE nb.primaryName IS NOT NULL;

SELECT * FROM staging_name_basics;

#### 2. Validation Checks

In [0]:
# Universal checks: person key and name must be present; the key must be unique
for required_field in ("person_id", "primary_name"):
    null_checker(view_name="staging_name_basics", field_name=required_field)
duplicate_checker(view_name="staging_name_basics", field_name="person_id")

# Table-specific check: person_id format, e.g. "nm0000001"
person_id_pattern = r"^nm\d+$"
regex_checker(view_name="staging_name_basics", field_name="person_id", pattern=person_id_pattern)

# Table-specific check: every element of the known_for_titles array.
# regex_checker tests one scalar column, so explode the array into one row per
# element and register the result as a temporary view it can read.
df = spark.read.table("staging_name_basics")
known_titles_col = F.explode(F.col("known_for_titles")).alias("known_for_titles")
df_explode = df.select(known_titles_col)
df_explode.createOrReplaceTempView("df_exploded")

pattern_known_for_title = r"^tt\d+$"  # same title-key pattern as id_number
regex_checker(view_name="df_exploded", field_name="known_for_titles", pattern=pattern_known_for_title)

#### 3. Write table

In [0]:
%sql
-- Persist the validated staging view as a table, then preview it.
CREATE OR REPLACE TABLE imdb.staging.name_basics AS
SELECT *
FROM staging_name_basics;

SELECT *
FROM imdb.staging.name_basics
LIMIT 1000;

## title_akas

#### 1. Transformation

In [0]:
%sql
-- Staging view for title_akas: snake_case names, readable labels for the
-- 'X..' market roll-up region codes, and a BOOLEAN original-title flag.
CREATE OR REPLACE TEMPORARY VIEW staging_title_akas AS

SELECT
  a.titleId  AS title_id,
  a.ordering AS ordering,
  a.title    AS primary_title,

  -- Replace market roll-up codes with readable names; every other code passes through
  CASE a.region
    WHEN 'XWW' THEN 'International'
    WHEN 'XNA' THEN 'North America'
    WHEN 'XEU' THEN 'Europe'
    WHEN 'XAF' THEN 'Africa'
    WHEN 'XAS' THEN 'Asia'
    WHEN 'XSA' THEN 'South America'
    WHEN 'XAU' THEN 'Australasia'
    WHEN 'XKO' THEN 'Korea'
    WHEN 'XPI' THEN 'Palestinian Territory'
    WHEN 'XKX' THEN 'Kosovo'
    WHEN 'XSI' THEN 'Other'
    ELSE a.region
  END AS region,

  a.language   AS language,
  a.types      AS types,
  a.attributes AS attributes,
  CAST(a.isOriginalTitle AS BOOLEAN) AS is_original_title
FROM imdb.raw.title_akas AS a;

SELECT * FROM staging_title_akas;

#### 2. Validation Checks

In [0]:
# Universal checks: the title key and the title text must never be NULL.
# NOTE(review): there is no duplicate check on title_id; presumably one title has
# several akas rows (see the `ordering` column). Confirm before adding one.
for required_field in ("title_id", "primary_title"):
    null_checker(view_name="staging_title_akas", field_name=required_field)

# Table-specific check: title_id format
title_id_pattern = r"^tt\d+$"
regex_checker(view_name="staging_title_akas", field_name="title_id", pattern=title_id_pattern)

#### 3. Write table

In [0]:
%sql
-- Persist the validated staging view as a table, then preview it.
CREATE OR REPLACE TABLE imdb.staging.title_akas AS
SELECT *
FROM staging_title_akas;

SELECT *
FROM imdb.staging.title_akas
LIMIT 1000;

## title_basics


#### 1. Transformation

In [0]:
%sql
-- Staging view for title_basics:
--   * normalise the title key and rename columns to snake_case;
--   * convert isAdult to a BOOLEAN that is never NULL;
--   * repair rows whose title columns were merged around a TAB character;
--   * make adult titles carry the 'adult' genre, then split genres into an array.
CREATE OR REPLACE TEMPORARY VIEW staging_title_basics AS

WITH

-- Set up base table
base_table AS(
  SELECT * FROM imdb.raw.title_basics
),

-- Clean base table
cleaned_base AS(
  SELECT
    LOWER(TRIM(tconst)) AS id_number,
    titleType AS title_type,
    primaryTitle AS primary_title,
    originalTitle AS original_title,

    -- Convert isAdult to BOOLEAN: only 1 is adult; 0, any value NOT IN (0,1) and NULL are FALSE.
    -- Both branches are BOOLEAN literals, so the column is a plain BOOLEAN instead of
    -- depending on STRING/BOOLEAN type coercion. It is never NULL, which the
    -- validation step requires because it null-checks is_adult.
    CASE
      WHEN isAdult = 1 THEN TRUE
      ELSE FALSE
    END AS is_adult,

    startYear AS start_year,
    endYear AS end_year,
    runtimeMinutes AS runtime_minutes,
    genres AS genres

  FROM base_table
),

-- Fix column misalignment: primary_title and original_title.
-- Affected rows have a primary_title that starts with a double quote and contains
-- a TAB followed by a double quote ('\t' in these literals is a TAB character).
misalignment_clean AS(
  SELECT * EXCEPT(primary_title, original_title),

    -- fixing primary_title:
    --   1. SPLIT the merged string on TAB into an array
    --   2. array_distinct removes repeated pieces
    --   3. [0] takes the first distinct piece as a string
    --   4. substring(..., 2) starts at the 2nd character, dropping the leading quote
    CASE WHEN primary_title LIKE '"%\t"%'
      THEN substring(array_distinct(SPLIT(primary_title, '\t'))[0], 2)
    ELSE primary_title
    END AS primary_title,

    -- fixing original_title: merged rows reuse the same cleaned value as primary_title
    CASE WHEN primary_title LIKE '"%\t"%'
      THEN substring(array_distinct(SPLIT(primary_title, '\t'))[0], 2)
    ELSE original_title
    END AS original_title

  FROM cleaned_base
),

-- Enrich genres and convert to array.
-- Enrich logic: an adult title whose genres do not mention "adult" gets "adult" added.
enrich_and_convert_genres AS(
  SELECT * EXCEPT(genres),
    CASE
      -- Adult title with no genres at all: CONCAT with NULL would return NULL and
      -- drop the adult genre, so build the array directly.
      WHEN is_adult AND genres IS NULL
        THEN ARRAY('adult')
      WHEN is_adult AND LOWER(genres) NOT LIKE '%adult%'
        THEN SPLIT(TRIM(LOWER(CONCAT(genres, ',Adult'))), ',')
      ELSE SPLIT(TRIM(LOWER(genres)), ',')
    END AS genres
  FROM misalignment_clean
)

SELECT
  id_number,
  title_type,
  primary_title,
  original_title,
  is_adult,
  start_year,
  end_year,
  runtime_minutes,
  genres
FROM enrich_and_convert_genres;

SELECT * FROM staging_title_basics;

#### 2. Validation Checks

In [0]:
# Universal checks: key, title text and adult flag must be non-NULL; the key must be unique
for required_field in ("id_number", "primary_title", "is_adult"):
    null_checker(view_name="staging_title_basics", field_name=required_field)
duplicate_checker(view_name="staging_title_basics", field_name="id_number")

# Table-specific check: id_number format
id_number_pattern = r"^tt\d+$"
regex_checker(view_name="staging_title_basics", field_name="id_number", pattern=id_number_pattern)

#### 3. Write table

In [0]:
%sql
-- Persist the validated staging view as a table, then preview it.
CREATE OR REPLACE TABLE imdb.staging.title_basics AS
SELECT *
FROM staging_title_basics;

SELECT *
FROM imdb.staging.title_basics
LIMIT 1000;

## title_principals

#### 1. Transformation

In [0]:
%sql
-- Staging view for title_principals: normalised keys and role, job de-duplicated
-- against category, and job/characters merged into a single job_details column.
CREATE OR REPLACE TEMPORARY VIEW staging_title_principals AS

WITH

-- Set up base table
base_table AS(
  SELECT * FROM imdb.raw.title_principals
),

-- Clean base table (reads through base_table like the other staging views)
cleaned_table AS(
  SELECT
    -- Renamed and normalised exactly like id_number in title_ratings / title_basics,
    -- so the title key is consistent across all tables
    LOWER(TRIM(tconst)) AS id_number,
    ordering AS ordering,
    nconst AS person_id,

    -- Normalise category into role; 'actress' and 'self' are both folded into 'actor'
    CASE
      WHEN TRIM(LOWER(category)) = 'actress' THEN 'actor'
      WHEN TRIM(LOWER(category)) = 'self' THEN 'actor'
      ELSE TRIM(LOWER(category))
    END AS role,

    -- Drop job when it merely repeats the category
    CASE
      WHEN TRIM(LOWER(job)) = TRIM(LOWER(category)) THEN NULL
      ELSE TRIM(LOWER(job))
    END AS job,

    -- Strip every double quote, '[' and ']' character from characters
    REGEXP_REPLACE(characters, '[\\"\\[\\]]', '') AS characters

  FROM base_table
),

-- Coalesce job and characters columns: prefer job, fall back to characters
merge_job_characters AS(
  SELECT * EXCEPT(job, characters),
    COALESCE(job, characters) AS job_details
  FROM cleaned_table
)

SELECT * FROM merge_job_characters;

SELECT * FROM staging_title_principals;

#### 2. Validation Checks

In [0]:
# Universal checks: each row needs its title key, position, person key and role
for required_field in ("id_number", "ordering", "person_id", "role"):
    null_checker(view_name="staging_title_principals", field_name=required_field)

# Table-specific check: title key format
id_number_pattern = r"^tt\d+$"
regex_checker(view_name="staging_title_principals", field_name="id_number", pattern=id_number_pattern)

# Table-specific check: person key format
person_number_pattern = r"^nm\d+$"
regex_checker(view_name="staging_title_principals", field_name="person_id", pattern=person_number_pattern)


#### 3. Write table

In [0]:
%sql
-- Persist the validated staging view as a table, then preview it.
CREATE OR REPLACE TABLE imdb.staging.title_principals AS
SELECT *
FROM staging_title_principals;

SELECT *
FROM imdb.staging.title_principals
LIMIT 1000;