# Contract Analysis with Azure Content Understanding

This notebook demonstrates how to extract and analyze information from contract documents using Microsoft's Azure Content Understanding service (part of Azure AI Foundry).

## Prerequisites
- Azure subscription with Microsoft Foundry resource
- Content Understanding endpoint with autodeployment enabled for required models
- RBAC role: Cognitive Services User assigned to your user/managed identity
- PDF contract files in the `contracts/` folder

## Step 1: Install Required Packages

In [None]:
# Install required packages for Azure Content Understanding
# %pip install -r requirements.txt

## Step 2: Import Libraries and Setup

In [None]:
import os
from pathlib import Path
from azure.identity import DefaultAzureCredential
import pandas as pd
import json
import requests
import time

print("✓ Libraries imported successfully")

## Step 3: Configure Azure Authentication (RBAC)

This notebook authenticates with Azure RBAC through `DefaultAzureCredential`, so no API keys are stored in the code.

**To set up:**
1. Go to [Azure Portal](https://portal.azure.com)
2. Create a **"Microsoft Foundry"** resource in a supported region
3. Enable autodeployment for required models (GPT-4.1, GPT-4.1-mini, text-embedding-3-large)
4. Assign yourself the **"Cognitive Services User"** role:
   - Go to your Foundry resource → Access Control (IAM)
   - Click "Add role assignment"
   - Select "Cognitive Services User" role
   - Assign to yourself or your managed identity
5. Set environment variable for endpoint or configure in code
6. Authenticate using: `az login` (Azure CLI) or use managed identity in Azure

In [None]:
# Set your Content Understanding endpoint

# Base endpoint for your Cognitive Services resource
# (replace the "___" placeholder with your own resource prefix)
base_endpoint = "https://___-contracts-ai-proj-resource.cognitiveservices.azure.com"
api_version = "2025-11-01"

# Choose ONE analyzer. The custom analyzer is active because the extractors
# below rely on its schema; uncomment the first line to use the prebuilt one.
# analyzer_id = "prebuilt-contract"
analyzer_id = "projectAnalyzer_1768587228991_591"
endpoint = f"{base_endpoint}/contentunderstanding/analyzers/{analyzer_id}:analyzeBinary?api-version={api_version}"

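# Use DefaultAzureCredential for RBAC authentication.
# The token is fetched once here and expires (typically after about an hour);
# re-run this cell if requests start returning 401.
credential = DefaultAzureCredential()
token = credential.get_token("https://cognitiveservices.azure.com/.default")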
headers = {
    "Authorization": f"Bearer {token.token}"
}
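
The cell above fetches the bearer token a single time. The object returned by `get_token` carries an `expires_on` timestamp (seconds since the epoch), so a long batch run can outlive its token. Below is a minimal sketch of a refresh-on-demand wrapper. `TokenCache`, `fetch`, `skew_seconds`, and `clock` are hypothetical names, not part of `azure.identity`, and a fake fetcher stands in for `credential.get_token(...)` so the snippet runs offline.

```python
import time
from collections import namedtuple

# Stand-in exposing the two attributes used here: token and expires_on
FakeToken = namedtuple("FakeToken", ["token", "expires_on"])


class TokenCache:
    """Hypothetical helper: re-fetch the token shortly before it expires."""

    def __init__(self, fetch, skew_seconds=300, clock=time.time):
        self._fetch = fetch          # zero-argument callable returning a token object
        self._skew = skew_seconds    # refresh this many seconds before expiry
        self._clock = clock          # injectable so the demo can control time
        self._cached = None

    def bearer_header(self):
        now = self._clock()
        if self._cached is None or self._cached.expires_on - self._skew <= now:
            self._cached = self._fetch()
        return {"Authorization": f"Bearer {self._cached.token}"}


# Offline demonstration with a fake fetcher and a controllable clock
calls = []
current_time = [1000.0]


def fake_fetch():
    calls.append(current_time[0])
    return FakeToken(token=f"tok{len(calls)}", expires_on=current_time[0] + 3600)


cache = TokenCache(fake_fetch, skew_seconds=300, clock=lambda: current_time[0])
first = cache.bearer_header()    # fetches the first token
second = cache.bearer_header()   # reuses it
current_time[0] += 3400          # now inside the 300-second refresh window
third = cache.bearer_header()    # fetches a second token
```

With the real credential, `fetch` could be `lambda: credential.get_token("https://cognitiveservices.azure.com/.default")`, and each request would call `cache.bearer_header()` instead of reading `token.token` directly.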

## Step 4: Load Contract Files

In [None]:
# Get all PDF files from the contracts folder
contracts_dir = Path("contracts")
contract_files = list(contracts_dir.glob("*.pdf"))

print(f"Found {len(contract_files)} contract files:")
for file in contract_files:
    print(f"  - {file.name}")
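
The glob above selects files by extension only. Since the upload later declares `Content-Type: application/pdf`, it may be worth confirming that each file really is a PDF before sending it. A minimal sketch, assuming a check of the leading `%PDF-` signature is sufficient for your files; `looks_like_pdf` is a hypothetical helper, and the temporary files are stand-ins for the `contracts/` folder.

```python
import tempfile
from pathlib import Path


def looks_like_pdf(path):
    """Return True if the file starts with the PDF signature bytes."""
    with open(path, "rb") as f:
        return f.read(5) == b"%PDF-"


# Demonstration on two throwaway files: one with a PDF header, one without
with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.pdf"
    bad = Path(tmp) / "bad.pdf"
    good.write_bytes(b"%PDF-1.7\n...")
    bad.write_bytes(b"<html>not a pdf</html>")
    # sorted() also gives a deterministic processing order
    checked = {p.name: looks_like_pdf(p) for p in sorted(Path(tmp).glob("*.pdf"))}

print(checked)
```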

## Step 5: Extract Information Using Azure Content Understanding

Azure Content Understanding prebuilt analyzers can extract:
- **Parties** involved and their roles
- **Contract dates** (effective, expiration, execution)
- **Contract value/amount** and payment terms
- **Clauses** with titles, types, and full content (using custom analyzer)
- **Terms and conditions** in structured format
- Document structure, tables, key-value pairs, and more

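This notebook uses a custom analyzer with schema-based extraction. The helper functions below read the field names defined in that schema (`Title`, `Parties`, `Clauses`, and so on), so adjust them if your analyzer uses different names.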
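
Each extracted field arrives as a typed wrapper: the helper functions in the next cell read `type` and then the matching `valueString`, `valueDate`, `valueArray`, or `valueObject` key. The sketch below shows that shape on a made-up payload. The title, date, and party name are invented for illustration, not real service output, and `unwrap` is a hypothetical, trimmed-down counterpart of `extract_field_value`.

```python
# Illustrative payload only: the values are invented, the nesting mirrors
# what the extractors in this notebook expect
sample_fields = {
    "Title": {"type": "string", "valueString": "Master Services Agreement"},
    "EffectiveDate": {"type": "date", "valueDate": "2024-01-15"},
    "Parties": {
        "type": "array",
        "valueArray": [
            {
                "type": "object",
                "valueObject": {
                    "Name": {"type": "string", "valueString": "Contoso Ltd."},
                },
            }
        ],
    },
}


def unwrap(field):
    """Recursively strip the type wrappers from one field."""
    if not field:
        return None
    kind = field.get("type")
    if kind == "array":
        return [unwrap(item) for item in field.get("valueArray", [])]
    if kind == "object":
        return {k: unwrap(v) for k, v in field.get("valueObject", {}).items()}
    # For the scalar types used in this notebook (string, date, number),
    # the value sits under "value" + the capitalized type name
    return field.get("value" + kind.capitalize()) if kind else None


plain = {name: unwrap(field) for name, field in sample_fields.items()}
print(plain)
```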

In [None]:
# `requests` and `time` come from the imports in Step 2

def analyze_contract(file_path):
    """
    Analyze a contract using Azure Content Understanding with analyzeBinary endpoint.
    Returns structured data extracted from the contract.
    """
    print(f"\nAnalyzing: {file_path.name}")

    # Read the raw binary file content
    with open(file_path, 'rb') as f:
        file_content = f.read()

    print(f"  File size: {len(file_content)} bytes")

    # Headers for binary upload - set Content-Type to the actual file type
    binary_headers = {
        "Authorization": f"Bearer {token.token}",
        "Content-Type": "application/pdf"
    }

    # Step 1: Submit analysis request
    print("  Submitting to Azure Content Understanding...")
    print(f"  Endpoint: {endpoint}")

    # Use data= for raw binary, NOT json=
    response = requests.post(endpoint, headers=binary_headers, data=file_content)

    print(f"  Response status: {response.status_code}")

    if response.status_code != 202:
        print(f"❌ Error: {response.status_code}")
        print(f"  Response body: {response.text}")
        return {
            "filename": file_path.name,
            "error": f"API returned status {response.status_code}: {response.text}"
        }

    # Get the Operation-Location header for polling results
    operation_location = response.headers.get('Operation-Location')

    if not operation_location:
        print("❌ No Operation-Location header in response")
        return {
            "filename": file_path.name,
            "error": "No Operation-Location header"
        }

    print(f"  Results URL: {operation_location}")

    # Step 2: Poll for results (GET requests need only the Authorization header)
    print("  Waiting for analysis to complete...")
    poll_headers = {
        "Authorization": f"Bearer {token.token}"
    }

    max_retries = 60
    retry_count = 0

    while retry_count < max_retries:
        time.sleep(2)  # Wait 2 seconds between polls
        result_response = requests.get(operation_location, headers=poll_headers)

        if result_response.status_code == 200:
            result_data = result_response.json()
            status = result_data.get('status')
            print(f"  Status: {status}")

            if status == 'Succeeded':
                # Extract structured data from result
                result_contents = result_data.get('result', {}).get('contents', [])

                if result_contents:
                    content = result_contents[0]
                    fields = content.get('fields', {})

                    # Map fields to our structure using contract-specific extractors
                    extracted_data = {
                        "filename": file_path.name,
                        "title": extract_contract_title(fields),
                        "parties": extract_parties(fields),
                        "dates": extract_dates(fields),
                        "duration": extract_duration(fields),
                        "jurisdictions": extract_jurisdictions(fields),
                        "clauses": extract_clauses(fields),
                        "raw_fields": fields,
                        "markdown": content.get('markdown', '')
                    }

                    print("✓ Analysis complete")
                    return extracted_data
                else:
                    print("⚠️ No content in result")
                    return {
                        "filename": file_path.name,
                        "raw_result": result_data
                    }

            elif status in ['Failed', 'Canceled']:
                print(f"❌ Analysis {status.lower()}")
                error_info = result_data.get('error', {})
                print(f"  Error: {error_info}")
                return {
                    "filename": file_path.name,
                    "error": f"Analysis {status.lower()}: {error_info}"
                }

            # Status is Running or NotStarted, continue polling
        else:
            print(f"  Poll response: {result_response.status_code}")

        retry_count += 1

    print("❌ Timeout waiting for results")
    return {
        "filename": file_path.name,
        "error": "Timeout"
    }


def extract_field_value(field_data):
    """
    Helper function to extract values from Content Understanding field structure.
    Handles string, date, number, array, and object types.
    """
    if not field_data:
        return None
    
    field_type = field_data.get('type')
    
    if field_type == 'array':
        return [extract_field_value(item) for item in field_data.get('valueArray', [])]
    elif field_type == 'object':
        obj = {}
        for key, value in field_data.get('valueObject', {}).items():
            obj[key] = extract_field_value(value)
        return obj
    elif field_type == 'string':
        return field_data.get('valueString')
    elif field_type == 'number':
        return field_data.get('valueNumber')
    elif field_type == 'date':
        return field_data.get('valueDate')
    else:
        return field_data.get('content', field_data.get('valueString'))


def extract_contract_title(fields):
    """Extract contract title from fields."""
    return extract_field_value(fields.get('Title'))


def extract_parties(fields):
    """
    Extract party information from contract fields.
    Structure: Parties.valueArray[].valueObject.{Name, Address, ReferenceName, Clause}
    """
    parties = []
    
    parties_field = fields.get('Parties', {})
    parties_array = parties_field.get('valueArray', [])
    
    for party_item in parties_array:
        if party_item.get('type') == 'object':
            party_obj = party_item.get('valueObject', {})
            
            party_data = {
                "name": extract_field_value(party_obj.get('Name')),
                "address": extract_field_value(party_obj.get('Address')),
                "reference_name": extract_field_value(party_obj.get('ReferenceName')),
                "clause": extract_field_value(party_obj.get('Clause'))
            }
            
            # Only add if we have at least a name
            if party_data["name"]:
                # Clean up None values
                party_data = {k: v for k, v in party_data.items() if v is not None}
                parties.append(party_data)
    
    return parties


def extract_dates(fields):
    """
    Extract date information from contract fields.
    Available dates: ExecutionDate, EffectiveDate, ExpirationDate, RenewalDate
    """
    dates = {}
    
    date_fields = ['ExecutionDate', 'EffectiveDate', 'ExpirationDate', 'RenewalDate']
    
    for date_field in date_fields:
        date_value = extract_field_value(fields.get(date_field))
        if date_value:
            dates[date_field] = date_value
    
    return dates


def extract_duration(fields):
    """Extract contract duration from fields."""
    return extract_field_value(fields.get('ContractDuration'))


def extract_jurisdictions(fields):
    """
    Extract jurisdiction information from contract fields.
    Structure: Jurisdictions.valueArray[]
    """
    jurisdictions_field = fields.get('Jurisdictions', {})
    jurisdictions_array = jurisdictions_field.get('valueArray', [])
    
    jurisdictions = []
    for item in jurisdictions_array:
        value = extract_field_value(item)
        if value:
            jurisdictions.append(value)
    
    return jurisdictions


def extract_clauses(fields):
    """
    Extract clause information from contract fields.
    Structure: Clauses.valueArray[].valueObject.{clauseType, clauseTitle, clauseText}
    """
    clauses = []
    
    clauses_field = fields.get('Clauses', {})
    clauses_array = clauses_field.get('valueArray', [])
    
    for clause_item in clauses_array:
        if clause_item.get('type') == 'object':
            clause_obj = clause_item.get('valueObject', {})
            
            clause_data = {
                "type": extract_field_value(clause_obj.get('clauseType')),
                "title": extract_field_value(clause_obj.get('clauseTitle')),
                "text": extract_field_value(clause_obj.get('clauseText'))
            }
            
            # Only add if we have at least a title or text
            if clause_data["title"] or clause_data["text"]:
                # Clean up None values
                clause_data = {k: v for k, v in clause_data.items() if v is not None}
                clauses.append(clause_data)
    
    return clauses
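
The polling loop in `analyze_contract` sleeps a fixed 2 seconds for up to 60 attempts. One possible variation is exponential backoff with a cap, which polls quickly at first and slows down for long-running analyses. This sketch is not the notebook's implementation: `poll_until_done`, `fetch_status`, and the delay values are assumptions, and a scripted status source replaces the HTTP GET so the example runs offline.

```python
import time


def poll_until_done(fetch_status, initial_delay=1.0, max_delay=10.0,
                    max_attempts=20, sleep=time.sleep):
    """Call fetch_status() until it returns a terminal status or attempts run out.

    fetch_status: zero-argument callable returning a status string.
    Returns (status, attempts); status is "Timeout" if no terminal status was seen.
    """
    terminal = {"Succeeded", "Failed", "Canceled"}
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        sleep(delay)
        status = fetch_status()
        if status in terminal:
            return status, attempt
        delay = min(delay * 2, max_delay)  # double the wait, up to the cap
    return "Timeout", max_attempts


# Offline demonstration: a scripted sequence of statuses and a recording sleep
scripted = iter(["NotStarted", "Running", "Running", "Succeeded"])
waits = []
result = poll_until_done(lambda: next(scripted), sleep=waits.append)
print(result, waits)
```

In the notebook, `fetch_status` would wrap the `requests.get(operation_location, ...)` call and return the `status` field of the JSON response.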

## Step 6: Process All Contracts

In [None]:
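# Analyze all contracts
all_contract_data = []

for contract_file in contract_files:
    try:
        data = analyze_contract(contract_file)
        all_contract_data.append(data)
    except Exception as e:
        print(f"❌ Error processing {contract_file.name}: {e}")

# analyze_contract reports API failures as dicts with an "error" key,
# so count only the results that do not carry one
succeeded = [c for c in all_contract_data if "error" not in c]
print(f"\n✓ Successfully processed {len(succeeded)} of {len(contract_files)} contracts")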
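
Analysis calls are slow and may incur cost, so it can help to save `all_contract_data` to disk before moving on. A minimal sketch, assuming the results are JSON-serializable (in this notebook they are plain dicts, lists, and strings). `save_results`, `load_results`, and the file name are hypothetical, and the demo data only imitates the shape of `analyze_contract`'s return value.

```python
import json
import tempfile
from pathlib import Path


def save_results(results, path):
    """Write the list of per-contract dicts to a UTF-8 JSON file."""
    path = Path(path)
    path.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding="utf-8")
    return path


def load_results(path):
    """Read the list back, for example in a later session."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


# Demonstration with stand-in data: one successful result and one failure
demo_results = [
    {"filename": "a.pdf", "title": "Example", "parties": [{"name": "Contoso Ltd."}]},
    {"filename": "b.pdf", "error": "Timeout"},
]
with tempfile.TemporaryDirectory() as tmp:
    saved = save_results(demo_results, Path(tmp) / "contract_results.json")
    restored = load_results(saved)
```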

## Step 7: View Extracted Data

In [None]:
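# Display extracted data for each contract
for contract in all_contract_data:
    print(f"\n{'='*60}")
    print(f"Contract: {contract['filename']}")
    print(f"{'='*60}")

    # Failed analyses carry only an "error" key; report it and move on
    if contract.get('error'):
        print(f"\nError: {contract['error']}")
        continue
    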
    # Title
    if contract.get('title'):
        print(f"\n📄 Title: {contract['title']}")
    
    # Parties
    print("\n📋 Parties:")
    parties = contract.get('parties', [])
    if parties:
        for party in parties:
            print(f"  • {party.get('name', 'Unknown')}")
            if party.get('address'):
                print(f"    Address: {party['address']}")
            if party.get('reference_name'):
                print(f"    Reference: {party['reference_name']}")
    else:
        print("  No parties extracted")
    
    # Dates
    print("\n📅 Dates:")
    dates = contract.get('dates', {})
    if dates:
        for date_type, date_value in dates.items():
            print(f"  {date_type}: {date_value}")
    else:
        print("  No dates extracted")
    
    # Duration
    duration = contract.get('duration')
    if duration:
        print(f"\n⏱️ Duration: {duration}")
    
    # Jurisdictions
    jurisdictions = contract.get('jurisdictions', [])
    if jurisdictions:
        print(f"\n🌍 Jurisdictions: {', '.join(jurisdictions)}")
    
    # Clauses
    print(f"\n📝 Clauses ({len(contract.get('clauses', []))} found):")
    clauses = contract.get('clauses', [])
    if clauses:
        for i, clause in enumerate(clauses, 1):
            print(f"\n  [{i}] {clause.get('title', 'Untitled')}")
            if clause.get('type'):
                print(f"      Type: {clause['type']}")
            if clause.get('text'):
                text_preview = clause['text'][:150] + "..." if len(clause['text']) > 150 else clause['text']
                print(f"      Text: {text_preview}")
    else:
        print("  No clauses extracted")

## Step 8: Create Database Tables

In [None]:
from azure.identity import DefaultAzureCredential
import psycopg2

# Create tables for contracts, clauses, and parties

# Connection parameters
server_name = "contract-db"
database_name = "postgres"
host = f"{server_name}.postgres.database.azure.com"
port = 5432

# Get access token using Azure AD authentication
credential = DefaultAzureCredential()
pg_token = credential.get_token("https://ossrdbms-aad.database.windows.net/.default")

# Your Azure AD username (from `az ad signed-in-user show`)
aad_username = "admin@MngEnvMCAP560696.onmicrosoft.com"

# Connect to database using AAD token authentication
conn = psycopg2.connect(
    host=host,
    port=port,
    database=database_name,
    user=aad_username,
    password=pg_token.token,
    sslmode="require",
)

cursor = conn.cursor()

# Create contracts table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS contracts (
        id SERIAL PRIMARY KEY,
        filename VARCHAR(255) NOT NULL,
        title TEXT,
        duration VARCHAR(100),
        jurisdictions JSONB,
        dates JSONB,
        markdown TEXT,
        raw_fields JSONB,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Create parties table with foreign key to contracts
cursor.execute("""
    CREATE TABLE IF NOT EXISTS parties (
        id SERIAL PRIMARY KEY,
        contract_id INTEGER REFERENCES contracts(id) ON DELETE CASCADE,
        name TEXT,
        address TEXT,
        reference_name TEXT,
        clause TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Create clauses table with foreign key to contracts
cursor.execute("""
    CREATE TABLE IF NOT EXISTS clauses (
        id SERIAL PRIMARY KEY,
        contract_id INTEGER REFERENCES contracts(id) ON DELETE CASCADE,
        clause_type VARCHAR(100),
        title TEXT,
        text TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Create indexes for better query performance
cursor.execute("CREATE INDEX IF NOT EXISTS idx_parties_contract_id ON parties(contract_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_parties_name ON parties(name)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_clauses_contract_id ON clauses(contract_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_clauses_type ON clauses(clause_type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_contracts_filename ON contracts(filename)")

conn.commit()
cursor.close()
conn.close()

print("✓ Tables created successfully:")
print("  - contracts (main table)")
print("  - parties (with FK to contracts)")
print("  - clauses (with FK to contracts)")
print("✓ Indexes created for efficient querying")

## Step 9: Upload Extracted Data

In [None]:
# Reconnect to database (previous connection was closed)
pg_token = credential.get_token("https://ossrdbms-aad.database.windows.net/.default")

conn = psycopg2.connect(
    host=host,
    port=port,
    database=database_name,
    user=aad_username,
    password=pg_token.token,
    sslmode="require",
)

cursor = conn.cursor()

# Insert all contract data
for contract in all_contract_data:
    # Insert into contracts table
    cursor.execute("""
        INSERT INTO contracts (filename, title, duration, jurisdictions, dates, markdown, raw_fields)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """, (
        contract.get('filename'),
        contract.get('title'),
        contract.get('duration'),
        json.dumps(contract.get('jurisdictions', [])),
        json.dumps(contract.get('dates', {})),
        contract.get('markdown'),
        json.dumps(contract.get('raw_fields', {}))
    ))

    contract_id = cursor.fetchone()[0]

    # Insert parties
    for party in contract.get('parties', []):
        cursor.execute("""
            INSERT INTO parties (contract_id, name, address, reference_name, clause)
            VALUES (%s, %s, %s, %s, %s)
        """, (
            contract_id,
            party.get('name'),
            party.get('address'),
            party.get('reference_name'),
            party.get('clause')
        ))

    # Insert clauses
    for clause in contract.get('clauses', []):
        cursor.execute("""
            INSERT INTO clauses (contract_id, clause_type, title, text)
            VALUES (%s, %s, %s, %s)
        """, (
            contract_id,
            clause.get('type'),
            clause.get('title'),
            clause.get('text')
        ))

    print(f"✓ Uploaded: {contract.get('filename')}")

conn.commit()
cursor.close()
conn.close()

print(f"\n✓ Successfully uploaded {len(all_contract_data)} contracts to database")
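
The upload cell relies on one pattern: insert the parent row, capture its generated `id`, then insert child rows that reference it. The sketch below reproduces that pattern with Python's built-in `sqlite3` as a local stand-in for PostgreSQL, so it runs without a server. It is not the notebook's database code: SQLite uses `?` placeholders and `cursor.lastrowid` where the PostgreSQL cell uses `%s` and `RETURNING id`, foreign keys have to be switched on per connection, and the tables are trimmed to a few columns. The contract data is invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.execute("CREATE TABLE contracts (id INTEGER PRIMARY KEY, filename TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE parties ("
    " id INTEGER PRIMARY KEY,"
    " contract_id INTEGER REFERENCES contracts(id) ON DELETE CASCADE,"
    " name TEXT)"
)

# Stand-in data shaped like one entry of all_contract_data
contract = {
    "filename": "a.pdf",
    "parties": [{"name": "Contoso Ltd."}, {"name": "Fabrikam Inc."}],
}

with conn:  # commits on success, rolls back if an exception is raised
    cur = conn.execute(
        "INSERT INTO contracts (filename) VALUES (?)", (contract["filename"],)
    )
    contract_id = cur.lastrowid  # plays the role of RETURNING id
    conn.executemany(
        "INSERT INTO parties (contract_id, name) VALUES (?, ?)",
        [(contract_id, p["name"]) for p in contract["parties"]],
    )

party_count = conn.execute("SELECT COUNT(*) FROM parties").fetchone()[0]

# Deleting the parent removes its children because of ON DELETE CASCADE
with conn:
    conn.execute("DELETE FROM contracts WHERE id = ?", (contract_id,))
remaining = conn.execute("SELECT COUNT(*) FROM parties").fetchone()[0]
conn.close()
```

Wrapping each contract's inserts in one transaction, as the `with conn:` block does here, means a failure partway through leaves no orphaned parent row.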

## Step 10: Query Stored Data

In [None]:
# Reconnect to database and query the stored data
pg_token = credential.get_token("https://ossrdbms-aad.database.windows.net/.default")

conn = psycopg2.connect(
    host=host,
    port=port,
    database=database_name,
    user=aad_username,
    password=pg_token.token,
    sslmode="require",
)

cursor = conn.cursor()

# Query contracts
print("📄 CONTRACTS TABLE:")
cursor.execute("SELECT id, filename, title, duration, jurisdictions, dates FROM contracts")
contracts_rows = cursor.fetchall()
contracts_df = pd.DataFrame(contracts_rows, columns=['id', 'filename', 'title', 'duration', 'jurisdictions', 'dates'])
print(contracts_df.to_string())

# Query parties
print("\n\n👥 PARTIES TABLE:")
cursor.execute("SELECT id, contract_id, name, address, reference_name FROM parties")
parties_rows = cursor.fetchall()
parties_df = pd.DataFrame(parties_rows, columns=['id', 'contract_id', 'name', 'address', 'reference_name'])
print(parties_df.to_string())

# Query clauses
print("\n\n📝 CLAUSES TABLE:")
cursor.execute("SELECT id, contract_id, clause_type, title, LEFT(text, 100) as text_preview FROM clauses")
clauses_rows = cursor.fetchall()
clauses_df = pd.DataFrame(clauses_rows, columns=['id', 'contract_id', 'clause_type', 'title', 'text_preview'])
print(clauses_df.to_string())

cursor.close()
conn.close()

print(f"\n✓ Retrieved {len(contracts_rows)} contracts, {len(parties_rows)} parties, {len(clauses_rows)} clauses")
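
The `dates` column prints as a nested dictionary, which is hard to scan or sort. Below is a minimal sketch that spreads it into one column per date type. It assumes the driver returns JSONB values as Python dicts (worth verifying for your psycopg2 version) and falls back to `json.loads` when it receives a string. `flatten_dates` and the sample rows are hypothetical.

```python
import json

# Same date fields that extract_dates looks for
DATE_KEYS = ["ExecutionDate", "EffectiveDate", "ExpirationDate", "RenewalDate"]


def flatten_dates(rows):
    """Turn (id, filename, dates) tuples into flat dicts with one key per date type."""
    flat = []
    for contract_id, filename, dates in rows:
        if isinstance(dates, str):      # JSON text rather than a decoded dict
            dates = json.loads(dates)
        dates = dates or {}             # treat NULL as "no dates"
        record = {"id": contract_id, "filename": filename}
        for key in DATE_KEYS:
            record[key] = dates.get(key)  # None when the contract lacks that date
        flat.append(record)
    return flat


# Invented rows in the shape returned by SELECT id, filename, dates FROM contracts
sample_rows = [
    (1, "a.pdf", {"EffectiveDate": "2024-01-15", "ExpirationDate": "2025-01-14"}),
    (2, "b.pdf", '{"ExecutionDate": "2023-06-01"}'),
    (3, "c.pdf", None),
]
flat_rows = flatten_dates(sample_rows)
```

Passing `flat_rows` to `pd.DataFrame` then gives a table that can be sorted or filtered by `ExpirationDate`.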