In [1]:
#%pip install tensorflow==2.16.1 



In [2]:
import os
import numpy as np
os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import ops
import tensorflow as tf
from keras import layers
from collections import Counter
from camel_tools.tokenizers.word import simple_word_tokenize
import camel_tools



In [3]:
print(tf.__version__)


2.16.1


## Utility Classes

## Build layers

Let's start by defining a TransformerBlock layer:

In [7]:
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.ffn = keras.Sequential(
            [
                keras.layers.Dense(ff_dim, activation="relu"),
                keras.layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)

    def call(self, inputs, training=False):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

Next, let's define a TokenAndPositionEmbedding layer:

In [9]:
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = keras.layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.pos_emb = keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, inputs):
        maxlen = ops.shape(inputs)[-1]
        positions = ops.arange(start=0, stop=maxlen, step=1)
        position_embeddings = self.pos_emb(positions)
        token_embeddings = self.token_emb(inputs)
        return token_embeddings + position_embeddings

### Build the NER model class as a keras.Model subclass


In [11]:
class NERModel(keras.Model):
    def __init__(
        self, num_tags, vocab_size, maxlen=128, embed_dim=32, num_heads=2, ff_dim=32
    ):
        super().__init__()
        self.embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
        self.dropout1 = layers.Dropout(0.1)
        self.ff = layers.Dense(ff_dim, activation="relu")
        self.dropout2 = layers.Dropout(0.1)
        self.ff_final = layers.Dense(num_tags, activation="softmax")

    def call(self, inputs, training=False):
        x = self.embedding_layer(inputs)
        x = self.transformer_block(x)
        x = self.dropout1(x, training=training)
        x = self.ff(x)
        x = self.dropout2(x, training=training)
        x = self.ff_final(x)
        return x

## Make the NER label lookup table


In [13]:
def make_tag_lookup_table():
    iob_labels = ["B", "I"]
    ner_labels = ["PER", "ORG", "LOC", "EVE", 'NUM', 'MON', 'LAN', 'TIME']
    all_labels = [(label1, label2) for label2 in ner_labels for label1 in iob_labels]
    all_labels = ["-".join([a, b]) for a, b in all_labels]
    all_labels = ["[PAD]", "O"] + all_labels
    return dict(zip(range(0, len(all_labels) + 1), all_labels))

mapping = make_tag_lookup_table()
print(mapping)

{0: '[PAD]', 1: 'O', 2: 'B-PER', 3: 'I-PER', 4: 'B-ORG', 5: 'I-ORG', 6: 'B-LOC', 7: 'I-LOC', 8: 'B-EVE', 9: 'I-EVE', 10: 'B-NUM', 11: 'I-NUM', 12: 'B-MON', 13: 'I-MON', 14: 'B-LAN', 15: 'I-LAN', 16: 'B-TIME', 17: 'I-TIME'}


Get a list of all tokens in the training dataset. This will be used to create the vocabulary.

In [15]:
def read_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        sentences, tags = [], []
        sentence, tag = [], []
        for line in file:
            line = line.strip()
            if line:
                word, label = line.rsplit(' ', 1)
                sentence.append(word)
                tag.append(label)
            else:
                if sentence:
                    sentences.append(sentence)
                    tags.append(tag)
                sentence, tag = [], []
        if sentence:
            sentences.append(sentence)
            tags.append(tag)
    return sentences, tags

training_set_path = 'C:/Users/TESTUSER/Desktop/UniversityCoursesFiles/uniYear4/AI/ArabicNamedEntityRecognition/code/data/train_cleaned.txt'
validation_set_path = 'C:/Users/TESTUSER/Desktop/UniversityCoursesFiles/uniYear4/AI/ArabicNamedEntityRecognition/code/data/val_cleaned.txt'

train_sentences, train_tags = read_data(training_set_path)
val_sentences, val_tags = read_data(validation_set_path)

### Tokenize sentences using CAMeL Tools


In [17]:
all_tokens = []
for sentence in train_sentences + val_sentences:
    all_tokens.extend(simple_word_tokenize(' '.join(sentence)))

In [18]:
counter = Counter(all_tokens)
vocab_size = 44561  # Adjust if needed
vocabulary = [token for token, count in counter.most_common(vocab_size - 2)]  # -2 for [UNK] and [PAD]

lookup_layer = keras.layers.StringLookup(vocabulary=vocabulary)
tag_lookup = keras.layers.StringLookup(vocabulary=list(mapping.values()), num_oov_indices=0, mask_token=None)

### Process data for model training


In [20]:
def process_data(sentences, tags):
    token_ids = []
    tag_ids = []
    for sentence, tag_seq in zip(sentences, tags):
        tokenized_sentence = simple_word_tokenize(' '.join(sentence))
        token_ids.append(lookup_layer(tokenized_sentence))
        tag_ids.append(tag_lookup(tag_seq))
    return token_ids, tag_ids

train_token_ids, train_tag_ids = process_data(train_sentences, train_tags)
val_token_ids, val_tag_ids = process_data(val_sentences, val_tags)

 ### Create Dataset

In [22]:
def pad_sequences(sequences, max_len=None):
    if max_len is None:
        max_len = max(len(seq) for seq in sequences)
    padded_seqs = []
    for seq in sequences:
        padded = seq.numpy().tolist() + [0] * (max_len - len(seq))  # Assuming 0 is your padding token
        padded_seqs.append(padded)
    return tf.ragged.constant(padded_seqs).to_tensor()

# Use this in your make_dataset function
def make_dataset(token_ids, tag_ids, batch_size):
    # Find the max length for both token_ids and tag_ids
    max_len = max(max(len(seq) for seq in token_ids), max(len(seq) for seq in tag_ids))
    
    padded_token_ids = pad_sequences(token_ids, max_len)
    padded_tag_ids = pad_sequences(tag_ids, max_len)
    
    dataset = tf.data.Dataset.from_tensor_slices((padded_token_ids, padded_tag_ids))
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(tf.data.AUTOTUNE)

batch_size = 32
train_dataset = make_dataset(train_token_ids, train_tag_ids, batch_size)
val_dataset = make_dataset(val_token_ids, val_tag_ids, batch_size)

### Model compilation and training


In [24]:
for tokens, tags in train_dataset.take(1):
    print(tokens.shape, tags.shape)
    print(tokens.numpy(), tags.numpy())  # Check if you see newline characters or unexpected patterns

(32, 270) (32, 270)
[[   65  6483 23781 ...     0     0     0]
 [   29   510  2863 ...     0     0     0]
 [   65   928  2798 ...     0     0     0]
 ...
 [  570   431   651 ...     0     0     0]
 [23800     8   663 ...     0     0     0]
 [  416  9391     7 ...     0     0     0]] [[1 1 1 ... 0 0 0]
 [1 1 2 ... 0 0 0]
 [1 1 1 ... 0 0 0]
 ...
 [1 1 1 ... 0 0 0]
 [1 1 2 ... 0 0 0]
 [1 1 1 ... 0 0 0]]


In [None]:
num_tags = len(mapping)

ner_model = NERModel(num_tags, vocab_size, maxlen=512, embed_dim=32, num_heads=4, ff_dim=64)
class CustomNonPaddingTokenLoss(keras.losses.Loss):
    def __init__(self, name="custom_ner_loss"):
        super().__init__(name=name)

    def call(self, y_true, y_pred):
        loss_fn = keras.losses.SparseCategoricalCrossentropy(
            from_logits=False, reduction=tf.keras.losses.Reduction.NONE
        )
        loss = loss_fn(y_true, y_pred)
        mask = ops.cast((y_true > 0), dtype="float32")
        loss = loss * mask
        return ops.sum(loss) / ops.sum(mask)

loss = CustomNonPaddingTokenLoss()

tf.config.run_functions_eagerly(False)
#ner_model.compile(optimizer="adam", loss=loss)
ner_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.1), loss=loss)
ner_model.fit(train_dataset, validation_data=val_dataset, epochs=20)

Epoch 1/20
[1m445/723[0m [32m━━━━━━━━━━━━[0m[37m━━━━━━━━[0m [1m21s[0m 78ms/step - loss: 1.7508

In [30]:
# tf.config.run_functions_eagerly(True)
# ner_model.compile(optimizer="adam", loss=loss)
# ner_model.fit(train_dataset, epochs=10)


# def tokenize_and_convert_to_ids(text):
#     tokens = text.split()
#     return lowercase_and_convert_to_ids(tokens)


# # Sample inference using the trained model
# sample_input = tokenize_and_convert_to_ids(
#     "eu rejects german call to boycott british lamb"
# )
# sample_input = ops.reshape(sample_input, shape=[1, -1])
# print(sample_input)

# output = ner_model.predict(sample_input)
# prediction = np.argmax(output, axis=-1)[0]
# prediction = [mapping[i] for i in prediction]

# # eu -> B-ORG, german -> B-MISC, british -> B-MISC
# print(prediction)

TypeError: reshape() got an unexpected keyword argument 'shape'

In [36]:
def tokenize_and_convert_to_ids(text):
    tokens = simple_word_tokenize(text)
    return lookup_layer(tokens)

sample_text = "صورة من فئة 500 مليون خلال فترة الانتداب البريطاني على فلسطين."
sample_input = tokenize_and_convert_to_ids(sample_text)
sample_input = tf.reshape(sample_input, shape=[1, -1])

output = ner_model.predict(sample_input)
prediction = np.argmax(output, axis=-1)[0]
prediction = [mapping[i] for i in prediction]
print(prediction)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 401ms/step
['O', 'O', 'O', 'I-NUM', 'O', 'O', 'O', 'B-EVE', 'I-EVE', 'O', 'I-ORG', 'O']
