# OCR Pipeline for Handwritten Document PII Extraction

This notebook implements an end-to-end pipeline:
1. Image Pre-processing
2. OCR (Optical Character Recognition)
3. Text Cleaning
4. PII Detection
5. Optional: Redacted Image Generation


In [None]:
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import easyocr
import re
import os
from pathlib import Path
import json
from typing import Dict, List, Tuple, Optional


## 1. Image Pre-processing


In [None]:
def preprocess_image(image_path: str, return_binary: bool = False) -> np.ndarray:
    """
    Load an image from disk and prepare it for OCR.

    The image is converted to grayscale, passed through ``deskew_image``,
    denoised with non-local means and contrast-enhanced with CLAHE.

    Args:
        image_path: Path of the image file to read.
        return_binary: When True, the enhanced image is additionally
            thresholded with Otsu's method and that binary image is returned.

    Returns:
        The enhanced grayscale image, or its Otsu-thresholded version when
        ``return_binary`` is True.

    Raises:
        ValueError: If OpenCV cannot read ``image_path``.
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        raise ValueError(f"Could not read image: {image_path}")

    # Grayscale first, then straighten the page before any filtering.
    straightened = deskew_image(cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY))

    # Positional arguments after dst=None are h=5, templateWindowSize=7,
    # searchWindowSize=21.
    smoothed = cv2.fastNlMeansDenoising(straightened, None, 5, 7, 21)

    # Local contrast enhancement (CLAHE) on 8x8 tiles with clip limit 2.5.
    equalizer = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8))
    enhanced = equalizer.apply(smoothed)

    if not return_binary:
        # Default path: hand back the enhanced grayscale image.
        return enhanced

    # Binary output only on request; Otsu picks the threshold itself.
    _, binary = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
    return binary

def deskew_image(image: np.ndarray) -> np.ndarray:
    """
    Correct slight rotation/tilt in the image.

    The skew is estimated from the minimum-area rectangle around all
    non-zero pixels. The image is rotated about its centre only when the
    estimated correction exceeds 0.5 degrees.

    Args:
        image: Image array; ``preprocess_image`` passes a grayscale image.

    Returns:
        The rotated image (same width and height, borders replicated), or the
        input unchanged when there are no non-zero pixels or the correction
        is at most 0.5 degrees.
    """
    # Coordinates of every non-zero pixel, as (row, col) pairs.
    # NOTE(review): on a light-background scan `image > 0` presumably selects
    # almost the whole page rather than just the ink -- confirm whether an
    # inverted threshold was intended here.
    coords = np.column_stack(np.where(image > 0))

    if len(coords) == 0:
        return image

    rect_angle = cv2.minAreaRect(coords)[-1]

    # A rectangle's orientation is only defined modulo 90 degrees, and the
    # range minAreaRect reports differs between OpenCV releases ([-90, 0) in
    # older ones, (0, 90] in newer ones). Folding into [-45, 45) first makes
    # the correction independent of that convention; for the legacy range it
    # yields exactly the values the previous `< -45` branch produced.
    skew = (rect_angle + 45.0) % 90.0 - 45.0
    angle = -skew

    # Ignore negligible tilt to avoid needless resampling.
    if abs(angle) <= 0.5:
        return image

    (h, w) = image.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    return cv2.warpAffine(
        image, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE
    )


## 2. OCR Processing


In [None]:
# Create the module-level EasyOCR reader used by perform_ocr:
# English only, CPU (gpu=False), quiet (verbose=False).
# Models are downloaded the first time this runs.
print("Initializing EasyOCR (this may take a moment on first run)...")
reader = easyocr.Reader(lang_list=['en'], gpu=False, verbose=False)
print("EasyOCR initialized successfully!")


In [None]:
def perform_ocr(image: np.ndarray, use_binary: bool = False) -> tuple:
    """
    Run the module-level EasyOCR ``reader`` over an image.

    Args:
        image: Image array to recognise.
        use_binary: When True the image is passed to the reader untouched.
            When False, an image that is not 2-D is first converted from BGR
            to grayscale; a 2-D image is passed through as is.

    Returns:
        A ``(raw_text, results)`` tuple. ``raw_text`` is the space-joined text
        of every detection whose confidence is above 0.1 ('' when there is
        none). ``results`` is the unfiltered list returned by
        ``reader.readtext`` and is what the redaction step consumes.
    """
    # Only a multi-channel image on the non-binary path needs converting.
    needs_gray = (not use_binary) and len(image.shape) != 2
    ocr_input = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if needs_gray else image

    detections = reader.readtext(
        ocr_input,
        paragraph=False,   # keep individual detections, no grouping
        width_ths=0.5,
        height_ths=0.5,
        detail=1,          # each detection carries box, text and confidence
        allowlist=None,
        blocklist=''
    )

    # Keep the text of well-formed detections above the (deliberately low)
    # confidence cut-off; entries are (bbox, text, confidence, ...).
    kept_text = [
        entry[1]
        for entry in detections
        if isinstance(entry, (tuple, list)) and len(entry) >= 3 and entry[2] > 0.1
    ]

    # Joining an empty list yields '', matching the no-text case.
    return ' '.join(kept_text), detections


## 3. Text Cleaning


In [None]:
def clean_text(text: str) -> str:
    """
    Clean and normalize extracted text.

    Characters outside the allowed set (word characters, whitespace and
    common punctuation/symbols) are removed, every run of whitespace is
    collapsed to a single space, and leading/trailing whitespace is stripped.
    No character-level substitutions are applied.

    Args:
        text: Raw text, e.g. the joined OCR output.

    Returns:
        The cleaned, single-spaced text.
    """
    # Strip OCR artefact characters first: removing a character that sits
    # between two spaces would otherwise leave a double space behind.
    text = re.sub(r"[^\w\s\.,;:!?\-'\"()\[\]{}@#%&*+=/\\]", '', text)

    # Collapse whitespace runs (including newlines) to one space.
    text = re.sub(r'\s+', ' ', text)

    return text.strip()


## 4. PII Detection


In [None]:
def detect_pii(text: str) -> Dict[str, List[str]]:
    """
    Detect Personally Identifiable Information (PII) in text.

    Detection is purely regex-based. Categories searched: emails, phone
    numbers, SSNs, dates, names, street addresses, medical record numbers and
    dates of birth.

    Args:
        text: Text to scan (the pipeline passes the cleaned OCR text).

    Returns:
        A dictionary mapping category name to the list of matched strings.
        Categories with no match are omitted, so an empty dict means nothing
        was found. Phone numbers, dates and medical record numbers are
        de-duplicated in first-seen order; the other lists keep every match.
    """

    def _find_unique(patterns: List[str], flags: int = 0) -> List[str]:
        # Collect matches of several patterns, dropping repeats while keeping
        # first-seen order so the output is stable between runs.
        found: List[str] = []
        for pattern in patterns:
            found.extend(re.findall(pattern, text, flags))
        return list(dict.fromkeys(found))

    pii: Dict[str, List[str]] = {
        'emails': [],
        'phone_numbers': [],
        'ssn': [],
        'dates': [],
        'names': [],
        'addresses': [],
        'medical_record_numbers': [],
        'dates_of_birth': []
    }

    # Emails. The TLD class is letters only (a '|' inside a character class
    # is a literal pipe, not alternation).
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    pii['emails'] = re.findall(email_pattern, text, re.IGNORECASE)

    # Phone numbers.
    pii['phone_numbers'] = _find_unique([
        # 123-456-7890, 123.456.7890 and, because the separators are
        # optional, a bare 1234567890.
        r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        # (123) 456-7890. A leading \b cannot be used here: there is no word
        # boundary between a space (or start of text) and '(', so the number
        # would never match. Only require that no word character precedes it.
        r'(?<!\w)\(\d{3}\)\s?\d{3}[-.]?\d{4}\b',
    ])

    # SSN: 3-2-4 digits with optional dashes.
    ssn_pattern = r'\b\d{3}-?\d{2}-?\d{4}\b'
    pii['ssn'] = re.findall(ssn_pattern, text)

    # Dates in numeric and "Month D, YYYY" forms.
    pii['dates'] = _find_unique([
        r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',  # 01/15/2024 or 1/15/24
        r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',  # 2024/01/15
        r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b',  # January 15, 2024
    ], re.IGNORECASE)

    # Date of birth: a date-like value directly following a DOB keyword.
    # Only the value (the capture group) is returned, not the keyword.
    dob_keywords = r'\b(?:DOB|Date of Birth|Born|Birth Date)[: ]*([\d/\-]+|[A-Za-z]+\s+\d{1,2},?\s+\d{4})\b'
    pii['dates_of_birth'] = re.findall(dob_keywords, text, re.IGNORECASE)

    # Medical record numbers: 6-12 alphanumerics after an MRN-style label.
    pii['medical_record_numbers'] = _find_unique([
        r'\bMRN[: ]*([A-Z0-9]{6,12})\b',
        r'\bMedical Record[: ]*([A-Z0-9]{6,12})\b',
        r'\bRecord #[: ]*([A-Z0-9]{6,12})\b',
    ], re.IGNORECASE)

    # Names: simple heuristic, 2-3 consecutive capitalised words.
    name_pattern = r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2}\b'
    potential_names = re.findall(name_pattern, text)
    # A candidate containing any of these label words (as a substring) is
    # discarded entirely.
    false_positives = {'Date', 'Time', 'Name', 'Address', 'Phone', 'Email', 'Patient', 'Doctor', 'Clinic', 'Hospital'}
    pii['names'] = [
        name for name in potential_names
        if not any(fp in name for fp in false_positives)
    ]

    # Addresses: "<number> <word> <street type>".
    address_pattern = r'\b\d+\s+[A-Z][a-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Drive|Dr|Lane|Ln|Boulevard|Blvd|Court|Ct|Place|Pl)\b'
    pii['addresses'] = re.findall(address_pattern, text, re.IGNORECASE)

    # Drop categories with no matches.
    return {category: values for category, values in pii.items() if values}


## 5. Redaction (Optional)


In [None]:
def create_redacted_image(image_path: str, ocr_results: List, pii_texts: List[str], output_path: str) -> np.ndarray:
    """
    Create a redacted version of the image by blacking out PII regions.

    Every OCR detection whose text contains one of ``pii_texts``
    (case-insensitive substring match) has its bounding polygon filled with
    black. Matching is done per detection, so a PII value that is spread over
    several detections is not matched by this function.

    Args:
        image_path: Path of the image to load and redact.
        ocr_results: Detections as returned by ``perform_ocr``; entries are
            expected to be ``(bbox, text, confidence, ...)`` sequences, and
            anything else is skipped.
            NOTE(review): assumes the boxes are in the pixel coordinates of
            the image at ``image_path`` -- confirm this still holds when OCR
            ran on a deskewed copy.
        pii_texts: PII strings to look for. Empty or whitespace-only entries
            are ignored.
        output_path: Where the redacted image is written.

    Returns:
        The redacted image array.

    Raises:
        ValueError: If ``image_path`` cannot be read.
        OSError: If the redacted image cannot be written to ``output_path``.
    """
    img = cv2.imread(image_path)
    if img is None:
        # Same failure mode and message as preprocess_image.
        raise ValueError(f"Could not read image: {image_path}")

    # Lower-case once for case-insensitive matching. Blank entries are
    # dropped: '' is a substring of every string and would black out every
    # detection.
    pii_lower = [p.lower() for p in pii_texts if p and p.strip()]

    for result in ocr_results:
        # Skip anything that is not a (bbox, text, confidence, ...) sequence.
        if not (isinstance(result, (tuple, list)) and len(result) >= 3):
            continue

        bbox, text = result[0], result[1]
        text_lower = text.lower()
        if any(pii in text_lower for pii in pii_lower):
            # fillPoly needs integer vertex coordinates.
            polygon = np.array(bbox, dtype=np.int32)
            cv2.fillPoly(img, [polygon], (0, 0, 0))

    # imwrite signals failure through its return value rather than raising.
    if not cv2.imwrite(output_path, img):
        raise OSError(f"Could not write redacted image: {output_path}")
    return img


In [None]:
# This cell intentionally left blank - redaction function is defined above


## 6. Main Pipeline


In [None]:
def process_document(image_path: str, create_redaction: bool = True) -> Dict:
    """
    Run the whole pipeline on one image.

    Stages, in order: ``preprocess_image``, ``perform_ocr``, ``clean_text``,
    ``detect_pii`` and, optionally, ``create_redacted_image``. Progress is
    printed before each stage.

    Args:
        image_path: Path of the image to process.
        create_redaction: When True and at least one PII value was detected,
            a redacted copy is written to ``<image folder>/output/`` as
            ``<image stem>_redacted.jpg``.

    Returns:
        A dict with keys ``image_path``, ``raw_text``, ``cleaned_text``,
        ``pii_detected`` and ``redacted_image_path`` (None when no redacted
        image was produced).
    """
    print(f"Processing: {image_path}")

    print("Step 1: Pre-processing image...")
    prepared = preprocess_image(image_path)

    print("Step 2: Performing OCR...")
    raw_text, detections = perform_ocr(prepared)

    print("Step 3: Cleaning text...")
    cleaned_text = clean_text(raw_text)

    print("Step 4: Detecting PII...")
    pii_detected = detect_pii(cleaned_text)

    redacted_path = None
    if create_redaction:
        print("Step 5: Creating redacted image...")
        # Flatten every category into one list of strings to black out.
        pii_values = [value for group in pii_detected.values() for value in group]

        if pii_values:
            source = Path(image_path)
            target_dir = source.parent / 'output'
            target_dir.mkdir(exist_ok=True)
            redacted_path = str(target_dir / f"{source.stem}_redacted.jpg")
            create_redacted_image(image_path, detections, pii_values, redacted_path)

    return {
        'image_path': image_path,
        'raw_text': raw_text,
        'cleaned_text': cleaned_text,
        'pii_detected': pii_detected,
        'redacted_image_path': redacted_path
    }


## 7. Process Sample Documents


In [None]:
# Set up paths
samples_dir = Path('samples')
output_dir = Path('output')
output_dir.mkdir(exist_ok=True)

# Find all JPEG images in the samples folder. The extension is compared
# case-insensitively in a single pass: globbing '*.jpg' and '*.JPG'
# separately returns every file twice on case-insensitive filesystems.
# Sorting keeps the processing order stable between runs.
jpeg_suffixes = {'.jpg', '.jpeg'}
image_files = sorted(
    path for path in samples_dir.glob('*')
    if path.is_file() and path.suffix.lower() in jpeg_suffixes
)

# Defined before the branch so the later cells that test `all_results` also
# work when no images were found.
all_results = []

if not image_files:
    print("No images found in 'samples' folder. Please add your JPEG images there.")
else:
    print(f"Found {len(image_files)} image(s) to process.")

    # Process each image and echo what was extracted from it.
    for img_path in image_files:
        print(f"\n{'='*60}")
        results = process_document(str(img_path), create_redaction=True)
        all_results.append(results)

        # Display results
        print(f"\nResults for {img_path.name}:")
        print(f"\nRaw Text:\n{results['raw_text']}")
        print(f"\nCleaned Text:\n{results['cleaned_text']}")
        print(f"\nPII Detected:")
        for pii_type, values in results['pii_detected'].items():
            print(f"  {pii_type}: {values}")

        if results['redacted_image_path']:
            print(f"\nRedacted image saved to: {results['redacted_image_path']}")

        print(f"\n{'='*60}")


## 8. Save Results to JSON


In [None]:
# Write the collected per-image results to output/results.json
# (skipped when nothing was processed).
if all_results:
    results_file = output_dir / 'results.json'

    # Rebuild each entry with plain strings for the two path fields so the
    # whole structure is JSON-serialisable.
    json_results = [
        {
            'image_path': str(entry['image_path']),
            'raw_text': entry['raw_text'],
            'cleaned_text': entry['cleaned_text'],
            'pii_detected': entry['pii_detected'],
            'redacted_image_path': (
                str(entry['redacted_image_path'])
                if entry['redacted_image_path']
                else None
            ),
        }
        for entry in all_results
    ]

    with open(results_file, 'w') as handle:
        handle.write(json.dumps(json_results, indent=2))

    print(f"\nResults saved to: {results_file}")


## 9. Display Images (for visualization)


In [None]:
from IPython.display import Image, display
from matplotlib import pyplot as plt

# For every processed image that has a redacted copy, show the original and
# the redacted version side by side, then print its PII summary.
for result in all_results:
    if not result['redacted_image_path']:
        continue

    fig, axes = plt.subplots(1, 2, figsize=(15, 8))

    # Left panel: original; right panel: redacted.
    panels = (
        (result['image_path'], 'Original Image'),
        (result['redacted_image_path'], 'Redacted Image'),
    )
    for axis, (path, title) in zip(axes, panels):
        bgr = cv2.imread(path)
        # OpenCV loads BGR; matplotlib expects RGB.
        axis.imshow(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
        axis.set_title(title)
        axis.axis('off')

    plt.tight_layout()
    plt.show()

    print(f"\nPII Summary for {Path(result['image_path']).name}:")
    print(json.dumps(result['pii_detected'], indent=2))
