## Imports

In [1]:
import os
import time
import pathlib

import numpy as np
import pandas as pd
import cv2
import torch
from torch import nn
from torchvision import transforms
from PIL import Image, ImageDraw, ImageTk, ImageFont
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.patches import Patch

from transformers import AutoModelForObjectDetection, TableTransformerForObjectDetection

from IPython.display import clear_output

In [2]:
# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

Device: cuda


## Function Definitions 

In [3]:
class MaxResize(object):
    def __init__(self, max_size=800):
        self.max_size = max_size

    def __call__(self, image):
        width, height = image.size
        current_max_size = max(width, height)
        scale = self.max_size / current_max_size
        resized_image = image.resize(
            (int(round(scale * width)), int(round(scale * height)))
        )

        return resized_image


# for output bounding box post-processing
def box_cxcywh_to_xyxy(x):
    x_c, y_c, w, h = x.unbind(-1)
    b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)]
    return torch.stack(b, dim=1)


def rescale_bboxes(out_bbox, size):
    img_w, img_h = size
    b = box_cxcywh_to_xyxy(out_bbox)
    b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32)
    return b


# Object detection
def outputs_to_objects(outputs, img_size, id2label):
    m = outputs.logits.softmax(-1).max(-1)
    pred_labels = list(m.indices.detach().cpu().numpy())[0]
    pred_scores = list(m.values.detach().cpu().numpy())[0]
    pred_bboxes = outputs["pred_boxes"].detach().cpu()[0]
    pred_bboxes = [elem.tolist() for elem in rescale_bboxes(pred_bboxes, img_size)]

    objects = []
    for label, score, bbox in zip(pred_labels, pred_scores, pred_bboxes):
        class_label = id2label[int(label)]
        if not class_label == "no object":
            objects.append(
                {
                    "label": class_label,
                    "score": float(score),
                    "bbox": [float(elem) for elem in bbox],
                }
            )

    return objects


def fig2img(fig):
    """Convert a Matplotlib figure to a PIL Image and return it"""
    import io

    buf = io.BytesIO()
    fig.savefig(buf)
    buf.seek(0)
    img = Image.open(buf)
    return img


def visualize_detected_tables(img, det_tables, out_path=None):
    plt.imshow(img, interpolation="lanczos")
    fig = plt.gcf()
    fig.set_size_inches(20, 20)
    ax = plt.gca()

    for det_table in det_tables:
        bbox = det_table["bbox"]

        # Extend the bottom edge of the bounding box
        extend_height = (bbox[3] - bbox[1]) * 0.05
        bbox[3] += extend_height

        # Extend the top edge of the bounding box
        bbox[1] -= extend_height

        if det_table["label"] == "table":
            facecolor = (1, 0, 0.45)
            edgecolor = (1, 0, 0.45)
            alpha = 0.3
            linewidth = 2
            hatch = "//////"
        elif det_table["label"] == "table rotated":
            facecolor = (0.95, 0.6, 0.1)
            edgecolor = (0.95, 0.6, 0.1)
            alpha = 0.3
            linewidth = 2
            hatch = "//////"
        else:
            continue

        rect = patches.Rectangle(
            bbox[:2],
            bbox[2] - bbox[0],
            bbox[3] - bbox[1],
            linewidth=linewidth,
            edgecolor="none",
            facecolor=facecolor,
            alpha=0.1,
        )
        ax.add_patch(rect)
        rect = patches.Rectangle(
            bbox[:2],
            bbox[2] - bbox[0],
            bbox[3] - bbox[1],
            linewidth=linewidth,
            edgecolor=edgecolor,
            facecolor="none",
            linestyle="-",
            alpha=alpha,
        )
        ax.add_patch(rect)
        rect = patches.Rectangle(
            bbox[:2],
            bbox[2] - bbox[0],
            bbox[3] - bbox[1],
            linewidth=0,
            edgecolor=edgecolor,
            facecolor="none",
            linestyle="-",
            hatch=hatch,
            alpha=0.2,
        )
        ax.add_patch(rect)

    plt.xticks([], [])
    plt.yticks([], [])

    legend_elements = [
        Patch(
            facecolor=(1, 0, 0.45),
            edgecolor=(1, 0, 0.45),
            label="Table",
            hatch="//////",
            alpha=0.3,
        ),
        Patch(
            facecolor=(0.95, 0.6, 0.1),
            edgecolor=(0.95, 0.6, 0.1),
            label="Table (rotated)",
            hatch="//////",
            alpha=0.3,
        ),
    ]
    plt.legend(
        handles=legend_elements,
        bbox_to_anchor=(0.5, -0.02),
        loc="upper center",
        borderaxespad=0,
        fontsize=10,
        ncol=2,
    )
    plt.gcf().set_size_inches(10, 10)
    plt.axis("off")

    if out_path is not None:
        plt.savefig(out_path, bbox_inches="tight", dpi=150)

    return fig


def objects_to_crops(img, tokens, objects, class_thresholds, padding=10):
    """
    Process the bounding boxes produced by the table detection model into
    cropped table images and cropped tokens.
    """

    table_crops = []
    for obj in objects:
        if obj["score"] < class_thresholds[obj["label"]]:
            continue

        cropped_table = {}

        bbox = obj["bbox"]
        bbox = [
            bbox[0] - padding,
            bbox[1] - padding,
            bbox[2] + padding,
            bbox[3] + padding,
        ]

        cropped_img = img.crop(bbox)

        table_tokens = [token for token in tokens if iob(token["bbox"], bbox) >= 0.5]
        for token in table_tokens:
            token["bbox"] = [
                token["bbox"][0] - bbox[0],
                token["bbox"][1] - bbox[1],
                token["bbox"][2] - bbox[0],
                token["bbox"][3] - bbox[1],
            ]

        # If table is predicted to be rotated, rotate cropped image and tokens/words:
        if obj["label"] == "table rotated":
            cropped_img = cropped_img.rotate(270, expand=True)
            for token in table_tokens:
                bbox = token["bbox"]
                bbox = [
                    cropped_img.size[0] - bbox[3] - 1,
                    bbox[0],
                    cropped_img.size[0] - bbox[1] - 1,
                    bbox[2],
                ]
                token["bbox"] = bbox

        cropped_table["image"] = cropped_img
        cropped_table["tokens"] = table_tokens

        table_crops.append(cropped_table)

    return table_crops


def get_cell_coordinates_by_row(table_data):
    # Extract rows and columns
    rows = [entry for entry in table_data if entry["label"] == "table row"]
    columns = [entry for entry in table_data if entry["label"] == "table column"]

    # Sort rows and columns by their Y and X coordinates, respectively
    rows.sort(key=lambda x: x["bbox"][1])
    columns.sort(key=lambda x: x["bbox"][0])

    # Function to find cell coordinates
    def find_cell_coordinates(row, column):
        cell_bbox = [
            column["bbox"][0],
            row["bbox"][1],
            column["bbox"][2],
            row["bbox"][3],
        ]
        return cell_bbox

    # Generate cell coordinates and count cells in each row
    cell_coordinates = []

    for row in rows:
        row_cells = []
        for column in columns:
            cell_bbox = find_cell_coordinates(row, column)
            row_cells.append({"column": column["bbox"], "cell": cell_bbox})

        # Sort cells in the row by X coordinate
        row_cells.sort(key=lambda x: x["column"][0])

        # Append row information to cell_coordinates
        cell_coordinates.append(
            {"row": row["bbox"], "cells": row_cells, "cell_count": len(row_cells)}
        )

    # Sort rows from top to bottom
    cell_coordinates.sort(key=lambda x: x["row"][1])

    return cell_coordinates

## Configuration for the transformer models.

In [4]:
model = AutoModelForObjectDetection.from_pretrained(
    "microsoft/table-transformer-detection", revision="no_timm"
)

print("Model config: ", model.config.id2label)

detection_transform = transforms.Compose(
    [
        MaxResize(800),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# update id2label to include "no object"
id2label = model.config.id2label
id2label[len(model.config.id2label)] = "no object"


#  Structure Model
structure_model = TableTransformerForObjectDetection.from_pretrained(
    "microsoft/table-structure-recognition-v1.1-all"
)
structure_model.to("cpu")

structure_transform = transforms.Compose(
    [
        MaxResize(1000),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# update id2label to include "no object"
structure_id2label = structure_model.config.id2label
structure_id2label[len(structure_id2label)] = "no object"

Model config:  {0: 'table', 1: 'table rotated'}


## Load pretrained model

In [5]:
# Define the TinyVGG class
class TinyVGG(nn.Module):
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape, out_channels=hidden_units, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(hidden_units),  # Add BatchNorm here
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(hidden_units),  # Add BatchNorm here
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden_units),  # Add BatchNorm here
            nn.ReLU(),
            nn.Conv2d(hidden_units, out_channels=hidden_units, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden_units),  # Add BatchNorm here
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=hidden_units*16*16, out_features=output_shape)
        )

    def forward(self, x: torch.Tensor):
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.classifier(x)
        return x

# Load the model state_dict correctly
model_path = "models/tinyvgg_model.pt"
checkpoint = torch.load(model_path, map_location=torch.device('cpu'))

# Create the model instance
tfc_model = TinyVGG(
    input_shape=checkpoint['input_shape'],
    hidden_units=checkpoint['hidden_units'],
    output_shape=checkpoint['output_shape']
)

# Load the state_dict into the model
tfc_model.load_state_dict(checkpoint['model_state_dict'])

# Set the model to evaluation mode
tfc_model.eval()

TinyVGG(
  (conv_block_1): Sequential(
    (0): Conv2d(3, 25, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(25, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(25, 25, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(25, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv_block_2): Sequential(
    (0): Conv2d(25, 25, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(25, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(25, 25, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(25, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0):

In [7]:
# # Do a sample prediction from the image.
# # Define transformation
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor()
])

# # Load and transform an example image
# image_path = "dataset/trimmed/true/true-20240328_145719-cell-5.png"
# image = Image.open(image_path).convert("RGB")


# input_image_tensor = transform(image).unsqueeze(0)  # Add batch dimension
# tfc_model.to("cpu")
# input_image_tensor = input_image_tensor.to("cpu")

# # Perform inference
# with torch.inference_mode():
#     output = tfc_model(input_image_tensor)

# # Logits
# print("Logits: ", output)

# # Probabilities
# probs = torch.softmax(output, dim=1)
# print("Probabilities: ", probs)

class_names = ["none", "true", "false"]
class_names.sort()
print("Class Names: ", class_names)

# # Predicted label
# label = torch.argmax(probs, dim=1)
# print("Predicted Label: ", label)
# print("Predicted Class: ", class_names[label])

Class Names:  ['false', 'none', 'true']


## Read dataset and correct answers csv path 

In [7]:
# # Prompt for the path to the model answers CSV file
# # correct_answers_csv_path = input("Enter the path to the model answers CSV file: ")
# correct_answers_csv_path = "data/ModelAnswer.csv"
# correct_answers = pd.read_csv(correct_answers_csv_path)

# # Get total number of questions
# TOTAL_QUESTIONS = len(correct_answers)
# ANSWERS = correct_answers["Correct Answer"].tolist()

# print(f"Total number of questions: {TOTAL_QUESTIONS}")
# print(f"Answers: {ANSWERS}")

# # Prompt for the path to the evaluation data folder
# evaluation_dataset_folder = 'evaluation-data'

# # Use the current working directory if running interactively
# evaluation_dataset_folder = pathlib.Path(os.getcwd()) / evaluation_dataset_folder

# # Display the total number of questions and the evaluation dataset folder path for verification
# print(f"Evaluation dataset folder: {evaluation_dataset_folder}")

Total number of questions: 10
Answers: [False, False, False, False, False, False, True, True, True, True]
Evaluation dataset folder: d:\Projects\IIST\AutoEval\evaluation-data


In [20]:
model_answer_type1_path = "evaluation-data/model_answer_type1.csv"
model_answer_type2_path = "evaluation-data/model_answer_type2.csv"
img_model_answer_mapping = "evaluation-data/img_model_answer_mapping.csv"

TOTAL_QUESTIONS = 10
correct_answers_type1 = pd.read_csv(model_answer_type1_path)
correct_answers_type2 = pd.read_csv(model_answer_type2_path)
img_mappings = pd.read_csv(img_model_answer_mapping)

print(img_mappings.head())

ANSWERS_TYPE1 = correct_answers_type1["Correct Answer"].tolist()
ANSWERS_TYPE2 = correct_answers_type2["Correct Answer"].tolist()

print(f"Total number of questions: {TOTAL_QUESTIONS}")
print(f"Answers Type 1: {ANSWERS_TYPE1}")
print(f"Answers Type 2: {ANSWERS_TYPE2}")

# Load the evaluation dataset
evaluation_dataset_folder = 'evaluation-data'
evaluation_dataset_folder = pathlib.Path(os.getcwd()) / evaluation_dataset_folder / "phase_1_eval_dataset_enc/phase_1_eval_dataset"
print(f"Evaluation dataset folder: {evaluation_dataset_folder}")

      img_name        model_answer
0  x9v67l6.jpg  model_answer_type2
1  ewA8Rpn.jpg  model_answer_type2
2  E6MwJ00.jpg  model_answer_type2
3  9tuGAkX.jpg  model_answer_type2
4  r53EbYv.jpg  model_answer_type2
Total number of questions: 10
Answers Type 1: [False, False, False, False, False, False, True, True, True, True]
Answers Type 2: [True, True, False, False, False, False, False, True, True, True]
Evaluation dataset folder: d:\Projects\IIST\AutoEval\evaluation-data\phase_1_eval_dataset_enc\phase_1_eval_dataset


## Cell Extraction

In [21]:
def perform_extraction_and_classify_image(image_path, model):
    file_name = image_path.split("/")[-1]
    file_name_without_extension = file_name.split(".")[0]
    print("Extracting data for:", file_name)

    image = Image.open(image_path).convert("RGB")

    pixel_values = detection_transform(image).unsqueeze(0)
    pixel_values = pixel_values.to("cpu")

    with torch.no_grad():
        outputs = model(pixel_values)

    objects = outputs_to_objects(outputs, image.size, id2label)

    tokens = []
    detection_class_thresholds = {"table": 0.5, "table rotated": 0.5, "no object": 10}

    tables_crops = objects_to_crops(
        image, tokens, objects, detection_class_thresholds, padding=0
    )
    if len(tables_crops) == 0:
        print("No tables detected")
        return
    cropped_table = tables_crops[0]["image"].convert("RGB")

    pixel_values = structure_transform(cropped_table).unsqueeze(0)
    pixel_values = pixel_values.to("cpu")

    # forward pass
    with torch.no_grad():
        outputs = structure_model(pixel_values)

    cells = outputs_to_objects(outputs, cropped_table.size, structure_id2label)
    
    cell_coordinates = get_cell_coordinates_by_row(cells)

    # Plotting the cropped cell regions
    original_img_np = np.array(cropped_table)

    # Extract cell crops
    cell_crops = []
    for i, row in enumerate(cell_coordinates):
        if i == 0 and len(cell_coordinates) > TOTAL_QUESTIONS:  # Skip header if no of rows > total questions, the image might have a header.
            continue
        last_cell = row["cells"][-1]
        cell_x, cell_y, cell_w, cell_h = [int(x) for x in last_cell["cell"]]
        cell_crop = original_img_np[cell_y:cell_h, cell_x:cell_w]
        cell_crops.append(cell_crop)
    
    # If more than 10 cell crops, take only the first 10
    if len(cell_crops) > TOTAL_QUESTIONS:
        cell_crops = cell_crops[:TOTAL_QUESTIONS]
    
    # Perform classification for each cell crop
    predictions = []
    for cell_crop in cell_crops:
        cell_image = Image.fromarray(cell_crop).convert("RGB")
        input_image_tensor = transform(cell_image).unsqueeze(0)  # Add batch dimension
        input_image_tensor = input_image_tensor.to("cpu")

        # Perform inference
        with torch.inference_mode():
            output = tfc_model(input_image_tensor)

        # Probabilities
        probs = torch.softmax(output, dim=1)
        label = torch.argmax(probs, dim=1).item()
        predictions.append(class_names[label])
    
    return predictions

## Runner Code

In [22]:
# Define the path to the folder containing images and the CSV file for marks
folder_path = pathlib.Path("evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset")
marks_csv_path ="evaluation-data/results.csv"

if not os.path.exists(marks_csv_path):
    with open(marks_csv_path, "w") as f:
        f.write("img_name,pred_marks\n")

# Read the existing marks CSV file into a DataFrame
marks_df = pd.read_csv(marks_csv_path)

# List to hold paths of images that resulted in errors during processing
errored_images = []

# Iterate through each image in the folder
for image_path in folder_path.glob("*.jpg"):
    image_name = image_path.name
    image_path_str = str(image_path).replace("\\", "/")
    print(f"Processing image: {image_path_str}")

    answer_type = img_mappings[img_mappings["img_name"] == image_name]["model_answer"].values[0]
    ANSWER = ANSWERS_TYPE1 if answer_type == 'model_answer_type1' else ANSWERS_TYPE2
    total_marks = 0
    try:
        # Perform extraction and classification on the image
        predictions = perform_extraction_and_classify_image(image_path_str, model)
        
        # Compare predictions with correct answers and calculate total marks
        for i in range(min(len(predictions), len(ANSWER))):  # Use min() to avoid index out of range error
            if predictions[i].lower() == str(ANSWER[i]).lower():  # Compare ignoring case
                total_marks += 1
                
        print(f"{image_name} scored: {total_marks}")
        
    except Exception as e:
        # Record the image path that caused an error
        errored_images.append(image_path_str)
        print(f"ERROR: {image_path_str} - {str(e)}")
        
    # Append the results (image name and predicted marks) to the marks DataFrame
    marks_df = pd.concat([marks_df, pd.DataFrame({"img_name": [image_name], "pred_marks": [total_marks]})], ignore_index=True)
    
# Write the updated marks DataFrame back to the CSV file
marks_df.to_csv(marks_csv_path, index=False)

# Print list of images that caused errors during processing
if len(errored_images) > 0:
    print("\nErrored images: ", end="")
    for err_image in errored_images:
        print(err_image, end=", ")

Processing image: evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset/04t0i6U.jpg
Extracting data for: 04t0i6U.jpg
No tables detected
ERROR: evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset/04t0i6U.jpg - object of type 'NoneType' has no len()
Processing image: evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset/0c7xEno.jpg
Extracting data for: 0c7xEno.jpg
0c7xEno.jpg scored: 4
Processing image: evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset/0CtvlvW.jpg
Extracting data for: 0CtvlvW.jpg
0CtvlvW.jpg scored: 0
Processing image: evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset/0Ld34zC.jpg
Extracting data for: 0Ld34zC.jpg
No tables detected
ERROR: evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset/0Ld34zC.jpg - object of type 'NoneType' has no len()
Processing image: evaluation-data/phase_1_eval_dataset_enc/phase_1_eval_dataset/0lnF9tq.jpg
Extracting data for: 0lnF9tq.jpg
No tables detected
ERROR: evaluation-data/phase_1

#### Properly add the marks to _`submission.csv`_ from _`results.csv`_

In [2]:
import pandas as pd

# Define paths to the results and submission CSV files
results_csv_path = "evaluation-data/results.csv"
submission_csv_path = "evaluation-data/submission.csv"

# Read the results CSV file
results_df = pd.read_csv(results_csv_path)

# Read the submission CSV file
submission_df = pd.read_csv(submission_csv_path)

# Create a dictionary from the results for quick lookup
results_dict = results_df.set_index('img_name')['pred_marks'].to_dict()

# Update the submission DataFrame with the corresponding marks
submission_df['pred_marks'] = submission_df['img_name'].map(results_dict)

# Write the updated submission DataFrame back to the CSV file
submission_df.to_csv(submission_csv_path, index=False)

print("Submission file updated successfully.")

Submission file updated successfully.
