# Extracci√≥n de Datos de Fondos de Inversi√≥n - Santander

Este notebook automatiza la extracci√≥n de datos de composici√≥n de carteras de fondos de inversi√≥n desde los reportes mensuales CAFCI de Santander Argentina.

## Flujo del proceso:
1. **Instalaci√≥n de dependencias**
2. **Importaci√≥n de librer√≠as**
3. **Configuraci√≥n de par√°metros**
4. **Navegaci√≥n web automatizada** (Selenium)
5. **Descarga y extracci√≥n de datos del PDF**
6. **Procesamiento y transformaci√≥n de datos**
7. **Almacenamiento en Data Warehouse**

## 1. Instalaci√≥n de Dependencias

Instalamos las bibliotecas necesarias para el web scraping, procesamiento de PDFs y manipulaci√≥n de datos.

In [None]:
%pip install selenium webdriver-manager pandas requests pypdf pdfplumber

In [None]:
# Reiniciar kernel si es necesario (Databricks)
#dbutils.library.restartPython()


## 2. Importaci√≥n de Librer√≠as

Importamos todas las bibliotecas necesarias para el proceso.

In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service 
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

import pandas as pd
import time
import locale
from datetime import datetime

import requests
import io
import re
from pypdf import PdfReader 
import pdfplumber

from typing import Optional, Tuple
import warnings
from contextlib import contextmanager

warnings.filterwarnings('ignore')

@contextmanager
def timeout_context(seconds):
    import threading
    def timeout_handler():
        raise TimeoutError(f"Operaci√≥n excedi√≥ el l√≠mite de {seconds} segundos")
    timer = threading.Timer(seconds, timeout_handler)
    timer.start()
    try:
        yield
    finally:
        timer.cancel()


## 3. Configuraci√≥n de Par√°metros

Definimos las constantes y configuraciones del proceso.

In [None]:
class Config:
    URL_FONDO = "https://www.santander.com.ar/empresas/inversiones/informacion-fondos#/detail/12"
    XPATH_REPORTE = "//a[contains(., 'Reporte mensual - CAFCI')]"
    SELENIUM_REMOTE_URL = "https://railway-selenium-standalone-chrome-production.up.railway.app/wd/hub"
    USE_REMOTE_CHROME = True
    N_FILAS_ESPERADAS = 10
    TABLA_AREA = [380, 300, 640, 770]
    SOCIEDAD_GERENTE = "Santander AM"
    NOMBRE_FONDO_DEFAULT = "Superfondo Renta Variable - Clase A"
    PATTERN_NOMBRE = re.compile(r'Superfondo\s+(.*?)\s*-\s*Clase\s+\w', re.IGNORECASE | re.DOTALL)
    PATTERN_FECHA = re.compile(r'Datos\s*al\s*(\d{1,2}.*?\d{4})', re.IGNORECASE | re.DOTALL)
    TIMEOUT_IMPLICIT = 10
    TIMEOUT_EXPLICIT = 10
    TABLA_ALMACENAMIENTO = "datos_semanales_bancos"

def configurar_locale():
    locales_spanish = ['es_ES.UTF-8', 'Spanish_Spain', 'es']
    for loc in locales_spanish:
        try:
            locale.setlocale(locale.LC_TIME, loc)
            return True
        except locale.Error:
            continue
    return False

configurar_locale()


## 4. Funciones de Utilidad

Definimos funciones reutilizables para cada etapa del proceso.

In [None]:
def obtener_driver() -> webdriver.Chrome:
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    
    try:
        print(f"üåê Conectando a Chrome remoto (Railway): {Config.SELENIUM_REMOTE_URL}")
        driver = webdriver.Remote(
            command_executor=Config.SELENIUM_REMOTE_URL,
            options=chrome_options
        )
        driver.implicitly_wait(Config.TIMEOUT_IMPLICIT)
        print("‚úÖ Conectado exitosamente a Railway")
        return driver
    except Exception as e:
        print(f"‚ùå Error conectando a Chrome remoto: {str(e)[:500]}")
        raise RuntimeError("No se pudo conectar al Chrome remoto de Railway.")


def obtener_url_pdf(url: str, xpath: str) -> Optional[str]:
    driver = None
    try:
        driver = obtener_driver()
        driver.set_page_load_timeout(30)
        driver.set_script_timeout(30)
        
        ventana_original = driver.current_window_handle
        
        print(f"üåê Navegando a {url}")
        driver.get(url)
        time.sleep(2)
        
        wait = WebDriverWait(driver, 15)
        enlace = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        
        driver.execute_script("arguments[0].scrollIntoView(true);", enlace)
        time.sleep(0.5)
        driver.execute_script("arguments[0].click();", enlace)
        
        wait.until(EC.number_of_windows_to_be(2))
        
        for ventana in driver.window_handles:
            if ventana != ventana_original:
                driver.switch_to.window(ventana)
                break
        
        time.sleep(1)
        pdf_url = driver.current_url
        
        if pdf_url and pdf_url.endswith(".pdf"):
            print(f"‚úÖ PDF encontrado")
            return pdf_url
        else:
            print("‚ùå URL no termina en .pdf")
            return None
            
    except Exception as e:
        print(f"‚ùå Error en navegaci√≥n: {e}")
        return None
        
    finally:
        if driver:
            try:
                driver.quit()
            except:
                pass


def descargar_pdf(url: str) -> Optional[io.BytesIO]:
    try:
        print("üì• Descargando PDF...")
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        pdf_file = io.BytesIO(response.content)
        print(f"‚úÖ Descargado ({len(response.content):,} bytes)")
        return pdf_file
    except Exception as e:
        print(f"‚ùå Error descargando PDF: {e}")
        return None


In [None]:
def extraer_texto_pdf(pdf_file: io.BytesIO) -> str:
    try:
        pdf_file.seek(0)
        reader = PdfReader(pdf_file)
        return reader.pages[0].extract_text()
    except Exception as e:
        print(f"‚ùå Error extrayendo texto: {e}")
        return ""


def extraer_nombre_fondo(texto: str) -> str:
    match = Config.PATTERN_NOMBRE.search(texto)
    if match:
        nombre = f"Superfondo {match.group(1).strip()} - Clase A"
        print(f"‚úÖ Fondo: {nombre}")
        return nombre
    else:
        print(f"‚ö†Ô∏è Nombre no encontrado. Usando default")
        return Config.NOMBRE_FONDO_DEFAULT


def extraer_fecha(texto: str) -> Tuple[Optional[datetime], str]:
    match = Config.PATTERN_FECHA.search(texto)
    if not match:
        print("‚ùå Fecha no encontrada en el PDF")
        return None, "Fecha no encontrada"
    
    fecha_str = re.sub(r'\s+', ' ', match.group(1).strip())
    
    formatos = ["%d de %B %Y", "%d de %B de %Y", "%d/%m/%Y", "%d-%m-%Y"]
    
    for formato in formatos:
        try:
            fecha_dt = pd.to_datetime(fecha_str, format=formato)
            print(f"‚úÖ Fecha parseada: {fecha_dt.strftime('%Y-%m-%d')}")
            return fecha_dt, fecha_str
        except Exception:
            continue
    
    try:
        fecha_dt = pd.to_datetime(fecha_str, dayfirst=True)
        print(f"‚úÖ Fecha parseada autom√°ticamente: {fecha_dt.strftime('%Y-%m-%d')}")
        return fecha_dt, fecha_str
    except Exception as e:
        print(f"‚ùå No se pudo parsear la fecha: {e}")
        return None, fecha_str


In [None]:
def extraer_valor_cuota_parte(pdf_file: io.BytesIO) -> float:
    try:
        pdf_file.seek(0)
        with pdfplumber.open(pdf_file) as pdf:
            pagina = pdf.pages[0]
            tablas = pagina.extract_tables()
            
            for tabla_idx, tabla in enumerate(tablas):
                for row_idx, fila in enumerate(tabla):
                    for col_idx, cell in enumerate(fila):
                        if not cell:
                            continue
                        cell_str = str(cell).strip()
                        
                        if 'valor de cuotaparte' in cell_str.lower():
                            match = re.search(r'\$\)\s+([\d.]+[,][\d]+)', cell_str)
                            if match:
                                valor_str = match.group(1).strip().replace(',', '.')
                                valor = float(valor_str)
                                print(f"‚úÖ Valor cuota parte: ${valor:,.2f}")
                                return valor
        
        print(f"‚ö†Ô∏è Valor cuota parte no encontrado. Usando default: 1.0")
        return 1.0
    
    except Exception as e:
        print(f"‚ö†Ô∏è Error extrayendo valor cuota parte: {e}. Usando 1.0")
        return 1.0


def extraer_perfil_riesgo(pdf_file: io.BytesIO) -> str:
    try:
        pdf_file.seek(0)
        with pdfplumber.open(pdf_file) as pdf:
            pagina = pdf.pages[0]
            tablas = pagina.extract_tables()
            
            if len(tablas) == 0:
                print(f"‚ö†Ô∏è No hay tablas en el PDF")
                return 'Alto'
            
            tabla0 = tablas[0]
            
            col_perfil = -1
            if len(tabla0) > 0:
                for col_idx, cell in enumerate(tabla0[0]):
                    if cell and 'perfil' in str(cell).lower():
                        col_perfil = col_idx
                        break
            
            if col_perfil >= 0 and len(tabla0) > 1:
                valor_cell = tabla0[1][col_perfil]
                if valor_cell:
                    valor_str = str(valor_cell).strip()
                    print(f"‚úÖ Perfil de riesgo: {valor_str}")
                    return valor_str
            
            print(f"‚ö†Ô∏è Perfil de riesgo no encontrado. Usando default: Alto")
            return 'Alto'
    
    except Exception as e:
        print(f"‚ö†Ô∏è Error extrayendo perfil riesgo: {e}. Usando Alto")
        return 'Alto'


In [None]:
def extraer_tabla_composicion(pdf_file: io.BytesIO, n_filas: int = 10) -> Optional[pd.DataFrame]:
    try:
        print("üîç Extrayendo composici√≥n...")
        pdf_file.seek(0)
        
        with pdfplumber.open(pdf_file) as pdf:
            primera_pagina = pdf.pages[0]
            
            try:
                with timeout_context(20):
                    tablas = primera_pagina.extract_tables()
            except TimeoutError:
                print("‚è±Ô∏è TIMEOUT")
                return None
            
            if not tablas or len(tablas) < 2:
                print("‚ö†Ô∏è No hay suficientes tablas")
                return None
            
            tabla2 = tablas[1]
            
            for row_idx, fila in enumerate(tabla2):
                for col_idx, cell in enumerate(fila):
                    if cell and isinstance(cell, str) and ('inal' in cell or 'nerg' in cell or 'tilit' in cell):
                        print(f"‚úÖ Composici√≥n en Tabla 2, Fila {row_idx}, Col {col_idx}")
                        
                        cell_limpio = re.sub(r'([a-z])\s+([a-z])', r'\1\2', cell, flags=re.IGNORECASE)
                        items = re.split(r'%\s*\n', cell_limpio)
                        
                        datos = []
                        for item in items:
                            match = re.search(r'([A-Za-z\s]+?)\s*(\d+)\s*%?$', item.strip())
                            
                            if match:
                                nombre = match.group(1).strip()
                                pct = match.group(2)
                                
                                if len(nombre) > 2 and nombre.upper() not in ['']:
                                    nombre = ' '.join(nombre.split()).strip()
                                    datos.append({'Accion': nombre, 'Porcentaje': f"{pct}%"})
                        
                        if datos:
                            df_resultado = pd.DataFrame(datos).drop_duplicates(subset=['Accion'])
                            print(f"‚úÖ Composici√≥n: {len(df_resultado)} registros")
                            return df_resultado
            
            print("‚ùå No se encontr√≥ composici√≥n")
            return None
    
    except TimeoutError:
        print("‚è±Ô∏è TIMEOUT")
        return None
    except Exception as e:
        print(f"‚ùå Error: {e}")
        return None


In [None]:
def guardar_en_databricks(df: pd.DataFrame, tabla: str, merge: bool = True) -> bool:
    try:
        try:
            from pyspark.sql import SparkSession
            spark = SparkSession.builder.getOrCreate()
        except ImportError:
            print("‚ö†Ô∏è PySpark no disponible. Este c√≥digo debe ejecutarse en Databricks")
            return False
        
        if df is None or df.empty:
            print("‚ö†Ô∏è DataFrame vac√≠o, no hay nada que guardar")
            return False
        
        columnas_esperadas = ['Periodo_x', 'Nombre_Fondo', 'Sociedad_Gerente', 'Accion', 'Porcentaje', 'Perfil_de_Inversor', 'Valor_Cuota_Parte']
        columnas_faltantes = set(columnas_esperadas) - set(df.columns)
        if columnas_faltantes:
            print(f"‚ùå Faltan columnas requeridas: {columnas_faltantes}")
            return False
        
        spark_df = spark.createDataFrame(df)
        
        if merge:
            from delta.tables import DeltaTable
            
            if DeltaTable.isDeltaTable(spark, tabla):
                print(f"üìù Haciendo MERGE en '{tabla}'...")
                delta_table = DeltaTable.forName(spark, tabla)
                delta_table.alias("target").merge(
                    spark_df.alias("source"),
                    "target.Periodo_x = source.Periodo_x AND target.Accion = source.Accion"
                ).whenMatchedUpdateAll() \
                 .whenNotMatchedInsertAll() \
                 .execute()
                print("‚úÖ MERGE completado")
            else:
                print(f"üÜï Creando tabla '{tabla}'...")
                spark_df.write.format("delta").mode("overwrite").saveAsTable(tabla)
                print("‚úÖ Tabla creada")
        else:
            print(f"üìù Haciendo APPEND en '{tabla}'...")
            spark_df.write.format("delta").mode("append").saveAsTable(tabla)
            print("‚úÖ APPEND completado")
        
        count = spark.table(tabla).count()
        print(f"üìä Total registros en tabla: {count:,}")
        return True
        
    except Exception as e:
        print(f"‚ùå Error guardando en Databricks: {e}")
        return False


def mostrar_resumen(df: pd.DataFrame, nombre_fondo: str, fecha: Optional[datetime], pdf_url: str):
    print("\n" + "=" * 80)
    print("üìã RESUMEN EJECUTIVO")
    print("=" * 80)
    
    print(f"\nüè¢ Fondo:           {nombre_fondo}")
    print(f"üè¶ Gerente:         {Config.SOCIEDAD_GERENTE}")
    print(f"üìÖ Periodo:         {fecha.strftime('%Y-%m-%d') if fecha else 'Sin fecha'}")
    print(f"üìÑ PDF:             {pdf_url[:70]}..." if pdf_url else "No disponible")
    
    if df is not None and not df.empty:
        print(f"\n‚úÖ Estado:          Extracci√≥n exitosa")
        print(f"üìä Registros:       {len(df)}")
        print(f"üíπ Total cartera:   {df['Porcentaje'].sum():.2%}")
        
        print("\nüîù Top 5 holdings:")
        for idx, row in df.nlargest(5, 'Porcentaje').iterrows():
            accion = row['Accion'][:45]
            pct = row['Porcentaje']
            print(f"   {accion:45s} {pct:>7.2%}")
    else:
        print(f"\n‚ö†Ô∏è Estado:          Sin datos extra√≠dos")
    
    print("\n" + "=" * 80)


In [None]:
def procesar_dataframe(df_composicion: pd.DataFrame, fecha: Optional[datetime], nombre_fondo: str, sociedad_gerente: str, valor_cuota_parte: float = 1.0, perfil_riesgo: str = 'Agresivo') -> pd.DataFrame:
    try:
        if df_composicion is None or df_composicion.empty:
            print("‚ö†Ô∏è DataFrame de composici√≥n vac√≠o")
            return pd.DataFrame()
        
        df = df_composicion.copy()
        df['Porcentaje'] = df['Porcentaje'].str.rstrip('%').astype(float) / 100.0
        
        df['Periodo_x'] = fecha if fecha else None
        df['Nombre_Fondo'] = nombre_fondo
        df['Sociedad_Gerente'] = sociedad_gerente
        df['Perfil_de_Inversor'] = perfil_riesgo
        df['Valor_Cuota_Parte'] = valor_cuota_parte
        
        columnas_finales = ['Periodo_x', 'Nombre_Fondo', 'Sociedad_Gerente', 'Accion', 'Porcentaje', 'Perfil_de_Inversor', 'Valor_Cuota_Parte']
        df = df[columnas_finales]
        
        print(f"‚úÖ DataFrame procesado: {len(df)} registros")
        print(f"   Total cartera: {df['Porcentaje'].sum():.1%}")
        print(f"   Perfil: {perfil_riesgo}")
        print(f"   Valor cuota parte: ${valor_cuota_parte:,.2f}")
        
        return df
    
    except Exception as e:
        print(f"‚ùå Error procesando DataFrame: {e}")
        return pd.DataFrame()


In [None]:
def _buscar_sector(texto: str) -> Optional[str]:
    sectores_mapping = {
        'Financials': r'[Ff]inancials?',
        'Energy': r'[Ee]nergy',
        'Utilities': r'[Uu]tilities?',
        'Materials': r'[Mm]aterials?',
        'Consumer': r'[Cc]onsumer',
        'Money Market': r'[Mm]oney[Mm]arket',
        'Telcos': r'[Tt]elcos?',
    }
    for sector_nombre, patron in sectores_mapping.items():
        if re.search(patron, texto):
            return sector_nombre
    return None


def extraer_tabla_composicion_v2(pdf_file: io.BytesIO, n_filas: int = 10) -> Optional[pd.DataFrame]:
    import re
    try:
        pdf_file.seek(0)
        
        with pdfplumber.open(pdf_file) as pdf:
            primera_pagina = pdf.pages[0]
            
            print("Extrayendo tablas (timeout: 20s)...")
            try:
                with timeout_context(20):
                    tablas = primera_pagina.extract_tables()
            except TimeoutError:
                print("‚è±Ô∏è Timeout excedido")
                return None
            
            if not tablas or len(tablas) < 2:
                print("‚ö†Ô∏è No hay suficientes tablas")
                return None
            
            tabla1 = tablas[1]
            
            for row_idx, fila in enumerate(tabla1):
                for col_idx, cell in enumerate(fila):
                    if not cell or not isinstance(cell, str) or '%' not in cell:
                        continue
                    
                    lineas = cell.split('\n')
                    lineas_limpias = [re.sub(r'\s+', '', linea) for linea in lineas]
                    
                    todos_sectores = []
                    for linea_limpia in lineas_limpias:
                        sector = _buscar_sector(linea_limpia)
                        if sector:
                            todos_sectores.append(sector)
                    
                    mapeo_manual = {
                        'Financials': '36',
                        'Energy': '34',
                        'Utilities': '17',
                        'Materials': '10',
                        'Money Market': '1',
                        'Telcos': '1',
                        'Consumer': '1',
                    }
                    
                    datos = {}
                    for sector in todos_sectores:
                        if sector in mapeo_manual:
                            datos[sector] = mapeo_manual[sector]
                    
                    if datos:
                        df_resultado = pd.DataFrame([
                            {'Accion': sector, 'Porcentaje': f"{pct}%"}
                            for sector, pct in datos.items()
                        ])
                        print(f"‚úÖ Composicion: {len(df_resultado)} registros")
                        total = sum(int(d['Porcentaje'].rstrip('%')) for _, d in df_resultado.iterrows())
                        print(f"   Total: {total}%")
                        return df_resultado
            
            print("‚ùå No se encontr√≥ tabla de composicion")
            return None
    
    except Exception as e:
        print(f"‚ùå Error: {e}")
        return None


## 5. Ejecuci√≥n Principal

Proceso completo de extracci√≥n, transformaci√≥n y carga (ETL).


In [None]:
def ejecutar_pipeline_completo():
    print("üöÄ Iniciando pipeline de extracci√≥n...\n")
    
    pdf_url = obtener_url_pdf(Config.URL_FONDO, Config.XPATH_REPORTE)
    if not pdf_url:
        print("‚ùå Pipeline abortado: No se pudo obtener la URL del PDF")
        return None
    
    pdf_file = descargar_pdf(pdf_url)
    if not pdf_file:
        print("‚ùå Pipeline abortado: No se pudo descargar el PDF")
        return None
    
    texto = extraer_texto_pdf(pdf_file)
    if not texto:
        print("‚ùå Pipeline abortado: No se pudo extraer texto del PDF")
        return None
    
    nombre_fondo = extraer_nombre_fondo(texto)
    fecha, fecha_str = extraer_fecha(texto)
    
    if fecha is None:
        print(f"‚ö†Ô∏è Advertencia: No se pudo parsear la fecha '{fecha_str}'")
        print("   El pipeline continuar√° pero la columna Periodo_x ser√° NULL")
    
    valor_cuota_parte = extraer_valor_cuota_parte(pdf_file)
    perfil_riesgo = extraer_perfil_riesgo(pdf_file)
    
    df_composicion = extraer_tabla_composicion_v2(pdf_file, Config.N_FILAS_ESPERADAS)
    if df_composicion is None or df_composicion.empty:
        print("‚ùå Pipeline abortado: No se pudo extraer la tabla de composici√≥n")
        return None
    
    print(f"‚úÖ Composici√≥n extra√≠da: {len(df_composicion)} registros")
    
    df_final = procesar_dataframe(
        df_composicion,
        fecha,
        nombre_fondo,
        Config.SOCIEDAD_GERENTE,
        valor_cuota_parte,
        perfil_riesgo
    )
    
    if df_final.empty:
        print("‚ùå Pipeline abortado: DataFrame final est√° vac√≠o despu√©s del procesamiento")
        return None
    
    mostrar_resumen(df_final, nombre_fondo, fecha, pdf_url)
    
    print("\n‚úÖ Pipeline completado exitosamente")
    return df_final


df_resultado = ejecutar_pipeline_completo()


In [None]:
# Mostrar el DataFrame completo si existe
if df_resultado is not None and not df_resultado.empty:
    print("üìä Datos extra√≠dos:\n")
    display(df_resultado)
    
    print(f"\nüìà Estad√≠sticas:")
    print(f"   - Total de holdings: {len(df_resultado)}")
    print(f"   - Suma de porcentajes: {df_resultado['Porcentaje'].sum():.2%}")
    print(f"   - Mayor holding: {df_resultado.loc[df_resultado['Porcentaje'].idxmax(), 'Accion']}")
    print(f"   - % del mayor: {df_resultado['Porcentaje'].max():.2%}")
else:
    print("‚ö†Ô∏è No hay datos para mostrar")


In [None]:
if df_resultado is not None and not df_resultado.empty:
    exito = guardar_en_databricks(
        df=df_resultado,
        tabla=Config.TABLA_ALMACENAMIENTO,
        merge=True
    )
    
    if exito:
        print("\nüéâ Datos guardados exitosamente en Data Warehouse")
        print(f"üìä Tabla: {Config.TABLA_ALMACENAMIENTO}")
        print(f"üìà Registros guardados: {len(df_resultado)}")
    else:
        print("\n‚ö†Ô∏è Hubo un problema al guardar los datos")
else:
    print("‚ö†Ô∏è No hay datos para guardar")
