In [None]:
# Notebook parameters passed in by the Azure pipeline.
# Each widget is declared with a default value and then read back immediately.

# Pipeline run id (default "123-123").
dbutils.widgets.text("_pipeline_run_id", "123-123")
_pipeline_run_id = dbutils.widgets.get("_pipeline_run_id")

# Name of the landed file to process (default "nybabynames.csv").
dbutils.widgets.text("_filename", "nybabynames.csv")
_filename = dbutils.widgets.get("_filename")

# Echo both parameters, one per line.
print(_pipeline_run_id, _filename, sep="\n")

In [None]:
# Give this Spark session access to the Data Lake.
# The storage account name and key are read from the "dataLakeScope" secret scope,
# so neither value is hard-coded in the notebook.
accountName = dbutils.secrets.get(scope="dataLakeScope", key="accountName")
accountKey = dbutils.secrets.get(scope="dataLakeScope", key="accountKey")

# Spark configuration key that holds the account key for this storage account.
sparkProperty = f'fs.azure.account.key.{accountName}.dfs.core.windows.net'
spark.conf.set(sparkProperty, accountKey)

In [None]:
# Storage locations used by the rest of the notebook.

# Output: fixed "nybabynames" path in the silver container.
silverTarget = f'abfss://silver@{accountName}.dfs.core.windows.net/nybabynames'

# Input: the file named by the _filename parameter, in the bronze container.
bronzeSource = f'abfss://bronze@{accountName}.dfs.core.windows.net/{_filename}'

In [None]:
# Read the bronze CSV from the Data Lake.
# The first row is used as the header and column types are inferred from the data.
gridDataDf = spark.read.option("inferSchema", "true").csv(path=bronzeSource, header=True)

# printSchema is a method: it has to be called to print the schema.
# Passing the uncalled method to display() shows the method object, not the schema.
gridDataDf.printSchema()



In [None]:
from  pyspark.sql.functions import *

# Add audit columns to the data frame.

# Processing timestamp, truncated to whole seconds.
processing_date = date_trunc("second", current_timestamp())

# 1. Rename the "name_count" column to "count".
# 2. _processing_date: the time at which this data set is processed.
# 3. _pipeline_run_id: the pipeline run id passed in from ADF.
# 4. _input_filename: the landing file name. This is useful for debugging purposes.
# 5. _input_file_modification_date: the file modification time. This helps identify
#    the order of the data when the dataset itself doesn't have a modification date.
gridDataDf = (
    gridDataDf
    .withColumnRenamed("name_count", "count")
    .withColumn("_processing_date", processing_date)
    .withColumn("_pipeline_run_id", lit(_pipeline_run_id))
    .withColumn("_input_filename", input_file_name())
    .withColumn("_input_file_modification_date", col("_metadata.file_modification_time"))
)

# printSchema is a method: it has to be called to print the schema.
# Passing the uncalled method to display() shows the method object, not the schema.
gridDataDf.printSchema()

In [None]:
from  pyspark.sql.functions import *
from  pyspark.sql import *

# Data quality: keep only rows where every key column and the count are present,
# and the count is strictly positive.
gridCleanDF = gridDataDf.where(
    "year IS NOT NULL AND first_name IS NOT NULL AND county IS NOT NULL AND sex IS NOT NULL AND count IS NOT NULL AND count > 0"
)

# De-duplication: within each (year, first_name, county, sex) group, rank rows by
# newest file modification date first, then by ascending count as the tie-breaker.
gridDataWindowSpec = (
    Window
    .partitionBy("year", "first_name", "county", "sex")
    .orderBy(col("_input_file_modification_date").desc(), "count")
)

# Keep only the top-ranked row of each group and drop the helper ranking column.
findLatest = (
    gridCleanDF
    .withColumn("row_number", row_number().over(gridDataWindowSpec))
    .where("row_number == 1")
    .drop("row_number")
)

# Later cells continue to work with the cleaned, de-duplicated frame under this name.
gridDataDf = findLatest

In [None]:
from delta.tables import *

# Write the batch to the silver location.
if not DeltaTable.isDeltaTable(spark, silverTarget):

    # No Delta table at the silver path yet: write this batch as the initial table.
    gridDataDf.write.mode("overwrite").format("delta").save(silverTarget)
else:

    # A Delta table already exists: merge on the (year, first_name, county, sex) key.
    # Only rows without a match in the target are inserted; matched rows are left untouched.
    silverTable = DeltaTable.forPath(spark, silverTarget)
    (
        silverTable.alias("target")
        .merge(
            source = gridDataDf.alias("src"),
            condition = "target.year = src.year and target.first_name = src.first_name and target.county = src.county and target.sex = src.sex"
        )
        .whenNotMatchedInsertAll()
        .execute()
    )


In [None]:
# Register the silver Delta location in the metastore.
# Both statements use IF NOT EXISTS, so they do nothing when the schema or table already exists.
# Note: spark.sql is used (rather than a %sql cell) so that an f-string can
# interpolate the silver path into the LOCATION clause.
for ddlStatement in (
    "CREATE SCHEMA IF NOT EXISTS nybabynames",
    f"CREATE EXTERNAL TABLE IF NOT EXISTS nybabynames.NewYorkBabyNames USING delta LOCATION '{silverTarget}'",
):
    spark.sql(ddlStatement)

In [None]:
%sql

-- Show the detailed metadata of the silver table.
DESCRIBE EXTENDED nybabynames.NewYorkBabyNames

-- What to look for in the output:
-- Location: the table data is stored in the storage account
-- Provider (format): Delta

In [None]:
%sql

-- Inspect the Delta transaction log: one row per table version,
-- with the full operation metrics and the row counters of interest.

SELECT
  version,
  operationMetrics,
  operationMetrics.numOutputRows,
  operationMetrics.numTargetRowsInserted,
  operationMetrics.numTargetRowsUpdated,
  operationMetrics.numTargetRowsDeleted
FROM (DESCRIBE HISTORY nybabynames.NewYorkBabyNames)

-- If you execute Cmd 7 again, a new version is created with no data changes (numOutputRows = 0).

In [None]:
%sql

-- Check your result for testing. Do not do this in production!
-- (The query has no filter or limit, so it selects every row of the table.)
SELECT *
FROM nybabynames.NewYorkBabyNames



