In [1]:
from langchain.text_splitter import CharacterTextSplitter
from unstructured.partition.pdf import partition_pdf
from pprint import pprint
import fitz
import tesserocr
from PIL import Image
import re
from ollama import generate
from unidecode import unidecode
import easyocr
from bs4 import BeautifulSoup
import json

In [2]:
# Environment variables set before the PDF is partitioned.
# NOTE(review): judging by their names these pad the crop box of extracted image
# blocks by 100 (presumably pixels) horizontally and vertically; confirm against
# the documentation of the library that reads them.
%env EXTRACT_IMAGE_BLOCK_CROP_HORIZONTAL_PAD=100
%env EXTRACT_IMAGE_BLOCK_CROP_VERTICAL_PAD=100

env: EXTRACT_IMAGE_BLOCK_CROP_HORIZONTAL_PAD=100
env: EXTRACT_IMAGE_BLOCK_CROP_VERTICAL_PAD=100


In [3]:
# Input PDF that is partitioned and later re-opened for title font-size detection.
pdf_file = 'files/2022-annual-report-wf.pdf'
# Directory handed to the partitioner as the output location for extracted image blocks.
images_path = 'data/'
# Destination of the JSON document tree written by the last cell.
output_file = 'parsed/wf.json'

In [4]:
# Extract elements from PDF
def extract_pdf_elements(pdf_file, images_path):
    """
    Partition a PDF into elements with the 'hi_res' strategy.

    pdf_file: Path of the PDF document to partition
    images_path: Directory passed to the partitioner as the output location
                 for the extracted "Image", "Table" and "Figure" blocks

    Returns the value produced by `partition_pdf` unchanged.
    """
    partition_options = dict(
        filename=pdf_file,
        strategy='hi_res',
        infer_table_structure=True,
        hi_res_model_name='detectron2_onnx',
        extract_image_block_types=["Image", "Table", "Figure"],
        extract_image_block_output_dir=images_path,
    )
    # Chunking is not enabled here: chunking_strategy="by_title",
    # max_characters=4000, new_after_n_chars=3800 and
    # combine_text_under_n_chars=2000 were left commented out.
    return partition_pdf(**partition_options)

In [5]:
# Partition the configured PDF into elements; image blocks are written under images_path.
raw_pdf_elements = extract_pdf_elements(pdf_file, images_path)

Some weights of the model checkpoint at microsoft/table-transformer-structure-recognition were not used when initializing TableTransformerForObjectDetection: ['model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing TableTransformerForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TableTransformerForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [6]:
class Document:
    """
    One node of the section tree built from the parsed PDF.

    A node stores its heading (`title`), its depth (`level`), a reference to
    its `parent`, the elements collected under the heading and the nested
    sub-sections (`children`).
    """

    def __init__(self, title, parent=None, level=0):
        self.title = title
        self.elements = []
        self.parent = parent
        self.level = level
        self.children = []

    def add_element(self, element):
        """Append an element to this section's own content."""
        self.elements.append(element)

    def add_child(self, child):
        """Register `child` as a nested sub-section of this section."""
        self.children.append(child)

    def to_dict(self):
        """
        Recursively convert the section into plain dicts and lists.

        Element text is passed through `unidecode`; `parent` and `level` are
        not part of the output.
        """
        serialized_elements = []
        for item in self.elements:
            serialized_elements.append(
                {
                    "category": item.category,
                    "text": unidecode(item.text),
                    "metadata": item.metadata.to_dict(),
                }
            )

        nested_sections = [section.to_dict() for section in self.children]

        return {
            "title": self.title,
            "elements": serialized_elements,
            "children": nested_sections,
        }

In [7]:
"""
Helps in determining the font size of the title
"""

# Cache of loaded PDF pages, keyed by "<filename>-<page_number>".
pages = {}
# Dedicated handle for the opened PDF. get_title_level uses this name so that
# rebinding `doc` elsewhere in the notebook cannot break page loading.
pdf_document = fitz.open(pdf_file)
# Kept so that existing references to `doc` keep working.
doc = pdf_document
# NOTE(review): absolute, machine-specific tessdata location.
api = tesserocr.PyTessBaseAPI(path="/usr/share/tesseract-ocr/5/tessdata")


def get_title_level(element):
    """
    Estimate the heading level (1 = largest, 3 = smallest) of a title element.

    The page holding the element is rendered at 200 dpi, cropped to the
    element's bounding box and run through Tesseract; the point size reported
    for the first recognized word selects the level:

        pointsize < 23  -> 3
        pointsize < 35  -> 2
        otherwise       -> 1

    element: object exposing `text` and `metadata` with `filename`,
             `page_number` (1-based) and `coordinates.points`

    Returns 3 (and prints a warning) when OCR yields no font attributes for
    the crop, instead of failing on the missing value.
    """
    metadata = element.metadata

    # Load the page (cached per file/page so it is only loaded once)
    page_key = f"{metadata.filename}-{metadata.page_number}"
    if page_key not in pages:
        pages[page_key] = pdf_document.load_page(metadata.page_number - 1)
    page = pages[page_key]

    # Get the image clip.
    # Assumes points[0] is the top-left and points[2] the bottom-right corner,
    # expressed in the pixel space of a 200 dpi render - TODO confirm.
    # NOTE(review): the whole page is rasterized again for every title.
    coords = metadata.coordinates.points
    pix = page.get_pixmap(dpi=200)
    image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples).crop(
        (coords[0][0], coords[0][1], coords[2][0], coords[2][1])
    )

    api.SetImage(image)
    api.Recognize()
    iterator = api.GetIterator()

    # Font attributes can be missing when no word was recognized in the crop.
    attrs = iterator.WordFontAttributes() if iterator is not None else None
    if not attrs:
        print("Could not determine font size, defaulting to title level 3:", element.text)
        return 3

    pointsize = attrs["pointsize"]
    if pointsize < 23:
        return 3
    if pointsize < 35:
        return 2
    return 1

In [8]:
# Shared EasyOCR reader (English, GPU requested); used by has_image_text below.
reader = easyocr.Reader(['en'],gpu=True)

def has_image_text(image_file_name):
    """Return True when `reader.readtext` yields more than two results for the image file."""
    detections = reader.readtext(image_file_name)
    return 2 < len(detections)

In [9]:
# Prompt sent to the multimodal model to summarize a table or image for retrieval.
# NOTE(review): the variable name is misspelled ("prommpt") but is referenced by
# get_image_description; the backslash continuations also keep the leading spaces
# of each continued line inside the prompt string.
describe_prommpt = "You are an assistant tasked with summarizing tables and image for retrieval. \
    These summaries will be embedded and used to retrieve the raw table or image elements. \
    Give a concise summary of the table or image that is well optimized for retrieval:"


def get_image_description(image_file_name):
    """
    Ask the "llava" model to describe the image stored at `image_file_name`.

    The file is read as raw bytes and sent, together with `describe_prommpt`,
    in a single non-streaming `generate` call.

    Returns the "response" entry of the model reply.
    """
    with open(image_file_name, "rb") as image_file:
        image_bytes = image_file.read()
        reply = generate(
            model="llava",
            prompt=describe_prommpt,
            images=[image_bytes],
            stream=False,
        )
    return reply["response"]

In [10]:
def get_last_row_text(html):
    """
    Return the text of the first non-empty cell in the last row of an HTML table.

    html: HTML markup of the table; may be None or empty

    Returns the stripped cell text with newlines replaced by spaces, or None
    when there is no markup, no <tr> row, or no <td>/<th> cell with text in
    the last row.
    """
    # Nothing to parse (e.g. no HTML was produced for the table).
    if not html:
        return None

    soup = BeautifulSoup(html, "html.parser")

    rows = soup.find_all("tr")
    if not rows:
        return None

    # Find first td/th with text in the last row
    for cell in rows[-1].find_all(["td", "th"]):
        cell_text = cell.text.strip()
        if cell_text:
            return cell_text.replace("\n", " ")

    return None

In [11]:
def _open_section(current_document, title, level):
    """Create the section for a heading of `level`, attach it at the right depth and return it."""
    if level > current_document.level:
        # Deeper heading: nest it under the section currently being filled.
        parent = current_document
    else:
        # Same or shallower heading: climb until the parent is strictly
        # shallower than the new heading; that ancestor becomes the parent.
        node = current_document
        while level <= node.parent.level:
            node = node.parent
        parent = node.parent

    section = Document(title, parent, level)
    parent.add_child(section)
    return section


def build_documents(
    elements,
    root_title="Wells Fargo",
    pages_to_ignore=(11,),
    footer_texts=("2022 Annual Report", "Wells Fargo & Company"),
):
    """
    Arrange the partitioned PDF elements into a tree of sections.

    elements: iterable of elements exposing `category`, `text` and `metadata`
              (`page_number`, and for tables/images `image_path`, `text_as_html`)
    root_title: title of the root section
    pages_to_ignore: page numbers whose elements are dropped entirely
    footer_texts: collection of exact texts treated as page footers and dropped

    Returns the root `Document`.

    Handling per element, in this order:
      * elements on ignored pages, footer texts and short numeric / roman
        numeral texts (page numbers) are dropped;
      * after a table, elements are dropped until one contains the text of the
        table's last row, or until the page changes;
      * a "Title" opens a new section at the depth given by `get_title_level`,
        unless it looks like the continuation of a list item;
      * "Text", "NarrativeText" and "ListItem" are added to the current section;
      * a "Table" has its text replaced by a generated description and is added;
      * an "Image" is added (with a generated description) only when it
        contains text; every other category is dropped.
    """
    regex_roman_numeral = re.compile(r"^[IVXLCDMivxlcdm]+$")
    ignored_pages = set(pages_to_ignore)

    top = Document(root_title)
    current_document = top

    prev_category = None
    skip_till_text = None
    skip_page = None

    for i, element in enumerate(elements):
        page_number = element.metadata.page_number
        print("Processing element ", i, " from page", page_number, "with category", element.category)

        # Safety net: if the last-row text of a table never shows up again,
        # stop skipping at the page boundary instead of silently dropping the
        # rest of the document.
        if skip_till_text is not None and page_number != skip_page:
            skip_till_text = None

        if page_number in ignored_pages:
            # Every element of an ignored page is dropped
            print("Ignored page", page_number)

        elif skip_till_text is not None:
            print("Ignored element ", element.category, " because of skip till text: ", skip_till_text)
            if skip_till_text in element.text:
                skip_till_text = None

        elif element.text in footer_texts:
            # Ignore this as this is the footer text
            print("Ignored page footer", element.text)

        elif len(element.text) < 6 and (regex_roman_numeral.match(element.text) or element.text.isnumeric()):
            # Ignore this as this is the footer page number
            print("Ignored page footer", element.text)

        elif element.category == "Title":
            level = get_title_level(element)

            # element.text[:1] keeps an empty title from raising IndexError
            if prev_category == "ListItem" and level == 3 and not element.text[:1].isupper():
                # This is a continuation of the previous list item that was
                # wrongly classified as a title
                current_document.add_element(element)
            else:
                # Upcoming texts are collected under the new section
                current_document = _open_section(current_document, element.text, level)

        elif element.category in ("Text", "NarrativeText", "ListItem"):
            current_document.add_element(element)

        elif element.category == "Table":
            element.text = get_image_description(element.metadata.image_path)
            current_document.add_element(element)

            # Skip the elements that follow until the text of the table's
            # last row is seen (bounded to the table's page, see above)
            last_row_text = get_last_row_text(element.metadata.text_as_html)
            if last_row_text is not None:
                skip_till_text = last_row_text
                skip_page = page_number

        elif element.category == "Image":
            if has_image_text(element.metadata.image_path):
                element.text = get_image_description(element.metadata.image_path)
                current_document.add_element(element)
            else:
                print("Ignored image as it has no text", element.category)

        else:
            print("Ignored element", element.category)

        prev_category = element.category

    return top

In [12]:
# Build the section tree from the partitioned elements.
# NOTE(review): this rebinds the global `doc`, which an earlier cell assigned to
# the opened fitz PDF handle.
doc = build_documents(raw_pdf_elements)

Processing element  0  from page 1 with category UncategorizedText
Ignored page footer 2022
Processing element  1  from page 1 with category Title
Processing element  2  from page 1 with category NarrativeText
Ignored page footer Wells Fargo & Company
Processing element  3  from page 1 with category Title
Processing element  4  from page 2 with category Title
Processing element  5  from page 2 with category NarrativeText
Processing element  6  from page 2 with category NarrativeText
Processing element  7  from page 2 with category Title
Processing element  8  from page 2 with category NarrativeText
Processing element  9  from page 2 with category NarrativeText
Processing element  10  from page 2 with category NarrativeText
Processing element  11  from page 2 with category NarrativeText
Processing element  12  from page 2 with category NarrativeText
Processing element  13  from page 2 with category NarrativeText
Processing element  14  from page 2 with category NarrativeText
Processing 

In [13]:
# Serialize the section tree as JSON with a 4-space indent.
# The parent directory of output_file must already exist; open() does not create it.
with open(output_file, 'w') as f:
    json.dump(doc.to_dict(), f, indent=4)