<a href="https://colab.research.google.com/github/matchten/textmsg-analyzer/blob/main/textmsg_analyzer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required packages
!pip install transformers
!pip install datasets --upgrade
!pip install evaluate
!pip install transformers[torch]
!pip install numpy

In [None]:
# Load original dataset
from transformers.pipelines.pt_utils import KeyDataset
from datasets import concatenate_datasets, load_dataset
from datasets import Dataset, DatasetDict
import numpy as np

# Load the "conv_ai_2" dataset, keep only the dialog and its evaluation score
# (exposed as the "text" and "label" columns), and drop every row whose
# score is negative. The dialog itself is converted to a string further below.
new_data = (
    load_dataset("conv_ai_2")
    .select_columns(["dialog", "eval_score"])
    .rename_column("dialog", "text")
    .rename_column("eval_score", "label")
    .filter(lambda row: row["label"] >= 0)
)


def dict_to_string(example):
    """Collapse a dialog into one double-quoted string of its human turns.

    Args:
        example: Record whose ``"text"`` field is an iterable of message
            dicts, each providing ``"sender_class"`` and ``"text"`` keys.

    Returns:
        The same ``example`` object, with ``"text"`` replaced by the text of
        every message whose ``"sender_class"`` is ``"Human"``, joined by
        single spaces and wrapped in double quotes. A dialog without human
        messages yields ``'""'``.
    """
    human_messages = (
        message["text"]
        for message in example["text"]
        if message["sender_class"] == "Human"
    )
    # Joining (instead of appending "msg " and trimming the last character)
    # keeps the opening quote intact when there are no human messages.
    example["text"] = '"' + " ".join(human_messages) + '"'
    return example

def normalize_labels(example):
    """Bucket the numeric score in ``example["label"]`` into a sentiment class.

    Scores below 3 become 0 (negative), a score of exactly 3 becomes
    1 (neutral), and anything else becomes 2 (positive). The record is
    updated in place and returned.
    """
    rating = example["label"]
    if rating == 3:
        sentiment = 1
    else:
        sentiment = 0 if rating < 3 else 2
    example["label"] = sentiment
    return example

# Flatten each dialog to a string, then bucket its score into a sentiment class.
new_data = new_data.map(dict_to_string).map(normalize_labels)

# Re-split the "train" split, holding out 10% of it as the test set.
new_data = new_data["train"].train_test_split(test_size=0.1)

# print(new_data)

In [None]:
# Additional finetune dataset consisting of daily dialogue and emotion sequences
# Keep only the dialog and its emotion sequence, exposed under the same
# "text" / "label" column names used for the first dataset.
additional_data = (
    load_dataset("daily_dialog")
    .select_columns(["dialog", "emotion"])
    .rename_column("dialog", "text")
    .rename_column("emotion", "label")
)

def list_to_string(example):
    """Concatenate the messages in ``example["text"]`` into a single string.

    The messages are glued together in order with no separator inserted
    between them. The record is updated in place and returned.
    """
    example["text"] = "".join(example["text"])
    return example

def normalize_labels(example):
    """Reduce a per-message emotion sequence to a single sentiment class.

    Every emotion code in ``example["label"]`` is mapped to a sentiment
    value: codes 1, 2, 3 and 5 become 0 (negative), code 0 becomes
    1 (neutral) and any other code becomes 2 (positive). The label of the
    whole dialog is the rounded mean of those values.
    NOTE(review): the codes are presumably the DailyDialog emotion ids
    (0 = no emotion, 4 = happiness, 6 = surprise, rest negative) - confirm
    against the dataset card.

    Args:
        example: Record whose ``"label"`` field is a non-empty sequence of
            integer emotion codes.

    Returns:
        The same ``example`` object, with ``"label"`` replaced by the
        aggregated sentiment as ``np.int32``.

    Raises:
        ZeroDivisionError: If the emotion sequence is empty.
    """
    negative_emotions = (1, 2, 3, 5)

    sentiments = []
    for emotion in example["label"]:
        # A membership test is required here: `emotion == 1 or 2 or 3 or 5`
        # is always truthy and would classify every message as negative.
        if emotion in negative_emotions:
            sentiments.append(0)
        elif emotion == 0:
            sentiments.append(1)
        else:
            sentiments.append(2)

    normalized_emotion = round(sum(sentiments) / len(sentiments))
    example["label"] = np.int32(normalized_emotion)
    return example

# Flatten each dialog to a string, then collapse its emotion sequence into
# a single label.
additional_data = additional_data.map(list_to_string).map(normalize_labels)

# Fold the validation split into the test split so that only "train" and
# "test" remain.
additional_data["test"] = concatenate_datasets(
    [additional_data[split] for split in ("test", "validation")]
)
del additional_data["validation"]

# print(additional_data)

In [None]:
# Combine the two datasets into a single fine-tuning dataset
# Stack the matching splits of both sources, first dataset first.
finetune_train = concatenate_datasets(
    [source["train"] for source in (new_data, additional_data)]
)
finetune_test = concatenate_datasets(
    [source["test"] for source in (new_data, additional_data)]
)

# print(finetune_train)
# print(finetune_test)

In [None]:
# Tokenizing data
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer, AutoConfig

# Checkpoint name shared by the tokenizer, the config and the model.
MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"

tokenizer = AutoTokenizer.from_pretrained(MODEL)
config = AutoConfig.from_pretrained(MODEL)
# Three output classes, matching the 0/1/2 labels of the combined dataset.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL,
    num_labels=3,
)

def tokenize_function(examples):
    """Tokenize the ``"text"`` field of ``examples`` with the module-level tokenizer.

    Inputs are truncated and padded to a fixed length of 512, and the
    tokenizer is asked for ``'pt'`` tensors.
    """
    encode_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 512,
        "return_tensors": "pt",
    }
    return tokenizer(examples["text"], **encode_options)

# Tokenize both splits, passing the examples to the tokenizer in batches.
tokenized_finetune_train = finetune_train.map(
    function=tokenize_function,
    batched=True,
)
tokenized_finetune_test = finetune_test.map(
    function=tokenize_function,
    batched=True,
)

# print(tokenized_finetune_train)
# print(tokenized_finetune_test)

In [7]:
# Set up the evaluation metric and the Trainer
import numpy as np
import evaluate
import accelerate
from transformers import TrainingArguments, Trainer

# Accuracy metric used by compute_metrics below.
metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    """Score an evaluation pass with the accuracy metric.

    ``eval_pred`` unpacks into ``(logits, labels)``; the predicted class of
    each row is the index of its largest logit along the last axis.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=-1)
    return metric.compute(
        predictions=predicted_classes,
        references=references,
    )

# Hyperparameters for the fine-tuning run: 10 epochs, a per-device batch size
# of 15 for training and 8 for evaluation, with the evaluation strategy set
# to "epoch" and "test_trainer" as the output directory.
# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy` - confirm against the version that pip installs.
training_args = TrainingArguments(
    output_dir="test_trainer",
    evaluation_strategy="epoch",
    num_train_epochs=10,
    per_device_train_batch_size = 15,
    per_device_eval_batch_size = 8,
)

# Wire the model, the tokenized train/test splits and the accuracy callback
# into a single Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_finetune_train,
    eval_dataset=tokenized_finetune_test,
    compute_metrics=compute_metrics,
)


In [None]:
# Start fine-tuning with the Trainer configured in the previous cell.
trainer.train()