# Convolutional Neural Networks in Text Classification with PyTorch

Text classification can be approached in several ways, depending on how the text is encoded as numerical values.

1. Text is modeled by the *frequency of occurrence of words* in a given text, optionally weighted by how common those words are across the complete corpus. Example: `CountVectorizer()` and `TfidfVectorizer()` in `scikit-learn`.
2. Text is modeled as a *sequence of words or characters*. This approach is used mainly by **Recurrent Neural Networks** (**RNNs**).
3. Text is modeled as a *distribution of words in a given space*. This is achieved with the **Convolutional Neural Network** (**CNN**) architecture.
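
As a rough illustration of the first approach, the sketch below builds a bag-of-words count matrix in plain Python. The toy corpus and the helper `count_vector` are invented for this example; `CountVectorizer()` produces the same kind of document-term counts, with more careful tokenization and a sparse output.

```python
from collections import Counter

# Toy corpus (hypothetical, for illustration only)
corpus = [
    "the dog chased the cat",
    "the cat sat",
]

# Vocabulary: every distinct word, sorted alphabetically
vocabulary = sorted({word for doc in corpus for word in doc.split()})

def count_vector(doc, vocabulary):
    """Returns how often each vocabulary word occurs in `doc`."""
    counts = Counter(doc.split())
    return [counts[word] for word in vocabulary]

count_matrix = [count_vector(doc, vocabulary) for doc in corpus]

print(vocabulary)    # ['cat', 'chased', 'dog', 'sat', 'the']
print(count_matrix)  # [[1, 1, 1, 0, 2], [1, 0, 0, 1, 1]]
```

Each row describes one document and each column one vocabulary word, so word order is lost. That loss is what the sequence-based and space-based approaches try to avoid.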

## What is a semantic space? 

Semantic spaces are representations of natural language that are capable of capturing meaning.

Reference: https://en.wikipedia.org/wiki/Semantic_space.

A *semantic space* is a way of representing the meaning of words using vectors, matrices, or other mathematical structures. 

The idea: 

**"Words that are similar in meaning will have similar or close vectors in the semantic space, while words that are different or unrelated will have distant or orthogonal vectors".**


Slogan: **"You shall know a word by the company it keeps"** (J.R. Firth).


For example, 

*   `fire` and `dog` are unrelated in meaning, and in fact they are not often used in the same sentence.
*   On the other hand, `dog` and `cat` are sometimes seen together, so they may share some aspect of meaning.

Mathematically,

![](images/cos.png)
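
Assuming the figure above shows cosine similarity, the usual measure of closeness between two word vectors, it can be sketched in a few lines. The three-dimensional vectors below are invented for illustration; real embeddings are learned and have many more dimensions.

```python
import math

def cosine_similarity(u, v):
    """cos(theta) = (u . v) / (||u|| * ||v||)"""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# Hypothetical word vectors, made up for this example
dog = [0.9, 0.8, 0.1]
cat = [0.8, 0.9, 0.2]
fire = [0.1, 0.0, 0.9]

# Related words point in similar directions, unrelated words do not
print(cosine_similarity(dog, cat) > cosine_similarity(dog, fire))  # True
```

A value near 1 means the vectors point in almost the same direction, while a value near 0 means they are close to orthogonal.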


## A common architecture for CNN in text classification


*   Each word in a document is represented as an *embedding vector*.
*   A single convolutional layer with m filters is applied, producing an m-dimensional vector for each n-gram in the document.
*   The vectors are combined using max-pooling, followed by a ReLU activation.
*   The result is then passed to a linear layer for the final classification.
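
The four steps above can be sketched as a small PyTorch module. This is only a minimal illustration under assumed sizes: the class name `TinyTextCNN` and all hyperparameter values are hypothetical, and the model built later in this notebook uses several kernel sizes and fixed-size pooling windows instead.

```python
import torch
import torch.nn as nn

class TinyTextCNN(nn.Module):
    """Hypothetical minimal model following the four steps listed above."""

    def __init__(self, vocab_size, embedding_dim, num_filters, ngram_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # The embedding dimension acts as input channels, so the kernel
        # slides over word positions and covers `ngram_size` words at a time
        self.conv = nn.Conv1d(embedding_dim, num_filters, kernel_size=ngram_size)
        self.fc = nn.Linear(num_filters, 1)

    def forward(self, token_ids):
        x = self.embedding(token_ids)   # (batch, seq_len, embedding_dim)
        x = x.transpose(1, 2)           # (batch, embedding_dim, seq_len)
        x = self.conv(x)                # (batch, num_filters, n_ngrams)
        x = x.max(dim=2).values         # max-pooling over all n-grams
        x = torch.relu(x)               # (batch, num_filters)
        return self.fc(x).squeeze(1)    # one logit per document

model = TinyTextCNN(vocab_size=100, embedding_dim=16, num_filters=8, ngram_size=3)
batch = torch.randint(1, 100, (4, 25))  # 4 documents of 25 token ids each
logits = model(batch)
print(logits.shape)                     # torch.Size([4])
```

Pooling over the whole sequence gives a fixed-size vector of m values per document regardless of its length, which is why a single linear layer can follow.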



### Word embedding

A word embedding is a way of creating features that groups similar words together. For example, word embeddings would assign similar features to the names of various shades of blue. Word embeddings have another interesting property: they are mathematical representations of words that obey intuitive rules. For example, if we take the features for "king", subtract the features for "man", and add the features for "woman", we get a set of features that is very close to those of "queen".


In PyTorch, word embeddings are implemented as a lookup table: each word in the vocabulary is represented as a dense vector of real numbers.

We can use the `torch.nn.Embedding` class or the `torch.nn.functional.embedding` function to create and retrieve word embeddings using indices.

Both take an input tensor of indices and return the corresponding rows of a weight matrix; with `torch.nn.Embedding`, that matrix is learned during training. You can also specify some optional parameters, such as `padding_idx`, `max_norm`, `norm_type`, `scale_grad_by_freq`, and `sparse`, to customize the behavior of the embedding layer.
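
A short lookup example may make this concrete. The sizes and indices below are arbitrary values chosen for illustration.

```python
import torch
import torch.nn as nn

# 10 rows (index 0 reserved for padding), 3 features per word
embedding = nn.Embedding(num_embeddings=10, embedding_dim=3, padding_idx=0)

# Two padded "sentences" of word indices
token_ids = torch.tensor([[4, 7, 1, 0],
                          [2, 2, 0, 0]])

vectors = embedding(token_ids)
print(vectors.shape)   # torch.Size([2, 4, 3])
print(vectors[0, 3])   # the padding index maps to a vector of zeros

# The same lookup expressed with the functional interface
same = torch.nn.functional.embedding(token_ids, embedding.weight, padding_idx=0)
print(torch.equal(vectors, same))  # True
```

Every index is replaced by its row of the weight matrix, so a batch of shape `(batch, seq_len)` becomes `(batch, seq_len, embedding_dim)`.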

Sources:  

*   [Embedding — PyTorch 2.1 documentation](https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html)
*   [Word Embeddings: Encoding Lexical Semantics (PyTorch Tutorials)](https://pytorch.org/tutorials/beginner/nlp/word_embeddings_tutorial.html)
*   [What "exactly" happens inside embedding layer in pytorch?](https://stackoverflow.com/questions/58718612/what-exactly-happens-inside-embedding-layer-in-pytorch)
*   [What kind of word embedding is used in the original transformer?](https://ai.stackexchange.com/questions/26235/what-kind-of-word-embedding-is-used-in-the-original-transformer)

### A CNN Architecture

![](images/waakss1l.png)


___



## Text processing pipeline



### Step 1. Preprocessing text data

![](images/text_processing_pipeline_preprocessing.png)

In [1]:
import nltk
import numpy as np
import pandas as pd
import re 
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('stopwords')

class Preprocessing:
    
    def __init__(self):
    
        self.data = 'datasets/tweets.csv'

        self.X_raw = None
        self.y = None
        self.X_cleaned = None
        self.X_tokenized = None
        self.X_stopwords_removed = None
        self.X_lemmatized = None
        #self.vocabular = None
        #self.word2idx = None
        #self.vector_size = None
        #self.X_encoded = None
        #self.X_padded = None

    def load_data(self):
        # Reads the raw csv file and split into
        # features (X) and target (y)
        
        df = pd.read_csv(self.data)
        #df.drop(['id','keyword','location'], axis=1, inplace=True)
        
        self.X_raw = df['text'].values
        self.y = df['target'].values

    def clean_text(self):
        # Lowercases the text and removes punctuation and other special
        # symbols, keeping only word characters and whitespace
        
        self.X_cleaned = [x.lower() for x in self.X_raw]
        self.X_cleaned = [re.sub(r'[^\w\s]', '', x) for x in self.X_cleaned]
        
    def text_tokenized(self):
        # Tokenizes each text with NLTK's word_tokenize
        
        self.X_tokenized = [word_tokenize(x) for x in self.X_cleaned]

    def text_stopwords_removed(self):
        # Removes English stopwords from each tokenized text
        
        stop_words = set(stopwords.words("english"))
        no_stopwords = []
        
        for tokens in self.X_tokenized:
            tokens = [token for token in tokens if token not in stop_words]
            no_stopwords.append(tokens)
            
        self.X_stopwords_removed = no_stopwords

    def text_lemmatized(self):
        # Lemmatizes every token successively as a verb, noun, adjective,
        # adverb and satellite adjective
        lemmatizer = WordNetLemmatizer()

        text_lemmas = []
        for tokens in self.X_stopwords_removed:
            lemmas = [lemmatizer.lemmatize(word, pos="v") for word in tokens]
            lemmas = [lemmatizer.lemmatize(word, pos="n") for word in lemmas]
            lemmas = [lemmatizer.lemmatize(word, pos="a") for word in lemmas]
            lemmas = [lemmatizer.lemmatize(word, pos="r") for word in lemmas]
            lemmas = [lemmatizer.lemmatize(word, pos="s") for word in lemmas]
            text_lemmas.append(lemmas)
        
        self.X_lemmatized = text_lemmas
        
    #preprocessing.load_data()
    #preprocessing.clean_text()
    #preprocessing.text_tokenized()
    #preprocessing.text_stopwords_removed()
    #preprocessing.text_lemmatized()

[nltk_data] Downloading package punkt to /home/repl/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /home/repl/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /home/repl/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package stopwords to /home/repl/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
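
To see what the first preprocessing steps do to a single text, here is a dependency-free walk-through. The tweet and the tiny stopword set are invented for the example, and a plain whitespace split stands in for `word_tokenize`, so the result only approximates what the `Preprocessing` class produces.

```python
import re

# Hypothetical tweet and a tiny stand-in stopword list (NLTK's list is much longer)
tweet = "Forest fire near La Ronge, Sask. Canada!!"
toy_stop_words = {"the", "a", "is", "near"}

lowered = tweet.lower()
cleaned = re.sub(r'[^\w\s]', '', lowered)   # same pattern as clean_text()
tokens = cleaned.split()                    # whitespace split instead of word_tokenize
kept = [t for t in tokens if t not in toy_stop_words]

print(cleaned)  # forest fire near la ronge sask canada
print(kept)     # ['forest', 'fire', 'la', 'ronge', 'sask', 'canada']
```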


### Step 2. Encoding the cleaned, tokenized, stopword-free and lemmatized text data

![](images/text_processing_pipeline_encoding.png)

In [63]:
import copy

class Encoding:
    
    def __init__(self, lemmatized_texts, num_words):
        
        self.X_lemmatized = lemmatized_texts
        self.num_words = num_words
        self.vector_size = None
        self.fdist = None
        self.vocabulary = None
        self.X_encoded_texts = None
        self.texts4encoding = None
        self.X_padded_codes = None
    
    def text_encoding(self):

        vocabulary = dict()
        fdist = nltk.FreqDist()  
        
        for tokens in self.X_lemmatized:  
            for word in tokens:
                fdist[word] += 1
        
        self.fdist = fdist
        common_words = fdist.most_common(self.num_words)

        # Indices start at 1 because 0 is reserved for padding
        for idx, (word, _count) in enumerate(common_words):
            vocabulary[word] = idx + 1
        
        self.vocabulary = vocabulary
      
        encoded_texts = list()
        texts4encoding = list()
        
        for tokens in self.X_lemmatized:
            temp_codes = list()
            temp_words = list()
            
            # Words outside the vocabulary are dropped
            for word in tokens:
                if word in self.vocabulary:
                    temp_codes.append(self.vocabulary[word])
                    temp_words.append(word)
                             
            encoded_texts.append(temp_codes)
            texts4encoding.append(temp_words)

        self.vector_size = np.max([len(x) for x in encoded_texts])
        self.X_encoded_texts = encoded_texts
        self.texts4encoding = texts4encoding
  
    def codes_padding(self):
        pad_idx = 0
        padded_codes = list()
        
        codes_from_texts = copy.deepcopy(self.X_encoded_texts)
        for encoded_text in codes_from_texts:
            while len(encoded_text) < self.vector_size:
                encoded_text.append(pad_idx)
            padded_codes.append(encoded_text)

        self.X_padded_codes = np.array(padded_codes)
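
The encoding and padding logic is easier to follow on a toy input. The sketch below mirrors the steps of `text_encoding()` and `codes_padding()` with `collections.Counter` in place of `nltk.FreqDist`; the texts and the value of `num_words` are invented for the example.

```python
from collections import Counter

# Toy lemmatized texts (hypothetical)
texts = [["fire", "forest", "fire"], ["flood", "fire"], ["storm", "flood", "rain"]]
num_words = 2  # keep only the 2 most frequent words

counts = Counter(word for tokens in texts for word in tokens)
# Index 0 is reserved for padding, so word indices start at 1
vocabulary = {word: idx + 1 for idx, (word, _) in enumerate(counts.most_common(num_words))}

# Out-of-vocabulary words are dropped, as in text_encoding()
encoded = [[vocabulary[w] for w in tokens if w in vocabulary] for tokens in texts]

# Right-pad every sequence with 0 up to the longest encoded length
vector_size = max(len(codes) for codes in encoded)
padded = [codes + [0] * (vector_size - len(codes)) for codes in encoded]

print(vocabulary)  # {'fire': 1, 'flood': 2}
print(padded)      # [[1, 1], [2, 1], [2, 0]]
```

Because rare words are dropped before padding, `vector_size` is the length of the longest encoded text, not of the longest original text.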


### Step 3. Building the Dataset and DataLoader
![](images/text_processing_pipeline_dataset_dataloader.png)

In [45]:
from torch.utils.data import Dataset, DataLoader

class DatasetMapping(Dataset):

    def __init__(self, X, y):
        self.X = X
        self.y = y
        
    def __len__(self):
        return len(self.X)
      
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


In [46]:
from sklearn.model_selection import train_test_split

class DatasetLoading:
    
    def __init__(self, padded_codes, targets):
        
        self.X = padded_codes
        self.y = targets
        self.X_train = None
        self.y_train = None
        self.X_test = None
        self.y_test = None
        
    def data_split(self):
        
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(self.X, self.y, test_size=0.20, random_state=20231116)    

    def data_mapping(self):
        
        self.train = DatasetMapping(self.X_train, self.y_train)
        self.test = DatasetMapping(self.X_test, self.y_test)

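    def data_loading(self):
        # Relies on the global `params` object defined in a later cell.
        # Shuffling is left off so that the predictions collected during
        # training stay aligned with y_train when accuracy is computed.
        self.loader_train = DataLoader(self.train, batch_size=params.batch_size)
        self.loader_test = DataLoader(self.test, batch_size=params.batch_size)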
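
As a quick check of what a `DataLoader` yields, the following sketch batches a small made-up dataset. `ToyDataset` is a hypothetical stand-in with the same interface as `DatasetMapping`, and the array contents are arbitrary.

```python
import numpy as np
from torch.utils.data import Dataset, DataLoader

class ToyDataset(Dataset):
    """Hypothetical stand-in with the same interface as DatasetMapping."""

    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 10 made-up padded sequences of length 5 and their binary targets
X = np.arange(50, dtype=np.int64).reshape(10, 5)
y = np.array([0, 1] * 5, dtype=np.int64)

loader = DataLoader(ToyDataset(X, y), batch_size=4)
batch_shapes = [(tuple(xb.shape), tuple(yb.shape)) for xb, yb in loader]
print(batch_shapes)  # [((4, 5), (4,)), ((4, 5), (4,)), ((2, 5), (2,))]
```

The last batch is smaller whenever the dataset size is not a multiple of `batch_size`, so downstream code should not assume a fixed batch dimension.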

## Building, training and evaluating the CNN model

![](images/text_classification_model.PNG)

### Step 4. Building the CNN model

In [179]:
import math  # used by in_features_fc()

import torch
import torch.nn as nn

class TextClassificationCNN(nn.Module):

    def __init__(self, params):
        super(TextClassificationCNN, self).__init__()
        
        # Parameters regarding text preprocessing
        self.vector_size = params.vector_size
        self.num_words = params.num_words
        self.embedding_dim = params.embedding_dim
      
        # Dropout definition
        self.dropout = nn.Dropout(params.dropout)
       
        # CNN parameters definition
        # Kernel sizes
        self.kernel_1 = 2
        self.kernel_2 = 3
        self.kernel_3 = 4
        self.kernel_4 = 5
      
        # Number of output channels (filters) of each convolution
        self.out_size = params.out_size
        # Stride of each convolution and pooling layer
        self.stride = params.stride
      
        # Embedding layer definition
        self.embedding = nn.Embedding(self.num_words + 1, self.embedding_dim, padding_idx=0)
      
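        # Convolution layers definition
        # Note: the padded sequence length (vector_size) is used as in_channels,
        # so each kernel slides along the embedding dimension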
        self.conv_1 = nn.Conv1d(self.vector_size, self.out_size, 
                                self.kernel_1, self.stride)
        self.conv_2 = nn.Conv1d(self.vector_size, self.out_size, 
                                self.kernel_2, self.stride)
        self.conv_3 = nn.Conv1d(self.vector_size, self.out_size, 
                                self.kernel_3, self.stride)
        self.conv_4 = nn.Conv1d(self.vector_size, self.out_size, 
                                self.kernel_4, self.stride)
      
        
        # Max pooling layers definition
        self.pool_1 = nn.MaxPool1d(self.kernel_1, self.stride)
        self.pool_2 = nn.MaxPool1d(self.kernel_2, self.stride)
        self.pool_3 = nn.MaxPool1d(self.kernel_3, self.stride)
        self.pool_4 = nn.MaxPool1d(self.kernel_4, self.stride)

        # Fully connected layer definition
        #self.fc = nn.Linear(self.in_features_fc(), 1)
        self.fc1 = nn.Linear(self.in_features_fc(), 64) 
        self.fc2 = nn.Linear(64, 1)  
      
    def in_features_fc(self):
        '''Calculates the number of input features of the first fully connected
        layer, i.e. the flattened output size after convolution + max pooling.

        Convolved_Features =
        floor((embedding_dim + (2 * padding) - dilation * (kernel - 1) - 1) / stride) + 1
        Pooled_Features =
        floor((Convolved_Features + (2 * padding) - dilation * (kernel - 1) - 1) / stride) + 1

        Here padding = 0 and dilation = 1.

        source: https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html
        '''
        # Calculate size of convolved/pooled features for convolution_1/max_pooling_1 features
        out_conv_1 = ((self.embedding_dim - 1 * (self.kernel_1 - 1) - 1) / self.stride) + 1
        out_conv_1 = math.floor(out_conv_1)
        out_pool_1 = ((out_conv_1 - 1 * (self.kernel_1 - 1) - 1) / self.stride) + 1
        out_pool_1 = math.floor(out_pool_1)
      
        # Calculate size of convolved/pooled features for convolution_2/max_pooling_2 features
        out_conv_2 = ((self.embedding_dim - 1 * (self.kernel_2 - 1) - 1) / self.stride) + 1
        out_conv_2 = math.floor(out_conv_2)
        out_pool_2 = ((out_conv_2 - 1 * (self.kernel_2 - 1) - 1) / self.stride) + 1
        out_pool_2 = math.floor(out_pool_2)
      
        # Calculate size of convolved/pooled features for convolution_3/max_pooling_3 features
        out_conv_3 = ((self.embedding_dim - 1 * (self.kernel_3 - 1) - 1) / self.stride) + 1
        out_conv_3 = math.floor(out_conv_3)
        out_pool_3 = ((out_conv_3 - 1 * (self.kernel_3 - 1) - 1) / self.stride) + 1
        out_pool_3 = math.floor(out_pool_3)
      
        # Calculate size of convolved/pooled features for convolution_4/max_pooling_4 features
        out_conv_4 = ((self.embedding_dim - 1 * (self.kernel_4 - 1) - 1) / self.stride) + 1
        out_conv_4 = math.floor(out_conv_4)
        out_pool_4 = ((out_conv_4 - 1 * (self.kernel_4 - 1) - 1) / self.stride) + 1
        out_pool_4 = math.floor(out_pool_4)
      
        # Returns "flattened" vector (input for fully connected layer)
        return (out_pool_1 + out_pool_2 + out_pool_3 + out_pool_4) * self.out_size

    def forward(self, x):

        # The sequence of token indices is passed through the embedding layer
        x = self.embedding(x)
      
        # Convolution layer 1 is applied
        x1 = self.conv_1(x)
        x1 = torch.relu(x1)
        x1 = self.pool_1(x1)
      
        # Convolution layer 2 is applied
        x2 = self.conv_2(x)
        x2 = torch.relu(x2)
        x2 = self.pool_2(x2)

        # Convolution layer 3 is applied
        x3 = self.conv_3(x)
        x3 = torch.relu(x3)
        x3 = self.pool_3(x3)
      
        # Convolution layer 4 is applied
        x4 = self.conv_4(x)
        x4 = torch.relu(x4)
        x4 = self.pool_4(x4)
      
        # The outputs of the four convolutional branches are concatenated into a single tensor
        union = torch.cat((x1, x2, x3, x4), 2)
        union = union.reshape(union.size(0), -1)

        # The "flattened" vector is passed through the first fully connected layer
        out1 = self.fc1(union)

        # Dropout is applied
        out1 = self.dropout(out1)
        # Activation function is applied
        out1 = torch.relu(out1)

        # The second fully connected layer and a sigmoid produce the probability
        out2 = self.fc2(out1)
        out2 = torch.sigmoid(out2)

        # Squeeze only the feature dimension so that a batch of size 1 keeps its shape
        return out2.squeeze(1)
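
The size computed by `in_features_fc()` can be checked by hand with the output-length formula from the PyTorch documentation. The sketch below uses hypothetical helper names (`out_length`, `fc_in_features`) and the kernel sizes, stride, `embedding_dim` and `out_size` values that appear in this notebook, assuming no padding and a dilation of 1.

```python
import math

def out_length(length_in, kernel, stride, padding=0, dilation=1):
    """Output length of Conv1d / MaxPool1d along the last dimension."""
    return math.floor((length_in + 2 * padding - dilation * (kernel - 1) - 1) / stride) + 1

def fc_in_features(embedding_dim, kernels, stride, out_size):
    """Flattened size after one convolution + max pooling per kernel size."""
    total = 0
    for kernel in kernels:
        convolved = out_length(embedding_dim, kernel, stride)
        pooled = out_length(convolved, kernel, stride)
        total += pooled
    return total * out_size

# Values taken from the model and the Parameters class of this notebook
print(out_length(256, kernel=2, stride=2))  # 128
print(fc_in_features(256, kernels=(2, 3, 4, 5), stride=2, out_size=32))  # 8000
```

The four branches contribute pooled lengths of 64, 63, 62 and 61, which sum to 250 and give 250 * 32 = 8000 input features for the first linear layer.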

In [177]:
from dataclasses import dataclass

@dataclass
class Parameters:
    # Preprocessing parameters
    vector_size: int = 25   # standard length of each row vector in the input
    num_words: int = 9300  # number of words in the vocabulary
    test_size: float = 0.20  # annotated so that it becomes a dataclass field
    random_state: int = 42
   
    # Model parameters
    embedding_dim: int = 256
    out_size: int = 32
    stride: int = 2
    #dilation: int = 2
   
    
    # Training parameters
    epochs: int = 50
    batch_size: int = 128
    learning_rate: float = 0.001
    dropout: float = 0.05
    
params = Parameters()

In [135]:
### Step 1. Preprocessing
data = Preprocessing()
data.load_data()
data.clean_text()
data.text_tokenized()
data.text_stopwords_removed()
data.text_lemmatized()

### Step 2. Encoding
code = Encoding(data.X_lemmatized, params.num_words)
code.text_encoding()
code.codes_padding()

### Step 3. Dataset and DataLoader
dsl = DatasetLoading(code.X_padded_codes, data.y)
dsl.data_split()
dsl.data_mapping()
dsl.data_loading()

In [181]:
print(f"lemmatized row max length = {max([len(x) for x in code.X_lemmatized])}")
print(f"row max length = {max([len(x) for x in code.texts4encoding])}")
print(f"vocab length = {len(code.vocabulary)}")
print(f"unique words length = {len(code.fdist)}\n")

lemmatized row max length = 25
row max length = 25
vocab length = 9300
unique words length = 19999



### Step 5. Training and evaluating the CNN model

In [180]:
import math
import torch.optim as optim
import torch.nn.functional as F

loader_train = dsl.loader_train
loader_test = dsl.loader_test
y_train = dsl.y_train
y_test = dsl.y_test


cnn_model = TextClassificationCNN(params)
optimizer = optim.RMSprop(cnn_model.parameters(), lr= params.learning_rate)

# Starts training phase
for epoch in range(params.epochs):

    # Set the model in training mode
    cnn_model.train()
    train_predictions = []

    # Starts batch training
    for x_batch, y_batch in loader_train:

        # Targets must be floats for the binary cross-entropy loss
        y_batch = y_batch.float()

        # Feed the model
        y_pred = cnn_model(x_batch)

        # Loss calculation
        loss = F.binary_cross_entropy(y_pred, y_batch)

        # Clear the gradients
        optimizer.zero_grad()

        # Gradients calculation
        loss.backward()

        # Parameters update
        optimizer.step()

        # Save predictions
        train_predictions += list(y_pred.detach().numpy())
        
    # Metrics calculation for train accuracy
     
    true_positives = 0
    true_negatives = 0
    
    for true, pred in zip(y_train, train_predictions):
        if (pred >= 0.5) and (true == 1):
            true_positives += 1
        elif (pred < 0.5) and (true == 0):
            true_negatives += 1
    train_accuracy = (true_positives + true_negatives) / len(y_train)
    
    # Metrics calculation for test accuracy
    
    # Set the model in evaluation mode
    cnn_model.eval()
    test_predictions = []
    
    # Start evaluation phase
    with torch.no_grad():
        for x_batch, y_batch in loader_test:
            y_pred = cnn_model(x_batch)
            test_predictions += list(y_pred.detach().numpy())
    
    true_positives = 0
    true_negatives = 0
    
    for true, pred in zip(y_test, test_predictions):
        if (pred >= 0.5) and (true == 1):
            true_positives += 1
        elif (pred < 0.5) and (true == 0):
            true_negatives += 1
    test_accuracy = (true_positives + true_negatives) / len(y_test)
     
    
    
    print("Epoch: %d, loss: %.5f, Train accuracy: %.5f, Test accuracy: %.5f" % (epoch+1, loss.item(), train_accuracy, test_accuracy))

Epoch: 1, loss: 0.67441, Train accuracy: 0.54204, Test accuracy: 0.58634
Epoch: 2, loss: 0.62419, Train accuracy: 0.59672, Test accuracy: 0.61589
Epoch: 3, loss: 0.50018, Train accuracy: 0.70082, Test accuracy: 0.74064
Epoch: 4, loss: 0.40677, Train accuracy: 0.79343, Test accuracy: 0.77150
Epoch: 5, loss: 0.35273, Train accuracy: 0.84483, Test accuracy: 0.77610
Epoch: 6, loss: 0.29666, Train accuracy: 0.87816, Test accuracy: 0.78398
Epoch: 7, loss: 0.23659, Train accuracy: 0.89672, Test accuracy: 0.75968
Epoch: 8, loss: 0.23768, Train accuracy: 0.91412, Test accuracy: 0.79383
Epoch: 9, loss: 0.18433, Train accuracy: 0.92381, Test accuracy: 0.73342
Epoch: 10, loss: 0.13944, Train accuracy: 0.94236, Test accuracy: 0.78923
Epoch: 11, loss: 0.14775, Train accuracy: 0.95649, Test accuracy: 0.69337
Epoch: 12, loss: 0.11419, Train accuracy: 0.95961, Test accuracy: 0.73014
Epoch: 13, loss: 0.11690, Train accuracy: 0.95731, Test accuracy: 0.78398
Epoch: 14, loss: 0.10645, Train accuracy: 0.966