In [None]:
import os
import zipfile

# Unpack the CVL database archive stored on Drive into the /content/ working directory.
with zipfile.ZipFile(
    '/content/drive/MyDrive/Datasets/SOURCE4/cvl-database-1-1.zip', mode='r'
) as cvl_archive:
    cvl_archive.extractall(path='/content/')

In [None]:
import json

def _load_json(path):
    """Open `path` for reading and return the decoded JSON content."""
    with open(path, 'r') as handle:
        return json.load(handle)


# One annotation file per split, loaded in the order test, train, validation.
test_annotations = _load_json('/content/drive/MyDrive/Datasets/SOURCE4/test_annotations.json')

train_annotations = _load_json('/content/drive/MyDrive/Datasets/SOURCE4/train_annotations.json')

validation_annotations = _load_json('/content/drive/MyDrive/Datasets/SOURCE4/validation_annotations.json')

In [None]:
# Root folder of the word images in the extracted train set.
WORDS_TRAIN_DIR = '/content/cvl-database-1-1/trainset/words'

# Only sub-directories are kept; plain files directly under the root are ignored.
WORDS_TRAIN_SUBDIRS = [
    entry
    for entry in os.listdir(WORDS_TRAIN_DIR)
    if os.path.isdir(os.path.join(WORDS_TRAIN_DIR, entry))
]

# Sub-directory name -> names of the ".tif" files found inside it.
WORDS_SUBDIRS_FILES = {
    subdir: [
        name
        for name in os.listdir(os.path.join(WORDS_TRAIN_DIR, subdir))
        if name.endswith(".tif")
    ]
    for subdir in WORDS_TRAIN_SUBDIRS
}

In [None]:
import re

def extract_w(filename):
    """Return the transcription embedded in a word-image file name.

    The name is searched for four dash-separated runs of digits, a dash, the
    transcription itself and the ``.tif`` extension. The transcription part is
    returned; ``None`` is returned when the name does not contain that layout.
    """
    found = re.search(r'\d+-\d+-\d+-\d+-(.+)\.tif', filename)
    return found.group(1) if found else None


In [None]:
# Sub-directory name -> transcription parsed from each file name, kept in the
# same order as WORDS_SUBDIRS_FILES[subdir] (None where the name does not parse).
WORDS_FROM_SUBDIRS = {
    subdir: [extract_w(name) for name in names]
    for subdir, names in WORDS_SUBDIRS_FILES.items()
}

In [None]:
import pandas as pd

# With the default NA handling pandas parses word entries such as "null",
# "nan" or "NA" as float NaN, so they would enter `freq_words` as NaN instead
# of as strings. Only a truly empty field is treated as missing here.
freq_df = pd.read_csv(
    '/content/drive/MyDrive/Datasets/etc/unigram_freq.csv',
    keep_default_na=False,
    na_values=[''],
)
# Most frequent words first.
freq_df = freq_df.sort_values(by=['count'], axis=0, ascending=False)
freq_words = freq_df['word'].to_numpy()
freq_df.head()

In [None]:
!pip install english-words lxml

In [None]:
from english_words import get_english_words_set
import numpy as np

words_list = list(get_english_words_set(['web2']))

SOURCE2_WORDS_PATH = '/content/drive/MyDrive/Datasets/SOURCE2/words_new.txt'

# Collect the last space-separated token of every line that is not a "#" comment.
words_txt = []
with open(SOURCE2_WORDS_PATH, 'r') as w_f:
    for line in w_f:
        if line.startswith("#"):
            continue
        words_txt.append(line.strip().split(" ")[-1])

unique_source2_words = np.unique(np.array(words_txt))

# `w in <ndarray>` rescans the whole array for every frequency word; a set
# answers the same membership question in constant time.
source2_word_set = set(unique_source2_words.tolist())
freq_without_source2 = [w for w in freq_words if w not in source2_word_set]
freq_source2_intersection = [w for w in freq_words if w in source2_word_set]


In [None]:
model_vocab_size = 50250

# Number of frequency-list words to keep: the vocabulary size minus the
# number of distinct SOURCE2 words.
freq_new_size = model_vocab_size - len(unique_source2_words)
cropped_freq = freq_without_source2[:freq_new_size]

# Distinct SOURCE2 words together with how often each one occurs.
unique_source2_words, source2_counts = np.unique(words_txt, return_counts=True)

# Order the (word, count) pairs from the highest count to the lowest.
sorted_pairs = sorted(
    zip(unique_source2_words, source2_counts),
    key=lambda pair: pair[1],
    reverse=True,
)
sorted_unique, sorted_counts = map(list, zip(*sorted_pairs))

# Relative frequency of each word, in the same order as `sorted_unique`.
total_counts = sum(sorted_counts)
probabilities = [count / total_counts for count in sorted_counts]

In [None]:
# Both vocabularies are plain lists, so testing every word against them
# directly rescans them each time. Build one set and test against that.
_train_vocabulary = set(cropped_freq) | set(sorted_unique)

# Sub-directory name -> words that are / are not part of the vocabulary.
WORDS_IN_DICT = {}
WORDS_NOT_IN_DICT = {}

for d in [*WORDS_FROM_SUBDIRS]:
    WORDS_IN_DICT[d] = [w for w in WORDS_FROM_SUBDIRS[d] if w in _train_vocabulary]
    WORDS_NOT_IN_DICT[d] = [w for w in WORDS_FROM_SUBDIRS[d] if w not in _train_vocabulary]

In [None]:
import math

# Prefix (text before the first "-") of every validation annotation id,
# de-duplicated in first-seen order. Without de-duplication a prefix that
# occurs several times is moved on its first occurrence and then reported as
# missing on every later one.
VALIDATION_DIRS_TRAINSET = list(dict.fromkeys(
    annotation_id.split("-")[0] for annotation_id in validation_annotations
))
VAL_WORDS_TRAINSET = {}

# Move the validation directories out of WORDS_IN_DICT so that what remains
# there no longer contains them. The loop variable is deliberately not called
# `dir`, which would shadow the builtin for the rest of the notebook.
for val_dir in VALIDATION_DIRS_TRAINSET:
    if val_dir in WORDS_IN_DICT:
        VAL_WORDS_TRAINSET[val_dir] = WORDS_IN_DICT.pop(val_dir)
    else:
        print(f"Directory {val_dir} not in WORDS_IN_DICT")

# Keep only the directories that were actually found.
VALIDATION_DIRS_TRAINSET = [*VAL_WORDS_TRAINSET]


In [None]:
import random
import time
import shutil
from PIL import Image

# Output image folders, one per split.
out_train_dir = '/content/SOURCE3/train'
out_validation_dir = '/content/SOURCE3/validation'
out_test_dir = '/content/SOURCE3/test'

# CSV files written next to the image folders, one per split.
train_csv = '/content/SOURCE3/train.csv'
validation_csv = '/content/SOURCE3/validation.csv'
test_csv = '/content/SOURCE3/test.csv'

# Create the folders; already existing ones are left as they are.
for split_out_dir in (out_train_dir, out_validation_dir, out_test_dir):
    os.makedirs(split_out_dir, exist_ok=True)

def generate_filename(frmt='jpg'):
    """Build a file name of the form ``<timestamp>_<number>.<frmt>``.

    The timestamp is the current ``time.time()`` value and the number is a
    random integer between 100 and 999 (both ends included).
    """
    stamp = str(time.time())
    suffix = str(random.randint(100, 999))
    return '_'.join((stamp, suffix)) + '.' + frmt

In [None]:
# Root folder of the word images in the extracted test set.
WORDS_TEST_DIR = '/content/cvl-database-1-1/testset/words'

# Only sub-directories are kept; plain files directly under the root are ignored.
WORDS_TESR_SUBDIRS = [
    entry
    for entry in os.listdir(WORDS_TEST_DIR)
    if os.path.isdir(os.path.join(WORDS_TEST_DIR, entry))
]

# Sub-directory name -> names of the ".tif" files found inside it.
TEST_SUBDIRS_FILES = {
    subdir: [
        name
        for name in os.listdir(os.path.join(WORDS_TEST_DIR, subdir))
        if name.endswith(".tif")
    ]
    for subdir in WORDS_TESR_SUBDIRS
}

In [None]:
# Sub-directory name -> transcription parsed from each file name, kept in the
# same order as TEST_SUBDIRS_FILES[subdir] (None where the name does not parse).
WORDS_FROM_TEST_SUBDIRS = {
    subdir: [extract_w(name) for name in names]
    for subdir, names in TEST_SUBDIRS_FILES.items()
}

In [None]:
# Both vocabularies are plain lists, so testing every word against them
# directly rescans them each time. Build one set and test against that.
_test_vocabulary = set(cropped_freq) | set(sorted_unique)

# Sub-directory name -> words of that directory that are part of the vocabulary.
TEST_WORDS_IN_DICT = {}

for d in [*WORDS_FROM_TEST_SUBDIRS]:
    TEST_WORDS_IN_DICT[d] = [w for w in WORDS_FROM_TEST_SUBDIRS[d] if w in _test_vocabulary]


In [None]:
import random

test_dirnames = []
train_dirnames = []
validation_dirnames = []

cur_test_len = 0
achieved_target = False

# Prefix (text before the first "-") of every annotation id, per split.
# Computed once as sets instead of rebuilding three lists for every directory.
_test_prefixes = {annotation_id.split("-")[0] for annotation_id in test_annotations}
_train_prefixes = {annotation_id.split("-")[0] for annotation_id in train_annotations}
_validation_prefixes = {annotation_id.split("-")[0] for annotation_id in validation_annotations}

# First match wins, checked in the order test, train, validation. A directory
# that matches none of the three is not added to any list.
for d in TEST_WORDS_IN_DICT:
    if d in _test_prefixes:
        test_dirnames.append(d)
    elif d in _train_prefixes:
        train_dirnames.append(d)
    elif d in _validation_prefixes:
        validation_dirnames.append(d)

# The two numbers differ when some directory matched no split.
print(len(TEST_WORDS_IN_DICT),
      len(test_dirnames) + len(train_dirnames) + len(validation_dirnames))

In [None]:
# Module-level accumulators, one entry per exported image; the four lists are
# kept index-aligned.
filenames_list, transcriptions_list = [], []
files_writers_ids, initial_filenames_list = [], []


def process_images(words_dir, dir_names, words_from_subdirs, subdirs_files,
                   out_dir, cropped_freq, sorted_unique):
    """Export every in-vocabulary word image of the given directories as JPEG.

    Args:
        words_dir: Root folder that contains one sub-folder per entry of `dir_names`.
        dir_names: Sub-folder names to process.
        words_from_subdirs: Maps a sub-folder name to its list of words.
        subdirs_files: Maps a sub-folder name to its list of file names; entry
            `i` is the image of word `i` in `words_from_subdirs`.
        out_dir: Folder that receives the converted JPEG files.
        cropped_freq: First collection of accepted words.
        sorted_unique: Second collection of accepted words.

    A word is exported when it is contained in either collection. For every
    exported image one entry is appended to each of the module-level lists
    `filenames_list`, `transcriptions_list`, `files_writers_ids` and
    `initial_filenames_list`.
    """
    # One set lookup per word instead of scanning both collections every time.
    vocabulary = set(cropped_freq) | set(sorted_unique)

    for d in dir_names:
        for idx, word in enumerate(words_from_subdirs[d]):
            if word not in vocabulary:
                continue

            src_name = subdirs_files[d][idx]
            src_path = os.path.join(words_dir, d, src_name)

            # Draw names until one is free. The recorded name is regenerated
            # together with the path, so the entry stored in `filenames_list`
            # always matches the file that is actually written.
            out_filename = generate_filename()
            out_path = os.path.join(out_dir, out_filename)
            while os.path.exists(out_path):
                out_filename = generate_filename()
                out_path = os.path.join(out_dir, out_filename)

            with Image.open(src_path) as img:
                img.convert('RGB').save(out_path, 'JPEG')

            filenames_list.append(out_filename)
            transcriptions_list.append(word)
            files_writers_ids.append(d)
            initial_filenames_list.append(src_name)


def save_to_csv_and_archive(filenames_list, transcriptions_list, files_writers_ids,
                            initial_filenames_list, csv_path, archive_path, out_dir):
    """Write the per-image metadata to a CSV file and zip the image folder.

    The four lists become the columns ``filename``, ``transcription``,
    ``writer_id`` and ``initial_filename`` of the CSV at `csv_path` (no index
    column). `out_dir` is then archived as ``<archive_path>.zip``.
    """
    columns = {
        'filename': filenames_list,
        'transcription': transcriptions_list,
        'writer_id': files_writers_ids,
        'initial_filename': initial_filenames_list,
    }
    pd.DataFrame(columns).to_csv(csv_path, index=False)

    shutil.make_archive(archive_path, "zip", out_dir)

In [None]:
# --- Test split: the test-set directories assigned to "test". ---
process_images(
    words_dir=WORDS_TEST_DIR,
    dir_names=test_dirnames,
    words_from_subdirs=WORDS_FROM_TEST_SUBDIRS,
    subdirs_files=TEST_SUBDIRS_FILES,
    out_dir=out_test_dir,
    cropped_freq=cropped_freq,
    sorted_unique=sorted_unique,
)
save_to_csv_and_archive(
    filenames_list, transcriptions_list, files_writers_ids, initial_filenames_list,
    csv_path=test_csv, archive_path="/content/SOURCE3/test", out_dir=out_test_dir,
)

# Start the next split with fresh accumulators.
filenames_list, transcriptions_list = [], []
files_writers_ids, initial_filenames_list = [], []

# --- Train split: remaining train-set directories plus the test-set
# directories assigned to "train". ---
process_images(
    words_dir=WORDS_TRAIN_DIR,
    dir_names=[*WORDS_IN_DICT],
    words_from_subdirs=WORDS_FROM_SUBDIRS,
    subdirs_files=WORDS_SUBDIRS_FILES,
    out_dir=out_train_dir,
    cropped_freq=cropped_freq,
    sorted_unique=sorted_unique,
)
process_images(
    words_dir=WORDS_TEST_DIR,
    dir_names=train_dirnames,
    words_from_subdirs=WORDS_FROM_TEST_SUBDIRS,
    subdirs_files=TEST_SUBDIRS_FILES,
    out_dir=out_train_dir,
    cropped_freq=cropped_freq,
    sorted_unique=sorted_unique,
)
save_to_csv_and_archive(
    filenames_list, transcriptions_list, files_writers_ids, initial_filenames_list,
    csv_path=train_csv, archive_path="/content/SOURCE3/train", out_dir=out_train_dir,
)

# Start the next split with fresh accumulators.
filenames_list, transcriptions_list = [], []
files_writers_ids, initial_filenames_list = [], []

# --- Validation split: validation directories of the train set plus the
# test-set directories assigned to "validation". ---
process_images(
    words_dir=WORDS_TRAIN_DIR,
    dir_names=VALIDATION_DIRS_TRAINSET,
    words_from_subdirs=WORDS_FROM_SUBDIRS,
    subdirs_files=WORDS_SUBDIRS_FILES,
    out_dir=out_validation_dir,
    cropped_freq=cropped_freq,
    sorted_unique=sorted_unique,
)
process_images(
    words_dir=WORDS_TEST_DIR,
    dir_names=validation_dirnames,
    words_from_subdirs=WORDS_FROM_TEST_SUBDIRS,
    subdirs_files=TEST_SUBDIRS_FILES,
    out_dir=out_validation_dir,
    cropped_freq=cropped_freq,
    sorted_unique=sorted_unique,
)
save_to_csv_and_archive(
    filenames_list, transcriptions_list, files_writers_ids, initial_filenames_list,
    csv_path=validation_csv, archive_path="/content/SOURCE3/validation", out_dir=out_validation_dir,
)

In [None]:
from lxml import etree

def extract_bbox_coords(points):
    """Return the axis-aligned box ``[x_min, y_min, x_max, y_max]`` around `points`.

    Each point must offer ``get('x')`` / ``get('y')`` returning values that
    ``float()`` accepts. ``None`` is returned (after printing a message) when
    `points` is empty or when a point lacks either attribute.
    """
    if not points:
        print("No points found for bounding box.")
        return None

    coords = []
    for pt in points:
        raw_x, raw_y = pt.get('x'), pt.get('y')

        # A single incomplete point invalidates the whole box.
        if raw_x is None or raw_y is None:
            print(f"Point missing x or y attribute: {pt.attrib}")
            return None

        coords.append((float(raw_x), float(raw_y)))

    xs, ys = zip(*coords)
    return [min(xs), min(ys), max(xs), max(ys)]

def extract_annotations(xml_filepath):
    """Parse one attributes XML file into page, region and sub-region annotations.

    Returns a dict with the keys:
        "page_id": ``id`` of the last ``AttrRegion`` with ``attrType="3"`` and
            ``fontType="2"`` (``None`` when there is none or it has no id).
        "handwriting_crop_bbox": bounding box of that region's ``minAreaRect``
            points; ``[]`` when no such region exists, ``None`` when the
            rectangle is missing.
        "regions": one dict per ``AttrRegion`` with ``attrType="2"`` and
            ``fontType="2"`` (id, median word height, font angle, bounding box).
        "regions_contents": parallel to "regions"; for each region the
            transcriptions, ids and bounding boxes of its descendants that
            carry a ``text`` attribute.

    Every missing attribute or rectangle is reported with ``print`` and stored
    as ``None``.
    """
    ns = {'pc': 'http://schema.primaresearch.org/PAGE/gts/pagecontent/2010-03-19'}

    def rect_bbox(node, missing_message):
        # Bounding box of the node's minAreaRect points, or None (after
        # printing `missing_message`) when the node has no minAreaRect.
        rect = node.find('.//pc:minAreaRect', namespaces=ns)
        if rect is None:
            print(missing_message)
            return None
        return extract_bbox_coords(rect.findall('.//pc:Point', namespaces=ns))

    with open(xml_filepath, 'rb') as file:
        tree = etree.parse(file)

    # --- Page-level region; when several match, the last one wins. ---
    page_id = None
    handwriting_crop_bbox = []

    for page_region in tree.findall('.//pc:AttrRegion[@attrType="3"][@fontType="2"]', namespaces=ns):
        page_id = page_region.get('id')
        if page_id is None:
            print("No page ID found.")

        handwriting_crop_bbox = rect_bbox(
            page_region, f"No 'minAreaRect' found for page ID: {page_id}")

    # --- Regions and the sub-regions they contain. ---
    annotated_regions = []
    annotated_regions_contents = []

    for region in tree.findall('.//pc:AttrRegion[@attrType="2"][@fontType="2"]', namespaces=ns):
        region_id = region.get('id')
        if region_id is None:
            print("No region ID found.")

        raw_height = region.get('medianWordHeight')
        if raw_height is None:
            print(f"No medianWordHeight found for region ID: {region_id}")
        median_word_height = None if raw_height is None else float(raw_height)

        raw_angle = region.get('fontAngleRad')
        if raw_angle is None:
            print(f"No fontAngleRad found for region ID: {region_id}")
        font_angle_rad = None if raw_angle is None else float(raw_angle)

        region_bbox = rect_bbox(
            region, f"No 'minAreaRect' found for region ID: {region_id}")

        annotated_regions.append({
            "region_id": region_id,
            "median_word_height": median_word_height,
            "font_angle_rad": font_angle_rad,
            "region_bbox": region_bbox,
        })

        region_contents = {"transcriptions": [], "ids": [], "bboxes": []}

        # Every descendant that carries a `text` attribute is one transcription.
        for subregion in region.findall('.//pc:AttrRegion[@text]', namespaces=ns):
            text = subregion.get('text')
            if text is None:
                print(f"No text found in subregion for region ID: {region_id}")

            sub_id = subregion.get('id')
            if sub_id is None:
                print(f"No ID found for subregion in region ID: {region_id}")

            bbox = rect_bbox(
                subregion, f"No 'minAreaRect' found for subregion ID: {sub_id}")

            region_contents["transcriptions"].append(text)
            region_contents["ids"].append(sub_id)
            region_contents["bboxes"].append(bbox)

        annotated_regions_contents.append(region_contents)

    return {
        "page_id": page_id,
        "handwriting_crop_bbox": handwriting_crop_bbox,
        "regions": annotated_regions,
        "regions_contents": annotated_regions_contents,
    }


In [None]:
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt

def draw_bboxes(image_path, annotations, output_path="/content/annotated_image.tif"):
    """Draw the annotated boxes onto an image and save the result.

    Args:
        image_path: Image to draw on.
        annotations: Dict with the keys "handwriting_crop_bbox", "regions"
            (dicts holding "region_bbox") and "regions_contents" (dicts holding
            a "bboxes" list).
        output_path: Where the annotated image is saved.

    The crop box is outlined in red, region boxes in blue and content boxes in
    green. Boxes that are ``None`` or empty are skipped.
    """
    # The context manager closes the underlying file once the annotated copy
    # has been written; previously the handle was never closed.
    with Image.open(image_path) as image:
        draw = ImageDraw.Draw(image)

        if annotations["handwriting_crop_bbox"]:
            bbox = annotations["handwriting_crop_bbox"]
            draw.rectangle(bbox, outline="red", width=2)

        for region in annotations["regions"]:
            if region["region_bbox"]:
                bbox = region["region_bbox"]
                draw.rectangle(bbox, outline="blue", width=2)

        for region_contents in annotations["regions_contents"]:
            for bbox in region_contents["bboxes"]:
                if bbox:
                    draw.rectangle(bbox, outline="green", width=2)

        # Save the image
        image.save(output_path)

    print(f"Annotated image saved at {output_path}")

In [None]:
def label_encoding(file_path):
    """Rewrite `file_path` in place, relabelling ``encoding="UTF-16"`` as ``encoding="UTF-8"``.

    Only the label text is replaced; the remaining bytes are written back
    exactly as they were read.
    """
    with open(file_path, 'rb') as source:
        raw = source.read()

    patched = raw.replace(b'encoding="UTF-16"', b'encoding="UTF-8"')

    with open(file_path, 'wb') as target:
        target.write(patched)



In [None]:
def crop_from_bbox(image_path, bbox, out_path, annotation):
    """Crop a page to `bbox`, save it as JPEG and shift the annotation boxes.

    Args:
        image_path: Page image to crop.
        bbox: Crop box ``[x_min, y_min, x_max, y_max]`` in page coordinates.
        out_path: Destination of the cropped JPEG (quality 95).
        annotation: Dict with "regions" (dicts holding "region_bbox") and
            "regions_contents" (dicts holding a "bboxes" list). It is modified
            in place: every box is translated so that the crop's top-left
            corner becomes the origin.

    Returns:
        The same `annotation` object.

    Raises:
        ValueError: If a shifted region box lies outside the cropped image.
            Nothing is saved in that case.

    Boxes that are ``None`` (no rectangle was found for them) are left as
    ``None`` instead of failing on unpacking.
    """
    def shift(box):
        # Translate a page-coordinate box into crop coordinates.
        x_min, y_min, x_max, y_max = box
        return [x_min - bbox[0], y_min - bbox[1], x_max - bbox[0], y_max - bbox[1]]

    # Close the source file as soon as the cropped copy exists.
    with Image.open(image_path) as page:
        image = page.crop(bbox)
    w, h = image.size

    for region in annotation['regions']:
        if region["region_bbox"] is None:
            continue
        region["region_bbox"] = shift(region["region_bbox"])
        if (min(region["region_bbox"]) < 0 or
                region["region_bbox"][-2] > w or
                region["region_bbox"][-1] > h):
            raise ValueError(f'bbox is out of bounds {region["region_bbox"]} for image size: {image.size}')

    for content in annotation['regions_contents']:
        boxes = content["bboxes"]
        for idx, box in enumerate(boxes):
            if box is not None:
                boxes[idx] = shift(box)

    image.save(out_path, "JPEG", quality=95)
    return annotation

In [None]:
# Page id -> annotations, one dict per split.
train_pages_annotations = {}
val_pages_annotations = {}
test_pages_annotations = {}

# Output folders for the cropped pages.
annotations_dir = '/content/SOURCE3/detection'
annotations_train_dir = '/content/SOURCE3/detection/train'
annotations_test_dir = '/content/SOURCE3/detection/test'
annotations_val_dir = '/content/SOURCE3/detection/validation'

for detection_out_dir in (annotations_dir, annotations_train_dir,
                          annotations_test_dir, annotations_val_dir):
    os.makedirs(detection_out_dir, exist_ok=True)

# Inputs: attribute XML files and page images of both original sets.
train_xml_dir = '/content/cvl-database-1-1/trainset/xml'
test_xml_dir = '/content/cvl-database-1-1/testset/xml'

train_pages_dir = '/content/cvl-database-1-1/trainset/pages'
test_pages_dir = '/content/cvl-database-1-1/testset/pages'

# German texts are thrown out (most of them are corrupted, not relevant).
_german_text_markers = ('-6_attributes.xml', '-3_attributes.xml')
_all_train_xml_files = [
    f for f in os.listdir(train_xml_dir)
    if f.endswith('.xml') and all(marker not in f for marker in _german_text_markers)
]

# The first four characters of a file name are compared with the validation
# directories; matching files go to validation, the rest stay in train.
val_xml_files = []
train_xml_files = []
for f in _all_train_xml_files:
    if f[:4] in VALIDATION_DIRS_TRAINSET:
        val_xml_files.append(f)
    else:
        train_xml_files.append(f)

# Besides the German texts, two blank pages left by a writer and one
# corrupted page are excluded from the test-set files.
_excluded_page_markers = ('0431-3', '0431-4', '0161-4', '0161-2', '0161-3', '0161-6')
merged_xml_files = [
    f for f in os.listdir(test_xml_dir)
    if f.endswith('.xml')
    and all(marker not in f for marker in _german_text_markers + _excluded_page_markers)
]

def process_page_and_xml(xml_files_list, xml_dir, pages_dir, annotations_dir, annotations_dict):
    """Crop every listed page to its handwriting box and collect its annotations.

    For each XML file name in `xml_files_list` (looked up in `xml_dir`) the
    file is relabelled via `label_encoding` and parsed via
    `extract_annotations`. The page image ``<page_id>.tif`` from `pages_dir`
    is then cropped with `crop_from_bbox` and written to `annotations_dir` as
    ``<page_id>.jpg``. The returned regions and region contents are stored in
    `annotations_dict` under the page id.
    """
    for xml_name in xml_files_list:
        xml_path = os.path.join(xml_dir, xml_name)
        print(xml_path)

        # The XML file is rewritten on disk before it is parsed.
        label_encoding(xml_path)
        parsed = extract_annotations(xml_path)

        page_id = parsed['page_id']
        source_page = os.path.join(pages_dir, page_id + '.tif')
        cropped_page = os.path.join(annotations_dir, page_id + '.jpg')

        shifted = crop_from_bbox(
            source_page, parsed['handwriting_crop_bbox'], cropped_page, parsed)

        annotations_dict[page_id] = {
            key: shifted[key] for key in ("regions", "regions_contents")
        }





In [None]:
test_files_from_merged = []
val_files_from_merged = []
train_files_from_merged = []

# (directory names of a split, list receiving its files); the first match
# wins, checked in the order train, test, validation.
_split_buckets = (
    (train_dirnames, train_files_from_merged),
    (test_dirnames, test_files_from_merged),
    (validation_dirnames, val_files_from_merged),
)

for xml_f in merged_xml_files:
    # The first four characters of the file name are used as the writer id.
    xml_writer_id = xml_f[:4]

    for split_dirnames, bucket in _split_buckets:
        if xml_writer_id in split_dirnames:
            bucket.append(xml_f)
            break
    else:
        raise ValueError("XML file doesn't belong to any valid writer id's")


In [None]:
# Number of page XML files currently assigned to each split.
test_count, train_count, val_count = map(
    len, (test_files_from_merged, train_files_from_merged, val_files_from_merged)
)

In [None]:
# Fraction of the counted files that sit in the test split.
total_count = test_count + train_count + val_count
print(test_count / total_count)

We need to split the pages a bit differently than we did when splitting the words.

In [None]:
# A dedicated, seeded generator makes the re-split reproducible from one run
# to the next; the module-level `random` state differs on every fresh kernel.
# NOTE: this cell replaces `test_files_from_merged` and appends to the train /
# validation lists, so running it a second time splits the reduced list again.
split_rng = random.Random(42)

new_test_files_from_merged = []
for xml_f in test_files_from_merged:
    # Roughly half of the test files are moved out of the test split ...
    if split_rng.random() <= 0.5:
        # ... and of those, about 13% go to validation, the rest to train.
        if split_rng.random() <= 0.13:
            val_files_from_merged.append(xml_f)
        else:
            train_files_from_merged.append(xml_f)
    else:
        new_test_files_from_merged.append(xml_f)

test_files_from_merged = new_test_files_from_merged

In [None]:
# Recount the files per split and print the fraction that sits in test.
test_count, train_count, val_count = map(
    len, (test_files_from_merged, train_files_from_merged, val_files_from_merged)
)
total_count = test_count + train_count + val_count
print(test_count / total_count)

In [None]:
import json

# Destination of the per-split annotation dumps.
train_annotations_json = '/content/SOURCE3/detection/train_annotations.json'
test_annotations_json = '/content/SOURCE3/detection/test_annotations.json'
val_annotations_json = '/content/SOURCE3/detection/validation_annotations.json'

# --- Train: train-set pages plus the test-set pages assigned to train. ---
process_page_and_xml(
    xml_files_list=train_xml_files,
    xml_dir=train_xml_dir,
    pages_dir=train_pages_dir,
    annotations_dir=annotations_train_dir,
    annotations_dict=train_pages_annotations,
)
process_page_and_xml(
    xml_files_list=train_files_from_merged,
    xml_dir=test_xml_dir,
    pages_dir=test_pages_dir,
    annotations_dir=annotations_train_dir,
    annotations_dict=train_pages_annotations,
)
with open(train_annotations_json, 'w') as train_out:
    json.dump(train_pages_annotations, train_out)

# --- Test: the test-set pages that stayed in test. ---
process_page_and_xml(
    xml_files_list=test_files_from_merged,
    xml_dir=test_xml_dir,
    pages_dir=test_pages_dir,
    annotations_dir=annotations_test_dir,
    annotations_dict=test_pages_annotations,
)
with open(test_annotations_json, 'w') as test_out:
    json.dump(test_pages_annotations, test_out)

# --- Validation: validation pages of the train set plus the test-set pages
# assigned to validation. ---
process_page_and_xml(
    xml_files_list=val_xml_files,
    xml_dir=train_xml_dir,
    pages_dir=train_pages_dir,
    annotations_dir=annotations_val_dir,
    annotations_dict=val_pages_annotations,
)
process_page_and_xml(
    xml_files_list=val_files_from_merged,
    xml_dir=test_xml_dir,
    pages_dir=test_pages_dir,
    annotations_dir=annotations_val_dir,
    annotations_dict=val_pages_annotations,
)
with open(val_annotations_json, 'w') as val_out:
    json.dump(val_pages_annotations, val_out)



In [None]:
# Share of the directory entries of each original XML folder that were kept.
valid_test_share = len(merged_xml_files) / len(os.listdir(test_xml_dir))
print(f'Percentage of valid from init. testset: {valid_test_share:.2f}')

valid_train_share = (len(train_xml_files) + len(val_xml_files)) / len(os.listdir(train_xml_dir))
print(f'Percentage of valid from init. train: {valid_train_share:.2f}')

In [None]:
# Zip the whole detection folder into /content/SOURCE3/detection.zip.
shutil.make_archive(
    base_name='/content/SOURCE3/detection',
    format="zip",
    root_dir='/content/SOURCE3/detection/',
)