# Holistic Packet Classification with Extraction and Rule Validation

This notebook demonstrates how to use the holistic packet classification capability of the IDP Common Package to classify multi-document packets, where each document might span multiple pages. The holistic approach examines the document as a whole to identify boundaries between different document types within the packet.

**Key Benefits of Holistic Packet Classification:**
1. Properly handles multi-page documents within a packet
2. Detects logical document boundaries
3. Identifies document types in the context of the whole packet
4. Handles documents where individual pages may not be clearly classifiable on their own
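
As a rough illustration of what a holistic result implies, the sketch below maps a list of page-range segments onto individual pages and rejects gaps or overlaps. The `Segment` class, `assign_pages` function, and the document type names are hypothetical stand-ins for this example; they are not part of the IDP Common Package, whose internal representation may differ.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Segment:
    """Hypothetical holistic result: one document type over an inclusive page range."""
    doc_type: str
    start_page: int
    end_page: int


def assign_pages(segments: List[Segment], num_pages: int) -> Dict[int, str]:
    """Map every 1-based page number to the type of the segment that covers it."""
    labels: Dict[int, str] = {}
    for seg in segments:
        # Reject ranges that are reversed or fall outside the packet
        if seg.start_page < 1 or seg.end_page > num_pages or seg.start_page > seg.end_page:
            raise ValueError(f"Invalid page range for {seg.doc_type}")
        for page in range(seg.start_page, seg.end_page + 1):
            if page in labels:
                raise ValueError(f"Page {page} is claimed by two segments")
            labels[page] = seg.doc_type
    # Every page must belong to exactly one segment
    missing = [p for p in range(1, num_pages + 1) if p not in labels]
    if missing:
        raise ValueError(f"Pages without a segment: {missing}")
    return labels


# Hypothetical five-page packet containing two documents
segments = [
    Segment("prior_auth_form", 1, 2),
    Segment("clinical_notes", 3, 5),
]
page_labels = assign_pages(segments, num_pages=5)
print(page_labels)
```

Checking for gaps and overlaps up front makes boundary errors visible before extraction runs on the wrong pages.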

The notebook demonstrates how to process a document with:

1. OCR Service - Convert a PDF document to text using AWS Textract
2. Classification Service - Classify document pages into sections with Bedrock, using the text-based holistic method
3. Extraction Service - Extract structured information from classified sections using Bedrock
4. Rule Validation Service - Validate documents against business rules using LLMs with extraction results as context
5. Evaluation Service - Evaluate accuracy of extracted information

Each step uses the unified Document object model for data flow and consistency.

> **Note**: This notebook uses AWS services including S3, Textract, and Bedrock. You need valid AWS credentials with appropriate permissions to run this notebook.

## 1. Install Dependencies

The IDP common package supports granular installation through extras. You can install:
- `[core]` - Just core functionality 
- `[ocr]` - OCR service with Textract dependencies
- `[classification]` - Classification service dependencies
- `[extraction]` - Extraction service dependencies
- `[evaluation]` - Evaluation service dependencies
- `[all]` - All of the above

In [None]:
# Let's make sure that modules are autoreloaded
%load_ext autoreload
%autoreload 2

ROOTDIR="../.."
# First uninstall existing package (to ensure we get the latest version)
%pip uninstall -y idp_common

# Install the IDP common package with all components in development mode
%pip install -q -e "{ROOTDIR}/lib/idp_common_pkg[dev, all]"

# Note: We can also install specific components like:
# %pip install -q -e "{ROOTDIR}/lib/idp_common_pkg[ocr,classification,extraction,evaluation]"

# Check installed version
%pip show idp_common | grep -E "Version|Location"

# Optionally use a .env file for environment variables
try:
    from dotenv import load_dotenv
    load_dotenv()  
except ImportError:
    pass  

## 2. Import Libraries and Set Up Environment

In [None]:
import os
import json
import time
import boto3
import logging
import datetime
import copy

# Import base libraries
from idp_common.models import Document, Status, Section, Page
from idp_common import ocr, classification, extraction, evaluation, summarization
from idp_common import s3

# Optionally load environment variables from a .env file (python-dotenv may not be installed)
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# Configure logging 
logging.basicConfig(level=logging.WARNING)  # Set root logger to WARNING (less verbose)
logging.getLogger('idp_common.ocr.service').setLevel(logging.INFO)  # Focus on service logs
logging.getLogger('textractor').setLevel(logging.WARNING)  # Suppress textractor logs
logging.getLogger('idp_common.evaluation.service').setLevel(logging.INFO)  # Enable evaluation logs
logging.getLogger('idp_common.bedrock.client').setLevel(logging.INFO)  # show prompts
logging.getLogger('idp_common.rule_validation.service').setLevel(logging.DEBUG)


# Set environment variables
os.environ['METRIC_NAMESPACE'] = 'IDP-Notebook-Example'
os.environ['AWS_REGION'] = boto3.session.Session().region_name or 'us-east-1'

# Get AWS account ID for unique bucket names
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()["Account"]
region = os.environ['AWS_REGION']

# Define sample PDF path 
SAMPLE_PDF_PATH = f"{ROOTDIR}/samples/rule-validation/respiratory_pa_packet.pdf"

# Create unique bucket names based on account ID and region
input_bucket_name =  os.getenv("IDP_INPUT_BUCKET_NAME", f"idp-notebook-input-{account_id}-{region}")
output_bucket_name = os.getenv("IDP_OUTPUT_BUCKET_NAME", f"idp-notebook-output-{account_id}-{region}")

# Helper function to parse S3 URIs
def parse_s3_uri(uri):
    parts = uri.replace("s3://", "").split("/")
    bucket = parts[0]
    key = "/".join(parts[1:])
    return bucket, key

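# Helper function to load JSON from S3
# Note: relies on the global s3_client created in section 3, so call it only after that cell has run
def load_json_from_s3(uri):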
    bucket, key = parse_s3_uri(uri)
    response = s3_client.get_object(Bucket=bucket, Key=key)
    content = response['Body'].read().decode('utf-8')
    return json.loads(content)

print("Environment setup:")
print(f"METRIC_NAMESPACE: {os.environ.get('METRIC_NAMESPACE')}")
print(f"AWS_REGION: {os.environ.get('AWS_REGION')}")
print(f"Input bucket: {input_bucket_name}")
print(f"Output bucket: {output_bucket_name}")
print(f"SAMPLE_PDF_PATH: {SAMPLE_PDF_PATH}")
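
The `parse_s3_uri` helper in the cell above accepts any string, so a malformed URI only fails later inside the S3 call. A stricter variant might look like the sketch below; `parse_s3_uri_strict` is a hypothetical name used for illustration and is not part of `idp_common`.

```python
from typing import Tuple


def parse_s3_uri_strict(uri: str) -> Tuple[str, str]:
    """Split an s3://bucket/key URI into (bucket, key), rejecting malformed input."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # partition splits on the first "/" only, so slashes inside the key are preserved
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"S3 URI has no bucket name: {uri!r}")
    return bucket, key


bucket, key = parse_s3_uri_strict("s3://example-bucket/pages/1/result.json")
print(bucket, key)
```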

## 3. Set Up S3 Buckets and Upload Sample File

In [None]:
# Create S3 client
s3_client = boto3.client('s3')

# Function to create a bucket if it doesn't exist
def ensure_bucket_exists(bucket_name):
    try:
        s3_client.head_bucket(Bucket=bucket_name)
        print(f"Bucket {bucket_name} already exists")
    except Exception:
        try:
            if region == 'us-east-1':
                s3_client.create_bucket(Bucket=bucket_name)
            else:
                s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
            print(f"Created bucket: {bucket_name}")
            
            # Wait for bucket to be accessible
            waiter = s3_client.get_waiter('bucket_exists')
            waiter.wait(Bucket=bucket_name)
        except Exception as e:
            print(f"Error creating bucket {bucket_name}: {str(e)}")
            raise

# Ensure both buckets exist
ensure_bucket_exists(input_bucket_name)
ensure_bucket_exists(output_bucket_name)

# Upload the sample file to S3
sample_file_key = "sample-" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".pdf"
with open(SAMPLE_PDF_PATH, 'rb') as file_data:
    s3_client.upload_fileobj(file_data, input_bucket_name, sample_file_key)

print(f"Uploaded sample file to: s3://{input_bucket_name}/{sample_file_key}")

## 4. Set Up Configuration

In [None]:
import yaml
with open(f"{ROOTDIR}/config_library/pattern-2/rule-validation/config.yaml", 'r') as file:
    CONFIG = yaml.safe_load(file)
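
Because the rest of the notebook assumes holistic classification, it can help to fail fast when the loaded configuration specifies a different method. The sketch below works on a plain dictionary shaped like the loaded YAML; `check_classification_method` is a hypothetical helper, and only the `classification` and `classificationMethod` keys are taken from this notebook.

```python
from typing import Any, Dict

EXPECTED_METHOD = "textbasedHolisticClassification"


def check_classification_method(config: Dict[str, Any], expected: str = EXPECTED_METHOD) -> str:
    """Return the configured classification method, or raise if it is not the expected one."""
    method = config.get("classification", {}).get("classificationMethod")
    if method != expected:
        raise ValueError(f"Expected classificationMethod {expected!r}, found {method!r}")
    return method


# Minimal stand-in for the loaded YAML configuration
sample_config = {"classification": {"classificationMethod": "textbasedHolisticClassification"}}
print(check_classification_method(sample_config))
```

In the notebook itself, the call would be `check_classification_method(CONFIG)`.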

## 5. Process Document with OCR

In [None]:
# Initialize a new Document
document = Document(
    id="respiratory_pa_packet",
    input_bucket=input_bucket_name,
    input_key=sample_file_key,
    output_bucket=output_bucket_name,
    status=Status.QUEUED
)

print(f"Created document with ID: {document.id}")
print(f"Status: {document.status.value}")

# Create OCR service with Textract
# Valid features are 'LAYOUT', 'FORMS', 'SIGNATURES', 'TABLES' (uses analyze_document API)
# or leave it empty (to use basic detect_document_text API)
ocr_service = ocr.OcrService(
    region=region,
    enhanced_features=['LAYOUT']
)

# Process document with OCR
print("\nProcessing document with OCR...")
start_time = time.time()
document = ocr_service.process_document(document)
ocr_time = time.time() - start_time

print(f"OCR processing completed in {ocr_time:.2f} seconds")
print(f"Document status: {document.status.value}")
print(f"Number of pages processed: {document.num_pages}")

# Show pages information
print("\nProcessed pages:")
for page_id, page in document.pages.items():
    print(f"Page {page_id}:")
    print(f"  Image URI: {page.image_uri}")
    print(f"  Raw Text URI: {page.raw_text_uri}")
    print(f"  Parsed Text URI: {page.parsed_text_uri}")
print("\nMetering:")
print(json.dumps(document.metering))
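
Each processing step in this notebook repeats the same `time.time()` bookkeeping. One way to factor that out is a small context manager, sketched below; the `timed` helper and the `timings` dictionary are illustrative names, not part of `idp_common`.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed(label: str, timings: Dict[str, float]) -> Iterator[None]:
    """Record the wall-clock duration of the enclosed block under the given label."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed steps are still timed
        timings[label] = time.perf_counter() - start
        print(f"{label} completed in {timings[label]:.2f} seconds")


timings: Dict[str, float] = {}
with timed("demo step", timings):
    time.sleep(0.01)  # placeholder for a call such as ocr_service.process_document(document)
```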

## 6. Classify the Document

In [None]:
# Verify that Config specifies => "classificationMethod": "textbasedHolisticClassification"
print("*****************************************************************")
print(f'CONFIG classificationMethod: {CONFIG["classification"].get("classificationMethod")}')
print("*****************************************************************")

# Create classification service with Bedrock backend
# The classification method is set in the config
classification_service = classification.ClassificationService(
    config=CONFIG, 
    backend="bedrock" 
)

# Classify the document
print("\nClassifying document...")
start_time = time.time()
document = classification_service.classify_document(document)
classification_time = time.time() - start_time
print(f"Classification completed in {classification_time:.2f} seconds")
print(f"Document status: {document.status.value}")

### Show classification results

In [None]:
if document.sections:
    print("\nDetected sections:")
    for section in document.sections:
        print(f"Section {section.section_id}: {section.classification}")
        print(f"  Pages: {section.page_ids}")
else:
    print("\nNo sections detected")

# Show page classification
print("\nPage-level classifications:")
for page_id, page in sorted(document.pages.items()):
    print(f"Page {page_id}: {page.classification}")

In [None]:
document.sections

## 7. Extract Information from Classified Sections

Now that we have classified the document into sections, we'll extract structured information from each section using the extraction service. This step is crucial for rule validation as it provides structured data that can be referenced during the validation process.
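
To make "extraction results as context" concrete, the sketch below renders extracted fields as a plain-text block that could be placed in a validation prompt. This is only an assumption about what such context might look like: `build_rule_context`, the section name, and the field names are hypothetical, and the rule validation service may assemble its prompts differently.

```python
import json
from typing import Any, Dict


def build_rule_context(section_fields: Dict[str, Dict[str, Any]]) -> str:
    """Render extracted fields, grouped by section, as plain text for an LLM prompt."""
    lines = []
    for section_name, fields in section_fields.items():
        lines.append(f"[{section_name}]")
        for key, value in fields.items():
            # json.dumps keeps strings quoted and nested values unambiguous
            lines.append(f"{key}: {json.dumps(value, ensure_ascii=False)}")
    return "\n".join(lines)


# Hypothetical extraction output for one section
context = build_rule_context({"prior_auth_form": {"patient_age": 54, "diagnosis": "COPD"}})
print(context)
```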

In [None]:
# Create extraction service
extraction_service = extraction.ExtractionService(config=CONFIG)

# Extract information from each section
print("\nExtracting information from classified sections...")
start_time = time.time()

if not document.sections:
    print("No sections found in document. Cannot proceed with extraction.")
else:
    print(f"Processing {len(document.sections)} sections...")
    
    for i, section in enumerate(document.sections):
        print(f"\n--- Processing Section {i+1}/{len(document.sections)} ---")
        print(f"Section ID: {section.section_id}")
        print(f"Classification: {section.classification}")
        print(f"Pages: {section.page_ids}")
        
        # Process section extraction
        section_start_time = time.time()
        document = extraction_service.process_document_section(
            document=document,
            section_id=section.section_id
        )
        section_time = time.time() - section_start_time
        print(f"Section extraction completed in {section_time:.2f} seconds")

extraction_time = time.time() - start_time
print(f"\nTotal extraction completed in {extraction_time:.2f} seconds")
print(f"Document status: {document.status.value}")

### Show extraction results

In [None]:
print("\nExtraction results by section:")
for section in document.sections:
    print(f"\nSection {section.section_id} ({section.classification}):")
    if section.extraction_result_uri:
        try:
            extraction_data = s3.get_json_content(section.extraction_result_uri)
            extraction_results = extraction_data.get('inference_result', {})
            print(f"  Extracted {len(extraction_results)} fields")
            # Show first few fields as preview
            for i, (key, value) in enumerate(extraction_results.items()):
                if i < 3:  # Show first 3 fields
                    print(f"    {key}: {str(value)[:100]}{'...' if len(str(value)) > 100 else ''}")
                elif i == 3:
                    print(f"    ... and {len(extraction_results) - 3} more fields")
                    break
        except Exception as e:
            print(f"  Error loading extraction results: {e}")
    else:
        print("  No extraction results available")
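
The preview logic in the cell above can be pulled into a reusable function that works on any dictionary of extracted fields. The sketch below is one possible refactoring; `preview_fields` is a hypothetical helper name and the sample data is made up for the example.

```python
from typing import Any, Dict, List


def preview_fields(results: Dict[str, Any], limit: int = 3, width: int = 100) -> List[str]:
    """Return display lines for the first `limit` fields, truncating long values."""
    lines = []
    for key, value in list(results.items())[:limit]:
        text = str(value)
        suffix = "..." if len(text) > width else ""
        lines.append(f"{key}: {text[:width]}{suffix}")
    remaining = len(results) - limit
    if remaining > 0:
        lines.append(f"... and {remaining} more fields")
    return lines


# Made-up fields: one long value to show truncation, four fields to show the summary line
sample = {"a": 1, "b": "x" * 120, "c": [1, 2], "d": None}
for line in preview_fields(sample):
    print(line)
```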

## 8. Validate the Document Against Business Rules

In [None]:
from idp_common.rule_validation import RuleValidationService


# Load configuration from YAML file (the same file loaded as CONFIG in section 4)
config_path = f"{ROOTDIR}/config_library/pattern-2/rule-validation/config.yaml"
with open(config_path, 'r') as f:
    config_data = yaml.safe_load(f)

print(f"✅ Loaded configuration from: {config_path}")
print(f"Fact Extraction Model: {config_data['rule_validation']['fact_extraction']['model']}")

rule_validation_service = RuleValidationService(
    region=region,
    config=config_data
)

In [None]:
# Validate each section individually, using one single-section Document per call
section_results = []

for section in document.sections:
    print(f"Processing section {section.section_id} (classification: {section.classification})")
    
    # Create a document with only this section
    section_document = Document(
        id=document.id,
        input_key=document.input_key,
        input_bucket=document.input_bucket,
        output_bucket=document.output_bucket,
        pages=document.pages,
        sections=[section],  # Only include this section
        status=document.status,
        metering=document.metering.copy() if document.metering else {},  # Copy existing metering
        rule_validation_result=document.rule_validation_result,  # Carry over any existing result
        # Add other fields as needed
    )
    
    # Create fresh service instance for each section (avoids asyncio semaphore issues in notebooks)
    section_rule_validation_service = RuleValidationService(
        region=region,
        config=config_data
    )
    
    # Process the single section
    section_result = section_rule_validation_service.validate_document(section_document)
    section_results.append(section_result)
    
    print(f"✅ Completed section {section.section_id}")

In [None]:
section_results[0].rule_validation_result.metadata

In [None]:
# Consolidate results (this would normally load from S3 files)
# For notebook demo, we'll show the individual section results
print("Consolidating section results...")

for i, section_result in enumerate(section_results):
    section_id = document.sections[i].section_id
    if hasattr(section_result, 'rule_validation_result') and section_result.rule_validation_result:
        rv_result = section_result.rule_validation_result
        # The actual result is stored in the S3 URI
        section_uri = rv_result.metadata.get('section_output_uri')
        if section_uri:
            print(f"Section {section_id}: Results saved to {section_uri}")
        else:
            print(f"Section {section_id}: Rule validation completed (check summary)")
    else:
        print(f"Section {section_id}: No rule validation results")

print("Section processing complete")
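
The loop above pairs each result with its section by list position. A slightly safer pattern is to key results by `section_id` and check that the two lists have the same length, as in the sketch below. `SectionStub` and `results_by_section` are hypothetical names for this illustration, and the string results stand in for the Document objects returned by the service.

```python
from dataclasses import dataclass
from typing import Any, Dict, Sequence


@dataclass
class SectionStub:
    """Stand-in exposing only the section_id attribute used here."""
    section_id: str


def results_by_section(sections: Sequence[Any], results: Sequence[Any]) -> Dict[str, Any]:
    """Pair each section with its result, keyed by section_id."""
    if len(sections) != len(results):
        raise ValueError(f"{len(sections)} sections but {len(results)} results")
    return {section.section_id: result for section, result in zip(sections, results)}


paired = results_by_section(
    [SectionStub("1"), SectionStub("2")],
    ["result-1", "result-2"],  # placeholders for per-section validation results
)
print(paired)
```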

In [None]:
# Create Orchestration service
from idp_common.rule_validation import RuleValidationOrchestratorService

orchestration_service = RuleValidationOrchestratorService(
    config=config_data
)

print("Orchestration service created")

In [None]:
document.sections

In [None]:
document = orchestration_service.consolidate_and_save(
    document=document,
    config=config_data,
    multiple_sections=True
)

print(f"Consolidated summary URI: {document.rule_validation_result.output_uri}")
print(f"Rule type files: {document.rule_validation_result.summary.get('rule_type_uris', [])}")
print(f"Sections processed: {document.rule_validation_result.metadata.get('sections_processed', 0)}")

