# RoBERTa

In [None]:
from tqdm import tqdm
import seaborn as sns
import numpy as np
import pandas as pd
from pathlib import Path
from io import StringIO
import json
import pickle
import matplotlib.pyplot as plt
import re
from collections import Counter

In [None]:
# !unzip datasets.zip

In [None]:
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    DataCollatorWithPadding,
    AutoModelForSequenceClassification,
)
from datasets import load_dataset
import datasets

In [None]:
# !pip install --upgrade pandas transformers datasets

## Transfer Learning

In [None]:
import torch
# Last expression of the cell: shows the name of CUDA device 0
# (presumably fails when no CUDA device is visible - confirm on CPU-only hosts).
torch.cuda.get_device_name(0)

- https://github.com/rafalposwiata/depression-detection-lt-edi-2022

### Dataset

Loading and merging the datasets

In [None]:
# Read the three task_0 TSV files (in alphabetical order) and glue their raw
# text together so pandas parses them as one table.
source_names = sorted(["ds", "ts_hs", "ts_ht"])
data_text = "".join(
    Path("datasets", "task_0", f"{filename}.tsv").read_text(encoding="utf8")
    for filename in source_names
)

# Drop exact duplicate rows, keep the pre-deduplication row label as "old_idx"
# and expose the new consecutive position as "new_idx".
df = (
    pd.read_csv(StringIO(data_text), sep="\t")
    .drop_duplicates()
    .reset_index(names="old_idx")
    .reset_index(names="new_idx")
)

Splitting the dataset into training and test sets (following previous work)

In [None]:
# Series mapping the original (pre-deduplication) row label to the new row id.
lookup = df.set_index("old_idx")["new_idx"]
pth = Path("datasets", "task_0", "train_test_splitting.json")

# Context manager so the file handle is closed as soon as the JSON is parsed;
# the encoding is explicit, like the TSV reads above.
with open(pth, "rt", encoding="utf8") as fh:
    idx = json.load(fh)

# Translate every split's old row labels into new row ids. `i in lookup` tests
# the Series index, so rows removed by drop_duplicates() are simply skipped.
idx = {k: [lookup[i] for i in lst if i in lookup] for k, lst in idx.items()}

In [None]:
# Text column and label column of the merged table.
x, y = df["pp_text"], df["label"]

# Row ids of each split, as resolved from the splitting file.
train_ids, test_ids = idx["train"], idx["test"]

x_train = x.loc[train_ids]
x_test = x.loc[test_ids]
# Labels are cast to int for the classifier.
y_train = y.loc[train_ids].astype(int)
y_test = y.loc[test_ids].astype(int)

Converting the dataset to the HuggingFace format

In [None]:
# Build one HuggingFace Dataset per split from {"label", "text"} records and
# wrap both in a DatasetDict keyed by split name.
dataset = datasets.DatasetDict(
    {
        split: datasets.Dataset.from_list(
            [
                {"label": int(lbl), "text": str(txt)}
                for lbl, txt in zip(labels, texts)
            ]
        )
        for split, (labels, texts) in (
            ("train", (y_train, x_train)),
            ("test", (y_test, x_test)),
        )
    }
)

Some examples

In [None]:
# Last expression of the cell: shows the last and the first record of the test split.
dataset["test"][-1], dataset["test"][0]

### Tokenization

In [None]:
# Tokenizer taken from the same checkpoint name that is later used for the model.
tokenizer_hf = AutoTokenizer.from_pretrained("ShreyaR/finetuned-roberta-depression")

In [None]:
def to_hf_tokens(examples):
    """Tokenize the "text" field of a batch of examples with `tokenizer_hf`.

    Texts are truncated and padded with the "max_length" strategy, so the
    tokenizer decides the common length of every encoded sequence.
    """
    tokenizer_options = {"padding": "max_length", "truncation": True}
    return tokenizer_hf(examples["text"], **tokenizer_options)

In [None]:
# Apply the tokenizer to every split; batched=True hands `to_hf_tokens` a batch
# of examples per call instead of a single example.
tokenized_datasets = dataset.map(to_hf_tokens, batched=True)
# Last expression of the cell: shows the (split name, dataset) pairs.
tokenized_datasets.items()

In [None]:
# data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

### Fine Tuning

In [None]:
# Sequence-classification model loaded from the same checkpoint as the tokenizer.
# The commented-out name below is an alternative checkpoint that is not used here.
model = AutoModelForSequenceClassification.from_pretrained(
    "ShreyaR/finetuned-roberta-depression",
    # "rafalposwiata/deproberta-large-depression",
)

Setting ```gradient accumulation``` if needed

In [None]:
# Training configuration: only the first positional argument (presumably the
# output directory - confirm against the TrainingArguments signature) and the
# seed are set; everything else keeps the library defaults.
# The commented-out options are memory-saving knobs left disabled.
training_args = TrainingArguments(
    "test-trainer",
    # no_cuda=True,
    seed=42,
    # per_device_train_batch_size=1,
    # gradient_accumulation_steps=8,
    # gradient_checkpointing=True,
)

In [None]:
# Trainer wired to the model and the tokenized training split only.
# NOTE(review): the commented-out arguments reference names that the visible
# code never defines ("eval" split, `data_collator`, `tokenizer`); they would
# need to be created before being re-enabled.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    # eval_dataset=tokenized_datasets["eval"],
    # data_collator=data_collator,
    # tokenizer=tokenizer,
)

In [None]:
# trainer.train()
# predictions = trainer.predict(tokenized_datasets["test"])

![image.png](attachment:image.png)

Restoring the prediction results from pickle

In [None]:
# Restore the stored prediction results for the fine-tuned ("fine") and the
# original ("raw") model.
# NOTE: unpickling can execute arbitrary code - only load prediction files that
# were produced by this project.
# Context managers close the file handles right after loading.
with open("predictions/fine.pkl", "rb") as fh:
    y_fine = pickle.load(fh)
with open("predictions/raw.pkl", "rb") as fh:
    y_raw = pickle.load(fh)

### Prediction

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

In [None]:
# Second element of the restored "raw" result is used as the ground truth
# (presumably the label ids stored next to the predictions - confirm against
# the code that wrote the pickle).
y_true = y_raw[1]

Picking the class with the larger logit (equivalent to a 0.5 threshold on the softmax probability) to obtain binary predictions

In [None]:
# Each row of the first element holds two scores; the predicted label is 1 when
# the second score is strictly larger than the first, otherwise 0.
# NOTE: both names are rebound from the restored results to plain label lists.
y_fine = [int(first < second) for first, second in y_fine[0]]
y_raw = [int(first < second) for first, second in y_raw[0]]

#### Before Fine Tuning

In [None]:
# Confusion matrix of the model before fine-tuning.
conf_matrix = confusion_matrix(y_true=y_true, y_pred=y_raw)

fig, ax = plt.subplots(figsize=(2, 2))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)

# Write every count in the centre of its cell (x = column, y = row).
cell_style = {"va": "center", "ha": "center", "size": "large"}
for (i, j), count in np.ndenumerate(conf_matrix):
    ax.text(x=j, y=i, s=count, **cell_style)

plt.xlabel("Predictions", fontsize=14)
plt.ylabel("Actuals", fontsize=14)
plt.show()


In [None]:
# Binary classification metrics of the model before fine-tuning.
for caption, metric in (
    ("Precision:", precision_score),
    ("Recall: ", recall_score),
    ("Accuracy: ", accuracy_score),
    ("F1 Score: ", f1_score),
):
    print(caption, metric(y_true, y_raw))

#### After Fine Tuning

In [None]:
# Confusion matrix of the model after fine-tuning.
conf_matrix = confusion_matrix(y_true=y_true, y_pred=y_fine)

fig, ax = plt.subplots(figsize=(2, 2))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)

# Write every count in the centre of its cell (x = column, y = row).
cell_style = {"va": "center", "ha": "center", "size": "large"}
for (i, j), count in np.ndenumerate(conf_matrix):
    ax.text(x=j, y=i, s=count, **cell_style)

plt.xlabel("Predictions", fontsize=14)
plt.ylabel("Actuals", fontsize=14)
plt.show()


In [None]:
# Binary classification metrics of the model after fine-tuning.
for caption, metric in (
    ("Precision:", precision_score),
    ("Recall: ", recall_score),
    ("Accuracy: ", accuracy_score),
    ("F1 Score: ", f1_score),
):
    print(caption, metric(y_true, y_fine))


saving classification categories, for later purpose

In [None]:
# Confusion-matrix category of every (prediction, truth) pair.
_OUTCOME_BY_PAIR = {
    (0, 0): "TN",
    (1, 1): "TP",
    (0, 1): "FN",
    (1, 0): "FP",
}

group = []
# strict=True: predictions and labels must have the same length; a mismatch
# would otherwise be truncated silently and shift every later lookup.
for pair in zip(y_fine, y_true, strict=True):
    outcome = _OUTCOME_BY_PAIR.get(pair)
    if outcome is None:
        # Anything other than a 0/1 pair means the inputs are not binary labels.
        raise ValueError(f"unexpected (prediction, label) pair: {pair!r}")
    group.append(outcome)

## Explain Results

### Hidden Embeddings

To give an interpretation of how RoBERTa has predicted labels, we're going to use a lexical approach.

LIWC assigns words to different categories, such as pronouns, affective words, and cognitive processes, and provides a numerical score for each category based on the frequency of the words in the text.

In [None]:
import liwc
# `parse` is called on single tokens and yields the names of their categories;
# `category_names` lists all category names of the loaded dictionary file.
parse, category_names = liwc.load_token_parser('dic/LIWC2007_English080730.dic')

In [None]:
# N: number of test documents, K: number of LIWC categories.
N = len(x_test)
K = len(category_names)

# Column position of every category name in the frequency vectors.
kat_lookup = {name: position for position, name in enumerate(category_names)}

Defining a function to tokenize the text into words, so that liwc can match and count them

In [None]:
# Compiled once: runs of word characters are the tokens handed to LIWC.
_LIWC_WORD_RE = re.compile(r"\w+")


def to_liwc_tokens(text, *, lowercase=True):
    """Yield the word tokens of *text* for the LIWC parser.

    Args:
        text: The document to split.
        lowercase: When true (default) every token is lowercased. The LIWC
            dictionary only matches lowercase strings, so capitalised words
            would otherwise be silently left uncounted. Pass ``False`` to get
            the tokens exactly as written.

    Yields:
        One string per maximal run of word characters, in document order.
    """
    for match in _LIWC_WORD_RE.finditer(text):
        token = match.group(0)
        yield token.lower() if lowercase else token

Getting the model's input (token) embedding layer

In [None]:
# NOTE(review): get_input_embeddings() returns the layer that maps token ids to
# their input vectors; it is not an output of the encoder's hidden layers.
voc2hidden = trainer.model.get_input_embeddings()

Extracting the lexical embedding and the RoBERTa embedding for each document

In [None]:
# For every test document build
#   y_lex: its normalised LIWC category frequencies,
#   x_sem: the model's final-layer representation of the first (<s>) token,
#   w_grp: its confusion-matrix category.
# Documents without any LIWC match or longer than 512 tokens are skipped, so
# these three lists are aligned with each other but NOT with `x_test`.
y_lex = []
x_sem = []
w_grp = []

# Follow the model's own device instead of assuming a GPU is present.
device = trainer.model.device
# Disable dropout so the extracted vectors are deterministic.
trainer.model.eval()

for n in tqdm(range(N)):
    text = x_test.iloc[n]
    kat_freq = np.zeros(K)

    # Count how often each LIWC category is hit by the document's tokens.
    for t in to_liwc_tokens(text):
        for m in parse(t):
            kat_freq[kat_lookup[m]] += 1

    s = kat_freq.sum()
    if not s:
        continue  # no LIWC match at all: nothing to regress on

    kat_freq /= s  # counts -> relative frequencies

    input_ids = tokenizer_hf.encode(text, return_tensors="pt")
    # Check the length before running the model: over-long inputs are skipped.
    if input_ids.shape[-1] > 512:
        continue

    # Use the encoder's final hidden state of the first token. A plain
    # input-embedding lookup of that token is the same vector for every
    # document and therefore carries no information about the text.
    with torch.no_grad():
        outputs = trainer.model(input_ids.to(device), output_hidden_states=True)
    cls_token = outputs.hidden_states[-1][0, 0]

    x_sem.append(cls_token.detach().cpu().numpy())
    y_lex.append(kat_freq)
    w_grp.append(group[n])


y_lex = np.array(y_lex)
x_sem = np.array(x_sem)

Regressing lexical embedding over semantic embedding with Ridge Regression

Ridge regression still takes all the independent variables into account (unlike ElasticNet), but it penalizes the less relevant ones.

In [None]:
from sklearn.linear_model import ElasticNet, Ridge, LinearRegression

In [None]:
# e_reg = ElasticNet(random_state=42)
# e_reg.fit(x_sem, y_lex)

# print(e_reg.coef_)
# # print(e_reg.intercept_)

In [None]:
# Multi-output ridge regression: semantic vectors (inputs) -> LIWC category
# frequencies (targets).
ridge_reg = Ridge(alpha=0.05)
ridge_reg.fit(X=x_sem, y=y_lex)

# Uncomment to inspect the fitted parameters:
# print(ridge_reg.coef_)
# print(ridge_reg.intercept_)

Plotting the coefficients of hidden semantic embeddings for each dependent lexical probability (matrix shape only for visualization purpose)

In [None]:
def plot_coeff(category):
    """Draw the ridge coefficients of one LIWC category as a 32x24 heatmap.

    The coefficient row of *category* is floor-divided by 1e-11 and reshaped
    to 32x24 purely for display; ticks are hidden and the upper-cased category
    name is used as title.
    """
    row = category_names.index(category)
    scaled = np.floor_divide(ridge_reg.coef_[row], 1e-11)
    grid = scaled.reshape((32, 24))

    ax = sns.heatmap(grid, cmap=sns.color_palette("vlag", as_cmap=True))
    ax.set(xticklabels=[], yticklabels=[], title=category.upper())
    ax.tick_params(bottom=False, left=False)

In [None]:
# Coefficient heatmap for the "death" category.
plot_coeff("death")

In [None]:
# Coefficient heatmap for the "posemo" category.
plot_coeff("posemo")

For well-known lexical opposites, the coefficients should be distant:

checking by element-wise product (smaller is better)

In [None]:
# For each pair of categories print the largest element-wise product of their
# coefficient vectors.
opposite_pairs = (("posemo", "negemo"), ("i", "past"), ("adverb", "negate"))
for a, b in opposite_pairs:
    coef_a = ridge_reg.coef_[category_names.index(a)]
    coef_b = ridge_reg.coef_[category_names.index(b)]
    print((coef_a * coef_b).max())

### Attention

In [None]:
from wordcloud import WordCloud

In [None]:
# Two sample inputs for the attention plots: a hand-written sentence and the
# fourth document of the test split.
TEST_0 = "One of the most important things you could realize is that I found me alone"
TEST_1 = x_test.iloc[3]

Extracting attention from CLS to other tokens of the last RoBERTa layer

In [None]:
def word_prob(text, to_filter=False):
    """Score every token of *text* by the last-layer attention it gets from the first token.

    Args:
        text: Document to analyse.
        to_filter: When true, drop tokens of length <= 1 and tokens starting
            with "depress" (together with their scores).

    Returns:
        ``(word_name, word_score)``: the lower-cased, stripped token strings
        (first and last token excluded) and a NumPy array with one score per
        token. The score is the norm, taken across attention heads, of the
        last layer's attention from position 0 to that token. Both are empty
        when the encoded text is longer than 512 tokens or when filtering
        removes every token.
    """
    model = trainer.model
    input_ids = tokenizer_hf.encode(text, return_tensors="pt")
    if input_ids.shape[-1] > 512:
        return [], []

    # Inference only: no autograd graph is needed. The input follows the
    # model's device instead of assuming a GPU.
    with torch.no_grad():
        outputs = model(input_ids.to(model.device), output_attentions=True)

    # Last layer, first batch item; presumably (heads, query, key) - the norm
    # over dim 0 collapses the heads.
    attention_norms = outputs.attentions[-1][0].norm(dim=0)
    word_score = attention_norms[0, 1:-1].cpu().numpy()

    word_name = [tokenizer_hf.decode(e).lower().strip() for e in input_ids[0, 1:-1]]

    if to_filter:
        keep = [
            i
            for i, word in enumerate(word_name)
            if len(word) > 1 and not word.startswith("depress")
        ]
        # Index-based selection also works when nothing survives the filter
        # (unpacking an empty zip() would raise ValueError).
        word_name = [word_name[i] for i in keep]
        word_score = word_score[np.asarray(keep, dtype=int)]

    return word_name, word_score


In [None]:
def plot_text_wc(text, score=None):
    """Show a word cloud.

    Without *score*, *text* is a plain string and the cloud is generated from
    it. With *score*, *text* is a sequence of words and *score* the matching
    weights; they are paired into a word -> weight mapping.
    """
    cloud = WordCloud()
    if score is not None:
        cloud.generate_from_frequencies(dict(zip(text, score)))
    else:
        cloud.generate(text)

    plt.figure()
    plt.imshow(cloud)
    plt.axis("off")
    plt.show()


Plotting a word cloud where words are weighted by attention

In [None]:
# Attention-weighted word cloud of the hand-written sentence (no filtering).
word_name, word_score = word_prob(TEST_0)
plot_text_wc(word_name, word_score)

In [None]:
# Attention-weighted word cloud of the test-set document, with the token
# filter of `word_prob` enabled.
word_name, word_score = word_prob(TEST_1, to_filter=True)
plot_text_wc(word_name, word_score)

Plotting word clouds for the False Positive / False Negative classes

In [None]:
# Select the test documents of each classification category.
# `group` has one entry per test document (it is indexed by test position when
# the embeddings are extracted), whereas `w_grp` only covers the documents kept
# by that loop, so its positions do not line up with `x_test`.
test_fp = [text for text, g in zip(x_test, group) if g == "FP"]
test_fn = [text for text, g in zip(x_test, group) if g == "FN"]
test_tp = [text for text, g in zip(x_test, group) if g == "TP"]

In [None]:
def plot_top_k_words(a, how_many=4):
    """Plot a word cloud of the most attended words over the documents in *a*.

    For each document the *how_many* highest-scoring tokens (as returned by
    `word_prob`) are collected; documents with fewer tokens than *how_many*
    are ignored. Tokens of length <= 2 or starting with "depress" are left
    out. The collected words are joined with spaces and plotted.
    """
    selected = []
    for document in a:
        names, scores = word_prob(document)
        if len(names) >= how_many:
            # Positions of the `how_many` largest scores (unordered).
            top_positions = np.argpartition(scores, -how_many)[-how_many:]
            selected.extend(
                names[pos]
                for pos in top_positions
                if len(names[pos]) > 2 and not names[pos].startswith("depress")
            )

    plot_text_wc(" ".join(selected))


Visualizing how attention wordclouds are different between classification groups

In [None]:
# True positives: only the single most attended word of each document.
plot_top_k_words(test_tp, how_many=1)

In [None]:
# False positives: default number of top words per document.
plot_top_k_words(test_fp)

In [None]:
# False negatives: default number of top words per document.
plot_top_k_words(test_fn)

### failed attempts - similarities

In [None]:
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.spatial import distance

import plotly.express as px

Here we compute the cosine distance between all pairs of document lexical (LIWC) vectors

In [None]:
# Pairwise cosine distance (1 - cosine similarity) between the LIWC frequency
# vectors of all kept documents; entry [i, j] compares document i with j.
M = len(y_lex)
# One vectorised cdist call replaces M*M Python-level distance.cosine calls.
# With no documents there is nothing to compare, so keep an empty matrix.
cosine_mx = distance.cdist(y_lex, y_lex, metric="cosine") if M else np.zeros((0, 0))

We reduce the dimensionality of the embeddings to visualize whether misclassification is somehow related to embedding position

In [None]:
# Number of components kept by the dimensionality reduction below.
NC = 5

In [None]:
# X_embedded = TSNE(n_components=NC, learning_rate='auto',
#                   init='random', perplexity=3).fit_transform(cosine_mx)
# X_embedded.shape

In [None]:
# Keep only the rows of documents that are not true negatives, then project
# them onto the first NC principal components.
non_tn_rows = [row for row, g in zip(cosine_mx, w_grp) if g != "TN"]
pca = PCA(n_components=NC)
new_coord = pca.fit_transform(non_tn_rows)

In [None]:
# Axis captions: component number plus its share of explained variance.
explained_pct = pca.explained_variance_ratio_ * 100
labels = {}
for i, var in enumerate(explained_pct):
    labels[str(i)] = f"PC {i+1} ({var:.1f}%)"

# Colour every point by its classification category (true negatives excluded,
# matching the rows that were projected).
point_groups = [g for g in w_grp if g != "TN"]

fig = px.scatter_matrix(
    new_coord,  # X_embedded,
    labels=labels,
    dimensions=range(NC),
    color=point_groups,
    height=1000,
)
fig.update_traces(diagonal_visible=False)
fig.show()

TF-IDF

In [None]:
# Fit a TF-IDF vectorizer with default settings on the test documents;
# `tfidf_mx` is the resulting document-term matrix and `words` the feature
# names in column order.
vect = TfidfVectorizer()
tfidf_mx = vect.fit_transform(x_test)
words = vect.get_feature_names_out()

In [None]:
# a, b = tfidf_mx.shape
# tiv = np.zeros((a,b))
# for i in tqdm(range(a)):
#     for j in range(b):
#         tiv = tfidf_mx[i,j]