## Project: Delta Lake for Movie Jsons

In [0]:
# path
import re
# Locations used by the pipeline stages (source -> raw -> bronze -> silver -> gold).
source_path = '/mnt/antrastrg01adls/adlscontainer01/dataset/'
raw_path = '/FileStore/movie/raw/'
bronze_path = '/FileStore/movie/bronze/'
silver_path = '/FileStore/movie/silver/'
# Backward-compatible alias: this name was originally misspelled, and other
# cells may still refer to it.
sivler_path = silver_path
gold_path = '/FileStore/movie/gold/'

### 0. datalake to Raw

In [0]:
# Make Raw Idempotent: recursively delete the raw folder so each run
# starts from an empty destination before files are copied in again.
dbutils.fs.rm(raw_path, recurse=True)

def retrieve_data(file: str, dest_path: str) -> bool:
    """Copy a single file from the source location into ``dest_path``.

    The source is the module-level ``source_path`` with ``file`` appended; the
    destination is ``dest_path`` with ``file`` appended. The copy itself is
    done by ``dbutils.fs.cp``.

    Args:
        file: Name of the file, relative to ``source_path``.
        dest_path: Destination folder prefix (expected to end with ``/``,
            since the name is appended by plain concatenation).

    Returns:
        Always ``True`` once ``dbutils.fs.cp`` has returned.
    """
    dbutils.fs.cp(source_path + file, dest_path + file)
    return True

def prepare_raw(dest_path=raw_path) -> bool:
    """Copy every ``movie*.json`` file found in ``source_path`` to the raw pool.

    Lists ``source_path`` with ``dbutils.fs.ls`` and calls ``retrieve_data``
    for each entry whose name matches the pattern.

    Args:
        dest_path: Destination folder prefix for the copied files.

    Returns:
        ``True`` once the listing has been processed (also when no file
        matched), matching the declared return type.
    """
    # Raw string: in a normal string literal "\." is an invalid escape sequence.
    json_pattern = re.compile(r'^movie.*\.json$')
    for f in dbutils.fs.ls(source_path):
        if json_pattern.match(f.name):
            retrieve_data(f.name, dest_path)
    return True
            
# Populate the raw folder (default destination: raw_path).
prepare_raw()

### 1. Raw to Bronze

In [0]:
from pyspark.sql.types import ArrayType, StructType,StructField, StringType, IntegerType, DateType, DoubleType, FloatType, LongType, TimestampType, DateType, BooleanType, MapType
from pyspark.sql.functions import col, explode, current_date, current_timestamp, lit, from_json, upper, lower
from pyspark.sql.dataframe import DataFrame

In [0]:
# Prepare Bronze path: recursively delete the bronze folder so a re-run
# does not append on top of output from a previous run.
dbutils.fs.rm(bronze_path, recurse=True)

In [0]:
# Ingest with a simple schema and metadata: the top-level "movie" array is
# read as an array of strings, so each movie stays an unparsed JSON string.
movie_schema = StructType().add('movie', ArrayType(StringType()), True)
    
def raw_to_bronze(src_path=raw_path, dest_path=bronze_path, schema=movie_schema, persist=True) -> DataFrame:
    """
    One by one ingestion to show json file names as the datasource.

    Every file listed under ``src_path`` is read with ``schema``, its ``movie``
    array is exploded to one row per movie, and ingestion metadata is attached
    (``SourceFile``, ``IngestTime``, ``p_IngestDate``, ``Status='new'``).

    Args:
        src_path: Folder holding the raw JSON files.
        dest_path: Delta location the bronze rows are appended to.
        schema: Read schema; must expose an array column named ``movie``.
        persist: When true, append each file's rows to ``dest_path``
            partitioned by ``p_IngestDate``.

    Returns:
        The union of the per-file DataFrames, or ``None`` when ``src_path``
        contains no files.
    """
    combined_df = None
    for f in dbutils.fs.ls(src_path):
        # Use the ``schema`` argument rather than the module-level global so
        # callers can actually override it.
        movie_raw_df = (
            spark.read.option("inferSchema", 'false')
            .option('multiline', 'true')
            .schema(schema)
            .json(f.path)
        )
        movie_raw_df = movie_raw_df.withColumn('movie', explode('movie'))
        movie_meta_df = movie_raw_df.select(
            'Movie',
            lit(f.name).alias('SourceFile'),
            current_timestamp().alias('IngestTime'),
            current_timestamp().cast('date').alias('p_IngestDate'),
            lit('new').alias('Status'),
        )
        if persist:
            movie_meta_df.write.format('delta').partitionBy('p_IngestDate').mode('append').save(dest_path)
        # Accumulate instead of returning inside the loop; an early return
        # would stop the ingestion after the first file.
        combined_df = movie_meta_df if combined_df is None else combined_df.union(movie_meta_df)
    return combined_df

# Run the ingestion and write the result to the bronze Delta path.
movie_bronze = raw_to_bronze(persist=True)

In [0]:
# Alternatively, batch ingestion with detailed schema.
# Monetary columns use DoubleType: a 32-bit FloatType cannot represent values
# in the billions exactly (e.g. a Revenue of 2.787965087E9 would be rounded).
movie_schema = StructType().add('movie', ArrayType(
    StructType([
    StructField('BackdropUrl', StringType(), True),
    StructField('Budget', DoubleType(), True),
    StructField('CreatedBy', StringType(), True),
    StructField('CreatedDate', TimestampType(), True),
    StructField('Id', LongType(), True),
    StructField('ImdbUrl', StringType(), True),
    StructField('OriginalLanguage', StringType(), True),
    StructField('Overview', StringType(), True),
    StructField('PosterUrl', StringType(), True),
    StructField('Price', DoubleType(), True),
    StructField('ReleaseDate', TimestampType(), True),
    StructField('Revenue', DoubleType(), True),
    StructField('RunTime', IntegerType(), True),
    StructField('Tagline', StringType(), True),
    StructField('Title', StringType(), True),
    StructField('TmdbUrl', StringType(), True),
    StructField('UpdatedBy', StringType(), True),
    StructField('UpdatedDate', TimestampType(), True),
    # Each movie carries a list of {id, name} genre structs.
    StructField('genres', ArrayType(StructType([
        StructField('id', LongType(), True),
        StructField('name', StringType(), True),
    ])), True),
])
), True)

def raw_to_bronze(src_path=raw_path, dest_path=bronze_path, schema=movie_schema, persist=True) -> DataFrame:
    """
    Batch ingestion with wildcard.

    Reads every ``movie*.json`` under ``src_path`` in a single pass, explodes
    the ``movie`` array to one row per movie, and attaches ingestion metadata
    (``SourceFile``, ``IngestTime``, ``p_IngestDate``, ``Status='new'``).
    ``SourceFile`` is the constant ``'movie_json'`` because the per-file name
    is not tracked in a wildcard read.

    Args:
        src_path: Folder prefix holding the raw JSON files.
        dest_path: Delta location the bronze rows are appended to.
        schema: Read schema; must expose an array column named ``movie``.
        persist: When true, append the rows to ``dest_path`` partitioned by
            ``p_IngestDate``.

    Returns:
        The DataFrame of exploded movies with ingestion metadata.
    """
    # Use the ``schema`` argument rather than the module-level global so
    # callers can actually override it.
    movie_raw_df = (
        spark.read.option("inferSchema", 'false')
        .option('multiline', 'true')
        .schema(schema)
        .json(src_path+'movie*.json')
    )
    movie_raw_df = movie_raw_df.withColumn('movie', explode('movie'))
    movie_meta_df = movie_raw_df.select(
        'Movie',
        lit('movie_json').alias('SourceFile'),
        current_timestamp().alias('IngestTime'),
        current_timestamp().cast('date').alias('p_IngestDate'),
        lit('new').alias('Status'),
    )
    if persist:
        movie_meta_df.write.format('delta').partitionBy('p_IngestDate').mode('append').save(dest_path)
    return movie_meta_df

# Build the batch-ingested DataFrame only; persist=False skips the Delta write.
movie_bronze = raw_to_bronze(persist=False)

In [0]:
%sql
-- Show the Delta history (versions, operations, metrics) recorded for the bronze folder
DESCRIBE HISTORY '/FileStore/movie/bronze'

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2022-08-23T21:56:02.000+0000,1289947930359854,yokurt@yahoo.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1641359404838874),0817-160429-43ys5vhs,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1249, numOutputBytes -> 523212)",,Databricks-Runtime/10.4.x-scala2.12


#### Create bronze delta table

In [0]:
# Register the Bronze Table in the Metastore
# 1) Make sure the target schema exists.
spark.sql(
    """
    CREATE SCHEMA IF not EXISTS delta_lake
    """
)
# 2) Make it the current schema so the unqualified table names below resolve to it.
spark.sql(
    """
    USE delta_lake
    """
)
# 3) Drop any earlier registration so the CREATE below does not fail on re-run.
spark.sql(
    """
    DROP TABLE IF EXISTS movie_bronze
    """
)
# 4) Register a table over the Delta files at bronze_path.
spark.sql(
    f"""
CREATE TABLE movie_bronze
USING DELTA
LOCATION "{bronze_path}"
"""
)

In [0]:
%sql
-- Query the bronze table (first 10 rows)
SELECT * FROM movie_bronze limit 10;

Movie,sourcefile,IngestTime,p_IngestDate,Status
"{""Id"":1,""Title"":""Inception"",""Overview"":""Cobb, a skilled thief who commits corporate espionage by infiltrating the subconscious of his targets is offered a chance to regain his old life as payment for a task considered to be impossible: \""inception\"", the implantation of another person's idea into a target's subconscious."",""Tagline"":""Your mind is the scene of the crime."",""Budget"":1.6E8,""Revenue"":8.25532764E8,""ImdbUrl"":""https://www.imdb.com/title/tt1375666"",""TmdbUrl"":""https://www.themoviedb.org/movie/27205"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//9gk7adHYeDvHkCSEqAvQNLV5Uge.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//s3TBrRGB1iav7gFOCNx3H31MoES.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2010-07-15T00:00:00"",""RunTime"":148,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1633333"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":1,""name"":""Adventure""},{""id"":6,""name"":""Action""},{""id"":13,""name"":""Science Fiction""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":2,""Title"":""Interstellar"",""Overview"":""The adventures of a group of explorers who make use of a newly discovered wormhole to surpass the limitations on human space travel and conquer the vast distances involved in an interstellar voyage."",""Tagline"":""Mankind was born on Earth. It was never meant to die here."",""Budget"":1.65E8,""Revenue"":6.75120017E8,""ImdbUrl"":""https://www.imdb.com/title/tt0816692"",""TmdbUrl"":""https://www.themoviedb.org/movie/157336"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//gEU2QniE6E77NI6lCU6MxlNBvIx.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//xJHokMbljvjADYdit5fK5VQsXEG.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2014-11-05T00:00:00"",""RunTime"":169,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1633333"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":1,""name"":""Adventure""},{""id"":4,""name"":""Drama""},{""id"":13,""name"":""Science Fiction""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":3,""Title"":""The Dark Knight"",""Overview"":""Batman raises the stakes in his war on crime. With the help of Lt. Jim Gordon and District Attorney Harvey Dent, Batman sets out to dismantle the remaining criminal organizations that plague the streets. The partnership proves to be effective, but they soon find themselves prey to a reign of chaos unleashed by a rising criminal mastermind known to the terrified citizens of Gotham as the Joker."",""Tagline"":""Why So Serious?"",""Budget"":1.85E8,""Revenue"":1.004558444E9,""ImdbUrl"":""https://www.imdb.com/title/tt0468569"",""TmdbUrl"":""https://www.themoviedb.org/movie/155"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//qJ2tW6WMUDux911r6m7haRef0WH.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//hkBaDkMWbLaf8B1lsWsKX7Ew3Xq.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2008-07-16T00:00:00"",""RunTime"":152,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1633333"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":4,""name"":""Drama""},{""id"":6,""name"":""Action""},{""id"":10,""name"":""Thriller""},{""id"":11,""name"":""Crime""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":4,""Title"":""Deadpool"",""Overview"":""Deadpool tells the origin story of former Special Forces operative turned mercenary Wade Wilson, who after being subjected to a rogue experiment that leaves him with accelerated healing powers, adopts the alter ego Deadpool. Armed with his new abilities and a dark, twisted sense of humor, Deadpool hunts down the man who nearly destroyed his life."",""Tagline"":""Witness the beginning of a happy ending"",""Budget"":5.8E7,""Revenue"":7.831E8,""ImdbUrl"":""https://www.imdb.com/title/tt1431045"",""TmdbUrl"":""https://www.themoviedb.org/movie/293660"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//yGSxMiF0cYuAiyuve5DA6bnWEOI.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//en971MEXui9diirXlogOrPKmsEn.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2016-02-09T00:00:00"",""RunTime"":108,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1633333"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":1,""name"":""Adventure""},{""id"":6,""name"":""Action""},{""id"":7,""name"":""Comedy""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":5,""Title"":""The Avengers"",""Overview"":""When an unexpected enemy emerges and threatens global safety and security, Nick Fury, director of the international peacekeeping agency known as S.H.I.E.L.D., finds himself in need of a team to pull the world back from the brink of disaster. Spanning the globe, a daring recruitment effort begins!"",""Tagline"":""Some assembly required."",""Budget"":2.2E8,""Revenue"":1.51955791E9,""ImdbUrl"":""https://www.imdb.com/title/tt0848228"",""TmdbUrl"":""https://www.themoviedb.org/movie/24428"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//RYMX2wcKCBAr24UyPD7xwmjaTn.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//kwUQFeFXOOpgloMgZaadhzkbTI4.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2012-04-25T00:00:00"",""RunTime"":143,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1666667"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":1,""name"":""Adventure""},{""id"":6,""name"":""Action""},{""id"":13,""name"":""Science Fiction""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":6,""Title"":""Avatar"",""Overview"":""In the 22nd century, a paraplegic Marine is dispatched to the moon Pandora on a unique mission, but becomes torn between following orders and protecting an alien civilization."",""Tagline"":""Enter the World of Pandora."",""Budget"":2.37E8,""Revenue"":2.787965087E9,""ImdbUrl"":""https://www.imdb.com/title/tt0499549"",""TmdbUrl"":""https://www.themoviedb.org/movie/19995"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//6EiRUJpuoeQPghrs3YNktfnqOVh.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//AmHOQ7rpHwiaUMRjKXztnauSJb7.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2009-12-10T00:00:00"",""RunTime"":162,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1666667"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":1,""name"":""Adventure""},{""id"":2,""name"":""Fantasy""},{""id"":6,""name"":""Action""},{""id"":13,""name"":""Science Fiction""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":7,""Title"":""Guardians of the Galaxy"",""Overview"":""Light years from Earth, 26 years after being abducted, Peter Quill finds himself the prime target of a manhunt after discovering an orb wanted by Ronan the Accuser."",""Tagline"":""All heroes start somewhere."",""Budget"":1.7E8,""Revenue"":7.727766E8,""ImdbUrl"":""https://www.imdb.com/title/tt2015381"",""TmdbUrl"":""https://www.themoviedb.org/movie/118340"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//r7vmZjiyZw9rpJMQJdXpjgiCOk9.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//mZSAu5acXueGC4Z3S5iLSWx8AEp.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2014-07-30T00:00:00"",""RunTime"":121,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1666667"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":1,""name"":""Adventure""},{""id"":6,""name"":""Action""},{""id"":13,""name"":""Science Fiction""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":8,""Title"":""Fight Club"",""Overview"":""A ticking-time-bomb insomniac and a slippery soap salesman channel primal male aggression into a shocking new form of therapy. Their concept catches on, with underground \""fight clubs\"" forming in every town, until an eccentric gets in the way and ignites an out-of-control spiral toward oblivion."",""Tagline"":""Mischief. Mayhem. Soap."",""Budget"":6.3E7,""Revenue"":1.00853753E8,""ImdbUrl"":""https://www.imdb.com/title/tt0137523"",""TmdbUrl"":""https://www.themoviedb.org/movie/550"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//8kNruSfhk5IoE4eZOc4UpvDn6tq.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//52AfXWuXCHn3UjD17rBruA9f5qb.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""1999-10-15T00:00:00"",""RunTime"":139,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1666667"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":4,""name"":""Drama""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":9,""Title"":""Avengers: Infinity War"",""Overview"":""As the Avengers and their allies have continued to protect the world from threats too large for any one hero to handle, a new danger has emerged from the cosmic shadows: Thanos. A despot of intergalactic infamy, his goal is to collect all six Infinity Stones, artifacts of unimaginable power, and use them to inflict his twisted will on all of reality. Everything the Avengers have fought for has led up to this moment - the fate of Earth and existence itself has never been more uncertain."",""Tagline"":""An entire universe. Once and for all."",""Budget"":3.0E8,""Revenue"":2.046239637E9,""ImdbUrl"":""https://www.imdb.com/title/tt4154756"",""TmdbUrl"":""https://www.themoviedb.org/movie/299536"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//7WsyChQLEftFiDOVTGkv3hFpyyt.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//lmZFxXgJE3vgrciwuDib0N8CfQo.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""2018-04-25T00:00:00"",""RunTime"":149,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1666667"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":1,""name"":""Adventure""},{""id"":6,""name"":""Action""},{""id"":13,""name"":""Science Fiction""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new
"{""Id"":10,""Title"":""Pulp Fiction"",""Overview"":""A burger-loving hit man, his philosophical partner, a drug-addled gangster's moll and a washed-up boxer converge in this sprawling, comedic crime caper. Their adventures unfurl in three stories that ingeniously trip back and forth in time."",""Tagline"":""Just because you are a character doesn't mean you have character."",""Budget"":8000000.0,""Revenue"":2.14179088E8,""ImdbUrl"":""https://www.imdb.com/title/tt0110912"",""TmdbUrl"":""https://www.themoviedb.org/movie/680"",""PosterUrl"":""https://image.tmdb.org/t/p/w342//plnlrtBUULT0rh3Xsjmpubiso3L.jpg"",""BackdropUrl"":""https://image.tmdb.org/t/p/original//w7RDIgQM6bLT7JXtH4iUQd3Iwxm.jpg"",""OriginalLanguage"":""en"",""ReleaseDate"":""1994-09-10T00:00:00"",""RunTime"":154,""Price"":9.9,""CreatedDate"":""2021-04-03T16:51:30.1666667"",""UpdatedDate"":null,""UpdatedBy"":null,""CreatedBy"":null,""genres"":[{""id"":10,""name"":""Thriller""},{""id"":11,""name"":""Crime""}]}",movie_0.json,2022-08-23T21:56:02.196+0000,2022-08-23,new


### 2. Bronze to Silver

#### checkout bronze data and unpack schema

In [0]:
# Load the registered bronze table, keeping only rows still marked 'new',
# and print the schema for inspection.
movie_bronze = spark.read.table("movie_bronze").where("Status = 'new'")
movie_bronze.printSchema()

In [0]:
# Define the schema used to parse the raw JSON string in the bronze `Movie` column.
# Monetary columns use DoubleType: a 32-bit FloatType cannot represent values
# in the billions exactly (e.g. a Revenue of 2.787965087E9 would be rounded).
schema = StructType([
    StructField('Id', LongType(), True),
    StructField('Title', StringType(), True),
    StructField('Overview', StringType(), True),
    StructField('Tagline', StringType(), True),
    StructField('Budget', DoubleType(), True),
    StructField('Revenue', DoubleType(), True),
    StructField('ImdbUrl', StringType(), True),
    StructField('TmdbUrl', StringType(), True),
    StructField('PosterUrl', StringType(), True),
    StructField('BackdropUrl', StringType(), True),
    StructField('OriginalLanguage', StringType(), True),
    StructField('ReleaseDate', TimestampType(), True),
    StructField('RunTime', IntegerType(), True),
    StructField('Price', DoubleType(), True),
    StructField('CreatedDate', TimestampType(), True),
    StructField('UpdatedDate', TimestampType(), True),
    StructField('CreatedBy', StringType(), True),
    StructField('UpdatedBy', StringType(), True),
    # Each movie carries a list of {id, name} genre structs.
    StructField('genres', ArrayType(StructType([
        StructField('id', LongType(), True),
        StructField('name', StringType(), True),
    ])), True),
])
# Parse the JSON string in `Movie` with the schema above and promote the
# parsed fields to top-level columns, placed ahead of the bronze columns.
movie_augment_df = (
    movie_bronze
    .withColumn('value', from_json(col('Movie'), schema))
    .select('value.*', '*')
    .drop('value')
)
# One row per (movie, genre): explode the genres array, copy the struct
# members into flat columns, then drop the struct itself.
# NOTE(review): explode() emits no row for a null or empty array, so a movie
# without genres would be absent from this DataFrame -- confirm this is intended.
movie_augment_df = (
    movie_augment_df
    .withColumn('genres', explode('genres'))
    .withColumn('genre_id', col('genres.id'))
    .withColumn('genre_name', col('genres.name'))
    .drop('genres')
)
movie_augment_df.printSchema()

#### Cleanse and Quarantine

In [0]:
# Column names of the flattened DataFrame (presumably reused for the silver
# tables; the use is not visible in this part of the notebook).
silver_col = movie_augment_df.columns

In [0]:
# Quarantine rule 1: rows whose Budget is below one million.
movie_quarantine_lowbudget = movie_augment_df.where('Budget < 1000000')

In [0]:
# Quarantine rule 2: rows whose genre_name is null.
movie_quarantine_genre_missing = movie_augment_df.where('genre_name is null')

In [0]:
# Quarantine rule 3: rows whose RunTime is negative.
movie_quarantine_negative_runtime = movie_augment_df.where('RunTime < 0')

In [0]:
# Quarantine set: rows caught by any of the three rules. union() keeps
# duplicates, so a row matching several rules appears once per rule.
movie_quarantine = (
    movie_quarantine_lowbudget
    .union(movie_quarantine_genre_missing)
    .union(movie_quarantine_negative_runtime)
)
# Clean set: everything not quarantined. subtract() also de-duplicates its result.
movie_clean = movie_augment_df.subtract(movie_quarantine)

#### update bronze records to reflect the cleaned/loaded status

Open questions:

q1: When a bad record shows up in the exploded table, how should it be quarantined: the whole original record, or only the affected exploded sub-rows?

q2: How should duplicate movie records with inconsistent details be treated? Will they land in bronze as new or as loaded?
(They should probably be treated as two different records in silver.)

q3: Should the quarantine tables in silver also cover the junction tables and lookup tables?

### split & normalization for silver

In [0]:
# Movie table: one distinct row per movie, without the raw JSON string and
# without the attributes that are split out below.
movie_table = (
    movie_clean
    .drop('Movie', 'OriginalLanguage', 'genre_id', 'genre_name')
    .distinct()
)
# Distinct (movie_id, OriginalLanguage) pairs.
originallanguage_lookup = (
    movie_clean
    .select(col('Id').alias('movie_id'), 'OriginalLanguage')
    .distinct()
)
# Distinct (movie_id, genre_id, genre_name) triples.
genres_lookup = (
    movie_clean
    .select(col('Id').alias('movie_id'), 'genre_id', 'genre_name')
    .distinct()
)