https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration

https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-python-example_code-textract.html

https://docs.aws.amazon.com/textract/latest/dg/what-is.html


In [42]:
pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-0.21.0-py3-none-any.whl (18 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-0.21.0
Note: you may need to restart the kernel to use updated packages.


In [45]:
%load_ext dotenv
%dotenv /home/jovyan/work/.env

In [73]:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Purpose

Shows how to use the AWS SDK for Python (Boto3) with Amazon Textract to
detect text, form, and table elements in document images.
"""

import json
import logging
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

# snippet-start:[python.example_code.textract.TextractWrapper]
class TextractWrapper:
    """Encapsulates Textract functions."""
    def __init__(self, textract_client, s3_resource, sqs_resource):
        """
        :param textract_client: A Boto3 Textract client.
        :param s3_resource: A Boto3 Amazon S3 resource.
        :param sqs_resource: A Boto3 Amazon SQS resource.
        """
        self.textract_client = textract_client
        self.s3_resource = s3_resource
        self.sqs_resource = sqs_resource
    # snippet-end:[python.example_code.textract.TextractWrapper]
    
    # snippet-start:[python.example_code.textract.StartDocumentAnalysis]
    def start_analysis_job(
            self, bucket_name, document_file_name, feature_types, sns_topic_arn,
            sns_role_arn):
        """
        Starts an asynchronous job to detect text and additional elements, such as
        forms or tables, in an image stored in an Amazon S3 bucket. Textract publishes
        a notification to the specified Amazon SNS topic when the job completes.
        The image must be in PNG, JPG, or PDF format.
        :param bucket_name: The name of the Amazon S3 bucket that contains the image.
        :param document_file_name: The name of the document image stored in Amazon S3.
        :param feature_types: The types of additional document features to detect.
        :param sns_topic_arn: The Amazon Resource Name (ARN) of an Amazon SNS topic
                              where job completion notification is published.
        :param sns_role_arn: The ARN of an AWS Identity and Access Management (IAM)
                             role that can be assumed by Textract and grants permission
                             to publish to the Amazon SNS topic.
        :return: The ID of the job.
        """
        try:
            response = self.textract_client.start_document_analysis(
                DocumentLocation={
                    'S3Object': {'Bucket': bucket_name, 'Name': document_file_name}},
                NotificationChannel={
                    'SNSTopicArn': sns_topic_arn, 'RoleArn': sns_role_arn},
                FeatureTypes=feature_types)
            job_id = response['JobId']
            logger.info(
                "Started text analysis job %s on %s.", job_id, document_file_name)
        except ClientError:
            logger.exception("Couldn't analyze text in %s.", document_file_name)
            raise
        else:
            return job_id
    # snippet-end:[python.example_code.textract.StartDocumentAnalysis]

### How to Use Amazon Textract

* [How to Use Amazon Textract](https://stackoverflow.com/questions/59038306/how-to-use-the-amazon-textract-with-pdf-files)

* Create new S3 bucket in console and write down bucket name, then:
* Run the code.
* It may take 5-50 seconds, until the call to ```get_document_text_detection(...)``` returns a result. Before, it will say that it is still processing.
* According to my understanding, for each token, exactly one paid API call will be performed - and a past one will be retrieved, if the token has appeared in the past.
* For Large documents: There is one intricacy if the document is large, in which case the result may need to be stitched together from multiple 'pages'. The kind of code you will need to add is:

```
pages = [response]
while nextToken := response.get('NextToken'):
    response = client.get_document_text_detection(JobId=jobid, NextToken=nextToken)
    pages.append(response)
```

#### Retrieving the Response

* JobId
* The identifier for the document text detection job. Use JobId to identify the job in a subsequent call to GetDocumentAnalysis. A JobId value is only valid for 7 days.

In [80]:
if __name__ == "__main__":
    
    from dotenv import load_dotenv
    import random
    from time import sleep
    import os
    load_dotenv()
    import json
    import logging
    logging.basicConfig(level=logging.INFO,force = True)
    import boto3
    from botocore.exceptions import ClientError

    bucketName = 'textract-input-20221001'
    documentFileName='example.pdf'
    ACCESS_KEY = os.getenv("ACCESS_KEY")
    SECRET_KEY = os.getenv("SECRET_KEY")

    textractClient = boto3.client('textract', 
                          region_name='us-east-1', 
                          aws_access_key_id=ACCESS_KEY,
                          aws_secret_access_key=SECRET_KEY)
    
    s3Resource = boto3.resource('s3',  
                      aws_access_key_id=ACCESS_KEY,
                      aws_secret_access_key=SECRET_KEY)
    
    # possible_feature_types = ['TABLES','FORMS','QUERIES']
    feature_types = ['TABLES']

    s3bucket = s3Resource.Bucket(bucketName)

    extracted_data = []
    for s3_file in s3bucket.objects.all():
        print(s3_file)
        
        # use textract to process s3 file
        response = textractClient.start_document_analysis(
                DocumentLocation={'S3Object': {'Bucket': bucket, 'Name': s3_file.key}},
                # ClientRequestToken=random.randint(1,1e10),
                FeatureTypes=feature_types)

        job_Id = response['JobId']
        logger.info("Started text analysis job on job_id: ", job_id, "s3_file.key: ", s3_file.key)

        # sleep 2 seconds to prevent ProvisionedThroughputExceededException
        sleep(2)        
        
        """
        blocks=response['Blocks']

        for block in blocks:
                if block['BlockType'] != 'PAGE':
                    print('Detected: ' + block['Text'])
                    print('Confidence: ' + "{:.2f}".format(block['Confidence']) + "%")
                    
                    # Example case where you want to extract words with #
                    if("#" in block['Text']):
                        words = block['Text'].split()
                        for word in words:
                               if("#" in word):
                                    extracted_data.append({"word" : word, "file" : s3_file.key, "confidence": "{:.2f}".format(block['Confidence']) + "%"})

        df = pd.DataFrame(extracted_data)
        df = df.drop_duplicates()
        df.to_csv('output.csv')
    
        """

s3.ObjectSummary(bucket_name='textract-input-20221001', key='example.pdf')


--- Logging error ---
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/logging/__init__.py", line 1100, in emit
    msg = self.format(record)
  File "/opt/conda/lib/python3.10/logging/__init__.py", line 943, in format
    return fmt.format(record)
  File "/opt/conda/lib/python3.10/logging/__init__.py", line 678, in format
    record.message = record.getMessage()
  File "/opt/conda/lib/python3.10/logging/__init__.py", line 368, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
  File "/opt/conda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/conda/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/opt/conda/lib/python3.10/site-packages/ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
  File "/opt/conda/lib/python3.10/site-packages/traitlets/config/application.py", line 976

In [84]:
extractedData = textractClient.get_document_analysis(JobId=job_Id)

In [86]:
print("Hi")

Hi


### How to Use Output of AWS Textract Form

* After having extracted a form from an example PDF and recieved a jobID, a large json object will result.
* There is basically pre-written AWS code which shows how to convert said information into a CSV which can be adapted.

* [Pre-written CSV Table Example](https://docs.aws.amazon.com/textract/latest/dg/examples-export-table-csv.html)

* Note that there are both synchronous and asynchronous versions.

In [90]:
import boto3
from pprint import pprint

jobId = 'da5cb1e7daeb0f8f882782292a5304951bb394a6e9286d836bf887b64d00c9f7'
region_name = 'us-east-1'
file_name = 'output'

textract = boto3.client('textract', region_name=region_name)

# Display information about a block
def DisplayBlockInfo(block):
    print("Block Id: " + block['Id'])
    print("Type: " + block['BlockType'])
    if 'EntityTypes' in block:
        print('EntityTypes: {}'.format(block['EntityTypes']))

    if 'Text' in block:
        print("Text: " + block['Text'])

    if block['BlockType'] != 'PAGE':
        print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%")

def GetResults(jobId, file_name):
    maxResults = 1000
    paginationToken = None
    finished = False

    while finished == False:

        response = None

        if paginationToken == None:
            response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults)
        else:
            response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults,
                                                           NextToken=paginationToken)

        blocks = response['Blocks']
        table_csv = get_table_csv_results(blocks)
        output_file = file_name + ".csv"
        # replace content
        with open(output_file, "at") as fout:
            fout.write(table_csv)
        # show the results
        print('Detected Document Text')
        print('Pages: {}'.format(response['DocumentMetadata']['Pages']))
        print('OUTPUT TO CSV FILE: ', output_file)

        # Display block information
        for block in blocks:
            DisplayBlockInfo(block)
            print()
            print()

        if 'NextToken' in response:
            paginationToken = response['NextToken']
        else:
            finished = True


def get_rows_columns_map(table_result, blocks_map):
    rows = {}
    for relationship in table_result['Relationships']:
        if relationship['Type'] == 'CHILD':
            for child_id in relationship['Ids']:
                try:
                    cell = blocks_map[child_id]
                    if cell['BlockType'] == 'CELL':
                        row_index = cell['RowIndex']
                        col_index = cell['ColumnIndex']
                        if row_index not in rows:
                            # create new row
                            rows[row_index] = {}

                        # get the text value
                        rows[row_index][col_index] = get_text(cell, blocks_map)
                except KeyError:
                    print("Error extracting Table data - {}:".format(KeyError))
                    pass
    return rows


def get_text(result, blocks_map):
    text = ''
    if 'Relationships' in result:
        for relationship in result['Relationships']:
            if relationship['Type'] == 'CHILD':
                for child_id in relationship['Ids']:
                    try:
                        word = blocks_map[child_id]
                        if word['BlockType'] == 'WORD':
                            text += word['Text'] + ' '
                        if word['BlockType'] == 'SELECTION_ELEMENT':
                            if word['SelectionStatus'] == 'SELECTED':
                                text += 'X '
                    except KeyError:
                        print("Error extracting Table data - {}:".format(KeyError))

    return text


def get_table_csv_results(blocks):

    pprint(blocks)

    blocks_map = {}
    table_blocks = []
    for block in blocks:
        blocks_map[block['Id']] = block
        if block['BlockType'] == "TABLE":
            table_blocks.append(block)

    if len(table_blocks) <= 0:
        return "<b> NO Table FOUND </b>"

    csv = ''
    for index, table in enumerate(table_blocks):
        csv += generate_table_csv(table, blocks_map, index + 1)
        csv += '\n\n'
        # In order to generate separate CSV file for every table, uncomment code below
        #inner_csv = ''
        #inner_csv += generate_table_csv(table, blocks_map, index + 1)
        #inner_csv += '\n\n'
        #output_file = file_name + "___" + str(index) + ".csv"
        # replace content
        #with open(output_file, "at") as fout:
        #    fout.write(inner_csv)

    return csv


def generate_table_csv(table_result, blocks_map, table_index):
    rows = get_rows_columns_map(table_result, blocks_map)

    table_id = 'Table_' + str(table_index)

    # get cells.
    csv = 'Table: {0}\n\n'.format(table_id)

    for row_index, cols in rows.items():

        for col_index, text in cols.items():
            csv += '{}'.format(text) + ","
        csv += '\n'

    csv += '\n\n\n'
    return csv

response_blocks = GetResults(jobId, file_name)

generate_table_csv(table_result, blocks_map, table_index)

[{'BlockType': 'PAGE',
  'Geometry': {'BoundingBox': {'Height': 1.0,
                               'Left': 0.0,
                               'Top': 0.0,
                               'Width': 1.0},
               'Polygon': [{'X': 1.5849614334573464e-16, 'Y': 0.0},
                           {'X': 1.0, 'Y': 9.462437987838284e-17},
                           {'X': 1.0, 'Y': 1.0},
                           {'X': 0.0, 'Y': 1.0}]},
  'Id': 'a0a56d07-8647-47f9-a521-41fdce32e714',
  'Page': 1,
  'Relationships': [{'Ids': ['65983361-5294-49ca-b239-ee79ceffc973',
                             '00053d7e-aca9-4a9f-ab5e-348c756a4bd4',
                             '0322fb52-d7a4-407f-aa42-9ddc181f06c3',
                             '9eb96d51-4cc8-44ef-9a12-c57d0612fa08',
                             '89f40669-66d7-414c-ad3b-8a1affb08765',
                             '854d0434-62b1-4728-92ee-7aeca32b84bb',
                             '17206335-c210-4cc2-8284-026e7fa483ca',
                   

NameError: name 'table_result' is not defined