<a href="https://colab.research.google.com/github/naisyh/CD-FYP/blob/main/TF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install compress-fasttext




In [2]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from transformers import DistilBertTokenizer, TFDistilBertModel
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras import backend as K
import compress_fasttext

In [3]:
MAX_LEN = 128                # Max sequence length for BERT and FastText
EMBEDDING_DIM = 300          # FastText vector size
BATCH_SIZE = 16
EPOCHS = 10

In [4]:
    from google.colab import files
    uploaded = files.upload()

In [5]:
df = pd.read_csv('HateMalay Dataset.csv')  # CSV must contain 'messages' and 'hate' columns

# Extract tweets and labels
tweets = df['messages'].astype(str).tolist()
labels = df['hate'].tolist()

# Encode string labels to integers
le = LabelEncoder()
labels_enc = le.fit_transform(labels)

In [6]:
X_train, X_test, y_train, y_test = train_test_split(tweets, labels_enc, test_size=0.1, stratify=labels_enc, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, stratify=y_train, random_state=42)


In [7]:
# ============================
# 🔠 Tokenize with DistilBERT
# ============================
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-multilingual-cased')

def encode_tweets(tweets):
    return tokenizer(tweets, max_length=MAX_LEN, padding='max_length', truncation=True, return_tensors='tf')

train_encodings = encode_tweets(X_train)
val_encodings = encode_tweets(X_val)
test_encodings = encode_tweets(X_test)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/996k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.96M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/466 [00:00<?, ?B/s]

In [8]:
# ============================
# 🔤 Load FastText
# ============================
fasttext_model = compress_fasttext.models.CompressedFastTextKeyedVectors.load("fasttext-ms-mini")  # Adjust path

# Tokenize raw text into words
def tokenize(text):
    return text.split()

# Convert text to FastText embeddings
def fasttext_embedding(tweet_tokens):
    vectors = []
    for token in tweet_tokens:
        vectors.append(fasttext_model[token] if token in fasttext_model else np.zeros(EMBEDDING_DIM))
    if len(vectors) < MAX_LEN:
        vectors += [np.zeros(EMBEDDING_DIM)] * (MAX_LEN - len(vectors))
    else:
        vectors = vectors[:MAX_LEN]
    return np.array(vectors)

# Prepare FastText inputs
X_train_fasttext = np.array([fasttext_embedding(tokenize(t)) for t in X_train])
X_val_fasttext = np.array([fasttext_embedding(tokenize(t)) for t in X_val])
X_test_fasttext = np.array([fasttext_embedding(tokenize(t)) for t in X_test])




In [9]:
# ============================
# 🧠 Capsule Layer Definition
# ============================
def squash(vectors, axis=-1):
    s_squared_norm = tf.reduce_sum(tf.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / tf.sqrt(s_squared_norm + K.epsilon())
    return scale * vectors

class CapsuleLayer(layers.Layer):
    def __init__(self, num_capsules, dim_capsule, routings=3, **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsules = num_capsules
        self.dim_capsule = dim_capsule
        self.routings = routings

    def build(self, input_shape):
        self.W = self.add_weight(shape=[input_shape[-1], self.num_capsules * self.dim_capsule],
                                 initializer='glorot_uniform', trainable=True)

    def call(self, inputs):
        u_hat = tf.tensordot(inputs, self.W, axes=1)  # Linear transformation
        u_hat = tf.reshape(u_hat, (-1, inputs.shape[1], self.num_capsules, self.dim_capsule))
        u_hat = tf.transpose(u_hat, perm=[0, 2, 1, 3])
        b = tf.zeros_like(u_hat[..., 0])  # Routing logits
        for i in range(self.routings):
            c = tf.nn.softmax(b, axis=1)
            s = tf.reduce_sum(c[..., tf.newaxis] * u_hat, axis=2)
            v = squash(s)
            if i < self.routings - 1:
                b += tf.reduce_sum(u_hat * v[:, :, tf.newaxis, :], axis=-1)
        return tf.reshape(v, (-1, self.num_capsules * self.dim_capsule))


In [11]:

# # ============================
# # 🔧 Build Model Architecture
# # ============================
# input_ids = layers.Input(shape=(MAX_LEN,), dtype=tf.int32, name='input_ids')
# attention_mask = layers.Input(shape=(MAX_LEN,), dtype=tf.int32, name='attention_mask')
# fasttext_input = layers.Input(shape=(MAX_LEN, EMBEDDING_DIM), dtype=tf.float32, name='fasttext_input')

# # --- DistilBERT branch ---
# distilbert_model = TFDistilBertModel.from_pretrained('distilbert-base-multilingual-cased')
# distilbert_outputs = distilbert_model(input_ids, attention_mask=attention_mask)[0]
# conv = layers.Conv1D(filters=64, kernel_size=3, activation='relu')(distilbert_outputs)
# capsule = CapsuleLayer(num_capsules=10, dim_capsule=16)(conv)  # Output shape: (None, 160)

# # --- FastText branch ---
# bi_gru = layers.Bidirectional(layers.GRU(128, return_sequences=True))(fasttext_input)
# attention = layers.Attention()([bi_gru, bi_gru])
# attention_output = tf.reduce_sum(attention * bi_gru, axis=1)  # Output shape: (None, 256)

# # --- Merge both channels ---
# concat = layers.Concatenate()([capsule, attention_output])  # Shape: (None, 416)
# fc1 = layers.Dense(200, activation='relu')(concat)
# fc2 = layers.Dense(100, activation='relu')(fc1)
# output = layers.Dense(2, activation='softmax')(fc2)  # Binary classification output

# # Create Model
# model = Model(inputs=[input_ids, attention_mask, fasttext_input], outputs=output)
# model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
#               loss='sparse_categorical_crossentropy',
#               metrics=['accuracy'])
# model.summary()


Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFDistilBertModel: ['vocab_projector.bias', 'vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_transform.weight', 'vocab_transform.bias']
- This IS expected if you are initializing TFDistilBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFDistilBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDistilBertModel for predictions without further training.


ValueError: Exception encountered when calling layer 'tf_distil_bert_model_1' (type TFDistilBertModel).

Data of type <class 'keras.src.backend.common.keras_tensor.KerasTensor'> is not allowed only (<class 'tensorflow.python.framework.tensor.Tensor'>, <class 'bool'>, <class 'int'>, <class 'transformers.utils.generic.ModelOutput'>, <class 'tuple'>, <class 'list'>, <class 'dict'>, <class 'numpy.ndarray'>) is accepted for attention_mask.

Call arguments received by layer 'tf_distil_bert_model_1' (type TFDistilBertModel):
  • input_ids=<KerasTensor shape=(None, 128), dtype=int32, sparse=False, name=input_ids>
  • attention_mask=<KerasTensor shape=(None, 128), dtype=int32, sparse=False, name=attention_mask>
  • head_mask=None
  • inputs_embeds=None
  • output_attentions=None
  • output_hidden_states=None
  • return_dict=None
  • training=False

In [18]:
import tensorflow as tf
from tensorflow.keras import layers, Model
from transformers import TFDistilBertModel

# Define your constants
MAX_LEN = 128
EMBEDDING_DIM = 300

# Define inputs
input_ids = layers.Input(shape=(MAX_LEN,), dtype=tf.int32, name='input_ids')
attention_mask = layers.Input(shape=(MAX_LEN,), dtype=tf.int32, name='attention_mask')
fasttext_input = layers.Input(shape=(MAX_LEN, EMBEDDING_DIM), dtype=tf.float32, name='fasttext_input')

# --- DistilBERT branch ---
distilbert_model = TFDistilBertModel.from_pretrained('distilbert-base-multilingual-cased')

# Custom layer to wrap the DistilBERT model
class DistilBertLayer(layers.Layer):
    def __init__(self, model, **kwargs):
        super(DistilBertLayer, self).__init__(**kwargs)
        self.model = model

    def call(self, inputs):
        input_ids, attention_mask = inputs
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs[0]  # Return the last hidden state

# Create an instance of the custom layer
distilbert_layer = DistilBertLayer(distilbert_model)

# Pass the Keras Input layers directly to the custom layer
distilbert_outputs = distilbert_layer([input_ids, attention_mask])

# Apply convolution and capsule layers on the DistilBERT outputs
conv = layers.Conv1D(filters=64, kernel_size=3, activation='relu')(distilbert_outputs)
capsule = CapsuleLayer(num_capsules=10, dim_capsule=16)(conv)  # Output shape: (None, 160)

# --- FastText branch ---
bi_gru = layers.Bidirectional(layers.GRU(128, return_sequences=True))(fasttext_input)
attention = layers.Attention()([bi_gru, bi_gru])

# Custom layer to handle the attention output
class AttentionOutputLayer(layers.Layer):
    def call(self, inputs):
        bi_gru, attention = inputs
        return tf.reduce_sum(attention * bi_gru, axis=1)  # Output shape: (None, 256)

# Create an instance of the custom attention output layer
attention_output_layer = AttentionOutputLayer()

# Pass the Keras Input layers directly to the custom layer
attention_output = attention_output_layer([bi_gru, attention])

# --- Merge both channels ---
concat = layers.Concatenate()([capsule, attention_output])  # Shape: (None, 416)
fc1 = layers.Dense(200, activation='relu')(concat)
fc2 = layers.Dense(100, activation='relu')(fc1)
output = layers.Dense(2, activation='softmax')(fc2)  # Binary classification output

# Create Model
model = Model(inputs=[input_ids, attention_mask, fasttext_input], outputs=output)
model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.summary()


Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFDistilBertModel: ['vocab_projector.bias', 'vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_transform.weight', 'vocab_transform.bias']
- This IS expected if you are initializing TFDistilBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFDistilBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDistilBertModel for predictions without further training.


In [19]:

# ============================
# 🚀 Train the Model
# ============================
history = model.fit(
    {'input_ids': train_encodings['input_ids'],
     'attention_mask': train_encodings['attention_mask'],
     'fasttext_input': X_train_fasttext},
    np.array(y_train),
    validation_data=(
        {'input_ids': val_encodings['input_ids'],
         'attention_mask': val_encodings['attention_mask'],
         'fasttext_input': X_val_fasttext},
        np.array(y_val)
    ),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE
)


Epoch 1/10
[1m248/248[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1064s[0m 4s/step - accuracy: 0.6341 - loss: 0.6537 - val_accuracy: 0.6848 - val_loss: 0.6182
Epoch 2/10
[1m248/248[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1036s[0m 4s/step - accuracy: 0.6923 - loss: 0.5945 - val_accuracy: 0.6757 - val_loss: 0.6012
Epoch 3/10
[1m248/248[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1042s[0m 4s/step - accuracy: 0.7144 - loss: 0.5718 - val_accuracy: 0.6780 - val_loss: 0.5905
Epoch 4/10
[1m248/248[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1042s[0m 4s/step - accuracy: 0.7086 - loss: 0.5636 - val_accuracy: 0.6712 - val_loss: 0.5921
Epoch 5/10
[1m248/248[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1043s[0m 4s/step - accuracy: 0.7228 - loss: 0.5449 - val_accuracy: 0.6871 - val_loss: 0.5910
Epoch 6/10
[1m248/248[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1045s[0m 4s/step - accuracy: 0.7243 - loss: 0.5388 - val_accuracy: 0.6757 - val_loss: 0.6029
Epoch 7/10
[1m2

In [25]:
model.summary()


In [26]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [30]:
model.save('my_model.keras')

NotImplementedError: 
Object DistilBertLayer was created by passing
non-serializable argument values in `__init__()`,
and therefore the object must override `get_config()` in
order to be serializable. Please implement `get_config()`.

Example:

class CustomLayer(keras.layers.Layer):
    def __init__(self, arg1, arg2, **kwargs):
        super().__init__(**kwargs)
        self.arg1 = arg1
        self.arg2 = arg2

    def get_config(self):
        config = super().get_config()
        config.update({
            "arg1": self.arg1,
            "arg2": self.arg2,
        })
        return config

In [None]:

# ============================
# 📊 Evaluate the Model
# ============================
test_preds = model.predict({
    'input_ids': test_encodings['input_ids'],
    'attention_mask': test_encodings['attention_mask'],
    'fasttext_input': X_test_fasttext
})
test_pred_labels = np.argmax(test_preds, axis=1)

# Print classification report
print(classification_report(y_test, test_pred_labels, target_names=le.classes_))
