# Arabic POS-Tagging
In this notebook, the Quran is POS-tagged. A model from CAMeL Tools is used, and its individual tags are mapped to the universal tag set.

In [None]:
%pip install transformers
%pip  install camel-tools

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


First, Google Drive is mounted and the location of the CAMeL Tools data is configured. After that, the necessary packages are imported:

In [None]:
import os

from google.colab import drive

# Make Google Drive available inside the Colab file system.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

# Tell CAMeL Tools where its data lives on the mounted drive.
# NOTE(review): presumably this has to be set before camel_tools is imported
# (the imports follow in the next cell) -- confirm against the camel_tools docs.
camel_tools_data_dir = '/content/drive/MyDrive/camel_tools'
os.environ['CAMELTOOLS_DATA'] = camel_tools_data_dir

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from collections import Counter, defaultdict
import pickle

from camel_tools.morphology.database import MorphologyDB
from camel_tools.morphology.analyzer import Analyzer

import numpy as np
from transformers import pipeline

In [None]:
# Load a morphological database.
# Here, the default built-in database is loaded, which is used for analyzing
# Modern Standard Arabic.
db = MorphologyDB.builtin_db()

# Morphological analyzer backed by that database. `infer_unknown` queries it
# for words that did not receive a tag from the POS pipeline.
analyzer = Analyzer(db)

In [None]:
# Change the path to where the parallel data file is located on your device/drive.
# The path is absolute (Drive is mounted at /content/drive), so it resolves
# regardless of the notebook's current working directory.
DATA_PATH = "/content/drive/MyDrive/Data/parallel/Tanzil-ar-ha.txt"

# Placeholder tag for words that have not (yet) been assigned a POS tag.
UNK = "<unk>"

# Mapping from the fine-grained Arabic tags produced by the POS model
# (the pipeline's "entity_group" values) to universal POS tags.
MAPPING = {
    "abbrev": "X",
    "adj": "ADJ",
    "adj_comp": "ADJ",
    "adj_num": "ADJ",
    "adv": "ADV",
    "adv_interrog": "ADV",
    "adv_rel": "ADV",
    "conj": "CCONJ",
    "conj_sub": "SCONJ",
    "digit": "X",
    "interj": "INTJ",
    "noun": "NOUN",
    "noun_num": "NOUN",
    "noun_prop": "PROPN",
    "noun_quant": "NOUN",
    "part": "PART",
    "part_det": "PART",
    "part_focus": "PART",
    "part_fut": "PART",
    "part_interrog": "PART",
    "part_neg": "PART",
    "part_restrict": "PART",
    "part_verb": "PART",
    "part_voc": "PART",
    "prep": "ADP",
    "pron": "PRON",
    "pron_dem": "PRON",
    "pron_interrog": "PRON",
    "pron_rel": "PRON",
    "punc": "PUNCT",
    "verb": "VERB",
    "verb_pseudo": "VERB",
}

# Define the model to use for POS tagging.
# - aggregation_strategy="max" makes the pipeline return word-level groups
#   (with character offsets) instead of one entry per sub-word token.
# - device=0 places the model on the first GPU, so a GPU runtime is required.
pos = pipeline(
    'token-classification',
    model='CAMeL-Lab/bert-base-arabic-camelbert-ca-pos-msa',
    aggregation_strategy="max",
    device=0,
)

Downloading:   0%|          | 0.00/1.76k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/416M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/86.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/297k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/112 [00:00<?, ?B/s]

Next, we define some helper functions. `map_to_idx` computes the character range that each word occupies in the original string, so that the tags found by the `pipeline` can be mapped back to the words.<br>
`infer_unknown` assigns a tag to a word for which the `pipeline` found no POS tag. This is done by querying the morphological database of CAMeL Tools.

In [None]:
def map_to_idx(words):
  """Computes the character span of every word in the space-joined string.

  The spans are laid out as if the words were joined by exactly one space:
  the first word starts at offset 0 and every following word starts one
  character after the end of its predecessor.

  Args:
    words (List[str]): The words in their original order.

  Returns:
    List[Tuple[int, int]]: One (start, end) pair per word, end exclusive.
  """
  spans = []
  for word in words:
    # Skip the single separating space after the previous word, if any.
    begin = spans[-1][1] + 1 if spans else 0
    spans.append((begin, begin + len(word)))
  return spans

def infer_unknown(token):
    """Infers a tag for a single word from the morphological database.

    The word is run through the module-level `analyzer`; the "ud" value of
    every returned analysis is collected and the most frequent one wins.
    Only the part before the first "+" of that value is returned.

    Args:
      token (str): The word to look up.

    Returns:
      str: The inferred tag, or `UNK` if the analyzer returned no analysis.
    """
    ud_tags = [analysis["ud"] for analysis in analyzer.analyze(token)]
    if not ud_tags:
        return UNK
    most_frequent, _ = Counter(ud_tags).most_common(1)[0]
    # Keep only the first component of a "+"-joined tag.
    return most_frequent.split("+")[0]
    
  

In [None]:
# Tag the source side of every well-formed line of the parallel file.
# Result: line_dict maps the running index of each well-formed line to a list
# with one (tag, score) pair per whitespace-separated source word.
# The file contains Arabic text, so the encoding is stated explicitly instead
# of relying on the platform default.
with open(DATA_PATH, encoding="utf-8") as f:
    counter = 0
    line_dict = defaultdict(list)
    # Model labels that have no entry in MAPPING; reported once after the loop.
    unmapped_labels = set()
    for line in f:
        # Progress indicator.
        if counter % 1000 == 0:
            print(counter)
        # Split line at delimiter.
        parts = line.split("|||")
        # Only look at lines with the right format (source ||| target).
        if len(parts) != 2:
            continue
        src, _ = parts
        words = src.split()
        # map_to_idx lays the words out as if joined by single spaces, so the
        # tagger must see exactly that string; otherwise leading or repeated
        # whitespace in the raw line would shift the character offsets.
        src = " ".join(words)
        idx_range = map_to_idx(words)
        tags = [(UNK, 0)] * len(words)
        # Tag words
        try:
            tag_list = pos(src) if words else []
        except RuntimeError:
            # Seen for very long sentences; their words keep the UNK
            # placeholder and are resolved by the database fallback below.
            print("Error: Sequence with length {}. Trying to infer tags from database".format(len(words)))
        else:
            # Map the tags to the original words
            for tag_dict in tag_list:
                label = tag_dict["entity_group"]
                if label not in MAPPING:
                    # Do not abort the whole run on an unexpected label; the
                    # affected words fall through to the database fallback.
                    unmapped_labels.add(label)
                    continue
                for word_idx, (start, end) in enumerate(idx_range):
                    # A word receives the tag of the group that fully covers it.
                    if start >= tag_dict["start"] and end <= tag_dict["end"]:
                        tags[word_idx] = MAPPING[label], tag_dict["score"]
        # Try to infer unknown tokens from the database if no tag was found by
        # the pipeline; database-inferred tags get the constant score 0.1.
        tags = [
            (infer_unknown(word), 0.1) if tag == UNK else (tag, score)
            for word, (tag, score) in zip(words, tags)
        ]
        line_dict[counter] = tags
        counter += 1
    if unmapped_labels:
        print("Labels without an entry in MAPPING (resolved via database): {}".format(sorted(unmapped_labels)))

0




1000
Error: Sequence with length 469. Trying to infer tags from database
Error: Sequence with length 469. Trying to infer tags from database
2000
3000
4000
5000
6000
Error: Sequence with length 720. Trying to infer tags from database
Error: Sequence with length 720. Trying to infer tags from database
Error: Sequence with length 618. Trying to infer tags from database
Error: Sequence with length 618. Trying to infer tags from database
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000


In [None]:
# Destination of the tagged corpus: one line per tagged sentence, each word
# rendered as "<tag>-<score>" and the words separated by single spaces.
OUT = "/content/drive/MyDrive/Data/tagged/Tanzil-ar-ha.tagged"

# Create the target directory first so that the write does not fail with
# FileNotFoundError when the folder does not exist on the drive yet.
os.makedirs(os.path.dirname(OUT), exist_ok=True)

with open(OUT, "w", encoding="utf-8") as file:
  # line_dict is keyed by consecutive indices starting at 0, so iterating
  # over range(len(line_dict)) writes the sentences in their original order.
  for i in range(len(line_dict)):
    tags = line_dict[i]
    tag_str = ["{}-{}".format(tag, score) for tag, score in tags]
    file.write("{}\n".format(" ".join(tag_str)))