In [0]:
# imports
from pyspark.sql.functions import (
    col,
    current_timestamp,
    explode,
    from_json,
    lit,
    monotonically_increasing_id,
    xxhash64,
)
from delta.tables import DeltaTable

In [0]:
# global variables

# Schema for the raw JSON files read by read_batch_raw: a top-level "movie"
# array whose elements are kept as raw JSON strings (transform_raw explodes
# them into the bronze "value" column).
BRONZE_SCHEMA = 'movie ARRAY<STRING>'

# DDL schema applied with from_json to each bronze "value" string to build the
# silver movie columns. "genres" stays an array of JSON strings; each element
# is parsed separately with GENRE_SCHEMA.
SILVER_SCHEMA = """
    Id INTEGER,
    Title STRING,
    Overview STRING,
    Tagline STRING,
    Budget DOUBLE,
    Revenue DOUBLE,
    ImdbUrl STRING,
    TmdbUrl STRING,
    PosterUrl STRING, 
    BackdropUrl STRING,
    OriginalLanguage STRING,
    ReleaseDate TIMESTAMP,
    RunTime INTEGER,
    Price DOUBLE,
    CreatedDate TIMESTAMP,
    UpdatedDate TIMESTAMP,
    UpdatedBy STRING,
    CreatedBy STRING,
    genres ARRAY<STRING>
"""

# DDL schema for one element of SILVER_SCHEMA's "genres" array.
GENRE_SCHEMA = '''
    id INTEGER,
    name STRING
'''

In [0]:
def read_batch_raw(directory):
    '''Load the raw movie JSON files under ``directory`` into a DataFrame.

    Files are parsed in multi-line mode against ``BRONZE_SCHEMA``, so the
    result carries a single ``movie`` column (an array of JSON strings).
    Relies on the notebook-level ``spark`` session.
    '''
    json_reader = spark.read.option('multiLine', True).schema(BRONZE_SCHEMA)
    return json_reader.json(directory)

In [0]:
def transform_raw(raw_movie_df):
    '''Flatten the raw ``movie`` arrays into one bronze row per movie.

    Each array element becomes the ``value`` column and is tagged with
    ingestion metadata: a fixed ``datasource``, the ingest timestamp,
    ``status = 'new'`` and a ``p_ingestdate`` partition date.
    '''
    one_movie_per_row = (
        raw_movie_df
        .withColumn("movie_exploded", explode("movie"))
        .select("movie_exploded")
    )

    source_col = lit("www.imdb.com").alias("datasource")
    ingest_time_col = current_timestamp().alias("ingesttime")
    payload_col = col("movie_exploded").alias('value')
    status_col = lit("new").alias("status")
    ingest_date_col = current_timestamp().cast("date").alias("p_ingestdate")

    return one_movie_per_row.select(
        source_col,
        ingest_time_col,
        payload_col,
        status_col,
        ingest_date_col,
    )

In [0]:
def batch_writer(dataframe, partition=False, partition_column='', exclude_columns=None, mode='append'):
    '''Build (but do not run) a Delta ``DataFrameWriter`` for ``dataframe``.

    Parameters
    ----------
    dataframe : DataFrame
        Data to write.
    partition : bool
        When True, partition the output by ``partition_column``.
    partition_column : str
        Column to partition by; required when ``partition`` is True.
    exclude_columns : iterable of str, optional
        Columns dropped before writing. ``None`` (the default) drops nothing.
    mode : str
        Save mode passed to ``DataFrameWriter.mode`` (default ``'append'``).

    Returns
    -------
    DataFrameWriter
        Configured writer; nothing is written until the caller saves it.

    Raises
    ------
    ValueError
        If ``partition`` is True but ``partition_column`` is empty.
    '''
    # Fail fast instead of deferring an obscure error to save time.
    if partition and not partition_column:
        raise ValueError("partition=True requires a non-empty partition_column")

    # None default instead of a shared mutable [] default.
    columns_to_drop = exclude_columns or ()

    writer = (
        dataframe.drop(*columns_to_drop)
        .write.format("delta")
        .mode(mode)
    )
    if partition:
        writer = writer.partitionBy(partition_column)
    return writer

In [0]:
def create_table(tableName, deltaPath):
    '''Re-register ``tableName`` as a Delta table located at ``deltaPath``.

    Any existing table with that name is dropped first, then
    ``CREATE TABLE ... USING DELTA LOCATION`` is issued for ``deltaPath``.
    Uses the notebook-level ``spark`` session.
    '''
    statements = (
        f""" DROP TABLE IF EXISTS {tableName} """,
        f"""
    CREATE TABLE {tableName}
    USING DELTA
    LOCATION "{deltaPath}"
    """,
    )
    for statement in statements:
        spark.sql(statement)

In [0]:
def read_batch_bronze(spark, table_name="movie_bronze", status="new"):
    '''Read bronze rows with a given processing status.

    Parameters
    ----------
    spark : SparkSession
        Session used to read the table.
    table_name : str
        Bronze table to read (default ``"movie_bronze"``).
    status : str
        Only rows whose ``status`` column equals this value are returned.
        The default ``"new"`` is the status transform_raw stamps on
        freshly ingested rows.

    Returns
    -------
    DataFrame
        The filtered bronze rows.
    '''
    return spark.read.table(table_name).filter(f"status = '{status}'")

In [0]:
def _build_genre_tables(silver_movie):
    '''Explode each movie's genres into a genre lookup and a movie/genre junction.'''
    silver_genre = (
        silver_movie.select(col("Id").alias("movie_id"), "genres")
        .withColumn("genre_exploded", explode("genres"))
        .withColumn("genre", from_json(col("genre_exploded"), GENRE_SCHEMA))
    )
    genre_movie_pairs = silver_genre.select(
        "movie_id",
        col("genre.id").alias("genre_id"),
        col("genre.name").alias("genre_name"),
    )
    # One row per genre_id; if an id ever appears with several names, which
    # name survives is arbitrary.
    genre_lookup = genre_movie_pairs.select("genre_id", "genre_name").dropDuplicates(["genre_id"])
    genre_movie_junction = genre_movie_pairs.drop("genre_name")
    return genre_lookup, genre_movie_junction


def _build_language_lookup(silver_movie):
    '''Map each distinct OriginalLanguage (null included) to a deterministic language_id.'''
    # xxhash64 yields the same bigint id for the same language on every
    # evaluation. monotonically_increasing_id depends on row placement after
    # the shuffle in distinct(), so the returned lookup table and the join
    # below could be computed with different ids. Hash collisions are possible
    # in principle but vanishingly unlikely for a 64-bit hash of language codes.
    return (
        silver_movie.select("OriginalLanguage")
        .distinct()
        .select(
            xxhash64("OriginalLanguage").alias("language_id"),
            col("OriginalLanguage").alias("language_name"),
        )
    )


def generate_silverDF(bronzeDF):
    '''Parse bronze movie JSON into the silver movie DataFrame plus lookup tables.

    Parameters
    ----------
    bronzeDF : DataFrame
        Bronze rows; the raw movie JSON string is in the ``value`` column and
        is parsed with ``SILVER_SCHEMA``.

    Returns
    -------
    tuple of DataFrame
        ``(silver_movie_final, genre_lookup, genre_movie_junction, language_lookup)``:

        * ``silver_movie_final`` -- deduplicated parsed movies. The original
          language is replaced by ``Language_id``, and ``value`` is kept so
          rows can be matched back to bronze.
        * ``genre_lookup`` -- ``(genre_id, genre_name)``, one row per genre id.
        * ``genre_movie_junction`` -- ``(movie_id, genre_id)`` pairs.
        * ``language_lookup`` -- ``(language_id, language_name)``.
    '''
    augmented_bronze_movie = bronzeDF.withColumn(
        "nested_json", from_json(col("value"), SILVER_SCHEMA)
    )
    silver_movie = augmented_bronze_movie.select("value", "nested_json.*")

    genre_lookup, genre_movie_junction = _build_genre_tables(silver_movie)
    language_lookup = _build_language_lookup(silver_movie)

    # eqNullSafe: a plain == join silently drops movies whose OriginalLanguage
    # is null, because NULL = NULL is not true in SQL.
    language_match = silver_movie.OriginalLanguage.eqNullSafe(language_lookup.language_name)
    silver_movie_final = silver_movie.join(language_lookup, language_match).select(
        "value",
        "Id",
        "Title",
        "Overview",
        "Tagline",
        "Budget",
        "Revenue",
        "ImdbUrl",
        "TmdbUrl",
        "PosterUrl",
        "BackdropUrl",
        col("language_id").alias("Language_id"),
        col("ReleaseDate").cast("date"),
        "RunTime",
        "Price",
        col("CreatedDate").cast("date").alias("p_createddate"),
        "UpdatedDate",
        "UpdatedBy",
        "CreatedBy",
    )

    silver_movie_final = silver_movie_final.dropDuplicates()

    return silver_movie_final, genre_lookup, genre_movie_junction, language_lookup

In [0]:
def generate_clean_and_quarantine_dataframes(silverDF):
    '''Split silver movies into valid rows and rows to quarantine.

    A row is clean when ``RunTime >= 0``. Every other row (negative runtime
    *or* missing/null runtime) is quarantined, so each input row lands in
    exactly one of the two outputs.

    Returns
    -------
    tuple of DataFrame
        ``(silver_movie_clean, silver_movie_quarantine)``
    '''
    # The predicates must be complements under SQL three-valued logic. With
    # quarantine = "RunTime < 0" alone, null runtimes matched neither filter
    # and were silently lost.
    silver_movie_clean = silverDF.filter("RunTime >= 0")
    silver_movie_quarantine = silverDF.filter("RunTime < 0 OR RunTime IS NULL")
    return silver_movie_clean, silver_movie_quarantine

In [0]:
def update_bronze_table_status(spark, bronzeTablePath, dataframe, status):
    '''Set ``status`` on every bronze row whose ``value`` appears in ``dataframe``.

    Parameters
    ----------
    spark : SparkSession
        Session used to open the Delta table.
    bronzeTablePath : str
        Path of the bronze Delta table to update.
    dataframe : DataFrame
        Rows to mark; must contain the ``value`` column carried over from bronze.
    status : str
        Status value written to the matched bronze rows.
    '''
    bronze_table = DeltaTable.forPath(spark, bronzeTablePath)

    # Every source row gets the same status, so collapsing duplicate values is
    # lossless. It also prevents MERGE from failing when several source rows
    # match the same target row.
    updates = dataframe.withColumn("status", lit(status)).dropDuplicates(["value"])

    (
        bronze_table.alias("bronze")
        .merge(updates.alias("dataframe"), "bronze.value = dataframe.value")
        .whenMatchedUpdate(set={"status": "dataframe.status"})
        .execute()
    )