In [8]:
class Document:
    """
    A single file, identified by its path (for example an S3 object key).

    Attributes:
        path: The path of the document as passed to the constructor.
        bucket_name: Name of the bucket containing the document; a placeholder
            message until a caller assigns the real value.
        order_num: Order number of the document; a placeholder message until assigned.
        name: Name of the document; a placeholder message until assigned.
        type: Lower-cased file extension including the leading '.', or '' when the
            file name has no extension.
        is_analysable: True for image types, False for PDFs.
    """

    # Extensions for which is_ocr_analysable() returns True.
    _IMAGE_TYPES = (".jpg", ".jpeg", ".png")

    def __init__(self, path):
        """
        The constructor of the Document class, which initializes instance variables.

        Args:
            path: Path of the document; '/' is treated as the folder separator.

        Raises:
            ValueError: If the file extension is neither '.pdf' nor one of the
                supported image extensions (including when there is no extension).
        """
        self.path = path  # The path of the document
        self.bucket_name = "Name of bucket containing the document is not specified yet!"  # The bucket name
        self.order_num = "Order number of document is not specified yet!"  # The order number
        self.name = "Name of document is not specified yet!"  # The document's name
        self.type = self.get_file_extension()  # The type of the document
        self.is_analysable = self.is_ocr_analysable()  # Checks whether the document can be analyzed with OCR

    def get_file_extension(self):
        """
        Method to get the file extension from the path.

        Only the last path segment (the file name) is inspected, so a dot inside a
        folder name is never mistaken for the start of an extension.

        Returns:
            The extension in lower case with a '.' prefix, or '' if the file name
            contains no '.'.
        """
        file_name = self.path.rsplit('/', 1)[-1]
        _, dot, extension = file_name.rpartition('.')
        if not dot:
            # No '.' in the file name: report "no extension" instead of failing to unpack
            return ''
        return '.' + extension.lower()

    def is_ocr_analysable(self):
        """
        Method to check whether the document is analyzable with OCR.

        Returns:
            False for '.pdf', True for '.jpg', '.jpeg' and '.png'.

        Raises:
            ValueError: For every other extension.
        """
        if self.type == ".pdf":
            return False
        if self.type in self._IMAGE_TYPES:
            return True
        # If the document is not any of the supported types, raise a ValueError
        raise ValueError("The imported document has an unsupported file format.")

In [9]:
import boto3
from pdf2image import convert_from_bytes
import io
import pandas as pd

class DocumentProcessor:
    def __init__(self, region_name = 'eu-central-1'):
        """
        The constructor of the DocumentProcessor class.
        """
        self.session = boto3.Session(region_name=region_name)  # Create a session with the specified region
        self.s3_resource = self.session.resource('s3')  # Create an S3 resource from the session
        self.s3_client = self.session.client('s3')  # Create an S3 client from the session
        self.textract = self.session.client('textract')  # Create a Textract client from the session

    def get_bucket_folder_content(self, folder_path, bucket_name = 'bucket-zakariyae-trial'):
        """
        Method to get the content of a folder in an S3 bucket.
        """
        folder_name = folder_path.rstrip('/').split('/')[-1]  # Extract the folder name from the path

        # List the objects in the specified S3 bucket and folder
        response = self.s3_client.list_objects_v2(Bucket = bucket_name, Prefix = folder_path)

        documents = []  # List to store the documents
        if 'Contents' in response:  # If the response contains 'Contents'
            for object in response['Contents']:  # For each object in the response
                object_key = object['Key']  # Get the object key

                if object_key.endswith('/'):  # If the object key is a directory
                    continue  # Skip to the next iteration

                document = Document(object_key)  # Create a Document instance for the object
                document.order_num = folder_name  # Assign the folder name to the order number of the document

                # Extract the document name from the object key and assign it to the name of the document
                document_name = object_key.rsplit('/', 1)[-1].rsplit('.', 1)[0]
                document.name = document_name

                document.bucket_name = bucket_name  # Assign the bucket name to the bucket name of the document

                documents.append(document)  # Append the document to the list
        else:
            print('No objects found in the specified bucket and folder.')
        return documents  # Return the list of documents

    def file_converter(self, document):
        """
        Method to convert a PDF document to PNG images.
        """
        # Fetch the document from the S3 bucket
        response = self.s3_client.get_object(Bucket = document.bucket_name, Key = document.path)

        # Convert the document to images
        images = convert_from_bytes(response['Body'].read())

        return images  # Return the images

    def analyze_document(self, document):
        """
        Method to analyze a document using AWS Textract.
        """
        if document.type in [".jpg", ".jpeg", ".png"]:  # If the document is a JPG, JPEG, or PNG
            # Analyze the document using AWS Textract
            response = self.textract.analyze_document(
                Document={'S3Object': {'Bucket': document.bucket_name, 'Name': document.path}},
                FeatureTypes=["TABLES", "FORMS", "SIGNATURES"])
            return response  # Return the response

        elif document.type == ".pdf":  # If the document is a PDF
            png_files = self.file_converter(document)  # Convert the PDF to PNG images

            responses = []  # List to store the responses
            for png_file in png_files:  # For each PNG image
                byte_stream = io.BytesIO()  # Create a byte stream
                png_file.save(byte_stream, format='PNG')  # Save the PNG image to the byte stream
                byte_stream = byte_stream.getvalue()  # Get the byte data from the byte stream

                # Analyze the PNG image using AWS Textract
                response = self.textract.analyze_document(Document={'Bytes': byte_stream}, 
                                                          FeatureTypes=["TABLES", "FORMS", "SIGNATURES"])
                responses.append(response)  # Append the response to the list

            return responses  # Return the list of responses

        else:
            # If the document is not a supported type, raise a ValueError
            raise ValueError(f'The document "{document.name}" has an unsupported file format.')
    
    def extract_data(self, responses):
        """
        Extract data from the response of the AWS Textract API.
        """
        if isinstance(responses, dict):
            responses = [responses]  # If there is only one response, convert it to a list for the loop below

        tables = []  # List to hold extracted tables
        forms = []   # List to hold extracted form key-value pairs
        signatures = []  # List to hold extracted signatures
        lines = []  # List to hold extracted lines
        words = []  # List to hold extracted words

        # Loop over each response
        for response in responses:
            # Iterate through blocks in the response
            for block in response['Blocks']:
                block_type = block['BlockType']

                # Extract tables
                if block_type == 'TABLE':
                    table = {}  # Dictionary to hold table data
                    cells = []  # List to hold cell data

                    # Check if block has relationships
                    if 'Relationships' in block:
                        # Loop over each relationship
                        for relationship in block['Relationships']:
                            # Check if the relationship is of type 'CHILD'
                            if relationship['Type'] == 'CHILD':
                                # Loop over each ID in the relationship
                                for cell_id in relationship['Ids']:
                                    # Find the block with the matching ID
                                    cell_block = [b for b in response['Blocks'] if b['Id'] == cell_id][0]
                                    # Extract cell data
                                    cell = self._extract_cell_data(cell_block, response)
                                    # Append cell data to cells list
                                    cells.append(cell)
                    # Add cells to the table dictionary
                    table['Cells'] = cells
                    # Append the table to the tables list
                    tables.append(table)

                # Extract forms (key-value pairs)
                elif block_type == 'KEY_VALUE_SET':
                    key_value = {}  # Dictionary to hold key-value pair
                    # Check if block has entity types
                    if 'EntityTypes' in block:
                        # Extract keys
                        if 'KEY' in block['EntityTypes']:
                            key = ''  # String to hold the key
                            # Check if block has relationships
                            if 'Relationships' in block:
                                # Loop over each relationship
                                for relationship in block['Relationships']:
                                    # Check if the relationship is of type 'CHILD'
                                    if relationship['Type'] == 'CHILD':
                                        # Loop over each ID in the relationship
                                        for word_id in relationship['Ids']:
                                            # Find the block with the matching ID
                                            word = [b for b in response['Blocks'] if b['Id'] == word_id][0]
                                            # Append the text of the word to the key
                                            key += word.get('Text', '') + ' '
                            # Remove trailing whitespace and add the key to the key_value dictionary
                            key_value['Key'] = key.strip()
                            
                        # Extract values
                        elif 'VALUE' in block['EntityTypes']:
                            value = ''  # String to hold the value
                            # Check if block has relationships
                            if 'Relationships' in block:
                                # Loop over each relationship
                                for relationship in block['Relationships']:
                                    # Check if the relationship is of type 'CHILD'
                                    if relationship['Type'] == 'CHILD':
                                        # Loop over each ID in the relationship
                                        for word_id in relationship['Ids']:
                                            # Find the block with the matching ID
                                            word = [b for b in response['Blocks'] if b['Id'] == word_id][0]
                                            # Append the text of the word to the value
                                            value += word.get('Text', '') + ' '
                            # Remove trailing whitespace and add the value to the key_value dictionary
                            key_value['Value'] = value.strip()
                            
                    # Add the key-value pair to the forms list if it contains data
                    if key_value:  
                        forms.append(key_value)
                        
                # Extract signatures
                elif block_type == 'SELECTION_ELEMENT':
                    if 'SelectionStatus' in block:
                        if block['SelectionStatus'] == 'SELECTED':
                            # Add the bounding box of the signature to the signatures list
                            signatures.append(block['Geometry']['BoundingBox'])
                
                # Extract lines
                elif block_type == 'LINE':
                    # Append a tuple of the line text and its bounding box top position
                    lines.append((block['Text'], block['Geometry']['BoundingBox']['Top']))

                # Extract words
                elif block_type == 'WORD':
                    # Append a tuple of the word text and its bounding box top position
                    words.append((block['Text'], block['Geometry']['BoundingBox']['Top']))
                    
        return tables, forms, signatures, lines, words

    def _extract_cell_data(self, cell_block, response):
        """
        Helper method to extract cell data from a cell block.
        """
        # Initialize a dictionary to hold the cell data
        cell = {"RowIndex": cell_block['RowIndex'], "ColumnIndex": cell_block['ColumnIndex'], "Text": ""}
        # Check if the cell block has relationships
        if 'Relationships' in cell_block:
            # Loop over each relationship in the cell block
            for relationship in cell_block['Relationships']:
                # Check if the relationship is of type 'CHILD'
                if relationship['Type'] == 'CHILD':
                    # Loop over each ID in the relationship
                    for word_id in relationship['Ids']:
                        # Find the block with the matching ID
                        word_block = [b for b in response['Blocks'] if b['Id'] == word_id][0]
                        # Append the text of the word to the cell's text
                        cell['Text'] += word_block.get('Text', '') + ' '
        # Remove trailing whitespace from the cell's text
        cell['Text'] = cell['Text'].strip()
        return cell

    def tables_to_excel(self, tables, file_name):
        """
        Save extracted tables to an Excel file.
        """
        # Open an ExcelWriter object
        with pd.ExcelWriter(file_name) as writer:
            # Loop over each table in the tables list
            for i, table_data in enumerate(tables):
                # Find the maximum row and column indices to determine the size of the DataFrame
                max_row_index = max(cell['RowIndex'] for cell in table_data['Cells'])
                max_col_index = max(cell['ColumnIndex'] for cell in table_data['Cells'])

                # Create an empty DataFrame with the appropriate size
                df = pd.DataFrame('', index=range(1, max_row_index + 1), columns=range(1, max_col_index + 1))

                # Fill the DataFrame with the cell data
                for cell in table_data['Cells']:
                    df.at[cell['RowIndex'], cell['ColumnIndex']] = cell['Text']

                # Write the DataFrame to the Excel file
                df.to_excel(writer, sheet_name=f'Table {i}')

    def forms_to_df(self, forms):
        """
        Convert extracted forms to a DataFrame.
        """
        # Dictionary to hold the keys and values
        forms_dict = {}

        # Variable to hold the current key
        current_key = None

        # Iterate over each dictionary in the forms list
        for form in forms:
            # Check if 'Key' is in the dictionary
            if 'Key' in form:
                # Store the key in current_key variable
                current_key = form['Key']
            # Check if 'Value' is in the dictionary
            elif 'Value' in form:
                # Add the value to the forms_dict under the current key
                forms_dict[current_key] = form['Value']

        # Convert the forms_dict into a dataframe and transpose it for better view
        df = pd.DataFrame(forms_dict, index=[0]).T.reset_index()

        # Rename the columns to 'Key' and 'Value'
        df.columns = ['Key', 'Value']

        # Return the dataframe
        return df

    def process_document(self, document):
        """
        Method to process a document.
        Process here means analyzing the document using Textract,
        and then extracting the useful information (tables, forms, signatures)
        from the response.
        """
        document.ocr_responses = self.analyze_document(document)  # Analyze the document
        extracted_data = self.extract_data(document.ocr_responses)  # Extract the data from the analysis response
        document.tables = extracted_data[0]  # Assign the extracted tables to the document instance
        document.forms = self.forms_to_df(extracted_data[1])  # Assign the extracted forms to the document instance
        document.signatures = extracted_data[2]  # Assign the extracted signatures to the document instance
        document.lines = self.group_and_order_text(extracted_data[3])  # Group and order the lines
        document.words = self.group_and_order_text(extracted_data[4])  # Group and order the words
        return document  # Return the document

    def process_documents(self, documents):
        """
        Method to process multiple documents.
        """
        processed_documents = []  # List to store the processed documents
        for document in documents:  # For each document in the list
            try:
                processed_document = self.process_document(document)  # Try to process the document
                processed_documents.append(processed_document)  # If successful, append the processed document to 
                                                                # the list
            except ValueError as e:  # If a ValueError is raised in process_document
                print(e)  # Print the error message
        return processed_documents  # Return the list of processed documents
    
    def group_and_order_text(self, text_data):
        """
        Group and order lines or words by their top position.
        """
        # Sort the text data by the top position
        sorted_text_data = sorted(text_data, key=lambda x: x[1])

        # Initialize the first group with the first element
        groups = [[sorted_text_data[0]]]

        # Group the elements
        for i in range(1, len(sorted_text_data)):
            # If the top position of the current element is close to the last element of the last group, add it to the group
            if abs(sorted_text_data[i][1] - groups[-1][-1][1]) < 0.01:  # Adjust the threshold as needed
                groups[-1].append(sorted_text_data[i])
            # Otherwise, start a new group with the current element
            else:
                groups.append([sorted_text_data[i]])

        # Convert the groups to strings of text
        grouped_text = [' '.join([word for word, _ in group]) for group in groups]

        return grouped_text


In [10]:
# Create the processor; with no argument it uses the default region 'eu-central-1'.
processor = DocumentProcessor()

# Process a batch of documents
# 1) List the files under the order folder as Document objects. The last folder
#    segment ('4969182449') becomes the order number of each document.
documents = processor.get_bucket_folder_content('orders/4969182449/')
# 2) Analyze every document and attach the extracted data to it. The Document
#    objects in `documents` are updated in place; the returned list (the documents
#    processed without a ValueError) is shown as the cell output.
processor.process_documents(documents)
    

[<__main__.Document at 0x7f32d2880f10>,
 <__main__.Document at 0x7f32d2880fa0>,
 <__main__.Document at 0x7f32d2880f40>,
 <__main__.Document at 0x7f32d2880e50>,
 <__main__.Document at 0x7f32d2880f70>,
 <__main__.Document at 0x7f32d2880dc0>,
 <__main__.Document at 0x7f32d2880ca0>]

In [12]:
#documents[0] is the first document in the folder of the order:4969182449

#accessing raw text extracted from the doc
# Text lines of the document, grouped by vertical position (a list of strings)
lines = documents[0].lines

# Form fields extracted from the document, as a DataFrame with the two columns 'Key' and 'Value'
forms = documents[0].forms

# Bounding boxes collected as signatures for the document
signatures = documents[0].signatures

# Tables extracted from the document (a list of {'Cells': [...]} dictionaries)
tables = documents[0].tables

# Convert the extracted tables to Excel format and save them, one sheet per table
processor.tables_to_excel(tables, "DUM_table.xlsx")

In [13]:
# Show the grouped text lines of the first document (one string per group)
lines

['CERTIFICAT DE CIRCULATION DES MARCHANDISES',
 '1. Exportateur (nom agresse complete. pays) EUR.1 N° A 8003927',
 'TARGANINE',
 'N°33 BLOC 3 RUE DE MARRAKECH QI AGADIR Consulter les notes ao verso avant de remplir le formulaire',
 '2. Certificat utilisé dans les échanges préférentiels entre',
 'LE ROYAUME DU MAROC 3. Destinataire (nom, agresse complète. pays) (mention facultative',
 'et',
 'UE BASF Beauty Care',
 'FRANCE',
 'undiquer les pavs groupes de pays ou territoires concernes)',
 '5. Pays. groupe de pays ou 4. Pays. groupe de pays ou',
 'territoire de destination territoire dont les produits',
 'sont consideres comme FRANCE originaires, MAROC',
 '6. Informations relatives ou transport (mention facultative] 7. Observations',
 "8. N° d'ordre. marques. numéros, nombre et nature des colis (1). désignation des 10. Factures 9. Masse brute marchandises (mention (kg) ou autre facultative) mesure",
 'adique 4 1, m ², etc 1 numbre doom',
 'ME60/22 8685.000 9 COLIS',
 "9000 L HUILE D'ARGA

In [14]:
# Show the form fields of the first document ('Key' / 'Value' DataFrame)
forms

Unnamed: 0,Key,Value
0,"3. Destinataire (nom, agresse complète. pays) ...",BASF Beauty Care FRANCE
1,7. Observations,
2,6. Informations relatives ou transport (mentio...,
3,1. Exportateur (nom agresse complete. pays),TARGANINE N°33 BLOC 3 RUE DE MARRAKECH QI AGADIR
4,4. Pays. groupe de pays ou territoire dont les...,MAROC
5,5. Pays. groupe de pays ou territoire de desti...,FRANCE
6,Bureau de douane,
7,2. Certificat utilisé dans les échanges préfér...,LE ROYAUME DU MAROC
8,Pays ou territoire de delivrance,LE ROYAUME.DU AAROC
9,Port de Tangér-,19/02/2022


In [15]:
# Show the bounding boxes collected as signatures for the first document
signatures

[]

In [16]:
# Show the extracted tables: each is a dict whose 'Cells' list holds
# {'RowIndex', 'ColumnIndex', 'Text'} dictionaries
tables

[{'Cells': [{'RowIndex': 1,
    'ColumnIndex': 1,
    'Text': "8. N° d'ordre. marques. numéros, nombre et nature des colis (1). désignation des marchandises"},
   {'RowIndex': 1,
    'ColumnIndex': 2,
    'Text': '9. Masse brute (kg) ou autre mesure 1, m ², etc 1'},
   {'RowIndex': 1,
    'ColumnIndex': 3,
    'Text': '10. Factures (mention facultative)'},
   {'RowIndex': 2, 'ColumnIndex': 1, 'Text': "9 COLIS 9000 L HUILE D'ARGAN"},
   {'RowIndex': 2, 'ColumnIndex': 2, 'Text': '8685.000 KG'},
   {'RowIndex': 2, 'ColumnIndex': 3, 'Text': 'ME60/22 15/02/2022'}]}]