# Modelo Tablenet (Paliwal, 2019) - ajustado em abril/maio 2024
Artigo: Tablenet: Deep learning model for end-to-end table detection and tabular data extraction from scanned document images
https://arxiv.org/pdf/2001.01469.pdf

# Carregando bibliotecas do Modelo

In [None]:
!pip install --upgrade --force-reinstall pillow==9.0.0 #rodar essa linha primeiro (que força reiniciar a sessão)

In [None]:
from IPython.display import clear_output
!pip install pytorch_lightning
!pip install pytesseract
!pip install --upgrade --force-reinstall --no-deps albumentations
!sudo apt install tesseract-ocr
!pip install pymupdf
!pip install pdf2image
!pip install --upgrade pillow==9.0.0
clear_output()

In [None]:
import pytesseract
import shutil
import pytz
import os
import random
from datetime import datetime
try:
    from PIL import Image #esse problema de import da classe Image deve ser considerado para demais modelos
except ImportError:
    import Image
clear_output()

In [None]:
#download do modelo (best_modelo.ckpt) - nao precisa carregar de novo pois o modelo já foi baixado
!gdown --id 19nNp42l0rN1epa9AwEQEqGX5fbImMV6s
#download de exemplos de imagens para predição (samples.rar)
!gdown --id 1xiEUoeV_L8kpd2BwHyD8Grz2OCRMGjCn
!unrar x /content/samples.rar
clear_output()

In [None]:
import pytorch_lightning as pl
import torch
from torch import nn, optim
from torch.nn import functional as F
from torchvision.models import vgg19, vgg19_bn #(COMENTEI POIS DEU ERRO)
from collections import OrderedDict
from typing import List
import numpy as np
import pandas as pd
from albumentations import Compose
from PIL import Image
from pytesseract import image_to_string
from skimage.filters import threshold_otsu
from skimage.segmentation import clear_border
from skimage.measure import label, regionprops
from skimage.morphology import closing, square, convex_hull_image
from skimage.transform import resize
from skimage.util import invert
import cv2
#from tablenet import TableNetModule
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow
#funcao para converter PDF para png
from pdf2image import convert_from_path

# Classe e Funções do Modelo

In [None]:
"""TableNet Module."""

EPSILON = 1e-15


class TableNetModule(pl.LightningModule):
    """Pytorch Lightning Module for TableNet."""

    def __init__(self, num_class: int = 1, batch_norm: bool = False):
        """Initialize TableNet Module.

        Args:
            num_class (int): Number of classes per point.
            batch_norm (bool): Select VGG with or without batch normalization.
        """
        super().__init__()
        self.model = TableNet(num_class, batch_norm)
        self.num_class = num_class
        self.dice_loss = DiceLoss()

    def forward(self, batch):
        """Perform forward-pass.

        Args:
            batch (tensor): Batch of images to perform forward-pass.

        Returns (Tuple[tensor, tensor]): Table, Column prediction.
        """
        return self.model(batch)

    def training_step(self, batch, batch_idx):
        """Get training step.

        Args:
            batch (List[Tensor]): Data for training.
            batch_idx (int): batch index.

        Returns: Tensor
        """
        samples, labels_table, labels_column = batch
        output_table, output_column = self.forward(samples)

        loss_table = self.dice_loss(output_table, labels_table)
        loss_column = self.dice_loss(output_column, labels_column)

        self.log('train_loss_table', loss_table)
        self.log('train_loss_column', loss_column)
        self.log('train_loss', loss_column + loss_table)
        return loss_table + loss_column

    def validation_step(self, batch, batch_idx):
        """Get validation step.

        Args:
            batch (List[Tensor]): Data for training.
            batch_idx (int): batch index.

        Returns: Tensor
        """
        samples, labels_table, labels_column = batch
        output_table, output_column = self.forward(samples)

        loss_table = self.dice_loss(output_table, labels_table)
        loss_column = self.dice_loss(output_column, labels_column)

        if batch_idx == 0:
            self._log_images("validation", samples, labels_table, labels_column, output_table, output_column)

        self.log('valid_loss_table', loss_table, on_epoch=True)
        self.log('valid_loss_column', loss_column, on_epoch=True)
        self.log('validation_loss', loss_column + loss_table, on_epoch=True)
        self.log('validation_iou_table', binary_mean_iou(output_table, labels_table), on_epoch=True)
        self.log('validation_iou_column', binary_mean_iou(output_column, labels_column), on_epoch=True)
        return loss_table + loss_column

    def test_step(self, batch, batch_idx):
        """Get test step.

        Args:
            batch (List[Tensor]): Data for training.
            batch_idx (int): batch index.

        Returns: Tensor
        """
        samples, labels_table, labels_column = batch
        output_table, output_column = self.forward(samples)

        loss_table = self.dice_loss(output_table, labels_table)
        loss_column = self.dice_loss(output_column, labels_column)

        if batch_idx == 0:
            self._log_images("test", samples, labels_table, labels_column, output_table, output_column)

        self.log('test_loss_table', loss_table, on_epoch=True)
        self.log('test_loss_column', loss_column, on_epoch=True)
        self.log('test_loss', loss_column + loss_table, on_epoch=True)
        self.log('test_iou_table', binary_mean_iou(output_table, labels_table), on_epoch=True)
        self.log('test_iou_column', binary_mean_iou(output_column, labels_column), on_epoch=True)
        return loss_table + loss_column

    def configure_optimizers(self):
        """Configure optimizer for pytorch lighting.

        Returns: optimizer and scheduler for pytorch lighting.

        """
        optimizer = optim.SGD(self.parameters(), lr=0.0001)
        scheduler = {
            'scheduler': optim.lr_scheduler.OneCycleLR(optimizer,
                                                       max_lr=0.0001, steps_per_epoch=204, epochs=500, pct_start=0.1),
            'interval': 'step',
        }

        return [optimizer], [scheduler]

    def _log_images(self, mode, samples, labels_table, labels_column, output_table, output_column):
        """Log image on to logger."""
        self.logger.experiment.add_images(f'{mode}_generated_images', samples[0:4], self.current_epoch)
        self.logger.experiment.add_images(f'{mode}_labels_table', labels_table[0:4], self.current_epoch)
        self.logger.experiment.add_images(f'{mode}_labels_column', labels_column[0:4], self.current_epoch)
        self.logger.experiment.add_images(f'{mode}_output_table', output_table[0:4], self.current_epoch)
        self.logger.experiment.add_images(f'{mode}_output_column', output_column[0:4], self.current_epoch)


class TableNet(nn.Module):
    """TableNet."""

    def __init__(self, num_class: int, batch_norm: bool = False):
        """Initialize TableNet.

        Args:
            num_class (int): Number of classes per point.
            batch_norm (bool): Select VGG with or without batch normalization.
        """
        super().__init__()
        self.vgg = vgg19(pretrained=True).features if not batch_norm else vgg19_bn(pretrained=True).features
        self.layers = [18, 27] if not batch_norm else [26, 39]
        self.model = nn.Sequential(nn.Conv2d(512, 512, kernel_size=1),
                                   nn.ReLU(inplace=True),
                                   nn.Dropout(0.8),
                                   nn.Conv2d(512, 512, kernel_size=1),
                                   nn.ReLU(inplace=True),
                                   nn.Dropout(0.8))
        self.table_decoder = TableDecoder(num_class)
        self.column_decoder = ColumnDecoder(num_class)

    def forward(self, x):
        """Forward pass.

        Args:
            x (tensor): Batch of images to perform forward-pass.

        Returns (Tuple[tensor, tensor]): Table, Column prediction.
        """
        results = []
        for i, layer in enumerate(self.vgg):
            x = layer(x)
            if i in self.layers:
                results.append(x)
        x_table = self.table_decoder(x, results)
        x_column = self.column_decoder(x, results)
        return torch.sigmoid(x_table), torch.sigmoid(x_column)


class ColumnDecoder(nn.Module):
    """Column Decoder."""

    def __init__(self, num_classes: int):
        """Initialize Column Decoder.

        Args:
            num_classes (int): Number of classes per point.
        """
        super().__init__()
        self.decoder = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Dropout(0.8),
            nn.Conv2d(512, 512, kernel_size=1),
            nn.ReLU(inplace=True),
        )
        self.layer = nn.ConvTranspose2d(1280, num_classes, kernel_size=2, stride=2, dilation=1)

    def forward(self, x, pools):
        """Forward pass.

        Args:
            x (tensor): Batch of images to perform forward-pass.
            pools (Tuple[tensor, tensor]): The 3 and 4 pooling layer from VGG-19.

        Returns (tensor): Forward-pass result tensor.

        """
        pool_3, pool_4 = pools
        x = self.decoder(x)
        x = F.interpolate(x, scale_factor=2)
        x = torch.cat([x, pool_4], dim=1)
        x = F.interpolate(x, scale_factor=2)
        x = torch.cat([x, pool_3], dim=1)
        x = F.interpolate(x, scale_factor=2)
        x = F.interpolate(x, scale_factor=2)
        return self.layer(x)


class TableDecoder(ColumnDecoder):
    """Table Decoder."""

    def __init__(self, num_classes):
        """Initialize Table decoder.

        Args:
            num_classes (int): Number of classes per point.
        """
        super().__init__(num_classes)
        self.decoder = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=1),
            nn.ReLU(inplace=True),
        )


class DiceLoss(nn.Module):
    """Dice loss."""

    def __init__(self):
        """Dice Loss."""
        super().__init__()

    def forward(self, inputs, targets, smooth=1):
        """Calculate loss.

        Args:
            inputs (tensor): Output from the forward pass.
            targets (tensor): Labels.
            smooth (float): Value to smooth the loss.

        Returns (tensor): Dice loss.

        """
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice = (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)

        return 1 - dice


def binary_mean_iou(inputs, targets):
    """Calculate binary mean intersection over union.

    Args:
        inputs (tensor): Output from the forward pass.
        targets (tensor): Labels.

    Returns (tensor): Intersection over union value.
    """
    output = (inputs > 0).int()

    if output.shape != targets.shape:
        targets = torch.squeeze(targets, 1)

    intersection = (targets * output).sum()

    union = targets.sum() + output.sum() - intersection

    result = (intersection + EPSILON) / (union + EPSILON)

    return result


In [None]:
"""Marmot Dataset Module."""

from pathlib import Path
from typing import List

import numpy as np
import pytorch_lightning as pl
from albumentations import Compose
from PIL import Image
from torch.utils.data import Dataset, DataLoader


class MarmotDataset(Dataset):
    """Marmot Dataset."""

    def __init__(self, data: List[Path], transforms: Compose = None) -> None:
        """Marmot Dataset initialization.

        Args:
            data (List[Path]): A list of Path.
            transforms (Optional[Compose]): Compose object from albumentations.
        """
        self.data = data
        self.transforms = transforms

    def __len__(self):
        """Dataset Length."""
        return len(self.data)

    def __getitem__(self, item):
        """Get sample data.

        Args:
            item (int): sample id.

        Returns (Tuple[tensor, tensor, tensor]): Image, Table Mask, Column Mask
        """
        sample_id = self.data[item].stem

        image_path = self.data[item]
        table_path = self.data[item].parent.parent.joinpath("table_mask", sample_id + ".bmp")
        column_path = self.data[item].parent.parent.joinpath("column_mask", sample_id + ".bmp")

        image = np.array(Image.open(image_path))
        table_mask = np.expand_dims(np.array(Image.open(table_path)), axis=2)
        column_mask = np.expand_dims(np.array(Image.open(column_path)), axis=2)
        mask = np.concatenate([table_mask, column_mask], axis=2) / 255
        sample = {"image": image, "mask": mask}
        if self.transforms:
            sample = self.transforms(image=image, mask=mask)

        image = sample["image"]
        mask_table = sample["mask"][:, :, 0].unsqueeze(0)
        mask_column = sample["mask"][:, :, 1].unsqueeze(0)
        return image, mask_table, mask_column


class MarmotDataModule(pl.LightningDataModule):
    """Pytorch Lightning Data Module for Marmot."""

    def __init__(self, data_dir: str = "./data", transforms_preprocessing: Compose = None,
                 transforms_augmentation: Compose = None, batch_size: int = 8, num_workers: int = 4):
        """Marmot Data Module initialization.

        Args:
            data_dir (str): Dataset directory.
            transforms_preprocessing (Optional[Compose]): Compose object from albumentations applied
             on validation an test dataset.
            transforms_augmentation (Optional[Compose]): Compose object from albumentations applied
             on training dataset.
            batch_size (int): Define batch size.
            num_workers (int): Define number of workers to process data.
        """
        super().__init__()
        self.data = list(Path(data_dir).rglob("*.bmp"))
        self.transforms_preprocessing = transforms_preprocessing
        self.transforms_augmentation = transforms_augmentation
        self.batch_size = batch_size
        self.num_workers = num_workers

        self.setup()

    def setup(self, stage: str = None) -> None:
        """Start training, validation and test datasets.

        Args:
            stage (Optional[str]): Used to separate setup logic for trainer.fit and trainer.test.
        """
        n_samples = len(self.data)
        self.data.sort()
        train_slice = slice(0, int(n_samples * 0.8))
        val_slice = slice(int(n_samples * 0.8), int(n_samples * 0.9))
        test_slice = slice(int(n_samples * 0.9), n_samples)

        self.complaint_train = MarmotDataset(self.data[train_slice], transforms=self.transforms_augmentation)
        self.complaint_val = MarmotDataset(self.data[val_slice], transforms=self.transforms_preprocessing)
        self.complaint_test = MarmotDataset(self.data[test_slice], transforms=self.transforms_preprocessing)

    def train_dataloader(self, *args, **kwargs) -> DataLoader:
        """Create Dataloader.

        Returns: DataLoader
        """
        return DataLoader(self.complaint_train, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self, *args, **kwargs) -> DataLoader:
        """Create Dataloader.

        Returns: DataLoader
        """
        return DataLoader(self.complaint_val, batch_size=self.batch_size, num_workers=self.num_workers)

    def test_dataloader(self, *args, **kwargs) -> DataLoader:
        """Create Dataloader.

        Returns: DataLoader
        """
        return DataLoader(self.complaint_test, batch_size=self.batch_size, num_workers=self.num_workers)


In [None]:
class Predict:
    """Predict images using pre-trained model."""

    def __init__(self, checkpoint_path: str, transforms: Compose, threshold: float = 0.5, per: float = 0.005):
        """Predict images using pre-trained TableNet model.
        Args:
            checkpoint_path (str): model weights path.
            transforms (Optional[Compose]): Compose object from albumentations used for pre-processing.
            threshold (float): threshold to consider the value as correctly classified.
            per (float): Minimum area for tables and columns to be considered.
        """
        self.transforms = transforms
        self.threshold = threshold
        self.per = per

        self.model = TableNetModule.load_from_checkpoint(checkpoint_path)
        self.model.eval()
        self.model.requires_grad_(False)
        self.inp_img = ""

        #====>ALTERACAO 1
        # Move o modelo para a GPU, se disponível
        if torch.cuda.is_available():
          self.model.cuda()

    def predict(self, image: Image) -> List[pd.DataFrame]:
        """Predict a image table values.
        Args:
            image (Image): PIL.Image to
        Returns (List[pd.DataFrame]): Tables in pandas DataFrame format.
        """
        processed_image = self.transforms(image=np.array(image))["image"]

        #====>ALTERACAO 2
        # Move a imagem processada para a GPU, se disponível
        if torch.cuda.is_available():
          processed_image = processed_image.cuda()

        self.inp_img = cv2.resize(np.array(image), (896, 896))
        #cv2_imshow(self.inp_img) #AQUI EXIBE AS IMAGENS TRABALHADAS 1 DE 3 (COMENTEI PARA NAO FICAR PESADO)

        #====>ALTERACAO 3
        # Faz a predição na GPU
        with torch.no_grad():
          table_mask, column_mask = self.model.forward(processed_image.unsqueeze(0))

        #====>ALTERACAO 4
        # Move as máscaras para a CPU para processamento adicional
        table_mask = table_mask.cpu()
        column_mask = column_mask.cpu()

        table_mask, column_mask = self.model.forward(processed_image.unsqueeze(0))

        table_mask = self._apply_threshold(table_mask)
        column_mask = self._apply_threshold(column_mask)
        # print(type(table_mask))
        # print(type(column_mask))
        # print(table_mask.shape)
        # print(np.unique(table_mask))
        # print(column_mask.shape)
        # print(np.unique(column_mask))
        # cv2_imshow((table_mask*255).astype(np.uint8))
        # cv2_imshow((column_mask*255).astype(np.uint8))

        segmented_tables = self._process_tables(self._segment_image(table_mask))
        # print(type(segmented_tables[0]))
        # print(segmented_tables[0].shape)
        # for i in range(len(segmented_tables)):
        #     cv2_imshow((segmented_tables[i]*255).astype(np.uint8))

        # for table in segmented_tables:
        #     abc = self._segment_image(column_mask*table)
        #     cv2_imshow(abc*255)
        #cv2_imshow(self.inp_img*cv2.merge((table_mask, table_mask, table_mask))) #AQUI EXIBE AS IMAGENS 2 DE 3 TRABALHADAS (COMENTEI PARA NAO FICAR PESADO)
        #cv2_imshow(self.inp_img*cv2.merge((column_mask, column_mask, column_mask))) #AQUI EXIBE AS IMAGENS 3 DE 3 TRABALHADAS (COMENTEI PARA NAO FICAR PESADO)
        tables = []
        for table in segmented_tables:
            segmented_columns = self._process_columns(self._segment_image(column_mask * table))
            if segmented_columns:
                cols = []
                for column in segmented_columns.values():
                    #cv2_imshow(self.inp_img*cv2.merge((column, column, column)))
                    cols.append(self._column_to_dataframe(column, image))
                tables.append(pd.concat(cols, ignore_index=True, axis=1))
        return tables

    def _apply_threshold(self, mask):

        #mask = mask.squeeze(0).squeeze(0).numpy() > self.threshold

        #ALTERACAO 5
        mask = mask.cpu().squeeze(0).squeeze(0).numpy() > self.threshold

        return mask.astype(int)

    def _process_tables(self, segmented_tables):
        width, height = segmented_tables.shape
        tables = []
        for i in np.unique(segmented_tables)[1:]:
            table = np.where(segmented_tables == i, 1, 0)
            if table.sum() > height * width * self.per:
                tables.append(convex_hull_image(table))

        return tables

    def _process_columns(self, segmented_columns):
        width, height = segmented_columns.shape
        cols = {}
        for j in np.unique(segmented_columns)[1:]:
            column = np.where(segmented_columns == j, 1, 0)
            column = column.astype(int)

            if column.sum() > width * height * self.per:
                position = regionprops(column)[0].centroid[1]
                cols[position] = column
        return OrderedDict(sorted(cols.items()))

    @staticmethod
    def _segment_image(image):
        thresh = threshold_otsu(image)
        bw = closing(image > thresh, square(2))
        cleared = clear_border(bw)
        label_image = label(cleared)
        return label_image

    @staticmethod
    def _column_to_dataframe(column, image):
        width, height = image.size
        column = resize(np.expand_dims(column, axis=2), (height, width), preserve_range=True) > 0.01

        crop = column * image
        white = np.ones(column.shape) * invert(column) * 255
        crop = crop + white
        ocr = image_to_string(Image.fromarray(crop.astype(np.uint8)))
        return pd.DataFrame({"col": [value for value in ocr.split("\n") if len(value) > 0]})

In [None]:
def predict(image_path: str, model_weights: str) -> List[pd.DataFrame]:
    """Predict table content.

    Args:
        image_path (str): image path.
        model_weights (str): model weights path.

    Returns (List[pd.DataFrame]): Tables in pandas DataFrame format.
    """
    import albumentations as album
    from albumentations.pytorch.transforms import ToTensorV2

    transforms = album.Compose([
        album.Resize(896, 896, always_apply=True),
        album.Normalize(),
        ToTensorV2()
    ])
    pred = Predict(model_weights, transforms)

    image = Image.open(image_path)

    x = pred.predict(image)
    #print(pred.predict(image))

    #ajustei aqui para retornar a lista de dataframes
    return (x)

# Funções Pré-processamento para MAIN

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from IPython.display import clear_output
!pip install pymupdf
!pip install pdf2image
!apt-get install poppler-utils
clear_output()

In [None]:
#Bibliotecas
from PIL import Image
from PIL import ImageDraw
from huggingface_hub import hf_hub_download
import fitz
import os
import pandas as pd
from pdf2image import convert_from_path
from torchvision import transforms
import torch
import numpy as np
import csv
from tqdm.auto import tqdm
import cv2
from bs4 import BeautifulSoup as bs

In [None]:
#caminho dos certificados para analise
CERT_PATH = "/content/drive/MyDrive/DataSets/Certificados/In/"

#caminho de saida para geração das anotações das tabelas
OUT_PATH = "/content/drive/MyDrive/DataSets/Certificados/Out/Test/"

#diretorio dos arquivos do GT - Ground Truth (REFERENCIA para comparacao das tabelas)
GT_PATH = "/content/drive/MyDrive/DataSets/Certificados/Out/GT/"

#diretorio de escrita de arquivos
DIROUT_TABLENET = "/content/drive/MyDrive/DataSets/Certificados/Out/TableNet/"

#caminho do modelo TABLENET
TABLENET_MODEL_PATH = '/content/drive/MyDrive/DataSets/Tablenet/Tablenet.ckpt'

In [None]:
#funcao para varrer os arquivos PDF dos laboratórios nas pastas e retorna um dataframe
#! o método está bem lento (deve verificar o motivo posteriormente)
def InfoPDF(pathPDF):

  lenPages = 0
  doc = fitz.open(pathPDF)

  lenPages = len(doc)
  for pagina in doc:
    isText = bool(pagina.get_text())
    break

  typeArq = ["TEXT" if isText else "IMAGE"]

  return typeArq, lenPages

def listFiles_OLD(CERT_PATH):

  dfArq = pd.DataFrame()
  dirs = [nome for nome in os.listdir(CERT_PATH) if os.path.isdir(os.path.join(CERT_PATH, nome))]

  i = 0
  for dir in dirs:

    #coletando dados do diretorio (id, laboratorio)
    lstDir = dir.split("_")

    if(len(lstDir)==3):

      for file in os.listdir(os.path.join(CERT_PATH, dir)):

        #é arquivo PDF
        if file.lower().endswith(".pdf"):

          pathFile = CERT_PATH + dir + "/" + file
          dfArq.at[i,"LAB"] = lstDir[2]
          dfArq.at[i,'PATH'] = pathFile

          typeArq, qtdPages = InfoPDF(pathFile)
          dfArq.at[i,'TYPE'] = typeArq
          dfArq.at[i,'QTDPAGES'] = qtdPages

          i = i + 1

  return dfArq

def listFiles(CERT_PATH, LAB_PATH):

  dfArq = pd.DataFrame()

  if LAB_PATH is None:
    dirs = [nome for nome in os.listdir(CERT_PATH) if os.path.isdir(os.path.join(CERT_PATH, nome))]
  else:
    dirs = [LAB_PATH]

  i = 0
  for dir in dirs:

    #coletando dados do diretorio (id, laboratorio)
    lstDir = dir.split("_")
    print("lstDir ", lstDir);

    if(len(lstDir)==3):

      for file in os.listdir(os.path.join(CERT_PATH, dir)):

        #é arquivo PDF
        if file.lower().endswith(".pdf"):

          pathFile = CERT_PATH + dir + "/" + file
          print("pathFile ", pathFile);
          dfArq.at[i,"LAB"] = lstDir[2]
          dfArq.at[i,'PATH'] = pathFile

          typeArq, qtdPages = InfoPDF(pathFile)
          dfArq.at[i,'TYPE'] = typeArq
          dfArq.at[i,'QTDPAGES'] = qtdPages

          i = i + 1

  return dfArq

#dfArq = listFiles(CERT_PATH)

def deleteFiles2(dirpath):
  # Obtém a lista de arquivos no diretório
  files = os.listdir(dirpath)

  # Itera sobre os arquivos e os remove
  for file in files:
    filepath = os.path.join(dirpath, file)
    if os.path.isfile(filepath):
      os.remove(filepath)

def is_image_by_extension(file_path):
  image_extensions = ['png', 'jpg', 'jpeg', 'gif', 'bmp', 'tiff', 'webp']  # Adicione outras extensões se necessário
  file_extension = file_path.lower().split('.')[-1]
  return file_extension in image_extensions

def is_image(file_path):
    try:
        # Tenta abrir o arquivo como uma imagem
        Image(file_path)
        return True
    except IOError:
        # Se não for possível abrir como uma imagem, retorna False
        return False

def pdf_page_to_png(pdf_path, page_number, output_path):
  # Convertendo a página do PDF para uma lista de imagens
  images = convert_from_path(pdf_path, first_page=page_number, last_page=page_number)

  # Salvando a imagem como PNG
  images[0].save(output_path, 'PNG')

class MaxResize(object):
  def __init__(self, max_size=800):
      self.max_size = max_size

  def __call__(self, image):
      width, height = image.size
      current_max_size = max(width, height)
      scale = self.max_size / current_max_size
      resized_image = image.resize((int(round(scale*width)), int(round(scale*height))))

      return resized_image

def get_cell_coordinates_by_row(table_data):
  # Extract rows and columns
  rows = [entry for entry in table_data if entry['label'] == 'table row']
  columns = [entry for entry in table_data if entry['label'] == 'table column']

  # Sort rows and columns by their Y and X coordinates, respectively
  rows.sort(key=lambda x: x['bbox'][1])
  columns.sort(key=lambda x: x['bbox'][0])

  # Function to find cell coordinates
  def find_cell_coordinates(row, column):
      cell_bbox = [column['bbox'][0], row['bbox'][1], column['bbox'][2], row['bbox'][3]]
      return cell_bbox

  # Generate cell coordinates and count cells in each row
  cell_coordinates = []

  for row in rows:
      row_cells = []
      for column in columns:
          cell_bbox = find_cell_coordinates(row, column)
          row_cells.append({'column': column['bbox'], 'cell': cell_bbox})

      # Sort cells in the row by X coordinate
      row_cells.sort(key=lambda x: x['column'][0])

      # Append row information to cell_coordinates
      cell_coordinates.append({'row': row['bbox'], 'cells': row_cells, 'cell_count': len(row_cells)})

  # Sort rows from top to bottom
  cell_coordinates.sort(key=lambda x: x['row'][1])

  return cell_coordinates

def aumentar_qualidade_e_contraste(imagem_path, fator_contraste, fator_brilho):
  # Carregar a imagem
  imagem = cv2.imread(imagem_path)

  # Converter a imagem para o espaço de cores LAB (Luminância, Azul, Vermelho)
  lab = cv2.cvtColor(imagem, cv2.COLOR_BGR2LAB)

  # Separar os canais L, A, B
  l, a, b = cv2.split(lab)

  # Aplicar o aumento de contraste na imagem L (luminância)
  l = cv2.add(l, fator_brilho)
  l = cv2.multiply(l, fator_contraste)

  # Mesclar novamente os canais LAB
  lab = cv2.merge((l, a, b))

  # Converter a imagem de volta para o espaço de cores BGR
  imagem_contraste = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

  return imagem_contraste

In [None]:
#FUNÇÕES DE MANIPULACAO DE IMAGENS E ARQUIVOS

#ler dados da celula
def apply_ocr(cell_coordinates):
    # let's OCR row by row
    data = dict()
    max_num_columns = 0
    for idx, row in enumerate(tqdm(cell_coordinates)):
      row_text = []
      for cell in row["cells"]:
        # crop cell out of image
        cell_image = np.array(cropped_table.crop(cell["cell"]))
        # apply OCR
        result = reader.readtext(np.array(cell_image))
        if len(result) > 0:
          # print([x[1] for x in list(result)])
          text = " ".join([x[1] for x in result])
          row_text.append(text)

      if len(row_text) > max_num_columns:
          max_num_columns = len(row_text)

      data[idx] = row_text

    #print("Max number of columns:", max_num_columns)

    # pad rows which don't have max_num_columns elements
    # to make sure all rows have the same number of columns
    for row, row_data in data.copy().items():
        if len(row_data) != max_num_columns:
          row_data = row_data + ["" for _ in range(max_num_columns - len(row_data))]
        data[row] = row_data

    return data

#similaridade de strings conhecido como "Distância de Levenshtein"
def calcPercSimStrings(str1, str2):

  #retirando quebra de linhas da string
  str1 = str1.replace("\n", " ")
  str2 = str2.replace("\n", " ")

  tamanho_str1 = len(str1)
  tamanho_str2 = len(str2)

  matriz = [[0] * (tamanho_str2 + 1) for _ in range(tamanho_str1 + 1)]

  for i in range(tamanho_str1 + 1):
    matriz[i][0] = i

  for j in range(tamanho_str2 + 1):
    matriz[0][j] = j

  for i in range(1, tamanho_str1 + 1):
    for j in range(1, tamanho_str2 + 1):
        if str1[i - 1] == str2[j - 1]:
            custo_substituicao = 0
        else:
            custo_substituicao = 1
        matriz[i][j] = min(matriz[i - 1][j] + 1,       # Deletar
                            matriz[i][j - 1] + 1,       # Inserir
                              matriz[i - 1][j - 1] + custo_substituicao)  # Substituir

  distancia = matriz[tamanho_str1][tamanho_str2]
  maximo_tamanho = max(tamanho_str1, tamanho_str2)

  similaridade = 0
  if maximo_tamanho > 0:
    similaridade = (maximo_tamanho - distancia) / maximo_tamanho
  #print (" similaridade entre {0} e {1}: {2}".format(str1, str2, similaridade * 100))
  return similaridade * 100

#funcao que compara o valor de duas listas e calcula a media do percentual de similaridade entre eles
#(para calcular o valor do bbox das tabelas e células das tabelas)

def calcPercSimValueLists(lista1, lista2):
  if len(lista1) != len(lista2):
      print("calcPercSimValueLists, listas de tamanhos diferentes, lista1=",lista1,"/ lista2 = ",lista2)
      raise ValueError("As listas devem ter o mesmo comprimento.")

  percSim = [ (1 / (1 + (abs(num1 - num2)))) * 100 for num1, num2 in zip(lista1, lista2)]


  return sum(percSim) / len (percSim)

#(para calcular o percentual de similaridade entre dois números
def calcPercSimValueNums(num1, num2):

  percSim = (1 / (1 + (abs(num1 - num2)))) * 100
  #print (" similaridade entre os numeros {0} e {1}: {2}".format(num1, num2, percSim))
  return percSim

#coletar os arquivos de acordo com premissas (prefixo e sufixo)
def getFilesByPrefix(path, prefix, sufix):
  lstFiles = []
  for fileName in os.listdir(path):
      if prefix in fileName and fileName.endswith(sufix):
          lstFiles.append(fileName)
  return lstFiles

#coletar no arquivo do GT dados das tabelas de uma determinada pagina
def getListFilesGTInfo(curfile, page, path):

  prefix = curfile + "|" + str(page)
  sufix = "_INFO.info"
  lstTables = getFilesByPrefix (path, prefix, sufix)

  return lstTables

def getGTInfo(tableID, curfile, page, path):

  prefix = tableID + "|" + curfile + "|" + str(page)
  sufix = "_INFO.info"
  lstTables = getFilesByPrefix (path, prefix, sufix)

  return lstTables

#coletar as informacoes da tabela do arquivo _INFO e retornar para uma lista
def getListTablesInfo(path, listFiles):

  listTablesInfo = []
  for fileName in listFiles:
    with open(path + fileName, 'r') as file:
      conteudo = file.read()
      listTablesInfo.append(eval(conteudo))

  return listTablesInfo

# verificar se a estrutura dos dois dicionários INFO são similares
def checkDimensionINFO(dicTableGT, dicTable):

  msgErro = "0 - SUCESSO"
  isIdentical = True

  #se o tamanho das chaves dos dicionários são diferentes
  if(dicTableGT.keys() != dicTable.keys()):
    print("Dicionários não possuem o mesmo indice")
    msgErro = "1 - Dicionários não possuem o mesmo indice"
    isIdentical = False

  #se não tiver a mesma dimensao já descarta
  if dicTableGT["DIMENSION"] != dicTable["DIMENSION"]:
    msgErro = "2 - Dicionários não possuem a mesma dimensão"
    isIdentical = False

  #se o tamanho das colunas HEAD e FIRST_LINE não batem
  if(len(dicTableGT["HEAD"]) != len(dicTable["HEAD"])):
    print("Dicionários não possuem o mesmo tamanho da chave HEAD")
    msgErro = "3 - Dicionários não possuem o mesmo tamanho da chave HEAD"
    isIdentical = False

  if(len(dicTableGT["FIRST_LINE"]) != len(dicTable["FIRST_LINE"])):
    msgErro = "4 - Dicionários não possuem o mesmo tamanho da chave FIRST_LINE"
    isIdentical = False

  #return msgErro, isSimilar
  return isIdentical

#calcular similaridade da string das células dos dicionários INFO de mesma dimensão
#(para determinar se dois dicionarios INFO sao similares)
def getPercSimTablesINFO(dicTableGT, dicTable , percTolerancia):

  i = 0
  for textTableGT in dicTableGT["HEAD"]:
    textTable = dicTable["HEAD"][i]
    if (calcPercSimStrings(textTableGT, textTable) < percTolerancia):
      print("Tolerancia entre string ", textTableGT, "e ", textTable,  " menor que ", percTolerancia)
      return False
    i+=1

  i = 0
  for textTableGT in dicTableGT["FIRST_LINE"]:
    textTable = dicTable["FIRST_LINE"][i]
    if (calcPercSimStrings(textTableGT, textTable) < percTolerancia):
      print("Tolerancia entre string ", textTableGT, "e ", textTable,  " menor que ", percTolerancia)
      return False
    i+=1

  return True


#verifica se a dimensão dos dicionários INFO possuem tamanhos parecidos (maximo 1 de diferença)
def isAlmostSimilarINFO(dicTableGT, dicTable):

  #devem possuir a mesma quantidade de linhas e quantidade semelhante de colunas (maximo no modulo 1)

  #colunas
  qtdColGT = len(dicTableGT["HEAD"])
  qtdColTab = len(dicTable["HEAD"])

  #linhas
  qtdLinhasGT = dicTableGT["DIMENSION"].split("X")[0]
  qtdLinhas = dicTable["DIMENSION"].split("X")[0]

  if abs(qtdColGT-qtdColTab) <=1 and qtdLinhasGT == qtdLinhas:
    return True

  return False

#verificar maior valor na lista
def maiorValor(lista):

  maiorValor = 0
  for item in lista:
    if item > maiorValor:
        maiorValor = item

  return maiorValor

#verificar possivel similaridade no cabeçalho dos dicionários INFO
def checkAVGSimilaritiesINFO(dicTableGT, dicTable, percTolerancia):

  qtdColGT = len(dicTableGT["HEAD"])
  qtdColTab = len(dicTable["HEAD"])

  arrSim = []
  arrSummary = []
  limit = 4

  #verificar similaridade nas 3 primeiras colunas
  for i in range(qtdColTab):

    for j in range(qtdColGT):
      strTab = dicTable["HEAD"][i]
      strGT = dicTableGT["HEAD"][j]
      arrSim.append(calcPercSimStrings(strTab, strGT))

    arrSummary.append(maiorValor(arrSim))
    arrSim = []

    if(i>= limit):
      break

  #print(arrSummary)
  avgPerc = sum(arrSummary) / len(arrSummary)
  return avgPerc >= percTolerancia


#verificar possivel similaridade no cabeçalho dos dicionários FIRST_LINE
def checkAVGSimilarities2INFO(dicTableGT, dicTable, percTolerancia):

  qtdColGT = len(dicTableGT["FIRST_LINE"])
  qtdColTab = len(dicTable["FIRST_LINE"])

  arrSim = []
  arrSummary = []
  limit = 3

  #verificar similaridade nas 3 primeiras colunas
  for i in range(qtdColTab):

    for j in range(qtdColGT):
      strTab = dicTable["FIRST_LINE"][i]
      strGT = dicTableGT["FIRST_LINE"][j]
      arrSim.append(calcPercSimStrings(strTab, strGT))

    arrSummary.append(maiorValor(arrSim))
    arrSim = []

    if(i>= limit):
      break

  #print(arrSummary)
  avgPerc = sum(arrSummary) / len(arrSummary)
  return avgPerc >= percTolerancia

def getDicTableInfo(labName, curfile, page, qtdlinhas, qtdcolunas, bbox, head, firstLine):
  dicTableInfo = {}

  dicTableInfo["LAB"] = labName
  dicTableInfo["FILE"] = curfile
  dicTableInfo["PAGE"] = page
  dicTableInfo["TABLEID"] = "TBD"
  dicTableInfo["DIMENSION"] = str(qtdlinhas) + "X" + str(qtdcolunas)
  dicTableInfo["BBOX"] = bbox
  dicTableInfo["HEAD"] = head
  dicTableInfo["FIRST_LINE"] = firstLine

  return dicTableInfo

def SaveDicTableInfo (filePath, dicTableInfo):

  strFile = "{"
  lenDic = len(dicTableInfo.items())
  #print(lenDic)
  i = 0
  for chave, valor in dicTableInfo.items():
    vir = "," if i < lenDic-1 else ""
    if type(valor) == str:
      strFile += "'" + str(chave)+ "':'" + str(valor) + "'" + vir + "\n"
    else:
      strFile += "'" + str(chave) + "':" + str(valor) + vir + "\n"
    i = i + 1

  strFile += "}"
  with open(filePath, 'w') as arquivo:
    arquivo.write(strFile)

#numero de ocorrencias de um numero em uma lista
def numTimes(list, num):

  cont = 0
  for valor in list:
    if valor == num:
      cont+=1

  return cont

def isDecimal(valor):
  try:

    if valor == "NAN" or valor == "nan":
      return False
    else:
      float(valor)
      return True
  except ValueError:
      return False

def noteTokensHTML(df):

  lsthead = []
  lsttd = []
  hasheader = False
  hasdata = False
  dict_tokens_html = {}
  lsthead

  #percorre o dataframe para construir a estrutura html de colunas
  for i in range(len(df)):
    primeiraColuna = True
    j = 0
    for column in df.columns:

        value = "NAN"
        if not df.empty and not pd.isna(df.at[i, column]):
          # Value exists, access it
          value = str(df.at[i, column])
          value = value.replace("'","")
          value = value.replace("\\","")

        #primeira linha sao os cabeçalhos
        if i == 0:
          hasheader = True
          lsthead.append("<td>")
          #lsthead.append(value) #apenas para teste, comentar depois
          lsthead.append("</td>")
        else:
          hasdata = True
          if(primeiraColuna):
            primeiraColuna = False
            #a partir da 3a linha fecha a linha anterior </tr>
            if(j==0 and i > 1):
              lsttd.append("</tr>")
            lsttd.append("<tr>")
          lsttd.append("<td>")
          #lsttd.append(value) #apenas para teste, comentar depois
          lsttd.append("</td>")

        primeiraColuna = False
        j = j + 1

  if(hasheader):
    lsthead.insert(0,"<thead>")
    lsthead.insert(1,"<tr>")
    lsthead.append("</tr>")
    lsthead.append("</thead>")

  if(hasdata):
    lsttd.insert(0,"<tbody>")
    lsttd.append("</tbody>")

  #se a estrutura tiver completa, adiciona no dicionario tokens
  if(hasheader and hasdata):
    lsthead.extend(lsttd)
  else:
    dict_tokens_html = {"tokens": "vazio"}
    lsthead.extend(dict_tokens_html)

  lsthead.insert(0,"<table>")
  lsthead.extend("</table>")
  return lsthead


def noteListTokensBbox(cell_coordinates, lstData, lstTableRef):
  list_tokens_bbox = []

  #dimensao dos dados
  qtdRowData = len(lstData)
  qtdColData = len(lstData[0])
  print("dimensao Data {0} x {1} ".format(qtdRowData, qtdColData))

  #carregando BBOX de cada celula por linha para uma lista
  qtdlinhasBbox = len(cell_coordinates)
  qtdcolunasBbox = len(cell_coordinates[0]["cells"])
  print("dimensao Bbox {0} x {1} ".format(qtdlinhasBbox, qtdcolunasBbox))


  i = 0
  for row in cell_coordinates:
    j = 0
    for bbox in row["cells"]:
      x1 = round(bbox["cell"][0])
      y1 = round(bbox["cell"][1])
      x2 = round(bbox["cell"][2])
      y2 = round(bbox["cell"][3])

      if(lstTableRef is None):
        dict_tokens_bbox = {'tokens': list(lstData[i][j]), 'bbox': [x1, y1, x2, y2]}
      else:
        dict_tokens_bbox = {'tokens': list(lstData[i][j]), 'bbox': [x1 - lstTableRef[0], y1 - lstTableRef[1], x2 - lstTableRef[0], y2 - lstTableRef[1]]}
      list_tokens_bbox.append(dict_tokens_bbox)
      j+=1
    i+=1

  return list_tokens_bbox

def noteListTokensBbox(lstBbox, df, lstTableRef):
  list_tokens_bbox = []

  #dimensao dos dados
  qtdRowDf = 0 if df.empty else df.shape[0]
  qtdColDf = 0 if df.empty else df.shape[1]
  print("noteListTokensBbox - dimensao df {0} x {1} ".format(qtdRowDf, qtdColDf))

  #carregando BBOX de cada celula por linha para uma lista
  #qtdlinhasBbox = 0 if len(lstBbox) == 0 else len(lstBbox)
  #qtdcolunasBbox = 0 if len(lstBbox) == 0 and len(lstBbox[0]) else len(lstBbox)

  for i in range(qtdRowDf):
    for j in range(qtdColDf):
      #bbox = lstBbox[i][j]
      #x1 = round(bbox[0])
      #y1 = round(bbox[1])
      #x2 = round(bbox[2])
      #y2 = round(bbox[3])
      x1 = np.nan
      y1 = np.nan
      x2 = np.nan
      y2 = np.nan

      value = "NAN"
      if not df.empty and not pd.isna(df.at[i, j]):
        # Value exists, access it
        value = str(df.at[i, j])

      if(lstTableRef is None):
        dict_tokens_bbox = {'tokens': list(value), 'bbox': [x1, y1, x2, y2]}
      else:
        dict_tokens_bbox = {'tokens': list(value), 'bbox': [x1, y1, x2, y2]}
        #dict_tokens_bbox = {'tokens': list(df.at[i,j]), 'bbox': [x1 - lstTableRef[0], y1 - lstTableRef[1], x2 - lstTableRef[0], y2 - lstTableRef[1]]}
      list_tokens_bbox.append(dict_tokens_bbox)

  return list_tokens_bbox

def printMetaDados(dicMetaData):

  strout = []
  strout.append("{ \n")

  if "filename" in dicMetaData:
    strout.append("filename: '" + str(dicMetaData["filename"]) + "',\n")
  else:
    strout[0] = "chave filename não existe na estrutura"
    return strout

  if "split" in dicMetaData:
    strout.append("split: '" + str(dicMetaData["split"]) + "',\n")
  else:
    strout[0] = "chave split não existe na estrutura"
    return strout

  if "imgid" in dicMetaData:
    strout.append("'imgid': " + str(dicMetaData["imgid"]) + ",\n")
  else:
    strout[0] = "chave imgid não existe na estrutura"
    return strout

  if "html" in dicMetaData:

    strout.append("--INICIO HTML \n")
    strout.append("'html': \n {")

    if "cells" in dicMetaData["html"] and "structure" in dicMetaData["html"]:

      if isinstance(dicMetaData["html"]["cells"], list ) and isinstance(dicMetaData["html"]["structure"], list ):

        #varrendo o conteudo da lista dicMetaData["html"]["cells"]
        #que contem as duas sublistas tokens e bbox
        strout.append("--INICIO CELLS \n")
        strout.append("'cells': [\n")
        i = 0
        for arrcells in dicMetaData["html"]["cells"]:

           #print da estrutura dos dicionarios tokens e bbox
           tokens =  arrcells["tokens"]
           bbox =  arrcells["bbox"]
           comma = ","
           if i == len(dicMetaData["html"]["cells"]) -1:
            comma = ""
           else:
            comma = ","

           strout.append("      {'tokens': " + str(tokens) + ", 'bbox': " + str(bbox) + "}" + comma + " \n")
           i = i + 1

        strout.append("] --FIM CELLS\n")

        #print da estrutura do dicionario structure
        if(dicMetaData["html"] is not None and dicMetaData["html"]["structure"] is not None):
          #strout.append("      'structure': [" + str("','".join(dicMetaData["html"]["structure"])) + "' \n")
          strout.append("      'structure': ['" + str("','".join([x for x in dicMetaData["html"]["structure"] if x is not None])) + "' \n")

        else:
          strout.append("      'structure': ['None'] \n")


        strout.append("] --FIM STRUCTURE \n")

      else:
        strout[0] = "chave html/cells ou structure não existe na estrutura"
        return strout

    strout.append("} --FIM HTML\n")
  else:
    strout[0] = "chave html não existe na estrutura"
    return strout

  strout.append("} \n")
  return strout

def saveAnnotationFile(dicMetaData, dirout , numPage):

  vecArq = dicMetaData["filename"].split("/")

  nomeArq = ""
  if(len(vecArq)>0):

    tableID = dicMetaData["imgid"].replace("'","")
    filename = vecArq[len(vecArq)-1].split(".")[0]
    labname = vecArq[len(vecArq)-2].split(".")[0]

    nomeArq = tableID + "|" + filename + "|" + str(numPage) + "_METADADOS.mtd"
    print("gravando arquivo ", nomeArq)

    strout = printMetaDados(dicMetaData)

    labDirOut = dirout + "/" + labname + "/"
    if not os.path.exists(labDirOut):
      os.makedirs(labDirOut)

    with open(labDirOut + nomeArq, 'w') as arquivo:
      for linha in strout:
            arquivo.write(linha)

def getPos(lst, key):

  for k, item in enumerate(lst):
    if item == key:
      return k

  return -1

#FUNCAO DE VERIFICA SE EXISTE O BUG DE TRUNCAR O VALOR ∞
def isBUGInfinito(dicTable, dicTableGT):

  char = ""
  posInf = getPos(dicTableGT["FIRST_LINE"], "∞")
  posInfV = getPos(dicTableGT["FIRST_LINE"], "V")

  #possui valor ∞ na tabela? segue análise
  if posInf >-1:
    char = "∞"
    print("Possui valor ∞ na tabela, posInf", posInf)
    #2 - possuem o mesmo valor de dimensao em DIMENSION
    if dicTable["DIMENSION"] == dicTableGT["DIMENSION"]:
      print("Valor DIMENSION iguais")
      #4 primeiras colunas das duas tabelas possuem o mesmo valor?
      print("checkAVGSimilarities2INFO >=60 perc? ",checkAVGSimilarities2INFO(dicTableGT, dicTable, 80))
      if checkAVGSimilarities2INFO(dicTableGT, dicTable, 60):
        print("Quatro primeiras colunas similares")
        #4 - chave HEAD tem o tamanho um a menos que GT
        if len(dicTable["HEAD"]) == len(dicTableGT["HEAD"])-1 and len(dicTable["FIRST_LINE"]) == len(dicTableGT["FIRST_LINE"])-1:
          #dicTable["FIRST_LINE"].insert(posInf, "∞")
          return True, posInf, "∞"

  #possui valor ∞ na tabela? segue análise
  if posInfV >-1:
    char = "V"
    print("Possui valor V na tabela, posInf", posInf)
    #2 - possuem o mesmo valor de dimensao em DIMENSION
    if dicTable["DIMENSION"] == dicTableGT["DIMENSION"]:
      print("Valor DIMENSION iguais")
      #4 primeiras colunas das duas tabelas possuem o mesmo valor?
      print("checkAVGSimilarities2INFO >=60 perc? ",checkAVGSimilarities2INFO(dicTableGT, dicTable, 80))
      if checkAVGSimilarities2INFO(dicTableGT, dicTable, 60):
        print("Quatro primeiras colunas similares")
        #4 - chave HEAD tem o tamanho um a menos que GT
        if len(dicTable["HEAD"]) == len(dicTableGT["HEAD"])-1 and len(dicTable["FIRST_LINE"]) == len(dicTableGT["FIRST_LINE"])-1:
          #dicTable["FIRST_LINE"].insert(posInf, "∞")
          return True, posInfV, "V"

  return False, -1, ""

import glob

def deleteFiles(dir, ext):
  # Obter todos os arquivos com a extensão especificada
  files = glob.glob(os.path.join(dir, f'*.{ext}'))

  # Remover cada arquivo encontrado
  for file in files:
      try:
          os.remove(file)
          print(f"Arquivo {file} removido com sucesso.")
      except OSError as e:
          print(f"Erro ao remover o arquivo {file}: {e}")

def printHTML2(lst, tipo): #com TAB
  # tipo: RAW (cru) ou PRETTY (html com identações)

  strout = ""
  #strres = ''.join(lst)
  strres = ''.join([str(x) for x in lst])
  if tipo == "PRETTY":
    soup = bs(strres, 'html.parser')
    strout = soup.prettify()
    # Substituir espaços por TAB
    strout = strout.replace("  ", "\t")
  else:
    strout = strres

  return strout

def printElementMetaData(dicMetaData, elem):

  strout = []
  strout.append("[")

  if not "filename" in dicMetaData:
    strout[0] = "chave filename não existe na estrutura"
    return strout

  if not "split" in dicMetaData:
    strout[0] = "chave split não existe na estrutura"
    return strout

  if not "imgid" in dicMetaData:
    strout[0] = "chave imgid não existe na estrutura"
    return strout

  if "html" in dicMetaData:
    if "cells" in dicMetaData["html"] and "structure" in dicMetaData["html"]:
      if isinstance(dicMetaData["html"]["cells"], list ) and isinstance(dicMetaData["html"]["structure"], list ):

        i = 0

        if(elem == "BBOX"):
          #strout.append("{")
          for arrcells in dicMetaData["html"]["cells"]:

            tokens =  arrcells["tokens"]
            bbox =  arrcells["bbox"]
            comma = ","
            if i == len(dicMetaData["html"]["cells"]) -1:
              comma = ""
            else:
              comma = ","

            strout.append("{'tokens': " + str(tokens) + ", 'bbox': " + str(bbox) + "}" + comma + " \n")
            i = i + 1

        elif(elem == "HTML_PRETTY"):
          strout.append(printHTML2(dicMetaData["html"]["structure"], "PRETTY"))

        else:
          html = "'"
          html = html + "','".join([element for element in dicMetaData["html"]["structure"] if element]) + "'"
          strout.append(html)

      else:
        strout[0] = "chave html/cells ou structure não existe na estrutura"
        return strout

  else:
    strout[0] = "chave html não existe na estrutura"
    return strout

  strout.append("]")
  return strout

def saveElementMetadata(dicMetaData, elem, dirout, numPage):

  vecArq = dicMetaData["filename"].split("/")
  #print(vecArq)
  nomeArq = ""
  if(len(vecArq)>0):

    tableID = str(dicMetaData["imgid"]).replace("'","")
    filename = vecArq[len(vecArq)-1].split(".")[0]
    labname = vecArq[len(vecArq)-2].split(".")[0]

    nomeArq = tableID + "|" + filename + "|" + str(numPage) + "_" + elem + "." + elem.lower()
    strout = printElementMetaData(dicMetaData, elem)

    labDirOut = dirout + "/" + labname + "/"

    print("Arquivo de anotação ", elem, " gerado = ",  nomeArq)
    with open(labDirOut + nomeArq, 'w') as arquivo:
      for linha in strout:
          arquivo.write(linha)

#retorna o tableID de maior similiaridade entre os dicionarios GT de comparacao
def getMaiorSimilaridade(dic, listasGT):

  lstDicRes = []

  for dicGT in listasGT:

    lstRes = []

    #ajusta caso necessário a dimensao entre as tabelas
    #print("a tratar...", dic["FIRST_LINE"])
    #print(type(dic["FIRST_LINE"]))
    dic["FIRST_LINE"] = ajustColList(dic["FIRST_LINE"], len(dicGT["FIRST_LINE"]))
    lenDic = len(dic["FIRST_LINE"])

    print("getMaiorSimilaridade, FIRST_LINE GT", dicGT["FIRST_LINE"])
    print("getMaiorSimilaridade, FIRST_LINE ANALISE", dic["FIRST_LINE"])
    for i in range(len(dicGT["FIRST_LINE"])):

      lenDicGT = len(dicGT["FIRST_LINE"])

      if lenDic != lenDicGT: #tamanhos diferentes, retornar vazio
        return ""

      str1 = str(dic["FIRST_LINE"][i])
      str2 = str(dicGT["FIRST_LINE"][i])
      strDec1 = str1.replace(",", ".").strip()
      strDec2 = str2.replace(",", ".").strip()

      #se os valores forem numeros converter para float para calcular similiaridade com maior exatidao
      if (isDecimal(strDec1) and isDecimal(strDec2)):
        #print(" Similaridade entre dois numeros ", strDec1, " e ", strDec2, " = ", round(calcPercSimValueNums(float(strDec1), float(strDec2)), 2))
        lstRes.append(round(calcPercSimValueNums(float(strDec1), float(strDec2)), 2))
      #no caso de string
      else:
        #print(" Similaridade entre duas strings ", strDec1, " e ", strDec2, " = ", round(calcPercSimStrings(str1, str2),4))
        lstRes.append(round(calcPercSimStrings(str1, str2),2))

    lstDicRes.append({"TABLEID": dicGT["TABLEID"], "RESULT":lstRes})

  #verificando maior media
  tableId = ""
  maiorMedia = 0
  for dicRes in lstDicRes:
    avg = sum(dicRes["RESULT"]) / len(dicRes["RESULT"])
    print("Media ", avg)
    if avg > maiorMedia:
      tableId = dicRes["TABLEID"]
      maiorMedia = avg

  return tableId

def ajustColList(lst1, qtdColsRef):

  # Calcula o número de colunas de cada lista
  #num_cols_lstRef = len(lstRef)
  num_cols_lst1 = len(lst1) if lst1 else 0

  # Se lst1 tiver menos colunas que lstRef, preenche com NaN
  if num_cols_lst1 < qtdColsRef:
    # Calcula o número de colunas a serem adicionadas
    num_cols_adicionais = qtdColsRef - num_cols_lst1
    # Preenche lst2 com NaN nas novas colunas

    #print("num_cols_adicionais ", num_cols_adicionais)
    for i in range(num_cols_adicionais):
      lst1.append("NAN")

  #se lst1 tiver mais coluna que lstRef, remove as colunas adicionais de lst1
  elif num_cols_lst1 > qtdColsRef:
    # Calcula o número de colunas a serem removidas
    num_cols_adicionais = num_cols_lst1 - qtdColsRef
    for i in range(num_cols_adicionais):
      if (len(lst1)>0):
        del(lst1[len(lst1)-1])

  return lst1

#funcao para ajustar uma lista de acordo com a quantidade de linhas e colunas de referencia
# se tiver a mais linhas ou colunas, adiciona, se tiver menos, remove
def ajustList(lst1, qtdRowsRef, qtdColsRef):

  qtdRows = len(lst1)

  #lsteste = eval(strdata)
  lst1_ajust = []

  #1 - ajustando as colunas
  for lstRow in lst1:
    lst1_ajust.append(ajustColList(lstRow, qtdColsRef))

  #1 - ajustando as linhas
  #se precisar adicionar linhas
  if(qtdRowsRef > qtdRows):
    qtdLinhasAdicionais = qtdRowsRef - qtdRows
    for i in range(qtdLinhasAdicionais):
      if len(lst1_ajust) >0 and len(lst1_ajust[0]) >0:
        lst1_ajust.insert(len(lst1_ajust), ["NAN" for _ in range(len(lst1_ajust[0]))])
  #se precisar remover linhas adicionais
  elif(qtdRows > qtdRowsRef):
    qtdLinhasAdicionais = qtdRows - qtdRowsRef
    for i in range(qtdLinhasAdicionais):
      del(lst1_ajust[len(lst1_ajust)-1])

  return lst1_ajust

#funcao para ajustar a estrutura cell_coordinates em relacao a referencia para possibilitar a comparacao e geracao de estatisticas
def ajustCellCord (cellCord, qtdRowsRef, qtdColsRef):

  lstCoord = [999999, 999999, 999999, 999999] #nova lista de coordenadas
  dicNewCol = {'column': lstCoord, 'cell': lstCoord} #uma nova coluna (celula)
  #nova linha da tabela
  newLine =  "{'row': [99999, 99999, 99999, 99999], \
              'cells': [], \
              'cell_count': 0}"
  dicNewLine = eval(newLine)

  #verificando dimensao da estrutura atual
  qtdRows = 0
  qtdCols = 0
  if cellCord is not None and len(cellCord) >0:
    qtdRows = len(cellCord)
    qtdCols = len(cellCord[0]["cells"])
  #qtdRows = len(cellCord)
  #qtdCols = len(cellCord[0]["cells"])

  #adicionando estrutura inicial para cada quantidade de colunas de referencia
  for i in range(qtdColsRef):
    dicNewLine["cells"].insert(i,dicNewCol)

  #adiciona para cada coluna adicional necessária
  if qtdCols < qtdColsRef:
    qtdColAdicionais = qtdColsRef - qtdCols

    print("Adicionando coluna, qtd = ", qtdColAdicionais)
    for i in range(qtdColAdicionais):
      for row in cellCord:
        row['cells'].append(dicNewCol)

  #removendo uma coluna para cada adicional
  elif qtdCols > qtdColsRef:
    qtdColAdicionais = qtdCols - qtdColsRef

    print("Removendo coluna, qtd = ", qtdColAdicionais)
    for i in range(qtdColAdicionais):
      for row in cellCord:
        row['cells'] = row['cells'][:-1]

  #adiciona linha para cada linha adicional necessária
  if qtdRows < qtdRowsRef:
    qtdRowAdicionais = qtdRowsRef - qtdRows

    print("Adicionando linha, qtd = ", qtdRowAdicionais)
    for i in range(qtdRowAdicionais):
      cellCord.append(dicNewLine)

  #removendo linha para cada linha adicional necessária
  elif qtdRows > qtdRowsRef:
    qtdRowAdicionais = qtdRows - qtdRowsRef

    print("Removendo linha, qtd = ", qtdRowAdicionais)
    #removendo linha para cada adicional
    for i in range(qtdRowAdicionais):
      del(cellCord[len(cellCord)-1])

  return cellCord

def temRepeticoes(lista):
    return len(lista) != len(set(lista))

#ajustar df2 para incrementar mais linhas ou colunas em relacao a referencia (df1)
def ajustDataframe(df1, df2):

  # Verifica se o número de colunas de df2 é menor que o de df1
  if df2.shape[1] < df1.shape[1]:
      # Calcula quantas colunas precisam ser adicionadas
      num_cols_adicionais = df1.shape[1] - df2.shape[1]
      # Adiciona as colunas extras em df2 preenchidas com NaN
      for i in range(num_cols_adicionais):
          df2[f'C{i+1}'] = np.nan
  #neste caso remove as colunas adicionais
  elif df2.shape[1] > df1.shape[1]:
    # Calcula quantas colunas precisam ser removidas
    num_cols_remover = df2.shape[1] - df1.shape[1]
    # Remove as colunas extras em df2 da direita para a esquerda
    df2 = df2.iloc[:, :-num_cols_remover]

  # Verifica se o número de linhas de df2 é menor que o de df1
  if df2.shape[0] < df1.shape[0]:
      # Calcula quantas linhas precisam ser adicionadas
      num_linhas_adicionais = df1.shape[0] - df2.shape[0]
      # Adiciona as linhas extras em df2 preenchidas com NaN
      linhas_extras = pd.DataFrame(index=[f'L{i+1}' for i in range(num_linhas_adicionais)],
                                    columns=df2.columns)
      df2 = pd.concat([df2, linhas_extras])
  #neste caso remove as linhas adicionais
  elif df2.shape[0] > df1.shape[0]:
    # Calcula quantas linhas precisam ser removidas
    num_linhas_remover = df2.shape[0] - df1.shape[0]
    # Remove as linhas extras em df2 de baixo para cima
    df2 = df2.iloc[:-num_linhas_remover, :]

  return df2

import copy

#ajustar df2 para incrementar mais linhas ou colunas em relacao a referencia (df1)
def ajustDataframe(df1, qtdLinhasRef, qtdColunasRef):

  dfAux = copy.deepcopy(df1)
  # Verifica se o número de colunas de df1 é menor que qtdColunasRef
  if df1.shape[1] < qtdColunasRef:
      # Calcula quantas colunas precisam ser adicionadas
      num_cols_adicionais = qtdColunasRef - df1.shape[1]
      # Adiciona as colunas extras em df1 preenchidas com NaN
      #print("adicionando qtd colunas", num_cols_adicionais)
      ultIndice = len(dfAux.columns) - 1
      for i in range(num_cols_adicionais):
          if (i==0):
            novoIndice = ultIndice + 1
          else:
            novoIndice = ultIndice + (i+1)
          #dfAux[f'C{i+1}'] = np.nan novoIndice
          dfAux[novoIndice] = np.nan
  #neste caso remove as colunas adicionais
  elif qtdColunasRef < df1.shape[1]:
    # Calcula quantas colunas precisam ser removidas
    num_cols_remover = df1.shape[1] - qtdColunasRef
    # Remove as colunas extras em df1 da direita para a esquerda
    dfAux = dfAux.iloc[:, :-num_cols_remover]

  # Verifica se o número de linhas de df1 é menor que qtdLinhasRef
  if df1.shape[0] < qtdLinhasRef:
      # Calcula quantas linhas precisam ser adicionadas
      num_linhas_adicionais = qtdLinhasRef - df1.shape[0]
      # Adiciona as linhas extras em df1 preenchidas com NaN
      #linhas_extras = pd.DataFrame(index=[f'L{i+1}' for i in range(num_linhas_adicionais)],
                                    #columns=df1.columns)
      #print("adicionando qtd linhas", num_linhas_adicionais)
      ultIndice = len(dfAux.columns) - 1
      linhas_extras = pd.DataFrame(index=[f'{ultIndice + i + 1}' for i in range(num_linhas_adicionais)],
                             columns=dfAux.columns)
      dfAux = pd.concat([dfAux, linhas_extras])
  #neste caso remove as linhas adicionais
  elif qtdLinhasRef < df1.shape[0]:
    # Calcula quantas linhas precisam ser removidas
    num_linhas_remover = df1.shape[0] - qtdLinhasRef
    # Remove as linhas extras em df1 de baixo para cima
    dfAux = dfAux.iloc[:-num_linhas_remover, :]

  #normalizando indices
  dfAux = dfAux.reset_index(drop=True)

  return dfAux

#funcao para converter PDF para png
def pdf_page_to_png(pdf_path, page_number, output_path):
  # Convertendo a página do PDF para uma lista de imagens
  images = convert_from_path(pdf_path, first_page=page_number, last_page=page_number)

  # Salvando a imagem como PNG
  images[0].save(output_path, 'PNG')

# Código MAIN para Rodar em Lote




In [None]:
################################### FUNCAO PRINCIPAL - GERAR ARQUIVOS DE METADADOS #######################
dfArq = listFiles(CERT_PATH, None) # carregar a lista de arquivos no DATAFRAME

lstDF = None
df = None
# Definir o fuso horário de SP
fuso_horario_brasilia = pytz.timezone('America/Sao_Paulo')

pathLab = DIROUT_TABLENET + "/" + LAB_PATH + "/"

if(os.path.exists(pathLab)):
  print('Removendo arquivos gerados anteriormente.., caminho:', pathLab)
  deleteFiles2(pathLab) #deletando os arquivos anteriores

for filepath, pages in zip(dfArq["PATH"], dfArq["QTDPAGES"]):

  arrcurfile = filepath.split("/")
  curFile = arrcurfile[len(arrcurfile)-1]
  labName = arrcurfile[len(arrcurfile)-2]

  #inicializando variaveis GT
  listFilesGT = []
  listTablesInfoGT = []

  #coletando dados das tabelas GT para comparacao e gerar o ID da imagem correto
  GT_LAB_OUT = GT_PATH + labName + "/"
  print("GT_LAB_OUT", GT_LAB_OUT)

  #varrendo cada pagina do arquivo
  for i in range(int(pages)):

    page = i+1
    #verificando a quantidade de tabelas por pagina

    #verificando se a pasta do laboratorio existe, caso negativo, cria
    labDirOut = DIROUT_TABLENET + "/" + labName + "/"
    if not os.path.exists(labDirOut):
      os.makedirs(labDirOut)

    #definindo variaveis para gravacao do arquivo de saida
    noExtension = curFile.replace(".pdf","")
    #arquivo de PDF de leitura
    path_pdf_in = filepath
    #caminho para arquivo BMP convertido
    path_bmp_noextension =  labDirOut + "/" + noExtension

    #nova pagina, carrega a lista de referencia (GT) da pagina em questao
    listFilesGT = getListFilesGTInfo(noExtension, page, GT_LAB_OUT)
    listTablesInfoGT = getListTablesInfo(GT_LAB_OUT,listFilesGT)
    qtdTabelasGT = len(listTablesInfoGT)

    print("Arquivo: [", path_pdf_in , "] / qtd de tabelas para ler da pagina[" + str(page) + "]: ", len(listTablesInfoGT))

    #temos tabelas para processar....
    if(qtdTabelasGT >0):

      ####1 - converter pdf para png  ####
      #convert_pdf_to_bmp(path_pdf_in, path_bmp_noextension, page)

      output_png = path_bmp_noextension + "_" + str(page) + ".png"

      pdf_page_to_png(path_pdf_in, page, output_png)

      #carregar a imagem do modelo para o dataframe
      lstDF = predict(output_png, TABLENET_MODEL_PATH)
      qtdlinhasLstDF = 0
      qtdColunasLstDF = 0
      if len(lstDF) > 0:
        qtdlinhasLstDF = len(lstDF)
        qtdColunasLstDF = len(lstDF[0].columns)

      qtdlinhasGT = 0
      qtdColunasGT = 0
      if len(listTablesInfoGT) > 0:
        qtdlinhasGT = len(listTablesInfoGT)
        qtdColunasGT = len(listTablesInfoGT[0])

      print("=============>Tabela pagina {0} caminho: {1} ".format(page,path_pdf_in))
      print("Dimensão lstDF {0} x {1} ".format(qtdlinhasLstDF, qtdColunasLstDF))
      print("Dimensão listTablesInfoGT {0} x {1} ".format(qtdlinhasGT, qtdColunasGT))

      print("len(lstDF) e len(listTablesInfoGT) com tamanho > 1 - CASO COMPLEXO ")
      #para cada tabela (dataframe) da lista de dataframes
      for df in lstDF:

        qtdlinhasDF = 0
        qtdColunasDF = 0
        if len(lstDF) > 0:
          qtdlinhasDF = len(df)
          qtdColunasDF = len(df.columns)

        dfOriginal = copy.deepcopy(df) #guarda o DF original sem correcoes para gravar no HTML (calculo do TED)
        #verificando possivel ajuste de dimensao do dataframe com GT para comparação
        if listTablesInfoGT is not None and len(listTablesInfoGT) > 0:

          #ajustar o tamanho do dataframe, caso necessário
          qtdLinhasGT = int(listTablesInfoGT[0]["DIMENSION"].split("X")[0])
          qtdColsGT = int(listTablesInfoGT[0]["DIMENSION"].split("X")[1])
          if (qtdColunasDF != qtdColsGT or qtdlinhasDF != qtdLinhasGT):
            print("Ajustando dataframe para dimensão, dimensão atual DF = ", qtdlinhasDF,"x",qtdColunasDF)
            print("Ajustando dataframe para dimensão do GT, nova dimensão = ", qtdLinhasGT,"x",qtdColsGT)
            df = ajustDataframe(df, qtdLinhasGT, qtdColsGT)

        #carregando o objeto dicionário INFO da tabela de análise
        bbox = [999999, 999999, 999999, 999999] #modelo nao traz informacao de coordenadas (ajustar)
        lstHead = [] if len(df) == 0 else df.iloc[0].tolist()
        lstFirst = [] if len(df) == 0 or len(df) == 1 else df.iloc[1].tolist()
        dicTable = getDicTableInfo(labName, noExtension, page, qtdlinhasDF, qtdColunasDF, bbox, lstHead, lstFirst)

        print("dicTable HEAD", dicTable["HEAD"])
        print("dicTable FIRST_LINE", dicTable["FIRST_LINE"])

        #verificando qual tabela de maior similaridade
        #apenas uma tabela GT existente na pagina (df recebe o TABLEID existente)
        if len(listTablesInfoGT) ==1:
          print("Apenas uma tabela GT na página, logo, TABLEID a ser utilizado para dicTable = ",listTablesInfoGT[0]["TABLEID"])
          dicTable["TABLEID"] = listTablesInfoGT[0]["TABLEID"]
        else:
          print("listTablesInfoGT > 1, verificar similaridade...")
          dicTable["TABLEID"] = getMaiorSimilaridade(dicTable, listTablesInfoGT)

        listFileGT = getGTInfo(dicTable["TABLEID"], noExtension, page, GT_LAB_OUT)
        listTableInfoGT = getListTablesInfo(GT_LAB_OUT,listFileGT)
        print("TABLEID de maior similaridade = ", dicTable["TABLEID"])

        #gravar o arquivo INFO da tabela para comparacao com GT
        #encontrou tabela identica ou similar
        if dicTable["TABLEID"] != "TBD" and dicTable["TABLEID"] != "":
          filePath = labDirOut + dicTable["TABLEID"] + "|" + noExtension + "|" + str(page) + "_INFO.info"
          print("gerando arquivo INFO de resultado - SUCESSO: ", filePath)
          SaveDicTableInfo(filePath, dicTable)
        else:
          #no caso de nao ter encontrado tabela similar ao GT para comparação, registrar arquivo de erro
          filePath = labDirOut + noExtension + "|" + str(page) + "|" + "_INFO_ERRO.error"
          dicTable["OBS"] = "ERRO - TABLEID NÃO ENCONTRADO"
          print("gerando arquivo INFO de resultado - ERRO: ", filePath)
          SaveDicTableInfo(filePath, dicTable)

        #modelo do dicmetada
        dicMetaData = {
        "filename": 0,
        "split": "train",
        "imgid": "",
        "html": {
          "cells": 0,
          "structure": 0
                }
        }

        #inicializando a lista
        dicMetaData["filename"] = path_pdf_in
        dicMetaData["imgid"] = dicTable["TABLEID"]

        #gravando no arquivo as informacoes de METADADOS, BBOX E HTML
        lstCells = []
        lstStructure = []

        #print("df = ", df)
        #print("df shape ", df.shape)
        #print("df.at[0,0] ", df.at[0,0])
        #carrega e concatena a lista de tokens das celulas e bbox calculando como referencia as coordenadas da tabela principal
        lstCells.extend(noteListTokensBbox(None, df, None)) # no futuro alterar para inserir bbox
        lstStructure.extend(noteTokensHTML(dfOriginal)) #carrega e concatena a lista de tokens html (df original sem ajustes)
        dicMetaData["html"]["cells"] = lstCells
        dicMetaData["html"]["structure"] = lstStructure

        print("Gravando Metadados: ", DIROUT_TABLENET)
        saveAnnotationFile(dicMetaData, DIROUT_TABLENET , page)
        saveElementMetadata(dicMetaData, "BBOX", DIROUT_TABLENET, page)
        saveElementMetadata(dicMetaData, "HTML", DIROUT_TABLENET, page)
        saveElementMetadata(dicMetaData, "HTML_PRETTY", DIROUT_TABLENET, page)

        #break #end for lstDF

      #end if temos paginas para processar

    #break # end for pages

  #break # end for files

#removendo arquivos BMP
print("Removendo arquivos png temporários...")
deleteFiles(labDirOut, "png")

# Obter a data e hora corrente
data_e_hora_corrente = datetime.now(fuso_horario_brasilia)
# Formatar a data e hora corrente para o formato desejado
data_e_hora_formatadas = data_e_hora_corrente.strftime("%d/%m/%Y %H:%M:%S")
#MAXFILES = 1 #apenas para testes, delimitar a quantidade de certificados a ler
print('FIM DO PROCESSAMENTO ', data_e_hora_formatadas)

# TEDS - CALCULO PARA O MODELO



In [None]:
from IPython.display import clear_output
!pip install distance
!pip install apted
!pip install lxml
!pip install tqdm
clear_output()

In [None]:
import distance
from apted import APTED, Config
from apted.helpers import Tree
from lxml import etree, html
from collections import deque
#from parallel import parallel_process
from tqdm import tqdm

class TableTree(Tree):
    def __init__(self, tag, colspan=None, rowspan=None, content=None, *children):
        self.tag = tag
        self.colspan = colspan
        self.rowspan = rowspan
        self.content = content
        self.children = list(children)

    def bracket(self):
        """Show tree using brackets notation"""
        if self.tag == 'td':
            result = '"tag": %s, "colspan": %d, "rowspan": %d, "text": %s' % \
                     (self.tag, self.colspan, self.rowspan, self.content)
        else:
            result = '"tag": %s' % self.tag
        for child in self.children:
            result += child.bracket()
        return "{{{}}}".format(result)


class CustomConfig(Config):
    @staticmethod
    def maximum(*sequences):
        """Get maximum possible value
        """
        return max(map(len, sequences))

    def normalized_distance(self, *sequences):
        """Get distance from 0 to 1
        """
        return float(distance.levenshtein(*sequences)) / self.maximum(*sequences)

    def rename(self, node1, node2):
        """Compares attributes of trees"""
        if (node1.tag != node2.tag) or (node1.colspan != node2.colspan) or (node1.rowspan != node2.rowspan):
            return 1.
        if node1.tag == 'td':
            if node1.content or node2.content:
                return self.normalized_distance(node1.content, node2.content)
        return 0.


class TEDS(object):
    ''' Tree Edit Distance basead Similarity
    '''
    def __init__(self, structure_only=False, n_jobs=1, ignore_nodes=None):
        assert isinstance(n_jobs, int) and (n_jobs >= 1), 'n_jobs must be an integer greather than 1'
        self.structure_only = structure_only
        self.n_jobs = n_jobs
        self.ignore_nodes = ignore_nodes
        self.__tokens__ = []

    def tokenize(self, node):
        ''' Tokenizes table cells
        '''
        self.__tokens__.append('<%s>' % node.tag)
        if node.text is not None:
            self.__tokens__ += list(node.text)
        for n in node.getchildren():
            self.tokenize(n)
        if node.tag != 'unk':
            self.__tokens__.append('</%s>' % node.tag)
        if node.tag != 'td' and node.tail is not None:
            self.__tokens__ += list(node.tail)

    def load_html_tree(self, node, parent=None):
        ''' Converts HTML tree to the format required by apted
        '''
        global __tokens__
        if node.tag == 'td':
            if self.structure_only:
                cell = []
            else:
                self.__tokens__ = []
                self.tokenize(node)
                cell = self.__tokens__[1:-1].copy()
            new_node = TableTree(node.tag,
                                 int(node.attrib.get('colspan', '1')),
                                 int(node.attrib.get('rowspan', '1')),
                                 cell, *deque())
        else:
            new_node = TableTree(node.tag, None, None, None, *deque())
        if parent is not None:
            parent.children.append(new_node)
        if node.tag != 'td':
            for n in node.getchildren():
                self.load_html_tree(n, new_node)
        if parent is None:
            return new_node

    def evaluate(self, pred, true):
        ''' Computes TEDS score between the prediction and the ground truth of a
            given sample
        '''
        if (not pred) or (not true):
            return 0.0
        parser = html.HTMLParser(remove_comments=True, encoding='utf-8')
        pred = html.fromstring(pred, parser=parser)
        true = html.fromstring(true, parser=parser)
        if pred.xpath('body/table') and true.xpath('body/table'):
            pred = pred.xpath('body/table')[0]
            true = true.xpath('body/table')[0]
            if self.ignore_nodes:
                etree.strip_tags(pred, *self.ignore_nodes)
                etree.strip_tags(true, *self.ignore_nodes)
            n_nodes_pred = len(pred.xpath(".//*"))
            n_nodes_true = len(true.xpath(".//*"))
            n_nodes = max(n_nodes_pred, n_nodes_true)
            tree_pred = self.load_html_tree(pred)
            tree_true = self.load_html_tree(true)
            distance = APTED(tree_pred, tree_true, CustomConfig()).compute_edit_distance()
            return 1.0 - (float(distance) / n_nodes)
        else:
            return 0.0

    def batch_evaluate(self, pred_json, true_json):
        ''' Computes TEDS score between the prediction and the ground truth of
            a batch of samples
            @params pred_json: {'FILENAME': 'HTML CODE', ...}
            @params true_json: {'FILENAME': {'html': 'HTML CODE'}, ...}
            @output: {'FILENAME': 'TEDS SCORE', ...}
        '''
        samples = true_json.keys()
        if self.n_jobs == 1:
            scores = [self.evaluate(pred_json.get(filename, ''), true_json[filename]['html']) for filename in tqdm(samples)]
        else:
            inputs = [{'pred': pred_json.get(filename, ''), 'true': true_json[filename]['html']} for filename in samples]
            scores = parallel_process(inputs, self.evaluate, use_kwargs=True, n_jobs=self.n_jobs, front_num=1)
        scores = dict(zip(samples, scores))
        return scores


# 5 - Função MAIN para gerar estatísticas (ARQUIVOS GT VERSUS TABLENET)

In [None]:
#montando o caminho para leitura dos arquivos (certificados, imagens, planilhas, etc)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import glob
import json

#coletar as informacoes da tabela do arquivo _INFO e retornar para uma lista
def getListTablesInfo(path, listFiles):

  listTablesInfo = []
  for fileName in listFiles:
    with open(path + fileName, 'r') as file:
      conteudo = file.read()
      listTablesInfo.append(eval(conteudo))

  return listTablesInfo

def sortList(lst, len):
    def personList(item):
        # Extrai o número após 'CTM' e converte para inteiro
        return int(item[len:])

    return sorted(lst, key=personList)

#coletar arquivo de acordo com premissas (prefixo e sufixo)
def getFileByPrefix(path, prefix, sufix):

  for fileName in os.listdir(path):
      if prefix in fileName and fileName.endswith(sufix):
          return fileName
  return ""

def getFiles(folderDir, ext):
  # Construir o padrão de busca usando a extensão fornecida
  pattern = os.path.join(folderDir, f"*.{ext}")

  files = []
  arrPath = folderDir.split("/")

  #print("arrPath ", arrPath)
  if len(arrPath) >0 and len(arrPath[len(arrPath)-1].split("_")) >0:
    #print(folderDir)
    lab = arrPath[len(arrPath)-1].split("_")[2]
    # Usar a função glob para encontrar os arquivos correspondentes ao padrão
    files = glob.glob(pattern)

  # Retornar a lista de arquivos encontrados
  return sorted(files)

def getInfoFiles(list):

  lstInfoFiles = []
  for path in list:

    arrPath = path.split("/")
    file = arrPath[len(arrPath)-1]
    tableId = file.split("|")[0]
    fileName = file.split("|")[1]
    page = file.split("|")[2].split("_")[0]

    lstInfoFiles.append({"TABLEID":tableId, "TABLEID":tableId, "FILE":fileName, "PAGE":page})

  return lstInfoFiles

#files = getFiles("/content/drive/MyDrive/DataSets/Certificados/Out/img2table/LAB_01_CTM", "info")
#lstInfoFiles = getInfoFiles (files)

#funcao que compara o valor de duas listas e calcula a media do percentual de similaridade entre eles
#(para calcular o valor do bbox das tabelas e células das tabelas)
def calcPercSimValueLists(lista1, lista2):
  if len(lista1) != len(lista2):
      print("calcPercSimValueLists, listas de tamanhos diferentes, lista1=",lista1,"/ lista2 = ",lista2)
      raise ValueError("As listas devem ter o mesmo comprimento.")

  percSim = [ (1 / (1 + (abs(num1 - num2)))) * 100 for num1, num2 in zip(lista1, lista2)]
  #print("percSim ", percSim)
  #print("result percSim ", sum(percSim) / len (percSim))
  return sum(percSim) / len (percSim)

#(para calcular o percentual de similaridade entre dois números
def calcPercSimValueNums(num1, num2):

  percSim = (1 / (1 + (abs(num1 - num2)))) * 100
  #print (" similaridade entre os numeros {0} e {1}: {2}".format(num1, num2, percSim))
  return percSim

#similaridade de strings conhecido como "Distância de Levenshtein"
def calcPercSimStrings(str1, str2):

  #retirando quebra de linhas da string
  str1 = str1.replace("\n", " ")
  str2 = str2.replace("\n", " ")

  tamanho_str1 = len(str1)
  tamanho_str2 = len(str2)

  matriz = [[0] * (tamanho_str2 + 1) for _ in range(tamanho_str1 + 1)]

  for i in range(tamanho_str1 + 1):
    matriz[i][0] = i

  for j in range(tamanho_str2 + 1):
    matriz[0][j] = j

  for i in range(1, tamanho_str1 + 1):
    for j in range(1, tamanho_str2 + 1):
        if str1[i - 1] == str2[j - 1]:
            custo_substituicao = 0
        else:
            custo_substituicao = 1
        matriz[i][j] = min(matriz[i - 1][j] + 1,       # Deletar
                            matriz[i][j - 1] + 1,       # Inserir
                              matriz[i - 1][j - 1] + custo_substituicao)  # Substituir

  distancia = matriz[tamanho_str1][tamanho_str2]
  maximo_tamanho = max(tamanho_str1, tamanho_str2)

  similaridade = 0
  if maximo_tamanho > 0:
    similaridade = (maximo_tamanho - distancia) / maximo_tamanho
  #print (" similaridade entre {0} e {1}: {2}".format(str1, str2, similaridade * 100))
  return similaridade * 100

#numero de ocorrencias de um numero em uma lista
def numTimes(list, num):

  cont = 0
  for valor in list:
    if valor == num:
      cont+=1

  return cont

#numero de ocorrencias de um numero ser maior ou igual que um numero
def numTimesMoreThen(list, num):

  cont = 0
  for valor in list:
    if valor >= num and valor <100:
      cont+=1

  return cont

def isDecimal(valor):
  try:
      float(valor)
      return True
  except ValueError:
      return False

#calcular similaridades em valores das celulas bbox e tokens de duas listas de tabelas
def calStatsTablesValues(lstTable1, lstTable2):

  lstResTokens = []
  lstResBbox = []
  for item1, item2 in zip(lstTable1, lstTable2):
    listaBbox1 = item1["bbox"]
    listaBbox2 = item2["bbox"]
    str1 = "".join(item1["tokens"])
    str2 = "".join(item2["tokens"])

    if len(listaBbox1) >0 and len(listaBbox2) >0:
      lstResBbox.append(round(calcPercSimValueLists(listaBbox1, listaBbox2),2))
    else:
      lstResBbox.append(0)

    strDec1 = str1.replace(",", ".").strip()
    strDec2 = str2.replace(",", ".").strip()
    #se os valores forem numeros converter para float para calcular similiaridade com maior exatidao
    if (isDecimal(strDec1) and isDecimal(strDec2)):
      lstResTokens.append( round( calcPercSimValueNums(float(strDec1), float(strDec2) ),2) )
    #no caso de string
    else:
      lstResTokens.append(round(calcPercSimStrings(str1, str2),2))

  return lstResTokens, lstResBbox

#calcular quantidade de valores não lidos pelo modelo (NAN ou 999999)
def calStatsNAN(lstTable):

  qtdNANToken = 0
  qtdNANBbox = 0

  for item in lstTable:
    listaBbox = item["bbox"]
    token = "".join(item["tokens"])

    if numTimes(listaBbox, 999999) ==4:
      qtdNANBbox+=1

    if token == "NAN":
      qtdNANToken+=1

  return qtdNANToken, qtdNANBbox

def removeSpecialChars(strTexto):
  # Remover os caracteres especiais
  speChars = ["\\", ]

  strTexto = strTexto.replace(speChars, "")

  return strTexto

def readFile(filePath):
  try:
    with open(filePath, 'r') as arquivo:
        conteudo = arquivo.read()
        conteudo = conteudo.replace("[nan, nan, nan, nan]", "[999999,999999,999999,999999]")
        #conteudo = conteudo.replace("\'", "")
        conteudo = conteudo.replace("\n", " ")
    return conteudo
  except FileNotFoundError:
    print(f'O arquivo "{filePath}" não foi encontrado.')
    return None
  except Exception as e:
    print(f'Ocorreu um erro ao ler o arquivo: {e}')
    return None

def SaveFileStats (filePath, dicTableStats):

  strFile = json.dumps(dicTableStats)
  strFile = strFile.replace(",",",\n")

  print("Salvando arquivo de estatísticas: ",filePath)
  with open(filePath, 'w') as arquivo:
    arquivo.write(strFile)

def strInDic(dicionario, string):

  for chave, valor in dicionario.items():
      if isinstance(valor, str) and string in valor:
          return True
  return False

def strInList(lstDic, string):

  for dic in lstDic:
    for chave, valor in dic.items():
        if isinstance(valor, str) and string == valor:
            return True
  return False

def ExportCSVSummary(ToolPath, GTPath):
  #coletando os diretorios dos laboratorios
  dirLabs = [nome for nome in os.listdir(ToolPath)]

  listStats = []
  filesSTATS = []

  lstTableIdGT = []
  #colentando os arquivos de statisticas dos laboratorios
  for dirLab in dirLabs:

    labPath =  ToolPath + dirLab
    filesSTATS = getFiles(labPath, "stats")

    #print("filesSTATS ", filesSTATS)
    #coletando arquivos de estatisticas
    for fileSTATS in filesSTATS:
      dicStats = ast.literal_eval(readFile(fileSTATS))
      listStats.append(dicStats)

    #coletando os tablesID do GT para comparacao
    filesGTInfo = GTPath + dirLab
    filesInfo = getFiles(filesGTInfo, "info")

    #print("filesInfo ", filesInfo)

    for fileInfo in filesInfo:
      arrFile = fileInfo.split("/")
      tableId = arrFile[len(arrFile)-1].split("|")[0]
      lstTableIdGT.append(dirLab+"|"+tableId)

  lstErros = []

  for item in lstTableIdGT:

    #print(item)
    lab = item.split("|")[0]
    tableId = item.split("|")[1]

    if not strInList(listStats, tableId): #não encontrou, adicionar ao erro

      print("Não encontrou TABLEID ", tableId, ", adicionando....")
      #print("GTPath ", GTPath)
      #print("lab ", lab)
      #print("tableId ", tableId)
      #print("fileInfo ", fileInfo)

      #coletando informacoes do statsInfo
      #print("parametros a carregar na funcao getFileByPrefix", GTPath + lab, tableId+"|", "info")
      fileInfo = getFileByPrefix(GTPath + lab, tableId+"|", "info")
      #pathInfo = GTPath + dirLab + fileInfo
      lstFile = [fileInfo]
      #print("parametros a carregar na funcao getListTablesInfo", GTPath + lab + "/", lstFile)
      lstInfo = getListTablesInfo(GTPath + lab + "/", lstFile)

      dicTableStats = {}
      dicTableStats["LAB"] = lstInfo[0]["LAB"]
      dicTableStats["FILE"] = lstInfo[0]["FILE"]
      dicTableStats["PAGE"] = lstInfo[0]["PAGE"]
      dicTableStats["TABLEID"] = lstInfo[0]["TABLEID"]
      dicTableStats["DIMENSION"] = lstInfo[0]["DIMENSION"]
      dicTableStats["QTDCELLS"] = 0
      dicTableStats["QTDACERTOSCELLS"] = 0
      dicTableStats["PERCACERTOSCELLS"] = 0
      dicTableStats["PERCACERTOSBBOX"] = 0
      dicTableStats["QTDNAOLIDOSBBOX"] = 0
      dicTableStats["PERCNAOLIDOSBBOX"] = 0
      dicTableStats["QTDNAOLIDOSTOKEN"] = 0
      dicTableStats["PERCNAOLIDOSTOKEN"] = 0
      dicTableStats["TEDS"] = 0

      lstErros.append(dicTableStats)

  lstTotal = listStats + lstErros

  dtStats = pd.DataFrame(lstTotal)
  dtStatsORD = dtStats.sort_values(by=['LAB','FILE', 'PAGE'])
  #print(dtStatsORD)
  print("Arquivo de sumário gerado ", ToolPath + "Summary.xlsx")
  dtStatsORD.to_excel(ToolPath + "Summary.xlsx", index=False)  # index=False para não incluir o índice do DataFrame
  return dtStatsORD

In [None]:
#gerando estatísticas

import os
import ast
import pandas as pd

#diretorio do img2tableDir
Tablenet = "/content/drive/MyDrive/DataSets/Certificados/Out/TableNet/"
#diretorio de referencia GT
GTDir = "/content/drive/MyDrive/DataSets/Certificados/Out/GT/"

#coletando os diretorios dos laboratorios
#dirLabs = [nome for nome in os.listdir(pubTables)]

dirLabs = [] # informe aqui a relacao de pastas dos laboratorios

for dirLab in dirLabs:

  if not os.path.isfile(dirLab):
    labPath =  Tablenet + dirLab
    labPathGT =  GTDir + dirLab
    #coleta os arquivos info para analise dos tablesID
    filesINFO = getFiles(labPath, "info")
    #coleta os tablesID para comparacao
    lstInfoFiles = getInfoFiles(filesINFO)

    #para cada tableID, gerar estatísticas
    for dicInfoFile in lstInfoFiles:

      print("dicInfoFile", dicInfoFile)
      fileInfo = dicInfoFile["TABLEID"] + "|" + dicInfoFile["FILE"] + "|" + dicInfoFile["PAGE"] + "_INFO.info"
      fileTokenBbox = dicInfoFile["TABLEID"] + "|" + dicInfoFile["FILE"] + "|" + dicInfoFile["PAGE"] + "_BBOX.bbox"
      fileHTML = dicInfoFile["TABLEID"] + "|" + dicInfoFile["FILE"] + "|" + dicInfoFile["PAGE"] + "_HTML.html"

      pathFileInfo = labPath + "/" + fileInfo
      pathFileTokenBbox = labPath + "/" + fileTokenBbox
      pathFileHTML = labPath + "/" + fileHTML

      pathFileInfoGT = labPathGT + "/" + fileInfo
      pathFileTokenBboxGT = labPathGT + "/" + fileTokenBbox
      pathFileHTMLGT = labPathGT + "/" + fileHTML
      print("pathFileInfo =", pathFileInfo)

      #gerando estatisticas do token e bbox
      if os.path.exists(pathFileHTMLGT) and os.path.exists(pathFileTokenBbox) and os.path.exists(pathFileTokenBboxGT):

        #carrega lista de tokens/bbox e HTMLs para comparacao do arquivo corrente com GT
        #arquivo info GT (.info)
        lstInfo = ast.literal_eval(readFile(pathFileInfoGT))
        #arquivo bbox (.bbox)
        #print("pathFileTokenBbox", pathFileTokenBbox)
        lstTkBox = ast.literal_eval(readFile(pathFileTokenBbox))
        #arquivo bbox GT (.bbox)
        lstTkBoxGT = ast.literal_eval(readFile(pathFileTokenBboxGT))
        #arquivo html (.html)
        #print("pathFileHTML ", pathFileHTML)
        strHTML = "".join(ast.literal_eval(readFile(pathFileHTML).replace("\n", " ")))
        #arquivo html GT(.html)
        strHTMLGT = "".join(ast.literal_eval(readFile(pathFileHTMLGT).replace("\n", " ")))
        #break

        #TEDS apenas funciona se tiver na estrutura html as tags html e body
        if "<body>" not in strHTML:
          strHTML = "<body>" + strHTML + "</body>"
        if "<html>" not in strHTML:
          strHTML = "<html>" + strHTML + "</html>"
        if "<body>" not in strHTMLGT:
          strHTMLGT = "<body>" + strHTMLGT + "</body>"
        if "<html>" not in strHTMLGT:
          strHTMLGT = "<html>" + strHTMLGT + "</html>"

        qtdLinhas = int(lstInfo["DIMENSION"].split("X")[0])
        qtdColunas = int(lstInfo["DIMENSION"].split("X")[1])
        qtdCells = qtdLinhas * qtdColunas

        #print("lstTkBox",lstTkBox)
        #print("lstTkBoxGT",lstTkBoxGT)
        lstStatsTokens, lstStatsBbox = calStatsTablesValues(lstTkBox, lstTkBoxGT)

        qtdNANToken, qtdNANBbox = calStatsNAN(lstTkBox)

        #calcula qtd de acertos (com similaridade 100% entre os valores dos tokens e tabelas)
        qtdAcertosTokens = numTimes(lstStatsTokens, 100.0)
        #qtdAcertosBbox = numTimes(lstStatsBbox, 100.0)
        percAcertosBbox = round((sum(lstStatsBbox) / len(lstStatsBbox))/100, 2)

        #calcula similidade maior que 95%
        qtdTokensMaior95 = numTimesMoreThen(lstStatsTokens, 95)
        qtdBboxMaior95 = numTimesMoreThen(lstStatsBbox, 95)

        #calcula TEDS entre as estruturas HTMLs (img2table VS GT)
        teds = TEDS()

        scoreTEDS = round(teds.evaluate(strHTML, strHTMLGT), 6)

        #salvado arquivo de estatística
        fileStats = dicInfoFile["TABLEID"] + "|" + dicInfoFile["FILE"] + "|" + dicInfoFile["PAGE"] + "_STATS.stats"
        pathFileStats = labPath + "/" + fileStats

        percAcertosTokens = round((qtdAcertosTokens / qtdCells), 2)
        percNaoLidosBbox = round((qtdNANBbox / qtdCells), 2)
        percNaoLidosToken = round((qtdNANToken / qtdCells), 2)

        dicTableStats = {}
        dicTableStats["LAB"] = lstInfo["LAB"]
        dicTableStats["FILE"] = dicInfoFile["FILE"]
        dicTableStats["PAGE"] = dicInfoFile["PAGE"]
        dicTableStats["TABLEID"] = dicInfoFile["TABLEID"]
        dicTableStats["DIMENSION"] = lstInfo["DIMENSION"]
        dicTableStats["QTDCELLS"] = qtdCells
        dicTableStats["QTDACERTOSCELLS"] = qtdAcertosTokens
        dicTableStats["PERCACERTOSCELLS"] = percAcertosTokens
        dicTableStats["PERCACERTOSBBOX"] = percAcertosBbox
        dicTableStats["QTDNAOLIDOSBBOX"] = qtdNANBbox
        dicTableStats["PERCNAOLIDOSBBOX"] = percNaoLidosBbox
        dicTableStats["QTDNAOLIDOSTOKEN"] = qtdNANToken
        dicTableStats["PERCNAOLIDOSTOKEN"] = percNaoLidosToken
        #dicTableStats["QTDCELLSMAIOR95"] = qtdTokensMaior95
        #dicTableStats["QTDABBOXMAIOR95"] = qtdBboxMaior95
        dicTableStats["TEDS"] = scoreTEDS

        SaveFileStats (pathFileStats, dicTableStats)

        print("Dimensão da tabela ", lstInfo["DIMENSION"], ", total ", str(qtdCells), " células" )
        print("qtdCells:",qtdCells)
        print("qtdAcertosTokens:",qtdAcertosTokens)
        print("percAcertosTokens:",percAcertosTokens,"/",(percAcertosTokens*100),"%")
        print("percAcertosBbox:",percAcertosBbox,"/",(percAcertosBbox*100),"%")
        print("qtdNaoLidosBbox:",qtdNANBbox)
        print("percNaoLidosBbox:",percNaoLidosBbox,"/",(percNaoLidosBbox*100),"%")
        print("qtdNaoLidosTokens:",qtdNANToken)
        print("percNaoLidosToken:",percNaoLidosToken,"/",(percNaoLidosToken*100),"%")
        #print("qtdAcertosTokens>95:",qtdTokensMaior95)
        #print("qtdAcertosBbox>95:",qtdBboxMaior95)
        print('TEDS score:', scoreTEDS,"/",round((scoreTEDS*100),2),"%")
        #break # fim primeiro for

#gerar arquivos de estatística, montar sumário no EXCEL
ExportCSVSummary(pubTables, GTDir)