<a href="https://colab.research.google.com/github/jumana-nounou/lightweightNED/blob/main/bach_system_spacy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installation

In [None]:
!pip install transformers
!git clone https://github.com/aub-mind/arabert
!pip install pyarabic
!pip install farasapy
!pip install sentencepiece
!pip install sacremoses

fatal: destination path 'arabert' already exists and is not an empty directory.


## Imports

In [None]:
import json
from sklearn.preprocessing import LabelEncoder
from transformers import AutoTokenizer, AutoModel
# from preprocess import NEVER_SPLIT_TOKENS,ArabertPreprocessor
from arabert.preprocess import NEVER_SPLIT_TOKENS,ArabertPreprocessor
from farasa.segmenter import FarasaSegmenter
import torch
import requests
from transformers import MBartForConditionalGeneration, MBart50Tokenizer
from transformers import AutoModelForTokenClassification
import re
from collections import defaultdict
from sklearn.metrics.pairwise import cosine_similarity
from pyarabic.araby import tokenize
from transformers import MarianMTModel, MarianTokenizer
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import numpy as np
import spacy


In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; the dataset paths below point into it.
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import sys

# Put the cloned AraBERT checkout on the import path. The membership check keeps
# the cell idempotent: re-running it no longer prepends a duplicate entry.
if '/content/arabert' not in sys.path:
    sys.path.insert(0, '/content/arabert')
print(sys.path)

['/content/arabert', '/content/arabert', '/content', '/env/python', '/usr/lib/python310.zip', '/usr/lib/python3.10', '/usr/lib/python3.10/lib-dynload', '', '/usr/local/lib/python3.10/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.10/dist-packages/IPython/extensions', '/root/.ipython']


In [None]:
# Sanity check: list the contents of the cloned AraBERT checkout.
!ls /content/arabert

arabert		  AraELECTRA.png  examples	 __pycache__	   setup.py
arabert_logo.png  aragpt2	  __init__.py	 README.md
araelectra	  AraGPT2.png	  preprocess.py  requirements.txt


## Paths


In [None]:
# Locations of the annotated train/test files on the mounted Google Drive.
train_file_path = '/content/drive/MyDrive/Dataset/train.txt'
test_file_path = '/content/drive/MyDrive/Dataset/test.txt'
# Validation split is currently unused.
# val_file_path = '/content/drive/MyDrive/Bachelor/NER/val.txt'


## Functions

Data Cleaning functions

In [None]:
def read_file_with_sentences(file_path):
    """Read a token-per-line NER file into a flat list of word records.

    Every non-blank line is split at its last space into ``<text> <tag>``;
    blank lines separate sentences.

    Args:
        file_path: Path of the UTF-8 encoded annotation file.

    Returns:
        A list with one dict per token, holding ``word`` (stripped text),
        ``tag``, ``word_index`` (0-based position inside its sentence) and
        ``sentence`` (the space-joined token texts of that sentence).
    """
    data = []
    current_sentence = []
    current_sentence_text = []

    def flush_sentence():
        """Attach the sentence text to the buffered tokens and emit them."""
        if not current_sentence:
            return
        # NOTE(review): this removes every Latin capital 'O' from the sentence
        # text (the tag itself is never part of it). It looks like a leftover
        # and would corrupt tokens such as "OPEC" - confirm before removing.
        sentence_text = ' '.join(current_sentence_text).replace('O', '')
        for word_info in current_sentence:
            word_info['sentence'] = sentence_text
        data.extend(current_sentence)
        current_sentence.clear()
        current_sentence_text.clear()

    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                flush_sentence()
                continue

            text, separator, label = line.rpartition(' ')
            if not separator:
                # A line without any space carries no tag. Keep the whole line
                # as the token and mark it as outside any entity instead of
                # chopping its last character off.
                text, label = line, 'O'

            current_sentence.append({
                'word': text.strip(),
                'tag': label,
                'word_index': len(current_sentence),
            })
            current_sentence_text.append(text)

    # The file may not end with a blank line; do not drop the last sentence.
    flush_sentence()

    return data


In [None]:
def combine_phrases(data):
    """Merge consecutive ``B-``/``I-`` tagged tokens into entity phrases.

    Args:
        data: Flat list of token dicts with ``word``, ``tag``, ``sentence`` and
            ``word_index`` keys, as produced by ``read_file_with_sentences``.

    Returns:
        A list of phrase dicts. ``word`` is the phrase's tokens, each followed
        by a single space (so it ends with a trailing space), ``tag`` is the
        entity type without the ``B-``/``I-`` prefix, and ``sentence`` /
        ``word_index`` are taken from the phrase's first token. Tokens with any
        other tag are dropped.
    """
    combined_data = []
    current_phrase = None

    def start_phrase(entry):
        """Open a new phrase from its first token."""
        return {
            'word': entry['word'] + ' ',
            'tag': entry['tag'][2:],  # strip the 'B-' / 'I-' prefix
            'sentence': entry['sentence'],
            'word_index': entry['word_index'],
        }

    for entry in data:
        tag = entry['tag']
        if tag.startswith('B-'):
            if current_phrase is not None:
                combined_data.append(current_phrase)
            current_phrase = start_phrase(entry)
        elif tag.startswith('I-'):
            if current_phrase is None:
                # An 'I-' token with no open phrase (no preceding 'B-') used to
                # produce a phrase with an empty tag, sentence and word_index.
                # Treat it as the start of a new phrase instead.
                current_phrase = start_phrase(entry)
            else:
                current_phrase['word'] += entry['word'] + ' '
        elif current_phrase is not None:
            # Any other tag closes the phrase that is currently open.
            combined_data.append(current_phrase)
            current_phrase = None

    if current_phrase is not None:
        combined_data.append(current_phrase)

    return combined_data


In [None]:
def filter_non_O_tags(data):
    """Return the records of ``data`` whose ``tag`` is not ``'O'``.

    Records without a ``tag`` key are kept, and the input order is preserved.
    """
    kept = []
    for record in data:
        if record.get('tag') == 'O':
            continue
        kept.append(record)
    return kept


In [None]:
def remove_preposition(word):
    """Strip one leading single-letter Arabic prefix from ``word``.

    Args:
        word: The surface form to clean.

    Returns:
        ``word`` without its first character when it starts with one of
        "و", "ب", "ك", "ل", "ف" and is longer than one character; otherwise
        ``word`` unchanged. Empty input is returned as is instead of raising
        ``IndexError``, and a one-letter word is no longer reduced to ``''``.
    """
    prepositions = ("و", "ب", "ك", "ل", "ف")
    if len(word) > 1 and word.startswith(prepositions):
        return word[1:]
    return word


Wikidata retrieval

In [None]:
def get_wikidata_candidates(entity):
    """Look up candidate Wikidata entity IDs for a surface form.

    The lookup falls back in three steps and stops at the first one that
    returns results:

    1. search the text as Arabic;
    2. translate it with ``translate_to_english`` and search as English;
    3. strip a leading preposition with ``remove_preposition`` and search the
       result as Arabic.

    Args:
        entity: The mention text. Surrounding whitespace is ignored.

    Returns:
        A list of entity ID strings, empty when nothing matched or the text is
        blank.
    """
    def search_ids(term, language):
        """Run one ``wbsearchentities`` query and return the matching IDs."""
        # `params=` lets requests URL-encode the term; building the query by
        # hand breaks on characters such as '&' or '#'. The timeout prevents
        # an unresponsive server from blocking the notebook indefinitely.
        response = requests.get(
            "https://www.wikidata.org/w/api.php",
            params={
                "action": "wbsearchentities",
                "search": term,
                "language": language,
                "format": "json",
            },
            timeout=30,
        )
        return [result['id'] for result in (response.json().get('search') or [])]

    entity = entity.strip()
    if not entity:
        return []

    candidates = search_ids(entity, "ar")
    if candidates:
        return candidates

    translated_word = translate_to_english(entity)
    if translated_word:
        candidates = search_ids(translated_word, "en")
        if candidates:
            return candidates

    updated_word = remove_preposition(entity)
    # Skip the request when nothing was stripped: it would only repeat the
    # first (already empty) Arabic search.
    if updated_word and updated_word != entity:
        return search_ids(updated_word, "ar")

    return []

In [None]:
def get_claim_value(claim):
    """Extract a printable value from one Wikidata claim.

    Args:
        claim: A claim dict; the value is read from
            ``claim['mainsnak']['datavalue']['value']``.

    Returns:
        The ``id`` of the value when it is a dict with an ``id`` key, its
        ``time`` when it is a dict with a ``time`` key, and ``str(value)``
        otherwise (``'{}'`` when the claim carries no value).
    """
    value = claim.get('mainsnak', {}).get('datavalue', {}).get('value', {})
    # Values may also be plain strings; only probe for keys on dicts, because
    # `'id' in "video"` is a substring test that would then fail on value['id'].
    if isinstance(value, dict):
        if 'id' in value:
            return value['id']
        if 'time' in value:
            return value['time']
    return str(value)

def get_entity_labels(entity_ids):
    """Fetch display labels for a collection of Wikidata entity IDs.

    Args:
        entity_ids: Iterable of ID strings. Values that are not entity IDs
            (for example time strings coming from claims) are ignored instead
            of being sent to the API, where one bad ID fails the whole request.

    Returns:
        A dict mapping each entity ID returned by the API to its Arabic label,
        or to the English label translated with ``translate_to_arabic`` when no
        Arabic label exists, or to ``''`` when neither exists.
    """
    entity_labels = {}
    url = "https://www.wikidata.org/w/api.php"

    # Keep well-formed IDs only, de-duplicated in first-seen order.
    valid_ids = list(dict.fromkeys(
        str(entity_id) for entity_id in entity_ids
        if re.fullmatch(r'[QPL]\d+', str(entity_id))
    ))

    batch_size = 50  # wbgetentities accepts at most 50 IDs per request
    for start in range(0, len(valid_ids), batch_size):
        params = {
            "action": "wbgetentities",
            "ids": "|".join(valid_ids[start:start + batch_size]),
            "props": "labels",
            "languages": "ar|en",  # Prioritize Arabic then English
            "format": "json"
        }
        response = requests.get(url, params=params, timeout=30)
        data = response.json()
        for entity_id, entity_info in data.get('entities', {}).items():
            # Missing entities come back without a 'labels' key.
            labels = entity_info.get('labels', {})
            label = labels.get('ar', {}).get('value', '')
            if not label and 'en' in labels:
                label = translate_to_arabic(labels['en']['value'])
            entity_labels[entity_id] = label

    return entity_labels

def get_labels_of_entities(entity_ids):
    """Return the labels of ``entity_ids`` in the same order.

    Labels are looked up with ``get_entity_labels``; an ID without a resolved
    label contributes an empty string.
    """
    labels_by_id = get_entity_labels(entity_ids)
    ordered_labels = []
    for entity_id in entity_ids:
        ordered_labels.append(labels_by_id.get(entity_id, ''))
    return ordered_labels

def get_entity_data(entity_id):
    """Download one Wikidata entity and summarise it for disambiguation.

    Args:
        entity_id: Wikidata entity ID to fetch.

    Returns:
        A dict with the keys

        * ``entity_ID``: the requested ID;
        * ``entity_label``: the Arabic label, else the English label passed
          through ``translate_to_arabic``, else ``''``;
        * ``description``: label, description and one ``"<name>: <labels>"``
          fragment per selected property that the entity has, joined with
          single spaces;
        * ``instance_label``: ``PERS``/``LOC``/``ORG``/``MISC`` derived from
          the entity's P31 values, or ``''`` when none of the known classes
          matches.

        When the entity cannot be retrieved, ``entity_label`` and
        ``instance_label`` are ``''`` and ``description`` is
        ``"Entity not found in Wikidata"``.
    """
    not_found = {
        'entity_ID': entity_id,
        'entity_label': '',
        'description': "Entity not found in Wikidata",
        'instance_label': ''
    }

    # (display name, property ID) pairs appended to the description, in order.
    # P31 is not listed here: it only feeds `instance_label` below.
    # NOTE(review): the original mapping declared 'رمز العملة' twice (P478 and
    # P506); as a duplicate dict key only the later P506 ever took effect, so
    # that is what is kept. Confirm which property was actually intended.
    described_properties = [
        ('المهنة', 'P106'),
        ('فئة فرعية من', 'P279'),
        ('لديه جزء', 'P527'),
        ('جزء من', 'P361'),
        ('تقع في', 'P131'),
        ('البلد', 'P17'),
        ('رمز العملة', 'P506'),
        ('فئة', 'P3961'),
        ('تاريخ التأسيس', 'P571'),
        ('موقع المقر', 'P159'),
        ('legal form', 'P1454'),
        ('key people', 'P3320'),
    ]

    # P31 class -> NER tag; the first matching row wins.
    instance_tags = [
        ('Q5', "PERS"),        # human
        ('Q11424', "MISC"),    # film
        ('Q2221906', "LOC"),   # geographical location
        ('Q4917', "MISC"),     # currency
        ('Q43229', "ORG"),     # organization
    ]

    url = f"https://www.wikidata.org/wiki/Special:EntityData/{entity_id}.json"
    response = requests.get(url, timeout=30)
    try:
        data = response.json()
    except ValueError:
        # Non-JSON body (for example an HTML error page for an unknown ID).
        return not_found

    entity_data = data.get('entities', {}).get(entity_id)
    if entity_data is None:
        return not_found

    labels = entity_data.get('labels', {})
    label = labels.get('ar', {}).get('value', '')
    if not label:
        # Only translate a real English label; do not push a "not found"
        # placeholder through the translation model.
        label_en = labels.get('en', {}).get('value', '')
        label = translate_to_arabic(label_en) if label_en else ''

    descriptions = entity_data.get('descriptions', {})
    description = descriptions.get('ar', {}).get('value', '')
    if not description:
        description_en = descriptions.get('en', {}).get('value', '')
        description = translate_to_arabic(description_en) if description_en else ''

    claims = entity_data.get('claims', {})

    properties_description = []
    for prop_label, prop_id in described_properties:
        if prop_id not in claims:
            continue
        prop_values = [get_claim_value(claim) for claim in claims[prop_id]]
        # Describe the property with the labels of its values, not their IDs.
        prop_labels = get_labels_of_entities([str(value) for value in prop_values])
        properties_description.append(f"{prop_label}: {', '.join(prop_labels)}")

    # Join with spaces; previously the first property was glued directly onto
    # the last word of the description.
    entity_description = ' '.join(
        part for part in [label, description, *properties_description] if part
    )

    instance_values = {get_claim_value(instance) for instance in claims.get('P31', [])}
    instance_label = next(
        (tag for class_id, tag in instance_tags if class_id in instance_values), ''
    )

    return {
        'entity_ID': entity_id,
        'entity_label': label,
        'description': entity_description,
        'instance_label': instance_label
    }

In [None]:
# Loaded (tokenizer, model) pairs keyed by model name, so that the weights are
# read once per kernel instead of on every call.
_EN_AR_TRANSLATION_CACHE = {}


def translate_to_arabic(text_to_translate):
    """Translate English text to Arabic with Helsinki-NLP/opus-mt-en-ar.

    Args:
        text_to_translate: English text.

    Returns:
        The Arabic translation, or ``''`` for empty / whitespace-only input
        (the model is not invoked in that case).
    """
    if not text_to_translate or (
        isinstance(text_to_translate, str) and not text_to_translate.strip()
    ):
        return ''

    model_name = "Helsinki-NLP/opus-mt-en-ar"
    if model_name not in _EN_AR_TRANSLATION_CACHE:
        _EN_AR_TRANSLATION_CACHE[model_name] = (
            MarianTokenizer.from_pretrained(model_name),
            MarianMTModel.from_pretrained(model_name),
        )
    mt_tokenizer, mt_model = _EN_AR_TRANSLATION_CACHE[model_name]

    # Truncate over-long inputs to the model's maximum length, as
    # translate_to_english does, instead of failing inside the model.
    inputs = mt_tokenizer(text_to_translate, return_tensors="pt", padding=True, truncation=True)

    translated = mt_model.generate(**inputs, max_length=128, num_beams=4, early_stopping=True)

    return mt_tokenizer.decode(translated[0], skip_special_tokens=True)


In [None]:
def generate_candidates_and_get_their_data(data_structure):
    """Attach Wikidata candidates (with label, description, NER tag) to entries.

    For every entry the candidate IDs come from ``get_wikidata_candidates`` and
    their details from ``get_entity_data``. Candidates whose entity data gave
    no NER tag are tagged with ``ner_for_entities_with_candidates``.

    Args:
        data_structure: List of dicts with at least a ``word`` key. It is
            modified in place: each entry gains a ``candidates`` list of dicts
            with ``entity``, ``label``, ``description`` and ``ner_tag``
            (``None`` when no tag could be determined).

    Returns:
        The same ``data_structure`` object.
    """
    for entry in data_structure:
        word = entry['word']
        candidates = get_wikidata_candidates(word)
        entry['candidates'] = []

        for candidate in candidates:
            entity_data = get_entity_data(candidate)
            if not entity_data:
                continue
            entry['candidates'].append({
                'entity': candidate,
                'label': entity_data['entity_label'],
                'description': entity_data['description'],
                'ner_tag': entity_data['instance_label'] or None,  # None marks a missing tag
            })

        missing_ner_candidates = [c for c in entry['candidates'] if not c.get('ner_tag')]
        if missing_ner_candidates:
            updated = ner_for_entities_with_candidates(
                [{'word': word, 'candidates': missing_ner_candidates}]
            )
            # Pair each untagged candidate with its own result. Writing back by
            # position into entry['candidates'] (as before) overwrote the tags
            # of the first candidates, which are not necessarily the untagged ones.
            for candidate, updated_candidate in zip(missing_ner_candidates, updated[0]['candidates']):
                candidate['ner_tag'] = updated_candidate['ner_tag']

    return data_structure


In [None]:
# Loaded (tokenizer, model) pairs keyed by model name, so that the weights are
# read once per kernel instead of on every call.
_AR_EN_TRANSLATION_CACHE = {}


def translate_to_english(text_to_translate):
    """Translate Arabic text to English with Helsinki-NLP/opus-mt-ar-en.

    Args:
        text_to_translate: Arabic text.

    Returns:
        The English translation, or ``''`` for empty / whitespace-only input
        (the model is not invoked in that case).
    """
    if not text_to_translate or (
        isinstance(text_to_translate, str) and not text_to_translate.strip()
    ):
        return ''

    model_name = "Helsinki-NLP/opus-mt-ar-en"
    if model_name not in _AR_EN_TRANSLATION_CACHE:
        _AR_EN_TRANSLATION_CACHE[model_name] = (
            MarianTokenizer.from_pretrained(model_name),
            MarianMTModel.from_pretrained(model_name),
        )
    mt_tokenizer, mt_model = _AR_EN_TRANSLATION_CACHE[model_name]

    inputs = mt_tokenizer(text_to_translate, return_tensors="pt", padding=True, truncation=True)

    translated = mt_model.generate(**inputs, max_length=128, num_beams=4, early_stopping=True)

    return mt_tokenizer.decode(translated[0], skip_special_tokens=True)


In [None]:
def map_spacy_labels(entity_label):
    """Convert a spaCy entity label to one of PERS / LOC / ORG / MISC.

    ``PERSON`` becomes ``PERS``, ``GPE`` becomes ``LOC``, ``ORG`` stays
    ``ORG``; every other label is reported as ``MISC``.
    """
    known_labels = (("PERSON", "PERS"), ("GPE", "LOC"), ("ORG", "ORG"))
    for spacy_label, dataset_tag in known_labels:
        if entity_label == spacy_label:
            return dataset_tag
    return "MISC"

# spaCy pipelines that were already loaded, keyed by model name.
_SPACY_PIPELINES = {}


def ner_for_entities_with_candidates(input_data):
    """Tag candidates with a NER label derived from their label/description.

    For each candidate, ``"<label> is <description>"`` is translated with
    ``translate_to_english`` and run through spaCy's ``en_core_web_sm``; the
    label of the first entity found is mapped with ``map_spacy_labels``.

    Args:
        input_data: List of dicts with a ``candidates`` list; each candidate
            has ``label`` and ``description`` keys. Candidates are modified in
            place: ``ner_tag`` is set to the mapped tag, or to ``None`` when
            spaCy finds no entity.

    Returns:
        The same ``input_data`` object.
    """
    nlp = None

    for item in input_data:
        for candidate in item['candidates']:
            if nlp is None:
                # Load the pipeline once per kernel rather than on every call;
                # this function is invoked once per dataset entry.
                if "en_core_web_sm" not in _SPACY_PIPELINES:
                    _SPACY_PIPELINES["en_core_web_sm"] = spacy.load("en_core_web_sm")
                nlp = _SPACY_PIPELINES["en_core_web_sm"]

            combined_text = f"{candidate['label']} is {candidate['description']}"
            translated_text = translate_to_english(combined_text)
            doc = nlp(translated_text)

            # Only the first recognised entity decides the tag.
            first_entity = next(iter(doc.ents), None)
            candidate['ner_tag'] = (
                map_spacy_labels(first_entity.label_) if first_entity is not None else None
            )

    return input_data


Embeddings

In [None]:
def get_word_embeddings_arabert(word, sentence):
    """Embed ``word`` in the context of ``sentence`` with the global model.

    The sentence and the word are tokenized separately (each truncated to 32
    tokens) with the module-level ``tokenizer``, concatenated into a single
    sequence and passed through the module-level ``model``. The hidden states
    at the word's token positions are then averaged.

    Args:
        word: The mention (or candidate label) to embed.
        sentence: The text that provides its context.

    Returns:
        A 1-D tensor with one value per hidden unit of the model.
    """
    sentence_tokens = tokenizer.encode_plus(sentence, return_tensors='pt', max_length=32, truncation=True, padding=True)
    word_tokens = tokenizer.encode_plus(word, return_tensors='pt', max_length=32, truncation=True, padding=True)

    combined_tokens = {
        'input_ids': torch.cat((sentence_tokens['input_ids'], word_tokens['input_ids']), dim=1),
        'attention_mask': torch.cat((sentence_tokens['attention_mask'], word_tokens['attention_mask']), dim=1)
    }
    sentence_length = sentence_tokens['input_ids'].size(1)

    with torch.no_grad():
        output = model(**combined_tokens)

    # last_hidden_state is (batch, tokens, hidden). Select the word's token
    # positions first and average afterwards. The previous code averaged over
    # all tokens and then sliced the *hidden* axis by the sentence length, which
    # returned a shortened sentence vector rather than a word embedding.
    word_hidden_states = output.last_hidden_state[:, sentence_length:, :]
    word_embeddings = word_hidden_states.mean(dim=1).squeeze()

    return word_embeddings

In [None]:
def generate_contextual_embeddings(sentence):
    """Return the mean-pooled embedding of ``sentence``.

    The text is encoded with the module-level ``tokenizer`` (truncated to 128
    tokens) and run through the module-level ``model`` without gradients; the
    last hidden states are averaged over dimension 1 and squeezed.
    """
    encoded = tokenizer.encode_plus(
        sentence,
        return_tensors='pt',
        max_length=128,
        truncation=True,
        padding=True,
    )

    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state
        pooled = hidden_states.mean(dim=1)

    return pooled.squeeze()

Disambiguation

In [None]:
def get_results_of_disambiguation(entity_id):
    """Fetch the English label and description of a Wikidata entity.

    Args:
        entity_id: Wikidata entity ID; a falsy value short-circuits.

    Returns:
        A ``(label, description)`` tuple. Either element is ``None`` when it is
        missing; both are ``None`` when ``entity_id`` is falsy, the request does
        not return HTTP 200, or the response does not contain the entity.
    """
    if not entity_id:
        return None, None

    response = requests.get(
        "https://www.wikidata.org/w/api.php",
        params={"action": "wbgetentities", "ids": entity_id, "format": "json"},
        timeout=30,
    )
    if response.status_code != 200:
        return None, None

    # An API error body has no 'entities' key, so do not index it directly.
    entity_data = response.json().get('entities', {}).get(entity_id)
    if not entity_data:
        return None, None

    label = entity_data.get('labels', {}).get('en', {}).get('value')
    description = entity_data.get('descriptions', {}).get('en', {}).get('value')
    return label, description


In [None]:
def calculate_similarity(embedding_a, embedding_b):
    """Cosine similarity between two embeddings given as torch tensors.

    1-D inputs are treated as a single row. When the two inputs differ in
    width, ``embedding_b`` is zero-padded or truncated to the width of
    ``embedding_a``. For 2-D inputs only the first row of each is compared.

    Args:
        embedding_a: Tensor (1-D or 2-D) or ``None``.
        embedding_b: Tensor (1-D or 2-D) or ``None``.

    Returns:
        The similarity as a Python float, or ``0.0`` when an input is ``None``
        or empty, or when the computation fails (the error is printed).
    """
    if embedding_a is None or embedding_b is None:
        return 0.0

    try:
        if embedding_a.numel() == 0 or embedding_b.numel() == 0:
            return 0.0

        # Detach before any conversion so tensors that require grad work too.
        vector_a = embedding_a.detach().cpu().to(torch.float32)
        vector_b = embedding_b.detach().cpu().to(torch.float32)

        if vector_a.dim() == 1:
            vector_a = vector_a.unsqueeze(0)
        if vector_b.dim() == 1:
            vector_b = vector_b.unsqueeze(0)
        if vector_a.dim() != 2 or vector_b.dim() != 2:
            raise ValueError(
                f"expected 1-D or 2-D embeddings, got shapes "
                f"{tuple(embedding_a.shape)} and {tuple(embedding_b.shape)}"
            )

        width_gap = vector_a.size(1) - vector_b.size(1)
        if width_gap > 0:
            # embedding_b is narrower: pad its last dimension with zeros.
            vector_b = F.pad(vector_b, (0, width_gap))
        elif width_gap < 0:
            # embedding_b is wider: cut it down to embedding_a's width.
            vector_b = vector_b[:, :vector_a.size(1)]

        return F.cosine_similarity(vector_a[:1], vector_b[:1], dim=1).item()

    except Exception as e:
        # Best effort: scoring continues with a neutral value.
        print(f"Error occurred: {e}")
        return 0.0


In [None]:
def disambiguation_function(input_data, threshold=0.05, weight_threshold=0.01):
    """Pick the best Wikidata candidate for every mention in ``input_data``.

    Each candidate that has both a ``label_embedding`` and a
    ``description_embedding`` is scored as::

        0.4 * sim(sentence, description)
      + 0.3 * sim(context, description)
      + 0.3 * sim(word, context)
      + 0.3 if the mention's tag equals the candidate's ner_tag

    where ``context`` is the slice of ``sentence_embedding`` from
    ``word_index - 3`` to ``word_index + 3`` and ``sim`` is
    ``calculate_similarity``. The highest-scoring candidate wins, subject to
    the tie-break described in the notes.

    Args:
        input_data: List of dicts with ``word``, ``sentence`` and ``tag`` and,
            optionally, ``word_embedding``, ``sentence_embedding``,
            ``word_index`` and ``candidates``.
        threshold: Maximum normalised margin for which the tie-break runs.
        weight_threshold: Difference in context/description similarity above
            which the tie-break picks the second listed candidate.

    Returns:
        A list of result dicts with ``word``, ``sentence``, ``tag``,
        ``correctly_labelled`` (always 1), ``best_entity`` and
        ``other_entities`` (every scored candidate, the winner included).
        Mentions without embeddings or without candidates are reported with the
        "... not found" placeholders. Mentions whose candidates all lack
        embeddings are skipped entirely.

    Notes:
        NOTE(review): the margin "normalisation" divides the margin by itself,
        so it is 1 for any non-zero margin and the tie-break only runs on an
        exact tie with the default ``threshold``. The tie-break also compares
        the first two candidates in list order, not the two best-scoring ones.
        Both behaviours are preserved here; confirm the intended logic.
    """
    window_size = 3
    result_entities = []

    for data_element in input_data:
        best_entity = "Entity not found"
        best_label = "Label not found"
        best_tag = ""
        best_description = "Description not found"
        max_combined_score = -1
        second_best_score = -1
        other_entities = []

        word_embedding = data_element.get('word_embedding')
        sentence_embedding = data_element.get('sentence_embedding')
        candidates = data_element.get('candidates')
        word_ner_tag = data_element.get('tag')

        if word_embedding is not None and sentence_embedding is not None and candidates:
            candidates = [
                candidate for candidate in candidates
                if candidate.get('label_embedding') is not None
                and candidate.get('description_embedding') is not None
            ]
            if not candidates:
                # Nothing can be scored; such mentions are not reported.
                continue

            # The context window depends on the mention only, so compute it
            # once instead of once per candidate.
            try:
                word_index = int(data_element.get('word_index', 0))
            except (TypeError, ValueError):
                # Missing, None or non-numeric index: fall back to the start.
                word_index = 0

            context_start = max(0, word_index - window_size)
            context_end = min(len(sentence_embedding), word_index + window_size + 1)
            context_embedding = sentence_embedding[context_start:context_end]
            label_word_similarity = calculate_similarity(word_embedding, context_embedding)

            entity_found = False

            for candidate in candidates:
                description_embedding = candidate.get('description_embedding')

                sentence_desc_similarity = calculate_similarity(sentence_embedding, description_embedding)
                context_desc_similarity = calculate_similarity(context_embedding, description_embedding)

                candidate_ner_tag = candidate.get('ner_tag')
                ner_similarity = 0.0
                if word_ner_tag and candidate_ner_tag and word_ner_tag == candidate_ner_tag:
                    ner_similarity = 0.3

                combined_score = (0.4 * sentence_desc_similarity) + (
                        0.3 * context_desc_similarity) + (0.3 * label_word_similarity) + ner_similarity

                other_entities.append({
                    'entity': candidate['entity'],
                    'label': candidate['label'],
                    'description': candidate['description'],
                    'sentence_desc_similarity': sentence_desc_similarity,
                    'context_desc_similarity': context_desc_similarity,
                    'ner_tag': candidate.get('ner_tag', '')
                })

                if combined_score > max_combined_score:
                    second_best_score = max_combined_score
                    max_combined_score = combined_score
                    best_entity = candidate['entity']
                    best_label = candidate['label']
                    best_description = candidate['description']
                    best_tag = candidate_ner_tag
                    entity_found = True
                elif combined_score > second_best_score:
                    second_best_score = combined_score

            if not entity_found:
                continue

            # Margin between the two best scores, "normalised" as before:
            # 0 on an exact tie and 1 for any other finite margin.
            normalised_margin = max_combined_score - second_best_score
            if abs(normalised_margin) > 0:
                normalised_margin /= normalised_margin

            if abs(normalised_margin) <= threshold and len(other_entities) >= 2:
                first, second = other_entities[0], other_entities[1]
                contextual_similarity_difference = abs(
                    first['context_desc_similarity'] - second['context_desc_similarity']
                )

                if contextual_similarity_difference > weight_threshold:
                    chosen = second
                elif (
                    contextual_similarity_difference <= weight_threshold
                    and word_ner_tag == second['ner_tag']
                ):
                    chosen = second
                elif word_ner_tag == first['ner_tag'] or word_ner_tag == second['ner_tag']:
                    chosen = first if word_ner_tag == first['ner_tag'] else second
                else:
                    chosen = first if normalised_margin > 0 else second

                best_entity = chosen['entity']
                best_label = chosen['label']
                best_description = chosen['description']
                # Keep the reported tag in step with the chosen entity; it used
                # to stay on the tag of the previous score-based winner.
                best_tag = chosen['ner_tag']

        other_entities = [{
            'entity': entity['entity'],
            'entity_label': entity['label'],
            'entity_description': entity['description'],
            'tag': entity['ner_tag'],
            'label': 0
        } for entity in other_entities]

        result_entities.append({
            'word': data_element['word'],
            'sentence': data_element['sentence'],
            'tag': data_element['tag'],
            'correctly_labelled': 1,
            'best_entity': {
                'entity': best_entity,
                'entity_label': best_label,
                'entity_description': best_description,
                'tag': best_tag,
                'label': 1
            },
            'other_entities': other_entities,
        })

    return result_entities


## Preprocessing

First, load the train and test splits with the `read_file_with_sentences` function.


In [None]:
# Parse the annotation files into flat lists of per-token records.
train_data = read_file_with_sentences(train_file_path)
test_data = read_file_with_sentences(test_file_path)
# val_data = read_file_with_sentences(val_file_path)

In [None]:

# Collapse B-/I- token runs into entity phrases, then pool both splits.
# NOTE(review): this cell overwrites its own inputs, so it is not idempotent:
# running it a second time feeds already-combined phrases (tags without the
# B-/I- prefix) back into combine_phrases. Re-run the loading cell first.
test_data = combine_phrases(test_data)
train_data = combine_phrases(train_data)
train_data=train_data+test_data

In [None]:
# Spot-check one combined phrase record.
train_data[79]

{'word': 'حزب الله ',
 'tag': 'ORG',
 'sentence': 'عربيا دعا الرئيس السوري بشار الأسد المجتمع الدولي إلى التحرك بسرعة لترتيب وقف لإطلاق النار ينهي الأزمة بين حزب الله وإسرائيل .',
 'word_index': 18}

Data Cleaning

In [None]:
# Tally how many records carry each tag, reporting records without one.
label_counts = defaultdict(int)

for record in train_data:
    if 'tag' in record:
        label_counts[record['tag']] += 1
    else:
        print(f"Missing 'tag' field in data point {record}. Skipping...")

# Distinct tags that occur in the data
all_labels = set(label_counts)

# Report the per-tag counts, then the set of tags
for tag_name, occurrences in label_counts.items():
    print(f"Label: {tag_name}, Count: {occurrences}")

print("Unique Labels:", all_labels)


Label: LOC, Count: 4637
Label: ORG, Count: 2543
Label: PERS, Count: 3592
Label: MISC, Count: 1073
Label: , Count: 69
Unique Labels: {'', 'MISC', 'LOC', 'PERS', 'ORG'}


Drop every entry tagged `O`, then draw a small random sample to run through the pipeline.

In [None]:
import random

# Sampling parameters, named so that they are easy to find and tune.
SUBSET_SLICE = slice(4000, 5000)  # window of entries the sample is drawn from
SUBSET_SIZE = 80                  # number of entries to keep
SUBSET_SEED = 42                  # fixed seed: "Restart & Run All" gives the same sample

filtered_train_data = filter_non_O_tags(train_data)

subset_pool = filtered_train_data[SUBSET_SLICE]
# A dedicated Random instance leaves the global random state untouched; min()
# avoids a ValueError when the window holds fewer than SUBSET_SIZE entries.
subset_filtered_train_data = random.Random(SUBSET_SEED).sample(
    subset_pool, min(SUBSET_SIZE, len(subset_pool))
)


In [None]:
# Spot-check the first record that survived the filter.
filtered_train_data[0]


{'word': 'فرانكفورت ',
 'tag': 'LOC',
 'sentence': 'فرانكفورت (د ب أ) أعلن اتحاد صناعة السيارات في ألمانيا امس الاول أن شركات صناعة السيارات في ألمانيا تواجه عاما صعبا في ظل ركود السوق الداخلية والصادرات وهي تسعي لان يبلغ الانتاج حوالي خمسة ملايين سيارة في عام 2002 .',
 'word_index': 0}

## Generating candidates

In [None]:
# Adds a 'candidates' list to every entry in place. The trailing ';' stops the
# notebook from echoing the whole (very large) structure as the cell output;
# inspect individual entries in a separate cell instead.
generate_candidates_and_get_their_data(subset_filtered_train_data);

[{'word': 'الصين ',
  'tag': 'LOC',
  'sentence': 'كما يتحدثون أيضاً عن أحلام للإمبراطور ومشاهدات مسئول الفلك لنجم مضيء في اتجاه الغرب , يفسّره فلكيو القصر للإمبراطور آنذاك بأنه بشارة بظهور رجل حكيم في بلاد العرب هو محمد ( صلى الله عليه وسلم ) , فيرسل الإمبراطور وفداً للرسول الجديد احتراماً وتقرّباً , حيث يعود الوفد الصيني من رحلته إلى شبه الجزيرة العربية ومعه وفد عربي من صحابة الرسول وفي مقدمتهم الصحابي الجليل سعد بن أبي وقاص كأول دخول للإسلام في الصين , إلا أن هذه الأسطورة تشبه إلى حد كبير ما جاء في الأساطير البوذية حول دخول الديانة البوذية إلى الصين من الهند .',
  'word_index': 93,
  'candidates': [{'entity': 'Q148',
    'label': 'الصين',
    'description': 'الصين دولة في شرق آسياجزء من: الصين التاريخية, شرق آسيا البلد: الصين تاريخ التأسيس: ',
    'ner_tag': 'LOC'},
   {'entity': 'Q29520',
    'label': 'الصين التاريخية',
    'description': 'الصين التاريخية في المنطقة الثقافية، والحضارة القديمة، والأمة في شرق آسيا، تشير في معظمها إلى جمهورية الصين الشعبية في الحالة السياسية ونادراً م

In [None]:
# Spot-check one sampled entry together with its retrieved candidates.
subset_filtered_train_data[5]

{'word': 'قرية بوادي ',
 'tag': 'ORG',
 'sentence': 'وأضاف الحزب أن القوة الإسرائيلية المهاجمة حاولت السيطرة علي مدرسة قرية بوادي , ونفي المتحدث الأنباء التي ترددت عن مقتل3  من الحزب في الاشتباكات .',
 'word_index': 10,
 'candidates': []}

In [None]:
# Persist the sampled entries and their candidates. This has to run before the
# embedding cell below, which stores tensors on the entries (not JSON-serializable).
with open("subset_filtered_train_data.json", "w", encoding='utf-8') as f:
    json.dump(subset_filtered_train_data, f, ensure_ascii=False, indent=4)

## Generating embeddings

In [None]:
# Load AraBERT v02. `tokenizer` and `model` are module-level names that
# get_word_embeddings_arabert and generate_contextual_embeddings read directly,
# so this cell must run before either of them is called.
model_name = "aubmindlab/bert-base-arabertv02"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

In [None]:
# Attach embeddings to every sampled entry and to each of its candidates.
# The entries are modified in place.
for entry in subset_filtered_train_data:
    word = entry['word']
    sentence = entry['sentence']
    sentence_embedding = generate_contextual_embeddings(sentence)
    entry['sentence_embedding'] = sentence_embedding
    candidates = entry['candidates']

    # Embedding of the mention, computed in the context of its sentence
    word_embedding = get_word_embeddings_arabert(word,sentence)
    entry['word_embedding'] = word_embedding
    # Embed each candidate's label and description; a candidate with an empty
    # label or description simply does not get the corresponding key.
    for candidate in candidates:
        label = candidate['label']
        description = candidate['description']

        # Candidate label, embedded in the context of its own description
        if label != '':
            label_embedding = get_word_embeddings_arabert(label,description)
            candidate['label_embedding'] = label_embedding

        # Candidate description, embedded as a whole
        if description != '':
            description_embedding = generate_contextual_embeddings(description)
            candidate['description_embedding'] = description_embedding


## Disambiguation

In [None]:
# Disambiguate every sampled entry on its own.
# disambiguation_function returns a list, so `result_entities` ends up as a
# list of lists: one inner list per entry, empty when the entry was skipped.
result_entities = []
for entry in subset_filtered_train_data:
    best_entity = disambiguation_function([entry])
    result_entities.append(best_entity)


# NOTE(review): this prints the complete result structure into the cell output.
print(result_entities)

[[{'word': 'الصين ', 'sentence': 'كما يتحدثون أيضاً عن أحلام للإمبراطور ومشاهدات مسئول الفلك لنجم مضيء في اتجاه الغرب , يفسّره فلكيو القصر للإمبراطور آنذاك بأنه بشارة بظهور رجل حكيم في بلاد العرب هو محمد ( صلى الله عليه وسلم ) , فيرسل الإمبراطور وفداً للرسول الجديد احتراماً وتقرّباً , حيث يعود الوفد الصيني من رحلته إلى شبه الجزيرة العربية ومعه وفد عربي من صحابة الرسول وفي مقدمتهم الصحابي الجليل سعد بن أبي وقاص كأول دخول للإسلام في الصين , إلا أن هذه الأسطورة تشبه إلى حد كبير ما جاء في الأساطير البوذية حول دخول الديانة البوذية إلى الصين من الهند .', 'tag': 'LOC', 'correctly_labelled': 1, 'best_entity': {'entity': 'Q148', 'entity_label': 'الصين', 'entity_description': 'الصين دولة في شرق آسياجزء من: الصين التاريخية, شرق آسيا البلد: الصين تاريخ التأسيس: ', 'tag': 'LOC', 'label': 1}, 'other_entities': [{'entity': 'Q148', 'entity_label': 'الصين', 'entity_description': 'الصين دولة في شرق آسياجزء من: الصين التاريخية, شرق آسيا البلد: الصين تاريخ التأسيس: ', 'tag': 'LOC', 'label': 0}, {'entity':

In [None]:
# Persist the disambiguation results (a list of per-entry result lists).
with open("entities.json", "w", encoding='utf-8') as f:
    json.dump(result_entities, f, ensure_ascii=False, indent=4)

## Training AraBERT

In [None]:
import json
from sklearn.preprocessing import LabelEncoder
from transformers import AutoTokenizer, AutoModel
# from preprocess import NEVER_SPLIT_TOKENS,ArabertPreprocessor
from arabert.preprocess import NEVER_SPLIT_TOKENS,ArabertPreprocessor
from farasa.segmenter import FarasaSegmenter
import torch
import requests
from transformers import MBartForConditionalGeneration, MBart50Tokenizer
from transformers import AutoModelForTokenClassification
import re
from collections import defaultdict
from sklearn.metrics.pairwise import cosine_similarity
from pyarabic.araby import tokenize
from transformers import MarianMTModel, MarianTokenizer
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import numpy as np
import spacy
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


from google.colab import drive

# Attach Google Drive to the runtime; the training data is read from it below.
drive.mount('/content/drive')

# Parse the whole JSON dataset into memory.
data_path = '/content/drive/MyDrive/dataset.json'
with open(data_path, encoding='utf-8') as file:
    dataset = json.load(file)

In [None]:
class MultiCandidateBinaryClassificationDataset(Dataset):
    """Flattens mention records into one binary example per candidate entity.

    Every item of ``data`` must provide ``word``, ``sentence``, ``tag`` and a
    ``candidates`` list whose entries carry ``entity``, ``entity_label``,
    ``entity_description`` and ``label``. All examples are tokenized eagerly in
    the constructor and kept in memory.
    """

    def __init__(self, data, tokenizer, max_length=128):
        """
        Args:
            data: iterable of mention records (see the class docstring).
            tokenizer: object exposing ``encode_plus(text, text_pair=..., ...)``
                in the Hugging Face style and returning ``input_ids`` and
                ``attention_mask`` tensors.
            max_length: token length every example is padded/truncated to.
        """
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.examples = self.prepare_examples()

    def __len__(self):
        """Number of (mention, candidate) examples."""
        return len(self.examples)

    def prepare_examples(self):
        """Tokenize every (mention, candidate) pair and return the example dicts.

        The mention side (``word`` + ``sentence``) and the candidate side
        (``entity_label`` + ``entity_description``) are passed to the tokenizer
        as a sequence *pair*. Previously both were joined into one string around
        a literal ``[SEP]``; with ``truncation=True`` a sentence longer than
        ``max_length`` then cut the candidate text off completely, leaving all
        candidates of a mention with identical inputs. As a pair, truncation
        shortens the longer of the two sequences, so candidate text survives.
        """
        examples = []
        for item in self.data:
            word = item['word']
            sentence = item['sentence']
            candidates = item['candidates']
            ner_tag = item['tag']

            # The mention side is identical for every candidate of this item.
            mention_text = f"{word} {sentence}"

            for candidate in candidates:
                entity_text = candidate['entity_description']
                entity_ID = candidate['entity']
                label = candidate['label']
                entity_label = candidate['entity_label']

                candidate_text = f"{entity_label} {entity_text}"

                inputs = self.tokenizer.encode_plus(
                    mention_text,
                    text_pair=candidate_text,
                    max_length=self.max_length,
                    padding='max_length',
                    truncation=True,
                    return_tensors='pt'
                )

                examples.append({
                    # squeeze(0) removes only the batch axis added by
                    # return_tensors='pt'; a bare squeeze() would also collapse
                    # the sequence axis when max_length == 1.
                    'input_ids': inputs['input_ids'].squeeze(0),
                    'attention_mask': inputs['attention_mask'].squeeze(0),
                    'label': torch.tensor(label, dtype=torch.float),
                    'word': word,
                    'sentence': sentence,
                    'entity_ID': entity_ID,
                    'entity_label': entity_label,
                    'entity_description': entity_text,
                    'tag': ner_tag
                })

        return examples

    def __getitem__(self, idx):
        """Return the pre-tokenized example at position ``idx``."""
        return self.examples[idx]


In [None]:
class FineTunedAraBERTForMultiCandidateBinaryClassification(nn.Module):
    """Pretrained encoder followed by dropout and a one-unit linear head.

    The head scores the hidden state at sequence position 0 and yields a single
    raw logit per example (no sigmoid is applied here).
    """

    def __init__(self, arabert_model_name):
        """Load the encoder named ``arabert_model_name`` and build the head."""
        super().__init__()
        self.arabert = AutoModel.from_pretrained(arabert_model_name)
        self.dropout = nn.Dropout(p=0.1)
        self.classifier = nn.Linear(
            in_features=self.arabert.config.hidden_size,
            out_features=1,
        )

    def forward(self, input_ids, attention_mask):
        """Return one logit per row of ``input_ids``; the trailing size-1 axis is removed."""
        encoded = self.arabert(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the last layer is the [CLS] token.
        cls_vector = encoded.last_hidden_state[:, 0]
        scores = self.classifier(self.dropout(cls_vector))
        return scores.squeeze(-1)

In [None]:
# Run on the GPU when the runtime exposes one, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Using device: {device}')

# The same checkpoint name is used for the tokenizer and for the encoder.
arabert_model_name = "aubmindlab/bert-base-arabertv02"
tokenizer = AutoTokenizer.from_pretrained(arabert_model_name)
model = FineTunedAraBERTForMultiCandidateBinaryClassification(
    arabert_model_name=arabert_model_name
)
model = model.to(device)

# Building the dataset tokenizes every (mention, candidate) pair up front.
multi_candidate_binary_dataset = MultiCandidateBinaryClassificationDataset(
    data=dataset,
    tokenizer=tokenizer,
)


In [None]:
from sklearn.model_selection import train_test_split

# --- Hyperparameters ---------------------------------------------------------
batch_size = 8  # Set your desired batch size
num_epochs = 2
test_size = 0.15
validation_size = 0.15

# --- Optimisation objects ----------------------------------------------------
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
criterion = nn.BCEWithLogitsLoss()

# Loader over the complete, unsplit dataset.
multi_candidate_binary_dataloader = DataLoader(
    multi_candidate_binary_dataset,
    batch_size=batch_size,
    shuffle=True,
)

# --- Two-stage split ---------------------------------------------------------
# First hold out the test portion, then carve the validation portion out of
# what is left, so validation_size is a fraction of the remaining 85%, not of
# the whole dataset. The split is over individual (mention, candidate) examples.
remaining, test_data = train_test_split(
    multi_candidate_binary_dataset,
    test_size=test_size,
    random_state=42,
)
train_data, validation_data = train_test_split(
    remaining,
    test_size=validation_size,
    random_state=42,
)

# Only the training loader shuffles; evaluation order stays fixed.
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(validation_data, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False)


In [None]:
epsilon = 1e-7

# NOTE: `validate` is defined in a later cell of this notebook. That cell has to
# be executed before this one, otherwise the per-epoch evaluation below raises
# NameError on a fresh kernel.
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    # validate() leaves the model in eval mode, so training mode (dropout on)
    # must be re-enabled at the start of every epoch.
    model.train()
    total_loss = 0

    # Train on the training split only. Iterating the loader over the full
    # dataset would also fit the validation and test examples and make the
    # metrics reported below meaningless.
    for batch in train_dataloader:
        inputs = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(torch.float).to(device)

        # Normalize labels to be between 0 and 1. Assuming the labels are
        # already 0/1 this is effectively the identity (an all-zero batch stays
        # zero because epsilon keeps the divisor non-zero).
        max_label_value = torch.max(labels)
        labels_normalized = labels / (max_label_value + epsilon)  # Add epsilon to avoid division by zero

        optimizer.zero_grad()
        logits = model(input_ids=inputs, attention_mask=attention_mask)
        # Same loss as sigmoid followed by binary_cross_entropy, but computed
        # from the raw logits, which is the numerically stable formulation.
        loss = F.binary_cross_entropy_with_logits(logits, labels_normalized)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # max(..., 1) avoids ZeroDivisionError if the training loader is empty.
    avg_loss = total_loss / max(len(train_dataloader), 1)
    print(f"Epoch {epoch + 1}/{num_epochs} - Average Loss: {avg_loss:.4f}")

    # Validation after each epoch
    accuracy, precision, recall, f1 = validate(model, validation_dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs} - Validation Metrics:")
    print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")

    # Evaluation on test set after each epoch
    test_accuracy, test_precision, test_recall, test_f1 = validate(model, test_dataloader)
    print("Evaluation Metrics on Test Set:")
    print(f"Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1-score: {test_f1:.4f}")

In [None]:
def validate(model, dataloader):
    """Score ``model`` on every batch of ``dataloader``.

    Puts the model in eval mode (it is not switched back), thresholds the
    sigmoid of the logits at 0.5 and compares against ``batch['label']``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``; precision, recall and F1
        are computed with ``average='weighted'``.
    """
    model.eval()
    gold_values = []
    predicted_values = []

    with torch.no_grad():
        for batch in dataloader:
            token_ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            gold = batch['label'].to(torch.float).to(device)

            scores = torch.sigmoid(model(input_ids=token_ids, attention_mask=mask))
            hard_predictions = (scores > 0.5).float()  # Threshold at 0.5

            # Move to host memory before handing the values to scikit-learn.
            gold_values.extend(gold.cpu().numpy())
            predicted_values.extend(hard_predictions.cpu().numpy())

    val_labels = np.array(gold_values)
    val_predictions = np.array(predicted_values)

    precision, recall, f1, _ = precision_recall_fscore_support(
        val_labels, val_predictions, average='weighted'
    )
    return accuracy_score(val_labels, val_predictions), precision, recall, f1


In [None]:
def display_predictions_structured(model, dataloader, tokenizer, output_file):
    """Run ``model`` over ``dataloader`` and save one JSON record per example.

    Each batch must provide ``input_ids`` and ``attention_mask`` tensors plus the
    per-example sequences ``word``, ``sentence``, ``entity_ID``, ``entity_label``,
    ``entity_description`` and ``tag``. A prediction is "True" when the sigmoid
    of the model's logit exceeds 0.5, otherwise "False".

    Args:
        model: callable module taking ``input_ids`` and ``attention_mask`` and
            returning one logit per example; it is put in eval mode.
        dataloader: iterable of batches as described above.
        tokenizer: not used; kept so existing call sites keep working.
        output_file: path of the JSON file to write (UTF-8).
    """
    predictions_data = []

    model.eval()
    with torch.no_grad():
        for batch in dataloader:
            inputs = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            logits = model(input_ids=inputs, attention_mask=attention_mask)
            # Threshold at 0.5; converting to plain Python bools once per batch
            # avoids a tensor comparison per example.
            is_positive = (torch.sigmoid(logits) > 0.5).reshape(-1).tolist()

            per_example = zip(
                batch['word'],
                batch['sentence'],
                batch['entity_ID'],
                batch['entity_label'],
                batch['entity_description'],
                batch['tag'],
                is_positive,
            )
            for word, sentence, entity_id, entity_label, entity_description, ner_tag, positive in per_example:
                predictions_data.append({
                    "Word": word,
                    "Sentence": sentence,
                    "Entity_ID": entity_id,
                    "Predicted_Label": "True" if positive else "False",
                    "Entity_Label": entity_label,
                    "Entity_Description": entity_description,
                    "NER_Tag": ner_tag
                })

    # ensure_ascii=False writes the Arabic text verbatim, so the file must be
    # opened as UTF-8 explicitly instead of relying on the platform default.
    with open(output_file, 'w', encoding='utf-8') as outfile:
        json.dump(predictions_data, outfile, ensure_ascii=False, indent=4)


In [None]:
# Write the model's predictions for the validation split to a JSON file.
output_file = 'predictions_data.json'
display_predictions_structured(
    model=model,
    dataloader=validation_dataloader,
    tokenizer=tokenizer,
    output_file=output_file,
)


## Some adjustments: tagging candidates with NER labels from Wikidata

In [None]:
# Direct "instance of" (P31) targets and the NER tag each one maps to, in
# priority order: the first QID found among an entity's P31 values wins.
# The meanings in the trailing comments are the original author's annotations
# and were not re-verified against Wikidata — TODO confirm.
_P31_NER_TAGS = (
    ('Q5', 'PERS'),        # human
    ('Q11424', 'MISC'),    # film
    ('Q2221906', 'LOC'),   # geographical location
    ('Q4917', 'MISC'),     # currency
    ('Q43229', 'ORG'),     # organization
)


def _ner_tag_from_claims(claims):
    """Return the NER tag implied by the entity's P31 claims, or '' if none matches."""
    instance_of = set()
    for claim in claims.get('P31', []):
        # A claim whose snak holds no concrete value has no 'datavalue' entry;
        # indexing it directly raises KeyError, so such claims are skipped.
        value = claim.get('mainsnak', {}).get('datavalue', {}).get('value')
        if isinstance(value, dict) and 'id' in value:
            instance_of.add(value['id'])

    for qid, tag in _P31_NER_TAGS:
        if qid in instance_of:
            return tag
    return ''


def _fetch_wikidata_entity(entity_id):
    """Return the Wikidata record stored under ``entity_id``, or None if unavailable."""
    url = f"https://www.wikidata.org/wiki/Special:EntityData/{entity_id}.json"
    try:
        # The timeout keeps one stalled request from hanging the whole run.
        payload = requests.get(url, timeout=30).json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure or a non-JSON body: report it and carry on with the
        # remaining mentions instead of losing all work done so far.
        print(f"Could not fetch Wikidata data for {entity_id}: {exc}")
        return None

    if not isinstance(payload, dict):
        return None
    return payload.get('entities', {}).get(entity_id)


def get_entity_data_with_NER_from_json(json_path):
    """Add a ``ner_tag`` to every candidate in the JSON file, rewriting it in place.

    For each record, the first candidate with a non-empty ``entity`` ID is looked
    up on Wikidata and its P31 ("instance of") values are mapped to a tag. When
    no P31 value matches, ``ner_for_entities_with_candidates`` is called on the
    record's word and that candidate's label/description as a fallback. The
    resulting tag (possibly '') is written to *all* candidates of the record.

    Records without any candidate entity ID are left untouched. If the Wikidata
    record cannot be obtained, candidates that have no ``ner_tag`` yet receive
    an empty one.

    Args:
        json_path: path of the UTF-8 JSON file holding a list of records, each
            with a ``word`` and an optional ``candidates`` list.
    """
    with open(json_path, 'r', encoding='utf-8') as json_file:
        data = json.load(json_file)

    for entity_data in data:
        candidates = entity_data.get('candidates', [])
        word = entity_data['word']

        # Only the first candidate carrying an entity ID drives the lookup.
        first_with_id = next((c for c in candidates if c.get('entity')), None)
        if first_with_id is None:
            continue

        entity_id = first_with_id['entity']
        entity_desc = first_with_id['entity_description']
        entity_label = first_with_id['entity_label']

        entity_info = _fetch_wikidata_entity(entity_id)
        if entity_info is None:
            # No usable Wikidata record: guarantee the key exists without
            # overwriting a tag that may already be present.
            for candidate in candidates:
                candidate.setdefault('ner_tag', '')
            continue

        instance_label = _ner_tag_from_claims(entity_info.get('claims', {}))

        # Check if instance_label is still empty
        if not instance_label:
            # Fall back to the notebook's NER helper on the candidate text.
            input_data = [{
                'word': word,
                'candidates': [{
                    'label': entity_label,
                    'description': entity_desc
                }]
            }]

            input_data_with_ner = ner_for_entities_with_candidates(input_data)
            if input_data_with_ner and 'ner_tag' in input_data_with_ner[0]['candidates'][0]:
                instance_label = input_data_with_ner[0]['candidates'][0]['ner_tag']

        # Update each candidate with the instance_label
        for candidate in candidates:
            candidate['ner_tag'] = instance_label

    with open(json_path, "w", encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)


In [None]:
# Annotate the candidates stored in this Drive file; the file is rewritten in place.
fix = '/content/drive/MyDrive/fix.json'
get_entity_data_with_NER_from_json(json_path=fix)

## Saving Datasets and Models