In [1]:
from datasets import Dataset, ClassLabel
import pandas as pd
import re
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments, DataCollatorWithPadding
import torch
import torch.nn as nn
from collections import Counter
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import numpy as np

#1. Načtení CSV datasetu 
df2 = pd.read_csv("empathetic_dialogues.csv")
df2['text'] = df2['Situation'].fillna('') + " " + df2['empathetic_dialogues'].fillna('')

# Mapování emocí na integer labely
emotion2label = {
    "neutral": 1,
    "happy": 2,
    "sad": 0,
    "angry": 0,
    "fear": 0,
    "disgust": 0,
    "surprise": 2
}
df2['label'] = df2['emotion'].map(emotion2label).fillna(1).astype(int)

ds2 = Dataset.from_pandas(df2[['text','label']])

# 2. Maskování citlivých údajů 
def mask_sensitive(text: str) -> str:
    if not isinstance(text, str): return ""
    text = re.sub(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "<EMAIL>", text)
    text = re.sub(r"http\S+|www\.\S+", "<URL>", text)
    text = re.sub(r"\+?\d[\d\-\s]{7,}\d", "<PHONE>", text)
    text = re.sub(r"@[A-Za-z0-9_]+", "<USER>", text)
    text = re.sub(r"\b\d{4,}\b", "<NUMBER>", text)
    text = re.sub(r"(?<![.!?]\s)(?<!^)(\b[A-Z][a-z]+)", "<NAME>", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

ds2_clean = ds2.map(lambda x: {'text': mask_sensitive(x['text']), 'label': x['label']})
ds2_clean = ds2_clean.filter(lambda x: len(x['text'])>5)

# 3. Tokenizace 
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
def tokenize(batch):
    return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=128)

# Převod labelu na ClassLabel
new_features = ds2_clean.features.copy()
new_features['label'] = ClassLabel(names=["Negative","Neutral","Positive"])
ds2_clean = ds2_clean.cast(new_features)
tokenized_ds2 = ds2_clean.map(tokenize, batched=True)

# 4. Rozdělení datasetu 
split_ds2 = tokenized_ds2.train_test_split(test_size=0.2, stratify_by_column="label", seed=42)
train_ds2 = split_ds2['train']
testval_ds2 = split_ds2['test'].train_test_split(test_size=0.5, stratify_by_column="label", seed=42)
valid_ds2 = testval_ds2['train']
test_ds2 = testval_ds2['test']

print(f"Train2: {len(train_ds2)}, Validation2: {len(valid_ds2)}, Test2: {len(test_ds2)}")

# 5. Výpočet vah tříd s kontrolou chybějících tříd 
def compute_class_weights(dataset, num_labels=3):
    counter = Counter(dataset['label'])
    total = sum(counter.values())
    weights = []
    for i in range(num_labels):
        if i in counter:
            weights.append(total / (num_labels * counter[i]))
        else:
            weights.append(0.0)
    return torch.tensor(weights, dtype=torch.float)

class_weights2 = compute_class_weights(train_ds2)
print("Class weights for second dataset:", class_weights2)

# 6. Data collator 
data_collator = DataCollatorWithPadding(tokenizer)

# 7. Load previously trained model 
model = RobertaForSequenceClassification.from_pretrained("./roberta-sentiment2", num_labels=3)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 8. Custom Weighted Trainer 
class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.get("labels").to(device)
        outputs = model(**inputs)
        logits = outputs.get("logits")
        loss_fct = nn.CrossEntropyLoss(weight=class_weights2.to(device))
        loss = loss_fct(logits, labels)
        return (loss, outputs) if return_outputs else loss

# 9. TrainingArguments 
training_args2 = TrainingArguments(
    output_dir="./roberta-sentiment-empathetic",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="./logs_empathetic",
    logging_steps=100,
)

# 10. Inicializace Traineru 
trainer2 = WeightedTrainer(
    model=model,
    args=training_args2,
    train_dataset=train_ds2,
    eval_dataset=valid_ds2,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=lambda eval_pred: {
        "accuracy": accuracy_score(eval_pred[1], np.argmax(eval_pred[0], axis=-1)),
        "precision": precision_score(eval_pred[1], np.argmax(eval_pred[0], axis=-1), average="weighted"),
        "recall": recall_score(eval_pred[1], np.argmax(eval_pred[0], axis=-1), average="weighted"),
        "f1": f1_score(eval_pred[1], np.argmax(eval_pred[0], axis=-1), average="weighted"),
    }
)

# 11. Continued fine-tuning 
trainer2.train()

# 12. Vyhodnocení na testovacích datech 
metrics2 = trainer2.evaluate(test_ds2)
print("Výsledky na testovacích datech (empathetic-dialogues):")
for k,v in metrics2.items():
    print(f"{k:<12}: {v:.4f}")


Map:   0%|          | 0/64636 [00:00<?, ? examples/s]

Filter:   0%|          | 0/64636 [00:00<?, ? examples/s]

Casting the dataset:   0%|          | 0/64636 [00:00<?, ? examples/s]

Map:   0%|          | 0/64636 [00:00<?, ? examples/s]

Train2: 51708, Validation2: 6464, Test2: 6464
Class weights for second dataset: tensor([4.7785, 0.3583, 0.0000])


  trainer2 = WeightedTrainer(


Step,Training Loss
100,0.7384
200,0.7491
300,0.6792
400,0.6239
500,0.6399
600,0.59
700,0.7109
800,0.6516
900,0.6355
1000,0.5681


Výsledky na testovacích datech (empathetic-dialogues):
eval_loss   : 0.3222
eval_accuracy: 0.9765
eval_precision: 0.9780
eval_recall : 0.9765
eval_f1     : 0.9771
eval_runtime: 12.0245
eval_samples_per_second: 537.5680
eval_steps_per_second: 33.5980
epoch       : 3.0000
