# Gold Layer Data Processing
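This notebook builds the gold layer from silver data by executing the SQL definitions stored in `configs/gold`, and can optionally run data quality tests (non-null and unique ID columns) on the resulting `gold_*` views.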

In [0]:
# Databricks notebook source
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import dlt

In [0]:
# Notebook widget: a dropdown named "should_test" whose choices are the
# strings "true" and "false", defaulting to "false".
dbutils.widgets.dropdown("should_test", "false", ["true", "false"])

In [0]:
# Read the widget's current selection. NOTE: the value is one of the dropdown's
# string choices ("true" / "false"), not a bool, so both values are truthy.
should_test = dbutils.widgets.get("should_test")

In [0]:
def read_sql_file(file_path):
    """Return the full text of the SQL file at ``file_path``.

    Args:
        file_path: Path (``str`` or path-like) of the file to read.

    Returns:
        The file's contents as a single string.

    Raises:
        OSError: If the file cannot be opened (for example, it does not exist).
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform/locale default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()

In [0]:
# Collect the names of the .sql files found in the gold config folder
gold_path = "./configs/gold"
sql_files = list(filter(lambda name: name.endswith('.sql'), os.listdir(gold_path)))

In [0]:
# Read and execute each view definition from configs/gold
gold_path = Path('configs/gold')
# Path.glob gives no ordering guarantee; sort so the definitions are always
# executed in the same (alphabetical) order from run to run.
for sql_file in sorted(gold_path.glob('*.sql')):
    # Read the whole file up front (explicitly as UTF-8, not the locale
    # default) so no file handle is held open while Spark runs the statement.
    view_definition = sql_file.read_text(encoding='utf-8')
    spark.sql(view_definition)

In [0]:
# List all created views
# Match the literal "gold_" prefix with Column.startswith: in a SQL LIKE
# pattern such as 'gold_%' the "_" is a single-character wildcard, which would
# also match names like "golden_x".
views = spark.sql("SHOW VIEWS IN practice_sandbox.ma_sandbox")
views = views.filter(views["viewName"].startswith("gold_"))
display(views)

In [0]:
def test_not_null_ids(view_name):
    """Check that the ID columns of ``view_name`` contain no NULL values.

    ID columns are those named ``id`` or whose name ends with ``_id``
    (case-insensitive).

    Args:
        view_name: Name of the view to check, as it would appear in a SQL
            ``FROM`` clause.

    Returns:
        A list with one dict per ID column, with keys ``view``, ``column``,
        ``test`` (always ``'not_null'``), ``passed`` (True when no NULLs were
        found) and ``failed_records`` (the number of rows where the column is
        NULL).
    """
    # Take the column names from the schema instead of parsing DESCRIBE output,
    # which may also contain metadata rows that are not columns.
    id_columns = [
        name for name in spark.table(view_name).columns
        if name.lower().endswith('_id') or name.lower() == 'id'
    ]

    results = []
    for col in id_columns:
        # Backtick-quote the identifier so names containing spaces, dashes or
        # reserved words still produce valid SQL.
        quoted_col = "`" + col.replace("`", "``") + "`"
        null_count = spark.sql(
            f"SELECT COUNT(*) as null_count FROM {view_name} WHERE {quoted_col} IS NULL"
        ).collect()[0].null_count
        results.append({
            'view': view_name,
            'column': col,
            'test': 'not_null',
            'passed': null_count == 0,
            'failed_records': null_count
        })
    return results

def test_unique_ids(view_name):
    """Check that the ID columns of ``view_name`` contain no duplicate values.

    ID columns are those named ``id`` or whose name ends with ``_id``
    (case-insensitive). NULLs are ignored by this test.

    Args:
        view_name: Name of the view to check, as it would appear in a SQL
            ``FROM`` clause.

    Returns:
        A list with one dict per ID column, with keys ``view``, ``column``,
        ``test`` (always ``'unique'``), ``passed`` (True when no value occurs
        more than once) and ``failed_records`` (the number of distinct values
        that occur more than once).
    """
    # Take the column names from the schema instead of parsing DESCRIBE output,
    # which may also contain metadata rows that are not columns.
    id_columns = [
        name for name in spark.table(view_name).columns
        if name.lower().endswith('_id') or name.lower() == 'id'
    ]

    results = []
    for col in id_columns:
        # Backtick-quote the identifier so names containing spaces, dashes or
        # reserved words still produce valid SQL.
        quoted_col = "`" + col.replace("`", "``") + "`"
        # The inner query yields one row per duplicated value; the outer
        # COUNT(*) therefore counts duplicated values, not duplicated rows.
        duplicate_count = spark.sql(f"""
            SELECT COUNT(*) as dup_count 
            FROM (
                SELECT {quoted_col}
                FROM {view_name}
                WHERE {quoted_col} IS NOT NULL
                GROUP BY {quoted_col}
                HAVING COUNT(*) > 1
            )
        """).collect()[0].dup_count

        results.append({
            'view': view_name,
            'column': col,
            'test': 'unique',
            'passed': duplicate_count == 0,
            'failed_records': duplicate_count
        })
    return results


In [0]:
# The widget value is the string "true" or "false"; any non-empty string is
# truthy, so compare the text explicitly instead of relying on truthiness.
# str() keeps this working if should_test has already been converted to a bool.
if str(should_test).strip().lower() == "true":
    print("Running data quality tests...")
    all_test_results = []

    # Match the literal "gold_" prefix: in a SQL LIKE pattern the "_" would be
    # a single-character wildcard and could pick up unrelated views.
    schema_views = spark.sql("SHOW VIEWS IN practice_sandbox.ma_sandbox")
    views_list = schema_views.filter(schema_views["viewName"].startswith("gold_")).collect()
    for view in views_list:
        view_name = f"practice_sandbox.ma_sandbox.{view.viewName}"

        # Run not null tests
        all_test_results.extend(test_not_null_ids(view_name))

        # Run unique tests
        all_test_results.extend(test_unique_ids(view_name))

    if all_test_results:
        # Create DataFrame with test results
        test_results_df = spark.createDataFrame(all_test_results)
        display(test_results_df)

        # Check if any tests failed
        failed_tests = test_results_df.filter("passed = false").count()
        if failed_tests > 0:
            print(f"\n⚠️ {failed_tests} tests failed! Check the results above for details.")
        else:
            print("\n✅ All tests passed!")
    else:
        # createDataFrame cannot infer a schema from an empty list, so report
        # the situation instead of failing when there is nothing to test.
        print("\nNo gold views with ID columns were found; no tests were run.")
else:
    print("Skipping tests as should_test is set to false")