In [145]:
import torch
import pandas as pd
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import torch.nn.functional as F
from sklearn.model_selection import ParameterGrid

### Evaluation (F-1)

In [146]:
from sklearn.metrics import classification_report
from typing import Dict
import numpy as np
class SharedTaskConstants:
    """
    Use these constants to interface with the data, not with the id2label used
    inside the Huggingface models!!
    """
    targets = ['validity', 'novelty']
    validity_label_mapping = {
        -1: "not-valid",
        0: "not-valid",  # can be excluded since test set does not contain these
        1: "valid",
    }

    novelty_label_mapping = {
        -1: "not-novel",
        0: "not-novel",  # can be excluded since test set does not contain these
        1: "novel",
    }

    validity_id2label = {v: k for k, v in validity_label_mapping.items()}
    novelty_id2label = {v: k for k, v in novelty_label_mapping.items()}

    local_str_mapping = {
        'novel': 1,
        'not-novel': 0,
        'valid': 1,
        'not-valid': 0
    }

    @staticmethod
    def val_nov_metric(is_validity: np.ndarray, should_validity: np.ndarray, is_novelty: np.ndarray,
                       should_novelty: np.ndarray) -> Dict[str, float]:
        ret = dict()

        ret_base_help = {
            "true_positive_validity": np.sum(np.where(
                np.all(np.stack([is_validity >= .5, should_validity >= .5]), axis=0),
                1, 0)),
            "true_positive_novelty": np.sum(np.where(
                np.all(np.stack([is_novelty >= .5, should_novelty >= .5]), axis=0),
                1, 0)),
            "true_positive_valid_novel": np.sum(np.where(
                np.all(np.stack([is_validity >= .5, is_novelty >= .5,
                                 should_validity >= .5, should_novelty >= .5]), axis=0),
                1, 0)),
            "true_positive_nonvalid_novel": np.sum(np.where(
                np.all(np.stack([is_validity < .5, is_novelty >= .5,
                                 should_validity < .5, should_novelty >= .5]), axis=0),
                1, 0)),
            "true_positive_valid_nonnovel": np.sum(np.where(
                np.all(np.stack([is_validity >= .5, is_novelty < .5,
                                 should_validity >= .5, should_novelty < .5]), axis=0),
                1, 0)),
            "true_positive_nonvalid_nonnovel": np.sum(np.where(
                np.all(np.stack([is_validity < .5, is_novelty < .5,
                                 should_validity < .5, should_novelty < .5]), axis=0),
                1, 0)),
            "classified_positive_validity": np.sum(np.where(is_validity >= .5, 1, 0)),
            "classified_positive_novelty": np.sum(np.where(is_novelty >= .5, 1, 0)),
            "classified_positive_valid_novel": np.sum(np.where(
                np.all(np.stack([is_validity >= .5, is_novelty >= .5]), axis=0),
                1, 0)),
            "classified_positive_nonvalid_novel": np.sum(np.where(
                np.all(np.stack([is_validity < .5, is_novelty >= .5]), axis=0),
                1, 0)),
            "classified_positive_valid_nonnovel": np.sum(np.where(
                np.all(np.stack([is_validity >= .5, is_novelty < .5]), axis=0),
                1, 0)),
            "classified_positive_nonvalid_nonnovel": np.sum(np.where(
                np.all(np.stack([is_validity < .5, is_novelty < .5]), axis=0),
                1, 0)),
            "indeed_positive_validity": np.sum(np.where(should_validity >= .5, 1, 0)),
            "indeed_positive_novelty": np.sum(np.where(should_novelty >= .5, 1, 0)),
            "indeed_positive_valid_novel": np.sum(np.where(
                np.all(np.stack([should_validity >= .5, should_novelty >= .5]), axis=0),
                1, 0)),
            "indeed_positive_nonvalid_novel": np.sum(np.where(
                np.all(np.stack([should_validity < .5, should_novelty >= .5]), axis=0),
                1, 0)),
            "indeed_positive_valid_nonnovel": np.sum(np.where(
                np.all(np.stack([should_validity >= .5, should_novelty < .5]), axis=0),
                1, 0)),
            "indeed_positive_nonvalid_nonnovel": np.sum(np.where(
                np.all(np.stack([should_validity < .5, should_novelty < .5]), axis=0),
                1, 0)),
        }

        ret_help = {
            "precision_validity": ret_base_help["true_positive_validity"] /
                                  max(1, ret_base_help["classified_positive_validity"]),
            "precision_novelty": ret_base_help["true_positive_novelty"] /
                                 max(1, ret_base_help["classified_positive_novelty"]),
            "recall_validity": ret_base_help["true_positive_validity"] /
                               max(1, ret_base_help["indeed_positive_validity"]),
            "recall_novelty": ret_base_help["true_positive_novelty"] /
                              max(1, ret_base_help["indeed_positive_novelty"]),
            "precision_valid_novel": ret_base_help["true_positive_valid_novel"] /
                                     max(1, ret_base_help["classified_positive_valid_novel"]),
            "precision_valid_nonnovel": ret_base_help["true_positive_valid_nonnovel"] /
                                        max(1, ret_base_help["classified_positive_valid_nonnovel"]),
            "precision_nonvalid_novel": ret_base_help["true_positive_nonvalid_novel"] /
                                        max(1, ret_base_help["classified_positive_nonvalid_novel"]),
            "precision_nonvalid_nonnovel": ret_base_help["true_positive_nonvalid_nonnovel"] /
                                           max(1, ret_base_help["classified_positive_nonvalid_nonnovel"]),
            "recall_valid_novel": ret_base_help["true_positive_valid_novel"] /
                                  max(1, ret_base_help["indeed_positive_valid_novel"]),
            "recall_valid_nonnovel": ret_base_help["true_positive_valid_nonnovel"] /
                                     max(1, ret_base_help["indeed_positive_valid_nonnovel"]),
            "recall_nonvalid_novel": ret_base_help["true_positive_nonvalid_novel"] /
                                     max(1, ret_base_help["indeed_positive_nonvalid_novel"]),
            "recall_nonvalid_nonnovel": ret_base_help["true_positive_nonvalid_nonnovel"] /
                                        max(1, ret_base_help["indeed_positive_nonvalid_nonnovel"])
        }

        ret.update({
            "f1_validity": 2 * ret_help["precision_validity"] * ret_help["recall_validity"] / max(1e-4, ret_help[
                "precision_validity"] + ret_help["recall_validity"]),
            "f1_novelty": 2 * ret_help["precision_novelty"] * ret_help["recall_novelty"] / max(1e-4, ret_help[
                "precision_novelty"] + ret_help["recall_novelty"]),
            "f1_valid_novel": 2 * ret_help["precision_valid_novel"] * ret_help["recall_valid_novel"] / max(1e-4,
                                                                                                           ret_help[
                                                                                                               "precision_valid_novel"] +
                                                                                                           ret_help[
                                                                                                               "recall_valid_novel"]),
            "f1_valid_nonnovel": 2 * ret_help["precision_valid_nonnovel"] * ret_help["recall_valid_nonnovel"] / max(
                1e-4, ret_help["precision_valid_nonnovel"] + ret_help["recall_valid_nonnovel"]),
            "f1_nonvalid_novel": 2 * ret_help["precision_nonvalid_novel"] * ret_help["recall_nonvalid_novel"] / max(
                1e-4, ret_help["precision_nonvalid_novel"] + ret_help["recall_nonvalid_novel"]),
            "f1_nonvalid_nonnovel": 2 * ret_help["precision_nonvalid_nonnovel"] * ret_help[
                "recall_nonvalid_nonnovel"] / max(1e-4, ret_help["precision_nonvalid_nonnovel"] + ret_help[
                "recall_nonvalid_nonnovel"])
        })

        ret.update({
            "f1_macro": (ret["f1_valid_novel"] + ret["f1_valid_nonnovel"] + ret["f1_nonvalid_novel"] + ret[
                "f1_nonvalid_nonnovel"]) / 4
        })

        return ret

In [147]:
def print_results(baseline_name: str, y_true: dict, y_pred: dict):
    print(f"==== {baseline_name} ====")
    print("Validity")
    results_validity = classification_report(
        y_true['validity'],
        y_pred['validity'],
        target_names=['not-valid', 'valid'],
        labels=[0, 1],
        zero_division=0
    )
    print(results_validity)

    print("Novelty")
    results_novelty = classification_report(
        y_true['novelty'],
        y_pred['novelty'],
        target_names=['not-novel', 'novel'],
        labels=[0, 1],
        zero_division=0
    )
    print(results_novelty)

    print("Combined (organization eval)")
    res = SharedTaskConstants.val_nov_metric(
        np.array(y_pred['validity']),
        np.array(y_true['validity']),
        np.array(y_pred['novelty']),
        np.array(y_true['novelty']),
    )
    print(res['f1_macro'].round(4))
    return res['f1_macro'].round(4)

### Load Train and Test

In [167]:
def str_to_list(text):
    '''
    Return tensor string into list
    '''
    # clean string
    clean_str = text.replace('tensor(', '').replace(')', '').strip()
    # convert to list
    tensor = eval(clean_str, {"torch": torch, "__builtins__": {}})
    return tensor

def process_covariate_data(df):
    '''
    Expanding all tensors in a single cell
    Make confidence into ordinal variables
    '''
    # convert str to tensor (list)
    SBERT_premise = df.SBERT_premise.apply(lambda x: str_to_list(x))
    SBERT_conclusion = df.SBERT_conclusion.apply(lambda x: str_to_list(x))

    # expand the list into individual entries
    df_expand1 = SBERT_premise.apply(pd.Series)
    df_expand2 = SBERT_conclusion.apply(pd.Series)

    # assign a meaningful name
    df_expand1.columns = ['pre_emb{}'.format(i+1) for i in range(df_expand1.shape[1])]
    df_expand2.columns = ['con_emb{}'.format(i+1) for i in range(df_expand2.shape[1])]

    # put everything together
    df_final = pd.concat([df.drop(['SBERT_premise', "SBERT_conclusion"], axis=1), df_expand1, df_expand2], axis=1)
    return df_final

def preprocess_input(x, y):
    '''
    return DataLoader for later input into the model
    '''
    # pd.dataframe to array
    x = np.array(x, dtype=np.float64)
    y = np.array(y, dtype=np.float64)
    # transform y for nn model
    # Transform the data
    transformation_dict = {
    (1, 1): [1, 0, 0, 0],
    (1, 0): [0, 1, 0, 0],
    (0, 1): [0, 0, 1, 0],
    (0, 0): [0, 0, 0, 1],
    }
    y = np.array([transformation_dict[tuple(row)] for row in y])
    # array to tensor
    x_torch = torch.tensor(x)
    y_torch = torch.tensor(y)
    data = TensorDataset(x_torch, y_torch)

    batch_size = 10
    loader = DataLoader(data, batch_size=batch_size, shuffle=True)
    return loader

In [169]:
train = pd.read_csv("../Data/TaskA_train_neural_kg.csv", index_col=False)
test = pd.read_pickle("../Data/TaskA_test_neural_kg.pkl")

In [174]:
X_train = train.loc[:, ["SBERT_premise", "SBERT_conclusion", "SBERT_cosine_sim", "Irrelevancy", "AveDistance"]]
y_train = train.loc[:, ["Validity", "Novelty"]]
X_test = test.loc[:, ["SBERT_premise", "SBERT_conclusion", "SBERT_cosine_sim", "Irrelevancy", "AveDistance"]]
y_test = test.loc[:, ["Validity", "Novelty"]]

In [175]:
# X_train = process_covariate_data(X_train)
# X_test = process_covariate_data(X_test)
# y_train.replace(-1, 0, inplace=True)
# y_test.replace(-1, 0, inplace=True)

X_train = train.loc[:,["SBERT_cosine_sim",'Irrelevancy', 'AveDistance']]
y_train = train.loc[:,["Validity",'Novelty']]
y_train = y_train.replace(-1, 0)

X_test = test.loc[:,["SBERT_cosine_sim",'Irrelevancy', 'AveDistance']]
y_test = test.loc[:,["Validity",'Novelty']]
y_test = y_test.replace(-1, 0)


train_loader = preprocess_input(X_train, y_train)
test_loader = preprocess_input(X_test, y_test)

### NN under pytorch

In [178]:
class SimpleNN(nn.Module):
    def __init__(self, hidden_dim, dropout_rate):
        super(SimpleNN, self).__init__()
        self.input_dim = 3
        self.hidden_dim = hidden_dim
        self.output_dim = 4
        self.fc1 = nn.Linear(self.input_dim, self.hidden_dim)  # Assuming n input features
        self.bn1 = nn.BatchNorm1d(self.hidden_dim)  # Batch normalization layer
        self.dropout1 = nn.Dropout(dropout_rate)
        
        self.fc2 = nn.Linear(self.hidden_dim, self.hidden_dim) 
        self.bn2 = nn.BatchNorm1d(self.hidden_dim)  # Batch normalization layer
        self.dropout2 = nn.Dropout(dropout_rate)
        self.fc3 = nn.Linear(self.hidden_dim, self.output_dim)

        self.softmax = nn.Softmax(dim=1)
    def forward(self, x):
        x = x.float()
        x = torch.relu(self.bn1(self.fc1(x)))
        x = self.dropout1(x)

        x = torch.relu(self.bn2(self.fc2(x)))
        x = self.dropout2(x)
        x = self.fc3(x)
        
        x = self.softmax(x)
        return x
    
def evaluate_model(model, test_loader):
    model.eval()
    test_label = {"validity": list(y_test.Validity), "novelty": list(y_test.Novelty)}
    test_preds = {"validity": [], "novelty": []}
    with torch.no_grad():
        for inputs, _ in test_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            predicted = F.one_hot(predicted, num_classes=4)
            # Transform the data
            transformation_dict = {
            (1, 0, 0, 0): (1, 1), 
            (0, 1, 0, 0): (1, 0),
            (0, 0, 1, 0): (0, 1),
            (0, 0, 0, 1): (0, 1)
            }
            predicted = [transformation_dict[tuple(row.tolist())] for row in predicted]
            # Obtain the prediction
            for pred in predicted:
                test_preds["validity"].append(pred[0])
                test_preds["novelty"].append(pred[1]) 
        return print_results("Roberta_based", test_label, test_preds)

def train_model(hyperparameters):
    model = SimpleNN(hidden_dim=hyperparameters['hidden_dim'], dropout_rate=hyperparameters["dropout_rate"])
    loss_function = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=hyperparameters['lr'])
    

    for epoch in range(hyperparameters['epochs']):
        for inputs, targets in train_loader:
            inputs = inputs.float()
            targets = targets.float()
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_function(outputs, targets)
            loss.backward()
            optimizer.step()
    f1 = evaluate_model(model, test_loader)
    return f1

In [179]:
param_grid = {
    'hidden_dim': [2, 3],
    'lr': [0.001, 0.01],
    'epochs': [10, 30],
    'dropout_rate': [0.2, 0.4, 0.5]
}
grid = ParameterGrid(param_grid)

best_f1 = 0
best_params = None

for params in grid:
    f1 = train_model(params)
    if f1 > best_f1:
        best_f1 = f1
        best_params = params
    print(params)
    print(f1)
print(f"Best f1: {best_f1}")
print(f"Best Hyperparameters: {best_params}")

==== Roberta_based ====
Validity
              precision    recall  f1-score   support

   not-valid       0.45      0.12      0.19       206
       valid       0.61      0.90      0.73       314

    accuracy                           0.59       520
   macro avg       0.53      0.51      0.46       520
weighted avg       0.55      0.59      0.51       520

Novelty
              precision    recall  f1-score   support

   not-novel       0.57      0.90      0.70       294
       novel       0.46      0.12      0.18       226

    accuracy                           0.56       520
   macro avg       0.52      0.51      0.44       520
weighted avg       0.52      0.56      0.47       520

Combined (organization eval)
0.1633
{'dropout_rate': 0.2, 'epochs': 10, 'hidden_dim': 2, 'lr': 0.001}
0.1633
==== Roberta_based ====
Validity
              precision    recall  f1-score   support

   not-valid       0.45      0.25      0.32       206
       valid       0.62      0.80      0.70       314
