In [1]:
import pandas as pd
import pickle
from sklearn.feature_extraction.text import CountVectorizer
import gzip
from transformers import pipeline
import torch.nn
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Training Set
trainDF=pd.read_csv("data\\train.csv")

# Validation Set
validationDF=pd.read_csv("data\\validation.csv")

# Testing Set
testDF=pd.read_csv("data\\test.csv")

# Full Data Set
fullDF=pd.read_csv("data\\data.csv")

EMOTIONSDICT={"sadness":0, "joy":1, "love":2, "anger":3, "fear":4, "surprise":5}

In [3]:
class NaiveBayes(object):
    
    def __init__(self, df):
        self.vocabulary={} #Set that contains all vocabulary words in our training set.
        self.sentences=self.createSparseMat(df) #Tokenized representation of all sentences in dataframe.
        self.targets=self.createTargets(df) #Targets for each sentence, corresponding to the labelled emotions.

        self.numSamples=self.sentences.shape[0] #Number of sentences, N.
        self.numFeatures=self.sentences.shape[1] #Number of words in our vocabulary, D.
        self.numTargets=max(self.targets)+1 #Number of different sentiments, C.g
        self.labelledSentences=self.createLabelledSentences() #Dictioanry, CxN_cxD, where each key represents
            #an emotion and holds an array of all sentences that are labelled that emotion.
        self.wordFrequencies=self.createWordFrequencies() #CxD dimensional array, where each array is the sum
            #of the d-th word across all sentences of class c.
        self.totalWordFrequencies=self.createTotalWordFrequencies() #C dimensional array, where each index is the 
            #sum of all words in sentences of class c.

        self.priorProbabilities=None
        self.posteriorProbabilities=None
        
    def __getitem__(self, idx):
        return {
            "text":self.sentences[idx],
            'emotions': self.targets[idx]
        }
        
    def createWordFrequencies(self):
        res=[0]*self.numTargets
        for c in range(self.numTargets):
            res[c]=np.sum(self.labelledSentences[c], axis=0)
        return res
    
    def createTotalWordFrequencies(self):
        res=[0]*self.numTargets
        for c in range(self.numTargets):
            res[c]=np.sum(self.labelledSentences[c])
        return res
    
    def createLabelledSentences(self):
        res=dict.fromkeys([0,1,2,3,4,5,6])
        for key in res:
            res[key]=[]
        for i in range(len(self.sentences)):
            target=self.targets[i]
            res[target].append(self.sentences[i])
        return res
    
    #Returns a bag of words matrix representation of each sentence in our dataset.
    def createSparseMat(self, df, test=0):
        
        phrases=[]
        for i in range(len(df)):
            sample=df.loc[i, "text"]
            phrases.append(sample)
            
        #If we're testing, we need to use the vocabulary from our training set to make the sparse matrix.
        vectorizer=None
        if test:
            vectorizer=CountVectorizer(vocabulary=self.vocabulary)
            sparseMat=vectorizer.fit_transform(phrases)
            return sparseMat.toarray()
        else:
            vectorizer=CountVectorizer()
            sparseMat=vectorizer.fit_transform(phrases)
            self.vocabulary=vectorizer.vocabulary_
            return sparseMat.toarray()
    
    #Creates corresponding targets to each sentence.
    def createTargets(self,df):
        labels=[]
        for i in range(len(df)):
            labels.append(df.loc[i,"emotions"])
        return labels
    
    #Creates and stores parameters as model attributes.
    def fit(self):
        priorProbabilities=self.createPriorProbabilities()
        self.priorProbabilities=priorProbabilities
        
        posteriorProbabilities=self.createPosteriorProbabilities()
        self.posteriorProbabilities=posteriorProbabilities
        
    #Creates prior probabilities of each emotion using multinoulli classification. 
    def createPriorProbabilities(self):
        priorProbabilities=[0]*self.numTargets
        for i in range(len(priorProbabilities)):
            count=0
            for target in self.targets:
                if int(target)==i:
                    count+=1
            priorProbabilities[i]=count/self.numSamples
        return priorProbabilities

    #Creates posterior probabilites, the theta_(d,c).
    def createPosteriorProbabilities(self):
        posteriorProbabilites=[[0]*self.numFeatures for d in range(self.numTargets)] #CxD
        
        for c in range(self.numTargets):
            posteriorProbabilites[c]=np.divide(self.wordFrequencies[c], self.totalWordFrequencies[c])
        
        return posteriorProbabilites
    
    #Predicts the labels for a test/validation dataframe.
    def predict(self, testdf):
        testMatrix=self.createSparseMat(testdf, test=1)
        predictedTargets=[]
        
        for i in range(len(testMatrix)):
            sentence=testMatrix[i]
            probabilities=self.predictSentence(sentence)
            predictedTargets.append(probabilities.index(max(probabilities)))
            
        return predictedTargets

    #Gets accuracy for a test dataframe.
    def getAcc(self, testdf):
        testTargets=self.createTargets(testdf)
        predictedTargets=self.predict(testdf)
        
        right=0
        for i in range(len(testTargets)):
            if testTargets[i]==predictedTargets[i]:
                right+=1
        return right/len(testTargets)
    
    def normalizationFactor(self,sentence):
        sentenceSum=sum(sentence)
        downstairs=1
        for freq in sentence:
            downstairs*=np.math.factorial(freq)
        return np.math.factorial(sentenceSum)/downstairs
    
    #Returns probability of sentence being each class c.
    def predictSentence(self, sentence):
        probabilities=[0]*self.numTargets
        
        for c in range(self.numTargets):
            res=1
            res*=self.priorProbabilities[c] #Probability of it being class c.
            for d in range(self.numFeatures):
                factor=self.posteriorProbabilities[c][d]**(sentence[d])
                res*=(factor) #Posterior probabilities.
            probabilities[c]=res
            
        pSum=sum(probabilities)
        for i in range(len(probabilities)):
            probabilities[i]=probabilities[i]/pSum

        return probabilities
        

In [4]:
bayesModel=NaiveBayes(trainDF) #Training model
testModel=NaiveBayes(testDF) #Easy access to processed data


bayesModel.fit()

print("Validation accuracy is:", bayesModel.getAcc(validationDF))
print("Test accuracy is:", bayesModel.getAcc(testDF))

  probabilities[i]=probabilities[i]/pSum


Validation accuracy is: 0.62
Test accuracy is: 0.643


In [42]:
#Importing pre-trained model without changing weights. 
bert = pipeline("sentiment-analysis",model='bhadresh-savani/bert-base-uncased-emotion')

right=0
for i in range(len(testModel.sentences)):
    sentence=testDF.loc[i,"text"]
    prediction=bert(sentence)
    predictedTarget=EMOTIONSDICT[prediction[0]["label"]]
    if predictedTarget==testModel.targets[i]:
        right+=1

print("Accuracy is:", right/len(testDF))

Accuracy is: 0.9265


In [20]:
from transformers import BertTokenizer, BertForSequenceClassification, TrainingArguments, Trainer
from torch.utils.data import DataLoader, TensorDataset, Dataset
from pandatorch import data

In [21]:
#Creating the model and tokenizer.
modelName='bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(modelName)
model = BertForSequenceClassification.from_pretrained(modelName, num_labels=6)

def tokenizeFunction(sentence):
    return tokenizer(sentence, padding="max_length", truncation=True)

class CustomDataset(Dataset):
    def __init__(self, dataframe, tokenizer):
        self.data = dataframe
        self.tokenizer=tokenizer
        
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.loc[idx, "text"]  # 'text' column
        emotions = self.data.loc[idx, "emotions"]  # 'emotions' column
        inputs = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        return {
            'input_ids': inputs["input_ids"].squeeze(),
            'attention_mask': inputs["attention_mask"].squeeze(),
            'labels': int(emotions),
        }

#Tokenizing the training and text strings
trainDataset=CustomDataset(trainDF, tokenizer)
testDataset=CustomDataset(testDF, tokenizer)
trainLoader=DataLoader(trainDataset, batch_size=16, shuffle=True)
testLoader=DataLoader(testDataset, batch_size=16, shuffle=True)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [43]:
#Implementing pre-trained model, with changing weights.

#Creating datasets for the training and text dataframes.
trainingArgs= TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    evaluation_strategy="steps",
    save_steps=500,
    eval_steps=500,
)

trainer = Trainer(
    model=model,
    args=trainingArgs,
    train_dataset=trainDataset,
    #data_collator=lambda data: {"input_ids": data[0], "attention_mask": data[1], "labels": data[2]},
    eval_dataset=testDataset,
)

trainer.train()

  0%|          | 0/3000 [2:30:57<?, ?it/s]
  0%|          | 1/3000 [2:28:23<7416:44:08, 8903.05s/it]


KeyboardInterrupt: 