# Country Currency Data Loading Notebook

## Overview
This notebook provides a comprehensive ETL (Extract, Transform, Load) pipeline for processing country-to-currency mapping data. It reads data from a CSV file stored in a Databricks volume and loads it into a Delta table with proper data quality checks and validation.

## Key Features
- **Modular Design**: Functions are organized for reusability and testing
- **Data Quality Checks**: Validates data integrity before loading
- **Error Handling**: Comprehensive error handling with detailed logging
- **Cross-Platform Support**: Works with both Windows and Linux environments
- **Parameterized Execution**: Uses Databricks widgets for flexible configuration

## Input Parameters
- **catalog_name**: The catalog where the schema exists (use 'hive_metastore' for trial accounts)
- **schema_name**: The schema where the table will be created
- **table_name**: The name of the target Delta table
- **csv_path**: Full path to the CSV file in the Databricks volume
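- **warehouse_id**: SQL warehouse ID (optional; read from the widgets but not used by the loading logic, which runs on the notebook's own Spark session)
- **warehouse_name**: SQL warehouse name (optional; only printed to the log for reference)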

## Output Table Schema
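The output Delta table contains all columns from the source CSV file. Column types are inferred by Spark (`inferSchema=True`), so the types below are those a well-formed file is expected to produce: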
- `country_code` (STRING): ISO 3166-1 alpha-3 country code
- `country_number` (INT): ISO 3166-1 numeric country code
- `country` (STRING): Full country name
- `currency_name` (STRING): Official currency name
- `currency_code` (STRING): ISO 4217 currency code
- `currency_number` (INT): ISO 4217 numeric currency code
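
Because the column types come from `inferSchema`, a check like the following sketch could compare what Spark inferred against the documented schema before writing. It assumes its input is a list of `(column, type)` pairs in the shape returned by PySpark's `DataFrame.dtypes` (for example `("country_code", "string")`); `EXPECTED_SCHEMA` and `find_schema_mismatches` are hypothetical names, not part of the notebook.

```python
from typing import Dict, List, Tuple

# Hypothetical mapping of the documented output schema, using the lowercase
# type names that PySpark's DataFrame.dtypes reports
EXPECTED_SCHEMA: Dict[str, str] = {
    "country_code": "string",
    "country_number": "int",
    "country": "string",
    "currency_name": "string",
    "currency_code": "string",
    "currency_number": "int",
}


def find_schema_mismatches(actual_dtypes: List[Tuple[str, str]]) -> List[str]:
    """Return readable differences between the actual and expected column types."""
    actual = dict(actual_dtypes)
    problems = []
    for column, expected_type in EXPECTED_SCHEMA.items():
        if column not in actual:
            problems.append(f"missing column {column}")
        elif actual[column] != expected_type:
            problems.append(f"{column}: expected {expected_type}, got {actual[column]}")
    return problems


# Example: a file where country_number could not be parsed as an integer
sample_dtypes = [
    ("country_code", "string"),
    ("country_number", "string"),
    ("country", "string"),
    ("currency_name", "string"),
    ("currency_code", "string"),
    ("currency_number", "int"),
]
print(find_schema_mismatches(sample_dtypes))
# ['country_number: expected int, got string']
```

Inside the notebook this would be called as `find_schema_mismatches(df.dtypes)` right after the CSV is read.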

## Data Quality Checks
- Validates presence of key columns (country_code, currency_code)
- Checks for null values in critical fields
- Verifies load completeness by comparing the table's row count with the number of CSV records
- Provides detailed logging of any issues found
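
The same rules can be expressed in plain Python, which makes them easy to unit-test without a Spark cluster. This is a sketch over rows read with the standard library's `csv.DictReader`; `run_quality_checks` and its return shape are hypothetical, and the notebook itself uses the Spark-based `perform_data_quality_checks`. The sample rows are illustrative only.

```python
import csv
import io
from typing import Dict, List, Optional, Sequence, Tuple

# Mirrors key_columns in perform_data_quality_checks
KEY_COLUMNS = ["country_code", "currency_code"]


def run_quality_checks(
    fieldnames: Sequence[str], rows: List[Dict[str, Optional[str]]]
) -> Tuple[List[Tuple[str, int]], int]:
    """Return (column, null_count) pairs for the key columns and the record count."""
    # Key columns must be present in the CSV header
    missing = [c for c in KEY_COLUMNS if c not in fieldnames]
    if missing:
        raise ValueError(f"CSV is missing key columns: {', '.join(missing)}")
    # Completeness: refuse to load an empty file
    if not rows:
        raise ValueError("CSV file contains no data to load")
    # Empty strings (and short rows, which DictReader fills with None) count as
    # nulls, mirroring nullValue="" in the Spark reader
    null_counts = [(c, sum(1 for r in rows if not r.get(c))) for c in KEY_COLUMNS]
    return null_counts, len(rows)


# Illustrative sample: the second row has no currency code
reader = csv.DictReader(io.StringIO(
    "country_code,country,currency_code\n"
    "AFG,Afghanistan,AFN\n"
    "ATA,Antarctica,\n"
))
rows = list(reader)
print(run_quality_checks(reader.fieldnames, rows))
# ([('country_code', 0), ('currency_code', 1)], 2)
```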

## Usage
1. Ensure the CSV file is uploaded to the specified volume path
2. Configure the input parameters using the notebook widgets
3. Run all cells to execute the complete ETL pipeline
4. Review the output logs to confirm successful data loading

In [None]:
# Import required libraries
from typing import Any, Dict, List, Tuple

In [None]:
# Define functions to make the code more modular and testable
def validate_parameters(params: Dict[str, str]) -> None:
    """
    Validate input parameters to ensure all required fields are present.
    
    Args:
        params: Dictionary containing input parameters
        
    Raises:
        ValueError: If any required parameter is missing
    """
    required_params = ["catalog_name", "schema_name", "table_name", "csv_path"]
    # Treat missing, empty, or whitespace-only values as not provided
    missing = [p for p in required_params if not (params.get(p) or "").strip()]
    
    if missing:
        error_msg = f"Missing required parameters: {', '.join(missing)}"
        print(f"ERROR: {error_msg}")
        raise ValueError(error_msg)

In [None]:
def get_full_table_name(params: Dict[str, str]) -> str:
    """
    Construct the fully qualified table name with proper quoting.
    
    Args:
        params: Dictionary containing catalog_name, schema_name, and table_name
        
    Returns:
        String with properly formatted table name
    """
    return f"`{params['catalog_name']}`.`{params['schema_name']}`.`{params['table_name']}`"
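
Wrapping each part in backticks handles names with spaces or hyphens, but a name that itself contains a backtick would break the quoting. Spark SQL escapes a backtick inside a quoted identifier by doubling it, so a hardened variant could look like this sketch. `quote_identifier` and `get_full_table_name_escaped` are hypothetical helpers, and `ref_data` is an example schema name.

```python
from typing import Dict


def quote_identifier(name: str) -> str:
    """Quote one identifier part, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def get_full_table_name_escaped(params: Dict[str, str]) -> str:
    """Build catalog.schema.table with each part quoted independently."""
    parts = (params["catalog_name"], params["schema_name"], params["table_name"])
    return ".".join(quote_identifier(p) for p in parts)


print(get_full_table_name_escaped(
    {"catalog_name": "hive_metastore", "schema_name": "ref_data", "table_name": "country`currency"}
))
# `hive_metastore`.`ref_data`.`country``currency`
```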

In [None]:
def read_csv_data(spark_session: Any, csv_path: str) -> Any:
    """
    Read CSV data into a Spark DataFrame.
    
    Args:
        spark_session: Active Spark session
        csv_path: Path to the CSV file
        
    Returns:
        DataFrame containing the CSV data
    """
    df = spark_session.read.csv(
        csv_path,
        header=True,
        inferSchema=True,
        sep=",",
        nullValue="",
    )
    
    return df
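
`inferSchema=True` makes Spark read the data one extra time to guess column types, and the result depends on whatever values happen to be in the file. When the layout is fixed, an explicit schema is an alternative: `DataFrameReader.csv` accepts a DDL-formatted string through its `schema` parameter. The sketch below only builds that string from the documented output schema. The column order is an assumption and must match the actual CSV, because by default (`enforceSchema`) Spark applies a supplied schema to CSV columns by position; `build_ddl_schema` is a hypothetical helper.

```python
from typing import List, Tuple

# Assumed column order; it must match the CSV file, since a supplied schema is
# applied to CSV columns by position when enforceSchema is left at its default
COLUMNS: List[Tuple[str, str]] = [
    ("country_code", "STRING"),
    ("country_number", "INT"),
    ("country", "STRING"),
    ("currency_name", "STRING"),
    ("currency_code", "STRING"),
    ("currency_number", "INT"),
]


def build_ddl_schema(columns: List[Tuple[str, str]]) -> str:
    """Join (name, type) pairs into a DDL schema string such as 'a STRING, b INT'."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in columns)


ddl = build_ddl_schema(COLUMNS)
print(ddl)
# country_code STRING, country_number INT, country STRING, currency_name STRING, currency_code STRING, currency_number INT

# In the notebook, the string could replace inferSchema=True, for example:
# spark_session.read.csv(csv_path, header=True, schema=ddl, sep=",", nullValue="")
```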

In [None]:
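def perform_data_quality_checks(df: Any) -> Tuple[List[Tuple[str, int]], int]:
    """
    Perform data quality checks on the DataFrame.
    
    Args:
        df: DataFrame to check
        
    Returns:
        Tuple containing (column_name, null_count) pairs for the key columns
        and the total record count
        
    Raises:
        ValueError: If a key column is missing or the DataFrame is empty
    """
    key_columns = ["country_code", "currency_code"]
    
    # Fail fast if a key column is absent from the CSV header
    missing_columns = [c for c in key_columns if c not in df.columns]
    if missing_columns:
        raise ValueError(f"CSV is missing key columns: {', '.join(missing_columns)}")
    
    # Get total record count and stop before scanning for nulls if there is no data
    record_count = df.count()
    if record_count == 0:
        raise ValueError("CSV file contains no data to load")
    
    # Check for nulls in key columns
    null_counts = [(col_name, df.filter(df[col_name].isNull()).count())
                   for col_name in key_columns]
    
    return null_counts, record_count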

In [None]:
def write_to_delta_table(df: Any, full_table_name: str) -> None:
    """
    Write DataFrame to a Delta table.
    
    Args:
        df: DataFrame to write
        full_table_name: Full table name with catalog and schema
    """
    # Full refresh: replace the existing rows and allow the table schema to change
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .option("mergeSchema", "true")
        .saveAsTable(full_table_name)
    )

In [None]:
def validate_data_load(spark_session: Any, full_table_name: str, expected_count: int) -> int:
    """
    Validate that data was loaded correctly.
    
    Args:
        spark_session: Active Spark session
        full_table_name: Full table name with catalog and schema
        expected_count: Expected number of rows (not enforced here; the caller
            compares it with the returned count)
        
    Returns:
        Actual number of rows loaded
        
    Raises:
        RuntimeError: If no data was loaded
    """
    count_df = spark_session.sql(f"SELECT COUNT(*) AS row_count FROM {full_table_name}")
    row_count = count_df.collect()[0]["row_count"]
    
    if row_count == 0:
        raise RuntimeError("No data was loaded into the table")
        
    return row_count
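
`validate_data_load` accepts `expected_count` but only fails when the table is empty; `main` then prints a warning on a mismatch. If a mismatch should stop the job instead, a small pure-Python check like this sketch could be called with the two counts. `check_row_counts` and its strict-by-default behavior are assumptions, not existing notebook logic.

```python
def check_row_counts(expected_count: int, actual_count: int, strict: bool = True) -> str:
    """Compare the CSV record count with the table row count.

    Returns "OK" or a warning string; raises ValueError when the table is empty,
    or on any mismatch when strict is True.
    """
    if actual_count == 0:
        raise ValueError("No data was loaded into the table")
    if actual_count != expected_count:
        message = (f"Record count mismatch: CSV had {expected_count} rows, "
                   f"but table has {actual_count} rows")
        if strict:
            raise ValueError(message)
        return f"WARNING: {message}"
    return "OK"


print(check_row_counts(250, 250))
# OK
print(check_row_counts(250, 249, strict=False))
# WARNING: Record count mismatch: CSV had 250 rows, but table has 249 rows
```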

In [None]:
# Define input parameters as widgets
dbutils.widgets.text("catalog_name", "", "Catalog Name")
dbutils.widgets.text("schema_name", "", "Schema Name")
dbutils.widgets.text("table_name", "", "Table Name")
dbutils.widgets.text("csv_path", "", "CSV Path")
dbutils.widgets.text("warehouse_id", "", "SQL Warehouse ID (optional)")
dbutils.widgets.text("warehouse_name", "", "SQL Warehouse Name (optional)")

In [None]:
def get_parameters_from_widgets() -> Dict[str, str]:
    """
    Get parameters from notebook widgets.
    
    Returns:
        Dictionary containing parameters from widgets
    """
    return {
        "catalog_name": dbutils.widgets.get("catalog_name"),
        "schema_name": dbutils.widgets.get("schema_name"),
        "table_name": dbutils.widgets.get("table_name"),
        "csv_path": dbutils.widgets.get("csv_path"),
        "warehouse_id": dbutils.widgets.get("warehouse_id"),
        "warehouse_name": dbutils.widgets.get("warehouse_name")
    }

# Get parameters from widgets
params = get_parameters_from_widgets()
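
Because `get_parameters_from_widgets` calls `dbutils` directly, it only runs inside Databricks. One way to keep parameter handling testable elsewhere is to accept any object with a `get(name)` method, as in this sketch. `WidgetReader`, `FakeWidgets`, and `read_parameters` are hypothetical; inside the notebook, `dbutils.widgets` itself would be passed in.

```python
from typing import Dict, Protocol

PARAM_NAMES = ["catalog_name", "schema_name", "table_name", "csv_path",
               "warehouse_id", "warehouse_name"]


class WidgetReader(Protocol):
    """Anything with a get(name) -> str method, such as dbutils.widgets."""

    def get(self, name: str) -> str: ...


class FakeWidgets:
    """In-memory stand-in for dbutils.widgets, for tests outside Databricks."""

    def __init__(self, values: Dict[str, str]) -> None:
        self._values = values

    def get(self, name: str) -> str:
        # Unset names return "", matching the empty defaults the notebook declares
        return self._values.get(name, "")


def read_parameters(widgets: WidgetReader) -> Dict[str, str]:
    """Collect every notebook parameter from a widget reader."""
    return {name: widgets.get(name) for name in PARAM_NAMES}


params_for_test = read_parameters(
    FakeWidgets({"catalog_name": "hive_metastore", "table_name": "country_currency"})
)
print(params_for_test["catalog_name"], repr(params_for_test["csv_path"]))
# hive_metastore ''
```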

## Parameter Validation
Verify that all required parameters are provided before any data is read.

In [None]:
# Validate parameters
validate_parameters(params)

## Main Execution Function
Orchestrates reading, checking, writing, and validating the data.

In [None]:
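def main(params: Dict[str, str]) -> Dict[str, Any]:
    """
    Main execution function that orchestrates the data loading process.
    
    Args:
        params: Dictionary containing input parameters
        
    Returns:
        Dictionary with the load status, number of rows loaded, and target table name
    """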
    try:
        # Log parameters for debugging and auditing
        print("Starting data loading process with parameters:")
        print(f"Catalog: {params['catalog_name']}")
        print(f"Schema: {params['schema_name']}")
        print(f"Table: {params['table_name']}")
        print(f"CSV Path: {params['csv_path']}")
        if params.get('warehouse_name'):
            print(f"Warehouse: {params['warehouse_name']}")
        
        # Get full table name
        full_table_name = get_full_table_name(params)
        
        print("Reading CSV data...")
        df = read_csv_data(spark, params['csv_path'])
        
        # Show sample data for verification
        print("Sample data from CSV:")
        df.show(5, truncate=False)
        
        # Perform data quality checks
        print("Performing data quality checks...")
        null_counts, record_count = perform_data_quality_checks(df)
        
        for col_name, count in null_counts:
            if count > 0:
                print(f"WARNING: Found {count} rows with null {col_name}")
        
        print(f"Total records to load: {record_count}")
        
        # Write data to the table using Delta format
        print(f"Writing data to table: {full_table_name}")
        write_to_delta_table(df, full_table_name)
        
        # Verify the data was loaded correctly
        print("Validating data load...")
        row_count = validate_data_load(spark, full_table_name, record_count)
        print(f"Successfully loaded {row_count} rows into {full_table_name}")
        
        if row_count != record_count:
            print(f"WARNING: Record count mismatch. CSV had {record_count} rows, but table has {row_count} rows")
        
        result = {
            "status": "success",
            "rows_processed": row_count,
            "table_name": full_table_name
        }
        
        print(f"Job completed successfully with result: {result}")
        
        return result
    except Exception as e:
        print(f"ERROR: Failed to load data: {e}")
        # Chain the original exception so its traceback is preserved
        raise RuntimeError(f"Failed to load data: {e}") from e

### Execute Main Data Processing Logic

In [None]:
# Execute the main function which will orchestrate the entire data loading process
result = main(params)
print("Data load process completed.")
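
`main` returns a dictionary, which stays inside this notebook run. When the notebook is launched from another notebook with `dbutils.notebook.run`, the caller receives only the string passed to `dbutils.notebook.exit`, so serializing the result as JSON is a common pattern. The sketch below shows just the serialization step, which runs anywhere; the Databricks calls are left as comments, and `serialize_result`, `notebook_path`, and the example values are hypothetical.

```python
import json
from typing import Any, Dict


def serialize_result(result: Dict[str, Any]) -> str:
    """Turn the main() result into a JSON string usable as a notebook exit value."""
    return json.dumps(result, sort_keys=True)


payload = serialize_result({
    "status": "success",
    "rows_processed": 3,
    "table_name": "`hive_metastore`.`ref_data`.`country_currency`",
})
print(payload)
# {"rows_processed": 3, "status": "success", "table_name": "`hive_metastore`.`ref_data`.`country_currency`"}

# Inside Databricks, this notebook could end with:
#     dbutils.notebook.exit(payload)
# and a calling notebook could recover the dictionary with:
#     json.loads(dbutils.notebook.run(notebook_path, 600, params))
```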