In [None]:
import fitz
import numpy as np
from numpy import integer
from parso.python.tree import String
from pydantic import BaseModel
import pandas as pd
from typing import Dict, List
from pathlib import Path

In [None]:
# Rectangle helper addressed by edge names.
class BoxObject(BaseModel):
    """Box described by the coordinates of its four edges."""

    left: float
    bottom: float
    right: float
    top: float

    @property
    def get_rectangle(self):
        """Edge coordinates as a ``(left, bottom, right, top)`` tuple."""
        edges = (self.left, self.bottom, self.right, self.top)
        return edges

    @property
    def width(self):
        """Horizontal extent, computed as ``right - left``."""
        horizontal_extent = self.right - self.left
        return horizontal_extent

    @property
    def height(self):
        """Vertical extent, computed as ``top - bottom`` (sign follows the inputs)."""
        vertical_extent = self.top - self.bottom
        return vertical_extent


class MyRect(BaseModel):
    """Rectangle stored as two corner points ``(x0, y0)`` and ``(x1, y1)``."""

    x0: float
    y0: float
    x1: float
    y1: float

    def get_rect(self):
        """Return the corner coordinates as an ``(x0, y0, x1, y1)`` tuple."""
        corners = (self.x0, self.y0, self.x1, self.y1)
        return corners

In [None]:
# Input and output directories, both resolved below the current user's home directory.
# dir_pdf is the root the PDFs are read from; dir_target receives the CSV and PDF outputs.
dir_pdf = Path.home() / "mnt/imidat/IMI-NLPCHIR/PDF/ARC_BEFRAD"
dir_target = Path.home() / "mnt/imidat/IMI-NLPCHIR/KrczalPaulPdf/online_buffer"
# Sub-directory of dir_pdf that the selected test PDF is loaded from.
CASE_VAR = "with_pid_case"


# Test document selection: exactly one assignment is active, the others are kept
# as alternatives. The trailing tag is presumably the document type - TODO confirm.
#test_pdf_name = '0000049533_0063324964_0000000000000010110342036.pdf' # EN
#test_pdf_name = '0003144716_0063222981_0000000000000010109312434.pdf' #HUMBEF
#test_pdf_name = '0001886770_0062915440_0000000000000010106859385.pdf' #'HUMBEF'
#test_pdf_name = '0000878456_0063443781_0000000000000010111183386.pdf' # BEFRAD
test_pdf_name = '0003145866_0063250122_0000000000000010109587173.pdf' # BEFRAD
# test_pdf_name = '0002169796_0062805612_0000000000000010105555277.pdf' # BEF_SO
#test_pdf_name = '0000035826_0062821574_0000000000000010105780634.pdf' #PA


In [None]:
# Collect every rectangle drawn on the pages together with its four edges.
#
# Outputs of this cell:
#   rects_df            - one row per kept rectangle (corners, height, width, 1-based page)
#   test_df_lines       - vertical edges (left and right) of the kept rectangles
#   df_lines_horizontal - horizontal edges (y0 and y1) of the kept rectangles
#   rects.csv           - dump of rects_df in dir_target

# Column layouts are fixed up front so the frames keep their schema even when a
# document contains no rectangle at all.
RECT_COLUMNS = ["x0", "y0", "x1", "y1", "height", "width", "page"]
VERTICAL_LINE_COLUMNS = ["page", "x", "y0", "y1"]
HORIZONTAL_LINE_COLUMNS = ["page", "y", "x0", "x1"]

vertical_lines = []
horizontal_lines = []
rects = []
horizontal_lines_from_table = []  # not filled in this cell

# The context manager closes the document even if extraction fails midway.
with fitz.open(dir_pdf / CASE_VAR / test_pdf_name) as doc_lines:
    for pnum, page in enumerate(doc_lines, start=1):
        drawings = page.get_drawings()
        for d in drawings:
            for item in d["items"]:
                if item[0] != "re":  # only rectangle items are of interest
                    continue
                x0, y0, x1, y1 = item[1]

                # Skip rectangles that do not exceed 2 units in either direction;
                # this is meant to keep the QR code out of the result.
                if not (x1 - x0 > 2 or y1 - y0 > 2):
                    continue

                rects.append(
                    {
                        "x0": x0,
                        "y0": y0,
                        "x1": x1,
                        "y1": y1,
                        "height": y1 - y0,
                        "width": x1 - x0,
                        "page": pnum,
                    }
                )

                # Left edge first, then right edge.
                for edge_x in (x0, x1):
                    vertical_lines.append(
                        {"page": pnum, "x": edge_x, "y0": y0, "y1": y1}
                    )

                # Edge at y0 first, then edge at y1.
                for edge_y in (y0, y1):
                    horizontal_lines.append(
                        {"page": pnum, "y": edge_y, "x0": x0, "x1": x1}
                    )

test_df_lines = pd.DataFrame(vertical_lines, columns=VERTICAL_LINE_COLUMNS)
df_lines_horizontal = pd.DataFrame(horizontal_lines, columns=HORIZONTAL_LINE_COLUMNS)
rects_df = pd.DataFrame(rects, columns=RECT_COLUMNS)
rects_df.to_csv(dir_target / 'rects.csv', index=False, sep=";", decimal=",", encoding='utf-8')

test_df_lines

In [None]:
# detect large bounding boxes for table rows
#
# Consecutive rectangles in rects_df are chained into a run when the next one
# has the same height, lies further to the right and is on the same page.
# Every member of a run is kept; runs sharing the same page and y1 are then
# merged into one enclosing box.

MIN_CELL_HEIGHT = 2        # rectangles not taller than this never join a run
HEIGHT_TOLERANCE = 1e-3    # heights are float differences; allow tiny deviations
MIN_BOXES_PER_ROW = 3      # a row needs at least this many boxes next to each other
ROW_RECT_COLUMNS = ["x0", "y0", "x1", "y1", "height", "width", "page"]

row_horizontal_lines = []

# True while the previous rectangle was chained to the current one, so that the
# closing rectangle of a run (which has no successor to chain to) is kept too.
append_prev = False

for i in range(len(rects_df)):

    current_row = rects_df.iloc[i]
    # The very last rectangle has no successor but may still close a run.
    next_row = rects_df.iloc[i + 1] if i + 1 < len(rects_df) else None

    tall_enough = current_row["height"] > MIN_CELL_HEIGHT
    continues_right = (
        next_row is not None
        and tall_enough
        and next_row["page"] == current_row["page"]
        and abs(current_row["height"] - next_row["height"]) <= HEIGHT_TOLERANCE
        and next_row["x0"] > current_row["x0"]
    )

    if continues_right:
        row_horizontal_lines.append(current_row)
        append_prev = True
    else:
        # Closing element of a run: it was already matched by its predecessor.
        if append_prev and tall_enough:
            row_horizontal_lines.append(current_row)
        # The run ends here; do not carry the flag over to unrelated rectangles.
        append_prev = False


# Explicit columns keep the frame usable (to_csv, groupby) when nothing matched.
buffer_df = pd.DataFrame(row_horizontal_lines, columns=ROW_RECT_COLUMNS)
buffer_df.to_csv(dir_target / 'rects_filtered.csv', index=False, sep=";", decimal=",", encoding='utf-8')

target_boxes = []
# Group per page as well, so equal y1 values on different pages are not merged.
for (_page_no, _row_bottom), group in buffer_df.groupby(["page", "y1"], sort=False):

    # only accept rows made of at least MIN_BOXES_PER_ROW boxes
    if len(group) < MIN_BOXES_PER_ROW:
        continue

    collapsed_rects = {
        "x0": group["x0"].min(),
        "y0": group["y0"].min(),
        "x1": group["x1"].max(),
        "y1": group["y1"].max(),
        # height/width hold the largest member value, not the merged extent
        "height": group["height"].max(),
        "width": group["width"].max(),
        "page": group["page"].min()  # all members share the page (part of the group key)
    }
    target_boxes.append(collapsed_rects)

target_boxes_df = pd.DataFrame(target_boxes, columns=ROW_RECT_COLUMNS)
target_boxes_df

In [None]:
# Extract every text span with its bounding box, print the text of each detected
# table row split into cells, then collapse spans sharing a baseline into lines.
#
# Outputs of this cell:
#   df            - one row per text span (single-space spans removed)
#   collapsed_df  - one row per (page, rounded y1) text line
#   out_raw.csv / sorted.csv / out.csv in dir_target

rows = []
# The context manager releases the document once all spans have been read.
with fitz.open(dir_pdf / CASE_VAR / test_pdf_name) as doc:
    for pnum, page in enumerate(doc, start=1):
        for block in page.get_text("dict")["blocks"]:
            if block["type"] != 0:  # only blocks of type 0 carry "lines"/"spans" here
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    x0, y0, x1, y1 = span["bbox"]
                    rows.append({
                        "page": pnum,
                        "text": span["text"],
                        "font": span["font"],
                        "size": span["size"],
                        "x0": x0, "y0": y0,  # y0 is the top
                        "x1": x1, "y1": y1,  # y1 is the bottom
                        "width": x1 - x0,
                        "height": y1 - y0,
                        "flags": span["flags"],
                    })
df = pd.DataFrame(rows)
# Spans consisting of exactly one space are turned into NA and dropped.
df.replace({'text': ' '}, {'text': pd.NA}, inplace=True)
df.dropna(inplace=True)
# Table Stuff

tol = 4  # pixel tolerance for "same baseline" (not used further down in this cell)
to_drop = []   # indexes of spans that fall inside a table row (collected, not applied here)
new_rows = []  # not filled in this cell

for _idx, row in target_boxes_df.iterrows():
    page = row['page']
    x0r = row['x0']
    y0r = row['y0']
    x1r = row['x1']
    y1r = row['y1']
    group = df[
        (df['page'] == page) &
        # control width enclosing
        (df['x0'] >= x0r) & (df['x1'] <= x1r) &
        # control height enclosing
        (df['y0'] >= y0r) & (df['y1'] <= y1r)
    ]

    if group.empty:
        continue

    # record indexes of rows that are going to be merged
    indices = group.index.tolist()
    to_drop.extend(indices)

    # search for vertical lines lying inside the bounding box of the row
    vertical_bounding_lines = test_df_lines[
        (test_df_lines['page'] == page) &
        # control width
        (test_df_lines['x'] >= x0r) & (test_df_lines['x'] <= x1r) &
        # control height
        (test_df_lines['y0'] >= y0r) & (test_df_lines['y1'] <= y1r)
    ]
    # Without any separator line no cell can be formed; skip instead of failing
    # on the first-element lookup below.
    if vertical_bounding_lines.empty:
        continue

    # A visible line is drawn as a thin rectangle, so one visual separator shows
    # up as two nearly identical vertical edges here. Collapse edges whose x
    # distance is too small. Both the collapsing and the pairing into cells walk
    # from left to right, hence the explicit sort.
    vertical_bounding_lines = (
        vertical_bounding_lines.sort_values('x').reset_index(drop=True)
    )
    last_kept = vertical_bounding_lines.loc[0, 'x']
    keep = [True]
    # build a mask to remove duplicates (with margin)
    for i in range(1, len(vertical_bounding_lines)):
        current = vertical_bounding_lines.loc[i, 'x']
        if current - last_kept > 2:
            keep.append(True)
            last_kept = current
        else:
            keep.append(False)

    # this df contains all the vertical lines for a given row
    collapsed_vertical_bounding_df = vertical_bounding_lines[keep]

    # build cell boundings from each pair of neighbouring lines
    cells = []
    for i in range(len(collapsed_vertical_bounding_df) - 1):
        row_current = collapsed_vertical_bounding_df.iloc[i]
        row_next = collapsed_vertical_bounding_df.iloc[i + 1]
        cells.append({
            'page': page,
            'x0': row_current['x'],
            'x1': row_next['x']
        })
    cells_df = pd.DataFrame(cells)

    # collapse the spans of the row per cell
    collapsed_text_from_cells = []
    for _cell_idx, cell in cells_df.iterrows():
        x0 = cell['x0']
        x1 = cell['x1']

        cell_items = group[
            (group['x0'] >= x0) &
            (group['x1'] <= x1)
        ]

        collapsed_text_from_cells.append(
            ' '.join(cell_items['text'].tolist())
        )

    print('|'.join(collapsed_text_from_cells))


df.to_csv(dir_target / 'out_raw.csv', index=False, sep=";", decimal=",", encoding='utf-8')
# collapse lines into one row
df['y1_rounded'] = df['y1'].round(1)
grouped = []
group_sorted = None
for (page, y1), group in df.groupby(["page", "y1_rounded"], sort=False):
    group_sorted = group.sort_values("x0")  # Left-to-right order

    fonts = list(group_sorted["font"])
    sizes = list(group_sorted["size"])

    collapsed = {
        "page": page,
        "y1": y1,  # rounded bottom coordinate shared by the spans of this line
        "top": group_sorted["y0"].min(),
        "x0": group_sorted["x0"].min(),
        "x1": group_sorted["x1"].max(),
        "width": group_sorted["x1"].max() - group_sorted["x0"].min(),
        "height": group_sorted["height"].max(),
        "text": " ".join(group_sorted["text"]),
        "fonts": fonts,
        "sizes": sizes,
        "font_flow_begin": fonts[0] if fonts else None,
        "font_flow_end": fonts[-1] if fonts else None,
        "size_flow_begin": sizes[0] if sizes else None,
        "size_flow_end": sizes[-1] if sizes else None,
        "flags": list(group_sorted["flags"])
    }

    grouped.append(collapsed)

# sorted.csv only ever holds the last line group; write it once instead of
# rewriting the same file for every group.
if group_sorted is not None:
    group_sorted.to_csv(dir_target / 'sorted.csv', index=False, sep=";", decimal=",", encoding='utf-8')

collapsed_df = pd.DataFrame(grouped)
collapsed_df.to_csv(dir_target / 'out.csv', index=False, sep=";", decimal=",", encoding='utf-8')
collapsed_df

In [None]:

def equals_within_boundary(a, b, tolerance):
    """Tell whether ``a`` and ``b`` differ by no more than ``tolerance``.

    The comparison is inclusive: a difference exactly equal to ``tolerance``
    counts as within the boundary.
    """
    distance = abs(a - b)
    return distance <= tolerance

In [None]:
# Assign a running block id to every text line of collapsed_df and merge the
# lines of each (page, block) into one text block with an enclosing box.
#
# collapsed_df stores the bottom coordinate of a line in its "y1" column; the
# aggregated frame exposes it as "bottom".
LINE_GAP_MARGIN = 3  # extra distance allowed on top of the line height

block_ids = []
current_block = 0
prev_bottom = None

for idx, row in collapsed_df.iterrows():
    bottom = row['y1']
    if prev_bottom is None:
        # first line opens the first block
        current_block = 1
    else:
        # A new block starts when the line lies above the previous one, or when
        # the two bottoms are further apart than the line height plus margin.
        # The font of the line has no influence on this decision.
        jumped_up = bottom < prev_bottom
        far_apart = not equals_within_boundary(
            bottom, prev_bottom, row['height'] + LINE_GAP_MARGIN
        )
        if jumped_up or far_apart:
            current_block += 1
    block_ids.append(current_block)
    prev_bottom = bottom

collapsed_df['block_id'] = block_ids

# Collapse into blocks by aggregating text and bounding boxes
collapsed = collapsed_df.groupby(['page', 'block_id'], as_index=False).agg(
    text=('text', lambda texts: '\n'.join(texts)),
    x0=('x0', 'min'),
    top=('top', 'min'),
    x1=('x1', 'max'),
    bottom=('y1', 'max'),
)

In [None]:
def _box_from_block_row(_row):
    """Wrap the x0/bottom/x1/top values of one block row in a BoxObject."""
    return BoxObject(
        left=_row['x0'],
        bottom=_row['bottom'],
        right=_row['x1'],
        top=_row['top'],
    )


# One BoxObject per text block, stored next to the block's coordinates.
collapsed['box'] = collapsed.apply(_box_from_block_row, axis=1)


In [None]:

# Print every detected table-row box, one Series per box.
for _box_idx, row in target_boxes_df.iterrows():
    print(row)

In [None]:
def _draw_rect(page, rect, color, width, fill=None):
    """Draw ``rect`` on ``page`` with the given stroke colour, line width and optional fill."""
    shape = page.new_shape()
    shape.draw_rect(rect)
    shape.finish(color=color, width=width, fill=fill)
    shape.commit()


# Set to True to additionally draw every detected rectangle edge as a thin
# filled green bar (debug overlay, off by default).
DRAW_DETECTED_LINES = False

# The context manager closes the document even if drawing or saving fails.
with fitz.open(dir_pdf / CASE_VAR / test_pdf_name) as doc:
    # red outline around every collapsed text block
    for _idx, row in collapsed.iterrows():
        page = doc[int(row['page']) - 1]  # PyMuPDF page indexes start at 0
        box = row['box']
        _draw_rect(
            page,
            fitz.Rect(box.left, box.top, box.right, box.bottom),
            color=(1, 0, 0),
            width=0.5,
        )

    # annotate detected boxes (green outline around every table-row box)
    for _idx, row in target_boxes_df.iterrows():
        page = doc[int(row["page"]) - 1]
        _draw_rect(
            page,
            fitz.Rect(row["x0"], row["y0"], row["x1"], row["y1"]),
            color=(0, 1, 0),
            width=1,
        )

    if DRAW_DETECTED_LINES:
        # vertical edges as very narrow vertical boxes
        for _idx, row in test_df_lines.iterrows():
            page = doc[int(row["page"]) - 1]
            x = row["x"]
            _draw_rect(
                page,
                fitz.Rect(x - 0.25, row["y0"], x + 0.25, row["y1"]),
                color=(0, 1, 0),  # green
                width=0.5,
                fill=(0, 1, 0),
            )

        # horizontal edges as very flat horizontal boxes
        for _idx, row in df_lines_horizontal.iterrows():
            page = doc[int(row['page']) - 1]
            y = row['y']
            _draw_rect(
                page,
                fitz.Rect(row['x0'], y - 0.25, row['x1'], y + 0.25),
                color=(0, 1, 0),  # green
                width=0.5,
                fill=(0, 1, 0),
            )

    doc.save(dir_target / "output_HUMBEF.pdf")