In [None]:
import random
import urllib.request
import zipfile

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
import tensorflow as tf

from tensorflow.keras.layers.experimental import preprocessing

### Dataset

In [None]:
# Colab-only step: mount Google Drive under /content/drive so the dataset archive is reachable.
from google.colab import drive
drive.mount('/content/drive')
# `!` runs a shell command: copy the zipped reviews from Drive into /content/.
!cp -r "/content/drive/MyDrive/datasets/master_thesis/DisneylandReviews.csv.zip" "/content/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Archive to unpack and the directory to unpack it into.
zip_file_path = 'DisneylandReviews.csv.zip'
extract_to_path = ''

# Open the archive read-only (the default mode) and extract every member;
# the context manager closes the archive afterwards.
with zipfile.ZipFile(zip_file_path) as archive:
    archive.extractall(path=extract_to_path)

In [None]:
# --- Split configuration -----------------------------------------------------
SEED = 42                 # fixed seed so the sampled split is identical on every run
MAX_REVIEW_CHARS = 1500   # each review is truncated to this many characters
TRAIN_PER_CLASS = 500     # examples per class in the training split
VALID_PER_CLASS = 100     # examples per class in the validation split
TEST_PER_CLASS = 100      # examples per class in the test split
SAMPLES_PER_CLASS = TRAIN_PER_CLASS + VALID_PER_CLASS + TEST_PER_CLASS

train_df=pd.read_csv("DisneylandReviews.csv",encoding='ISO-8859-1')

# Truncate the review text and binarise the rating: > 3 is positive (1), otherwise negative (0).
all_text_list=[text[:MAX_REVIEW_CHARS] for text in train_df['Review_Text']]
all_target_list=[1 if rating > 3 else 0 for rating in train_df['Rating']]

# Per-class pools, kept in the original row order.
train_text_negative=[text for text, y in zip(all_text_list, all_target_list) if y==0]
train_text_positive=[text for text, y in zip(all_text_list, all_target_list) if y!=0]
train_target_negative=[0]*len(train_text_negative)
train_target_positive=[1]*len(train_text_positive)

for class_name, pool in (("negative", train_text_negative), ("positive", train_text_positive)):
    if len(pool) < SAMPLES_PER_CLASS:
        raise ValueError(
            f"Need {SAMPLES_PER_CLASS} {class_name} reviews for the split, found only {len(pool)}"
        )

# A dedicated, seeded generator keeps the split reproducible without touching
# the global `random` state. Sampling without replacement guarantees that the
# train / validation / test slices below are disjoint.
split_rng = random.Random(SEED)
negative_indices = split_rng.sample(range(len(train_text_negative)), SAMPLES_PER_CLASS)
positive_indices = split_rng.sample(range(len(train_text_positive)), SAMPLES_PER_CLASS)


def _balanced_slice(start, stop):
    """Return (texts, targets) for sampled positions [start, stop) of each class, negatives first."""
    chosen_negative = negative_indices[start:stop]
    chosen_positive = positive_indices[start:stop]
    texts = [train_text_negative[i] for i in chosen_negative] + [train_text_positive[i] for i in chosen_positive]
    targets = [train_target_negative[i] for i in chosen_negative] + [train_target_positive[i] for i in chosen_positive]
    return texts, targets


train_stop = TRAIN_PER_CLASS
valid_stop = train_stop + VALID_PER_CLASS

train_text_list, train_target_list = _balanced_slice(0, train_stop)
valid_text_list, valid_target_list = _balanced_slice(train_stop, valid_stop)
test_text_list, test_target_list = _balanced_slice(valid_stop, SAMPLES_PER_CLASS)

# Sanity checks: every split is balanced and has the configured size.
assert len(train_text_list) == len(train_target_list) == 2 * TRAIN_PER_CLASS
assert len(valid_text_list) == len(valid_target_list) == 2 * VALID_PER_CLASS
assert len(test_text_list) == len(test_target_list) == 2 * TEST_PER_CLASS

# All sampled texts (train, then validation, then test).
text_list = train_text_list + valid_text_list + test_text_list

train_text_array=np.array(train_text_list)
train_target_array=np.array(train_target_list)

valid_text_array=np.array(valid_text_list)
valid_target_array=np.array(valid_target_list)

test_text_array=np.array(test_text_list)
test_target_array=np.array(test_target_list)

# Shuffle buffer spans the whole training set; mini-batches hold 16 reviews.
BUFFER_SIZE = len(train_text_list)
BATCH_SIZE = 16

In [None]:
def clean_and_format_text(input_text):
  """Normalise raw text for the vectoriser.

  Steps, in order: lowercase; delete every character that is not a space,
  a lowercase ASCII letter or one of . ? ! , ¿; put a space on both sides of
  each of those punctuation marks; strip leading and trailing whitespace.

  Args:
    input_text: string tensor to normalise.

  Returns:
    String tensor holding the normalised text.
  """
  text = tf.strings.lower(input_text)
  # Keep only spaces, a-z and the punctuation marks listed in the class.
  text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
  # `\0` is the whole match, so each punctuation mark is re-emitted with a space on both sides.
  text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
  return tf.strings.strip(text)


# Upper bound on the vocabulary size learned by the vectoriser.
maximum_vocab_size = 20000

text_vectorizer = preprocessing.TextVectorization(
    standardize=clean_and_format_text,
    max_tokens=maximum_vocab_size
)
# Build the vocabulary from the training reviews only. Adapting on the
# validation and test texts as well would leak information from the
# evaluation splits into the model's input representation.
text_vectorizer.adapt(train_text_list)
def pair_tokenizer(input_text, target_text):
    """Vectorise the text half of an (input, target) pair.

    Args:
        input_text: raw text passed through `text_vectorizer`.
        target_text: target returned untouched.

    Returns:
        Tuple of (vectorised input, original target).
    """
    return text_vectorizer(input_text), target_text

def prepare_dataset_batches(dataset):
    """Turn a dataset of (text, target) pairs into vectorised mini-batches.

    The pipeline caches the elements, shuffles them with a buffer of
    `BUFFER_SIZE`, groups them into batches of `BATCH_SIZE`, vectorises each
    batch with `pair_tokenizer` and prefetches the result.

    Args:
        dataset: a `tf.data.Dataset` yielding (text, target) pairs.

    Returns:
        The batched, vectorised and prefetched dataset.
    """
    cached = dataset.cache()
    shuffled = cached.shuffle(BUFFER_SIZE)
    batched = shuffled.batch(BATCH_SIZE)
    # Vectorising after batching processes a whole batch of strings per call.
    vectorised = batched.map(pair_tokenizer, num_parallel_calls=tf.data.AUTOTUNE)
    return vectorised.prefetch(tf.data.AUTOTUNE)

In [None]:
# Wrap each (texts, targets) array pair in a tf.data.Dataset: train, validation, test.
train_dataset, valid_dataset, test_dataset = (
    tf.data.Dataset.from_tensor_slices((texts, targets))
    for texts, targets in (
        (train_text_array, train_target_array),
        (valid_text_array, valid_target_array),
        (test_text_array, test_target_array),
    )
)

# Run every split through the same cache / shuffle / batch / vectorise pipeline.
train_batches, valid_batches, test_batches = (
    prepare_dataset_batches(split)
    for split in (train_dataset, valid_dataset, test_dataset)
)


### The Model

In [None]:
from tensorflow import keras
import tensorflow as tf

In [None]:
class TheSelfAttention(keras.layers.Layer):
  """Multi-head scaled dot-product attention.

  The last axis of every input is split into `heads` chunks of size
  `embed_size // heads`; each chunk is projected by a Dense layer shared
  across heads, attention is computed per head, and the concatenated heads
  are projected back to `embed_size`.

  Args:
    embed_size: size of the last axis of the inputs; must be divisible by `heads`.
    heads: number of attention heads.
    **kwargs: forwarded to `keras.layers.Layer`.
  """
  def __init__(self,embed_size,heads,**kwargs):
    super(TheSelfAttention,self).__init__(**kwargs)

    self.embedding_dim = embed_size
    self.num_heads = heads
    self.depth = embed_size//heads

    assert self.depth * self.num_heads == self.embedding_dim, \
        "embed_size must be divisible by heads"

    # Each projection acts on the per-head slice (last axis of size `depth`).
    self.dense_value = keras.layers.Dense(self.depth, name="value")
    self.dense_key = keras.layers.Dense(self.depth, name="key")
    self.dense_query = keras.layers.Dense(self.depth, name="query")
    self.final_dense = keras.layers.Dense(self.embedding_dim, name="output")

  def call(self,values,keys,queries,mask):
    """Attend from `queries` to `keys` and return the weighted `values`.

    Args:
      values, keys, queries: tensors reshaped to
        (batch, length, heads, depth) below, i.e. presumably
        (batch, length, embed_size) on entry.
      mask: None, or a tensor broadcastable to
        (batch, heads, query_len, key_len). Entries equal to 0 / False mark
        key positions that must not be attended to.

    Returns:
      Tensor of shape (batch, query_len, embed_size).
    """
    batch_size = tf.shape(queries)[0]
    query_len = tf.shape(queries)[1]

    def split_heads(x):
        # (batch, length, embed) -> (batch, length, heads, depth)
        return tf.reshape(x, (batch_size, tf.shape(x)[1], self.num_heads, self.depth))

    values = self.dense_value(split_heads(values))
    keys = self.dense_key(split_heads(keys))
    queries = self.dense_query(split_heads(queries))

    # Dot product of every query with every key, per head: (batch, heads, query_len, key_len).
    attention_scores = tf.einsum("bqhd,bkhd->bhqk", queries, keys)
    scaled_scores = attention_scores/(self.depth**(1/2))

    if mask is not None:
      # Masked positions get the most negative representable score, so they
      # receive (numerically) zero weight after the softmax.
      keep = tf.cast(mask, tf.bool)
      blocked_score = tf.constant(scaled_scores.dtype.min, dtype=scaled_scores.dtype)
      scaled_scores = tf.where(keep, scaled_scores, blocked_score)

    # Normalise over the key axis.
    attention_weights=tf.nn.softmax(scaled_scores,axis=3)

    # Weighted sum of the values, then merge the heads back into one axis.
    context_layer=tf.reshape(
        tf.einsum("bhqk,bkhd->bqhd", attention_weights, values),
        (batch_size, query_len, self.num_heads*self.depth)
    )
    attended_output = self.final_dense(context_layer)
    return attended_output

In [None]:
class TransformerBlock(keras.layers.Layer):
  """Attention sub-layer followed by a position-wise feed-forward sub-layer.

  Each sub-layer is wrapped as dropout(layer_norm(sub_layer(x) + x)).

  Args:
    embedding_dim: size of the feature axis of the inputs and outputs.
    num_heads: number of attention heads.
    dropout_rate: rate of the dropout applied after each layer norm.
    forward_expansion: the hidden width of the feed-forward network is
      `forward_expansion * embedding_dim`.
    **kwargs: forwarded to `keras.layers.Layer`.
  """
  def __init__(self,embedding_dim, num_heads, dropout_rate, forward_expansion, **kwargs):
    super(TransformerBlock,self).__init__(**kwargs)
    self.self_attention=TheSelfAttention(embedding_dim, num_heads)
    self.norm1=keras.layers.LayerNormalization()
    self.norm2=keras.layers.LayerNormalization()

    # Use the constructor arguments, not the notebook-level globals
    # `embed_size` / `dropout`: the block must not depend on variables that
    # are only defined in a later cell.
    self.feed_forward=tf.keras.Sequential([
        keras.layers.Dense(forward_expansion * embedding_dim,input_shape=(None,embedding_dim)),
        keras.layers.Activation("relu"),
        keras.layers.Dense(embedding_dim)
    ]
    )

    self.dropout=keras.layers.Dropout(dropout_rate)

  def call(self,values,keys,queries,mask,training):
    """Apply the block.

    Args:
      values, keys, queries: inputs forwarded to the attention layer;
        `queries` is also the residual input of the first sub-layer.
      mask: forwarded unchanged to the attention layer.
      training: passed explicitly to both dropout calls.

    Returns:
      Tensor produced by the second dropout(layer_norm(...)) stage.
    """
    attention_output=self.self_attention(values,keys,queries,mask)

    # Residual connection around attention, then norm and dropout.
    out1=self.dropout(self.norm1(attention_output+queries),training=training)
    forward_output=self.feed_forward(out1)
    # Residual connection around the feed-forward network, then norm and dropout.
    out2=self.dropout(self.norm2(forward_output+out1),training=training)
    return out2

In [None]:
class Model(keras.layers.Layer):
  """Transformer encoder stack with an LSTM read-out for two-class prediction.

  Token ids are embedded, summed with learned position embeddings, passed
  through `num_layers` TransformerBlocks and finally reduced by a 2-unit LSTM.

  Args:
    vocab_size: number of rows of the token embedding table.
    embedding_dim: width of both embeddings and of every transformer block.
    num_layers: number of TransformerBlocks.
    num_heads: attention heads per block.
    forward_expansion: feed-forward expansion factor of each block.
    dropout_rate: dropout rate used in this layer and in each block.
    max_length: number of rows of the position embedding table.
    **kwargs: forwarded to `keras.layers.Layer`.
  """
  def __init__(
      self,
      vocab_size,
      embedding_dim,
      num_layers,
      num_heads,
      forward_expansion,
      dropout_rate,
      max_length,
      **kwargs
    ):
    super(Model,self).__init__(**kwargs)
    # Take the width from the constructor argument rather than the
    # notebook-level global `embed_size`.
    self.embed_size=embedding_dim
    self.max_length=max_length
    self.token_embedding = keras.layers.Embedding(vocab_size, embedding_dim)
    self.position_embedding = keras.layers.Embedding(max_length, output_dim=embedding_dim)
    self.transformer_layers = [TransformerBlock(embedding_dim, num_heads, dropout_rate, forward_expansion) for _ in range(num_layers)]
    self.dropout=keras.layers.Dropout(dropout_rate)
    # NOTE(review): an LSTM with a softmax activation is used as the output
    # layer; its two outputs are presumably not a normalised distribution —
    # confirm this is intended before swapping the loss or metrics.
    self.final_layer=tf.keras.layers.LSTM(2,activation='softmax')

  def call(self,inputs,mask,training):
    """Run the encoder.

    Args:
      inputs: integer tensor indexed as (batch, seq_len) below.
      mask: forwarded unchanged to every TransformerBlock.
      training: forwarded to the dropout layers and to every block.

    Returns:
      Output of the final LSTM layer.
    """
    seq_len=tf.shape(inputs)[1]

    # Batches are padded to their longest review, so seq_len can exceed
    # max_length. Clamp the indices so the lookup never leaves the position
    # table; positions past the end share the last learned embedding.
    positions=tf.minimum(tf.range(0,seq_len), self.max_length-1)
    # Shape (1, seq_len): the embedded positions broadcast over the batch
    # axis in the addition below, so no explicit tiling is required.
    positions=tf.reshape(positions,(1,seq_len))

    positions = self.position_embedding(positions)
    x = self.token_embedding(inputs) + positions
    x = self.dropout(x, training=training)

    for layer in self.transformer_layers:
      x=layer(x,x,x,mask,training=training)
    x = self.dropout(x, training=training)
    output = self.final_layer(x)
    return output

### Metrics and Training

In [None]:
# --- Optimisation schedule ---
num_epochs = 30
learning_rate = 4e-5

# --- Model hyper-parameters ---
src_vocab_size = maximum_vocab_size   # same bound as the vectoriser's vocabulary
embed_size = 512
heads = 8
num_encoder_layers = 4
dropout = 0.1
max_length = 100
forward_expansion = 2

# --- Optimiser and loss ---
optimizer = keras.optimizers.Adam(learning_rate)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()

# --- Per-epoch history (one entry is appended per epoch by the training loop) ---
train_accuracies, valid_accuracies, test_accuracies = [], [], []
train_f1s, valid_f1s, test_f1s = [], [], []

def accuracy_function(real, pred):
  """Fraction of examples whose predicted class equals the label.

  Args:
    real: label tensor compared element-wise with the predicted classes.
    pred: score tensor; the predicted class is the arg-max over axis 1.

  Returns:
    Scalar float32 tensor: number of matches divided by the size of axis 0.
  """
  predicted_classes = tf.argmax(pred, axis=1)
  hits = tf.cast(tf.equal(real, predicted_classes), dtype=tf.float32)
  n_examples = tf.cast(tf.shape(hits)[0], dtype=tf.float32)
  return tf.reduce_sum(hits) / n_examples


# Running means of the per-batch loss, accuracy and F1 for each split.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.Mean(name='train_accuracy')
train_f1=tf.keras.metrics.Mean(name='train_f1')

valid_loss=tf.keras.metrics.Mean(name='valid_loss')
valid_accuracy=tf.keras.metrics.Mean(name='valid_accuracy')
valid_f1=tf.keras.metrics.Mean(name='valid_f1')

test_loss=tf.keras.metrics.Mean(name='test_loss')
test_accuracy=tf.keras.metrics.Mean(name='test_accuracy')
test_f1=tf.keras.metrics.Mean(name='test_f1')

# Input signature shared by the train / valid / test step functions:
# a (batch, seq_len) matrix of token ids and a (batch,) vector of labels.
# Note `(None,)` with the trailing comma: `(None)` is just `None`, which
# would leave the rank of the label tensor unspecified.
train_step_signature = [
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
    tf.TensorSpec(shape=(None,), dtype=tf.int64),
]

In [None]:
def f1_score_function(real, pred):
    """F1 score of class 1 for one batch.

    Args:
        real: label tensor; labels 0 and 1 are counted.
        pred: score tensor; the predicted class is the arg-max over axis 1.

    Returns:
        Scalar float32 tensor 2 * P * R / (P + R), where any 0/0 division
        (in precision, recall or the final ratio) is replaced by 0.
    """
    predicted_classes = tf.argmax(pred, axis=1)

    def count_where(true_label, predicted_label):
        # Number of examples with the given (label, prediction) combination.
        both = tf.logical_and(
            tf.equal(real, true_label),
            tf.equal(predicted_classes, predicted_label),
        )
        return tf.reduce_sum(tf.cast(both, dtype=tf.float32))

    true_pos = count_where(1, 1)
    false_pos = count_where(0, 1)
    false_neg = count_where(1, 0)

    # divide_no_nan yields 0 where the denominator is 0, which here only
    # happens when the numerator is 0 as well.
    precision = tf.math.divide_no_nan(true_pos, true_pos + false_pos)
    recall = tf.math.divide_no_nan(true_pos, true_pos + false_neg)

    return 2 * tf.math.divide_no_nan(precision * recall, precision + recall)

In [None]:
# Build the classifier from the hyper-parameters defined above; keyword
# arguments make the mapping onto the constructor explicit.
model = Model(
    vocab_size=src_vocab_size,
    embedding_dim=embed_size,
    num_layers=num_encoder_layers,
    num_heads=heads,
    forward_expansion=forward_expansion,
    dropout_rate=dropout,
    max_length=max_length,
)

@tf.function(input_signature=train_step_signature)
def train_step(inp_data,target):
  """Run one optimisation step on a batch and update the training metrics.

  Args:
    inp_data: int64 tensor of token ids (see `train_step_signature`).
    target: int64 label tensor.
  """
  with tf.GradientTape() as tape:
    # No mask, training mode on.
    predictions = model(inp_data, None, True)
    batch_loss = loss_object(target, predictions)

  trainable = model.trainable_variables
  grads = tape.gradient(batch_loss, trainable)
  optimizer.apply_gradients(zip(grads, trainable))

  # Fold this batch into the running means.
  train_loss(batch_loss)
  train_accuracy(accuracy_function(target, predictions))
  train_f1(f1_score_function(target, predictions))

@tf.function(input_signature=train_step_signature)
def valid_step(inp_data,target):
  """Evaluate one validation batch (no weight update) and update the validation metrics.

  Args:
    inp_data: int64 tensor of token ids (see `train_step_signature`).
    target: int64 label tensor.
  """
  # No mask, training mode off.
  predictions = model(inp_data, None, False)
  batch_loss = loss_object(target, predictions)

  valid_loss(batch_loss)
  valid_accuracy(accuracy_function(target, predictions))
  valid_f1(f1_score_function(target, predictions))

@tf.function(input_signature=train_step_signature)
def test_step(inp_data,target):
  """Evaluate one test batch (no weight update) and update the test metrics.

  Args:
    inp_data: int64 tensor of token ids (see `train_step_signature`).
    target: int64 label tensor.
  """
  # No mask, training mode off.
  predictions = model(inp_data, None, False)
  batch_loss = loss_object(target, predictions)

  test_loss(batch_loss)
  test_accuracy(accuracy_function(target, predictions))
  test_f1(f1_score_function(target, predictions))


for epoch in range(num_epochs):
  # Every epoch starts from empty running means.
  for metric in (
      train_loss, train_accuracy, train_f1,
      valid_accuracy, valid_f1, valid_loss,
      test_accuracy, test_f1, test_loss,
  ):
    metric.reset_states()

  # One pass over each split, in order: train (with weight updates), then
  # validation, then test. Labels are cast to int64 to match the step signature.
  for step_fn, batches in (
      (train_step, train_batches),
      (valid_step, valid_batches),
      (test_step, test_batches),
  ):
    for inp, tar in batches:
      step_fn(inp, tf.cast(tar, dtype=tf.int64))

  print(f'Loss {train_loss.result():.4f} train_f1 {train_f1.result():.4f} Accuracy {train_accuracy.result():.4f}\
   valid_Loss {valid_loss.result():.4f} valid_Accuracy {valid_accuracy.result():.4f} valid_f1 {valid_f1.result():.4f}  test_Loss {test_loss.result():.4f} test_Accuracy {test_accuracy.result():.4f} test_f1 {test_f1.result():.4f}')

  # Record this epoch's accuracy and F1 for each split.
  for history, metric in (
      (train_accuracies, train_accuracy),
      (valid_accuracies, valid_accuracy),
      (test_accuracies, test_accuracy),
      (train_f1s, train_f1),
      (valid_f1s, valid_f1),
      (test_f1s, test_f1),
  ):
    history.append(metric.result())




Loss 0.7065 train_f1 0.5278 Accuracy 0.5347   valid_Loss 0.6754 valid_Accuracy 0.6202 valid_f1 0.6534  test_Loss 0.6855 test_Accuracy 0.6202 test_f1 0.6303
Loss 0.6835 train_f1 0.5295 Accuracy 0.5734   valid_Loss 0.6456 valid_Accuracy 0.6058 valid_f1 0.6506  test_Loss 0.6711 test_Accuracy 0.6106 test_f1 0.6474
Loss 0.6776 train_f1 0.5804 Accuracy 0.5823   valid_Loss 0.6577 valid_Accuracy 0.5913 valid_f1 0.6352  test_Loss 0.6685 test_Accuracy 0.5817 test_f1 0.6222
Loss 0.6783 train_f1 0.4983 Accuracy 0.5923   valid_Loss 0.6667 valid_Accuracy 0.5865 valid_f1 0.6601  test_Loss 0.6816 test_Accuracy 0.5769 test_f1 0.6542
Loss 0.6738 train_f1 0.5817 Accuracy 0.5933   valid_Loss 0.6602 valid_Accuracy 0.6010 valid_f1 0.6768  test_Loss 0.6803 test_Accuracy 0.5721 test_f1 0.6413
Loss 0.6645 train_f1 0.6210 Accuracy 0.6042   valid_Loss 0.6706 valid_Accuracy 0.6010 valid_f1 0.6011  test_Loss 0.6667 test_Accuracy 0.6202 test_f1 0.6094
Loss 0.6696 train_f1 0.5763 Accuracy 0.5893   valid_Loss 0.6638 

### Save results

In [None]:
# Column title -> per-epoch history of scalar tensors collected by the training loop.
history_by_column = {
    'Train Accuracies': train_accuracies,
    'Valid Accuracies': valid_accuracies,
    'Test Accuracies': test_accuracies,
    'Train F1 Scores': train_f1s,
    'Valid F1 Scores': valid_f1s,
    'Test F1 Scores': test_f1s,
}

# Convert every tensor to its NumPy value so the table holds plain numbers.
df = pd.DataFrame({
    column: [value.numpy() for value in history]
    for column, history in history_by_column.items()
})

excel_file_path = 'LSTM_Disneyland5.xlsx'
df.to_excel(excel_file_path, engine='openpyxl')

# Shell escape: copy the spreadsheet from /content/ into the record folder on the mounted Drive.
!cp -r "/content/LSTM_Disneyland5.xlsx" "/content/drive/MyDrive/datasets/master_thesis/record/"

In [None]:


# Model selection: take the epoch with the highest validation accuracy
# (the first such epoch on ties) and report the test metrics of that epoch.
highest_value = max(valid_accuracies)
index_of_highest = valid_accuracies.index(highest_value)

best_test_accuracy = test_accuracies[index_of_highest]
best_test_f1 = test_f1s[index_of_highest]
print(best_test_accuracy, best_test_f1)

tf.Tensor(0.77403843, shape=(), dtype=float32) tf.Tensor(0.7945787, shape=(), dtype=float32)
