# Snowflake Horizon Catalog - Complete Feature Guide

This notebook walks through the main feature areas associated with Snowflake Horizon Catalog, running SQL examples from Python:

- **Iceberg Tables** - Open table format support
- **Traditional Tables & Views** - Standard Snowflake objects
- **AI & ML Objects** - Machine learning models and functions
- **Costs & Usage** - Monitoring and optimization
- **Data Warehouses** - Compute resource management
- **Data Shares** - Secure data sharing
- **Semantic Views** - Business-friendly data access
- **Databases & Schemas** - Data organization
- **Business Glossary** - Data governance and documentation
- **Workflows** - Automated data pipelines
- **RBAC** - Role-based access control
- **Audit Trail & Logging** - Security and compliance
- **Metadata Retrieval** - Data discovery and lineage
- **Data Quality** - Validation and monitoring

## Prerequisites

Before running this notebook, ensure you have:
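- A Snowflake account. Horizon Catalog features are built in, but several examples (materialized views, multi-cluster warehouses, object tagging, ACCESS_HISTORY) require Enterprise Edition or higher
- An existing HORIZON_CATALOG_DEMO database and COMPUTE_WH warehouse, as referenced in CONNECTION_CONFIG
- A role with sufficient privileges: the examples use ACCOUNTADMIN, which is required to create resource monitors and, by default, to query the SNOWFLAKE.ACCOUNT_USAGE views
- For the Iceberg and external-table examples, an external volume and a stage holding CSV files
- The snowflake-connector-python and pandas packages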


In [None]:
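# Import required libraries
import os
import snowflake.connector
import pandas as pd
import json
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Configuration - set the SNOWFLAKE_* environment variables or edit the defaults.
# 'account' takes the account identifier (e.g. 'myorg-myaccount'),
# without the '.snowflakecomputing.com' suffix.
CONNECTION_CONFIG = {
    'account': os.environ.get('SNOWFLAKE_ACCOUNT', 'your_account_identifier'),
    'user': os.environ.get('SNOWFLAKE_USER', 'your_username'),
    'password': os.environ.get('SNOWFLAKE_PASSWORD', 'your_password'),
    'warehouse': 'COMPUTE_WH',
    'database': 'HORIZON_CATALOG_DEMO',
    'schema': 'PUBLIC',
    'role': 'ACCOUNTADMIN'  # needed for resource monitors and ACCOUNT_USAGE views
}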

# Initialize connection
def get_snowflake_connection():
    """Establish connection to Snowflake"""
    try:
        conn = snowflake.connector.connect(**CONNECTION_CONFIG)
        print("✅ Successfully connected to Snowflake")
        return conn
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return None

# Test connection
conn = get_snowflake_connection()


# 1. Iceberg Tables

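Apache Iceberg is an open table format that provides ACID transactions, schema evolution, and time travel capabilities. Snowflake can manage Iceberg tables itself (Snowflake acts as the catalog) or work with tables registered in an external catalog; in both cases the data is typically stored as Parquet files on an external volume in your own cloud storage, so other engines can read the same tables.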

## Key Features:
- **ACID Transactions**: Ensures data consistency
- **Schema Evolution**: Add, drop, or modify columns without breaking existing queries
- **Time Travel**: Access historical data versions
- **Partition Evolution**: Modify partitioning schemes
- **Multi-table Transactions**: Atomic operations across multiple tables


In [None]:
# Iceberg Tables Examples

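def execute_sql(conn, sql, description=""):
    """Execute one or more ';'-separated SQL statements and return the
    result of the last one as a DataFrame (None on error)."""
    if conn is None:
        print("❌ No active Snowflake connection")
        return None
    try:
        # execute_string() splits the text into statements; cursor.execute()
        # accepts only a single statement by default.
        cursors = list(conn.execute_string(sql))
        last = cursors[-1]
        columns = [desc[0] for desc in last.description] if last.description else []
        df = pd.DataFrame(last.fetchall(), columns=columns)
        for cur in cursors:
            cur.close()
        print(f"✅ {description}")
        return df
    except Exception as e:
        print(f"❌ Error executing SQL ({description}): {e}")
        return None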

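# 1. Create a schema for the Iceberg tables
# Snowflake has no CREATE CATALOG statement. For Snowflake-managed Iceberg
# tables, Snowflake itself is the catalog (CATALOG = 'SNOWFLAKE' below);
# external catalogs are connected with CREATE CATALOG INTEGRATION.
print("Creating schema for Iceberg tables...")
execute_sql(conn, "CREATE SCHEMA IF NOT EXISTS iceberg_catalog;", "Schema iceberg_catalog created")
# CREATE SCHEMA switches the session's current schema; switch back.
execute_sql(conn, "USE SCHEMA HORIZON_CATALOG_DEMO.PUBLIC;", "Session schema reset to PUBLIC")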

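# 2. Create a Snowflake-managed Iceberg table
# Snowflake uses CREATE ICEBERG TABLE (Spark's USING ICEBERG / PARTITIONED BY /
# TBLPROPERTIES clauses are not Snowflake syntax). Data is written as Parquet
# files to an external volume; 'iceberg_external_volume' is a placeholder for
# one an administrator has already created with CREATE EXTERNAL VOLUME.
print("\nCreating Iceberg Table...")
create_iceberg_table_sql = """
CREATE OR REPLACE ICEBERG TABLE iceberg_catalog.sales_data (
    order_id STRING,
    customer_id STRING,
    product_id STRING,
    order_date DATE,
    quantity INTEGER,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(12,2)
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'iceberg_external_volume'
BASE_LOCATION = 'sales_data/';
"""
execute_sql(conn, create_iceberg_table_sql, "Iceberg table created")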

# 3. Insert sample data
print("\nInserting sample data...")
insert_data_sql = """
INSERT INTO iceberg_catalog.sales_data VALUES
('ORD001', 'CUST001', 'PROD001', '2024-01-15', 2, 25.50, 51.00),
('ORD002', 'CUST002', 'PROD002', '2024-01-16', 1, 45.00, 45.00),
('ORD003', 'CUST001', 'PROD003', '2024-01-17', 3, 15.75, 47.25),
('ORD004', 'CUST003', 'PROD001', '2024-01-18', 1, 25.50, 25.50),
('ORD005', 'CUST002', 'PROD004', '2024-01-19', 2, 30.00, 60.00);
"""
execute_sql(conn, insert_data_sql, "Sample data inserted")

# 4. Query Iceberg table
print("\nQuerying Iceberg table...")
query_sql = "SELECT * FROM iceberg_catalog.sales_data ORDER BY order_date;"
df_sales = execute_sql(conn, query_sql, "Data retrieved from Iceberg table")
if df_sales is not None:
    print(df_sales)


In [None]:
# Iceberg Advanced Features

# 5. Schema Evolution - add a new column
# In Iceberg this is a metadata-only change: existing data files are not
# rewritten, and existing rows read the new column as NULL.
print("Demonstrating Schema Evolution...")
alter_table_sql = """
ALTER ICEBERG TABLE iceberg_catalog.sales_data
ADD COLUMN discount_percent DECIMAL(5,2);
"""
execute_sql(conn, alter_table_sql, "Added discount_percent column")

# 6. Update data with new column
update_sql = """
UPDATE iceberg_catalog.sales_data 
SET discount_percent = 10.0 
WHERE customer_id = 'CUST001';
"""
execute_sql(conn, update_sql, "Updated data with discount")

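# 7. Time Travel - read the table as it was before the UPDATE above
# Snowflake has no Spark-style <table>.snapshots metadata table; historical
# versions are queried with AT / BEFORE clauses. LAST_QUERY_ID() returns the
# previous statement in this session, i.e. the UPDATE when cells run in order.
print("\nTime Travel - data before the UPDATE:")
df_qid = execute_sql(conn, "SELECT LAST_QUERY_ID() AS qid;", "Captured ID of the UPDATE statement")
if df_qid is not None and not df_qid.empty:
    update_query_id = df_qid.iloc[0, 0]
    history_sql = f"""
SELECT *
FROM iceberg_catalog.sales_data BEFORE(STATEMENT => '{update_query_id}')
ORDER BY order_id;
"""
    df_history = execute_sql(conn, history_sql, "Retrieved pre-update version of the table")
    if df_history is not None:
        print(df_history)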

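# 8. Inspect the evolved schema
# Snowflake has no <table>.partitions metadata table; DESCRIBE shows the
# current columns, including discount_percent added above.
print("\nIceberg table columns:")
df_columns = execute_sql(conn, "DESCRIBE ICEBERG TABLE iceberg_catalog.sales_data;", "Retrieved column definitions")
if df_columns is not None:
    print(df_columns)

# 9. Table Properties and Metadata
# SHOW ICEBERG TABLES lists catalog and storage details for the table;
# SYSTEM$GET_ICEBERG_TABLE_INFORMATION returns the latest metadata file location.
print("\nTable Properties:")
df_props = execute_sql(conn, "SHOW ICEBERG TABLES LIKE 'SALES_DATA' IN SCHEMA iceberg_catalog;", "Retrieved Iceberg table properties")
if df_props is not None:
    print(df_props)
df_meta = execute_sql(conn, "SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION('iceberg_catalog.sales_data') AS info;", "Retrieved Iceberg metadata location")
if df_meta is not None and not df_meta.empty:
    print(df_meta.iloc[0, 0])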


# 2. Traditional Tables & Views

Snowflake's traditional tables and views provide the foundation for data storage and access patterns. Horizon Catalog enhances these with improved metadata management and governance capabilities.

## Table Types:
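- **Permanent Tables**: Standard persistent storage with Time Travel and Fail-safe
- **Temporary Tables**: Exist only for the duration of the session that created them
- **Transient Tables**: Persist across sessions but have no Fail-safe period
- **External Tables**: Read-only tables over files in an external stage
- **Materialized Views**: Pre-computed query results maintained automatically
- **Dynamic Tables**: Tables refreshed automatically from a query to meet a target lag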
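Permanent, temporary, and transient tables share the same DDL and differ only in a keyword after CREATE OR REPLACE. A small helper can make that explicit when generating DDL from code. `build_create_table_sql` is a hypothetical helper, not part of any Snowflake library, and it deliberately rejects external tables, materialized views, and dynamic tables because their syntax differs.

```python
from typing import Dict

# Keyword placed between "CREATE OR REPLACE" and "TABLE" for each table kind
_TABLE_KIND_KEYWORD = {
    "permanent": "",
    "temporary": "TEMPORARY ",
    "transient": "TRANSIENT ",
}


def build_create_table_sql(name: str, columns: Dict[str, str], kind: str = "permanent") -> str:
    """Return a CREATE OR REPLACE [TEMPORARY|TRANSIENT] TABLE statement."""
    try:
        keyword = _TABLE_KIND_KEYWORD[kind.lower()]
    except KeyError:
        raise ValueError(f"unsupported table kind: {kind!r}") from None
    if not columns:
        raise ValueError("at least one column is required")
    cols = ",\n    ".join(f"{col} {dtype}" for col, dtype in columns.items())
    return f"CREATE OR REPLACE {keyword}TABLE {name} (\n    {cols}\n);"


print(build_create_table_sql(
    "staging_products",
    {"product_id": "STRING", "price": "DECIMAL(10,2)"},
    kind="transient",
))
```

The resulting string can be passed to `execute_sql` like any other statement in this notebook.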


In [None]:
# Traditional Tables & Views Examples

# 1. Create different types of tables
print("Creating Traditional Tables...")

# Permanent Table
create_permanent_table_sql = """
CREATE OR REPLACE TABLE customers (
    customer_id STRING PRIMARY KEY,
    first_name STRING,
    last_name STRING,
    email STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    zip_code STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
"""
execute_sql(conn, create_permanent_table_sql, "Permanent table created")

# Temporary Table
create_temp_table_sql = """
CREATE OR REPLACE TEMPORARY TABLE temp_orders (
    order_id STRING,
    customer_id STRING,
    order_date DATE,
    status STRING
);
"""
execute_sql(conn, create_temp_table_sql, "Temporary table created")

# Transient Table
create_transient_table_sql = """
CREATE OR REPLACE TRANSIENT TABLE staging_products (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DECIMAL(10,2),
    stock_quantity INTEGER
);
"""
execute_sql(conn, create_transient_table_sql, "Transient table created")

# 2. Insert sample data
print("\nInserting sample data...")

# Insert into customers table
insert_customers_sql = """
INSERT INTO customers VALUES
('CUST001', 'John', 'Doe', 'john.doe@email.com', '555-0101', '123 Main St', 'New York', 'NY', '10001', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP()),
('CUST002', 'Jane', 'Smith', 'jane.smith@email.com', '555-0102', '456 Oak Ave', 'Los Angeles', 'CA', '90210', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP()),
('CUST003', 'Bob', 'Johnson', 'bob.johnson@email.com', '555-0103', '789 Pine Rd', 'Chicago', 'IL', '60601', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP());
"""
execute_sql(conn, insert_customers_sql, "Customer data inserted")

# Insert into staging products
insert_products_sql = """
INSERT INTO staging_products VALUES
('PROD001', 'Laptop Computer', 'Electronics', 999.99, 50),
('PROD002', 'Office Chair', 'Furniture', 299.99, 25),
('PROD003', 'Coffee Mug', 'Kitchen', 12.99, 100),
('PROD004', 'Desk Lamp', 'Office', 45.99, 75);
"""
execute_sql(conn, insert_products_sql, "Product data inserted")

# 3. Create Views
print("\nCreating Views...")

# Standard View
create_view_sql = """
CREATE OR REPLACE VIEW customer_summary AS
SELECT 
    customer_id,
    CONCAT(first_name, ' ', last_name) AS full_name,
    email,
    city,
    state,
    created_date
FROM customers
WHERE created_date >= DATEADD(day, -30, CURRENT_DATE());
"""
execute_sql(conn, create_view_sql, "Standard view created")

# Materialized View (requires Enterprise Edition or higher)
create_materialized_view_sql = """
CREATE OR REPLACE MATERIALIZED VIEW product_summary AS
SELECT 
    category,
    COUNT(*) AS product_count,
    AVG(price) AS avg_price,
    SUM(stock_quantity) AS total_stock
FROM staging_products
GROUP BY category;
"""
execute_sql(conn, create_materialized_view_sql, "Materialized view created")

# 4. Query the views
print("\nQuerying Views...")
view_query_sql = "SELECT * FROM customer_summary;"
df_customers = execute_sql(conn, view_query_sql, "Retrieved customer summary")
if df_customers is not None:
    print("Customer Summary:")
    print(df_customers)

materialized_query_sql = "SELECT * FROM product_summary;"
df_products = execute_sql(conn, materialized_query_sql, "Retrieved product summary")
if df_products is not None:
    print("\nProduct Summary:")
    print(df_products)


In [None]:
# Dynamic Tables and Advanced Features

# 5. Create Dynamic Table (Automated Data Pipeline)
print("\nCreating Dynamic Table...")
create_dynamic_table_sql = """
CREATE OR REPLACE DYNAMIC TABLE customer_analytics
TARGET_LAG = '1 hour'
WAREHOUSE = COMPUTE_WH
AS
SELECT 
    c.customer_id,
    c.first_name,
    c.last_name,
    c.city,
    c.state,
    COUNT(s.order_id) AS total_orders,
    SUM(s.total_amount) AS total_spent,
    AVG(s.total_amount) AS avg_order_value,
    MAX(s.order_date) AS last_order_date
FROM customers c
LEFT JOIN iceberg_catalog.sales_data s ON c.customer_id = s.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name, c.city, c.state;
"""
execute_sql(conn, create_dynamic_table_sql, "Dynamic table created")

# 6. External Table (Reference external data)
# Assumes a stage named my_stage already exists and holds CSV files under weather/
print("\nCreating External Table...")
create_external_table_sql = """
CREATE OR REPLACE EXTERNAL TABLE external_weather_data (
    date DATE AS (value:c1::DATE),
    temperature FLOAT AS (value:c2::FLOAT),
    humidity INTEGER AS (value:c3::INTEGER),
    city STRING AS (value:c4::STRING)
)
LOCATION = @my_stage/weather/
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1);
"""
execute_sql(conn, create_external_table_sql, "External table created")

# 7. Table Cloning
print("\nDemonstrating Table Cloning...")
clone_table_sql = """
CREATE OR REPLACE TABLE customers_backup CLONE customers;
"""
execute_sql(conn, clone_table_sql, "Table cloned")

# 8. Show table information
print("\nTable Information:")
show_tables_sql = "SHOW TABLES;"
df_tables = execute_sql(conn, show_tables_sql, "Retrieved table list")
if df_tables is not None:
    print(df_tables)

# 9. Table Statistics
print("\nTable Statistics:")
stats_sql = """
SELECT 
    table_name,
    row_count,
    bytes,
    created,
    last_altered
FROM information_schema.tables 
WHERE table_schema = 'PUBLIC'
ORDER BY created DESC;
"""
df_stats = execute_sql(conn, stats_sql, "Retrieved table statistics")
if df_stats is not None:
    print(df_stats)


# 3. AI & ML Objects

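Snowflake lets you manage AI and machine learning objects (models, functions, and vector data) alongside the rest of your data, under the same governance. The cells below build simplified stand-ins: a model registry table and rule-based SQL functions. Snowflake's native counterparts are the Snowflake Model Registry (for example through the snowflake-ml-python package) and the Cortex AI functions such as SNOWFLAKE.CORTEX.SENTIMENT.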

## AI/ML Object Types:
- **ML Models**: Trained machine learning models
- **ML Functions**: Custom ML functions and procedures
- **Vector Embeddings**: Semantic search capabilities
- **AI Functions**: Built-in AI functions for text analysis
- **ML Pipelines**: Automated ML workflows
- **Model Registry**: Centralized model management
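Semantic search over a VECTOR column ranks rows by how similar their embeddings are to a query embedding; in SQL, Snowflake provides functions such as VECTOR_COSINE_SIMILARITY for this. The pure-Python sketch below only shows the underlying computation, on made-up 3-dimensional vectors (the embeddings table in this section uses 384 dimensions). `cosine_similarity` and `top_k` are hypothetical helpers.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float], embeddings: Dict[str, Sequence[float]],
          k: int = 3) -> List[Tuple[str, float]]:
    """Return the k items most similar to the query, highest first."""
    scored = [(item_id, cosine_similarity(query, vec)) for item_id, vec in embeddings.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy embeddings (real ones come from an embedding model)
sample_embeddings = {
    "PROD001": [1.0, 0.0, 0.0],
    "PROD002": [0.7, 0.7, 0.0],
    "PROD003": [0.0, 0.0, 1.0],
}
print(top_k([1.0, 0.1, 0.0], sample_embeddings, k=2))
```

Cosine similarity ignores vector length, so it compares direction only; that is why it is a common default for text embeddings.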


In [None]:
# AI & ML Objects Examples

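# 1. Create a schema for ML objects
# (a plain schema used as a home for the hand-rolled registry below)
print("Creating ML schema...")
create_model_registry_sql = """
CREATE OR REPLACE SCHEMA ml_models;
"""
execute_sql(conn, create_model_registry_sql, "ML schema created")
# CREATE SCHEMA switches the session's current schema; switch back so later
# unqualified names (e.g. customers) still resolve to PUBLIC.
execute_sql(conn, "USE SCHEMA HORIZON_CATALOG_DEMO.PUBLIC;", "Session schema reset to PUBLIC")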

# 2. Create ML Model Table
print("\nCreating ML Model Storage...")
create_model_table_sql = """
CREATE OR REPLACE TABLE ml_models.model_registry (
    model_id STRING PRIMARY KEY,
    model_name STRING,
    model_version STRING,
    model_type STRING,
    training_data STRING,
    model_metrics VARIANT,
    model_artifact STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    created_by STRING,
    status STRING DEFAULT 'ACTIVE',
    description STRING
);
"""
execute_sql(conn, create_model_table_sql, "Model registry table created")

# 3. Insert Sample ML Models
# PARSE_JSON is not allowed inside a VALUES clause, so the rows are supplied
# with SELECT ... UNION ALL; created_date falls back to its default.
print("\nRegistering Sample ML Models...")
insert_models_sql = """
INSERT INTO ml_models.model_registry
    (model_id, model_name, model_version, model_type, training_data,
     model_metrics, model_artifact, created_by, status, description)
SELECT 'MODEL001', 'customer_churn_prediction', 'v1.0', 'CLASSIFICATION', 'customers, sales_data',
       PARSE_JSON('{"accuracy": 0.89, "precision": 0.87, "recall": 0.91, "f1_score": 0.89}'),
       's3://ml-artifacts/churn_model_v1.pkl', 'data_scientist', 'ACTIVE',
       'Predicts customer churn probability based on purchase history and demographics'
UNION ALL
SELECT 'MODEL002', 'sales_forecasting', 'v2.1', 'REGRESSION', 'sales_data, product_data',
       PARSE_JSON('{"rmse": 125.5, "mae": 98.2, "r2_score": 0.85}'),
       's3://ml-artifacts/sales_forecast_v2.pkl', 'ml_engineer', 'ACTIVE',
       'Forecasts sales revenue for next quarter based on historical trends'
UNION ALL
SELECT 'MODEL003', 'product_recommendation', 'v1.5', 'RECOMMENDATION', 'customers, sales_data, products',
       PARSE_JSON('{"hit_rate": 0.76, "ndcg": 0.82, "coverage": 0.91}'),
       's3://ml-artifacts/recommendation_v1.pkl', 'data_scientist', 'ACTIVE',
       'Recommends products to customers based on purchase history and preferences';
"""
execute_sql(conn, insert_models_sql, "ML models registered")

# 4. Create ML Function
# A rule-based placeholder for a real model. The features are passed in as
# arguments instead of being looked up inside the function: that avoids a
# per-row correlated subquery and the ambiguity between a customer_id
# argument and the customer_id column in the lookup's WHERE clause.
print("\nCreating ML Function...")
create_ml_function_sql = """
CREATE OR REPLACE FUNCTION ml_models.predict_churn(
    total_orders NUMBER,
    total_spent NUMBER,
    days_since_last_order NUMBER
)
RETURNS FLOAT
LANGUAGE SQL
AS
$$
    CASE
        WHEN total_orders < 2 THEN 0.8
        WHEN total_spent < 100 THEN 0.6
        WHEN days_since_last_order > 90 THEN 0.7
        ELSE 0.2
    END
$$;
"""
execute_sql(conn, create_ml_function_sql, "ML function created")

# 5. Test ML Function - compute the features per customer, then score them
print("\nTesting ML Function...")
test_ml_function_sql = """
SELECT
    c.customer_id,
    c.first_name,
    c.last_name,
    ml_models.predict_churn(
        COUNT(s.order_id),
        SUM(s.total_amount),
        DATEDIFF(day, MAX(s.order_date), CURRENT_DATE())
    ) AS churn_probability
FROM PUBLIC.customers c
LEFT JOIN iceberg_catalog.sales_data s ON c.customer_id = s.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name
ORDER BY churn_probability DESC;
"""
df_churn_prediction = execute_sql(conn, test_ml_function_sql, "Churn prediction completed")
if df_churn_prediction is not None:
    print("Churn Prediction Results:")
    print(df_churn_prediction)


In [None]:
# AI Functions and Vector Embeddings

# 6. Create AI Functions for Text Analysis
# A keyword-based stand-in. Snowflake Cortex provides a built-in
# SNOWFLAKE.CORTEX.SENTIMENT(text) that returns a score from -1 to 1.
# A SQL UDF body is a single expression, so there is no SELECT and no
# trailing semicolon inside the $$ block.
print("\nCreating AI Functions...")
create_ai_function_sql = """
CREATE OR REPLACE FUNCTION ml_models.analyze_sentiment(review_text STRING)
RETURNS STRING
LANGUAGE SQL
AS
$$
    CASE
        WHEN UPPER(review_text) LIKE '%EXCELLENT%' OR UPPER(review_text) LIKE '%GREAT%' THEN 'POSITIVE'
        WHEN UPPER(review_text) LIKE '%TERRIBLE%' OR UPPER(review_text) LIKE '%BAD%' THEN 'NEGATIVE'
        ELSE 'NEUTRAL'
    END
$$;
"""
execute_sql(conn, create_ai_function_sql, "AI function created")

# 7. Create Vector Embeddings Table
print("\nCreating Vector Embeddings...")
create_embeddings_sql = """
CREATE OR REPLACE TABLE ml_models.product_embeddings (
    product_id STRING,
    product_name STRING,
    embedding VECTOR(FLOAT, 384),
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
"""
execute_sql(conn, create_embeddings_sql, "Vector embeddings table created")

# 8. Model Performance Monitoring
print("\nModel Performance Monitoring...")
performance_sql = """
SELECT 
    model_name,
    model_version,
    model_metrics,
    created_date,
    status
FROM ml_models.model_registry
WHERE status = 'ACTIVE'
ORDER BY created_date DESC;
"""
df_performance = execute_sql(conn, performance_sql, "Model performance retrieved")
if df_performance is not None:
    print(df_performance)


# 4. Costs & Usage Monitoring

Snowflake records compute, storage, and query activity in the SNOWFLAKE.ACCOUNT_USAGE views, which the queries below use to monitor spending and find optimization opportunities. ACCOUNT_USAGE data can lag real time by up to a few hours.

## Key Features:
- **Cost Tracking**: Monitor compute and storage costs
- **Usage Analytics**: Analyze resource consumption patterns
- **Budget Alerts**: Set up cost thresholds and notifications
- **Resource Optimization**: Identify optimization opportunities
- **Chargeback Reporting**: Allocate costs to departments/projects
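Chargeback reporting needs an allocation rule. One common choice, assumed here, is to split a warehouse's credits in proportion to each team's share of a usage metric, such as total query elapsed time from QUERY_HISTORY. The helper name, the team names, and the $3.00-per-credit price are hypothetical placeholders; actual credit prices depend on edition, region, and contract.

```python
from typing import Dict


def allocate_chargeback(total_credits: float,
                        usage_by_team: Dict[str, float],
                        price_per_credit: float) -> Dict[str, float]:
    """Split total_credits across teams in proportion to their usage and
    convert each share to currency, rounded to cents."""
    total_usage = sum(usage_by_team.values())
    if total_usage <= 0:
        raise ValueError("total usage must be positive")
    return {
        team: round(total_credits * usage / total_usage * price_per_credit, 2)
        for team, usage in usage_by_team.items()
    }


# e.g. query elapsed seconds per team over the billing period
print(allocate_chargeback(100.0, {"marketing": 300.0, "finance": 100.0}, price_per_credit=3.00))
# {'marketing': 225.0, 'finance': 75.0}
```

Rounding each share to cents can leave the sum a cent or two away from the bill; a production report would reconcile that remainder.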


In [None]:
# Cost and Usage Monitoring Examples

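# 1. Create a schema for cost-monitoring objects
print("Setting up Cost Monitoring...")
create_cost_tracking_sql = """
CREATE OR REPLACE SCHEMA cost_monitoring;
"""
execute_sql(conn, create_cost_tracking_sql, "Cost monitoring schema created")
# CREATE SCHEMA switches the session's current schema; switch back.
execute_sql(conn, "USE SCHEMA HORIZON_CATALOG_DEMO.PUBLIC;", "Session schema reset to PUBLIC")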

# 2. Warehouse Usage Tracking
# WAREHOUSE_METERING_HISTORY has one row per warehouse per hour (not per
# query) and no elapsed-time column.
print("\nWarehouse Usage Analysis...")
warehouse_usage_sql = """
SELECT
    warehouse_name,
    DATE(start_time) AS usage_date,
    COUNT(*) AS metered_hours,
    SUM(credits_used) AS total_credits,
    SUM(credits_used_compute) AS compute_credits,
    SUM(credits_used_cloud_services) AS cloud_services_credits
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= DATEADD(day, -7, CURRENT_DATE())
GROUP BY warehouse_name, DATE(start_time)
ORDER BY usage_date DESC, total_credits DESC;
"""
df_warehouse_usage = execute_sql(conn, warehouse_usage_sql, "Warehouse usage retrieved")
if df_warehouse_usage is not None:
    print(df_warehouse_usage)

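# 3. Storage Cost Analysis
# SNOWFLAKE.ACCOUNT_USAGE.TABLES covers every database in the account
# (SNOWFLAKE.INFORMATION_SCHEMA only describes the SNOWFLAKE database).
# The 0.023 USD per GB-month rate (about $23/TB) is an assumption; use your
# contracted rate. BYTES excludes Time Travel and Fail-safe storage.
print("\nStorage Cost Analysis...")
storage_cost_sql = """
SELECT
    table_catalog AS database_name,
    table_schema AS schema_name,
    table_name,
    ROUND(bytes / POWER(1024, 3), 2) AS size_gb,
    ROUND(bytes / POWER(1024, 3) * 0.023, 2) AS estimated_monthly_cost_usd,
    row_count,
    created,
    last_altered
FROM snowflake.account_usage.tables
WHERE deleted IS NULL
  AND table_schema <> 'INFORMATION_SCHEMA'
ORDER BY bytes DESC NULLS LAST
LIMIT 10;
"""
df_storage_cost = execute_sql(conn, storage_cost_sql, "Storage cost analysis completed")
if df_storage_cost is not None:
    print(df_storage_cost)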

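# 4. Query Performance and Cost
# QUERY_HISTORY has no per-query warehouse credit column (only
# credits_used_cloud_services), so the slowest queries are ranked by
# elapsed time instead.
print("\nQuery Performance Analysis...")
query_performance_sql = """
SELECT
    query_id,
    LEFT(query_text, 100) AS query_text,
    warehouse_name,
    total_elapsed_time / 1000 AS elapsed_s,
    bytes_scanned,
    rows_produced,
    credits_used_cloud_services,
    DATE(start_time) AS query_date
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD(day, -1, CURRENT_DATE())
  AND warehouse_name IS NOT NULL
ORDER BY total_elapsed_time DESC
LIMIT 10;
"""
df_query_performance = execute_sql(conn, query_performance_sql, "Query performance analysis completed")
if df_query_performance is not None:
    print(df_query_performance)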

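# 5. Cost Optimization Recommendations
# ACCESS_HISTORY (Enterprise Edition) stores the objects a query touched in
# the BASE_OBJECTS_ACCESSED array, so it is flattened before joining.
print("\nCost Optimization Analysis...")
optimization_sql = """
WITH live_tables AS (
    SELECT table_catalog || '.' || table_schema || '.' || table_name AS full_name,
           bytes,
           created
    FROM snowflake.account_usage.tables
    WHERE deleted IS NULL
      AND table_type = 'BASE TABLE'
),
recently_accessed AS (
    SELECT DISTINCT obj.value:"objectName"::STRING AS full_name
    FROM snowflake.account_usage.access_history ah,
         LATERAL FLATTEN(input => ah.base_objects_accessed) obj
    WHERE ah.query_start_time >= DATEADD(day, -30, CURRENT_DATE())
)
SELECT
    'Large Tables (> 1 GB)' AS category,
    COUNT(*) AS table_count,
    SUM(bytes) AS total_bytes,
    ROUND(SUM(bytes) / POWER(1024, 3), 2) AS total_gb
FROM live_tables
WHERE bytes > POWER(1024, 3)

UNION ALL

SELECT
    'Unused Tables (no access in 30 days)',
    COUNT(*),
    SUM(t.bytes),
    ROUND(SUM(t.bytes) / POWER(1024, 3), 2)
FROM live_tables t
LEFT JOIN recently_accessed a ON t.full_name = a.full_name
WHERE a.full_name IS NULL
  AND t.created < DATEADD(day, -30, CURRENT_DATE());
"""
df_optimization = execute_sql(conn, optimization_sql, "Cost optimization analysis completed")
if df_optimization is not None:
    print(df_optimization)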


# 5. Data Warehouses Management

Virtual warehouses are the compute clusters that execute SQL queries. They consume credits per second while running, with a 60-second minimum each time they resume, so sizing and auto-suspend settings directly affect cost.

## Warehouse Features:
- **Multi-cluster Warehouses**: Auto-scaling compute resources
- **Warehouse Sizing**: Right-size compute for workloads
- **Auto-suspend/Auto-resume**: Cost optimization
- **Resource Monitors**: Credit usage limits
- **Query Optimization**: Performance tuning
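The credit cost of a size and cluster setting can be estimated from the per-hour rates for standard warehouses, which double with each size step (X-Small uses 1 credit per hour, Medium 4), combined with per-second billing and the 60-second minimum on each resume. The sketch below assumes standard warehouses and that every cluster runs for the whole period; in practice the scaling policy decides how many clusters actually start. `estimate_credits` is a hypothetical helper, and current rates should be checked against Snowflake's pricing for your account.

```python
# Credits per hour for a single cluster of a standard warehouse
CREDITS_PER_HOUR = {
    "XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
    "XLARGE": 16, "XXLARGE": 32, "XXXLARGE": 64, "X4LARGE": 128,
}


def estimate_credits(size: str, run_seconds: float, clusters: int = 1) -> float:
    """Estimate credits for one resume-to-suspend period.

    Applies the 60-second minimum charged on each resume and assumes all
    `clusters` run for the full period."""
    rate = CREDITS_PER_HOUR[size.upper().replace("-", "")]
    billed_seconds = max(60.0, run_seconds)
    return rate * clusters * billed_seconds / 3600


# A 2-minute query followed by AUTO_SUSPEND = 300 seconds of idle time
print(round(estimate_credits("MEDIUM", 120 + 300), 4))  # 0.4667
```

The idle tail is why AUTO_SUSPEND matters: with a 300-second setting, every burst of activity is also billed for the five idle minutes before the warehouse suspends.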


In [None]:
# Data Warehouse Management Examples

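# 1. Create Optimized Warehouse
# AUTO_SUSPEND is in seconds of inactivity. INITIALLY_SUSPENDED avoids
# starting (and billing) the warehouse at creation. Multi-cluster settings
# (MIN/MAX_CLUSTER_COUNT > 1) require Enterprise Edition or higher.
print("Creating Optimized Warehouse...")
create_warehouse_sql = """
CREATE OR REPLACE WAREHOUSE analytics_wh
WITH
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 3
    SCALING_POLICY = 'STANDARD'
    COMMENT = 'Analytics warehouse with auto-scaling';
"""
execute_sql(conn, create_warehouse_sql, "Analytics warehouse created")
# CREATE WAREHOUSE makes the new warehouse current for the session; switch
# back so the remaining cells keep running on COMPUTE_WH.
execute_sql(conn, "USE WAREHOUSE COMPUTE_WH;", "Session warehouse reset")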

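# 2. Warehouse Performance Analysis
# QUERY_HISTORY has no credits_used column; elapsed and queued times show
# whether a warehouse is undersized (high queued_overload_time suggests
# a larger size or more clusters).
print("\nWarehouse Performance Analysis...")
warehouse_performance_sql = """
SELECT
    warehouse_name,
    COUNT(*) AS query_count,
    AVG(total_elapsed_time) AS avg_elapsed_ms,
    MAX(total_elapsed_time) AS max_elapsed_ms,
    AVG(queued_overload_time) AS avg_queued_overload_ms
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD(day, -7, CURRENT_DATE())
  AND warehouse_name IS NOT NULL
GROUP BY warehouse_name
ORDER BY avg_elapsed_ms DESC;
"""
df_warehouse_perf = execute_sql(conn, warehouse_performance_sql, "Warehouse performance analyzed")
if df_warehouse_perf is not None:
    print(df_warehouse_perf)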

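# 3. Resource Monitor Setup
# START_TIMESTAMP takes a literal timestamp or IMMEDIATELY, and each trigger
# is written as ON <n> PERCENT DO <action>.
print("\nSetting up Resource Monitor...")
resource_monitor_sql = """
CREATE OR REPLACE RESOURCE MONITOR monthly_budget
WITH
    CREDIT_QUOTA = 1000
    FREQUENCY = MONTHLY
    START_TIMESTAMP = IMMEDIATELY
    TRIGGERS
        ON 75 PERCENT DO NOTIFY
        ON 90 PERCENT DO SUSPEND
        ON 100 PERCENT DO SUSPEND_IMMEDIATE;
"""
execute_sql(conn, resource_monitor_sql, "Resource monitor created")
# A monitor has no effect until it is assigned to a warehouse (or the account).
execute_sql(conn, "ALTER WAREHOUSE analytics_wh SET RESOURCE_MONITOR = monthly_budget;", "Resource monitor assigned to analytics_wh")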


# 6. Data Shares

Data sharing enables secure, real-time data sharing between Snowflake accounts without copying data. Horizon Catalog enhances sharing with improved governance and monitoring.

## Sharing Features:
- **Secure Sharing**: No data movement required
- **Real-time Access**: Live data access
- **Governance Controls**: Fine-grained permissions
- **Usage Monitoring**: Track shared data consumption
- **Cross-cloud Sharing**: Share across cloud providers
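Every object exposed through a share needs its own GRANT, and the database and schema need USAGE grants first. Generating the statements from a list keeps them consistent and lets each run as a separate call. `share_grant_statements` is a hypothetical helper; it assumes the listed views have already been made secure, because regular views cannot be granted to a share.

```python
from typing import Iterable, List


def share_grant_statements(share: str, database: str, schema: str,
                           tables: Iterable[str] = (),
                           secure_views: Iterable[str] = ()) -> List[str]:
    """Build the GRANT statements that expose a schema's objects via a share.

    Views passed in must already be secure views."""
    fq_schema = f"{database}.{schema}"
    statements = [
        f"GRANT USAGE ON DATABASE {database} TO SHARE {share};",
        f"GRANT USAGE ON SCHEMA {fq_schema} TO SHARE {share};",
    ]
    statements += [f"GRANT SELECT ON TABLE {fq_schema}.{t} TO SHARE {share};" for t in tables]
    statements += [f"GRANT SELECT ON VIEW {fq_schema}.{v} TO SHARE {share};" for v in secure_views]
    return statements


for stmt in share_grant_statements("customer_analytics_share", "HORIZON_CATALOG_DEMO", "PUBLIC",
                                   tables=["customers"], secure_views=["customer_summary"]):
    print(stmt)  # in the notebook: execute_sql(conn, stmt, stmt)
```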


In [None]:
# Data Sharing Examples

# 1. Create Data Share
print("Creating Data Share...")
create_share_sql = """
CREATE OR REPLACE SHARE customer_analytics_share
COMMENT = 'Customer analytics data for business partners';
"""
execute_sql(conn, create_share_sql, "Data share created")

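# 2. Add Objects to Share
# Only secure views can be granted to a share, so customer_summary is
# converted first. Each GRANT runs as its own statement.
print("\nAdding Objects to Share...")
execute_sql(conn, "ALTER VIEW HORIZON_CATALOG_DEMO.PUBLIC.customer_summary SET SECURE;", "customer_summary converted to a secure view")
share_grants = [
    "GRANT USAGE ON DATABASE HORIZON_CATALOG_DEMO TO SHARE customer_analytics_share;",
    "GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.PUBLIC TO SHARE customer_analytics_share;",
    "GRANT SELECT ON TABLE HORIZON_CATALOG_DEMO.PUBLIC.customers TO SHARE customer_analytics_share;",
    "GRANT SELECT ON VIEW HORIZON_CATALOG_DEMO.PUBLIC.customer_summary TO SHARE customer_analytics_share;",
]
for stmt in share_grants:
    execute_sql(conn, stmt, stmt)
# Consumer accounts are then added with: ALTER SHARE customer_analytics_share ADD ACCOUNTS = <org>.<account>;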

# 3. Inspect the share
# SHOW SHARES lists the share (with its consumer accounts, if any) and
# SHOW GRANTS TO SHARE lists the objects it exposes.
print("\nShare Details...")
df_share_usage = execute_sql(conn, "SHOW SHARES LIKE 'CUSTOMER_ANALYTICS_SHARE';", "Share listed")
df_share_grants = execute_sql(conn, "SHOW GRANTS TO SHARE customer_analytics_share;", "Share grants listed")
for df in (df_share_usage, df_share_grants):
    if df is not None:
        print(df)


# 7. Semantic Views

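Semantic views provide business-friendly abstractions over technical data structures, making data more accessible to business users and analytics tools. The examples below implement the pattern with ordinary SQL views; Snowflake also offers a dedicated semantic view object (CREATE SEMANTIC VIEW) that declares dimensions and metrics, which is not covered here.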

## Semantic View Features:
- **Business Logic**: Encapsulate complex business rules
- **Data Abstraction**: Hide technical complexity
- **Consistent Metrics**: Standardized calculations
- **Self-Service Analytics**: Enable business users
- **Governance**: Centralized business definitions
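Business rules in a semantic view are easier to review and test when they also exist as plain code. The functions below are a hypothetical Python mirror of the customer_tier and customer_status CASE expressions in the customer_kpi view; they must be kept in sync with the SQL by hand. Note the no-orders case: DATEDIFF over a NULL date is NULL, neither comparison is true, and the CASE falls through to 'Inactive'.

```python
from typing import Optional


def customer_tier(total_orders: int) -> str:
    """Python mirror of the customer_tier CASE in customer_kpi."""
    if total_orders >= 10:
        return "VIP"
    if total_orders >= 5:
        return "Premium"
    if total_orders >= 2:
        return "Regular"
    return "New"


def customer_status(days_since_last_purchase: Optional[int]) -> str:
    """Python mirror of the customer_status CASE in customer_kpi.

    None stands for SQL NULL (a customer with no orders): every comparison
    with NULL is unknown, so the CASE falls through to its ELSE branch."""
    if days_since_last_purchase is None:
        return "Inactive"
    if days_since_last_purchase <= 30:
        return "Active"
    if days_since_last_purchase <= 90:
        return "At Risk"
    return "Inactive"


print(customer_tier(0), customer_status(None))  # New Inactive
```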


In [None]:
# Semantic Views Examples

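# 1. Create Business Metrics View
# The business_metrics schema must exist first; CREATE SCHEMA switches the
# session's current schema, so switch back to PUBLIC afterwards.
print("Creating Semantic Views...")
execute_sql(conn, "CREATE SCHEMA IF NOT EXISTS business_metrics;", "Schema business_metrics created")
execute_sql(conn, "USE SCHEMA HORIZON_CATALOG_DEMO.PUBLIC;", "Session schema reset to PUBLIC")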
create_semantic_view_sql = """
CREATE OR REPLACE VIEW business_metrics.customer_kpi AS
SELECT 
    c.customer_id,
    CONCAT(c.first_name, ' ', c.last_name) AS customer_name,
    c.city,
    c.state,
    COUNT(s.order_id) AS total_orders,
    SUM(s.total_amount) AS lifetime_value,
    AVG(s.total_amount) AS avg_order_value,
    MAX(s.order_date) AS last_purchase_date,
    DATEDIFF(day, MAX(s.order_date), CURRENT_DATE()) AS days_since_last_purchase,
    CASE 
        WHEN COUNT(s.order_id) >= 10 THEN 'VIP'
        WHEN COUNT(s.order_id) >= 5 THEN 'Premium'
        WHEN COUNT(s.order_id) >= 2 THEN 'Regular'
        ELSE 'New'
    END AS customer_tier,
    CASE 
        WHEN DATEDIFF(day, MAX(s.order_date), CURRENT_DATE()) <= 30 THEN 'Active'
        WHEN DATEDIFF(day, MAX(s.order_date), CURRENT_DATE()) <= 90 THEN 'At Risk'
        ELSE 'Inactive'
    END AS customer_status
FROM PUBLIC.customers c  -- qualified: unqualified names in a view resolve in the view's own schema
LEFT JOIN iceberg_catalog.sales_data s ON c.customer_id = s.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name, c.city, c.state;
"""
execute_sql(conn, create_semantic_view_sql, "Semantic view created")

# 2. Revenue Analytics View
print("\nCreating Revenue Analytics View...")
revenue_view_sql = """
CREATE OR REPLACE VIEW business_metrics.revenue_analytics AS
SELECT 
    DATE_TRUNC('month', order_date) AS month,
    COUNT(DISTINCT customer_id) AS active_customers,
    COUNT(order_id) AS total_orders,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    SUM(total_amount) / COUNT(DISTINCT customer_id) AS revenue_per_customer
FROM iceberg_catalog.sales_data
-- No ORDER BY in the view definition; sort in the query that reads the view
GROUP BY DATE_TRUNC('month', order_date);
"""
execute_sql(conn, revenue_view_sql, "Revenue analytics view created")


# 8. Databases & Schemas

Databases and schemas provide logical organization for data objects. Horizon Catalog enhances this with improved metadata management and governance.

## Organization Features:
- **Logical Grouping**: Organize related objects
- **Namespace Management**: Avoid naming conflicts
- **Access Control**: Schema-level permissions
- **Data Classification**: Tag and categorize data
- **Lifecycle Management**: Automated data retention
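Namespace management comes down to fully qualified database.schema.object names. Unquoted Snowflake identifiers (a letter or underscore followed by letters, digits, underscores, or dollar signs) are stored in upper case; any other name must be double-quoted, with embedded double quotes doubled. The hypothetical helpers below apply those rules as a sketch; they do not check reserved words, which also require quoting.

```python
import re

# Letter or underscore first, then letters, digits, underscores or '$'
_UNQUOTED_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def quote_ident(name: str) -> str:
    """Leave a valid unquoted identifier as-is (Snowflake upper-cases it);
    otherwise wrap it in double quotes, doubling any embedded quotes."""
    if _UNQUOTED_IDENT.fullmatch(name):
        return name
    return '"' + name.replace('"', '""') + '"'


def qualify(database: str, schema: str, obj: str) -> str:
    """Build a fully qualified database.schema.object name."""
    return ".".join(quote_ident(part) for part in (database, schema, obj))


print(qualify("analytics_db", "raw data", "orders"))  # analytics_db."raw data".orders
```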


In [None]:
# Database and Schema Management

# 1. Create Organized Database Structure
print("Creating Database Structure...")
create_database_sql = """
CREATE OR REPLACE DATABASE analytics_db
COMMENT = 'Analytics database for business intelligence';
"""
execute_sql(conn, create_database_sql, "Analytics database created")

# 2. Create Organized Schemas (one statement per call)
print("\nCreating Organized Schemas...")
layer_schemas = {
    "raw_data": "Raw data ingestion layer",
    "staging": "Data staging and transformation layer",
    "mart": "Data mart for business users",
    "ml_models": "Machine learning models and functions",
}
for schema_name, comment in layer_schemas.items():
    execute_sql(
        conn,
        f"CREATE OR REPLACE SCHEMA analytics_db.{schema_name} COMMENT = '{comment}';",
        f"Schema analytics_db.{schema_name} created",
    )

# CREATE DATABASE / CREATE SCHEMA change the session's current database and
# schema; switch back so later cells keep working in the demo schema.
execute_sql(conn, "USE SCHEMA HORIZON_CATALOG_DEMO.PUBLIC;", "Session context reset")

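# 3. Data Classification and Tagging
# A tag holds one string value per object (ALLOWED_VALUES can restrict it),
# so each attribute is its own tag. Tags live in a schema; they are created
# in analytics_db.mart here. Object tagging requires Enterprise Edition.
print("\nSetting up Data Classification...")
classification_sql = """
CREATE OR REPLACE TAG analytics_db.mart.sensitivity_level
    ALLOWED_VALUES 'PUBLIC', 'INTERNAL', 'CONFIDENTIAL', 'RESTRICTED'
    COMMENT = 'Data sensitivity classification';
"""
execute_sql(conn, classification_sql, "Sensitivity tag created")
execute_sql(conn, "CREATE OR REPLACE TAG analytics_db.mart.data_owner COMMENT = 'Accountable data owner';", "Data owner tag created")

# Apply the tags to a table
apply_tags_sql = """
ALTER TABLE HORIZON_CATALOG_DEMO.PUBLIC.customers
    SET TAG analytics_db.mart.sensitivity_level = 'CONFIDENTIAL',
            analytics_db.mart.data_owner = 'marketing_manager';
"""
execute_sql(conn, apply_tags_sql, "Tags applied to customers")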

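# 4. Schema Usage Analysis
# Each database has its own INFORMATION_SCHEMA; its TABLES view uses the
# column names table_catalog / table_schema.
print("\nSchema Usage Analysis...")
schema_usage_sql = """
SELECT
    table_catalog AS database_name,
    table_schema AS schema_name,
    COUNT(*) AS object_count,
    SUM(bytes) AS total_bytes
FROM analytics_db.information_schema.tables
WHERE table_schema <> 'INFORMATION_SCHEMA'
GROUP BY table_catalog, table_schema
ORDER BY total_bytes DESC NULLS LAST;
"""
df_schema_usage = execute_sql(conn, schema_usage_sql, "Schema usage analyzed")
if df_schema_usage is not None:
    print(df_schema_usage)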


# 9. Business Glossary

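The Business Glossary provides a centralized repository for business terms, definitions, and data lineage, enabling better data governance and understanding. In this notebook the glossary is modeled as ordinary tables that you maintain yourself; Snowflake's native metadata (object comments, tags, and the access and lineage records in ACCOUNT_USAGE) can complement it.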

## Glossary Features:
- **Term Definitions**: Centralized business vocabulary
- **Data Lineage**: Track data flow and transformations
- **Ownership**: Assign data stewards and owners
- **Relationships**: Link related terms and concepts
- **Compliance**: Document regulatory requirements
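With lineage recorded as source-to-target edges, impact analysis (which objects are affected if a table changes) becomes a graph traversal. The sketch below runs a breadth-first search over edges like those stored in business_glossary.data_lineage; `downstream_objects` is a hypothetical helper, and the sample edges are taken from objects defined in this notebook. Snowflake also records dependencies between objects in the SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES view, which could supply the edges instead of a hand-maintained table.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, List, Set, Tuple


def downstream_objects(edges: Iterable[Tuple[str, str]], start: str) -> List[str]:
    """Objects reachable from `start` along source -> target edges,
    in breadth-first order (start itself excluded)."""
    graph: Dict[str, List[str]] = defaultdict(list)
    for source, target in edges:
        graph[source].append(target)
    seen: Set[str] = {start}
    order: List[str] = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in graph[node]:
            if nxt not in seen:  # the seen set also guards against cycles
                seen.add(nxt)
                order.append(nxt)
                queue.append(nxt)
    return order


lineage_edges = [
    ("customers", "customer_summary"),
    ("customer_summary", "customer_analytics_share"),
    ("customers", "customer_kpi"),
    ("sales_data", "customer_kpi"),
    ("sales_data", "revenue_analytics"),
]
print(downstream_objects(lineage_edges, "customers"))
# ['customer_summary', 'customer_kpi', 'customer_analytics_share']
```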


In [None]:
# Business Glossary Examples

# 1. Create Business Glossary Tables (one statement per call)
print("Creating Business Glossary...")
execute_sql(conn, "CREATE OR REPLACE SCHEMA HORIZON_CATALOG_DEMO.business_glossary;", "Glossary schema created")
# CREATE SCHEMA switches the session's current schema; switch back.
execute_sql(conn, "USE SCHEMA HORIZON_CATALOG_DEMO.PUBLIC;", "Session schema reset to PUBLIC")

create_terms_sql = """
CREATE OR REPLACE TABLE business_glossary.terms (
    term_id STRING PRIMARY KEY,
    term_name STRING,
    definition STRING,
    category STRING,
    data_steward STRING,
    business_owner STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    status STRING DEFAULT 'ACTIVE'
);
"""
execute_sql(conn, create_terms_sql, "Glossary terms table created")

create_lineage_sql = """
CREATE OR REPLACE TABLE business_glossary.data_lineage (
    lineage_id STRING PRIMARY KEY,
    source_object STRING,
    target_object STRING,
    transformation_type STRING,
    business_rule STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
"""
execute_sql(conn, create_lineage_sql, "Data lineage table created")

# 2. Insert Business Terms
print("\nAdding Business Terms...")
insert_terms_sql = """
INSERT INTO business_glossary.terms VALUES
('TERM001', 'Customer Lifetime Value', 'Total revenue generated by a customer over their entire relationship with the company', 'Customer Analytics', 'data_analyst', 'marketing_manager', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), 'ACTIVE'),
('TERM002', 'Churn Rate', 'Percentage of customers who stop using the service within a given time period', 'Customer Analytics', 'data_scientist', 'customer_success_manager', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), 'ACTIVE'),
('TERM003', 'Monthly Recurring Revenue', 'Total predictable revenue generated from subscriptions each month', 'Financial Metrics', 'finance_analyst', 'cfo', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), 'ACTIVE');
"""
execute_sql(conn, insert_terms_sql, "Business terms added")

# 3. Data Lineage Documentation
print("\nDocumenting Data Lineage...")
lineage_sql = """
INSERT INTO business_glossary.data_lineage VALUES
('LINEAGE001', 'customers', 'customer_kpi', 'AGGREGATION', 'Calculate customer metrics from raw customer data', CURRENT_TIMESTAMP()),
('LINEAGE002', 'sales_data', 'revenue_analytics', 'AGGREGATION', 'Aggregate sales data by month for revenue analysis', CURRENT_TIMESTAMP());
"""
execute_sql(conn, lineage_sql, "Data lineage documented")


# 10. Workflows

Workflows in Snowflake are built from tasks. Each task runs a SQL statement on a schedule or after a predecessor task finishes (the AFTER clause), and chained tasks form a task graph that orchestrates a data pipeline.

## Workflow Features:
- **Task Orchestration**: Coordinate multiple data operations
- **Dependency Management**: Handle task dependencies
- **Error Handling**: Robust error management and retry logic
- **Scheduling**: Automated execution schedules
- **Monitoring**: Real-time workflow monitoring


In [None]:
# Workflow Examples

# 1. Create Workflow Tasks
print("Creating Workflow Tasks...")
create_tasks_sql = """
CREATE OR REPLACE TASK data_ingestion_task
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = 'USING CRON 0 2 * * * UTC'  -- Daily at 2 AM UTC
AS
INSERT INTO analytics_db.staging.daily_sales_summary
SELECT 
    DATE(order_date) as sale_date,
    COUNT(*) as total_orders,
    SUM(total_amount) as total_revenue,
    COUNT(DISTINCT customer_id) as unique_customers
FROM iceberg_catalog.sales_data
WHERE DATE(order_date) = CURRENT_DATE() - 1
GROUP BY DATE(order_date);

CREATE OR REPLACE TASK data_quality_check_task
WAREHOUSE = 'COMPUTE_WH'
AFTER data_ingestion_task
AS
INSERT INTO analytics_db.staging.data_quality_log
SELECT 
    'daily_sales_summary' as table_name,
    COUNT(*) as record_count,
    SUM(CASE WHEN total_revenue < 0 THEN 1 ELSE 0 END) as negative_revenue_count,
    CURRENT_TIMESTAMP() as check_time
FROM analytics_db.staging.daily_sales_summary
WHERE sale_date = CURRENT_DATE() - 1;
"""
execute_sql(conn, create_tasks_sql, "Workflow tasks created")

# 2. Enable Tasks
print("\nEnabling Workflow Tasks...")
enable_tasks_sql = """
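-- Resume the dependent task first and the root task last, so the whole
-- graph is enabled before the root task's schedule fires
ALTER TASK data_quality_check_task RESUME;
ALTER TASK data_ingestion_task RESUME;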
"""
execute_sql(conn, enable_tasks_sql, "Tasks enabled")

# 3. Workflow Monitoring
print("\nWorkflow Monitoring...")
workflow_monitoring_sql = """
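-- INFORMATION_SCHEMA has no TASKS view; the TASK_HISTORY table function of the
-- demo database reports each run, including the next scheduled time
SELECT 
    name as task_name,
    state,
    scheduled_time,
    completed_time,
    next_scheduled_time,
    error_message
FROM TABLE(HORIZON_CATALOG_DEMO.information_schema.task_history(
    scheduled_time_range_start => DATEADD('hour', -24, CURRENT_TIMESTAMP())))
WHERE schema_name = 'PUBLIC'
ORDER BY scheduled_time DESC;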
"""
df_workflow_monitoring = execute_sql(conn, workflow_monitoring_sql, "Workflow monitoring completed")
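The `AFTER` clause turns the two tasks into a small task graph, and the graph's edges decide both the order in which tasks run and the order in which it is safest to resume them (dependents first, root last). The sketch below shows that with Python's standard `graphlib`. The task names mirror the cell above; the dependency dictionary is a hypothetical stand-in for the `predecessors` information that `SHOW TASKS` reports.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: task -> set of predecessor tasks (its AFTER clause).
task_graph = {
    "data_ingestion_task": set(),                        # root task, runs on its CRON schedule
    "data_quality_check_task": {"data_ingestion_task"},  # runs AFTER the root completes
}

def run_order(graph):
    """Return one valid execution order: every task appears after its predecessors."""
    return list(TopologicalSorter(graph).static_order())

def resume_order(graph):
    """Resume dependents first and the root last, so the graph is fully enabled
    by the time the root's schedule fires."""
    return list(reversed(run_order(graph)))

print("Run order:   ", run_order(task_graph))
print("Resume order:", resume_order(task_graph))
for task in resume_order(task_graph):
    print(f"ALTER TASK {task} RESUME;")
```

For larger graphs the same two functions still apply; only the dictionary grows.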


# 11. RBAC (Role-Based Access Control)

Role-Based Access Control provides fine-grained security controls for data access, ensuring users only access data they're authorized to see.

## RBAC Features:
- **Role Hierarchy**: Structured permission inheritance
- **Object-Level Security**: Granular access controls
- **Dynamic Data Masking**: Protect sensitive data
- **Row-Level Security**: Filter data by user context
- **Column-Level Security**: Control access to specific columns
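Role hierarchy means a role inherits every privilege held by the roles granted to it, directly or transitively. A minimal sketch of that inheritance rule, assuming an in-memory model: the `grants` and `role_grants` dictionaries and their contents are hypothetical and only loosely modeled on the roles created below.

```python
# Hypothetical privilege grants: role -> set of (privilege, object) pairs.
grants = {
    "business_user": {("SELECT", "PUBLIC.customer_summary")},
    "data_analyst": {("SELECT", "PUBLIC.customers")},
    "data_admin": {("OWNERSHIP", "PUBLIC.customers")},
}

# Hypothetical role-to-role grants: role -> roles granted TO it (whose privileges it inherits).
role_grants = {
    "data_analyst": {"business_user"},
    "data_admin": {"data_analyst"},
}

def effective_privileges(role, grants, role_grants):
    """Collect the role's own privileges plus those of every role beneath it."""
    seen, stack, privileges = set(), [role], set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue  # a role reachable through two paths is only expanded once
        seen.add(current)
        privileges |= grants.get(current, set())
        stack.extend(role_grants.get(current, set()))
    return privileges

for privilege, obj in sorted(effective_privileges("data_admin", grants, role_grants)):
    print(privilege, obj)
```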


In [None]:
# RBAC Examples

# 1. Create Security Roles
print("Creating Security Roles...")
create_roles_sql = """
CREATE OR REPLACE ROLE data_analyst;
CREATE OR REPLACE ROLE data_scientist;
CREATE OR REPLACE ROLE business_user;
CREATE OR REPLACE ROLE data_admin;
"""
execute_sql(conn, create_roles_sql, "Security roles created")

# 2. Grant Permissions
print("\nGranting Permissions...")
grant_permissions_sql = """
-- Data Analyst permissions
GRANT USAGE ON DATABASE HORIZON_CATALOG_DEMO TO ROLE data_analyst;
GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.PUBLIC TO ROLE data_analyst;
GRANT SELECT ON TABLE HORIZON_CATALOG_DEMO.PUBLIC.customers TO ROLE data_analyst;
GRANT SELECT ON VIEW HORIZON_CATALOG_DEMO.PUBLIC.customer_summary TO ROLE data_analyst;

-- Data Scientist permissions
GRANT USAGE ON DATABASE HORIZON_CATALOG_DEMO TO ROLE data_scientist;
GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.PUBLIC TO ROLE data_scientist;
GRANT SELECT ON ALL TABLES IN SCHEMA HORIZON_CATALOG_DEMO.PUBLIC TO ROLE data_scientist;
GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.ml_models TO ROLE data_scientist;

-- Business User permissions
GRANT USAGE ON DATABASE HORIZON_CATALOG_DEMO TO ROLE business_user;
GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.PUBLIC TO ROLE business_user;
GRANT SELECT ON VIEW HORIZON_CATALOG_DEMO.PUBLIC.customer_summary TO ROLE business_user;
"""
execute_sql(conn, grant_permissions_sql, "Permissions granted")

# 3. Dynamic Data Masking
print("\nSetting up Dynamic Data Masking...")
data_masking_sql = """
CREATE OR REPLACE MASKING POLICY email_mask AS (val STRING) RETURNS STRING ->
    CASE 
        WHEN CURRENT_ROLE() IN ('DATA_ADMIN') THEN val
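        -- Replace everything up to the last '@' so only the domain stays visible;
        -- this avoids a backreference, whose backslash would need double escaping here
        ELSE REGEXP_REPLACE(val, '.+@', '***@')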
    END;

ALTER TABLE customers MODIFY COLUMN email SET MASKING POLICY email_mask;
"""
execute_sql(conn, data_masking_sql, "Dynamic data masking configured")

# 4. Row-Level Security
print("\nSetting up Row-Level Security...")
row_level_security_sql = """
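-- The policy argument binds to the column named in ON (...), here customers.state
CREATE OR REPLACE ROW ACCESS POLICY customer_region_policy AS (state_code STRING) RETURNS BOOLEAN ->
    CASE 
        -- ACCOUNTADMIN (the role this notebook runs as) keeps full visibility for later sections
        WHEN CURRENT_ROLE() IN ('ACCOUNTADMIN', 'DATA_ADMIN') THEN TRUE
        -- Illustrative state codes: adjust to the states analysts are allowed to see
        WHEN CURRENT_ROLE() = 'DATA_ANALYST' AND state_code IN ('CA', 'NY', 'TX') THEN TRUE
        ELSE FALSE
    END;

ALTER TABLE customers ADD ROW ACCESS POLICY customer_region_policy ON (state);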
"""
execute_sql(conn, row_level_security_sql, "Row-level security configured")
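To check what each role would see before attaching policies like these, the masking and row-filtering logic can be mirrored in plain Python. This is a local approximation only: `re.sub` stands in for Snowflake's `REGEXP_REPLACE`, the sketch masks everything up to the last `@` (the intent of `email_mask`), and the sample rows and the analyst's allowed state set are hypothetical.

```python
import re

def mask_email(value, role):
    """Approximates email_mask: DATA_ADMIN sees the raw value, other roles see ***@domain."""
    if value is None or role == "DATA_ADMIN":
        return value
    return re.sub(r".+@", "***@", value, count=1)  # greedy: consumes up to the last '@'

def row_visible(state, role, analyst_states):
    """Approximates a row access policy: admins see every row, analysts see rows
    whose state is in analyst_states, every other role sees nothing."""
    if role == "DATA_ADMIN":
        return True
    if role == "DATA_ANALYST":
        return state in analyst_states
    return False

# Hypothetical (email, state) rows and allowed states.
sample_rows = [("john.doe@email.com", "CA"), ("jane.smith@email.com", "NY")]
for role in ("DATA_ADMIN", "DATA_ANALYST", "BUSINESS_USER"):
    visible = [mask_email(email, role) for email, state in sample_rows
               if row_visible(state, role, {"CA"})]
    print(role, visible)
```

Both policies compose: row access decides which rows come back, and masking then rewrites the email column in those rows.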


# 12. Audit Trail & Logging

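Comprehensive audit trails and logging provide visibility into data access, changes, and system activities for compliance and security monitoring. The examples below read from the `SNOWFLAKE.ACCOUNT_USAGE` views, which keep a year of history but lag behind real time (from under an hour to a few hours, depending on the view), so very recent activity may not appear yet.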

## Audit Features:
- **Access Logging**: Track who accessed what data
- **Change Tracking**: Monitor data modifications
- **Query History**: Log all SQL queries and performance
- **Security Events**: Monitor authentication and authorization
- **Compliance Reporting**: Generate audit reports


In [None]:
# Audit Trail & Logging Examples

# 1. Query History Analysis
print("Analyzing Query History...")
query_history_sql = """
SELECT 
    query_id,
    user_name,
    role_name,
    query_text,
    start_time,
    end_time,
    total_elapsed_time,
    bytes_scanned,
    rows_produced
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD(day, -7, CURRENT_DATE())
ORDER BY start_time DESC
LIMIT 10;
"""
df_query_history = execute_sql(conn, query_history_sql, "Query history analyzed")

# 2. Access History Analysis
print("\nAnalyzing Access History...")
access_history_sql = """
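-- ACCESS_HISTORY (Enterprise Edition) records the objects each query touched as a
-- JSON array, so flatten it to get one row per object. The view has no role column;
-- join QUERY_HISTORY on query_id if you need the role.
SELECT 
    ah.query_id,
    ah.user_name,
    obj.value:"objectName"::STRING   as object_name,
    obj.value:"objectDomain"::STRING as object_domain,
    ah.query_start_time
FROM snowflake.account_usage.access_history ah,
     LATERAL FLATTEN(input => ah.base_objects_accessed) obj
WHERE ah.query_start_time >= DATEADD(day, -7, CURRENT_DATE())
ORDER BY ah.query_start_time DESC
LIMIT 10;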
"""
df_access_history = execute_sql(conn, access_history_sql, "Access history analyzed")

# 3. Login History Analysis
print("\nAnalyzing Login History...")
login_history_sql = """
SELECT 
    user_name,
    client_ip,
    reported_client_type,
    first_authentication_factor,
    second_authentication_factor,
    event_timestamp,
    is_success
FROM snowflake.account_usage.login_history
WHERE event_timestamp >= DATEADD(day, -7, CURRENT_DATE())
ORDER BY event_timestamp DESC
LIMIT 10;
"""
df_login_history = execute_sql(conn, login_history_sql, "Login history analyzed")

# 4. Data Change Tracking
print("\nData Change Tracking...")
change_tracking_sql = """
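-- INFORMATION_SCHEMA.COLUMNS carries no timestamps. LAST_ALTERED on TABLES changes
-- on both DML and DDL, so it flags tables modified within the window.
SELECT 
    table_name,
    table_type,
    row_count,
    created,
    last_altered
FROM HORIZON_CATALOG_DEMO.information_schema.tables
WHERE table_schema = 'PUBLIC'
  AND last_altered >= DATEADD(day, -7, CURRENT_DATE())
ORDER BY last_altered DESC;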
"""
df_change_tracking = execute_sql(conn, change_tracking_sql, "Data changes tracked")
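In `ACCESS_HISTORY`, the objects a query touched are not plain columns: they arrive as a JSON array in `BASE_OBJECTS_ACCESSED` (and `DIRECT_OBJECTS_ACCESSED`). The sketch below flattens one such array in Python, the way `LATERAL FLATTEN` does in SQL. The payload is a hand-written sample of the documented shape (`objectDomain`, `objectName`, `columns[].columnName`), not real output; real rows can carry extra keys.

```python
import json

# Hand-written sample of one BASE_OBJECTS_ACCESSED value (illustrative only).
sample_base_objects = json.dumps([
    {
        "objectDomain": "Table",
        "objectName": "HORIZON_CATALOG_DEMO.PUBLIC.CUSTOMERS",
        "objectId": 1001,
        "columns": [
            {"columnName": "EMAIL", "columnId": 2001},
            {"columnName": "STATE", "columnId": 2002},
        ],
    }
])

def flatten_access(query_id, user_name, base_objects_json):
    """Yield one (query_id, user, object, domain, column) row per accessed column.
    Objects without a column list (for example stages) yield a single row with column None."""
    for obj in json.loads(base_objects_json):
        columns = obj.get("columns") or [{"columnName": None}]
        for col in columns:
            yield (query_id, user_name, obj["objectName"], obj["objectDomain"], col.get("columnName"))

for row in flatten_access("01abc-query", "ANALYST_1", sample_base_objects):
    print(row)
```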


# 13. Metadata Retrieval

Metadata retrieval provides comprehensive information about data objects, their relationships, and usage patterns for data discovery and governance.

## Metadata Features:
- **Object Discovery**: Find and catalog data objects
- **Relationship Mapping**: Understand data dependencies
- **Usage Analytics**: Track object utilization
- **Schema Evolution**: Monitor structural changes
- **Data Profiling**: Analyze data characteristics


In [None]:
# Metadata Retrieval Examples

# 1. Object Discovery
print("Discovering Data Objects...")
object_discovery_sql = """
SELECT 
    table_catalog as database_name,
    table_schema as schema_name,
    table_name,
    table_type,
    row_count,
    ROUND(bytes / 1024 / 1024, 2) as size_mb,
    created,
    last_altered
FROM HORIZON_CATALOG_DEMO.information_schema.tables
WHERE table_schema <> 'INFORMATION_SCHEMA'
ORDER BY bytes DESC NULLS LAST;  -- views report NULL bytes
"""
df_object_discovery = execute_sql(conn, object_discovery_sql, "Objects discovered")

# 2. Column Metadata Analysis
print("\nColumn Metadata Analysis...")
column_metadata_sql = """
SELECT 
    table_name,
    column_name,
    data_type,
    is_nullable,
    column_default,
    ordinal_position
FROM HORIZON_CATALOG_DEMO.information_schema.columns
WHERE table_schema = 'PUBLIC'
ORDER BY table_name, ordinal_position;
"""
df_column_metadata = execute_sql(conn, column_metadata_sql, "Column metadata analyzed")

# 3. Data Dependencies
print("\nData Dependencies...")
dependencies_sql = """
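-- Views are the usual referencing objects, so filter by database rather than
-- by referencing_object_domain = 'TABLE'
SELECT 
    referencing_object_name,
    referencing_object_domain,
    referenced_object_name,
    referenced_object_domain,
    referenced_object_id,
    dependency_type
FROM snowflake.account_usage.object_dependencies
WHERE referencing_database = 'HORIZON_CATALOG_DEMO'
ORDER BY referencing_object_name;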
"""
df_dependencies = execute_sql(conn, dependencies_sql, "Dependencies analyzed")

# 4. Data Profiling
print("\nData Profiling...")
data_profiling_sql = """
SELECT 
    'customers' as table_name,
    COUNT(*) as total_rows,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNT(DISTINCT city) as unique_cities,
    COUNT(DISTINCT state) as unique_states,
    MIN(created_date) as earliest_record,
    MAX(created_date) as latest_record
FROM customers;
"""
df_data_profiling = execute_sql(conn, data_profiling_sql, "Data profiling completed")
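Each row from `OBJECT_DEPENDENCIES` is an edge from a referencing object to the object it references. Inverting those edges answers the usual governance question: what is affected downstream if this table changes? A minimal breadth-first sketch, with hypothetical edge data standing in for the query result:

```python
from collections import defaultdict, deque

# Hypothetical (referencing_object_name, referenced_object_name) pairs.
dependency_rows = [
    ("CUSTOMER_SUMMARY", "CUSTOMERS"),
    ("CUSTOMER_KPI", "CUSTOMERS"),
    ("EXEC_DASHBOARD", "CUSTOMER_KPI"),
]

def downstream_impact(root, rows):
    """Return every object that directly or transitively references root."""
    dependents = defaultdict(set)
    for referencing, referenced in rows:
        dependents[referenced].add(referencing)
    impacted, queue = set(), deque([root])
    while queue:
        current = queue.popleft()
        for child in dependents[current]:
            if child not in impacted:
                impacted.add(child)
                queue.append(child)
    return impacted

print(sorted(downstream_impact("CUSTOMERS", dependency_rows)))
```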


# 14. Data Quality

Data quality management ensures data accuracy, completeness, and consistency across the organization through automated validation and monitoring.

## Data Quality Features:
- **Validation Rules**: Define data quality constraints
- **Automated Monitoring**: Continuous quality checks
- **Data Profiling**: Analyze data characteristics
- **Anomaly Detection**: Identify data issues
- **Quality Metrics**: Track data quality trends


In [None]:
# Data Quality Examples

# 1. Create Data Quality Schema
print("Setting up Data Quality Framework...")
create_dq_schema_sql = """
CREATE OR REPLACE SCHEMA data_quality;

CREATE OR REPLACE TABLE data_quality.validation_rules (
    rule_id STRING PRIMARY KEY,
    table_name STRING,
    column_name STRING,
    rule_type STRING,
    rule_expression STRING,
    severity STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

CREATE OR REPLACE TABLE data_quality.quality_results (
    check_id STRING PRIMARY KEY,
    table_name STRING,
    rule_id STRING,
    check_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    passed_count INTEGER,
    failed_count INTEGER,
    total_count INTEGER,
    quality_score FLOAT
);
"""
execute_sql(conn, create_dq_schema_sql, "Data quality framework created")

# 2. Define Data Quality Rules
print("\nDefining Data Quality Rules...")
insert_rules_sql = """
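-- Name the columns so created_date falls back to its DEFAULT
INSERT INTO data_quality.validation_rules (rule_id, table_name, column_name, rule_type, rule_expression, severity) VALUES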
('RULE001', 'customers', 'customer_id', 'NOT_NULL', 'customer_id IS NOT NULL', 'ERROR'),
('RULE002', 'customers', 'email', 'EMAIL_FORMAT', 'email LIKE ''%@%.%''', 'WARNING'),
('RULE003', 'customers', 'phone', 'PHONE_FORMAT', 'phone LIKE ''555-%''', 'WARNING'),
('RULE004', 'iceberg_catalog.sales_data', 'total_amount', 'POSITIVE_VALUE', 'total_amount > 0', 'ERROR'),
('RULE005', 'iceberg_catalog.sales_data', 'quantity', 'POSITIVE_VALUE', 'quantity > 0', 'ERROR');
"""
execute_sql(conn, insert_rules_sql, "Data quality rules defined")

# 3. Data Quality Checks
print("\nRunning Data Quality Checks...")
quality_check_sql = """
INSERT INTO data_quality.quality_results
    (check_id, table_name, rule_id, check_date, passed_count, failed_count, total_count, quality_score)
SELECT 
    'CHECK_' || TO_VARCHAR(CURRENT_TIMESTAMP(), 'YYYYMMDDHH24MISSFF3') as check_id,
    'customers' as table_name,
    'RULE001' as rule_id,
    CURRENT_TIMESTAMP() as check_date,
    COUNT(customer_id) as passed_count,               -- COUNT(column) skips NULLs
    COUNT(*) - COUNT(customer_id) as failed_count,
    COUNT(*) as total_count,
    ROUND(COUNT(customer_id) * 100.0 / NULLIF(COUNT(*), 0), 2) as quality_score  -- NULL for an empty table
FROM customers;
"""
execute_sql(conn, quality_check_sql, "Data quality check completed")

# 4. Data Quality Dashboard
print("\nData Quality Dashboard...")
quality_dashboard_sql = """
SELECT 
    qr.table_name,
    vr.rule_type,
    qr.passed_count,
    qr.failed_count,
    qr.total_count,
    qr.quality_score,
    qr.check_date
FROM data_quality.quality_results qr
JOIN data_quality.validation_rules vr ON qr.rule_id = vr.rule_id
ORDER BY qr.check_date DESC;
"""
df_quality_dashboard = execute_sql(conn, quality_dashboard_sql, "Quality dashboard generated")
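The `quality_score` above is passed rows × 100 / total rows. A local sketch that applies the same formula to several rules at once can help when prototyping new rules. The rows are hypothetical, and the Python predicates are illustrative stand-ins for the SQL `rule_expression` strings (RULE002 approximates `email LIKE '%@%.%'`).

```python
# Hypothetical customer rows: one NULL id, one malformed email, one NULL email.
rows = [
    {"customer_id": "C1", "email": "a@example.com"},
    {"customer_id": None, "email": "broken-email"},
    {"customer_id": "C3", "email": "c@example.org"},
    {"customer_id": "C4", "email": None},
]

# Python stand-ins for validation_rules.rule_expression (illustrative only).
rules = {
    "RULE001": lambda r: r["customer_id"] is not None,
    "RULE002": lambda r: r["email"] is not None and "@" in r["email"]
                         and "." in r["email"].split("@")[-1],
}

def quality_results(rows, rules):
    """Return {rule_id: (passed, failed, total, score)} with score = passed * 100 / total."""
    total = len(rows)
    results = {}
    for rule_id, check in rules.items():
        passed = sum(1 for r in rows if check(r))
        score = round(passed * 100.0 / total, 2) if total else None  # mirrors NULLIF(COUNT(*), 0)
        results[rule_id] = (passed, total - passed, total, score)
    return results

for rule_id, result in quality_results(rows, rules).items():
    print(rule_id, result)
```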


# 15. Practical Examples & Best Practices

This section provides real-world examples and best practices for implementing Horizon Catalog features effectively in production environments.

## Best Practices:
- **Data Architecture**: Design scalable data architectures
- **Performance Optimization**: Optimize queries and storage
- **Security Implementation**: Implement comprehensive security
- **Monitoring & Alerting**: Set up effective monitoring
- **Cost Management**: Optimize costs and resource usage


In [None]:
# Practical Examples & Best Practices

# 1. Complete Data Architecture Example
print("Complete Data Architecture Implementation...")

# Create comprehensive data architecture
architecture_sql = """
-- IF NOT EXISTS keeps schemas (and objects such as models in ml_models)
-- created earlier in the notebook; CREATE OR REPLACE would drop them

-- Raw Data Layer
CREATE SCHEMA IF NOT EXISTS raw_data
COMMENT = 'Raw data ingestion layer';

-- Staging Layer
CREATE SCHEMA IF NOT EXISTS staging
COMMENT = 'Data staging and transformation layer';

-- Data Mart Layer
CREATE SCHEMA IF NOT EXISTS data_mart
COMMENT = 'Business-ready data marts';

-- ML Layer
CREATE SCHEMA IF NOT EXISTS ml_models
COMMENT = 'Machine learning models and functions';

-- Analytics Layer
CREATE SCHEMA IF NOT EXISTS analytics
COMMENT = 'Analytics and reporting layer';
"""
execute_sql(conn, architecture_sql, "Data architecture created")

# 2. Performance Optimization Best Practices
print("\nPerformance Optimization Examples...")
performance_optimization_sql = """
-- Create optimized table with clustering
CREATE OR REPLACE TABLE analytics.customer_performance (
    customer_id STRING,
    order_date DATE,
    total_amount DECIMAL(12,2),
    order_count INTEGER
)
CLUSTER BY (customer_id, order_date);

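-- Create materialized view for frequently accessed data (Enterprise Edition).
-- Materialized views accept only a restricted set of aggregates; APPROX_COUNT_DISTINCT
-- is on that list, so it stands in for COUNT(DISTINCT ...) here
CREATE OR REPLACE MATERIALIZED VIEW analytics.daily_sales_summary
AS
SELECT 
    DATE(order_date) as sale_date,
    COUNT(*) as total_orders,
    SUM(total_amount) as total_revenue,
    APPROX_COUNT_DISTINCT(customer_id) as unique_customers
FROM iceberg_catalog.sales_data
GROUP BY DATE(order_date);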
"""
execute_sql(conn, performance_optimization_sql, "Performance optimizations applied")

# 3. Comprehensive Security Implementation
print("\nComprehensive Security Implementation...")
security_implementation_sql = """
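-- Create roles only if missing: CREATE OR REPLACE ROLE would drop the role
-- together with every grant made to it earlier in this notebook
CREATE ROLE IF NOT EXISTS data_engineer;
CREATE ROLE IF NOT EXISTS data_analyst;
CREATE ROLE IF NOT EXISTS business_user;
CREATE ROLE IF NOT EXISTS data_admin;

-- Role hierarchy: custom roles roll up to SYSADMIN, and higher roles inherit lower ones
GRANT ROLE data_admin TO ROLE SYSADMIN;
GRANT ROLE data_engineer TO ROLE data_admin;
GRANT ROLE data_analyst TO ROLE data_admin;
GRANT ROLE business_user TO ROLE data_analyst;

-- Engineers manage the ingestion and staging layers
GRANT USAGE ON DATABASE HORIZON_CATALOG_DEMO TO ROLE data_engineer;
GRANT ALL PRIVILEGES ON SCHEMA HORIZON_CATALOG_DEMO.raw_data TO ROLE data_engineer;
GRANT ALL PRIVILEGES ON SCHEMA HORIZON_CATALOG_DEMO.staging TO ROLE data_engineer;

-- Analysts read staging and data marts (schema USAGE is required alongside SELECT)
GRANT USAGE ON DATABASE HORIZON_CATALOG_DEMO TO ROLE data_analyst;
GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.staging TO ROLE data_analyst;
GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.data_mart TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA HORIZON_CATALOG_DEMO.staging TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA HORIZON_CATALOG_DEMO.data_mart TO ROLE data_analyst;

-- Business users read data marts only
GRANT USAGE ON DATABASE HORIZON_CATALOG_DEMO TO ROLE business_user;
GRANT USAGE ON SCHEMA HORIZON_CATALOG_DEMO.data_mart TO ROLE business_user;
GRANT SELECT ON ALL TABLES IN SCHEMA HORIZON_CATALOG_DEMO.data_mart TO ROLE business_user;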
"""
execute_sql(conn, security_implementation_sql, "Security implementation completed")

# 4. Monitoring and Alerting Setup
print("\nMonitoring and Alerting Setup...")
monitoring_setup_sql = """
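CREATE SCHEMA IF NOT EXISTS monitoring;

-- Create monitoring dashboard. Both UNION ALL branches share one
-- (metric_name, object_name, metric_value) shape so their columns line up.
-- QUERY_HISTORY has no credits_used column; warehouse credits come from
-- WAREHOUSE_METERING_HISTORY.
CREATE OR REPLACE VIEW monitoring.system_health AS
SELECT 
    'Warehouse Credits' as metric_name,
    warehouse_name as object_name,
    SUM(credits_used) as metric_value
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= DATEADD(day, -1, CURRENT_DATE())
GROUP BY warehouse_name

UNION ALL

SELECT 
    'Storage Bytes' as metric_name,
    table_catalog as object_name,
    SUM(bytes) as metric_value
FROM snowflake.account_usage.tables
WHERE deleted IS NULL
GROUP BY table_catalog;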
"""
execute_sql(conn, monitoring_setup_sql, "Monitoring setup completed")

# 5. Cost Optimization Recommendations
print("\nCost Optimization Analysis...")
cost_optimization_sql = """
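-- Identify cost optimization opportunities. Both branches share a
-- (type, count, value) shape; the unit of metric_value is named in the type.
-- Per-query compute credits come from QUERY_ATTRIBUTION_HISTORY.
SELECT 
    'High-credit queries (credits)' as optimization_type,
    COUNT(*) as item_count,
    SUM(credits_attributed_compute) as metric_value
FROM snowflake.account_usage.query_attribution_history
WHERE credits_attributed_compute > 1.0
  AND start_time >= DATEADD(day, -7, CURRENT_DATE())

UNION ALL

SELECT 
    'Tables larger than 1 GB (GB)' as optimization_type,
    COUNT(*) as item_count,
    SUM(bytes) / POWER(1024, 3) as metric_value
FROM snowflake.account_usage.tables
WHERE deleted IS NULL
  AND bytes > POWER(1024, 3);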
"""
df_cost_optimization = execute_sql(conn, cost_optimization_sql, "Cost optimization analysis completed")
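Warehouse cost is driven by size and running time: for standard warehouses each size step doubles the credits per hour (X-Small = 1, Small = 2, Medium = 4, and so on), and usage is billed per second with a 60-second minimum each time a warehouse resumes. A back-of-the-envelope estimator under those assumptions; the size table covers standard warehouses only, and the price per credit (which depends on edition and region) is deliberately left out.

```python
# Credits per hour for standard warehouses; each size doubles the previous one.
CREDITS_PER_HOUR = {
    "XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
    "XLARGE": 16, "2XLARGE": 32, "3XLARGE": 64, "4XLARGE": 128,
}

def estimate_credits(size, run_seconds_per_resume):
    """Estimate credits for a list of active periods (one entry per resume),
    billing at least 60 seconds for each period."""
    billed_seconds = sum(max(60, seconds) for seconds in run_seconds_per_resume)
    return billed_seconds * CREDITS_PER_HOUR[size.upper()] / 3600

# Example: a MEDIUM warehouse resumed three times, active for 30 s, 90 s and 1 h.
credits = estimate_credits("MEDIUM", [30, 90, 3600])
print(f"{credits:.3f} credits")
```

Each active period should include the idle time before auto-suspend kicks in, since the warehouse is billed until it actually suspends. Many very short runs on a large warehouse therefore cost more than the query time alone suggests.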


# Conclusion & Summary

This notebook has walked through the main feature areas of Snowflake's Horizon Catalog with practical examples and best practices for implementation. The takeaways below summarize the first five areas.

## Key Takeaways:

### 1. **Iceberg Tables**
- Provide ACID transactions and schema evolution
- Enable better interoperability with open data formats
- Support time travel and partition evolution

### 2. **Traditional Tables & Views**
- Foundation for data storage and access patterns
- Support for various table types (permanent, temporary, transient, external)
- Materialized views for performance optimization

### 3. **AI & ML Objects**
- Centralized model registry and management
- ML functions for business logic
- Vector embeddings for semantic search

### 4. **Cost & Usage Monitoring**
- Comprehensive cost tracking and optimization
- Resource usage analytics
- Budget alerts and monitoring

### 5. **Data Warehouses**
- Multi-cluster auto-scaling capabilities
- Resource monitors for cost control
- Performance optimization features

## Next Steps:

1. **Implement Gradually**: Start with core features and expand over time
2. **Establish Governance**: Set up data governance policies and procedures
3. **Monitor Performance**: Continuously monitor and optimize performance
4. **Train Teams**: Ensure teams understand the new capabilities
5. **Iterate and Improve**: Regularly review and improve implementations

## Resources:

- [Snowflake Documentation](https://docs.snowflake.com/)
- [Horizon Catalog Guide](https://docs.snowflake.com/en/user-guide/horizon-catalog)
- [Best Practices Guide](https://docs.snowflake.com/en/user-guide/best-practices)
- [Security Guide](https://docs.snowflake.com/en/user-guide/security)

---

**Note**: Remember to update the connection configuration at the beginning of this notebook with your actual Snowflake account details before running the examples.


# 16. Data Metric Functions (DMFs) - Advanced Data Quality

Data Metric Functions (DMFs) are Snowflake's built-in mechanism for measuring data quality. A DMF computes a single metric, such as the number of NULLs in a column. Once a DMF is associated with a table that has a DMF schedule, Snowflake runs it automatically and records each measurement in the `SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS` view.

## DMF Features:
- **System DMFs**: Built-in metrics from Snowflake
- **Custom DMFs**: User-defined quality metrics
- **Automated Scheduling**: Regular quality checks
- **Expectations**: Define pass/fail criteria
- **Quality Monitoring**: Track quality trends over time
- **Alerting**: Notify on quality issues


In [None]:
# Data Metric Functions (DMFs) Examples

# 1. System DMFs - Built-in Quality Metrics
print("Using System DMFs for Data Quality...")

# Create a table to demonstrate DMFs
create_demo_table_sql = """
CREATE OR REPLACE TABLE dq_demo.customer_data (
    customer_id STRING,
    first_name STRING,
    last_name STRING,
    email STRING,
    phone STRING,
    age INTEGER,
    registration_date DATE,
    status STRING
);
"""
execute_sql(conn, create_demo_table_sql, "Demo table created for DMFs")

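# Insert sample data. Every row is clean, so the metrics below report no issues;
# add rows with NULL emails or out-of-range ages to see the scores change.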
insert_demo_data_sql = """
INSERT INTO dq_demo.customer_data VALUES
('CUST001', 'John', 'Doe', 'john.doe@email.com', '555-0101', 25, '2024-01-15', 'ACTIVE'),
('CUST002', 'Jane', 'Smith', 'jane.smith@email.com', '555-0102', 30, '2024-01-16', 'ACTIVE'),
('CUST003', 'Bob', 'Johnson', 'bob.johnson@email.com', '555-0103', 35, '2024-01-17', 'ACTIVE'),
('CUST004', 'Alice', 'Brown', 'alice.brown@email.com', '555-0104', 28, '2024-01-18', 'ACTIVE'),
('CUST005', 'Charlie', 'Wilson', 'charlie.wilson@email.com', '555-0105', 42, '2024-01-19', 'ACTIVE'),
('CUST006', 'Diana', 'Davis', 'diana.davis@email.com', '555-0106', 29, '2024-01-20', 'ACTIVE'),
('CUST007', 'Eve', 'Miller', 'eve.miller@email.com', '555-0107', 31, '2024-01-21', 'ACTIVE'),
('CUST008', 'Frank', 'Garcia', 'frank.garcia@email.com', '555-0108', 27, '2024-01-22', 'ACTIVE'),
('CUST009', 'Grace', 'Martinez', 'grace.martinez@email.com', '555-0109', 33, '2024-01-23', 'ACTIVE'),
('CUST010', 'Henry', 'Anderson', 'henry.anderson@email.com', '555-0110', 26, '2024-01-24', 'ACTIVE');
"""
execute_sql(conn, insert_demo_data_sql, "Sample data inserted")

# 2. Using System DMFs
print("\nUsing System DMFs...")

# Row count: ROW_COUNT takes no column argument and is normally attached to the
# table with ON (); for an ad hoc check, COUNT(*) returns the same number
row_count_dmf_sql = """
SELECT COUNT(*) as total_rows FROM dq_demo.customer_data;
"""
df_row_count = execute_sql(conn, row_count_dmf_sql, "Row count computed")

# Column-level system DMFs are called ad hoc with a query that selects the column to measure

# Null count DMF
null_count_dmf_sql = """
SELECT SNOWFLAKE.CORE.NULL_COUNT(SELECT email FROM dq_demo.customer_data) as null_email_count;
"""
df_null_count = execute_sql(conn, null_count_dmf_sql, "Null count DMF executed")

# Unique count DMF
unique_count_dmf_sql = """
SELECT SNOWFLAKE.CORE.UNIQUE_COUNT(SELECT customer_id FROM dq_demo.customer_data) as unique_customers;
"""
df_unique_count = execute_sql(conn, unique_count_dmf_sql, "Unique count DMF executed")

# Duplicate count DMF
duplicate_count_dmf_sql = """
SELECT SNOWFLAKE.CORE.DUPLICATE_COUNT(SELECT email FROM dq_demo.customer_data) as duplicate_emails;
"""
df_duplicate_count = execute_sql(conn, duplicate_count_dmf_sql, "Duplicate count DMF executed")


In [None]:
# Custom DMFs and Advanced Quality Monitoring

# 3. Create Custom DMFs
print("\nCreating Custom DMFs...")

# Custom DMF for email format validation.
# A DMF takes one TABLE(...) argument whose columns are bound with ON (...) when the
# DMF is attached to a table, and it must return NUMBER, so scores are whole-number
# percentages. [.] matches a literal dot without any backslash escaping.
create_email_dmf_sql = """
CREATE OR REPLACE DATA METRIC FUNCTION dq_demo.email_format_check(
    arg_t TABLE(arg_c STRING)
)
RETURNS NUMBER
AS
$$
    SELECT ROUND(
        COUNT_IF(REGEXP_LIKE(arg_c, '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$')) * 100.0
        / NULLIF(COUNT(*), 0))
    FROM arg_t
$$;
"""
execute_sql(conn, create_email_dmf_sql, "Email format DMF created")

# Custom DMF for age range validation (percentage of ages between 18 and 100)
create_age_dmf_sql = """
CREATE OR REPLACE DATA METRIC FUNCTION dq_demo.age_range_check(
    arg_t TABLE(arg_c NUMBER)
)
RETURNS NUMBER
AS
$$
    SELECT ROUND(COUNT_IF(arg_c BETWEEN 18 AND 100) * 100.0 / NULLIF(COUNT(*), 0))
    FROM arg_t
$$;
"""
execute_sql(conn, create_age_dmf_sql, "Age range DMF created")

# Custom DMF for data freshness (percentage of rows dated within the last 7 days)
create_freshness_dmf_sql = """
CREATE OR REPLACE DATA METRIC FUNCTION dq_demo.data_freshness(
    arg_t TABLE(arg_c DATE)
)
RETURNS NUMBER
AS
$$
    SELECT ROUND(COUNT_IF(arg_c >= DATEADD(day, -7, CURRENT_DATE())) * 100.0 / NULLIF(COUNT(*), 0))
    FROM arg_t
$$;
"""
execute_sql(conn, create_freshness_dmf_sql, "Data freshness DMF created")

# 4. Test Custom DMFs
print("\nTesting Custom DMFs...")

# Test email format DMF (ad hoc calls pass a query that selects the column)
test_email_dmf_sql = """
SELECT dq_demo.email_format_check(SELECT email FROM dq_demo.customer_data) as email_format_score;
"""
df_email_dmf = execute_sql(conn, test_email_dmf_sql, "Email format DMF tested")

# Test age range DMF
test_age_dmf_sql = """
SELECT dq_demo.age_range_check(SELECT age FROM dq_demo.customer_data) as age_range_score;
"""
df_age_dmf = execute_sql(conn, test_age_dmf_sql, "Age range DMF tested")

# Test data freshness DMF
test_freshness_dmf_sql = """
SELECT dq_demo.data_freshness(SELECT registration_date FROM dq_demo.customer_data) as data_freshness_score;
"""
df_freshness_dmf = execute_sql(conn, test_freshness_dmf_sql, "Data freshness DMF tested")
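Before relying on an email-format metric, it is worth checking that the pattern accepts and rejects the values you expect. Python's `re.fullmatch` approximates Snowflake's `REGEXP_LIKE`, which also matches against the whole string; the pattern uses the same character classes as `email_format_check`, and the sample addresses are made up.

```python
import re

# Same character classes as the email_format_check DMF; r"\." is a literal dot.
EMAIL_PATTERN = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")

def email_format_score(values):
    """Percentage of values matching the pattern. None (NULL) counts as a failure,
    mirroring a COUNT_IF(...) * 100 / COUNT(*) formulation."""
    if not values:
        return None
    matches = sum(1 for v in values if v is not None and EMAIL_PATTERN.fullmatch(v))
    return matches * 100.0 / len(values)

samples = ["john.doe@email.com", "jane@mail", "bob@@email.com", None]
for value in samples:
    print(repr(value), bool(value and EMAIL_PATTERN.fullmatch(value)))
print("score:", email_format_score(samples))
```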


In [None]:
# DMF Scheduling and Expectations

# 5. Set DMFs on Tables with Scheduling
print("\nSetting DMFs on Tables with Scheduling...")

# Set system DMFs on the table
set_dmf_sql = """
-- A DMF schedule must be set on the table before any DMF can be added to it
ALTER TABLE dq_demo.customer_data SET DATA_METRIC_SCHEDULE = 'USING CRON 0 * * * * UTC';

-- Row count DMF: ROW_COUNT takes an empty column list
ALTER TABLE dq_demo.customer_data ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.ROW_COUNT ON ();

-- Null count DMF for email column
ALTER TABLE dq_demo.customer_data ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.NULL_COUNT ON (email);

-- Unique count DMF for customer_id column
ALTER TABLE dq_demo.customer_data ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.UNIQUE_COUNT ON (customer_id);

-- Custom DMFs: the ON (...) column binds to the function's TABLE(...) argument
ALTER TABLE dq_demo.customer_data ADD DATA METRIC FUNCTION dq_demo.email_format_check ON (email);
ALTER TABLE dq_demo.customer_data ADD DATA METRIC FUNCTION dq_demo.age_range_check ON (age);
ALTER TABLE dq_demo.customer_data ADD DATA METRIC FUNCTION dq_demo.data_freshness ON (registration_date);
"""
execute_sql(conn, set_dmf_sql, "DMFs set on table")

# 6. Schedule DMFs to run automatically
print("\nScheduling DMFs...")
schedule_dmf_sql = """
-- Run every DMF associated with the table at the top of each hour
ALTER TABLE dq_demo.customer_data SET DATA_METRIC_SCHEDULE = 'USING CRON 0 * * * * UTC';
"""
execute_sql(conn, schedule_dmf_sql, "DMFs scheduled")

# 7. Create Expectations for Quality Checks
print("\nCreating Expectations for Quality Checks...")
create_expectations_sql = """
-- Expectations attach a pass/fail condition to a DMF that is already associated
-- with the table; there is no standalone CREATE EXPECTATION statement.
-- Confirm that expectations are available in your account before relying on them.

-- Row count should be > 0
ALTER TABLE dq_demo.customer_data
  MODIFY DATA METRIC FUNCTION SNOWFLAKE.CORE.ROW_COUNT ON ()
  ADD EXPECTATION row_count_expectation (VALUE > 0);

-- Email format score should be > 90%
ALTER TABLE dq_demo.customer_data
  MODIFY DATA METRIC FUNCTION dq_demo.email_format_check ON (email)
  ADD EXPECTATION email_format_expectation (VALUE > 90);

-- Age range score should be > 95%
ALTER TABLE dq_demo.customer_data
  MODIFY DATA METRIC FUNCTION dq_demo.age_range_check ON (age)
  ADD EXPECTATION age_range_expectation (VALUE > 95);
"""
execute_sql(conn, create_expectations_sql, "Expectations created")

# 8. Monitor DMF Results
print("\nMonitoring DMF Results...")
monitor_dmf_sql = """
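-- Scheduled DMF measurements are recorded in SNOWFLAKE.LOCAL
SELECT 
    table_name,
    metric_name,
    argument_names,
    value,
    measurement_time
FROM snowflake.local.data_quality_monitoring_results
WHERE table_database = 'HORIZON_CATALOG_DEMO'
  AND table_schema = 'DQ_DEMO'
  AND table_name = 'CUSTOMER_DATA'
ORDER BY measurement_time DESC
LIMIT 10;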
"""
df_dmf_results = execute_sql(conn, monitor_dmf_sql, "DMF results monitored")
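An expectation is simply a pass/fail condition on a metric value. A local sketch of that idea, assuming hypothetical latest readings keyed by metric name and the thresholds noted in the expectations cell above (row count > 0, email format > 90%, age range > 95%):

```python
import operator

# Expectation name -> (metric name, comparison, threshold).
EXPECTATIONS = {
    "row_count_expectation": ("ROW_COUNT", operator.gt, 0),
    "email_format_expectation": ("EMAIL_FORMAT_CHECK", operator.gt, 90),
    "age_range_expectation": ("AGE_RANGE_CHECK", operator.gt, 95),
}

def evaluate(latest_values):
    """Return {expectation: 'PASSED' | 'FAILED' | 'NO DATA'} for the latest metric values."""
    status = {}
    for name, (metric, compare, threshold) in EXPECTATIONS.items():
        value = latest_values.get(metric)
        if value is None:
            status[name] = "NO DATA"  # metric not measured yet (e.g. first schedule not reached)
        else:
            status[name] = "PASSED" if compare(value, threshold) else "FAILED"
    return status

# Hypothetical latest readings for CUSTOMER_DATA.
print(evaluate({"ROW_COUNT": 10, "EMAIL_FORMAT_CHECK": 100, "AGE_RANGE_CHECK": 90}))
```

Treating a missing measurement as its own state, rather than as a failure, avoids false alarms right after a DMF is attached.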


# 17. Advanced Data Governance Features

Horizon Catalog provides comprehensive data governance capabilities that go beyond basic data quality monitoring, including advanced classification, lineage tracking, and compliance features.

## Advanced Governance Features:
- **Sensitive Data Classification**: Automatic detection and tagging of sensitive data
- **Data Lineage Tracking**: Complete data flow visualization
- **Access Policies**: Advanced row-level and column-level security
- **Object Tagging**: Hierarchical tagging system for data organization
- **Compliance Reporting**: Automated compliance and audit reporting
- **Data Retention Policies**: Automated data lifecycle management
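Snowflake tags carry a single string value per object or column, so a structured classification is usually spread across several tags. A small helper that renders a dictionary of tag assignments into one `ALTER TABLE ... SET TAG` statement is sketched below. It escapes backslashes and single quotes in values (Snowflake string literals treat both specially); table, column, and tag names are inserted as-is, so pass only trusted identifiers. The tag names in the example are hypothetical.

```python
def quote_literal(value):
    """Render a Snowflake string literal: escape backslashes, then double single quotes."""
    return "'" + str(value).replace("\\", "\\\\").replace("'", "''") + "'"

def set_tags_sql(table, tags, column=None):
    """Build ALTER TABLE <table> [MODIFY COLUMN <column>] SET TAG t1 = 'v1', t2 = 'v2';"""
    if not tags:
        raise ValueError("at least one tag is required")
    target = f"ALTER TABLE {table}"
    if column:
        target += f" MODIFY COLUMN {column}"
    assignments = ", ".join(f"{tag} = {quote_literal(value)}" for tag, value in tags.items())
    return f"{target} SET TAG {assignments};"

print(set_tags_sql("customers", {
    "governance.data_classification": "HIGH",
    "governance.data_owner": "data_governance_team",
}))
print(set_tags_sql("customers", {"governance.pii_classification": "EMAIL"}, column="email"))
```

The generated statements can be passed to the notebook's `execute_sql` helper like any other SQL string.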


In [None]:
# Advanced Data Governance Examples

# 1. Sensitive Data Classification
print("Setting up Sensitive Data Classification...")

# Create classification tags
create_classification_tags_sql = """
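CREATE SCHEMA IF NOT EXISTS governance;

-- Snowflake tags hold a single string value (no typed fields),
-- so each classification attribute becomes its own tag
CREATE TAG IF NOT EXISTS governance.data_classification COMMENT = 'Sensitivity level, e.g. HIGH';
CREATE TAG IF NOT EXISTS governance.compliance_standard COMMENT = 'Applicable regulation, e.g. GDPR';
CREATE TAG IF NOT EXISTS governance.retention_days COMMENT = 'Retention period in days';
CREATE TAG IF NOT EXISTS governance.data_owner COMMENT = 'Accountable data owner';

CREATE TAG IF NOT EXISTS governance.pii_classification COMMENT = 'PII type, e.g. EMAIL or PHONE';
CREATE TAG IF NOT EXISTS governance.access_level COMMENT = 'Access tier, e.g. RESTRICTED';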
"""
execute_sql(conn, create_classification_tags_sql, "Classification tags created")

# Apply classification tags to tables
apply_classification_sql = """
-- Classify customer data as sensitive; several tags can be set in one statement
ALTER TABLE customers SET TAG
    governance.data_classification = 'HIGH',
    governance.compliance_standard = 'GDPR',
    governance.retention_days = '2555',  -- 7 years
    governance.data_owner = 'data_governance_team';

-- Classify email as PII
ALTER TABLE customers MODIFY COLUMN email SET TAG
    governance.pii_classification = 'EMAIL',
    governance.access_level = 'RESTRICTED';

-- Classify phone as PII
ALTER TABLE customers MODIFY COLUMN phone SET TAG
    governance.pii_classification = 'PHONE',
    governance.access_level = 'RESTRICTED';
"""
execute_sql(conn, apply_classification_sql, "Classification tags applied")

# 2. Advanced Access Policies
print("\nCreating Advanced Access Policies...")

# Create aggregation policy for sensitive data
create_aggregation_policy_sql = """
-- Aggregation policies take no arguments and are attached to the whole table.
-- Restricted roles must aggregate their queries, and groups smaller than
-- MIN_GROUP_SIZE are not reported individually. The size of 5 is illustrative.
CREATE OR REPLACE AGGREGATION POLICY governance.customer_aggregation_policy
AS () RETURNS AGGREGATION_CONSTRAINT ->
    CASE 
        WHEN CURRENT_ROLE() IN ('ACCOUNTADMIN', 'DATA_ADMIN') THEN NO_AGGREGATION_CONSTRAINT()
        ELSE AGGREGATION_CONSTRAINT(MIN_GROUP_SIZE => 5)
    END;

-- Apply the aggregation policy at table level (not per column)
ALTER TABLE customers SET AGGREGATION POLICY governance.customer_aggregation_policy;
"""
execute_sql(conn, create_aggregation_policy_sql, "Aggregation policy created")

# 3. Data Lineage Tracking
print("\nSetting up Data Lineage Tracking...")

# Create lineage tracking table
create_lineage_table_sql = """
CREATE OR REPLACE TABLE governance.data_lineage (
    lineage_id STRING PRIMARY KEY,
    source_object STRING,
    target_object STRING,
    transformation_type STRING,
    business_rule STRING,
    data_flow_description STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
"""
execute_sql(conn, create_lineage_table_sql, "Lineage tracking table created")

# Insert lineage information
insert_lineage_sql = """
INSERT INTO governance.data_lineage VALUES
('LINEAGE001', 'customers', 'customer_summary', 'VIEW', 'Customer data aggregation', 'Creates customer summary view from raw customer data', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP()),
('LINEAGE002', 'iceberg_catalog.sales_data', 'revenue_analytics', 'AGGREGATION', 'Revenue calculation', 'Aggregates sales data by month for revenue analysis', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP()),
('LINEAGE003', 'customers', 'customer_kpi', 'JOIN_AGGREGATION', 'Customer metrics calculation', 'Joins customer data with sales data to calculate KPIs', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP());
"""
execute_sql(conn, insert_lineage_sql, "Lineage information inserted")
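The effect of a minimum group size is easiest to see on a toy example. As Snowflake's aggregation policy documentation describes it, groups smaller than the minimum are not dropped outright; their rows are folded into a single remainder group whose grouping key is NULL. The sketch below imitates that for a `COUNT(*)` grouped by one key. It ignores edge cases such as a remainder group that is itself below the minimum (check the current documentation for those), and the sample rows and minimum of 3 are hypothetical.

```python
from collections import Counter

def constrained_counts(keys, min_group_size):
    """COUNT(*) GROUP BY key, folding groups below min_group_size into a None (NULL) group."""
    result, remainder = {}, 0
    for key, n in Counter(keys).items():
        if n >= min_group_size:
            result[key] = n
        else:
            remainder += n
    if remainder:
        result[None] = remainder
    return result

# Hypothetical state column values for nine customers.
states = ["CA", "CA", "CA", "NY", "NY", "NY", "NY", "WA", "OR"]
print(constrained_counts(states, min_group_size=3))
```

Here WA and OR each have a single customer, so neither is reported individually and both land in the NULL group.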


In [None]:
# Data Retention and Compliance Features

# 4. Data Retention Policies
print("\nSetting up Data Retention Policies...")

# Enforce retention with a scheduled purge task
create_retention_policy_sql = """
-- Snowflake has no RETENTION POLICY object of this form. A common pattern is a
-- scheduled task that deletes rows once they pass the retention window.
CREATE OR REPLACE TASK governance.customer_retention_task
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 3 * * 0 UTC'  -- weekly, Sundays at 3 AM UTC (illustrative)
AS
DELETE FROM HORIZON_CATALOG_DEMO.PUBLIC.customers
WHERE created_date < DATEADD(day, -2555, CURRENT_DATE());  -- 7 years

ALTER TASK governance.customer_retention_task RESUME;
"""
execute_sql(conn, create_retention_policy_sql, "Retention task created")

# 5. Compliance Reporting
print("\nCreating Compliance Reporting...")

# Create compliance dashboard
create_compliance_dashboard_sql = """
CREATE OR REPLACE VIEW governance.compliance_dashboard AS
-- Tag assignments from the governance schema, counted per tag and value.
-- TAG_VALUE is a plain string, so there are no fields to extract.
SELECT 
    'Data Classification' as compliance_area,
    tag_name || ' = ' || tag_value as metric_name,
    COUNT(*) as metric_value
FROM snowflake.account_usage.tag_references
WHERE tag_database = 'HORIZON_CATALOG_DEMO'
  AND tag_schema = 'GOVERNANCE'
  AND object_deleted IS NULL
GROUP BY tag_name, tag_value

UNION ALL

-- DMF measurements recorded in the last 7 days, per metric
SELECT 
    'Data Quality' as compliance_area,
    metric_name,
    COUNT(*) as metric_value
FROM snowflake.local.data_quality_monitoring_results
WHERE measurement_time >= DATEADD(day, -7, CURRENT_DATE())
GROUP BY metric_name;
"""
execute_sql(conn, create_compliance_dashboard_sql, "Compliance dashboard created")

# 6. Data Privacy Controls
print("\nImplementing Data Privacy Controls...")

# Create privacy policy
create_privacy_policy_sql = """
CREATE OR REPLACE PRIVACY POLICY governance.customer_privacy_policy
AS (privacy_level STRING) RETURNS PRIVACY_CONSTRAINT ->
    CASE 
        WHEN privacy_level = 'PUBLIC' THEN PRIVACY_CONSTRAINT('ALLOW_ALL')
        WHEN privacy_level = 'INTERNAL' THEN PRIVACY_CONSTRAINT('RESTRICT_EXTERNAL')
        WHEN privacy_level = 'CONFIDENTIAL' THEN PRIVACY_CONSTRAINT('RESTRICT_ALL')
        ELSE PRIVACY_CONSTRAINT('RESTRICT_ALL')
    END;

-- Apply privacy policy
ALTER TABLE customers SET PRIVACY POLICY governance.customer_privacy_policy('CONFIDENTIAL');
"""
execute_sql(conn, create_privacy_policy_sql, "Privacy policy created")

# 7. Governance Monitoring
print("\nSetting up Governance Monitoring...")

# Create governance monitoring view
governance_monitoring_sql = """
CREATE OR REPLACE VIEW governance.governance_monitoring AS
SELECT 
    'Tag Usage' as metric_name,
    COUNT(*) as total_tags,
    COUNT(DISTINCT tag_name) as unique_tags,
    COUNT(DISTINCT object_name) as tagged_objects
FROM snowflake.account_usage.tag_references
WHERE object_deleted IS NULL

UNION ALL

SELECT 
    'Access Policies' as metric_name,
    COUNT(*) as total_policies,
    COUNT(DISTINCT policy_name) as unique_policies,
    COUNT(DISTINCT ref_entity_name) as protected_objects
FROM snowflake.account_usage.policy_references

UNION ALL

SELECT 
    'Data Quality' as metric_name,
    COUNT(*) as total_checks,
    COUNT(DISTINCT table_name) as monitored_tables,
    COUNT(CASE WHEN expectation_result = 'PASSED' THEN 1 END) as passed_checks
FROM snowflake.account_usage.data_metric_function_results
WHERE check_time >= DATEADD(day, -7, CURRENT_DATE());
"""
execute_sql(conn, governance_monitoring_sql, "Governance monitoring view created")
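In `SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES`, `TAG_VALUE` is a plain string, so the compliance dashboard in this cell can only read a sensitivity level and a compliance standard if the `DATA_CLASSIFICATION` tag stores a JSON document. The sketch below assumes exactly that (a value such as `{"sensitivity_level": "HIGH", "compliance_standard": "GDPR"}`) and reproduces the dashboard's classification counts client-side. Values that are not valid JSON are skipped, mirroring how Snowflake's `TRY_PARSE_JSON` returns NULL for them. The rows and the `classification_summary` helper are hypothetical:

```python
import json

# Hypothetical TAG_REFERENCES rows: (object_name, tag_name, tag_value)
TAG_ROWS = [
    ("CUSTOMERS", "DATA_CLASSIFICATION", '{"sensitivity_level": "HIGH", "compliance_standard": "GDPR"}'),
    ("ORDERS", "DATA_CLASSIFICATION", '{"sensitivity_level": "MEDIUM", "compliance_standard": "SOX"}'),
    ("PRODUCTS", "DATA_CLASSIFICATION", "not-json"),
    ("CUSTOMERS", "OWNER", "sales_team"),
]


def classification_summary(rows, tag_name="DATA_CLASSIFICATION"):
    """Count tagged objects, HIGH-sensitivity objects, and GDPR-scoped objects."""
    summary = {"total_objects": 0, "high_sensitivity_count": 0, "gdpr_tagged_count": 0}
    for _object_name, name, value in rows:
        if name != tag_name:
            continue
        summary["total_objects"] += 1  # COUNT(*) counts every tagged row
        try:
            payload = json.loads(value)
        except (json.JSONDecodeError, TypeError):
            continue  # unparseable value: behaves like NULL in the CASE expressions
        if not isinstance(payload, dict):
            continue
        if payload.get("sensitivity_level") == "HIGH":
            summary["high_sensitivity_count"] += 1
        if payload.get("compliance_standard") == "GDPR":
            summary["gdpr_tagged_count"] += 1
    return summary


print(classification_summary(TAG_ROWS))
```

Counting an object as GDPR-tagged says only that it falls under GDPR, not that it complies with it.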


# 18. Catalog Management and Discovery

Horizon Catalog extends discovery, metadata management, and governance beyond Snowflake-native tables to data held in external catalogs, open table formats, and cloud storage.

## Catalog Management Features:
- **Multi-Catalog Support**: Manage multiple catalogs from different sources
- **Catalog Discovery**: Register external data sources so they can be discovered and queried
- **Metadata Synchronization**: Keep metadata in sync across catalogs
- **Catalog Federation**: Query across multiple catalogs
- **Data Source Integration**: Connect to various data sources
- **Catalog Versioning**: Track catalog changes over time
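The federation examples that follow express a cross-catalog query as a `UNION ALL` that stamps each row with its source, followed by a `GROUP BY` on that label. The minimal pure-Python equivalent of `federated.cross_catalog_analytics` below makes what the two views compute explicit. The order rows and helper names are hypothetical:

```python
from collections import defaultdict

# Hypothetical (order_id, customer_id, order_date, total_amount) rows per source catalog
ICEBERG_ORDERS = [(1, "C1", "2024-01-05", 120.0), (2, "C2", "2024-01-06", 80.0)]
DELTA_ORDERS = [(3, "C1", "2024-01-07", 50.0)]


def unify(sources):
    """UNION ALL: keep every row and tag it with the catalog it came from."""
    for catalog, rows in sources.items():
        for order_id, customer_id, order_date, total_amount in rows:
            yield catalog, order_id, customer_id, order_date, total_amount


def cross_catalog_analytics(unified_rows):
    """GROUP BY source_catalog with COUNT(*), SUM(total_amount), AVG(total_amount)."""
    groups = defaultdict(list)
    for catalog, *_rest, total_amount in unified_rows:
        groups[catalog].append(total_amount)
    return {
        catalog: {
            "total_orders": len(amounts),
            "total_revenue": sum(amounts),
            "avg_order_value": sum(amounts) / len(amounts),
        }
        for catalog, amounts in groups.items()
    }


report = cross_catalog_analytics(unify({"iceberg": ICEBERG_ORDERS, "delta": DELTA_ORDERS}))
print(report)
```

`UNION ALL` skips the deduplication pass that `UNION` performs. This is cheaper and also preserves rows that legitimately repeat.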


In [None]:
# Catalog Management and Discovery Examples

# 1. Multi-Catalog Setup
print("Setting up Multi-Catalog Environment...")

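# Create catalog integrations for externally managed table formats.
# Snowflake has no CREATE CATALOG statement; external catalogs are connected
# through account-level catalog integrations instead.
create_catalogs_sql = """
-- Delta Lake tables whose metadata lives in object storage
CREATE CATALOG INTEGRATION IF NOT EXISTS delta_catalog_int
    CATALOG_SOURCE = OBJECT_STORE
    TABLE_FORMAT = DELTA
    ENABLED = TRUE;

-- Iceberg tables whose metadata files live in object storage
CREATE CATALOG INTEGRATION IF NOT EXISTS iceberg_files_catalog_int
    CATALOG_SOURCE = OBJECT_STORE
    TABLE_FORMAT = ICEBERG
    ENABLED = TRUE;

-- Snowflake-managed Iceberg tables use the built-in catalog (CATALOG = 'SNOWFLAKE'),
-- so they need no integration. Hive Metastore tables are not covered by catalog
-- integrations; Snowflake connects them through its Hive metastore connector.
"""
execute_sql(conn, create_catalogs_sql, "Catalog integrations created")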

# 2. Catalog Discovery and Registration
print("\nSetting up Catalog Discovery...")

# Register external data sources as stages
# (storage integrations are preferable to embedding keys in SQL)
register_sources_sql = """
-- Register S3 data source
CREATE OR REPLACE STAGE s3_data_source
URL = 's3://my-data-bucket/'
CREDENTIALS = (AWS_KEY_ID = 'your_key' AWS_SECRET_KEY = 'your_secret');

-- Register Azure data source
CREATE OR REPLACE STAGE azure_data_source
URL = 'azure://my-storage-account.blob.core.windows.net/my-container/'
CREDENTIALS = (AZURE_SAS_TOKEN = 'your_sas_token');

-- Register GCS data source (GCS stages authenticate through a storage integration)
CREATE OR REPLACE STAGE gcs_data_source
URL = 'gcs://my-bucket/'
STORAGE_INTEGRATION = gcs_storage_integration;  -- placeholder integration name
"""
execute_sql(conn, register_sources_sql, "External data sources registered")

# 3. Catalog Federation
print("\nSetting up Catalog Federation...")

# Create federated views across catalogs
create_federated_views_sql = """
CREATE SCHEMA IF NOT EXISTS federated;

-- Create federated view across Iceberg and Delta catalogs
CREATE OR REPLACE VIEW federated.sales_unified AS
SELECT 
    'iceberg' as source_catalog,
    order_id,
    customer_id,
    order_date,
    total_amount
FROM iceberg_catalog.sales_data

UNION ALL

SELECT 
    'delta' as source_catalog,
    order_id,
    customer_id,
    order_date,
    total_amount
FROM delta_catalog.sales_data;

-- Create cross-catalog analytics view
CREATE OR REPLACE VIEW federated.cross_catalog_analytics AS
SELECT 
    source_catalog,
    COUNT(*) as total_orders,
    SUM(total_amount) as total_revenue,
    AVG(total_amount) as avg_order_value
FROM federated.sales_unified
GROUP BY source_catalog;
"""
execute_sql(conn, create_federated_views_sql, "Federated views created")

# 4. Metadata Synchronization
print("\nSetting up Metadata Synchronization...")

# Create metadata sync table
create_metadata_sync_sql = """
CREATE SCHEMA IF NOT EXISTS catalog_management;

CREATE OR REPLACE TABLE catalog_management.metadata_sync (
    sync_id STRING PRIMARY KEY,
    source_catalog STRING,
    target_catalog STRING,
    sync_type STRING,
    last_sync_time TIMESTAMP,
    sync_status STRING,
    records_synced INTEGER,
    sync_duration_ms INTEGER
);
"""
execute_sql(conn, create_metadata_sync_sql, "Metadata sync table created")

# 5. Catalog Monitoring
print("\nSetting up Catalog Monitoring...")

# Create catalog monitoring view
catalog_monitoring_sql = """
CREATE OR REPLACE VIEW catalog_management.catalog_monitoring AS
-- QUERY_HISTORY has no catalog column: DATABASE_NAME is the closest grouping,
-- and TOTAL_ELAPSED_TIME is reported in milliseconds.
SELECT 
    'Catalog Usage' as metric_name,
    database_name as catalog_name,
    COUNT(*) as total_queries,
    SUM(total_elapsed_time) as total_duration_ms,
    AVG(total_elapsed_time) as avg_duration_ms
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD(day, -7, CURRENT_DATE())
  AND database_name IS NOT NULL
GROUP BY database_name

UNION ALL

SELECT 
    'Catalog Objects' as metric_name,
    table_catalog as catalog_name,
    COUNT(*) as total_objects,
    COUNT(DISTINCT table_schema) as unique_schemas,
    COUNT(DISTINCT table_name) as unique_tables
FROM snowflake.account_usage.tables
WHERE deleted IS NULL
GROUP BY table_catalog;
"""
execute_sql(conn, catalog_monitoring_sql, "Catalog monitoring view created")
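The `catalog_management.metadata_sync` table only stores results, so each sync run still has to be timed and turned into a row. The sketch below shows one hedged way to do that. `run_sync` and its `sync_fn` callback are hypothetical names, and the `SUCCESS`/`FAILED` status values and `FULL` sync type are assumptions, since the notebook does not define them. The returned dict uses the table's column names and could be passed to a parameterized `INSERT`:

```python
import time
import uuid
from datetime import datetime, timezone


def run_sync(source_catalog, target_catalog, sync_type, sync_fn):
    """Time one sync run and return a row shaped like catalog_management.metadata_sync.

    `sync_fn` is a hypothetical callable that performs the sync and returns
    the number of records it synchronized.
    """
    started = time.perf_counter()
    try:
        records = int(sync_fn())
        status = "SUCCESS"  # assumed status vocabulary
    except Exception:  # record the failure instead of aborting a batch of syncs
        records = 0
        status = "FAILED"
    duration_ms = int((time.perf_counter() - started) * 1000)
    return {
        "sync_id": str(uuid.uuid4()),
        "source_catalog": source_catalog,
        "target_catalog": target_catalog,
        "sync_type": sync_type,
        # TIMESTAMP maps to TIMESTAMP_NTZ by default, so store naive UTC
        "last_sync_time": datetime.now(timezone.utc).replace(tzinfo=None),
        "sync_status": status,
        "records_synced": records,
        "sync_duration_ms": duration_ms,
    }


row = run_sync("delta_catalog", "iceberg_catalog", "FULL", lambda: 42)
print(row["sync_status"], row["records_synced"])
```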


# Summary: Sections 1-18

This notebook covers the major features of Snowflake's Horizon Catalog through Section 18, including data governance and data metric functions (DMFs). Each section provides practical examples and best practices for building a complete data platform.

## Complete Feature Coverage:

### **Core Data Management (Sections 1-3)**
- **Iceberg Tables**: ACID transactions, schema evolution, time travel
- **Traditional Tables & Views**: All table types, materialized views, dynamic tables
- **AI & ML Objects**: Model registry, ML functions, vector embeddings

### **Operations & Monitoring (Sections 4-6)**
- **Costs & Usage Monitoring**: Comprehensive cost tracking and optimization
- **Data Warehouses**: Multi-cluster auto-scaling, resource monitors
- **Data Shares**: Secure real-time data sharing with governance

### **Business Intelligence (Sections 7-9)**
- **Semantic Views**: Business-friendly data abstractions
- **Databases & Schemas**: Logical organization and data classification
- **Business Glossary**: Centralized business vocabulary and lineage

### **Automation & Security (Sections 10-12)**
- **Workflows**: Automated data pipelines with task orchestration
- **RBAC**: Fine-grained security with dynamic masking and row-level security
- **Audit Trail & Logging**: Comprehensive activity logging and compliance

### **Data Discovery & Quality (Sections 13-15)**
- **Metadata Retrieval**: Data discovery, relationship mapping, usage analytics
- **Data Quality**: Validation rules, automated monitoring, quality metrics
- **Practical Examples**: Real-world implementations and optimization strategies

### **Advanced Features (Sections 16-18)**
- **Data Metric Functions (DMFs)**: Advanced data quality monitoring with system and custom DMFs
- **Advanced Data Governance**: Sensitive data classification, lineage tracking, compliance
- **Catalog Management**: Multi-catalog support, federation, metadata synchronization

## Key Benefits of Horizon Catalog:

### **Enterprise-Grade Data Quality**
- **Automated Monitoring**: DMFs run on a schedule to assess data quality
- **Custom Metrics**: Define business-specific quality measurements
- **Expectations**: Set pass/fail criteria for automated quality checks
- **Scheduling**: Regular quality checks without manual intervention

### **Comprehensive Data Governance**
- **Sensitive Data Classification**: Automatic detection and tagging of PII
- **Data Lineage**: Complete visibility into data flow and transformations
- **Access Policies**: Advanced security controls including aggregation policies
- **Compliance Reporting**: Automated compliance and audit reporting

### **Multi-Catalog Management**
- **Federation**: Query data from multiple sources through unified views
- **Discovery**: Register external data sources so they can be found and queried alongside Snowflake data
- **Synchronization**: Keep metadata in sync across different catalogs
- **Monitoring**: Track usage and performance across all catalogs

## Implementation Roadmap:

### **Phase 1: Foundation (Weeks 1-4)**
1. Set up basic Iceberg and traditional tables
2. Implement basic RBAC and security policies
3. Create semantic views for business users
4. Set up basic cost monitoring

### **Phase 2: Quality & Governance (Weeks 5-8)**
1. Implement DMFs for data quality monitoring
2. Set up sensitive data classification
3. Create data lineage documentation
4. Implement advanced access policies

### **Phase 3: Advanced Features (Weeks 9-12)**
1. Set up multi-catalog federation
2. Implement automated workflows
3. Create comprehensive compliance reporting
4. Set up advanced monitoring and alerting

## Resources and Documentation:

- **Snowflake Data Quality Documentation**: Comprehensive guide to DMFs
- **Snowflake Documentation**: Complete Snowflake platform documentation
- **Horizon Catalog Guide**: Official Horizon Catalog documentation
- **Best Practices Guide**: Implementation best practices
- **Security Guide**: Security implementation guide

## Next Steps:

1. **Start with Core Features**: Begin with Iceberg tables and basic governance
2. **Implement DMFs**: Set up data quality monitoring early in your implementation
3. **Establish Governance**: Create data classification and lineage tracking
4. **Scale Gradually**: Add advanced features as your needs grow
5. **Monitor and Optimize**: Continuously monitor performance and costs

---

**Note**: This notebook provides a complete implementation guide for Snowflake Horizon Catalog. Remember to update the connection configuration and adapt the examples to your specific environment and requirements.


# 19. Business Vocabulary & Data Dictionary

A comprehensive business vocabulary provides standardized definitions for business terms, metrics, and concepts, enabling better communication between business and technical teams while ensuring data consistency across the organization.

## Business Vocabulary Features:
- **Term Definitions**: Centralized business terminology
- **Metric Definitions**: Standardized calculation methods
- **Data Mappings**: Link business terms to data objects
- **Business Rules**: Document business logic and constraints
- **Term Relationships**: Define relationships between concepts
- **Version Control**: Track changes to business definitions


In [None]:
# Business Vocabulary & Data Dictionary Examples

# 1. Create Business Vocabulary Schema
print("Creating Business Vocabulary Framework...")

# Create comprehensive business vocabulary schema
create_vocabulary_schema_sql = """
CREATE OR REPLACE SCHEMA business_vocabulary;

-- Business Terms Dictionary
CREATE OR REPLACE TABLE business_vocabulary.business_terms (
    term_id STRING PRIMARY KEY,
    term_name STRING NOT NULL,
    definition STRING NOT NULL,
    category STRING,
    subcategory STRING,
    business_domain STRING,
    data_type STRING,
    calculation_method STRING,
    business_owner STRING,
    data_steward STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    version STRING DEFAULT '1.0',
    status STRING DEFAULT 'ACTIVE',
    approved_by STRING,
    approval_date TIMESTAMP
);

-- Business Metrics Dictionary
CREATE OR REPLACE TABLE business_vocabulary.business_metrics (
    metric_id STRING PRIMARY KEY,
    metric_name STRING NOT NULL,
    metric_definition STRING NOT NULL,
    calculation_formula STRING,
    data_source STRING,
    frequency STRING,
    unit_of_measure STRING,
    business_owner STRING,
    technical_owner STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    version STRING DEFAULT '1.0',
    status STRING DEFAULT 'ACTIVE'
);

-- Data Object Mappings
CREATE OR REPLACE TABLE business_vocabulary.data_mappings (
    mapping_id STRING PRIMARY KEY,
    business_term_id STRING,
    data_object_name STRING,
    data_object_type STRING,
    column_name STRING,
    mapping_type STRING,
    transformation_rule STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    FOREIGN KEY (business_term_id) REFERENCES business_vocabulary.business_terms(term_id)
);

-- Term Relationships
CREATE OR REPLACE TABLE business_vocabulary.term_relationships (
    relationship_id STRING PRIMARY KEY,
    source_term_id STRING,
    target_term_id STRING,
    relationship_type STRING,
    relationship_description STRING,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    FOREIGN KEY (source_term_id) REFERENCES business_vocabulary.business_terms(term_id),
    FOREIGN KEY (target_term_id) REFERENCES business_vocabulary.business_terms(term_id)
);
"""
execute_sql(conn, create_vocabulary_schema_sql, "Business vocabulary framework created")
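On standard (non-hybrid) Snowflake tables, `PRIMARY KEY` and `FOREIGN KEY` constraints are recorded but not enforced. Only `NOT NULL` is enforced, so a row in `data_mappings` can reference a `term_id` that does not exist. The client-side check below is a minimal sketch. It assumes both ID lists have been fetched (for example with `SELECT term_id FROM business_vocabulary.business_terms`). `orphaned_mappings` is a hypothetical helper, and `MAP099`/`TERM999` are made-up IDs:

```python
def orphaned_mappings(term_ids, mappings):
    """Return mapping_ids whose business_term_id has no matching business_terms row."""
    known = set(term_ids)
    return [mapping_id for mapping_id, term_id in mappings if term_id not in known]


TERM_IDS = ["TERM001", "TERM006", "TERM010"]
MAPPINGS = [("MAP001", "TERM001"), ("MAP005", "TERM006"), ("MAP099", "TERM999")]
print(orphaned_mappings(TERM_IDS, MAPPINGS))  # ['MAP099']
```

In SQL the same check is an anti-join: `SELECT dm.mapping_id FROM business_vocabulary.data_mappings dm LEFT JOIN business_vocabulary.business_terms bt ON dm.business_term_id = bt.term_id WHERE bt.term_id IS NULL`.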


In [None]:
# Business Terms and Definitions

# 2. Insert Comprehensive Business Terms
print("\nAdding Business Terms...")

# Insert core business terms
insert_business_terms_sql = """
INSERT INTO business_vocabulary.business_terms VALUES
-- Customer Analytics Terms
('TERM001', 'Customer', 'An individual or organization that purchases products or services from the company', 'Customer Analytics', 'Customer Management', 'Sales', 'Entity', 'Business Definition', 'sales_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM002', 'Customer Lifetime Value (CLV)', 'Total revenue generated by a customer over their entire relationship with the company', 'Customer Analytics', 'Customer Value', 'Sales', 'Metric', 'SUM(total_amount) WHERE customer_id = X', 'marketing_manager', 'data_scientist', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM003', 'Customer Acquisition Cost (CAC)', 'Total cost of acquiring a new customer, including marketing and sales expenses', 'Customer Analytics', 'Customer Acquisition', 'Marketing', 'Metric', '(Total Marketing Spend + Total Sales Spend) / Number of New Customers', 'marketing_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM004', 'Churn Rate', 'Percentage of customers who stop using the service within a given time period', 'Customer Analytics', 'Customer Retention', 'Customer Success', 'Metric', '(Customers Lost / Customers at Start of Period) * 100', 'customer_success_manager', 'data_scientist', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM005', 'Customer Segment', 'A group of customers with similar characteristics, behaviors, or needs', 'Customer Analytics', 'Customer Segmentation', 'Marketing', 'Entity', 'Business Definition', 'marketing_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

-- Sales Terms
('TERM006', 'Order', 'A request from a customer to purchase products or services', 'Sales', 'Order Management', 'Sales', 'Entity', 'Business Definition', 'sales_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM007', 'Revenue', 'Total income generated from sales of products or services', 'Sales', 'Financial Metrics', 'Finance', 'Metric', 'SUM(total_amount)', 'finance_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM008', 'Average Order Value (AOV)', 'Average amount spent per order', 'Sales', 'Order Metrics', 'Sales', 'Metric', 'SUM(total_amount) / COUNT(DISTINCT order_id)', 'sales_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM009', 'Sales Conversion Rate', 'Percentage of leads or prospects that convert to paying customers', 'Sales', 'Conversion Metrics', 'Sales', 'Metric', '(Converted Leads / Total Leads) * 100', 'sales_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

-- Product Terms
('TERM010', 'Product', 'A tangible or intangible item that is offered for sale', 'Product Management', 'Product Catalog', 'Product', 'Entity', 'Business Definition', 'product_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM011', 'Product Category', 'A classification of products based on their characteristics or use', 'Product Management', 'Product Classification', 'Product', 'Entity', 'Business Definition', 'product_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM012', 'Inventory Turnover', 'Number of times inventory is sold and replaced in a given period', 'Product Management', 'Inventory Metrics', 'Operations', 'Metric', 'Cost of Goods Sold / Average Inventory', 'operations_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

-- Financial Terms
('TERM013', 'Gross Profit', 'Revenue minus cost of goods sold', 'Finance', 'Profitability', 'Finance', 'Metric', 'Revenue - Cost of Goods Sold', 'finance_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM014', 'Net Profit Margin', 'Percentage of revenue that remains as profit after all expenses', 'Finance', 'Profitability', 'Finance', 'Metric', '(Net Profit / Revenue) * 100', 'finance_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP()),

('TERM015', 'Monthly Recurring Revenue (MRR)', 'Total predictable revenue generated from subscriptions each month', 'Finance', 'Recurring Revenue', 'Finance', 'Metric', 'SUM(monthly_subscription_amount)', 'finance_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE', 'cfo', CURRENT_TIMESTAMP());
"""
execute_sql(conn, insert_business_terms_sql, "Business terms added")

# 3. Insert Business Metrics
print("\nAdding Business Metrics...")

insert_business_metrics_sql = """
INSERT INTO business_vocabulary.business_metrics VALUES
('METRIC001', 'Customer Lifetime Value', 'Total revenue generated by a customer over their entire relationship', 'SUM(total_amount) WHERE customer_id = X', 'customers, sales_data', 'Monthly', 'USD', 'marketing_manager', 'data_scientist', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE'),

('METRIC002', 'Customer Acquisition Cost', 'Total cost of acquiring a new customer', '(Total Marketing Spend + Total Sales Spend) / Number of New Customers', 'marketing_data, customer_data', 'Monthly', 'USD', 'marketing_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE'),

('METRIC003', 'Churn Rate', 'Percentage of customers who stop using the service', '(Customers Lost / Customers at Start of Period) * 100', 'customers', 'Monthly', 'Percentage', 'customer_success_manager', 'data_scientist', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE'),

('METRIC004', 'Average Order Value', 'Average amount spent per order', 'SUM(total_amount) / COUNT(DISTINCT order_id)', 'sales_data', 'Daily', 'USD', 'sales_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE'),

('METRIC005', 'Sales Conversion Rate', 'Percentage of leads that convert to customers', '(Converted Leads / Total Leads) * 100', 'leads_data, customers', 'Weekly', 'Percentage', 'sales_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE'),

('METRIC006', 'Monthly Recurring Revenue', 'Total predictable revenue from subscriptions', 'SUM(monthly_subscription_amount)', 'subscription_data', 'Monthly', 'USD', 'finance_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE'),

('METRIC007', 'Gross Profit Margin', 'Percentage of revenue remaining after COGS', '(Gross Profit / Revenue) * 100', 'sales_data, cost_data', 'Monthly', 'Percentage', 'finance_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE'),

('METRIC008', 'Inventory Turnover', 'Number of times inventory is sold and replaced', 'Cost of Goods Sold / Average Inventory', 'inventory_data, sales_data', 'Monthly', 'Ratio', 'operations_manager', 'data_analyst', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), '1.0', 'ACTIVE');
"""
execute_sql(conn, insert_business_metrics_sql, "Business metrics added")
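The `calculation_formula` strings are documentation, not executable code, so checking them against a few small numbers is worthwhile. Below is a hedged Python rendering of four of the definitions. The function names are hypothetical. CAC follows the term's own definition (marketing plus sales expenses), and AOV divides by the number of distinct orders:

```python
def average_order_value(total_amount, order_count):
    """AOV = SUM(total_amount) / number of distinct orders."""
    if order_count == 0:
        raise ValueError("AOV is undefined when there are no orders")
    return total_amount / order_count


def churn_rate_pct(customers_lost, customers_at_start):
    """Churn % = customers lost in the period / customers at the start of the period * 100."""
    return customers_lost * 100 / customers_at_start


def customer_acquisition_cost(marketing_spend, sales_spend, new_customers):
    """CAC = (marketing spend + sales spend) / new customers acquired in the same period."""
    return (marketing_spend + sales_spend) / new_customers


def inventory_turnover(cogs, average_inventory):
    """Inventory turnover = cost of goods sold / average inventory value."""
    return cogs / average_inventory


print(average_order_value(1000.0, 4))                  # 250.0
print(churn_rate_pct(5, 200))                          # 2.5
print(customer_acquisition_cost(8000.0, 2000.0, 50))   # 200.0
print(inventory_turnover(500000.0, 100000.0))          # 5.0
```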


In [None]:
# Data Mappings and Term Relationships

# 4. Create Data Object Mappings
print("\nCreating Data Object Mappings...")

insert_data_mappings_sql = """
INSERT INTO business_vocabulary.data_mappings VALUES
-- Customer mappings
('MAP001', 'TERM001', 'customers', 'TABLE', 'customer_id', 'Direct', 'Primary key for customer identification', CURRENT_TIMESTAMP()),
('MAP002', 'TERM001', 'customers', 'TABLE', 'first_name', 'Direct', 'Customer first name', CURRENT_TIMESTAMP()),
('MAP003', 'TERM001', 'customers', 'TABLE', 'last_name', 'Direct', 'Customer last name', CURRENT_TIMESTAMP()),
('MAP004', 'TERM001', 'customers', 'TABLE', 'email', 'Direct', 'Customer email address', CURRENT_TIMESTAMP()),

-- Order mappings
('MAP005', 'TERM006', 'iceberg_catalog.sales_data', 'TABLE', 'order_id', 'Direct', 'Unique identifier for each order', CURRENT_TIMESTAMP()),
('MAP006', 'TERM006', 'iceberg_catalog.sales_data', 'TABLE', 'customer_id', 'Foreign Key', 'Reference to customer who placed the order', CURRENT_TIMESTAMP()),
('MAP007', 'TERM006', 'iceberg_catalog.sales_data', 'TABLE', 'order_date', 'Direct', 'Date when the order was placed', CURRENT_TIMESTAMP()),
('MAP008', 'TERM006', 'iceberg_catalog.sales_data', 'TABLE', 'total_amount', 'Direct', 'Total amount of the order', CURRENT_TIMESTAMP()),

-- Product mappings
('MAP009', 'TERM010', 'staging_products', 'TABLE', 'product_id', 'Direct', 'Unique identifier for each product', CURRENT_TIMESTAMP()),
('MAP010', 'TERM010', 'staging_products', 'TABLE', 'product_name', 'Direct', 'Name of the product', CURRENT_TIMESTAMP()),
('MAP011', 'TERM011', 'staging_products', 'TABLE', 'category', 'Direct', 'Product category classification', CURRENT_TIMESTAMP()),
('MAP012', 'TERM010', 'staging_products', 'TABLE', 'price', 'Direct', 'Price of the product', CURRENT_TIMESTAMP()),

-- Calculated metrics mappings
('MAP013', 'TERM002', 'customer_kpi', 'VIEW', 'lifetime_value', 'Calculated', 'SUM(total_amount) for each customer', CURRENT_TIMESTAMP()),
('MAP014', 'TERM008', 'customer_kpi', 'VIEW', 'avg_order_value', 'Calculated', 'AVG(total_amount) for each customer', CURRENT_TIMESTAMP()),
('MAP015', 'TERM007', 'revenue_analytics', 'VIEW', 'total_revenue', 'Calculated', 'SUM(total_amount) by month', CURRENT_TIMESTAMP());
"""
execute_sql(conn, insert_data_mappings_sql, "Data mappings created")

# 5. Create Term Relationships
print("\nCreating Term Relationships...")

insert_term_relationships_sql = """
INSERT INTO business_vocabulary.term_relationships VALUES
-- Customer-related relationships
('REL001', 'TERM001', 'TERM002', 'CALCULATES', 'Customer entity is used to calculate Customer Lifetime Value', CURRENT_TIMESTAMP()),
('REL002', 'TERM001', 'TERM003', 'CALCULATES', 'Customer entity is used to calculate Customer Acquisition Cost', CURRENT_TIMESTAMP()),
('REL003', 'TERM001', 'TERM004', 'CALCULATES', 'Customer entity is used to calculate Churn Rate', CURRENT_TIMESTAMP()),
('REL004', 'TERM001', 'TERM005', 'CLASSIFIES', 'Customer entity is classified into Customer Segments', CURRENT_TIMESTAMP()),

-- Order-related relationships
('REL005', 'TERM006', 'TERM007', 'CALCULATES', 'Order entity is used to calculate Revenue', CURRENT_TIMESTAMP()),
('REL006', 'TERM006', 'TERM008', 'CALCULATES', 'Order entity is used to calculate Average Order Value', CURRENT_TIMESTAMP()),
('REL007', 'TERM001', 'TERM006', 'PLACES', 'Customer places Orders', CURRENT_TIMESTAMP()),
('REL008', 'TERM006', 'TERM010', 'CONTAINS', 'Order contains Products', CURRENT_TIMESTAMP()),

-- Product-related relationships
('REL009', 'TERM010', 'TERM011', 'CLASSIFIED_BY', 'Product is classified by Product Category', CURRENT_TIMESTAMP()),
('REL010', 'TERM010', 'TERM012', 'CALCULATES', 'Product entity is used to calculate Inventory Turnover', CURRENT_TIMESTAMP()),

-- Financial relationships
('REL011', 'TERM007', 'TERM013', 'CALCULATES', 'Revenue is used to calculate Gross Profit', CURRENT_TIMESTAMP()),
('REL012', 'TERM013', 'TERM014', 'CALCULATES', 'Gross Profit is used to calculate Net Profit Margin', CURRENT_TIMESTAMP()),
('REL013', 'TERM015', 'TERM007', 'SUBSET_OF', 'Monthly Recurring Revenue is a subset of total Revenue', CURRENT_TIMESTAMP());
"""
execute_sql(conn, insert_term_relationships_sql, "Term relationships created")

# 6. Create Business Vocabulary Views
print("\nCreating Business Vocabulary Views...")

create_vocabulary_views_sql = """
-- Business Terms Summary View
CREATE OR REPLACE VIEW business_vocabulary.terms_summary AS
SELECT 
    bt.term_name,
    bt.definition,
    bt.category,
    bt.subcategory,
    bt.business_domain,
    bt.business_owner,
    bt.data_steward,
    bt.status,
    bt.version,
    COUNT(dm.mapping_id) as data_mappings_count
FROM business_vocabulary.business_terms bt
LEFT JOIN business_vocabulary.data_mappings dm ON bt.term_id = dm.business_term_id
GROUP BY bt.term_id, bt.term_name, bt.definition, bt.category, bt.subcategory, 
         bt.business_domain, bt.business_owner, bt.data_steward, bt.status, bt.version;

-- Metrics Summary View
CREATE OR REPLACE VIEW business_vocabulary.metrics_summary AS
SELECT 
    bm.metric_name,
    bm.metric_definition,
    bm.calculation_formula,
    bm.data_source,
    bm.frequency,
    bm.unit_of_measure,
    bm.business_owner,
    bm.technical_owner,
    bm.status,
    bm.version
FROM business_vocabulary.business_metrics bm;

-- Data Lineage View
CREATE OR REPLACE VIEW business_vocabulary.data_lineage_view AS
SELECT 
    bt.term_name,
    bt.definition,
    dm.data_object_name,
    dm.data_object_type,
    dm.column_name,
    dm.mapping_type,
    dm.transformation_rule
FROM business_vocabulary.business_terms bt
JOIN business_vocabulary.data_mappings dm ON bt.term_id = dm.business_term_id
ORDER BY bt.term_name, dm.data_object_name;
"""
execute_sql(conn, create_vocabulary_views_sql, "Business vocabulary views created")
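Primary keys on standard Snowflake tables are informational, so an `INSERT` that reuses an ID (for example, when relationship rows are copied and edited) succeeds silently. The minimal check below can run on fetched IDs before or after loading. `duplicate_ids` is a hypothetical helper:

```python
from collections import Counter


def duplicate_ids(ids):
    """Return each ID that appears more than once, with its occurrence count."""
    return {key: count for key, count in Counter(ids).items() if count > 1}


print(duplicate_ids(["REL001", "REL002", "REL001", "REL003", "REL001"]))  # {'REL001': 3}
```

The server-side equivalent is `SELECT relationship_id, COUNT(*) FROM business_vocabulary.term_relationships GROUP BY relationship_id HAVING COUNT(*) > 1`.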


In [None]:
# Business Vocabulary Analytics and Reporting

# 7. Create Business Vocabulary Analytics
print("\nCreating Business Vocabulary Analytics...")

# Query business terms by category
business_terms_analytics_sql = """
SELECT 
    category,
    subcategory,
    COUNT(*) as term_count,
    COUNT(CASE WHEN status = 'ACTIVE' THEN 1 END) as active_terms,
    COUNT(DISTINCT business_owner) as unique_owners,
    COUNT(DISTINCT data_steward) as unique_stewards
FROM business_vocabulary.business_terms
GROUP BY category, subcategory
ORDER BY category, subcategory;
"""
df_business_terms_analytics = execute_sql(conn, business_terms_analytics_sql, "Business terms analytics completed")

# Query metrics by frequency
metrics_analytics_sql = """
SELECT 
    frequency,
    unit_of_measure,
    COUNT(*) as metric_count,
    COUNT(DISTINCT business_owner) as unique_owners,
    COUNT(DISTINCT technical_owner) as unique_technical_owners
FROM business_vocabulary.business_metrics
GROUP BY frequency, unit_of_measure
ORDER BY frequency, unit_of_measure;
"""
df_metrics_analytics = execute_sql(conn, metrics_analytics_sql, "Metrics analytics completed")

# Query data mappings by object type
mappings_analytics_sql = """
SELECT 
    dm.data_object_type,
    dm.mapping_type,
    COUNT(*) as mapping_count,
    COUNT(DISTINCT dm.data_object_name) as unique_objects,
    COUNT(DISTINCT dm.business_term_id) as unique_terms
FROM business_vocabulary.data_mappings dm
GROUP BY dm.data_object_type, dm.mapping_type
ORDER BY dm.data_object_type, dm.mapping_type;
"""
df_mappings_analytics = execute_sql(conn, mappings_analytics_sql, "Mappings analytics completed")

# 8. Create Business Vocabulary Dashboard
print("\nCreating Business Vocabulary Dashboard...")

create_dashboard_sql = """
CREATE OR REPLACE VIEW business_vocabulary.vocabulary_dashboard AS
SELECT 
    'Business Terms' as vocabulary_type,
    COUNT(*) as total_items,
    COUNT(CASE WHEN status = 'ACTIVE' THEN 1 END) as active_items,
    COUNT(DISTINCT category) as unique_categories,
    COUNT(DISTINCT business_owner) as unique_owners
FROM business_vocabulary.business_terms

UNION ALL

SELECT 
    'Business Metrics' as vocabulary_type,
    COUNT(*) as total_items,
    COUNT(CASE WHEN status = 'ACTIVE' THEN 1 END) as active_items,
    COUNT(DISTINCT frequency) as unique_categories,
    COUNT(DISTINCT business_owner) as unique_owners
FROM business_vocabulary.business_metrics

UNION ALL

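SELECT 
    'Data Mappings' as vocabulary_type,
    COUNT(*) as total_items,
    COUNT(*) as active_items,  -- mappings have no status column, so every mapping counts as active
    COUNT(DISTINCT data_object_type) as unique_categories,
    COUNT(DISTINCT business_term_id) as unique_owners  -- distinct mapped terms, not owners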
FROM business_vocabulary.data_mappings;
"""
execute_sql(conn, create_dashboard_sql, "Business vocabulary dashboard created")


# Conclusion & Summary

This notebook covers the major features of Snowflake's Horizon Catalog, including data governance, data metric functions, and business vocabulary management, and serves as an end-to-end implementation guide for a modern data platform.

## Complete Feature Coverage (19 Sections):

### **Core Data Management (Sections 1-3)**
- **Iceberg Tables**: ACID transactions, schema evolution, time travel
- **Traditional Tables & Views**: All table types, materialized views, dynamic tables
- **AI & ML Objects**: Model registry, ML functions, vector embeddings

### **Operations & Monitoring (Sections 4-6)**
- **Costs & Usage Monitoring**: Comprehensive cost tracking and optimization
- **Data Warehouses**: Multi-cluster auto-scaling, resource monitors
- **Data Shares**: Secure real-time data sharing with governance

### **Business Intelligence (Sections 7-9)**
- **Semantic Views**: Business-friendly data abstractions
- **Databases & Schemas**: Logical organization and data classification
- **Business Glossary**: Centralized business vocabulary and lineage

### **Automation & Security (Sections 10-12)**
- **Workflows**: Automated data pipelines with task orchestration
- **RBAC**: Fine-grained security with dynamic masking and row-level security
- **Audit Trail & Logging**: Comprehensive activity logging and compliance

### **Data Discovery & Quality (Sections 13-15)**
- **Metadata Retrieval**: Data discovery, relationship mapping, usage analytics
- **Data Quality**: Validation rules, automated monitoring, quality metrics
- **Practical Examples**: Real-world implementations and optimization strategies

### **Advanced Features (Sections 16-18)**
- **Data Metric Functions (DMFs)**: Advanced data quality monitoring with system and custom DMFs
- **Advanced Data Governance**: Sensitive data classification, lineage tracking, compliance
- **Catalog Management**: Multi-catalog support, federation, metadata synchronization

### **Business Vocabulary (Section 19)**
- **Business Terms Dictionary**: Centralized business terminology with definitions
- **Business Metrics**: Standardized calculation methods and formulas
- **Data Mappings**: Link business terms to data objects and columns
- **Term Relationships**: Define relationships between business concepts
- **Vocabulary Analytics**: Search, reporting, and analytics for business vocabulary

## Key Benefits of a Complete Horizon Catalog Implementation

### **Enterprise-Grade Data Quality**
- **Automated Monitoring**: DMFs provide scheduled, automated data quality assessment
- **Custom Metrics**: Define business-specific quality measurements
- **Expectations**: Set pass/fail criteria for automated quality checks
- **Scheduling**: Regular quality checks without manual intervention
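
DMFs attached to a table are evaluated on that table's `DATA_METRIC_SCHEDULE`, so the schedule and the DMF attachments go together. The sketch below assumes two DDL forms described in Snowflake's data quality documentation: `ALTER TABLE ... SET DATA_METRIC_SCHEDULE` and `ALTER TABLE ... ADD DATA METRIC FUNCTION ... ON (...)`. The `build_dmf_statements` helper and the `sales.orders` table are hypothetical. The helper only builds SQL strings, so it runs without a connection.

```python
def build_dmf_statements(table, schedule, checks):
    """Build DDL that sets a DMF schedule on `table` and attaches each check.

    `checks` is a list of (dmf_name, column) pairs; pass None as the column for
    table-level DMFs such as SNOWFLAKE.CORE.ROW_COUNT, which take no arguments.
    """
    statements = [f"ALTER TABLE {table} SET DATA_METRIC_SCHEDULE = '{schedule}'"]
    for dmf_name, column in checks:
        argument = column if column is not None else ""
        statements.append(
            f"ALTER TABLE {table} ADD DATA METRIC FUNCTION {dmf_name} ON ({argument})"
        )
    return statements


# Hypothetical table and columns, used only to show the generated SQL.
dmf_statements = build_dmf_statements(
    "sales.orders",
    "60 MINUTE",
    [("SNOWFLAKE.CORE.NULL_COUNT", "email"), ("SNOWFLAKE.CORE.ROW_COUNT", None)],
)
for statement in dmf_statements:
    print(statement)
```

Each string could then be passed to the notebook's `execute_sql` helper. Check the current documentation for the schedule values your account accepts.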

### **Comprehensive Data Governance**
- **Sensitive Data Classification**: Automatic detection and tagging of PII
- **Data Lineage**: Visibility into upstream sources, downstream dependents, and the transformations between them
- **Access Policies**: Advanced security controls including aggregation policies
- **Compliance Reporting**: Automated compliance and audit reporting
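
Impact analysis over lineage is a graph traversal: starting from a changed object, follow every edge to the objects built from it. The sketch below does a breadth-first walk over a hard-coded lineage map whose object names are hypothetical. In a real account the edges might instead come from a view such as `SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES`. That source is an assumption about your setup, not something this notebook configures.

```python
from collections import deque

# Hypothetical lineage edges: source object -> objects built from it.
LINEAGE = {
    "raw.orders": ["staging.orders_clean"],
    "staging.orders_clean": ["analytics.daily_revenue", "analytics.customer_ltv"],
    "analytics.daily_revenue": ["reporting.exec_dashboard"],
    "analytics.customer_ltv": [],
    "reporting.exec_dashboard": [],
}


def downstream_impact(graph, start):
    """Breadth-first walk returning every object that depends on `start`, nearest first."""
    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in graph.get(node, []):
            if child not in seen:  # guard against diamonds and cycles
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


print(downstream_impact(LINEAGE, "raw.orders"))
```

Breadth-first order lists direct dependents before indirect ones, which is a natural order for reviewing the impact of a change.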

### **Business Vocabulary Management**
- **Standardized Terminology**: Consistent business language across the organization
- **Data-Business Alignment**: Clear mapping between business terms and data objects
- **Searchable Dictionary**: Easy discovery of business terms and definitions
- **Version Control**: Track changes to business definitions over time

### **Multi-Catalog Management**
- **Federation**: Query across multiple data sources from a single interface
- **Discovery**: Automatic discovery and registration of data sources
- **Synchronization**: Keep metadata in sync across different catalogs
- **Monitoring**: Track usage and performance across all catalogs

## Implementation Roadmap

### **Phase 1: Foundation (Weeks 1-4)**
1. Set up basic Iceberg and traditional tables
2. Implement basic RBAC and security policies
3. Create semantic views for business users
4. Set up basic cost monitoring

### **Phase 2: Quality & Governance (Weeks 5-8)**
1. Implement DMFs for data quality monitoring
2. Set up sensitive data classification
3. Create data lineage documentation
4. Implement advanced access policies

### **Phase 3: Business Vocabulary (Weeks 9-10)**
1. Create comprehensive business terms dictionary
2. Map business terms to data objects
3. Define business metrics and calculations
4. Set up vocabulary search and analytics

### **Phase 4: Advanced Features (Weeks 11-12)**
1. Set up multi-catalog federation
2. Implement automated workflows
3. Create comprehensive compliance reporting
4. Set up advanced monitoring and alerting

## Resources and Documentation

- **Snowflake Data Quality Documentation**: Comprehensive guide to DMFs
- **Snowflake Documentation**: Complete Snowflake platform documentation
- **Horizon Catalog Guide**: Official Horizon Catalog documentation
- **Best Practices Guide**: Implementation best practices
- **Security Guide**: Security implementation guide
- **Business Vocabulary Guide**: Data governance and business terminology

## Next Steps

1. **Start with Core Features**: Begin with Iceberg tables and basic governance
2. **Implement DMFs**: Set up data quality monitoring early in your implementation
3. **Establish Business Vocabulary**: Create standardized business terminology
4. **Scale Gradually**: Add advanced features as your needs grow
5. **Monitor and Optimize**: Continuously monitor performance and costs

---

**Note**: This notebook provides a complete implementation guide for Snowflake Horizon Catalog with comprehensive business vocabulary management. Remember to update the connection configuration and adapt the examples to your specific environment and requirements.


# Additional Resources & Learning Path

## Recommended Learning Path

### **Beginner Level (Weeks 1-2)**
1. **Snowflake Fundamentals**: Learn basic SQL and Snowflake concepts
2. **Data Types**: Understand Snowflake data types and best practices
3. **Basic Queries**: Practice with SELECT, JOIN, and aggregation functions
4. **Table Creation**: Learn to create and manage basic tables

### **Intermediate Level (Weeks 3-6)**
1. **Advanced SQL**: Window functions, CTEs, and complex queries
2. **Data Loading**: Bulk loading, streaming, and data transformation
3. **Views and Materialized Views**: Create and optimize views
4. **Security Basics**: Users, roles, and basic permissions

### **Advanced Level (Weeks 7-12)**
1. **Horizon Catalog Features**: Implement all features from this notebook
2. **Data Governance**: Advanced security, masking, and compliance
3. **Performance Optimization**: Query optimization and warehouse sizing
4. **Automation**: Tasks, streams, and dynamic tables

## Community Resources

- **Snowflake Community**: [community.snowflake.com](https://community.snowflake.com)
- **Snowflake University**: Free online courses and certification preparation
- **Snowflake Blog**: Latest updates and best practices
- **GitHub Examples**: Open-source Snowflake projects and scripts
- **Stack Overflow**: Community Q&A for technical questions

## Certification Path

1. **SnowPro Core Certification**: Fundamental Snowflake knowledge
2. **SnowPro Advanced Certifications**: Role-based tracks such as Architect, Data Engineer, Administrator, and Data Scientist
3. **Continuous Learning**: Stay updated with new features and best practices


# Troubleshooting & Common Issues

## Common Implementation Issues

### **Connection Issues**
- **Problem**: Cannot connect to Snowflake
- **Solution**: Verify the account identifier (which should not include the `.snowflakecomputing.com` suffix), username, password, and warehouse settings
- **Check**: Network connectivity and firewall settings
- **Verify**: Account status and user permissions
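
The connector documentation describes `account` as the account identifier without the `.snowflakecomputing.com` suffix. The `CONNECTION_CONFIG` template at the top of this notebook includes that suffix. The pre-flight check below catches the suffix and any leftover `your_...` placeholders before `snowflake.connector.connect` is called. It is a minimal sketch that assumes the same config keys and password authentication. The `normalize_account` and `check_config` helpers are hypothetical.

```python
REQUIRED_KEYS = ("account", "user", "password", "warehouse")
HOST_SUFFIX = ".snowflakecomputing.com"


def normalize_account(account):
    """Reduce a URL-style value to a bare account identifier."""
    value = account.strip()
    for scheme in ("https://", "http://"):
        if value.lower().startswith(scheme):
            value = value[len(scheme):]
    value = value.rstrip("/")
    if value.lower().endswith(HOST_SUFFIX):
        value = value[: -len(HOST_SUFFIX)]
    return value


def check_config(config):
    """Return human-readable problems found in a connector config dict."""
    problems = [f"missing '{key}'" for key in REQUIRED_KEYS if not config.get(key)]
    account = config.get("account") or ""
    if account and normalize_account(account) != account:
        problems.append(f"account should be '{normalize_account(account)}'")
    for key, value in config.items():
        if isinstance(value, str) and value.startswith("your_"):
            problems.append(f"'{key}' still holds a placeholder")
    return problems


# Mirrors the placeholder values from the notebook's CONNECTION_CONFIG.
EXAMPLE_CONFIG = {
    "account": "your_account.snowflakecomputing.com",
    "user": "your_username",
    "password": "your_password",
    "warehouse": "COMPUTE_WH",
}
for problem in check_config(EXAMPLE_CONFIG):
    print(problem)
```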

### **Permission Errors**
- **Problem**: "Insufficient privileges" errors
- **Solution**: Grant appropriate roles and permissions
- **Check**: Role hierarchy and privilege inheritance
- **Verify**: Object ownership and access policies
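
Privilege inheritance follows the role hierarchy. After `GRANT ROLE analyst TO ROLE engineer`, the `engineer` role can use everything granted to `analyst`, and inheritance continues transitively up the hierarchy. The sketch below models that rule in plain Python with hypothetical roles and privileges. It ignores ownership, future grants, and secondary roles. In a live account, `SHOW GRANTS TO ROLE <role>` is the authoritative check.

```python
# Hypothetical hierarchy: role -> roles granted to it (whose privileges it inherits).
ROLE_GRANTS = {
    "SYSADMIN": ["ANALYST", "ENGINEER"],
    "ENGINEER": ["ANALYST"],
    "ANALYST": [],
}
# Hypothetical direct grants as (privilege, object) pairs.
DIRECT_PRIVILEGES = {
    "ANALYST": {("SELECT", "sales.orders")},
    "ENGINEER": {("INSERT", "sales.orders")},
    "SYSADMIN": set(),
}


def effective_privileges(role, grants=ROLE_GRANTS, privileges=DIRECT_PRIVILEGES):
    """Union of a role's own privileges and those of every role granted to it, transitively."""
    result, stack, visited = set(), [role], set()
    while stack:
        current = stack.pop()
        if current in visited:
            continue
        visited.add(current)
        result |= privileges.get(current, set())
        stack.extend(grants.get(current, []))
    return result


print(sorted(effective_privileges("SYSADMIN")))
print(("INSERT", "sales.orders") in effective_privileges("ANALYST"))
```

Inheritance flows upward only: `SYSADMIN` sees both grants, while `ANALYST` does not gain `INSERT` just because a role above it holds that privilege.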

### **Performance Issues**
- **Problem**: Slow query execution
- **Solution**: Optimize queries, use appropriate warehouse size
- **Check**: Query execution plan and resource usage
- **Verify**: Clustering keys and micro-partition pruning (compare partitions scanned with partitions total in the query profile)
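
Snowflake keeps min/max metadata for each micro-partition and skips partitions whose range cannot satisfy a filter. This is why clustering affects scan cost. The simplified model below uses hypothetical partitions keyed on a date column. It counts how many partitions a date-range filter must scan when data is well clustered versus spread across every partition.

```python
from dataclasses import dataclass


@dataclass
class MicroPartition:
    min_date: str  # ISO dates compare correctly as strings
    max_date: str


def partitions_scanned(partitions, lo, hi):
    """Count partitions whose [min, max] range overlaps the filter range [lo, hi]."""
    return sum(1 for p in partitions if p.max_date >= lo and p.min_date <= hi)


# Well clustered: each partition holds a single day.
clustered = [MicroPartition(f"2024-01-{d:02d}", f"2024-01-{d:02d}") for d in range(1, 31)]
# Poorly clustered: every partition spans the whole range.
unclustered = [MicroPartition("2024-01-01", "2024-01-30") for _ in range(30)]

print(partitions_scanned(clustered, "2024-01-10", "2024-01-12"))
print(partitions_scanned(unclustered, "2024-01-10", "2024-01-12"))
```

In this model, the same three-day filter touches 3 of 30 partitions when the data is clustered and all 30 when it is not.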

### **Data Quality Issues**
- **Problem**: DMFs not working as expected
- **Solution**: Verify table permissions and DMF syntax
- **Check**: Data types and column names
- **Verify**: That `DATA_METRIC_SCHEDULE` is set on the table (DMFs run on serverless compute, not on a user-managed warehouse)
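
Many DMF failures come down to a column that does not exist or has the wrong type. The pre-flight check below compares requested checks against a column-to-type map, such as one read from `INFORMATION_SCHEMA.COLUMNS`. The `check_dmf_targets` helper is hypothetical. The rule that `AVG`, `MIN`, `MAX`, and `STDDEV` need numeric columns is an assumption drawn from the system DMF reference and should be confirmed against current documentation. The sketch also assumes unquoted, case-insensitive column names.

```python
# Assumption: these system DMFs require a numeric column; confirm in current docs.
NUMERIC_ONLY_DMFS = {"AVG", "MIN", "MAX", "STDDEV"}
NUMERIC_TYPES = {"NUMBER", "FLOAT"}  # DATA_TYPE values reported for numeric columns


def check_dmf_targets(table_columns, checks):
    """Return problems for (dmf_short_name, column) checks against a {column: type} map.

    Column names are compared case-insensitively, which matches unquoted identifiers.
    """
    columns = {name.upper(): dtype.upper() for name, dtype in table_columns.items()}
    problems = []
    for dmf_name, column in checks:
        dtype = columns.get(column.upper())
        if dtype is None:
            problems.append(f"{dmf_name}: column {column} not found")
        elif dmf_name.upper() in NUMERIC_ONLY_DMFS and dtype not in NUMERIC_TYPES:
            problems.append(f"{dmf_name}: column {column} is {dtype}, expected a numeric type")
    return problems


# Hypothetical table metadata and requested checks.
EXAMPLE_COLUMNS = {"ORDER_ID": "NUMBER", "EMAIL": "TEXT", "AMOUNT": "NUMBER"}
EXAMPLE_CHECKS = [("NULL_COUNT", "email"), ("AVG", "email"), ("AVG", "amount"), ("NULL_COUNT", "status")]

for problem in check_dmf_targets(EXAMPLE_COLUMNS, EXAMPLE_CHECKS):
    print(problem)
```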

## Best Practices Checklist

### **Security**
- [ ] Use least privilege principle for roles
- [ ] Implement data masking for sensitive data
- [ ] Set up row-level security where needed
- [ ] Conduct regular access reviews and audits
- [ ] Enable multi-factor authentication

### **Performance**
- [ ] Right-size warehouses for workloads
- [ ] Use clustering keys for large tables
- [ ] Implement materialized views for frequent queries
- [ ] Monitor and optimize query performance
- [ ] Set up auto-suspend for cost optimization

### **Data Governance**
- [ ] Classify sensitive data appropriately
- [ ] Document data lineage and business terms
- [ ] Implement data quality monitoring
- [ ] Set up retention policies
- [ ] Conduct regular compliance reviews

### **Cost Management**
- [ ] Monitor credit usage and costs
- [ ] Set up resource monitors and alerts
- [ ] Optimize warehouse sizing
- [ ] Review and clean up unused objects
- [ ] Implement cost allocation strategies
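
Credit consumption can be estimated from warehouse size and running time. The sketch assumes standard-warehouse rates of 1 credit per hour for X-Small, doubling with each size step. It also assumes per-second billing with a 60-second minimum each time a warehouse resumes. Other warehouse types, such as Snowpark-optimized, bill differently, so check current pricing before relying on these numbers. The size labels and the `estimate_credits` helper are hypothetical.

```python
# Assumed standard-warehouse rates (credits per hour); verify against current pricing.
CREDITS_PER_HOUR = {
    "XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
    "XLARGE": 16, "2XLARGE": 32, "3XLARGE": 64, "4XLARGE": 128,
}
MINIMUM_BILLED_SECONDS = 60  # billed each time the warehouse resumes


def estimate_credits(size, session_seconds, clusters=1):
    """Estimate credits for resume-to-suspend sessions billed per second with a 60 s minimum."""
    rate = CREDITS_PER_HOUR[size.upper()]
    billed_seconds = sum(max(MINIMUM_BILLED_SECONDS, s) for s in session_seconds)
    return rate * clusters * billed_seconds / 3600


print(round(estimate_credits("MEDIUM", [30, 600, 1800]), 3))

# Auto-suspend comparison: 2 minutes of work per session, then idle until suspend.
for auto_suspend in (60, 600):
    sessions = [120 + auto_suspend] * 10
    print(auto_suspend, estimate_credits("XSMALL", sessions))
```

A warehouse keeps billing while it idles before auto-suspend. In this model, ten short sessions therefore cost 0.5 credits with a 60-second auto-suspend but 2.0 credits with a 600-second one.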


# Appendix: Quick Reference Guide

## SQL Quick Reference

### **Common DDL Commands**
```sql
-- Create database
CREATE DATABASE database_name;

-- Create schema
CREATE SCHEMA schema_name;

-- Create table
CREATE TABLE table_name (column1 TYPE, column2 TYPE);

-- Create view
CREATE VIEW view_name AS SELECT * FROM table_name;

-- Create materialized view
CREATE MATERIALIZED VIEW mv_name AS SELECT * FROM table_name;
```
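
This notebook assembles DDL like the statements above as Python strings. Any identifier that contains spaces or other special characters must then be double-quoted. The hypothetical helpers below apply Snowflake's rule for unquoted identifiers: a letter or underscore, followed by letters, digits, underscores, or dollar signs. Anything else is double-quoted, with embedded quotes doubled. The helpers do not check reserved words, and they handle identifiers only; values should go through bind variables.

```python
import re

# Letters or underscore first, then letters, digits, underscores, or dollar signs.
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def quote_identifier(name):
    """Return `name` unchanged if it is a valid unquoted identifier, else double-quote it."""
    if _UNQUOTED_IDENTIFIER.fullmatch(name):
        return name
    return '"' + name.replace('"', '""') + '"'


def create_table_sql(table, columns):
    """Build a CREATE TABLE statement from (column_name, type) pairs."""
    column_list = ", ".join(f"{quote_identifier(c)} {t}" for c, t in columns)
    return f"CREATE TABLE {quote_identifier(table)} ({column_list});"


print(create_table_sql("orders", [("order_id", "NUMBER"), ("order date", "DATE")]))
```

A quoted identifier such as `"order date"` is case-sensitive. An unquoted one like `order_id` is stored in uppercase.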

### **Common DML Commands**
```sql
-- Insert data
INSERT INTO table_name VALUES (value1, value2);

-- Update data
UPDATE table_name SET column = value WHERE condition;

-- Delete data
DELETE FROM table_name WHERE condition;

-- Select data
SELECT column1, column2 FROM table_name WHERE condition;
```

### **Security Commands**
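```sql
-- Create role
CREATE ROLE role_name;

-- Grant privileges
GRANT SELECT ON TABLE table_name TO ROLE role_name;

-- Create masking policy (this placeholder masks the value for every role)
CREATE MASKING POLICY masking_policy_name AS (val STRING) RETURNS STRING -> '***';

-- Attach the masking policy to a column
ALTER TABLE table_name MODIFY COLUMN column_name SET MASKING POLICY masking_policy_name;

-- Create row access policy (this placeholder returns TRUE, so every row stays visible)
CREATE ROW ACCESS POLICY row_policy_name AS (col STRING) RETURNS BOOLEAN -> TRUE;

-- Attach the row access policy to a table, binding the policy argument to a column
ALTER TABLE table_name ADD ROW ACCESS POLICY row_policy_name ON (column_name);
```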
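
The placeholder masking policy above masks values for every role. A more typical policy reveals the value to a list of authorized roles. The hypothetical builder below generates that DDL as a string. It uses `CURRENT_ROLE()` in a `CASE` expression and escapes role names as SQL string literals.

```python
def quote_literal(text):
    """Render a SQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def masking_policy_sql(name, allowed_roles, data_type="STRING", mask="'***'"):
    """Build DDL for a masking policy that reveals values only to `allowed_roles`."""
    roles = ", ".join(quote_literal(role) for role in allowed_roles)
    return (
        f"CREATE OR REPLACE MASKING POLICY {name} AS (val {data_type}) RETURNS {data_type} ->\n"
        f"  CASE WHEN CURRENT_ROLE() IN ({roles}) THEN val ELSE {mask} END;"
    )


# Hypothetical policy and role names.
print(masking_policy_sql("email_mask", ["PII_READER", "SECURITYADMIN"]))
```

`CURRENT_ROLE()` checks only the session's primary role. `IS_ROLE_IN_SESSION()` also accounts for roles inherited through the hierarchy, which may suit nested role designs better.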

## Horizon Catalog Features Summary

| Feature | Description | Use Case |
|---------|-------------|----------|
| Iceberg Tables | Open table format with ACID transactions | Data lake integration, schema evolution |
| DMFs | Data Metric Functions for quality monitoring | Automated data quality checks |
| Business Vocabulary | Centralized business terminology | Data governance, business alignment |
| Multi-Catalog | Support for multiple data sources | Data federation, unified querying |
| Advanced Security | Row-level, column-level, aggregation policies | Compliance, data protection |
| Data Lineage | Track data flow and transformations | Governance, impact analysis |

## Contact & Support

- **Snowflake Support**: [support.snowflake.com](https://support.snowflake.com)
- **Documentation**: [docs.snowflake.com](https://docs.snowflake.com)
- **Community Forum**: [community.snowflake.com](https://community.snowflake.com)
- **Training**: [learn.snowflake.com](https://learn.snowflake.com)

---

**Notebook Version**: 1.0  
**Last Updated**: January 2025  
**Total Sections**: 19  
**Total Code Cells**: 52  

*This notebook provides a comprehensive guide to Snowflake Horizon Catalog implementation. For the latest updates and features, please refer to the official Snowflake documentation.*
