## IMDB review sentiment Analysis


#### Μέλη Ομάδας:
- Ευάγγελος Λευτάκης : 3200093
- Ρέα Σκλήκα : 3210181
- Σοφία-Ζωή Σωτηρίου : 3210192

### Dependencies

In [1]:
!pip install -U pandas
!pip install -U numpy
!pip install -U scipy
!pip install -U tensorflow
!pip install -U scikit-learn



In [2]:
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import sklearn
import tensorflow as tf




### 1st Part

First We Prepare the train and test data and create a binary representation of it.

In [3]:
from sklearn.feature_extraction.text import CountVectorizer

(x_train_imdb, y_train), (x_test_imdb, y_test) = tf.keras.datasets.imdb.load_data()


# x_train_imdb, y_train_imdb = temp_x_train_imdb[:split_index], y_train[:split_index]
# x_dev_imdb, y_dev_imdb = temp_x_train_imdb[split_index:], y_train[split_index:]

word_index = tf.keras.datasets.imdb.get_word_index()
index2word = dict((i + 3, word) for (word, i) in word_index.items())
index2word[0] = '[pad]'
index2word[1] = '[bos]'
index2word[2] = '[oov]'
x_train_imdb = np.array([' '.join([index2word[idx] for idx in text]) for text in x_train_imdb])
x_test_imdb = np.array([' '.join([index2word[idx] for idx in text]) for text in x_test_imdb])

binary_vectorizer = CountVectorizer(binary=True, min_df=100)
x_train = binary_vectorizer.fit_transform(x_train_imdb)
x_test = binary_vectorizer.transform(x_test_imdb)
print(
    'Vocabulary size:', len(binary_vectorizer.vocabulary_)
)
x_train = x_train.astype(int)
x_test = x_test.astype(int)
# print(x_train.shape)



Vocabulary size: 3834


#### Logistic Regression:


In [55]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy.sparse import issparse

class LogisticRegression():
   

    def __init__(self,epochs=20,learning_rate=0.001,threshold=0.5, regularization_factor=1):
        self.epochs=epochs;
        self.learning_rate=learning_rate
        self.weights = np.array([])
        self.threshold=threshold
        self.regularization_factor = regularization_factor
        
    
    def set_threshold(self,value):
        self.threshold = value
    def sigmoid(self , t):
        return 1/(1+np.exp(-t))


    def fit(self,x_train_input,y_train_input):
        ''' '''
        feature_vector_length = x_train_input.shape[1]
        split_index = int(0.8 * x_train_input.shape[0])  # 80% for training, 20% for dev
        x_train, y_train = x_train_input[:split_index], y_train_input[:split_index]
        x_dev, y_dev= x_train_input[split_index:], y_train_input[split_index:]
        x_dev = x_dev.toarray()

        x_train = x_train.toarray()
        ones_column = np.ones(x_train.shape[0])
        x_train = np.insert(x_train, 0, ones_column, axis=1)
        self.weights = self.initialize_weights(feature_vector_length)
        
        for epoch in range(self.epochs):
            for i in range(x_train.shape[0]):
                x_i = x_train[i-1]
                y_i = y_train[i-1]

                self.update_weights(x_i,y_i)
        y_dev_predicted = self.predict(x_dev)
        return self.evaluate(y_dev_predicted, y_dev)


        
    
    def predict(self, x_test):
        
        features_length = x_test.shape[1]
        if issparse(x_test):
            x_test = x_test.toarray()
        ones_column = np.ones(x_test.shape[0])
        x_test = np.insert(x_test, 0, ones_column, axis=1)#insert 1 in x_vector for w0

        y_test_predicted = []


        for test_case in x_test:
            probability_positive =  self.sigmoid(np.dot(self.weights,test_case))
            predicted_class = 1 if probability_positive >= self.threshold else 0
            y_test_predicted.append(predicted_class)
        return np.array(y_test_predicted)



    @staticmethod
    def initialize_weights(size):
        '''We Initialize the weights with random values with a mean of 0 and a standard deviation of 0.01
          This is a common practice to prevent the weights from being too large at the begining '''
        return np.random.randn(size+1) * 0.01

    def update_weights(self,x_test,y_test):
        predicted_prob = self.sigmoid(np.dot(self.weights,x_test))
        gradient = ((y_test - predicted_prob) * x_test)
        self.weights += self.learning_rate * gradient 
    
    def evaluate(self, y_true, y_predicted):
        accuracy = accuracy_score(y_true, y_predicted)
        print("Accuracy:", accuracy)

        precision = precision_score(y_true, y_predicted)
        print("Precision:", precision)

        # Compute recall
        recall = recall_score(y_true, y_predicted)
        print("Recall:", recall)

        # Compute F1 score
        f1 = f1_score(y_true, y_predicted)
        print("F1 Score:", f1)
        print("Threshold: "+str(self.threshold))
        return np.array([precision,recall])
    

    def generate_pr_curve(self, x_train_input,y_train_input,size=20):
        currentThreshold = 0
        percisions=[]
        recalls=[]
        for i in range(size):
            currentThreshold += 1./size
            if (currentThreshold>0.3 and currentThreshold<0.75):
                self.set_threshold(currentThreshold)
                fit_pr = self.fit(x_train_input , y_train_input)
                percisions.append(fit_pr[0])
                recalls.append(fit_pr[1])
        print(percisions)
        plt.plot(recalls,percisions ,label='Precision-Recall Curve')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Diagram')
        plt.legend()
        plt.show()


#### Results
Here we try out the Logistic Regression learning algorithm

In [63]:
LogReg = LogisticRegression(100,threshold=0.5, regularization_factor=0.01)
LogReg.fit(x_train,y_train)
y_predicted = LogReg.predict(x_test)
# fit(x_train,y_train)
 
# y_predicted = LogReg.predict(x_test)

LogReg.evaluate(y_test,y_predicted)

(20000,)
Accuracy: 0.8774
Precision: 0.8813284730660186
Recall: 0.8717948717948718
F1 Score: 0.8765357502517622
Threshold: 0.5
Accuracy: 0.8712
Precision: 0.8669144393484106
Recall: 0.87704
F1 Score: 0.8719478247037303
Threshold: 0.5


array([0.86691444, 0.87704   ])

#### Naive Bayes:

In [7]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
#from scipy.sparse import isspars

class NaiveBayes: 
    def __init__(self):
            self.probabilityC1= None #probabilityC0 = 1 -probabilityC1
            self.boundProbabilityC0 = None 
            self.boundProbabilityC1 = None

    def fit(self, x_train_input, y_train_input): 
        split_index = int(0.8 * x_train_input.shape[0])  # 80% for training, 20% for dev
        x_train, y_train = x_train_input[:split_index], y_train_input[:split_index]
        x_dev, y_dev= x_train_input[split_index:], y_train_input[split_index:]

        numberOfExamples = x_train.shape[0]
        numberOfFeatures = x_train.shape[1]
        x_train = x_train.toarray()
        #y_train = y_train.flatten()

        positiveReviews = 0
        for label in y_train: 
            if label == 1: 
                positiveReviews += 1 

        self.probabilityC1 = positiveReviews / len(y_train)

        '''
        C0: P( X(i) = 0 | C = 0) and C1 = P ( X(i) = 0 | C = 1)
        P( X(i) = 1 | C = 0) = 1 - P( X(i) = 0 | C = 0) and P( X(i) = 1 | C = 1) = P( X(i) = 0 | C = 1)
        '''
        self.boundProbabilityC0 = np.zeros(numberOfFeatures)  
        self.boundProbabilityC1 = np.zeros(numberOfFeatures)
        for i in range(numberOfFeatures): 

            #For every word that is not on a review add to the array correspnding to the review result of the example 
            for j in range(numberOfExamples):  
                if y_train[j] == 0 and x_train[j][i] == 0: 
                    self.boundProbabilityC0[i] += 1 
                elif y_train[j] == 1 and x_train[j][i] == 0: 
                    self.boundProbabilityC1[i] += 1 
        """
        Adding Laplace estimator with an alpha value of 1 
        """
        self.boundProbabilityC0 = [x + 1 / (numberOfExamples - positiveReviews + x) for x in self.boundProbabilityC0] 
        self.boundProbabilityC1 = [x + 1 / (positiveReviews+ x) for x in self.boundProbabilityC1]

    def predict(self, x_test, y_test):
        prediction = list()
        c1 = self.boundProbabilityC1
        c0 = self.boundProbabilityC0
        numberOfFeatures = x_test.shape[1]
        x_test = x_test.toarray()

        for x in range(x_test.shape[0]): 
            positiveReviewProbability = self.probabilityC1 
            negativeReviewProbability = (1 - self.probabilityC1)

            for y in range(numberOfFeatures):
                if x_test[x][y] == 0 and y_test[x] == 0: 
                    negativeReviewProbability = round(negativeReviewProbability * self.boundProbabilityC0[y], 2)
                elif  x_test[x][y] == 1 and y_test[x] == 0: 
                    negativeReviewProbability = round(negativeReviewProbability * (1 - self.boundProbabilityC0[y]), 2)
                elif  x_test[x][y] == 0 and y_test[x] == 1:
                    positiveReviewProbability = round(positiveReviewProbability * self.boundProbabilityC1[y], 2)
                elif  x_test[x][y] == 1 and y_test[x] == 1:
                    positiveReviewProbability = round(positiveReviewProbability * (1 - self.boundProbabilityC1[y]), 2)

            if positiveReviewProbability > negativeReviewProbability: 
                prediction.append(1)
            elif negativeReviewProbability > positiveReviewProbability:
                prediction.append(0)

        return np.array(prediction)
    
    def evaluate(self, y_true, y_predicted):
        accuracy = accuracy_score(y_true, y_predicted)
        print("Accuracy:", accuracy)

        precision = precision_score(y_true, y_predicted)
        print("Precision:", precision)

        # Compute recall
        recall = recall_score(y_true, y_predicted)
        print("Recall:", recall)

        # Compute F1 score
        f1 = f1_score(y_true, y_predicted)
        print("F1 Score:", f1)
        #print("Threshold: "+str(self.threshold))
        return np.array([precision,recall])
    

    


In [8]:
NB = NaiveBayes()
NB.fit(x_train,y_train)
y_predicted = NB.predict(x_test, y_test)
# fit(x_train,y_train)
 
# y_predicted = LogReg.predict(x_test)

NB.evaluate(y_test,y_predicted)

  negativeReviewProbability = round(negativeReviewProbability * self.boundProbabilityC0[y], 2)
  positiveReviewProbability = round(positiveReviewProbability * self.boundProbabilityC1[y], 2)
  negativeReviewProbability = round(negativeReviewProbability * (1 - self.boundProbabilityC0[y]), 2)
  positiveReviewProbability = round(positiveReviewProbability * (1 - self.boundProbabilityC1[y]), 2)


Accuracy: 0.49596
Precision: 0.4960075895327694
Recall: 0.50192
F1 Score: 0.4989462801701857


array([0.49600759, 0.50192   ])


### [AdaBoost](Machine-Learning-Algorithms/AdaBoost.py)

In [51]:
import numpy as np
import random as random
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.utils import shuffle

class AdaBoost:
    def __init__(self, learning_rate=1.0):
        self.M = None
        self.models = []
        self.weights = []
        self.learning_rate = learning_rate
    
    def fit (self, x_train_input, y_train_input, M):
        split_index = int(0.8 * x_train_input.shape[0])  # 80% for training, 20% for dev
        x_train, y_train = x_train_input[:split_index], y_train_input[:split_index]
        x_dev, y_dev= x_train_input[split_index:], y_train_input[split_index:]
        x_train = x_train.toarray()
    
        self.M = M
        # first all samples get the same weight, 1/total number of examples
        # that makes the samples all equally important
        sample_weights = np.ones(x_train.shape[0]) * (1 / x_train.shape[0])
        for m in range(M):
            x_train, y_train = shuffle(x_train, y_train, random_state=m)

            stump = CreateStump()
            x_train, y_train = stump.fit( x_train, y_train, sample_weights)
            sample_weights = stump.weights
            self.models.append(stump)
            self.weights.extend([self.learning_rate * w for w in sample_weights])  # Accumulate weights
           
        y_dev_predicted = self.predict(x_dev)
        return self.evaluate(y_dev,y_dev_predicted)

    def predict(self, x_test):
        x_test = x_test.toarray()
        
        predictions = np.zeros(x_test.shape[0])
        stump_says = np.zeros(len(self.models))

        for i, stump in enumerate(self.models):
            # Get predictions from each stump
            stump_pred = stump.predict(x_test)
            stump_says[i] = stump.amountOfSay
            # Update overall predictions based on the stump's amount of say
            predictions += stump_pred * stump_says[i]

        # Final prediction is based on the sign of the weighted sum
        final_predictions = np.sign(predictions)

        return final_predictions
    
    def evaluate(self, y_true, y_predicted):
        # Handle NaN values, replace them with 0
        nan_indices1 = np.isnan(y_true)
        if np.any(nan_indices1):
            y_true = np.nan_to_num(y_true)
        nan_indices2 = np.isnan(y_predicted)
        if np.any(nan_indices2):
            y_predicted = np.nan_to_num(y_predicted)

        accuracy = accuracy_score(y_true, y_predicted)
        print("Accuracy:", accuracy)

        precision = precision_score(y_true, y_predicted)
        print("Precision:", precision)

        # Compute recall
        recall = recall_score(y_true, y_predicted)
        print("Recall:", recall)

        # Compute F1 score
        f1 = f1_score(y_true, y_predicted)
        print("F1 Score:", f1)


class CreateStump:
    def __init__(self):
        self.amountOfSay = None
        self.word = None
        self.weights = []

    def fit(self, x, y, w):
        n = x.shape[1]
        m = x.shape[0]

        best_amount_of_say = None
        best_word = None
        best_error = float('inf')
        for word_index in range(n):           
            # Vectorized calculations
            incorrect_mask = x[:, word_index] != y

            # Calculate weighted error
            total_error = np.sum(w[incorrect_mask])

            # Choose the word that minimizes the weighted error
            if total_error < best_error:
                best_amount_of_say = 0.5 * np.log((1 - total_error) / (total_error + 1e-10))
                best_word = word_index
                best_error = total_error
        self.amountOfSay = best_amount_of_say
        self.word = best_word
        # Update weights
        incorrect_mask = x[:, self.word] != y
        w[incorrect_mask] *= np.exp(self.amountOfSay)
        w[~incorrect_mask] *= np.exp(-self.amountOfSay)

        # normalize the weights so they all add up to 1
        self.weights = w / np.sum(w)
        indices = np.random.choice(np.arange(m), size=m, p=self.weights)
        return x[indices], y[indices] 

    def predict(self, x_test):
        # Make predictions using the chosen word index
        # Convert predictions to binary (0 or 1)
        stump_pred_binary = x_test[:, self.word] >= 0.5
        return stump_pred_binary

#### Results:

In [54]:
AB = AdaBoost()
AB.fit(x_train,y_train,10)

y_predicted = AB.predict(x_test)

AB.evaluate(y_test,y_predicted)

Accuracy: 0.4938
Precision: 0.4938
Recall: 1.0
F1 Score: 0.6611326817512385
Accuracy: 0.5
Precision: 0.5
Recall: 1.0
F1 Score: 0.6666666666666666
