# Text Chunking 

Separar el texto por las paginas del documento, para poder procesar cada una de ellas de manera independiente. Almacenando los chuncks (El texto por paginas y metadata) en un DataFrame de `polars`.

In [None]:
# Dependencies for Text Chunking

%pip install polars
%pip install sqlalchemy

### PDF text chunking

La salida del proceso de OCR de UNSTRUTURED.io es una lista de diccionarios, donde cada diccionario contiene la metadata del elemento y el texto del mismo elemento, e.g.:

```python
data = [
  {
    "type": "NarrativeText",
    "element_id": "084bcfca09086336d78c8ba5c6103a13",
    "text": "Cuando hablamos de capacitación en ingeniería de software, nos referimos:",
    "metadata": {
      "filetype": "application/pdf",
      "languages": [
        "eng"
      ],
      "page_number": 1,
      "filename": "file.pdf"
    }
  },
  {
    "type": "NarrativeText",
    "element_id": "f666bfbe7368828ec28fc42ca8724ce3",
    "text": "A la mejora de la calidad de los procesos de desarrollo de software\nb. A la mejora de la calidad de los productos de software\nc. A la mejora de la calidad de los servicios de software\nd. A la mejora de la calidad de los sistemas de software", 
    "metadata": {
      "filetype": "application/pdf",
      "languages": [
        "eng"
      ],
      "page_number": 2,
      "filename": "file.pdf"
    }
  }
]
```

Para poder hacer un procesamiento del esta estructura, y crear el DataFrame de `polars` se necesita hacer un proceso de chunking, donde se separe el texto por paginas, y se cree una fila por cada pagina; con la siguiente función se puede hacer este proceso:

```python
import json
from typing import List, Dict
import polars as pl

def pdf_chunk(json_data: List[Dict]) -> pl.DataFrame:
    # Agrupar los elementos por número de página
    pages = {}
    for item in json_data:
        page_number = item['metadata']['page_number']
        if page_number not in pages:
            pages[page_number] = []
        pages[page_number].append(item['text'])
    
    # Crear una lista de diccionarios con la estructura deseada
    data = []
    for page_number, texts in pages.items():
        data.append({
            'metadata': json.dumps({'page_number': page_number, 'filename': json_data[0]['metadata']['filename']}),
            'text': ' '.join(texts)
        })
    
    # Crear el DataFrame de Polars
    return pl.DataFrame(data).with_row_index('id')


# Ejemplo de uso
pdf_chunk(json.loads(data))
```

##### Output

<div><style>
.dataframe > thead > tr,
.dataframe > tbody > tr {
  text-align: right;
  white-space: pre-wrap;
}
</style>
<small>shape: (2, 3)</small><table border="1" class="dataframe"><thead><tr><th>id</th><th>metadata</th><th>text</th></tr><tr><td>u32</td><td>str</td><td>str</td></tr></thead><tbody><tr><td>0</td><td>&quot;{&quot;page_number&quot;: 1, &quot;filename&quot;:…</td><td>&quot;Cuando hablamos de capacitació…</td></tr><tr><td>1</td><td>&quot;{&quot;page_number&quot;: 2, &quot;filename&quot;:…</td><td>&quot;A la mejora de la calidad de l…</td></tr></tbody></table></div>

---

### DataFrame en una Base de Datos SQLite

Para poder almacenar el DataFrame en una base de datos SQLite, se puede usar las siguientes funciones, en donde `save_checkpoint()` guarda el DataFrame en la base de datos, y `load_checkpoint()` carga el DataFrame de la base de datos.:

```python
from typing import Optional
import sqlite3
from sqlalchemy import create_engine
import polars as pl


def save_checkpoint(df: pl.DataFrame, checkpoint_path: str, table_name: Optional[str] = 'ocr_data') -> None:
    conn = sqlite3.connect(checkpoint_path)
    temp_df = df.clone()
    temp_df.drop_in_place('id')
    temp_df.write_database(table_name=table_name, connection=f"sqlite:///{checkpoint_path}", if_table_exists="replace")
    conn.close()

def load_checkpoint(df: pl.DataFrame, checkpoint_path: str, table_name: Optional[str] = 'ocr_data') -> pl.DataFrame:
    conn = create_engine(f"sqlite:///{checkpoint_path}")
    query = f"SELECT * FROM {table_name}"
    df = pl.read_database(query=query, connection=conn.connect()).with_row_index('id')
    return df
```

## Clase TextChunk

**Con la estructura que general el proceso de OCR de UNSTRUCTURED.io.**

In [10]:
import json
import sqlite3
import copy
from typing import List, Dict, Optional, Union
import polars as pl
from sqlalchemy import create_engine


class TextChunk():
    def __init__(self, current_df: Optional[pl.DataFrame] = pl.DataFrame()):
        self.current_df = current_df

    def __pdf_chunk(self, json_data: List[Dict]) -> pl.DataFrame:
        # Agrupar los elementos por número de página
        pages = {}
        for item in json_data:
            page_number = item['metadata']['page_number']
            if page_number not in pages:
                pages[page_number] = []
            pages[page_number].append(item['text'])
        
        # Crear una lista de diccionarios con la estructura deseada
        data = []
        for page_number, texts in pages.items():
            data.append({
                'metadata': json.dumps({'page_number': page_number, 'filetype': [0]['metadata']['filetype'], 'filename': json_data[0]['metadata']['filename']}),
                'text': ' '.join(texts)
            })
        
        # Crear el DataFrame de Polars
        return pl.DataFrame(data).with_row_index('id', offset=len(self.current_df)+1)

    def __rtf_chunk(self, json_data: List[Dict]) -> pl.DataFrame:
        metadata = {
            "filetype": json_data[0]['metadata']['filetype'], 
            "filename": json_data[0]['metadata']['filename']
        }
        text = []
        for item in json_data:
            text.append(item['text'])
        data = {
            'metadata': json.dumps(metadata),
            'text': ' '.join(text)
        }
        # Crear el DataFrame de Polars
        return pl.DataFrame(data).with_row_index('id', offset=len(self.current_df)+1) 

    def __add_if_not_exists(self, new_data: Union[pl.DataFrame, Dict], key_columns: Optional[List]=None) -> pl.DataFrame:
        if key_columns is None:
            key_columns = ['metadata', 'text']
        # Si nuevos_datos es un diccionario, convertirlo a DataFrame
        if isinstance(new_data, dict):
            new_data = pl.DataFrame([new_data])
        if not isinstance(new_data, pl.DataFrame):
            raise TypeError("nuevos_datos debe ser un DataFrame de Polars o un diccionario")
        if self.current_df.is_empty():
            self.current_df = self.current_df.vstack(new_data)
            return self.current_df
        # Crear una expresión para verificar si los datos ya existen
        condition = pl.all_horizontal([
            pl.col(col).is_in(new_data[col])
            for col in key_columns
        ])
        # Filtrar los datos existentes
        existing_data = self.current_df.filter(condition)
        # Identificar los datos nuevos
        new = new_data.join(
            existing_data.select(key_columns),
            on=key_columns,
            how="anti"
        )
        # Si hay datos nuevos, agregarlos al DataFrame original
        if not new.is_empty():
            print("Se han encontrado datos nuevos para agregar")
            self.current_df = pl.concat([self.current_df, new], how="vertical")
        else:
            print("No hay datos nuevos para agregar")
        return self.current_df
    
    def text_chunks_to_dataframe(self, json_data: List[Dict]) -> pl.DataFrame:
        filetype = json_data[0]['metadata']['filetype']
        if filetype == "application/pdf":
            df = self.__pdf_chunk(json_data)
        elif filetype == "text/rtf":
            df = self.__rtf_chunk(json_data)
        elif filetype.startswith('text'):
            data = copy.deepcopy(json_data)
            data[0]['metadata'] = json.dumps(data[0]['metadata'])
            df = pl.DataFrame(data).with_row_index('id', offset=len(self.current_df)+1)

        self.__add_if_not_exists(new_data=df)
        return self.current_df

    def save_checkpoint(self, checkpoint_path: str, table_name: Optional[str] = 'ocr_data') -> None:
        conn = sqlite3.connect(checkpoint_path)
        temp_df = self.current_df.clone()
        temp_df.drop_in_place('id')
        temp_df.write_database(table_name=table_name, connection=f"sqlite:///{checkpoint_path}", if_table_exists="replace")
        conn.close()
    
    def load_checkpoint(self, checkpoint_path: str, table_name: Optional[str] = 'ocr_data') -> pl.DataFrame:
        conn = create_engine(f"sqlite:///{checkpoint_path}")
        query = f"SELECT * FROM {table_name}"
        self.current_df = pl.read_database(query=query, connection=conn.connect()).with_row_index('id')
        return self.current_df

---

## Clase TextChunk

**Con la capa de OCR que se genera con ayuda de OCRmyPDF**

In [9]:
import json
import copy
import sqlite3
from typing import List, Dict, Optional, Union
from sqlalchemy import create_engine
import polars as pl


class TextChunk():
    """
    Class to handle text chunks and add them to a Polars DataFrame
    """
    def __init__(self, current_df: Optional[pl.DataFrame] = pl.DataFrame()):
        self.current_df = current_df
    
    def _pdf_chunk(self, json_data: List[Dict]) -> pl.DataFrame:
        """
        Create a DataFrame from a list of dictionaries with text chunks from a PDF file

        Args:
            json_data (List[Dict]): List of dictionaries with text chunks from a PDF file
        
        Returns:
            pl.DataFrame: polars DataFrame with the text chunks
        """
        for item in json_data:
            item['metadata'] = json.dumps(item['metadata'])
            item['text'] = item['text'].decode('utf-8')
        # Crea el DataFrame de Polars con los datos y un índice que empieza en el último índice del DataFrame actual
        return pl.DataFrame(json_data).with_row_index('id', offset=len(self.current_df)+1)

    def _add_if_not_exists(self, new_data: Union[pl.DataFrame, Dict], key_columns: Optional[List] = ['metadata', 'text']) -> pl.DataFrame:
        """
        Add new data to the current DataFrame if it does not exist

        Args:
            new_data (Union[pl.DataFrame, Dict]): New data to add to the DataFrame
            key_columns (Optional[List]): Columns to use as keys to identify if the data already exists, by default ['metadata', 'text']
        
        Returns:
            pl.DataFrame: Currently updated polars DataFrame
        """
        # Si nuevos_datos es un diccionario, convertirlo a DataFrame, si no, verificar que sea un DataFrame
        if isinstance(new_data, dict):
            new_data = pl.DataFrame([new_data])
        elif not isinstance(new_data, pl.DataFrame):
            raise TypeError("nuevos_datos debe ser un DataFrame de Polars o un diccionario")
        # Si el DataFrame actual está vacío, asignarle los nuevos datos y retornarlo
        if self.current_df.is_empty():
            self.current_df = self.current_df.vstack(new_data)
            return self.current_df
        # Crear una expresión para verificar si los datos ya existen
        condition = pl.all_horizontal([
            pl.col(col).is_in(new_data[col])
            for col in key_columns
        ])
        # Filtrar los datos existentes
        existing_data = self.current_df.filter(condition)
        # Identificar los datos nuevos
        new = new_data.join(
            existing_data.select(key_columns),
            on=key_columns,
            how="anti"
        )
        # Si hay datos nuevos, agregarlos al DataFrame original
        if not new.is_empty():
            print("Se han encontrado datos nuevos para agregar")
            self.current_df = pl.concat([self.current_df, new], how="vertical")
        else:
            print("No hay datos nuevos para agregar")
        return self.current_df
    
    def add_chunks_to_dataframe(self, json_data: List[Dict]) -> pl.DataFrame:
        """
        Add text chunks to the current DataFrame. The method will identify the type of file and call the corresponding method to process the data

        Args:
            json_data (List[Dict]): List of dictionaries with text chunks
        
        Returns:
            pl.DataFrame: Currently updated polars DataFrame
        """
        filetype = json_data[0]['metadata']['filetype']
        if filetype == "application/pdf":
            df = self._pdf_chunk(json_data)
        elif filetype.startswith('text'):
            data = copy.deepcopy(json_data)
            data[0]['metadata'] = json.dumps(data[0]['metadata'])
            df = pl.DataFrame(data).with_row_index('id', offset=len(self.current_df)+1)
        self._add_if_not_exists(new_data=df)
        return self.current_df

    def save_checkpoint(self, checkpoint_path: str, table_name: Optional[str] = 'ocr_data') -> None:
        """
        Save the current DataFrame to a SQLite database

        Args:
            checkpoint_path (str): Path to the SQLite database
            table_name (Optional[str]): Name of the table to store the data, by default 'ocr_data'

        Returns:
            None
        """
        conn = sqlite3.connect(checkpoint_path)
        temp_df = self.current_df.clone()
        temp_df.drop_in_place('id')
        temp_df.write_database(table_name=table_name, connection=f"sqlite:///{checkpoint_path}", if_table_exists="replace")
        conn.close()
    
    def load_checkpoint(self, checkpoint_path: str, table_name: Optional[str] = 'ocr_data') -> pl.DataFrame:
        """
        Load a DataFrame from a SQLite database

        Args:
            checkpoint_path (str): Path to the SQLite database
            table_name (Optional[str]): Name of the table to load the data from, by default 'ocr_data'

        Returns:
            pl.DataFrame: DataFrame loaded from the SQLite database (current DataFrame)
        """
        conn = create_engine(f"sqlite:///{checkpoint_path}")
        query = f"SELECT * FROM {table_name}"
        self.current_df = pl.read_database(query=query, connection=conn.connect()).with_row_index('id')
        return self.current_df


#### Ejemplo de uso

In [12]:
import sys
sys.path.append('./..')

from core import OCR

data = OCR.get_ocr("example.pdf")
text_chunk = TextChunk()
df = text_chunk.add_chunks_to_dataframe(data)
print(df)


This PDF is marked as a Tagged PDF. This often indicates that the PDF was generated from an office document and does not need OCR. PDF pages processed by OCRmyPDF may not be tagged correctly.
Start processing 2 pages concurrently
    1 page already has text! - rasterizing text and running OCR anyway
    2 page already has text! - rasterizing text and running OCR anyway
Postprocessing...


OCR aplicado exitosamente a /Users/jorge-jrzz/Desktop/chunks/API/dev/example.pdf. Salida: /Users/jorge-jrzz/Desktop/chunks/API/dev/example.pdf
shape: (2, 3)
┌─────┬─────────────────────────────────┬─────────────────────────────────┐
│ id  ┆ metadata                        ┆ text                            │
│ --- ┆ ---                             ┆ ---                             │
│ u32 ┆ str                             ┆ str                             │
╞═════╪═════════════════════════════════╪═════════════════════════════════╡
│ 1   ┆ {"filetype": "application/pdf"… ┆ Titulo                          │
│     ┆                                 ┆ Encabezado 1                    │
│     ┆                                 ┆ Para empez…                     │
│ 2   ┆ {"filetype": "application/pdf"… ┆ Esto es texto que esta en una … │
└─────┴─────────────────────────────────┴─────────────────────────────────┘


Image optimization did not improve the file - optimizations will not be used
Image optimization ratio: 1.00 savings: -0.0%
Total file size ratio: 1.00 savings: 0.0%
