<a href="https://colab.research.google.com/github/sayarghoshroy/Intro_to_DL_tutorial/blob/master/CNN_Text_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Using Convolutional Neural Networks for sentence Classification

This code uses a version of the model described in ) [CNN For Sentence Classification (Yoon Kim, 2014)](https://www.aclweb.org/anthology/D14-1181/
) in order to perform sentence classification on the IMDB dataset

In [1]:
!pip install -q torchtext==0.2.3
import torchtext
print (torchtext.__version__)


0.2.3


In [2]:
!pip install -q torch==0.4.1 
import torch
print (torch.__version__)

0.4.1


In [3]:
import os
import sys
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.optim as optim
import numpy as np
import torchtext
from torchtext import data
from torchtext import datasets
from torchtext.vocab import Vectors, GloVe
print (torch.__version__)
print (torchtext.__version__)

0.4.1
0.2.3


The function to load the dataset, tokenize the sentences and also find out the GloVe embeddings for the vocabulary

In [5]:
def load_dataset(test_sen=None):

    """
    tokenizer : Breaks sentences into a list of words. If sequential=False, no tokenization is applied
    Field : A class that stores information about the way of preprocessing
    fix_length : An important property of TorchText is that we can let the input to be variable length, and TorchText will
                 dynamically pad each sequence to the longest sequence in that "batch". But here we are using fi_length which
                 will pad each sequence to have a fix length of 200.
                 
    build_vocab : It will first make a vocabulary or dictionary mapping all the unique words present in the train_data to an
                  idx and then after it will use GloVe word embedding to map the index to the corresponding word embedding.
                  
    vocab.vectors : This returns a torch tensor of shape (vocab_size x embedding_dim) containing the pre-trained word embeddings.
    BucketIterator : Defines an iterator that batches examples of similar lengths together to minimize the amount of padding needed.
    
    """
    
    tokenize = lambda x: x.split()
    TEXT = data.Field(sequential=True, tokenize=tokenize, lower=True, include_lengths=True, batch_first=True, fix_length=200)
    LABEL = data.LabelField()
    train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
    TEXT.build_vocab(train_data, vectors=GloVe(name='6B', dim=300))
    LABEL.build_vocab(train_data)

    word_embeddings = TEXT.vocab.vectors
    print ("Length of Text Vocabulary: " + str(len(TEXT.vocab)))
    print ("Vector size of Text Vocabulary: ", TEXT.vocab.vectors.size())
    print ("Label Length: " + str(len(LABEL.vocab)))

    train_data, valid_data = train_data.split() # Further splitting of training_data to create new training_data & validation_data
    train_iter, valid_iter, test_iter = data.BucketIterator.splits((train_data, valid_data, test_data), batch_size=32, sort_key=lambda x: len(x.text), repeat=False, shuffle=True)

    '''Alternatively we can also use the default configurations'''
    # train_iter, test_iter = datasets.IMDB.iters(batch_size=32)

    vocab_size = len(TEXT.vocab)

    return TEXT, vocab_size, word_embeddings, train_iter, valid_iter, test_iter

In [6]:
TEXT, vocab_size, word_embeddings, train_iter, valid_iter, test_iter = load_dataset()

downloading aclImdb_v1.tar.gz


.vector_cache/glove.6B.zip: 862MB [02:41, 5.35MB/s]                          
100%|██████████| 400000/400000 [00:37<00:00, 10768.76it/s]


Length of Text Vocabulary: 251639
Vector size of Text Vocabulary:  torch.Size([251639, 300])
Label Length: 2


Clipping the gradient to a maximum and minimum possible value to prevent the exploding gradient problem

In [7]:
def clip_gradient(model, clip_value):
    params = list(filter(lambda p: p.grad is not None, model.parameters()))
    for p in params:
        p.grad.data.clamp_(-clip_value, clip_value)

This function is used to train the given model using an Adam optimiser 

In [8]:
def train_model(model, train_iter, epoch):
    total_epoch_loss = 0
    total_epoch_acc = 0
    model.cuda()
    optim = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()))
    steps = 0
    model.train()
    for idx, batch in enumerate(train_iter):
        text = batch.text[0]
        target = batch.label
        target = torch.autograd.Variable(target).long()
        if torch.cuda.is_available():
            text = text.cuda()
            target = target.cuda()
        if (text.size()[0] is not 32):# One of the batch returned by BucketIterator has length different than 32.
            continue
        optim.zero_grad()
        prediction = model(text)
        loss = loss_fn(prediction, target)
        num_corrects = (torch.max(prediction, 1)[1].view(target.size()).data == target.data).float().sum()
        acc = 100.0 * num_corrects/len(batch)
        loss.backward()
        clip_gradient(model, 1e-1)
        optim.step()
        steps += 1
        
        if steps % 100 == 0:
            print (f'Epoch: {epoch+1}, Idx: {idx+1}, Training Loss: {loss.item():.4f}, Training Accuracy: {acc.item(): .2f}%')
        
        total_epoch_loss += loss.item()
        total_epoch_acc += acc.item()
        
    return total_epoch_loss/len(train_iter), total_epoch_acc/len(train_iter)

In [9]:
def eval_model(model, val_iter):
    total_epoch_loss = 0
    total_epoch_acc = 0
    model.eval()
    with torch.no_grad():
        for idx, batch in enumerate(val_iter):
            text = batch.text[0]
            if (text.size()[0] is not 32):
                continue
            target = batch.label
            target = torch.autograd.Variable(target).long()
            if torch.cuda.is_available():
                text = text.cuda()
                target = target.cuda()
            prediction = model(text)
            loss = loss_fn(prediction, target)
            num_corrects = (torch.max(prediction, 1)[1].view(target.size()).data == target.data).sum()
            acc = 100.0 * num_corrects/len(batch)
            total_epoch_loss += loss.item()
            total_epoch_acc += acc.item()

    return total_epoch_loss/len(val_iter), total_epoch_acc/len(val_iter)

Defining the model class

In [10]:
class CNN(nn.Module):
	def __init__(self, batch_size, output_size, in_channels, out_channels, kernel_heights, stride, padding, keep_probab, vocab_size, embedding_length, weights):
		super(CNN, self).__init__()
		
		"""
		Arguments
		---------
		batch_size : Size of each batch which is same as the batch_size of the data returned by the TorchText BucketIterator
		output_size : 2 = (pos, neg)
		in_channels : Number of input channels. Here it is 1 as the input data has dimension = (batch_size, num_seq, embedding_length)
		out_channels : Number of output channels after convolution operation performed on the input matrix
		kernel_heights : A list consisting of 3 different kernel_heights. Convolution will be performed 3 times and finally results from each kernel_height will be concatenated.
		keep_probab : Probability of retaining an activation node during dropout operation
		vocab_size : Size of the vocabulary containing unique words
		embedding_length : Embedding dimension of GloVe word embeddings
		weights : Pre-trained GloVe word_embeddings which we will use to create our word_embedding look-up table
		--------
		
		"""
		self.batch_size = batch_size
		self.output_size = output_size
		self.in_channels = in_channels
		self.out_channels = out_channels
		self.kernel_heights = kernel_heights
		self.stride = stride
		self.padding = padding
		self.vocab_size = vocab_size
		self.embedding_length = embedding_length
		
		self.word_embeddings = nn.Embedding(vocab_size, embedding_length)
		self.word_embeddings.weight = nn.Parameter(weights, requires_grad=False)
		self.conv1 = nn.Conv2d(in_channels, out_channels, (kernel_heights[0], embedding_length), stride, padding)
		self.conv2 = nn.Conv2d(in_channels, out_channels, (kernel_heights[1], embedding_length), stride, padding)
		self.conv3 = nn.Conv2d(in_channels, out_channels, (kernel_heights[2], embedding_length), stride, padding)
		self.dropout = nn.Dropout(keep_probab)
		self.label = nn.Linear(len(kernel_heights)*out_channels, output_size)
	
	def conv_block(self, input, conv_layer):
		conv_out = conv_layer(input)# conv_out.size() = (batch_size, out_channels, dim, 1)
		activation = F.relu(conv_out.squeeze(3))# activation.size() = (batch_size, out_channels, dim1)
		max_out = F.max_pool1d(activation, activation.size()[2]).squeeze(2)# maxpool_out.size() = (batch_size, out_channels)
		
		return max_out
	
	def forward(self, input_sentences, batch_size=None):
		
		"""
		The idea of the Convolutional Neural Netwok for Text Classification is very simple. We perform convolution operation on the embedding matrix 
		whose shape for each batch is (num_seq, embedding_length) with kernel of varying height but constant width which is same as the embedding_length.
		We will be using ReLU activation after the convolution operation and then for each kernel height, we will use max_pool operation on each tensor 
		and will filter all the maximum activation for every channel and then we will concatenate the resulting tensors. This output is then fully connected
		to the output layers consisting two units which basically gives us the logits for both positive and negative classes.
		
		Parameters
		----------
		input_sentences: input_sentences of shape = (batch_size, num_sequences)
		batch_size : default = None. Used only for prediction on a single sentence after training (batch_size = 1)
		
		Returns
		-------
		Output of the linear layer containing logits for pos & neg class.
		logits.size() = (batch_size, output_size)
		
		"""
		
		input = self.word_embeddings(input_sentences)
		# input.size() = (batch_size, num_seq, embedding_length)
		input = input.unsqueeze(1)
		# input.size() = (batch_size, 1, num_seq, embedding_length)
		max_out1 = self.conv_block(input, self.conv1)
		max_out2 = self.conv_block(input, self.conv2)
		max_out3 = self.conv_block(input, self.conv3)
		
		all_out = torch.cat((max_out1, max_out2, max_out3), 1)
		# all_out.size() = (batch_size, num_kernels*out_channels)
		fc_in = self.dropout(all_out)
		# fc_in.size()) = (batch_size, num_kernels*out_channels)
		logits = self.label(fc_in)
		
		return logits

Listing all Hyper-parameters

In [23]:

learning_rate = 2e-5
batch_size = 32
output_size = 2
in_channels = 1
out_channels = 100
kernel_heights = [2,3,4]
keep_probab =0.5
stride =1
padding =0
vocab_size = len(TEXT.vocab)
embedding_length = 300
word_embeddings =  TEXT.vocab.vectors

loss_fn = F.cross_entropy


model = CNN(batch_size, output_size, in_channels, out_channels, kernel_heights, stride, padding, keep_probab, vocab_size, embedding_length, word_embeddings)

Training the model

In [24]:

for epoch in range(10):
    train_loss, train_acc = train_model(model, train_iter, epoch)
    val_loss, val_acc = eval_model(model, valid_iter)
    
    print(f'Epoch: {epoch+1:02}, Train Loss: {train_loss:.3f}, Train Acc: {train_acc:.2f}%, Val. Loss: {val_loss:3f}, Val. Acc: {val_acc:.2f}%')
    
test_loss, test_acc = eval_model(model, test_iter)
print(f'Test Loss: {test_loss:.3f}, Test Acc: {test_acc:.2f}%')


Epoch: 1, Idx: 100, Training Loss: 0.5618, Training Accuracy:  78.12%
Epoch: 1, Idx: 200, Training Loss: 0.5752, Training Accuracy:  68.75%
Epoch: 1, Idx: 300, Training Loss: 0.3646, Training Accuracy:  84.38%
Epoch: 1, Idx: 400, Training Loss: 0.4188, Training Accuracy:  81.25%
Epoch: 1, Idx: 500, Training Loss: 0.8118, Training Accuracy:  68.75%


  return Variable(arr, volatile=not train), lengths
  return Variable(arr, volatile=not train)


Epoch: 01, Train Loss: 0.500, Train Acc: 74.68%, Val. Loss: 0.409318, Val. Acc: 79.84%
Epoch: 2, Idx: 100, Training Loss: 0.3630, Training Accuracy:  84.38%
Epoch: 2, Idx: 200, Training Loss: 0.2427, Training Accuracy:  84.38%
Epoch: 2, Idx: 300, Training Loss: 0.6100, Training Accuracy:  75.00%
Epoch: 2, Idx: 400, Training Loss: 0.5128, Training Accuracy:  68.75%
Epoch: 2, Idx: 500, Training Loss: 0.3353, Training Accuracy:  84.38%
Epoch: 02, Train Loss: 0.404, Train Acc: 81.58%, Val. Loss: 0.363430, Val. Acc: 83.05%
Epoch: 3, Idx: 100, Training Loss: 0.4209, Training Accuracy:  81.25%
Epoch: 3, Idx: 200, Training Loss: 0.3722, Training Accuracy:  81.25%
Epoch: 3, Idx: 300, Training Loss: 0.4726, Training Accuracy:  81.25%
Epoch: 3, Idx: 400, Training Loss: 0.4197, Training Accuracy:  75.00%
Epoch: 3, Idx: 500, Training Loss: 0.5725, Training Accuracy:  75.00%
Epoch: 03, Train Loss: 0.352, Train Acc: 84.21%, Val. Loss: 0.364105, Val. Acc: 83.29%
Epoch: 4, Idx: 100, Training Loss: 0.32

Testing the model on custom input

In [30]:
test_sen1 = "This is one of the best creation of Nolan. I can say, it's his magnum opus. Loved the soundtrack and especially those creative dialogues."
test_sen2 = "Ohh, such a ridiculous movie. Not gonna recommend it to anyone. Complete waste of time and money."

test_sen1 = TEXT.preprocess(test_sen1)
test_sen1 = [[TEXT.vocab.stoi[x] for x in test_sen1]]

test_sen2 = TEXT.preprocess(test_sen2)
test_sen2 = [[TEXT.vocab.stoi[x] for x in test_sen2]]

with torch.no_grad():
  test_sen = np.asarray(test_sen1)
  test_sen = torch.LongTensor(test_sen)
  test_tensor = Variable(test_sen)
  test_tensor = test_tensor.cuda()
  model.eval()
  output = model(test_tensor, 1)
  out = F.softmax(output, 1)
  if (torch.argmax(out[0]) == 1):
      print ("Sentiment: Positive")
  else:
      print ("Sentiment: Negative")

with torch.no_grad():
  test_sen = np.asarray(test_sen2)
  test_sen = torch.LongTensor(test_sen)
  test_tensor = Variable(test_sen)
  test_tensor = test_tensor.cuda()
  model.eval()
  output = model(test_tensor, 1)
  out = F.softmax(output, 1)
  if (torch.argmax(out[0]) == 1):
      print ("Sentiment: Positive")
  else:
      print ("Sentiment: Negative")



Sentiment: Positive
Sentiment: Negative
