In [1]:
import torch
import pickle
from torch import nn
from torch.utils.data import DataLoader
from torch.autograd import Variable

print("PyTorch version:")
print(torch.__version__)

# Pick the compute device used by the rest of the notebook. Prefer Apple's
# Metal backend (MPS), then CUDA, and fall back to the CPU so that every
# later `.to(gpu)` call works even on a machine without an accelerator.
if torch.backends.mps.is_available():
    gpu = torch.device("mps")
elif torch.cuda.is_available():
    gpu = torch.device("cuda:0")
else:
    gpu = torch.device("cpu")

print("GPU Detected:")
# True when an accelerator (MPS or CUDA) was selected above.
print(gpu.type != "cpu")

# Standard-library import kept from the original cell.
import os

PyTorch version:
2.2.0
GPU Detected:
True


In [2]:
# NOTE(security): pickle.load can execute arbitrary code while deserialising.
# Only load these files if they come from a trusted source.
with open("./data/train_text", "rb") as f:
    train_text = pickle.load(f)
with open("./data/test_text", "rb") as f:
    test_text = pickle.load(f)

# Training examples are served one at a time in a fresh random order each epoch.
train_text_data = DataLoader(train_text, batch_size=1, shuffle=True)
# Evaluation sums metrics over the whole split, so a fixed order is enough;
# not shuffling keeps every evaluation pass identical and leaves the global
# random state untouched.
test_text_data = DataLoader(test_text, batch_size=1, shuffle=False)

In [3]:
# Report the number of examples in each split: train first, then test,
# one value per line.
print(len(train_text_data.dataset), len(test_text_data.dataset), sep="\n")

5707
1426


In [4]:
# Pull a single batch from the training loader to inspect its dimensions.
batch_size = train_text_data.batch_size
data, label = next(iter(train_text_data))
print(f"shape: {data.size()}")
# Axis 1 of the batch is used later as the model's `sequence_size`.
sequence_length = data.size(1)

shape: torch.Size([1, 807, 300])


In [5]:
from math import floor


class TextClassificationModel(nn.Module):
    """Three stacked ``Conv1d`` layers followed by a linear classification head.

    Inputs are expected with shape ``(batch, sequence_size, embedding_size)``.
    The ``sequence_size`` axis is passed to ``Conv1d`` as its channel axis, so
    the kernels slide along the embedding axis.
    NOTE(review): text CNNs more commonly use the embedding as channels and
    convolve along the sequence -- confirm this layout is intended.

    Args:
        sequence_size: Size of axis 1 of the input (input channels of layer 1).
        kernel_size: Kernel width shared by all three convolutions.
        num_class: Number of output classes.
        dropout: Dropout probability applied after every convolution.
        activation_fn: Callable non-linearity applied after dropout.
        embedding_size: Size of the last input axis. Defaults to 300, the
            value that was previously hard-coded.

    Raises:
        ValueError: If ``kernel_size`` is smaller than 1 or so large that the
            convolved axis would shrink below one element.
    """

    # Output channels of the three convolution layers, in order.
    _CONV_CHANNELS = (512, 256, 128)

    def __init__(
        self,
        sequence_size,
        kernel_size,
        num_class,
        dropout,
        activation_fn,
        embedding_size=300,
    ):
        super().__init__()

        if kernel_size < 1:
            raise ValueError("kernel_size must be at least 1, got {}".format(kernel_size))
        # Each convolution uses stride 1, no padding and dilation 1, so it
        # shortens the convolved axis by (kernel_size - 1) elements.
        L_out = embedding_size - len(self._CONV_CHANNELS) * (kernel_size - 1)
        if L_out < 1:
            raise ValueError(
                "kernel_size={} is too large for embedding_size={}: the output "
                "of the last convolution would have length {}".format(
                    kernel_size, embedding_size, L_out
                )
            )

        # Build layer1..layer3; each layer consumes the previous layer's channels.
        self.hidden_layers = nn.ModuleList([])
        in_channels = sequence_size
        for out_channels in self._CONV_CHANNELS:
            self.hidden_layers.append(
                nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size)
            )
            in_channels = out_channels

        self.flatten = nn.Flatten()  # Flatten layer
        self.dropout = nn.Dropout(dropout)
        # Kept from the original: shows the flattened length when the model is built.
        print(L_out)
        self.output_projection = nn.Linear(self._CONV_CHANNELS[-1] * L_out, num_class)
        self.nonlinearity = activation_fn

    def forward(self, x):
        """Return per-class log-probabilities of shape ``(batch, num_class)``."""
        for hidden_layer in self.hidden_layers:
            x = hidden_layer(x)
            x = self.dropout(x)
            x = self.nonlinearity(x)

        x = self.flatten(x)
        out = self.output_projection(x)

        # Log-probabilities, suitable for a negative log-likelihood loss.
        out_distribution = nn.functional.log_softmax(out, dim=-1)
        return out_distribution


def _evaluate(model, dataloader, criterion, device):
    """Return ``(average loss, accuracy in percent)`` of ``model`` on ``dataloader``.

    The caller is responsible for switching the model to eval mode.
    """
    total_loss = 0
    num_correct = 0
    total_examples = 0

    with torch.no_grad():
        for data, labels in dataloader:
            if device is not None:
                data = data.to(device)
                labels = labels.to(device)

            # Forward pass: predicted log distribution for this batch.
            predicted = model(data)
            batch_examples = labels.size(0)

            # Scale the batch loss by the batch size to accumulate a total
            # (assumes the criterion averages over the batch -- TODO confirm).
            total_loss += criterion(predicted, labels) * batch_examples

            # The predicted label is the argmax over the class axis; comparing
            # with the true labels and summing counts the correct predictions.
            num_correct += torch.sum(predicted.argmax(dim=1) == labels)
            total_examples += batch_examples

    average_loss = float(total_loss / total_examples)
    accuracy = float(100 * num_correct / total_examples)
    return average_loss, accuracy


def train(
    train_dataloader,
    test_dataloader,
    nll_criterion,
    num_epochs,
    ffnn,
    ffnn_optimizer,
    eval_every=500,
):
    """Train ``ffnn`` and periodically print its test loss and accuracy.

    Batches are moved to the device that holds the model's parameters, so the
    function does not depend on any notebook-level device variable.

    Args:
        train_dataloader: Iterable of ``(data, labels)`` training batches.
        test_dataloader: Iterable of ``(data, labels)`` evaluation batches.
        nll_criterion: Loss called as ``nll_criterion(predicted, labels)``.
        num_epochs: Number of passes over ``train_dataloader``.
        ffnn: The model to train.
        ffnn_optimizer: Optimizer that updates the model's parameters.
        eval_every: Evaluate on the test set after this many gradient updates.
            Defaults to 500.

    Raises:
        ValueError: If ``eval_every`` is smaller than 1.
    """
    if eval_every < 1:
        raise ValueError("eval_every must be at least 1, got {}".format(eval_every))

    # Use the model's own device; None means the model has no parameters.
    first_param = next(ffnn.parameters(), None)
    device = first_param.device if first_param is not None else None

    # Make sure dropout is active even if the caller left the model in eval mode.
    ffnn.train()

    # A counter for the number of gradient updates we've performed.
    num_iter = 0

    # Iterate `num_epochs` times.
    for epoch in range(num_epochs):
        print("Starting epoch {}".format(epoch + 1))
        # Iterate over the train_dataloader, unpacking the data and labels.
        for data, labels in train_dataloader:
            if device is not None:
                data = data.to(device)
                labels = labels.to(device)

            # Run the forward pass through the model to get predicted log distribution.
            predicted = ffnn(data)

            # Calculate the loss
            batch_loss = nll_criterion(predicted, labels)

            # Clear the gradients as we prepare to backprop.
            ffnn_optimizer.zero_grad()

            # Backprop (backward pass), which calculates gradients.
            batch_loss.backward()

            # Take a gradient step to update parameters.
            ffnn_optimizer.step()

            # Increment gradient update counter.
            num_iter += 1

            # Calculate test set loss and accuracy every `eval_every` updates.
            if num_iter % eval_every == 0:
                # Set model to eval mode, which turns off dropout.
                ffnn.eval()
                try:
                    average_test_loss, accuracy = _evaluate(
                        ffnn, test_dataloader, nll_criterion, device
                    )
                finally:
                    # Always go back to train mode, which activates dropout
                    # again, even if the evaluation failed.
                    ffnn.train()
                print(
                    "Iteration {}. Test Loss {}. Test Accuracy {}.".format(
                        num_iter, average_test_loss, accuracy
                    )
                )

In [6]:
# The model outputs log-probabilities, so it is paired with a negative
# log-likelihood loss.
nll_criterion = nn.NLLLoss()

# Binary classifier over batches whose axis 1 has `sequence_length` entries.
model = TextClassificationModel(
    sequence_size=sequence_length,
    kernel_size=10,
    num_class=2,
    dropout=0.5,
    activation_fn=nn.ReLU(),
)
optimiser = torch.optim.Adam(model.parameters(), lr=0.0005)

# Move the parameters to the selected device; as the last expression of the
# cell this also displays the module summary.
model.to(gpu)

273


  from .autonotebook import tqdm as notebook_tqdm


TextClassificationModel(
  (hidden_layers): ModuleList(
    (0): Conv1d(807, 512, kernel_size=(10,), stride=(1,))
    (1): Conv1d(512, 256, kernel_size=(10,), stride=(1,))
    (2): Conv1d(256, 128, kernel_size=(10,), stride=(1,))
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (dropout): Dropout(p=0.5, inplace=False)
  (output_projection): Linear(in_features=34944, out_features=2, bias=True)
  (nonlinearity): ReLU()
)

In [7]:
# Run the full training loop for ten passes over the training data.
num_epochs = 10
train(
    train_dataloader=train_text_data,
    test_dataloader=test_text_data,
    nll_criterion=nll_criterion,
    num_epochs=num_epochs,
    ffnn=model,
    ffnn_optimizer=optimiser,
)


Starting epoch 1
Iteration 500. Test Loss 0.4533337950706482. Test Accuracy 79.1725082397461.
Iteration 1000. Test Loss 0.3758504390716553. Test Accuracy 84.43197631835938.
Iteration 1500. Test Loss 0.42299675941467285. Test Accuracy 81.69705200195312.
Iteration 2000. Test Loss 0.5005826354026794. Test Accuracy 78.9621353149414.
Iteration 2500. Test Loss 0.4278031885623932. Test Accuracy 80.22440338134766.
Iteration 3000. Test Loss 0.4671684503555298. Test Accuracy 78.26087188720703.
Iteration 3500. Test Loss 0.42800748348236084. Test Accuracy 79.87377166748047.
Iteration 4000. Test Loss 0.4158167541027069. Test Accuracy 81.06591796875.
Iteration 4500. Test Loss 0.4673496186733246. Test Accuracy 78.401123046875.
Iteration 5000. Test Loss 0.44849711656570435. Test Accuracy 77.0687255859375.
Iteration 5500. Test Loss 0.45144519209861755. Test Accuracy 78.54137420654297.
Starting epoch 2
Iteration 6000. Test Loss 0.43018999695777893. Test Accuracy 78.401123046875.
Iteration 6500. Test Los