In [132]:
import pandas as pd
import pickle
from sklearn.feature_extraction.text import CountVectorizer
import gzip
from transformers import pipeline
import torch.nn
import numpy as np
import matplotlib.pyplot as plt
import bertviz
from transformers import BertTokenizer, BertForSequenceClassification, TrainingArguments, Trainer
from torch.utils.data import DataLoader, TensorDataset, Dataset
from pandatorch import data
import evaluate

In [67]:
# Training Set
trainDF=pd.read_csv("data\\train.csv")

# Validation Set
validationDF=pd.read_csv("data\\validation.csv")

# Testing Set
testDF=pd.read_csv("data\\test.csv")

# Full Data Set
fullDF=pd.read_csv("data\\data.csv")

EMOTIONSDICT={"sadness":0, "joy":1, "love":2, "anger":3, "fear":4, "surprise":5}

In [83]:
class NaiveBayes(object):
    
    def __init__(self, df, alpha=1):
        self.vocabulary={} #Set that contains all vocabulary words in our training set.
        self.sentences=self.createSparseMat(df) #Tokenized representation of all sentences in dataframe.
        self.targets=self.createTargets(df) #Targets for each sentence, corresponding to the labelled emotions.

        self.numSamples=self.sentences.shape[0] #Number of sentences, N.
        self.numFeatures=self.sentences.shape[1] #Number of words in our vocabulary, D.
        self.numTargets=max(self.targets)+1 #Number of different sentiments, C.
        self.alpha=alpha #Alpha of smoothing.
        self.labelledSentences=self.createLabelledSentences() #Dictioanry, CxN_cxD, where each key represents
            #an emotion and holds an array of all sentences that are labelled that emotion.
        self.wordFrequencies=self.createWordFrequencies() #CxD dimensional array, where each array is the sum
            #of the d-th word across all sentences of class c.
        self.totalWordFrequencies=self.createTotalWordFrequencies() #C dimensional array, where each index is the 
            #sum of all words in sentences of class c.
            
        self.priorProbabilities=None #Prior probabilities.
        self.posteriorProbabilities=None #Posterior probabilities, multinomial.
        
    #Creates word frequencies list.
    def createWordFrequencies(self):
        res=[0]*self.numTargets
        for c in range(self.numTargets):
            res[c]=np.sum(self.labelledSentences[c], axis=0)
        return res
    
    #Creates total word frequencies list.
    def createTotalWordFrequencies(self):
        res=[0]*self.numTargets
        for c in range(self.numTargets):
            res[c]=np.sum(self.labelledSentences[c])
        return res
    
    #Return dictionary with keys that are the targets, and entries being arrays of all sentences that are 
        #that target. 
    def createLabelledSentences(self):
        res=dict.fromkeys(range(self.numTargets+1))
        for key in res:
            res[key]=[]
        for i in range(len(self.sentences)):
            target=self.targets[i]
            res[target].append(self.sentences[i])
        return res
    
    #Returns a bag of words matrix representation of each sentence in our dataset.
    def createSparseMat(self, df, test=0):
        
        phrases=[]
        for i in range(len(df)):
            sample=df.loc[i, "text"]
            phrases.append(sample)
            
        #If we're testing, we need to use the vocabulary from our training set to make the sparse matrix.
        vectorizer=None
        if test:
            vectorizer=CountVectorizer(vocabulary=self.vocabulary)
            sparseMat=vectorizer.fit_transform(phrases)
            return sparseMat.toarray()
        else:
            vectorizer=CountVectorizer()
            sparseMat=vectorizer.fit_transform(phrases)
            self.vocabulary=vectorizer.vocabulary_
            return sparseMat.toarray()
    
    #Creates corresponding targets to each sentence.
    def createTargets(self,df):
        labels=[]
        for i in range(len(df)):
            labels.append(df.loc[i,"emotions"])
        return labels
    
    #Creates and stores parameters as model attributes.
    def fit(self):
        priorProbabilities=self.createPriorProbabilities()
        self.priorProbabilities=priorProbabilities
        
        posteriorProbabilities=self.createPosteriorProbabilities()
        self.posteriorProbabilities=posteriorProbabilities
        
    #Creates prior probabilities of each emotion using multinoulli classification. 
    def createPriorProbabilities(self):
        priorProbabilities=[self.alpha]*self.numTargets
        for i in range(len(self.targets)):
            trueTarget=self.targets[i]
            priorProbabilities[trueTarget]+=1
        return np.divide(priorProbabilities, self.alpha*self.numTargets+self.numSamples)

    #Creates posterior probabilites, the theta_(d,c).
    def createPosteriorProbabilities(self):
        posteriorProbabilites=[[0]*self.numFeatures for c in range(self.numTargets)] #CxD
        
        for c in range(self.numTargets):
            posteriorProbabilites[c]=np.divide(self.wordFrequencies[c]+self.alpha, self.alpha*self.numTargets+self.totalWordFrequencies[c])
        
        return posteriorProbabilites
    
    #Predicts the labels for a test/validation dataframe.
    def predict(self, testdf):
        testMatrix=self.createSparseMat(testdf, test=1)
        predictedTargets=[]
        
        for i in range(len(testMatrix)):
            sentence=testMatrix[i]
            probabilities=self.predictSentence(sentence)
            predictedEmotion=probabilities.index(max(probabilities))
            predictedTargets.append(predictedEmotion)
            
        return predictedTargets

    #Gets accuracy for a test dataframe.
    def getAcc(self, testdf):
        testTargets=self.createTargets(testdf)
        predictedTargets=self.predict(testdf)
        
        right=0
        for i in range(len(testTargets)):
            if testTargets[i]==predictedTargets[i]:
                right+=1
        return right/len(testTargets)
    
    #Returns probability of sentence being each class c.
    def predictSentence(self, sentence):
        probabilities=[0]*self.numTargets
        
        for c in range(self.numTargets):
            res=1
            res*=self.priorProbabilities[c] #Probability of it being class c.
            for d in range(self.numFeatures):
                factor=self.posteriorProbabilities[c][d]**(sentence[d])
                res*=(factor) #Posterior probabilities.
            probabilities[c]=res
           
        pSum=sum(probabilities) 
        for i in range(len(probabilities)):
            probabilities[i]=probabilities[i]/pSum
            

        return probabilities
        

In [None]:
alphas=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
testAccs=[]
for alpha in alphas:
    bayesModel=NaiveBayes(trainDF, alpha) #Training model
    bayesModel.fit()
    testAccs.append(bayesModel.getAcc(testDF))
    
plt.plot(alphas, testAccs)
plt.xlabel("Smoothing Alphas")
plt.ylabel("Test Accuracy")
plt.title("Test Set Accuracy For Various Smoothing Alphas")

In [None]:
#Importing pre-trained model without changing weights. 
bert = pipeline("sentiment-analysis",model='bhadresh-savani/bert-base-uncased-emotion')

testModel=NaiveBayes(testDF, 1)

right=0
for i in range(len(testModel.sentences)):
    sentence=testDF.loc[i,"text"]
    prediction=bert(sentence)
    predictedTarget=EMOTIONSDICT[prediction[0]["label"]]
    if predictedTarget==testModel.targets[i]:
        right+=1

print("Accuracy is:", right/len(testDF))

In [None]:
#Creating the model and tokenizer.
modelName='bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(modelName)
model = BertForSequenceClassification.from_pretrained(modelName, num_labels=6,output_attentions=True)

def tokenizeFunction(sentence):
    return tokenizer(sentence, padding="max_length", truncation=True)

class CustomDataset(Dataset):
    def __init__(self, dataframe, tokenizer):
        self.data = dataframe
        self.tokenizer=tokenizer
        
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.loc[idx, "text"]  # 'text' column
        emotions = self.data.loc[idx, "emotions"]  # 'emotions' column
        inputs = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        return {
            'input_ids': inputs["input_ids"].squeeze(),
            'attention_mask': inputs["attention_mask"].squeeze(),
            'labels': int(emotions),
        }

#Tokenizing the training and text strings
trainDataset=CustomDataset(trainDF, tokenizer)
testDataset=CustomDataset(testDF, tokenizer)
trainLoader=DataLoader(trainDataset, batch_size=16, shuffle=True)
testLoader=DataLoader(testDataset, batch_size=16, shuffle=True)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
#Implementing pre-trained model, with changing weights.

#Loading evaluation metrics.
metric = evaluate.load("accuracy")
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)


#Creating training configurations for the model.
trainingArgs= TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    evaluation_strategy="steps",
    save_steps=500,
    eval_steps=500,
)

#Creating a trainer for the model, prints out accuracy.
trainer = Trainer(
    model=model,
    args=trainingArgs,
    train_dataset=trainDataset,
    eval_dataset=testDataset,
    compute_metrics=compute_metrics
)

trainer.train()

Downloading builder script: 100%|██████████| 4.20k/4.20k [00:00<?, ?B/s]


In [None]:
#Printing results of training of evaluation dataset.
results = trainer.evaluate(eval_dataset=testDataset)
print(results)

In [133]:
#Extracting correctly and incorrectly predicted sentences from the testing set.
correctlyPredicted=[] #Indices of all sentences that were correctly predicted.
incorrectlyPredicted=[] #Indices of all sentences that were incorrectly predicted.

for i in range(len(testDF)):
  print(i)
  sentence=validationDF.loc[i,"text"]
  inputs = tokenizer.encode(sentence, return_tensors='pt')
  output=model(inputs)
  probabilities = torch.nn.functional.softmax(output.logits, dim=1)
  predictedLabel = torch.argmax(probabilities, dim=1).item()
  if predictedLabel==validationDF.loc[i,"emotions"]:
    correctlyPredicted.append(i)
  else:
    incorrectlyPredicted.append(i)

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
27

In [135]:
#Sample attention view for incorrectly predicted sentence.
def showAttention(i, correct=1):
    sentence=None
    if correct:
        sentence=testDF.loc[incorrectlyPredicted[i],"text"]
    else:
        sentence=testDF.loc[incorrectlyPredicted[i],"text"]
    inputs = tokenizer.encode(sentence, return_tensors='pt')
    output=model(inputs)
    attention=output[-1]
    tokens=tokenizer.convert_ids_to_tokens(inputs[0])
    bertviz.model_view(attention, tokens)
    
showAttention(0, 1) #Shows the attention matrix for the first correctly predicted sentence. 

<IPython.core.display.Javascript object>