In [None]:
!pip install rapidfuzz
!pip install boto3



In [None]:
import boto3
import time
import itertools
import pandas as pd
import numpy as np
from datetime import datetime
from rapidfuzz import fuzz
import json

runtime = True

# **Paths and Variables**


In [None]:
access_key_id = your_access_key_id
secret_access_key = your_access_key
aws_region = "us-east-2"
bucket_name = "talebtext"
bucket_key = "test.pdf"
file_path = "/content/no_blank_image_table_sample-1.pdf"

# **Upload File on S3 Bucket**
Below two cells are used to upload your pdf file on AWS S3 Bucket.
Note: To run AWS Tesseract, it is necessary to upload files on S3 Bucket.

**Run this code (below 2 cells) only once to upload file.**

In [None]:
with open(file_path, "rb") as f:
  bytes_read = f.read()

In [None]:
s3_client = boto3.client(
    's3',
    aws_access_key_id= access_key_id,
    aws_secret_access_key= secret_access_key,
    region_name=aws_region
)

s3_client.put_object(Body=bytes_read,
                  Bucket=bucket_name,
                  Key=bucket_key)
print("File Uploaded Successfully.")

File Uploaded Successfully.


# **Textract Service**

In [None]:
#Initialze AWS Tesseract service object using boto3 client
textract_client = boto3.client(
    'textract',
    aws_access_key_id= access_key_id,
    aws_secret_access_key= secret_access_key,
    region_name=aws_region
)

In [None]:
def start_job():
  # This method starts the document analysis asynchronously and returns a job id in response.
    try:
        response = textract_client.start_document_analysis(
            DocumentLocation={
                'S3Object': {
                    'Bucket': bucket_name,
                    'Name': bucket_key
                }},
            FeatureTypes=['TABLES'])
        return response["JobId"]
    except Exception as e:
        print("Failed to start job: {}".format(e))

def is_job_complete(job_id):
  # This method checks the status of the job submitted to AWS Tesseract for analysis using the JobID.
    try:
        time.sleep(1)
        response = textract_client.get_document_analysis(JobId=job_id)
        status = response["JobStatus"]
        print("Job status: {}".format(status))

        while status == "IN_PROGRESS":
            time.sleep(1)
            response = textract_client.get_document_analysis(JobId=job_id)
            status = response["JobStatus"]
            print("Job status: {}".format(status))

        return status
    except Exception as e:
        print("Failed to get job status: {}".format(e))

def get_job_results(job_id):
  #This method fetches the results from the successful AWS Tesseract Job.
    try:
        pages: list = []
        time.sleep(1)
        response = textract_client.get_document_analysis(JobId=job_id)
        pages.append(response)
        print("Paginated resultset received: {}".format(len(pages)))
        next_token = None
        if 'NextToken' in response:
            next_token = response['NextToken']

        while next_token:
            time.sleep(1)
            response = textract_client.\
                get_document_analysis(JobId=job_id, NextToken=next_token)
            pages.append(response)
            print("Result set page received: {}".format(len(pages)))
            next_token = None
            if 'NextToken' in response:
                next_token = response['NextToken']

        return pages
    except Exception as e:
        print("Failed to get job result: {}".format(e))

def submit_job():
  #This method initiates the AWS Job and and collects result.
    try:
        response: list = []
        job_id = start_job()
        print("Started job with id: {}".format(job_id))
        if is_job_complete(job_id):
            result = get_job_results(job_id)
            response.append(result)

        return response
    except Exception as e:
        print("Failed to submit job to textract: {}".format(e))

In [None]:
result = submit_job()


Started job with id: 4a1a62b49ead786f867e8b6a1b143840c95beea0a105f3e33b0574f1c6374ee6
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
Paginated resultset received: 1
Result set page received: 2


In [None]:
#Store result in File
json_file_path = "tesseract_result.json"

# Write the JSON object to the file
import json
with open(json_file_path, 'w') as f:
    json.dump(result, f)

print("JSON data has been stored in the file:, please download/save it on your local system for later use.", file_path)

JSON data has been stored in the file:, please download/save it on your local system for later use. /content/no_blank_image_table_sample-1.pdf


In [None]:
result

[[{'DocumentMetadata': {'Pages': 4},
   'JobStatus': 'SUCCEEDED',
   'NextToken': 'p3StR2NSiBmeqocxSUQfKDk+wWKfC5G7d4XYjqHMhmO9B2HHBfr2mzVu76Gt1j0w4J76JMVMg2jjW5s+sfni9c+wovbJf+v1+UUC2+NDOhujUr5oLkGN0hxpFJUg2gbAl2fEl2w=',
   'Blocks': [{'BlockType': 'PAGE',
     'Geometry': {'BoundingBox': {'Width': 1.0,
       'Height': 1.0,
       'Left': 0.0,
       'Top': 0.0},
      'Polygon': [{'X': 0.0, 'Y': 9.629952835155109e-09},
       {'X': 1.0, 'Y': 0.0},
       {'X': 1.0, 'Y': 1.0},
       {'X': 0.0, 'Y': 1.0}]},
     'Id': 'c6b32617-23c2-42a2-95c6-2bb73f400297',
     'Relationships': [{'Type': 'CHILD',
       'Ids': ['148a2a41-be74-441a-987d-7cc89a3060a0',
        '5e776070-e253-4ce1-bc8b-bba46c32a865',
        'cc4c2f75-d8c8-4a89-ac01-1b2c6773e694',
        'f20e21b7-170b-4fd1-b330-b3d397f49f18',
        '4a3042e8-9ca4-4c41-a606-39b0bebb596b',
        'ba1297fd-f4bc-4a81-8955-cb46dde3a88d',
        '354f8ae2-e8d6-4891-add1-d9309b9192a0',
        'ef439a34-0411-4c86-9da3-a847ec834073',
  

In [None]:
json_file_path = "tesseract_result.json"
f = open(json_file_path, 'r')

# returns JSON object as a dictionary
result = json.load(f)

# **Transformation**

After getting test result from AWS Tesseract, below code transforms it into the desired format.
At the end of code a CSV File is generated.

In [None]:
def get_text(words, lines):
    symbols, boxes = list(), list()
    for word in words.values():
        symbol = word["Text"]
        if symbol and len(symbol) > 0.5:
            symbols.append(symbol)
            boxes.append(word["Geometry"]["BoundingBox"])

    text = list()
    for line in lines.values():
        text.append(line["Text"])
    return symbol, boxes, text

def get_children_ids(block):
    for rels in block.get('Relationships', []):
        if rels['Type'] == 'CHILD':
            yield from rels['Ids']

def map_blocks(blocks, block_type):
    return {
        block['Id']: block
        for block in blocks
        if block['BlockType'] == block_type
    }

def get_tables_df(tables, cells, words, selections):
    dataframes = []
    for table in tables.values():
        # Determine all the cells that belong to this table
        table_cells = [cells[cell_id] for cell_id in get_children_ids(table)]
        # Determine the table's number of rows and columns
        n_rows = max(cell['RowIndex'] for cell in table_cells)
        n_cols = max(cell['ColumnIndex'] for cell in table_cells)
        content = [[None for _ in range(n_cols)] for _ in range(n_rows)]

        # Fill in each cell
        for cell in table_cells:
            cell_contents = [
                words[child_id]['Text']
                if child_id in words
                else selections[child_id]['SelectionStatus']
                for child_id in get_children_ids(cell)
            ]
            i = cell['RowIndex'] - 1
            j = cell['ColumnIndex'] - 1
            content[i][j] = ' '.join(cell_contents)

        # We assume that the first row corresponds to the column names
        dataframe = pd.DataFrame(content[0:], columns=content[0])
        dataframes.append([dataframe, table["Page"]])
    return dataframes

In [None]:
def transform_dataset(df):
    values_to_filter = ['Run hours', 'Consumption']
    # Select rows based on values in a list
    df = df[df['Details'].isin(values_to_filter)]
    # print(df[df['Details'].isin(values_to_filter)])
    system = [1, 1, 2, 2, 3, 3, 4, 4]
    df["System"] = system[:len(df)]
    # print( system[:len(df)])
    df = df[["System", "Details", "3rd", "1st", "2nd"]]
    pivot_df = df.pivot(index='System', columns='Details').swaplevel(0, 1, 1).sort_index(axis=1)
    single_row_df = pivot_df.unstack().to_frame().transpose()
    single_row_df.columns = ['_'.join(map(str, col)) for col in single_row_df.columns]
    new_cols = ['Run hours_3rd_1', 'Consumption_3rd_1', 'Run hours_1st_1', 'Consumption_1st_1', 'Run hours_2nd_1',
                'Consumption_2nd_1', 'Run hours_3rd_2', 'Consumption_3rd_2', 'Run hours_1st_2', 'Consumption_1st_2',
                'Run hours_2nd_2', 'Consumption_2nd_2', 'Run hours_3rd_3', 'Consumption_3rd_3', 'Run hours_1st_3',
                'Consumption_1st_3', 'Run hours_2nd_3', 'Consumption_2nd_3', 'Run hours_3rd_4', 'Consumption_3rd_4',
                'Run hours_1st_4', 'Consumption_1st_4', 'Run hours_2nd_4', 'Consumption_2nd_4']
    single_row_df = single_row_df.reindex(columns = new_cols)
    single_row_df = single_row_df.fillna(0)
    return single_row_df

In [None]:
import re

def clean_and_convert_dates(date_list):
    cleaned_dates = []
    for sublist in date_list:
        date_str = ' '.join(sublist[1:])
        date = date_str.replace('.',' ')
        date = date.split(" ")
        while('' in date):
          date.remove('')
        date = f"{date[0]} {date[1]} {date[2]}"
        cleaned_date = datetime.strptime(date, "%d %b %Y").date()
        cleaned_dates.append(cleaned_date)

    return cleaned_dates

def extract_date(word_list, start_token, stop_token):
    is_extraction_started = False
    extracted_words = []

    for word in word_list:
        if fuzz.ratio(word.lower(), start_token) >= 80:
            is_extraction_started = True
        elif fuzz.ratio(word.lower(), stop_token) >= 80:
            is_extraction_started = False

        if is_extraction_started:
            extracted_words.append(word)

    return extracted_words

def get_date(words, total_pages):
    uncleaned_dates = []

    for value in range(total_pages):
        selected_arrays = [sub_array for sub_array in words if len(sub_array) > 1 and sub_array[1] == value+1]
        first_elements = [sub_array[0] for sub_array in selected_arrays]
        uncleaned_dates.append(extract_date(first_elements, "date", "operator"))

    return clean_and_convert_dates(uncleaned_dates)

In [None]:
master_df = pd.DataFrame(columns = ['Run hours_3rd_1', 'Consumption_3rd_1', 'Run hours_1st_1', 'Consumption_1st_1', 'Run hours_2nd_1',
                'Consumption_2nd_1', 'Run hours_3rd_2', 'Consumption_3rd_2', 'Run hours_1st_2', 'Consumption_1st_2',
                'Run hours_2nd_2', 'Consumption_2nd_2', 'Run hours_3rd_3', 'Consumption_3rd_3', 'Run hours_1st_3',
                'Consumption_1st_3', 'Run hours_2nd_3', 'Consumption_2nd_3', 'Run hours_3rd_4', 'Consumption_3rd_4',
                'Run hours_1st_4', 'Consumption_1st_4', 'Run hours_2nd_4', 'Consumption_2nd_4'])
blocks = []
dates = []
total_pages = 0
tables = {}
cells = {}
words = {}
lines = {}
selections = {}
for page in result[0]:
    blocks.append(page["Blocks"])
    total_pages = page["DocumentMetadata"]["Pages"]

blocks = list(itertools.chain.from_iterable(blocks))

tables.update(map_blocks(blocks, 'TABLE'))
cells.update(map_blocks(blocks, 'CELL'))
words.update(map_blocks(blocks, 'WORD'))
lines.update(map_blocks(blocks, 'LINE'))
selections.update(map_blocks(blocks, 'SELECTION_ELEMENT'))

page_words = []
for word in words.values():
    page_words.append([word["Text"], word["Page"]])

dates = get_date(page_words, total_pages)
# print(dates)

dataframes = get_tables_df(tables=tables,
                           cells=cells,
                           words=words,
                           selections=selections)
# print(dataframes)
df_new = pd.DataFrame(columns=["Details", "3rd", "1st", "2nd"])


page_dataframes = {}

# Concatenate dataframes of the same page
for dataframe, page in dataframes:
    if len(dataframe.columns)!=4:
        continue
    # print(dataframe)
    # print(page)
    dataframe = pd.DataFrame(np.concatenate([df_new.values, dataframe.values], axis=0), columns=df_new.columns)
    if page not in page_dataframes:
        page_dataframes[page] = dataframe
    else:
        page_dataframes[page] = pd.concat([page_dataframes[page], dataframe])
# Transform each page's dataframe as a single row and append it to the master dataframe.
for page, dataframe in page_dataframes.items():
    single_row_dataset = transform_dataset(dataframe)
    master_df = pd.DataFrame(np.concatenate([master_df.values, single_row_dataset.values], axis=0),
                             columns=master_df.columns)

master_df["Date"] = dates
master_df = master_df[['Date', 'Run hours_3rd_1', 'Consumption_3rd_1', 'Run hours_1st_1', 'Consumption_1st_1', 'Run hours_2nd_1',
                'Consumption_2nd_1', 'Run hours_3rd_2', 'Consumption_3rd_2', 'Run hours_1st_2', 'Consumption_1st_2',
                'Run hours_2nd_2', 'Consumption_2nd_2', 'Run hours_3rd_3', 'Consumption_3rd_3', 'Run hours_1st_3',
                'Consumption_1st_3', 'Run hours_2nd_3', 'Consumption_2nd_3', 'Run hours_3rd_4', 'Consumption_3rd_4',
                'Run hours_1st_4', 'Consumption_1st_4', 'Run hours_2nd_4', 'Consumption_2nd_4']]

master_df = master_df.fillna('').replace('', 0)
master_df.to_csv("Final_Master_dataset.csv", index=False)