# Prepare Šolar corpus data


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd

from utils.reading import read_xml
from utils.logging import get_logger
from utils.solar_id_enum import SolarId


In [None]:
# Logger used by the functions in this notebook to report progress and
# warnings while the solar corpus data is prepared
solar_data_logger = get_logger("Prepare Solar Corpus Data")


In [None]:
# Constants
SOLAR_DIRECTORY = "../../data/solar/"
SOLAR_FILE = SOLAR_DIRECTORY + "solar_complete.xml"  # complete solar data
MSD_INDEX_FILE = "../../slovene_specification/MSD_index.npy"  # MSD index

# Solar Text Encoding Initiative (TEI) TAG
# Namespace prefix in "{uri}" form; every tag name below is prefixed with it
SOLAR_TAG_TEI = "{http://www.tei-c.org/ns/1.0}"

# Top-level corpus sections
SOLAR_TAG_TEI_HEADER = SOLAR_TAG_TEI + "teiHeader"
SOLAR_TAG_TEXT = SOLAR_TAG_TEI + "text"
SOLAR_TAG_STAND_OFF = SOLAR_TAG_TEI + "standOff"

# Text structure: group -> text -> div -> bibl + p
SOLAR_TAG_GROUP = SOLAR_TAG_TEI + "group"
SOLAR_TAG_DIV = SOLAR_TAG_TEI + "div"
SOLAR_TAG_BIBL = SOLAR_TAG_TEI + "bibl"
SOLAR_TAG_P = SOLAR_TAG_TEI + "p"

# Bibliographic details
SOLAR_TAG_DATE = SOLAR_TAG_TEI + "date"
SOLAR_TAG_NOTE = SOLAR_TAG_TEI + "note"
SOLAR_TAG_TERM = SOLAR_TAG_TEI + "term"

# Sentence and token level elements
SOLAR_TAG_S = SOLAR_TAG_TEI + "s"
SOLAR_TAG_W = SOLAR_TAG_TEI + "w"
SOLAR_TAG_PC = SOLAR_TAG_TEI + "pc"
SOLAR_TAG_SEG = SOLAR_TAG_TEI + "seg"

# Links between source and target tokens
SOLAR_TAG_LINKGRP = SOLAR_TAG_TEI + "linkGrp"
SOLAR_TAG_LINK = SOLAR_TAG_TEI + "link"

# Solar attribute name for identifying the solar text
# Solar id: solar{#div}{s-t}.{#p}.{#s}.{#w}
SOLAR_ID = "{http://www.w3.org/XML/1998/namespace}id"


In [None]:
def read_solar():
    """
    Loads the complete solar corpus from SOLAR_FILE.

    Corpus layout:
    -> teiHeader -> /
    -> text -> group -> text[2 = source + target] -> body ->
        -> div[5485 = # of written works] -> bibl + p[...] ->
        -> (date + note + term[...]) + (s[...]) -> ... + w[...] + pc[...] + seg[...]
    -> standOff -> linkGrp[126159] -> link[...]

    Element meaning:
    div = school written work
    p = paragraph
    s = sentence
    w = word
    pc = punctuation
    seg = segment

    @return: the corpus data exactly as returned by read_xml
    """
    corpus = read_xml(SOLAR_FILE)

    # Report success only after the file has been read
    solar_data_logger.info("Solar data read")
    return corpus


In [None]:
def generate_solar_id(solar_id, solar_type=SolarId.D):
    """
    Generates a numeric solar id truncated to the requested level. The
    source/target marker is dropped, so the result is the same for source
    and target.

    Solar id: solar{#div}{s-t}.{#p}.{#s}.{#w}

    Example: "solar12s.3.4.5" gives "12" for SolarId.D, "12.3" for SolarId.P,
    "12.3.4" for SolarId.S and "12.3.4.5" for SolarId.W.

    @param solar_id: solar id - unique id for word, sentence, paragraph and school written work
    @param solar_type: level of the generated id (SolarId.D, SolarId.P, SolarId.S or SolarId.W)
    @return: generated solar id; the unchanged solar_id (with a logged warning)
        if the type is unknown or the id has too few components for that level
    """
    # Number of dot-separated components kept for each level
    depth_by_type = (
        (SolarId.D, 1),  # Document (<div>)
        (SolarId.P, 2),  # Paragraph (<p>)
        (SolarId.S, 3),  # Sentence (<s>)
        (SolarId.W, 4),  # Word (<w>)
    )
    depth = next(
        (count for kind, count in depth_by_type if solar_type == kind), None
    )

    parts = solar_id.split(".")
    if depth is None or len(parts) < depth:
        # Always hand back the original string, never the split list
        solar_data_logger.warning("Invalid solar id type")
        return solar_id

    # Only the first component carries the document number ("solar12s" -> "12")
    document_id = "".join(char for char in parts[0] if char.isdigit())
    return ".".join([document_id] + parts[1:depth])


In [None]:
def _solar_sentence_error_types(data_link, sentence_id):
    """Returns the ';'-joined error types of all links touching the sentence."""
    parts = sentence_id.split(".")
    # The trailing dot restricts the match to tokens of exactly this sentence
    # (without it sentence 1 would also match sentences 10, 11, ...)
    source_prefix = "solar{}s.{}.{}.".format(*parts)
    target_prefix = "solar{}t.{}.{}.".format(*parts)

    # Link fields are matched literally (regex=False); empty fields are NaN
    from_source = data_link.loc[
        data_link["source"].str.contains(source_prefix, regex=False, na=False)
    ]
    from_target = data_link.loc[
        data_link["target"].str.contains(target_prefix, regex=False, na=False)
    ]

    # A link matching on both sides must be counted only once
    links = pd.concat([from_source, from_target]).drop_duplicates()
    return ";".join(links["type"].tolist())


def _solar_target_sentence(data_target, sentence_id):
    """Rebuilds the target (corrected) text of the sentence from its tokens."""
    target_prefix = "solar{}t.{}.{}.".format(*sentence_id.split("."))
    tokens = data_target[data_target["id"].str.startswith(target_prefix)]
    # row: [Index, id, text, lemma, msd_sl, msd_en, space]
    return "".join(
        row[2] + (" " if row[6] else "") for row in tokens.itertuples()
    )


def _solar_sentence_rows(sentence_id, sentence, data_target, data_link):
    """Returns the output rows for one finished source sentence."""
    sentence_error = _solar_sentence_error_types(data_link, sentence_id)
    rows = [[sentence_id, sentence, sentence_error]]

    # If sentence has error, append same sentence without any errors
    if sentence_error:
        rows.append(
            [sentence_id, _solar_target_sentence(data_target, sentence_id), ""]
        )
    return rows


def get_solar_data_multiple_error():
    """
    Gets the solar source data, combines words into sentences with errors and
    returns the data frame of sentence ids, sentences and error types in
    sentences. Generated sentences contain multiple errors.

    Every source sentence produces one row holding all of its error types
    joined with ";". A sentence with at least one error type is followed by a
    second row with the same id, the text rebuilt from the target tokens and
    an empty error field.

    Reads solar_source_token.csv, solar_target_token.csv and solar_link.csv
    from SOLAR_DIRECTORY; links of type "ID" are ignored.

    @return: a data frame of solar sentences and corresponding error types
        with the columns id, sentence and error
    """
    # Read the source and target token data
    data_source = pd.read_csv(
        SOLAR_DIRECTORY + "solar_source_token.csv", keep_default_na=False
    )
    data_target = pd.read_csv(
        SOLAR_DIRECTORY + "solar_target_token.csv", keep_default_na=False
    )

    # Read the links between source and target tokens and remove entries with ID
    data_link = pd.read_csv(SOLAR_DIRECTORY + "solar_link.csv")
    data_link = data_link[data_link["type"] != "ID"]

    # Store multiple error sentence data
    data = []
    sentence_id = None  # no sentence started yet
    sentence_tokens = []

    for temp_data in data_source.itertuples():
        # temp_data: [Index, id, text, lemma, msd_sl, msd_en, space]
        current_id = generate_solar_id(temp_data[1], SolarId.S)
        if current_id != sentence_id:
            # A new sentence starts: flush the finished one first
            if sentence_id is not None:
                data.extend(
                    _solar_sentence_rows(
                        sentence_id, "".join(sentence_tokens), data_target, data_link
                    )
                )

            sentence_id = current_id
            sentence_tokens = []
            solar_data_logger.info("Processing sentence: " + sentence_id)

        sentence_tokens.append(temp_data[2] + (" " if temp_data[6] else ""))

    # Flush the last sentence through the same path so its errors are resolved
    if sentence_id is not None:
        data.extend(
            _solar_sentence_rows(
                sentence_id, "".join(sentence_tokens), data_target, data_link
            )
        )

    return pd.DataFrame(data, columns=["id", "sentence", "error"])


In [None]:
def save_solar_data_multiple_error():
    """
    Builds the multiple-error sentence table and writes it, without the index
    column, to solar_data_multiple_error_raw.csv inside SOLAR_DIRECTORY.
    """
    sentence_table = get_solar_data_multiple_error()
    output_path = SOLAR_DIRECTORY + "solar_data_multiple_error_raw.csv"

    sentence_table.to_csv(output_path, index=False)
    solar_data_logger.info("Solar data with multiple errors saved to csv file")


In [None]:
def get_solar_data_no_error(index, data_target, sentence, sentence_id, sentence_error):
    """
    Completes a sentence with target tokens: starting at the given row of the
    target data, token texts are appended to the sentence until a token of a
    different sentence is found or the target data ends.

    @param index: row position in data_target of the first token to append
    @param data_target: data frame of solar target data (columns id, text, space)
    @param sentence: sentence text built so far
    @param sentence_id: sentence id
    @param sentence_error: sentence error type
    @return: a triplet [sentence id, completed sentence, error type]
    """
    while index < len(data_target):
        temp_data = data_target.iloc[index]

        # Check if sentence is over
        if not sentence_id == generate_solar_id(temp_data["id"], SolarId.S):
            break

        sentence += temp_data["text"] + (" " if temp_data["space"] else "")
        index += 1

    # Also reached when the sentence is the last one in data_target, so the
    # caller always receives a triplet and never None
    return [sentence_id, sentence, sentence_error]


In [None]:
def get_solar_data_single_error():
    """
    Gets the solar source and target data, combines words into sentences with
    errors and returns the data frame of sentence ids, sentences and error types
    in sentences. Generated sentences contain only one error per sentence.

    We deal with three types of errors: replacement errors, deletion errors
    and addition errors.

    The target tokens are walked in order. The running target sentence is
    stored with an empty error once the next sentence starts. For every link
    found for a token, one extra row is stored: the target sentence with only
    that one error applied, completed by get_solar_data_no_error.

    Reads solar_source_token.csv, solar_target_token.csv and solar_link.csv
    from SOLAR_DIRECTORY; links of type "ID" are ignored.

    @return: a data frame of solar sentences and corresponding error types
        with the columns id, sentence and error
    """
    # Read the source and target token data
    data_source = pd.read_csv(
        SOLAR_DIRECTORY + "solar_source_token.csv", keep_default_na=False
    )
    data_target = pd.read_csv(
        SOLAR_DIRECTORY + "solar_target_token.csv", keep_default_na=False
    )

    # Read the links between source and target tokens and remove entries with ID
    # (keep_default_na=False keeps empty source/target fields as "")
    data_link = pd.read_csv(SOLAR_DIRECTORY + "solar_link.csv", keep_default_na=False)
    data_link = data_link[data_link["type"] != "ID"]

    # Change solar links so that every id will end with ;
    # An empty field therefore becomes exactly ";", and "<id>;" can be searched
    # without also hitting longer ids that merely start with <id>
    # NOTE(review): data_link is a filtered frame here; assigning columns may
    # raise pandas' SettingWithCopyWarning - consider .copy() above
    data_link["source"] = data_link["source"] + ";"
    data_link["target"] = data_link["target"] + ";"

    # Store single error sentence data
    data = []
    sentence = ""  # error-free target sentence built so far
    sentence_id = ""
    sentence_error = ""  # never changed below: the running sentence has no error

    # index is advanced manually because one link can consume several tokens
    index = 0
    while index < len(data_target):
        # temp_data: [Index, id, text, lemma, msd_sl, msd_en, space]
        temp_data = data_target.iloc[index]

        if not sentence_id == generate_solar_id(temp_data["id"], SolarId.S):
            # Add a triplet of sentence id and sentence without errors
            # (the very first append is an empty placeholder, removed at the end)
            data.append([sentence_id, sentence, sentence_error])
            sentence = ""
            sentence_id = generate_solar_id(temp_data["id"], SolarId.S)
            sentence_error = ""
            solar_data_logger.info("Processing sentence: " + sentence_id)

        # Get error type (replacement, deletion)
        # NOTE(review): str.contains treats the id as a regular expression, so
        # "." matches any character - regex=False is probably intended
        temp_error = data_link.loc[
            data_link["target"].str.contains(temp_data["id"] + ";", na=False)
        ]

        # Check if there is an error (replacement, deletion)
        if len(temp_error):
            # Only the first matching link is used
            temp_error = temp_error.iloc[0]
            # If there is an error, create a new sentence with and without error
            # (len(temp_error["target"]) is always non-zero: ";" was appended)
            if temp_error["source"] == ";" and len(temp_error["target"]):
                # Handle deletion errors (link without source tokens)
                solar_data_logger.info("Dealing with: deletion error type")

                # Skip corrupting words with source tokens
                # (snapshot taken before the linked target tokens are added)
                temp_sentence = sentence

                # Find the index after error
                # ([:-1] strips the ";" appended above before splitting the ids)
                for temp_target_id in temp_error["target"][:-1].split(";"):
                    temp_target_word = data_target.loc[
                        data_target["id"] == temp_target_id
                    ]
                    if len(temp_target_word):
                        temp_target_word = temp_target_word.iloc[0]
                        # Each linked target token found is consumed here
                        index += 1
                        sentence += temp_target_word["text"] + (
                            " " if temp_target_word["space"] else ""
                        )

                # End sentence with only target tokens
                data.append(
                    get_solar_data_no_error(
                        index,
                        data_target,
                        temp_sentence,
                        sentence_id,
                        temp_error["type"],
                    )
                )

            elif len(temp_error["source"]) and len(temp_error["target"]):
                # Handle replacement errors (link with source and target tokens)
                solar_data_logger.info("Dealing with: replacement error type")

                # Add source tokens to the sentence
                temp_sentence = sentence
                for temp_source_id in temp_error["source"][:-1].split(";"):
                    temp_source_word = data_source.loc[
                        data_source["id"] == temp_source_id
                    ]
                    if len(temp_source_word):
                        temp_source_word = temp_source_word.iloc[0]
                        temp_sentence += temp_source_word["text"] + (
                            " " if temp_source_word["space"] else ""
                        )

                # Find the index after error
                for temp_target_id in temp_error["target"][:-1].split(";"):
                    temp_target_word = data_target.loc[
                        data_target["id"] == temp_target_id
                    ]
                    if len(temp_target_word):
                        temp_target_word = temp_target_word.iloc[0]
                        # Each linked target token found is consumed here
                        index += 1
                        sentence += temp_target_word["text"] + (
                            " " if temp_target_word["space"] else ""
                        )

                # End sentence with only target tokens
                data.append(
                    get_solar_data_no_error(
                        index,
                        data_target,
                        temp_sentence,
                        sentence_id,
                        temp_error["type"],
                    )
                )

        else:
            # Get error type (addition)
            # The target id is rewritten to the source id at the same position
            # (every "t." becomes "s.") and searched among the link sources
            temp_error = data_link.loc[
                data_link["source"].str.contains(
                    temp_data["id"].replace("t.", "s.") + ";", na=False
                )
            ]

            # Check if there is an error (addition)
            if len(temp_error):
                # Only the first matching link is used
                temp_error = temp_error.iloc[0]
                # Handle addition error type (link without target tokens)
                if temp_error["target"] == ";" and len(temp_error["source"]):
                    solar_data_logger.info("Dealing with: addition error type")

                    # Add source tokens to the sentence
                    temp_sentence = sentence
                    for temp_source_id in temp_error["source"][:-1].split(";"):
                        temp_source_word = data_source.loc[
                            data_source["id"] == temp_source_id
                        ]
                        if len(temp_source_word):
                            temp_source_word = temp_source_word.iloc[0]
                            temp_sentence += temp_source_word["text"] + (
                                " " if temp_source_word["space"] else ""
                            )

                    # Skip correct words with target tokens

                    # End sentence with only target tokens
                    # (index is unchanged, so the current token is included)
                    data.append(
                        get_solar_data_no_error(
                            index,
                            data_target,
                            temp_sentence,
                            sentence_id,
                            temp_error["type"],
                        )
                    )

                # Handle sentence without error (append target token to sentence)
                # Runs for every source-side match, addition or not
                sentence += temp_data["text"] + (" " if temp_data["space"] else "")
                index += 1

            else:
                # Handle sentence without error (source or target token does not represent error)
                sentence += temp_data["text"] + (" " if temp_data["space"] else "")
                index += 1

    # Append last sentence and remove first one (empty)
    data.append([sentence_id, sentence, sentence_error])
    data = data[1:]

    df_columns = ["id", "sentence", "error"]
    df_rows = data

    df = pd.DataFrame(df_rows, columns=df_columns)
    return df


In [None]:
def save_solar_data_single_error():
    """
    Builds the single-error sentence table and writes it, without the index
    column, to solar_data_single_error_raw.csv inside SOLAR_DIRECTORY.
    """
    sentence_table = get_solar_data_single_error()
    output_path = SOLAR_DIRECTORY + "solar_data_single_error_raw.csv"

    sentence_table.to_csv(output_path, index=False)
    solar_data_logger.info("Solar data with single errors saved to csv file")
