In [None]:
# Import required libraries
import pandas as pd
import wbgapi as wb
import yaml
import os
import shutil
import time
from pathlib import Path
from typing import List, Dict, Tuple

print("✅ Libraries imported successfully")


In [None]:
def clean_output_directory(output_dir: str = "output") -> None:
    """
    Clean the output directory by removing all files and recreating it.
    Uses a more robust approach to handle Windows permission issues.
    
    Args:
        output_dir (str): Directory path to clean
    """
    import os
    import shutil
    import time
    
    # Check if output directory exists
    if os.path.exists(output_dir):
        try:
            # First try to remove individual files
            for filename in os.listdir(output_dir):
                file_path = os.path.join(output_dir, filename)
                try:
                    if os.path.isfile(file_path):
                        # Remove file
                        os.remove(file_path)
                        print(f"🗑️  Removed file: {filename}")
                    elif os.path.isdir(file_path):
                        # Remove subdirectory
                        shutil.rmtree(file_path)
                        print(f"🗑️  Removed subdirectory: {filename}")
                except PermissionError:
                    print(f"⚠️  Could not remove {filename} (permission denied)")
                except Exception as e:
                    print(f"⚠️  Error removing {filename}: {e}")
            
            # Now try to remove the directory itself
            try:
                # Wait a moment for Windows to release handles
                time.sleep(0.1)
                shutil.rmtree(output_dir)
                print(f"🗑️  Removed existing output directory: {output_dir}")
            except PermissionError:
                print(f"⚠️  Could not remove directory {output_dir} (permission denied)")
                print(f"    Directory contents have been cleared, continuing...")
            except Exception as e:
                print(f"⚠️  Error removing directory: {e}")
                
        except Exception as e:
            print(f"⚠️  Error accessing directory: {e}")
    
    # Create fresh output directory (or ensure it exists)
    os.makedirs(output_dir, exist_ok=True)
    print(f"📁 Output directory ready: {output_dir}")
    
    # List contents to verify cleanup
    try:
        contents = os.listdir(output_dir)
        if not contents:
            print(f"✅ Output directory is clean and ready")
        else:
            print(f"📋 Directory contains: {contents}")
    except Exception as e:
        print(f"⚠️  Could not list directory contents: {e}")

# Clean output directory before starting
clean_output_directory()
print("="*50)


In [None]:
def extract_yaml_instructions():
    """Extract configuration from YAML file."""
    yaml_file = Path.cwd() / "WorldBank.yaml"
    
    if not yaml_file.exists():
        print(f"❌ Error: WorldBank.yaml not found in current directory")
        print(f"   Looking for: {yaml_file}")
        return None, None, None, None
    
    try:
        with open(yaml_file, 'r') as f:
            config = yaml.safe_load(f)
            search_terms = config["search_terms"]
            countries = config["countries"]
            start_year = config.get("start_year", 2000)
            end_year = config.get("end_year", 2025)
            print(f"\n✅ Instructions retrieved from yaml file: {yaml_file}")
            return search_terms, countries, start_year, end_year
    except Exception as e:
        print(f"❌ Error reading YAML file: {e}")
        return None, None, None, None

# Load configuration from YAML file
config = extract_yaml_instructions()
search_terms, countries, start_year, end_year = config

print(f"✅ Configuration loaded:")
print(f"   Search terms: {search_terms}")
print(f"   Countries: {countries}")
print(f"   Date range: {start_year}-{end_year}")


In [None]:
def search_wb_indicators(search_term: str, max_results: int = 100) -> pd.DataFrame:
    """Search World Bank indicators by term"""
    try:
        # Search indicators
        results = wb.series.info(q=search_term)
        
        # Convert Featureset to DataFrame using .items
        df = pd.DataFrame(results.items).head(max_results)
        
        if df.empty:
            return pd.DataFrame({"id": [], "name": []})
        
        # Rename 'value' column to 'name' for consistency
        if 'value' in df.columns:
            df = df.rename(columns={'value': 'name'})
        
        # Select available columns
        available_cols = []
        for col in ["id", "name"]:
            if col in df.columns:
                available_cols.append(col)
        
        if available_cols:
            return pd.DataFrame(df[available_cols])
        else:
            return pd.DataFrame(df)
    
    except Exception as e:
        print(f"Error searching indicators: {e}")
        return pd.DataFrame({"id": [], "name": []})

def display_indicator_options(results: pd.DataFrame):
    """Display search results in a readable format"""
    print("\nSearch Results:")
    for idx, row in results.iterrows():
        name_val = row.get('name', row.get('value', 'N/A'))
        if name_val is not None:
            name = str(name_val)[:80] + "..." if len(str(name_val)) > 80 else str(name_val)
        else:
            name = 'N/A'
        print(f"[{idx}] {name} | ID: {row['id']}")

def search_result_getter(search_terms: list):
    """Get search results for all search terms."""
    search_results = {}
    for item in search_terms:
        print(f"\n==== Searching: {item['term']} ====")
        results = search_wb_indicators(item['term'])
        display_indicator_options(results)
        search_results[item['term']] = results
    return search_results

# Search for indicators
search_results = search_result_getter(search_terms=search_terms)

print(f"✅ Search completed for {len(search_terms)} terms")
print("Results will be displayed during selection phase.")


In [None]:
def get_user_selection_manual(results: pd.DataFrame, selected_indices: List[int]) -> List[Dict]:
    """Manual selection for notebook environments - pass indices directly"""
    selected = []
    
    for idx in selected_indices:
        if idx < len(results):
            row = results.iloc[idx]
            # Create clean label from name
            name = row.get('name', row.get('value', f'Indicator_{idx}'))
            label = str(name).split('(')[0].strip().replace(' ', '_').replace(',', '').replace(':', '')[:30]
            selected.append({
                "id": row['id'],
                "label": f"{label}_A",  # World Bank data is typically annual
                "name": str(name)
            })
    
    return selected

def get_user_indicator_selections(search_results: dict) -> list:
    """
    Prompt user to select indicators from each search term's results.
    Returns a list of selected indicators with their metadata.
    """
    all_selected = []
    
    print(f"\n🎯 INDICATOR SELECTION")
    print("=" * 50)
    
    for term, results in search_results.items():
        if results.empty:
            print(f"\n⚠️  No results found for '{term}' - skipping")
            continue
            
        print(f"\n{'='*50}")
        print(f"SELECTING INDICATORS FOR: {term.upper()}")
        print(f"{'='*50}")
        
        # Display the options again for this specific term
        display_indicator_options(results)
        
        # Get user input with retry logic
        while True:
            try:
                selection = input(f"\nEnter the indices for '{term}' (comma-separated, or 'skip' to skip): ").strip()
                
                if selection.lower() in ['skip', 's', '']:
                    print(f"Skipping '{term}'")
                    break
                
                # Parse the selection
                selected_indices = [int(i.strip()) for i in selection.split(",") if i.strip().isdigit()]
                
                if not selected_indices:
                    print("No valid indices entered. Try again or type 'skip'.")
                    continue
                
                # Validate indices
                invalid_indices = [i for i in selected_indices if i >= len(results)]
                if invalid_indices:
                    print(f"Invalid indices: {invalid_indices}. Max index is {len(results)-1}")
                    continue
                
                # Process the selection using the manual selection function
                selected = get_user_selection_manual(results, selected_indices)
                all_selected.extend(selected)
                print(f"✅ Selected for '{term}': {[s['label'] for s in selected]}")
                break
                
            except (ValueError, KeyboardInterrupt):
                print("Invalid input. Enter numbers separated by commas, or 'skip'.")
                continue
            except EOFError:
                print("\nInput interrupted. Skipping this term.")
                break
    
    return all_selected

# Get user selections - options will be displayed during selection
all_selected = get_user_indicator_selections(search_results)

print(f"✅ User selections complete: {len(all_selected)} indicators selected")


In [None]:
def transform_wide_to_long_format(wide_df: pd.DataFrame, country: str, selected_indicators: List[Dict]) -> pd.DataFrame:
    """
    Transform data from wide format (indicator_year columns) to long format.
    
    Format transformation:
    FROM: Country | Indicator1_2001 | Indicator1_2002 | Indicator2_2001 | Indicator2_2002 | ...
    TO:   Country | Year | Indicator1 | Indicator2 | ...
    
    Args:
        wide_df (pd.DataFrame): Wide format DataFrame with indicator_year columns
        country (str): Country code
        selected_indicators (List[Dict]): List of selected indicator metadata
        
    Returns:
        pd.DataFrame: Long format DataFrame with Country, Year, and indicator columns
    """
    # Check if DataFrame is empty
    if wide_df.empty:
        print(f"⚠️  No data to transform for {country}")
        return pd.DataFrame()
    
    # Debug: Check structure of selected indicators
    print(f"🔍 Debug - First indicator structure: {selected_indicators[0] if selected_indicators else 'No indicators'}")
    
    # Initialize list to store transformed rows
    transformed_rows = []
    
    # Extract all years from column names
    years = set()
    for col in wide_df.columns:
        # Column format: country_indicator_label_year
        parts = col.split('_')
        if len(parts) >= 2:
            try:
                # Last part should be the year
                year = int(parts[-1])
                years.add(year)
            except ValueError:
                continue
    
    # Sort years for consistent ordering
    years = sorted(years)
    print(f"📅 Found data for years: {min(years)}-{max(years)} ({len(years)} years)")
    
    # Create one row for each year
    for year in years:
        # Initialize row with country and year
        row = {'Country': country, 'Year': year}
        
        # Add each indicator's value for this year
        for indicator in selected_indicators:
            # Get indicator name - try different possible keys
            indicator_name = None
            if 'name' in indicator:
                indicator_name = indicator['name']
            elif 'label' in indicator:
                indicator_name = indicator['label']
            elif 'id' in indicator:
                indicator_name = indicator['id']
            else:
                indicator_name = f"Indicator_{selected_indicators.index(indicator)}"
            
            # Find the column that matches this indicator and year
            indicator_col = None
            for col in wide_df.columns:
                # Check if column contains indicator info and ends with year
                if str(year) in col and (
                    ('label' in indicator and indicator['label'] in col) or
                    ('id' in indicator and indicator['id'] in col) or
                    col.endswith(f"_{year}")
                ):
                    indicator_col = col
                    break
            
            # Extract value for this indicator and year
            if indicator_col and indicator_col in wide_df.columns:
                # Get the value (assuming single row in wide_df)
                value = wide_df[indicator_col].iloc[0] if len(wide_df) > 0 else None
                # Use indicator name as column name
                row[indicator_name] = value
            else:
                # No data available for this indicator/year
                row[indicator_name] = None
        
        # Add completed row to results
        transformed_rows.append(row)
    
    # Convert to DataFrame
    long_df = pd.DataFrame(transformed_rows)
    
    # Log transformation results
    print(f"✅ Transformed {country}: {len(years)} years × {len(selected_indicators)} indicators")
    print(f"   Final shape: {long_df.shape}")
    print(f"   Columns: {list(long_df.columns)}")
    
    return long_df

print("✅ Data transformation function defined")


In [None]:
def fetch_selected_indicators(indicator_list: List[Dict], countries: List[str], start_year: int, end_year: int) -> pd.DataFrame:
    """Fetch selected World Bank indicators for specified countries and date range"""
    all_data = pd.DataFrame()
    
    for indicator in indicator_list:
        try:
            print(f"Fetching {indicator['name'][:50]}...")
            
            # Fetch data one country at a time without time parameter (API works better this way)
            country_data = []
            for country in countries:
                try:
                    # Fetch all available data (no time parameter)
                    data = wb.data.DataFrame(
                        series=indicator['id'], 
                        economy=country
                    )
                    
                    if data is not None and not data.empty:
                        # Filter to desired year range manually
                        year_cols = [col for col in data.columns 
                                   if col.startswith('YR') and 
                                   start_year <= int(col[2:]) <= end_year]
                        
                        if year_cols:
                            filtered_data = data[year_cols]
                            
                            # Rename columns to include country and indicator
                            new_columns = {}
                            for col in filtered_data.columns:
                                year = col[2:]  # Remove 'YR' prefix
                                new_columns[col] = f"{country}_{indicator['label']}_{year}"
                            
                            filtered_data = filtered_data.rename(columns=new_columns)
                            country_data.append(filtered_data)
                            print(f"  ✅ {country}: {len(year_cols)} years of data")
                        else:
                            print(f"  ⚠️  {country}: No data for {start_year}-{end_year}")
                
                except Exception as e:
                    print(f"  ❌ Failed for {country}: {e}")
            
            # Combine data for this indicator
            if country_data:
                indicator_data = pd.concat(country_data, axis=1)
                
                # Concatenate with existing data
                if all_data.empty:
                    all_data = indicator_data
                else:
                    all_data = pd.concat([all_data, indicator_data], axis=1)
        
        except Exception as e:
            print(f"Failed to fetch {indicator['id']}: {e}")
    
    return all_data

# Fetch data for each country and transform to long format
country_datasets = {}

print(f"🚀 Processing {len(countries)} countries with {len(all_selected)} indicators")
print(f"📅 Date range: {start_year}-{end_year}")
print("="*70)

for country in countries:
    print(f"\n🌍 Processing {country}...")
    
    # Fetch data in wide format
    wide_data = fetch_selected_indicators(all_selected, [country], start_year, end_year)
    
    # Transform to long format
    if not wide_data.empty:
        long_data = transform_wide_to_long_format(wide_data, country, all_selected)
        country_datasets[country] = long_data
        
        # Show sample of transformed data
        if not long_data.empty:
            print(f"📊 Sample data for {country}:")
            print(long_data.head(3))
    else:
        print(f"⚠️  No data found for {country}")
        country_datasets[country] = pd.DataFrame()

# Summary
successful_countries = [country for country, df in country_datasets.items() if not df.empty]
print(f"\n✅ Processing complete!")
print(f"📊 Successful countries: {len(successful_countries)} / {len(countries)}")
print(f"🌍 Countries with data: {successful_countries}")


In [None]:
# Export each country's data to separate CSV files
import os

# Use the clean output directory
output_dir = "output"
print(f"📁 Using clean output directory: {output_dir}")

# Export each country's long-format data
for country, df in country_datasets.items():
    if df.empty:
        print(f"⚠️  Skipping {country} - no data")
        continue
    
    # Create filename
    filename = f"{country}_worldbank_data_long_format.csv"
    filepath = os.path.join(output_dir, filename)
    
    try:
        # Export to CSV
        df.to_csv(filepath, index=False)
        print(f"✅ Exported {country} data to {filepath}")
        print(f"   Shape: {df.shape}")
        print(f"   Columns: {list(df.columns)}")
        
    except Exception as e:
        print(f"❌ Failed to export {country}: {e}")

print(f"\n🎉 Export complete! Data is in long format:")
print(f"   Column 1: Country")
print(f"   Column 2: Year") 
print(f"   Column 3+: Individual indicators")


In [None]:
print("🎯 WORKFLOW SUMMARY")
print("="*70)
print("✅ 1. Loaded configuration from YAML")
print("✅ 2. Searched World Bank indicators")
print("✅ 3. User selected specific indicators (with options displayed)")
print("✅ 4. Fetched data for each country")
print("✅ 5. Transformed data from wide to long format")
print("✅ 6. Exported separate CSV files for each country")
print()
print("📊 FINAL DATA FORMAT:")
print("   - Each country has its own CSV file")
print("   - Column 1: Country code")
print("   - Column 2: Year")
print("   - Column 3+: Individual indicator values")
print()
print("🎉 World Bank data extraction complete!")

# Display sample of final format if data exists
if country_datasets:
    sample_country = list(country_datasets.keys())[0]
    sample_df = country_datasets[sample_country]
    if not sample_df.empty:
        print(f"\n📋 SAMPLE OUTPUT FORMAT ({sample_country}):")
        print(sample_df.head())
