In [2]:
def ShowBoundingBox(draw,box,width,height,boxColor):
    """
    Outline a bounding box on a drawing surface.

    :param draw: object exposing ``rectangle(xy, outline=...)``
        (the callers in this file pass a PIL ``ImageDraw`` instance).
    :param box: dict with 'Left', 'Top', 'Width' and 'Height' entries, each
        used as a multiplier of the image dimensions.
    :param width: image width used to scale the horizontal values.
    :param height: image height used to scale the vertical values.
    :param boxColor: outline colour passed through to ``draw.rectangle``.
    """
    # Top-left corner, scaled to image coordinates.
    x0 = width * box['Left']
    y0 = height * box['Top']
    # Bottom-right corner is the top-left corner plus the scaled extent.
    x1 = x0 + (width * box['Width'])
    y1 = y0 + (height * box['Height'])
    draw.rectangle([x0, y0, x1, y1], outline=boxColor)

def ShowSelectedElement(draw,box,width,height,boxColor):
    """
    Paint a filled rectangle over a bounding box on a drawing surface.

    :param draw: object exposing ``rectangle(xy, fill=...)``
        (the callers in this file pass a PIL ``ImageDraw`` instance).
    :param box: dict with 'Left', 'Top', 'Width' and 'Height' entries, each
        used as a multiplier of the image dimensions.
    :param width: image width used to scale the horizontal values.
    :param height: image height used to scale the vertical values.
    :param boxColor: fill colour passed through to ``draw.rectangle``.
    """
    # Top-left corner, scaled to image coordinates.
    x0 = width * box['Left']
    y0 = height * box['Top']
    # Bottom-right corner is the top-left corner plus the scaled extent.
    x1 = x0 + (width * box['Width'])
    y1 = y0 + (height * box['Height'])
    draw.rectangle([x0, y0, x1, y1], fill=boxColor)

def DisplayBlockInformation(block):
    """
    Print a human-readable summary of a single Textract block to stdout.

    :param block: dict
        One entry of the ``Blocks`` list of an analysis response. 'Id' and
        'BlockType' are always read; every other field is printed only when
        its guard below is satisfied.

    :return: None
    """
    print('Id: {}'.format(block['Id']))
    # NOTE(review): the block type is only printed for blocks that carry
    # text, and the geometry only for blocks that have relationships; this
    # nesting is kept as-is so the existing output format does not change.
    if 'Text' in block:
        print(' Detected: ' + block['Text'])
        print(' Type: ' + block['BlockType'])

    if 'Confidence' in block:
        print(' Confidence: ' + "{:.2f}".format(block['Confidence']) + "%")
    if block['BlockType'] == 'CELL':
        print(" Cell information")
        print(" Column:" + str(block['ColumnIndex']))
        print(" Row:" + str(block['RowIndex']))
        print(" Column Span:" + str(block['ColumnSpan']))
        # Report the row span from its own field (not the column span).
        print(" RowSpan:" + str(block['RowSpan']))

    if 'Relationships' in block:
        print(' Relationships: {}'.format(block['Relationships']))
        print(' Geometry: ')
        print(' Bounding Box: {}'.format(block['Geometry']['BoundingBox']))
        print(' Polygon: {}'.format(block['Geometry']['Polygon']))

    if block['BlockType'] == "KEY_VALUE_SET":
        print (' Entity Type: ' + block['EntityTypes'][0])

    if block['BlockType'] == 'SELECTION_ELEMENT':
        print(' Selection element detected: ', end='')

        if block['SelectionStatus'] =='SELECTED':
            print('Selected')
        else:
            print('Not selected')

    if 'Page' in block:
        # str() is required: concatenating a non-string page number
        # (Textract reports it as an integer) would raise TypeError.
        print('Page: ' + str(block['Page']))
        print()

def process_text_detection(s3_connection, client, document, bucket="kadakareerbucket"):
    """
    Analyze an image stored in S3 with Textract and display the detected blocks.

    :param s3_connection: boto3 S3 resource
        Used to download the image so the detected blocks can be drawn on it.
    :param client: boto3.client
        An instance of the Textract client used to call ``analyze_document``.
    :param document: str
        The object key (name) of the image inside ``bucket``.
    :param bucket: str
        Name of the S3 bucket holding the document. Defaults to the bucket
        this notebook has always used.

    :return: int
        The number of blocks returned by Textract, or 0 when the API call
        did not return HTTP 200 or any step raised an exception.

    The image is downloaded, analyzed for tables, forms and signatures, and
    every returned block is printed. Key/value sets, tables, cells and
    selected selection elements are drawn onto the image, which is then
    opened in the default viewer. The elapsed time is printed at the end.
    """

    try:
        start_time = datetime.datetime.now()

        # Fetch the image bytes so there is something to draw on.
        s3_object = s3_connection.Object(bucket, document)
        s3_response = s3_object.get()
        stream = io.BytesIO(s3_response['Body'].read())
        image = Image.open(stream).convert("RGB")

        # Textract reads the document straight from S3; no bytes are uploaded.
        response = client.analyze_document(
            Document={'S3Object': {'Bucket': bucket, 'Name': document}},
            FeatureTypes=["TABLES", "FORMS", "SIGNATURES"],
        )

        status_code = response["ResponseMetadata"]["HTTPStatusCode"]
        if status_code != 200:
            # API call was not successful, print the status code
            print(f"API call failed with status code: {status_code}")
            return 0

        blocks = response['Blocks']
        width, height = image.size
        print ('Detected Document Text')

        # A single drawing context is enough for the whole image.
        draw = ImageDraw.Draw(image)
        for block in blocks:
            DisplayBlockInformation(block)

            # Colour code: red = key, green = value, blue = table,
            # yellow = cell, filled blue = selected selection element.
            block_type = block['BlockType']
            bounding_box = block['Geometry']['BoundingBox']
            if block_type == "KEY_VALUE_SET":
                color = 'red' if block['EntityTypes'][0] == "KEY" else 'green'
                ShowBoundingBox(draw, bounding_box, width, height, color)
            elif block_type == 'TABLE':
                ShowBoundingBox(draw, bounding_box, width, height, 'blue')
            elif block_type == 'CELL':
                ShowBoundingBox(draw, bounding_box, width, height, 'yellow')
            elif block_type == 'SELECTION_ELEMENT':
                if block['SelectionStatus'] == 'SELECTED':
                    ShowSelectedElement(draw, bounding_box, width, height, 'blue')

        # Display the annotated image
        image.show()

        # Report how long the download, analysis and rendering took.
        end_time = datetime.datetime.now()
        processing_time = end_time - start_time
        print("Processing Time: ", processing_time)

        return len(blocks)

    except Exception as e:
        # Best-effort: report the failure and let the caller continue with
        # the next document instead of aborting the whole run.
        print(f"An error occurred: {str(e)}")
        return 0


def main():
    """
    Run text detection for every file name found in the local image folder.

    Creates the S3 resource and the Textract client from the
    "kadakareer-dev" profile, then passes each directory entry name (in
    sorted order) to ``process_text_detection`` as the document name.
    """
    aws_session = boto3.Session(profile_name="kadakareer-dev")
    s3_resource = aws_session.resource('s3')
    textract_client = aws_session.client("textract", region_name="ap-southeast-2")

    image_dir = "/home/miggy/Desktop/apps/amazon-textract/images/"
    # Only the entry names are used; the files themselves are not read here.
    for file_name in sorted(os.listdir(image_dir)):
        process_text_detection(s3_resource, textract_client, file_name)


# Entry point: call main() only when this code runs as the top-level
# program, not when it is imported as a module.
if __name__ == "__main__":
    main()


Detected Document Text
Id: b47ff1a8-e529-4e5f-ac60-2fdfcc2ca59e
 Relationships: [{'Type': 'CHILD', 'Ids': ['b9f1f81d-583a-4e05-94bd-d3b5eb356abf', '9de75b04-bf3d-46e3-8787-8073f1f6bc69', '9961c199-6a8c-422a-aba5-d71547fd41d9', 'd05f4b5e-823a-4a7e-b4bb-5836f79a884f', '3cff6222-f414-46be-99cb-9a308a9b7e26', '251a9ad5-f57f-4be2-b03c-d8e8617834bb', 'dcfa54cc-44db-4c69-99b8-185b6625b135', '9ca539d2-77f7-455c-9f3c-58fc32c7e290', 'ec87a7fc-385a-4bca-abed-b13ca7f55383', '65db680e-1d4c-4387-9374-745a1142a331', '4e7bbed3-9599-4440-bce4-e390178cfd6e', '8032d49a-11d9-4235-8854-f5285525a63f', '3f266dcb-cd83-4e5c-b72e-5d85909925e4', '83b3cf58-c85d-4794-be6e-85fe0f090653', '6e330395-eb98-41db-8938-1ef71311e5b6', '9a848356-10b6-40aa-b053-e701f4d6ef84', '6184b16e-9a0b-4f6b-be2f-09758782b0c7', 'fc2ff4e6-a34c-4b1f-b529-a90e6db149d3', '1d57c170-cd9b-42c9-93fb-5dcfb9c35999', 'c95c929c-0393-4f61-8dda-5f9a724552d5', '97a64a7c-52be-49d0-9257-261e91bdecf8', 'ef94e490-766e-4c57-b02c-8f3bc49713f4', 'a738d941-8a8

/snap/core20/current/lib/x86_64-linux-gnu/libstdc++.so.6: version `GLIBCXX_3.4.29' not found (required by /lib/x86_64-linux-gnu/libproxy.so.1)
Failed to load module: /home/miggy/snap/code/common/.cache/gio-modules/libgiolibproxy.so
eog: symbol lookup error: /snap/core20/current/lib/x86_64-linux-gnu/libpthread.so.0: undefined symbol: __libc_pthread_init, version GLIBC_PRIVATE


KeyboardInterrupt: 

In [29]:
import numpy as np
import os
import pandas as pd

from collections import defaultdict
from PIL import Image, ImageDraw


def get_kv_map(file_name, session, client):
    """
    Analyze a local file for form data and index the returned blocks.

    :param file_name: path of the file whose bytes are sent for analysis.
    :param session: not used by this function; kept for the callers.
    :param client: object exposing ``analyze_document`` (a Textract client).

    :return: tuple ``(key_map, value_map, block_map)`` of dicts keyed by
        block id. ``block_map`` holds every block; ``key_map`` holds the
        KEY_VALUE_SET blocks whose entity types include "KEY"; ``value_map``
        holds the remaining KEY_VALUE_SET blocks.
    """
    with open(file_name, "rb") as handle:
        payload = bytearray(handle.read())
        analysis = client.analyze_document(
            Document={"Bytes": payload}, FeatureTypes=["FORMS"]
        )

        block_map, key_map, value_map = {}, {}, {}
        for entry in analysis["Blocks"]:
            entry_id = entry["Id"]
            block_map[entry_id] = entry

            # Only key/value set blocks are sorted into the two extra maps.
            if entry["BlockType"] != "KEY_VALUE_SET":
                continue
            target = key_map if "KEY" in entry["EntityTypes"] else value_map
            target[entry_id] = entry

        return key_map, value_map, block_map

def get_kv_relationship(key_map, value_map, block_map):
    """
    Pair the text of every key block with the text of its value block.

    :param key_map: dict of key blocks, keyed by block id.
    :param value_map: dict of value blocks, keyed by block id.
    :param block_map: dict of all blocks, used to resolve child text.

    :return: ``defaultdict(list)`` mapping key text to the list of value
        texts found for it (keys with identical text share one list).
    """
    pairs = defaultdict(list)
    for key_block in key_map.values():
        value_block = find_value_block(key_block, value_map)
        label = get_text(key_block, block_map)
        pairs[label].append(get_text(value_block, block_map))
    return pairs

def find_value_block(key_block, value_map):
    """
    Return the value block that a key block points to.

    :param key_block: dict
        A key block; its "VALUE" relationships list the ids of value blocks.
    :param value_map: dict
        Value blocks keyed by block id.

    :return: dict
        The block for the last id listed in the key's VALUE relationships,
        or an empty dict when the key has no "Relationships" entry, no VALUE
        relationship, or a VALUE relationship with no ids. An empty dict is
        used so that ``get_text`` yields an empty string for such keys
        instead of the lookup failing with UnboundLocalError/KeyError.

    :raises KeyError: if a referenced value id is missing from ``value_map``.
    """
    value_block = {}
    for relationship in key_block.get("Relationships", []):
        if relationship["Type"] == "VALUE":
            for value_id in relationship["Ids"]:
                # Later ids overwrite earlier ones: the last one wins.
                value_block = value_map[value_id]
    return value_block

def get_text(result, blocks_map):
    """
    Build the text of a block from its CHILD blocks.

    :param result: block whose "CHILD" relationships list the child ids.
    :param blocks_map: dict of all blocks, keyed by block id.

    :return: str made of each WORD child's text followed by a space, with
        "X " standing in for every selected SELECTION_ELEMENT child; an
        empty string when the block has no "Relationships" entry.
    """
    if "Relationships" not in result:
        return ""

    fragments = []
    for link in result["Relationships"]:
        if link["Type"] != "CHILD":
            continue
        for child_id in link["Ids"]:
            child = blocks_map[child_id]
            kind = child["BlockType"]
            if kind == "WORD":
                fragments.append(child["Text"] + " ")
            elif kind == "SELECTION_ELEMENT" and child["SelectionStatus"] == "SELECTED":
                # A ticked checkbox / radio button is rendered as an X.
                fragments.append("X ")

    return "".join(fragments)

def main():
    """
    Extract form key/value pairs from every file in the image folder and
    write them to ``output.csv``.

    Each file is analyzed with Textract through ``get_kv_map`` and its
    key/value pairs are merged into one dict of columns (one column per key
    text, values appended in processing order). Columns are padded with NaN
    to a common length, printed as a DataFrame and saved without the index.

    NOTE(review): columns are padded independently, so a given row does not
    necessarily hold values coming from the same document.
    """
    session = boto3.Session(profile_name="kadakareer-dev")
    client = session.client("textract", region_name="ap-southeast-2")

    combined_data = {}
    image_path = "/home/miggy/Desktop/apps/amazon-textract/images/"
    for img in sorted(os.listdir(image_path)):
        key_map, value_map, block_map = get_kv_map(
            os.path.join(image_path, img), session, client
        )
        kvs = get_kv_relationship(key_map, value_map, block_map)
        # Append this document's values to the column of the same key.
        for key, values in kvs.items():
            combined_data.setdefault(key, []).extend(values)

    # Find the maximum column length; default=0 keeps an empty folder (or
    # documents without any form keys) from raising ValueError.
    max_len = max((len(values) for values in combined_data.values()), default=0)

    # Pad the columns with NaN so they all have the same length, which
    # the DataFrame constructor requires.
    for values in combined_data.values():
        values.extend([np.nan] * (max_len - len(values)))

    # Convert the dictionary into DataFrame
    df = pd.DataFrame(combined_data)
    print(df)
    df.to_csv('output.csv', index=False)


# Entry point: call main() only when this code runs as the top-level
# program, not when it is imported as a module.
if __name__ == "__main__":
    main()


   MIDDLE NAME:     GIVEN NAME:  NO  YES     SURNAME/LAST NAME:  RELIGION:   \
0       GOC-ONG   MIGUI PHILLIP   X                      GALAN,   CATHOLIC    
1      GOC- ONG         PHILLIP   X                GALAN, MIGUI   CATHOLIC    
2      GOC- ONG              NaN  X        GALAN, MIGUI PHILLIP         NaN   
3            NaN             NaN  X                          NaN        NaN   
4            NaN             NaN  X                          NaN        NaN   
..           ...             ...  ..  ...                    ...        ...   
57           NaN             NaN       X                     NaN        NaN   
58           NaN             NaN       X                     NaN        NaN   
59           NaN             NaN       X                     NaN        NaN   
60           NaN             NaN       X                     NaN        NaN   
61           NaN             NaN  X                          NaN        NaN   

   SINGLE  AGE:  SEAMAN'S BOOK NUMBER:  FEMALE   ..