# Handling Excel Files in Python

**Author:** Luis Paulo Vinatea Barberena  
**Date:** 2025-05-21  

When you need to work with Excel files in Python, you can use the `openpyxl` library, which reads and writes workbooks in the `.xlsx` format.
Combined with `pandas`, it is a powerful tool for data manipulation and analysis, especially when dealing with large datasets.

# Import libraries and configure the notebook


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import glob
import os
import openpyxl
from IPython.display import display, HTML
from matplotlib.ticker import FuncFormatter
import matplotlib.colors as mcolors
import hashlib
import math
import random

# Import project utilities
import sys

sys.path.append("../../")
from src.utils.data_processing import *  # noqa: F403
from src.utils.visualization import *  # noqa: F403

# Visualization settings: plot style, default palette and default figure size
plt.style.use("seaborn-v0_8-whitegrid")
# IPython magic (notebook-only syntax, not plain Python): draw figures inline
%matplotlib inline
sns.set_palette("viridis")
plt.rcParams["figure.figsize"] = (12, 8)
# Do not truncate the column list when a DataFrame is displayed
pd.set_option("display.max_columns", None)

# For reproducibility. NOTE: this seeds NumPy's global generator only; the
# standard-library `random` module is a separate generator.
np.random.seed(42)

# Configuration
# Glob pattern of the raw workbooks to load (relative to the notebook folder)
XLSX_DATA_PATH = "../../data/raw/*.xlsx"
# Folder that receives one CSV per processed section
OUTPUT_DIR = "../../data/processed/sites/df1/"

# Global visualization settings (shared, mutable state used by get_site_color)
# Hex codes that have already been handed out to a site
used_colors = set()
# Site name -> hex colour, so a site keeps its colour across charts
site_color_mapping = {}
# Names of all CSS4 colours known to matplotlib, used as the colour pool
available_css4_colors = list(mcolors.CSS4_COLORS.keys())

# =============================================================================
# DATA LOADING FUNCTIONS
# =============================================================================


In [None]:

def load_excel_files(data_path: str) -> dict:
    """
    Load every sheet of every Excel workbook matching a glob pattern.

    Workbooks are visited in sorted path order so that the generated keys
    (``df1``, ``df2``, ...) are stable between runs; ``glob.glob`` alone
    returns matches in arbitrary order.

    Args:
        data_path (str): Glob pattern for the Excel files to load

    Returns:
        dict: Mapping of ``"df<N>"`` to the DataFrame loaded from each sheet,
            numbered in the order the sheets were read. Sheets that could not
            be loaded are skipped and do not consume a number.

    Raises:
        Exception: Any error raised while opening or reading a workbook is
            reported on stdout and re-raised unchanged.
    """
    dfs = {}
    df_count = 1

    try:
        for file in sorted(glob.glob(data_path)):
            print(f"Loading data from {file}")

            # Load workbook with openpyxl for inspection
            wb = openpyxl.load_workbook(file, data_only=True, read_only=True)
            try:
                file_basename = os.path.basename(file)

                print(f"\nWorkbook: {file_basename}")
                print(f"Contains {len(wb.sheetnames)} sheets: {', '.join(wb.sheetnames)}")

                # Process each sheet
                for sheet_name in wb.sheetnames:
                    df_name = f"df{df_count}"
                    df = _load_sheet_with_header_detection(file, sheet_name, wb[sheet_name])

                    if df is not None:
                        dfs[df_name] = df
                        print(f"  Loaded {df_name} with shape {df.shape}")
                        df_count += 1
            finally:
                # Release the workbook even when a sheet fails, so the file
                # handle is not left open after an error.
                wb.close()

        print(f"\nSuccessfully loaded {df_count - 1} dataframes.")
        return dfs

    except Exception as e:
        print(f"Error loading data: {e}")
        raise


def _load_sheet_with_header_detection(file_path: str, sheet_name: str, sheet) -> pd.DataFrame:
    """
    Load a sheet with automatic header detection.
    
    Args:
        file_path (str): Path to the Excel file
        sheet_name (str): Name of the sheet
        sheet: Openpyxl sheet object
        
    Returns:
        pd.DataFrame: Loaded DataFrame or None if sheet is empty
    """
    try:
        # Check if sheet is empty
        if not sheet.max_row or not sheet.max_column:
            print(f"  Warning: Sheet '{sheet_name}' appears to be empty or corrupted")
            return None
        
        print(f"\n  Sheet: '{sheet_name}'")
        print(f"  Dimensions: {sheet.max_row} rows x {sheet.max_column} columns")
        
        # Check for merged cells
        try:
            merged_cells = list(sheet.merged_cells.ranges)
            if merged_cells:
                print(f"  Contains {len(merged_cells)} merged cell ranges")
        except AttributeError:
            print("  Note: Cannot check merged cells in read-only mode")
        
        # Load initial DataFrame without headers
        df = pd.read_excel(
            file_path,
            sheet_name=sheet_name,
            header=None,
            na_values=["NA", "N/A", ""],
            keep_default_na=True,
        )
        
        # Detect header row
        header_row = _detect_header_row(df)
        
        if header_row is not None:
            # Reload with detected header
            df = pd.read_excel(
                file_path,
                sheet_name=sheet_name,
                header=header_row,
                na_values=["NA", "N/A", ""],
                keep_default_na=True,
            )
            print(f"  Detected header at row {header_row + 1}")
        
        return df
        
    except Exception as e:
        print(f"  Error loading sheet '{sheet_name}': {e}")
        return None


def _detect_header_row(df: pd.DataFrame) -> int:
    """
    Detect the most likely header row in a DataFrame.
    
    Args:
        df (pd.DataFrame): DataFrame to analyze
        
    Returns:
        int: Index of the header row, or None if not found
    """
    for i in range(min(10, len(df))):
        str_count = sum(1 for x in df.iloc[i] if isinstance(x, str))
        if str_count > 0.5 * df.shape[1]:  # More than half are strings
            return i
    return None

In [None]:
# =============================================================================
# DATA CLEANING FUNCTIONS
# =============================================================================

def clean_dataframes(dfs: dict) -> dict:
    """
    Run the full cleaning sequence over every DataFrame.

    Args:
        dfs (dict): Dictionary of DataFrames

    Returns:
        dict: Dictionary of cleaned DataFrames
    """
    print("Starting data cleaning process...")

    # Each step takes and returns the dict; they run in this fixed order
    cleaning_steps = (
        standardize_headers,
        remove_empty_rows_and_cols,
        handle_nan_values,
        title_case_columns,
    )
    for step in cleaning_steps:
        dfs = step(dfs)

    print("Data cleaning completed!")
    return dfs


def standardize_headers(dfs: dict) -> dict:
    """
    Repair unnamed columns and translate month headers in every DataFrame.

    Args:
        dfs (dict): Dictionary of DataFrames

    Returns:
        dict: The same dictionary, with each value replaced by its
            header-standardized DataFrame
    """
    rule = "=" * 50
    print(rule)
    print("STANDARDIZING DATAFRAME HEADERS")
    print(f"{rule}\n")

    for df_name in list(dfs):
        print(f"Processing {df_name}...")

        # Unnamed columns are fixed first, then month names are translated
        renamed = _fix_unnamed_columns(dfs[df_name])
        dfs[df_name] = _standardize_date_columns(renamed)

    print("\nHeader standardization complete!")
    return dfs


def _fix_unnamed_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Fix unnamed columns by finding header values in data rows."""
    unnamed_cols = [col for col in df.columns if "Unnamed" in str(col)]
    
    if len(unnamed_cols) > 0:
        print(f"  Found {len(unnamed_cols)} unnamed columns")
        
        for i in range(min(5, len(df))):
            row = df.iloc[i]
            str_values = [v for v in row if isinstance(v, str) and pd.notna(v)]
            
            if len(str_values) >= len(unnamed_cols) * 0.7:
                new_names = {}
                original_cols = df.columns.tolist()
                
                for j, col in enumerate(original_cols):
                    if ("Unnamed" in str(col) and j < len(row) and 
                        pd.notna(row.iloc[j]) and isinstance(row.iloc[j], str)):
                        new_names[col] = str(row.iloc[j]).strip()
                
                if new_names:
                    print(f"  Found potential header values in row {i + 1}")
                    print(f"  Renaming {len(new_names)} columns")
                    
                    df = df.rename(columns=new_names)
                    df = df.drop(index=i)
                    print(f"  New columns: {list(new_names.values())}")
                    break
    
    return df


def _standardize_date_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize date column names to Spanish format."""
    date_mappings = {
        "January": "Enero", "February": "Febrero", "March": "Marzo",
        "April": "Abril", "May": "Mayo", "June": "Junio",
        "July": "Julio", "August": "Agosto", "September": "Septiembre",
        "October": "Octubre", "November": "Noviembre", "December": "Diciembre"
    }
    
    date_cols = [col for col in df.columns if str(col) in date_mappings]
    if date_cols:
        print(f"  Standardizing {len(date_cols)} date-related columns to Spanish format")
        df = df.rename(columns=date_mappings)
    
    return df


def remove_empty_rows_and_cols(dfs: dict) -> dict:
    """Drop rows, then columns, that contain nothing but NaN in every DataFrame."""
    for df_name in list(dfs):
        before = dfs[df_name]

        without_blank_rows = before.dropna(how='all')
        after = without_blank_rows.dropna(axis=1, how='all')

        dfs[df_name] = after
        print(f"Cleaned {df_name}: {before.shape} -> {after.shape}")
    return dfs


def handle_nan_values(dfs: dict) -> dict:
    """Report NaN statistics for each DataFrame, then prune sparse columns and rows.

    For every entry the overall NaN share is printed, columns that are mostly
    NaN are removed, and rows whose critical columns are all NaN are removed.
    The pruned frame replaces the original entry in ``dfs``.
    """
    separator = "-" * 50

    for df_name in list(dfs):
        frame = dfs[df_name]

        print(f"\n{separator}")
        print(f"NaN analysis for {df_name}:")

        # Overall NaN share; an empty frame reports 0 instead of dividing by zero
        total_cells = frame.shape[0] * frame.shape[1]
        nan_count = frame.isna().sum().sum()
        if total_cells > 0:
            nan_percentage = nan_count / total_cells
        else:
            nan_percentage = 0

        print(f"- Total NaN values: {nan_count} ({nan_percentage:.1%} of all cells)")

        # Columns are pruned before rows
        frame = _remove_high_nan_columns(frame)
        frame = _remove_critical_nan_rows(frame)

        dfs[df_name] = frame
        print(f"- Final shape after analysis: {frame.shape}")

    return dfs


def _remove_high_nan_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Remove columns with >80% NaN values."""
    column_nan_pct = df.isna().mean().sort_values(ascending=False)
    high_nan_cols = column_nan_pct[column_nan_pct > 0.8].index.tolist()
    
    if high_nan_cols:
        print(f"- Columns with >80% NaNs: {', '.join(high_nan_cols)}")
        df = df.drop(columns=high_nan_cols)
    
    return df


def _remove_critical_nan_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows where critical columns are all NaN."""
    critical_cols = [col for col in df.columns if "Unnamed" not in str(col)]
    
    if not critical_cols and len(df.columns) > 2:
        critical_cols = [df.columns[1], df.columns[2]]
    
    if critical_cols:
        print(f"- Critical columns: {', '.join(map(str, critical_cols))}")
        rows_to_drop = df[df[critical_cols].isna().all(axis=1)].index
        print(f"- Found {len(rows_to_drop)} rows where all critical columns are NaN")
        df = df.drop(index=rows_to_drop)
    
    return df


def title_case_columns(dfs: dict) -> dict:
    """Apply title case to all string column names.

    Non-string labels (for example integer labels of a header-less sheet) are
    kept unchanged. The ``.str`` accessor would fail on them or turn them
    into NaN.

    Args:
        dfs (dict): Dictionary of DataFrames; each frame's columns are
            relabelled in place

    Returns:
        dict: The same dictionary
    """
    for df_name, df_obj in dfs.items():
        df_obj.columns = df_obj.columns.map(
            lambda label: label.title() if isinstance(label, str) else label
        )
        dfs[df_name] = df_obj
    return dfs

In [None]:
# =============================================================================
# DATA ANALYSIS FUNCTIONS
# =============================================================================

def analyze_dataframes(dfs: dict) -> None:
    """Print how many DataFrames there are, then a detailed report for each."""
    print(f"Found {len(dfs)} dataframes in the environment\n")

    for name in dfs:
        _print_single_dataframe_info(name, dfs[name])


def _print_single_dataframe_info(df_name: str, df_obj: pd.DataFrame) -> None:
    """Print a report for one DataFrame.

    The report covers shape, missing-value share, dtype counts, the number of
    unnamed columns, the content-pattern analysis and a three-row preview.
    """
    rule = "=" * 50
    row_count, column_count = df_obj.shape[0], df_obj.shape[1]

    print(rule)
    print(f"DataFrame: {df_name} with shape {df_obj.shape}")
    print(rule)

    # Missing-value share; an empty frame reports 0 instead of dividing by zero
    cell_count = row_count * column_count
    missing_values = df_obj.isna().sum().sum()
    if cell_count > 0:
        missing_pct = missing_values / cell_count
    else:
        missing_pct = 0

    print("\n📊 Basic Information:")
    print(f"  - Rows: {row_count}")
    print(f"  - Columns: {column_count}")
    print(f"  - Missing values: {missing_values} ({missing_pct:.1%} of all cells)")

    # How many columns there are of each dtype
    print("\n📋 Data Types:")
    dtype_counts = df_obj.dtypes.value_counts()
    for dtype, count in dtype_counts.items():
        print(f"  - {dtype}: {count} columns")

    # Columns still labelled "Unnamed"
    unnamed_total = sum(1 for col in df_obj.columns if "Unnamed" in str(col))
    if unnamed_total:
        print(f"\n⚠️ Found {unnamed_total} unnamed columns")

    # Site / indicator summary for frames that still carry unnamed columns
    _analyze_content_patterns(df_obj)

    print("\n🔍 Data Preview:")
    print(df_obj.head(3))
    print("\n\n")


def _analyze_content_patterns(df: pd.DataFrame) -> None:
    """Analyze content patterns in DataFrame."""
    if "Unnamed: 1" in df.columns and "Unnamed: 2" in df.columns:
        unique_sites = df["Unnamed: 1"].dropna().unique()
        unique_indicators = df["Unnamed: 2"].dropna().unique()
        
        print("\n📈 Content Analysis:")
        print(f"  - Potential sites: {len(unique_sites)}")
        print(f"  - Potential indicators: {len(unique_indicators)}")
        
        if len(unique_sites) > 0:
            print("\n🏢 Sample sites:", ", ".join(map(str, unique_sites[:5])))
        if len(unique_indicators) > 0:
            print("\n📏 Sample indicators:", ", ".join(map(str, unique_indicators[:5])))


def preview_dataframes_html(dfs: dict) -> None:
    """Render the first ten rows of each DataFrame as a scrollable HTML table."""
    rule = "=" * 50

    for df_name in dfs:
        print(rule)
        print(f"Previewing {df_name}...")

        first_rows = dfs[df_name].head(10)
        html_table = first_rows.to_html(index=False, max_rows=10)
        # Wrap the table in a div that scrolls in both directions
        styled_html = f"""
        <div style="overflow-x: auto; max-height: 500px; overflow-y: auto;">
            {html_table}
        </div>
        """
        display(HTML(styled_html))
        print(f"{rule}\n")


def display_datatypes(dfs: dict) -> None:
    """Show a two-column HTML table (column name, dtype) for each DataFrame."""
    rule = "=" * 50

    for df_name in dfs:
        frame = dfs[df_name]

        print(rule)
        print(f"DataFrame: {df_name} with shape {frame.shape}")
        print(rule)

        # One row per column: its label and its dtype
        dtype_table = frame.dtypes.to_frame().reset_index()
        dtype_table.columns = ["Column", "Data Type"]
        display(HTML(dtype_table.to_html(index=False)))
        print(f"{rule}\n")

In [None]:
# =============================================================================
# SPECIALIZED DATA PROCESSING FUNCTIONS
# =============================================================================

def process_main_dataframe(dfs: dict, df_key: str = "df1") -> dict:
    """
    Turn the main DataFrame into a dictionary of processed section frames.

    Rows whose "Stios" value is exactly "TOTAL PY" are removed from a copy of
    the frame. The copy is then split into sections, and the sections are
    cleaned, post-processed and given a total column.

    Args:
        dfs (dict): Dictionary of DataFrames
        df_key (str): Key of the main DataFrame to process

    Returns:
        dict: Dictionary of processed section DataFrames

    Raises:
        ValueError: If ``df_key`` is not a key of ``dfs``
    """
    if df_key not in dfs:
        raise ValueError(f"DataFrame '{df_key}' not found in dfs")

    # Work on a copy so the caller's frame is left untouched
    frame = dfs[df_key].copy()

    is_grand_total = frame["Stios"].isin(["TOTAL PY"])
    frame = frame[~is_grand_total].reset_index(drop=True)

    site_dfs = _split_dataframe_by_sections(frame)

    # Each stage takes and returns the dict of sections
    for stage in (_clean_section_data, _apply_specialized_processing, _add_calculated_columns):
        site_dfs = stage(site_dfs)

    return site_dfs


def _split_dataframe_by_sections(df: pd.DataFrame) -> dict:
    """Split DataFrame into sections using TOTAL rows as markers."""
    site_dfs = {}
    current_section = 0
    current_rows = []
    
    for index, row in df.iterrows():
        if "TOTAL" in str(row["Stios"]):
            # Save previous section
            if current_rows:
                site_dfs[f"Section_{current_section}"] = pd.DataFrame(current_rows).reset_index(drop=True)
                current_section += 1
            current_rows = [row]
        else:
            current_rows.append(row)
    
    # Save last section
    if current_rows:
        site_dfs[f"Section_{current_section}"] = pd.DataFrame(current_rows).reset_index(drop=True)
    
    # Handle Section_5 split
    if "Section_5" in site_dfs and len(site_dfs["Section_5"]) > 2:
        last_2_rows = site_dfs["Section_5"].tail(2).copy()
        site_dfs["Section_5"] = site_dfs["Section_5"].iloc[:-2].reset_index(drop=True)
        site_dfs["Section_6"] = last_2_rows.reset_index(drop=True)
    
    return site_dfs


def _clean_section_data(site_dfs: dict) -> dict:
    """Clean section data by removing header rows and filling NaN values."""
    for i in range(1, 6):
        section_name = f"Section_{i}"
        if section_name in site_dfs and len(site_dfs[section_name]) > 1:
            # Remove first two rows (headers)
            site_dfs[section_name] = site_dfs[section_name].iloc[2:].reset_index(drop=True)
            
            # Fill NaN values in Indicador column
            if "Indicador" in site_dfs[section_name].columns:
                df = site_dfs[section_name]
                valid_values = df["Indicador"].dropna()
                valid_values = valid_values[~valid_values.str.lower().eq("indicador")]
                
                if not valid_values.empty:
                    first_valid = valid_values.iloc[0]
                    df["Indicador"] = df["Indicador"].fillna(first_valid)
    
    # Remove last row from Section_5
    if "Section_5" in site_dfs and len(site_dfs["Section_5"]) > 0:
        site_dfs["Section_5"] = site_dfs["Section_5"].iloc[:-1].reset_index(drop=True)
    
    return site_dfs


def _apply_specialized_processing(site_dfs: dict) -> dict:
    """Apply specialized processing to specific sections."""
    # Process Section_2: Replace zeros with column means
    if "Section_2" in site_dfs:
        site_dfs["Section_2"] = _replace_zeros_with_means(site_dfs["Section_2"])
    
    # Process Section_6: Convert gallons to liters
    if "Section_6" in site_dfs:
        site_dfs["Section_6"] = _convert_gallons_to_liters(site_dfs["Section_6"])
    
    return site_dfs


def _replace_zeros_with_means(df: pd.DataFrame) -> pd.DataFrame:
    """Replace zero values with column means for numeric columns."""
    month_columns = ["Enero", "Febrero", "Marzo", "Abril"]
    
    for col in month_columns:
        if col in df.columns:
            # Convert to numeric
            df[col] = pd.to_numeric(df[col], errors='coerce')
            
            # Calculate mean of non-zero values
            non_zero_values = df[col][(df[col] != 0) & (df[col].notna())]
            
            if len(non_zero_values) > 0:
                col_mean = non_zero_values.mean()
                
                # Replace zeros and NaNs with mean
                df[col] = df[col].replace(0, col_mean).fillna(col_mean)
                print(f"Replaced zeros and NaNs in {col} with mean: {col_mean:.2f}")
    
    return df


def _convert_gallons_to_liters(df: pd.DataFrame) -> pd.DataFrame:
    """Convert gallons to liters in the first row if applicable."""
    if len(df) > 0 and "Galones" in str(df.iloc[0]["Indicador"]):
        month_cols = ["Enero", "Febrero", "Marzo", "Abril"]
        for col in month_cols:
            if col in df.columns:
                df.loc[0, col] = df.iloc[0][col] * 3.78541
        
        df.loc[0, "Indicador"] = "Consumo de Diesel (Litros)"
        print("Converted gallons to liters and renamed indicator")
    
    return df


def _add_calculated_columns(site_dfs: dict) -> dict:
    """Add calculated Total column to all sections."""
    numeric_columns = ["Enero", "Febrero", "Marzo", "Abril"]
    
    for section_name, df in site_dfs.items():
        if "Total" not in df.columns:
            # Convert to numeric and calculate totals
            for col in numeric_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')
            
            df["Total"] = df[numeric_columns].sum(axis=1, skipna=True)
            site_dfs[section_name] = df
            print(f"Added Total column to {section_name}")
    
    return site_dfs


def save_sections_to_csv(site_dfs: dict, output_dir: str) -> None:
    """Write every section to ``<output_dir>/<lower-cased section name>.csv``.

    The output directory is created if it does not exist; the index is not
    written to the files.
    """
    os.makedirs(output_dir, exist_ok=True)

    for section_name in site_dfs:
        # Lower-casing the whole name also turns the "Section_" prefix into
        # "section_", so no separate prefix replacement is needed.
        filename = section_name.lower()
        filepath = os.path.join(output_dir, filename + ".csv")
        site_dfs[section_name].to_csv(filepath, index=False)
        print(f"Saved {filename} to {filepath}")

In [None]:
# =============================================================================
# VISUALIZATION FUNCTIONS
# =============================================================================

def get_site_color(site_name: str) -> str:
    """Get a consistent unique color for a specific site.

    A site that already has a colour keeps it. Otherwise a colour is picked
    from the CSS4 colours whose hex code is still unused, using a generator
    seeded from an MD5 digest of the site name. The builtin ``hash()`` of a
    string is salted per interpreter process, so seeding with it gave
    different colours on every run; seeding the module-level ``random``
    generator also disturbed every other user of ``random``.

    The pick still depends on which colours earlier sites have taken, so the
    assignment is reproducible only for the same sequence of requests.

    Args:
        site_name (str): Name of the site

    Returns:
        str: Hex colour code; also recorded in ``site_color_mapping``
    """
    if site_name in site_color_mapping:
        return site_color_mapping[site_name]

    # Process-independent integer derived from the site name
    digest_value = int(hashlib.md5(site_name.encode()).hexdigest()[:8], 16)

    # Compare hex codes, not names: several CSS4 names share one hex code
    unused_colors = [
        color_name for color_name in available_css4_colors
        if mcolors.CSS4_COLORS[color_name] not in used_colors
    ]

    if unused_colors:
        # Private generator: leaves the global `random` state untouched
        selected_color_name = random.Random(digest_value).choice(unused_colors)
        selected_color_hex = mcolors.CSS4_COLORS[selected_color_name]
        used_colors.add(selected_color_hex)
        site_color_mapping[site_name] = selected_color_hex
        return selected_color_hex

    # Fallback for when all colors are used: reuse one chosen by the digest
    color_index = digest_value % len(available_css4_colors)
    fallback_color_name = available_css4_colors[color_index]
    fallback_color_hex = mcolors.CSS4_COLORS[fallback_color_name]
    site_color_mapping[site_name] = fallback_color_hex
    return fallback_color_hex


def create_environmental_indicators_chart(site_dfs: dict) -> None:
    """Create comprehensive environmental indicators visualization.

    Draws one stacked-bar subplot per section in a two-column grid, hides any
    unused grid cell, adds a figure-level legend of sites and shows the
    figure.

    Args:
        site_dfs (dict): Dictionary of section DataFrames; when empty, a
            message is printed and no figure is created
    """
    num_sections = len(site_dfs)
    if num_sections == 0:
        # A grid with zero rows cannot be created, so report and stop
        print("No sections available to plot")
        return

    plt.style.use("seaborn-v0_8-whitegrid")

    # Calculate grid dimensions
    cols = 2
    rows = math.ceil(num_sections / cols)

    # squeeze=False always returns a 2-D array of Axes, so a single flatten()
    # is correct for any section count. Wrapping the result in a list, as was
    # done for one section, produced a list holding an array, not an Axes.
    fig, axes = plt.subplots(rows, cols, figsize=(20, 8 * rows), squeeze=False)
    axes = axes.flatten()

    fig.patch.set_facecolor("white")
    fig.suptitle("Indicadores Ambientales por Sitio", fontsize=20, fontweight="bold",
                 y=0.98, color="#2C3E50")

    # Track all sites for global legend
    all_sites = set()

    # Create charts for each section
    for ax, df in zip(axes, site_dfs.values()):
        _create_single_section_chart(ax, df, all_sites)

    # Hide unused subplots
    for spare_ax in axes[num_sections:]:
        spare_ax.set_visible(False)

    # Add global legend
    _add_global_legend(fig, all_sites)

    # Final layout adjustments; rect leaves room for the legend and the title
    plt.tight_layout(pad=3.0, rect=[0, 0, 0.92, 0.96])
    fig.patch.set_edgecolor("#BDC3C7")
    fig.patch.set_linewidth(2)
    plt.show()

def _create_single_section_chart(ax, df: pd.DataFrame, all_sites: set) -> None:
    """Draw one section as a stacked bar chart on ``ax``.

    Rows with a value in at least one month column are plotted, one bar per
    month and one stacked segment per site. The plotted site names are added
    to ``all_sites``. A section with no such rows gets a "no data" placeholder.
    """
    month_cols = ["Enero", "Febrero", "Marzo", "Abril"]

    has_any_month_value = df[month_cols].notna().any(axis=1)
    plot_data = df[has_any_month_value].copy()

    if len(plot_data) == 0:
        _create_no_data_chart(ax, "No hay datos disponibles")
        return

    # Index by site so that, after transposing, each site becomes one series
    plot_data = plot_data.set_index("Stios")[month_cols]
    site_names = plot_data.index.tolist()
    colors = [get_site_color(site) for site in site_names]
    all_sites.update(site_names)

    # Months on the x axis, sites stacked within each bar
    monthly = plot_data.T
    monthly.plot(kind="bar", stacked=True, ax=ax, color=colors, alpha=0.85,
                 width=0.7, edgecolor="white", linewidth=1.5, legend=False)

    # Title comes from the section's first "Indicador" value
    ax.set_title(_get_section_title(df), fontweight="bold", fontsize=14, pad=15,
                 color="#2C3E50", fontfamily="serif")

    _style_chart(ax)


def _get_section_title(df: pd.DataFrame) -> str:
    """Get appropriate title for section chart."""
    if "Indicador" in df.columns:
        indicador_values = df["Indicador"].dropna()
        if len(indicador_values) > 0:
            return indicador_values.iloc[0]
    return "Unknown"


def _style_chart(ax) -> None:
    """Apply the shared look to a chart.

    Clears the axis labels, styles ticks, grid and spines, formats the y axis
    with K/M suffixes and writes a centered value label on each positive bar
    segment.
    """

    def compact_number(value, pos):
        # 1,000,000 and above -> "1.2M"; 1,000 and above -> "12K"; else plain
        if value >= 1000000:
            return f"{value / 1000000:.1f}M"
        if value >= 1000:
            return f"{value / 1000:.0f}K"
        return f"{value:.0f}"

    # No axis titles
    ax.set_xlabel("")
    ax.set_ylabel("")

    # Ticks
    ax.tick_params(axis="x", rotation=0, labelsize=9, colors="#2C3E50", length=0)
    ax.tick_params(axis="y", labelsize=8, colors="#34495E")

    # Dashed grid drawn beneath the bars
    ax.grid(True, alpha=0.4, linestyle="--", linewidth=0.8, color="#BDC3C7")
    ax.set_axisbelow(True)

    # Only the left and bottom spines stay visible
    for hidden_side in ("top", "right"):
        ax.spines[hidden_side].set_visible(False)
    for visible_side in ("left", "bottom"):
        ax.spines[visible_side].set_color("#BDC3C7")
    for visible_side in ("left", "bottom"):
        ax.spines[visible_side].set_linewidth(1.5)

    # Y axis in compact notation
    ax.yaxis.set_major_formatter(FuncFormatter(compact_number))
    ax.set_facecolor("#FAFAFA")

    # Value labels; segments that are not positive get an empty label
    for container in ax.containers:
        segment_labels = [
            compact_number(v, None) if v > 0 else ""
            for v in container.datavalues
        ]
        ax.bar_label(container, labels=segment_labels, label_type="center",
                     fontsize=6, fontweight="bold", color="black")


def _create_no_data_chart(ax, message: str) -> None:
    """Create chart for sections with no data."""
    ax.text(0.5, 0.5, f"📊 {message}", transform=ax.transAxes, ha="center", va="center",
           fontsize=12, fontweight="bold", color="#7F8C8D",
           bbox=dict(boxstyle="round,pad=0.5", facecolor="#ECF0F1", 
                    edgecolor="#BDC3C7", alpha=0.8))
    ax.set_xticks([])
    ax.set_yticks([])
    for spine in ax.spines.values():
        spine.set_visible(False)


def _add_global_legend(fig, all_sites: set) -> None:
    """Add global legend for all sites."""
    if all_sites:
        from matplotlib.patches import Patch
        
        sorted_sites = sorted(all_sites)
        legend_colors = [site_color_mapping[site] for site in sorted_sites]
        legend_patches = [Patch(facecolor=color, alpha=0.85, edgecolor="white") 
                         for color in legend_colors]
        
        global_legend = fig.legend(legend_patches, sorted_sites, bbox_to_anchor=(0.95, 0.5),
                                 loc="center left", fontsize=10, frameon=True, fancybox=True,
                                 shadow=True, framealpha=0.9, facecolor="white", 
                                 edgecolor="#BDC3C7", title="Sitios", title_fontsize=12)
        global_legend.get_frame().set_linewidth(1)

In [None]:
# =============================================================================
# MAIN EXECUTION PIPELINE
# =============================================================================

def main():
    """Run the whole workflow: load, clean, analyze, preview, process, save, plot.

    Returns:
        tuple: ``(dfs, site_dfs)`` - the cleaned per-sheet DataFrames and the
            processed section DataFrames

    Raises:
        Exception: Any failure in a step is reported on stdout and re-raised
    """

    def announce(title):
        # Banner: blank line, rule, title, rule
        print("\n" + "="*60)
        print(title)
        print("="*60)

    print("Starting Excel file processing pipeline...")

    try:
        announce("STEP 1: LOADING DATA")
        dfs = load_excel_files(XLSX_DATA_PATH)

        announce("STEP 2: CLEANING DATA")
        dfs = clean_dataframes(dfs)

        announce("STEP 3: ANALYZING DATA")
        analyze_dataframes(dfs)

        announce("STEP 4: PREVIEWING DATA")
        preview_dataframes_html(dfs)
        display_datatypes(dfs)

        announce("STEP 5: PROCESSING MAIN DATAFRAME")
        site_dfs = process_main_dataframe(dfs)

        announce("STEP 6: SAVING PROCESSED DATA")
        save_sections_to_csv(site_dfs, OUTPUT_DIR)

        announce("STEP 7: CREATING VISUALIZATIONS")
        create_environmental_indicators_chart(site_dfs)

        announce("PIPELINE COMPLETED SUCCESSFULLY!")

        return dfs, site_dfs

    except Exception as e:
        print(f"\nERROR in pipeline: {e}")
        raise


# Execute the main pipeline when this cell runs and keep both results at
# notebook scope: `dfs` (cleaned per-sheet DataFrames) and `site_dfs`
# (processed section DataFrames), as returned by main().
dfs, site_dfs = main()