Funciónes de normalización

In [16]:
import json
import unicodedata
import re
from difflib import get_close_matches

def normalize_string(s: str, delete_non_letters: bool = False) -> str:
    s = str(s)
    # Convertir a ASCII, eliminar acentos
    s = unicodedata.normalize('NFKD', s).encode('ASCII', 'ignore').decode('ASCII')
    
    # Eliminar caracteres especiales, dejar solo letras, números y espacios
    if delete_non_letters:
        s = re.sub(r'[^a-zA-Z0-9\s]', '', s)
    else:
        # Si no se eliminan los caracteres no letras, aún podríamos querer eliminar algunos caracteres especiales
        s = re.sub(r'[^\w\s]', '', s)
    
    # Convertir a mayúsculas
    return s.upper()

def get_unsensitive_close_matches(word, possibilities, n=3, cutoff=0.6):
    """
    Encuentra lista de coincidiencias suficientemente buenas
    comparando valores normalizados
    """
    normalized_word = normalize_string(word, True).lower()
    normalized_possibilities = [normalize_string(str(s), True).lower() for s in possibilities]
    close_normalized_matches = get_close_matches(normalized_word, normalized_possibilities, n, cutoff)
    close_matches = []
    for cnm in close_normalized_matches:
        index = normalized_possibilities.index(cnm)
        close_matches.append(possibilities[index])
    return close_matches

def get_closest_match(possibilities: list[str], word: str, default_value = "") -> str:
    if not word:
        return default_value
    matches = get_unsensitive_close_matches(word, possibilities, n=5, cutoff=0.7)
    return matches[0] if matches else default_value


def normalize_json(data):
    if isinstance(data, dict):
        return {normalize_string(key): normalize_json(value) for key, value in data.items()}
    elif isinstance(data, list):
        return [normalize_json(item) for item in data]
    elif isinstance(data, str):
        return normalize_string(data)
    else:
        return data

def find_department(department_cities: dict, city: str) -> str:
    if not city:
        return ""
    all_cities = {}
    for dept, cities in department_cities.items():
        for city in cities:
            all_cities[city] = dept
    if city:
        return all_cities[city]
    return ""

with open("utils/ciudades_normalizadas.json", "r", encoding="utf-8") as file:
    department_cities = json.load(file)

def format_place(department_name: str, city_name: str) -> tuple[str, str]:
    department_name = str(department_name)
    city_name = str(city_name)
    if city_name == "nan":
        city_name = ""

    if city_name.lower() == "caqueta" or city_name.lower() == "florencia":
        city_name = "Florencia"
        department_name = "Caqueta"

    all_cities = {}
    for dept, cities in department_cities.items():
        for city in cities:
            all_cities[city] = dept
    city_name_formated = get_closest_match(list(all_cities.keys()), city_name)

    departments = list(department_cities.keys())
    if "bogota" in  normalize_string(city_name).lower():
        department_name = get_closest_match(departments, "bogota dc")
    department_name_formated = get_closest_match(departments, department_name)
    if not department_name_formated:
        department_name_formated = all_cities.get(city_name_formated, "")

    return department_name_formated, city_name_formated

Normalizar nombres ciudades

In [16]:
# Leer el archivo JSON
with open('utils/ciudades.json', 'r', encoding='utf-8') as file:
    ciudades = json.load(file)

# Normalizar el JSON
ciudades_normalizadas = normalize_json(ciudades)

# Guardar el JSON normalizado
with open('utils/ciudades_normalizadas.json', 'w', encoding='utf-8') as file:
    json.dump(ciudades_normalizadas, file, ensure_ascii=False, indent=2)

Normalizar nombres en base de datos

In [18]:
from openpyxl import load_workbook

# Nombre del archivo
filename = 'datos.xlsx'

# Cargar el archivo Excel
wb = load_workbook(filename)
sheet = wb['TODOS']

# Normalizar los valores en la columna 'Nombre'
for cell in sheet["A"][1:]:  # Empezamos desde la segunda fila para evitar el encabezado
    if cell.value:
        cell.value = normalize_string(str(cell.value))

# Guardar los cambios sobreescribiendo el archivo original
wb.save("datos_normalized.xlsx")

Definición de clase para leer hojas de cálculo

In [17]:
import pandas as pd
from openpyxl import load_workbook
from openpyxl.utils.dataframe import dataframe_to_rows

def get_cities()->dict:
    with open('utils/ciudades_normalizadas.json', 'r', encoding='utf-8') as file:
        return json.load(file)

class ExcelDataProcessor:
    def __init__(self, filename, sheet_name, nombre_col, correo_col, 
                 dept_col=None, city_col=None, 
                 combined_col=None, combined_format=None):
        self.filename = filename
        self.sheet_name = sheet_name
        self.nombre_col = nombre_col
        self.correo_col = correo_col
        self.dept_col = dept_col
        self.city_col = city_col
        self.combined_col = combined_col
        self.combined_format = combined_format
        self.data = None
        self.read_excel()

    def read_excel(self):
        try:
            df = pd.read_excel(self.filename, sheet_name=self.sheet_name)
            required_columns = [self.nombre_col, self.correo_col]
            
            if self.combined_col:
                required_columns.append(self.combined_col)
            if self.dept_col:
                required_columns.append(self.dept_col)
            if self.city_col:
                required_columns.append(self.city_col)
            
            if not all(col in df.columns for col in required_columns):
                missing_cols = [col for col in required_columns if col not in df.columns]
                raise ValueError(f"Columnas faltantes: {', '.join(missing_cols)}")
            
            df = df.dropna(subset=[self.correo_col])

            self.data = df
            self._process_dept_city()
            return True
        except Exception as e:
            print(f"Error al leer el archivo Excel: {str(e)}")
            return False

    def _process_dept_city(self):
        self.data['Departamento'] = ''
        self.data['Ciudad'] = ''

        if self.dept_col and self.city_col:
            self.data['Departamento'] = self.data[self.dept_col]
            self.data['Ciudad'] = self.data[self.city_col]
        elif self.dept_col:
            self.data['Departamento'] = self.data[self.dept_col]
        elif self.city_col:
            self.data['Ciudad'] = self.data[self.city_col]
        elif self.combined_col and self.combined_format:
            pattern = self.combined_format.replace("%dept", "(.+?)").replace("%city", "(.+?)")
            extracted = self.data[self.combined_col].str.extract(pattern)
            if '%dept' in self.combined_format and '%city' in self.combined_format:
                self.data['Departamento'] = extracted[0]
                self.data['Ciudad'] = extracted[1]
            elif '%dept' in self.combined_format:
                self.data['Departamento'] = extracted[0]
            elif '%city' in self.combined_format:
                self.data['Ciudad'] = extracted[0]

        self.data = self.data.rename(columns={
            self.nombre_col: 'Nombre',
            self.correo_col: 'Correo'
        })
        self.data = self.data[['Nombre', 'Correo', 'Departamento', 'Ciudad']]

    def get_data(self):
        return self.data

    def get_by_column(self, column_name):
        if column_name in self.data.columns:
            return self.data[column_name].tolist()
        else:
            print(f"La columna {column_name} no existe en los datos")
            return []

    def get_by_index(self, index):
        if 0 <= index < len(self.data):
            return self.data.iloc[index].to_dict()
        else:
            print(f"Índice {index} fuera de rango")
            return {}
    
    def normalize(self, format_function):
        if not callable(format_function):
            raise ValueError("format_function debe de ser una función callable")
        
        def apply_format(row):
            dept, city = format_function(row["Departamento"], row["Ciudad"])
            return pd.Series({"Departamento": dept, "Ciudad": city})
        
        normalized = self.data.apply(apply_format, axis=1)
        self.data[["Departamento", "Ciudad"]] = normalized


class DataMaster:
    def __init__(self, filename: str, sheet_name: str):
        self.filename = filename
        self.sheet_name = sheet_name
        self.data = None
        self.load_data()

    def load_data(self):
        try:
            self.data = pd.read_excel(self.filename, sheet_name=self.sheet_name)
            required_columns = ['Correo', 'Departamento', 'Ciudad']
            if not all(col in self.data.columns for col in required_columns):
                missing_cols = [col for col in required_columns if col not in self.data.columns]
                raise ValueError(f"Columnas faltantes: {', '.join(missing_cols)}")
        except Exception as e:
            print(f"Error al cargar los datos: {str(e)}")

    def get_data(self):
        return self.data
    
    def prepare_info_data(self, info):
        self.info_data = info.get_data()
        self.email_index = [(normalize_string(str(email), delete_non_letters=True), i) 
                            for i, email in enumerate(self.info_data['Correo'])]
        self.name_index = [(normalize_string(str(name), delete_non_letters=True), i) 
                           for i, name in enumerate(self.info_data['Nombre'])]
    
    def find_row(self, master_row, col_name: str):
        master_col_norm = normalize_string(str(master_row[col_name]), delete_non_letters=True)
        if col_name == 'Correo':
            index_list = self.email_index
        else:  # 'Nombre'
            index_list = self.name_index
        
        for info_col_norm, i in index_list:
            if master_col_norm.replace(" ","") in info_col_norm.replace(" ",""):
                return self.info_data.iloc[i]
        return None
        
    def update_places(self, info):
        self.prepare_info_data(info)
        mask = self.data['Departamento'].isna() | self.data['Ciudad'].isna()
        rows_to_update = self.data[mask]
        
        for index, row in rows_to_update.iterrows():
            info_row = self.find_row(row, "Correo")
            if info_row is None:
                info_row = self.find_row(row, "Nombre")
            if info_row is not None:
                self.data.at[index, "Departamento"] = info_row["Departamento"]
                self.data.at[index, "Ciudad"] = info_row["Ciudad"]

    def get_by_column(self, column_name: str):
        if column_name in self.data.columns:
            return self.data[column_name].tolist()
        else:
            print(f"La columna {column_name} no existe en los datos")
            return []

    def get_by_index(self, index: int):
        if 0 <= index < len(self.data):
            return self.data.iloc[index].to_dict()
        else:
            print(f"Índice {index} fuera de rango")
            return {}
    
    def show_nontna(self):
        notna = self.data[(self.data["Ciudad"].notna()) | (self.data["Departamento"].notna())]
        print(notna)

    def save(self):
        try:
            # Cargar el libro de trabajo existente
            book = load_workbook(self.filename)
            
            # Obtener la hoja existente o crear una nueva si no existe
            if self.sheet_name in book.sheetnames:
                sheet = book[self.sheet_name]
            else:
                sheet = book.create_sheet(self.sheet_name)

            # Limpiar el contenido existente de la hoja
            for row in sheet[sheet.dimensions]:
                for cell in row:
                    cell.value = None

            # Escribir los nuevos datos en la hoja
            rows = dataframe_to_rows(self.data, index=False, header=True)
            for r_idx, row in enumerate(rows, 1):
                for c_idx, value in enumerate(row, 1):
                    sheet.cell(row=r_idx, column=c_idx, value=value)

            # Guardar los cambios
            book.save(self.filename)
            
            print(f"Datos actualizados exitosamente en {self.filename}, hoja: {self.sheet_name}")
        except Exception as e:
            print(f"Error al guardar los datos: {str(e)}")

In [18]:
# Libros
amigos_visibles = "data/amigos_visibles.xlsx"
construccion = "data/construccion.xlsx"
afrus = "data/afrus.xlsx"
donaciones_av = "data/donaciones_av.xlsx"
connect = "data/connect.xlsx"
aliados = "data/aliados.xlsx"
evenbrite = "data/evenbrite.xlsx"
eag = "data/eag.xlsx"
empresarios_tv = "data/talento_visible_empresas.xlsx"
alta_gerencia = "data/alta_gerencia.xlsx"
lidera = "data/lidera.xlsx"
otros = "data/otros.xlsx"
influencers = "data/influencers.xlsx"
medios_comunicacion = "data/medios_comunicacion.xlsx"
letras_vanguardia_medellin = "data/letras_vanguardia_medellin.xlsx"
letras_vanguardia_bogota = "data/letras_vanguardia_bogota.xlsx"
veni_te_leo = "data/veni_te_leo.xlsx"
red_manos = "data/red_manos.xlsx"

In [27]:
# Data Master
data_master = DataMaster("datos.xlsx", "TODOS")

In [22]:
# Amigos Visibles
databases_1 = [
    ExcelDataProcessor(amigos_visibles, "1.PROVEEDORES", "Organización", "Correo 1", city_col="Cuidad"),
    ExcelDataProcessor(amigos_visibles, "2. EQUIPO MV, JUNTA,ASAMBL ", "Nombre", "Correo 1", city_col="Ciudad de residencia"),
    ExcelDataProcessor(amigos_visibles, "4. ORGANIZACIONES MINGALAB", "Nombre de la organización", "Correo 1", city_col="Ciudad de residencia"),
    ExcelDataProcessor(amigos_visibles, "4. PODER PACÍFICO", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "4. ESCUELA GOBIERNO", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "4. ESCUELA ECONOMIA", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "5. POT ÉTNICA MDP, CE,MBA,MERC", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "6. FJCP I Y II", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "6. MIT", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "6. ESCUELA SALUD PUB", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "7. DALE ", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "7. MUJERES LIDERES CARTAGENA", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "8. POTENCIA ÉTNICA AUDIOVISUAL", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "8. FOCO", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "8. PMB", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "8. FOCO", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "8. FOCO", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
    ExcelDataProcessor(amigos_visibles, "8. FOCO", "Nombre", "Correo 1", city_col="Ciudad de Residencia"),
]


  warn(msg)
  warn(msg)


In [28]:
for db in databases_1:
    db.normalize(format_place)
    data_master.update_places(db)

  self.data.at[index, "Departamento"] = info_row["Departamento"]
  self.data.at[index, "Ciudad"] = info_row["Ciudad"]


In [24]:
databases_2 = [
    ExcelDataProcessor(afrus, "18-09-2024-Personas-24-11-2023-", "Nombre completo", "Correo electrónico", city_col="Ciudad"),
    ExcelDataProcessor(donaciones_av, "Hoja 1", "Nombre completo", "Correo electrónico", dept_col="Departamento", city_col="Ciudad"),
    ExcelDataProcessor(connect, "BASE RED MV QUIBDÓ", "Nombre completo", "Correo", city_col="Municipio de Residencia o atención"),
    ExcelDataProcessor(connect, "BASE DE TUTORES", "NOMBRE ", "CORREO", city_col="NODO"),
    ExcelDataProcessor(evenbrite, "Sheet 1", "Nombre completo", "Correo electrónico de comprador", city_col="Ciudad del comprador"),
    ExcelDataProcessor(empresarios_tv, "Hoja1", "Nombre", "Correo", city_col="Ciudad"),
    ExcelDataProcessor(alta_gerencia, "PARTICIPANTES ESC. ALTA GERENCI", "NOMBRES Y APELLIDOS DEL/LA PARTICIPANTE", "CORREO ELECTRÓNICO DEL/LA PARTICIPANTE", dept_col="DEPTO DE RESIDENCIA", city_col="CIUDAD O MUNICIPIO DE RESIDENCIA"),
    ExcelDataProcessor(lidera, "Músicos", "Nombre", "Correo", city_col="Ciudad"),
    ExcelDataProcessor(lidera, "Actores Actrices", "Nombre", "Correo", city_col="Ciudad"),
    ExcelDataProcessor(lidera, "Cine ", "Nombre", "Correo", city_col="Ciudad"),
    ExcelDataProcessor(lidera, "Académicos", "Nombre", "Correo", city_col="Ciudad"),
    ExcelDataProcessor(lidera, "Artes Visuales", "Nombre", "Correo", city_col="Ciudad"),
    ExcelDataProcessor(lidera, "Danza", "Nombre", "Correo", city_col="Ciudad"),
    ExcelDataProcessor(medios_comunicacion, "Directores y Periodistas ", "NOMBRE", "DIRECCION", city_col="CIUDAD"),
    ExcelDataProcessor(letras_vanguardia_medellin, "BASE LETRAS DE VANGUARDIA", "Nombre", "CORREO ELECTRÓNICO DEL/LA PARTICIPANTE", dept_col="DEPTO DE RESIDENCIA", city_col="CIUDAD O MUNICIPIO DE RESIDENCIA"),
    ExcelDataProcessor(letras_vanguardia_bogota, "PARTICIPANTES ESCUELA VEC", "Nombre", "CORREO ELECTRÓNICO DEL/LA PARTICIPANTE", dept_col="DEPTO DE RESIDENCIA", city_col="CIUDAD O MUNICIPIO DE RESIDENCIA"),
    ExcelDataProcessor(veni_te_leo, "PARTICIPANTES ESCUELA VEC", "NOMBRES", "CORREO ELECTRÓNICO DEL/LA PARTICIPANTE", dept_col="DEPTO DE RESIDENCIA", city_col="CIUDAD O MUNICIPIO DE RESIDENCIA"),
    ExcelDataProcessor(red_manos, "BASE COMUNICACIONES PRELIMI (2)", "Nombre completo", "Correo Electrónico", dept_col="Departamento residencia", city_col="Municipio de residencia")
]

  warn(msg)
  warn(msg)
  warn(msg)


In [29]:
for db in databases_2:
    db.normalize(format_place)
    data_master.update_places(db)

In [30]:
#data_master.show_nontna()
data_master.save()

Datos actualizados exitosamente en datos.xlsx, hoja: TODOS
