In [0]:
# Configure SAS access to the `poddemo` container of the `demolakehouse`
# storage account.
#
# The SAS token is read from a Databricks secret scope rather than being
# embedded in the notebook: a token pasted into source is exposed to everyone
# who can read the notebook or its version history.
# NOTE(review): the scope and key names below are placeholders — create them
# (or point these at an existing scope/key) before running. Any token that was
# previously committed in this cell should be revoked and re-issued.
sas_token = dbutils.secrets.get(scope="adls-secrets", key="poddemo-sas-token")

spark.conf.set(
    "fs.azure.sas.poddemo.demolakehouse.blob.core.windows.net",
    sas_token,
)


In [0]:
# Root folder in the storage container that holds the source CSV files
adls_path = "wasbs://poddemo@demolakehouse.blob.core.windows.net/Aqualake_sourcefiles/"

# Fetch the folder listing once so the entries can be inspected below
files = dbutils.fs.ls(adls_path)

# Debugging aid: echo the name of every entry found in the folder
for entry_name in (entry.name for entry in files):
    print(entry_name)


Customers.csv
Sales.csv
categories.csv
cities.csv
countries.csv
employees.csv
products.csv


In [0]:
# Logical table name -> exact (case-sensitive) file name inside the source folder
expected_files = dict(
    customers="Customers.csv",
    sales="Sales.csv",
    categories="categories.csv",
    cities="cities.csv",
    countries="countries.csv",
    employees="employees.csv",
    products="products.csv",
)

# Loaded DataFrames, keyed by logical table name
dfs = {}

# Load every expected CSV with a header row and inferred column types.
# A failure on one file is reported and the remaining files are still attempted.
for table_name, file_name in expected_files.items():
    file_path = f"{adls_path}{file_name}"
    try:
        csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
        df = csv_reader.load(file_path)
        dfs[table_name] = df
        print(f"✅ Successfully loaded {file_name} into {table_name}_df")
    except Exception as e:
        print(f"❌ Error loading {file_name}: {e}")


✅ Successfully loaded Customers.csv into customers_df
✅ Successfully loaded Sales.csv into sales_df
✅ Successfully loaded categories.csv into categories_df
✅ Successfully loaded cities.csv into cities_df
✅ Successfully loaded countries.csv into countries_df
✅ Successfully loaded employees.csv into employees_df
✅ Successfully loaded products.csv into products_df


In [0]:
# Show first 5 rows for each DataFrame
# Preview the first 5 rows of every loaded table under an upper-cased heading
for table_name, df in dfs.items():
    heading = f"📌 Sample data from {table_name.upper()}:"
    print(heading)
    df.show(5)


📌 Sample data from CUSTOMERS:
+----------+---------+-------------+--------+------+--------------------+
|CustomerID|FirstName|MiddleInitial|LastName|CityID|             Address|
+----------+---------+-------------+--------+------+--------------------+
|         1| Stefanie|            Y|    Frye|    79|       97 Oak Avenue|
|         2|    Sandy|            T|   Kirby|    96|52 White First Fr...|
|         3|      Lee|            T|   Zhang|    55|921 White Fabien ...|
|         4|   Regina|            S|   Avery|    40|       75 Old Avenue|
|         5|   Daniel|            S|  Mccann|     2|283 South Green H...|
+----------+---------+-------------+--------+------+--------------------+
only showing top 5 rows

📌 Sample data from SALES:
+-------+-------------+----------+---------+--------+--------+----------+---------+--------------------+
|SalesID|SalesPersonID|CustomerID|ProductID|Quantity|Discount|TotalPrice|SalesDate|   TransactionNumber|
+-------+-------------+----------+---------

In [0]:
# Dump the inferred column types of every loaded table,
# with a 50-character rule after each one to keep the output readable
separator = "-" * 50
for table_name, df in dfs.items():
    heading = f"📌 Schema for {table_name.upper()}:"
    print(heading)
    df.printSchema()
    print(separator)


📌 Schema for CUSTOMERS:
root
 |-- CustomerID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleInitial: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- CityID: integer (nullable = true)
 |-- Address: string (nullable = true)

--------------------------------------------------
📌 Schema for SALES:
root
 |-- SalesID: integer (nullable = true)
 |-- SalesPersonID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- TotalPrice: integer (nullable = true)
 |-- SalesDate: string (nullable = true)
 |-- TransactionNumber: string (nullable = true)

--------------------------------------------------
📌 Schema for CATEGORIES:
root
 |-- CategoryID: integer (nullable = true)
 |-- CategoryName: string (nullable = true)

--------------------------------------------------
📌 Schema for CITIES:
root
 |-- CityID: in

In [0]:
from pyspark.sql.functions import lit, current_timestamp

# Audit-stamped copies of the loaded DataFrames, keyed by table name.
# Each copy gains a load `Timestamp` and an `ActionStatus` initialised to "Insert".
dfs_enriched = {}
for table_name, df in dfs.items():
    df = df.withColumn("Timestamp", current_timestamp())
    df = df.withColumn("ActionStatus", lit("Insert"))
    dfs_enriched[table_name] = df
    print(f"✅ Added Timestamp & ActionStatus to {table_name}")


✅ Added Timestamp & ActionStatus to customers
✅ Added Timestamp & ActionStatus to sales
✅ Added Timestamp & ActionStatus to categories
✅ Added Timestamp & ActionStatus to cities
✅ Added Timestamp & ActionStatus to countries
✅ Added Timestamp & ActionStatus to employees
✅ Added Timestamp & ActionStatus to products


In [0]:
%sql
-- Make sure the target schema exists before creating a table inside it;
-- this cell runs before the notebook's Python schema-creation cell.
CREATE SCHEMA IF NOT EXISTS podlakehousedemo.aqualake_test;

-- Load-audit table: the control-framework load cell appends one row per
-- table per run. The name is fully qualified so this is the same table that
-- cell writes to (podlakehousedemo.aqualake_test.control_framework),
-- regardless of which catalog happens to be current in the session.
CREATE TABLE IF NOT EXISTS podlakehousedemo.aqualake_test.control_framework (
    TableName STRING,     -- logical table name (e.g. customers, sales)
    SourceCount BIGINT,   -- row count of the source DataFrame
    TargetCount BIGINT,   -- row count of the target table after the load (0 on failure)
    LoadTime TIMESTAMP,   -- when the control record was created
    Status STRING         -- 'Success' or 'Failure'
) USING DELTA;


In [0]:
# Unity Catalog coordinates used for the tables written by this notebook
catalog_name, database_name = "podlakehousedemo", "aqualake_test"

# Create the schema up front if it is missing
spark.sql("CREATE DATABASE IF NOT EXISTS {}.{}".format(catalog_name, database_name))


DataFrame[]

In [0]:
from pyspark.sql.functions import current_timestamp, lit

# Primary-key column used as the MERGE join key for each table
primary_keys = {
    "customers": "CustomerID",
    "sales": "SalesID",
    "categories": "CategoryID",
    "cities": "CityID",
    "countries": "CountryID",
    "employees": "EmployeeID",
    "products": "ProductID"
}

# Audit columns are set explicitly by the MERGE below, so they are excluded
# from the list of columns copied straight from the source.
audit_columns = ("ActionStatus", "Timestamp")

# Load every enriched DataFrame into Unity Catalog:
#   - table already exists -> Delta MERGE (update matched keys, insert new keys)
#   - table does not exist -> first load, create the Delta table from the DataFrame
for table_name, df in dfs_enriched.items():
    full_table_name = f"{catalog_name}.{database_name}.{table_name}"  # Use Unity Catalog

    # Public catalog API rather than the private `_jsparkSession` JVM handle
    if spark.catalog.tableExists(full_table_name):
        print(f"🔄 Performing UPSERT (MERGE) for {table_name}...")

        key_column = primary_keys[table_name]

        # `UPDATE SET target.* = source.*` is not valid MERGE syntax, so the
        # assignments are spelled out column by column.
        data_columns = [c for c in df.columns if c not in audit_columns]
        update_set_clause = ", ".join(f"target.{c} = source.{c}" for c in data_columns)

        # Expose the incoming rows to SQL under the name the MERGE reads from
        df.createOrReplaceTempView("new_data")

        # Matched rows are overwritten and re-stamped as 'Update';
        # unmatched rows are inserted as-is (ActionStatus stays 'Insert').
        merge_query = f"""
        MERGE INTO {full_table_name} AS target
        USING (SELECT * FROM new_data) AS source
        ON target.{key_column} = source.{key_column}
        WHEN MATCHED THEN
            UPDATE SET
                {update_set_clause},
                target.ActionStatus = 'Update',
                target.Timestamp = current_timestamp()
        WHEN NOT MATCHED THEN
            INSERT *
        """
        spark.sql(merge_query)

    else:
        print(f"🚀 First Load: Creating Delta Table in Unity Catalog for {table_name}...")
        df.write.format("delta").mode("overwrite").saveAsTable(full_table_name)

    print(f"✅ {table_name} data successfully loaded into Unity Catalog")


🚀 First Load: Creating Delta Table in Unity Catalog for customers...
✅ customers data successfully loaded into Unity Catalog
🚀 First Load: Creating Delta Table in Unity Catalog for sales...
✅ sales data successfully loaded into Unity Catalog
🚀 First Load: Creating Delta Table in Unity Catalog for categories...
✅ categories data successfully loaded into Unity Catalog
🚀 First Load: Creating Delta Table in Unity Catalog for cities...
✅ cities data successfully loaded into Unity Catalog
🚀 First Load: Creating Delta Table in Unity Catalog for countries...
✅ countries data successfully loaded into Unity Catalog
🚀 First Load: Creating Delta Table in Unity Catalog for employees...
✅ employees data successfully loaded into Unity Catalog
🚀 First Load: Creating Delta Table in Unity Catalog for products...
✅ products data successfully loaded into Unity Catalog


In [0]:
%sql
-- WARNING: destructive. Empties every table loaded by this notebook, plus the
-- control table, while keeping the table definitions in place.

-- Data tables
TRUNCATE TABLE podlakehousedemo.aqualake_test.customers;
TRUNCATE TABLE podlakehousedemo.aqualake_test.sales;
TRUNCATE TABLE podlakehousedemo.aqualake_test.categories;
TRUNCATE TABLE podlakehousedemo.aqualake_test.cities;
TRUNCATE TABLE podlakehousedemo.aqualake_test.countries;
TRUNCATE TABLE podlakehousedemo.aqualake_test.employees;
TRUNCATE TABLE podlakehousedemo.aqualake_test.products;

-- Load-audit table
TRUNCATE TABLE podlakehousedemo.aqualake_test.control_framework;

In [0]:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from pyspark.sql import Row

# Schema of one control record; mirrors the columns of the control_framework table
control_schema = StructType([
    StructField("TableName", StringType(), False),
    StructField("SourceCount", LongType(), False),  # BIGINT in SQL = LongType in PySpark
    StructField("TargetCount", LongType(), False),  # BIGINT in SQL = LongType in PySpark
    StructField("LoadTime", TimestampType(), False),
    StructField("Status", StringType(), False)
])

# One Row per table is collected here and written to the control table at the end
control_records = []

# Load every enriched DataFrame (MERGE when the table exists, create it otherwise)
# and record source/target row counts plus a Success/Failure status per table.
for table_name, df in dfs_enriched.items():
    full_table_name = f"{catalog_name}.{database_name}.{table_name}"  # Use Unity Catalog

    # Default so the failure record is well-formed even if counting the source fails
    source_count = 0

    try:
        # Counting inside the try block: a source that cannot be read is logged
        # as a Failure for this table instead of aborting the whole loop before
        # any control records are written.
        source_count = df.count()

        # Public catalog API rather than the private `_jsparkSession` JVM handle
        if spark.catalog.tableExists(full_table_name):
            print(f"🔄 Performing UPSERT (MERGE) for {table_name}...")

            key_column = primary_keys[table_name]

            # Columns copied straight from the source; the audit columns are
            # assigned explicitly in the UPDATE branch below.
            columns = [c for c in df.columns if c not in ["ActionStatus", "Timestamp"]]
            update_set_clause = ", ".join(f"target.{c} = source.{c}" for c in columns)

            # Inserts carry every column, including the audit columns from the source
            insert_columns = ", ".join(df.columns)
            insert_values = ", ".join(f"source.{c}" for c in df.columns)

            # Expose the incoming rows to SQL under the name the MERGE reads from
            df.createOrReplaceTempView("new_data")

            merge_query = f"""
            MERGE INTO {full_table_name} AS target
            USING (SELECT * FROM new_data) AS source
            ON target.{key_column} = source.{key_column}
            WHEN MATCHED THEN
                UPDATE SET
                    {update_set_clause},
                    target.ActionStatus = 'Update',
                    target.Timestamp = current_timestamp()
            WHEN NOT MATCHED THEN
                INSERT ({insert_columns})
                VALUES ({insert_values})
            """
            spark.sql(merge_query)

        else:
            print(f"🚀 First Load: Creating Delta Table for {table_name}...")
            df.write.format("delta").mode("overwrite").saveAsTable(full_table_name)

        # Count target records after load
        target_count = spark.read.table(full_table_name).count()

        # Log success in control table
        control_records.append(Row(table_name, int(source_count), int(target_count), datetime.now(), "Success"))

        print(f"✅ {table_name} loaded successfully: Source ({source_count}) → Target ({target_count})")

    except Exception as e:
        # Best-effort: report the error, record the failure, continue with the next table
        print(f"❌ Error loading {table_name}: {str(e)}")
        control_records.append(Row(table_name, int(source_count), 0, datetime.now(), "Failure"))

# Convert tracking data to DataFrame using defined schema
control_df = spark.createDataFrame(control_records, schema=control_schema)

# Fully-qualified name of the control table
control_table = f"{catalog_name}.{database_name}.control_framework"

# Append to the control table when it exists, otherwise create it from this batch
if spark.catalog.tableExists(control_table):
    print("🔎 Checking schema compatibility for control framework...")

    # Explicit casts keep the count columns aligned with the table's BIGINT columns
    control_df = control_df.selectExpr(
        "TableName",
        "CAST(SourceCount AS BIGINT) AS SourceCount",
        "CAST(TargetCount AS BIGINT) AS TargetCount",
        "LoadTime",
        "Status"
    )

    control_df.write.mode("append").saveAsTable(control_table)
else:
    print("🚀 Creating new Control Framework table...")
    control_df.write.format("delta").mode("overwrite").saveAsTable(control_table)

print("📊 Control Framework updated successfully!")


🔄 Performing UPSERT (MERGE) for customers...
✅ customers loaded successfully: Source (2000) → Target (2000)
🔄 Performing UPSERT (MERGE) for sales...
✅ sales loaded successfully: Source (12556) → Target (12556)
🔄 Performing UPSERT (MERGE) for categories...
✅ categories loaded successfully: Source (11) → Target (11)
🔄 Performing UPSERT (MERGE) for cities...
✅ cities loaded successfully: Source (96) → Target (96)
🔄 Performing UPSERT (MERGE) for countries...
✅ countries loaded successfully: Source (206) → Target (206)
🔄 Performing UPSERT (MERGE) for employees...
✅ employees loaded successfully: Source (23) → Target (23)
🔄 Performing UPSERT (MERGE) for products...
✅ products loaded successfully: Source (452) → Target (452)
🔎 Checking schema compatibility for control framework...
📊 Control Framework updated successfully!


In [0]:
%sql
-- Inspect the load-audit rows written by the control-framework load cell
SELECT *
FROM podlakehousedemo.aqualake_test.control_framework

TableName,SourceCount,TargetCount,LoadTime,Status
employees,23,23,2025-03-24T05:45:17.981693Z,Success
products,452,452,2025-03-24T05:45:20.568277Z,Success
cities,96,96,2025-03-24T05:45:12.879212Z,Success
countries,206,206,2025-03-24T05:45:15.590004Z,Success
sales,12556,12556,2025-03-24T05:45:07.832249Z,Success
categories,11,11,2025-03-24T05:45:10.623297Z,Success
customers,2000,2000,2025-03-24T05:45:05.202617Z,Success


In [0]:
%sql
-- Inspect the customers table.
-- NOTE(review): this reads schema `aqualake` (resolved against the current catalog),
-- not `podlakehousedemo.aqualake_test` that the load cells in this notebook write to —
-- confirm this is the intended table.
SELECT *
FROM aqualake.customers

CustomerID,FirstName,MiddleInitial,LastName,CityID,Address,Timestamp,ActionStatus
2194,Walter,F,Sellers,63,22 Hague Street,2025-03-24T06:38:28.369Z,Update
2195,Marie,F,Sharp,16,20 Oak Street,2025-03-24T06:38:28.369Z,Update
2196,Kari,C,Cain,28,99 South Rocky Second Road,2025-03-24T06:38:28.369Z,Update
2197,Tricia,F,Sheppard,44,80 Cowley Parkway,2025-03-24T06:38:28.369Z,Update
2198,Brendan,F,Glenn,55,40 Rocky Old Street,2025-03-24T06:38:28.369Z,Update
2199,Gerard,I,Conrad,49,202 Cowley Drive,2025-03-24T06:38:28.369Z,Update
2200,Jill,K,Sherman,81,66 North Green Hague Avenue,2025-03-24T06:38:28.369Z,Update
2201,Theresa,T,Barker,85,36 Oak Street,2025-03-24T06:38:28.369Z,Update
2202,Valerie,K,Sanford,43,12 Clarendon Way,2025-03-24T06:38:28.369Z,Update
2203,Alison,T,Guzman,4,154 North Milton Freeway,2025-03-24T06:38:28.369Z,Update
