In [None]:
import pandas as pd
import numpy as np
import unicodedata
import os

import os
import re

from pydantic import BaseModel


from ollama import Client, ChatResponse

from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, ValidationError, Field

from tqdm import tqdm

from enum import Enum
from typing import Optional, Literal
from pydantic import BaseModel, ValidationError, Field
from ollama import Client, ChatResponse
import logging

import logging



In [None]:

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


In [None]:
def obtener_razones_sociales(file_name: str, sheet_name: str = 'mini') -> list:
    file_path = os.path.join('data', file_name)
    df = pd.read_excel(file_path, sheet_name=sheet_name)

    # Normalizar nombres de columnas: sin tildes, espacios, mayúsculas o caracteres especiales
    def normalizar_columna(col):
        col = unicodedata.normalize('NFKD', col).encode('ascii', 'ignore').decode('utf-8')
        col = col.lower().replace(' ', '_').replace('-', '_')
        col = ''.join(c for c in col if c.isalnum() or c == '_')
        return col

    df.columns = [normalizar_columna(c) for c in df.columns]

    # Obtener valores únicos no nulos de la columna 'razon_social'
    if 'razon_social' not in df.columns:
        raise ValueError("La columna 'razon_social' no se encontró en el archivo.")

    razones_sociales = df['razon_social'].dropna().unique().tolist()
    return razones_sociales

In [None]:

def generar_path_archivo(razon: str, folder: str) -> str:
    """
    Sanitiza un nombre de archivo basado en una razón social y devuelve la ruta completa del archivo.
    
    Args:
        razon (str): Texto a usar como base para el nombre del archivo.
        folder (str): Carpeta donde se guardará el archivo.

    Returns:
        str: Ruta completa del archivo sanitizado.
    """
    # Reemplazar caracteres no permitidos
    sanitized = re.sub(r"[^\w\-\_\. ]", "_", razon)
    filename = f"{sanitized}.txt"
    path = os.path.join(folder, filename)
    return path


In [None]:

def consulta_tipo_empresa(
    razon_social: str,
    host: str = "http://localhost:11434",
    timeout: Optional[float] = None
) -> Optional[str]:
    """
    Consulta al modelo gemma3:latest si la empresa dada es pública, privada o mixta.

    Args:
        razon_social: Nombre (Razón Social) de la empresa a consultar.
        host: URL del servidor Ollama (por defecto localhost:11434).
        timeout: Tiempo máximo de espera en segundos; None = espera indefinida.

    Returns:
        El texto de la respuesta del modelo, o None si falla la petición.
    """
    client = Client(host=host, timeout=timeout)
    prompt = (
        "Dada esta información y la que se obtenga buscando en Internet, "
        f"necesito establecer si esta empresa '{razon_social}' "
        "es pública, privada o mixta según la definición en Colombia."
    )
    try:
        resp: ChatResponse = client.chat(
            model="gemma3:latest",
            messages=[{"role": "user", "content": prompt}]
        )
        return resp.message.content
    except Exception:
        return None

In [None]:

def guardar_respuestas_txt(
    razones_sociales: List[str],
    folder: str = "result",
    host: str = "http://localhost:11434",
    timeout: Optional[float] = None
) -> None:
    """
    Para cada razón social en la lista:
    - Se deduplica automáticamente.
    - Si el archivo ya existe, se omite la consulta.
    - Se consulta al modelo gemma3:latest si no existe.
    - Se guarda la respuesta en un archivo .txt dentro de `folder`.
    - Se muestra una barra de progreso con tqdm.
    """
    # Crear la carpeta destino si no existe
    os.makedirs(folder, exist_ok=True)

    seen = set()

    # Iterar con barra de progreso
    for razon in tqdm(razones_sociales, desc="Procesando empresas", unit="empresa"):
        if razon in seen:
            continue
        seen.add(razon)

        # # Sanitizar nombre de archivo
        # sanitized = re.sub(r"[^\w\-\_\. ]", "_", razon)
        # filename = f"{sanitized}.txt"
        # path = os.path.join(folder, filename)

        # Generar el path del archivo
        path = generar_path_archivo(razon, folder)

        # Si el archivo ya existe, no recalcular ni sobrescribir
        if os.path.exists(path):
            continue

        # Realizar consulta solo si no existe el archivo
        resultado = consulta_tipo_empresa(razon, host=host, timeout=timeout)

        # Guardar en archivo .txt (UTF-8)
        with open(path, "w", encoding="utf-8") as f:
            f.write(resultado or "")



In [None]:
# Función para obtener las razones sociales de un archivo Excel
def crear_excel_desde_textos(razones_sociales: list, carpeta_resultados: str, output_excel: str = 'data/result.xlsx') -> pd.DataFrame:
    """
    Lee archivos de texto basados en una lista de razones sociales y genera un DataFrame,
    que además guarda en un archivo Excel.
    
    Args:
        razones_sociales (list): Lista de razones sociales.
        carpeta_resultados (str): Carpeta donde están ubicados los archivos de texto.
        output_excel (str): Ruta donde se guardará el archivo Excel de salida.

    Returns:
        pd.DataFrame: DataFrame generado.
    """
    data = []

    for razon in razones_sociales:
        # Generar el path del archivo
        path = generar_path_archivo(razon, carpeta_resultados)
        
        # Verificar que el archivo exista
        if not os.path.exists(path):
            print(f"Advertencia: No se encontró el archivo para '{razon}' en '{path}'. Se omitirá.")
            continue

        # Leer el contenido del archivo
        with open(path, 'r', encoding='utf-8') as file:
            contenido = file.read()
        
        # Agregar el contenido a la lista
        data.append({'razon_social': razon, 'informacion_razon_social': contenido})
    
    # Crear un DataFrame a partir de la lista
    df = pd.DataFrame(data)

    # Crear la carpeta de salida si no existe
    os.makedirs(os.path.dirname(output_excel), exist_ok=True)

    # Guardar el DataFrame en un archivo Excel
    df.to_excel(output_excel, index=False, engine='openpyxl')

    return df


In [None]:
def investigar_razones_sociales():
    """
    Función principal para razones sociales.
    """

    try:
        # Obtener las razones sociales de un archivo Excel
        razones_sociales = obtener_razones_sociales('mini.xlsx', sheet_name='mini')
        
        # Guardar las respuestas en archivos .txt
        # guardar_respuestas_txt(razones_sociales, folder='result', timeout=60*10)
        
        # Crear un Excel a partir de los archivos de texto
        crear_excel_desde_textos(razones_sociales, carpeta_resultados='result')

        return True
    except Exception as e:
        print(f"Error: {e}")
        return False


In [None]:


class TipoEmpresa(str, Enum):
    PUBLICA = "publica"
    PRIVADA = "privada"
    MIXTA = "mixta"
    INDEFINIDO = "indefinido"

class CompanyResponse(BaseModel):
    """Modelo adaptado a la salida de deepseek-r1"""
    classe: TipoEmpresa = Field(..., description="Clasificación de la empresa")
    # Mantenemos empresa para capturar el output aunque no lo usemos
    empresa: Optional[str] = Field(None, description="Nombre de la empresa")
    
    class Config:
        extra = 'ignore'  # Ignorar campos adicionales

class CompanyClassifier:
    def __init__(
        self,
        model: str = "deepseek-r1:latest",
        host: str = "http://localhost:11434",
        timeout: float = 30.0
    ):
        self.client = Client(host=host, timeout=timeout)
        self.model = model
        self.json_schema = CompanyResponse.model_json_schema()
        
        self.system_template = (
            "Eres un clasificador de empresas. Responde EXCLUSIVAMENTE en JSON válido usando este formato:\n"
            '{"classe": "publica|privada|mixta|indefinido", "empresa": "nombre"}\n\n'
            "Instrucciones:\n"
            "1. Analiza la información proporcionada\n"
            "2. Usa solo estos valores posibles para 'classe':\n"
            "   - publica: Control estatal mayoritario\n"
            "   - privada: Capital completamente privado\n"
            "   - mixta: Participación público-privada\n"
            "   - indefinido: Información insuficiente\n"
            "3. El campo 'empresa' debe ser el nombre recibido\n"
            "4. ¡NUNCA uses otros campos o formatos!"
        )

    def _build_prompt(self, razon_social: str, contexto: str) -> str:
        return (
            f"Clasificar: {razon_social}\n\n"
            f"Contexto:\n{contexto}\n\n"
            "Ejemplo de respuesta válida:\n"
            '{"classe": "publica", "empresa": "EMPRESA EJEMPLO SA"}'
        )

    def classify(
        self,
        razon_social: str,
        contexto: str,
        max_retries: int = 5
    ) -> Optional[TipoEmpresa]:
        messages = [
            {"role": "system", "content": self.system_template},
            {"role": "user", "content": self._build_prompt(razon_social, contexto)}
        ]
        
        for attempt in range(max_retries + 1):
            try:
                response: ChatResponse = self.client.chat(
                    model=self.model,
                    messages=messages,
                    format="json",
                    options={
                        "temperature": 0.1,
                        "num_ctx": 4096,
                        "seed": 123  # Para mayor consistencia
                    }
                )
                
                # Validación mejorada
                result = CompanyResponse.model_validate_json(response.message.content)
                logger.info(f"Clasificación exitosa: {result.classe}")
                return result.classe
                
            except ValidationError as e:
                logger.error(f"Intento {attempt + 1}: Error validación - {e.errors()[0]['msg']}")
                if attempt == max_retries:
                    break
                    
            except Exception as e:
                logger.error(f"Intento {attempt + 1}: Error general - {str(e)}")
                if attempt == max_retries:
                    break

        logger.warning(f"Clasificación fallida después de {max_retries + 1} intentos")
        return None



In [None]:
# Crear carpeta temporal si no existe
os.makedirs('tmp', exist_ok=True)

file_path = os.path.join('data', "result.xlsx")
sheet_name = "Sheet1"
df = pd.read_excel(file_path, sheet_name=sheet_name)

# Tomar una muestra de 1000 filas
# df = df.sample(n=5, random_state=1)


records = df.to_dict(orient="records")

# Inicializar el clasificador
classifier = CompanyClassifier(timeout=60*10)

data = []

for i, record in enumerate(tqdm(records, desc="Procesando empresas", unit="empresa")):
    tmp_file = os.path.join('tmp', f'resultado_{i}.xlsx')
    
    # Si ya existe el archivo temporal, saltar la iteración
    if os.path.exists(tmp_file):
        continue
        
    razon_social = record.get("razon_social")
    contexto = record.get("informacion_razon_social")

    resultado = classifier.classify(razon_social, contexto)
    
    # Guardar en archivo temporal
    pd.DataFrame([{
        "razon_social": razon_social,
        "tipo_razon_social": resultado.lower() if resultado else None,
        "informacion_razon_social": contexto
    }]).to_excel(tmp_file, index=False)

# Consolidar todos los archivos temporales
all_files = [os.path.join('tmp', f) for f in os.listdir('tmp') if f.endswith('.xlsx')]
df_consolidado = pd.concat([pd.read_excel(f) for f in all_files], ignore_index=True)

# Guardar resultado final
df_consolidado.to_excel("deepseek.xlsx", index=False)

# Opcional: Limpiar carpeta temporal después de consolidar
# import shutil
# shutil.rmtree('tmp')