In [None]:
!pip install topmost

In [None]:
import topmost
import os

In [None]:
# Notebook-wide runtime configuration; these names are read as globals by the
# helper functions defined in later cells.
device = "cuda"  # or "cpu"
# Directory the dataset is read from.
dataset_dir = "/kaggle/input/revista-de-ciencias-mdicas-de-la-habana-cuba"
# Root directory for every artifact the notebook writes (per-run folders,
# the cache of tested configurations and the population checkpoint).
output_dir = "./results"
os.makedirs(output_dir, exist_ok=True)

# Loaded once and shared by model construction, training and evaluation.
dataset = topmost.data.DynamicDataset(dataset_dir, batch_size=200, read_labels=True, device=device)

In [None]:
import random
import joblib
import json
import os

In [None]:
def save_txt(obj, path):
    """Write the string ``obj`` to the file at ``path``, replacing its content.

    The file is always encoded as UTF-8 so that non-ASCII text is stored the
    same way regardless of the platform's default locale encoding.

    Args:
        obj: Text to write.
        path: Destination file path.
    """
    with open(path, 'w', encoding='utf-8') as file:
        file.write(obj)

def save_top_words_txt(top_words, path):
    """Write a human-readable dump of the topics to ``<path>/top_words.txt``.

    The file is encoded as UTF-8 so that accented words are written the same
    way regardless of the platform's default locale encoding.

    Args:
        top_words: Iterable with one entry per time slice; each entry is an
            iterable of topics, and each topic is written with ``str()``
            formatting on its own line.
        path: Existing directory in which ``top_words.txt`` is created.
    """
    with open(os.path.join(path, 'top_words.txt'), 'w', encoding='utf-8') as file:
        for time_index, time_topics in enumerate(top_words):
            file.write(f'================= Time {time_index} ================= \n')
            for topic_index, topic in enumerate(time_topics):
                file.write(f'================= Topic {topic_index} ================= \n')
                file.write(f'{topic}\n')
            # Blank line separating consecutive time slices.
            file.write('\n')

def save_json(obj, path):
    """Serialize ``obj`` as JSON into the file at ``path``, overwriting it.

    Args:
        obj: Any value accepted by ``json.dump``.
        path: Destination file path.
    """
    with open(path, 'w') as handle:
        json.dump(obj, handle)

def save_joblib(obj, path):
    """Serialize ``obj`` to the file at ``path`` by delegating to ``joblib.dump``."""
    joblib.dump(obj, path)

def save_result(top_words, trainer, config, metrics_json):
    """Persist the artifacts of one evaluated configuration.

    Everything is written into ``<output_dir>/<samples>``, where ``output_dir``
    and ``samples`` are notebook-level globals (``samples`` is the running index
    of saved runs). Writing is best effort: a failure is reported with its
    traceback and is not re-raised.

    Args:
        top_words: Topics per time slice; saved both as text and as a joblib file.
        trainer: Accepted for interface compatibility; currently not persisted.
        config: Hyperparameter configuration (JSON-serializable dict).
        metrics_json: Evaluation metrics (JSON-serializable dict).
    """
    path = os.path.join(output_dir, f'{samples}')
    os.makedirs(path, exist_ok=True)

    try:
        save_top_words_txt(top_words, path)
        save_json(config, os.path.join(path, 'hyperparameters.json'))
        save_json(metrics_json, os.path.join(path, 'metrics_json.json'))
        save_json({"config": config, "metrics": metrics_json}, os.path.join(path, 'metrics_config.json'))
        # save_joblib(trainer, os.path.join(path, 'trainer.joblib'))
        save_joblib(top_words, os.path.join(path, 'top_words.joblib'))
        print('🍀 Saved')
    except Exception:
        # print_exc() reports the exception currently being handled; its first
        # positional parameter is `limit`, so the exception must not be passed.
        traceback.print_exc()

def get_hyperparameter_space():
    """Return the discrete search space explored by the hyperparameter search.

    Returns:
        A new dict with two sections, ``"training"`` and ``"model"``; each
        section maps a hyperparameter name to the list of candidate values.
    """
    training_space = {
        "learning_rate": [0.001, 0.002, 0.005, 0.01],
        "batch_size": [100, 200, 300, 500],
        "num_epoch": [400, 600, 800, 1000],
    }
    model_space = {
        "num_topics": [20, 50, 100],
        "en1_units": [50, 100, 200, 300],
        "dropout": [0.0, 0.1, 0.3, 0.5],
        "beta_temp": [0.5, 1.0, 1.5, 2.0],
        "temperature": [0.05, 0.1, 0.2, 0.5],
        "weight_neg": [1e6, 5e6, 1e7, 1e8],
        "weight_pos": [1.0, 10.0, 100.0],
        "weight_UWE": [1e2, 1e3, 1e4],
        "neg_topk": [5, 10, 15, 20, 30],
    }
    return {"training": training_space, "model": model_space}

def get_default_config():
    """Return the baseline configuration that seeds the search population.

    Returns:
        A new dict with a ``"training"`` and a ``"model"`` section, each mapping
        a hyperparameter name to a single value.
    """
    training = dict(learning_rate=0.002, batch_size=200, num_epoch=800)
    model = dict(
        num_topics=50,
        en1_units=100,
        dropout=0.0,
        beta_temp=1.0,
        temperature=0.1,
        weight_neg=1e7,
        weight_pos=10.0,
        weight_UWE=1e3,
        neg_topk=15,
    )
    return {"training": training, "model": model}

def random_configuration(hyperparameter_space):
    """Draw one configuration uniformly at random from the search space.

    Args:
        hyperparameter_space: Dict with ``"training"`` and ``"model"`` sections
            mapping each hyperparameter name to a sequence of candidates.

    Returns:
        A dict with the same two sections where every hyperparameter listed
        below holds one value picked with ``random.choice``.
    """
    # Section and parameter order is fixed so the sequence of random draws is
    # reproducible under a given seed.
    sampled_names = (
        ("training", ("learning_rate", "batch_size", "num_epoch")),
        ("model", (
            "num_topics",
            "en1_units",
            "dropout",
            "beta_temp",
            "temperature",
            "weight_neg",
            "weight_pos",
            "weight_UWE",
            "neg_topk",
        )),
    )
    return {
        section: {
            name: random.choice(hyperparameter_space[section][name])
            for name in names
        }
        for section, names in sampled_names
    }

## Hyperparameters Search

## - Evaluación

In [None]:
########################### Evaluate ####################################
import numpy as np
from topmost import eva
from gensim.corpora import Dictionary
from gensim.models import CoherenceModel
from topmost.data.file_utils import split_text_word

In [None]:
def _coherence_modified(reference_corpus, vocab, top_words, cv_type='c_v'):
    """Compute the mean topic coherence while ignoring NaN per-topic scores.

    Args:
        reference_corpus: Texts passed through ``split_text_word`` and used as
            the reference corpus of the gensim ``CoherenceModel``.
        vocab: Vocabulary entries; passed through ``split_text_word`` to build
            the gensim ``Dictionary``.
        top_words: Topics to score; after ``split_text_word`` every topic must
            have the same number of words.
        cv_type: Coherence measure name forwarded to ``CoherenceModel``.

    Returns:
        The mean of the per-topic scores that are not NaN, or ``0`` when every
        per-topic score is NaN.
    """
    topics = split_text_word(top_words)
    words_per_topic = len(topics[0])
    assert all(words_per_topic == len(topic) for topic in topics)

    texts = split_text_word(reference_corpus)
    dictionary = Dictionary(split_text_word(vocab))

    coherence_model = CoherenceModel(
        texts=texts,
        dictionary=dictionary,
        topics=topics,
        topn=words_per_topic,
        coherence=cv_type,
    )
    per_topic = coherence_model.get_coherence_per_topic()

    # Drop NaN scores; if nothing is left, fall back to 0 instead of raising.
    finite_scores = [value for value in per_topic if not np.isnan(value)]
    return np.mean(finite_scores) if finite_scores else 0
    
# Monkey-patch: rebind `_coherence` on the `eva.topic_coherence` module to the
# NaN-tolerant `_coherence_modified` defined above.
# NOTE(review): this only affects callers that look `_coherence` up through that
# module attribute at call time -- confirm `eva.dynamic_coherence` does so.
eva.topic_coherence._coherence = _coherence_modified

In [None]:
def eval(top_words, trainer):
    """Score a trained model on coherence, diversity, clustering and classification.

    Reads the notebook-level ``dataset`` global and prints every metric as soon
    as it is computed. Note that this function shadows Python's built-in
    ``eval`` in the notebook namespace.

    Args:
        top_words: Top words produced by training, forwarded to the dynamic
            coherence and diversity metrics.
        trainer: Trainer exposing ``export_theta()``, which returns the train
            and test doc-topic distributions.

    Returns:
        A 5-tuple ``(metrics, dynamic_TC, dynamic_TD, clustering,
        classification)`` where ``metrics`` is a dict holding the four values
        under those same names.
    """
    # Doc-topic distributions for the train and test splits.
    train_theta, test_theta = trainer.export_theta()
    train_times = dataset.train_times.cpu().numpy()

    # Topic coherence.
    dynamic_TC = eva.dynamic_coherence(dataset.train_texts, train_times, dataset.vocab, top_words)
    print("dynamic_TC: ", dynamic_TC)

    # Topic diversity.
    train_bow = dataset.train_bow.cpu().numpy()
    dynamic_TD = eva.dynamic_diversity(top_words, train_bow, train_times, dataset.vocab)
    print("dynamic_TD: ", dynamic_TD)

    # Clustering quality on the test split.
    clustering = eva._clustering(test_theta, dataset.test_labels)
    print(clustering)

    # Classification quality (train theta -> test theta).
    classification = eva._cls(train_theta, test_theta, dataset.train_labels, dataset.test_labels)
    print(classification)

    metrics = dict(
        dynamic_TC=dynamic_TC,
        dynamic_TD=dynamic_TD,
        clustering=clustering,
        classification=classification,
    )
    return metrics, dynamic_TC, dynamic_TD, clustering, classification

## - Modelo

In [None]:
import traceback

In [None]:
def evaluate_configuration(config):
    """Train and score one hyperparameter configuration, memoizing the result.

    The score is ``0.5 * dynamic_TC + 0.5 * dynamic_TD``. Scores are cached in
    the notebook-level ``tested_configs`` dict (keyed by the canonical JSON of
    the configuration) and the cache is written to
    ``<output_dir>/tested_configs.joblib`` after every new evaluation.

    Args:
        config: Dict with a ``"model"`` and a ``"training"`` section, as
            produced by ``random_configuration`` / ``get_default_config``.

    Returns:
        The score of the configuration, or ``float('-inf')`` when evaluating or
        saving the trained model raised an exception.
    """
    global tested_configs, samples

    # Serialize the dict with sorted keys so equal configurations share a key.
    config_key = json.dumps(config, sort_keys=True)

    if config_key in tested_configs:
        return tested_configs[config_key]

    model_cfg = config["model"]
    model = topmost.CFDTM(
        vocab_size=dataset.vocab_size,
        train_time_wordfreq=dataset.train_time_wordfreq,
        num_times=dataset.num_times,
        pretrained_WE=dataset.pretrained_WE,
        num_topics=model_cfg["num_topics"],
        en_units=model_cfg["en1_units"],
        temperature=model_cfg["temperature"],
        beta_temp=model_cfg["beta_temp"],
        weight_neg=model_cfg["weight_neg"],
        weight_pos=model_cfg["weight_pos"],
        weight_UWE=model_cfg["weight_UWE"],
        neg_topk=model_cfg["neg_topk"],
        dropout=model_cfg["dropout"],
        embed_size=300
    )

    model = model.to(device)
    training_cfg = config["training"]
    trainer = topmost.DynamicTrainer(
        model,
        dataset,
        batch_size=training_cfg["batch_size"],
        learning_rate=training_cfg["learning_rate"],
        epochs=training_cfg["num_epoch"],
    )
    top_words, _ = trainer.train()

    try:
        metrics_json, dynamic_TC, dynamic_TD, _, _ = eval(top_words, trainer)
        tested_configs[config_key] = 0.5 * dynamic_TC + 0.5 * dynamic_TD
        save_result(top_words, trainer, config, metrics_json)
        samples += 1
    except Exception:
        # Best effort: a failing configuration gets the worst possible score so
        # the search can continue. Report the full traceback (exception type and
        # location), not just the message, so the failure can be diagnosed.
        tested_configs[config_key] = float('-inf')
        traceback.print_exc()

    save_joblib(tested_configs, os.path.join(output_dir, 'tested_configs.joblib'))

    return tested_configs[config_key]

## - Algoritmo evolutivo

In [None]:
def evolutionary_search(search_space, generations=10, mutation_rate=0.1, population_size=20, population=None, generation_count=0, best_dir='./best_per_generation_path'):
    """Search the hyperparameter space with a simple evolutionary algorithm.

    Every generation evaluates the whole population with
    ``evaluate_configuration``, keeps the best ``population_size // 2``
    configurations as parents and refills the population with children. A child
    takes each parameter from one of two distinct, randomly chosen parents and,
    with probability ``mutation_rate``, has one randomly chosen parameter
    re-drawn from the search space.

    Args:
        search_space: Dict of sections (``"training"``, ``"model"``) mapping
            parameter names to lists of candidate values.
        generations: Exclusive upper bound of the generation index.
        mutation_rate: Probability that a child is mutated.
        population_size: Number of configurations per generation.
        population: Optional starting population; when empty or ``None`` a
            random one is created and the default configuration is added.
        generation_count: Index of the first generation to run (for resuming).
        best_dir: Directory receiving one best-of-generation file per completed
            generation. The notebook counts the files in this directory to
            compute ``generation_count`` when resuming.

    Returns:
        ``(best_config, best_score)`` of the last evaluated generation.

    Raises:
        ValueError: If ``generation_count >= generations``, i.e. there is no
            generation left to run and therefore no result to return.
    """
    if generation_count >= generations:
        raise ValueError(
            f'No generation to run: generation_count={generation_count} '
            f'must be smaller than generations={generations}.'
        )

    if not population:
        population = [random_configuration(search_space) for _ in range(population_size - 1)]
        population.append(get_default_config())

    os.makedirs(best_dir, exist_ok=True)

    for generation in range(generation_count, generations):
        scores = [(config, evaluate_configuration(config)) for config in population]
        scores.sort(key=lambda value: value[1], reverse=True)

        # Stored inside `best_dir` (not the working directory) so that counting
        # the files of that directory yields the number of finished generations.
        best_file = os.path.join(best_dir, f'best_per_generation_path_{generation}.joblib')
        joblib.dump(scores[0], best_file)
        print(f"🧬 Generación {generation + 1}, mejor resultado: {scores[0][1]:.4f}")

        num_parents = population_size // 2
        parents = [config for config, _ in scores[:num_parents]]

        children = []
        while len(children) < population_size - num_parents:
            parent_1, parent_2 = random.sample(parents, 2)
            # Uniform crossover: every parameter comes from one of the parents.
            child = {
                "model": {key: random.choice([parent_1["model"][key], parent_2["model"][key]]) for key in search_space["model"].keys()},
                "training": {key: random.choice([parent_1["training"][key], parent_2["training"][key]]) for key in search_space["training"].keys()},
            }

            if random.random() < mutation_rate:
                # Mutation: re-draw a single parameter of a random section.
                mutate_model_training = random.choice(list(search_space.keys()))
                print(mutate_model_training)
                param_to_mutate = random.choice(list(search_space[mutate_model_training].keys()))
                print(param_to_mutate)
                child[mutate_model_training][param_to_mutate] = random.choice(search_space[mutate_model_training][param_to_mutate])

            children.append(child)

        population = parents + children
        joblib.dump(population, os.path.join(output_dir, 'population.joblib'))

    # `scores` is sorted best-first, so its head is the best of the last generation.
    best_config, best_score = scores[0]
    return best_config, best_score

## Main

In [None]:
def load_json(path):
    """Return the parsed content of ``metrics_config.json`` inside directory ``path``."""
    record_file = os.path.join(path, 'metrics_config.json')
    with open(record_file, "r") as handle:
        return json.load(handle)

In [None]:
import os
import json

# Root directory that holds one sub-directory per saved run.
results_path = '/kaggle/working/results'
# NOTE: `results_path` is rebound here from the root directory string to the
# list of paths of its non-file entries; later cells rely on the list form.
results_path = [dir_.path for dir_ in os.scandir(results_path) if not dir_.is_file()]

# Number of run directories found.
print(len(results_path))

In [None]:
# population_size = 50
# # population = None
# population = [load_json(path) for path in results_path]
# population.sort(key=lambda value: value["metrics"]["dynamic_TC"] + value["metrics"]["dynamic_TD"], reverse=True)
# population = [item["config"] for item in population[:population_size]]
# print(population)
# joblib.dump(population, os.path.join(output_dir, 'population.joblib'))

In [None]:
population_size = 50
# Resume from the population checkpoint written by evolutionary_search when it
# exists. On a fresh run there is none: `population` is then None, which makes
# evolutionary_search create a random starting population.
population_file = os.path.join(output_dir, 'population.joblib')
population = joblib.load(population_file) if os.path.exists(population_file) else None

In [None]:
import joblib
# Cache of already evaluated configurations (config JSON -> score). It is read
# from the same location evaluate_configuration writes it to; on a fresh run
# the file does not exist yet and the cache starts empty.
tested_configs_file = os.path.join(output_dir, 'tested_configs.joblib')
tested_configs = joblib.load(tested_configs_file) if os.path.exists(tested_configs_file) else dict()

In [None]:
# Create (if needed) the directory whose file count determines the generation
# at which the search resumes.
best_per_generation_path = './best_per_generation_path'
os.makedirs(best_per_generation_path, exist_ok=True)

In [None]:
# Resume index: the number of regular files currently in
# `best_per_generation_path`; passed to evolutionary_search as `generation_count`.
generation = len([dir_.path for dir_ in os.scandir(best_per_generation_path) if dir_.is_file()])

In [None]:
# Index of the next run folder: save_result writes into `<output_dir>/<samples>`
# and evaluate_configuration increments `samples` after each saved run.
samples = len(results_path)

In [None]:
hyperparameter_space = get_hyperparameter_space()

# Run the evolutionary search, starting from the loaded `population` and from
# generation index `generation`, up to generation index 100 (exclusive).
best_config, best_score = evolutionary_search(hyperparameter_space, population_size=population_size, generations=100, mutation_rate=0.2, population=population, generation_count=generation)
print(best_config)
print(best_score)

In [None]:
# List the best-of-generation files. Scan the checkpoint directory itself
# (`results_path` is a list of run folders at this point and cannot be passed
# to os.scandir) and keep regular files, mirroring how `generation` is counted.
best_per_generation_dir = '/kaggle/working/best_per_generation_path'
best_per_generation = [entry.path for entry in os.scandir(best_per_generation_dir) if entry.is_file()]
print(len(best_per_generation))

## Evaluando resultados

In [None]:
import os
import json

# Root directory that holds one sub-directory per saved run.
results_path = '/kaggle/working/results'
# Rebound from the root directory string to the list of its non-file entries,
# which the analysis cell below iterates over.
results_path = [dir_.path for dir_ in os.scandir(results_path) if not dir_.is_file()]

print(results_path)

In [None]:
def _offer_candidate(best_score, best_items, score, item):
    """Fold one candidate into a running "best so far" record.

    Returns the updated ``(best_score, best_items)`` pair: a strictly better
    score starts a new list, an equal score is appended as a tie, and anything
    else (a lower score or NaN) leaves the record unchanged.
    """
    if score > best_score:
        return score, [item]
    if score == best_score:
        best_items.append(item)
    return best_score, best_items


# Best value seen so far and the (record, path) pairs reaching it, for topic
# coherence, topic diversity and their equally weighted mean.
old_tc, TC = float('-inf'), []
old_td, TD = float('-inf'), []
old_tc_td, TC_TD = float('-inf'), []

for path in results_path:
    metrics_file = os.path.join(path, 'metrics_json.json')
    record_file = os.path.join(path, 'metrics_config.json')
    # save_result creates the run directory before writing and swallows write
    # errors, so a directory can exist without its metrics files.
    if not (os.path.isfile(metrics_file) and os.path.isfile(record_file)):
        print(f'Skipping incomplete result directory: {path}')
        continue

    with open(metrics_file, 'r') as file:
        metrics = json.load(file)

    cr_tc = metrics["dynamic_TC"]
    cr_td = metrics["dynamic_TD"]
    cr_tc_td = 0.5 * cr_tc + 0.5 * cr_td

    # Only parse the combined config/metrics record for runs that match or beat
    # at least one of the current bests, and parse it once.
    if not (cr_tc >= old_tc or cr_td >= old_td or cr_tc_td >= old_tc_td):
        continue
    with open(record_file, 'r') as file:
        new = json.load(file)

    old_tc, TC = _offer_candidate(old_tc, TC, cr_tc, (new, path))
    old_td, TD = _offer_candidate(old_td, TD, cr_td, (new, path))
    old_tc_td, TC_TD = _offer_candidate(old_tc_td, TC_TD, cr_tc_td, (new, path))

print(TC)
print(len(TC))
print('-------------------------------------')
print(TD)
print(len(TD))
print('-------------------------------------')
print(TC_TD)
print(len(TC_TD))

best_results_path = './best_results'
os.makedirs(best_results_path, exist_ok=True)
save_json(TC, os.path.join(best_results_path, 'best_tc.json'))
save_json(TD, os.path.join(best_results_path, 'best_td.json'))
save_json(TC_TD, os.path.join(best_results_path, 'best_tc_td.json'))

In [None]:
!zip -r best_results/best_results.zip /kaggle/working/results/0