# 03 - Document processing: PDF to Plaintext and tables
This notebook will help you to convert your PDF documents into plaintext.

Requirements:
- The PDF documents need to be stored in an S3 bucket
- Access to AWS Textract

Features:
- Supports electronic and scanned PDF documents
- Also extracts tables included in your PDF documents

Limitations:
- Pages are processed separately, so context that spans pages can be lost. For the same reason, multi-page tables are not extracted correctly.

To demonstrate the use case we have made some documents available that you can use. <br>
These documents are the annual reports of several large marine logistics companies and they can be found in the `financial_reports` folder.
If the `financial_reports` folder is empty or absent, run the `02-download-raw-pdf-documents` notebook to download the sample documents.

In [None]:
import os
import boto3


# Method that will upload the financial reports to S3
def upload_files_with_metadata(bucket_name, s3_key_base, prepared_pdfs_metadata):
    s3_client = boto3.client("s3")

    for pdf_metadata in prepared_pdfs_metadata:
        # Work on a copy so the caller's list is not mutated and the cell can be rerun
        pdf_metadata = dict(pdf_metadata)
        local_file_path = pdf_metadata.pop("local_pdf_path")
        # Prepare S3 object key with the same folder structure
        company = pdf_metadata["company"]
        filename = local_file_path.split("/")[-1]
        s3_object_key = os.path.join(s3_key_base, company, filename)
        print(pdf_metadata)

        pdf_metadata["pages_kept"] = str(pdf_metadata["pages_kept"])
        # Upload the file to S3 with metadata
        with open(local_file_path, "rb") as f:
            s3_client.upload_fileobj(
                f, bucket_name, s3_object_key, ExtraArgs={"Metadata": pdf_metadata}
            )

Make sure to attach the IAM policy whose ARN is listed in the output of the CDK stack to the SageMaker execution role used in this SageMaker Studio environment. Among other things, this policy grants the permissions required to read the parameters from AWS Systems Manager (SSM).

The IAM policy ARN follows the template below:

```
arn:aws:iam::{aws_account_id}:policy/AssistantBackendStack-sageMakerPostgresDBAccessIAMPolicyXXXXX-XXXXXX
```
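
If you prefer to attach the policy from code rather than from the console, the sketch below shows one possible approach. It is an illustration under assumptions: `role_name_from_arn` and `attach_policy_to_role` are hypothetical helper names, the caller needs the `iam:AttachRolePolicy` permission (which the execution role itself may not have, so an administrator may need to run it), and the IAM `attach_role_policy` call expects a role name rather than a role ARN.

```python
def role_name_from_arn(role_arn):
    """Return the role name, i.e. the last path segment of an IAM role ARN."""
    # Expected shape: arn:aws:iam::<account-id>:role/<optional/path/>RoleName
    parts = role_arn.split(":", 5)
    if len(parts) != 6 or not parts[5].startswith("role/"):
        raise ValueError(f"Not an IAM role ARN: {role_arn}")
    return parts[5].rsplit("/", 1)[-1]


def attach_policy_to_role(role_arn, policy_arn):
    """Attach a managed policy to the role (hypothetical helper, needs IAM permissions)."""
    import boto3  # imported lazily so the parsing helper works without AWS access

    iam_client = boto3.client("iam")
    iam_client.attach_role_policy(
        RoleName=role_name_from_arn(role_arn), PolicyArn=policy_arn
    )


# Placeholder ARN, for illustration only
print(role_name_from_arn("arn:aws:iam::123456789012:role/service-role/ExampleRole"))
# ExampleRole
```

To use it, pass the ARN of the SageMaker execution role and the policy ARN from the CDK output to `attach_policy_to_role`.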

You can identify the current SageMaker execution role programmatically by running the following cell:

In [None]:
import sagemaker

sagemaker.get_execution_role()

In [None]:
import boto3

ssm_client = boto3.client("ssm")

s3_bucket_name_parameter = "/AgenticLLMAssistant/AgentDataBucketParameter"

S3_BUCKET_NAME = ssm_client.get_parameter(Name=s3_bucket_name_parameter)
S3_BUCKET_NAME = S3_BUCKET_NAME["Parameter"]["Value"]

In [None]:
# Upload the financial reports to your bucket in S3 and add relevant metadata
s3_key_base = "prepared_pdf_documents"
# prepared_base_directory = "financial_reports/prepared/"
raw_base_directory = "raw_documents"
prepared_base_directory = os.path.join(raw_base_directory, "prepared/")
prepared_base_directory

In [None]:
import json

with open(
    os.path.join(prepared_base_directory, "metadata.json"), "r"
) as prepared_pdfs_metadata_obj:
    prepared_pdfs_metadata = json.load(prepared_pdfs_metadata_obj)

prepared_pdfs_metadata

In [None]:
ls {prepared_base_directory}

In [None]:
upload_files_with_metadata(S3_BUCKET_NAME, s3_key_base, prepared_pdfs_metadata)

In [None]:
!aws s3 ls {S3_BUCKET_NAME}/prepared_pdf_documents/Amazon/

## Define methods to extract data from PDF documents in S3

At a high level, the logic is as follows:

1. We start a Textract analysis job for each document in the S3 bucket using `start_analysis_jobs` and monitor their progress using the `check_jobs_status` method.
2. When the jobs are done, we fetch the results using `get_job_results`, which consolidates the paginated responses so that the extracted blocks of each document are grouped per page.
3. The `extract_text` and `extract_tables` methods extract the text and tables present on a page.
4. The `detect_groups` method uses clustering to group the lines of text on a page by their horizontal position.
5. The clusters are then ordered left to right, and the lines within each cluster top to bottom, to determine the reading order on the page.

This process is depicted below:

![pdf to plain text](assets/pdf-to-plain-text.png)|
-

In [None]:
import boto3

client = boto3.client("textract")

#### Methods to start the Textract jobs and track their progress

In [None]:
import time


def get_s3_file_metadata(bucket_name, file_key):
    s3 = boto3.client("s3")

    response = s3.head_object(Bucket=bucket_name, Key=file_key)

    # Extract the metadata from the response
    metadata = response["Metadata"]

    return metadata


def start_analysis_jobs(s3_bucket, s3_object_keys):
    client = boto3.client("textract")
    job_id_dict = {}

    for key in s3_object_keys:
        print(s3_bucket, key)
        response = client.start_document_analysis(
            DocumentLocation={"S3Object": {"Bucket": s3_bucket, "Name": key}},
            FeatureTypes=["TABLES"],
        )

        # Define the document source
        document_source_location = f"s3://{s3_bucket}/{key}"

        # Store the JobId for each s3 object key
        job_id_dict[document_source_location] = {
            "s3_bucket": s3_bucket,
            "s3_key": key,
            "document_source": document_source_location,
            "metadata": get_s3_file_metadata(s3_bucket, key),
            "job_id": response["JobId"],
        }

    return job_id_dict


def check_jobs_status(job_id_dict):
    client = boto3.client("textract")
    unfinished_jobs = set([job["job_id"] for job in job_id_dict.values()])

    while unfinished_jobs:
        print(f"Documents being processed: {len(unfinished_jobs)}\n")
        for job_id in list(
            unfinished_jobs
        ):  # We use list to avoid modifying the set during iteration
            response = client.get_document_analysis(JobId=job_id)

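            status = response["JobStatus"]
            # Any status other than IN_PROGRESS (SUCCEEDED, FAILED, PARTIAL_SUCCESS) is final
            if status != "IN_PROGRESS":
                if status != "SUCCEEDED":
                    print(f"Job {job_id} ended with status {status}")
                unfinished_jobs.remove(job_id)

        # To prevent rapidly hitting the API, we sleep for a short duration
        if unfinished_jobs:
            time.sleep(5)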

    print("Finished all jobs!")

#### Method to extract results from the Textract response
Textract returns the results in chunks, so all chunks are fetched first. Blocks belonging to the same page are then combined, and the pages are returned in page-number order.
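
The grouping step can be illustrated without calling Textract. The sketch below uses made-up response chunks and a hypothetical `group_blocks_by_page` helper; it only assumes that each block carries a `Page` number, as in the method that follows.

```python
from collections import defaultdict


def group_blocks_by_page(responses):
    """Merge the blocks of several response chunks into per-page lists."""
    pages = defaultdict(list)
    for response in responses:
        for block in response["Blocks"]:
            if "Page" in block:
                pages[block["Page"]].append(block)
    # Return the per-page lists ordered by page number
    return [pages[page_number] for page_number in sorted(pages)]


# Two made-up chunks: the second chunk still contains a block of page 1
fake_responses = [
    {"Blocks": [{"Page": 1, "Text": "a"}, {"Page": 2, "Text": "c"}]},
    {"Blocks": [{"Page": 1, "Text": "b"}]},
]
pages_list = group_blocks_by_page(fake_responses)
print([[block["Text"] for block in page] for page in pages_list])
# [['a', 'b'], ['c']]
```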

In [None]:
def get_job_results(job_id):
    pages = {}
    response = client.get_document_analysis(JobId=job_id)

    for block in response["Blocks"]:
        if "Page" in block:
            if block["Page"] not in pages:
                # If the page number has not been encountered yet, create a new list
                pages[block["Page"]] = []
            pages[block["Page"]].append(block)

    next_token = None
    if "NextToken" in response:
        next_token = response["NextToken"]

    while next_token:
        response = client.get_document_analysis(JobId=job_id, NextToken=next_token)

        for block in response["Blocks"]:
            if "Page" in block:
                if block["Page"] not in pages:
                    # If the page number has not been encountered yet, create a new list
                    pages[block["Page"]] = []
                pages[block["Page"]].append(block)

        next_token = None
        if "NextToken" in response:
            next_token = response["NextToken"]

    # Convert dictionary to list for the return
    # Here we are sorting the keys (page numbers) to ensure the list is in the correct order
    pages_list = [pages[key] for key in sorted(pages.keys())]

    return pages_list

#### Helper method to determine how to group text to preserve reading order
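
To make the idea concrete, here is a dependency-free sketch of the same grouping on a made-up two-column page. It assumes lines are `(top, left, text)` tuples, as built by `extract_text`, and it replaces DBSCAN with a simple gap-based split: with `min_samples=1`, one-dimensional DBSCAN amounts to starting a new group whenever the gap between consecutive sorted values exceeds `eps`. The names `group_by_left` and `reading_order` are hypothetical and not part of the notebook.

```python
def group_by_left(lines, eps=0.05):
    """Group (top, left, text) tuples whose left positions are chained within eps."""
    groups = []
    previous_left = None
    for line in sorted(lines, key=lambda line: line[1]):
        # Start a new group when the horizontal gap to the previous line exceeds eps
        if previous_left is None or line[1] - previous_left > eps:
            groups.append([])
        groups[-1].append(line)
        previous_left = line[1]
    return groups


def reading_order(lines, eps=0.05):
    """Read groups left to right, and the lines inside each group top to bottom."""
    ordered = []
    for group in group_by_left(lines, eps):
        ordered.extend(text for _, _, text in sorted(group, key=lambda line: line[0]))
    return ordered


# A made-up two-column page, listed in raw top-to-bottom order
page_lines = [
    (0.10, 0.10, "left column, line 1"),
    (0.10, 0.55, "right column, line 1"),
    (0.20, 0.11, "left column, line 2"),
    (0.20, 0.56, "right column, line 2"),
]
print(reading_order(page_lines))
# ['left column, line 1', 'left column, line 2', 'right column, line 1', 'right column, line 2']
```

Note the trade-off in the choice of `eps`: columns whose left edges are closer than `eps` are merged into one group, while strongly indented lines may end up in a group of their own.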

In [None]:
from sklearn.cluster import DBSCAN
import numpy as np


def detect_groups(lines):
    # Convert the left positions of the lines to a 2D numpy array.
    X = np.array([[line[1]] for line in lines])

    # With no lines on the page, X is an empty 1D array and there is nothing to cluster
    if len(X.shape) != 2:
        return []

    # Use the DBSCAN algorithm to cluster the lines into groups.
    clustering = DBSCAN(eps=0.05, min_samples=1).fit(X)

    # Initialize an empty list for each group.
    groups = [[] for _ in range(max(clustering.labels_) + 1)]

    # Assign each line to the appropriate group.
    for i, line in enumerate(lines):
        groups[clustering.labels_[i]].append(line)

    # Sort groups by their leftmost position.
    groups.sort(key=lambda group: min(line[1] for line in group))

    return groups

#### Method to extract the text from the result

In [None]:
def extract_text(blocks):
    lines = []

    for item in blocks:
        if item["BlockType"] == "LINE":
            # Get the line text and its position.
            text = item["Text"]
            left = item["Geometry"]["BoundingBox"]["Left"]
            top = item["Geometry"]["BoundingBox"]["Top"]
            lines.append((top, left, text))

    # Detect groups.
    groups = detect_groups(lines)

    # Sort the lines in each group by their top position.
    for group in groups:
        group.sort(key=lambda x: x[0])  # sort by 'top' position

    # Extract the sorted text.
    text = ""
    for group in groups:
        for line in group:
            text += line[2] + "\n"

    return text

#### Method to extract the tables from the result
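
Textract represents a table as a `TABLE` block whose `CHILD` relationships point to `CELL` blocks, which in turn point to `WORD` blocks. The hand-built blocks below are a made-up minimal example of that structure (the IDs and texts are invented), and `child_ids` and `table_to_grid` are hypothetical, pandas-free stand-ins that only illustrate how the relationships are followed.

```python
def child_ids(block):
    """Return the IDs referenced by a block's CHILD relationships."""
    ids = []
    for relationship in block.get("Relationships", []):
        if relationship["Type"] == "CHILD":
            ids.extend(relationship["Ids"])
    return ids


def table_to_grid(table_block, blocks_map):
    """Build a list of rows (lists of cell texts) ordered by row and column index."""
    cells = {}
    for cell_id in child_ids(table_block):
        cell = blocks_map[cell_id]
        if cell["BlockType"] != "CELL":
            continue
        words = [
            blocks_map[word_id]["Text"]
            for word_id in child_ids(cell)
            if blocks_map[word_id]["BlockType"] == "WORD"
        ]
        cells[(cell["RowIndex"], cell["ColumnIndex"])] = " ".join(words)
    if not cells:
        return []
    n_rows = max(row for row, _ in cells)
    n_cols = max(col for _, col in cells)
    # RowIndex and ColumnIndex are 1-based; missing cells become empty strings
    return [
        [cells.get((row, col), "") for col in range(1, n_cols + 1)]
        for row in range(1, n_rows + 1)
    ]


def cell(block_id, row, col, word_ids):
    """Create a made-up CELL block pointing at the given WORD block IDs."""
    return {"Id": block_id, "BlockType": "CELL", "RowIndex": row, "ColumnIndex": col,
            "Relationships": [{"Type": "CHILD", "Ids": word_ids}]}


# Invented 2x2 table: one TABLE block, four CELL blocks, five WORD blocks
blocks = [
    {"Id": "t1", "BlockType": "TABLE",
     "Relationships": [{"Type": "CHILD", "Ids": ["c1", "c2", "c3", "c4"]}]},
    cell("c1", 1, 1, ["w1"]),
    cell("c2", 1, 2, ["w2"]),
    cell("c3", 2, 1, ["w3"]),
    cell("c4", 2, 2, ["w4", "w5"]),
] + [
    {"Id": word_id, "BlockType": "WORD", "Text": text}
    for word_id, text in [("w1", "Year"), ("w2", "Revenue"), ("w3", "2022"),
                          ("w4", "1.2"), ("w5", "bn")]
]
blocks_map = {block["Id"]: block for block in blocks}
grid = table_to_grid(blocks_map["t1"], blocks_map)
print(grid)
# [['Year', 'Revenue'], ['2022', '1.2 bn']]
```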

In [None]:
import pandas as pd


# Helper method to extract text from cells
def get_text(result, blocks_map):
    text = ""
    if "Relationships" in result:
        for relationship in result["Relationships"]:
            if relationship["Type"] == "CHILD":
                for child_id in relationship["Ids"]:
                    word = blocks_map[child_id]
                    if word["BlockType"] == "WORD":
                        text += word["Text"] + " "
                    if word["BlockType"] == "SELECTION_ELEMENT":
                        if word["SelectionStatus"] == "SELECTED":
                            text += "X "
    return text


# Helper method to determine table structure
def get_rows_columns_map(table_result, blocks_map):
    rows = {}
    for relationship in table_result["Relationships"]:
        if relationship["Type"] == "CHILD":
            for child_id in relationship["Ids"]:
                cell = blocks_map[child_id]
                if cell["BlockType"] == "CELL":
                    row_index = cell["RowIndex"]
                    col_index = cell["ColumnIndex"]
                    if row_index not in rows:
                        # create new row
                        rows[row_index] = {}

                    # get the text value
                    rows[row_index][col_index] = get_text(cell, blocks_map)
    return rows


# Helper method to create df from table
def create_df(table_result, blocks_map):
    rows = get_rows_columns_map(table_result, blocks_map)
    data = []

    # Iterate over rows in row-index order
    for row_index, cols in sorted(rows.items()):
        # Collect the cell texts in column-index order
        row = [text for col_index, text in sorted(cols.items())]
        data.append(row)

    # Convert list of lists to DataFrame
    df = pd.DataFrame(data)

    return df


def extract_tables(blocks):
    blocks_map = {}
    table_blocks = []
    for block in blocks:
        blocks_map[block["Id"]] = block
        if block["BlockType"] == "TABLE":
            table_blocks.append(block)

    # Get the table extraction
    dfs = []
    if len(table_blocks) > 0:
        for index, table in enumerate(table_blocks):
            # Create the df
            df = create_df(table, blocks_map)

            # Process the extracted table
            df = df.replace("", np.nan)
            df = df.dropna(axis=0, how="all")
            df = df.dropna(axis=1, how="all")
            df = df.replace(np.nan, "")
            dfs.append(df.to_markdown())

    return dfs

#### Method to find all documents

In [None]:
def list_files_in_s3_folder(bucket_name, folder_name=""):
    s3_client = boto3.client("s3")

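    # Paginate, since list_objects_v2 returns at most 1,000 keys per call
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
        # "Contents" is absent when no objects match the prefix
        keys.extend(obj["Key"] for obj in page.get("Contents", []))

    return keys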

#### Methods to process the documents in parallel

In [None]:
def get_document_results(job_description):
    job_id = job_description["job_id"]
    document_name = job_description["s3_key"].split("/")[-1]
    document_source_location = job_description["document_source"]
    document_metadata = job_description["metadata"]

    textract_response = get_job_results(job_id)

    page_texts = [extract_text(blocks) for blocks in textract_response]
    page_tables = [extract_tables(blocks) for blocks in textract_response]

    if not len(page_texts) == len(page_tables):
        raise Exception(
            "Something went wrong during processing, text and table lengths don't match"
        )

    document_pages = []
    for page_nr, (page_text, page_table_list) in enumerate(
        zip(page_texts, page_tables)
    ):
        print(page_table_list)
        document_pages.append(
            {"page": page_nr, "page_text": page_text, "page_tables": page_table_list}
        )

    document = {
        "name": document_name,
        "source_location": document_source_location,
        "metadata": document_metadata,
        "pages": document_pages,
    }

    return document

In [None]:
import concurrent.futures


def process_jobs_in_parallel(job_id_dict):
    # Create a ThreadPoolExecutor. Adjust the max_workers parameter as needed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Use the executor to start a job for each job id.
        # The executor will manage the threads and the jobs will run in parallel.
        future_to_job_id = {
            executor.submit(get_document_results, job_id_dict[key]): job_id_dict[key]
            for key in job_id_dict.keys()
        }

        results = []
        for future in concurrent.futures.as_completed(future_to_job_id):
            job_id = future_to_job_id[future]["job_id"]
            document_source_location = future_to_job_id[future]["document_source"]
            try:
                # If a job completed without raising an exception, its result is returned.
                # If a job raised an exception, that exception is re-raised here.
                result = future.result()
            except Exception as exc:
                print(
                    f"Job {job_id} generated an exception while trying to process document {document_source_location}:\n {exc}\n"
                )
            else:
                # Add the result to our results list.
                print(f"Finished processing document {document_source_location}.\n")
                results.append(result)
    return results

## Run the extraction
Make sure to have all of your PDF documents in a folder in an S3 bucket.
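
Listing by prefix returns every object under it, which may include non-PDF files or the zero-byte "folder" placeholder keys that the S3 console creates. If that applies to your bucket, a small filter can be applied to the listed keys before starting the jobs. The helper `keep_pdf_keys` and the example keys below are hypothetical; this is an optional sketch, not a step of the original pipeline.

```python
def keep_pdf_keys(keys):
    """Keep only keys that look like PDF documents (case-insensitive extension check)."""
    return [key for key in keys if key.lower().endswith(".pdf")]


# Made-up keys for illustration
example_keys = [
    "prepared_pdf_documents/",  # folder placeholder object
    "prepared_pdf_documents/Amazon/annual-report-2022.pdf",
    "prepared_pdf_documents/Amazon/notes.txt",
]
print(keep_pdf_keys(example_keys))
# ['prepared_pdf_documents/Amazon/annual-report-2022.pdf']
```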

In [None]:
# folder_name = 'financial_reports'
s3_key = "prepared_pdf_documents"
document_keys = list_files_in_s3_folder(S3_BUCKET_NAME, s3_key)

In [None]:
document_keys

## Start the Textract analysis jobs and wait for them to finish

In [None]:
job_id_dict = start_analysis_jobs(S3_BUCKET_NAME, document_keys)
check_jobs_status(job_id_dict)

## Receive and process the results from Textract

If any of the jobs raises an exception, you can run `get_document_results(job_id_dict['s3://<your_document_url>'])` to get the full traceback.
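
Alternatively, the traceback can be captured at the point where the future's exception is caught. The sketch below uses a made-up `flaky_task` in place of `get_document_results` and a hypothetical `run_and_collect` helper. It relies on the fact that the exception re-raised by `future.result()` still carries its traceback, which `traceback.format_exception` can render as text.

```python
import concurrent.futures
import traceback


def flaky_task(n):
    """Stand-in for a per-document job: fails for odd inputs."""
    if n % 2:
        raise ValueError(f"cannot process item {n}")
    return n * 10


def run_and_collect(items):
    """Run tasks in threads; return (sorted results, {item: formatted traceback})."""
    results, errors = [], {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_item = {executor.submit(flaky_task, item): item for item in items}
        for future in concurrent.futures.as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results.append(future.result())
            except Exception as exc:
                # The exception raised in the worker thread keeps its traceback
                errors[item] = "".join(
                    traceback.format_exception(type(exc), exc, exc.__traceback__)
                )
    return sorted(results), errors


results, errors = run_and_collect([1, 2, 4])
print(results)    # results of the tasks that succeeded
print(errors[1])  # full traceback text for the failed item
```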

In [None]:
documents_processed = process_jobs_in_parallel(job_id_dict)

## Store the results
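
The implementation of `store_list_to_s3` lives in `utils.helpers` and is not shown in this notebook. As a rough, assumed equivalent, a list of documents could be serialized to JSON and written with the S3 `put_object` call. The function `store_list_as_json`, the stub client, and the sample document below are all hypothetical and only illustrate the round trip.

```python
import json


def store_list_as_json(s3_client, bucket_name, key, items):
    """Serialize a list to JSON and upload it as a single S3 object."""
    body = json.dumps(items).encode("utf-8")
    s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)


class InMemoryS3Stub:
    """Minimal stand-in for an S3 client, used here to avoid real AWS calls."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket, Key, Body):
        self.objects[(Bucket, Key)] = Body


# Made-up document record following the structure built by get_document_results
documents = [
    {
        "name": "report.pdf",
        "source_location": "s3://example-bucket/prepared_pdf_documents/report.pdf",
        "metadata": {},
        "pages": [{"page": 0, "page_text": "Hello\n", "page_tables": []}],
    }
]
stub = InMemoryS3Stub()
store_list_as_json(stub, "example-bucket", "documents_processed.json", documents)
restored = json.loads(stub.objects[("example-bucket", "documents_processed.json")])
print(restored == documents)
# True
```

With real AWS access, a client created with `boto3.client("s3")` could be passed in place of the stub.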

In [None]:
from utils.helpers import store_list_to_s3

In [None]:
# We serialize and save the document processing results as a json to reuse in different jobs
store_list_to_s3(S3_BUCKET_NAME, "documents_processed.json", documents_processed)

In [None]:
!aws s3 ls {S3_BUCKET_NAME}/documents_processed.json

In [None]:
len(documents_processed)