In [49]:
import re
import pickle

# Data Cleaning

In [50]:
# Regular expressions and constants shared by the cleaning functions below.

# A parenthesised "digits / digits" pair, e.g. "(12/345)", with optional inner whitespace.
numeric_pattern = r"\(\s*\d+\s*/\s*\d+\s*\)"
# A single ASCII Latin letter.
english = r"[a-zA-Z]"
# A run of digits together with any whitespace around it.
numbers = r"\s*\d+\s*"
# A "<digits> -" list marker with surrounding whitespace.
# (The identifier is spelled "numering"; process_text refers to it by this spelling.)
numering_items = r"\s*\d+\s*[-]\s*"
# Bracket or quote pairs that enclose nothing but whitespace: (), [], {}, << >>, "" and ''.
empty_brackets = r'\(\s*\)|\[\s*\]|\{\s*\}|<<\s*>>|"\s*"|\'\s*\''
# One character that is not a bracket and not one of the listed punctuation marks,
# preceded by whitespace, a literal '^' or an opening bracket, and followed by
# whitespace, end of string or a closing bracket.
# NOTE(review): `\^` inside the lookbehind is a literal caret, not a start-of-line
# anchor, so a lone character at the very start of a line is not matched - confirm.
# NOTE(review): the negated class does not exclude whitespace, so a whitespace
# character sitting between two other whitespace characters also matches - confirm
# that this is intended.
stand_alone=r'(?<=\s|\^|\(|\[|\{)[^\(\)\[\]\{\}\.,،:;؛؟!\-](?=\s|$|\]|\)|\})'
# U+FFFD REPLACEMENT CHARACTER; process_text substitutes it for stand-alone characters.
UNK_CHAR = '\uFFFD'

In [51]:
# Citation-marker patterns. Each exposes exactly one capturing group so it can be
# passed to separate_citations, which re-inserts group 1 after a line break.

# The diacritised word "قَوْلُهُ" together with any surrounding whitespace.
kawloho_pattern = r"(\s*قَوْلُهُ\s*)"
# Non-capturing alternation of diacritised forms of the verb "to say".
# (This assignment used to appear twice in a row with the same value.)
qala_variations = r"(?:قَالَ|قَالَتْ|قُلْت|قَالُوا|قُلْنَا|أَقُولُ)"
# One of the verb forms above followed by a colon, with optional whitespace.
qala_pattern = rf"(\s*{qala_variations}\s*:)"

In [52]:
def remove_unbalanced_brackets(text):
    """Return ``text`` with every unmatched bracket or quote character removed.

    Handles the asymmetric pairs ``()``, ``{}``, ``[]``, ``<>`` and ``«»`` as well
    as the symmetric quotes ``"`` and ``'``. A stack of ``(opener, index)`` tuples
    tracks what is currently open:

    * a closer that does not match the most recent opener is removed;
    * any opener still on the stack at the end of the text is removed.

    A quote character is its own opener and closer. It closes the pair when the
    most recent open delimiter is the same quote and opens a new pair otherwise.
    (Previously quotes were always treated as openers, so balanced quotes were
    never paired and every quote character was stripped.)

    Args:
        text: The string to clean.

    Returns:
        A new string containing only balanced delimiters.
    """
    pair_map = {')': '(', '}': '{', ']': '[', '>': '<', '»': '«'}
    openers = set(pair_map.values())
    symmetric_quotes = {'"', "'"}

    stack = []
    indices_to_remove = set()

    for i, char in enumerate(text):
        if char in symmetric_quotes:
            # Same quote on top of the stack -> this one closes it; otherwise it opens.
            if stack and stack[-1][0] == char:
                stack.pop()
            else:
                stack.append((char, i))

        elif char in openers:
            stack.append((char, i))

        elif char in pair_map:
            if stack and stack[-1][0] == pair_map[char]:
                stack.pop()
            else:
                # Stray or mismatched closer: drop it, leave the stack untouched.
                indices_to_remove.add(i)

    # Whatever is still open never found its closer.
    indices_to_remove.update(index for _, index in stack)

    return "".join(char for i, char in enumerate(text) if i not in indices_to_remove)


In [53]:
def clean_punctuation_sequence(text):
    """Collapse a run of punctuation marks into the first mark of the run.

    A run is two or more characters from the set ``.,:;{}[]()!?'"/،؛؟``,
    optionally separated by whitespace; the marks do not have to be identical.

    NOTE(review): a later cell in this notebook defines another function with
    this same name; on a top-to-bottom run that later definition replaces this one.

    Args:
        text: The string to clean.

    Returns:
        The string with every such run replaced by its first character.
    """
    mark_class = re.escape(".,:;{}[]()!?'\"/،؛؟")
    mark_run = re.compile(rf"([{mark_class}])(?:\s*[{mark_class}])+")
    return mark_run.sub(r"\1", text)

In [54]:
def separate_citations(citation_pattern, lines):
    """Start a new output line in front of every match of ``citation_pattern``.

    Each match is replaced by a newline followed by the pattern's first capturing
    group, so ``citation_pattern`` must define group 1. The result is split on
    newlines, every piece is stripped, and empty pieces are discarded.

    Args:
        citation_pattern: Regular expression with at least one capturing group.
        lines: Iterable of input strings.

    Returns:
        A flat list of the non-empty, stripped pieces in their original order.
    """
    pieces_per_line = (
        re.sub(citation_pattern, r"\n\1", raw_line).split('\n')
        for raw_line in lines
    )
    return [
        piece.strip()
        for pieces in pieces_per_line
        for piece in pieces
        if piece.strip()
    ]

In [55]:
def clean_punctuation_sequence(text):
    """Collapse repeats of the *same* punctuation mark into a single occurrence.

    Only the marks ``.,:;!?'"/،؛؟`` are considered. Repeats may be separated by
    whitespace, which is removed together with the repeats. Different adjacent
    marks (for example ``.,``) are left untouched.

    Args:
        text: The string to clean.

    Returns:
        The string with each repeated mark reduced to one.
    """
    marks = re.escape(".,:;!?'\"/،؛؟")
    # \1 forces every following mark to equal the first one captured.
    repeated_mark = re.compile(rf"([{marks}])(?:\s*\1)+")
    return repeated_mark.sub(r"\1", text)

In [56]:
def split_citations(lines):
    """Split every line in front of each citation trigger it contains.

    Two kinds of trigger are recognised:

    * a form of the verb "to say" from ``qal_list`` followed by a colon
      (ASCII ``:`` or full-width ``：``);
    * the word ``قَوْل`` with an optional attached pronoun and an optional
      following ``تَعَالَى``.

    Both triggers may carry an attached conjunction ``وَ`` or ``فَ``; the split is
    made *before* the conjunction so the word is never cut in two. (Previously
    only the ``قَوْل`` trigger allowed the prefix, so a prefixed verb such as
    ``فَقَالَ:`` was split after the ``فَ``.)

    Extra diacritics (U+064B-U+065F) are tolerated after every character of the
    listed verb forms; the diacritics written in the list itself are required.

    Args:
        lines: Iterable of strings.

    Returns:
        A list of stripped segments. A line without any trigger contributes its
        stripped text unchanged (an empty string for a blank line). For a line
        with triggers, empty segments before a trigger are skipped.
    """
    qal_list = [
        "قَالَ","قَالَتْ","قَالُوا","قُلْت","قُلْنَا",
        "أَقُولُ","يَقُولُ","يَقُولُونَ","قِيلَ","يُقَالُ"
    ]

    tashkeel = "[\u064B-\u065F]*"

    def add_tashkeel(word):
        # Allow any number of extra diacritics after each character of the word.
        return "".join([c + tashkeel for c in word])

    qal_regex = "|".join([add_tashkeel(w) for w in qal_list])

    qal_with_colon = rf"(?:وَ|فَ)?(?:{qal_regex})\s*[:：]"
    qawloho_regex = rf"(?:وَ|فَ)?قَوْل{tashkeel}(?:ه{tashkeel}|هُ{tashkeel})?(?:\s*تَعَالَى)?"

    # Compile once per call instead of once per line.
    trigger = re.compile(rf"({qal_with_colon}|{qawloho_regex})")

    final_lines = []
    for line in lines:
        last_idx = 0
        for m in trigger.finditer(line):
            head = line[last_idx:m.start()].strip()
            if head:
                final_lines.append(head)
            last_idx = m.start()

        # Tail of the line; when nothing matched this is the whole stripped line.
        final_lines.append(line[last_idx:].strip())

    return final_lines

In [57]:
def process_text(lines):
    """Clean every line of raw text and return the cleaned lines.

    For each line, in this order:

    1. apply the regex substitutions listed in ``substitutions`` (list markers,
       "(n/m)" references, Latin letters, digits, empty brackets, Latin comma and
       semicolon to their Arabic forms, the "ا هـ" marker to an Arabic comma,
       stand-alone characters to ``UNK_CHAR``, then removal of "/" and "*" and
       en dash to hyphen);
    2. delete U+200F (right-to-left mark);
    3. collapse repeated punctuation with ``clean_punctuation_sequence``;
    4. drop unmatched delimiters with ``remove_unbalanced_brackets``;
    5. squeeze whitespace runs to a single space and strip the ends.

    Args:
        lines: Iterable of raw strings.

    Returns:
        A list with one cleaned string per input line (possibly empty strings).
    """
    # Order matters: later rules operate on the output of earlier ones.
    substitutions = (
        (numering_items, ''),
        (numeric_pattern, ''),
        (english, ' '),
        (numbers, ' '),
        (empty_brackets, ''),
        (',', '،'),
        (';', '؛'),
        (r'\s+ا\s*هـ?\s+', ' ، '),
        (fr'\((\s*{stand_alone}\s*)+\)', f' {UNK_CHAR} '),
        (fr'(\s*{stand_alone}\s*)+', f' {UNK_CHAR} '),
        (r'/', ''),
        (r'\*', ''),
        (r'–', '-'),
    )

    cleaned = []
    for raw_line in lines:
        text = raw_line
        for pattern, replacement in substitutions:
            text = re.sub(pattern, replacement, text)

        text = text.replace('\u200f', '')
        text = clean_punctuation_sequence(text)
        text = remove_unbalanced_brackets(text)

        cleaned.append(re.sub(r"\s+", " ", text).strip())
    return cleaned

In [58]:
def read_data(file_path='../data/train.txt'):
    """Read a text file and return its lines after cleaning with ``process_text``.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The list returned by ``process_text`` for the file's lines.
    """
    # Decode explicitly as UTF-8 (the encoding every output file in this notebook
    # is written with) instead of relying on the platform's default encoding,
    # which is not UTF-8 on every system and would corrupt Arabic text.
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    return process_text(lines)

In [59]:
# Load the training corpus and clean it line by line (read_data -> process_text).
train_lines = read_data('../data/train.txt')

In [60]:
# Break each cleaned line into segments in front of every citation trigger.
new_lines = split_citations(train_lines)

In [61]:
# Balance delimiters again, this time per segment - presumably because splitting
# a line can leave an opener and its closer in different segments.
cleaned_lines = [remove_unbalanced_brackets(line) for line in new_lines]

# Preparation

## TA code

In [62]:
# Load `arabic_letters` (used below for membership tests and iteration).
# NOTE(review): pickle.load can execute arbitrary code while loading; only open
# this file if it comes from a trusted source.
file_path = '../utils/arabic_letters.pickle' 

with open(file_path, 'rb') as f:
    arabic_letters = pickle.load(f)


In [63]:
# Load `diacritic2id`, a mapping (its .keys() and .items() are used below).
# NOTE(review): pickle.load can execute arbitrary code while loading; only open
# this file if it comes from a trusted source.
file_path = '../utils/diacritic2id.pickle' 

with open(file_path, 'rb') as f:
    diacritic2id = pickle.load(f)

In [64]:
# Non-letter characters accepted in the vocabulary checks below (space included).
punctuation = [' ', '،', ':', '؛', '!', '؟', '"', "'", '«', '»', '(', ')', '[', ']', '{', '}', '-', '.']
# Matches one character in U+064B-U+0652; split_text_and_diacritics treats these
# as diacritics. NOTE(review): split_citations uses the wider range U+064B-U+065F -
# confirm whether the two ranges are meant to differ.
DIACRITICS_PATTERN = re.compile(r'[\u064B-\u0652]')

In [65]:
def slide_window(text, labels, overlap=50, max_len=200):
    """Cut ``text`` and ``labels`` into aligned, overlapping windows.

    Sequences no longer than ``max_len`` are returned as a single window.
    Longer ones are cut into windows of at most ``max_len`` items whose starts
    are ``max_len - overlap`` apart; cutting stops with the first window that
    reaches the end of the sequence.

    Args:
        text: Sequence (e.g. a string) to window.
        labels: Sequence of the same length as ``text``.
        overlap: Number of items shared by consecutive windows.
        max_len: Maximum window length.

    Returns:
        ``(text_chunks, label_chunks)``: two lists of equal length holding the
        corresponding slices of ``text`` and ``labels``.

    Raises:
        AssertionError: If ``text`` and ``labels`` differ in length.
        ValueError: If windowing is required and ``overlap >= max_len``. A
            non-positive stride would otherwise yield no windows at all
            (silently dropping the data) or an opaque ``range`` error.
    """
    assert len(text) == len(labels), "Text and labels must be of the same length."
    if len(text) <= max_len:
        return [text], [labels]

    stride = max_len - overlap
    if stride <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than max_len ({max_len})."
        )

    text_chunks = []
    label_chunks = []

    for start in range(0, len(text), stride):
        end = start + max_len
        text_chunks.append(text[start:end])
        label_chunks.append(labels[start:end])

        # This window already reaches the end; a further one would only repeat
        # part of the overlap.
        if end >= len(text):
            break

    return (text_chunks, label_chunks)


In [66]:
def split_text_and_diacritics(text):
    """Separate base characters from the diacritics attached to them.

    Every character matched by ``DIACRITICS_PATTERN`` is appended to the label of
    the closest preceding base character; every other character starts a new
    entry with an empty label. Diacritics that occur before any base character
    are discarded.

    Args:
        text: The diacritised string.

    Returns:
        ``(letters, labels)`` where ``letters`` is the string of base characters
        and ``labels`` is a list of the same length holding each character's
        diacritics (``""`` when it has none).
    """
    base_chars = []
    marks_per_char = []

    for symbol in text:
        if not DIACRITICS_PATTERN.match(symbol):
            base_chars.append(symbol)
            marks_per_char.append("")
        elif marks_per_char:
            marks_per_char[-1] += symbol

    return "".join(base_chars), marks_per_char

In [67]:
# Build two parallel lists: the undiacritised text of every segment and the
# per-character diacritic labels that belong to it.
cleaned_text = []
cleaned_tashkeel = []

for line in cleaned_lines:
    # Normalise whitespace, then skip segments that end up empty.
    line = re.sub(r'\s+', ' ', line).strip()
    if not line.strip():
        continue
    text, tashkeel = split_text_and_diacritics(line)
    # Skip segments of five or fewer base characters.
    if len(text) <= 5:
        continue
    
    cleaned_text.append(text)
    cleaned_tashkeel.append(tashkeel)

In [68]:
# Sanity check: every text has exactly one label entry per character.
for i in range(len(cleaned_text)):
    assert len(cleaned_text[i]) == len(cleaned_tashkeel[i])

In [69]:
# MAX_LEN is the segment length found at position int(n * quantile) of the sorted
# lengths, i.e. roughly the 99th percentile of segment length in characters.
quantile = 0.99
lengths = [len(text) for text in cleaned_text]
MAX_LEN = int(sorted(lengths)[int(len(lengths) * quantile)])
MAX_LEN

807

In [44]:
# Cut every segment into windows of at most MAX_LEN characters (50 characters of
# overlap) and flatten the windows of all segments into two parallel lists.
# Text and labels are paired with zip so that no loop index is needed; the
# previous version reused the index name `i` in both the outer and inner loop.
chunked_lines = []
chunked_labels = []
for line_text, line_labels in zip(cleaned_text, cleaned_tashkeel):
    text_chunks, label_chunks = slide_window(line_text, line_labels, overlap=50, max_len=MAX_LEN)
    chunked_lines.extend(text_chunks)
    chunked_labels.extend(label_chunks)


In [45]:
# Sanity check: every window has exactly one label entry per character.
for i in range(len(chunked_lines)):
    assert len(chunked_lines[i]) == len(chunked_labels[i])

In [46]:
# Collect every distinct character of the windows (dict keys act as a set) and
# report, once per character, any that is not in arabic_letters, punctuation or
# the keys of diacritic2id. The message only names arabic_letters.
char_set = {}
for line in chunked_lines:

    for char in line:
        if char in char_set:
            continue
        else:
            if char not in arabic_letters and char not in punctuation and char not in diacritic2id.keys():
                print(f"Char {repr(char)} not found in arabic_letters list.")   
            char_set[char] = 1


Char '�' not found in arabic_letters list.


## Padding

In [47]:
def pad_tokens(tokens_text, tokens_labels, max_len):
    """Bring a token list and its label list to exactly ``max_len`` items.

    Shorter inputs are right-padded with ``'<PAD>'`` tokens and ``''`` labels.
    Inputs of ``max_len`` items or more are truncated to ``max_len``.
    (Previously an input longer than ``max_len`` matched no branch and the
    function returned ``None``, which callers cannot unpack.)

    Args:
        tokens_text: List of tokens.
        tokens_labels: List of labels, same length as ``tokens_text``.
        max_len: Target length.

    Returns:
        ``(padded_tokens, padded_labels)``: two new lists of length ``max_len``.

    Raises:
        AssertionError: If the two inputs differ in length.
    """
    assert len(tokens_text) == len(tokens_labels), "Tokens and labels must be of the same length."
    if len(tokens_text) >= max_len:
        return (tokens_text[:max_len], tokens_labels[:max_len])

    pad_count = max_len - len(tokens_text)
    return (tokens_text + ['<PAD>'] * pad_count,
            tokens_labels + [''] * pad_count)

In [48]:
# Turn every window into a list of single-character tokens and pad it, together
# with its labels, to MAX_LEN. list(...) replaces the manual character-by-character
# copy and zip replaces the index lookup into chunked_labels.
padded_texts = []
padded_labels = []

for chunk_text, chunk_labels in zip(chunked_lines, chunked_labels):
    padded_text, padded_label = pad_tokens(list(chunk_text), chunk_labels, MAX_LEN)
    padded_texts.append(padded_text)
    padded_labels.append(padded_label)

KeyboardInterrupt: 

In [None]:
# Sanity check: every padded sample and its labels have length MAX_LEN.
for i in range(len(padded_texts)):
    assert len(padded_texts[i]) == len(padded_labels[i]) == MAX_LEN

## Vocab building

In [None]:
def build_vocab(padded_texts, og_lines):
    """Assign an integer index to every distinct token in ``padded_texts``.

    ``'<PAD>'`` is fixed at index 0 and ``UNK_CHAR`` at index 1; every other
    token receives the next free index in order of first appearance.

    Args:
        padded_texts: Iterable of token sequences.
        og_lines: Currently unused. It used to feed a word-level vocabulary
            whose construction is disabled.

    Returns:
        ``(char2idx, word_set)`` where ``char2idx`` maps token -> index and
        ``word_set`` is always an empty dict.
    """
    # Word-level vocabulary is not built at the moment; the empty dict is kept so
    # the return shape stays the same.
    word_set = {}

    char2idx = {
        '<PAD>' : 0,
        UNK_CHAR : 1
    }
    for tokens in padded_texts:
        for token in tokens:
            # len(char2idx) is always the next unused index.
            char2idx.setdefault(token, len(char2idx))

    return char2idx, word_set

In [None]:
# Build the character vocabulary from the padded samples; word_vocab comes back
# as an empty dict because build_vocab does not fill it.
char2idx, word_vocab = build_vocab(padded_texts, chunked_lines)

In [None]:
# Reverse lookup: index -> token.
idx2char = {v: k for k, v in char2idx.items()}

In [None]:
# Every entry of arabic_letters must have received an index in the vocabulary.
for letter in arabic_letters:
    assert letter in char2idx, f"Letter {repr(letter)} from arabic_letters not found in vocab."

In [None]:
# Report vocabulary entries that are neither in arabic_letters nor in punctuation
# (the two special tokens '<PAD>' and UNK_CHAR are exempt).
for letter in char2idx:
    if letter not in arabic_letters and letter not in punctuation and letter != '<PAD>' and letter != UNK_CHAR:
        print(f"Letter {repr(letter)} from vocab not found in arabic_letters.")

In [None]:
# Write the unpadded windows, one per line.
with open('../data/cleaned_text.txt', 'w', encoding='utf-8') as f:
    for line in chunked_lines:
        f.write(f"{line}\n")

# Write the matching labels, one window per line. Each entry of chunked_labels is
# a list of per-character diacritic strings, so what gets written is the list's
# Python repr.
with open('../data/cleaned_tashkeel.txt', 'w', encoding='utf-8') as f:
    for line in chunked_labels:
        f.write(f"{line}\n")

In [None]:
# Persist the padded samples and their labels together as one pickled tuple.
with open('../data/padded_dirty.pkl', 'wb') as f:
    pickle.dump((padded_texts, padded_labels), f)


In [None]:
import json
# Save the vocabulary in both directions; ensure_ascii=False keeps the Arabic
# characters readable in the files.
with open('../data/char2idx.json', 'w', encoding='utf-8') as f:
    json.dump(char2idx, f, ensure_ascii=False, indent=4)

# JSON object keys are always strings, so the integer keys of idx2char are
# written as strings and must be converted back to int after loading.
with open('../data/idx2char.json', 'w', encoding='utf-8') as f:
    json.dump(idx2char, f, ensure_ascii=False, indent=4)

In [None]:
# Write windows and labels to the "_val" files.
# NOTE(review): this writes the same chunked_lines / chunked_labels variables as
# the cell that produces cleaned_text.txt and cleaned_tashkeel.txt. Presumably the
# notebook is meant to be re-run on a validation file before this cell - confirm,
# otherwise the validation files are copies of the training data.
with open('../data/cleaned_val.txt', 'w', encoding='utf-8') as f:
    for line in chunked_lines:
        f.write(f"{line}\n")

with open('../data/cleaned_tashkeel_val.txt', 'w', encoding='utf-8') as f:
    for line in chunked_labels:
        f.write(f"{line}\n")

In [None]:
# Reverse lookup for diacritic2id: value -> key.
idx2label = {v: k for k, v in diacritic2id.items()}

In [None]:
# Rebuild the diacritised text of every padded sample by writing each character
# followed by its label string, one sample per line.
with open('../data/cleaned_all.txt', 'w', encoding='utf-8') as f:
    for i in range(len(padded_texts)):
        result_str = ""
        for char, p_id in zip(padded_texts[i], padded_labels[i]):
            # Padding starts here; nothing after it belongs to the sample.
            if char == '<PAD>': 
                break 
            
            result_str += char + p_id
        f.write(result_str + "\n")
        