# Mitteilungsblatt

NER analysis

Mount Google Drive

In [None]:
# Mount Google Drive at /content/drive; the data paths used below live under it.
from google.colab import drive
drive.mount('/content/drive')

## Load the OCRed text from the folder

In [None]:
from genericpath import isdir
import os

path = "/content/drive/MyDrive/OCR results/OCR4all results/1936-01-I/complete.txt"

# `path` may be a single OCR text file or a folder containing several of them.
# Either way the result is one string bound to `text`.
# NOTE(review): the files are assumed to be UTF-8 encoded - confirm.
if os.path.isdir(path):
    parts = []
    # os.listdir returns bare names, so each one has to be joined with the
    # folder before it can be opened; sorting keeps the order reproducible.
    for name in sorted(os.listdir(path)):
        file = os.path.join(path, name)
        if not os.path.isfile(file):
            continue
        with open(file, "r", encoding="utf-8") as txt_file:
            parts.append(txt_file.read())
    text = "\n".join(parts)
else:
    file = path
    with open(file, "r", encoding="utf-8") as txt_file:
        text = txt_file.read()

print(text)

## Run spacy on the text

Download spaCy and the German model (`de_core_news_lg`) from the network.

In [None]:
!pip install --upgrade spacy
!python -m spacy download de_core_news_lg

import spacy

In [None]:
# Load the German spaCy pipeline 'de_core_news_lg' and bind it to `nlp`.
import spacy 
nlp = spacy.load('de_core_news_lg') 
print("Done")

In [None]:
# Run the spaCy pipeline over the loaded text.
doc = nlp(text)

# Keep only the entities labelled 'PER', as (text, label) pairs.
entity_list = [
    (ent.text, ent.label_)
    for ent in doc.ents
    if ent.label_ == 'PER'
]

# Print one pair per line.
for person in entity_list:
    print(person)


In [None]:
# Render the entities found in `doc` inline in the notebook.
# NOTE(review): 'distance' looks like a dependency-visualizer option; confirm
# it has any effect with style='ent'.
from spacy import displacy

displacy.render(doc, style='ent', jupyter=True, options={'distance': 90})


## Adding evaluation pipeline

In [None]:
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import re

### Getting person spans from spacy model

In [None]:
# Read the sample that was also annotated by hand; this rebinds `path` and `text`.
# The spans computed later are character offsets into `text`, so the file must
# be decoded the same way everywhere instead of relying on the locale default.
# NOTE(review): the sample is assumed to be UTF-8 encoded - confirm.
path = "/content/drive/MyDrive/OCR results/Samples4Markup/Sample1_for_Markup_issue_1936_01"
with open(path, 'r', encoding='utf-8') as openfile:
    text = openfile.read()

In [None]:
def get_spans(ner_model, some_text):
    """Return the character spans of all entities labelled 'PER'.

    ner_model is called on some_text and must return an object whose
    ``ents`` attribute iterates over entities exposing ``label_``,
    ``start_char`` and ``end_char``.

    Returns a list of (start_char, end_char) tuples in the order the
    entities are yielded.
    """
    return [
        (ent.start_char, ent.end_char)
        for ent in ner_model(some_text).ents
        if ent.label_ == 'PER'
    ]

In [None]:
# (start, end) character spans of the PER entities spaCy finds in the sample.
spacy_spans = get_spans(nlp, text)

In [None]:
# Show the first ten spans as a sanity check.
spacy_spans[:10]

In [None]:
def get_binary_prediction(spans, textlen):
    """Turn character spans into a per-character 0/1 mask.

    spans is an iterable of (start, end) pairs, where end is the offset just
    past the last character of the span (the convention of spaCy's
    ``start_char``/``end_char``). textlen is the number of characters in the
    text the spans refer to.

    Returns a float pandas Series of length textlen holding 1 at every
    position covered by a span and 0 elsewhere.
    """
    prediction = pd.Series(np.zeros(textlen))
    for span in spans:
        start, end = span[0], span[1]
        # .iloc slices are half-open like the spans themselves; label-based
        # .loc would also mark position `end`, one character too many.
        prediction.iloc[start:end] = 1
    return prediction

In [None]:
# Per-character 0/1 series, one entry per character of `text`, built from the spaCy spans.
spacy_binary_prediction = get_binary_prediction(spacy_spans, len(text))

In [None]:
# Inspect a slice of the mask (positions 80-109) as a sanity check.
spacy_binary_prediction [80:110]

### Getting person spans from manual markup

In [None]:
# XML file with the manual annotations for the sample text.
manual_path = "/content/drive/MyDrive/OCR results/Samples4Markup/ManualMarkup/Sample1_for_Markup_issue_1936_01_Default_Annotations.xml"

In [None]:
def get_annotation_spans_from_xml(annotation_file_path, text_length=None):
    """Build a per-character 0/1 target mask from a manual annotation XML.

    Every <seg> element that contains a <ptr> contributes the character range
    found in that pointer's ``target`` attribute (parsed by get_borders).

    annotation_file_path: path of the annotation XML file.
    text_length: length of the returned mask. When None, the largest end
        offset found in the file is used (get_file_len_from_xml), which can be
        shorter than the annotated text; pass ``len(text)`` to get a mask that
        lines up with a prediction over the whole text.

    Returns a float pandas Series holding 1 at annotated positions, 0 elsewhere.
    """
    # Binary mode hands the raw bytes to the parser, so it can work out the
    # encoding from the document instead of the platform default.
    with open(annotation_file_path, 'rb') as openfile:
        soup = BeautifulSoup(openfile, 'xml')
    annotation_pointers = [seg.find('ptr') for seg in soup.find_all('seg')]
    # A <seg> without a <ptr> carries no range; find() returns None for it.
    annotation_borders = [
        get_borders(ptr) for ptr in annotation_pointers if ptr is not None
    ]
    if text_length is None:
        text_length = get_file_len_from_xml(soup)
    manual_target = pd.Series(np.zeros(text_length))
    for start, end in annotation_borders:
        # Half-open positional slice; label-based .loc would also mark `end`.
        # NOTE(review): assumes char=start,end excludes the end offset - confirm
        # against the annotation tool's export format.
        manual_target.iloc[start:end] = 1
    return manual_target

In [None]:
def get_borders(ptr):
    """Extract the (start, end) character offsets from a pointer element.

    ptr must support ``ptr['target']``; the target is expected to contain
    ``char=<start>,<end>`` preceded by at least one other character.

    Returns the two offsets as a tuple of ints.
    """
    char_range = re.search(r'(.+)char=(.+)', ptr['target']).group(2)
    pieces = char_range.split(',')
    return int(pieces[0]), int(pieces[1])

In [None]:
def get_file_len_from_xml(soup):
    """Return the largest end offset among all <ptr> elements in soup.

    Used as a stand-in for the text length; returns 0 when there are no
    pointers.
    """
    end_offsets = [get_borders(pointer)[1] for pointer in soup.find_all('ptr')]
    # The leading 0 is the result when no pointer is present.
    return max([0] + end_offsets)

In [None]:
# Per-character 0/1 series built from the manual annotation file.
target = get_annotation_spans_from_xml(manual_path)

In [None]:
# Inspect a slice of the manual mask (positions 90-109) as a sanity check.
target[90:110]

In [None]:
# How many character positions are annotated (1) versus not annotated (0).
target.value_counts()

## Measuring

In [None]:
# Per-character precision of the spaCy mask against the manual mask.
precision_score(target, spacy_binary_prediction)

In [None]:
# Per-character recall of the spaCy mask against the manual mask.
recall_score(target, spacy_binary_prediction)

In [None]:
# Per-character F1 of the spaCy mask against the manual mask.
f1_score(target, spacy_binary_prediction)

## Improving spacy with some ad hoc rules

In [None]:
# Kludgy ad hoc filter for strings that were labelled PER but are not person names

def is_fake_person(some_string):
    """Return True when some_string matches one of the ad hoc rejection rules.

    A string is rejected when any of the following holds:
    - its lower-cased form ends with one of a few suffixes ('str',
      'boulevard', 'ave.', 'bvd.', 'gen', 'ung') or contains 'strasse';
    - it contains 'P.O.B.' or 'P. O. B', or ends with 'H. O. G.';
    - it contains a Hebrew letter;
    - it contains no ASCII capital letter;
    - it contains no space.
    """
    lowered = some_string.lower()

    # Suffix rules on the lower-cased string.
    rejected_suffixes = ('str', 'boulevard', 'ave.', 'bvd.', 'gen', 'ung')
    if lowered.endswith(rejected_suffixes):
        return True

    # Abbreviations checked on the original casing.
    if 'P.O.B.' in some_string or 'P. O. B' in some_string:
        return True
    if some_string.endswith('H. O. G.'):
        return True

    # Any Hebrew letter.
    if re.search('[למנסעפצקרשתםןףץאבגדהוזחטיכך]', lowered) is not None:
        return True

    # No ASCII capital letter, or only a single token.
    if re.search('[A-Z]', some_string) is None:
        return True
    if ' ' not in some_string:
        return True

    return 'strasse' in lowered

In [None]:
def get_spans_with_filter(ner_model, some_text):
    """Return the character spans of 'PER' entities that pass is_fake_person.

    ner_model is called on some_text; every resulting entity labelled 'PER'
    whose text is not rejected by is_fake_person contributes one
    (start_char, end_char) tuple.
    """
    return [
        (ent.start_char, ent.end_char)
        for ent in ner_model(some_text).ents
        if ent.label_ == 'PER' and not is_fake_person(ent.text)
    ]

In [None]:
# Recompute the spaCy spans, this time dropping entities rejected by is_fake_person.
spacy_spans = get_spans_with_filter(nlp, text)

In [None]:
# Rebuild the per-character mask from the filtered spans (overwrites the earlier mask).
spacy_binary_prediction = get_binary_prediction(spacy_spans, len(text))

## Measuring the improvement

In [None]:
# Per-character precision of the current spaCy mask against the manual mask.
precision_score(target, spacy_binary_prediction)

In [None]:
# Per-character recall of the current spaCy mask against the manual mask.
recall_score(target, spacy_binary_prediction)

In [None]:
# Per-character F1 of the current spaCy mask against the manual mask.
f1_score(target, spacy_binary_prediction)

### Getting person spans from Flair model

In [None]:
!pip3 install flair

In [None]:
from flair.data import Sentence
from flair.models import SequenceTagger

In [None]:
# Load the "flair/ner-german-large" sequence tagger and bind it to `tagger`.
tagger = SequenceTagger.load("flair/ner-german-large")

In [None]:
def get_spans_flair(flair_model, some_text):
    """Return the character spans of 'PER' entities predicted by a flair tagger.

    The whole of some_text is wrapped in a single Sentence, tagged with
    flair_model.predict, and every 'ner' span tagged 'PER' contributes one
    (start_position, end_position) tuple.
    """
    sentence = Sentence(some_text)
    flair_model.predict(sentence)
    return [
        (span.start_position, span.end_position)
        for span in sentence.get_spans('ner')
        if span.tag == 'PER'
    ]

In [None]:
def calculate_span(start, end, shift):
    """Return (start, end) with both offsets moved by shift."""
    shifted_start = start + shift
    shifted_end = end + shift
    return shifted_start, shifted_end

In [None]:
def get_spans_flair_smarter(flair_model, some_text):
    """Tag some_text line by line and return 'PER' spans in whole-text offsets.

    The text is split on '\\n'; each line is tagged as its own Sentence and
    the positions of its 'PER' spans are shifted by the number of characters
    that precede the line in some_text.

    Returns a list of (start, end) tuples.
    """
    all_spans = []
    offset = 0
    for line in some_text.split('\n'):
        sentence = Sentence(line)
        flair_model.predict(sentence)
        all_spans.extend(
            calculate_span(span.start_position, span.end_position, offset)
            for span in sentence.get_spans('ner')
            if span.tag == 'PER'
        )
        # +1 accounts for the '\n' removed by split.
        offset += len(line) + 1
    return all_spans

In [None]:
def get_binary_prediction_flair(input_text):
    """Return a per-character 0/1 mask of the 'PER' spans flair finds.

    The module-level `tagger` is applied to input_text through
    get_spans_flair_smarter, and every returned (start, end) span is marked
    in the mask.

    Returns a float pandas Series with one entry per character of input_text:
    1 inside a predicted span, 0 elsewhere.
    """
    spans = get_spans_flair_smarter(tagger, input_text)
    predicted = pd.Series(np.zeros(len(input_text)))
    for span in spans:
        start, end = span[0], span[1]
        # Half-open positional slice: `end` is the offset just past the span,
        # so label-based .loc (end-inclusive) would mark one character too many.
        predicted.iloc[start:end] = 1
    return predicted

In [None]:
# Quick check on a small multi-line string in which every second line is a person name.
get_binary_prediction_flair("""door
Ludwig van Bethoven
door
Leo Tolstoy
door
Max Planck""")

In [None]:
# Per-character 0/1 mask of flair's PER predictions on the sample text.
predicted_flair = get_binary_prediction_flair(text)

In [None]:
# Length of the flair mask, which has one entry per character of `text`.
len(predicted_flair)

## Measuring flair performance

In [None]:
# Per-character precision of the flair mask against the manual mask.
precision_score(target, predicted_flair)

In [None]:
# Per-character recall of the flair mask against the manual mask.
recall_score(target, predicted_flair)

In [None]:
# Per-character F1 of the flair mask against the manual mask.
f1_score(target, predicted_flair)

### Generic performance measurer

In [None]:
def evaluate(target, predicted):
    """Print precision, recall and F1 of predicted against target, in that order."""
    for metric in (precision_score, recall_score, f1_score):
        print(metric(target, predicted))

In [None]:
def evaluate_on_several_files(predictor, folder_with_txt, folder_with_manual_xml):
    """Evaluate a predictor on every text file of a folder at once.

    predictor: callable taking the text of a file and returning a
        per-character 0/1 pandas Series (e.g. get_binary_prediction_flair).
    folder_with_txt: folder holding the text files; sub-folders are skipped.
    folder_with_manual_xml: folder holding the manual annotation XML files,
        matched to a text file through get_targets_for_repsective_file.

    The masks of all files are concatenated and passed to evaluate(), which
    prints the scores.

    Raises FileNotFoundError when a text file has no annotation file, and
    ValueError when the folder contains no files at all.
    """
    all_predictions = []
    all_targets = []
    for filename in sorted(os.listdir(folder_with_txt)):
        filepath = os.path.join(folder_with_txt, filename)
        if not os.path.isfile(filepath):
            # e.g. a sub-folder that holds the annotation files
            continue
        file_id = filename.replace('.txt', '')
        # NOTE(review): the text files are assumed to be UTF-8 encoded - confirm.
        with open(filepath, encoding='utf-8') as readfile:
            filetext = readfile.read()
        predictions = predictor(filetext)
        targets = get_targets_for_repsective_file(folder_with_manual_xml, file_id)
        if targets is None:
            raise FileNotFoundError(
                f"no annotation file for {file_id!r} in {folder_with_manual_xml!r}"
            )
        # The target mask may be shorter than the text (its length can come
        # from the last annotation only); pad with zeros, or cut, so that both
        # masks have one entry per character of this file.
        targets = targets.reindex(range(len(predictions)), fill_value=0)
        all_predictions.append(predictions)
        all_targets.append(targets)
    if not all_predictions:
        raise ValueError(f"no text files found in {folder_with_txt!r}")
    evaluate(
        pd.concat(all_targets, ignore_index=True),
        pd.concat(all_predictions, ignore_index=True),
    )

In [None]:
def get_targets_for_repsective_file(folder_with_manual_xml, file_id):
    """Return the manual target mask for the text file identified by file_id.

    The first file (in sorted order) of folder_with_manual_xml whose name
    starts with file_id is parsed with get_annotation_spans_from_xml and its
    result is returned.

    Raises FileNotFoundError when no file name starts with file_id.
    """
    # NOTE(review): prefix matching means 'Sample1' also matches 'Sample10...';
    # confirm the file ids are unambiguous.
    for filename in sorted(os.listdir(folder_with_manual_xml)):
        if filename.startswith(file_id):
            path_to_xml = os.path.join(folder_with_manual_xml, filename)
            return get_annotation_spans_from_xml(path_to_xml)
    raise FileNotFoundError(
        f"no annotation file starting with {file_id!r} in {folder_with_manual_xml!r}"
    )

In [None]:
# NOTE(review): the original assignment had no value. This folder is inferred
# from the sample path used earlier in the notebook - adjust if the texts live elsewhere.
folder_with_txt = "/content/drive/MyDrive/OCR results/Samples4Markup"

In [None]:
!ls

In [None]:
# Evaluate the flair predictor on all sample files.
# NOTE(review): the original call was missing both folder arguments. These
# paths are inferred from the sample and annotation paths used earlier in the
# notebook - adjust if needed.
evaluate_on_several_files(
    get_binary_prediction_flair,
    "/content/drive/MyDrive/OCR results/Samples4Markup",
    "/content/drive/MyDrive/OCR results/Samples4Markup/ManualMarkup",
)