In [1]:
# Import necessary libraries

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Enable inline plotting
%matplotlib inline

# Import tqdm for progress bar
from tqdm import tqdm


# Import Keras libraries
import keras
from keras.models import Sequential
from keras.layers import Dense,Dropout
from keras.optimizers import adam
# Use TensorFlow as the backend for Keras


Using TensorFlow backend.


In [2]:
# Import Google Colab drive module (if necessary)
from google.colab import drive
drive.mount('/content/gdrive')

ModuleNotFoundError: No module named 'google.colab'

In [None]:
# Load data from 'Train.csv' file stored in Google Drive
root_path = 'gdrive/My Drive/Train.csv'
data=pd.read_csv(root_path)

In [None]:
# Extract text column from data
text = data['text'].values

In [None]:
# Create a dictionary to store the count of each word in the text
word_list ={}
for i in tqdm(text):
  for j in i.split():
    if j in word_list:
      word_list[j] +=1
    else:
      word_list[j]=0

# Create a dictionary to store the index number of each word in the text
word_index={}
number=4
for i in tqdm(text):
  for j in i.split():
    if  word_list[j] > 100:
      if j not in word_index:
        word_index[j]= number
        number+=1

print("Number of unique words in the dictionary : ",len(word_index))

In [None]:
# Shift all index numbers by 3 to reserve the first 3 indices
word_index = {k:(v+3) for k,v in word_index.items()}

# Add special indices for padding, start, unknown, and unused words
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2  # unknown
word_index["<UNUSED>"] = 3

# Create a dictionary to map index numbers back to their corresponding words
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])

# Function to encode a review
def encode_review(text):
  sent=[word_index["<START>"]]
  for i in text.split():
    if i in word_index:
      sent.append(word_index[i])
    else:
      sent.append(word_index['<UNUSED>'])
  return sent

# Function to decode a review
def decode_review(text):
    return ' '.join([reverse_word_index.get(i, '?') for i in text])


In [None]:
encode_review('I love this movie')

In [None]:
decode_review([1,13,595,104,48])

In [None]:
# Encode all text data in the dataset
train_x=[]
for i in tqdm(text):
  train_x.append(encode_review(i))

In [None]:
# Pad the sequences of integers to have the same length

train_data = keras.preprocessing.sequence.pad_sequences(train_x,
                                                        value=word_index["<PAD>"],
                                                        padding='post',
                                                        maxlen=256)


In [None]:
# Creating the model

vocab_size = 10000

model = keras.Sequential()

# Add an embedding layer
model.add(keras.layers.Embedding(vocab_size, 16))

# Add a global average pooling layer
model.add(keras.layers.GlobalAveragePooling1D())

# Add dropout regularization to prevent overfitting
model.add(Dropout(0.2))

# Add two dense layers with ReLU activation
model.add(keras.layers.Dense(16, activation='relu'))
model.add(keras.layers.Dense(32,activation='relu'))

# Add another dropout layer
model.add(Dropout(0.2))

# Add another dense layer with ReLU activation
model.add(keras.layers.Dense(16,activation='relu'))

# Add the final dense layer with sigmoid activation
model.add(keras.layers.Dense(1, activation='sigmoid'))

model.summary()

# The Embedding layer maps the input data (vocabulary words) to a lower-dimensional space. 
# The GlobalAveragePooling1D layer then reduces the dimensionality of the input data by taking the average of all the values along the time dimension.
# The Dropout layers are applied to the output of the previous layers to prevent overfitting. 
# The Dense layers then process the input data and apply the ReLU or sigmoid activation functions. The final output layer produces the model's predictions.


In [None]:
# Compile the model with Adam optimizer, binary crossentropy loss, and accuracy metric

model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['acc'])


In [None]:
# Store the labels in a separate variable
train_y = data['label'].values

In [None]:
# Fit the model on the training data with validation split of 0.2, 50 epochs, and batch size of 512
history = model.fit(train_data,
                    train_y,
                    epochs=50,
                    validation_split=0.2,
                    batch_size=512,
                    verbose=1)


In [None]:
# Test the encode and decode functions
print(encode_review('hello there'))
print(decode_review(encode_review("hello there")))

In [None]:

# Pad the review list using the pad_sequences function
# The value used for padding is the word index for the "<PAD>" token
# Padding is added to the end of the sequences (post-padding)
# The maximum length of the padded sequences is 256
emb = keras.preprocessing.sequence.pad_sequences([encode_review('Though it was bad we enjoyed the movie and we liked it')],
                                                       value=word_index["<PAD>"],
                                                       padding='post',
                                                       maxlen=256)

# Use the model to predict the sentiment of the review.
model.predict(emb)

In [None]:
def classify(text):
  emb = keras.preprocessing.sequence.pad_sequences([encode_review(text)],
                                                  value=word_index['<PAD>'],
                                                  padding='post',
                                                  maxlen=256)
  
  pred = model.predict(emb)
  
  # Check the prediction and return 'positive' or 'negative'  
  if (pred*100) >50.0 :
    return "positive"
  else:
    return "negative"
 
 # Test the classify function with a negative review
classify("I dont like this game")

# Test the classify function with a positive review
classify('Though it was bad we enjoyed the movie and we liked it')

In [None]:
# Get the dictionary containing the model's training history
history_dict = history.history

# Extract the training and validation accuracy and loss values
acc = history_dict['acc']
val_acc = history_dict['val_acc']
loss = history_dict['loss']
val_loss = history_dict['val_loss']

# Get the number of epochs

epochs = range(1, len(acc) + 1)

# Plot the training loss values
plt.plot(epochs, loss, 'bo', label='Training loss')

# Plot the validation loss values
plt.plot(epochs, val_loss, 'b', label='Validation loss')

# Set the plot title and labels
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')

# Show the legend
plt.legend()

# Display the plot
plt.show()


In [None]:
# Plot the training accuracy values
plt.plot(epochs, acc, 'bo', label='Training acc')

# Plot the validation accuracy values
plt.plot(epochs, val_acc, 'b', label='Validation acc')

# Set the plot title and labels
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')

# Show the legend
plt.legend()

# Display the plot
plt.show()


In [None]:
# Load data from 'steam_reviews.csv' file stored in Google Drive
path='gdrive/My Drive/steam_reviews.csv'
data = pd.read_csv(path)

In [None]:
# Get the values from the 'review' and 'helpful' columns
reviews = data['review'].values
helpful = data['helpful'].values

In [None]:
# Initialize empty lists for storing the labeled sentiment of each review, 
# and the indices of the reviews that were successfully labeled
labeled_sentiment = []
idx =[]

# Iterate through the reviews and classify the sentiment of each review
for index , i in tqdm(enumerate(reviews[:10000])):
  try :
    labeled_sentiment.append(classify(i))
    idx.append(index)
  except:
    pass

In [None]:
# Initialize empty lists for storing the final reviews and helpful reviews
final_reviews=[]
final_helpful=[]
for i in tqdm(range(len(reviews[:10000]))):
  if i in idx:
    final_reviews.append(reviews[i])
    final_helpful.append(helpful[i])

In [None]:
# Create a new dataframe using the filtered reviews and helpfulness ratings, 
# and the labeled sentiment of each review
new_data = pd.DataFrame(data=list(zip(final_helpful,final_reviews,labeled_sentiment)),columns=['helpful','review','sentiment'])

# Get the values from the 'review' and 'sentiment' columns in the new dataframe
reviews = new_data['review'].values
label = new_data['sentiment'].values

In [None]:
# Initialize an empty dictionary for storing the mapping of words to indices
word_dict={}
val=1
for i in tqdm(reviews):
  for j in i.split():
    if j not in word_dict:
      word_dict[j]=val
      val+=1


In [None]:
# Add special tokens to the word dictionary
word_dict['<PAD>']=0
word_dict['<START>']=val
word_dict['<UNK>'] = len(word_dict)

In [None]:
# Convert the reviews to a list of lists of integers
train_data =[]
for i in tqdm(reviews):
  sent = [word_dict['<START>']]
  for j in i.split():
    sent.append(word_dict[j])
  train_data.append(sent)
  

In [None]:
# Pad the reviews so that they all have the same length
train_data = keras.preprocessing.sequence.pad_sequences(train_data,
                                                        value=word_dict["<PAD>"],
                                                        padding='post',
                                                        maxlen=256)

In [None]:
# Convert labels to a list of integers (0 for negative, 1 for positive)
train_out =[]
for i in label:
  if i == 'negative':
    train_out.append(0)
  else:
    train_out.append(1)

The neural_network() method is the base architechture that we are going to use to train our model 

In [None]:
def neural_network(drop=0.2):
    # Create a Sequential model
    model = Sequential()

    # Add an Embedding layer with vocabulary size and embedding dimension
    model.add(keras.layers.Embedding(len(word_dict), 16))
    
    # Add a GlobalAveragePooling1D layer to average the embeddings
    model.add(keras.layers.GlobalAveragePooling1D())

    # Add a Dense layer with 32 units and ReLU activation
    model.add(keras.layers.Dense(32, activation='relu'))
    # Add a Dense layer with 64 units and ReLU activation
    model.add(keras.layers.Dense(64, activation='relu'))
    # Add a Dropout layer with the specified dropout rate
    model.add(keras.layers.Dropout(drop))

    # Add a Dense layer with 1 unit and sigmoid activation
    model.add(keras.layers.Dense(1, activation='sigmoid'))


# **Harris Hawks Optimization Algorithm**

In [None]:
class HHO():
  
  # Initialize the HHO algorithm with the number of individuals in the population (N), the number of iterations to be performed (T), 
  # the number of features in the problem (nfeatures), and an optional population of individuals (pop)
  def __init__(self,N,T,nfeatures,pop=[]):
    self.N=N
    self.T=T
    self.n_features = nfeatures
    # If a population is not provided, initialize it with random individuals
    if len(pop)==0:
      self.population = np.random.random((self.N,self.n_features))
    # If a population is provided, use it
    else:
      self.population=pop[:]
    self.best_fitness =0
    
    
  def optimize(self):
    # Set the upper and lower bounds for the population
    self.UB = len(self.population) - 1
    self.LB = 0
    
    # Iterate for the specified number of iterations
    for i in tqdm(range(self.T)):
        # If this is the first iteration, print a newline character
        if i == 0:
            print()
        # Initialize an empty list to store the fitness values of the individuals
        fits = []
        
        # Evaluate the fitness of each individual in the population
        for j in range(len(self.population)):
            # If this is not the first iteration, apply a sigmoid function to the individual's first feature
            if i != 0:
                indi_drop = self.sigmoid(self.population[j][0])
            # If this is the first iteration, use the individual's first feature as is
            else:
                indi_drop = self.population[j][0]
            # Create a neural network with the modified first feature
            model = neural_network(indi_drop)
            # Compile the model
            model.compile(optimizer='adam', loss='mse', metrics=['acc'])
            # Fit the model to the training data and evaluate its accuracy
            history = model.fit(train_data, train_out, epochs=1, batch_size=8)
            acc = history.history['acc'][0]
            # Store the individual's accuracy as its fitness
            fits.append(acc)
            # Update the best fitness value if the current individual has a higher fitness
            best_fitness = max(fits)
            if acc >= best_fitness:
                # Store the index of the best individual
                index = j
                self.best_fitness = best_fitness
        
        # If this is not the first iteration, use the sigmoid function to modify the best individual's first feature
        if i != 0:
            self.rabbit = self.population[index][0]
        # If this is the first iteration, use the best individual's first feature as is
        else:
            self.rabbit = self.sigmoid(self.population[index][0])
        # Print the selected dropout rate
        print()
        print("Dropout selected from Iteration ", i+1, " : ", self.rabbit)
        print()
        
        # Update the rest of the population
        for t, hawk in enumerate(self.population):
            # Generate random values for E and J
            E = 2 * np.random.rand() - 1
            J = 2 * (1 - np.random.rand())
            
            # Update the value of E based on the current iteration
            E = self.update_E(E, i+1)
            
            # If E is greater than or equal to 1, perform exploration
            if np.abs(E) >= 1:
                self.exploration(t)
            
            # If E is less than 1, perform exploitation or cooperation
            if np.abs(E) < 1:
                # Generate a random value for r
                r = np.random.rand()
          
          if r>=0.5 and np.abs(E)>=0.5 :
            #update vector using eqn(4)
            self.soft_baise(t,E,J)
          elif r>=0.5 and np.abs(E) <0.5:
            #update vector using eqn (6)
            self.hard_baise(t,E)
          elif r<0.5 and np.abs(E) >=0.5:
            #update vector using eqn (10)
            self.soft_baise_dive(t,E,J)
          elif r<0.5 and np.abs(E) <0.5:
            #update vector using eqn (11)
            self.hard_baise_dive(t,E,J)
           
      #print("\t Best fitness value in this iteration = ",best_fitness) 
      #print("best rabbit in this iteration : ",self.rabbit)
    
    return self.rabbit,index

  def update_E(self,E,t):
    # update the value of E at each time step
    E = 2 * E *(1 - t/self.T)
    return E
  
  def soft_baise(self,t,E,J):
    # update population using soft baise approach
    del_x = self.rabbit - self.population[t]
    
    # update population at time (t+1)%UB+1
    self.population[(t+1)%self.UB+1]= del_x  - E * np.abs( J * self.rabbit - self.population[t] )
  
  def hard_baise(self,t,E):
    # update population using hard baise approach
    del_x = self.rabbit - self.population[t]
    
    # update population at time (t+1)%UB+1
    self.population[(t+1)%self.UB+1] = self.rabbit - E * np.abs(del_x)
  
  def soft_baise_dive(self,t,E,J):
    # update population using soft baise with dive approach
    Y = self.rabbit - E* np.abs(J * self.rabbit - self.population[t])
    
    # sample a random noise vector
    s = np.random.randn(1,self.n_features)
    
    u=np.random.random()
    v= np.random.random()
    
    # calculate standard deviation of population at time t
    std = np.std(self.population[t])
    beta = 1/1.5
    
    # calculate the scaling factor for the noise vector
    lf= 0.01 * u * std /(np.abs(v)**beta)
    
    # create a new candidate solution by adding the noise vector to Y
    Z = Y + s * lf
    
    # update population at time (t+1)%UB+1 based on the fitness of Y and Z
    if self.fitness(Y) < self.fitness(self.population[t]):
      self.population[(t+1)%self.UB+1] = Y
    elif self.fitness(Z) < self.fitness(self.population[t]):
      self.population[(t+1)%self.UB+1] = Z
  
  def hard_baise_dive(self,t,E,J):
    # calculate average of the population at time t
    avg=0  
    for i in self.population:
      avg+=i
  
    pop = len(self.population)
    avg=avg/pop
    
    # calculate Y using hard baise approach
    Y = self.rabbit - E* np.abs(J*self.rabbit - avg)
    
    # sample a random noise vector
    s = np.random.randn(1,self.n_features)
    
    u=np.random.random()
    v= np.random.random()
    
    # calculate standard deviation of population at time t
    std = np.std(self.population[t])
    beta = 1/1.5
    
    # calculate the scaling factor for the noise vector
    lf= 0.01 * u * std /(np.abs(v)**beta)
    
    # create a new candidate solution by adding the noise vector to Y
    Z = Y + s * lf
    
    # update population at time (t+1)%UB+1 based on the fitness of Y and Z
    if self.fitness(Y) < self.fitness(self.population[t]):
      self.population[(t+1)%self.UB+1] = Y
    elif self.fitness(Z) < self.fitness(self.population[t]):
      self.population[(t+1)%self.UB+1] = Z
  
  def exploration(self,t):
    # update population using exploration approach
    q = np.random.random()
    r1= np.random.random()
    r2= np.random.random()
    r3= np.random.random()
    r4= np.random.random()
  
    # calculate average of the population at time t
    avg=0  
    for i in self.population:
      avg+=i
  
    pop = len(self.population)
    avg=avg/pop
   
    # select a random solution from the population
    k = np.random.randint(low=0,high=pop-1)
    
    if q >= 0.5 :
      # update population at time (t+1)%pop based on the selected solution and the current solution
      self.population[(t+1)%pop] = self.population[k] -r1 * (self.population[k] - 2* r2 *self.population[t])
    else :
      # update population at time (t+1)%pop based on the average of the population and the rabbit
      self.population[(t+1)%pop] = (self.rabbit - avg) - r3 *( 0 + r4 * (pop-1 - 0))
    
  def fitness(self,hawk):
    # calculate the fitness of the given solution
    sum=np.mean(hawk)
    return sum

  def sigmoid(self,x):
    # calculate the sigmoid function
    return 1/(1+np.exp(-x))

In [None]:
def create_population(pop_size):
  # create a population of solutions
  pop=[]
  for i in range(pop_size):
    # create an individual solution
    individual=[]
    # sample a random dropout rate
    dropout = np.random.random()
    # add the dropout rate to the individual solution
    individual.append(dropout)
    # add the individual solution to the population
    pop.append(individual)
  # return the population as a numpy array
  return np.array(pop,dtype='float32')

# create a population of size 5
print(create_population(5))
# create a population of size 25
pop = create_population(25)
print('length : ',len(pop))

# create an instance of HHO with the given population
opt = HHO(len(pop),2,1,pop)
# optimize the population
optimized_dropout = opt.optimize()

# print the optimized dropout rate
print("Optimized Dropout : ",optimized_dropout)

#**Training Our Main Model** 

In [None]:
# create an instance of the neural_network model with dropout rate 0
model = neural_network(0)

# compile the model with mean squared error as the loss function, Adam optimizer and accuracy as the metric
model.compile(loss='mse',optimizer='adam',metrics=['acc'])

# fit the model on the training data and output, with 25 epochs and a validation split of 0.2
h1 = model.fit(train_data,train_out,epochs=25,validation_split=0.2,batch_size=8)

# create an instance of the neural_network model with dropout rate 2
model = neural_network(2)

# compile the model with mean squared error as the loss function, Adam optimizer and accuracy as the metric
model.compile(loss='mse',optimizer='adam',metrics=['acc'])

# fit the model on the training data and output, with 25 epochs and a validation split of 0.2
h2 = model.fit(train_data,train_out,epochs=25,validation_split=0.2,batch_size=8)

### model = neural_network(0.271245)
model.compile(loss='mse',optimizer='adam',metrics=['acc'])
h3 = model.fit(train_data,train_out,epochs=25,validation_split=0.2,batch_size=8)

In [None]:
def encode(text):
  out =[word_dict['<START>']]
  for i in text.split():
    if i in word_dict:
      out.append(word_dict[i])
    else:
      out.append(word_dict['<UNK>'])
      
  return out

In [None]:
def classify(text):
  emb = keras.preprocessing.sequence.pad_sequences([encode(text)],
                                                  value=word_dict['<PAD>'],
                                                  padding='post',
                                                  maxlen=256)
  
  pred = model.predict(emb)
  
  #print("prediction obtained : ", pred)
  
  if (pred*100) >50.0 :
    #print("Positive review")
    return "positive"
  else:
    #print("Negative review")
    return "negative"
 

In [None]:
classify('bad gaME')

In [None]:
# create a list of 25 integers
y = list(range(1,26))

# print the list
print(y)
# print the accuracy values for the first model
print(h1.history['acc'])



In [None]:
# plot the validation accuracy for the three models
plt.plot(y,h1.history['val_acc'])
plt.plot(y,h2.history['val_acc'])
plt.plot(y,h3.history['val_acc'])

# add a legend to the plot
plt.legend(['No Dropout','Constant Dropout','With HHO'])

# create a boxplot of the validation accuracy for the three models
plt.boxplot([h1.history['val_acc'],h2.history['val_acc'],h3.history['val_acc']])

# add labels to the x-axis
plt.xlabel(['No Dropout','Constant Dropout','with HHO dropout'])

In [None]:
# print the maximum validation accuracy for each model
print('No Dropout       : ',max(h1.history['val_acc']))
print('Constant Dropout : ',max(h2.history['val_acc']))
print('with HHO dropout : ',max(h3.history['val_acc']))

# print optimized dropout
print('Optimized Dropout:',optimized_dropout)