In [104]:
from pypdf import PdfReader, PdfWriter
from pypdf.errors import PdfStreamError
from ocrmypdf.exceptions import SubprocessOutputError
import os
import ocrmypdf
from tqdm import tqdm
from IPython.display import clear_output
import json
import pandas as pd
from typing import Callable, Generator
from datetime import datetime
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.ai.formrecognizer import DocumentAnalysisClient
from io import BufferedReader
from dotenv import load_dotenv
from pathlib import Path
from dateutil.parser import parse, ParserError
import re
import traceback
from collections import defaultdict
from unidecode import unidecode

In [3]:
skip_files = []
files_with_error = []
processed_files = []

raw_dir = './files/raw'
tmp_dir = './files/tmp'
output_dir = './files/ocr'
json_dir = './files/json'
sample_dir = './files/sample'
log_dir = './var'

In [14]:
for dirname in [raw_dir, tmp_dir, output_dir, json_dir, log_dir]:
    if not os.path.exists(dirname):
        os.makedirs(dirname)

### **1. Preprocesamiento** 

#### **1.1. Eliminando Páginas que no contienen los datos numéricos de las pruebas**

El siguiente código elimina los arhivos que tengan cuatro páginas o menos, ya que estos son los reportes de las estaciones que no autorizaron la realización de los ensayos. También elimina las últimas dos páginas, las cuales solo contienen las imágenes de los ensayos y las firmas de los que realizaron el reporte.

In [46]:
for name in os.listdir(raw_dir):
    src = os.path.join(raw_dir, name)
    reader = PdfReader(src)
        
    if len(reader.pages) <= 4:
        skip_files.append(name)
        continue
    
    dest = os.path.join(tmp_dir, name)
    
    #Eliminar las últimas dos páginas, ya que son fotos y firmas
    for page_num in [reader.get_num_pages()-1,reader.get_num_pages()-2,0]:
        reader.remove_page(page_num)
    
    writer = PdfWriter(clone_from=reader)
    writer.write(dest)

/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in the trailer - assuming there is no previous xref table
/Prev=0 in

Un total de 130 archivos contienen cuatro páginas o menos, por lo que fueron omitidos para este análisis.

In [42]:
len(skip_files)

130

#### **1.2. Realizando OCR**

En la siguiente celda realizaremos un OCR a aquellos archivos que contienen más de 4 páginas. Los archivos PDFs con OCR son colocados en el directorio 'files/ocr' por si requieren ser consultados posteriormente.

In [77]:
import warnings

error_files = []
warning_files = []

for name in tqdm(os.listdir(tmp_dir), unit='file', desc='Files processed: '):
    
    #Omite los archivos que tienen cuatro páginas o menos
    if name in skip_files or os.path.exists(os.path.join(output_dir, name)):
        clear_output(wait=True)
        continue
    
    try:
        with warnings.catch_warnings(record = True) as w:
            
            warnings.simplefilter("always")
            #Realiza OCR
            ocrmypdf.ocr(os.path.join(tmp_dir, name),
                    os.path.join(output_dir, name),skip_text=True, language='spa', deskew=True)
            if w:
                raise w[-1].category(w[-1].message)
    
    except SubprocessOutputError as error:
        error_files.append({'file': name, 'error': error})
    
    except Warning as e:
        warning_files.append({'file': name, 'warning': e})
        
    clear_output(wait=True)

Files processed: 100%|██████████| 810/810 [1:33:18<00:00,  6.91s/file]


Solo un archivo procesado generó un error. Este se refiere al reporte "2023 03 047.pdf". Revisando el archivo manualmente, la razón del error se debe a que las primeras páginas del reporte ya están en texto y no escaneado. Como es solo un archivo, manualmente pueden eliminarse las primeras cuantro páginas para realizar el OCR a las tablas de los datos solamente, ya que estas si están escaneadas.

In [78]:
error_files

[{'file': '2023 03 047.pdf',
  'error': ocrmypdf.exceptions.SubprocessOutputError('Ghostscript PDF/A rendering failed')}]

#### **1.3. Eliminando Páginas**

Ahora procedemos a eliminar las páginas que no contengan tablas de datos. Estas páginas suelen contener portadas, resúmenes u otras informaciones no relevantes para este análisis. La forma de filtrar las páginas que nos interesan es identificando aquellas que tengan el texto "Test Report".

In [50]:

def remove_pages(compare_func: Callable, input_dir: str, output_dir: str) ->dict[str, dict|list]:

    results = {'deleted_pages':defaultdict(list), 'errors': {} }

    for name in tqdm(os.listdir(input_dir), unit='file', desc='Files processed: '):
        
        try:
            if not name.endswith('.pdf'):
                continue
            
            reader = PdfReader(os.path.join(input_dir, name))
            pages = list(reader.pages)
        
        except PdfStreamError as e:
            results['errors'][name] = e
            continue
        
        for i,page in enumerate(pages):   
            text = page.extract_text()
            
            if not compare_func(text):
    
                reader.remove_page(page, True)
                results['deleted_pages'][name].append(i+1)
                
        writer = PdfWriter(clone_from=reader)
        writer.write(os.path.join(output_dir, name))
    
    return results

In [26]:
def page_count(dirname: str) -> int:
    num_pages = 0
    for name in tqdm(os.listdir(dirname), unit='file', desc='Files processed: '):
    
        try:
            if not name.endswith('.pdf'):
                continue
            
            reader = PdfReader(os.path.join(dirname, name))
            num_pages += len(reader.pages)

        except PdfStreamError as e:
            continue
    return num_pages

In [27]:
results = remove_pages(lambda text: text.find("Test Report") >= 0, output_dir, tmp_dir)

Files processed: 100%|██████████| 810/810 [03:08<00:00,  4.30file/s]


A continuación se muestran las páginas eliminadas para cada archivo

In [28]:
results

{'deleted_pages': defaultdict(list,
             {'2023 03 140.pdf': [1, 2, 3, 4],
              '2023 03 034.pdf': [2],
              '2023 05 065.pdf': [1, 2, 3, 4],
              '2023 05 059.pdf': [1],
              '2023 02 030.pdf': [2],
              '2023 08 047.pdf': [1],
              '2023 03 021.pdf': [4],
              '2023 07 001.pdf': [1, 2, 3],
              '2023 05 112.pdf': [3, 4],
              '2023 03 037.pdf': [3],
              '2023 05 072.pdf': [1],
              '2023 08 051.pdf': [1, 2],
              '2023 04 088.pdf': [1, 3],
              '2023 04 089.pdf': [1, 2, 3, 4],
              '2023 03 036.pdf': [4],
              '2023 03 022.pdf': [3],
              '2023 05 107.pdf': [1, 2, 3, 4],
              '2023 06 007.pdf': [1],
              '2023 01 080.pdf': [3],
              '2023 06 003.pdf': [2],
              '2023 06 017.pdf': [1, 2, 3, 4],
              '2023 04 066.pdf': [1],
              '2023 05 076.pdf': [1],
              '2023 06 016.pdf

Debido a que usaremos un servicio de pago de Azure, limitaremos el análisis únicamente a las Gasolinas. Para esto, filtramos solamente las páginas que contienen la palabra "GASOLINA". En caso de requerirse los datos para el Gasoil, simplemente sería sustituir la palabra "GASOLINA" por "GASOIL".

In [44]:
results2 = remove_pages(lambda text: text.find("GASOLINA") >= 0, tmp_dir,'./files/sample')

Files processed: 100%|██████████| 810/810 [03:09<00:00,  4.28file/s]


A continuación se presentan las páginas que fueron eliminadas para cada archivo

In [48]:
results2

{'deleted_pages': defaultdict(list,
             {'2023 07 014.pdf': [4, 5],
              '2023 07 028.pdf': [4, 5],
              '2023 04 100.pdf': [4, 5],
              '2023 01 051.pdf': [3, 4],
              '2023 01 045.pdf': [3, 4],
              '2023 01 079.pdf': [2, 5],
              '2023 06 011.pdf': [4, 5],
              '2023 06 005.pdf': [4, 5],
              '2023 06 039.pdf': [4, 5],
              '2023 03 168.pdf': [4, 5],
              '2023 05 111.pdf': [4, 5],
              '2023 05 105.pdf': [4, 5],
              '2023 03 034.pdf': [1, 3, 4],
              '2023 03 008.pdf': [4],
              '2023 08 052.pdf': [4],
              '2023 05 065.pdf': [2],
              '2023 05 071.pdf': [4, 5],
              '2023 08 046.pdf': [4, 5],
              '2023 05 059.pdf': [3],
              '2023 02 025.pdf': [4, 5],
              '2023 02 031.pdf': [3, 4, 5],
              '2023 07 148.pdf': [4, 5],
              '2023 02 019.pdf': [4, 5],
              '2023 04 074.

Finalmente, contamos la cantidad de páginas de nuestra muestra que serán analizadas en Azure, las cuales totalizan 2,284

In [51]:
page_count('./files/sample')

Files processed: 100%|██████████| 810/810 [00:00<00:00, 1109.38file/s]


2284

### **2. Análisis de los Documentos**

In [112]:
class DataExtractor:
    
    page_num = None
    key_value_pairs: list[dict] = []
    tables: list[dict] = []
    
    def __init__(self, data) -> None:
        self.key_value_pairs = data['key_value_pairs']
        self.tables = data['tables']
            
    def __filter_by_content_fn (self,page_num: int, keyword: str) -> Callable:
        
        return lambda item: re.search(keyword, item['key']['content'].lower()) is not None \
            and item['key']['bounding_regions'][0]['page_number'] == page_num

    
    def __filter_table(self, page_number: int) -> dict|None:
        results = list(filter(
            lambda table: table['bounding_regions'][0]['page_number'] == page_number, 
            self.tables))
        
        if len(results) == 0:
            return None
        
        return results[0]
    
    def __extract_headers(self, cells: list[dict]) -> list[str]:
        filtered_cells =  list(filter(lambda item: 'kind' in item and item['kind'] == 'columnHeader', cells ))
        headers = []
        
        for header in filtered_cells:
            # No nos interesan las columnas combinadas
            if 'column_span' in header and header['column_span'] >= 2:
                continue
            
            headers.insert(header['column_index'], header['content'])
  
        return headers
    
    def __extract_rows(self, cells: list[dict], columnCount: int) -> list[str|float|int]:
        filtered_cells = list(filter(lambda item: 'kind' in item and item['kind'] == 'content', cells))
        rows = []
        row = []
        
        for index,cell in enumerate(filtered_cells):
            
            if index >0  and (index % columnCount) == 0:
                
                rows.append(row)
                row = []
                
            row.insert(cell['column_index'], cell['content'])
            
        return rows
    
    def extract_client_name (self, page_number: int) -> str:
        
        results = list(filter(self.__filter_by_content_fn(page_number, r"cliente[\s|\/\\n]+client"), self.key_value_pairs))
        
        if len(results) == 0:
            return ''
        
        return results[0]['value']['content']
        
    def extract_test_date (self, page_number: int) -> datetime|str:
        results = list(filter(
            self.__filter_by_content_fn(page_number, r"date-time"), self.key_value_pairs))
        
        if len(results) == 0:
            return ''
        date = re.match(
            r"(\d{1,2}/\d{1,2}/\d{4}\s\d{1,2}:\d{2}(?:\s[ap]\.m)?)", 
            results[0]['value']['content'])[0]
        try:
            return parse(date)
        except ParserError:
            return parse(re.sub(r'p\.m|a\.m', '',date))
        
    def extract_product_name(self, page_number: int) -> str:
            
        results = list(filter(self.__filter_by_content_fn(page_number, 'producto'), self.key_value_pairs))
        
        if len(results) == 0:
            return ''
        
        return results[0]['value']['content']
    
    def extract_table(self, page_num:int) -> pd.DataFrame:
        table_items =  self.__filter_table(page_num)
        if table_items is None:
            return pd.DataFrame
        
        headers = self.__extract_headers(table_items['cells'])
        values = self.__extract_rows(
            table_items['cells'], len(headers))

        df = pd.DataFrame(values, columns=headers)
        
        return df.assign(
            CLIENTE = self.extract_client_name(page_num), 
            PRODUCTO = self.extract_product_name(page_num), 
            FECHA = self.extract_test_date(page_num))
    
    def extract_tables(self) -> pd.DataFrame:
        df = pd.DataFrame()
        
        for table in self.tables:
            df1 = self.extract_table(table['bounding_regions'][0]['page_number'])
            df = pd.concat([df, df1], ignore_index=True)
            
        return df
    

In [113]:
class DocumentAnalyzer:
    
    document_analysis_client: DocumentAnalysisClient|None = None
    processed_files: list[str] = []
    errors: list[dict[str,str|BaseException]] = []
    
    def __init__(self) -> None:
        
        load_dotenv(override=True)
        endpoint = os.environ["AZURE_FORM_RECOGNIZER_ENDPOINT"]
        key = os.environ["AZURE_FORM_RECOGNIZER_KEY"]
        
        self.document_analysis_client = DocumentAnalysisClient(
        endpoint=endpoint, credential=AzureKeyCredential(key)
    )
        
    def __file_reader(self,dirname: str) -> Generator[BufferedReader, None, None]:
        
        for name in tqdm(os.listdir(dirname), unit='file', desc='Files processed: '):
            with open(os.path.join(dirname, name), "rb") as f:
                yield f
    
    def __get_analysis(self, file: BufferedReader) -> dict[str, str]:
        
        poller = self.document_analysis_client.begin_analyze_document(
                    "prebuilt-document", document=file
                )
        return poller.result().to_dict()
    
    def __write_results(self,filename: str, results: dict[str, str], output_dir: str, log_dir = './var') -> None:
        
        with open (os.path.join(output_dir, filename + '.json'), 'w') as json_file, \
            open(os.path.join(log_dir, 'checkpoint.txt'), 'w') as log_file:
                    
            json_file.write(json.dumps(results))
            log_file.write(json.dumps(processed_files))
        
    def analyze(self,input_dir: str, output_dir: str, log_dir = './var') -> pd.DataFrame:
        
        df = pd.DataFrame()
        self.processed_files = []
        self.errors = []
        
        i = 0
        for file in self.__file_reader(input_dir):
            
            if (i > 0 and i%75 ==0):
                break
            i+=1

            
            try:
                filename = Path(output_dir, file.name).stem
                
                result =  self.__get_analysis(file)
                
                extractor = DataExtractor(result)
                extracted_df = extractor.extract_tables().assign(ARCHIVO=filename + '.pdf')
                
                df = pd.concat([df,extracted_df], ignore_index = True)
                    
                self.processed_files.append(file.name)
    
            except (HttpResponseError, Exception) as e:
                error = {'file': file.name, 'error': e}
            
                if e is HttpResponseError:
                    error['comment'] = 'Archivo no procesado'
                    result = None 
                
                self.errors.append(error)      
                
            finally:
                if result is not None:
                    self.__write_results(filename, result, output_dir, log_dir)
        return df

In [114]:
analyzer = DocumentAnalyzer()
df = analyzer.analyze(sample_dir, json_dir)

Files processed:   9%|▉         | 75/810 [11:14<1:50:06,  8.99s/file]


In [115]:
analyzer.errors

[{'file': './files/sample/2023 08 053.pdf',
  'error': TypeError("'NoneType' object is not subscriptable")},
 {'file': './files/sample/2023 05 110.pdf',
  'error': TypeError("'NoneType' object is not subscriptable")},
 {'file': './files/sample/2023 04 101.pdf',
  'error': TypeError("'NoneType' object is not subscriptable")},
 {'file': './files/sample/2023 03 023.pdf',
  'error': TypeError("'NoneType' object is not subscriptable")},
 {'file': './files/sample/2023 03 037.pdf',
  'error': TypeError("'NoneType' object is not subscriptable")},
 {'file': './files/sample/2023 08 051.pdf',
  'error': azure.core.exceptions.HttpResponseError('(InvalidArgument) Invalid argument.\nCode: InvalidArgument\nMessage: Invalid argument.\nInner error: {\n    "code": "InvalidParameter",\n    "message": "The parameter pages is invalid: The page range exceeds the number of pages in the document."\n}')}]

In [116]:
len(df)

2144

In [117]:
df_copy = df.copy()

In [94]:
#df = df_copy.copy()

In [118]:
column_map = {
    'UNIDADES': 'UNIDADES / UNIT',
    'INCERTIDUMBRE': 'INCERTIDUMBRE/ UNCERTAINTY %',
    'RESULTADOS': 'RESULTADOS DE CALIDAD / QUALITY RESULTS',
    'MÉTODO': 'MÉTODO / METHOD',
    'MI': 'MIN',
    'ANALYSES': 'ANÁLISIS / ANALYSES'
}

In [119]:
for keyword,column_name in column_map.items():
    
    try:
        target_cols = list(filter(lambda col: col.find(keyword) >= 0 or \
            unidecode(col).find(keyword) >= 0, df.columns.to_list()))
        
        target_cols.remove(column_name)
        
        for col in target_cols:
            df[column_name].combine_first(df[col])
            df.drop(col, axis = 1, inplace=True)
    except ValueError as e:
        print(f'{column_name} not in list')

In [120]:
df.head(10)

Unnamed: 0,ANÁLISIS / ANALYSES,UNIDADES / UNIT,MIN,MAX,MÉTODO / METHOD,RESULTADOS DE CALIDAD / QUALITY RESULTS,CLIENTE,PRODUCTO,FECHA,ARCHIVO,Unnamed: 11,INCERTIDUMBRE/ UNCERTAINTY %,млх
0,NUMERO DE OCTANO METODO RESEARCH (RON),-,95,-,ASTM D-2699,96.0,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
1,NUMERO DE OCTANO METODO MOTOR (MON),.,82,-,ASTM D-2700,87.4,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
2,CONTENIDO DE PLOMO,G/gal,1,0.02,ASTM D-3237,N/D,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
3,PRESION A VAPOR REID (RVP) @ 100 º F,PSI,1,10.0,ASTM D-323,8.02,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
4,RVP + 0.1 E 70 ° C,1,Reportar,,Calculo,9.74,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
5,DESTILACION 10% VOL. RECUPERADO,0,",",75,ASTM D86,62,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
6,DESTILACION 50% VOL. RECUPERADO,·℃,,121,ASTM D86,110,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
7,DESTILACION 90% VOL. RECUPERADO,°℃,1,190,ASTM D86,159,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
8,PUNTO FINAL,℃,,225,ASTM D86,202,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,
9,RESIDUO,% Vol.,,20,ASTM D86,0.5,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,2023-04-07 15:12:00,2023 07 014.pdf,,,


In [104]:
analyzer.errors[0]

{'file': './files/ocr/2023 03 140.pdf',
 'error': ZeroDivisionError('integer modulo by zero')}

3. CONCLUSIONES

In [95]:
with open('./files/json/2023 01 079.json') as f:
    result = json.load(f)


In [96]:
extractor = DataExtractor(result)

In [97]:
extractor.extract_client_name(1)

'SUNIX 6 DE NOVIEMBRE'

In [98]:
extractor.extract_test_date(1)

datetime.datetime(2023, 1, 31, 6, 55)

In [18]:
df = extractor.extract_table(1)
df.tail(5)

Unnamed: 0,ANÁLISIS / ANALYSES,UNIDADES / UNIT,MIN,MAX,MÉTODO / METHOD,RESULTADOS DE CALIDAD / QUALITY RESULTS,INCERTIDUMBRE / UNCERTAINTY %,CLIENTE,PRODUCTO,FECHA
17,COLOR,,Incoloro,,,Amarillo,,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,
18,OLOR,1,Comerciable,,,Comerciable,,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,
19,DENSIDAD @ 15 ℃,Kg/L,Reportar,,ASTM D-1298,0.7386,,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,
20,GRAVEDAD API @ 60 ° F,-,Reportar,-,ASTM D-1298,60.0,,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,
21,INDICE ANTIDETONANTE,,Reportar,,Calculo,91.7,,ATLANTIC BONAO\n(ANIANA),GASOLINA PREMIUM,


Files processed:   0%|          | 0/810 [00:10<?, ?file/s]


In [None]:
len(analyzer.errors)