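## Configuration

Run the shared configuration notebook. It is expected to define the paths (`rawPath`, `bronzePath`, `silverPath`, …) and helper functions (`read_batch_raw`, `batch_writer`, `update_bronze_table_status`, …) used throughout this notebook.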

In [0]:
%run ./includes/configuration_jsonmoive

## Raw to Bronze Pipeline

In [0]:
# Step 1: load the raw batch from `rawPath`, then apply the raw-layer
# transformation. The untouched read (`rawDF`) and the transformed frame are
# kept under separate names so the raw input remains inspectable.
rawDF = read_batch_raw(rawPath)
transformedRawDF = transform_raw(rawDF)

In [0]:
# Step 2: write the transformed raw batch to the bronze Delta location,
# using `p_Ingestdate` as the partition column.
rawToBronzeWriter = batch_writer(
    dataframe=transformedRawDF,
    partition_column="p_Ingestdate",
)
rawToBronzeWriter.save(bronzePath)

# Step 3: (re-)register the bronze table in the metastore so it points at the
# Delta files just written. Dropping any existing entry first keeps the cell
# re-runnable (a plain CREATE TABLE would fail if the table already exists).
spark.sql("\nDROP TABLE IF EXISTS movies_df_bronze\n")
spark.sql(f'\nCREATE TABLE movies_df_bronze\nUSING DELTA\nLOCATION "{bronzePath}"\n')

## Bronze to Silver Pipeline

In [0]:
# Step 1: read the bronze records via `read_batch_bronze`, then apply the
# bronze-layer transformation that prepares them for the silver tables.
bronzeDF = read_batch_bronze(spark)
transformedBronzeDF = transform_bronze(bronzeDF)

In [0]:
# Step 2: derive every silver dataframe from the transformed bronze records.

# Movies are split into a clean set and a quarantine set.
silverCleanDF, silverQuarantineDF = generate_clean_and_quarantine_dataframes(
    transformedBronzeDF
)

# Lookup / junction dataframes, built left to right: genres, the
# movie-to-genre junction, and original languages.
genre_df_silver, junction_moviegenre_df_silver, OriginalLanguage_df_silver = (
    generate_genre_dataframes(transformedBronzeDF),
    generate_Juntion_MovieGenre_dataframes(transformedBronzeDF),
    generate_originalLanguage_dataframes(transformedBronzeDF),
)


In [0]:
# Step 3: write the clean silver batches to their Delta locations.
# (Metastore registration is done in the following cell.)


def _save_lookup_table(dataframe, path):
    """Build a lookup-table writer for ``dataframe``, save it to ``path``,
    and return the writer so the caller can keep a handle on it."""
    writer = batch_writer_lookuptable(dataframe=dataframe)
    writer.save(path)
    return writer


# Movies: partition column `p_ReleaseYear`; "Movies" is passed as an
# excluded column.
bronzeToSilverWriter = batch_writer(
    dataframe=silverCleanDF,
    partition_column="p_ReleaseYear",
    exclude_columns=["Movies"],
)
bronzeToSilverWriter.save(silverPath)

# Genres, the movie-genre junction, and original languages all go through
# the lookup-table writer.
bronzeToSilverGenreWriter = _save_lookup_table(genre_df_silver, silverGenrePath)
bronzeToSilverJuncMovieGenreWriter = _save_lookup_table(
    junction_moviegenre_df_silver, silverJuncMovieGenrePath
)
bronzeToSilverOriginalLanguageWriter = _save_lookup_table(
    OriginalLanguage_df_silver, silverOriginalLanguagePath
)

In [0]:
# Register each silver Delta location as a metastore table.


def _register_delta_table(table_name, location):
    """(Re-)register ``table_name`` as a Delta table backed by ``location``.

    Any existing metastore entry is dropped first so the cell can be re-run
    (a plain CREATE TABLE fails when the table already exists). The SQL text
    is identical to the per-table statements this helper replaces.

    Args:
        table_name: metastore table name to (re-)create.
        location: Delta path the table should point at.
    """
    spark.sql(f"\nDROP TABLE IF EXISTS {table_name}\n")
    spark.sql(f'\nCREATE TABLE {table_name}\nUSING DELTA\nLOCATION "{location}"\n')


# (table name, Delta location) pairs, registered in this order:
# movies, genres, movie-genre junction, original languages.
for _table_name, _location in [
    ("movies_df_silver", silverPath),
    ("genres_df_silver", silverGenrePath),
    ("movies_genres_df_silver", silverJuncMovieGenrePath),
    ("OriLanguage_df_silver", silverOriginalLanguagePath),
]:
    _register_delta_table(_table_name, _location)


In [0]:
# Step 4: write the load outcome back to the bronze table via
# `update_bronze_table_status`. Rows in the clean silver set get "loaded";
# rows in the quarantine set get "quarantined".
update_bronze_table_status(spark, bronzePath, silverCleanDF, "loaded")
update_bronze_table_status(
    spark, bronzePath, silverQuarantineDF, "quarantined"
)

## Handle Quarantined Records

In [0]:
# Step 1: load the bronze rows whose status is 'quarantined', then apply the
# same bronze-layer transform used for the main silver load.
bronzeQuarantinedDF = (
    spark.read.table("movies_df_bronze")
    .filter("Status = 'quarantined'")
)
bronzeQuarTransDF = transform_bronze(bronzeQuarantinedDF)

# Step 2: repair by replacing `Runtime` with its absolute value and dropping
# the `genres` column.
# NOTE(review): `abs` must resolve to pyspark.sql.functions.abs (presumably
# brought into scope by the configuration notebook); Python's builtin abs is
# not expected to accept a Column — confirm.
# NOTE(review): only Runtime is repaired. If rows are quarantined for other
# reasons, they are still written to silver and marked "loaded" in the
# following cells — confirm that is intended.
repairDF = (
    bronzeQuarTransDF
    .withColumn("Runtime", abs(col("Runtime")))
    .drop("genres")
)

In [0]:
# Step 3: write the repaired movies to the silver movies location, using the
# same partition column and excluded columns as the clean movies write.
# NOTE(review): this targets `silverPath`, which already holds the clean
# movies; confirm batch_writer appends rather than overwrites.
bronzeToSilverQuarWriter = batch_writer(
    dataframe=repairDF,
    partition_column="p_ReleaseYear",
    exclude_columns=["Movies"],
)
bronzeToSilverQuarWriter.save(silverPath)


In [0]:
# Step 4: set the bronze status of the repaired rows to "loaded" via
# `update_bronze_table_status`.
update_bronze_table_status(
    spark, bronzePath, repairDF, "loaded"
)