In [None]:
import math
import multiprocessing
import os
import random as rn
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from gensim.models import Word2Vec as w2v
import nltk
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Embedding, LSTM
from keras.utils import np_utils
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import Tokenizer
from keras.layers.wrappers import Bidirectional

from evaluate import plot_confusion_matrix,calculate_performance_metrics
 
SEED = 123456

# Seed every random number generator the notebook relies on (NumPy,
# TensorFlow and the stdlib `random` module) so runs are repeatable.
# NOTE(review): Python reads PYTHONHASHSEED at interpreter start-up, so this
# assignment presumably only affects child processes -- confirm.
os.environ['PYTHONHASHSEED'] = str(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
rn.seed(SEED)

# Let TensorFlow grow GPU memory on demand instead of reserving it all.
# Iterating over the device list (rather than indexing [0]) keeps this cell
# working on CPU-only machines, where the list is empty.
physical_devices = tf.config.list_physical_devices('GPU')
for gpu_device in physical_devices:
    tf.config.experimental.set_memory_growth(gpu_device, True)

In [None]:
class word2vec:
    """Train gensim Word2Vec (skip-gram, ``sg=1``) embeddings on tweets.

    Parameters
    ----------
    tweet_file : str, optional
        Path to a UTF-8 text file that is read line by line. When given it
        takes precedence over ``tweets``.
    tweets : iterable of str, optional
        In-memory tweets, used when ``tweet_file`` is None.
    num_features : int
        Embedding dimensionality (forwarded to Word2Vec as ``size``).
    min_word_count : int
        Forwarded to Word2Vec as ``min_count``.
    context_size : int
        Forwarded to Word2Vec as ``window``.
    downsampling : float
        Forwarded to Word2Vec as ``sample``.
    seed : int
        Forwarded to Word2Vec as ``seed``.
    epochs : int
        Number of training epochs used by ``make_model``.

    Attributes
    ----------
    sentences : list of list of str
        Tokenized tweets, set by ``preprocess_tweets``.
    tweet2vec : gensim Word2Vec
        Trained model, set by ``make_model``.
    """

    def __init__(self, tweet_file=None,tweets=None, num_features=100, min_word_count=3,context_size=7,downsampling=1e-3,seed=1,epochs=12):
        self.file = tweet_file
        self.tweets = tweets
        self.num_features = num_features
        self.min_word_count = min_word_count
        self.context_size = context_size
        self.downsampling = downsampling
        self.epochs = epochs
        self.seed = seed

    def preprocess_tweets(self):
        """Tokenize every tweet into words and store them in ``self.sentences``.

        Raises
        ------
        ValueError
            If neither ``tweet_file`` nor ``tweets`` was supplied.
        """
        if self.file is not None:
            # The context manager guarantees the file is closed after reading.
            with open(self.file, "r", encoding="utf8") as tweet_lines:
                self.sentences = [nltk.word_tokenize(line) for line in tweet_lines]
        elif self.tweets is not None:
            self.sentences = [nltk.word_tokenize(tweet) for tweet in self.tweets]
        else:
            raise ValueError("word2vec needs either tweet_file or tweets")

    def make_model(self):
        """Build the vocabulary from ``self.sentences`` and train the model."""
        # NOTE(review): gensim >= 4 renamed `size` to `vector_size`; this call
        # targets the older keyword -- confirm the installed gensim version.
        # NOTE(review): gensim documents that fully reproducible training
        # needs workers=1; with several workers the seed alone may not suffice.
        self.tweet2vec = w2v(
            sg = 1,
            seed = self.seed,
            workers = multiprocessing.cpu_count(),
            size = self.num_features,
            min_count = self.min_word_count,
            window = self.context_size,
            sample = self.downsampling
        )

        # Build the vocabulary
        self.tweet2vec.build_vocab(self.sentences)
        # Train for the number of epochs requested at construction time.
        self.tweet2vec.train(
            self.sentences,
            epochs = self.epochs,
            total_examples = len(self.sentences),
        )

    def run(self):
        """Tokenize the tweets, then train the embeddings."""
        self.preprocess_tweets()
        self.make_model()

In [None]:
# Read the tweet/emoji dataset from disk and preview its first rows.
df = pd.read_csv(
    'datasets/balanced_data.csv',
    sep=',',
    index_col=False,
)
df.head()

In [None]:
# Patterns are compiled once, and written as raw strings so that `\s` is not
# treated as an (invalid) string escape.
_HASHTAG_RE = re.compile(r'#')
_MENTION_RE = re.compile(r'@[^\s]+')
_LINK_RE = re.compile(r'https?://[^\s]+')
# Everything from '{' (U+007B) upwards: the last few ASCII symbols plus all
# non-ASCII characters.
_HIGH_CODEPOINT_RE = re.compile('[\u007B-\uFFFF]')

# Characters deleted outright: punctuation (including the backslash), digits,
# and the quote characters ' and ` (so that "can't" becomes "cant").
_DELETED_CHARS = '!"$%^&*()_+~-={}|[]\\:";<>,.?/' + '0123456789' + "'`"
_DELETE_TABLE = str.maketrans(dict.fromkeys(_DELETED_CHARS, ""))


def clean_data(text):
    """Normalise one tweet for tokenisation.

    Steps, in order: drop '#' signs (the hashtag word is kept), remove
    @mentions, remove http(s) links, delete punctuation/digits/quotes,
    delete every character at or above U+007B, and lower-case the result.
    Whitespace is left untouched, so removed tokens may leave double spaces.

    Parameters
    ----------
    text : str
        Raw tweet text.

    Returns
    -------
    str
        The cleaned, lower-cased tweet.
    """
    text = _HASHTAG_RE.sub('', text)
    text = _MENTION_RE.sub('', text)
    text = _LINK_RE.sub('', text)
    text = text.translate(_DELETE_TABLE)
    text = _HIGH_CODEPOINT_RE.sub('', text)
    return text.lower()

# Replace the raw tweet text with its cleaned form, then preview the result.
df["text"] = df["text"].map(clean_data)
df.head()

In [None]:
# Count the distinct emoji labels; used later as the one-hot width and the
# size of the softmax output layer.
distinct_emojis = set(df["emoji"])
tot_classes = len(distinct_emojis)
print(tot_classes)

## Training word-embeddings

In [None]:
# Train skip-gram embeddings on the cleaned tweets.
# NOTE(review): epochs=1000 is requested here -- confirm that
# word2vec.make_model forwards this value to training and that the resulting
# runtime is acceptable.
w2vec = word2vec(
    tweets=df.text,
    num_features=100,
    min_word_count=3,
    context_size=5,
    downsampling=1e-3,
    seed=1,
    epochs=1000,
)
w2vec.run()

In [None]:
# w = 'red'
# print(w2vec.tweet2vec.wv.most_similar(positive=w))

In [None]:
# Split in file order: the first 90% of rows form train+validation and the
# last 10% the test set; train+validation is then split 90/10 again.
# NOTE(review): rows are not shuffled before splitting -- confirm the CSV is
# already in random order, otherwise the splits may not share a label mix.
tweets = list(df["text"])
labels = list(df["emoji"])

N = int(0.9 * len(tweets))
all_train_tweets, test_tweets = tweets[:N], tweets[N:]
all_train_labels, test_labels = labels[:N], labels[N:]

val_N = int(0.9 * len(all_train_tweets))
train_tweets, val_tweets = all_train_tweets[:val_N], all_train_tweets[val_N:]
train_labels, val_labels = all_train_labels[:val_N], all_train_labels[val_N:]

In [None]:
# Re-assemble the corpus in train, validation, test order.
all_tweets = train_tweets + val_tweets + test_tweets

# Despite its name, max_length is the mean number of space-separated tokens
# per tweet, rounded up; it is used below as the padded sequence length.
token_total = sum(len(tweet.split(" ")) for tweet in all_tweets)
max_length = math.ceil(token_total / len(all_tweets))

In [None]:
def encode_docs(tweets):
    """Fit a ``Tokenizer`` on ``tweets`` and encode them as integer sequences.

    Parameters
    ----------
    tweets : list of str
        Texts used both to fit the tokenizer and to be encoded.

    Returns
    -------
    tuple
        ``(tokenizer, sequences)``: the fitted tokenizer and the result of
        ``texts_to_sequences`` for the same texts.
    """
    tokenizer_options = {
        "filters": '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',
        "split": " ",
        "lower": True,
    }
    fitted = Tokenizer(**tokenizer_options)
    fitted.fit_on_texts(tweets)
    sequences = fitted.texts_to_sequences(tweets)
    return fitted, sequences

In [None]:
def populate_weight_matrix(vocab, raw_embedding, embedding_dim=100):
    """Build an embedding weight matrix from pre-trained word vectors.

    Parameters
    ----------
    vocab : dict
        Maps each word to its integer row index. Row 0 is never assigned by
        this function, so it stays all-zero.
    raw_embedding : mapping
        Supports ``word in raw_embedding`` and ``raw_embedding[word]``,
        returning a vector of length ``embedding_dim``.
    embedding_dim : int, optional
        Width of each vector. Defaults to 100, the value that was previously
        hard-coded.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(len(vocab) + 1, embedding_dim)``. Rows for words
        missing from ``raw_embedding`` remain zero.
    """
    vocab_size = len(vocab) + 1
    weight_matrix = np.zeros((vocab_size, embedding_dim))
    for word, row in vocab.items():
        if word in raw_embedding:
            weight_matrix[row] = raw_embedding[word]
    return weight_matrix

In [None]:
tokenizer, encoded_docs = encode_docs(all_tweets)

# encoded_docs follows the order of all_tweets (train, then validation, then
# test), so the split boundaries come straight from the split sizes.
# Forward slicing also stays correct when the test split is empty, where a
# negative index of -0 would select the whole corpus.
n_train = len(train_tweets)
n_train_val = n_train + len(val_tweets)

temp_train = pad_sequences(encoded_docs[:n_train], maxlen=max_length, padding='post')
temp_val = pad_sequences(encoded_docs[n_train:n_train_val], maxlen=max_length, padding='post')
temp_test = pad_sequences(encoded_docs[n_train_val:], maxlen=max_length, padding='post')
temp_all_train = pad_sequences(encoded_docs[:len(all_train_tweets)], maxlen=max_length, padding='post')

In [None]:
# Word -> integer index mapping learned by the tokenizer; the Word2Vec
# vectors are copied into a matrix row for every word they contain.
vocab = tokenizer.word_index
weight_matrix = populate_weight_matrix(vocab, raw_embedding=w2vec.tweet2vec.wv)

## BiLSTM Model 

In [None]:
# One-hot encode the integer labels for categorical cross-entropy.
y_train = np_utils.to_categorical(train_labels, tot_classes)
y_val = np_utils.to_categorical(val_labels, tot_classes)

# Embedding initialised from the Word2Vec weight matrix and left trainable;
# mask_zero=True treats index 0 (the padding value) as masked.
embedding_layer = Embedding(
    len(vocab) + 1,
    100,
    weights=[weight_matrix],
    input_length=max_length,
    trainable=True,
    mask_zero=True,
)

# Two stacked bidirectional LSTMs followed by a dense classification head.
model_rnn = Sequential([
    embedding_layer,
    Bidirectional(LSTM(128, dropout=0.2, return_sequences=True)),
    Bidirectional(LSTM(128, dropout=0.2)),
    Dense(200, activation='relu', input_dim=256),
    Dropout(0.5),
    Dense(100, activation='relu'),
    Dropout(0.25),
    Dense(tot_classes, activation='softmax'),
])

adam = keras.optimizers.Adam(learning_rate=0.001, beta_1=0.99, beta_2=0.999)
model_rnn.compile(
    optimizer=adam,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
model_rnn.summary()

In [None]:
# Size batches so that one epoch takes roughly 100 steps. The floor of 1
# keeps the batch size valid when there are fewer than 100 training samples,
# where the integer division alone would give 0.
batch_size = max(1, temp_train.shape[0] // 100)
history = model_rnn.fit(
    temp_train,
    y_train,
    epochs=4,
    validation_data=(temp_val, y_val),
    batch_size=batch_size,
)

In [None]:
# Sequential.predict_classes is deprecated and absent from newer TensorFlow
# releases; taking the argmax over the softmax output is the documented
# equivalent for multi-class models.
bilstm_pred = np.argmax(model_rnn.predict(temp_test), axis=-1)
calculate_performance_metrics(test_labels,bilstm_pred,p_average="weighted",
                              r_average="weighted",f1_average="weighted",normalize_cm="true",figsize=(7,7))