Connected to env (Python 3.11.13)

In [None]:
import pdfplumber as plumber
import pandas as pd
import numpy as np
import os

from sklearn.cluster import DBSCAN

In [None]:
class Cell:
    """Axis-aligned rectangle described by two corner points.

    Stores the corners ``(x0, y0)`` and ``(x1, y1)``, the absolute extent
    along each axis, four neighbour slots (``top``, ``bottom``, ``left``,
    ``right``) that start out as ``None``, and an initially empty ``text``.
    """

    def __init__(self, x0, y0, x1, y1):
        self.x0, self.y0, self.x1, self.y1 = x0, y0, x1, y1

        # Extents are absolute, so the corners may be given in either order.
        self.height = abs(y1 - y0)
        self.width = abs(x1 - x0)

        # Neighbour links are not populated by the constructor.
        self.top = self.bottom = self.left = self.right = None

        self.text = ""

    def __repr__(self):
        start = f"({self.x0}, {self.y0})"
        end = f"({self.x1}, {self.y1})"
        return f"Cell({start}-{end})"

In [None]:

class Page:
    def __init__(self, page) -> None:
        self.page = page

        self.page_number = self.page.page_number
        self.height = self.page.height
        self.width = self.page.width

        self.row_grid = Page._generate_grid(self.height)
        self.column_grid = Page._generate_grid(self.width)

        self.chars = pd.DataFrame()
        self.lines = pd.DataFrame()
        self.edges = pd.DataFrame()
        self.rects = pd.DataFrame()

    def __repr__(self) -> str:
        return f"Page({self.page_number}, {self.height}x{self.width})"

    def get_page_characters(self):
        if self.chars.empty:
            chars = self.page.chars

            char_df = Page._attribute_df(
                chars,
                mandatory=["text", "x0", "y0", "x1", "y1"],
                selection=["size", "width", "height"],
            )

            for col in ["x0", "y0", "x1", "y1"]:
                char_df[col] = char_df[col].apply(
                    lambda x: Page._snap_to_grid(
                        x, self.column_grid if col in ["x0", "x1"] else self.row_grid
                    )
                )

            self.chars = char_df.drop_duplicates().reset_index(drop=True)

        return self.chars

    def get_page_lines(self):
        if self.lines.empty:
            lines = self.page.lines

            lines_df = Page._attribute_df(
                lines,
                mandatory=["x0", "y0", "x1", "y1"],
                selection=["orientation"],
            )

            lines_df["orientation"] = lines_df.apply(
                lambda row: Page._detect_orientation(
                    row["x0"], row["y0"], row["x1"], row["y1"]
                ),
                axis=1,
            )

            for col in ["x0", "y0", "x1", "y1"]:
                lines_df[col] = lines_df[col].apply(
                    lambda x: Page._snap_to_grid(
                        x, self.column_grid if col in ["x0", "x1"] else self.row_grid
                    )
                )

            self.lines = lines_df.drop_duplicates().reset_index(drop=True)

        return self.lines

    def get_page_edges(self, tol: float = 2.0):
        if self.edges.empty:
            edges = self.page.edges

            edges_df = Page._attribute_df(
                edges,
                mandatory=["x0", "y0", "x1", "y1"],
                selection=["orientation"],
            )

            edges_df["orientation"] = edges_df.apply(
                lambda row: Page._detect_orientation(
                    row["x0"], row["y0"], row["x1"], row["y1"]
                ),
                axis=1,
            )

            coordinate_groups = [["x0", "y0"], ["x1", "y1"]]

            for orientation in ["horizontal", "vertical"]:
                subset = edges_df[edges_df["orientation"] == orientation].copy()
                indices = list(subset.index)

                if not subset.empty:
                    for group in coordinate_groups:
                        points = np.vstack((subset[group].values))

                        clustered = Page._cluster_points(
                            points,
                            self.column_grid,
                            self.row_grid,
                            tol=tol,
                            original_indices=indices,
                        )

                        for col in clustered.columns:
                            subset_col = group[0] if col == "x" else group[1]
                            subset.loc[indices, subset_col] = clustered[col].values

                edges_df.loc[indices, group] = subset[group].values

            # for col in ["x0", "y0", "x1", "y1"]:
            #     edges_df[col] = edges_df[col].apply(
            #         lambda x: Page._snap_to_grid(
            #             x, self.column_grid if col in ["x0", "x1"] else self.row_grid
            #         )
            #     )

            self.edges = edges_df.drop_duplicates().reset_index(drop=True)

        return self.edges

    def get_page_rects(self):
        if self.rects.empty:
            rects = self.page.rects

            rects_df = Page._attribute_df(
                rects,
                mandatory=["x0", "y0", "x1", "y1"],
                selection=[],
            )

            for col in ["x0", "y0", "x1", "y1"]:
                rects_df[col] = rects_df[col].apply(
                    lambda x: Page._snap_to_grid(
                        x, self.column_grid if col in ["x0", "x1"] else self.row_grid
                    )
                )

            self.rects = rects_df.drop_duplicates().reset_index(drop=True)

        return self.rects

    @staticmethod
    def _generate_grid(dimension, resolution=1):
        return [g for g in range(0, int(dimension), resolution)]

    @staticmethod
    def _snap_to_grid(val, grid):
        return max([g for g in grid if g <= val], default=val)

    @staticmethod
    def _detect_orientation(x0, y0, x1, y1, tol=0.0):
        if np.isclose(x0, x1, atol=tol):
            return "vertical"
        elif np.isclose(y0, y1, atol=tol):
            return "horizontal"
        else:
            return "other"

    @staticmethod
    def _attribute_df(
        list_dict: list[dict], mandatory: list[str], selection: list[str]
    ) -> pd.DataFrame:
        """
        Create a DataFrame from a list of dictionaries, ensuring mandatory columns are present.
        """

        if not list_dict:
            # Return empty DataFrame with all mandatory + selection columns as placeholders
            columns = mandatory + selection
            df = pd.DataFrame(columns=columns)
            # Fill selection columns with NaN explicitly (optional here as empty)
            for col in selection:
                df[col] = np.nan
            return df

        df = pd.DataFrame(list_dict)

        for col in mandatory:
            if col not in df.columns:
                raise ValueError(f"Mandatory column '{col}' missing in data.")

        for col in selection:
            if col not in df.columns:
                df[col] = np.nan

        final = mandatory + selection

        return df[final]

    @staticmethod
    def _cluster_points(
        coords_array: np.ndarray,
        column_grid: list[int],
        row_grid: list[int],
        tol: float = 3.0,
        original_indices: list = None,
    ):
        """
        Cluster points in a 2D space using DBSCAN.
        """
        db = DBSCAN(
            eps=tol, min_samples=1, metric="euclidean", algorithm="kd_tree"
        ).fit(coords_array)
        labels = db.labels_

        point_df = pd.DataFrame(coords_array, columns=["x", "y"])
        point_df["cluster"] = labels
        centroids = point_df.groupby("cluster")[["x", "y"]].mean()

        mapped_points = np.array([centroids.loc[label].values for label in labels])

        df = pd.DataFrame(mapped_points, columns=["x", "y"])

        for col in ["x", "y"]:
            df[col] = df[col].apply(
                lambda x: Page._snap_to_grid(x, column_grid if col == "x" else row_grid)
            )

        if original_indices is not None:
            df.index = original_indices

        return df



In [None]:
class Document:
    """Open a PDF and wrap each of its pages in a ``Page``.

    The underlying PDF handle stays open for the lifetime of the object.
    Call ``close()`` when done, or use the instance as a context manager::

        with Document(path) as doc:
            page = doc.pages[0]
    """

    def __init__(self, path: str) -> None:
        self.path = path
        self.pdf = plumber.open(
            path
        )  # TODO: Change to an encapsulated function that handles PWD protection

        self.pages = []
        try:
            self._initialise_pages()
        except Exception:
            # Do not leak the open file handle if wrapping the pages fails.
            self.pdf.close()
            raise

    def _initialise_pages(self):
        """Append one ``Page`` wrapper per page of the opened PDF to ``self.pages``."""
        self.pages.extend(Page(page) for page in self.pdf.pages)

    def close(self) -> None:
        """Release the underlying PDF handle."""
        self.pdf.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

In [None]:

# Location of the sample PDF, relative to the working directory.
path = "../../tests/data/"
file = "DEOGIRI NAGARI SAHAKARI BANK LTD.pdf"

# Open the document and keep its first page; the cells below all work on `page`.
doc = Document(os.path.join(path, file))
page = doc.pages[0]

In [None]:
# Edge table of the first page, built with the default tolerance of get_page_edges.
edge_df = page.get_page_edges()

In [None]:
def pdf_to_array_coords(x, y, height):
    """Convert a PDF point ``(x, y)`` into integer ``(row, col)`` array indices.

    Args:
        x: Horizontal PDF coordinate.
        y: Vertical PDF coordinate, measured from the bottom of the page.
        height: Page height; rounded to the nearest integer before use so the
            returned row is always an ``int``, even for a float height.

    Returns:
        ``(row, col)`` as plain ints. ``y == 0`` maps to ``row == height``,
        which is one past the last row of a ``height``-row array, so callers
        are expected to clip the result.
    """
    # y-axis flip because PDF origin (0,0) is bottom-left
    row = int(round(height)) - int(round(y))
    col = int(round(x))
    return row, col

In [None]:
# Page dimensions as module-level shortcuts; the drawing cells below use them
# for the y-axis flip and for clipping pixel indices.
height = page.height
width = page.width

In [None]:

from skimage.draw import line as skline
import matplotlib.pyplot as plt

# Create a blank canvas. Array shapes must be integers, so the page
# dimensions are rounded up in case they are floats.
n_rows, n_cols = int(np.ceil(page.height)), int(np.ceil(page.width))
canvas = np.zeros((n_rows, n_cols), dtype=np.uint8)

# %%
# Draw lines on the canvas
for _, line_obj in edge_df.iterrows():
    x0, y0 = line_obj["x0"], line_obj["y0"]
    x1, y1 = line_obj["x1"], line_obj["y1"]
    # Flip against the integer canvas height so the indices stay integral.
    r0, c0 = pdf_to_array_coords(x0, y0, n_rows)
    r1, c1 = pdf_to_array_coords(x1, y1, n_rows)

    # Draw line pixels on the array using skimage's line function
    rr, cc = skline(r0, c0, r1, c1)
    # Ensure indices are inside array bounds
    rr = np.clip(rr, 0, n_rows - 1)
    cc = np.clip(cc, 0, n_cols - 1)

    # Mark line pixels as 1
    canvas[rr, cc] = 1

plt.imshow(canvas, cmap="gray")
# %%
# %%

In [None]:
# Split the edge table by orientation label; each part gets a fresh 0..n-1 index.
vert_edge_df = edge_df[edge_df["orientation"] == "vertical"].reset_index(drop=True)
hor_edge_df = edge_df[edge_df["orientation"] == "horizontal"].reset_index(drop=True)

In [None]:
# Copy the vertical edges to the system clipboard for inspection outside the session.
vert_edge_df.to_clipboard()

In [None]:
# Distinct coordinates of the vertical edges as sorted 1-D arrays:
# x values pooled from both end points, y0 and y1 kept separate.
# np.unique flattens its input and returns the values in ascending order.
vert_x_coords = np.unique(vert_edge_df[["x0", "x1"]].to_numpy())
vert_y0_coords = np.unique(vert_edge_df["y0"].to_numpy())
vert_y1_coords = np.unique(vert_edge_df["y1"].to_numpy())

In [None]:
# Group the sorted x coordinates of the vertical edges: a value joins the
# current group when it is closer than `merge_threshold` to that group's most
# recent member, otherwise it starts a new group.
merge_threshold = 2.0

groups = []
# NOTE(review): indexing [0] raises IndexError when there are no vertical edges.
group = [vert_x_coords[0]]

for x in vert_x_coords[1:]:
    if np.abs(x - group[-1]) < merge_threshold:
        group.append(x)
    else:
        groups.append(group)
        group = [x]
# The loop only stores a group when the next one starts, so add the last one here.
groups.append(group)



In [None]:
groups

In [None]:
# After the loop, `group` is the last group built (the same list as groups[-1]).
group

In [None]:
vert_x_coords

In [None]:
# Map every original x value to the mean of the group it belongs to.
# The comprehension's `group` is local to it and does not rebind the global above.
cluster_map = {v: np.mean(group) for group in groups for v in group}

In [None]:
# Replace each group mean by its snapped position on the page's column grid.
# Only the values of existing keys are reassigned, so the dict size is unchanged.
for k, v in cluster_map.items():
    cluster_map[k] = Page._snap_to_grid(v, page.column_grid)

cluster_map

In [None]:
edge_df

In [None]:
edge_df

In [None]:
# Stack all start points on top of all end points: rows 0..n-1 are (x0, y0),
# rows n..2n-1 are (x1, y1), where n is the number of edges.
points = np.vstack((edge_df[["x0", "y0"]].values, edge_df[["x1", "y1"]].values))

In [None]:
from sklearn.cluster import DBSCAN

# Cluster every end point of every edge in one pass. With min_samples=1 each
# point is a core point, so no point is labelled as noise.
db = DBSCAN(eps=3.0, min_samples=1).fit(points)
labels = db.labels_

In [None]:
db

In [None]:
# Attach the cluster label to every point and compute one centroid per cluster.
point_df = pd.DataFrame(points, columns=['x', 'y'])
point_df['cluster'] = labels
centroids = point_df.groupby('cluster')[['x', 'y']].mean()

# Replace each point by the centroid of its cluster, keeping the row order of `points`.
mapped_points = np.array([centroids.loc[label].values for label in labels])

# `points` holds the start points followed by the end points, so splitting the
# rows into two halves gives new_coords[0] = starts and new_coords[1] = ends.
new_coords = mapped_points.reshape(2, -1, 2)
edge_df['x0_clean'], edge_df['y0_clean'] = new_coords[0][:,0], new_coords[0][:,1]
edge_df['x1_clean'], edge_df['y1_clean'] = new_coords[1][:,0], new_coords[1][:,1]

In [None]:
# Copy the edge table, including the new *_clean columns, to the system clipboard.
edge_df.to_clipboard()

In [None]:

from skimage.draw import line as skline
import matplotlib.pyplot as plt

# Create a blank canvas. Array shapes must be integers, so the page
# dimensions are rounded up in case they are floats.
n_rows, n_cols = int(np.ceil(page.height)), int(np.ceil(page.width))
canvas = np.zeros((n_rows, n_cols), dtype=np.uint8)

# %%
# Draw lines on the canvas, this time from the clustered (*_clean) coordinates
for _, line_obj in edge_df.iterrows():
    x0, y0 = line_obj["x0_clean"], line_obj["y0_clean"]
    x1, y1 = line_obj["x1_clean"], line_obj["y1_clean"]

    # Flip against the integer canvas height so the indices stay integral.
    r0, c0 = pdf_to_array_coords(x0, y0, n_rows)
    r1, c1 = pdf_to_array_coords(x1, y1, n_rows)

    # Draw line pixels on the array using skimage's line function
    rr, cc = skline(r0, c0, r1, c1)
    # Ensure indices are inside array bounds
    rr = np.clip(rr, 0, n_rows - 1)
    cc = np.clip(cc, 0, n_cols - 1)

    # Mark line pixels as 1
    canvas[rr, cc] = 1

plt.imshow(canvas, cmap="gray")
# %%
# %%

In [None]:
# Scratch cells: display intermediate results of the clustering experiment.
point_df


In [None]:
new_coords

In [None]:
# Empty two-column frame, filled in two cells below.
df = pd.DataFrame(columns=["x", "y"])

In [None]:
df

In [None]:
# Fill the frame with the first half of new_coords (the clustered start points).
df['x'], df['y'] = new_coords[0][:,0], new_coords[0][:,1]

In [None]:
edge_df