In [None]:
""" Script for correcting unintended text modifications during Named Entity Recognition (NER) 

This script addresses issues that may arise during the NER process 
according to the TEI standard, mostly unintended text duplications. 
An example of such a modification is:
>>> (...) Seminar der Univer<lb break="no" facs="#facs_290_r33"/>sität</orgName>sität</cell>
The script corrects this modification to:
>>> (...) Seminar der Univer<lb break="no" facs="#facs_290_r33"/>sität</orgName></cell>

The NER process checks the documents and detects errors. Documents with 
errors are saved in an /error folder. This script takes the NER-processed 
files with errors (hereafter referred to as the "edited file")  and extracts 
all TEI entities with some context. It then inserts the entities into the 
original file (before NER) using search and replace.

The script accepts XML files (.xml) as input.

Requirements:
- This script requires that `beautifulsoup4`, `lxml` and `xmlschema` are installed in the Python environment where you are running this script.

Installation of beautifulsoup4:
- To install beautifulsoup4, run the following command in your command line:
  ```bash
  pip install beautifulsoup4
  ```

Installation of lxml:
- To install lxml, run the following command in your command line:
  ```bash
  pip install lxml
  ```

Installation of xmlschema:
- To install xmlschema, run the following command in your command line:
  ```bash
  pip install xmlschema
  ```
"""

# pip install beautifulsoup4
# pip install lxml
# pip install xmlschema

import os
import re
import copy
import difflib
import xmlschema
import xml.etree.ElementTree as ET

import stats

from bs4 import BeautifulSoup
from lxml import etree

# Input and output locations used by this script (all given as relative paths)
edited_dir = 'test_data/TEI-XML_NER/error/Amtsblatt/' # Directory containing NER-processed files with errors
original_dir = 'test_data/TEI-XML/Amtsblatt/' # Directory containing the original files
output_dir = 'test_data/postprocessed/' # Output directory for the merged files generated by this script
output_dir_diff_files = 'diffs' # Output directory for the HTML diff files

# Set to True to validate the post-processed XML against the TEI schema
# (disabled by default because the validation takes a long time).
XML_VALIDATION_ACTIV = False


def is_nested(entity):
    """ Tell whether an entity tag sits inside another entity tag.

    Walks up the chain of parents and looks for an enclosing
    <placeName>, <persName> or <orgName> element.

    Args:
        entity (BeautifulSoup Tag): The entity to inspect.

    Returns:
        bool: True if any ancestor is itself an entity tag, False otherwise.
    """

    entity_tag_names = {'placeName', 'persName', 'orgName'}

    ancestor = entity.find_parent()
    while ancestor:
        is_entity_tag = ancestor.name in entity_tag_names
        if is_entity_tag and ancestor != entity:
            return True
        # Continue with the next level up
        ancestor = ancestor.find_parent()

    return False


def filter_nested_entities(entities):
    """ Keep only the outermost entities.

    Entities that are contained in another entity are dropped, because they
    are already covered by their enclosing entity.

    Args:
        entities (list of BeautifulSoup Tags): Entity tags from the edited XML.

    Returns:
        list of BeautifulSoup Tags: The entities that are not nested, in input order.
    """

    return [candidate for candidate in entities if not is_nested(candidate)]


def count_entities(xml_text):
    """ Count TEI entity elements (placeName, persName, orgName) in an XML string.

    Opening and closing tags are both counted and the sum is halved
    (integer division), so one complete element counts as one entity.

    Args:
        xml_text (str): The text containing XML data.

    Returns:
        int: Number of entity tags found, divided by two and rounded down.
    """

    entity_tag_regexes = (
        r'</?placeName[^>]*>',
        r'</?persName[^>]*>',
        r'</?orgName[^>]*>',
    )

    # Each regex matches the start tag as well as the end tag of one entity type
    tag_total = sum(len(re.findall(regex, xml_text)) for regex in entity_tag_regexes)

    return tag_total // 2
    

def remove_entity_tags_in_str(text):
    """ Strip TEI entity tags (placeName, persName, orgName) from a string.

    Only the tags are removed; the text between them stays in place.

    Args:
        text (str): String that may contain TEI entity tags.

    Returns:
        str: The string without any start or end tag of the three entity types.
    """

    entity_tag_regexes = (
        r'</?placeName[^>]*>',
        r'</?persName[^>]*>',
        r'</?orgName[^>]*>',
    )

    stripped = text
    # Apply the patterns one after the other, each on the previous result
    for regex in entity_tag_regexes:
        stripped = re.sub(regex, '', stripped)

    return stripped


def get_text_for_lookbehind(entity, lookbehind_length, removeEntityTags = True):
    """ Extract the markup/text that directly precedes the entity inside its parent.

    The result is used as the fixed-width regex lookbehind when the entity is
    searched for in the original document.

    The entity is located among the parent's children by identity rather than
    by searching for its serialized string. A string search would stop at the
    first textually identical entity and return the wrong context for any
    later repetition of the same entity in the same parent.

    Args:
        entity (BeautifulSoup Tag): The entity tag whose preceding context is wanted.
        lookbehind_length (int): Maximum number of characters to return.
        removeEntityTags (bool, optional): If True, entity tags are stripped from the
            preceding text before it is truncated. Defaults to True.

    Returns:
        str: At most `lookbehind_length` characters that precede the entity in its
        parent element. An empty string is returned if the entity has no parent,
        if it cannot be found among the parent's children, or if
        `lookbehind_length` is not positive.
    """

    parent_element = entity.find_parent()

    if parent_element is None:
        return ""

    # A slice like text[-0:] would return the whole string, so handle this explicitly
    if lookbehind_length <= 0:
        return ""

    serialized_predecessors = []
    for content in parent_element.contents:
        if content is entity:
            break
        serialized_predecessors.append(str(content))
    else:
        # The entity is not a direct child of its parent; no reliable context available
        return ""

    text_before_entity = ''.join(serialized_predecessors)

    if removeEntityTags:
        text_before_entity = remove_entity_tags_in_str(text_before_entity)

    return text_before_entity[-lookbehind_length:]
    

def prepare_search_text(entity):
    """ Build the string that is searched for in the original file (before NER).

    All entity tags nested inside the given entity are unwrapped, and the
    entity's own tag is left out by serializing only its children.
    Note that the passed entity is modified in place.

    Args:
        entity (BeautifulSoup Tag): The entity whose inner text is prepared.

    Returns:
        str: The serialized content of the entity without any entity tags.
    """

    nested_entities = entity.find_all(['placeName', 'persName', 'orgName'])

    # unwrap() drops the tag itself but keeps whatever it contains
    for nested in nested_entities:
        nested.unwrap()

    # Joining only the children leaves out the outer entity tag
    return ''.join(map(str, entity.contents))

    
def insert_done_in_every_word(sentence):
    """ Put the marker `---DONE---` into the middle of every piece of a string.

    The string is split at tags and single spaces; the separators are kept
    as pieces of their own and receive the marker as well. The marker keeps
    already inserted text from being matched again during search-and-replace.

    Args:
        sentence (str): The sentence or entity markup to mark.

    Returns:
        str: The input with `---DONE---` inserted into each piece.
    """

    marker = "---DONE---"

    def mark(piece):
        # Split the piece at its midpoint (rounded down) and put the marker in between
        middle = len(piece) // 2
        return piece[:middle] + marker + piece[middle:]

    # The capturing group makes re.split return the separators, too
    pieces = re.split(r'(<[^>]*>| )', sentence)

    return ''.join(mark(piece) for piece in pieces)


def validate_result(filename, original_xml, edited_xml, postprocessed_xml):
    """ Validate the post-processing result of one file and log the findings.

    The following steps are executed in this order:
    1. The original XML is compared with the post-processed XML from which the
       entity tags were removed; the outcome is logged as "Content integrity".
    2. An HTML diff between the NER-processed and the post-processed XML is written
       to the directory `output_dir_diff_files`.
    3. If `XML_VALIDATION_ACTIV` is set, the post-processed XML is validated
       against 'tei_schemas/tei_all.xsd'.
    4. The number of entities before and after post-processing and their
       difference are logged.

    All results are logged through `stats.write_to_statistics`.

    Args:
        filename (str): Name of the file being processed.
        original_xml (str): The original XML content as a string.
        edited_xml (str): The XML content after NER (named entity recognition) processing.
        postprocessed_xml (str): The final XML content after post-processing.
    """

    def sort_attributes_in_xml(xml):
        """ Parse an XML string and sort the attributes of every element by name.

        Args:
            xml (str): The XML content as a string.

        Returns:
            xml.etree.ElementTree.Element: The root element of the parsed document,
            with the attributes of the root and all descendants sorted.
        """
        root = ET.fromstring(xml)
    
        def sort_attributes(elem):
            # Rebuild the attribute dict in sorted key order, then recurse into the children
            elem.attrib = dict(sorted(elem.attrib.items()))
            
            for child in elem:
                sort_attributes(child)
    
        sort_attributes(root)

        return root


    def normalize_xml(xml_string):
        """ Normalize an XML string by sorting element attributes and removing excess whitespace/tab.

        Args:
            xml_string (str): The XML content as a string.

        Returns:
            str: The normalized XML string with sorted attributes and no unnecessary whitespaces/tabs.
        """

        xml_sorted = sort_attributes_in_xml(xml_string)
    
        # Remove whitespace between tags and collapse every other whitespace run to one space
        normalized_string = ET.tostring(xml_sorted, encoding='utf-8').decode('utf-8')
        normalized_string = re.sub(r">\s+<", "><", normalized_string)
        normalized_string = re.sub(r"\s+", " ", normalized_string)
        # NOTE(review): the previous substitution already replaced all tabs and newlines,
        # so this one has nothing left to match
        normalized_string = re.sub(r"[\t\n]+", " ", normalized_string)
        
        return normalized_string

    
    
    def compare_xml_strings(xml_string1, xml_string2):
        """ Compare two XML strings after normalizing them. Updates the statistics log with 
        whether the content matches.

        Args:
            xml_string1 (str): First XML string to compare.
            xml_string2 (str): Second XML string to compare.
        """
        
        normalized_xml1 = normalize_xml(xml_string1)
        normalized_xml2 = normalize_xml(xml_string2)
    
        if normalized_xml1 == normalized_xml2:
            stats.write_to_statistics(filename,'Content integrity (Original <-> Postprocessed)', 'Yes')
        else:
            stats.write_to_statistics(filename,'Content integrity (Original <-> Postprocessed)', 'No')

    # Without its entity tags, the post-processed document is expected to equal the original
    compare_xml_strings(original_xml, remove_entity_tags_in_str(postprocessed_xml))


    def show_differences(text1, text2, text1_description, text2_description):
        """ Compute the differences between two XML strings and save the result in an HTML file.

        Args:
            text1 (str): First text to compare.
            text2 (str): Second text to compare.
            text1_description (str): Description of the first text, used in the diff output.
            text2_description (str): Description of the second text, used in the diff output.
        """

        def prepare_for_diff(xml_string):
            """
            Prepare an XML string for diff by sorting attributes and removing unnecessary whitespaces.

            Args:
                xml_string (str): The XML string to prepare for comparison.

            Returns:
                str: A cleaned and sorted XML string ready for diff comparison.
            """

            xml_sorted = sort_attributes_in_xml(xml_string)
            
            xml_string = ET.tostring(xml_sorted, encoding='utf-8', method='xml').decode('utf-8')

            # Remove tabs, leading whitespaces, and namespace prefixes for cleaner comparison
            xml_string = re.sub(r"[\t]+", " ", xml_string)  # Replace runs of tabs with a single space
            xml_string = re.sub(r'^\s+', '', xml_string, flags=re.MULTILINE)  # Remove whitespaces at the beginning of each line
            xml_string = re.sub("ns0:", "", xml_string)  # Drop the "ns0:" prefix from the serialized markup
            
            return xml_string
        
        text1 = prepare_for_diff(text1)
        text2 = prepare_for_diff(text2)
        

        def save_html_diff(filename, text1, text2, text1_description, text2_description):
            """ Create and save an HTML file showing the diff between two texts.

            The file is written to `output_dir_diff_files` and is named
            `diff_<filename without extension>.html`.

            Args:
                filename (str): The name of the file being compared.
                text1 (str): First text to compare.
                text2 (str): Second text to compare.
                text1_description (str): Short description of the first text.
                text2_description (str): Short description of the second text.
            """
            
            differ = difflib.HtmlDiff()
            html_diff = differ.make_file(text1.splitlines(), text2.splitlines(), fromdesc=text1_description, todesc=text2_description)
            
            # Ensure the output directory exists, if not create it
            if not os.path.exists(output_dir_diff_files):
                os.makedirs(output_dir_diff_files)

            html_filename = f'diff_{os.path.splitext(filename)[0]}.html'

            diff_path = os.path.join(f'{output_dir_diff_files}/', html_filename)
          
            # Write the HTML diff file to disk
            with open(diff_path, 'w', encoding='utf-8') as f:
                f.write(html_diff)
            
            print(f"HTML-Diff saved in {diff_path}")


        save_html_diff(filename, text1, text2, text1_description, text2_description)

    show_differences(edited_xml, postprocessed_xml, "NER-processed", "Postprocessed")
        

    def validate_xml(filename, xml_file, xsd_file):
        """ Validate the XML against the TEI schema and log the result.

        Any exception raised during validation is caught, printed and logged as "No".
        Loading the schema itself happens outside of the try block.

        Args:
            filename (str): The name of the file being validated.
            xml_file (str): The XML to validate; the caller below passes the XML content as a string.
            xsd_file (str): The XSD schema file to validate against.
        """
        
        print("XML Validation in progress...")
        schema = xmlschema.XMLSchema(xsd_file)
        
        try:
            is_valid = schema.is_valid(xml_file)
            if is_valid:
                stats.write_to_statistics(filename,'XML (TEI Schema) valid', "Yes")
            else:
                stats.write_to_statistics(filename,'XML (TEI Schema) valid', "No")
                print(f"{filename} is invalid.")
        except Exception as err:
            stats.write_to_statistics(filename,'XML (TEI Schema) valid', "No")
            print(f"{filename} is invalid. Error: ")
            print("Error: ", err)

    # Schema validation is optional because it takes a long time
    if XML_VALIDATION_ACTIV:
        validate_xml(filename, postprocessed_xml, 'tei_schemas/tei_all.xsd')


    # Log the number of entities before and after post-processing
    entities_before_postprocessing = count_entities(edited_xml)
    stats.write_to_statistics(filename,'Number of Entities Before Processing', entities_before_postprocessing)
    entities_after_postprocessing = count_entities(postprocessed_xml)
    stats.write_to_statistics(filename,'Number of Entities After Processing', entities_after_postprocessing)

    # Calculate and log any missing entities after processing
    missing_entities = entities_before_postprocessing - entities_after_postprocessing
    stats.write_to_statistics(filename,'Missing Entities', missing_entities)

    
def _insert_entities(entities, body_str, lookbehind_length, remove_entity_tags, mark_done):
    """ Try to insert each entity into the body string using a contextual search-and-replace.

    Args:
        entities (list of BeautifulSoup Tags): The entities to insert, in document order.
        body_str (str): The serialized <body> of the original document.
        lookbehind_length (int): Number of characters before the entity used as context.
        remove_entity_tags (bool): Whether entity tags are stripped from the context.
        mark_done (bool): Whether `---DONE---` markers are put into the inserted text
            to keep it from being matched again.

    Returns:
        tuple: (list of entities that could not be inserted, updated body string).
    """

    unreplaced = []

    for entity in entities:
        # prepare_search_text() unwraps nested tags in place, so work on a copy
        search_text = prepare_search_text(copy.deepcopy(entity))
        lookbehind = get_text_for_lookbehind(
            entity, lookbehind_length, removeEntityTags=remove_entity_tags
        )

        replacement = str(entity)
        if mark_done:
            replacement = insert_done_in_every_word(replacement)

        # The escaped literal has a fixed width, as required for a lookbehind
        context_pattern = (
            r'(?<=' + re.escape(lookbehind) + r')(\s*)' + re.escape(search_text)
        )

        # A function is used as replacement so that backslashes in the entity text are
        # inserted literally instead of being read as template escapes. Whitespace
        # matched in front of the entity is kept to leave the original text untouched.
        body_str, count = re.subn(
            context_pattern,
            lambda match, text=replacement: match.group(1) + text,
            body_str,
            count=1,
        )

        if count == 0:
            unreplaced.append(entity)

    return unreplaced, body_str


def merge_entities(filename, original_xml, edited_xml):
    """ Merge the named entities from the edited XML into the original XML (only within <body>).

    The insertion runs in two rounds, each with two context lengths:
    - Round one (40, then 30 characters) uses a context without entity tags and marks
      inserted text with `---DONE---` so that it is not matched a second time.
    - Round two (30, then 20 characters) retries the remaining entities with a context
      that includes entity tags, because entities inserted earlier may now be part of
      the text in front of the entity.

    Entities that could not be inserted are logged under 'Entities failed'.
    If one of the documents has no <body>, the original document is returned unchanged
    and nothing is logged.

    Args:
        filename (str): Name of the processed file, used as key for the statistics.
        original_xml (str): The XML content of the original file.
        edited_xml (str): The XML content of the NER-processed file with errors.

    Returns:
        str: The original XML content with the corrected entities inserted.
    """

    # Parse the original and edited XML
    original_soup = BeautifulSoup(original_xml, 'xml')
    edited_soup = BeautifulSoup(edited_xml, 'xml')

    # Extract <body> content from both documents
    original_body = original_soup.find('body')
    edited_body = edited_soup.find('body')

    # Without a <body> in both documents there is nothing to merge
    if original_body is None or edited_body is None:
        return str(original_soup)

    # Find all entities in the edited XML (places, people, organizations)
    entities = edited_body.find_all(['placeName', 'persName', 'orgName'])

    # Nested entities are inserted together with their enclosing entity
    non_nested_entities = filter_nested_entities(entities)

    original_body_str = str(original_body)

    # Round one: greater context, entity tags ignored. The second pass only retries
    # the entities that the first pass could not place.
    unreplaced_entities, original_body_str = _insert_entities(
        non_nested_entities, original_body_str, 40, remove_entity_tags=True, mark_done=True
    )
    unreplaced_entities, original_body_str = _insert_entities(
        unreplaced_entities, original_body_str, 30, remove_entity_tags=True, mark_done=True
    )

    # Remove "---DONE---" markers
    original_body_str = original_body_str.replace("---DONE---", "")

    # Round two: shorter context that includes entity tags. A possible reason for a
    # failed match in round one is that an entity has already been inserted into the
    # text in front of the current entity. No markers are inserted in this round.
    unreplaced_entities, original_body_str = _insert_entities(
        unreplaced_entities, original_body_str, 30, remove_entity_tags=False, mark_done=False
    )
    unreplaced_entities, original_body_str = _insert_entities(
        unreplaced_entities, original_body_str, 20, remove_entity_tags=False, mark_done=False
    )

    stats.write_to_statistics(filename,'Entities failed', unreplaced_entities)

    # Replace the old <body> with the modified version in the original document
    original_body.replace_with(BeautifulSoup(original_body_str, 'xml').body)

    return str(original_soup)


def main():
    """ Post-process every NER-processed XML file found in `edited_dir`.

    For each `.xml` file the matching original file is looked up in
    `original_dir`. The entities are merged into the original, the result is
    validated and written to `output_dir`, and the statistics are saved.
    Files without a matching original are reported and skipped.
    """

    # Create the output directory on first use
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for filename in os.listdir(edited_dir):

        if not filename.endswith('.xml'):
            continue

        edited_file_path = os.path.join(edited_dir, filename)
        original_file_path = os.path.join(original_dir, filename)

        with open(edited_file_path, 'r', encoding='utf-8') as edited_handle:
            edited_xml = edited_handle.read()

        if not os.path.exists(original_file_path):
            print(f"Original file not found for {filename}")
            continue

        with open(original_file_path, 'r', encoding='utf-8') as original_handle:
            original_xml = original_handle.read()

        # Insert the entities of the edited file into the original file
        result = merge_entities(filename, original_xml, edited_xml)

        # Compare, diff and log the outcome
        validate_result(filename, original_xml, edited_xml, result)

        # Store the merged document under the same file name
        output_file_path = os.path.join(output_dir, filename)
        with open(output_file_path, 'w', encoding='utf-8') as output_file:
            output_file.write(result)

        stats.save_statistics_to_csv()
        print(f"Processed and saved {filename} to {output_file_path}\n")

    print("All files processed")

# Run the post-processing for all files in edited_dir
main()


In [None]:
import importlib  # Needed for reloading a module
import stats  # The local statistics module

# Make changes to the module ...

# Now reload the module
importlib.reload(stats)  # Pick up the changes without restarting the kernel

In [None]:
pip install xmlschema

In [None]:
import os
import xmlschema

# Read one post-processed document into memory; it is validated further below
with open("test_data/postprocessed/ABl_1980__S__1008-1016_.xml", 'r', encoding='utf-8') as file:
    xml_file = file.read()

def validate_xml(xml_file, xsd_file):
    """ Validate XML against an XSD schema and print the outcome.

    Args:
        xml_file (str): The XML to validate; the example call passes the XML content as a string.
        xsd_file (str): Path of the XSD schema file.
    """

    print(xsd_file)

    # Load the XSD schema. The path is handed over directly instead of a file
    # object from open() that was never closed.
    schema = xmlschema.XMLSchema(xsd_file)

    # Validate the XML document
    try:
        is_valid = schema.is_valid(xml_file)
    except Exception as err:
        print("File ist ungültig. Fehler:")
        print("Error: ", err)
    else:
        if is_valid:
            print("File ist gültig.")
        else:
            print("File ist ungültig. Fehler:")
            
        
        
        
    
# Example call of the function
validate_xml(xml_file, 'tei_all.xsd')
