In [0]:
import requests
import json
import time
import urllib.parse
import re
from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor
import threading

# Initialize Spark: reuse the active session if one exists, otherwise create one.
# The module-level `spark` handle is used by create_bronze_table() below.
spark = SparkSession.builder.getOrCreate()

# Banner printed once, at module execution time.
print("=== DUR PRODUCT APIs 1 & 3 - FULL EXTRACTION (ALL RECORDS) ===")

# Configuration
# NOTE(review): this API credential is committed in source. Consider loading it
# from a secret store / environment variable and rotating the exposed key.
ENCODED_SERVICE_KEY = "h9Dbf2cz0HOrqZb5BIqrfrti%2FD5zZLTYAxFpQuywAB7ZUx3yb67jBDuD5uNlHvAszz9c14NffOmMNQjGv5FzwA%3D%3D"
# Percent-decoded form of the key; this is the value make_api_call() sends as the
# `serviceKey` query parameter (presumably decoded so it is URL-encoded only once
# when the request is built - confirm against the API provider's guidance).
SERVICE_KEY = urllib.parse.unquote(ENCODED_SERVICE_KEY)
# Service root; make_api_call() appends "/<endpoint>" to it.
BASE_URL = "https://apis.data.go.kr/1471000/DURPrdlstInfoService03"

# Full extraction configuration: one dict per API endpoint to extract.
# Keys and where they are consumed in this file:
#   name             - label used in every log line
#   api_id           - numeric id shown in logs and used in the fallback temp-view name
#   endpoint         - path segment appended to BASE_URL by make_api_call()
#   expected_records - hard-coded record count used for completeness percentages
#                      and for the page-count safety cap in extract_full_data()
#   table_name       - fully qualified Delta table written by create_bronze_table()
#   description      - human-readable description, only logged
#   size_category    - "LARGE" selects finer progress reporting and a longer
#                      pause every 100 pages; any other value uses the defaults
API_CONFIGS = [
    {
        "name": "병용금기",
        "api_id": 1,
        "endpoint": "getUsjntTabooInfoList03",
        "expected_records": 240873,
        "table_name": "main.default.dur_product_interaction_bronze",
        "description": "Drug Interaction Contraindications",
        "size_category": "LARGE"
    },
    {
        "name": "DUR품목정보",
        "api_id": 3,
        "endpoint": "getDurPrdlstInfoList03",
        "expected_records": 24065,
        "table_name": "main.default.dur_product_info_bronze",
        "description": "General DUR Product Info",
        "size_category": "MEDIUM"
    }
]

# Lock held by safe_print() while it formats and prints a line, so output from
# the worker threads started in main() is serialized through one lock.
log_lock = threading.Lock()

def safe_print(message):
    """Print ``message`` prefixed with ``[HH:MM:SS]`` while holding ``log_lock``.

    The timestamp is taken after the lock is acquired, so it reflects the
    moment the line is actually written.
    """
    log_lock.acquire()
    try:
        stamp = time.strftime("%H:%M:%S")
        print("[{}] {}".format(stamp, message))
    finally:
        # Always release, even if printing raises.
        log_lock.release()

def clean_column_names(df):
    """Return ``df`` with sanitised, unique column names plus the rename map.

    Each name is stripped, every character outside ``[a-zA-Z0-9_]`` becomes an
    underscore, runs of underscores collapse to one, and leading/trailing
    underscores are removed. Names starting with a digit get a ``col_``
    prefix; names that end up empty become ``col_<position>``.

    Two different source names can sanitise to the same result (for example
    ``"a b"`` and ``"a_b"``). Later duplicates therefore receive a numeric
    suffix (``_1``, ``_2``, ...), and the rename is applied positionally in a
    single step so that one rename can never touch another column that
    happens to share a name.

    Args:
        df: Object exposing ``columns`` and ``toDF(*names)`` (a Spark DataFrame).

    Returns:
        Tuple ``(renamed_df, mapping)`` where ``mapping`` maps each original
        name to its final name. ``df`` itself is returned when nothing changed.
        If the input already contains duplicate names, ``mapping`` keeps only
        the last entry for each duplicate key.
    """
    old_columns = list(df.columns)
    new_columns = []
    # Names already handed out, lower-cased: Spark resolves column names
    # case-insensitively by default, so "Name" and "name" would still clash.
    taken = set()

    for col_name in old_columns:
        # Remove invalid characters, spaces, and normalize
        clean_name = re.sub(r'[^a-zA-Z0-9_]', '_', col_name.strip())
        clean_name = re.sub(r'_+', '_', clean_name).strip('_')

        if clean_name and clean_name[0].isdigit():
            clean_name = 'col_' + clean_name
        if not clean_name:
            clean_name = f'col_{len(new_columns)}'

        # Disambiguate collisions introduced by the sanitising above.
        candidate = clean_name
        suffix = 1
        while candidate.lower() in taken:
            candidate = f'{clean_name}_{suffix}'
            suffix += 1
        taken.add(candidate.lower())
        new_columns.append(candidate)

    # Positional rename of all columns at once; skipped when nothing changed.
    if new_columns != old_columns:
        df = df.toDF(*new_columns)

    return df, dict(zip(old_columns, new_columns))

def clean_record(record):
    """Return a copy of ``record`` whose values are all strings.

    ``None`` becomes the empty string; every other value is passed through
    ``str()``. Keys are left untouched.
    """
    return {
        field: ("" if raw is None else str(raw))
        for field, raw in record.items()
    }

def make_api_call(endpoint, page_no, num_rows=100):
    """Fetch one JSON page from ``BASE_URL/<endpoint>``, retrying on failure.

    Up to three attempts are made. Any exception (HTTP error, JSON decoding
    failure, or a ``resultCode`` other than ``"00"`` in the response header)
    counts as a failed attempt. The pause between attempts grows linearly:
    2 seconds after the first failure, 4 seconds after the second.

    Args:
        endpoint: Path segment appended to ``BASE_URL``.
        page_no: Page number sent as ``pageNo``.
        num_rows: Page size sent as ``numOfRows``.

    Returns:
        The decoded JSON response.

    Raises:
        Exception: The error from the final attempt, once all retries fail.
    """
    request_url = f"{BASE_URL}/{endpoint}"
    query = dict(
        serviceKey=SERVICE_KEY,
        pageNo=page_no,
        numOfRows=num_rows,
        type="json",
    )
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    max_retries = 3
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            response = requests.get(
                request_url,
                params=query,
                headers=request_headers,
                timeout=30,
            )
            response.raise_for_status()
            payload = response.json()

            # The service reports its own status in the payload header.
            result_header = payload.get("header", {})
            if result_header.get("resultCode") != "00":
                raise Exception(f"API Error: {result_header.get('resultMsg')}")

            return payload

        except Exception as exc:
            if attempt >= final_attempt:
                # Out of retries: surface the last failure to the caller.
                raise exc
            # Linear backoff before the next attempt.
            time.sleep((attempt + 1) * 2)

def extract_full_data(config):
    """Page through one API endpoint and collect every record it returns.

    Pages are requested sequentially with a short pause between requests.
    Extraction stops when a page has no items, when the number of collected
    records reaches the ``totalCount`` reported by the API, when the page
    number passes a safety cap, after ``max_errors`` failed pages, or when
    page 1 itself fails. A page that fails after the retries inside
    ``make_api_call`` is skipped (best effort); skipped pages are listed in
    the final summary so the gap is visible.

    The safety cap is derived from the larger of ``expected_records`` and the
    ``totalCount`` actually reported, so a dataset that has grown beyond the
    hard-coded expectation is not cut short.

    Args:
        config: One entry of ``API_CONFIGS``. Reads ``name``, ``endpoint``,
            ``expected_records`` and ``size_category``.

    Returns:
        Tuple ``(records, total_count)``: the list of cleaned record dicts and
        the ``totalCount`` from the first successful page (``None`` if no
        page was ever read successfully).
    """
    safe_print(f"🚀 Starting {config['name']} FULL extraction...")
    safe_print(f"   Expected: {config['expected_records']:,} records ({config['size_category']})")

    page_size = 100  # rows requested per page; also drives the page-count maths
    all_records = []
    skipped_pages = []
    page_no = 1
    total_count = None
    last_progress_report = 0
    errors_count = 0
    max_errors = 10

    # Progress reporting intervals based on size
    if config['size_category'] == 'LARGE':
        progress_interval = 5.0  # Report every 5%
        page_report_interval = 100  # Report every 100 pages
    else:
        progress_interval = 10.0  # Report every 10%
        page_report_interval = 50   # Report every 50 pages

    start_time = time.time()

    while True:
        try:
            # Pause between requests; large datasets pause longer every 100 pages
            if page_no > 1:
                if config['size_category'] == 'LARGE':
                    delay = 0.5 if page_no % 100 == 0 else 0.3
                else:
                    delay = 0.3
                time.sleep(delay)

            data = make_api_call(config['endpoint'], page_no, page_size)

            body = data.get("body", {})
            if total_count is None:
                # Coerce defensively in case the count arrives as a numeric string.
                total_count = int(body.get("totalCount", 0) or 0)
                estimated_pages = (total_count + page_size - 1) // page_size
                estimated_time = estimated_pages * 0.4 / 60  # Rough estimate in minutes
                safe_print(f"   📊 {config['name']}: {total_count:,} records ({estimated_pages:,} pages)")
                safe_print(f"   ⏱️  Estimated processing time: {estimated_time:.0f} minutes")

            items = body.get("items", [])
            if not items:
                safe_print(f"   ✅ {config['name']}: No more data at page {page_no}")
                break

            # Clean each record
            all_records.extend(clean_record(item) for item in items)

            # Progress reporting
            current_progress = (len(all_records) / total_count * 100) if total_count > 0 else 0

            # Report based on progress percentage or page intervals
            should_report = (
                (current_progress - last_progress_report >= progress_interval) or
                (page_no % page_report_interval == 0) or
                (len(all_records) >= total_count)
            )

            if should_report:
                elapsed_time = time.time() - start_time
                records_per_min = (len(all_records) / elapsed_time * 60) if elapsed_time > 0 else 0
                remaining_records = total_count - len(all_records)
                eta_minutes = (remaining_records / records_per_min) if records_per_min > 0 else 0

                safe_print(f"   ✅ {config['name']} Page {page_no}: {len(all_records):,}/{total_count:,} "
                          f"({current_progress:.1f}%) | {records_per_min:.0f} rec/min | ETA: {eta_minutes:.0f}min")
                last_progress_report = current_progress

            # Check completion
            if len(all_records) >= total_count:
                safe_print(f"   🎉 {config['name']}: All records collected!")
                break

            page_no += 1

            # Safety check: cap on the larger of the configured expectation and
            # the count the API actually reported, plus 50 pages of slack.
            largest_known_total = max(config['expected_records'], total_count)
            max_expected_pages = (largest_known_total + page_size - 1) // page_size + 50
            if page_no > max_expected_pages:
                safe_print(f"   ⚠️  {config['name']}: Safety break at page {page_no}")
                break

        except Exception as e:
            errors_count += 1
            skipped_pages.append(page_no)
            safe_print(f"   ❌ {config['name']}: Error on page {page_no} (error {errors_count}/{max_errors}): {str(e)}")

            if errors_count >= max_errors:
                safe_print(f"   🛑 {config['name']}: Too many errors, stopping extraction")
                break

            if page_no == 1:
                safe_print(f"   🛑 {config['name']}: Critical error on first page, stopping")
                break

            # Best effort: move on to the next page after a failed one
            page_no += 1
            time.sleep(2)  # Wait longer after error

    # Final extraction summary
    total_time = time.time() - start_time
    completeness = (len(all_records) / config['expected_records'] * 100) if config['expected_records'] > 0 else 0
    records_per_minute = (len(all_records) / (total_time / 60)) if total_time > 0 else 0

    safe_print(f"   🏁 {config['name']}: Extraction complete!")
    safe_print(f"      Records: {len(all_records):,} ({completeness:.1f}% complete)")
    safe_print(f"      Time: {total_time/60:.1f} minutes")
    safe_print(f"      Performance: {records_per_minute:.0f} records/minute")
    safe_print(f"      Errors: {errors_count}")
    if skipped_pages:
        safe_print(f"      Pages that failed and were not loaded: {skipped_pages}")

    return all_records, total_count

def _bronze_table_summary(config, verified_count, column_count, column_mapping, source_count):
    """Build the result dict describing a bronze table that was just written."""
    return {
        'name': config['name'],
        'table_name': config['table_name'],
        'records': verified_count,
        'expected': config['expected_records'],
        'columns': column_count,
        'column_mapping': column_mapping,
        'integrity_ok': verified_count == source_count
    }


def create_bronze_table(config, records):
    """Write ``records`` to the Delta table named in ``config`` and verify it.

    The primary path builds a DataFrame, sanitises its column names and
    overwrites the table (schema included) through the DataFrame writer. If
    anything in that path raises, a fallback registers the data as a temp
    view and runs ``CREATE OR REPLACE TABLE ... AS SELECT``; the temp view is
    dropped again afterwards. In both paths the table is read back and its
    row count compared with ``len(records)``.

    Args:
        config: One entry of ``API_CONFIGS``. Reads ``name``, ``api_id``,
            ``table_name`` and ``expected_records``.
        records: List of record dicts to store.

    Returns:
        A dict with ``name``, ``table_name``, ``records`` (verified row
        count), ``expected``, ``columns``, ``column_mapping`` and
        ``integrity_ok``; or ``None`` when ``records`` is empty or both write
        paths fail.
    """
    if not records:
        safe_print(f"   ❌ {config['name']}: No records to process")
        return None

    try:
        safe_print(f"   💾 {config['name']}: Creating bronze table...")
        safe_print(f"      Processing {len(records):,} records...")

        # Create DataFrame
        df_start = time.time()
        df = spark.createDataFrame(records)
        df_time = time.time() - df_start

        # Clean column names
        clean_start = time.time()
        cleaned_df, column_mapping = clean_column_names(df)
        clean_time = time.time() - clean_start

        if column_mapping:
            cleaned_count = sum(1 for old, new in column_mapping.items() if old != new)
            safe_print(f"      🔧 Cleaned {cleaned_count} column names in {clean_time:.1f}s")

        # The DataFrame was built from this local list, so its row count is
        # len(records); no Spark job is needed just to log it. The count that
        # matters is the read-back verification below.
        safe_print(f"      ✅ DataFrame ready: {len(records):,} records, {len(cleaned_df.columns)} columns ({df_time:.1f}s)")

        # Write to Delta table
        write_start = time.time()
        cleaned_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(config['table_name'])
        write_time = time.time() - write_start

        safe_print(f"      ✅ Bronze table created: {config['table_name']} ({write_time:.1f}s)")

        # Verification: read the table back and count what actually landed
        verified_count = spark.table(config['table_name']).count()

        safe_print(f"      📊 Verification: {verified_count:,} records in table")

        # Data integrity check
        summary = _bronze_table_summary(
            config, verified_count, len(cleaned_df.columns), column_mapping, len(records)
        )
        safe_print(f"      🔍 Data integrity: {'✅ Perfect' if summary['integrity_ok'] else '⚠️ Check needed'}")

        return summary

    except Exception as e:
        safe_print(f"   ❌ {config['name']}: Error creating table - {str(e)}")
        safe_print(f"   🔄 Trying SQL fallback approach...")

        try:
            # SQL fallback
            temp_df = spark.createDataFrame(records)
            cleaned_temp_df, fallback_mapping = clean_column_names(temp_df)

            temp_view_name = f"temp_api_{config['api_id']}_full"
            cleaned_temp_df.createOrReplaceTempView(temp_view_name)

            try:
                spark.sql(f"""
                CREATE OR REPLACE TABLE {config['table_name']}
                USING DELTA
                AS SELECT * FROM {temp_view_name}
            """)
            finally:
                # Do not leave the helper view registered in the session.
                spark.catalog.dropTempView(temp_view_name)

            safe_print(f"      ✅ Bronze table created via SQL: {config['table_name']}")

            verified_count = spark.table(config['table_name']).count()

            # Report the real rename map here too, so the caller's schema
            # summary is accurate whichever path wrote the table.
            return _bronze_table_summary(
                config, verified_count, len(cleaned_temp_df.columns), fallback_mapping, len(records)
            )

        except Exception as e2:
            safe_print(f"   ❌ {config['name']}: SQL fallback also failed - {str(e2)}")
            return None

def process_full_api(config):
    """Run extraction and bronze-table creation for a single API config.

    Logs a header, extracts all records, writes them to the bronze table and
    stores the elapsed seconds on the result as ``processing_time``.

    Args:
        config: One entry of ``API_CONFIGS``.

    Returns:
        The result dict from ``create_bronze_table`` (with ``processing_time``
        added), or ``None`` if nothing was extracted, the table could not be
        created, or any exception was raised along the way.
    """
    started_at = time.time()
    rule = '=' * 60

    safe_print(f"\n{rule}")
    safe_print(f"Processing API {config['api_id']}: {config['name']} (FULL)")
    safe_print(f"Description: {config['description']}")
    safe_print(f"Size: {config['size_category']} - {config['expected_records']:,} records")
    safe_print(f"{rule}")

    try:
        # Step 1: pull every page from the API.
        records, _reported_total = extract_full_data(config)
        if not records:
            safe_print(f"   ❌ {config['name']}: No data extracted")
            return None

        # Step 2: persist to the bronze table.
        outcome = create_bronze_table(config, records)
        if not outcome:
            return None

        elapsed = time.time() - started_at
        outcome['processing_time'] = elapsed

        if elapsed > 0:
            records_per_sec = len(records) / elapsed
        else:
            records_per_sec = 0
        safe_print(f"   ⏱️  {config['name']}: COMPLETED in {elapsed/60:.1f} minutes ({records_per_sec:.0f} rec/sec)")

        return outcome

    except Exception as exc:
        safe_print(f"   💥 {config['name']}: FAILED - {str(exc)}")
        return None

def _per_minute(count, seconds):
    """Return ``count`` per minute over ``seconds``; 0 when no time has elapsed."""
    return count / (seconds / 60) if seconds > 0 else 0


def _percent_of(part, whole):
    """Return ``part`` as a percentage of ``whole``; 0 when ``whole`` is not positive."""
    return part / whole * 100 if whole > 0 else 0


def main():
    """Extract every API in ``API_CONFIGS`` in parallel and print a summary.

    One worker thread per config runs ``process_full_api``. After all workers
    finish, a table of per-API results, totals, a speed comparison and a
    column-cleaning count are printed. All ratios in the summary are guarded
    so that a zero ``expected_records`` or a zero elapsed time cannot raise
    ``ZeroDivisionError`` after the extraction has already finished.
    """
    overall_start = time.time()

    total_expected = sum(config['expected_records'] for config in API_CONFIGS)

    print("📋 Processing APIs 1 & 3 - Full Extraction:")
    print(f"   Total expected records: {total_expected:,}")
    print("   Estimated total time: 60-90 minutes")

    for config in API_CONFIGS:
        print(f"   API {config['api_id']}: {config['name']} ({config['size_category']}: {config['expected_records']:,} records)")
        print(f"      Table: {config['table_name']}")

    print("\n🔄 Starting parallel full extraction...")
    print("   ⚠️  Large dataset - this will take significant time")
    print("   📊 Progress will be reported regularly")

    # Process both APIs in parallel; failed extractions come back as None
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(process_full_api, config) for config in API_CONFIGS]
        results = [outcome for outcome in (future.result() for future in futures) if outcome]

    # Final comprehensive summary
    overall_time = time.time() - overall_start

    print(f"\n{'='*80}")
    print("🎉 === FULL EXTRACTION COMPLETED ===")
    print(f"{'='*80}")
    print(f"⏱️  Total processing time: {overall_time/60:.1f} minutes")
    print(f"✅ Successful extractions: {len(results)}/{len(API_CONFIGS)}")

    if not results:
        print("\n❌ No extractions completed successfully")
        print("💡 Check logs above for specific error details")
        return

    print("\n📊 Final Results Summary:")
    print(f"{'API Name':<15} {'Records':<12} {'Expected':<12} {'Complete':<10} {'Columns':<8} {'Time':<10} {'Integrity':<10}")
    print(f"{'-'*85}")

    total_records = 0
    total_expected_final = 0

    for result in results:
        total_records += result['records']
        total_expected_final += result['expected']
        completeness = _percent_of(result['records'], result['expected'])
        time_min = result['processing_time'] / 60
        integrity = "✅ OK" if result['integrity_ok'] else "⚠️ Check"

        print(f"{result['name']:<15} "
              f"{result['records']:<12,} "
              f"{result['expected']:<12,} "
              f"{completeness:<9.1f}% "
              f"{result['columns']:<8} "
              f"{time_min:<9.1f}m "
              f"{integrity:<10}")

    print(f"{'-'*85}")
    overall_completeness = _percent_of(total_records, total_expected_final)
    print(f"{'TOTAL':<15} {total_records:<12,} {total_expected_final:<12,} {overall_completeness:<9.1f}%")

    print("\n💾 Bronze Tables Created:")
    for result in results:
        print(f"   ✅ {result['table_name']}")

    print("\n🏆 Final Success Summary:")
    print(f"   📊 Total records extracted: {total_records:,}")
    print(f"   🎯 Overall completeness: {overall_completeness:.1f}%")
    print(f"   ⚡ Average extraction speed: {_per_minute(total_records, overall_time):.0f} records/minute")
    print("   🏗️  Bronze tables ready for silver layer transformations!")

    # Performance insights: results are told apart by their table names
    if len(results) >= 2:
        api1_result = next((r for r in results if 'interaction' in r['table_name']), None)
        api3_result = next((r for r in results if 'info' in r['table_name']), None)

        if api1_result and api3_result:
            api1_speed = _per_minute(api1_result['records'], api1_result['processing_time'])
            api3_speed = _per_minute(api3_result['records'], api3_result['processing_time'])

            print("\n⚡ Performance Comparison:")
            print(f"   API 1 (Large): {api1_speed:.0f} records/minute")
            print(f"   API 3 (Medium): {api3_speed:.0f} records/minute")

    # Column cleaning summary: count names that actually changed
    total_columns_cleaned = sum(
        1
        for r in results
        for old, new in r.get('column_mapping', {}).items()
        if old != new
    )
    if total_columns_cleaned > 0:
        print("\n🔧 Schema Cleaning Summary:")
        print(f"   Total column names cleaned: {total_columns_cleaned}")

# Execute the full extraction only when this module is run as the top-level
# program (not when it is imported by other code).
if __name__ == "__main__":
    main()