**NOTEBOOK CELL DESCRIPTION**  

**Cell 1** : Importing required modules  
**Cell 2** : Read data from the JSON files into a DataFrame  
**Cell 3** : Defining data quality checks and validation rules  
**Cell 4** : Filter out bad records from the dataset and redirect them to a separate table for reprocessing  
**Cell 5** : Data cleaning & deriving new columns for further processing    
**Cell 6** : Persisting (writing) data to a Delta table (currently a full overwrite; SCD Type 2 is not yet implemented)  
**Cell 7** : Auditing the flow

In [0]:
%run ./NB_Dependency

In [0]:
%run ./NB_Logging

In [0]:
# Importing required module to the Notebook
from pyspark.sql.types import *
from pyspark.sql.functions import *
from isoduration import parse_duration
from pyspark.sql import functions as F
from datetime import date
import pandas as pd
import datetime
# Run bookkeeping read by the audit cell: the wall-clock start of this run and the
# pipeline status, which the except-blocks of later cells flip to "Failed".
startTime, status = datetime.datetime.now(), "Running"

In [0]:
# Load every recipe JSON document from the input folder using an explicit schema,
# so no inference pass over the files is needed. All fields are nullable.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("cookTime", StringType()),
            ("datePublished", DateType()),
            ("description", StringType()),
            ("image", StringType()),
            ("ingredients", StringType()),
            ("name", StringType()),
            ("prepTime", StringType()),
            ("recipeYield", StringType()),
            ("url", StringType()),
        )
    ]
)

# "multiline" lets a single JSON object span several lines of a file
df_input = (
    spark.read
         .schema(schema)
         .option("multiline", "true")
         .json("dbfs:/FileStore/Input/*.json")
)


Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
# Data quality rules: a duration column passes only when it is neither the bare
# ISO-8601 designator "PT" (a duration with no components) nor an empty string.
# A null value makes the comparison null, which where() treats as "not passed".
rule1, rule2 = (col("cookTime") != invalid for invalid in ("PT", ""))  # cookTime: not 'PT', not empty
rule3, rule4 = (col("prepTime") != invalid for invalid in ("PT", ""))  # prepTime: not 'PT', not empty


/tmp/custom_log2022-07-08-06-08-19.log


In [0]:
# Drop rows in which every column is null. DataFrames are immutable, so the result of
# na.drop() must be captured; discarding it would leave those rows in the pipeline,
# where they fail the rules and end up in the bad-record table.
# df_input itself is left untouched so the audit cell can still count the raw input.
df_nonEmpty = df_input.na.drop("all")

# Split the remaining rows with the validation rules. A rule that evaluates to null
# (e.g. a null cookTime/prepTime) counts as "not passed" in where(), so such rows are
# routed to the bad-record set by the subtract() below.
df_goodRecords = df_nonEmpty.where(rule1 & rule2 & rule3 & rule4).distinct()
# subtract() is EXCEPT DISTINCT: duplicate bad rows are collapsed to a single row.
df_badRecords = df_nonEmpty.subtract(df_goodRecords)

# Store the bad records in a separate table for further analysis / reprocessing
try:
    (df_badRecords.write.format("delta")
                  .mode("overwrite")
                  .saveAsTable("BadRecords1"))
except Exception as e:
    logger.error(e)
    status = "Failed"

In [0]:
# Vectorised (pandas) UDF: convert ISO-8601 duration strings into whole minutes
@F.pandas_udf("int")
def parse_iso_duration(str_duration: pd.Series) -> pd.Series:
    """Convert a Series of ISO-8601 duration strings (e.g. "PT1H30M") to minutes.

    Weeks, days, hours, minutes and seconds all contribute. Years and months have no
    fixed length in minutes and are ignored. Seconds add fractional minutes, and the
    total is truncated toward zero to fit the declared ``int`` return type.
    Null or empty inputs yield null.
    """
    def _to_minutes(duration):
        # Guard so a stray null/empty value yields null instead of failing the whole batch
        if duration is None or duration == "":
            return None
        # Parse once per value (re-parsing for every component is wasted work)
        parsed = parse_duration(duration)
        minutes = (
            # Include day/week components so values such as "P1D" are not silently 0
            (parsed.date.weeks * 7 + parsed.date.days) * 24 * 60
            + parsed.time.hours * 60
            + parsed.time.minutes
            + parsed.time.seconds / 60
        )
        # Components are Decimal; convert explicitly so Arrow receives plain ints
        return int(minutes)

    return str_duration.apply(_to_minutes)
  
  
  
# Derive "cookTime_in_minutes" and "prepTime_in_minutes" with the parse_iso_duration UDF
# and project all columns in their final order in one select.
# Spark evaluates lazily: only analysis errors are caught here. The UDF itself runs
# when df_final is written, so UDF runtime errors surface in the write cell.
try:
    df_final = df_goodRecords.select(
        "name",
        "description",
        "ingredients",
        "url",
        "image",
        "recipeYield",
        "datePublished",
        "cookTime",
        "prepTime",
        parse_iso_duration(F.col("cookTime")).alias("cookTime_in_minutes"),
        parse_iso_duration(F.col("prepTime")).alias("prepTime_in_minutes"),
    )
except Exception as e:
    logger.error(e)
    status = "Failed"
  

In [0]:
# Persist the final dataset to the "Recipies" Delta Lake table.
# NOTE: this is NOT an SCD Type 2 load. mode("overwrite") replaces the whole table on
# every run, and no history / effective-date columns are kept.
# TODO: implement SCD Type 2 (e.g. a Delta MERGE maintaining valid_from/valid_to/is_current).
# This write is the action that executes the parse_iso_duration UDF, so UDF failures
# are caught here.
try:
    (df_final.write.format("delta")
             .mode("overwrite")
             .saveAsTable("Recipies"))
except Exception as e:
    logger.error(e)
    status = "Failed"
  


In [0]:
# Auditing the execution: collect run metadata and append one row to the "Audit" table.

today = date.today()
notebook_name = "NB_Task1"
# Alternative on Databricks: read the name from the notebook context instead of hard-coding it
#notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()

# Earlier cells set status to "Failed" in their except-blocks; otherwise the run completed.
if status == "Running":
    status = "Completed"

# Each count() re-executes the corresponding DataFrame lineage
no_of_records_processed = df_input.count()
no_of_records_passed = df_goodRecords.count()
no_of_records_failed = df_badRecords.count()
endTime = datetime.datetime.now()
executionTime = endTime - startTime

# The audit columns are StringType, so convert every value explicitly. Passing a raw
# timedelta relies on the JVM-side toString() of the unpickled object, which does not
# match Python's str(timedelta) (e.g. "0:01:23.456789").
data2 = [(
    today,
    notebook_name,
    str(executionTime),
    status,
    str(no_of_records_processed),
    str(no_of_records_passed),
    str(no_of_records_failed),
)]

# String-typed count columns are kept so rows stay compatible with the existing Audit table
schema = StructType([
    StructField("date", DateType(), True),
    StructField("notebook_name", StringType(), True),
    StructField("executionTime", StringType(), True),
    StructField("status", StringType(), True),
    StructField("no_of_records_processed", StringType(), True),
    StructField("no_of_records_passed", StringType(), True),
    StructField("no_of_records_failed", StringType(), True),
])

df_audit = spark.createDataFrame(data=data2, schema=schema)

try:
    (df_audit.write.format("csv")
             .mode("append")
             .saveAsTable("Audit"))
except Exception as e:
    # The audit row itself could not be stored; the failure is only visible in the log
    logger.error(e)
    status = "Failed"
# Flush and close all logging handlers so the log file is complete before it is moved
logging.shutdown()

In [0]:
# Relocate the driver-local log file into DBFS so the SQL cell can expose it as a table.
# The call's boolean result is the cell's displayed output.
dbutils.fs.mv(
    "file:" + p_logfile,
    "dbfs:/FileStore/CustomLogging/" + p_filename,
)

Out[10]: True

In [0]:
%sql
--Recreate the log table "custom_logging" over the log files moved to DBFS.
--The text source exposes each line of each file as one row in a single "value" column;
--it has no header concept ("header" is a CSV option), so no header option is passed.
drop table if exists custom_logging;
create table if not exists custom_logging
using text options(path '/FileStore/CustomLogging/*')