In [123]:
from collections import Counter
import pandas as pd
import numpy as np
import re
import ast
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [124]:
# Show full cell contents when DataFrames are displayed (no column-width truncation).
pd.set_option('display.max_colwidth', None)

In [125]:
# Workbook holding the transfer names; main() loads its NOMBRE column for the comparison.
TRANSFER_PATH = 'Transfer.xlsx'
# Name of the OFAC workbook to process; assigned at runtime by get_path_latest_file().
FILE_PATH = None

# filter_names() discards aliases shorter than this many characters.
MIN_ALIAS_LENGTH = 10
# filter_names() keeps an alias only when its compare_names() score is strictly above this value.
MIN_SCORE_ALIAS = 0.6

In [126]:
def get_path_latest_file():
    '''Point FILE_PATH at the most recently modified OFAC_* file in the working directory.

    Scans ``Path.cwd()`` for regular files whose name matches ``OFAC_*``, picks
    the one with the newest modification time and stores its bare file name in
    the module-level ``FILE_PATH``. When nothing matches, ``FILE_PATH`` is set
    to ``None`` and a message is printed instead of raising.

    Returns:
        The selected file name (str), or None when no matching file exists.
    '''
    global FILE_PATH

    directorio = Path.cwd()
    # Only regular files are candidates; a directory named OFAC_* cannot be opened as a workbook.
    archivos = [f for f in directorio.glob("OFAC_*") if f.is_file()]

    latest = max(archivos, key=lambda f: f.stat().st_mtime, default=None)
    # max() yields None for an empty list; calling .name on it would raise AttributeError.
    FILE_PATH = latest.name if latest is not None else None

    print(f"Archivo más reciente: {FILE_PATH}" if FILE_PATH else "No se encontraron archivos.")
    return FILE_PATH

def clean_strings(s):
    '''Return ``s`` reduced to its ASCII letters.

    Every character outside a-z / A-Z is removed: digits, punctuation,
    whitespace and accented letters alike.
    '''
    only_letters = re.sub(r'[^a-zA-Z]', '', s)
    return only_letters

def verify_if_number(s):
    '''Look for a digit anywhere in ``s``.

    Returns the ``re.Match`` of the first digit found (truthy), or None when
    the string contains no digit.
    '''
    first_digit = re.compile(r'\d').search(s)
    return first_digit

def compare_words(word1, word2):
    '''Score how many characters two words share, as a value between 0 and 1.

    The characters common to both words are counted with multiplicity and
    without regard to order, then divided by the length of the shorter word.
    That ratio is rescaled as (e**ratio - 1) / (e - 1), so 0 maps to 0 and 1
    maps to 1. Returns 0.0 when either word is empty.
    '''
    shared_chars = sum((Counter(word1) & Counter(word2)).values())
    shorter = min(len(word1), len(word2))

    # Guard the division below: an empty word shares nothing.
    if shorter == 0:
        return 0.0

    ratio = shared_chars / shorter
    return (np.exp(ratio) - 1) / (np.exp(1) - 1)

def compare_names(name1, name2):
    '''Score the similarity of two names as a value between 0 and 1.

    Both names are passed through clean_strings(), lower-cased and split into
    tokens. Each token of ``name1`` is scored against every token of ``name2``
    with compare_words(); the best score per token is kept and the result is
    the mean of those best scores. Returns 0.0 when there is nothing to score.

    NOTE(review): clean_strings() removes every non-letter, whitespace
    included, so split() yields at most one token per name and the token loop
    effectively compares the two concatenated names once. Confirm whether a
    word-by-word comparison was intended before changing it; MIN_SCORE_ALIAS
    is applied to the scores this function currently produces.
    '''
    tokens_a = clean_strings(name1).lower().split()
    tokens_b = clean_strings(name2).lower().split()

    # One entry per token of name1: its best score against any token of name2.
    best_per_token = [
        max(compare_words(token_a, token_b) for token_b in tokens_b)
        for token_a in tokens_a
        if tokens_b
    ]

    if best_per_token:
        return sum(best_per_token) / len(best_per_token)
    return 0.0

def compare_names_vectorized_transfer(df_names_array, transfer_names_array):
    '''Build the pairwise TF-IDF cosine-similarity matrix between two arrays of names.

    A single TfidfVectorizer is fitted on both arrays together so they share
    one vocabulary. The returned matrix has one row per entry of
    ``df_names_array`` and one column per entry of ``transfer_names_array``.
    '''
    corpus = np.concatenate([df_names_array, transfer_names_array])
    tfidf = TfidfVectorizer()
    tfidf.fit(corpus)

    return cosine_similarity(
        tfidf.transform(df_names_array),
        tfidf.transform(transfer_names_array),
    )

In [127]:
def _parse_list_cell(value):
    '''Convert one spreadsheet cell into a list.

    Lists and tuples pass through (as a new list), missing values and blank
    strings become an empty list, and any other value is parsed with
    ast.literal_eval, which only evaluates Python literals. A malformed
    literal raises ValueError or SyntaxError.
    '''
    # Already parsed (for example when the step is re-run on the same frame).
    # Checked first because pd.isna() on a sequence returns an array, not a bool.
    if isinstance(value, (list, tuple)):
        return list(value)
    if pd.isna(value):
        return []
    if isinstance(value, str) and not value.strip():
        return []
    return ast.literal_eval(value)


def process_aliases(df):
    '''Parse the 'Alias' column in place so every cell holds a list; returns ``df``.'''
    df['Alias'] = df['Alias'].apply(_parse_list_cell)
    return df


def process_documents(df):
    '''Parse the 'Documentos' column in place so every cell holds a list; returns ``df``.'''
    df['Documentos'] = df['Documentos'].apply(_parse_list_cell)
    return df

In [128]:
def filter_names(df):
    '''Keep, for each row, only the aliases that resemble the row's full name.

    An alias survives when it contains no digit, is at least MIN_ALIAS_LENGTH
    characters long, and its compare_names() score against 'Nombre Completo'
    is strictly greater than MIN_SCORE_ALIAS. The 'Alias' column is replaced
    in place and ``df`` is returned.
    '''
    kept_per_row = []
    for _, record in df.iterrows():
        reference_name = record['Nombre Completo']
        # The checks run cheapest-first; compare_names() is only reached by
        # aliases that passed the digit and length tests.
        kept_per_row.append([
            candidate
            for candidate in record['Alias']
            if not verify_if_number(candidate)
            and len(candidate) >= MIN_ALIAS_LENGTH
            and compare_names(reference_name, candidate) > MIN_SCORE_ALIAS
        ])

    df['Alias'] = kept_per_row
    return df

def filter_documents(df, valid_types=('CC', 'PAS', 'NIT'), default_document="Def 1"):
    '''Keep, for each row, only the documents whose type prefix is accepted.

    The type of a document is its first whitespace-separated token. Documents
    that are empty or whitespace-only have no type and are dropped. A row left
    without any document receives a single placeholder entry instead.

    Args:
        df: frame whose 'Documentos' column holds a list of strings per row.
        valid_types: document type prefixes to keep.
        default_document: placeholder used when a row has no valid document.

    Returns:
        ``df``, with its 'Documentos' column replaced in place.
    '''
    allowed = set(valid_types)
    docs_list = []
    for _, row in df.iterrows():
        kept = []
        for document in row['Documentos']:
            parts = document.split(maxsplit=1)
            # `parts` is empty for blank documents; indexing it would raise IndexError.
            if parts and parts[0] in allowed:
                kept.append(document)
        # A fresh list per row so rows never share the same placeholder object.
        docs_list.append(kept if kept else [default_document])
    df['Documentos'] = docs_list
    return df

In [129]:
def expand_dataframe(df):
    '''Expand each row into one row per (name, document) combination.

    The names of a row are its 'Nombre Completo' followed by every entry of
    'Alias'; each is paired with every entry of 'Documentos'.

    Returns:
        A new DataFrame with columns 'Nombre' and 'Documento'. Both columns
        are present even when the input produces no combinations, so later
        column lookups do not fail on an empty result.
    '''
    new_rows = [
        {"Nombre": name, "Documento": document}
        for _, row in df.iterrows()
        for name in [row['Nombre Completo']] + row['Alias']
        for document in row['Documentos']
    ]

    # Passing `columns` keeps the schema stable when `new_rows` is empty.
    return pd.DataFrame(new_rows, columns=["Nombre", "Documento"])

In [130]:
def save_to_excel(df, filename):
    '''Write ``df`` to an Excel file and size every column to fit its content.

    The frame is written without its index, the workbook is reopened with
    openpyxl, and each column of the active sheet gets a width equal to the
    length of its longest rendered cell value plus 2. The file is then saved
    again under the same name.
    '''
    df.to_excel(filename, index=False)
    wb = load_workbook(filename)
    ws = wb.active

    for col in ws.columns:
        col_letter = get_column_letter(col[0].column)
        # Empty cells hold None and are skipped; values such as 0 or False
        # are still measured because they occupy space in the sheet.
        max_length = max(
            (len(str(cell.value)) for cell in col if cell.value is not None),
            default=0,
        )
        ws.column_dimensions[col_letter].width = max_length + 2

    wb.save(filename)

In [131]:
def load_and_transform_transfer_excel(file_path):
    '''Read the Excel file at ``file_path`` and return a DataFrame holding only its 'NOMBRE' column.'''
    wanted_columns = ['NOMBRE']
    transfer = pd.read_excel(file_path)
    return transfer[wanted_columns]

In [132]:
def compare_lists(df, transfer):
    '''Find, for every name in ``df``, the most similar name in ``transfer``.

    Adds two columns to ``df`` in place and returns it:
      - 'Comparado': the transfer name with the highest similarity score.
      - 'Score': that similarity multiplied by 100.

    Missing values in ``df['Nombre']`` are replaced with "N/A". Missing values
    in ``transfer['NOMBRE']`` are dropped, since they cannot be a match. Both
    sides are converted to str before being vectorized.

    Raises:
        ValueError: if ``transfer`` has no usable names to compare against.
    '''
    df_names_array = df['Nombre'].fillna("N/A").astype(str).values
    # The transfer side needs the same care as the df side: NaN or numeric
    # cells are not text and cannot be fed to the text vectorizer.
    transfer_names_array = transfer['NOMBRE'].dropna().astype(str).values

    if len(transfer_names_array) == 0:
        raise ValueError("El archivo de transferencias no contiene nombres validos en la columna NOMBRE.")

    score_matrix = compare_names_vectorized_transfer(df_names_array, transfer_names_array)

    # Per df row: column index of the best-scoring transfer name, and that score.
    best_match_indices = np.argmax(score_matrix, axis=1)
    best_scores = np.max(score_matrix, axis=1)

    df["Comparado"] = transfer_names_array[best_match_indices]
    df["Score"] = best_scores * 100

    return df

In [133]:
def main():
    '''Run the whole pipeline, from the newest OFAC workbook to the two output workbooks.

    Steps: locate the latest OFAC_* file, parse and filter its aliases and
    documents, expand it to one row per (name, document), save that as
    PreTransfer_<file>, then compare the names against the transfer list and
    save the result as Transfer_<file>.
    '''
    get_path_latest_file()

    ofac_raw = pd.read_excel(FILE_PATH)
    ofac_parsed = process_documents(process_aliases(ofac_raw)).drop(columns=["Accion"])
    ofac_filtered = filter_documents(filter_names(ofac_parsed))
    pre_transfer = expand_dataframe(ofac_filtered)

    # The intermediate file is written before compare_lists() adds its columns.
    filename = f"PreTransfer_{FILE_PATH}"
    save_to_excel(pre_transfer, filename)
    print(f"Archivo guardado en {filename}")

    transfer = load_and_transform_transfer_excel(TRANSFER_PATH)
    matched = compare_lists(pre_transfer, transfer)

    filename = f"Transfer_{FILE_PATH}"
    save_to_excel(matched, filename)
    print(f"Archivo guardado en {filename}")

if __name__ == "__main__":
    main()

Archivo más reciente: OFAC_2025-02-26.xlsx
Archivo guardado en PreTransfer_OFAC_2025-02-26.xlsx
Archivo guardado en Transfer_OFAC_2025-02-26.xlsx
