In [2]:
# Welcome to your new notebook
# Type here in the cell editor to add code!



StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 4, Finished, Available, Finished)


###### ------------------------------------------------------------
###### Load Common Lakehouse Path Definitions
###### ------------------------------------------------------------
###### This command runs the "Lake_House_Paths" notebook/script,
###### which contains variables and constants like `injection_lh_fpath`,
###### `bronz_lh_fpath`, etc., used to define file paths for different layers.
###### -----------------------------------------------------------

###### Uncomment this line before executing
 



In [None]:
%run Lake_House_Paths 

In [17]:

# ------------------------------------------------------------
# Run Bronze Layer Notebook
# ------------------------------------------------------------
# This command executes the `Bronze_Layer_notebook_sw` notebook,
# which handles:
# - Reading raw HR data from the staging layer (CSV)
# - Writing the cleaned/raw data to the bronze layer (Parquet format)
# 
# Ensure that `Lake_House_Paths` has been run before this to define path variables.
# ------------------------------------------------------------



StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 22, Finished, Available, Finished)

In [18]:
%run Bronze_Layer_notebook_sw

StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 29, Finished, Available, Finished)

HR data successfully loaded into the Staging Layer: 'hr_data_staging.csv'
root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- race: string (nullable = true)
 |-- department: string (nullable = true)
 |-- jobtitle: string (nullable = true)
 |-- location: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- termdate: timestamp (nullable = true)
 |-- location_city: string (nullable = true)
 |-- location_state: string (nullable = true)

Number of rows  22227


SynapseWidget(Synapse.DataFrame, 2c8ddf74-eb74-4c55-9c2f-b0216cb62a81)

✅ HR data successfully moved to the Bronze Layer: 'hr_data_bronze.parquet'
HR data successfully moved to the Bronze Layer: 'hr_data_bronze.parquet'


In [19]:
# Bronze Layer Data


# ------------------------------------------------------------
# This section reads the HR data from the Bronze Layer, which 
# is stored in Parquet format, and displays a sample of the data.
# ------------------------------------------------------------
bronze_df = spark.read.parquet(bronze_file_path)


# Show sample data
# bronze_df.show(5)



StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 30, Finished, Available, Finished)

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable



# ------------------------------------------------------------
# Initialize Spark Session with Delta Lake Support
# ------------------------------------------------------------
# This Spark session is configured to work with Delta Lake
# ------------------------------------------------------------

# Initialize Spark Session with Delta support
spark = SparkSession.builder \
    .appName("HR Silver Layer") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

    


StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 35, Finished, Available, Finished)

In [25]:
# Count Total Records and Null Values Before Processing
total_records_before = bronze_df
null_counts_before = {col_name: bronze_df.filter(col(col_name).isNull()).count() for col_name in bronze_df.columns}

StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 36, Finished, Available, Finished)

In [26]:
# Remove Nulls (except termdate)
columns_to_clean = [col_name for col_name in bronze_df.columns if col_name != "termdate"]
df_cleaned = bronze_df.dropna(subset=columns_to_clean)


StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 37, Finished, Available, Finished)

In [27]:
# Remove Duplicates
df_silver = df_cleaned.dropDuplicates()

StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 38, Finished, Available, Finished)

In [28]:
#  Null Value count
null_counts_before = {c: bronze_df.filter(col(c).isNull()).count() for c in bronze_df.columns}

# Count Total Records After Processing
total_records_before = bronze_df.count()


StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 39, Finished, Available, Finished)

In [29]:
# Count after processing
total_records_after = df_silver.count()
nulls_removed = total_records_before - df_cleaned.count()  #  Get count before subtracting
duplicates_removed = df_cleaned.count() - total_records_after  #  Get count before subtracting

StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 40, Finished, Available, Finished)

In [31]:

# ------------------------------------------------------------
# Silver Layer Processing Summary Log
# ------------------------------------------------------------
# This section prints a summary of data cleaning operations
# applied during the Silver Layer transformation.
# ------------------------------------------------------------

print(f"""
Silver Layer Processing Summary:

Total records before processing: {total_records_before}
Null values removed (except termdate): {nulls_removed}
Duplicate records removed: {duplicates_removed}
Total records after cleaning: {total_records_after}
""")

StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 42, Finished, Available, Finished)


Silver Layer Processing Summary:

Total records before processing: 22227
Null values removed (except termdate): 0
Duplicate records removed: 13
Total records after cleaning: 22214



In [32]:
# ------------------------------------------------------------
# Create Cleaned Integer ID Column in Silver Layer
# ------------------------------------------------------------
# This transformation removes dashes from the 'id' column 
# and creates a new column 'int_id' cast as IntegerType.
# Useful for downstream operations like joins, indexing, or BI tools.
# ------------------------------------------------------------

from pyspark.sql.functions import *
cleand_id = df_silver.withColumn("int_id", regexp_replace(col("id"), "-", ""))
silver_df = cleand_id.withColumn("int_id", col("int_id").cast("int"))
# silver_df.select("id", "id_cleaned").show(10)
display(silver_df)


StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 43, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3837862e-3a0b-4a70-84c2-55ec380baef3)

In [33]:
# ------------------------------------------------------------
# Convert 'birthdate' and 'hire_date' Columns to DateType
# ------------------------------------------------------------
# This step ensures that date fields are properly cast for 
# downstream time-based operations (e.g., age calculation, tenure).
# The input date format is assumed to be "M/d/yyyy" (e.g., 3/14/1990).
# ------------------------------------------------------------


silver_df_birth_date = silver_df.withColumn("birthdate", to_date(col("birthdate"), "M/d/yyyy"))
hr_data_silver = silver_df_birth_date.withColumn("hire_date",to_date(col("hire_date"),"M/d/yyyy"))
# display(silver_df_hire_date)
# display(silver_df_birth_date)
hr_data_silver.printSchema()


StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 44, Finished, Available, Finished)

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- race: string (nullable = true)
 |-- department: string (nullable = true)
 |-- jobtitle: string (nullable = true)
 |-- location: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- termdate: timestamp (nullable = true)
 |-- location_city: string (nullable = true)
 |-- location_state: string (nullable = true)
 |-- int_id: integer (nullable = true)



In [34]:
# ------------------------------------------------------------
# Rename Column: 'int_id' → 'employee_id'
# ------------------------------------------------------------
# This step improves clarity and consistency by renaming the
# technical column name 'int_id' to a more meaningful 'employee_id'.
# ------------------------------------------------------------

hr_data_silver = hr_data_silver.withColumnRenamed("int_id","employee_id") 
display(hr_data_silver)



StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 45, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8af30f6d-2faf-42b4-ab04-660d651c11fe)

In [35]:
hr_data_silver.write.format("delta").mode("overwrite").save(silver_lh_fpath)

StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 46, Finished, Available, Finished)

In [36]:
# ------------------------------------------------------------
# Save Silver Layer Data as Delta Table and Reload for Verification
# ------------------------------------------------------------
# This step writes the cleaned HR data to the Silver Layer path 
# in Delta format and reads it back to verify successful write.
# ------------------------------------------------------------

silver_path = r"abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_lakehouse_1.Lakehouse/Tables/hr_data_silver "
hr_data_silver.write.format("delta").mode("overwrite").save(silver_path)
test_df = spark.read.format("delta").load(silver_path)
display(test_df)

StatementMeta(, ef998733-0497-402a-bbad-ecd9388a8da0, 47, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, eb880fbe-354d-4453-84de-c6218e71314b)