In [23]:
import torch
import random
from transformers import AutoTokenizer, AutoModel

model_name = "allegro/herbert-base-cased"
device = "cuda" if torch.cuda.is_available() else "cpu"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name).to(device)
device

Some weights of the model checkpoint at allegro/herbert-base-cased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.decoder.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.dense.weight', 'cls.sso.sso_relationship.bias', 'cls.sso.sso_relationship.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


'cuda'

<!-- @format -->

# Subtask a) - Augmentacja mechaniczna


In [24]:
def get_embeddings_BERT(train_lines, test_lines, K=0, augment=True, f_a=lambda x: x):
    def representation(L):
        txt = " ".join(L)
        input_ids = tokenizer(txt, return_tensors="pt")["input_ids"].to(device)
        output = model(input_ids=input_ids)
        return output.last_hidden_state.detach().cpu().numpy()[0, 0, :]

    X_train = []
    y_train = []
    X_test = []
    y_test = []

    for line in train_lines:
        L = line.split()
        y = 0 if L[0] == "BAD" else 1

        x = representation(L[1:])
        y_train.append(y)
        X_train.append(x)

        if augment:
            for _ in range(K):
                x = representation(f_a(L[1:]))
                y_train.append(y)
                X_train.append(x)

    for line in test_lines:
        L = line.split()
        y = 0 if L[0] == "BAD" else 1

        x = representation(L[1:])
        y_test.append(y)
        X_test.append(x)
    return X_train, y_train, X_test, y_test

In [25]:
def spoil(L):
    res = []
    for w in L:
        if random.random() < 0.85:
            res.append(w)
        else:
            res.append(w.upper())
    return res


def mechanic_augment(L):
    res = []
    for w in L:
        if random.random() < 0.85:
            res.append(w)
        else:
            wl = list(w)
            if len(w) > 1:
                i1, i2 = random.sample(range(len(w)), 2)
                wl[i1], wl[i2] = wl[i2], wl[i1]
                wl[0] = wl[0].upper()
            res.append("".join(wl))
    return res

<!-- @format -->

# Subtask b) - Augmentacja generatywna


In [26]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

model_name_p = "flax-community/papuGaPT2"
tokenizer_papuga = AutoTokenizer.from_pretrained(model_name_p)
device = "cuda" if torch.cuda.is_available() else "cpu"
model_papuga = AutoModelForCausalLM.from_pretrained(model_name_p).to(device)

In [27]:
def generate_augmented_sentence(sentence: str):
    prompt = sentence.strip(".?!") + " oraz"
    model_inputs = tokenizer_papuga(prompt, return_tensors="pt", padding=False)
    model_inputs = model_inputs.to(device)
    attention_mask = model_inputs["attention_mask"].to(device)

    with torch.no_grad():
        generated_ids = model_papuga.generate(
            model_inputs["input_ids"],
            attention_mask=attention_mask,
            max_new_tokens=15,
            do_sample=True,
            top_k=10,
            top_p=0.90,
            pad_token_id=tokenizer_papuga.eos_token_id,
        )

        res = tokenizer_papuga.decode(generated_ids[0], skip_special_tokens=True)
        for i in range(len(prompt), len(res)):
            if res[i] in "?.!":
                return res[: i + 1]
        return res + "."


w = "Dzień dobry, co u ciebie słychać."
generate_augmented_sentence(w)

'Dzień dobry, co u ciebie słychać oraz jak sobie radzisz z tymi wszystkimi problemami.'

In [28]:
def get_embeddings_papuga(test_lines, train_lines, K=0, augment=False):
    def representation(L):
        txt = " ".join(L)
        input_ids = tokenizer(txt, return_tensors="pt")["input_ids"].to(device)
        output = model(input_ids=input_ids)
        return output.last_hidden_state.detach().cpu().numpy()[0, 0, :]

    X_train, y_train = [], []
    X_test, y_test = [], []

    for line in train_lines:
        L = line.split()
        y = 0 if L[0] == "BAD" else 1

        x = representation(L[1:])
        y_train.append(y)
        X_train.append(x)

        if augment:
            sent = "".join(L[1:])
            for _ in range(K):
                new_sent = generate_augmented_sentence(sent)
                x = representation(new_sent.split())
                y_train.append(y)
                X_train.append(x)

    for line in test_lines:
        L = line.split()
        y = 0 if L[0] == "BAD" else 1

        x = representation(L[1:])
        y_test.append(y)
        X_test.append(x)

    return X_train, y_train, X_test, y_test

<!-- @format -->

# Subtask c) - Word2Vec


<!-- @format -->

# Testy


In [35]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score
import numpy as np

In [36]:
def get_reviews(file_name):
    with open(file_name, "r", encoding="utf-8") as file:
        return [line.rstrip() for line in file.readlines()]


def split_data(lines, size_of_test=4):
    random.shuffle(lines)

    N = len(lines)
    test_size = N // size_of_test
    train_size = N - test_size

    train_lines = lines[:train_size]
    test_lines = lines[train_size:]

    return train_lines, test_lines


lines = get_reviews("reviews.txt")

train_lines, test_lines = split_data(lines)

<!-- @format -->

## Bez augmentacji


In [37]:
X_train, y_train, X_test, y_test = get_embeddings_BERT(
    train_lines, test_lines, K=2, augment=False
)
clf_no_aug = LogisticRegression(max_iter=1000, solver="lbfgs").fit(X_train, y_train)


y_pred = clf_no_aug.predict(X_test)

# ewaluacja wyników
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)

# Printowanie wyników
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Accuracy: 0.79
Precision: 0.8095
Recall: 0.8500


<!-- @format -->

## Z augmentacją mechaniczną


In [38]:
X_train, y_train, X_test, y_test = get_embeddings_BERT(
    train_lines, test_lines, K=3, f_a=mechanic_augment
)


clf_BERT = LogisticRegression(max_iter=1000, solver="lbfgs").fit(X_train, y_train)



y_pred = clf_BERT.predict(X_test)

# ewaluacja wyników
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)

# Printowanie wyników
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Accuracy: 0.82
Precision: 0.8500
Recall: 0.8500


<!-- @format -->

## Z augmentacją generatywną


In [39]:
X_train, y_train, X_test, y_test = get_embeddings_papuga(
    train_lines, test_lines, K=3, augment=True
)

In [40]:
clf_papuga = LogisticRegression(max_iter=1000, solver="lbfgs").fit(X_train, y_train)

y_pred = clf_papuga.predict(X_test)

# ewaluacja wyników
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)

# Printowanie wyników
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Accuracy: 0.6833333333333333
Precision: 0.6433
Recall: 0.7214
