In [0]:
%sql
select * from platform_observability.plt_bronze.brz_billing_usage limit 10

In [0]:
%sql
desc system.lakeflow.job_task_run_timeline

In [0]:
%sql
drop table platform_observability.plt_bronze.brz_billing_usage

In [0]:
%sql
select * from platform_observability.plt_bronze.brz_compute_clusters

In [0]:
# Clear module cache to ensure fresh imports
import sys
modules_to_clear = [
    'libs.error_handling',
    'libs.logging', 
    'libs.monitoring',
    'config'
]

for module in modules_to_clear:
    if module in sys.modules:
        del sys.modules[module]
        print(f"Cleared {module} from cache")

# Force reimport
from libs.error_handling import validate_data_quality
from libs.logging import StructuredLogger
print("✅ Modules reloaded successfully")

In [0]:
# Clear cache for bronze job
import importlib
import sys

# Clear the specific module
if 'notebooks.bronze_hwm_ingest_job' in sys.modules:
    importlib.reload(sys.modules['notebooks.bronze_hwm_ingest_job'])
    print("✅ Bronze job module reloaded")

# Clear all variables
%reset -f
print("✅ All variables cleared")

In [0]:
%sql
DROP TABLE IF EXISTS platform_observability.plt_bronze.brz_compute_clusters;

In [0]:
%sql
SELECT * FROM platform_observability.plt_bronze.error_logs 
WHERE notebook_name = 'silver_hwm_build_job' 
ORDER BY timestamp DESC 
LIMIT 10;

In [0]:
%sql
truncate table platform_observability.plt_bronze.error_logs 

In [0]:
def clear_old_errors():
    try:
        table_name = f"{config.catalog}.{config.bronze_schema}.error_logs"
        
        result = spark.sql(f"""
            DELETE FROM {table_name}
            WHERE timestamp < current_timestamp() - INTERVAL 7 DAYS
        """)
        
        print("✅ Old errors cleared")
    except Exception as e:
        print(f"❌ Error clearing old errors: {str(e)}")

clear_old_errors()

In [0]:
%sql
select * from platform_observability.plt_silver.slv_workspace

In [0]:
%sql
select * from platform_observability.plt_bronze.brz_billing_list_prices

In [0]:
%sql
desc platform_observability.plt_bronze.brz_lakeflow_job_task_run_timeline

In [0]:
%sql
select * from system.compute.clusters

In [0]:
%sql
select * from platform_observability.plt_bronze.brz_billing_list_prices

In [0]:
%sql
select * from system.lakeflow.jobs

In [0]:
source.account_id, source.workspace_id, source.cluster_id, source.cluster_name, source.owned_by, source.create_time, source.delete_time, source.driver_node_type, source.worker_node_type, source.worker_count, source.min_autoscale_workers, source.max_autoscale_workers, source.auto_termination_minutes, source.enable_elastic_disk, source.tags, source.cluster_source, source.init_scripts, source.aws_attributes, source.azure_attributes, source.gcp_attributes, source.driver_instance_pool_id, source.worker_instance_pool_id, source.dbr_version, source.change_time, source.change_date, source.data_security_mode, source.policy_id, source._loaded_at, source.valid_from, source.valid_to, source.is_current, source.worker_node_type_category

In [0]:
%sql
CREATE TABLE IF NOT EXISTS {catalog}.{silver_schema}.slv_clusters (
    account_id STRING,
    workspace_id STRING,                 -- Changed from BIGINT to STRING
    cluster_id STRING,
    cluster_name STRING,
    owned_by STRING,
    create_time TIMESTAMP,
    delete_time TIMESTAMP,
    driver_node_type STRING,             -- Renamed from driver_node_type_id
    worker_node_type STRING,             -- Renamed from node_type_id
    worker_count BIGINT,                 -- Total worker count
    min_autoscale_workers BIGINT,        -- Renamed from min_workers
    max_autoscale_workers BIGINT,        -- Renamed from max_workers
    auto_termination_minutes BIGINT,
    enable_elastic_disk BOOLEAN,
    tags MAP<STRING, STRING>,
    cluster_source STRING,
    init_scripts ARRAY<STRING>,
    aws_attributes STRUCT<
        instance_profile_arn:STRING,
        zone_id:STRING,
        first_on_demand:INT,
        availability:STRING,
        spot_bid_price_percent:INT,
        ebs_volume_type:STRING,
        ebs_volume_count:INT,
        ebs_volume_size:INT,
        ebs_volume_iops:INT,
        ebs_volume_throughput:INT
    >,
    azure_attributes STRUCT<
        first_on_demand:INT,
        availability:STRING,
        spot_bid_max_price:DOUBLE
    >,
    gcp_attributes STRUCT<
        use_preemptible_executors:BOOLEAN,
        zone_id:STRING,
        first_on_demand:INT,
        availability:STRING
    >,
    driver_instance_pool_id STRING,
    worker_instance_pool_id STRING,
    dbr_version STRING,                  -- Renamed from spark_version
    change_time TIMESTAMP,
    change_date DATE,
    data_security_mode STRING,
    policy_id STRING,
    -- NEW: Node type categorization (retain original + add categorized)
    worker_node_type_category STRING,    -- Categorized node type (General Purpose, Memory Optimized, etc.)
    -- NEW: Inherited job tags for cost attribution
    inherited_line_of_business STRING,
    inherited_department STRING,
    inherited_cost_center STRING,
    inherited_environment STRING,
    inherited_use_case STRING,
    inherited_workflow_level STRING,
    inherited_parent_workflow STRING,
    -- NEW: Computed runtime fields
    major_version INT,
    minor_version INT,
    runtime_age_months INT,
    is_lts BOOLEAN,
    _loaded_at TIMESTAMP,
    -- SCD2 columns
    valid_from TIMESTAMP,
    valid_to TIMESTAMP,
    is_current BOOLEAN
) USING DELTA;

In [0]:
%sql
select * from platform_observability.plt_silver.slv_clusters

In [0]:
%sql
CREATE OR REPLACE TABLE platform_observability.plt_gold.gld_dim_node_type (
    node_type_key BIGINT GENERATED ALWAYS AS IDENTITY,  -- Surrogate key
    account_id STRING,
    node_type STRING,                    -- Natural key (renamed from node_type_id to match source)
    core_count DOUBLE,                   -- Renamed from vcpus to match source
    memory_mb BIGINT,                    -- Renamed from memory_gb and changed type
    gpu_count BIGINT,                    -- Changed type to match source
    category STRING,                     -- Node type category from Silver layer
    worker_node_type_category STRING     -- Categorized node type (General Purpose, Memory Optimized, etc.)
)
USING DELTA;

In [0]:
%sql
select * from platform_observability.plt_gold.gld_dim_cluster

In [0]:
%sql
select * from platform_observability.plt_gold.gld_dim_sku

In [0]:
%sql
select * from platform_observability.plt_silver.slv_usage_txn

In [0]:
%sql
select * from platform_observability.plt_silver.slv_job_run_timeline

In [0]:
%sql
select * from platform_observability.plt_silver.slv_usage_txn

In [0]:
%sql
select * from platform_observability.plt_gold.gld_dim_entity

In [0]:
%sql
select * from platform_observability.plt_bronze.brz_lakeflow_pipelines

In [0]:
%sql
select * from platform_observability.plt_gold.gld_dim_entity

In [0]:
%sql
select * from platform_observability.plt_silver.slv_entity_latest

In [0]:
%sql
select * from platform_observability.plt_bronze.brz_lakeflow_pipelines

In [0]:
%sql
select * from 

In [0]:
%sql
CREATE OR REPLACE VIEW platform_observability.plt_gold.v_department_cost_summary AS 
            SELECT 
                f.department as department_code,
                w.workspace_name,
                f.date_key,
                SUM(f.list_cost_usd) as department_total_cost,
                COUNT(DISTINCT e.entity_id) as active_entities,
                AVG(f.list_cost_usd) as avg_cost_per_entity,
                COUNT(DISTINCT w.workspace_id) as active_workspaces
            FROM platform_observability.plt_gold.gld_fact_usage_priced_day f
            JOIN platform_observability.plt_gold.gld_dim_workspace w ON f.workspace_key = w.workspace_key
            JOIN platform_observability.plt_gold.gld_dim_entity e ON f.entity_key = e.entity_key
            GROUP BY f.department, w.workspace_name, f.date_key, e.entity_id