In [43]:
!git clone https://github.com/cardiffnlp/tweeteval

fatal: destination path 'tweeteval' already exists and is not an empty directory.


  pid, fd = os.forkpty()


In [44]:
!pip install keras tensorflow

!pip install nltk



In [45]:
import numpy as np
import pandas as pd
import os

from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Bidirectional
from keras.layers import Embedding
from tensorflow.keras.preprocessing.text import Tokenizer # Changed import statement to use tensorflow.keras
from tensorflow.keras.preprocessing.sequence import pad_sequences 

In [46]:
# Define the dataset folder path
dataset_path = "//kaggle/working/tweeteval/datasets/emoji"  # Replace this with the actual path if different

# Function to load texts and labels from a file
def load_texts_and_labels(text_file, label_file):
    # Read texts
    with open(text_file, "r", encoding="utf-8") as f:
        texts = f.read().splitlines()

    # Read labels
    with open(label_file, "r", encoding="utf-8") as f:
        labels = f.read().splitlines()

    # Return a list of tuples (text, label)
    return list(zip(texts, labels))

# Dictionary to hold the dataset
data = {}

# Load train, validation, and test sets
data["train"] = load_texts_and_labels(
    os.path.join(dataset_path, "train_text.txt"),
    os.path.join(dataset_path, "train_labels.txt")
)
data["val"] = load_texts_and_labels(
    os.path.join(dataset_path, "val_text.txt"),
    os.path.join(dataset_path, "val_labels.txt")
)
data["test"] = load_texts_and_labels(
    os.path.join(dataset_path, "test_text.txt"),
    os.path.join(dataset_path, "val_labels.txt")  # Assuming test labels are in `val_labels.txt`
)

In [47]:
# Convert data into DataFrames for train, validation, and test sets
def convert_to_dataframe(data_split):
    return pd.DataFrame(data_split, columns=["TEXT", "Label"])

# Create DataFrames for train, validation, and test sets
train_data = convert_to_dataframe(data["train"])
val_data = convert_to_dataframe(data["val"])
test_data = convert_to_dataframe(data["test"])

# Display the first few rows of each DataFrame to confirm the format
print("Training Data:")
print(train_data.head())

print("\nValidation Data:")
print(val_data.head())

print("\nTest Data:")
print(test_data.head())


Training Data:
                                                TEXT Label
0  Sunday afternoon walking through Venice in the...    12
1  Time for some BBQ and whiskey libations. Chomp...    19
2  Love love love all these people ️ ️ ️ #friends...     0
3                               ️ ️ ️ ️ @ Toys"R"Us      0
4  Man these are the funniest kids ever!! That fa...     2

Validation Data:
                                                TEXT Label
0  A little throwback with my favourite person @ ...     0
1  glam on @user yesterday for #kcon makeup using...     7
2  Democracy Plaza in the wake of a stunning outc...    11
3   Then &amp; Now. VILO @ Walt Disney Magic Kingdom     0
4               Who never... @ A Galaxy Far Far Away     2

Test Data:
                                                TEXT Label
0                                  en Pelham Parkway     0
1  The calm before...... | w/ sofarsounds @user |...     7
2  Just witnessed the great solar eclipse @ Tampa...    11
3  This lit

In [48]:
mapping_path = "/kaggle/working/tweeteval/datasets/emoji/mapping.txt"

# Load the mapping file into a DataFrame
# Assuming mapping.txt is formatted as tab-separated, with each line as: Index Emoji Description
mapping_df = pd.read_csv(mapping_path, sep="\t", header=None, names=["index", "emoticons", "description"])

# Add a numeric "number" column that matches the row index
mapping_df["number"] = mapping_df.index

# Rename columns to match the required format
mapping_df = mapping_df.rename(columns={"index": "Unnamed: 0"})

# Drop the "description" column if it's not needed
mappings= mapping_df[["Unnamed: 0", "emoticons", "number"]]

# Display the first few rows to verify
print(mappings.head())

  Unnamed: 0                      emoticons  number
0          ❤                    _red_heart_       0
1          😍  _smiling_face_with_hearteyes_       1
2          😂       _face_with_tears_of_joy_       2
3          💕                   _two_hearts_       3
4          🔥                         _fire_       4


In [49]:
train_data.shape, test_data.shape, mappings.shape

((45000, 2), (5000, 2), (20, 3))

In [50]:
train_length = train_data.shape[0]
test_length = test_data.shape[0]
train_length, test_length

(45000, 5000)

In [51]:
# import nltk

# # Download the 'stopwords' dataset
# nltk.download('stopwords')

# from nltk.corpus import stopwords

# stop_words = stopwords.words("english")
# stop_words[:5]

In [52]:
# tokenize the sentences
# def tokenize(tweets):
#     stop_words = stopwords.words("english")
#     tokenized_tweets = []
#     for tweet in tweets:
#         # split all words in the tweet
#         words = tweet.split(" ")
#         tokenized_string = ""
#         for word in words:
#             # remove @handles -> useless -> no information
#             if word and word[0] != '@' and word not in stop_words:
#                 # if a hashtag, remove # -> adds no new information
#                 if word[0] == "#":
#                     word = word[1:]
#                 tokenized_string += word + " "
#         tokenized_tweets.append(tokenized_string)
#     return tokenized_tweets

In [53]:
import requests
import re

def load_slang_dict(url):
    """
    Load the slang dictionary from the provided URL.
    Assumes the file contains each slang and its meaning separated by a hyphen (-).
    """
    slang_dict = {}
    response = requests.get(url)
    if response.status_code == 200:
        lines = response.text.splitlines()
        for line in lines:
            if '-' in line:  # Assumes the format is "slang - meaning"
                parts = line.split('-', 1)  # Split on the first hyphen
                if len(parts) == 2:
                    slang, meaning = parts
                    slang_dict[slang.strip().lower()] = meaning.strip()
    else:
        raise Exception(f"Failed to fetch slang dictionary from {url}. HTTP {response.status_code}")
    return slang_dict

def expand_slangs(word, slang_dict):
    """
    Replace slangs with their expanded forms based on a given dictionary.
    """
    return slang_dict.get(word.lower(), word)

def reduce_elongated_words(word):
    """
    Reduce elongated words (e.g., 'soooo' -> 'so').
    """
    return re.sub(r"(.)\1{2,}", r"\1", word)

def tokenize(tweets, slang_dict):
    """
    Tokenizes tweets, handles slangs, reduces elongated words,
    and retains @ mentions and stopwords.
    """
    tokenized_tweets = []
    
    for tweet in tweets:
        # Split words in the tweet
        words = tweet.split()
        processed_words = []
        
        for word in words:
            # Remove URLs
            word = re.sub(r"http\S+|www\S+", "", word)
            
            # Remove hashtags (#), but keep the word
            if word.startswith("#"):
                word = word[1:]
            
            # Expand slangs
            word = expand_slangs(word, slang_dict)
            
            # Reduce elongated words
            word = reduce_elongated_words(word)
            
            # Keep @ mentions and punctuation intact
            processed_words.append(word)
        
        tokenized_tweets.append(processed_words)
    
    return tokenized_tweets

# Load the slang dictionary from the GitHub URL
slang_dict_url = "https://raw.githubusercontent.com/haierlord/resource/master/slangs"
slang_dict = load_slang_dict(slang_dict_url)

In [54]:
# translate tweets to a sequence of numbers
def encod_tweets(tweets):
    tokenizer = Tokenizer(filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n', split=" ", lower=True)
    tokenizer.fit_on_texts(tweets)
    return tokenizer, tokenizer.texts_to_sequences(tweets)

In [55]:
# apply padding to dataset and convert labels to bitmaps
def format_data(encoded_tweets, max_length, labels):
    x = pad_sequences(encoded_tweets, maxlen= max_length, padding='post', truncating='post')
    y = []
    for emoji in labels:
        emoji_index = int(emoji)
        bit_vec = np.zeros(20)
        bit_vec[emoji_index] = 1
        y.append(bit_vec)
    y = np.asarray(y)
    return x, y

In [56]:
# create weight matrix from pre trained embeddings
def create_weight_matrix(vocab, raw_embeddings):
    vocab_size = len(vocab) + 1
    weight_matrix = np.zeros((vocab_size, 300))
    for word, idx in vocab.items():
        if word in raw_embeddings:
            weight_matrix[idx] = raw_embeddings[word]
    return weight_matrix

In [59]:
from tensorflow.keras.layers import Embedding, LSTM, Bidirectional, Dense, Dropout, Attention, Layer, Concatenate
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback, ReduceLROnPlateau
from sklearn.metrics import f1_score
import numpy as np
import tensorflow.keras.backend as K 

class F1ScoreCallback(Callback):
    def __init__(self, x_val, y_val):
        super(F1ScoreCallback, self).__init__()
        self.x_val = x_val
        self.y_val = y_val

    def on_epoch_end(self, epoch, logs=None):
        y_pred = np.argmax(self.model.predict(self.x_val), axis=-1)
        y_true = np.argmax(self.y_val, axis=-1)
        f1 = f1_score(y_true, y_pred, average='macro')
        print(f' - val_f1: {f1}')
        logs['val_f1'] = f1

# Attention Layer
class AttentionLayer(Layer):
    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.W = self.add_weight(shape=(input_shape[-1], input_shape[-1]), initializer="random_normal", trainable=True)
        self.b = self.add_weight(shape=(input_shape[-1],), initializer="zeros", trainable=True)
        self.u = self.add_weight(shape=(input_shape[-1], 1), initializer="random_normal", trainable=True)
        super(AttentionLayer, self).build(input_shape)

    def call(self, inputs):
        q = K.tanh(K.dot(inputs, self.W) + self.b) 
        a = K.dot(q, self.u) 
        a = K.squeeze(a, -1)
        a = K.softmax(a)
        a = K.expand_dims(a, -1)
        output = inputs * a
        return K.sum(output, axis=1)

# Modified model
def final_model(weight_matrix, vocab_size, max_length, x_train, y_train, x_val, y_val, embedding_dim=300, lstm_units=512, epochs=5, learning_rate=0.01):
    # Embedding layer
    embedding_layer = Embedding(
        input_dim=vocab_size,
        output_dim=embedding_dim,
        weights=[weight_matrix],
        input_length=max_length,
        trainable=True,
        mask_zero=False
    )

    # Model architecture with BiLSTM and Attention
    model = Sequential()
    model.add(embedding_layer)
    
    model.add(Dropout(0.3))
    
    # Bidirectional LSTM layer
    model.add(Bidirectional(LSTM(lstm_units, return_sequences=True)))
    
    # Attention mechanism
    model.add(AttentionLayer())
    
    # MLP layer for final classification
    model.add(Dense(200, activation='relu'))
    model.add(Dropout(0.3))
    model.add(Dense(20, activation='softmax'))  # Assuming 20 classes for emoji classification
    
    # Compile model with Adam optimizer and learning rate
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    
    # Callbacks
    lr_reduction = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1)
    f1_callback = F1ScoreCallback(x_val, y_val)

    # Train the model
    model.fit(
        x_train, y_train,
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=[lr_reduction, f1_callback]
    )

    # Evaluate model on validation set
    score = model.evaluate(x_val, y_val)
    
    return model, score


In [60]:
import math

In [64]:
tokenized_tweets = tokenize(train_data['TEXT'], slang_dict)
tokenized_tweets += tokenize(test_data['TEXT'], slang_dict)
# max_length = math.ceil(sum([len(s.split(" ")) for s in tokenized_tweets])/len(tokenized_tweets))
# tokenizer, encoded_tweets = encod_tweets(tokenized_tweets)
# max_length, len(tokenized_tweets)

# Calculate the average token length per tweet
average_length = sum([len(tokens) for tokens in tokenized_tweets]) / len(tokenized_tweets)
max_length = math.ceil(average_length)

# Encode tweets
tokenizer, encoded_tweets = encod_tweets(tokenized_tweets)

# Display results
max_length, len(tokenized_tweets)

(12, 50000)

In [65]:
x, y = format_data(encoded_tweets[:train_length], max_length, train_data['Label'])
len(x), len(y)

(45000, 45000)

In [66]:
x_test, y_test = format_data(encoded_tweets[train_length:], max_length, test_data['Label'])
len(x_test), len(y_test)

(5000, 5000)

In [67]:
vocab = tokenizer.word_index
# vocab, len(vocab)

In [68]:
from gensim.models.keyedvectors import KeyedVectors

In [69]:
word2vec_model = KeyedVectors.load_word2vec_format('/kaggle/input/swm-wordembed/model_swm_300-6-10-low.w2v', binary=False)

# Create the weight matrix
weight_matrix = create_weight_matrix(vocab, word2vec_model)

In [None]:
model, score= final_model(weight_matrix, len(vocab)+1, max_length, x, y, x_test, y_test, epochs = 50)
model, score

In [None]:
# Save the model in HDF5 format
model.save("final_model.h5")


In [None]:
# from tensorflow.keras.layers import Input, Embedding, LSTM, Bidirectional, Dense, Dropout, Concatenate
# from tensorflow.keras.models import Model
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.preprocessing.sequence import pad_sequences
# from keras.layers import Layer
# import tensorflow.keras.backend as K
# import numpy as np

# def create_final_model(
#     word_embedding_matrix, vocab_size, max_length,
#     char_vocab_size, pos_vocab_size, ner_vocab_size,
#     word_embedding_dim=300, char_embedding_dim=50, pos_embedding_dim=50, ner_embedding_dim=50,
#     lstm_hidden_size=512, mlp_hidden_size=200, learning_rate=0.01
# ):
#     # Custom Attention Layer
#     class CustomAttention(Layer):
#         def __init__(self, **kwargs):
#             super(CustomAttention, self).__init__(**kwargs)
        
#         def build(self, input_shape):
#             self.attention_weights = self.add_weight(
#                 name="attention_weights",
#                 shape=(input_shape[-1], 1),
#                 initializer="random_normal",
#                 trainable=True,
#             )
#             super(CustomAttention, self).build(input_shape)
        
#         def call(self, inputs):
#             attention_logits = K.dot(inputs, self.attention_weights)  # Compute attention logits
#             attention_logits = K.squeeze(attention_logits, axis=-1)  # Remove the extra dimension
#             attention_scores = K.softmax(attention_logits)  # Compute attention scores
#             weighted_input = inputs * K.expand_dims(attention_scores, axis=-1)  # Apply attention scores
#             output = K.sum(weighted_input, axis=1)  # Aggregate
#             return output

#         def compute_output_shape(self, input_shape):
#             return (input_shape[0], input_shape[-1])

#     # Input layers for word, char, POS, and NER embeddings
#     word_input = Input(shape=(max_length,), name="word_input")
#     char_input = Input(shape=(max_length,), name="char_input")
#     pos_input = Input(shape=(max_length,), name="pos_input")
#     ner_input = Input(shape=(max_length,), name="ner_input")

#     # Word embedding layer using pre-trained word embeddings
#     word_embedding_layer = Embedding(
#         input_dim=vocab_size,
#         output_dim=word_embedding_dim,
#         weights=[word_embedding_matrix],
#         input_length=max_length,
#         trainable=False,
#         mask_zero=True
#     )(word_input)

#     # Char, POS, and NER embedding layers
#     char_embedding_layer = Embedding(
#         input_dim=char_vocab_size,
#         output_dim=char_embedding_dim,
#         input_length=max_length,
#         trainable=True
#     )(char_input)

#     pos_embedding_layer = Embedding(
#         input_dim=pos_vocab_size,
#         output_dim=pos_embedding_dim,
#         input_length=max_length,
#         trainable=True
#     )(pos_input)

#     ner_embedding_layer = Embedding(
#         input_dim=ner_vocab_size,
#         output_dim=ner_embedding_dim,
#         input_length=max_length,
#         trainable=True
#     )(ner_input)

#     # Concatenate all embeddings
#     concatenated_embeddings = Concatenate(axis=-1)([
#         word_embedding_layer, char_embedding_layer, pos_embedding_layer, ner_embedding_layer
#     ])

#     # Bi-Directional LSTM to process concatenated embeddings
#     bi_lstm_output = Bidirectional(LSTM(
#         lstm_hidden_size,
#         return_sequences=True
#     ))(concatenated_embeddings)

#     # Custom Attention Mechanism
#     sentence_representation = CustomAttention()(bi_lstm_output)

#     # Multi-layer Perceptron (MLP)
#     dense_layer = Dense(mlp_hidden_size, activation="relu")(sentence_representation)
#     output_layer = Dense(20, activation="softmax", name="output_layer")(dense_layer)

#     # Compile the model
#     model = Model(inputs=[word_input, char_input, pos_input, ner_input], outputs=output_layer)
#     optimizer = Adam(learning_rate=learning_rate)
#     model.compile(optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"])

#     return model


In [None]:
# from tensorflow.keras.preprocessing.sequence import pad_sequences

# # Helper function to preprocess sequences
# def encode_and_pad(sequences, tokenizer, max_length):
#     encoded_sequences = tokenizer.texts_to_sequences(sequences)
#     padded_sequences = pad_sequences(encoded_sequences, maxlen=max_length, padding="post", truncating="post")
#     return padded_sequences

# # Tokenizers for each type
# word_tokenizer = Tokenizer()
# char_tokenizer = Tokenizer()
# pos_tokenizer = Tokenizer()
# ner_tokenizer = Tokenizer()

# # Fit tokenizers
# word_tokenizer.fit_on_texts(data["train"]["words"])  # Tokenize training words
# char_tokenizer.fit_on_texts(data["train"]["chars"])  # Tokenize training characters
# pos_tokenizer.fit_on_texts(data["train"]["pos"])    # Tokenize POS tags
# ner_tokenizer.fit_on_texts(data["train"]["ner"])    # Tokenize NER tags

# # Create training inputs
# x_train_words = encode_and_pad(data["train"]["words"], word_tokenizer, max_length)
# x_train_chars = encode_and_pad(data["train"]["chars"], char_tokenizer, max_length)
# x_train_pos = encode_and_pad(data["train"]["pos"], pos_tokenizer, max_length)
# x_train_ner = encode_and_pad(data["train"]["ner"], ner_tokenizer, max_length)

# # Create validation inputs
# x_val_words = encode_and_pad(data["val"]["words"], word_tokenizer, max_length)
# x_val_chars = encode_and_pad(data["val"]["chars"], char_tokenizer, max_length)
# x_val_pos = encode_and_pad(data["val"]["pos"], pos_tokenizer, max_length)
# x_val_ner = encode_and_pad(data["val"]["ner"], ner_tokenizer, max_length)

# # Create test inputs
# x_test_words = encode_and_pad(data["test"]["words"], word_tokenizer, max_length)
# x_test_chars = encode_and_pad(data["test"]["chars"], char_tokenizer, max_length)
# x_test_pos = encode_and_pad(data["test"]["pos"], pos_tokenizer, max_length)
# x_test_ner = encode_and_pad(data["test"]["ner"], ner_tokenizer, max_length)

# # Labels
# y_train = data["train"]["labels"]
# y_val = data["val"]["labels"]
# y_test = data["test"]["labels"]


In [None]:

# # Summarize the model
# model.summary()