# CNN implementation for Twitter Sentiment Analysis

This is a Python implementation of the CNN proposed by Yoon Kim in https://arxiv.org/pdf/1408.5882.pdf.

The implementation is an adaptation of the one proposed in https://github.com/bentrevett/pytorch-sentiment-analysis.

In [None]:
import torch
from torchtext import data
from torchtext import datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchtext.vocab as vocab
import time

import numpy as np
from torchnlp.word_to_vector import GloVe

## Hyperparameters and data setup

In [None]:
# INITIALIZE CLASS INSTANCES
# TEXT handles the token sequence of each tweet, LABEL its sentiment class.
TEXT = data.Field()
LABEL = data.LabelField()

# Map each JSON key to (attribute name on the example, Field that processes it).
# Presumably 'tokens.token' selects the `token` entry of every element in the
# `tokens` list of a record — TODO confirm against the .jsonl files.
fields = {'tokens.token': ('tokens', TEXT),
        'sentiment': ('sentiment', LABEL)}

# Load the train / validation / test splits from JSON files under ../scripts.
train_data, val_data, test_data = data.TabularDataset.splits(
    path="../scripts", train="train.jsonl",
    validation="valid.jsonl", test="test.jsonl",
    format="json",
    fields=fields
)

# print(vars(train_data[0]))

# BUILD VOCABULARY
# Value passed as `max_size` when building the TEXT vocabulary below.
MAX_VOCAB_SIZE = 20000


# --- LOADING EMBEDDINGS FILE --- #
# Glove 200dim twitter pre-trained word embeddings
# `cache` names the directory used for the cached vectors; `unk_init` is the
# initializer handed to torchtext for words without a pre-trained vector.
glove_embeddings = vocab.Vectors(name = '../../data/glove.twitter.27B/glove.twitter.27B.200d.txt',
                                  cache = 'glove_embeddings',
                                  unk_init = torch.Tensor.normal_)

# The vocabulary is built from the training split only and attached to the
# GloVe vectors loaded above.
TEXT.build_vocab(train_data, 
                 max_size = MAX_VOCAB_SIZE, 
                 vectors = glove_embeddings)

LABEL.build_vocab(train_data)
# check labels
# print(LABEL.vocab.stoi)

# CREATE ITERATORS
BATCH_SIZE = 64

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, val_data, test_data),
    sort = False, # sorting is disabled and no sort_key is supplied
    batch_size = BATCH_SIZE, 
    device = device)

## Define network architecture and forward pass

In [None]:
class CNN(nn.Module):
    """Sentence classifier built from parallel convolutions over word embeddings.

    The network embeds every token, runs one 2-D convolution per entry of
    ``filter_sizes`` across the embedded sentence, max-pools each result over
    the sentence axis, concatenates the pooled features, applies dropout and
    maps them to ``output_dim`` logits with a linear layer.
    """

    def __init__(self, vocab_size, vector_size,
                 n_filters, filter_sizes, output_dim,
                 dropout, pad_idx):
        """Create the layers.

        Args:
            vocab_size: number of rows in the embedding table.
            vector_size: dimensionality of each word vector.
            n_filters: output channels of every convolution.
            filter_sizes: iterable of filter heights (in tokens); one
                convolution is created per entry.
            output_dim: number of logits produced by the final layer.
            dropout: dropout probability applied to the pooled features.
            pad_idx: vocabulary index passed to the embedding as padding_idx.
        """
        super().__init__()

        # Token lookup table
        self.embedding = nn.Embedding(vocab_size, vector_size, padding_idx=pad_idx)

        # One convolution per filter height; every kernel spans the whole
        # embedding width, so the last output dimension collapses to 1.
        conv_layers = []
        for height in filter_sizes:
            conv_layers.append(
                nn.Conv2d(in_channels=1,
                          out_channels=n_filters,
                          kernel_size=(height, vector_size))
            )
        self.convs = nn.ModuleList(conv_layers)

        # Classifier over the concatenated pooled features
        self.linear = nn.Linear(len(filter_sizes) * n_filters, output_dim)

        # Regularisation applied right before the classifier
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Compute class logits for a batch of token-index sequences.

        Args:
            text: index tensor laid out as [sent len, batch size].

        Returns:
            Tensor of shape [batch size, output_dim].
        """
        # [sent len, batch size] -> [batch size, sent len]
        batch_first = text.permute(1, 0)

        # [batch size, sent len, emb dim] -> [batch size, 1, sent len, emb dim];
        # the extra axis is the single input channel Conv2d expects.
        embedded = self.embedding(batch_first).unsqueeze(1)

        pooled = []
        for conv in self.convs:
            # [batch size, n_filters, sent len - filter_size + 1]
            feature_map = F.relu(conv(embedded)).squeeze(3)
            # Max over the whole sentence axis -> [batch size, n_filters]
            pooled.append(F.max_pool1d(feature_map, feature_map.shape[2]).squeeze(2))

        # [batch size, n_filters * len(filter_sizes)]
        features = self.dropout(torch.cat(pooled, dim=1))

        return self.linear(features)

In [None]:
# SET NETWORK PARAMETERS
# Vocab size
INPUT_DIM = len(TEXT.vocab)

# Vector size (lower-dimensional representation of each word)
EMBEDDING_DIM = 200

# Number of filters (for each filter SIZE)
NUM_FILTERS = 300

# N-gram sizes that we want to analyze using filters (one convolution per entry)
# NOTE(review): all three entries are identical, so the model gets three
# parallel 4-gram convolutions — confirm this is intended.
FILTER_SIZES = [4,4,4]

# Output size of the linear layer: one logit per label in LABEL.vocab
OUTPUT_DIM = len(LABEL.vocab)

# Proportion of units to drop
DROPOUT = 0.5

# Vocabulary index of the padding token, passed to the embedding layer
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]

# CREATE MODEL INSTANCE
model = CNN(INPUT_DIM, EMBEDDING_DIM, NUM_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
# Notebook display: size of the vocabulary's pre-trained vector matrix
TEXT.vocab.vectors.size()

In [None]:
def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the trainable parameter count with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

Next, we initialize our embedding layer to use the vocabulary vectors.

In [None]:
# LOAD PRE-TRAINED WORD EMBEDDINGS
# Overwrite the embedding weights in place with the vectors attached to the
# TEXT vocabulary; both tensors must have the same shape for copy_ to succeed.
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)


Then, we initialize the unknown and padding token embeddings to all zeros.

In [None]:
# ZERO THE INITIAL WEIGHTS OF THE UNKNOWN AND PADDING TOKENS
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]

# The string token used as padding is '<pad>' by default
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]

# Replace both embedding rows with all-zero vectors of the embedding size.
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)

I use **CrossEntropyLoss** because it is intended for problems where each example belongs to exactly one of C classes, whereas BCEWithLogitsLoss is used when each example belongs exclusively to one of only 2 classes (0 and 1), and also when each example can belong to anywhere between 0 and C classes (a.k.a. multilabel classification).

In [None]:
# SET MODEL PARAMETERS
# Adam over all model parameters with a learning rate of 0.001
optimizer = optim.Adam(model.parameters(), lr = 0.001)

# Multi-class loss applied to the raw logits returned by the model
criterion = nn.CrossEntropyLoss()

# place model and loss function on `device` (the GPU when available, else the CPU)
model = model.to(device)
criterion = criterion.to(device)

# HELPER FUNCTIONS (accuracy, epoch time)

def categorical_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8

    Args:
        preds: score tensor of shape [batch size, n classes]; the predicted
            class of each row is the index of its largest score.
        y: tensor of shape [batch size] holding the true class indices.

    Returns:
        A float tensor with a single element (shape [1]) located on the same
        device as the inputs; call ``.item()`` to get a Python float.
    """
    correct = preds.argmax(dim = 1).eq(y)
    # Divide by a plain Python int instead of a freshly created CPU tensor so
    # the whole computation stays on the device of `preds`/`y` (a CPU divisor
    # tensor can trigger a device-mismatch error when the batch is on CUDA).
    return (correct.sum().float() / y.shape[0]).reshape(1)

def compute_precision(preds, y):
    ''' Precision = TP / (FP + TP)

    Every prediction that matches its label counts as a true positive and
    every mismatch as a false positive for the predicted class, so the value
    is the precision pooled over all classes (micro-averaged).

    Args:
        preds: score tensor of shape [batch size, n classes].
        y: tensor of shape [batch size] holding the true class indices.

    Returns:
        A zero-dimensional float tensor with the pooled precision.
    '''
    hits = preds.argmax(dim = 1).eq(y)
    n_correct = hits.sum()
    n_incorrect = (~hits).sum()
    # The original computed this ratio but never returned it.
    return n_correct / float(n_correct + n_incorrect)

def epoch_time(start_time, end_time):
    """Split the span between two timestamps (in seconds) into minutes and seconds.

    Returns:
        ``(minutes, seconds)`` as integers, both truncated toward zero.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - (whole_minutes * 60))
    return whole_minutes, leftover_seconds
# Show the optimizer configuration in the notebook output.
print(optimizer)

In [None]:
# TRAIN LOOP FUNCTION
def train(model, iterator, optimizer, criterion):
    """Run one training pass over ``iterator`` and update the model weights.

    Each batch must expose ``tokens`` (model input) and ``sentiment``
    (target class indices).

    Returns:
        ``(mean loss, mean accuracy)``, both averaged over the number of
        batches in ``iterator``.
    """
    # Training mode (enables dropout)
    model.train()

    running_loss = 0
    running_acc = 0

    for batch in iterator:
        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        logits = model(batch.tokens)
        targets = batch.sentiment

        batch_loss = criterion(logits, targets)
        batch_acc = categorical_accuracy(logits, targets)

        # Backpropagate, then apply the weight update
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        running_acc += batch_acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

# EVALUATION LOOP FUNCTION
from sklearn.metrics import precision_score, recall_score, f1_score
def evaluate_validation(model, iterator, criterion):
    """Compute the mean loss and accuracy over ``iterator`` without training.

    Each batch must expose ``tokens`` (model input) and ``sentiment``
    (target class indices).

    Returns:
        ``(mean loss, mean accuracy)``, both averaged over the number of
        batches in ``iterator``.
    """
    # Evaluation mode: dropout is switched off
    model.eval()

    running_loss = 0
    running_acc = 0

    # No gradients are needed while evaluating
    with torch.no_grad():
        for batch in iterator:
            logits = model(batch.tokens)
            targets = batch.sentiment

            running_loss += criterion(logits, targets).item()
            running_acc += categorical_accuracy(logits, targets).item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
def _confusion_annotations(cm_norm, cm_counts):
    """Build the text shown in each confusion-matrix cell (row percentage plus raw count)."""
    n_rows, n_cols = cm_norm.shape
    annot = np.empty((n_rows, n_cols), dtype=object)
    for i in range(n_rows):
        row_total = np.sum(cm_counts[i])
        for j in range(n_cols):
            pct = cm_norm[i, j] * 100
            if i == j:
                # Diagonal: also show how many samples of the class exist
                annot[i, j] = '%.2f%%\n%d/%d' % (pct, cm_counts[i][j], row_total)
            elif cm_counts[i][j] == 0:
                # Leave empty cells blank to keep the plot readable
                annot[i, j] = ''
            else:
                annot[i, j] = '%.2f%%\n%d' % (pct, cm_counts[i][j])
    return annot


def evaluate_test(model, iterator, criterion, avg_method, class_names=None):
    """Evaluate the model on a labelled iterator and save a confusion-matrix plot.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
        iterator: yields batches exposing ``tokens`` and ``sentiment``.
        criterion: loss function applied to ``(predictions, targets)``.
        avg_method: ``average`` argument forwarded to scikit-learn's
            recall, precision and F1 scores (e.g. ``'macro'``). It must
            produce a scalar score, because the scores are formatted as
            numbers in the plot title.
        class_names: label names in class-index order, used for the axes of
            the confusion matrix. Defaults to ``LABEL.vocab.itos`` so the
            names always follow the index-to-label mapping the data was
            encoded with (the former hard-coded order did not match the
            order used by the ``classes`` list when classifying tweets).

    Returns:
        ``(mean loss, mean accuracy %, precision %, recall %, F1 %)`` where
        loss and accuracy are averaged over batches.

    Side effects:
        Draws the heatmap on a new matplotlib figure and writes it to
        ``baseline.png``.
    """
    if class_names is None:
        class_names = list(LABEL.vocab.itos)
    label_ids = list(range(len(class_names)))

    batch_losses = []
    batch_accs = []

    true_labels = []
    predicted_labels = []

    # Evaluation mode, turn off dropout while evaluating
    model.eval()
    with torch.no_grad():

        for batch in iterator:

            predictions = model(batch.tokens)

            # Move to the CPU before converting: tensors on a CUDA device
            # cannot be turned into Python lists/NumPy arrays directly.
            true_labels.extend(batch.sentiment.cpu().tolist())
            predicted_labels.extend(predictions.argmax(dim = 1).cpu().tolist())

            loss = criterion(predictions, batch.sentiment)
            acc = categorical_accuracy(predictions, batch.sentiment)

            batch_losses.append(loss.item())
            batch_accs.append(acc.item()*100)

    avgRec = recall_score(true_labels, predicted_labels, average=avg_method)*100
    precision = precision_score(true_labels, predicted_labels, average=avg_method)*100
    # Use the same averaging as recall/precision so F1 is a scalar percentage
    # that can be formatted in the title and by the caller.
    f1 = f1_score(true_labels, predicted_labels, average=avg_method)*100

    mean_loss = np.mean(batch_losses)
    mean_acc = np.mean(batch_accs)

    # Raw counts for the annotations, row-normalised values for the colours
    cm_nsamples = confusion_matrix(true_labels, predicted_labels, labels=label_ids)
    cm = confusion_matrix(true_labels, predicted_labels, labels=label_ids, normalize="true")

    # Rows are the actual labels, columns the predicted ones
    cm_df = pd.DataFrame(cm, index=class_names, columns=class_names)
    cm_df.index.name = 'Actual labels'
    cm_df.columns.name = 'Predicted labels'
    annot = _confusion_annotations(cm, cm_nsamples)

    plt.figure(figsize=(10,6))
    plt.title(f'Confusion Matrix\nAvgRec: {avgRec:.3f}% | F1: {f1:.3f}% | Acc: {mean_acc: .3f}% \nConfusion matrix')
    plot_cm = sns.heatmap(cm_df, annot=annot, fmt='', cmap='rocket_r', vmin=0, vmax=1)
    fig = plot_cm.get_figure()
    fig.savefig('baseline.png')
    return mean_loss, mean_acc, precision, avgRec, f1

In [None]:
# TRAIN LOOP

N_EPOCHS = 5
# Lowest validation loss seen so far; used to decide when to checkpoint
best_valid_loss = float('inf')
# Per-epoch metric histories (consumed by the plots further down)
val_loss = []
val_acc = []
tr_loss = []
tr_acc = []
for epoch in range(N_EPOCHS):
    
    # The measured time covers both the training and the validation pass
    start_time = time.time()
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate_validation(model, valid_iterator, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    # Save training metrics
    val_loss.append(valid_loss)
    val_acc.append(valid_acc)
    tr_loss.append(train_loss)
    tr_acc.append(train_acc)
    
    # Keep only the weights of the epoch with the best validation loss
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), './model.pt')

    print(f'Epoch: {epoch+1:02} | Epoch time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.3f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.3f}%')

## Accuracy and loss plots

In [None]:
import matplotlib.pyplot as plt
# Plot accuracy and loss
# Two side-by-side panels: losses on the left, accuracies on the right.
# The x axis is the 0-based position in the per-epoch metric lists.
fig, axis = plt.subplots(1, 2, figsize=(15,5))
axis[0].plot(val_loss, label='Validation loss')
axis[0].plot(tr_loss, label='Training loss')
axis[0].set_title('Losses')
axis[0].set_xlabel('Epoch')
axis[0].set_ylabel('Loss')
axis[0].legend()
axis[1].plot(val_acc, label='Validation accuracy')
axis[1].plot(tr_acc, label='Training accuracy')
axis[1].set_title('Accuracies')
axis[1].set_xlabel('Epoch')
axis[1].set_ylabel('Accuracy')
# plt.legend() targets the current axes — presumably the right-hand panel,
# since it was created last; verify the legend shows up where expected.
plt.legend()
plt.show()

## Evaluate on the test set

In [None]:
# Restore the checkpoint written by the training loop (best validation loss).
model.load_state_dict(torch.load('./model.pt'))

# Evaluate on the test split with macro-averaged precision/recall.
test_loss, test_acc, test_prec, test_recall, test_f1 = evaluate_test(model, test_iterator, criterion, 'macro')

# The format specs below require every returned metric to be a scalar.
print(f'Macro avg. recall: {test_recall:.2f}% | Macro avg. precision: {test_prec:.2f}%')
print(f'Macro avg f1: {test_f1:.2f}%')
print(f'Accuracy: {test_acc:.2f}% | Loss: {test_loss:.3f}')

## Classify unlabeled dataset

In [None]:
import spacy
# NOTE(review): `nlp` is not referenced anywhere else in the visible code —
# confirm it is still needed.
nlp = spacy.load('en')
# NOTE(review): the training loop saves its checkpoint to './model.pt' while
# this loads './models/model.pt' — confirm which path is intended.
model.load_state_dict(torch.load('./models/model.pt'))

def predict_class(model, sentence, min_len = 4):
    """Predict the class index of a single whitespace-tokenised sentence.

    Args:
        model: trained classifier; it is switched to evaluation mode.
        sentence: text whose tokens are separated by whitespace.
        min_len: minimum number of tokens fed to the model; shorter
            sentences are right-padded with ``'<pad>'``. The convolutions
            cannot process inputs shorter than their filter size, so this
            should be at least the largest filter size of the model.

    Returns:
        The index (int) of the highest-scoring class.
    """
    model.eval()
    # split() without an argument collapses runs of whitespace, so repeated
    # spaces no longer produce empty-string tokens.
    tokens = sentence.split()
    if len(tokens) < min_len:
        tokens = tokens + ['<pad>'] * (min_len - len(tokens))
    indexed = [TEXT.vocab.stoi[t] for t in tokens]
    # Shape [sent len, 1]: a batch containing this single sentence
    tensor = torch.LongTensor(indexed).to(device).unsqueeze(1)
    # Inference only: skip building the autograd graph
    with torch.no_grad():
        preds = model(tensor)
    return preds.argmax(dim = 1).item()

# english stopwords would be removed
from nltk.corpus import stopwords
import spacy
import numpy as np

# Combine the NLTK and spaCy English stopword lists into one de-duplicated array.
nltk_stopwords = set(stopwords.words('english'))
sp = spacy.load('en_core_web_sm')
spacy_stopwords = sp.Defaults.stop_words
# NOTE(review): this is a NumPy array, so each `in` test below scans the whole
# array; a set would give constant-time lookups.
all_stopwords = np.unique(np.concatenate((list(nltk_stopwords), list(spacy_stopwords)), axis=None))

# read json file
import json
bbcproms = "../scripts/tweets2014.json"
with open(bbcproms, 'r') as json_file:
    proms_data = json.load(json_file)


# predict class for each element & add sentiment to tweet
# Names indexed by the class index returned by predict_class.
# NOTE(review): this order must match LABEL.vocab.itos — verify.
classes = ['neutral', 'negative', 'positive']
count = 0
for el in proms_data['data']:
    # Drop stopword tokens from the stored token list.
    # NOTE(review): the prediction below uses el['cleaned_text'], not the
    # filtered tokens, so the stopword removal does not affect the predicted
    # class — confirm this is intended.
    el['tokens'] = [token for token in el['tokens'] if token['token'] not in all_stopwords]
    prediction = predict_class(model, el['cleaned_text'])
    sentiment = classes[prediction]
    el['sentiment'] = sentiment
    # Progress indicator: number of tweets classified so far
    count += 1
    print(count)

# create json with classified tweets
output = '../scripts/bbcproms_2014_classified.json'
with open(output, 'w') as output_file:
    json.dump(proms_data, output_file, indent=3)