In [2]:
# import os
# import re
# import random
# import subprocess
import warnings
# import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt
import torch
from typing import Literal
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AddedToken
# from transformers import TrainingArguments, Trainer
# from transformers import DataCollatorWithPadding
# from transformers import TextClassificationPipeline
# from transformers.pipelines.pt_utils import KeyDataset
# from datasets import Dataset
# from sklearn.model_selection import train_test_split
# from sklearn.model_selection import StratifiedKFold
# from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import cohen_kappa_score
# from IPython.display import FileLink, display

# Install a blanket "ignore" filter: every Python warning raised after this point
# is suppressed, including deprecation notices emitted by the imported libraries.
warnings.simplefilter('ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [7]:
class VotingModel(torch.nn.Module):
    """Ensemble that combines several single-logit sequence-classification models.

    Every path in ``model_paths`` is loaded with
    ``AutoModelForSequenceClassification.from_pretrained(path, num_labels=1)``.
    The loaded models are frozen, kept in eval mode, and always run under
    ``torch.no_grad()``. Their logits are combined either by a plain mean or by
    a small stack of linear layers.

    Args:
        model_paths: Paths (or hub ids) of the checkpoints to ensemble. Must not
            be empty.
        tokenizer_path: Path of the tokenizer to load. When omitted, the
            tokenizer is loaded from the first entry of ``model_paths``.
        voting_mode: Accepted for interface compatibility. The effective mode is
            derived from ``linear_layer_num`` ("mean" when it is 0, "linear"
            otherwise) and is exposed as ``self.voting_mode``.
        linear_layer_num: Number of linear layers in the voting head. ``0``
            selects mean voting. ``n > 0`` builds ``n - 1`` blocks of
            ``Linear(k, k) + ReLU`` followed by ``Linear(k, 1)``, where ``k`` is
            the number of models.

    Raises:
        AssertionError: If ``linear_layer_num`` is negative.
        ValueError: If ``model_paths`` is empty.
    """

    # Token budget per text handed to the tokenizer in ``forward``.
    MAX_LENGTH = 1024

    def __init__(
        self,
        model_paths: list,
        tokenizer_path: str = None,
        voting_mode: Literal["mean", "linear"] = "mean",
        linear_layer_num: int = 0,  # if voting_mode is "linear"
    ) -> None:
        super().__init__()

        # Validate the cheap arguments first so a bad call fails before any
        # checkpoint is read from disk.
        assert (
            linear_layer_num >= 0
        ), "linear_layer_num must be greater than or equal to 0"
        model_paths = list(model_paths)
        if not model_paths:
            raise ValueError("model_paths must contain at least one model path")

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # ModuleList (not a plain list) so the sub-models are registered:
        # .to(), .parameters() and .state_dict() on the ensemble now see them.
        self.models = torch.nn.ModuleList(
            [
                AutoModelForSequenceClassification.from_pretrained(path, num_labels=1)
                for path in model_paths
            ]
        )
        for model in self.models:
            model.requires_grad_(False)  # freeze all pre-trained layers
            model.eval()

        # Fall back to the first checkpoint when no tokenizer path is given;
        # from_pretrained(None) would otherwise fail.
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_path or model_paths[0])
        # NOTE(review): the models' embeddings are not resized here, so this
        # presumably relies on the saved tokenizer already containing these two
        # tokens (making the calls no-ops) - confirm against the training code.
        self.tokenizer.add_tokens([AddedToken("\n", normalized=False)])
        self.tokenizer.add_tokens([AddedToken(" "*2, normalized=False)])

        self.voting_mode = "mean" if linear_layer_num == 0 else "linear"

        num_of_models = len(self.models)
        if self.voting_mode == "linear":
            self.linear_list = [
                torch.nn.Sequential(
                    torch.nn.Linear(num_of_models, num_of_models), torch.nn.ReLU()
                )
                for _ in range(linear_layer_num - 1)
            ] + [torch.nn.Linear(num_of_models, 1)]
            self.linears = torch.nn.Sequential(*self.linear_list)
            print(self.linears)

        # One move covers the registered models and, if present, the voting head.
        self.to(self.device)

    def train(self, mode: bool = True):
        """Set the training flag on the voting head only.

        The pretrained models are registered sub-modules, so the default
        ``train()`` would switch them to training mode as well. They are put
        back into eval mode here so their outputs stay deterministic.
        """
        super().train(mode)
        for model in self.models:
            model.eval()
        return self

    def forward(self, X):
        """Tokenize ``X``, run every model on it, and combine their logits.

        Args:
            X: Text input forwarded unchanged to the tokenizer (the notebook
                passes a list of strings).

        Returns:
            In "mean" mode, the mean over models of the stacked logits. In
            "linear" mode, the output of the linear voting head applied to the
            per-sample vector of model logits.
        """
        encoded = self.tokenizer(
            X,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=self.MAX_LENGTH,
        )
        encoded = {key: value.to(self.device) for key, value in encoded.items()}

        # The pretrained models are inference-only; gradients are only tracked
        # through the linear voting head below.
        with torch.no_grad():
            y = torch.stack([model(**encoded).logits for model in self.models])

        if self.voting_mode == "mean":
            y = y.mean(dim=0)
        elif self.voting_mode == "linear":
            # Drop the single-logit axis and put the model axis last so each row
            # holds one sample's logits from all models.
            y = self.linears(y.squeeze(2).transpose(0, 1))

        return y

In [8]:
# The five fold checkpoints and the tokenizer all live under one training-run
# directory, so that directory is spelled once and the per-fold paths are derived.
MODEL_DIR = "./model/alef-b-220-regv1-pc2-deberta-v3-small-skf-s5-e4-pytorch-v1-v1"
N_FOLDS = 5

voting_model = VotingModel(
    [f"{MODEL_DIR}/deberta-v3-small_AES2_fold_{fold}_v1" for fold in range(N_FOLDS)],
    tokenizer_path=f"{MODEL_DIR}/output_v1/checkpoint-17396-tokenizer",
)

In [5]:
def compute_metrics(eval_pred):
    """Compute the quadratic weighted kappa for one evaluation batch.

    Args:
        eval_pred: A ``(predictions, labels)`` pair. ``predictions`` must
            support ``.argmax(-1)``; the argmax over the last axis is taken as
            the predicted class.

    Returns:
        A dict with the single key ``'qwk'`` holding the score returned by
        ``cohen_kappa_score`` with quadratic weights.
    """
    scores, targets = eval_pred
    predicted_classes = scores.argmax(-1)
    return {
        'qwk': cohen_kappa_score(targets, predicted_classes, weights='quadratic'),
    }

In [None]:
# training
# Sanity check: score the first few training essays with the ensemble and show
# the raw output, the ground-truth score and the rounded output side by side.
N_PREVIEW = 15  # number of training essays to score

train_df = pd.read_csv("./dataset/kaggle/train.csv")
train_scores = []

# Slice once so the texts and the ground-truth column always have the same
# length, and only the previewed rows are converted to a list.
preview_df = train_df.head(N_PREVIEW)
res = voting_model(preview_df["full_text"].tolist())
# NOTE(review): the +1 offset before rounding presumably maps the model output
# back onto the label scale - confirm against the training code.
res_round = (res + 1).round()

# squeeze(-1) drops only the trailing logit axis, so a one-row preview still
# yields a list rather than a bare scalar.
d = {
    "prediction": res.squeeze(-1).tolist(),
    "ground_truth": preview_df["score"].tolist(),
    "res_round": res_round.squeeze(-1).tolist(),
}
view = pd.DataFrame(data=d)
print(view)

# testing
# test_df = pd.read_csv("./dataset/kaggle/test.csv")
# test_scores = []
# res = voting_model(test_df["full_text"].tolist())
# print(res)

# # shihtl> Save to file "test.csv"
# test_pred_df = pd.DataFrame()
# test_pred_df["essay_id"] = test_df["essay_id"]
# test_pred_df["score"] = test_scores

# test_pred_df.to_csv(f"./submission.csv", index=False)

# test_pred_df.head()

    prediction  ground_truth  res_round
0     1.705298             3        3.0
1     2.022682             3        3.0
2     2.941526             4        4.0
3     2.850662             4        4.0
4     1.825086             3        3.0
5     3.064401             4        4.0
6     1.126608             2        2.0
7     1.981581             3        3.0
8     1.353138             2        2.0
9     1.891761             3        3.0
10    1.240501             2        2.0
11    1.145892             2        2.0
12    3.103931             4        4.0
13    1.966938             3        3.0
14    2.009931             3        3.0


In [None]:
# IPython shell escape: run the `nvidia-smi` command-line tool and show its output.
!nvidia-smi

    ground_truth  prediction
0       1.705298           3
1       2.022682           3
2       2.941526           4
3       2.850662           4
4       1.825086           3
5       3.064401           4
6       1.126608           2
7       1.981581           3
8       1.353138           2
9       1.891761           3
10      1.240501           2
11      1.145892           2
12      3.103931           4
13      1.966938           3
14      2.009931           3


In [None]:
# voting_model1 = VotingModel(
#     [
#         ".\cache\models--microsoft--deberta-v3-base\snapshots\8ccc9b6f36199bec6961081d44eb72fb3f7353f3",
#         ".\cache\models--microsoft--deberta-v3-base\snapshots\8ccc9b6f36199bec6961081d44eb72fb3f7353f3",
#     ],
# )
# voting_model2 = VotingModel(
#     [
#         ".\cache\models--microsoft--deberta-v3-base\snapshots\8ccc9b6f36199bec6961081d44eb72fb3f7353f3",
#         ".\cache\models--microsoft--deberta-v3-base\snapshots\8ccc9b6f36199bec6961081d44eb72fb3f7353f3",
#     ],
#     voting_mode="linear",
#     linear_layer_num=5,
# )

Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at .\cache\models--microsoft--deberta-v3-base\snapshots\8ccc9b6f36199bec6961081d44eb72fb3f7353f3 and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at .\cache\models--microsoft--deberta-v3-base\snapshots\8ccc9b6f36199bec6961081d44eb72fb3f7353f3 and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at .\cache\models--microsoft--deberta-v3-base\snapshots\8ccc9b6f36

Sequential(
  (0): Sequential(
    (0): Linear(in_features=2, out_features=2, bias=True)
    (1): ReLU()
  )
  (1): Sequential(
    (0): Linear(in_features=2, out_features=2, bias=True)
    (1): ReLU()
  )
  (2): Sequential(
    (0): Linear(in_features=2, out_features=2, bias=True)
    (1): ReLU()
  )
  (3): Sequential(
    (0): Linear(in_features=2, out_features=2, bias=True)
    (1): ReLU()
  )
  (4): Linear(in_features=2, out_features=1, bias=True)
)


In [4]:
# Load the Kaggle test split and display its essay-text column.
sample_test = pd.read_csv("./dataset/kaggle/test.csv")
sample_test.loc[:, "full_text"]

0    Many people have car where they live. The thin...
1    I am a scientist at NASA that is discussing th...
2    People always wish they had the same technolog...
Name: full_text, dtype: object

In [None]:
# res1 = voting_model1(sample_test["full_text"].to_list())
# print(res1)
# res2 = voting_model2(sample_test["full_text"].to_list())
# print(res2)

# print(res[0].last_hidden_state[0, 0, :])

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


tensor([[0.3549],
        [0.3550],
        [0.3537]], device='cuda:0')
tensor([[0.1478],
        [0.1478],
        [0.1479]], device='cuda:0', grad_fn=<AddmmBackward0>)


In [None]:
# # print vocab dict of tokenizer
# print(voting_model1.tokenizer.get_vocab()["[CLS]"])

In [None]:
# del voting_model1, voting_model2
# torch.cuda.empty_cache()