In [1]:
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer
import re
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [2]:
data_train = pd.read_csv("train.csv")
data_test = pd.read_csv("test.csv")

In [3]:
def clear_text(text):
    text = str(text)
    text = text.lower()
    text = re.sub(r'[^ a-z]', '', text)
    while text.find('  ') != -1:
        text = text.replace('  ', ' ')
    return text

In [4]:
data_train['clean_text']= data_train['Text'].apply(lambda x:clear_text(x))
data_test['clean_text']= data_test['Text'].apply(lambda x:clear_text(x))

In [5]:
data_train['clean_text'] = data_train['clean_text'].apply(lambda x:word_tokenize(x))
data_test['clean_text'] = data_test['clean_text'].apply(lambda x:word_tokenize(x))

In [6]:
def remove_stop_words(text):
    stop_words = stopwords.words('english')
    filtered_text = []
    for w in text:
        if w not in stop_words:
            filtered_text.append(w)
    return filtered_text

In [7]:
data_train['clean_text'] = data_train['clean_text'].apply(lambda x:remove_stop_words(x))
data_test['clean_text'] = data_test['clean_text'].apply(lambda x:remove_stop_words(x))

In [8]:
wordnet_lemmatizer = WordNetLemmatizer()

def lemmatizer(text):
    return [wordnet_lemmatizer.lemmatize(w) for w in text]

data_train['clean_text'] = data_train['clean_text'].apply(lambda x:lemmatizer(x))
data_test['clean_text'] = data_test['clean_text'].apply(lambda x:lemmatizer(x))

In [9]:
def to_text(data):
    text = []
    for i in data:
        for word in i:
            text.append(word)
    return text

text = to_text(data_train['clean_text'])
text += to_text(data_test['clean_text'])
text.sort()
dictionary = []
for i in text:
    if i not in dictionary:
        dictionary.append(i)

In [10]:
def convert_to_numbers(text):
    res = []
    for word in text:
        res.append(dictionary.index(word))
    return res

def convert_to_text(numbers):
    res = []
    for num in numbers:
        res.append(dictionary[num])
    return res

In [11]:
data_train['numbers'] = data_train['clean_text'].apply(lambda x:convert_to_numbers(x))
data_test['numbers'] = data_test['clean_text'].apply(lambda x:convert_to_numbers(x))

In [12]:
#good: happy, love, surprise
#bad: sadness, anger, fear

def t_or_f(emotion):
    if emotion == 'happy' or emotion == 'love' or emotion == 'surprise':
        return 1        
    else:        #elif emotioin == 'sadness' or 'anger' or 'fear': return 0
        return 0
    
data_train['Emotion_in_digit'] = data_train['Emotion'].apply(lambda x:t_or_f(x))
data_test['Emotion_in_digit'] = data_test['Emotion'].apply(lambda x:t_or_f(x))

# data_train[['Emotion','Emotion_in_digit']]

In [13]:
data_train = data_train[['Emotion_in_digit', 'numbers']]
data_test = data_test[['Emotion_in_digit', 'numbers']]

In [14]:
def vectorize_sequences(sequences, dimension=30000):
    results = np.zeros((len(sequences), dimension))
    for i, sequence in enumerate(sequences):
        for j in sequence:
            results[i, j] = 1.
    return results

In [15]:
x_train = vectorize_sequences(data_train['numbers'])
x_test = vectorize_sequences(data_test['numbers'])
y_train = np.asarray(data_train['Emotion_in_digit']).astype("float32")
y_test = np.asarray(data_test['Emotion_in_digit']).astype("float32")

In [16]:
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

In [17]:
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [18]:
vocab_size = 20000  # Only consider the top 20k words
maxlen = 200  # Only consider the first 200 words of each movie review
max_seq_len = 200

x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen = max_seq_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen = max_seq_len)
y_train = np.asarray(y_train).astype("float32")
y_test = np.asarray(y_test).astype("float32")

In [19]:
embed_dim = 128
num_heads = 2
ff_dim = 64  

inputs = layers.Input(shape=(maxlen,))
embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation="relu")(x)
x = layers.Dropout(0.1)(x)
outputs = layers.Dense(2, activation="softmax")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

In [20]:
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
history = model.fit(
    x_train, y_train, batch_size=32, epochs=10, validation_data=(x_test, y_test)
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [21]:
score = model.evaluate(x_test, y_test, verbose=1) 

print('Test score:', score[0]) 
print('Test accuracy:', score[1])

Test score: 0.6830772757530212
Test accuracy: 0.5768174529075623
