In [None]:
import os
import json

# Entidades válidas
valid_labels = {
    'B-LOCAL', 'I-LOCAL',
    'B-ORGANIZACAO', 'I-ORGANIZACAO',
    'B-PESSOA', 'I-PESSOA',
    'B-TEMPO', 'I-TEMPO',
}


# Pasta base onde estão as partições
base_dir = "../Base de Dados/Paramopama/divisions/"
output_dir = "../Base de Dados/Paramopama/divisions/"
os.makedirs(output_dir, exist_ok=True)

# Função para converter BIO para JSON estruturado
def convert_bio_file_to_json(txt_path):
    with open(txt_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    samples = []
    tokens, labels = [], []

    for line in lines:
        line = line.strip()
        if not line:
            if tokens:
                samples.append((tokens, labels))
                tokens, labels = [], []
            continue
        parts = line.split()
        if len(parts) == 2:
            token, label = parts
            tokens.append(token)
            labels.append(label)
    
    # última frase
    if tokens:
        samples.append((tokens, labels))

    data_json = []
    for tokens, labels in samples:
        text = " ".join(tokens)
        entities = []
        i = 0
        while i < len(labels):
            if labels[i].startswith("B-"):
                label_type = labels[i][2:]
                entity_tokens = [tokens[i]]
                i += 1
                while i < len(labels) and labels[i].startswith("I-") and labels[i][2:] == label_type:
                    entity_tokens.append(tokens[i])
                    i += 1
                entities.append({
                    "text": " ".join(entity_tokens),
                    "label": label_type
                })
            else:
                i += 1
        data_json.append({
            "text": text,
            "entities": entities
        })
    
    return data_json

# Iterar sobre os 10 arquivos de divisão e salvar em JSON
for i in range(10):
    txt_file = os.path.join(base_dir, f"division_{i}.txt")
    json_file = os.path.join(output_dir, f"division_{i}.json")
    json_data = convert_bio_file_to_json(txt_file)
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=2, ensure_ascii=False)
    print(f"Convertido: {txt_file} → {json_file}")



In [None]:
import os
import json
import torch
from tqdm import tqdm
from datasets import Dataset
from sklearn.metrics import classification_report
from sklearn.preprocessing import MultiLabelBinarizer
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TrainingArguments, Trainer
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

# ========== CONFIGURAÇÕES ==========
MODEL_NAME = "/home/juliaribeiro/Qwen3-14B"
MAX_LEN = 2048
NUM_EPOCHS = 3
BASE_DIR = "../Base de Dados/UlyssesNER-BR/json_divisions/"
LABELS = [
    "DATA",
    "EVENTO",
    "FUNDapelido",
    "FUNDlei",
    "FUNDprojetodelei",
    "LOCALconcreto",
    "LOCALvirtual",
    "ORGgovernamental",
    "ORGnaogovernamental",
    "ORGpartido",
    "PESSOAcargo",
    "PESSOAgrupocargo",
    "PESSOAindividual",
    "PRODUTOoutros",
    "PRODUTOprograma",
    "PRODUTOsistema"
]


os.makedirs("modelos_por_fold", exist_ok=True)
open("rrelatorio_folds.txt", "a", encoding="utf-8").close()

mlb = MultiLabelBinarizer(classes=LABELS)

# ========== FUNÇÕES AUXILIARES ==========

def load_data(path):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def prepare_dataset(data):
    samples = []
    for item in data:
        ents = item.get("entities", [])
        ents_fmt = [{"text": e["text"], "label": e["label"]} for e in ents if e["label"] in LABELS]
        if ents_fmt:
            samples.append({
                "instruction": "Extraia entidades nomeadas do texto abaixo e retorne em JSON com campos 'text' e 'label'.",
                "input": item["text"],
                "output": json.dumps(ents_fmt, ensure_ascii=False),
                "labels": [e["label"] for e in ents_fmt]
            })
    return Dataset.from_list(samples)

def tokenize(example, tokenizer):
    prompt = f"{example['instruction']}\nTexto: {example['input']}\nEntidades:"
    full_prompt = prompt + " " + example["output"]

    tokenized = tokenizer(full_prompt, truncation=True, padding="max_length", max_length=MAX_LEN)
    tokenized["labels"] = tokenized["input_ids"].copy()
    tokenized["true_labels"] = example["labels"]

    try:
        entidades = json.loads(example["output"])
        if isinstance(entidades, list):
            tokenized["true_entities"] = entidades
        else:
            tokenized["true_entities"] = []
    except Exception:
        tokenized["true_entities"] = []

    tokenized["input_text"] = example["input"]
    return tokenized

# ========== CHECKPOINT DE FOLD ==========

CHECKPOINT_FILE = "uultimo_fold.txt"

def read_last_fold():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r") as f:
            try:
                return int(f.read().strip())
            except:
                return 0
    return 0

def write_last_fold(fold):
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(str(fold))

# ========== MAIN: VALIDAÇÃO CRUZADA ==========

def main():
    division_files = [f"division_{i}.json" for i in range(10)]
    all_reports = []
    start_fold = read_last_fold()

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=False, local_files_only=True)
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    for fold in range(start_fold, 10):
        if os.path.exists(f"rresultados_fold_{fold}.json"):
            print(f"Fold {fold} já processado. Pulando...")
            with open(f"rresultados_fold_{fold}.json", "r", encoding="utf-8") as f:
                all_reports.append(json.load(f))
            continue

        print(f"\n=========== FOLD {fold} ===========")

        test_file = os.path.join(BASE_DIR, division_files[fold])
        train_files = [os.path.join(BASE_DIR, f) for i, f in enumerate(division_files) if i != fold]

        test_data = load_data(test_file)
        train_data = []
        for tf in train_files:
            train_data.extend(load_data(tf))

        ds_train = prepare_dataset(train_data)
        ds_test = prepare_dataset(test_data)

        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
            bnb_4bit_compute_dtype=torch.float16,
        )


        
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            quantization_config=bnb_config,
            torch_dtype=torch.float16,
            device_map="auto",
            offload_buffers=True, 
            trust_remote_code=True,
            local_files_only=True
        )

        model = prepare_model_for_kbit_training(model)

        peft_config = LoraConfig(
            r=16,
            lora_alpha=64,
            lora_dropout=0.05,
            bias="none",
            task_type="CAUSAL_LM",
            target_modules=["q_proj", "k_proj", "v_proj", "o_proj"]
        )
        model = get_peft_model(model, peft_config)

        tokenized_train = ds_train.map(lambda x: tokenize(x, tokenizer), batched=False)
        tokenized_test = ds_test.map(lambda x: tokenize(x, tokenizer), batched=False)

        training_args = TrainingArguments(
            output_dir=f"modelos_por_fold/fold_{fold}",
            per_device_train_batch_size=1,
            num_train_epochs=NUM_EPOCHS,
            learning_rate=1e-4,
            logging_steps=10,
            bf16=True,
            save_strategy="no",
            eval_strategy="no",
            report_to="none",
            push_to_hub=False
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_train,
            tokenizer=tokenizer,
        )

        trainer.train()
        print(tokenized_test[0])

        model.eval()
        y_true_list, y_pred_list, predicted_entities_list = [], [], []

        for item in tqdm(tokenized_test, desc=f"Avaliando Fold {fold}"):
            input_text = f"{item['instruction']}\nTexto: {item['input']}\nEntidades:"
            inputs = tokenizer(input_text, return_tensors="pt", padding=True, truncation=True, max_length=MAX_LEN).to(model.device)
            output_ids = model.generate(**inputs, max_new_tokens=256)
            output_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)

            try:
                start = output_text.find("[")
                json_output = json.loads(output_text[start:]) if start != -1 else []
                predicted = [e["label"] for e in json_output if isinstance(e, dict) and e.get("label") in LABELS]
            except Exception:
                json_output = []
                predicted = []

            y_true_list.append(item["true_labels"])
            y_pred_list.append(predicted)
            predicted_entities_list.append(json_output)

        
        with open(f"eerros_fold_{fold}.txt", "w", encoding="utf-8") as f_err:
            for i, (true, pred) in enumerate(zip(y_true_list, y_pred_list)):
                if set(true) != set(pred):
                    item = tokenized_test[i]
                    f_err.write("-" * 80 + "\n")
                    f_err.write(f"Texto:\n{item['input']}\n\n")
                    f_err.write("Entidades verdadeiras:\n")
                    f_err.write(json.dumps(item.get("true_entities", []), ensure_ascii=False, indent=2))
                    f_err.write("\n\nEntidades previstas:\n")
                    f_err.write(json.dumps(predicted_entities_list[i], ensure_ascii=False, indent=2))
                    f_err.write("\n\n")


        y_true_bin = mlb.fit_transform(y_true_list)
        y_pred_bin = mlb.transform(y_pred_list)

        report = classification_report(y_true_bin, y_pred_bin, target_names=LABELS, output_dict=True, zero_division=0)
        all_reports.append(report)

        with open(f"rresultados_fold_{fold}.json", "w", encoding="utf-8") as f:
            json.dump(report, f, ensure_ascii=False, indent=2)

        with open("rrelatorio_folds.txt", "a", encoding="utf-8") as f:
            f.write(f"\n====== FOLD {fold} ======\n")
            for label in LABELS:
                f.write(f"- {label}: Precision={report[label]['precision']:.3f}, Recall={report[label]['recall']:.3f}, F1={report[label]['f1-score']:.3f}\n")
            f.write(f"\nMacro Avg: Precision={report['macro avg']['precision']:.3f}, Recall={report['macro avg']['recall']:.3f}, F1={report['macro avg']['f1-score']:.3f}\n")
            f.write(f"Micro Avg: Precision={report['micro avg']['precision']:.3f}, Recall={report['micro avg']['recall']:.3f}, F1={report['micro avg']['f1-score']:.3f}\n")
            f.write("="*50 + "\n")

        write_last_fold(fold + 1)

        import gc
        del model
        del trainer
        torch.cuda.empty_cache()
        gc.collect()

    final_report = {}
    for label in LABELS:
        precision = sum(r[label]['precision'] for r in all_reports) / len(all_reports)
        recall = sum(r[label]['recall'] for r in all_reports) / len(all_reports)
        f1 = sum(r[label]['f1-score'] for r in all_reports) / len(all_reports)
        final_report[label] = {"precision": precision, "recall": recall, "f1": f1}

    macro_avg = {
        "precision": sum(r["macro avg"]["precision"] for r in all_reports) / len(all_reports),
        "recall": sum(r["macro avg"]["recall"] for r in all_reports) / len(all_reports),
        "f1": sum(r["macro avg"]["f1-score"] for r in all_reports) / len(all_reports),
    }

    micro_avg = {
        "precision": sum(r["micro avg"]["precision"] for r in all_reports) / len(all_reports),
        "recall": sum(r["micro avg"]["recall"] for r in all_reports) / len(all_reports),
        "f1": sum(r["micro avg"]["f1-score"] for r in all_reports) / len(all_reports),
    }

    with open("rrelatorio_final.txt", "w", encoding="utf-8") as f:
        f.write("=== MÉDIAS DOS 10 FOLDS ===\n\n")
        for label, scores in final_report.items():
            f.write(f"{label}: Precision={scores['precision']:.3f}, Recall={scores['recall']:.3f}, F1={scores['f1']:.3f}\n")
        f.write(f"\nMacro Avg:\nPrecision={macro_avg['precision']:.3f}, Recall={macro_avg['recall']:.3f}, F1={macro_avg['f1']:.3f}\n")
        f.write(f"\nMicro Avg:\nPrecision={micro_avg['precision']:.3f}, Recall={micro_avg['recall']:.3f}, F1={micro_avg['f1']:.3f}\n")

    print("\u2705 Processamento completo. Relatórios gerados.")

if __name__ == "__main__":
    main()





Loading checkpoint shards:   0%|          | 0/8 [00:00<?, ?it/s]

Map:   0%|          | 0/1606 [00:00<?, ? examples/s]

Map:   0%|          | 0/207 [00:00<?, ? examples/s]

  trainer = Trainer(
No label_names provided for model class `PeftModelForCausalLM`. Since `PeftModel` hides base models input arguments, if label_names is not given, label_names can't be set automatically within `Trainer`. Note that empty label_names list will be used instead.
`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`.
  return fn(*args, **kwargs)


Step,Training Loss


In [1]:
import requests
import json

def extrair_entidades_gemma(texto, modelo="gemma:2b"):
    prompt = (
        "Extraia as entidades nomeadas do texto abaixo e retorne apenas um JSON com a lista das entidades, "
        "cada uma com o texto e o tipo.\n\n"
        "Exemplo:\n"
        "Texto: John Lennon era músico.\n"
        "Entidades: [{\"texto\": \"John Lennon\", \"tipo\": \"Pessoa\"}]\n\n"
        "Agora extraia as entidades do texto:\n"
        f"Texto: {texto}\n"
        "Entidades:"
    )

    url = "http://localhost:11434/api/generate"
    payload = {
        "model": modelo,
        "prompt": prompt,
        "max_tokens": 100,
        "temperature": 0.0
    }

    response = requests.post(url, json=payload)
    response.raise_for_status()

    resposta_texto = response.json().get("response", "")
    
    # Tenta extrair o JSON das entidades do texto retornado
    try:
        # Encontrar a primeira ocorrência de [ para tentar extrair a lista JSON
        start = resposta_texto.find("[")
        end = resposta_texto.rfind("]") + 1
        json_str = resposta_texto[start:end]
        entidades = json.loads(json_str)
    except Exception as e:
        print("Erro ao extrair JSON:", e)
        entidades = None

    return entidades

# Exemplo de uso:
texto_exemplo = "Albert Einstein nasceu na Alemanha."
entidades = extrair_entidades_gemma(texto_exemplo)
print(entidades)



JSONDecodeError: Extra data: line 2 column 1 (char 97)

In [8]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("/home/juliaribeiro/Qwen3-14B/Qwen3-14B", trust_remote_code=False, local_files_only=True)
print("✅ Tokenizer carregado com sucesso!")
print(tokenizer("exemplo de texto"))


Exception: expected value at line 1 column 1

In [None]:
import transformers

assert isinstance(tokenizer, transformers.PreTrainedTokenizerFast)

In [None]:
tokenizer("Olá, isto é uma sentença!")

In [None]:
example = ner_dataset["train"][2]
print(example["tokens"])

In [None]:
tokenized_input = tokenizer(example["tokens"], is_split_into_words=True, truncation=True)
tokens = tokenizer.convert_ids_to_tokens(tokenized_input["input_ids"])
print(tokens)

In [None]:
len(example[f"ner_tags"]), len(tokenized_input["input_ids"])

In [None]:
print(tokenized_input.word_ids())

In [None]:
word_ids = tokenized_input.word_ids()
aligned_labels = [-100 if i is None else example[f"ner_tags"][i] for i in word_ids]
print(len(aligned_labels), len(tokenized_input["input_ids"]))

In [None]:
label_all_tokens = True

In [None]:
def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )

    labels = []
    for i, label in enumerate(examples[f"ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            # Special tokens have a word id that is None. We set the label to -100 so they are automatically
            # ignored in the loss function.
            if word_idx is None:
                label_ids.append(-100)
            # We set the label for the first token of each word.
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            # For the other tokens in a word, we set the label to either the current label or -100, depending on
            # the label_all_tokens flag.
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx

        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
tokenized_datasets = ner_dataset.map(tokenize_and_align_labels, batched=True, load_from_cache_file=False)

In [None]:
tokenized_datasets["train"]["labels"][2]

In [None]:
from transformers import TFAutoModelForTokenClassification

model = TFAutoModelForTokenClassification.from_pretrained(
    model_id, num_labels=len(label_list), from_pt=True
)

In [None]:
batch_size = 8

In [None]:
from transformers import create_optimizer

num_train_epochs = 3
num_train_steps = (len(tokenized_datasets["train"]) // batch_size) * num_train_epochs
optimizer, lr_schedule = create_optimizer(
    init_lr=2e-5,
    num_train_steps=num_train_steps,
    weight_decay_rate=0.01,
    num_warmup_steps=0,
)

In [None]:
import tensorflow as tf

model.compile(optimizer=optimizer)

In [None]:
from transformers import DataCollatorForTokenClassification

data_collator = DataCollatorForTokenClassification(tokenizer, return_tensors="tf")

In [None]:
train_set = tokenized_datasets["train"].to_tf_dataset(
    columns=["attention_mask", "input_ids", "labels"],
    shuffle=True,
    batch_size=batch_size,
    collate_fn=data_collator,
)
validation_set = tokenized_datasets["test"].to_tf_dataset(
    columns=["attention_mask", "input_ids", "labels"],
    shuffle=False,
    batch_size=batch_size,
    collate_fn=data_collator,
)
test_set = tokenized_datasets["validation"].to_tf_dataset(
    columns=["attention_mask", "input_ids", "labels"],
    shuffle=False,
    batch_size=batch_size,
    collate_fn=data_collator,
)

In [None]:
import numpy as np
from datasets import load_metric
from transformers.keras_callbacks import KerasMetricCallback

metric = load_metric("seqeval")
labels = [label_list[i] for i in example[f"ner_tags"]]
metric.compute(predictions=[labels], references=[labels])


def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Remove ignored index (special tokens)
    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    results = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }


metric_callback = KerasMetricCallback(
    metric_fn=compute_metrics, eval_dataset=validation_set
)

In [None]:
from transformers.keras_callbacks import PushToHubCallback
# from tensorflow.keras.callbacks import TensorBoard

# model_name = model_checkpoint.split("/")[-1]
# push_to_hub_model_id = f"{model_name}-finetuned-{task}"

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./tc_model_save/logs")

# push_to_hub_callback = PushToHubCallback(
#     output_dir="./tc_model_save",
#     tokenizer=tokenizer,
#     hub_model_id=push_to_hub_model_id,
# )

callbacks = [metric_callback, tensorboard_callback]

model.fit(
    
    train_set,
    validation_data=validation_set,
    epochs=num_train_epochs,
    callbacks=callbacks,
)

In [None]:
import os

db_path = "optuna_study.db"

if os.path.exists(db_path):
    os.remove(db_path)
    print("Banco de dados deletado com sucesso.")
else:
    print("O banco de dados não existe.")





In [None]:
import sqlite3

conn = sqlite3.connect("optuna_study.db")
cursor = conn.cursor()

cursor.execute("SELECT * FROM trials")
print(cursor.fetchall())

conn.close()




In [None]:
model.predict(validation_set)

In [None]:
model.save_pretrained("./modelNER")

In [None]:
model.config.id2label.values()

In [None]:
from datasets import load_metric
import numpy as np


metric = load_metric("seqeval")


def evaluate(model, dataset, ner_labels):
  all_predictions = []
  all_labels = []
  for batch in dataset:
      logits = model.predict(batch)["logits"]
      labels = batch["labels"]
      predictions = np.argmax(logits, axis=-1)
      for prediction, label in zip(predictions, labels):
          for predicted_idx, label_idx in zip(prediction, label):
              if label_idx == -100:
                #   print(label)
                  continue
              all_predictions.append(ner_labels[predicted_idx])
              all_labels.append(ner_labels[label_idx])
              #print('\npredicted=',ner_labels[predicted_idx], '\nlabel=',ner_labels[label_idx])
  #print("all_predictions=",[all_predictions],'\nall_labels=',[all_labels])
  return metric.compute(predictions=[all_predictions], references=[all_labels])

#results = evaluate(model, tf_eval_dataset, ner_labels=list(model.config.id2label.values()))
results = evaluate(model, test_set, ner_labels=list(model.config.id2label.values()))
results