<a href="https://colab.research.google.com/github/kubaxcz/playground/blob/master/sentiment_analysis_bert_cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import numpy as np
import math
import random
import re
import pandas as pd
from bs4 import BeautifulSoup

from google.colab import drive

In [2]:
!pip install bert-for-tf2
!pip install sentencepiece



In [0]:
try:
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow as tf

import tensorflow_hub as hub

from tensorflow.keras import layers
import bert

## Data preprocessing

In [4]:
drive.mount("/content/drive")

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
cols = ["sentiment","id","date","query","user","text"]
data = pd.read_csv("/content/drive/My Drive/tmp/bert/sentiment_data/train.csv",
                   header=None,
                   names=cols,
                   engine="python",
                   encoding="latin1")

In [0]:
data.drop(['id','date','query','user'],
          axis=1,
          inplace=True)

In [7]:
data.head()

Unnamed: 0,sentiment,text
0,0,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,0,is upset that he can't update his Facebook by ...
2,0,@Kenichan I dived many times for the ball. Man...
3,0,my whole body feels itchy and like its on fire
4,0,"@nationwideclass no, it's not behaving at all...."


## Preprocessing

### Cleaning

In [0]:
def clean_tweet(tweet):
  tweet = BeautifulSoup(tweet, "lxml").get_text()
  tweet = re.sub(r"@[A-Za-z0-9]+", " ", tweet)
  tweet = re.sub(r"https?://[A-Za-z0-9./]+", " ", tweet)
  tweet = re.sub(r"[^a-zA-Z.!?']", " ", tweet)
  tweet = re.sub(r" +", " ", tweet)
  return tweet

In [0]:
data_clean = [clean_tweet(tweet) for tweet in data.text]

In [0]:
data_labels = data.sentiment.values
data_labels[data_labels == 4]=1

### Tokenization

In [0]:
FullTokenizer = bert.bert_tokenization.FullTokenizer
bert_layer = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1",
                            trainable=False)
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = FullTokenizer(vocab_file,do_lower_case)

In [12]:
tokenizer.tokenize("My dog loves strawberries.")

['my', 'dog', 'loves', 'straw', '##berries', '.']

In [13]:
tokenizer.convert_tokens_to_ids(tokenizer.tokenize("My dog loves strawberries."))

[2026, 3899, 7459, 13137, 20968, 1012]

In [0]:
def encode_sentence(sent):
  return tokenizer.convert_tokens_to_ids(tokenizer.tokenize(sent))

In [0]:
data_inputs = [encode_sentence(sentence) for sentence in data_clean]

### Dataset creation

In [0]:
data_with_len = [[sent, data_labels[i], len(sent)]
                 for i, sent in enumerate(data_inputs)]
random.shuffle(data_with_len)
data_with_len.sort(key=lambda x: x[2])
sorted_all = [(sent_lab[0], sent_lab[1])
              for sent_lab in data_with_len if sent_lab[2] > 2]

In [0]:
all_dataset = tf.data.Dataset.from_generator(lambda: sorted_all,
                                             output_types=(tf.int32, tf.int32))

In [0]:
BATCH_SIZE=32
all_batched = all_dataset.padded_batch(BATCH_SIZE,padded_shapes=((None, ),()))

In [0]:
NB_BATCHES = math.ceil(len(sorted_all)/BATCH_SIZE)
NB_BATCHES_TEST = NB_BATCHES // 10
all_batched.shuffle(NB_BATCHES)
test_dataset = all_batched.take(NB_BATCHES_TEST)
train_dataset = all_batched.skip(NB_BATCHES_TEST)

## Model Building

In [0]:
class DCNN(tf.keras.Model):
  def __init__(self,
               vocab_size,
               emb_dim=128,
               nb_filters=50,
               FFN_units=512,
               nb_classes=2,
               dropout_rate=0.1,
               training=False,
               name="dcnn"):
    super(DCNN, self).__init__(name=name)
    self.embedding = layers.Embedding(vocab_size,emb_dim)
    self.bigram = layers.Conv1D(filters=nb_filters,kernel_size=2,padding="valid",activation="relu")
    self.trigram = layers.Conv1D(filters=nb_filters,kernel_size=3,padding="valid",activation="relu")
    self.fourgram = layers.Conv1D(filters=nb_filters,kernel_size=4,padding="valid",activation="relu")
    self.pool = layers.GlobalMaxPooling1D()
    self.dense_1 = layers.Dense(units=FFN_units, activation="relu")
    self.dropout = layers.Dropout(rate=dropout_rate)
    if nb_classes == 2:
      self.last_dense = layers.Dense(units=1, activation="sigmoid")
    else:
      self.last_dense = layers.Dense(units=nb_classes, activation="softmax")

  def call(self, inputs, training):
    x = self.embedding(inputs)
    x_1 = self.bigram(x)
    x_1 = self.pool(x_1)
    x_2 = self.trigram(x)
    x_2 = self.pool(x_2)
    x_3 = self.fourgram(x)
    x_3 = self.pool(x_3)

    merged = tf.concat((x_1, x_2, x_3), axis = -1)
    merged = self.dense_1(merged)
    merged = self.dropout(merged, training)
    output = self.last_dense(merged)

    return output


## Training

In [0]:
VOCAB_SIZE = len(tokenizer.vocab)
EMB_DIM = 200
NB_FILTERS = 100
FFN_UNITS = 256
NB_CLASSES = 2

DROPOUT_RATE = 0.2

NB_EPOCHS = 5

In [0]:
Dcnn = DCNN(vocab_size=VOCAB_SIZE,
            emb_dim=EMB_DIM,
            nb_filters=NB_FILTERS,
            FFN_units=FFN_UNITS,
            nb_classes=NB_CLASSES,
            dropout_rate=DROPOUT_RATE)

In [0]:
if NB_CLASSES == 2:
  Dcnn.compile(loss="binary_crossentropy",
               optimizer="adam",
               metrics=["accuracy"])
else:
  Dcnn.compile(loss="sparse_categorical_crossentropy",
               optimizer="adam",
               metrics=["sparse_categorical_accuracy"])

In [0]:
checkpoint_path = "/content/drive/My Drive/tmp/bert/ckpt_bert_sent"

ckpt = tf.train.Checkpoint(Dcnn=Dcnn)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=1)

if ckpt_manager.latest_checkpoint:
  ckpt.restore(ckpt_manager.latest_checkpoint)
  print("Latest checkpoint has been restored!")

In [0]:
class MyCustomCallback(tf.keras.callbacks.Callback):
  
  def on_epoch_end(self, epoch, logs=None):
    ckpt_manager.save()
    print("Checkpoint is saved at {}".format(checkpoint_path))

Result

In [26]:
Dcnn.fit(train_dataset,
         epochs=NB_EPOCHS,
         callback=[MyCustomCallback()])

Epoch 1/5
Epoch 2/5

In [27]:
results = Dcnn.evaluate(test_dataset)
print(results)

[0.4496183693408966, 0.7606815695762634]


In [0]:
def get_prediction(sentence):
  tokens = encode_sentence(sentence)
  inputs = tf.expand_dims(tokens, 0)

  output = Dcnn(inputs, training=False)

  sentiment = math.floor(output*2)

  if sentiment == 0:
    print("Output of the model: {}\nPredicted sentiment: negative.".format(output))
  if sentiment == 1:
    print("Output of the model: {}\nPredicted sentiment: positive.".format(output))  

In [29]:
get_prediction("This movie was pretty interesting.")

Output of the model: [[0.99813217]]
Predicted sentiment: positive.
