# License

In [None]:
# Copyright (C) 2021 Lucas Eduardo Gulka Pulcinelli
# This file is licensed under the terms of The 3-Clause BSD License, check the LICENSE file for details

# Installs and Imports

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_hub as tfhub
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split

# Data extraction

## Getting all data from csv

In [None]:
# Raw tweet dataset; its "Labels" column is expanded into one boolean column
# per feeling further down.
df_all = pd.read_csv("./data/NLP_data_raw.csv")

# Feeling names. The position of a name in this list is the numeric id that is
# looked up in the "Labels" column when the indicator columns are built.
feelings = [
    "Optimistic",
    "Thankful",
    "Empathetic",
    "Pessimistic",
    "Anxious",
    "Sad",
    "Annoyed",
    "Denial",
    "Surprise",
    "Official.report",
    "Joking",
]


In [None]:
# Expand "Labels" into one boolean column per feeling: every entry is split on
# whitespace once, and column k is True when the token str(k) is present.
df_all[feelings] = [
    [str(idx) in tokens for idx in range(len(feelings))]
    for tokens in (raw.split() for raw in df_all["Labels"])
]
df_all.head()

In [None]:
# Number of tweets carrying each feeling. Summing the indicator columns
# directly replaces the hard-coded length 11 and the element-by-element Python
# loop, so the cell keeps working if `feelings` gains or loses entries.
# The float cast keeps `counts` a float array, as np.zeros produced before.
counts = df_all[feelings].values.astype(float).sum(axis=0)

plt.barh(feelings, counts)

## Separating train from validation (not stratified)

In [None]:
# Hold out 20% of the rows for validation. A fixed random_state makes the split
# (and therefore every metric computed further down) reproducible across
# kernel restarts. t_len / v_len receive the matching split of the original
# row positions.
train, val, t_len, v_len = train_test_split(
    df_all.values,
    range(len(df_all.values)),
    test_size=0.2,
    random_state=42,
)

# Per-feeling totals in each split. Columns 3 onwards are treated as the
# feeling indicators, the same slice the rest of the notebook uses.
# NOTE(review): this assumes the CSV has exactly three columns before the
# indicator columns - confirm against the data file.
counts_t = train[:, 3:].astype(float).sum(axis=0)
counts_v = val[:, 3:].astype(float).sum(axis=0)

# Titles tell the two otherwise identical-looking charts apart.
plt.barh(feelings, counts_t)
plt.title("Train: tweets per feeling")
plt.show()
plt.barh(feelings, counts_v)
plt.title("Validation: tweets per feeling")
plt.show()

In [None]:
# Split each row into model input and target: column 1 is used as the tweet
# text and columns 3+ as the per-feeling indicators (cast to uint8).
# NOTE(review): column positions are assumed from the CSV layout - confirm.
train_tweets = np.array(list(train[:, 1]))
train_labels = train[:, 3:].astype(np.uint8)
val_tweets = np.array(list(val[:, 1]))
val_labels = val[:, 3:].astype(np.uint8)

# Text preprocessing (tokenization, padding)

In [None]:
# Vocabulary cap handed to the tokenizer, with "<OOV>" as the out-of-vocabulary
# token. Author's sizing note: 5000 samples in training/validation data, plus a
# good amount.
num_words = 7500
oov = "<OOV>"
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words=num_words,
    oov_token=oov,
)

# The vocabulary is fitted on the training tweets only; the last line shows the
# first 50 entries of the resulting word index.
tokenizer.fit_on_texts(train_tweets)
list(tokenizer.word_index)[:50]

In [None]:
# Fixed sequence length used for every padded tweet.
tweet_max_len = 140

# Convert tweets to token-id sequences and pad them to tweet_max_len.
train_pad = tf.keras.preprocessing.sequence.pad_sequences(
    tokenizer.texts_to_sequences(train_tweets),
    maxlen=tweet_max_len,
)
val_pad = tf.keras.preprocessing.sequence.pad_sequences(
    tokenizer.texts_to_sequences(val_tweets),
    maxlen=tweet_max_len,
)
train_pad[0]

# Simple Neural Network

## One feeling, not using LSTM

In [None]:
embedding_dim = 32

# Single-feeling baseline: embedding -> average over the sequence -> small
# dense layer -> one sigmoid unit.
model_jok = tf.keras.Sequential()
model_jok.add(tf.keras.layers.Embedding(tokenizer.num_words, embedding_dim))
model_jok.add(tf.keras.layers.GlobalAveragePooling1D())
model_jok.add(tf.keras.layers.Dense(16, activation="relu"))
model_jok.add(tf.keras.layers.Dense(1, activation="sigmoid"))

# Binary cross-entropy on the single output; F1 is computed at a 0.4 threshold.
model_jok.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer="adam",
    metrics=[
        "accuracy",
        tfa.metrics.F1Score(num_classes=1, average="micro", threshold=0.4),
    ],
)
model_jok.summary()

In [None]:
epochs = 40

# Train on a single feeling: the last label column (presumably "Joking", the
# last entry of `feelings`). Validation targets are taken from `val_labels`,
# the same uint8 label array family as the training targets, rather than from
# the raw `val` rows, so both targets share dtype and provenance.
model_jok.fit(
    x=train_pad,
    y=train_labels[:, -1],
    validation_data=(val_pad, val_labels[:, -1]),
    epochs=epochs,
)

## All feelings, not using LSTM

In [None]:
def create_fellings_model(input_dim, num_words, feelings):
  """Build and compile the pooled-embedding multi-label model.

  A shared 256-dimensional embedding is averaged over the sequence and fed to
  one small dense head per feeling. The per-feeling sigmoid outputs are
  concatenated into a single output tensor.

  Args:
    input_dim: value passed as `shape` to the Keras input layer.
    num_words: input dimension (vocabulary size) of the embedding layer.
    feelings: sequence of feeling names; one head is created per name and the
      names are used to label the head layers.

  Returns:
    The compiled Keras model, named "Twitter_NLP_Simple".
  """
  tokens = tf.keras.layers.Input(shape=input_dim, name="input")
  embedded = tf.keras.layers.Embedding(num_words, 256, name="embedding")(tokens)
  pooled = tf.keras.layers.GlobalAveragePooling1D(name="gpol")(embedded)

  def feeling_head(label):
    # Dense(32, relu) -> Dense(1, sigmoid), both named after the feeling.
    hidden = tf.keras.layers.Dense(32, activation="relu", name="dense_" + label)(pooled)
    return tf.keras.layers.Dense(1, activation="sigmoid", name="out_" + label)(hidden)

  heads = [feeling_head(label) for label in feelings]
  merged = tf.keras.layers.concatenate(heads, name="output")

  classifier = tf.keras.models.Model(
      inputs=tokens,
      outputs=merged,
      name="Twitter_NLP_Simple",
  )
  classifier.compile(
      loss=tf.keras.losses.mean_squared_error,
      optimizer="adam",
      metrics=[
          "accuracy",
          tfa.metrics.F1Score(threshold=0.4, num_classes=len(feelings)),
      ],
  )
  return classifier

# Instantiate the pooled multi-label model and render its layer graph.
model_all = create_fellings_model(
    input_dim=tweet_max_len,
    num_words=tokenizer.num_words,
    feelings=feelings,
)
tf.keras.utils.plot_model(model_all)

In [None]:
epochs = 30

# Fit on the padded token ids against the full multi-label target matrix.
model_all.fit(
    x=train_pad,
    y=train_labels,
    validation_data=(val_pad, val_labels),
    epochs=epochs,
)

## One feeling, with bidirectional LSTM

In [None]:
embedding_dim = 32

# Single-feeling model with a bidirectional LSTM; a Dropout(0.7) layer follows
# the embedding, the LSTM and the hidden dense layer.
model_jok = tf.keras.Sequential()
model_jok.add(tf.keras.layers.Embedding(tokenizer.num_words, embedding_dim))
model_jok.add(tf.keras.layers.Dropout(0.7))
model_jok.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32)))
model_jok.add(tf.keras.layers.Dropout(0.7))
model_jok.add(tf.keras.layers.Dense(32, activation="relu"))
model_jok.add(tf.keras.layers.Dropout(0.7))
model_jok.add(tf.keras.layers.Dense(1, activation="sigmoid"))

# Mean squared error on the sigmoid output; F1 is computed at a 0.3 threshold.
model_jok.compile(
    loss=tf.keras.losses.mean_squared_error,
    optimizer="adam",
    metrics=[
        "accuracy",
        tfa.metrics.F1Score(num_classes=1, threshold=0.3),
    ],
)
model_jok.summary()

In [None]:
epochs = 20

# Train on a single feeling: the last label column (presumably "Joking", the
# last entry of `feelings`). Validation targets are taken from `val_labels`,
# the same uint8 label array family as the training targets, rather than from
# the raw `val` rows, so both targets share dtype and provenance.
model_jok.fit(
    x=train_pad,
    y=train_labels[:, -1],
    validation_data=(val_pad, val_labels[:, -1]),
    epochs=epochs,
)

## All feelings, bidirectional LSTM

In [None]:
def create_fellings_model(input_dim, num_words, feelings):
  """Build and compile the stacked bidirectional-LSTM multi-label model.

  A 128-dimensional embedding feeds two stacked bidirectional LSTM layers
  (64 units each; the first returns full sequences). One dense head per
  feeling sits on top, and the sigmoid outputs are concatenated.

  Args:
    input_dim: value passed as `shape` to the Keras input layer.
    num_words: input dimension (vocabulary size) of the embedding layer.
    feelings: sequence of feeling names; one head is created per name and the
      names are used to label the head layers.

  Returns:
    The compiled Keras model, named "Twitter_NLP_LSTM".
  """
  layers = tf.keras.layers

  tokens = layers.Input(shape=input_dim, name="input")
  embedded = layers.Embedding(num_words, 128, name="embed")(tokens)
  seq_features = layers.Bidirectional(
      layers.LSTM(64, return_sequences=True), name="lstm1")(embedded)
  features = layers.Bidirectional(layers.LSTM(64), name="lstm2")(seq_features)

  heads = []
  for label in feelings:
    # The hidden layer of each head uses he_uniform initialisation.
    hidden = layers.Dense(
        32,
        activation="relu",
        name="dense_" + label,
        kernel_initializer="he_uniform",
    )(features)
    score = layers.Dense(1, activation="sigmoid", name="out_" + label)(hidden)
    heads.append(score)

  merged = layers.concatenate(heads, name="output")

  classifier = tf.keras.models.Model(
      inputs=tokens,
      outputs=merged,
      name="Twitter_NLP_LSTM",
  )
  classifier.compile(
      loss=tf.keras.losses.mean_squared_error,
      optimizer="adam",
      metrics=[
          "accuracy",
          tfa.metrics.F1Score(average="macro", threshold=0.4, num_classes=len(feelings)),
      ],
  )
  return classifier

# Instantiate the bidirectional-LSTM multi-label model and render its graph.
model_all = create_fellings_model(
    input_dim=tweet_max_len,
    num_words=tokenizer.num_words,
    feelings=feelings,
)
tf.keras.utils.plot_model(model_all)

In [None]:
epochs = 10

# Fit on the padded token ids against the full multi-label target matrix.
model_all.fit(
    x=train_pad,
    y=train_labels,
    validation_data=(val_pad, val_labels),
    epochs=epochs,
)

# Pretrained Transformer

In [None]:
  # TF-Hub handles for the text preprocessing model and the BERT encoder.
  PREPROCESS_MODEL = "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3"
  BERT_MODEL = "https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-12_H-768_A-12/1"

  # Both are wrapped as Keras layers so they can be used inside a Keras model.
  prep_l = tfhub.KerasLayer(handle=PREPROCESS_MODEL, name="preprocess")
  bert_l = tfhub.KerasLayer(handle=BERT_MODEL, name="bert")

In [None]:
def create_fellings_model(feelings):
  """Build and compile the BERT-based multi-label model.

  Raw strings go through the module-level `prep_l` and `bert_l` hub layers.
  The encoder's "sequence_output" feeds two stacked bidirectional LSTMs
  (128 then 64 units), followed by one dense head per feeling whose sigmoid
  outputs are concatenated.

  Args:
    feelings: sequence of feeling names; one head is created per name and the
      names are used to label the head layers.

  Returns:
    The compiled Keras model, named "Twitter_NLP_BERT".
  """
  fs_outs = {}
  text = tf.keras.layers.Input(shape=(), dtype=tf.string)
  prep = prep_l(text)
  bert = bert_l(prep)
  lstm1 = tf.keras.layers.Bidirectional(
      tf.keras.layers.LSTM(128, return_sequences=True, name="lstm1"))(bert["sequence_output"])
  lstm2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, name="lstm2"))(lstm1)

  for f in feelings:
    f_dense = tf.keras.layers.Dense(32, activation="relu", name="dense_"+f)(lstm2)
    fs_outs[f] = tf.keras.layers.Dense(1, activation="sigmoid", name="out_"+f)(f_dense)

  # concatenate expects a list of tensors; materialise the dict view so it is
  # not handed an opaque dict_values object.
  outputs = tf.keras.layers.concatenate(list(fs_outs.values()), name="output")

  model = tf.keras.models.Model(
      inputs=text,
      outputs=outputs,
      name="Twitter_NLP_BERT"
  )

  # Each feeling is an independent yes/no decision behind its own sigmoid, so
  # the loss must be binary cross-entropy. Categorical cross-entropy treats the
  # outputs as one probability distribution over mutually exclusive classes and
  # gives zero loss for a tweet with no labels at all.
  model.compile(loss = tf.keras.losses.binary_crossentropy, optimizer = "adam",
              metrics = [tfa.metrics.F1Score(average="macro", threshold=0.3, num_classes=len(feelings))])
  return model

# Instantiate the BERT-based multi-label model and render its layer graph.
model_all = create_fellings_model(feelings=feelings)
tf.keras.utils.plot_model(model_all)

In [None]:
epochs = 10

# This model takes the raw tweet strings, so it is fitted on the tweet arrays
# rather than on the padded token ids.
model_all.fit(
    x=train_tweets,
    y=train_labels,
    validation_data=(val_tweets, val_labels),
    epochs=epochs,
)

# Making predictions

In [None]:
# `model_all` is rebound several times in this notebook. After a top-to-bottom
# run it is the BERT model, whose input layer takes raw strings, so feeding it
# padded token ids is a type mismatch. Pick the representation that matches
# the model's declared input dtype; the earlier models still get token ids.
n_preview = 30
if model_all.inputs[0].dtype == tf.string:
  preview_inputs = tf.constant(val_tweets[:n_preview])
else:
  preview_inputs = val_pad[:n_preview]

pred = model_all(preview_inputs)

# For every previewed tweet: first line lists the feelings scored above 0.3,
# second line lists the true labels, followed by a blank separator line.
for i in range(n_preview):
  for j in range(len(feelings)):
    if pred[i][j] > 0.3:
      print(feelings[j], end=' ')
  print()
  for j in range(len(feelings)):
    if val_labels[i][j]:
      print(feelings[j], end=' ')
  print()
  print()