# 1. BART: Denoising Autoencoder for Pretraining Sequence-to-Sequence Models [Multi-Class Classifier]:
### BART is trained by (1) corrupting text with an arbitrary noising function, and (2) learning a model to reconstruct the original text. It uses a standard Tranformer-based neural machine translation architecture which, despite its simplicity, can be seen as generalizing BERT (due to the bidirectional encoder), GPT (with the left-to-right decoder), and other recent pre- training schemes. BART is particularly effective when fine tuned for text generation but also works well for comprehension tasks. It matches the performance of RoBERTa on GLUE and SQuAD, and achieves new state-of-the-art results on a range of abstractive di-alogue, question answering, and summarization tasks, with gains of up to 3.5 ROUGE. BART also provides a 1.1 BLEU increase over a back-translation system for machine translation, with only target language pretraining. In the below task we utilize this pre-trained model for Zero-shot Classification.




## **Libraries/Dependencies**

In [10]:
!pip install transformers



In [11]:
# Import all the required libraries
# Use Kaggle's pre-tuned notebooks to get the optimal versions of all the dependencies

import nltk
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
# nltk.download('stopwords')
from string import punctuation
from collections import Counter
from nltk.corpus import stopwords
from torch.utils.data import TensorDataset, DataLoader

In [12]:
# Import all the required libraries
from transformers import pipeline
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

In [13]:
# Create dataframe for final sentiment classification result
def createDataFrame(labels, confidence, tweet):
    labels = pd.DataFrame({'Labels': labels})
    confidence = pd.DataFrame({'Confidence Scores': confidence})
    column_values = ['Labels', 'Confidence']
    sentiment_scores = pd.concat([labels,confidence], ignore_index=False, axis=1)
    print("\n--------------------------------------------------------------------------------------")
    print(f"\n Entered input sentence: {tweet}")
    print("\n Sentiment of the tweet (Probability Distribution): ")
    print(sentiment_scores.to_string(index=False))
    #print("--------------------------------------------------------------------------------------")

In [14]:
def sentiment_bart(tweet):
    labels = []
    confidence = []

    # Possible Sentiment Categories
    #candidate_labels = ["happy", "sad", "warn", "angry", "sorrow", "alert", "neutral"]
    candidate_labels = ["admiration", "amusement", "anger", "annoyance", "approval", "caring", "confusion", "curiosity", "desire", "disappointment", "disapproval", "disgust", "embarrassment", "excitement", "fear", "gratitude", "grief", "joy", "love",  "nervousness", "optimism", "pride", "realization", "relief", "remorse", "sadness", "surprise", "neutral"]
    #candidate_labels = ["OYC", "DTC", "NCF", "KNY", "DCF"]

    # Send the labels and tweets to the classifier pipeline
    result = classifier(tweet, candidate_labels)

    # Extract the labels from results dictionary
    labels.append(result["labels"])
    labels = [item for sublist in labels for item in sublist] # Flatten the list of lists into list

    # Extract the labels from results dictionary
    confidence.append(result["scores"])
    confidence = [(str(float(item)*100))[:6]+" %" for sublist in confidence for item in sublist] # Flatten the list of lists into list

    createDataFrame(labels,confidence, tweet)

In [15]:
# Driver program
print("Neural Sentiment Analysis of COVID-19 Tweets with BART")
print("\n------Available Options------")
print("1. Inference on Sample Tweets")
print("2. Enter Custom Tweets/Sentences")
print("3. Exit")
print("\nPlease select an option from the above:")


sample_1 = 'Many lost their jobs because of covid and it is highly dangerous'
sentiment_bart(sample_1)

sample_2 = 'I am happy that my family members are safe in this tough times'
sentiment_bart(sample_2)

"""
while(True):
    choice = int(input())

    if choice == 1:
        sample_1 = 'Many lost their jobs because of covid and it is highly dangerous'
        sentiment_bart(sample_1)

        sample_2 = 'I am happy that my family members are safe in this tough times'
        sentiment_bart(sample_2)

    elif choice == 2:
        print("\nPlease enter a sentence/tweet:")
        user_input = input()
        sentiment_bart(user_input)

    elif choice == 3:
        print("\nExiting...")
        break
"""

Neural Sentiment Analysis of COVID-19 Tweets with BART

------Available Options------
1. Inference on Sample Tweets
2. Enter Custom Tweets/Sentences
3. Exit

Please select an option from the above:

--------------------------------------------------------------------------------------

 Entered input sentence: Many lost their jobs because of covid and it is highly dangerous

 Sentiment of the tweet (Probability Distribution): 
        Labels Confidence Scores
disappointment          21.438 %
   disapproval          12.346 %
     confusion          8.9288 %
   realization          6.8328 %
       remorse          5.9936 %
          fear          5.2666 %
      surprise          4.1926 %
   nervousness          3.8910 %
 embarrassment          3.5007 %
       disgust          3.1420 %
       sadness          2.6100 %
         grief          2.3548 %
        desire          2.1751 %
     annoyance          1.9904 %
    admiration          1.9645 %
         pride          1.8590 %
      ap

'\nwhile(True):\n    choice = int(input())\n\n    if choice == 1:\n        sample_1 = \'Many lost their jobs because of covid and it is highly dangerous\'\n        sentiment_bart(sample_1)\n\n        sample_2 = \'I am happy that my family members are safe in this tough times\'\n        sentiment_bart(sample_2)\n\n    elif choice == 2:\n        print("\nPlease enter a sentence/tweet:")\n        user_input = input()\n        sentiment_bart(user_input)\n\n    elif choice == 3:\n        print("\nExiting...")\n        break\n'

# 2. Deep Long Short Term Memory Networks [Binary Classifier]:
### Long short-term memory is an artificial recurrent neural network architecture used in the field of deep learning. Unlike standard feedforward neural networks, LSTM has feedback connections. It can not only process single data points, but also entire sequences of data.

## **Load Dataset & Initialize GPU**

In [16]:
# Load the transfer learning tweet dataset
sentiment_df = pd.read_csv('goemotions_3.csv')

In [17]:
# Checking if NVIDIA Graphics Card and CUDA is available
gpu_available = torch.cuda.is_available

if gpu_available:
    print('Parallely Processing using CUDA')
else:
    print('No CUDA Detected')

Parallely Processing using CUDA


## **Pre-processing & Inference Module Definitions**

In [18]:
# Pre-process the text and perform Stemming, Lemmatization and Stop-word removal
def text_preprocessing(text):
    remove_punctuation = [ch for ch in text if ch not in punctuation]
    remove_punctuation = "".join(remove_punctuation).split()
    filtered_text = [word.lower() for word in remove_punctuation if word.lower() not in stopwords.words('english')]
    return filtered_text


# Pad blank topken to keep the length of tweets consistent - mandatory to normalize and train the model
def pad_features(reviews_int, seq_length):
    features = np.zeros((len(reviews_int), seq_length), dtype=int)
    for i, row in enumerate(reviews_int):
        if len(row)!=0:
            features[i, -len(row):] = np.array(row)[:seq_length]
    return features

# Convert the sentences into stream of tokens
def tokenize(tweet):
    test_ints = []
    test_ints.append([vocab_to_int[word] for word in tweet])
    return test_ints

# Predict the sentiment of the tweet - performs binary classification using the model inference
def sentiment(net, test_tweet, seq_length=50):
    print("\n--------------------------------------------------------------------------------------")
    print(f"\n Original input sentence: {test_tweet}")
    test_tweet = text_preprocessing(test_tweet)
    tokenized_tweet = tokenize(test_tweet)

    print(f"\n Pre-processed input sentence: {test_tweet}")
    #print(f"\nSentence converted into tokens:\n{tokenized_tweet}")

    padded_tweet = pad_features(tokenized_tweet, 50)
    feature_tensor = torch.from_numpy(padded_tweet)
    batch_size = feature_tensor.size(0)

    if gpu_available:
        feature_tensor = feature_tensor.cuda()

    h = net.init_hidden(batch_size)
    output, h = net(feature_tensor, h)

    print(output)

    predicted_sentiment = torch.round(output.squeeze())
    print(predicted_sentiment)
    print('ok')

    #if predicted_sentiment == 1:
    #    print("\n Sentiment: Positive")

    #else:
    #    print("\n Sentiment: Negative")

In [19]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [20]:
# Code block to invoke Pre-processing, Padding and Tokenization operations on the tweet

sentiment_df.loc[:, 'text'] = sentiment_df['text'].apply(text_preprocessing)

reviews_split = []
for i, j in sentiment_df.iterrows():
    reviews_split.append(j['text'])

words = []
for review in reviews_split:
    for word in review:
        words.append(word)

counts = Counter(words)
vocab = sorted(counts, key=counts.get, reverse=True)
vocab_to_int = {word:ii for ii, word in enumerate(vocab, 1)}

encoded_reviews = []
for review in reviews_split:
    encoded_reviews.append([vocab_to_int[word] for word in review])

labels_to_int = []
for i, j in sentiment_df.iterrows():
    if j['sentiment']=='admiration':
        labels_to_int.append(1)
    elif j['sentiment']=='amusement':
        labels_to_int.append(2)
    elif j['sentiment']=='anger':
        labels_to_int.append(3)
    elif j['sentiment']=='annoyance':
        labels_to_int.append(4)
    elif j['sentiment']=='approval':
        labels_to_int.append(5)
    elif j['sentiment']=='caring':
        labels_to_int.append(6)
    elif j['sentiment']=='confusion':
        labels_to_int.append(7)
    elif j['sentiment']=='curiosity':
        labels_to_int.append(8)
    elif j['sentiment']=='desire':
        labels_to_int.append(9)
    elif j['sentiment']=='disappointment':
        labels_to_int.append(10)
    elif j['sentiment']=='disapproval':
        labels_to_int.append(11)
    elif j['sentiment']=='disgust':
        labels_to_int.append(12)
    elif j['sentiment']=='embarrassment':
        labels_to_int.append(13)
    elif j['sentiment']=='excitement':
        labels_to_int.append(14)
    elif j['sentiment']=='fear':
        labels_to_int.append(15)
    elif j['sentiment']=='gratitude':
        labels_to_int.append(16)
    elif j['sentiment']=='grief':
        labels_to_int.append(17)
    elif j['sentiment']=='joy':
        labels_to_int.append(18)
    elif j['sentiment']=='love':
        labels_to_int.append(19)
    elif j['sentiment']=='nervousness':
        labels_to_int.append(20)
    elif j['sentiment']=='optimism':
        labels_to_int.append(21)
    elif j['sentiment']=='pride':
        labels_to_int.append(22)
    elif j['sentiment']=='realization':
        labels_to_int.append(23)
    elif j['sentiment']=='relief':
        labels_to_int.append(24)
    elif j['sentiment']=='remorse':
        labels_to_int.append(25)
    elif j['sentiment']=='sadness':
        labels_to_int.append(26)
    elif j['sentiment']=='surprise':
        labels_to_int.append(27)
    else:
        labels_to_int.append(0)

reviews_len = Counter([len(x) for x in encoded_reviews])
non_zero_idx = [ii for ii, review in enumerate(encoded_reviews) if len(encoded_reviews)!=0]
encoded_reviews = [encoded_reviews[ii] for ii in non_zero_idx]
encoded_labels = np.array([labels_to_int[ii] for ii in non_zero_idx])

seq_length = 50
padded_features= pad_features(encoded_reviews, seq_length)

## **Dataset and Dataloaders for Train, Test and Validation**

In [21]:
# Split the dataset into Train (80%), Validation (10%) & Test (10%)
batch_size = 1
split_frac = 0.8
split_idx = int(len(padded_features)*split_frac)

training_x, remaining_x = padded_features[:split_idx], padded_features[split_idx:]
training_y, remaining_y = encoded_labels[:split_idx], encoded_labels[split_idx:]

test_idx = int(len(remaining_x)*0.5)
val_x, test_x = remaining_x[:test_idx], remaining_x[test_idx:]
val_y, test_y = remaining_y[:test_idx], remaining_y[test_idx:]

# Transform the data into a Tensor datastructure
train_data = TensorDataset(torch.from_numpy(training_x), torch.from_numpy(training_y))
test_data = TensorDataset(torch.from_numpy(test_x), torch.from_numpy(test_y))
valid_data = TensorDataset(torch.from_numpy(val_x), torch.from_numpy(val_y))

# Prepare the dataloader for Train, Test and Validation
train_loader = DataLoader(train_data, batch_size=batch_size)
test_loader = DataLoader(test_data, batch_size=batch_size)
valid_loader = DataLoader(valid_data, batch_size=batch_size)

## **LSTM Model Architecture**

In [22]:
# Embedding Dimension of Tokens
embedding_dim = 400

# Embedding Dimension of Hidden Layers
hidden_dim = 256

# Output of the model is binary (either Positive or Negative)
output_size = 1

# Number of hidden LSTM cells
n_layers = 2
vocab_size = len(vocab_to_int)+1

In [23]:
# Structure of the Neural Network
class LSTM(nn.Module):
    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.2):
        super(LSTM, self).__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.embedding_layer = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(hidden_dim, output_size)
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden):
        batch_size = x.size(0)
        x = x.long()
        embeds = self.embedding_layer(x)
        lstm_out, hidden = self.lstm(embeds, hidden)
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
        out = self.dropout(lstm_out)
        out = self.fc(out)
        sig_out = self.sig(out)
        sig_out = sig_out.view(batch_size, -1)
        sig_out = sig_out[:, -1]
        return sig_out, hidden

    def init_hidden(self, batch_size):
        weights = next(self.parameters()).data
        if gpu_available:
            hidden = (weights.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda(),weights.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda())
        else:
            hidden = (weights.new(self.n_layers, batch_size, self.hidden_dim).zero_(),weights.new(self.n_layers, batch_size, self.hidden_dim).zero())
        return hidden

net = LSTM(vocab_size, output_size, embedding_dim, hidden_dim, n_layers)

In [24]:
# Hyperparameters required for training of the network

# Learning Rate
lr = 0.001
#lr = .0001

# Loss Function - Binary Cross Entropy
criterion = nn.BCELoss()

# Gradient Descent based Optimizer - ADAM (Adaptive LR)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)

# Number of epochs to train the model
epochs = 1
count = 0

# Step size
#print_every = 200
print_every = 2000
clip = 5

## **Model Training**

In [25]:
# Train the Neural Network
# Off-load the model to CUDA
if gpu_available:
    net.cuda()

net.train()
for e in range(epochs):
    h = net.init_hidden(batch_size)

    for inputs, labels in train_loader:
        count += 1

        if gpu_available:
            inputs, labels = inputs.cuda(), labels.cuda()
        h = tuple([each.data for each in h])

        net.zero_grad()
        outputs, h = net(inputs, h)
        loss = criterion(outputs.squeeze(), labels.squeeze().float())

        loss.backward()
        nn.utils.clip_grad_norm(net.parameters(), clip)
        optimizer.step()

        if count % print_every == 0:
            val_h = net.init_hidden(batch_size)
            val_losses = []
            net.eval()

            for inputs, labels in valid_loader:
                val_h = tuple([each.data for each in val_h])

                if gpu_available:
                    inputs, labels = inputs.cuda(), labels.cuda()

            outputs, val_h = net(inputs, val_h)
            val_loss = criterion(outputs.squeeze(), labels.squeeze().float())
            val_losses.append(val_loss.item())

            net.train()
            print(f"Epoch: {e+1}/{epochs}.....",f"Step: {count}.....","Train Loss: {:.6f}......".format(loss.item()),"Validation Loss: {:.6f}".format(np.mean(val_losses)))

  nn.utils.clip_grad_norm(net.parameters(), clip)


Epoch: 1/1..... Step: 2000..... Train Loss: -100.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 4000..... Train Loss: 100.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 6000..... Train Loss: -1400.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 8000..... Train Loss: -1500.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 10000..... Train Loss: 100.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 12000..... Train Loss: -600.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 14000..... Train Loss: -300.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 16000..... Train Loss: 100.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 18000..... Train Loss: -700.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 20000..... Train Loss: -1300.000000...... Validation Loss: -200.000000
Epoch: 1/1..... Step: 22000..... Train Loss: -100.000000...... Validation Loss: -200

## **Model Testing**

In [69]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score

# Train the Neural Network
test_losses = []
num_correct = 0
correct_labels = []
preds = []

h = net.init_hidden(batch_size)
net.eval()

for inputs, labels in test_loader:

#    print('inputs:')
#    print(inputs)
#    print('labels:')
#    print(labels)

    h = tuple([each.data for each in h])

    if gpu_available:
        inputs, labels = inputs.cuda(), labels.cuda()

    outputs, h = net(inputs, h)

#    print(h)
#    print(h.index(max(h)))

#    print(inputs, labels, outputs)

    test_loss = criterion(outputs.squeeze(), labels.squeeze().float())
    test_losses.append(test_loss.item())

    pred = torch.round(outputs.squeeze())
#    pred = outputs.squeeze()
    correct_tensor = pred.eq(labels.float().view_as(pred))

#    print(labels.float()) #1,2,3...
#    print(pred)

#    print('preds:')
#    print(len(preds))
#    print('correct_tensor:')
#    print(correct_tensor)

    correct = np.squeeze(correct_tensor.numpy()) if not gpu_available else np.squeeze(correct_tensor.cpu().numpy())
    num_correct += np.sum(correct)

    preds.append(pred.cpu().detach())
    correct_labels.append(labels.float().view_as(pred).cpu().detach())

"""print('preds:')
print(preds)
print('dataset:')
print(correct_labels)"""

print(accuracy_score(correct_labels, preds))
print(recall_score(correct_labels, preds, average='micro'))
print(precision_score(correct_labels, preds, average='micro'))
print(f1_score(correct_labels, preds, average='micro'))

test_acc = num_correct/len(test_loader.dataset)

print("Average Test Loss: {:.4f}".format(np.mean(test_losses)))
print("Average Test Accuracy: {:.4f}".format(test_acc))

0.08309537407195888
0.08309537407195888
0.08309537407195888
0.08309537407195888
Average Test Loss: -637.1359
Average Test Accuracy: 0.0831


## **Main Program**

In [72]:
torch.save(net,'EmoBart_230812.pt')

In [73]:
net2 = torch.load('EmoBart_230812.pt')

In [74]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score

# Train the Neural Network
test_losses = []
num_correct = 0
correct_labels = []
preds = []

h = net2.init_hidden(batch_size)
net2.eval()

for inputs, labels in test_loader:

#    print('inputs:')
#    print(inputs)
#    print('labels:')
#    print(labels)

    h = tuple([each.data for each in h])

    if gpu_available:
        inputs, labels = inputs.cuda(), labels.cuda()

    outputs, h = net2(inputs, h)

#    print(h)
#    print(h.index(max(h)))

#    print(inputs, labels, outputs)

    test_loss = criterion(outputs.squeeze(), labels.squeeze().float())
    test_losses.append(test_loss.item())

    pred = torch.round(outputs.squeeze())
#    pred = outputs.squeeze()
    correct_tensor = pred.eq(labels.float().view_as(pred))

#    print(labels.float()) #1,2,3...
#    print(pred)

#    print('preds:')
#    print(len(preds))
#    print('correct_tensor:')
#    print(correct_tensor)

    correct = np.squeeze(correct_tensor.numpy()) if not gpu_available else np.squeeze(correct_tensor.cpu().numpy())
    num_correct += np.sum(correct)

    preds.append(pred.cpu().detach())
    correct_labels.append(labels.float().view_as(pred).cpu().detach())

"""print('preds:')
print(preds)
print('dataset:')
print(correct_labels)"""

print(accuracy_score(correct_labels, preds))
print(recall_score(correct_labels, preds, average='micro'))
print(precision_score(correct_labels, preds, average='micro'))
print(f1_score(correct_labels, preds, average='micro'))

test_acc = num_correct/len(test_loader.dataset)

print("Average Test Loss: {:.4f}".format(np.mean(test_losses)))
print("Average Test Accuracy: {:.4f}".format(test_acc))

0.08309537407195888
0.08309537407195888
0.08309537407195888
0.08309537407195888
Average Test Loss: -637.1359
Average Test Accuracy: 0.0831
