# Step 2: Load Data from ADLS to Fabric Warehouse

This notebook loads the data that Step 1 extracted to Azure Data Lake Storage Gen2 (ADLS) into a Microsoft Fabric Warehouse.

## Process Overview
1. Configure connection parameters
2. Connect to target Fabric Warehouse
3. Set up external objects (credential, data source, file format)
4. Discover extracted tables in ADLS
5. Create schemas and tables in Fabric Warehouse
6. Load data using COPY INTO
7. Update statistics
8. Validate row counts (optional)

## Prerequisites
- Completed Step 1: Data extraction to ADLS
- Access to Microsoft Fabric Warehouse
- Access to ADLS Gen2 storage account
- Appropriate permissions (see PERMISSIONS_GUIDE.md)
- ODBC Driver 17 for SQL Server installed

## Configuration

Update the configuration parameters below with your environment details.
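
After the configuration cell has been edited and run, a quick check like the following can catch placeholders that were missed. This is a minimal sketch: `find_unset_placeholders` and the example values are hypothetical and are not part of `migration_helpers`.

```python
import re

# Matches template placeholders such as "<your-workspace-name>".
PLACEHOLDER = re.compile(r"<your-[^>]*>")


def find_unset_placeholders(settings):
    """Return the names of settings whose value still contains a placeholder."""
    return [
        name
        for name, value in settings.items()
        if isinstance(value, str) and PLACEHOLDER.search(value)
    ]


# Example values only; in the notebook these would come from the configuration cell.
example_settings = {
    "target_workspace": "<your-workspace-name>",
    "target_warehouse": "SalesWarehouse",
    "storage_account": "<your-storage-account>",
    "container": "migration-staging",
}

unset = find_unset_placeholders(example_settings)
print(unset)
```

Raising an error when the returned list is non-empty stops the run before any connection is attempted.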

In [None]:
# Target Fabric Warehouse Configuration
target_workspace = "<your-workspace-name>"
target_warehouse = "<your-warehouse-name>"

# Azure Data Lake Storage Configuration
storage_account = "<your-storage-account>"
container = "migration-staging"

# Optional: Source database for validation
validate_row_counts = False  # Set to True to validate row counts against source
source_server = "<your-synapse-server>.sql.azuresynapse.net"  # Only needed if validate_row_counts = True
source_database = "<your-database-name>"  # Only needed if validate_row_counts = True

# Migration Settings
update_statistics = True  # Update statistics after loading
batch_size = 50  # Number of tables to process in each batch (not referenced by the cells in this notebook)

# Authentication Configuration
# Options: 'token', 'interactive'
auth_type = 'token'  # Use 'token' in Fabric notebooks with managed identity

print("Configuration loaded successfully ✓")
print(f"Target: {target_workspace}/{target_warehouse}")
print(f"Storage: {storage_account}/{container}")
print(f"Row count validation: {'Enabled' if validate_row_counts else 'Disabled'}")

## Set Up and Import Helper Functions

Load the migration helper functions for database connections and utilities.

In [None]:
# Import helper functions
import sys
sys.path.append('/lakehouse/default/Files/notebooks/utils')

from migration_helpers import ConnectionHelper, MigrationUtils, StorageHelper, Colors
import time
from datetime import datetime

print("Helper functions imported successfully ✓")

## Connect to Fabric Warehouse

Establish a connection to the Microsoft Fabric Warehouse and, if row count validation is enabled, to the source database.
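
For background, token-based connections through the Microsoft ODBC driver pass the access token as a length-prefixed UTF-16-LE byte structure in a pre-connect attribute. The sketch below shows only that packing step. Whether `ConnectionHelper` does this internally is an assumption, and `pack_access_token` is a hypothetical name.

```python
import struct

# Connection attribute defined by the Microsoft ODBC driver for passing
# an access token instead of a user name and password.
SQL_COPT_SS_ACCESS_TOKEN = 1256


def pack_access_token(token: str) -> bytes:
    """Encode a token as a 4-byte little-endian length followed by UTF-16-LE bytes."""
    token_bytes = token.encode("utf-16-le")
    return struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)


# "abc" is a stand-in; a real token would come from the Fabric runtime.
packed = pack_access_token("abc")
print(len(packed))
```

With `pyodbc`, the packed value is supplied as `attrs_before={SQL_COPT_SS_ACCESS_TOKEN: packed}` in the call to `pyodbc.connect`.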

In [None]:
# Get authentication token for Fabric
auth_config = {}

if auth_type == 'token':
    # Get token from Fabric runtime
    token = ConnectionHelper.get_spark_token("https://analysis.windows.net/powerbi/api")
    auth_config = {'auth_type': 'token', 'token': token}
else:
    auth_config = {'auth_type': auth_type}

# Connect to Fabric Warehouse
target_conn = ConnectionHelper.connect_fabric_warehouse(target_workspace, target_warehouse, auth_config)

# Optionally connect to source for validation
source_conn = None
if validate_row_counts:
    print("\nConnecting to source database for validation...")
    source_auth_config = {'auth_type': auth_type}
    if auth_type == 'token':
        source_token = ConnectionHelper.get_spark_token("https://database.windows.net/.default")
        source_auth_config['token'] = source_token
    source_conn = ConnectionHelper.connect_azure_sql(source_server, source_database, source_auth_config)

print("\n" + "="*70)
print("Connections established successfully!")
print("="*70)

## Set Up External Objects

Create external objects in Fabric Warehouse required for data loading:
- Database scoped credential
- External data source pointing to ADLS
- External file format (Parquet)

In [None]:
# Set up external objects in Fabric Warehouse
setup_success = MigrationUtils.setup_external_objects(target_conn, storage_account, container)

if not setup_success:
    raise RuntimeError("Failed to set up external objects. Please check permissions and retry.")

print("\n" + "="*70)
print("External objects created successfully!")
print("="*70)

## Discover Tables in Storage

Scan ADLS to identify all tables that were extracted and are ready to be loaded.

Discovery relies on the folder structure: each top-level folder in the container is treated as a schema, and each subfolder as a table (`<schema>/<table>/`).
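
The folder convention can be illustrated without Spark. The sketch below derives `(schema, table)` pairs from a list of file paths. The `abfss://` URL shape, the storage account name, and the function `tables_from_paths` are assumptions for illustration; the notebook itself lists directories rather than parsing file paths.

```python
from urllib.parse import urlparse


def tables_from_paths(paths):
    """Extract unique (schema, table) pairs from paths shaped like .../<schema>/<table>/<file>."""
    found = []
    for path in paths:
        # Keep only the part inside the container, e.g. "dbo/Customers/part-0000.parquet".
        parts = [p for p in urlparse(path).path.split("/") if p]
        if len(parts) < 3:
            continue  # not deep enough to be <schema>/<table>/<file>
        pair = (parts[0], parts[1])
        if pair not in found:
            found.append(pair)
    return found


# Hypothetical container root and files.
root = "abfss://migration-staging@examplestorage.dfs.core.windows.net"
example_paths = [
    f"{root}/dbo/Customers/part-0000.parquet",
    f"{root}/dbo/Customers/part-0001.parquet",
    f"{root}/sales/Orders/part-0000.parquet",
    f"{root}/README.txt",
]

print(tables_from_paths(example_paths))
```

Files that sit directly in the container root, such as the `README.txt` entry above, are ignored because they do not match the two-level layout.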

In [None]:
# List all schemas and tables from ADLS by scanning folder structure
print(f"{Colors.BLUE}Discovering tables in storage...{Colors.END}")

# Use Spark to list directories
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

storage_path = StorageHelper.get_adls_path(storage_account, container)

# Get list of schemas (top-level directories)
try:
    # List all paths in the container.
    # pyspark.dbutils is Databricks-only; Fabric notebooks expose the same
    # listing through mssparkutils, where isDir is a property, not a method.
    from notebookutils import mssparkutils
    
    schemas_list = mssparkutils.fs.ls(storage_path)
    
    tables_to_load = []
    
    for schema_item in schemas_list:
        if schema_item.isDir:
            schema_name = schema_item.name.rstrip('/')
            
            # List tables in this schema
            tables_list = mssparkutils.fs.ls(schema_item.path)
            
            for table_item in tables_list:
                if table_item.isDir:
                    table_name = table_item.name.rstrip('/')
                    tables_to_load.append((schema_name, table_name))
    
    print(f"{Colors.GREEN}✅ Found {len(tables_to_load)} tables in storage{Colors.END}\n")
    
    # Display tables
    if len(tables_to_load) > 0:
        print(f"{Colors.BOLD}Tables to load:{Colors.END}")
        for i, (schema, table) in enumerate(tables_to_load[:20], 1):
            print(f"  {i:2d}. [{schema}].[{table}]")
        if len(tables_to_load) > 20:
            print(f"  ... and {len(tables_to_load) - 20} more tables")
        print()
    
except Exception as e:
    print(f"{Colors.RED}❌ Failed to discover tables: {e}{Colors.END}")
    print("Note: Make sure you have access to the storage account and container.")
    raise

## Load Tables to Fabric Warehouse

Load each table from ADLS to Fabric Warehouse using COPY INTO.
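
Because schema and table names come from folder names, it is safer to escape them before interpolating them into T-SQL. The sketch below shows one way to do that and assembles a statement with the same `DATA_SOURCE` and `FILE_TYPE` options as the load cell. `quote_identifier` and `build_copy_into` are hypothetical helpers, and the options your warehouse accepts should be checked against the COPY INTO documentation.

```python
def quote_identifier(name: str) -> str:
    """Wrap a name in square brackets, doubling any ']' so it cannot end the identifier early."""
    return "[" + name.replace("]", "]]") + "]"


def build_copy_into(schema: str, table: str, data_source: str = "MigrationStaging") -> str:
    """Assemble a COPY INTO statement shaped like the one used in this notebook."""
    target = f"{quote_identifier(schema)}.{quote_identifier(table)}"
    # Double single quotes so the path stays inside its string literal.
    location = f"{schema}/{table}/".replace("'", "''")
    return (
        f"COPY INTO {target}\n"
        f"FROM '{location}'\n"
        f"WITH (\n"
        f"    DATA_SOURCE = '{data_source}',\n"
        f"    FILE_TYPE = 'PARQUET'\n"
        f")"
    )


print(build_copy_into("dbo", "Customers"))
```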

This process:
- Creates schemas if they don't exist
- Creates tables with appropriate schema
- Loads data using COPY INTO with retry logic
- Tracks progress and errors
- Optionally validates row counts
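
The retry behaviour can be isolated into a small helper. This is a minimal sketch of retry with exponential backoff, using the same three attempts and `2 ** attempt` wait as the load cell. `run_with_retry` and `flaky_load` are hypothetical names, and the `sleep` parameter exists only so the example runs without real delays.

```python
import time


def run_with_retry(operation, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call operation(); on failure wait base_delay * 2**attempt seconds and try again."""
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error unchanged
            sleep(base_delay * 2 ** attempt)


# Demonstration with a stand-in operation that fails twice before succeeding.
calls = {"count": 0}


def flaky_load():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "loaded"


delays = []
result = run_with_retry(flaky_load, sleep=delays.append)
print(result, delays)
```

In the notebook, `operation` would be a function that executes the COPY INTO statement for one table.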

In [None]:
# Initialize tracking
loading_stats = {
    'total': len(tables_to_load),
    'loaded': 0,
    'failed': 0,
    'validated': 0,
    'validation_failed': 0,
    'start_time': datetime.now()
}

failed_tables = []

print("\n" + "="*70)
print("Starting Table Loading")
print("="*70 + "\n")

cursor = target_conn.cursor()

for idx, (schema, table) in enumerate(tables_to_load, 1):
    try:
        print(f"\n[{idx}/{len(tables_to_load)}] Loading [{schema}].[{table}]...")
        
        start_time = time.time()
        
        # Create schema if not exists
        cursor.execute(f"""
            IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
            BEGIN
                EXEC('CREATE SCHEMA [{schema}]')
            END
        """)
        
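        # Drop table if exists (for clean reload).
        # NOTE: nothing in this cell recreates the table, and COPY INTO expects the
        # target table to exist, so add the CREATE TABLE step here before loading.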
        cursor.execute(f"""
            IF EXISTS (SELECT * FROM sys.tables t
                      INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                      WHERE s.name = '{schema}' AND t.name = '{table}')
            BEGIN
                DROP TABLE [{schema}].[{table}]
            END
        """)
        
        # Load data using COPY INTO
        location = f"{schema}/{table}/"
        
        # Retry logic
        max_retries = 3
        for attempt in range(max_retries):
            try:
                cursor.execute(f"""
                    COPY INTO [{schema}].[{table}]
                    FROM '{location}'
                    WITH (
                        DATA_SOURCE = 'MigrationStaging',
                        FILE_TYPE = 'PARQUET',
                        MAXERRORS = 10000,
                        ERRORFILE = 'errors/{schema}/{table}/'
                    )
                """)
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    print(f"   Retry {attempt + 1}/{max_retries} after {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    raise  # re-raise the last error with its original traceback
        
        # Get row count
        cursor.execute(f"SELECT COUNT(*) FROM [{schema}].[{table}]")
        row_count = cursor.fetchone()[0]
        
        target_conn.commit()
        
        duration = time.time() - start_time
        loading_stats['loaded'] += 1
        
        print(f"{Colors.GREEN}   ✅ Loaded {row_count:,} rows in {duration:.1f}s{Colors.END}")
        
        # Validate row count if enabled
        if validate_row_counts and source_conn:
            validation = MigrationUtils.validate_row_count(source_conn, target_conn, schema, table)
            if validation['status'] == 'success' and validation['match']:
                loading_stats['validated'] += 1
                print(f"   {Colors.GREEN}✓ Row count validated{Colors.END}")
            elif validation['status'] == 'mismatch':
                loading_stats['validation_failed'] += 1
                print(f"   {Colors.RED}✗ Row count mismatch: source={validation['source_count']:,}, target={validation['target_count']:,}{Colors.END}")
        
        print(f"   Progress: {loading_stats['loaded']}/{loading_stats['total']}")
        
    except Exception as e:
        loading_stats['failed'] += 1
        failed_tables.append((schema, table, str(e)))
        print(f"{Colors.RED}   ❌ Failed: {e}{Colors.END}")
        continue

loading_stats['end_time'] = datetime.now()

# Print summary
duration = (loading_stats['end_time'] - loading_stats['start_time']).total_seconds()

print("\n" + "="*70)
print("LOADING SUMMARY")
print("="*70)
print(f"Total tables:       {loading_stats['total']}")
print(f"Loaded:             {Colors.GREEN}{loading_stats['loaded']}{Colors.END}")
print(f"Failed:             {Colors.RED}{loading_stats['failed']}{Colors.END}")

if validate_row_counts:
    print(f"Validated:          {Colors.GREEN}{loading_stats['validated']}{Colors.END}")
    print(f"Validation failed:  {Colors.RED}{loading_stats['validation_failed']}{Colors.END}")

print(f"Duration:           {duration:.1f} seconds ({duration/60:.1f} minutes)")
print(f"Start time:         {loading_stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}")
print(f"End time:           {loading_stats['end_time'].strftime('%Y-%m-%d %H:%M:%S')}")
print("="*70)

if loading_stats['failed'] > 0:
    print(f"\n{Colors.YELLOW}⚠️  Failed Tables:{Colors.END}")
    for schema, table, error in failed_tables:
        print(f"  - [{schema}].[{table}]: {error}")
else:
    print(f"\n{Colors.GREEN}✅ All tables loaded successfully!{Colors.END}")

## Update Statistics

Update statistics for optimal query performance. The cell below covers every user table in the warehouse, not only the tables loaded in this run.
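
The statistics cell reads its table list from `sys.tables`. If statistics should be refreshed only for tables that loaded successfully in this run, the list can instead be derived from `tables_to_load` and `failed_tables`. This is a minimal sketch with example values; `successfully_loaded` is a hypothetical helper.

```python
def successfully_loaded(tables_to_load, failed_tables):
    """Return the (schema, table) pairs that are not listed among the failures."""
    failed = {(schema, table) for schema, table, _error in failed_tables}
    return [pair for pair in tables_to_load if pair not in failed]


# Example values shaped like the notebook's tracking lists.
example_tables = [("dbo", "Customers"), ("dbo", "Orders"), ("sales", "Invoices")]
example_failed = [("dbo", "Orders", "COPY INTO failed")]

print(successfully_loaded(example_tables, example_failed))
```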

In [None]:
if update_statistics:
    print(f"{Colors.BLUE}Updating statistics...{Colors.END}\n")
    
    cursor = target_conn.cursor()
    
    # Get all user tables
    cursor.execute("""
        SELECT s.name, t.name
        FROM sys.tables t
        INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
        WHERE s.name NOT IN ('sys', 'INFORMATION_SCHEMA')
    """)
    
    all_tables = cursor.fetchall()
    
    stats_updated = 0
    stats_failed = 0
    
    for schema, table in all_tables:
        try:
            cursor.execute(f"UPDATE STATISTICS [{schema}].[{table}]")
            stats_updated += 1
            print(f"   ✓ [{schema}].[{table}]")
        except Exception as e:
            stats_failed += 1
            print(f"   {Colors.YELLOW}⚠️  Failed for [{schema}].[{table}]: {e}{Colors.END}")
    
    target_conn.commit()
    print(f"\n{Colors.GREEN}✅ Statistics updated for {stats_updated} tables{Colors.END}")
    if stats_failed > 0:
        print(f"{Colors.YELLOW}⚠️  Failed to update statistics for {stats_failed} tables{Colors.END}")
else:
    print("Statistics update skipped (update_statistics = False)")

## Cleanup and Close Connections

Close all database connections.

In [None]:
# Close connections
if target_conn:
    target_conn.close()
    print(f"{Colors.GREEN}✅ Target connection closed{Colors.END}")

if source_conn:
    source_conn.close()
    print(f"{Colors.GREEN}✅ Source connection closed{Colors.END}")

print("\n" + "="*70)
print("Loading process completed!")
print("="*70)
print("\nNext steps:")
print("1. Verify tables in Fabric Warehouse")
print("2. Run notebook '03_validate_migration.ipynb' for comprehensive validation")
print("3. Test queries and performance")