<a href="https://colab.research.google.com/github/pimverschuuren/FakeNews/blob/main/LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook implements a multilayer bidirectional LSTM model as a binary classifier that labels a text as fake or true news.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Create the field objects that define how the text and label columns are tokenized and processed. The first three columns of the CSV file are skipped, so only the columns that contain the text and the label are loaded.



In [2]:
import torch

SEED = 1234

torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

from torchtext.legacy import data
from torchtext.legacy import datasets

TEXT = data.Field(tokenize = 'spacy',
                  tokenizer_language = 'en_core_web_sm',
                  include_lengths = True)
LABEL = data.LabelField(dtype = torch.float)

fields = [(None, None), (None, None), (None, None), ('text', TEXT),('label',LABEL)]
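The position-based column selection can be illustrated in plain Python. This is only a sketch of the idea, not how torchtext implements it: the column names and rows below are hypothetical, and `field_names` is an illustrative stand-in for the `fields` list.

```python
import csv
import io

# Hypothetical CSV with the same layout as assumed above:
# three unused columns followed by the text and the label.
raw = io.StringIO(
    "col_a,col_b,col_c,text,label\n"
    "0,x,y,Some article body,fake\n"
    "1,x,y,Another article body,true\n"
)

# None means "ignore this column", mirroring the (None, None) entries.
field_names = [None, None, None, "text", "label"]

reader = csv.reader(raw)
next(reader)  # skip the header row, like skip_header = True

examples = [
    {name: value for name, value in zip(field_names, row) if name is not None}
    for row in reader
]

print(examples[0])
```

Each row keeps only the entries whose field name is not `None`, so the order of the list has to match the column order of the file.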

Load the CSV file into a torchtext dataset object.

In [3]:
tab_data = data.TabularDataset(path = 'drive/MyDrive/Colab Notebooks/FakeNews/FakeNews.csv',
                                        format = 'csv',
                                        fields = fields,
                                        skip_header = True
)

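Split the dataset into training, validation and testing datasets. First 30% of the data is set aside for testing, then the remaining 70% is split 75/25 into training and validation data.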

In [4]:
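import random

SEED = 1234

# random.seed() returns None, so seed the generator first and then pass
# its state explicitly to make the splits reproducible.
random.seed(SEED)

train_data, test_data = tab_data.split(split_ratio=0.7, random_state = random.getstate())
train_data, validation_data = train_data.split(split_ratio=0.75, random_state = random.getstate())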
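A short check of how the standard `random` module handles seeding may help here. It assumes the `random_state` argument expects a generator state such as the one returned by `random.getstate()`; the variable names are illustrative only.

```python
import random

SEED = 1234

# random.seed() seeds the global generator in place and returns None,
# so its return value carries no information about the seed.
result = random.seed(SEED)
print(result)

# The state of the seeded generator can be captured and restored,
# which makes a sequence of random draws repeatable.
random.seed(SEED)
state = random.getstate()
first = random.random()

random.setstate(state)
second = random.random()

print(first == second)
```

Restoring the captured state reproduces the same draw, which is the property a reproducible split relies on.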

Print the lengths of each dataset.

In [5]:
print("Number of training examples: "+str(len(train_data)))
print("Number of validation examples: "+str(len(validation_data)))
print("Number of testing examples: "+str(len(test_data)))

Number of training examples: 10858
Number of validation examples: 3619
Number of testing examples: 6205
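The printed counts are consistent with a 70/30 split followed by a 75/25 split. Below is a minimal sketch of the arithmetic, assuming each split rounds the first share to the nearest integer (an assumption about the splitting internals, chosen because it reproduces the numbers above). `two_stage_split` is a hypothetical helper and the total of 20682 is the sum of the three printed counts.

```python
def two_stage_split(n_total, first_ratio=0.7, second_ratio=0.75):
    # First split: (train + validation) versus test.
    n_train_val = int(round(first_ratio * n_total))
    n_test = n_total - n_train_val

    # Second split: train versus validation.
    n_train = int(round(second_ratio * n_train_val))
    n_val = n_train_val - n_train

    return n_train, n_val, n_test


n_train, n_val, n_test = two_stage_split(10858 + 3619 + 6205)
print(n_train, n_val, n_test)

# Effective shares of the full dataset: roughly 52.5%, 17.5% and 30%.
total = n_train + n_val + n_test
print(round(n_train / total, 3), round(n_val / total, 3), round(n_test / total, 3))
```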


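Use the training data to create a vocabulary with a maximum number of words. Here we pass pretrained word vectors from the GloVe project that will supply the initial weights for the embedding layer. Words in the vocabulary without a pretrained vector are initialized from a normal distribution.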



In [6]:
max_vocab_size = 25000

TEXT.build_vocab(train_data, max_size=max_vocab_size, 
                 vectors = "glove.6B.100d", 
                 unk_init = torch.Tensor.normal_)
LABEL.build_vocab(train_data)

.vector_cache/glove.6B.zip: 862MB [02:46, 5.19MB/s]                           
100%|█████████▉| 399999/400000 [00:19<00:00, 20577.69it/s]


Print the size of the vocabulary. The text vocabulary contains the maximum number of words plus two special tokens: the unk token for words outside the vocabulary and the pad token for padding.

In [7]:
print("Unique tokens in TEXT vocabulary: "+str(len(TEXT.vocab)))
print("Unique tokens in LABEL vocabulary: "+str(len(LABEL.vocab)))

Unique tokens in TEXT vocabulary: 25002
Unique tokens in LABEL vocabulary: 2
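The "maximum plus two" size can be reproduced with a small frequency-based vocabulary in plain Python. This is a sketch of the general idea, not torchtext's implementation; `build_vocab`, `numericalize` and the toy corpus are hypothetical.

```python
from collections import Counter


def build_vocab(token_lists, max_size, specials=("<unk>", "<pad>")):
    # Count token frequencies over the whole (training) corpus.
    counts = Counter(tok for toks in token_lists for tok in toks)
    # Special tokens come first, followed by the most frequent words.
    itos = list(specials) + [tok for tok, _ in counts.most_common(max_size)]
    stoi = {tok: idx for idx, tok in enumerate(itos)}
    return itos, stoi


def numericalize(tokens, stoi, unk_token="<unk>"):
    # Words outside the vocabulary map to the index of the unk token.
    return [stoi.get(tok, stoi[unk_token]) for tok in tokens]


corpus = [
    ["the", "news", "is", "fake"],
    ["the", "news", "is", "true"],
    ["the", "story"],
]

itos, stoi = build_vocab(corpus, max_size=3)
print(len(itos))  # 3 words + 2 special tokens = 5
print(numericalize(["the", "story", "is", "odd"], stoi))  # [2, 0, 4, 0]
```

With `max_size=3` only "the", "news" and "is" are kept, so "story" and "odd" both fall back to the unk index 0.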


Create iterators that yield batches of text for the training loop later in the notebook. If run on Google Colab, turn on the GPU in Notebook settings. Sort the texts within each batch by length, because packing the padded sequences in the model requires them to be ordered from longest to shortest.

In [21]:
import torch

batch_size = 32

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, validation_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, validation_data, test_data), 
    batch_sizes = (batch_size, batch_size, batch_size),
    sort=False,
    sort_within_batch=True,
    sort_key = lambda x: len(x.text),
    device = device)
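What sorting within a batch and padding amount to can be sketched in plain Python. This is not the torchtext implementation; `make_batch` and the token lists are hypothetical.

```python
def make_batch(examples, pad_token="<pad>"):
    # Order the examples from longest to shortest.
    ordered = sorted(examples, key=len, reverse=True)
    lengths = [len(example) for example in ordered]

    # Pad every example to the length of the longest one.
    max_len = lengths[0]
    padded = [example + [pad_token] * (max_len - len(example)) for example in ordered]

    return padded, lengths


examples = [["a"], ["b", "c", "d"], ["e", "f"]]
padded, lengths = make_batch(examples)

print(lengths)  # [3, 2, 1]
for row in padded:
    print(row)
```

The lengths are returned next to the padded tokens, in the same way that `include_lengths = True` makes each batch carry both the text and its lengths.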

Define the LSTM model here.

In [22]:
import torch.nn as nn

class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, 
                 bidirectional, dropout, pad_idx):
        
        super().__init__()
        
        # The embedding layer maps each token index (one of vocab_size
        # possible values) to a dense vector of dimension embedding_dim.
        # The padding token, which is used to make the sentences in a
        # batch equal in length, is excluded from the gradient updates.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)
        
        # Define the LSTM layers.
        self.lstm = nn.LSTM(embedding_dim, 
                           hidden_dim, 
                           num_layers=n_layers, 
                           bidirectional=bidirectional, 
                           dropout=dropout)
        
        # The LSTM is bidirectional so 2 * hidden_dim outputs
        # are passed to the linear layer.
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        
        # Include dropout for regularization.
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text, text_lengths):
        
        embedded = self.dropout(self.embedding(text))
        
        # Pack the sequences so that the LSTM skips the padded positions
        # and the final hidden and cell states come from the last
        # non-padded token of each sequence.
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.to('cpu'))

        # Pass the packed sequence to the LSTM
        packed_output, (hidden, cell) = self.lstm(packed_embedded)
        
        # Unpack the sequence.
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output)

        # Concatenate the final forward and backward hidden states of
        # the last layer and apply dropout.
        hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1))

        return self.fc(hidden)
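The shapes involved in the forward pass can be checked on a toy example. This is a minimal sketch with made-up sizes and random inputs, assuming sequences are already sorted by decreasing length; it is not the model above.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

seq_len, batch_size, emb_dim, hid_dim, n_layers = 5, 3, 4, 6, 2

lstm = nn.LSTM(emb_dim, hid_dim, num_layers=n_layers, bidirectional=True)

# Fake embedded batch of shape [seq_len, batch_size, emb_dim].
embedded = torch.randn(seq_len, batch_size, emb_dim)
lengths = torch.tensor([5, 3, 2])  # sorted in decreasing order

packed = nn.utils.rnn.pack_padded_sequence(embedded, lengths)
packed_output, (hidden, cell) = lstm(packed)
output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output)

# hidden: [n_layers * 2, batch_size, hid_dim]
print(hidden.shape)
# output: [seq_len, batch_size, hid_dim * 2]
print(output.shape)

# hidden[-2] and hidden[-1] are the last layer's forward and backward states.
final = torch.cat((hidden[-2], hidden[-1]), dim=1)
print(final.shape)

# For the second sequence (length 3) the forward state is taken at its
# last real token (t = 2) and the backward state at its first token (t = 0).
fwd_matches = torch.allclose(hidden[-2][1], output[2, 1, :hid_dim], atol=1e-6)
bwd_matches = torch.allclose(hidden[-1][1], output[0, 1, hid_dim:], atol=1e-6)
print(fwd_matches, bwd_matches)
```

Because of the packing, the final states ignore the padded positions, which is why the concatenated vector has `hid_dim * 2` entries per example regardless of the padding.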

Now let's instantiate a model object.


*   input_dim = the size of the vocabulary.
*   embedding_dim = the size of the vector that the embedding layer outputs.
*   hidden_dim = the size of the hidden states.
*   output_dim = 1, a single output value that quantifies the class probability, i.e. being fake news or not.


In [23]:
input_dim = len(TEXT.vocab)
embedding_dim = 100
hidden_dim = 256
output_dim = 1
n_layers = 2
bidirectional = True
dropout = 0.5
pad_idx = TEXT.vocab.stoi[TEXT.pad_token]

model = LSTM(input_dim, 
            embedding_dim, 
            hidden_dim, 
            output_dim, 
            n_layers, 
            bidirectional, 
            dropout, 
            pad_idx)

Get the embedding layer weights from the pretrained GloVe word vectors and copy them to the model.

In [24]:
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)

tensor([[-0.1117, -0.4966,  0.1631,  ...,  1.2647, -0.2753, -0.1325],
        [-0.8555, -0.7208,  1.3755,  ...,  0.0825, -1.1314,  0.3997],
        [-0.1077,  0.1105,  0.5981,  ..., -0.8316,  0.4529,  0.0826],
        ...,
        [ 0.9178, -0.0447,  0.0414,  ...,  0.1672, -0.6644, -0.1582],
        [-0.1308,  0.5205,  0.3289,  ..., -0.0279,  0.2165, -0.5381],
        [-0.0693,  0.7122,  0.4987,  ..., -0.0958,  0.3595, -0.5329]])

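Set the weights in the embedding layer corresponding to the unk and pad tokens to zero. These tokens carry no semantic information, so their embeddings should start out empty. The pad embedding also stays at zero during training because it was passed as `padding_idx`.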

In [25]:
unk_idx = TEXT.vocab.stoi[TEXT.unk_token]

model.embedding.weight.data[unk_idx] = torch.zeros(embedding_dim)
model.embedding.weight.data[pad_idx] = torch.zeros(embedding_dim)
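The effect of `padding_idx` on the gradients can be verified on a tiny embedding layer. This is a sketch with made-up sizes and indices, not the model above.

```python
import torch
import torch.nn as nn

# Tiny embedding table with 5 tokens; index 1 plays the role of the pad token.
emb = nn.Embedding(5, 3, padding_idx=1)

tokens = torch.tensor([[0, 1, 2]])
emb(tokens).sum().backward()

# The row at padding_idx receives no gradient and is therefore never updated.
print(emb.weight.grad[1])
# A regular row receives a gradient as usual.
print(emb.weight.grad[0])
```

Only the padding row is protected in this way. Any other row that is zeroed by hand still receives gradients and can move away from zero during training.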

Define the optimizer.

In [26]:
import torch.optim as optim

optimizer = optim.Adam(model.parameters())

Define the loss criterion that will be minimized.

In [27]:
criterion = torch.nn.BCEWithLogitsLoss()
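`BCEWithLogitsLoss` combines a sigmoid with the binary cross entropy, which is why the model returns raw logits. The per-example quantity can be sketched in plain Python; the stable form below is a standard identity and not necessarily how PyTorch computes it internally, and both helper functions are hypothetical.

```python
import math


def bce_naive(logit, target):
    # Sigmoid followed by binary cross entropy.
    p = 1.0 / (1.0 + math.exp(-logit))
    return -(target * math.log(p) + (1.0 - target) * math.log(1.0 - p))


def bce_with_logits(logit, target):
    # Equivalent form that avoids taking the log of a value rounded to zero.
    return max(logit, 0.0) - logit * target + math.log1p(math.exp(-abs(logit)))


for logit, target in [(0.0, 1.0), (2.0, 1.0), (-1.5, 0.0), (3.0, 0.0)]:
    print(logit, target, bce_naive(logit, target), bce_with_logits(logit, target))

# For a very confident but wrong prediction the stable form still works,
# while the naive form would evaluate log(0).
print(bce_with_logits(100.0, 0.0))
```

A logit of 0 corresponds to a probability of 0.5, so the loss for either label is log(2).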

Place the model and loss function on the GPU, if one is available.

In [28]:
model = model.to(device)
criterion = criterion.to(device)

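Define the accuracy of the model: the fraction of examples for which the prediction, obtained by passing the output through a sigmoid and rounding to 0 or 1, equals the label.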

In [29]:
def binary_accuracy(preds, y):
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float()
    acc = correct.sum() / len(correct)
    return acc
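A quick usage check of the accuracy function on hand-picked logits and labels (the values are made up for illustration):

```python
import torch


def binary_accuracy(preds, y):
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float()
    return correct.sum() / len(correct)


logits = torch.tensor([2.0, -1.0, 0.5, -3.0])
labels = torch.tensor([1.0, 0.0, 0.0, 0.0])

acc = binary_accuracy(logits, labels)
print(acc.item())  # 0.75: the third example is predicted as 1 but labeled 0

# Rounding the sigmoid gives the same predictions as thresholding the logit at 0.
thresholded = (logits > 0).float()
same = torch.equal(thresholded, torch.round(torch.sigmoid(logits)))
print(same)
```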

Define the training loop for a single epoch.

In [30]:
def train(model, iterator, optimizer, criterion):
    
    # Define placeholders for the loss and accuracy.
    epoch_loss = 0
    epoch_acc = 0
    
    # Set the model in train mode. This only affects 
    # dropout and batch norm layers.
    model.train()
    
    # Loop over the batches
    for batch in iterator:
        
        # Reset the gradient for each batch.
        optimizer.zero_grad()
        
        # Get the texts and text lengths.
        text, text_lengths = batch.text

        # Make the predictions.
        predictions = model(text, text_lengths).squeeze(1)
        
        # Calculate the loss function.
        loss = criterion(predictions, batch.label)
        
        # Calculate the accuracy.
        acc = binary_accuracy(predictions, batch.label)
        
        # Perform back propagation.
        loss.backward()
        
        # Take a step in parameter space.
        optimizer.step()
        
        # Sum the loss and accuracy.
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    # Return the average loss and accuracy.
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

Define a loop to evaluate the model on the validation and testing data.

In [31]:
def evaluate(model, iterator, criterion):
    
    # Define placeholders for the loss and accuracy.
    epoch_loss = 0
    epoch_acc = 0
    
    # Set the model in eval mode. This only affects 
    # dropout and batch norm layers.
    model.eval()
    
    # Disable gradient tracking, since no parameters are updated here.
    with torch.no_grad():
    
        # Loop over the batches.
        for batch in iterator:

            # Get the texts and text lengths.
            text, text_lengths = batch.text

            # Make predictions.
            predictions = model(text, text_lengths).squeeze(1)
            
            # Calculate the loss.
            loss = criterion(predictions, batch.label)
            
            # Calculate the accuracy.
            acc = binary_accuracy(predictions, batch.label)

            # Sum the loss and accuracy.
            epoch_loss += loss.item()
            epoch_acc += acc.item()

    # Return the average loss and accuracy.
    return epoch_loss / len(iterator), epoch_acc / len(iterator)
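Both loops report the mean of the per-batch accuracies. When the last batch is smaller than the others, this can differ slightly from the accuracy over all examples. A sketch with made-up batch counts, where both helpers are hypothetical:

```python
def mean_of_batch_means(batches):
    # What the loops above compute: every batch counts equally.
    return sum(correct / size for correct, size in batches) / len(batches)


def example_weighted(batches):
    # Accuracy over all examples: every example counts equally.
    return sum(correct for correct, _ in batches) / sum(size for _, size in batches)


# (number of correct predictions, batch size); the last batch is incomplete.
batches = [(32, 32), (32, 32), (1, 2)]

print(round(mean_of_batch_means(batches), 4))  # 0.8333
print(round(example_weighted(batches), 4))     # 0.9848
```

With thousands of examples and a batch size of 32 the difference is much smaller than in this exaggerated case.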

Define a function that calculates the elapsed time of each epoch in minutes and seconds.

In [32]:
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs
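A usage example with made-up timestamps, together with an equivalent formulation based on `divmod`:

```python
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs


# 139.4 seconds correspond to 2 minutes and 19 seconds.
mins, secs = epoch_time(1000.0, 1139.4)
print(f"{mins}m {secs}s")

# The same result using divmod on the whole number of elapsed seconds.
alt_mins, alt_secs = divmod(int(1139.4 - 1000.0), 60)
print(f"{alt_mins}m {alt_secs}s")
```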

Run the training.

In [33]:
N_EPOCHS = 5

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, validation_iterator, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

Epoch: 01 | Epoch Time: 2m 18s
	Train Loss: 0.355 | Train Acc: 83.46%
	 Val. Loss: 0.097 |  Val. Acc: 96.03%
Epoch: 02 | Epoch Time: 2m 19s
	Train Loss: 0.111 | Train Acc: 95.69%
	 Val. Loss: 0.048 |  Val. Acc: 98.30%
Epoch: 03 | Epoch Time: 2m 19s
	Train Loss: 0.113 | Train Acc: 95.78%
	 Val. Loss: 0.035 |  Val. Acc: 98.79%
Epoch: 04 | Epoch Time: 2m 23s
	Train Loss: 0.038 | Train Acc: 98.73%
	 Val. Loss: 0.019 |  Val. Acc: 99.42%
Epoch: 05 | Epoch Time: 2m 24s
	Train Loss: 0.023 | Train Acc: 99.24%
	 Val. Loss: 0.017 |  Val. Acc: 99.48%
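The loop defines `best_valid_loss` but does not use it. One common use, sketched here as an assumption rather than as part of this notebook, is to keep a copy of the parameters from the epoch with the lowest validation loss. The tiny linear model and the in-place update that stands in for training are hypothetical; the validation losses are taken from the log above.

```python
import copy

import torch
import torch.nn as nn

model = nn.Linear(2, 1)  # stand-in for the LSTM model

# Validation losses per epoch, copied from the training log.
valid_losses = [0.097, 0.048, 0.035, 0.019, 0.017]

best_valid_loss = float('inf')
best_state = None
best_epoch = None

for epoch, valid_loss in enumerate(valid_losses):
    # Stand-in for one epoch of training: change the parameters.
    with torch.no_grad():
        model.weight.add_(1.0)

    # Keep a snapshot whenever the validation loss improves.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        best_epoch = epoch + 1
        # deepcopy is needed because state_dict() references the live tensors.
        best_state = copy.deepcopy(model.state_dict())

# Restore the best parameters before evaluating on the test data.
model.load_state_dict(best_state)
print(best_epoch, best_valid_loss)
```

In the run above the validation loss improves every epoch, so the best snapshot coincides with the final model.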


Test the model.

In [34]:
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Test Loss: 0.018 | Test Acc: 99.45%


The trained LSTM model reaches an accuracy of 99.45% on the testing data, in line with the 99.48% validation accuracy after the final epoch.