Step 1: run `pip install classla` to install [classla](https://github.com/clarinsi/classla)

Step 2: download the models (standard Slovenian, standard Serbian and non-standard Serbian) and initialize the pipelines

In [None]:
import classla

# Slovenian, default model type: download the model files, then build the pipeline
classla.download('sl')
sl_nlp = classla.Pipeline('sl')

# Serbian, default model type
classla.download('sr')
sr_nlp = classla.Pipeline('sr')

# Serbian, non-standard model type (requested explicitly via type='nonstandard')
classla.download('sr', type='nonstandard')
sr_nlp_nonstandard = classla.Pipeline('sr', type='nonstandard')

Step 3: specify the input directories and match each of them with the corresponding pipeline; check the well-formedness of the files and correct them if needed (only well-formed XML files will be processed)

In [None]:
from lxml import etree
from pathlib import Path
from collections import defaultdict

# Input directories (relative paths), one per pipeline.
sl_base = "source_sl"
sr_base = "source_sr"
# The Serbian standard input is the base directory itself; the non-standard
# input is a sibling directory with the "_nonstandard" suffix.
sr_directory_std = sr_base
sr_directory_nonstd = sr_base + "_nonstandard"

# Name of the directory that receives the merged Serbian files.
MERGED_SUFF = "_merged"
targ_directory_merged = sr_base + MERGED_SUFF

# Maps every input directory to the pipeline used to annotate its files.
input_data = {
    sl_base: sl_nlp,
    sr_directory_std: sr_nlp,
    sr_directory_nonstd: sr_nlp_nonstandard
}

# Input directory -> list of well-formed files found in it (filled by check_directory).
files_to_process = defaultdict(list)

def check_directory(directory_nested, root=None):
    """Recursively register the well-formed XML files found under a directory.

    Every regular file is parsed with lxml. Files that parse are appended to
    ``files_to_process[root]``; files that do not parse are reported and
    skipped. Sub-directories are scanned recursively and their files are
    registered under the same ``root`` key.

    Args:
        directory_nested: directory to scan (string or ``Path``).
        root: key of ``files_to_process`` under which the files are stored.
            Defaults to ``directory_nested`` itself, so a top-level call only
            needs the directory.
    """
    # The key is passed down explicitly instead of being read from a loop
    # variable of the calling cell, so the function works from any call site.
    if root is None:
        root = directory_nested
    for file in Path(directory_nested).iterdir():
        if not file.is_file():
            check_directory(file, root)
            continue
        try:
            etree.parse(file)
        except Exception as e:
            print(f"❌ File {file} is NOT well-formed!\n{e}")
        else:
            print(f"✅ File {file} is well-formed.")
            files_to_process[root].append(file)

# Scan every configured input directory; the well-formed files end up in
# files_to_process under the key of the directory they were found in.
for directory in input_data.keys():
    check_directory(directory)

Step 4.1: load the [tag mappings](https://docs.google.com/spreadsheets/d/1PiNjHQ7NoJyYLiTin4VehWd21Tc7LLdoCr52-t9_EjY) (classla → ncrl) and initialize the mapping functions

In [5]:
import pandas as pd

# Language codes as written to the `lang` attribute of annotated sentences.
LANG_SERBIAN = "srp"
LANG_SLOVENIAN = "slv"
LANG_RUSSIAN = "rus"

# Language code found in the input -> normalized code ("sln" is treated as Slovenian).
LANG_CODE_NORMALIZED = {
    LANG_SLOVENIAN: LANG_SLOVENIAN,
    "sln": LANG_SLOVENIAN,
    LANG_SERBIAN: LANG_SERBIAN,
    LANG_RUSSIAN: LANG_RUSSIAN
}

# Google spreadsheet holding the tag mappings, and the sheet (tab) name per language.
mapping_sheet_id = "1PiNjHQ7NoJyYLiTin4VehWd21Tc7LLdoCr52-t9_EjY"
mapping_sheet_names = {LANG_SLOVENIAN: "sl_ru", LANG_SERBIAN: "srp_ru"}

def mapping_sheet_to_dict(lang):
    """Download the mapping sheet of a language and return it as a dict.

    Keys come from the ``classla`` column and values from the ``ncrl``
    column; an empty ``ncrl`` cell is stored as ``None``.
    """
    sheet_url = (
        f"https://docs.google.com/spreadsheets/d/{mapping_sheet_id}"
        f"/gviz/tq?tqx=out:csv&sheet={mapping_sheet_names[lang]}"
    )
    # na_filter=False keeps empty cells as "" instead of NaN
    columns = pd.read_csv(sheet_url, na_filter=False).to_dict()
    classla_tags = list(columns["classla"].values())
    ncrl_tags = [tag if len(tag) > 0 else None for tag in columns["ncrl"].values()]
    return dict(zip(classla_tags, ncrl_tags))

# Per-language mapping: classla tag / feature pair -> ncrl tag(s), or None when blank.
gr_map = {lang: mapping_sheet_to_dict(lang) for lang in (LANG_SLOVENIAN, LANG_SERBIAN)}

# Tags produced by map_to_ncrl itself rather than taken from the sheets.
additional_ncrl_features = {"ANUM", "NONLEX", "gvrn:acc", "gvrn:dat", "gvrn:gen", "gvrn:ins", "gvrn:loc", "gvrn:nom", "gvrn:voc"}

# Every tag that may legitimately appear in a `gr` attribute: the additional
# tags plus each comma-separated part of every non-empty mapping value.
ncrl_features = set(additional_ncrl_features)
for dct in gr_map.values():
    for val in dct.values():
        if val is not None:
            ncrl_features.update(val.split(","))

def get_mapping(lang, prop):
    """Return the ncrl value mapped to ``prop`` for ``lang`` (KeyError if absent)."""
    language_mapping = gr_map[lang]
    return language_mapping[prop]

def is_mapped(lang, prop):
    """Tell whether ``prop`` has an entry in the mapping of ``lang``."""
    language_mapping = gr_map[lang]
    return prop in language_mapping

def is_ncrl_feature(prop):
    """Tell whether ``prop`` is one of the known ncrl tags."""
    known = prop in ncrl_features
    return known


def map_to_ncrl(lang, pos, attrs):
    """Translate a classla POS tag and feature string into ncrl tags.

    Args:
        lang: language key of ``gr_map``.
        pos: POS tag of the token (the caller passes its ``upos``).
        attrs: feature string with ``|``-separated ``name=value`` pairs, or
            ``None`` when the token has no features.

    Returns:
        ``(res_pos, res_attrs)`` where ``res_attrs`` is a comma-joined string
        of mapped attributes, or ``None`` when there are none. Unknown POS
        tags and unmapped pairs are reported with ``print`` and passed
        through unchanged so that they stay visible in the output.
    """
    if not is_mapped(lang, pos):
        print(f"[{lang}] unknown POS: {pos}")
        res_pos = pos
    else:
        res_pos = get_mapping(lang, pos)
    if attrs is None:
        return res_pos, None
    # A dict serves as an insertion-ordered set: the joined result is then
    # identical on every run, whereas iterating a set of strings depends on
    # per-process hash randomization.
    res_attrs = {}
    for attr_pair_str in attrs.lower().split("|"):
        if not attr_pair_str:
            # an empty feature string must not produce an empty attribute
            continue
        if attr_pair_str == "numtype=ord" and res_pos == "A":
            res_pos = "ANUM"
        elif attr_pair_str == "foreign=yes":
            res_pos = "NONLEX"
        elif not is_mapped(lang, attr_pair_str):
            # Checked before the "gvrn:" branch so that an unmapped case is
            # reported instead of raising KeyError.
            print(f"[{lang}] unmapped attribute pair: {attr_pair_str}")
            res_attrs[attr_pair_str] = None
        elif (mapping := get_mapping(lang, attr_pair_str)) is not None:
            if attr_pair_str.startswith("case=") and res_pos == "PR":
                # case of a "PR" token is recorded as a "gvrn:" attribute
                res_attrs[f"gvrn:{mapping}"] = None
            else:
                res_attrs[mapping] = None
    if res_attrs:
        return res_pos, ",".join(res_attrs)
    return res_pos, None

Step 4.2: initialize file processing functions

In [53]:
import cyrtranslit
from tqdm import tqdm

# processed files will be exported to this folder
STORE_RESULTS_AT = "annotated"

# Token keys that annotate() consumes.
known_keys = {"feats"}
# Token keys that annotate() skips without reporting.
keys_to_ignore = {"id", "text", "lemma", "upos", "xpos", "head", "ner", "deprel"}
# A sentence whose tokens all carry one of these POS tags is left unannotated.
non_textual_pos = {"PUNCT", "X", "SYM"}

def add_space_after(token_dict):
    """Tell whether a space should follow the token.

    Returns ``False`` only when the token has a ``misc`` entry that contains
    ``SpaceAfter=No``.
    """
    if "misc" not in token_dict:
        return True
    return "SpaceAfter=No" not in token_dict["misc"]

def annotate(lang, ana, token_dict):
    """Write the grammatical description of a token to ``ana.attrib["gr"]``.

    Args:
        lang: language key passed on to ``map_to_ncrl``.
        ana: XML element that receives the ``gr`` attribute.
        token_dict: token dictionary; ``upos`` is required, ``feats`` is
            optional.

    Any key that is neither ignored nor known is printed so that unexpected
    token data does not go unnoticed. The only accepted ``misc`` entry is the
    exact value ``SpaceAfter=No``.
    """
    for key, value in token_dict.items():
        # The original test `key != "misc" and value != "SpaceAfter=No"`
        # silently accepted every `misc` value; only the exact pair
        # misc == "SpaceAfter=No" is expected.
        is_expected = (
            key in keys_to_ignore
            or key in known_keys
            or (key == "misc" and value == "SpaceAfter=No")
        )
        if not is_expected:
            print(f"{key} :: {value} in {token_dict}")
    # map_to_ncrl accepts None for missing features and then returns (pos, None)
    pos, attrs = map_to_ncrl(lang, token_dict["upos"], token_dict.get("feats"))
    ana.attrib["gr"] = pos if attrs is None else f"{pos},{attrs}"

def append_text(root, text):
    """Append ``text`` at the current end of the content of ``root``.

    When ``root`` has children the text is added to the tail of its last
    child; otherwise it is added to ``root.text``. A missing (``None``)
    text or tail is treated as empty.
    """
    # len()/indexing replace the deprecated getchildren(), which also built a
    # new list of all children on every call.
    if len(root) > 0:
        node = root[-1]
        node.tail = (node.tail or "") + text
    else:
        root.text = (root.text or "") + text

def last_child(node):
    """Return the last child element of ``node``.

    Raises:
        IndexError: if ``node`` has no children.
    """
    # direct indexing instead of the deprecated getchildren()
    return node[-1]

def get_word(token):
    """Return the stripped tail text of the last child of ``token``."""
    trailing_text = last_child(token).tail
    return trailing_text.strip()

def fill_text_and_lex(ana, token_dict, lang, is_latin):
    """Store the word form and the lemma of a token in the XML.

    The word form goes to the tail of ``ana`` and the lemma to its ``lex``
    attribute. For Serbian the form in the other script is stored as well:
    ``translit`` on the parent element of ``ana`` and ``lex_translit`` on
    ``ana``. ``is_latin`` selects which script is the primary one and is
    only consulted for Serbian.
    """
    word_token = ana.getparent()

    if lang != LANG_SERBIAN:
        ana.tail = token_dict["text"]
        ana.attrib["lex"] = token_dict["lemma"]
        return

    if is_latin:
        # Latin is primary, Cyrillic is the transliteration
        latin_form = token_dict["text"]
        ana.tail = latin_form
        word_token.attrib["translit"] = cyrtranslit.to_cyrillic(latin_form, "sr")
        latin_lemma = token_dict["lemma"]
        ana.attrib["lex"] = latin_lemma
        ana.attrib["lex_translit"] = cyrtranslit.to_cyrillic(latin_lemma, "sr")
    else:
        # Cyrillic is primary, the token text itself is the transliteration
        token_form = token_dict["text"]
        ana.tail = cyrtranslit.to_cyrillic(token_form, "sr")
        word_token.attrib["translit"] = token_form
        token_lemma = token_dict["lemma"]
        ana.attrib["lex"] = cyrtranslit.to_cyrillic(token_lemma, "sr")
        ana.attrib["lex_translit"] = token_lemma

def recreate_original_filepath(filename):
    """Create, inside the results folder, the directory that will hold ``filename``.

    Args:
        filename: path of the file relative to the results folder (string or
            ``Path``). Only its parent directories are created, not the file.

    The whole chain of directories is created in one call and existing
    directories are accepted. The previous part-by-part loop stopped early
    whenever a directory had the same name as the file, and failed when the
    results folder itself did not exist yet.
    """
    target_dir = Path(STORE_RESULTS_AT).joinpath(Path(filename).parent)
    target_dir.mkdir(parents=True, exist_ok=True)

def format_sentence(sentence):
    """Put a newline and a four-space indent in front of ``sentence``.

    The whitespace is written to the tail of the preceding sibling, after
    stripping whatever tail that sibling already had. Nothing happens when
    ``sentence`` has no preceding sibling.
    """
    sibling = sentence.getprevious()
    if sibling is None:
        return
    existing_tail = "" if sibling.tail is None else sibling.tail.strip()
    sibling.tail = existing_tail + "\n    "

def process_file(file_number, total_file_count, filename, pipeline):
    """Annotate one XML file and write the result below ``STORE_RESULTS_AT``.

    Args:
        file_number: zero-based index of the file, shown in the progress bar.
        total_file_count: number of files in the batch, shown in the progress bar.
        filename: path of the input file; the output keeps the same relative path.
        pipeline: callable that parses a sentence string and returns an object
            offering ``iter_tokens()``.

    Every sentence element under ``body`` is replaced by a tokenized copy in
    which each non-punctuation token becomes ``<w><ana .../>form</w>``.
    Russian sentences, sentences without text, and sentences consisting only
    of non-textual tokens are left as they are.
    """
    parsed_file = etree.parse(filename)
    bar_format = f"[{file_number + 1} / {total_file_count}] {str(filename)} " + "|{bar}| {percentage:3.0f}% {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
    for para in tqdm(parsed_file.find("body"), bar_format=bar_format):
        if para.tag != "para":
            print(f"Incorrect sentence pair tag: {para.tag}")
        for sentence in para:
            if sentence.tag != "se":
                print(f"Incorrect sentence tag: {sentence.tag}")
            lang = LANG_CODE_NORMALIZED[sentence.get("lang")]
            if lang == LANG_RUSSIAN or sentence.text is None:
                format_sentence(sentence)
                continue
            tokenized_sentence = etree.Element("se", attrib={"lang": lang})
            # Only consulted for Serbian. It used to be passed as `_`, i.e.
            # whatever the interactive session had last displayed (and a
            # NameError outside IPython).
            is_latin = False
            if lang == LANG_SERBIAN:
                # Serbian is always parsed in Latin script; remember whether
                # the source already was Latin to pick the primary script.
                latin_sentence = cyrtranslit.to_latin(sentence.text, "sr")
                is_latin = latin_sentence == sentence.text
                parsed_sentence = pipeline(latin_sentence)
                tokenized_sentence.attrib["has_translit"] = "true"
            else:
                parsed_sentence = pipeline(sentence.text)
            # Convert every token once; the pairs are used by both passes below.
            tokens = [(token, token.to_dict()[0]) for token in parsed_sentence.iter_tokens()]
            if all(token_dict["upos"] in non_textual_pos for _token, token_dict in tokens):
                # no annotation required for non-textual sentences
                continue
            for token, token_dict in tokens:
                if token_dict["upos"] == "PUNCT":
                    append_text(tokenized_sentence, token.text)
                else:
                    tokenized_sentence.append(etree.Element("w"))
                    word_token = last_child(tokenized_sentence)
                    word_token.append(etree.Element("ana"))
                    ana = last_child(word_token)
                    fill_text_and_lex(ana, token_dict, lang, is_latin)
                    annotate(lang, ana, token_dict)
                if add_space_after(token_dict):
                    append_text(tokenized_sentence, " ")

            if "variant_id" in sentence.attrib.keys():
                tokenized_sentence.attrib["variant_id"] = sentence.attrib["variant_id"]
            sentence.getparent().replace(sentence, tokenized_sentence)
            format_sentence(tokenized_sentence)

    # idempotent: no failure when the folder already exists
    Path(STORE_RESULTS_AT).mkdir(parents=True, exist_ok=True)
    recreate_original_filepath(filename)
    with open(Path(STORE_RESULTS_AT).joinpath(filename), "bw") as annotated_file:
        annotated_file.write(etree.tostring(parsed_file, encoding="utf-8", xml_declaration=True))

Step 4.3: process files

In [None]:
# Annotate the well-formed files of every input directory with its pipeline.
for directory, pipeline in input_data.items():
    directory_files = files_to_process[directory]
    total_file_count = len(directory_files)
    for file_number, filename in enumerate(directory_files):
        process_file(file_number, total_file_count, filename, pipeline)

Step 5: check the well-formedness of the results and the validity of the annotation

In [None]:
# Validate every annotated file: it has to parse, and every word of a Serbian
# sentence has to carry the transliteration, the lexeme and known grammatical tags.
for directory in input_data.keys():
    for source_file in files_to_process[directory]:
        file = Path(STORE_RESULTS_AT).joinpath(source_file)
        try:
            parsed_file = etree.parse(file)
            print(f"✅ File {file} is well-formed.")
        except Exception as e:
            print(f"❌ File {file} is NOT well-formed!\n{e}")
            # Skip only the broken file; `break` would leave the remaining
            # files of this directory unchecked.
            continue

        file_is_correctly_annotated = True
        for para in parsed_file.find("body"):
            for sentence in para:
                lang = sentence.get("lang")
                if lang != LANG_SERBIAN:
                    continue
                common_message_part = f"[{file.name} ({lang}), para #{para.get('id')}]"
                # An annotated sentence that starts with a word has no leading
                # text, so `sentence.text is None` must not be used to skip it.
                # Unannotated sentences simply have no <w> children.
                for word_token in sentence.findall("w"):
                    if len(word_token) == 0:
                        print(f"{common_message_part} missing analysis element in a word token")
                        file_is_correctly_annotated = False
                        continue
                    ana = last_child(word_token)
                    if "translit" not in word_token.keys():
                        print(f"{common_message_part} missing transliterated version of the word \"{ana.tail}\"")
                        file_is_correctly_annotated = False
                    if "lex" not in ana.keys():
                        print(f"{common_message_part} missing lexeme of the word \"{ana.tail}\"")
                        file_is_correctly_annotated = False
                    if "lex_translit" not in ana.keys():
                        print(f"{common_message_part} missing transliterated version of lexeme of the word \"{ana.tail}\"")
                        file_is_correctly_annotated = False
                    if "gr" not in ana.keys():
                        print(f"{common_message_part} missing grammatical features of the word \"{ana.tail}\"")
                        file_is_correctly_annotated = False
                    else:
                        for gr_prop in ana.get("gr").split(","):
                            if not is_ncrl_feature(gr_prop):
                                print(f"{common_message_part} unknown grammatical feature in the word \"{ana.tail}\": {gr_prop}")
                                file_is_correctly_annotated = False
        if file_is_correctly_annotated:
            print(f"✅ File {file} is correctly annotated.")
        print()

Step 6: merge the "standard" and "non-standard" files (Serbian). The list of alternative lexemes is available [here](https://docs.google.com/spreadsheets/d/1Hl2ns1449xmOVByr5879jwjC3djMTiaZItPO5V6rtbg/edit); the grammatical attribute postprocessing sheet is [here](https://docs.google.com/spreadsheets/d/1hAfJ3A4P7iaN24VE2AGhuNBB2NJ1Rv6PSf-wYucoeuw)

In [99]:
import re

# Make sure the folder for the merged files exists. parents=True also creates
# the results folder when this cell runs before any file has been annotated;
# exist_ok=True makes the cell safe to re-run.
Path(STORE_RESULTS_AT).joinpath(targ_directory_merged).mkdir(parents=True, exist_ok=True)

# Google spreadsheet listing alternative lexemes, and the sheet (tab) name per language.
alt_lexemes_sheet_id = "1Hl2ns1449xmOVByr5879jwjC3djMTiaZItPO5V6rtbg"
alt_lexemes_sheet_names = {LANG_SERBIAN: "srp"}

def alt_lexemes_sheet_to_dict(lang):
    """Download the alternative-lexeme sheet of a language as a dict.

    Keys come from the ``nonstd`` column and values from the ``std`` column.
    """
    sheet_url = (
        f"https://docs.google.com/spreadsheets/d/{alt_lexemes_sheet_id}"
        f"/gviz/tq?tqx=out:csv&sheet={alt_lexemes_sheet_names[lang]}"
    )
    # na_filter=False keeps empty cells as "" instead of NaN
    columns = pd.read_csv(sheet_url, na_filter=False).to_dict()
    std_lexemes = list(columns["std"].values())
    nonstd_lexemes = list(columns["nonstd"].values())
    return dict(zip(nonstd_lexemes, std_lexemes))

# Per-language table: non-standard lexeme -> its standard counterpart.
alt_lexemes = {
    LANG_SERBIAN: alt_lexemes_sheet_to_dict(LANG_SERBIAN),
}

# Substring replacements tried on a non-standard lemma (Latin script) to see
# whether it then equals the standard lemma; used by add_std_lexeme.
# NOTE(review): presumably the ijekavian -> ekavian reflexes -- confirm.
alt_syls = {
    "je": "e",
    "ije": "e"
}

# Google spreadsheet holding the `gr` postprocessing rules, and the sheet (tab) name per language.
gr_postproc_sheet_id = "1hAfJ3A4P7iaN24VE2AGhuNBB2NJ1Rv6PSf-wYucoeuw"
gr_postproc_sheet_names = {LANG_SERBIAN: "srp"}

def gr_postproc_sheet_to_list(lang):
    """Download the ``gr`` postprocessing sheet of a language as a list of rules.

    Each rule is a tuple ``(replace, with, if_lex_nonstd_matches_regex,
    else_remove)`` built from the sheet columns of the same names.
    """
    sheet_url = (
        f"https://docs.google.com/spreadsheets/d/{gr_postproc_sheet_id}"
        f"/gviz/tq?tqx=out:csv&sheet={gr_postproc_sheet_names[lang]}"
    )
    # na_filter=False keeps empty cells as "" instead of NaN
    columns = pd.read_csv(sheet_url, na_filter=False).to_dict()
    rule_fields = ("replace", "with", "if_lex_nonstd_matches_regex", "else_remove")
    field_values = [list(columns[field].values()) for field in rule_fields]
    return list(zip(*field_values))

# Per-language list of postprocessing rules, each a tuple
# (replace, with, if_lex_nonstd_matches_regex, else_remove); consumed by postprocess_gr.
gr_postproc = {
    LANG_SERBIAN: gr_postproc_sheet_to_list(LANG_SERBIAN),
}

def add_std_lexeme(std_latin, nonstd_latin):
    """Tell whether the standard lemma is a known variant of the non-standard one.

    This holds when replacing one of the ``alt_syls`` keys by its value in
    ``nonstd_latin`` yields ``std_latin``, or when the Serbian ``alt_lexemes``
    table maps ``nonstd_latin`` to ``std_latin``.
    """
    for nonstd_syllable, std_syllable in alt_syls.items():
        if nonstd_latin.replace(nonstd_syllable, std_syllable) == std_latin:
            return True
    known_alternatives = alt_lexemes[LANG_SERBIAN]
    if nonstd_latin not in known_alternatives.keys():
        return False
    return known_alternatives[nonstd_latin] == std_latin

def process_lex_diff(ana_std, ana_nonstd):
    """Add the standard lemma in front of a differing non-standard analysis.

    When ``add_std_lexeme`` accepts the pair of lemmas, a new ``ana`` element
    is inserted before ``ana_nonstd``. It takes ``lex`` and ``lex_translit``
    from ``ana_std`` and ``gr`` from ``ana_nonstd``. Otherwise nothing is
    changed.
    """
    nonstd_lemma_latin = cyrtranslit.to_latin(ana_nonstd.get("lex"), "sr")
    std_lemma_latin = cyrtranslit.to_latin(ana_std.get("lex"), "sr")
    if not add_std_lexeme(std_lemma_latin, nonstd_lemma_latin):
        return
    std_variant_attrs = {
        "lex": ana_std.get('lex'),
        "lex_translit": ana_std.get('lex_translit'),
        "gr": ana_nonstd.get("gr"),
    }
    ana_nonstd.addprevious(etree.Element("ana", attrib=std_variant_attrs))

def postprocess_gr(lang, nonstd_gr_split, ana_nonstd):
    """Apply the ``gr`` postprocessing rules of ``lang`` to ``ana_nonstd``.

    A rule applies only when both its ``replace`` tag and its ``else_remove``
    tag occur in ``nonstd_gr_split``. If the Latin-script lemma of
    ``ana_nonstd`` matches the rule's regex, the ``replace`` tag is swapped
    for the ``with`` tag; otherwise the ``else_remove`` tag is dropped.

    Every rule is evaluated against the ``nonstd_gr_split`` passed in, so
    when several rules apply, the last one determines the final ``gr``.
    """
    for tok_to_replace, _with, cond_regex, tok_to_remove in gr_postproc[lang]:
        if tok_to_replace not in nonstd_gr_split or tok_to_remove not in nonstd_gr_split:
            continue
        lemma_latin = cyrtranslit.to_latin(ana_nonstd.get("lex"), "sr")
        if re.match(cond_regex, lemma_latin):
            updated_props = [_with if prop == tok_to_replace else prop for prop in nonstd_gr_split]
        else:
            updated_props = [prop for prop in nonstd_gr_split if prop != tok_to_remove]
        ana_nonstd.attrib["gr"] = ",".join(updated_props)

def replace_path_segment(file, source, target):
    """Return ``file`` as a string with the path segment(s) ``source`` replaced by ``target``.

    Args:
        file: path to rewrite (string or ``Path``).
        source: segment to look for; may span several components ("a/b").
        target: replacement; may also span several components.

    Only whole path components are compared. A file or directory whose name
    merely contains ``source`` (for example ``source_sr_notes.xml`` when the
    source is ``source_sr``) is left untouched, which a plain substring
    replacement on the path string would corrupt.
    """
    file_parts = Path(file).parts
    source_parts = Path(source).parts
    target_parts = Path(target).parts
    if not source_parts:
        # nothing to look for
        return str(file)
    width = len(source_parts)
    rewritten = []
    position = 0
    while position < len(file_parts):
        if file_parts[position:position + width] == source_parts:
            rewritten.extend(target_parts)
            position += width
        else:
            rewritten.append(file_parts[position])
            position += 1
    return str(Path(*rewritten))

In [None]:
# Merge the standard annotation of every Serbian file into its non-standard
# counterpart and save the result under the merged directory.
for file in files_to_process[sr_directory_std]:
    # Both annotated versions live in the results folder; the non-standard
    # path is derived from the standard one.
    file_std = Path(STORE_RESULTS_AT).joinpath(file)
    file_nonstd = Path(STORE_RESULTS_AT).joinpath(replace_path_segment(file, sr_directory_std, sr_directory_nonstd))

    parsed_file_std = etree.parse(file_std)
    parsed_file_nonstd = etree.parse(file_nonstd) # in-place merge will be performed, results will be saved to a new file
    # Only Serbian sentences take part in the merge.
    sentences_std = [se for se in parsed_file_std.findall(".//se") if se.get("lang") == LANG_SERBIAN]
    sentences_nonstd = [se for se in parsed_file_nonstd.findall(".//se") if se.get("lang") == LANG_SERBIAN]
    bar_format = "Merging file: " + str(file_nonstd.name) + " |{bar}| {percentage:3.0f}% {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
    # Sentences and words are paired by position. zip() stops at the shorter
    # sequence, so surplus sentences or words on either side are not merged.
    # NOTE(review): this assumes both models tokenize identically -- confirm.
    for para_id, (se_std, se_nonstd) in tqdm(list(enumerate(zip(sentences_std, sentences_nonstd))), bar_format=bar_format):
        for wt_std, wt_nonstd in zip(se_std.findall("w"), se_nonstd.findall("w")):
            ana_std = last_child(wt_std)
            ana_nonstd = last_child(wt_nonstd)
            # Different lemmas (case-insensitive): possibly add the standard one as an alternative.
            if ana_std.get("lex").lower() != ana_nonstd.get("lex").lower():
                process_lex_diff(ana_std, ana_nonstd)

            std_gr = ana_std.get("gr").split(",")
            nonstd_gr = ana_nonstd.get("gr").split(",")
            # True when the two tag lists differ in length or content.
            if (len(std_gr) != len(nonstd_gr)) or (len(set(std_gr) & set(nonstd_gr)) != len(std_gr)):
                if set(nonstd_gr) == {"INTJ"}:
                    ana_nonstd.attrib["gr"] = ana_std.get("gr") # non-standard model sometimes incorrectly assigns "INTJ", then standard model's prediction should be prioritized
                continue # keeping the non-standard model's prediction otherwise

            # Both models agree on the tags: apply the rule-based postprocessing.
            postprocess_gr(LANG_SERBIAN, nonstd_gr, ana_nonstd)

    # Write the modified non-standard tree to the merged directory, keeping the relative path.
    merged_file_path = Path(replace_path_segment(file, sr_directory_std, targ_directory_merged))
    recreate_original_filepath(merged_file_path)
    with open(Path(STORE_RESULTS_AT).joinpath(merged_file_path), "bw") as merged_file:
        merged_file.write(etree.tostring(parsed_file_nonstd, encoding="utf-8", xml_declaration=True))

Step 7: check well-formedness of the results

In [None]:
# Re-parse every merged file to confirm that it is well-formed XML.
for source_file in files_to_process[sr_directory_std]:
    # The merged path is derived from the path of the standard source file.
    file = Path(STORE_RESULTS_AT).joinpath(replace_path_segment(source_file, sr_directory_std, targ_directory_merged))
    try:
        parsed_file = etree.parse(file)
        print(f"✅ File {file} is well-formed.")
    except Exception as e:
        # Report the file and carry on with the next one.
        print(f"❌ File {file} is NOT well-formed!\n{e}")