# Step 3: SQL connectivity


In [0]:

%pip install great_expectations sqlalchemy pyodbc pandas


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:

import sys
import os


is_databricks = False
try:
    dbutils.fs.ls('/')
    is_databricks = True

except NameError:

    raise EnvironmentError("This notebook requires Databricks environment")



gx_available = False
try:
    import great_expectations as gx
   
    gx_available = True
except ImportError:
    print()


if not gx_available:

    
   
    %pip install great-expectations[sql,azure,databricks,postgresql,snowflake] --upgrade --quiet
    
   
    %pip install sqlalchemy>=2.0.0 pandas>=1.5.0 plotly pyodbc --upgrade --quiet
    

    dbutils.library.restartPython()



try:
    import great_expectations as gx
    import pandas as pd
    import sqlalchemy
    

except ImportError as e:
    print(f"{e}")



In [0]:
try:

    step2_results = dbutils.notebook.run("./step2_gx_context_setup", 0)
    step2_data = json.loads(step2_results)
    
    if step2_data.get("context_created"):
        print(f"Step 2 context available: {step2_data.get('context_type')}")
        discovered_methods = step2_data.get("context_methods", [])
        method_categories = step2_data.get("method_categories", {})
        print(f"Total methods discovered: {len(discovered_methods)}")
    else:
        print(f"Step 2 context not available, creating fresh context")
        discovered_methods = []
        method_categories = {}
        
except Exception as e:
    print(f"Could not load Step 2 results: {e}")
    discovered_methods = []
    method_categories = {}

Could not load Step 2 results: name 'json' is not defined


In [0]:



import pandas as pd
import json
import warnings
from datetime import datetime
from pyspark.sql import SparkSession


warnings.filterwarnings('ignore', category=DeprecationWarning, module='pyspark')

step3_results = {
    "status": "running",
    "unity_catalog_connection": False,
    "data_loaded": False,
    "target_table": None,
    "record_count": 0,
    "columns": [],
    "data_types": {},
    "sample_data": [],
    "ready_for_step4": False,
    "error_message": None,
    "connection_method": "spark_sql"
}




UC_CONFIG = {
    "catalog": "aueasset_edp-unitycatalog-tst",
    "schema": "aca", 
    "table": "dq_error_result",
    "full_table_name": "`aueasset_edp-unitycatalog-tst`.`aca`.`dq_error_result`"
}


CRITICAL_COLUMNS = ["WellCode", "Process", "record_create_date"]
MAX_ROWS_TO_LOAD = 1000  



step3_results["target_table"] = UC_CONFIG['full_table_name']
step3_results["critical_columns"] = CRITICAL_COLUMNS


try:

    spark_session = spark  
   
    

    test_query = f"DESCRIBE TABLE {UC_CONFIG['full_table_name']}"

    

    schema_df = spark.sql(test_query)
    schema_info = schema_df.collect()
    
    column_names = [row['col_name'] for row in schema_info if row['col_name'] not in ['', None]]
    column_types = [row['data_type'] for row in schema_info if row['col_name'] not in ['', None]]
    

    print(f"Table schema loaded: {len(column_names)} columns")
    
    step3_results["unity_catalog_connection"] = True
    step3_results["columns"] = column_names
    step3_results["data_types"] = dict(zip(column_names, column_types))
    

    missing_critical = [col for col in CRITICAL_COLUMNS if col not in column_names]
    if missing_critical:
        print(f"Missing critical columns: {missing_critical}")
    else:
        print(f"All critical columns present: {CRITICAL_COLUMNS}")
    
except Exception as e:
    print(f"Unity Catalog connection failed: {e}")
    step3_results["error_message"] = f"Unity Catalog connection failed: {e}"
    step3_results["status"] = "error"
    raise




def load_unity_catalog_data():
    """
    Load data from Unity Catalog using Spark SQL (native Databricks approach)
    """
    try:
        print(f"Target table: {UC_CONFIG['full_table_name']}")
        

        query = f"""
        SELECT 
            WellCode,
            Process,
            record_create_date,
            dq_date,
            rule_name,
            rule_status,
            severity,
            sum_frs_rows,
            sum_frs_covered,
            sum_frs_not_covered
        FROM {UC_CONFIG['full_table_name']}
        ORDER BY record_create_date DESC
        LIMIT {MAX_ROWS_TO_LOAD}
        """
        
        print(f"Executing Unity Catalog query...")
        spark_df = spark.sql(query)
        

        unity_df = spark_df.toPandas()
        

        print(f"Shape: {unity_df.shape[0]:,} rows × {unity_df.shape[1]} columns")
        print(f"Columns: {list(unity_df.columns)}")
        

        print(f"Data Quality Summary:")
        if 'WellCode' in unity_df.columns:
            print(f"  • Unique Wells: {unity_df['WellCode'].nunique():,}")
        if 'Process' in unity_df.columns:
            print(f"  • Unique Processes: {unity_df['Process'].nunique()}")
        if 'record_create_date' in unity_df.columns:
            print(f"  • Date Range: {unity_df['record_create_date'].min()} to {unity_df['record_create_date'].max()}")
        
        null_count = unity_df.isnull().sum().sum()
        print(f"  • Missing Values: {null_count:,}")
        
        return unity_df, True
        
    except Exception as e:
        print(f"Data loading failed: {e}")
        return None, False


unity_df, data_loading_success = load_unity_catalog_data()

if data_loading_success and unity_df is not None:
    step3_results["data_loaded"] = True
    step3_results["record_count"] = len(unity_df)
    step3_results["columns"] = list(unity_df.columns)
    

    step3_results["sample_data"] = unity_df.head(5).to_dict('records')
    

    

    critical_column_analysis = {}
    for col in CRITICAL_COLUMNS:
        if col in unity_df.columns:
            null_count = unity_df[col].isnull().sum()
            null_percentage = (null_count / len(unity_df)) * 100
            unique_count = unity_df[col].nunique()
            
            critical_column_analysis[col] = {
                "exists": True,
                "null_count": int(null_count),
                "null_percentage": float(null_percentage),
                "unique_count": int(unique_count),
                "data_type": str(unity_df[col].dtype)
            }
            
            status = "" if null_count == 0 else ""
            print(f"   {status} {col}: {null_count} nulls ({null_percentage:.1f}%)")
        else:
            critical_column_analysis[col] = {
                "exists": False,
                "null_count": None,
                "null_percentage": None,
                "unique_count": None,
                "data_type": None
            }
            print("")
    
    step3_results["critical_column_analysis"] = critical_column_analysis
    

    globals()['unity_df'] = unity_df
    globals()['df'] = unity_df 
    globals()['UC_CONFIG'] = UC_CONFIG
    globals()['CRITICAL_COLUMNS'] = CRITICAL_COLUMNS
    globals()['unity_data_loaded'] = True
    globals()['data_available'] = True
    


    
else:
    step3_results["error_message"] = "Data loading failed"
    step3_results["status"] = "error"
    

    globals()['unity_df'] = None
    globals()['df'] = None
    globals()['unity_data_loaded'] = False
    globals()['data_available'] = False


ready_for_step4 = (
    step3_results["unity_catalog_connection"] and
    step3_results["data_loaded"] and
    step3_results["record_count"] > 0
)

step3_results["ready_for_step4"] = ready_for_step4
step3_results["status"] = "success" if ready_for_step4 else "error"



if ready_for_step4:
    print(f"")

else:

    if step3_results.get("error_message"):
        print(f"   • {step3_results['error_message']}")








print(json.dumps({k: v for k, v in step3_results.items() if k not in ['sample_data']}, indent=2))



if ready_for_step4:
    print("")

else:

    print("")

print("=" * 60)




Table schema loaded: 13 columns
All critical columns present: ['WellCode', 'Process', 'record_create_date']
Target table: `aueasset_edp-unitycatalog-tst`.`aca`.`dq_error_result`
Executing Unity Catalog query...
Shape: 1,000 rows × 10 columns
Columns: ['WellCode', 'Process', 'record_create_date', 'dq_date', 'rule_name', 'rule_status', 'severity', 'sum_frs_rows', 'sum_frs_covered', 'sum_frs_not_covered']
Data Quality Summary:
  • Unique Wells: 508
  • Unique Processes: 2
  • Date Range: 2025-07-01 08:49:09.466000 to 2025-07-01 08:49:09.466000
  • Missing Values: 0
    WellCode: 0 nulls (0.0%)
    Process: 0 nulls (0.0%)
    record_create_date: 0 nulls (0.0%)

{
  "status": "success",
  "unity_catalog_connection": true,
  "data_loaded": true,
  "target_table": "`aueasset_edp-unitycatalog-tst`.`aca`.`dq_error_result`",
  "record_count": 1000,
  "columns": [
    "WellCode",
    "Process",
    "record_create_date",
    "dq_date",
    "rule_name",
    "rule_status",
    "severity",
    "sum_f

In [0]:

def clean_for_json(obj):
    """
    Clean object for JSON serialization by converting non-serializable types
    """
    import datetime
    import numpy as np
    
    if isinstance(obj, dict):
        return {key: clean_for_json(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [clean_for_json(item) for item in obj]
    elif isinstance(obj, (pd.Timestamp, datetime.datetime, datetime.date, np.datetime64)):
        return obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)
    elif isinstance(obj, (pd.Series, pd.DataFrame)):
        return obj.to_dict() if hasattr(obj, 'to_dict') else str(obj)
    elif hasattr(obj, 'item'):  # numpy types
        return obj.item()
    elif isinstance(obj, (np.integer, np.int32, np.int64)):
        return int(obj)
    elif isinstance(obj, (np.floating, np.float32, np.float64)):
        return float(obj)
    elif isinstance(obj, (np.bool_, bool)):
        return bool(obj)
    elif not isinstance(obj, (str, int, float, bool, type(None))):
        return str(obj)
    else:
        return obj

# Clean and return results for notebook orchestration
step3_results = clean_for_json(step3_results)
dbutils.notebook.exit(json.dumps(step3_results))


In [0]:


# print(f"SAVING STEP 3 RESULTS TO DBFS")
# print("-" * 50)

# try:
#     import os
#     import json
#     from datetime import datetime
    
#     # Create results directory if it doesn't exist
#     results_dir = "/dbfs/FileStore/great_expectations/step_results/"
#     os.makedirs(results_dir, exist_ok=True)
    
#     # Save step3 results with timestamp
#     result_file = f"{results_dir}step3_results.json"
    
#     # Add timestamp to results for tracking
#     timestamped_results = {
#         **step3_results,
#         "saved_timestamp": datetime.now().isoformat(),
#         "step_name": "step3"
#     }
    
#     with open(result_file, 'w') as f:
#         json.dump(timestamped_results, f, indent=2, default=str)
        
#     print(f"Step 3 results saved to: {result_file}")
#     print(f"Records: {step3_results.get('record_count', 0)} loaded")
#     print(f"Columns: {len(step3_results.get('columns', []))} discovered")
#     print(f"Status: {step3_results.get('status', 'unknown')}")
#     print(f"Available for downstream steps")
    
# except Exception as e:
#     print(f"Could not save step3 results: {e}")
#     print(f"Results still available via notebook return value")

# print("-" * 50)