In [None]:
import csv
import os
import tensorflow as tf
import numpy as np
import pickle
import collections
import string
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, LSTM, Dropout, Activation, Embedding, Bidirectional
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
import tqdm.notebook as tq

In [None]:
# Preprocessing / model switches used by the cells below.
# The first three are forwarded to cleanTweet() as its keyword arguments.
LEMMATIZE_WORDS = False  # passed as lemmatize_words
REMOVE_PUNCTUATION = False  # passed as remove_punctuation
REMOVE_STOPWORDS = False  # passed as remove_stop_words
# Gates the GloVe download / embedding-matrix cells and selects which
# Embedding layer construct_model() adds.
USE_PRETRAINED_EMBEDDINGS = False

In [None]:
# Read the train and trial (validation) splits.
# Tweets stay as raw lines (trailing newline included); labels become an
# integer NumPy array with one entry per line of the labels file.
with open('../data/dataset/train/us_train.text', 'r', encoding="utf8") as text_fh:
    train_text = list(text_fh)

with open('../data/dataset/train/us_train.labels', 'r', encoding="utf8") as labels_fh:
    train_labels = np.array([int(raw.strip()) for raw in labels_fh.readlines()])

with open('../data/dataset/trial/us_trial.text', 'r', encoding="utf8") as text_fh:
    val_text = list(text_fh)

with open('../data/dataset/trial/us_trial.labels', 'r', encoding="utf8") as labels_fh:
    val_labels = np.array([int(raw.strip()) for raw in labels_fh.readlines()])

In [None]:
# clean tweets according to the variables set
# Lazily created helpers shared by all cleanTweet() calls, so the lemmatizer
# and the stop-word set are built once instead of once per tweet / per word.
_CLEAN_TWEET_RESOURCES = {}

def cleanTweet(tweet, lemmatize_words, remove_punctuation, remove_stop_words):
  """Normalise a single tweet.

  The tweet is always lower-cased and stripped of surrounding whitespace.
  The remaining steps are optional and run in this order: punctuation
  removal, lemmatisation, stop-word removal.

  Args:
    tweet: the raw tweet text.
    lemmatize_words: if truthy, lemmatise each space-separated word with
      NLTK's WordNetLemmatizer.
    remove_punctuation: if truthy, delete every character found in
      string.punctuation.
    remove_stop_words: if truthy, drop words present in NLTK's English
      stop-word list.

  Returns:
    The cleaned tweet as a single string.
  """
  tweet = tweet.lower().strip()

  if remove_punctuation:
    tweet = tweet.translate(str.maketrans('', '', string.punctuation))

  if lemmatize_words:
    if 'lemmatizer' not in _CLEAN_TWEET_RESOURCES:
      _CLEAN_TWEET_RESOURCES['lemmatizer'] = WordNetLemmatizer()
    wnl = _CLEAN_TWEET_RESOURCES['lemmatizer']
    tweet = ' '.join(wnl.lemmatize(word) for word in tweet.split(' '))

  if remove_stop_words:
    # The imported corpus object is `stopwords`; a set makes each membership
    # test O(1) instead of re-reading the corpus for every word.
    if 'stop_words' not in _CLEAN_TWEET_RESOURCES:
      _CLEAN_TWEET_RESOURCES['stop_words'] = frozenset(stopwords.words('english'))
    english_stop_words = _CLEAN_TWEET_RESOURCES['stop_words']
    tweet = ' '.join(word for word in tweet.split(' ') if word not in english_stop_words)

  return tweet

# Clean the training tweets, then the validation tweets, overwriting each
# list entry in place and printing the first cleaned tweet of each split.
for split_tweets in (train_text, val_text):
  for position, raw_tweet in enumerate(tq.tqdm(split_tweets)):
    split_tweets[position] = cleanTweet(
      raw_tweet,
      lemmatize_words=LEMMATIZE_WORDS,
      remove_punctuation=REMOVE_PUNCTUATION,
      remove_stop_words=REMOVE_STOPWORDS,
    )
  print(split_tweets[0])

In [None]:
# Pickle the cleaned tweet lists into the working directory so the cleaning
# step above does not have to be repeated.
for pickle_name, cleaned_tweets in (('train_text', train_text), ('val_text', val_text)):
    with open(pickle_name, 'wb') as fp:
        pickle.dump(cleaned_tweets, fp)

In [None]:
# Restore the cleaned tweet lists written by the save cell.
# NOTE: pickle.load executes arbitrary code from the file; only load pickles
# this notebook produced itself.
restored_tweets = {}
for pickle_name in ('train_text', 'val_text'):
    with open(pickle_name, 'rb') as fp:
        restored_tweets[pickle_name] = pickle.load(fp)

train_text = restored_tweets['train_text']
val_text = restored_tweets['val_text']

In [None]:
# Create a Tokenizer and find the max length of the tweets
# Keep only the 20000 most frequent words when converting text to ids.
tokenizer = Tokenizer(num_words=20000)
tokenizer.fit_on_texts(train_text)

# max_length is used as pad_sequences' maxlen, which counts tokens, so it must
# be measured on the tokenised tweets. Measuring len(tweet) counts characters
# and pads every sequence several times wider than the longest real one.
max_length = max(
  len(token_ids) for token_ids in tokenizer.texts_to_sequences_generator(train_text)
)

def vectorize_text(tweets):
  """Turn tweet strings into fixed-width rows of token ids.

  Each tweet is mapped to ids by the module-level `tokenizer`, then padded
  with zeros at the end, or cut at the end, to exactly `max_length` entries.

  Args:
    tweets: an iterable of tweet strings.

  Returns:
    The padded id sequences produced by pad_sequences.
  """
  return pad_sequences(
    tokenizer.texts_to_sequences(tweets),
    maxlen=max_length,
    padding='post',
    truncating='post',
  )

In [None]:
# Convert both splits to padded token-id arrays with the shared tokenizer.
train_vectorized, val_vectorized = map(vectorize_text, (train_text, val_text))

In [None]:
# Download Pre-Trained English GloVe embeddings
if USE_PRETRAINED_EMBEDDINGS:
  import requests
  import zipfile
  URL = "http://nlp.stanford.edu/data/glove.42B.300d.zip"

  def fetch_data(url=URL, target_file='glove.zip', delete_zip=False):
      """Download a zip archive and extract it into the working directory.

      The download is skipped when `target_file` already exists, and archive
      members that are already on disk are not extracted again.

      Args:
        url: address of the zip archive.
        target_file: local path the archive is saved to.
        delete_zip: if True, remove the archive after extraction.

      Raises:
        requests.HTTPError: if the server answers with an error status.
      """
      if not os.path.isfile(target_file):
          print("Downloading {} ...".format(url))
          # Write to a temporary name first so an interrupted download is
          # never mistaken for a complete archive on the next run.
          partial_file = target_file + ".part"
          # stream=True reads the (large) body chunk by chunk instead of
          # holding it all in memory.
          with requests.get(url, stream=True, timeout=60) as response:
              response.raise_for_status()
              with open(partial_file, "wb") as handle:
                  for chunk in tq.tqdm(response.iter_content(chunk_size=1024 * 1024)):
                      if chunk:
                          handle.write(chunk)
          os.replace(partial_file, target_file)
          print("Download completed")

      #extract zip_file
      print("Extracting {} file".format(target_file))
      with zipfile.ZipFile(target_file) as zf:
          missing_members = [name for name in zf.namelist() if not os.path.exists(name)]
          zf.extractall(members=missing_members)

      if delete_zip:
          print("Deleting {} file".format(target_file))
          os.remove(target_file)

  fetch_data()

In [None]:
# Create pre-trained embedding matrix
if USE_PRETRAINED_EMBEDDINGS:
  glove_file = "glove.42B.300d.txt"

  # NOTE(review): the file name suggests 300-dimensional vectors, but only the
  # first EMBEDDING_VECTOR_LENGTH components of each are kept - confirm that
  # this truncation is intended.
  EMBEDDING_VECTOR_LENGTH =200
  def construct_embedding_matrix(glove_file, word_index):
      """Build the weight matrix for a frozen Embedding layer.

      Row i holds the pre-trained vector of the word whose index is i. Words
      that do not appear in `glove_file` (and row 0) stay all-zero.

      Args:
        glove_file: path to a text file with one `word v1 v2 ...` entry per
          line.
        word_index: dict mapping each word to its integer index.

      Returns:
        A NumPy array of shape (len(word_index) + 1, EMBEDDING_VECTOR_LENGTH).
      """
      # One row per word index plus row 0, matching the Embedding layer size
      # used in construct_model().
      num_words = len(word_index) + 1
      embedding_matrix = np.zeros((num_words, EMBEDDING_VECTOR_LENGTH))

      with open(glove_file, 'r', encoding="utf8") as f:
          for line in tq.tqdm(f):
              values = line.split()
              if not values:
                  continue
              # The first field is the word, the rest is its vector.
              row = word_index.get(values[0])
              if row is None or row >= num_words:
                  continue
              vector = np.asarray(values[1:], 'float32')
              embedding_matrix[row] = vector[:EMBEDDING_VECTOR_LENGTH]

      return embedding_matrix

  embedding_matrix =  construct_embedding_matrix(glove_file, tokenizer.word_index)

In [None]:
# Construct and compile model
def construct_model():
    """Build the (uncompiled) bidirectional-LSTM classifier.

    Layers: Embedding -> Dropout(0.2) -> Bidirectional(LSTM(64)) ->
    Dropout(0.2) -> Dense(20, softmax). Index 0 is treated as padding
    (mask_zero=True).

    When USE_PRETRAINED_EMBEDDINGS is set, the embedding layer is initialised
    from `embedding_matrix` and frozen; otherwise a trainable 64-dimensional
    embedding is used.

    Returns:
        The Sequential model.
    """
    model = Sequential()
    vocabulary_size = len(tokenizer.word_index) + 1

    if USE_PRETRAINED_EMBEDDINGS:
        # `Constant` is not imported anywhere in this notebook, so it is
        # referenced through the already-imported `tf` namespace.
        model.add(Embedding(
            vocabulary_size,
            EMBEDDING_VECTOR_LENGTH,
            embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
            trainable=False,
            mask_zero=True))
    else:
        model.add(Embedding(vocabulary_size, 64, mask_zero=True))
    model.add(Dropout(0.2))
    model.add(Bidirectional(LSTM(64)))
    model.add(Dropout(0.2))
    model.add(Dense(20, activation="softmax"))

    return model

# Build the network and configure it for integer class labels.
model = construct_model()
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Checkpoint callback: writes the model weights (not the full model) to
# checkpoint_path during training.
checkpoint_path = "cp.ckpt"
# Not referenced by the other visible cells.
checkpoint_dir = os.path.dirname(checkpoint_path)

checkpoint_options = dict(filepath=checkpoint_path, save_weights_only=True, verbose=1)
cp_callback = tf.keras.callbacks.ModelCheckpoint(**checkpoint_options)

In [None]:
# Show the layer stack built by construct_model() before training starts.
model.summary()

In [None]:
# Fit on the training split, evaluating on the validation split after each
# epoch and writing a weight checkpoint through cp_callback.
num_epochs = 3
history = model.fit(
    x=train_vectorized,
    y=train_labels,
    batch_size=64,
    epochs=num_epochs,
    validation_data=(val_vectorized, val_labels),
    callbacks=[cp_callback],
)

In [None]:
# Persist the trained model under ../models/LSTM_US so the cells below can
# reload it without retraining.
model.save("../models/LSTM_US")

In [None]:
# Reload the model from the path written by the save cell; this replaces the
# in-memory `model`.
model = tf.keras.models.load_model('../models/LSTM_US')

In [None]:
# Load test data and give it the same cleaning as the train/validation
# splits; otherwise the model sees differently preprocessed text at test time
# whenever one of the cleaning flags is enabled.
with open('../data/dataset/test/us_test.text', 'r', encoding="utf8") as file:
    test_text = [
        cleanTweet(
            raw_tweet,
            lemmatize_words=LEMMATIZE_WORDS,
            remove_punctuation=REMOVE_PUNCTUATION,
            remove_stop_words=REMOVE_STOPWORDS,
        )
        for raw_tweet in file
    ]

test_vectorized = vectorize_text(test_text)

In [None]:
# Run the model on the vectorised test tweets; the next cell takes the argmax
# of each row of `pred` as the predicted label.
pred = model.predict(test_vectorized)

In [None]:
# Pick the highest-scoring class for every test tweet and write the labels to
# output.labels, one per line.
labels = [np.argmax(scores) for scores in pred]

with open('output.labels' , 'w') as f:
  f.writelines("%s\n" % predicted for predicted in labels)


In [None]:
# Test model on custom input
text = ["This is a test tweet for my lovely NLP Assignment"]

# Apply the same cleaning as the training data before vectorising.
cleaned_text = [
    cleanTweet(
        raw_tweet,
        lemmatize_words=LEMMATIZE_WORDS,
        remove_punctuation=REMOVE_PUNCTUATION,
        remove_stop_words=REMOVE_STOPWORDS,
    )
    for raw_tweet in text
]
text_vectorized = vectorize_text(cleaned_text)

predictions = model.predict(text_vectorized)

# Label id -> emoji shown to the user.
emojis = {0: '❤️', 1: '😍', 2: '😂', 3: '💕', 4: '🔥', 5: '😊', 6: '😎', 7:'✨', 8: '💙', 9: '😘', 10: '📷', 11: '🇺🇸', 12: '☀', 13: '💜', 14: '😉', 15: '💯', 16: '😁', 17: '🎄', 18: '📸', 19: '😜'}

# Only one tweet was submitted, so read the scores from the first row.
predicted_label = int(np.argmax(predictions[0]))
print('\"'+text[0] + "\" - Predicted Emoji: " + emojis[predicted_label])