In [0]:
%run ./includes/configuration

In [0]:
%run ./includes/utilities

In [0]:
%run ./includes/operations

In [0]:
# Step 1: Ingest the raw source data into the `rawDF` DataFrame.
# NOTE(review): `rawData` and `ingest_batch_raw` are not defined in this notebook's visible
# cells — presumably provided by the ./includes notebooks run above; confirm.
rawDF = ingest_batch_raw(
    rawData
)

In [0]:
# Step 2: Apply the project's raw-layer transformation (`transform_raw`) to the ingested data.
transformedRawDF = transform_raw(
    rawDF
)

In [0]:
# Step 3: Write the transformed raw data to the bronze path.
# `batch_writer` comes from ./includes; `partition_column` presumably sets the output
# partitioning (by ingest date) — confirm against its definition.
rawToBronzeWriter = batch_writer(
    dataframe=transformedRawDF,
    checkpoint=bronzeCheckpoint,
    name="write_raw_to_bronze",
    partition_column="p_ingestdate",
)
rawToBronzeWriter.save(bronzePath)

In [0]:
# Display the bronze table. `read_stream_delta` (from ./includes) presumably returns a
# streaming DataFrame; `streamName` labels the display so it can be identified among
# active streams.
bronzeDF = read_stream_delta(
    spark,
    bronzePath,
)
display(
    bronzeDF,
    streamName="display_bronze",
)

In [0]:
# Build the language lookup table from the bronze movie records and show it.
# NOTE(review): `movies_bronze` is not defined in this notebook's visible cells —
# presumably provided by ./includes; confirm, or this cell fails on a fresh kernel.
language_silver = get_language_table(
    movies_bronze
)
display(language_silver)

In [0]:
# Build the genres table from the bronze movie records and show it.
genres_silver = get_genres_table(
    movies_bronze
)
display(genres_silver)

In [0]:
# Tag each movie row with an id, then derive the movie/genre junction table from it.
# NOTE(review): `movies_silver` is read here before any visible cell assigns it —
# presumably defined in ./includes; confirm.
# NOTE(review): monotonically_increasing_id() values are unique but not consecutive, and
# may differ if the lazy plan is recomputed by a later action — confirm downstream users
# do not depend on stable or dense ids.
movies_silver = movies_silver.withColumn(
    "movie_genre_junction_id",
    monotonically_increasing_id() + 1,
)
movie_genre_junction_silver = get_movie_genre_junction_table(
    movies_silver
)
display(movie_genre_junction_silver)

In [0]:
# Attach the language lookup columns to each movie (inner join on OriginalLanguage),
# then drop OriginalLanguage. Dropping by name removes the column from both join sides.
# NOTE(review): this reassigns movies_silver, so re-running the cell fails because
# OriginalLanguage no longer exists on it.
language_match = movies_silver.OriginalLanguage == language_silver.OriginalLanguage
movies_silver = (
    movies_silver
    .join(language_silver, language_match, "inner")
    .drop("OriginalLanguage")
)
display(movies_silver)

In [0]:
# Genres now live in the separate genres/junction tables, so remove the column from movies.
movies_silver = (
    movies_silver
    .drop("genres")
)
display(movies_silver)

In [0]:
# Show running streams: print the name of every currently active streaming query.
for active_query in spark.streams.active:
    print(active_query.name)

In [0]:
# Recursively delete everything under silverPath so the silver append in this notebook
# starts from an empty location on every run.
dbutils.fs.rm(
    silverPath,
    recurse=True,
)

In [0]:
# Move data to the silver master table.
# Build the silver DataFrame from a *batch* snapshot of the bronze Delta table instead of
# `bronzeDF`. `bronzeDF` comes from `read_stream_delta` (defined in ./includes), which
# presumably returns a streaming DataFrame — TODO confirm. The cells that follow call
# `.count()` and `.write` on this DataFrame and use it as a Delta MERGE source, and Spark
# rejects all of those on streaming DataFrames. `bronzePath` is opened with
# `DeltaTable.forPath` later in this notebook, so a batch Delta read of it is valid.
bronze_snapshot = spark.read.format("delta").load(bronzePath)

# Fields pulled out of the nested `movie` column, in output-column order.
movie_fields = [
    "BackdropUrl", "Budget", "CreatedBy", "CreatedDate", "Id", "ImdbUrl",
    "OriginalLanguage", "Overview", "PosterUrl", "Price", "ReleaseDate", "Revenue",
    "RunTime", "Tagline", "Title", "TmdbUrl", "UpdatedBy", "UpdatedDate", "genres",
]
# Only these fields are cast (to DATE); every other field keeps its bronze type.
date_fields = {"CreatedDate", "UpdatedDate"}

movie_columns = [
    col(f"movie.{field}").cast("date").alias(field)
    if field in date_fields
    else col(f"movie.{field}")
    for field in movie_fields
]

# `movie` itself is kept so bronze rows can be matched back ("bronze.movie = ...") when
# the status columns are merged into bronze.
silver_master_tracker = bronze_snapshot.select("movie", *movie_columns)

In [0]:
# Preview the flattened silver records before they are validated and split.
display(silver_master_tracker)

In [0]:
# Sanity-check the shape of `silver_master_tracker`.
# Previously this check lived inside a string literal: it never ran, the cell just echoed
# the string, and its DDL was not valid ("genres array" has no element type). The check
# below runs for real and asserts only what the building `select` guarantees: the column
# names/order, and that CreatedDate/UpdatedDate were cast to DATE. Other field types come
# from the bronze data and are intentionally not pinned here.
expected_silver_columns = [
    "movie",
    "BackdropUrl", "Budget", "CreatedBy", "CreatedDate", "Id", "ImdbUrl",
    "OriginalLanguage", "Overview", "PosterUrl", "Price", "ReleaseDate", "Revenue",
    "RunTime", "Tagline", "Title", "TmdbUrl", "UpdatedBy", "UpdatedDate", "genres",
]
assert silver_master_tracker.columns == expected_silver_columns, "Schemas do not match"

silver_dtypes = dict(silver_master_tracker.dtypes)
assert silver_dtypes["CreatedDate"] == "date", "CreatedDate is not a DATE column"
assert silver_dtypes["UpdatedDate"] == "date", "UpdatedDate is not a DATE column"
print("Assertion passed.")

In [0]:
# Total number of silver records (compare with the all-null-dropped count in the next cell).
silver_master_tracker.count()

In [0]:
# Number of silver records that have at least one non-null column.
(
    silver_master_tracker
    .dropna(how="all")
    .count()
)

In [0]:
# Split the silver dataFrame
silver_master_tracker_clean = silver_master_tracker.filter("runtime >= 0" and "budget >= 1000000")
silver_master_tracker_quarantine = silver_master_tracker.filter("budget < 1000000" or "runtime < 0")

In [0]:
# Number of records that passed the clean-record rules.
silver_master_tracker_clean.count()

In [0]:
# Number of records routed to quarantine.
silver_master_tracker_quarantine.count()

In [0]:
# Inspect the quarantined records to see why they failed the clean-record rules.
display(silver_master_tracker_quarantine)

In [0]:
# Append the clean records to the silver Delta location, keeping only the flattened
# movie fields (the nested `movie` column is not persisted to silver).
silver_column_names = [
    "BackdropUrl", "Budget", "CreatedBy", "CreatedDate", "Id", "ImdbUrl",
    "OriginalLanguage", "Overview", "PosterUrl", "Price", "ReleaseDate", "Revenue",
    "RunTime", "Tagline", "Title", "TmdbUrl", "UpdatedBy", "UpdatedDate", "genres",
]
(
    silver_master_tracker_clean
    .select(*silver_column_names)
    .write
    .format("delta")
    .mode("append")
    .save(silverPath)
)

In [0]:
# (Re)register the `master_silver` metastore table on top of the silver Delta location.
drop_master_silver_sql = """
DROP TABLE IF EXISTS master_silver
"""
create_master_silver_sql = f"""
CREATE TABLE master_silver
USING DELTA
LOCATION "{silverPath}"
"""

spark.sql(drop_master_silver_sql)

spark.sql(create_master_silver_sql)

In [0]:
# Verify the registered `master_silver` table has the expected shape.
# This check previously sat inside a string literal, so it never ran and its DDL was
# invalid ("genres array" has no element type). It now runs for real. It asserts only
# what the silver write guarantees: exactly the flattened movie columns, in order, with
# CreatedDate/UpdatedDate stored as DATE. Other field types come from bronze and are not
# pinned here.
silverTable = spark.read.table("master_silver")
expected_master_silver_columns = [
    "BackdropUrl", "Budget", "CreatedBy", "CreatedDate", "Id", "ImdbUrl",
    "OriginalLanguage", "Overview", "PosterUrl", "Price", "ReleaseDate", "Revenue",
    "RunTime", "Tagline", "Title", "TmdbUrl", "UpdatedBy", "UpdatedDate", "genres",
]
assert silverTable.columns == expected_master_silver_columns, "Schemas do not match"

master_silver_dtypes = dict(silverTable.dtypes)
assert master_silver_dtypes["CreatedDate"] == "date", "CreatedDate is not a DATE column"
assert master_silver_dtypes["UpdatedDate"] == "date", "UpdatedDate is not a DATE column"
print("Assertion passed.")

In [0]:
# Step 1: Update Clean records.
# Set status = "loaded" on every bronze row whose `movie` value matches a clean record.
# Exact duplicate source rows are removed first, because MERGE rejects multiple source
# rows matching the same target row.
from delta.tables import DeltaTable

bronzeTable = DeltaTable.forPath(spark, bronzePath)
silverAugmented = (
    silver_master_tracker_clean
    .withColumn("status", lit("loaded"))
    .dropDuplicates()
)

update_match = "bronze.movie = clean.movie"
update = {"status": "clean.status"}

loaded_merge = bronzeTable.alias("bronze").merge(
    silverAugmented.alias("clean"),
    update_match,
)
loaded_merge.whenMatchedUpdate(set=update).execute()

In [0]:
# Step 2: Update Quarantined records.
# Set status = "quarantined" on every bronze row whose `movie` value matches a
# quarantined record. This reuses the `bronzeTable` handle opened in the Step 1 cell.
silverAugmented = (
    silver_master_tracker_quarantine
    .withColumn("status", lit("quarantined"))
    .dropDuplicates()
)

update_match = "bronze.movie = quarantine.movie"
update = {"status": "quarantine.status"}

quarantine_merge = bronzeTable.alias("bronze").merge(
    silverAugmented.alias("quarantine"),
    update_match,
)
quarantine_merge.whenMatchedUpdate(set=update).execute()