In [8]:
import codecs
import itertools
import json
import os
import re

import edlib
import Levenshtein
import pandas as pd
from bs4 import BeautifulSoup

from extract_text import *

## Get ICDAR data (aligned)

In [1]:
# Download the ICDAR2019-POCR ground-truth archive from Zenodo into the working directory.
# NOTE(review): --no-check-certificate turns off TLS certificate verification for this download;
# drop the flag if the local CA store can validate zenodo.org.
! wget --no-check-certificate -O "ICDAR2019-POCR-ground-truth.zip" "https://zenodo.org/records/3515403/files/ICDAR2019-POCR-ground-truth.zip?download=1"

--2025-02-05 13:02:43--  https://zenodo.org/records/3515403/files/ICDAR2019-POCR-ground-truth.zip?download=1
Resolving zenodo.org (zenodo.org)... 188.185.43.25, 188.185.48.194, 188.185.45.92, ...
Connecting to zenodo.org (zenodo.org)|188.185.43.25|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 54341904 (52M) [application/octet-stream]
Saving to: ‘ICDAR2019-POCR-ground-truth.zip’


2025-02-05 13:02:45 (26,8 MB/s) - ‘ICDAR2019-POCR-ground-truth.zip’ saved [54341904/54341904]



In [2]:
# Unpack the downloaded archive in place; -o overwrites already-extracted files without prompting.
! unzip -o "ICDAR2019-POCR-ground-truth.zip"

Archive:  ICDAR2019-POCR-ground-truth.zip
   creating: evaluation_4M_without_Finnish/
   creating: evaluation_4M_without_Finnish/BG/
   creating: evaluation_4M_without_Finnish/BG/BG1/
  inflating: evaluation_4M_without_Finnish/BG/BG1/1.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/0.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/2.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/3.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/4.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/5.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/6.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/7.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/8.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/9.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/10.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/11.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/12.txt  
  inflating: evaluation_4M_without_Finnish/BG/BG1/13.txt  


In [3]:

#re-used pre-processing functions

def remove_ext(fname):
    """Return the base name of `fname` with its last extension stripped.

    Any directory part is discarded first, so '/a/b/c.txt' becomes 'c'.
    """
    stem, _extension = os.path.splitext(os.path.basename(fname))
    return stem


def create_dirs(fname, is_file=False):
    """Create the (output) directory for `fname`, including missing parents.

    Args:
        fname (str): path of a directory, or of a file when `is_file` is True.
        is_file (bool): if True, the file name part is removed and only the
            containing directory is created.

    Raises:
        FileExistsError: if the target path exists but is not a directory.
    """
    target = os.path.abspath(fname)
    if is_file:
        target = os.path.dirname(target)

    # exist_ok avoids the check-then-create race of `if not exists: makedirs`
    # and keeps the call idempotent.
    os.makedirs(target, exist_ok=True)


def out_file_name(out_dir, fname, ext=None):
    """Build the path of an output file inside `out_dir`.

    Only the base name of `fname` is used; any directory part is dropped.

    Args:
        out_dir (str): directory the output file should live in.
        fname (str): path (or name) of the input file.
        ext (str): extension for the output file, without the leading dot.
            When `None` (the default) the input file's extension is kept.

    Returns:
        str: `out_dir` joined with the base name of `fname`, with the last
            extension swapped for `ext` if one was given.
    """
    base = os.path.basename(fname)
    if ext is not None:
        stem = os.path.splitext(base)[0]
        base = '{}.{}'.format(stem, ext)
    return os.path.join(out_dir, base)


def cwl_file(fname):
    """Return a dict with 'class' set to 'File' and 'path' set to `fname`.

    The name suggests this is a CWL File object; that is not confirmed here.
    """
    file_obj = {'class': 'File'}
    file_obj['path'] = fname
    return file_obj


def get_files(directory, recursive=False):
    """List the files found in `directory`, sorted by their full path string.

    With `recursive=True` the tree is walked and absolute paths are returned;
    otherwise only direct children that are regular files are returned, each
    prefixed with `directory` as given.
    """
    if recursive:
        found = [
            os.path.join(root, name)
            for root, _subdirs, names in os.walk(os.path.abspath(directory))
            for name in names
        ]
    else:
        candidates = (os.path.join(directory, entry)
                      for entry in os.listdir(directory))
        found = [path for path in candidates if os.path.isfile(path)]

    # plain string sort, so e.g. '10.txt' comes before '2.txt'
    found.sort()
    return found


def split(s):
    """Split `s` on runs of whitespace and return the resulting tokens."""
    tokens = s.split()
    return tokens


def read_xml(fname):
    """Read `fname` as UTF-8 text and parse it with BeautifulSoup's 'xml' parser.

    Returns:
        The BeautifulSoup object built from the file's full contents.
    """
    stream = codecs.open(fname, encoding='utf-8')
    try:
        markup = stream.read()
    finally:
        stream.close()
    return BeautifulSoup(markup, 'xml')


def write_xml(soup, fname):
    """Write `str(soup)` to `fname`, encoded as UTF-8 (overwrites the file)."""
    with codecs.open(fname, mode='wb', encoding='utf-8') as sink:
        serialized = str(soup)
        sink.write(serialized)

In [9]:

# Input: the DE/DE2 folder of the unzipped ICDAR archive; output root for the
# extracted 'ocr', 'gs' and 'aligned' sub-folders.
# NOTE(review): a later cell also reads "DE_1/aligned", which this configuration
# does not produce — presumably this cell was run once per subset with edited
# paths; confirm and make that explicit.
in_dir = './evaluation_4M_without_Finnish/DE/DE2'
out_dir = 'DE_2'

def command(in_file, out_dir):
    """Split one aligned ICDAR text file into OCR, gold-standard and JSON outputs.

    Reads all lines of the open file `in_file`. The second and third lines are
    taken as the aligned OCR and gold-standard (GS) texts; per the original
    notes, the first 14 characters of lines 0/1/2 hold the OCR_toInput /
    OCR_aligned / GS_aligned tags and are cut off. Each text is passed through
    `to_character_list` (defined outside this cell) and written to:

        <out_dir>/ocr/<name>            joined OCR characters
        <out_dir>/gs/<name>             joined GS characters
        <out_dir>/aligned/<stem>.json   {'ocr': [...], 'gs': [...]}

    Every output path is printed before it is written.
    """
    create_dirs(out_dir)

    rows = in_file.readlines()
    tag_width = 14  # length of the leading tag on each row
    ocr = to_character_list(rows[1][tag_width:].strip())
    gs = to_character_list(rows[2][tag_width:].strip())

    source_name = os.path.basename(in_file.name)

    # Plain-text dumps of both sides of the alignment.
    for subdir, chars in (('ocr', ocr), ('gs', gs)):
        out_file = out_file_name(os.path.join(out_dir, subdir), source_name)
        print(out_file)
        create_dirs(out_file, is_file=True)
        with codecs.open(out_file, 'wb', encoding='utf-8') as f:
            f.write(u''.join(chars))

    # Character-level alignment as JSON.
    out_file = out_file_name(os.path.join(out_dir, 'aligned'), source_name, 'json')
    print(out_file)
    create_dirs(out_file, is_file=True)
    with codecs.open(out_file, 'wb', encoding='utf-8') as f:
        json.dump({'ocr': ocr, 'gs': gs}, f)
        
    
# Convert every file in `in_dir`; `command` prints the paths it writes.
in_files = get_files(in_dir)
for in_file in in_files:
    print(in_file)

    # The context manager closes each handle (a bare open() left them all open).
    # Read as UTF-8 explicitly, matching the encoding `command` writes with,
    # instead of depending on the platform default.
    with open(in_file, encoding='utf-8') as in_handle:
        command(in_handle, out_dir)

./evaluation_4M_without_Finnish/DE/DE2/0.txt
DE_2/ocr/0.txt
DE_2/gs/0.txt
DE_2/aligned/0.json
./evaluation_4M_without_Finnish/DE/DE2/1.txt
DE_2/ocr/1.txt
DE_2/gs/1.txt
DE_2/aligned/1.json
./evaluation_4M_without_Finnish/DE/DE2/10.txt
DE_2/ocr/10.txt
DE_2/gs/10.txt
DE_2/aligned/10.json
./evaluation_4M_without_Finnish/DE/DE2/11.txt
DE_2/ocr/11.txt
DE_2/gs/11.txt
DE_2/aligned/11.json
./evaluation_4M_without_Finnish/DE/DE2/12.txt
DE_2/ocr/12.txt
DE_2/gs/12.txt
DE_2/aligned/12.json
./evaluation_4M_without_Finnish/DE/DE2/13.txt
DE_2/ocr/13.txt
DE_2/gs/13.txt
DE_2/aligned/13.json
./evaluation_4M_without_Finnish/DE/DE2/14.txt
DE_2/ocr/14.txt
DE_2/gs/14.txt
DE_2/aligned/14.json
./evaluation_4M_without_Finnish/DE/DE2/15.txt
DE_2/ocr/15.txt
DE_2/gs/15.txt
DE_2/aligned/15.json
./evaluation_4M_without_Finnish/DE/DE2/16.txt
DE_2/ocr/16.txt
DE_2/gs/16.txt
DE_2/aligned/16.json
./evaluation_4M_without_Finnish/DE/DE2/17.txt
DE_2/ocr/17.txt
DE_2/gs/17.txt
DE_2/aligned/17.json
./evaluation_4M_without_Finn

## Our Annotated Sample

In [None]:
# Load every ';'-separated annotation CSV in `data_dir` into the list `annots`.
data_dir = 'annots_csv'
annots = []
# os.listdir returns entries in arbitrary order, and the following cell picks
# frames by position (annots[0], annots[1]); sort so positions are stable.
for file in sorted(os.listdir(data_dir)):
    if file.endswith('csv'):
        print(file)
        with open(os.path.join(data_dir, file), 'r') as f:
            annots.append(pd.read_csv(f, delimiter=';'))

In [None]:
# Keep only the rows that have a value in "edited-reading_order",
# stacking the second annotation frame on top of the first.
data_actd = pd.concat(
    [frame[frame["edited-reading_order"].notna()] for frame in (annots[1], annots[0])],
    axis=0,
)


In [None]:
# Drop helper columns, then collect the block texts of each (year, nr, page)
# into lists. errors='ignore' keeps the cell re-runnable: on a second run the
# columns are already gone and a plain drop would raise KeyError.
data_actd = data_actd.drop(
    columns=['Diabetiker_journal_blocks_OCR', 'complexity_type', 'Unnamed: 0'],
    errors='ignore',
)
data_actd_grouped = (
    data_actd
    .groupby(['year', 'nr', 'page'])[['block_text', 'edited-reading_order']]
    .agg(list)
    .reset_index()
)


In [None]:
def _join_blocks(parts):
    """Join a list of text blocks with newlines; leave already-joined values as is.

    Without the list check, re-running this cell would iterate over the joined
    string and put a newline between every character.
    """
    return '\n'.join(parts) if isinstance(parts, list) else parts


# Turn the per-page lists of blocks into one newline-separated text per page.
data_actd_grouped['block_text'] = data_actd_grouped['block_text'].apply(_join_blocks)
data_actd_grouped['edited-reading_order'] = data_actd_grouped['edited-reading_order'].apply(_join_blocks)

### Add Old Annotations

In [None]:
# Pair every '*noro.txt' file (read as the 'gs' text) with the file of the same
# name minus '_noro' (read as the 'ocr' text); one single-row frame per pair.
data_dir = "corrected_ocr_old"
dfs = []
# sorted(): os.listdir order is arbitrary; sorting makes the row order reproducible.
for file in sorted(os.listdir(data_dir)):
    if not file.endswith('noro.txt'):
        continue

    base_ocr_file = file.replace('_noro', '')
    with open(os.path.join(data_dir, file), 'r') as f:
        gs_ocr = f.read()
    with open(os.path.join(data_dir, base_ocr_file), 'r') as f:
        base_ocr = f.read()

    df = pd.DataFrame([(base_ocr, gs_ocr)], columns=['ocr', 'gs'])
    df['source'] = file.split('_noro')[0]
    dfs.append(df)

# ignore_index gives each row its own label (every single-row frame is indexed 0);
# rows whose OCR text is empty are dropped.
df_add = pd.concat(dfs, axis=0, ignore_index=True).query('ocr != ""')

## Align GT and OCR (edlib)

In [None]:
# Directory the alignment JSON files of this section are written to; created if missing.
os.makedirs("DJ_aligned", exist_ok=True)

def _expand_cigar(cigar, query, ref, ref_start=0):
    """Expand a CIGAR string into two equal-length lists of aligned characters.

    '=' and 'X' consume one character from both texts, 'D' consumes from `ref`
    only (the query side gets ''), 'I' consumes from `query` only (the ref side
    gets ''). Returns `(query_aln, ref_aln)`.
    """
    query_aln, ref_aln = [], []
    query_pos, ref_pos = 0, ref_start

    for step, code in re.findall(r"(\d+)(\D)", cigar or ""):
        step = int(step)
        if code in ("=", "X"):
            ref_aln.extend(ref[ref_pos:ref_pos + step])
            query_aln.extend(query[query_pos:query_pos + step])
            ref_pos += step
            query_pos += step
        elif code == "D":
            ref_aln.extend(ref[ref_pos:ref_pos + step])
            query_aln.extend([''] * step)
            ref_pos += step
        elif code == "I":
            ref_aln.extend([''] * step)
            query_aln.extend(query[query_pos:query_pos + step])
            query_pos += step
        else:
            # Skipping an unknown operation would leave both cursors out of
            # step with the CIGAR and corrupt the rest of the alignment.
            raise ValueError('unsupported CIGAR operation: {!r}'.format(code))

    return query_aln, ref_aln


def align_pages(df, out_file):
    """Align the OCR text of one row with its ground truth and save it as JSON.

    Args:
        df: mapping-like row (e.g. a pandas Series) with the OCR text under
            'ocr' and the ground truth under 'gt'; 'gs' is accepted as a
            fallback because the other frames in this notebook use that name.
        out_file (str): path of the JSON file to write. It receives
            {'ocr': [...], 'gs': [...]}, two lists of equal length holding one
            character per position, with '' marking a gap.

    Raises:
        KeyError: if the row has no 'ocr' or neither 'gt' nor 'gs'.
        ValueError: if the alignment contains an unsupported CIGAR operation.
    """
    query = df['ocr']
    ref = df['gt'] if 'gt' in df else df['gs']

    # No `mode` is passed, so edlib's default alignment mode applies.
    a = edlib.align(query, ref, task="path")
    ref_start = a["locations"][0][0] or 0  # tolerate a missing start location
    query_aln, ref_aln = _expand_cigar(a["cigar"], query, ref, ref_start)

    with codecs.open(out_file, 'wb', encoding='utf-8') as f:
        json.dump({'ocr': query_aln, 'gs': ref_aln}, f)


# Align every page and write one JSON file per row into DJ_aligned/.
# The output name has to be built per row: previously it was computed once from
# a leftover global `df`, and the two naming schemes were attached to the wrong
# frames. Columns are renamed to the 'ocr'/'gt' keys that align_pages reads.

# Annotated sample: block_text is the OCR side, edited-reading_order the ground
# truth; files are named <year>__<nr>__<page>.json.
for _, row in data_actd_grouped.iterrows():
    out_file = 'DJ_aligned/' + str(int(row.year)) + '__' + str(row.nr) + '__' + str(int(row.page)) + '.json'
    align_pages(row.rename({'block_text': 'ocr', 'edited-reading_order': 'gt'}), out_file)

# Old annotations: files are named after the `source` column.
for _, row in df_add.iterrows():
    out_file = 'DJ_aligned/' + row.source + '.json'
    align_pages(row.rename({'gs': 'gt'}), out_file)



## Process Aligned Dataset

In [None]:
def get_split_indices(chars, max_length=1000, delimiters={'.', ';'}):
    split_indices = []
    temp_sublist = []
    
    i = 0
    while i < len(chars):
        temp_sublist.append(chars[i])
        
        if len(temp_sublist) >= max_length:
            # Check if a split point exists in the nearby range
            found_split_point = False
            for j in range(i, min(i + 100, len(chars))):
                if chars[j] in delimiters:
                    # Record the index of the delimiter for splitting
                    split_indices.append(j)
                    temp_sublist = []  # Reset for the next sublist
                    i = j  # Continue from after the delimiter
                    found_split_point = True
                    break
            
            # If no split point was found, add a split at max_length
            if not found_split_point:
                split_indices.append(i)
                temp_sublist = []

        i += 1

    return split_indices

In [None]:
# Cut every aligned JSON file into spans and collect one frame per file in `dfs`.
data_dirs = ["DE_1/aligned", "DE_2/aligned", "DJ_aligned"]
sources = ["Neue_Zürcher_Zeitung(NZZ)", "German National Library", "Diabetiker Journal"]
max_length = 150  # span length after which a newline split point is looked for
dfs = []

# zip() yields (data_dir, source) pairs; wrapping it in enumerate() and
# unpacking three names raised "not enough values to unpack".
for data_dir, source in zip(data_dirs, sources):
    # sorted(): os.listdir order is arbitrary; sorting keeps the row order reproducible.
    for file in sorted(os.listdir(data_dir)):
        if not file.endswith('.json'):
            continue
        f_id = file.split('.')[0]
        with open(os.path.join(data_dir, file), encoding='utf-8') as f:
            d = json.load(f)

        d['gs'] = ["¬" if element == "⸗" else element for element in d['gs']]

        # Split points are found on the gold standard and applied to both
        # lists, which share positions.
        indices = get_split_indices(d['gs'], max_length=max_length, delimiters={'\n'})
        # NOTE(review): only the spans between consecutive split points are
        # kept. Text before the first and after the last split index, and the
        # character at each split index, is left out; a file with fewer than
        # two split indices contributes no rows. Confirm this is intended.
        spans_gs = [''.join(d['gs'][i+1:j]) for i, j in zip(indices, indices[1:])]
        spans_ocr = [''.join(d['ocr'][i+1:j]) for i, j in zip(indices, indices[1:])]

        dfs.append(pd.DataFrame(
            list(zip([source]*len(spans_ocr), [f_id]*len(spans_ocr), spans_ocr, spans_gs)),
            columns=['source', 'file_id', 'ocr', 'gs'],
        ))
            

In [None]:
# data_GNL = pd.concat(dfs, axis=0)
# data_ICDAR = pd.concat(dfs, axis=0)
# data_DJ = pd.concat(dfs, axis=0).reset_index(drop=True)

## Merge the Dataframes

In [None]:
# Stack the per-file frames; ignore_index gives every row a unique label.
dataset = pd.concat(dfs, axis=0, ignore_index=True)
print(f'max len of gs string: {dataset.gs.map(len).max()}')
# Rename by name. The frame has four columns at this point ('CER' is added in
# the next section), so assigning a list of five names raised a length mismatch.
dataset = dataset.rename(columns={
    'source': 'Publication',
    'ocr': 'OCR Text',
    'gs': 'Ground Truth',
})

## Calculate CER

In [None]:

# Compute character error rate (CER)
def cer(prediction, target):
    """Character error rate of `prediction` measured against `target`.

    Computed as the Levenshtein distance between the two strings divided by
    the length of `target`.

    Returns:
        float: the error rate. For an empty `target` the ratio is undefined;
            0.0 is returned if `prediction` is empty as well, otherwise
            `float('inf')`, instead of raising ZeroDivisionError.
    """
    distance = Levenshtein.distance(prediction, target)
    if not target:
        return 0.0 if distance == 0 else float('inf')
    return distance / len(target)

# cer(prediction, target) divides by the length of `target`, so the OCR text is
# the prediction and the ground truth the target. The arguments were swapped,
# which normalised the distance by the OCR length instead.
dataset['CER'] = dataset.apply(lambda row: cer(row['OCR Text'], row['Ground Truth']), axis=1)

## Train-test Split

In [None]:
def assign_CER_interval(df):
    """Map the `CER` value of a row to one of three interval labels.

    Args:
        df: a row-like object exposing a numeric `CER` attribute.

    Returns:
        str: '0' for a CER of exactly 0, '<0.1&!=0' for any other value below
            0.1, '>=0.1' otherwise.

    Raises:
        ValueError: if `CER` compares with neither bound (e.g. NaN).
            Previously this surfaced as an UnboundLocalError.
    """
    cer_value = df.CER

    if cer_value == 0:
        return '0'
    if cer_value < 0.1:
        return '<0.1&!=0'
    if cer_value >= 0.1:
        return '>=0.1'

    raise ValueError('cannot assign a CER interval to {!r}'.format(cer_value))


# Label every row with its CER interval; the split below stratifies on this column.
dataset['CER_interval'] = dataset.apply(assign_CER_interval,axis=1)



In [None]:
from sklearn.model_selection import train_test_split

# The label must match the one assigned when the frames were built
# ("Diabetiker Journal"). The previous 'Diabetiker_journal' matched no rows,
# leaving nothing to split. The subset is selected once and reused for the
# stratification column.
dj_subset = dataset.query("Publication in ['Diabetiker Journal']")

# 90/10 train/validation split, stratified on the CER interval, fixed seed.
train, valid = train_test_split(
    dj_subset,
    test_size=0.1,
    shuffle=True,
    stratify=dj_subset['CER_interval'],
    random_state=42
)