In [None]:
from google.colab import drive

# Attach Google Drive to the Colab VM; the dataset TSV files are read from a
# folder below this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:

import pandas as pd
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertForSequenceClassification

In [None]:
# Reproducibility: use deterministic cuDNN kernels, switch off the cuDNN
# autotuner, and seed both NumPy and PyTorch with the same value.
SEED = 42
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
np.random.seed(SEED)
torch.manual_seed(SEED)

# Run on the GPU when one is visible, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"PyTorch Version : {torch.__version__}")
print(DEVICE)

PyTorch Version : 2.2.1+cu121
cuda


In [None]:
# --- Paths and run identifiers -------------------------------------------
worksapce = '/content/drive/MyDrive/btptemp/'  # folder holding train/test/valid TSV files
model_name = 'TC+TC+CB+FZ'
model_save = model_name + '.pt'                # checkpoint file name

# --- Training hyper-parameters -------------------------------------------
num_epochs, batch_size, learning_rate = 30, 32, 1e-3
num_classes = 6
padding_idx = 0
metadata_each_dim = 10  # token length used for each metadata text field

# Column names of the (header-less) TSV files, in file order.
col = [
    'id', 'label', 'statement', 'subject', 'speaker', 'job_title', 'state_info', 'party_affiliation',
    'barely_true_counts', 'false_counts', 'half_true_counts', 'mostly_true_counts', 'pants_on_fire_counts',
    'context',
]

# Class id <-> label string; the reverse mapping is derived so the two cannot drift apart.
label_map = dict(enumerate(['pants-fire', 'false', 'barely-true', 'half-true', 'mostly-true', 'true']))
label_convert = {label_name: class_id for class_id, label_name in label_map.items()}


In [None]:
def _read_split(file_name):
    """Read one tab-separated split from ``worksapce`` and fill its missing values.

    Missing entries in the five ``*_counts`` columns become 0; every other
    missing entry becomes the string 'NaN'.
    """
    count_columns = ['barely_true_counts', 'false_counts', 'half_true_counts',
                     'mostly_true_counts', 'pants_on_fire_counts']
    frame = pd.read_csv(worksapce + file_name, sep = '\t', names = col)
    frame[count_columns] = frame[count_columns].fillna(0)
    return frame.fillna('NaN')


train_data = _read_split('train.tsv')
test_data = _read_split('test.tsv')
val_data = _read_split('valid.tsv')

In [None]:
# def textProcess(input_text, max_length = -1):
#     tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
#     if max_length == -1:
#         tokens = tokenizer(input_text, truncation=True, padding=True)
#     else:
#         tokens = tokenizer(input_text, truncation=True, padding='max_length', max_length=max_length)
#     return tokens


In [None]:
from transformers import GPT2TokenizerFast

# Lazily-created tokenizer shared by every textProcess() call.
_TEXT_TOKENIZER = None


def _get_text_tokenizer():
    """Return the shared tokenizer, loading and configuring it on first use."""
    global _TEXT_TOKENIZER
    if _TEXT_TOKENIZER is None:
        tokenizer = GPT2TokenizerFast.from_pretrained('Xenova/gpt-4')
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        # Padding positions are emitted as id 0, which is the padding index the
        # notebook configures for its embedding layers (padding_idx = 0).
        tokenizer.pad_token_id = 0
        _TEXT_TOKENIZER = tokenizer
    return _TEXT_TOKENIZER


def textProcess(input_text, max_length = -1):
    """Tokenize ``input_text`` and return the tokenizer's encoding.

    Args:
        input_text: a string or list of strings to tokenize.
        max_length: when -1, pad to the longest item in ``input_text``;
            otherwise pad to exactly ``max_length`` tokens. Truncation is
            enabled in both cases.

    Returns:
        The encoding produced by the tokenizer (callers read ``['input_ids']``).

    The tokenizer is loaded once and reused; previously every call fetched and
    re-configured it from scratch.
    """
    tokenizer = _get_text_tokenizer()
    if max_length == -1:
        return tokenizer(input_text, truncation=True, padding=True)
    return tokenizer(input_text, truncation=True, padding='max_length', max_length=max_length)

In [None]:
# from transformers import XLNetTokenizer

# def textProcess(input_text, max_length=-1):
#   tokenizer = XLNetTokenizer.from_pretrained('xlnet-base-cased')  # Adjust casing based on your needs
#   if max_length == -1:
#     tokens = tokenizer(input_text, truncation=True, padding=True)
#   else:
#     tokens = tokenizer(input_text, truncation=True, padding='max_length', max_length=max_length)
#   return tokens

In [None]:
# Define a custom dataset for loading the data
class LiarDataset(data.Dataset):
    """Dataset yielding (statement, one-hot label, label, metadata text, metadata counts).

    The six tokenized metadata text fields are cast to int and concatenated
    along the last axis (subject, speaker, job title, state, party, context).
    The five count lists are converted to float tensors and placed side by
    side as one column each.
    """

    def __init__(self, data_df, statement, label_onehot, label, subject, speaker, job_title, state_info,
                 party_affiliation, barely_true_counts, false_counts, half_true_counts, mostly_true_counts,
                 pants_on_fire_counts, context):
        self.data_df = data_df
        self.statement = statement
        self.label_onehot = label_onehot
        self.label = label

        text_fields = (subject, speaker, job_title, state_info, party_affiliation, context)
        self.metadata_text = torch.cat([field.int() for field in text_fields], dim=-1)

        count_fields = (barely_true_counts, false_counts, half_true_counts, mostly_true_counts,
                        pants_on_fire_counts)
        count_columns = [torch.tensor(values, dtype=torch.float).unsqueeze(1) for values in count_fields]
        self.metadata_number = torch.cat(count_columns, dim=-1)

    def __len__(self):
        # The number of examples is taken from the source frame.
        return len(self.data_df)

    def __getitem__(self, idx):
        return (self.statement[idx], self.label_onehot[idx], self.label[idx],
                self.metadata_text[idx], self.metadata_number[idx])

In [None]:
# Define the data loaders for training and validation
def _encode_column(frame, column, max_length):
    """Tokenize one text column of ``frame`` and return its padded input ids as a tensor."""
    return torch.tensor(textProcess(frame[column].tolist(), max_length)['input_ids'])


def _label_ids(frame):
    """Return the integer class id of every row of ``frame`` as an int64 tensor.

    ``map`` + ``astype`` is used instead of ``replace`` so the result is an
    integer column without relying on pandas' deprecated implicit downcasting;
    an unknown label raises here instead of propagating further.
    """
    return torch.tensor(frame['label'].map(label_convert).astype('int64').to_numpy())


def _one_hot_labels(frame):
    """Return the labels of ``frame`` one-hot encoded as float64 (target format for the loss)."""
    return torch.nn.functional.one_hot(_label_ids(frame), num_classes=num_classes).type(torch.float64)


def _count_lists(frame):
    """Return the five credit-history count columns as lists, in LiarDataset argument order."""
    names = ('barely_true_counts', 'false_counts', 'half_true_counts',
             'mostly_true_counts', 'pants_on_fire_counts')
    return [frame[name].tolist() for name in names]


# ---- training split (shuffled) ----
train_text = _encode_column(train_data, 'statement', 768)
train_label = _one_hot_labels(train_data)
train_subject = _encode_column(train_data, 'subject', metadata_each_dim)
train_speaker = _encode_column(train_data, 'speaker', metadata_each_dim)
train_job_title = _encode_column(train_data, 'job_title', metadata_each_dim)
train_state_info = _encode_column(train_data, 'state_info', metadata_each_dim)
train_party_affiliation = _encode_column(train_data, 'party_affiliation', metadata_each_dim)
train_context = _encode_column(train_data, 'context', metadata_each_dim)

train_dataset = LiarDataset(train_data, train_text, train_label, _label_ids(train_data),
                            train_subject, train_speaker, train_job_title,
                            train_state_info, train_party_affiliation,
                            *_count_lists(train_data), context=train_context)
train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# ---- validation split ----
val_text = _encode_column(val_data, 'statement', 768)
val_label = _one_hot_labels(val_data)
val_subject = _encode_column(val_data, 'subject', metadata_each_dim)
val_speaker = _encode_column(val_data, 'speaker', metadata_each_dim)
val_job_title = _encode_column(val_data, 'job_title', metadata_each_dim)
val_state_info = _encode_column(val_data, 'state_info', metadata_each_dim)
val_party_affiliation = _encode_column(val_data, 'party_affiliation', metadata_each_dim)
val_context = _encode_column(val_data, 'context', metadata_each_dim)

val_dataset = LiarDataset(val_data, val_text, val_label, _label_ids(val_data),
                          val_subject, val_speaker, val_job_title,
                          val_state_info, val_party_affiliation,
                          *_count_lists(val_data), context=val_context)
val_loader = data.DataLoader(val_dataset, batch_size=batch_size)

# ---- test split ----
test_text = _encode_column(test_data, 'statement', 768)
test_label = _one_hot_labels(test_data)
test_subject = _encode_column(test_data, 'subject', metadata_each_dim)
test_speaker = _encode_column(test_data, 'speaker', metadata_each_dim)
test_job_title = _encode_column(test_data, 'job_title', metadata_each_dim)
test_state_info = _encode_column(test_data, 'state_info', metadata_each_dim)
test_party_affiliation = _encode_column(test_data, 'party_affiliation', metadata_each_dim)
test_context = _encode_column(test_data, 'context', metadata_each_dim)

test_dataset = LiarDataset(test_data, test_text, test_label, _label_ids(test_data),
                           test_subject, test_speaker, test_job_title,
                           test_state_info, test_party_affiliation,
                           *_count_lists(test_data), context=test_context)
test_loader = data.DataLoader(test_dataset, batch_size=batch_size)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/460 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.01M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/917k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/4.23M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/98.0 [00:00<?, ?B/s]

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make s

In [None]:
# Shape check of the tokenized statements, in the order train, test, validation.
for split_tokens in (train_text, test_text, val_text):
    print(split_tokens.size())

torch.Size([10240, 768])
torch.Size([1267, 768])
torch.Size([1284, 768])


In [None]:
# Display the padded training token ids (statements are padded to a fixed length with id 0).
train_text

tensor([[   50,   954,   279,  ...,     0,     0,     0],
        [ 4599,  1550,   279,  ...,     0,     0,     0],
        [65476,  8283, 34008,  ...,     0,     0,     0],
        ...,
        [   50,   954,   459,  ...,     0,     0,     0],
        [ 1966, 33510,   279,  ...,     0,     0,     0],
        [  791,  6011,   315,  ...,     0,     0,     0]])

In [None]:
class FuzzyLayer(nn.Module):
    """Layer of Gaussian membership functions.

    Args:
        input_dim: number of features in each input vector.
        membership_num: number of membership functions (one output per function).

    Each membership function owns a centre (``membership_miu``) and a width
    (``membership_sigma``) per input feature. The output for a function is the
    mean over features of ``exp(-0.5 * ((x - miu) / sigma) ** 2)``.
    """

    def __init__(self, input_dim, membership_num):
        super().__init__()
        self.input_dim = input_dim
        self.membership_num = membership_num

        param_shape = (membership_num, input_dim)
        self.membership_miu = nn.Parameter(torch.Tensor(*param_shape).to(DEVICE), requires_grad=True)
        self.membership_sigma = nn.Parameter(torch.Tensor(*param_shape).to(DEVICE), requires_grad=True)

        # Centres start from a Xavier-uniform draw, widths start at 1.
        nn.init.xavier_uniform_(self.membership_miu)
        nn.init.ones_(self.membership_sigma)

    def forward(self, input_seq):
        """Map ``input_seq`` of shape (batch, input_dim) to memberships of shape (batch, membership_num)."""
        target_shape = (input_seq.size()[0], self.membership_num, self.input_dim)
        features = input_seq.unsqueeze(1).expand(*target_shape)
        centres = self.membership_miu.unsqueeze(0).expand(*target_shape)
        widths = self.membership_sigma.unsqueeze(0).expand(*target_shape)

        standardized = (features - centres) / widths
        return torch.exp((-1 / 2) * standardized ** 2).mean(dim=-1)

In [None]:
class TextCNN(nn.Module):
    """Convolutional text classifier over token ids.

    Token ids are embedded, passed through one 1-D convolution per entry of
    ``filter_sizes``, max-pooled over the sequence axis, concatenated, put
    through dropout and projected to ``output_dim`` values.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)

        conv_branches = []
        for width in filter_sizes:
            conv_branches.append(nn.Conv1d(in_channels = embedding_dim,
                                           out_channels = n_filters,
                                           kernel_size = width))
        self.convs = nn.ModuleList(conv_branches)

        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Return scores of shape (batch, output_dim) for ``text`` of shape (batch, sent_len)."""
        # (batch, sent_len, emb) -> (batch, emb, sent_len): Conv1d convolves over the last axis.
        channels_first = self.embedding(text).permute(0, 2, 1)

        branch_features = []
        for conv in self.convs:
            activated = F.relu(conv(channels_first))
            # Pool over the whole remaining length -> (batch, n_filters).
            branch_features.append(F.max_pool1d(activated, activated.shape[2]).squeeze(2))

        features = torch.cat(branch_features, dim = 1)
        return self.fc(self.dropout(features))


### P1 - Trying Capsule network code

In [None]:
!pip install git+https://github.com/pytorch/tnt.git@master

Collecting git+https://github.com/pytorch/tnt.git@master
  Cloning https://github.com/pytorch/tnt.git (to revision master) to /tmp/pip-req-build-otl0fsgm
  Running command git clone --filter=blob:none --quiet https://github.com/pytorch/tnt.git /tmp/pip-req-build-otl0fsgm
  Resolved https://github.com/pytorch/tnt.git to commit ad5936473131543ecbd750d0e35679d7b21040c9
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pyre-extensions (from torchtnt==0.2.3)
  Downloading pyre_extensions-0.0.30-py3-none-any.whl (12 kB)
Collecting typing-inspect (from pyre-extensions->torchtnt==0.2.3)
  Downloading typing_inspect-0.9.0-py3-none-any.whl (8.8 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch->torchtnt==0.2.3)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch->

In [None]:
!pip install pytorch-nlp==0.4.1

Collecting pytorch-nlp==0.4.1
  Downloading pytorch_nlp-0.4.1-py3-none-any.whl (82 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/82.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.3/82.3 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pytorch-nlp
Successfully installed pytorch-nlp-0.4.1


In [None]:
!pip install git+https://github.com/leftthomas/CapsuleLayer.git@master

Collecting git+https://github.com/leftthomas/CapsuleLayer.git@master
  Cloning https://github.com/leftthomas/CapsuleLayer.git (to revision master) to /tmp/pip-req-build-xs61qxn0
  Running command git clone --filter=blob:none --quiet https://github.com/leftthomas/CapsuleLayer.git /tmp/pip-req-build-xs61qxn0
  Resolved https://github.com/leftthomas/CapsuleLayer.git to commit 2205a603ac1cb3d24673ea7796a4a7e817914712
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: capsule-layer
  Building wheel for capsule-layer (setup.py) ... [?25l[?25hdone
  Created wheel for capsule-layer: filename=capsule_layer-0.5.0-py3-none-any.whl size=7866 sha256=716c540bf76f8806027acbd82595551f670d3f0442ccc00b162d985f929f3c20
  Stored in directory: /tmp/pip-ephem-wheel-cache-o1t7an8k/wheels/47/ac/61/2e6c57f0c63ebe07137f897a920620192ececc6e267f407807
Successfully built capsule-layer
Installing collected packages: capsule-layer
Successfully installed capsule-layer-0.5.0

In [None]:
import math

import torch
import torch.nn.functional as F
from capsule_layer import CapsuleLinear
from torch import nn
from torch.nn.parameter import Parameter


class CompositionalEmbedding(nn.Module):
    """Embedding assembled from ``num_codebook`` codebooks of ``num_codeword`` vectors each.

    Every index owns one code row per codebook. With ``weighted=True`` the row
    is soft-maxed and used to mix the codewords of that codebook; otherwise a
    single codeword per codebook is selected via repeated Gumbel-softmax
    sampling. The per-codebook vectors are summed into the final embedding.

    Args:
        num_embeddings: number of distinct indices.
        embedding_dim: size of each output vector.
        num_codebook: number of codebooks.
        num_codeword: codewords per codebook; defaults to the ceiling of the
            ``num_codebook``-th root of ``num_embeddings``.
        num_repeat: Gumbel-softmax draws summed before the argmax (hard mode only).
        weighted: choose the soft (True) or hard (False) composition.
        return_code: when True, ``forward`` also returns the code tensor.
    """

    def __init__(self, num_embeddings, embedding_dim, num_codebook, num_codeword=None, num_repeat=10, weighted=True,
                 return_code=False):
        super().__init__()
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.num_codebook = num_codebook
        self.num_repeat = num_repeat
        self.weighted = weighted
        self.return_code = return_code

        self.num_codeword = (math.ceil(math.pow(num_embeddings, 1 / num_codebook))
                             if num_codeword is None else num_codeword)
        self.code = Parameter(torch.Tensor(num_embeddings, num_codebook, self.num_codeword))
        self.codebook = Parameter(torch.Tensor(num_codebook, self.num_codeword, embedding_dim))

        nn.init.normal_(self.code)
        nn.init.normal_(self.codebook)

    def forward(self, input):
        """Embed ``input`` (batch, ...) into (batch, -1, embedding_dim); optionally also return the codes."""
        batch_size = input.size(0)
        flat_ids = input.view(-1)
        code = self.code.index_select(dim=0, index=flat_ids)

        if self.weighted:
            # Soft mode: each codebook's weights sum to 1 and mix that codebook's codewords.
            code = F.softmax(code, dim=-1)
            mixed = torch.matmul(code[:, :, None, :], self.codebook[None, :, :, :])
            out = mixed.squeeze(dim=-2).sum(dim=1)
        else:
            # Hard mode: Gumbel-softmax is stochastic, so several draws are summed
            # before picking the winning codeword of each codebook.
            draws = [F.gumbel_softmax(code) for _ in range(self.num_repeat)]
            code = torch.sum(torch.stack(draws), dim=0).argmax(dim=-1)
            picked = [self.codebook[book, :, :].index_select(dim=0, index=code[:, book])
                      for book in range(self.num_codebook)]
            out = torch.sum(torch.stack(picked), dim=0)
            code = F.one_hot(code, num_classes=self.num_codeword)

        out = out.view(batch_size, -1, self.embedding_dim)
        code = code.view(batch_size, -1, self.num_codebook, self.num_codeword)
        return (out, code) if self.return_code else out

    def __repr__(self):
        return f"{self.__class__.__name__} ({self.num_embeddings}, {self.embedding_dim})"


class Model(nn.Module):
    """Embedding -> 2-layer bidirectional GRU -> capsule or linear classifier.

    Args:
        vocab_size, embedding_size: embedding table dimensions.
        num_codebook, num_codeword, num_repeat: forwarded to
            ``CompositionalEmbedding`` when ``embedding_type`` is 'cwc' or 'cc'.
        hidden_size: GRU hidden size per direction.
        in_length, out_length: capsule lengths for the capsule classifier.
        num_class: number of output classes.
        routing_type: 'k_means' or 'dynamic' (capsule classifier only).
        embedding_type: 'cwc' (weighted compositional), 'cc' (hard compositional),
            anything else selects a plain ``nn.Embedding``.
        classifier_type: 'capsule' or anything else for a linear classifier.
        num_iterations: routing iterations for the capsule classifier.
        dropout: dropout between the GRU layers.

    Raises:
        ValueError: if ``classifier_type`` is 'capsule' and ``routing_type`` is
            not one of the supported routing algorithms.
    """

    def __init__(self, vocab_size, embedding_size, num_codebook, num_codeword, hidden_size, in_length, out_length,
                 num_class, routing_type, embedding_type, classifier_type, num_iterations, num_repeat, dropout):
        super().__init__()

        # Previously an unknown routing type silently built a Linear classifier
        # that the capsule branch of forward() could not use correctly.
        if classifier_type == 'capsule' and routing_type not in ('k_means', 'dynamic'):
            raise ValueError("routing_type must be 'k_means' or 'dynamic' for a capsule classifier, "
                             "got {!r}".format(routing_type))

        self.in_length, self.out_length = in_length, out_length
        self.hidden_size, self.classifier_type = hidden_size, classifier_type
        self.embedding_type = embedding_type

        if embedding_type == 'cwc':
            self.embedding = CompositionalEmbedding(vocab_size, embedding_size, num_codebook, num_codeword,
                                                    weighted=True)
        elif embedding_type == 'cc':
            self.embedding = CompositionalEmbedding(vocab_size, embedding_size, num_codebook, num_codeword, num_repeat,
                                                    weighted=False)
        else:
            self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.features = nn.GRU(embedding_size, self.hidden_size, num_layers=2, dropout=dropout, batch_first=True,
                               bidirectional=True)
        if classifier_type == 'capsule':
            self.classifier = CapsuleLinear(out_capsules=num_class, in_length=self.in_length,
                                            out_length=self.out_length, in_capsules=None, share_weight=True,
                                            routing_type=routing_type, num_iterations=num_iterations, bias=False)
        else:
            self.classifier = nn.Linear(in_features=self.hidden_size, out_features=num_class, bias=False)

    def forward(self, x):
        """Return class scores of shape (batch, num_class) for token ids ``x`` of shape (batch, seq_len)."""
        embed = self.embedding(x)
        out, _ = self.features(embed)

        # Sum the forward and backward GRU directions, then average over time.
        out = out[:, :, :self.hidden_size] + out[:, :, self.hidden_size:]
        out = out.mean(dim=1).contiguous()

        if self.classifier_type == 'capsule':
            # Split the pooled vector into input capsules of length in_length.
            capsules_in = out.view(out.size(0), -1, self.in_length)
            # CapsuleLinear returns (output capsules, routing probabilities) per the
            # capsule_layer documentation; the class score is the length of each
            # OUTPUT capsule, so element 0 is used (element 1 is the routing probs).
            capsules_out, _ = self.classifier(capsules_in)
            classes = capsules_out.norm(dim=-1)
        else:
            out = out.view(out.size(0), -1)
            classes = self.classifier(out)

        return classes

In [None]:
class CNNBiLSTM(nn.Module):
    """Encoder for the numeric metadata vector: Linear -> Conv1d(k=1) -> LSTM -> Linear.

    Args:
        input_dim: number of numeric metadata features per example.
        embedding_dim: width of the linear embedding.
        hidden_dim: LSTM hidden size per direction.
        output_dim: number of output scores.
        n_layers: number of stacked LSTM layers.
        bidirectional: whether the LSTM is bidirectional.
        dropout: dropout probability (after the embedding and, when
            ``n_layers > 1``, between LSTM layers).
    """

    # Number of output channels of the 1x1 convolution feeding the LSTM.
    CONV_CHANNELS = 32

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout):
        super().__init__()

        self.embedding = nn.Linear(input_dim, embedding_dim)
        self.conv = nn.Conv1d(in_channels=embedding_dim, out_channels=self.CONV_CHANNELS, kernel_size=1)
        self.rnn = nn.LSTM(self.CONV_CHANNELS,
                           hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional,
                           # Inter-layer dropout has no effect with a single layer
                           # and only triggers a PyTorch warning there.
                           dropout=dropout if n_layers > 1 else 0.0)
        # The LSTM emits hidden_dim features per direction.
        self.fc = nn.Linear(hidden_dim * (2 if bidirectional else 1), output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, metadata):
        """Return scores of shape (batch, output_dim) for ``metadata`` of shape (batch, input_dim)."""
        embedded = self.dropout(self.embedding(metadata))
        # embedded = [batch size, emb dim]

        # Conv1d expects (batch, channels, length); each example is a length-1 signal.
        conved = F.relu(self.conv(embedded.unsqueeze(-1)))
        # conved = [batch size, CONV_CHANNELS, 1]

        # The LSTM is not batch_first, and a 2-D input would be interpreted as ONE
        # unbatched sequence whose time axis is the batch. Add an explicit length-1
        # sequence axis so every example is processed independently of the others.
        sequence = conved.squeeze(-1).unsqueeze(0)
        # sequence = [1, batch size, CONV_CHANNELS]

        outputs, _ = self.rnn(sequence)
        # outputs = [1, batch size, hid dim * num directions]

        return self.fc(outputs.squeeze(0))

In [None]:
class LiarModel(nn.Module):
    """Fusion classifier over the statement text, metadata text and numeric metadata.

    Four branch outputs, each ``output_dim`` wide, are concatenated and passed
    through a final linear layer:
      * ``textcnn`` on the statement token ids,
      * ``textcnn2`` on the concatenated metadata token ids,
      * ``cnn_bilstm`` on the numeric metadata,
      * ``fuzzy`` applied to the ``cnn_bilstm`` output.

    The two capsule models are constructed (and therefore part of the
    state dict) but are not called in ``forward``.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, padding_idx, input_dim, input_dim_metadata, hidden_dim, n_layers, bidirectional):
        super().__init__()

        self.textcnn = TextCNN(vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, padding_idx)

        # Settings shared by both capsule models; they differ only in out_length.
        capsule_settings = dict(vocab_size=vocab_size, embedding_size=embedding_dim, num_codebook=8,
                                num_codeword=None, hidden_size=128, in_length=8, num_class=6,
                                routing_type='dynamic', embedding_type=None, classifier_type='capsule',
                                num_iterations=10, num_repeat=10, dropout=0.5)
        self.capsulecnn = Model(out_length=8, **capsule_settings)
        self.capsulecnn2 = Model(out_length=16, **capsule_settings)

        # Note: input_dim is passed as the embedding width of the metadata TextCNN.
        self.textcnn2 = TextCNN(vocab_size, input_dim, n_filters, filter_sizes, output_dim, dropout, padding_idx)

        self.cnn_bilstm = CNNBiLSTM(input_dim_metadata, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout)
        self.fuzzy = FuzzyLayer(output_dim, output_dim)
        self.fuse = nn.Linear(output_dim * 4, output_dim)

    def forward(self, text, metadata_text, metadata_number):
        """Return fused scores of shape (batch, output_dim).

        Args:
            text: statement token ids, (batch, sent_len).
            metadata_text: concatenated metadata token ids, (batch, metadata_len).
            metadata_number: numeric metadata, (batch, n_counts).
        """
        statement_scores = self.textcnn(text)
        metadata_text_scores = self.textcnn2(metadata_text)
        metadata_number_scores = self.cnn_bilstm(metadata_number)
        fuzzy_memberships = self.fuzzy(metadata_number_scores)

        branches = (statement_scores, metadata_text_scores, metadata_number_scores, fuzzy_memberships)
        return self.fuse(torch.cat(branches, dim=1))


In [None]:
# --- Model hyper-parameters ----------------------------------------------
vocab_size = 100266            # size of the token-id embedding tables
embedding_dim = 128
n_filters = 128
filter_sizes = [3,4,5]
output_dim = 6
dropout = 0.5
padding_idx = 0
input_dim = 6 * metadata_each_dim   # six metadata text fields, metadata_each_dim tokens each
input_dim_metadata = 5              # five numeric count features
hidden_dim = 64
n_layers = 1
bidirectional = True

model = LiarModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    n_filters=n_filters,
    filter_sizes=filter_sizes,
    output_dim=output_dim,
    dropout=dropout,
    padding_idx=padding_idx,
    input_dim=input_dim,
    input_dim_metadata=input_dim_metadata,
    hidden_dim=hidden_dim,
    n_layers=n_layers,
    bidirectional=bidirectional,
).to(DEVICE)

# --- Optimizer and loss --------------------------------------------------
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()

# --- Per-epoch history, appended to by train() ---------------------------
Train_acc, Train_loss, Train_macro_f1, Train_micro_f1 = [], [], [], []
Val_acc, Val_loss, Val_macro_f1, Val_micro_f1 = [], [], [], []

# Evaluate the model on new data
def test(model, test_loader, model_save):
    """Load the checkpoint at ``model_save`` into ``model`` and report test metrics.

    Note that this overwrites the weights currently held by ``model``. The loss
    is computed with the module-level ``criterion``. Prints the mean batch
    loss, accuracy, macro F1 and micro F1; returns nothing.

    Args:
        model: network whose weights are replaced by the checkpoint.
        test_loader: loader yielding (statement, one-hot label, label,
            metadata text, metadata numbers) batches.
        model_save: path of the state-dict checkpoint to load.
    """
    # map_location lets a checkpoint written on a GPU be loaded on a CPU-only runtime.
    model.load_state_dict(torch.load(model_save, map_location=DEVICE))
    model.eval()

    test_label_all = []
    test_predict_all = []

    test_loss = 0.0
    test_correct = 0
    with torch.no_grad():
        for statements, label_onehot, label, metadata_text, metadata_number in test_loader:
            statements = statements.to(DEVICE)
            label_onehot = label_onehot.to(DEVICE)
            label = label.to(DEVICE)
            metadata_text = metadata_text.to(DEVICE)
            metadata_number = metadata_number.to(DEVICE)

            test_outputs = model(statements, metadata_text, metadata_number)
            test_loss += criterion(test_outputs, label_onehot).item()
            _, test_predicted = torch.max(test_outputs, 1)

            # Tensor reduction instead of Python's sum(), which iterates element by element.
            test_correct += (test_predicted == label).sum().item()
            test_predict_all += test_predicted.tolist()
            test_label_all += label.tolist()

    test_loss /= len(test_loader)
    test_accuracy = test_correct / len(test_loader.dataset)
    test_macro_f1 = f1_score(test_label_all, test_predict_all, average='macro')
    test_micro_f1 = f1_score(test_label_all, test_predict_all, average='micro')

    print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_accuracy:.4f}, Test F1 Macro: {test_macro_f1:.4f}, Test F1 Micro: {test_micro_f1:.4f}')

def train(num_epochs, model, train_loader, val_loader, optimizer, criterion, model_save):
    """Train ``model`` for ``num_epochs`` epochs, validating and testing after each one.

    Per-epoch accuracy, loss and F1 scores are appended to the module-level
    ``Train_*`` / ``Val_*`` history lists. After every epoch the current
    weights are written to ``model_save`` and ``test`` is run on the
    module-level ``test_loader``.

    Args:
        num_epochs: number of passes over ``train_loader``.
        model: network to optimise.
        train_loader, val_loader: loaders yielding (statement, one-hot label,
            label, metadata text, metadata numbers) batches.
        optimizer: optimizer already bound to ``model``'s parameters.
        criterion: loss taking (outputs, one-hot labels).
        model_save: path the state dict is saved to after each epoch.
    """
    best_valid_loss = float('inf')

    start_time = time.time()
    for epoch in range(num_epochs):
        epoch_start_time = time.time()

        # Collected per epoch. Creating these once outside the loop made every
        # F1 score an average over all epochs so far rather than this epoch's score.
        train_label_all, train_predict_all = [], []
        val_label_all, val_predict_all = [], []

        # Training
        model.train()
        train_loss = 0.0
        train_correct = 0
        for statements, label_onehot, label, metadata_text, metadata_number in train_loader:
            statements = statements.to(DEVICE)
            label_onehot = label_onehot.to(DEVICE)
            label = label.to(DEVICE)
            metadata_text = metadata_text.to(DEVICE)
            metadata_number = metadata_number.to(DEVICE)

            optimizer.zero_grad()
            outputs = model(statements, metadata_text, metadata_number)

            loss = criterion(outputs, label_onehot)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, train_predicted = torch.max(outputs, 1)
            # Tensor reduction instead of Python's sum(), which iterates element by element.
            train_correct += (train_predicted == label).sum().item()
            train_predict_all += train_predicted.tolist()
            train_label_all += label.tolist()
        train_loss /= len(train_loader)
        train_accuracy = train_correct / len(train_loader.dataset)
        train_macro_f1 = f1_score(train_label_all, train_predict_all, average='macro')
        train_micro_f1 = f1_score(train_label_all, train_predict_all, average='micro')

        Train_acc.append(train_accuracy)
        Train_loss.append(train_loss)
        Train_macro_f1.append(train_macro_f1)
        Train_micro_f1.append(train_micro_f1)

        # Validation
        model.eval()
        val_loss = 0.0
        val_correct = 0
        with torch.no_grad():
            for statements, label_onehot, label, metadata_text, metadata_number in val_loader:
                statements = statements.to(DEVICE)
                label_onehot = label_onehot.to(DEVICE)
                label = label.to(DEVICE)
                metadata_text = metadata_text.to(DEVICE)
                metadata_number = metadata_number.to(DEVICE)

                val_outputs = model(statements, metadata_text, metadata_number)
                val_loss += criterion(val_outputs, label_onehot).item()
                _, val_predicted = torch.max(val_outputs, 1)
                val_correct += (val_predicted == label).sum().item()
                val_predict_all += val_predicted.tolist()
                val_label_all += label.tolist()
        val_loss /= len(val_loader)
        val_accuracy = val_correct / len(val_loader.dataset)
        val_macro_f1 = f1_score(val_label_all, val_predict_all, average='macro')
        val_micro_f1 = f1_score(val_label_all, val_predict_all, average='micro')

        Val_acc.append(val_accuracy)
        Val_loss.append(val_loss)
        Val_macro_f1.append(val_macro_f1)
        Val_micro_f1.append(val_micro_f1)

        if val_loss < best_valid_loss:
            best_valid_loss = val_loss
            print(f'***** Best Result Updated at Epoch {epoch + 1}, Val Loss: {val_loss:.4f} *****')

        # Saved every epoch (not only on improvement): test() below reloads this file
        # into the live model, so it must contain the current weights or training
        # would be rolled back to an older checkpoint.
        torch.save(model.state_dict(), model_save)

        # Print the losses and accuracy
        epoch_end_time = time.time()
        epoch_time = epoch_end_time - epoch_start_time

        print(f"Epoch [{epoch+1}/{num_epochs}], Time: {epoch_time:.2f}s, Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f}, Train F1 Macro: {train_macro_f1:.4f}, Train F1 Micro: {train_micro_f1:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}, Val F1 Macro: {val_macro_f1:.4f}, Val F1 Micro: {val_micro_f1:.4f}")

        test(model, test_loader, model_save)

    end_time = time.time()
    training_time = end_time - start_time
    print(f'Total Training Time: {training_time:.2f}s')


# Run the full training loop; per-epoch train/validation/test metrics are printed below.
train(num_epochs, model, train_loader, val_loader, optimizer, criterion, model_save)




***** Best Result Updated at Epoch 1, Val Loss: 0.3828 *****
Epoch [1/30], Time: 10.79s, Train Loss: 0.4345, Train Acc: 0.2736, Train F1 Macro: 0.2467, Train F1 Micro: 0.2736, Val Loss: 0.3828, Val Acc: 0.4346, Val F1 Macro: 0.3892, Val F1 Micro: 0.4346
Test Loss: 0.3863, Test Acc: 0.4002, Test F1 Macro: 0.3691, Test F1 Micro: 0.4002
***** Best Result Updated at Epoch 2, Val Loss: 0.3509 *****
Epoch [2/30], Time: 7.86s, Train Loss: 0.3640, Train Acc: 0.4544, Train F1 Macro: 0.3477, Train F1 Micro: 0.3640, Val Loss: 0.3509, Val Acc: 0.4735, Val F1 Macro: 0.4297, Val F1 Micro: 0.4540
Test Loss: 0.3597, Test Acc: 0.4475, Test F1 Macro: 0.4469, Test F1 Micro: 0.4475
***** Best Result Updated at Epoch 3, Val Loss: 0.3474 *****
Epoch [3/30], Time: 8.63s, Train Loss: 0.3343, Train Acc: 0.5101, Train F1 Macro: 0.4018, Train F1 Micro: 0.4127, Val Loss: 0.3474, Val Acc: 0.4642, Val F1 Macro: 0.4411, Val F1 Micro: 0.4574
Test Loss: 0.3586, Test Acc: 0.4230, Test F1 Macro: 0.4225, Test F1 Micro: 0