In [1]:
from unstructured.partition.pdf import partition_pdf
from collections import Counter
import json
import re
import pprint as pp
from typing import Dict, Any
from datetime import datetime
import os

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
filepath1 = 'data/test/eg_annotated.pdf'

# Read the PDF, this will be used all the way
elements = partition_pdf(filepath1, strategy="hi_res")
print(len(elements))

# Display element types
display(Counter(type(element) for element in elements))


147


Counter({unstructured.documents.elements.ListItem: 60,
         unstructured.documents.elements.Title: 34,
         unstructured.documents.elements.NarrativeText: 30,
         unstructured.documents.elements.Text: 8,
         unstructured.documents.elements.Header: 7,
         unstructured.documents.elements.FigureCaption: 3,
         unstructured.documents.elements.Table: 3,
         unstructured.documents.elements.Image: 2})

In [3]:
element_dict = [el.to_dict() for el in elements]
print(json.dumps(
    element_dict, indent = 2
))

[
  {
    "type": "Title",
    "element_id": "854b8bd6f649a536432802f3b54ff6ea",
    "text": "Template intercompany loan agreement",
    "metadata": {
      "detection_class_prob": 0.7497600317001343,
      "coordinates": {
        "points": [
          [
            401.8135681152344,
            227.91619873046875
          ],
          [
            401.8135681152344,
            252.63912963867188
          ],
          [
            972.1865844726562,
            252.63912963867188
          ],
          [
            972.1865844726562,
            227.91619873046875
          ]
        ],
        "system": "PixelSpace",
        "layout_width": 1700,
        "layout_height": 2200
      },
      "last_modified": "2024-11-18T20:05:16",
      "filetype": "application/pdf",
      "languages": [
        "eng"
      ],
      "page_number": 1,
      "file_directory": "data/test",
      "filename": "eg_annotated.pdf"
    }
  },
  {
    "type": "NarrativeText",
    "element_id": "12283d53c

In [4]:
unique_types = set()
for item in element_dict:
    unique_types.add(item['type'])
print(unique_types)

{'Table', 'FigureCaption', 'UncategorizedText', 'Header', 'Image', 'NarrativeText', 'Title', 'ListItem'}


## Code sections

In [5]:
# Extracting metadata from pdf sections

def extract_document_type(elements):
    """Extract document type from the first title"""
    for element in elements:
        if element.get("type") == "Title" and element.get("text"):
            # Get the first title text
            title = element.get("text").lower()
            if "template" in title:
                # Remove 'template' and clean up
                title = title.replace("template", "").strip()
            # Capitalize first letter of each word
            return " ".join(word.capitalize() for word in title.split())
    return "Unknown" 
    
PATTERNS = {
    'contact': {
        'name': r'Contact Name\s+(.*?)(?=\s+Company|$)',
        'address': r'Address\s+(.*?)(?=\s+Email|$)',
        'email': r'Email address\s+(.*?)(?=$|\s)',
        'title': r'Title\s+(.*?)(?=\s+|$)'
    },
    'company': {
        'name': r'^(.*?),\s*company\s*number',
        'number': r'company\s*number\s*([^,]+)',
        'jurisdiction': r'incorporated\s*in\s*([^\s]+)',
        'office': r'registered\s*office\s*is\s*at\s*([^(]+)'
    },
    'loan': {
        # Modified to look specifically after "Loan" or "$"
        'principal': r'(?:Loan\s*\$|\$)\s*(\d{1,3}(?:,\d{3})*(?:\.\d+)?|\d+(?:\.\d+)?)',
        
        'currency': r'(?:to\s+)?(SGD|USD|EUR|GBP|THB)',
        
        # Modified to be more specific about where to find the interest rate
        'interest_rate': r'Interest Rate\s+(\d+\.?\d*)',
        
        'drawdown_date': r"Drawdown Date\s+(.*?)(?=\.|$)",
        'repayment': r'Repayment of Loan:\s*(.*?)(?=\.|$)'
        # 'repayment': r'repay[^.]+within\s+(\d+)\s+days\s+after[^.]+demand'
    }
}

def extract_with_pattern(text, pattern, default=''):
    """Generic pattern extraction function with error handling"""
    try:
        match = re.search(pattern, text, re.IGNORECASE)
        return match.group(1).strip() if match else default
    except Exception as e:
        print(f"Error extracting pattern {pattern}: {e}")
        return default

def extract_contact_details(text):
    """Extract contact details from table text"""
    return {
        field: extract_with_pattern(text, pattern)
        for field, pattern in PATTERNS['contact'].items()
    }

def extract_company_details(text):
    """Extract company details using regex patterns"""
    return {
        "name": extract_with_pattern(text, PATTERNS['company']['name']),
        "companyNumber": extract_with_pattern(text, PATTERNS['company']['number']),
        "jurisdiction": extract_with_pattern(text, PATTERNS['company']['jurisdiction']),
        "registeredOffice": extract_with_pattern(text, PATTERNS['company']['office'])
    }

def extract_signatures(elements):
    """Extract signature information as a dictionary of party type to title."""
    signatures = {}
    start_marker = "Signature of authorised signatory"
    end_marker = "Print full name of authorised"
    signature_count = 0
    
    for i, element in enumerate(elements):
        if (element.get("type") in ["FigureCaption", "NarrativeText"] and
            element.get("text") == start_marker):
            
            for j in range(i + 1, len(elements)):
                current = elements[j]
                if current.get("text") == end_marker:
                    break
                    
                if (current.get("type") == "NarrativeText" and 
                    "," in current.get("text", "")):
                    name, title = current.get("text").split(",", 1)
                    party_type = "lender" if signature_count == 0 else "borrower"
                    signatures[party_type] = title.strip()
                    signature_count += 1
                    break
    # # Debug print to see what we're finding
    # print("Found signatures:", signatures)
    return signatures

def extract_loan_details(text):
    """Extract loan details from table text with fixed pattern matching"""
    # Extract interest rate
    interest_rate_match = re.search(PATTERNS['loan']['interest_rate'], text, re.IGNORECASE)
    interest_rate = None
    if interest_rate_match:
        try:
            interest_rate = float(interest_rate_match.group(1))
        except ValueError:
            print(f"Error converting interest rate: {interest_rate_match.group(1)}")
    
    # Extract principal amount - now looks specifically for amount after "Loan $" or just "$"
    principal_match = re.search(PATTERNS['loan']['principal'], text)
    principal_amount = None
    if principal_match:
        principal_str = principal_match.group(1).replace(',', '')
        try:
            principal_amount = float(principal_str)
        except ValueError:
            print(f"Error converting principal amount: {principal_str}")
            
    # If principal not found with first pattern, try alternative pattern for just the number
    if principal_amount is None:
        alt_principal_match = re.search(r'(?:^|\s)(2,000,000)(?:\s|$)', text)
        if alt_principal_match:
            principal_amount = float(alt_principal_match.group(1).replace(',', ''))
    
    drawdown_days = extract_with_pattern(text, PATTERNS['loan']['drawdown_date'])
    
    # Extract repayment terms
    repayment_terms = extract_with_pattern(text, PATTERNS['loan']['repayment'])
    
    return {
        'principalAmount': principal_amount,
        'interestRate': interest_rate,
        'drawdownDate': f"{drawdown_days} Business Days after agreement date" if drawdown_days else "Unknown",
        'repaymentTerm': repayment_terms  # Add this line
    }


def create_loan_terms(element_dict):
    """Create loan terms with fixed extraction"""
    # Get relevant text containing loan details
    table_text = next((x['text'] for x in element_dict 
                      if x['type'] == 'Table' and ('per annum' in x['text'] or 'Interest Rate' in x['text'])), "")
    
    # Add this to also look for repayment terms in narrative text
    repayment_text = next((x['text'] for x in element_dict 
                          if x['type'] == 'ListItem' and 'Repayment of Loan' in x['text']), "")
    
    currency_text = next((x['text'] for x in element_dict 
                         if x['type'] == 'ListItem' and 
                         ('$' in x['text'] or 'currency' in x['text'].lower())), "")
    
    loan_details = extract_loan_details(table_text)
    currency = extract_with_pattern(currency_text, PATTERNS['loan']['currency']) or 'UNKNOWN'
    
    # Try to get repayment terms from narrative text if not found in table
    repayment_term = (loan_details.get('repaymentTerm') or 
                     extract_with_pattern(repayment_text, PATTERNS['loan']['repayment']) or 
                     "Within 30 days after written demand from Lender")

    return {
        'loanTerms': {
            'principalAmount': loan_details.get('principalAmount'),
            'currency': currency,
            'interestRate': loan_details.get('interestRate'),
            'drawdownDate': loan_details.get('drawdownDate'), 
            'repaymentTerm': repayment_term,  # Updated this line
            'interestPayment': {
                'frequency': 'annually',
                'compounding': True,
                'paymentDate': 'date of repayment of the Loan'
            }
        }
    }

# Process contact tables
def process_contact_tables(element_dict):
    """Process contact tables and return party information"""
    parties = {'lender': {'contact': {}}, 'borrower': {'contact': {}}}
    contact_tables = [x for x in element_dict if x['type'] == 'Table' 
                     and 'contact' in x['text'].lower()]
    
    for table in contact_tables:
        party_type = 'lender' if 'LENDER' in table['text'].upper() else 'borrower'
        parties[party_type]['contact'] = extract_contact_details(table['text'])
    
    return parties

def extract_events_of_default(elements):
    """
    Extract Events of Default clauses from the document elements
    Args:
        elements: List of document elements containing text and type information
    Returns:
        List of event of default clauses
    """
    events_of_default = []
    is_events_section = False
    
    # First find the "EVENTS OF DEFAULT" title
    for i, element in enumerate(elements):
        # Check if we've found the Events of Default section
        if (element.get('type') == 'Title' and 
            'EVENTS OF DEFAULT' in element.get('text', '')):
            is_events_section = True
            continue
            
        # If we're in the Events of Default section, collect list items
        if is_events_section and element.get('type') == 'ListItem':
            text = element.get('text', '').strip()
            
            # Stop when we reach clause 5.3
            if text.startswith('5.3'):
                break
                
            # Skip the introductory text
            if text.startswith('5.1') or text.startswith('5.2'):
                continue
                
            # Clean up the text - remove any leading letters/numbers and spaces
            cleaned_text = re.sub(r'^[a-z]\s+', '', text)  # Remove single letter prefixes like 'a '
            cleaned_text = re.sub(r'^[ivx]+\s+', '', cleaned_text)  # Remove roman numerals
            
            if cleaned_text:
                events_of_default.append(cleaned_text)
    
    return events_of_default

def clean_company_name(name: str) -> str:
    """Clean company name by removing leading numbers and extra whitespace"""
    # Remove leading numbers and any following whitespace
    cleaned = re.sub(r'^\d+\s*', '', name)
    # Remove any extra whitespace
    cleaned = ' '.join(cleaned.split())
    return cleaned

def format_output_json(parties: Dict[str, Any], loan_terms: Dict[str, Any], output_file: str = "output.json") -> Dict[str, Any]:
    """
    Format and combine parties and loan terms data into the desired JSON structure
    Args:
        parties: Dictionary containing lender and borrower information
        loan_terms: Dictionary containing loan terms information
        output_file: Optional filename to save the JSON output
    Returns:
        Dictionary with the formatted JSON structure
    """
    # Create the base structure with a deep copy to avoid modifying original
    formatted_json = {
        "documentType": "Intercompany Loan Agreement",  
        "parties": dict(parties),
        "loanTerms": loan_terms.get("loanTerms", {}),
        "eventsOfDefault": extract_events_of_default(element_dict),
        # "eventsOfDefault": extract_events_of_default(elements),
        "governingLaw": "Singapore" 
    }

    # Clean company names
    for party_type in ["lender", "borrower"]:
        if party_type in formatted_json["parties"]:
            party = formatted_json["parties"][party_type]
            if "name" in party:
                party["name"] = clean_company_name(party["name"])

    # Ensure all required fields exist
    for party_type in ["lender", "borrower"]:
        if party_type in formatted_json["parties"]:
            party = formatted_json["parties"][party_type]
            # Ensure contact field exists and has all required fields
            if "contact" not in party:
                party["contact"] = {}
            contact = party["contact"]
            required_contact_fields = ["name", "title", "address", "email"]
            for field in required_contact_fields:
                if field not in contact:
                    contact[field] = ""

    # Write to file if specified
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(formatted_json, f, indent=2)

    return formatted_json

def clean_text(text):
    # Remove excessive whitespace and normalize line endings
    return ' '.join(text.split()).strip()

def convert_to_markdown(elements):
    # Filter out elements from page 1
    filtered_elements = [
        element for element in elements 
        if getattr(element.metadata, 'page_number', 0) != 1
    ]
    
    markdown_content = []
    in_table = False
    table_data = []
    
    for element in filtered_elements:
        element_type = type(element).__name__
        element_text = clean_text(str(element))
        
        if not element_text:  # Skip empty elements
            continue
            
        if element_type == 'Title':
            markdown_content.append(f"# {element_text}\n")
        elif element_type == 'Header':
            markdown_content.append(f"## {element_text}\n")
        elif element_type == 'ListItem':
            markdown_content.append(f"- {element_text}")
        elif element_type == 'Table':
            # Handle table formatting
            if not in_table:
                in_table = True
                table_data = []
            table_data.append(element_text)
        elif element_type == 'FigureCaption':
            markdown_content.append(f"\n*{element_text}*\n")
        elif element_type == 'Image':
            markdown_content.append(f"![{element_text}](image_path)\n")
        elif element_type == 'NarrativeText':
            markdown_content.append(f"\n{element_text}\n")
        else:  # Default case for Text and other elements
            markdown_content.append(element_text)
            
        # Handle table end
        if in_table and element_type != 'Table':
            in_table = False
            if table_data:
                markdown_content.extend(format_table(table_data))
                table_data = []
                
    return '\n'.join(markdown_content)

def format_table(table_data):
    # Simple table formatting
    formatted_table = []
    formatted_table.append('\n| ' + ' | '.join(str(cell) for cell in table_data) + ' |')
    formatted_table.append('|' + '---|' * len(table_data))
    return formatted_table

# For the dictionary version, you can filter before converting to dict
filtered_elements = [
    el for el in elements 
    if getattr(el.metadata, 'page_number', 0) != 1
]
element_dict = [el.to_dict() for el in filtered_elements]

def process_files(pdf_path, output_dir, md_filename, json_filename):
    """
    Process PDF file and generate Markdown and JSON outputs using unstructured library.
    
    Args:
        pdf_path (str): Path to the input PDF file
        output_dir (str): Directory where output files will be saved
        md_filename (str): Name for the output Markdown file
        json_filename (str): Name for the output JSON file
    """
    if not os.path.exists(pdf_path):
        raise ValueError(f"PDF file not found: {pdf_path}")
    
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    # Add extensions if not provided
    if not md_filename.endswith('.md'):
        md_filename += '.md'
    
    if not json_filename.endswith('.json'):
        json_filename += '.json'
    
    # Define output paths
    output_md_path = os.path.join(output_dir, md_filename)
    json_output_path = os.path.join(output_dir, json_filename)
    
    try:
        # Read the PDF using unstructured
        elements = partition_pdf(pdf_path, strategy="hi_res")
        
        # Filter elements from page 1
        filtered_elements = [
            el for el in elements 
            if getattr(el.metadata, 'page_number', 0) != 1
        ]
        element_dict = [el.to_dict() for el in filtered_elements]
        
        # Extract document components
        art_title = extract_document_type(element_dict)
        loan_terms = create_loan_terms(element_dict)
        default_terms = extract_events_of_default(element_dict)
        print(default_terms)
        # Process parties information
        parties = {"lender": {}, "borrower": {}}
        
        # Get parties section elements
        parties_elem_id = [x['element_id'] for x in element_dict if x['text'] == 'PARTIES']
        parties_text = [x for x in element_dict if x['type'] == 'ListItem' and 
                       x['metadata']['parent_id'] in parties_elem_id]
        
        # Process company details
        for party in parties_text:
            party_type = 'lender' if 'Lender' in party['text'] else 'borrower'
            parties[party_type].update(extract_company_details(party['text']))
            parties[party_type]["contact"] = {}
        
        # Process contact details
        contact_tables = [x for x in element_dict if x['type'] == 'Table' and 
                         'contact' in x['text'].lower()]
        for table in contact_tables:
            party_type = 'lender' if 'LENDER' in table['text'].upper() else 'borrower'
            parties[party_type]['contact'] = extract_contact_details(table['text'])
        
        # Extract and add signature titles
        signatures = extract_signatures(element_dict)
        for party_type, title in signatures.items():
            if party_type in parties and 'contact' in parties[party_type]:
                parties[party_type]['contact']['title'] = title
        
        # Generate JSON output
        results = format_output_json(parties, loan_terms, json_output_path)
        
        # Generate Markdown output
        markdown_content = convert_to_markdown(elements)
        with open(output_md_path, 'w', encoding='utf-8') as f:
            f.write(markdown_content)
        
        print(f"Markdown file created at: {output_md_path}")
        print(f"JSON file created at: {json_output_path}")
        
        return results
        
    except Exception as e:
        print(f"Error processing files: {str(e)}")
        raise

In [6]:
process_files('data/test/leasedoc_1.pdf', 'data/output', 'eg_annotated3_unstructured.md', 'output_eg3_unstructured.json')

[]
Markdown file created at: data/output/eg_annotated3_unstructured.md
JSON file created at: data/output/output_eg3_unstructured.json


{'documentType': 'Intercompany Loan Agreement',
 'parties': {'lender': {'contact': {'name': '',
    'title': '',
    'address': '',
    'email': ''}},
  'borrower': {'contact': {'name': '',
    'title': '',
    'address': '',
    'email': ''}}},
 'loanTerms': {'principalAmount': None,
  'currency': 'UNKNOWN',
  'interestRate': None,
  'drawdownDate': 'Unknown',
  'repaymentTerm': 'Within 30 days after written demand from Lender',
  'interestPayment': {'frequency': 'annually',
   'compounding': True,
   'paymentDate': 'date of repayment of the Loan'}},
 'eventsOfDefault': ['a material breach of any term of this Agreement by the Borrower which is not remedied within 15 Business Days after the Borrower becoming aware of the breach;',
  'if:',
  'an order is made, resolution passed or legal proceedings issued, or corporate action is taken, notice given or other step taken for the dissolution of the Borrower;',
  'a liquidator, receiver, manager, statutory manager, inspector, trustee or oth

In [None]:
process_files('data/test/eg_annotated.pdf', 'data/output', 'eg_annotated1_unstructured.md', 'output_eg1_unstructured.json')

In [None]:
process_files('data/test/eg_annotated_cancel.pdf', 'data/output', 'eg_annotated2_unstructured.md', 'output_eg2_unstructured.json')

In [None]:
def extract_events_of_default(elements):
    """
    Extract Events of Default clauses from the document elements
    Args:
        elements: List of document elements containing text and type information
    Returns:
        List of event of default clauses
    """
    events_of_default = []
    is_events_section = False
    
    # First find the "EVENTS OF DEFAULT" title
    for i, element in enumerate(elements):
        # Check if we've found the Events of Default section
        if (element.get('type') == 'Title' and 
            'EVENTS OF DEFAULT' in element.get('text', '')):
            is_events_section = True
            continue
            
        # If we're in the Events of Default section, collect list items
        if is_events_section and element.get('type') == 'ListItem':
            text = element.get('text', '').strip()
            
            # Stop when we reach clause 5.3
            if text.startswith('5.3'):
                break
                
            # Skip the introductory text
            if text.startswith('5.1') or text.startswith('5.2'):
                continue
                
            # Clean up the text - remove any leading letters/numbers and spaces
            cleaned_text = re.sub(r'^[a-z]\s+', '', text)  # Remove single letter prefixes like 'a '
            cleaned_text = re.sub(r'^[ivx]+\s+', '', cleaned_text)  # Remove roman numerals
            
            if cleaned_text:
                events_of_default.append(cleaned_text)
    
    return events_of_default

In [None]:
process_files('data/test/leasedoc_1.pdf', 'data/output', 'eg_annotated3_unstructured.md', 'output_eg3_unstructured.json')