<a href="https://colab.research.google.com/github/perlatomdpi/NLP/blob/main/Sentiment_Analysis_BERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Sentiment analysis model with BERT**


# **Import dependencies**

In [None]:
import numpy as np
import math
import re # advanced text pre-processing
import pandas as pd
from bs4 import BeautifulSoup # encode text
import random

from google.colab import drive # get data from drive

In [None]:
# use tf2
try:
    %tensorflow_version 2.x
except Exception:
    pass
import tensorflow as tf

import tensorflow_hub as hub # to download the weights of bert
from tensorflow.keras import layers # to create cnn layers
import bert

# **Data pre-processing**

In [None]:
# Mount Google Drive so the training CSV is reachable from the Colab runtime.
drive.mount("/content/drive")

# The CSV is read without a header row, so column names are supplied here.
# NOTE(review): "date" is listed because the drop() below removes a "date"
# column; this matches the usual Sentiment140 layout
# (sentiment, id, date, query, user, text) -- confirm against the actual file.
cols = ["sentiment", "id", "date", "query", "user", "text"]
data = pd.read_csv(
    # Absolute path: it must start at the "/content/drive" mount point above.
    "/content/drive/My Drive/.../BERT/data/train.csv",
    header=None,
    names=cols,
    engine="python",
    encoding="latin1"
)

# Keep only the "sentiment" and "text" columns; inplace=True modifies `data`
# itself instead of returning a new, lighter DataFrame.
data.drop(["id", "date", "query", "user"],
          axis=1,
          inplace=True)

# **Cleaning**

In [None]:
def clean_tweet(tweet):
    """Return `tweet` reduced to letters, '.', '!', '?' and single spaces.

    Steps, in order:
      1. Parse the text with BeautifulSoup ("lxml" parser) and keep only the
         text content, which drops markup and decodes HTML entities.
      2. Remove @mentions.
      3. Remove http/https URLs.
      4. Replace every character that is not an ASCII letter or one of
         '.', '!', '?' with a space.
      5. Collapse runs of spaces into a single space.

    Removed pieces are replaced with a space (not an empty string) so that
    neighbouring words are not glued together; word boundaries must survive
    for the tokenizer that runs on the cleaned text.

    Args:
        tweet: raw tweet text.

    Returns:
        The cleaned text as a string.
    """
    tweet = BeautifulSoup(tweet, "lxml").get_text()
    tweet = re.sub(r"@[A-Za-z0-9]+", ' ', tweet)            # drop @mentions
    tweet = re.sub(r"https?://[A-Za-z0-9./]+", ' ', tweet)  # drop URLs
    tweet = re.sub(r"[^A-Za-z.!?]", ' ', tweet)             # other chars -> space
    tweet = re.sub(r" +", ' ', tweet)                       # collapse repeated spaces
    return tweet

# Apply the cleaning function to every entry of the text column.
data_clean = list(map(clean_tweet, data.text))

# Labels are taken from the sentiment column; in the data 4 marks a positive
# tweet, and it is re-coded to 1 in place.
data_labels = data.sentiment.values
is_positive = data_labels == 4
data_labels[is_positive] = 1

# **Tokenization**

In [None]:
# Pre-trained BERT layer downloaded from TensorFlow Hub. The model name encodes
# its size (L-12 / H-768 / A-12, uncased English). trainable=False keeps the
# BERT weights frozen: they are not fine-tuned here.
BERT_HUB_URL = "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1"
bert_layer = hub.KerasLayer(BERT_HUB_URL, trainable=False)

# The loaded object exposes the vocabulary file path and the lower-casing flag
# that the tokenizer must be built with; .numpy() converts both to plain values.
resolved_bert = bert_layer.resolved_object
vocab_file = resolved_bert.vocab_file.asset_path.numpy()
do_lower_case = resolved_bert.do_lower_case.numpy()

# Tokenizer class shipped with the bert package, instantiated with the
# vocabulary and casing of the layer loaded above.
FullTokenizer = bert.bert_tokenization.FullTokenizer
tokenizer = FullTokenizer(vocab_file, do_lower_case)

def encode_sentence(sent):
    """Tokenize `sent` with the module-level tokenizer and return its token ids.

    Each token produced by the tokenizer is mapped to its integer id in the
    vocabulary.
    """
    tokens = tokenizer.tokenize(sent)
    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    return token_ids

# Encode every cleaned sentence into a list of token ids, the input format
# consumed by the rest of the pipeline.
data_inputs = list(map(encode_sentence, data_clean))

# **Create the dataset and use it as input for the training model**

The BERT model receives a fixed length of sentence as input. Usually the maximum length of a sentence depends on the data we are working on. For sentences that are shorter than this maximum length, we will have to add paddings (empty tokens) to the sentences to make up the length.

We will create padded batches, padding the sentences of each batch independently, which keeps the number of padding tokens to a minimum. To do that, we sort the sentences by length, apply `padded_batch`, and then shuffle the resulting batches.

In [None]:
# Bundle every encoded sentence with its label and its length in tokens.
data_with_len = [[sent, data_labels[i], len(sent)]
                 for i, sent in enumerate(data_inputs)]

# Shuffle before sorting: the raw data is ordered by label (negative first,
# then positive). list.sort() is stable, so after the sort below sentences of
# equal length keep this shuffled order and the two labels stay mixed.
random.shuffle(data_with_len)

# Sort by token count (third element of each entry) so that each batch holds
# sentences of similar length and needs little padding.
data_with_len.sort(key=lambda x: x[2])

# Keep (token_ids, label) pairs only for sentences with MORE than 7 tokens.
sorted_all = [(sent_lab[0], sent_lab[1])
              for sent_lab in data_with_len if sent_lab[2] > 7]

# Build the dataset from a generator that yields the pairs one after the
# other; both the token ids and the labels are integers.
all_dataset = tf.data.Dataset.from_generator(lambda: sorted_all,
                                             output_types=(tf.int32, tf.int32))

BATCH_SIZE = 32
# Pad per batch: the (None,) shape pads each sentence to the longest one in
# its own batch; () leaves the scalar label untouched.
all_batched = all_dataset.padded_batch(BATCH_SIZE, padded_shapes=((None, ), ()))

NB_BATCHES = math.ceil(len(sorted_all) / BATCH_SIZE)  # total number of batches
NB_BATCHES_TEST = NB_BATCHES // 10                    # a tenth of the batches is held out for testing

# Dataset transformations return a NEW dataset, so the result must be
# re-assigned or the shuffle is silently lost. reshuffle_each_iteration=False
# freezes the order, so the take()/skip() split below stays disjoint across
# iterations instead of leaking test batches into training.
all_batched = all_batched.shuffle(NB_BATCHES, reshuffle_each_iteration=False)
test_dataset = all_batched.take(NB_BATCHES_TEST)      # first tenth of the shuffled batches
train_dataset = all_batched.skip(NB_BATCHES_TEST)     # everything after the test batches

# **Model building**

In [None]:
class DCNN(tf.keras.Model):
    """Convolutional text classifier over token-id sequences.

    Token ids are embedded, passed through three parallel 1-D convolutions
    with kernel sizes 2, 3 and 4, each followed by global max-pooling; the
    pooled features are concatenated and fed to a dense layer, dropout and a
    final classification layer.
    """

    def __init__(self,
                 vocab_size,
                 emb_dim=128,
                 nb_filters=50,
                 FFN_units=512,
                 nb_classes=2,
                 dropout_rate=0.1,
                 training=False,
                 name="dcnn"):
        """Create the layers of the model.

        Args:
            vocab_size: number of distinct token ids (embedding input size).
            emb_dim: dimension of the embedding vectors.
            nb_filters: number of filters in each convolution.
            FFN_units: number of units in the hidden dense layer.
            nb_classes: number of classes. With 2 classes the output is a
                single sigmoid unit; otherwise a softmax over `nb_classes`.
            dropout_rate: rate of the dropout applied after the dense layer.
            training: unused by the constructor; kept so existing callers
                that pass it keep working.
            name: name of the Keras model.
        """
        super().__init__(name=name)

        self.embedding = layers.Embedding(vocab_size,
                                          emb_dim)
        # Three parallel convolutions looking at windows of 2, 3 and 4 tokens.
        # padding="valid" means inputs must be at least 4 tokens long.
        self.bigram = layers.Conv1D(filters=nb_filters,
                                    kernel_size=2,
                                    padding="valid",
                                    activation="relu")
        self.trigram = layers.Conv1D(filters=nb_filters,
                                     kernel_size=3,
                                     padding="valid",
                                     activation="relu")
        self.fourgram = layers.Conv1D(filters=nb_filters,
                                      kernel_size=4,
                                      padding="valid",
                                      activation="relu")
        # Pooling has no weights, so one instance is shared by all branches.
        self.pool = layers.GlobalMaxPool1D()

        self.dense_1 = layers.Dense(units=FFN_units, activation="relu")
        self.dropout = layers.Dropout(rate=dropout_rate)
        if nb_classes == 2:
            # Binary case: one unit giving a value in [0, 1].
            self.last_dense = layers.Dense(units=1,
                                           activation="sigmoid")
        else:
            self.last_dense = layers.Dense(units=nb_classes,
                                           activation="softmax")

    def call(self, inputs, training=None):
        """Run the forward pass.

        Args:
            inputs: batch of token-id sequences, shape (batch_size, seq_len).
            training: forwarded to the dropout layer to select training or
                inference behaviour. Defaults to None so the model can be
                called without the argument.

        Returns:
            The output of the final dense layer: shape (batch_size, 1) in the
            binary case, (batch_size, nb_classes) otherwise.
        """
        x = self.embedding(inputs)    # (batch_size, seq_len, emb_dim)
        x_1 = self.bigram(x)          # (batch_size, seq_len - 1, nb_filters)
        x_1 = self.pool(x_1)          # (batch_size, nb_filters)
        x_2 = self.trigram(x)         # (batch_size, seq_len - 2, nb_filters)
        x_2 = self.pool(x_2)          # (batch_size, nb_filters)
        x_3 = self.fourgram(x)        # (batch_size, seq_len - 3, nb_filters)
        x_3 = self.pool(x_3)          # (batch_size, nb_filters)

        merged = tf.concat([x_1, x_2, x_3], axis=-1)  # (batch_size, 3 * nb_filters)
        merged = self.dense_1(merged)
        merged = self.dropout(merged, training=training)
        output = self.last_dense(merged)

        return output