In [None]:
from IPython.display import clear_output
import gc
import psutil
print(gc.collect())
print("Memoria:", psutil.virtual_memory())

In [None]:
import importlib
import dfrtokenuniverse.splitter
import dfrtokenuniverse.word_inventory
import dfrtokenuniverse.constantes


# Recarga librerías propias

In [None]:
importlib.reload(dfrtokenuniverse.splitter)
importlib.reload(dfrtokenuniverse.word_inventory)
importlib.reload(dfrtokenuniverse.constantes)
from dfrtokenuniverse.splitter import TextSplitter
from dfrtokenuniverse.word_inventory import WordInventory
from dfrtokenuniverse.constantes import KDfrNlp
K = KDfrNlp()
splitter = TextSplitter()
invent = WordInventory()

# Ejemplo splitter e inventario

In [None]:
a_dic = {}
for ttkn, n_grama in splitter.split_by_type("Esto es una, hay otra ¿no?\nPues esta es una más ¡SEGURO! 23587,33", 4):
    invent.normalize_and_count(ttkn, n_grama, a_dic)
print(a_dic)
for ttkn in a_dic:
    for n_grama in a_dic[ttkn]:
        p_ngrama = n_grama.replace("\n", "¬")
        print(f"{K.TTKN_DESC[ttkn]} contiene '{p_ngrama}' con {a_dic[ttkn][n_grama]} apariciones")
del a_dic
del ttkn
print(gc.collect())
print("Memoria:", psutil.virtual_memory())

# Recuperamos texto

In [None]:
import pickle
wiki_path = r"D:\datos\wiki_1909627_entradas.pkl"
print(f"Leyendo desde: {wiki_path}")
with open(wiki_path, "rb") as wiki_file:
    wiki_data = pickle.load(wiki_file)
print(gc.collect())
print("Memoria:", psutil.virtual_memory())

In [None]:
print(f"Documentos en wiki: {len(wiki_data)}")

<center> <h2>Diccionario de n-gramas</h2> </center>

In [None]:
dic_ngramas = {}

In [None]:
saltar  = 0
recoge = 100000 # Entradas de la wikipedia con sus textos
limite = saltar + recoge

<h3>Realizamos varias pasadas para ir acumulando n-gramas</h3>
<hr/>

In [None]:
print(f"Recogemos {recoge} entradas a partir de {saltar}")
cont_entradas = 0
max_len_allowed=35
for entrada in wiki_data:
    if cont_entradas > saltar:
        for ttkn, n_grama in splitter.split_by_type(entrada, max_len=max_len_allowed):
            invent.normalize_and_count(ttkn, n_grama, dic_ngramas)
        for linea in wiki_data[entrada]:
            for ttkn, n_grama in splitter.split_by_type(linea.decode("utf-8"), max_len=max_len_allowed):
                invent.normalize_and_count(ttkn, n_grama, dic_ngramas)
        if cont_entradas > limite:
            break
        if cont_entradas % 1000 == 0:
            clear_output(True)
            print(f"Llevamos {cont_entradas} entradas de {limite} ({round(100*cont_entradas/limite, 2)}%)")
            for ttkn in dic_ngramas:
                print(f"- {len(dic_ngramas[ttkn])} n-gramas recolectados del tipo {K.TTKN_DESC[ttkn]}")
    cont_entradas += 1
salta = limite
limite += recoge

<hr style="border: 1px dashed #aaa;"/>
<center><h2>Liberamos memoria</h2></center>

In [None]:
print("Memoria:", psutil.virtual_memory())
del wiki_data
print(gc.collect())
print("Memoria:", psutil.virtual_memory())

<hr style="color: red; height: 2; border: 3px dotted;"/>
<center><h1 style="color: yellow;">Comienza la fiesta</h1></center>
<hr style="color: red; width: 50%;"/>

In [None]:
max_ngrama_len = 0
max_ngrama_ngrama = ""
max_ngrama_ttkn = ""
maximos = []
for ttkn in dic_ngramas:
    for n_grama in dic_ngramas[ttkn]:
        if len(n_grama) > max_ngrama_len:
            max_ngrama_len = len(n_grama)
            max_ngrama_ngrama = n_grama
            max_ngrama_ttkn = ttkn
    maximos = [n_grama.replace("\n", "¬") for n_grama in sorted(dic_ngramas[ttkn], key=lambda x:dic_ngramas[ttkn][x], reverse=True)[:15]]
    print(f"Para el tipo {K.TTKN_DESC[ttkn].rjust(12)} tenemos {str(len(dic_ngramas[ttkn])).rjust(11)}: {maximos}")
print(f"El ngrama más largo, con {max_ngrama_len} es del tipo {K.TTKN_DESC[ttkn]} ({ttkn}): <|{max_ngrama_ngrama.encode()}|>")
del maximos
print(f"Limpiando caché de {gc.collect()} objetos...")
print("Memoria:", psutil.virtual_memory())

# Contabilizacion de monogramas

In [None]:
prob_monogramas = {ttkn:{} for ttkn in dic_ngramas}
acum_monogramas = {ttkn:0 for ttkn in dic_ngramas}
for ttkn in dic_ngramas:
    print(f"Contando tipo {K.TTKN_DESC[ttkn].rjust(12)}...", end="")
    for n_grama in dic_ngramas[ttkn]:
        for monograma in n_grama:
            invent._dict_counter(ttkn, monograma, prob_monogramas)
            acum_monogramas[ttkn] += 1
    print(f" {str(acum_monogramas[ttkn]).rjust(11)} apariciones (probabilidad media: {round(100 * len(prob_monogramas[ttkn]) / acum_monogramas[ttkn], 5)}%)")
print("Probabilidades...")
prob_monogramas = {ttkn:{monograma: prob_monogramas[ttkn][monograma] / acum_monogramas[ttkn] for monograma in prob_monogramas[ttkn]} for ttkn in prob_monogramas}
print("¡Terminado!")
del acum_monogramas
print(f"Limpiando caché de {gc.collect()} objetos...")
print("Memoria:", psutil.virtual_memory())

probabilidad de monogramas

In [None]:
for ttkn in prob_monogramas:
    maximos = [monograma.replace("\n", " ") for monograma in sorted(prob_monogramas[ttkn], key=lambda x: prob_monogramas[ttkn][x], reverse=True)[:10]]
    print(f"Para {K.TTKN_DESC[ttkn].rjust(15)}: {', '.join([f'<{x}>: {round(prob_monogramas[ttkn][x], 7)}' for x in maximos])}")

## Combinatorias

In [None]:
fragmentos = {ttkn: {} for ttkn in dic_ngramas}
fragmentos_n_grama = {ttkn: {} for ttkn in dic_ngramas}
max_len_frag = 5
print(f"Fragmentando ...".rjust(50) + "".join(["|"]*100))
for ttkn in dic_ngramas:
    print(f"Fragmentando {len(dic_ngramas[ttkn])} n-gramas de {K.TTKN_DESC[ttkn]}".rjust(50), end="")
    cada = 1+ len(dic_ngramas[ttkn]) // 100
    cont = 0
    for n_grama in dic_ngramas[ttkn]:
        for l_n in range(2, min([len(n_grama), max_len_frag]) - 1):
            for i_ini in range(len(n_grama) - l_n):
                if n_grama[i_ini:i_ini+l_n] in fragmentos[ttkn]:
                    fragmentos[ttkn][n_grama[i_ini:i_ini+l_n]] += 1
                    if n_grama in fragmentos_n_grama[ttkn][n_grama[i_ini:i_ini+l_n]]:
                        fragmentos_n_grama[ttkn][n_grama[i_ini:i_ini+l_n]][n_grama] += 1
                    else:
                        fragmentos_n_grama[ttkn][n_grama[i_ini:i_ini+l_n]][n_grama] = 1
                else:
                    fragmentos[ttkn][n_grama[i_ini:i_ini+l_n]] = 1
                    fragmentos_n_grama[ttkn][n_grama[i_ini:i_ini+l_n]] = {n_grama: 1}
        cont += 1
        if cont % cada == 0:
            print(".", end="")
    print("!")
print(f"Limpiando caché de {gc.collect()} objetos...")
print("Memoria:", psutil.virtual_memory())

## Fragmentos óptimos

In [None]:
acum_frag_n_grama = {ttkn: sum([len(fragmentos_n_grama[ttkn][x]) for x in fragmentos_n_grama[ttkn]]) for ttkn in fragmentos_n_grama}
acum_fragmentos = {ttkn: sum([fragmentos[ttkn][x] for x in fragmentos[ttkn]]) for ttkn in fragmentos}
print(f"Limpiando caché de {gc.collect()} objetos...")
print("Memoria:", psutil.virtual_memory())

In [None]:
usados = {}
excedentes = {}
n_grama = "Esto es una prueba"
evaluados = ["Es", "to", "ta", "es", "una", "pru", "eba", ".", " "]
n_grama = "Esto es una prueba"
invent.explota_fragmento(n_grama, usados, excedentes, evaluados)
n_grama = "Esto es otra prueba"
invent.explota_fragmento(n_grama, usados, excedentes, evaluados)
print("usados", usados)
print("excedentes", excedentes)
del usados
del evaluados
del excedentes
print(f"Limpiando caché de {gc.collect()} objetos...")
print("Memoria:", psutil.virtual_memory())

## Evaluador de fragmentos en n-gramas para obtener tokens

In [None]:
fragmentos_usados_ttkn = {ttkn: {} for ttkn in fragmentos}
excedentes_ttkn = {ttkn: {} for ttkn in fragmentos}
fragmentos_evaluados_ttkn = {}
print(f"Evaluando fragmentos sobre tokens ...".rjust(70) + "".join(["|"]*100))
for ttkn in dic_ngramas:
    print(f"Ordenando {len(fragmentos[ttkn])} fragmentos de {K.TTKN_DESC[ttkn]}...", end="")
    fragmentos_usados = {}
    excedentes = {}
    previos = sorted(
        fragmentos[ttkn],
        key=lambda x: (
            0.55 + len(x) * (1.0 + len(x) - len(set(x))) / max_len_frag
        ) ** (
            (
                1.0 * (fragmentos[ttkn][x] / acum_fragmentos[ttkn]) +
                3.0 * (len(fragmentos_n_grama[ttkn][x]) /acum_frag_n_grama[ttkn])
            ) / 4.0
        ),
        reverse=True
    )
    print(f"Fragmentos previos: {len(previos)} = {previos[:10]}")
    evaluados = []
    cont = 0
    cada = 1 + len(previos) // 100
    print(f"Optimizando {len(previos)} n-gramas de {K.TTKN_DESC[ttkn]} cada {cada} un punto.".rjust(70), end="")
    no_esta = True
    for fragmento in previos:
        no_esta = True
        for x in evaluados:
            if len(x) < len(fragmento) and x in fragmento:
                no_esta = False
                break
        if no_esta:
            evaluados.append(fragmento)
        cont += 1
        if cont % cada == 0:
            print(".", end="")
    del previos
    print("!")
    print(f"Fragmentos evaluados: {len(evaluados)} = {evaluados[:10]}")
    cada = 1 + len(dic_ngramas[ttkn]) // 100
    print(f"Evaluando {len(dic_ngramas[ttkn])} n-gramas de {K.TTKN_DESC[ttkn]} cada {cada} un punto.".rjust(70), end="")
    cont = 0
    for n_grama in dic_ngramas[ttkn]:
        if isinstance(n_grama, str) and len(n_grama) < 50:
            invent.explota_fragmento(n_grama, fragmentos_usados, excedentes, evaluados)
        cont += 1
        if cont % cada == 0:
            print(".", end="")
    print("!")
    print("Recogiendo datos...")
    fragmentos_usados_ttkn[ttkn] = fragmentos_usados.copy()
    print(f"Fragmentos usados en {K.TTKN_DESC[ttkn]}: {len(fragmentos_usados)}")
    excedentes_ttkn[ttkn] = excedentes.copy()
    print(f"Excedentes en {K.TTKN_DESC[ttkn]}: {len(excedentes)}")
    del fragmentos_usados
    del excedentes
    print(f"Limpiando caché de {gc.collect()} objetos...")
print(f"Limpiando caché de {gc.collect()} objetos...")
print("Memoria:", psutil.virtual_memory())

## Guardado (si procede)

In [None]:
fragmentos_usados_path = r"D:\datos\tokens_usados.pkl"
excedentes_path = r"D:\datos\tokens_excedentes.pkl"
print(f"Guardando {fragmentos_usados_path}")
with open(fragmentos_usados_path, "wb") as file:
    pickle.dump(fragmentos_usados_ttkn, file)
print(f"Guardando {excedentes_path}")
with open(excedentes_path, "wb") as file:
    pickle.dump(excedentes_ttkn, file)

# Diccionario de tokens

## Orden y numeración

In [None]:
ttkn_start_id = {
    K.TTKN_ESP: 50,
    K.TTKN_SEP: 1000,
    K.TTKN_SIM: 5000,
    K.TTKN_DIG: 20000,
    K.TTKN_LET: 40000,
    K.TTKN_UNK: 90000
}

## Generación de tokens

In [None]:
# Empezamos por los tokens del sistema
tokens = {K.TTKN_SIS:token_id for token_id in K.MAIN_TOKEN_DICT}
# Inicializamos los tipos de token previendo su rellenado posterior
for ttkn in fragmentos:
    tokens[ttkn] = {}
# Iniciamos desde el token_id = 10 como minimo
token_id = 10
# A rellenar...
for ttkn in ttkn_start_id:
    if ttkn not in fragmentos_usados_ttkn:
        continue
    # Situamos la numeracion optimizada
    if token_id < ttkn_start_id[ttkn]:
        token_id = ttkn_start_id[ttkn]
    else:
        token_id += ttkn_start_id[ttkn]
    print(f"Para {K.TTKN_DESC[ttkn]} empezamos por {token_id}", end=",")
    # Incluimos en esa trancha de ids los tokens
    for fragmento in fragmentos_usados_ttkn[ttkn]:
        tokens[ttkn][fragmento] = token_id
        token_id += 1
    print(f" usados ({token_id-1}),", end="")
    for excedente in excedentes_ttkn[ttkn]:
        tokens[ttkn][excedente] = token_id
        token_id += 1
    print(f" excedentes ({token_id-1}),", end="")
    # Que no se nos olviden los monogramas encontrados
    if ttkn in prob_monogramas:
        for monograma in sorted(prob_monogramas[ttkn], key=lambda x: prob_monogramas[ttkn][x], reverse=True):
            if monograma not in tokens[ttkn]:
                tokens[ttkn][monograma] = token_id
                token_id += 1
    print(f" monogramas ({token_id-1}),", end="")
    # Para que quede todo relleno, los de las constantes
    if ttkn in K.TIPOS_TKN:
        for caracter in K.TIPOS_TKN[ttkn]:
            if caracter.lower() not in tokens[ttkn]:
                tokens[ttkn][caracter.lower()] = token_id
                token_id += 1
    print(f" y constantes. Terminamos por {token_id-1}. Tenemos {len(tokens[ttkn])}")
    print(f"Llevamos {sum([len(tokens[x]) for x in tokens])}")


# Guardado de Tokens

In [None]:
tokens_path = r"D:\datos\tokens_ttkn_id_v20241208.pkl"
print(f"Guardando {tokens_path}")
with open(tokens_path, "wb") as file:
    pickle.dump(tokens, file)