In [None]:
# ETL: Extracci√≥n de datos de pa√≠ses m√°s poblados desde Wikipedia
# Autor: ETL Pipeline
# Fecha: 2025-10-20

# ============================================================================
# INSTALACI√ìN DE DEPENDENCIAS
# ============================================================================
!pip install selenium webdriver-manager pandas openpyxl -q

In [None]:
# ============================================================================
# IMPORTS
# ============================================================================
import time
import logging
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List, Dict, Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import re

In [None]:
 ============================================================================
# CONFIGURACI√ìN DE LOGGING
# ============================================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('ETL_Pipeline')

In [None]:
# ============================================================================
# CLASE ETL PRINCIPAL
# ============================================================================
class WikipediaCountriesETL:
    """
    ETL pipeline that extracts, transforms and loads country population data
    scraped from a Wikipedia table.

    Stages:
    - extract(): drives headless Chrome through Selenium and reads the first
      element carrying the ``wikitable`` class.
    - transform(): cleans, type-converts, validates and enriches the rows
      with pandas.
    - load(): writes the transformed frame to CSV, Excel or JSON.

    Attributes:
        url: Page to scrape; validated at the start of extract().
        driver: Selenium driver while extract() is running, otherwise None.
        raw_data: List of row dicts produced by extract(), or None.
        transformed_data: DataFrame produced by transform(), or None.
        metadata: Timings and record counters filled in by extract() and
            transform().
    """

    def __init__(self, url: str):
        """
        Initialise the pipeline; no network or browser work happens here.

        Args:
            url: Wikipedia URL to scrape.
        """
        self.url = url
        self.driver = None
        self.raw_data = None
        self.transformed_data = None
        self.metadata = {
            'extraction_time': None,
            'total_records': 0,
            'valid_records': 0,
            'invalid_records': 0,
            'transformation_time': None
        }

    # The return annotation is quoted so it is not evaluated when the class
    # body is executed.
    def _setup_driver(self) -> "webdriver.Chrome":
        """
        Build a headless Chrome driver with a 30 second page-load timeout.

        Returns:
            The configured Selenium Chrome driver.
        """
        logger.info("Configurando Selenium WebDriver...")

        chrome_options = Options()
        # Headless / container-friendly switches.
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')

        # Reduce what the browser loads.
        # NOTE(review): confirm that the installed Chrome still honours
        # --disable-images / --disable-javascript / --disable-plugins.
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--disable-plugins')
        chrome_options.add_argument('--disable-images')
        chrome_options.add_argument('--disable-javascript')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])

        # Explicit user agent string.
        chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_options)

        # Abort page loads that take longer than 30 seconds.
        driver.set_page_load_timeout(30)

        logger.info("WebDriver configurado exitosamente")
        return driver

    # ========================================================================
    # EXTRACT
    # ========================================================================
    def _validate_url(self) -> None:
        """
        Reject any URL that is not HTTPS on wikipedia.org or one of its subdomains.

        The host is compared exactly or by ``.wikipedia.org`` suffix; a plain
        substring test would also accept hosts such as ``evilwikipedia.org``
        or ``wikipedia.org.evil.com``.

        Raises:
            ValueError: If the scheme is not https or the host is not allowed.
        """
        from urllib.parse import urlparse

        if not self.url.startswith('https://'):
            raise ValueError("Solo se permiten URLs HTTPS por seguridad")

        # .hostname drops any port / userinfo and is already lower-cased.
        domain = (urlparse(self.url).hostname or '').lower()
        if domain != 'wikipedia.org' and not domain.endswith('.wikipedia.org'):
            raise ValueError(f"Dominio no permitido: {domain}. Solo Wikipedia es permitida.")

    def extract(self, max_rows: int = 50) -> List[Dict]:
        """
        Scrape the first ``wikitable`` on the page with Selenium.

        Only rows with at least six ``td`` cells are kept; the first six cells
        are stored, in order, under the keys rank, country, population,
        percentage, date and source.
        NOTE(review): this positional mapping assumes the table's column
        order - verify against the live page layout.

        Args:
            max_rows: Maximum number of table rows (after the header row) to
                inspect. Defaults to 50, the previously hard-coded value.

        Returns:
            List of dicts holding the raw cell text of each accepted row.

        Raises:
            ValueError: If the URL is not an HTTPS Wikipedia URL.
            TimeoutException: If the table does not appear within 15 seconds.
            Exception: Any other Selenium failure is logged and re-raised.
        """
        logger.info(f"Iniciando extracci√≥n desde: {self.url}")

        self._validate_url()

        start_time = time.time()

        try:
            self.driver = self._setup_driver()

            # Short pause before hitting the server.
            time.sleep(2)

            self.driver.get(self.url)

            # Wait (up to 15 s) for the table to be present in the DOM.
            logger.info("Esperando carga de tabla...")
            wait = WebDriverWait(self.driver, 15)
            table = wait.until(
                EC.presence_of_element_located((By.CLASS_NAME, "wikitable"))
            )

            logger.info("Tabla encontrada, extrayendo datos...")

            # Header texts are only logged, not used for the column mapping.
            headers_elements = table.find_elements(By.TAG_NAME, "th")
            headers = [header.text.strip() for header in headers_elements[:7]]
            logger.info(f"Headers encontrados: {headers}")

            # Skip the first <tr>, which is treated as the header row.
            rows = table.find_elements(By.TAG_NAME, "tr")[1:]
            raw_data = []

            for idx, row in enumerate(rows[:max_rows], 1):
                try:
                    cells = row.find_elements(By.TAG_NAME, "td")
                    if len(cells) < 6:
                        continue

                    raw_data.append({
                        'rank': self._safe_extract(cells[0]),
                        'country': self._safe_extract(cells[1]),
                        'population': self._safe_extract(cells[2]),
                        'percentage': self._safe_extract(cells[3]),
                        'date': self._safe_extract(cells[4]),
                        'source': self._safe_extract(cells[5])
                    })

                    # Report progress by rows actually extracted, not by the
                    # row index (rows with too few cells are skipped).
                    if len(raw_data) % 10 == 0:
                        logger.info(f"Extra√≠dos {len(raw_data)} registros...")

                except Exception as e:
                    # Best effort: a broken row is logged and skipped.
                    logger.warning(f"Error en fila {idx}: {str(e)}")
                    continue

            self.raw_data = raw_data
            self.metadata['extraction_time'] = time.time() - start_time
            self.metadata['total_records'] = len(raw_data)

            logger.info(f"Extracci√≥n completada: {len(raw_data)} registros en {self.metadata['extraction_time']:.2f}s")
            return raw_data

        except TimeoutException:
            logger.error("Timeout esperando la carga de la p√°gina")
            raise
        except Exception as e:
            logger.error(f"Error durante extracci√≥n: {str(e)}")
            raise
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                finally:
                    # Never keep a handle to a driver that has been quit.
                    self.driver = None
                logger.info("WebDriver cerrado")

    def _safe_extract(self, element) -> str:
        """
        Return the stripped text of a web element, or "" if reading it fails.

        Args:
            element: Object exposing a ``text`` attribute (a Selenium element).

        Returns:
            The element text without surrounding whitespace, or an empty string.
        """
        try:
            return element.text.strip()
        except Exception:
            # Deliberate best effort; unlike a bare ``except`` this lets
            # KeyboardInterrupt / SystemExit propagate.
            return ""

    # ========================================================================
    # TRANSFORM
    # ========================================================================
    def transform(self) -> pd.DataFrame:
        """
        Clean, convert, validate and enrich the rows produced by extract().

        Also records transformation time and valid/invalid record counts in
        ``self.metadata`` and stores the result in ``self.transformed_data``.

        Returns:
            The transformed DataFrame.

        Raises:
            ValueError: If there is no raw data (extract() not run or empty).
        """
        logger.info("Iniciando transformaci√≥n de datos...")
        start_time = time.time()

        if not self.raw_data:
            raise ValueError("No hay datos para transformar. Ejecute extract() primero.")

        df = pd.DataFrame(self.raw_data)
        initial_count = len(df)
        logger.info(f"DataFrame creado con {initial_count} registros")

        # 1. Cleaning
        df = self._clean_data(df)

        # 2. Type conversion
        df = self._convert_types(df)

        # 3. Validation
        df = self._validate_data(df)

        # 4. Enrichment
        df = self._enrich_data(df)

        # 5. Derived metrics
        df = self._calculate_metrics(df)

        self.transformed_data = df
        self.metadata['transformation_time'] = time.time() - start_time
        self.metadata['valid_records'] = len(df)
        self.metadata['invalid_records'] = initial_count - len(df)

        logger.info(f"Transformaci√≥n completada: {len(df)} registros v√°lidos en {self.metadata['transformation_time']:.2f}s")
        return df

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``*_clean`` text columns for population, percentage and country (mutates ``df``)."""
        logger.info("Limpiando datos...")

        # Population: digits only, footnote markers removed.
        df['population_clean'] = df['population'].apply(self._clean_population)

        # Percentage: drop bracketed footnotes first (e.g. "17.5%[b]"), then
        # the percent sign. ``regex`` is explicit because its default differs
        # between pandas versions.
        df['percentage_clean'] = (
            df['percentage']
            .str.replace(r'\[[^\]]*\]', '', regex=True)
            .str.replace('%', '', regex=False)
            .str.strip()
        )

        # Country: keep the text before the first "[" or "(".
        df['country_clean'] = df['country'].apply(lambda x: re.split(r'\[|\(', x)[0].strip())

        return df

    def _clean_population(self, pop_str: str) -> str:
        """
        Reduce a population cell to its digits.

        Bracketed footnote markers are removed before the digits are
        collected, so "1,234[1]" yields "1234" rather than "12341".

        Args:
            pop_str: Raw cell text.

        Returns:
            Digit-only string; "" when ``pop_str`` is not a string.
        """
        if not isinstance(pop_str, str):
            return ""
        without_notes = re.sub(r'\[[^\]]*\]', '', pop_str)
        # Drop everything that is not a digit (thousands separators included).
        return re.sub(r'\D', '', without_notes)

    def _convert_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add numeric population, percentage and rank columns; unparseable values become NaN."""
        logger.info("Convirtiendo tipos de datos...")

        df['population_numeric'] = pd.to_numeric(df['population_clean'], errors='coerce')
        df['percentage_numeric'] = pd.to_numeric(df['percentage_clean'], errors='coerce')
        df['rank_numeric'] = pd.to_numeric(df['rank'], errors='coerce')

        return df

    def _validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Keep only rows whose numeric population is present and greater than zero.

        Returns:
            An independent copy of the valid rows, so later column assignments
            never write into a view of ``df``.
        """
        logger.info("Validando datos...")

        has_population = df['population_numeric'].notna()
        removed = int((~has_population).sum())

        if removed > 0:
            logger.warning(f"Removidos {removed} registros con poblaci√≥n inv√°lida")

        is_positive = df['population_numeric'] > 0
        return df[has_population & is_positive].copy()

    def _enrich_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a population size category plus processing timestamp and year (mutates ``df``)."""
        logger.info("Enriqueciendo datos...")

        # Bins are right-inclusive: (0, 10M], (10M, 50M], (50M, 100M],
        # (100M, 500M], (500M, inf).
        df['population_category'] = pd.cut(
            df['population_numeric'],
            bins=[0, 10_000_000, 50_000_000, 100_000_000, 500_000_000, float('inf')],
            labels=['Peque√±o', 'Mediano', 'Grande', 'Muy Grande', 'Mega Poblado']
        )

        # Read the clock once so timestamp and year always agree.
        now = datetime.now()
        df['processed_at'] = now.strftime('%Y-%m-%d %H:%M:%S')
        df['extraction_year'] = now.year

        return df

    def _calculate_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add derived population metrics (mutates ``df``)."""
        logger.info("Calculando m√©tricas...")

        # Population in millions.
        df['population_millions'] = (df['population_numeric'] / 1_000_000).round(2)

        # Running total of the world-share percentage, in row order.
        df['cumulative_percentage'] = df['percentage_numeric'].cumsum()

        # Dense rank by population, largest first (this is not a density).
        df['relative_rank'] = df['population_numeric'].rank(ascending=False, method='dense')

        # Absolute difference from the previous row; NaN for the first row.
        df['population_diff'] = df['population_numeric'].diff().abs()
        df['population_diff_millions'] = (df['population_diff'] / 1_000_000).round(2)

        return df

    # ========================================================================
    # LOAD & ANALYSIS
    # ========================================================================
    def load(self, output_format: str = 'csv', filename: str = 'countries_data'):
        """
        Write the transformed data to disk.

        Args:
            output_format: One of 'csv', 'excel' or 'json'.
            filename: Output path without extension.

        Returns:
            The path of the written file (``filename`` plus extension).

        Raises:
            ValueError: If transform() has not run or the format is unknown.
        """
        if self.transformed_data is None:
            raise ValueError("No hay datos transformados. Ejecute transform() primero.")

        logger.info(f"Cargando datos en formato {output_format}...")

        if output_format == 'csv':
            filepath = f"{filename}.csv"
            # utf-8-sig writes a BOM at the start of the file.
            self.transformed_data.to_csv(filepath, index=False, encoding='utf-8-sig')
        elif output_format == 'excel':
            filepath = f"{filename}.xlsx"
            self.transformed_data.to_excel(filepath, index=False, engine='openpyxl')
        elif output_format == 'json':
            filepath = f"{filename}.json"
            self.transformed_data.to_json(filepath, orient='records', indent=2, force_ascii=False)
        else:
            raise ValueError(f"Formato no soportado: {output_format}")

        logger.info(f"Datos guardados exitosamente en: {filepath}")
        return filepath

    def get_summary_statistics(self) -> pd.DataFrame:
        """
        Return ``describe()`` statistics for the numeric population columns.

        Raises:
            ValueError: If transform() has not run.
        """
        if self.transformed_data is None:
            raise ValueError("No hay datos transformados disponibles.")

        stats = self.transformed_data[['population_numeric', 'percentage_numeric', 'population_millions']].describe()
        return stats

    def get_metadata(self) -> Dict:
        """Return the metadata dict of the run (the live object, not a copy)."""
        return self.metadata

    def display_top_countries(self, n: int = 10):
        """
        Return the first ``n`` transformed rows with display-friendly column names.

        Rows are taken in their existing order; no sorting is applied here.

        Args:
            n: Number of rows to return.

        Raises:
            ValueError: If transform() has not run.
        """
        if self.transformed_data is None:
            raise ValueError("No hay datos transformados disponibles.")

        top_countries = self.transformed_data.head(n)[
            ['rank_numeric', 'country_clean', 'population_millions',
             'percentage_numeric', 'population_category']
        ].copy()

        top_countries.columns = ['Ranking', 'Pa√≠s', 'Poblaci√≥n (M)', '% Mundial', 'Categor√≠a']

        return top_countries

In [None]:
# ============================================================================
# EJECUCIÓN DEL ETL
# ============================================================================
def main():
    """
    Run the full pipeline (extract, transform, load) and print a report.

    Returns:
        The WikipediaCountriesETL instance after a successful run.

    Any exception is logged through ``logger`` and re-raised.
    """
    double_rule = "=" * 80
    single_rule = "-" * 80

    print(double_rule)
    print("ETL PIPELINE - PA√çSES M√ÅS POBLADOS DEL MUNDO")
    print(double_rule)
    print()

    # Wikipedia page holding the population table.
    url = "https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population"

    etl = WikipediaCountriesETL(url)

    try:
        # --- Extract -------------------------------------------------------
        print("\nüì• FASE 1: EXTRACCI√ìN")
        print(single_rule)
        extracted_rows = etl.extract()
        print(f"‚úÖ Extra√≠dos {len(extracted_rows)} registros")

        # --- Transform -----------------------------------------------------
        print("\nüîÑ FASE 2: TRANSFORMACI√ìN")
        print(single_rule)
        clean_frame = etl.transform()
        print(f"‚úÖ Transformados {len(clean_frame)} registros v√°lidos")

        # --- Load ----------------------------------------------------------
        print("\nüíæ FASE 3: CARGA")
        print(single_rule)
        csv_path = etl.load('csv', 'countries_population')
        excel_path = etl.load('excel', 'countries_population')
        print(f"‚úÖ Datos guardados en: {csv_path} y {excel_path}")

        # --- Report --------------------------------------------------------
        print("\nüìä RESULTADOS DEL ETL")
        print(double_rule)

        # Run metadata: timings and record counts.
        run_info = etl.get_metadata()
        print(f"\n‚è±Ô∏è  Tiempo de extracci√≥n: {run_info['extraction_time']:.2f}s")
        print(f"‚è±Ô∏è  Tiempo de transformaci√≥n: {run_info['transformation_time']:.2f}s")
        print(f"üìà Total de registros: {run_info['total_records']}")
        print(f"‚úÖ Registros v√°lidos: {run_info['valid_records']}")
        print(f"‚ùå Registros inv√°lidos: {run_info['invalid_records']}")

        # First ten rows of the transformed table.
        print("\nüåç TOP 10 PA√çSES M√ÅS POBLADOS")
        print(single_rule)
        leaders = etl.display_top_countries(10)
        print(leaders.to_string(index=False))

        # Descriptive statistics.
        print("\nüìà ESTAD√çSTICAS DESCRIPTIVAS")
        print(single_rule)
        summary = etl.get_summary_statistics()
        print(summary)

        # Extra aggregates computed directly on the transformed frame.
        print("\nüîç AN√ÅLISIS ADICIONAL")
        print(single_rule)
        frame = etl.transformed_data
        population = frame['population_numeric']
        first_row = frame.iloc[0]

        print(f"‚Ä¢ Poblaci√≥n total (top 50): {population.sum():,.0f} habitantes")
        print(f"‚Ä¢ Poblaci√≥n promedio: {population.mean():,.0f} habitantes")
        print(f"‚Ä¢ Pa√≠s m√°s poblado: {first_row['country_clean']} ({first_row['population_millions']:.2f}M)")
        print("‚Ä¢ Distribuci√≥n por categor√≠a:")
        print(frame['population_category'].value_counts().to_string())

        # Share of world population held by the first ten rows.
        top_10_percentage = frame.head(10)['percentage_numeric'].sum()
        print(f"\n‚Ä¢ Top 10 pa√≠ses representan: {top_10_percentage:.2f}% de la poblaci√≥n mundial")

        print("\n" + double_rule)
        print("‚úÖ ETL COMPLETADO EXITOSAMENTE")
        print(double_rule)

        return etl

    except Exception as e:
        logger.error(f"Error en el pipeline ETL: {e}")
        raise

In [None]:
# ============================================================================
# RUN
# ============================================================================
# Run the pipeline when this cell/module executes as the main program. main()
# returns the ETL object, which is kept in `etl_pipeline` so later cells can
# inspect it.
if __name__ == "__main__":
    etl_pipeline = main()