# Martiny Family Archive - Knowledge Graph Builder

Multimodal analysis pipeline for historical archive processing

In [1]:
# Load environment variables from .env file (API keys, paths, etc.)
import os
import sys
from pathlib import Path
from dotenv import load_dotenv  

load_dotenv()

# Set protocol buffers implementation (fixes compatibility issues)
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = os.getenv('PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION')

# Get Gemini API key - this is required for AI-powered entity extraction
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY not found. Make sure it's set in your .env file.")
os.environ['GEMINI_API_KEY'] = GEMINI_API_KEY

# Get paths from environment
ARCHIVE_PATH = os.getenv('ARCHIVE_PATH')  
OUTPUT_DIR = os.getenv('OUTPUT_DIR')       
if not ARCHIVE_PATH or not OUTPUT_DIR:
    raise ValueError("ARCHIVE_PATH or OUTPUT_DIR not found in .env file.")

# Create output directory if it doesn't exist
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

print(f"Environment and paths loaded from .env")
print(f"Output directory ensured at: {OUTPUT_DIR}")


Environment and paths loaded from .env
Output directory ensured at: C:\Users\wel\Desktop\opvn\outputs


In [2]:
import json
import re
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass, asdict
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')  
import cv2  
import numpy as np
from PIL import Image
import pandas as pd
from paddleocr import PaddleOCR
from docx import Document 
from PyPDF2 import PdfReader 
import networkx as nx  
from pyvis.network import Network  
from rapidfuzz import fuzz
import google.generativeai as genai

Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
print(f"Initialized. Output directory: {OUTPUT_DIR}")

Initialized. Output directory: C:\Users\wel\Desktop\opvn\outputs


In [3]:
# configuration parameters
MAX_SAMPLE_FILES = 20
MIN_PHOTO_SIZE = 100  
FUZZY_THRESHOLD = 85  

#data structures using Python dataclasses

@dataclass
class Entity:
    """Represents a single entity in our knowledge graph (person, place, company, etc.)"""
    id: str  
    type: str  
    name: str  
    properties: Dict 
    source_file: str 
    confidence: float = 1.0 

@dataclass
class Relationship:
    """Represents a connection between two entities"""
    source_id: str  
    target_id: str  
    type: str  
    properties: Dict  
    confidence: float = 1.0 
@dataclass
class FileMetadata:
    """Stores information about each processed file"""
    file_id: str
    path: str
    filename: str
    category: str  
    file_type: str  
    size_bytes: int
    ocr_text: Optional[str] = None  
    extracted_entities: List[str] = None  
    
    def __post_init__(self):
        if self.extracted_entities is None:
            self.extracted_entities = []

#main data stores - these will hold all extracted information
entities_db = {}  
relationships_db = []  
files_db = {}  

print("Data structures ready")

Data structures ready


In [4]:
def categorize_file(path: Path) -> str:
    """Figure out what type of file this is based on its path and name"""
    path_str = str(path).lower()
    
    # Check path/filename for keywords to determine category
    if 'genealog' in path_str:
        return 'genealogy'
    elif 'album' in path_str and path.suffix.lower() in ['.jpg', '.jpeg', '.png']:
        return 'album_page' 
    elif 'singole' in path_str or 'fotografie di famiglia' in path_str:
        return 'single_photo'
    elif 'articoli' in path_str and path.suffix.lower() == '.pdf':
        return 'newspaper'
    elif 'tag' in path_str and path.suffix.lower() == '.docx':
        return 'client_tags'  
    elif 'certificati' in path_str or 'lettere' in path_str:
        return 'document'
    elif 'commerciali' in path_str or 'pubblicitari' in path_str:
        return 'commercial'
    elif path.suffix.lower() == '.pdf':
        return 'pdf_document'
    elif path.suffix.lower() == '.docx':
        return 'word_document'
    elif path.suffix.lower() in ['.jpg', '.jpeg', '.png']:
        return 'image'
    else:
        return 'other'

def scan_archive(root_path: str) -> pd.DataFrame:
    """Scan the entire archive folder and catalog all files"""
    root = Path(root_path)
    files_data = []
    
    # Recursively find all files in the archive
    for file_path in root.rglob('*'):
        # Skip database and text files
        if file_path.is_file() and file_path.suffix.lower() not in ['.db', '.txt']:
            category = categorize_file(file_path)
            files_data.append({
                'path': str(file_path),
                'filename': file_path.name,
                'category': category,
                'extension': file_path.suffix.lower(),
                'size_kb': file_path.stat().st_size / 1024,
                'folder': file_path.parent.name
            })
    
    return pd.DataFrame(files_data)

def select_sample_files(df: pd.DataFrame, max_files: int = 20) -> List[str]:
    """Select a representative sample of files from different categories"""
    selected = []
    
    # Priority files: make sure we get at least one of each important type
    priority = [
        ('genealogy', 2),     # Family trees
        ('client_tags', 2),   # Photo metadata
        ('album_page', 4),    # Album pages
        ('newspaper', 2),     # Historical newspapers
        ('document', 2),      # Certificates, letters
        ('single_photo', 2),  # Individual photos
        ('commercial', 1)     # Business materials
    ]
    
    # Select priority files first
    for category, count in priority:
        cat_files = df[df['category'] == category]['path'].tolist()
        for f in cat_files[:count]:
            if f not in selected:
                selected.append(f)
    
    print(f"  Selected {len(selected)} priority files.")
    
    # Fill up to max_files with random additional files
    if len(selected) < max_files:
        num_needed = max_files - len(selected)
        all_files = df['path'].tolist()
        remaining_files = [f for f in all_files if f not in selected]
        files_to_add = remaining_files[:num_needed]
        selected.extend(files_to_add)
        print(f"  Added {len(files_to_add)} additional files to reach sample limit.")
    
    return selected[:max_files]

# Run the archive scan
print("Scanning archive...")
archive_df = scan_archive(ARCHIVE_PATH)
print(f"Found {len(archive_df)} files")
print(f"\nCategory distribution:")
print(archive_df['category'].value_counts())

# select our sample files to process
sample_files = select_sample_files(archive_df, MAX_SAMPLE_FILES)
print(f"\nSelected {len(sample_files)} files for processing")

# save the full catalog for reference
archive_df.to_csv(Path(OUTPUT_DIR) / "file_catalog.csv", index=False)
print(f"Catalog saved")

Scanning archive...
Found 297 files

Category distribution:
category
album_page       184
commercial        44
document          23
newspaper         12
single_photo      10
pdf_document      10
other              6
client_tags        4
image              2
genealogy          1
word_document      1
Name: count, dtype: int64
  Selected 14 priority files.
  Added 6 additional files to reach sample limit.

Selected 20 files for processing
Catalog saved


In [5]:
# Initialize OCR engine for reading text from images
print("Initializing OCR engine...")
ocr_engine = PaddleOCR(use_angle_cls=True, lang='it') 

def extract_text_from_image(image_path: str) -> str:
    """Use OCR to extract text from an image file"""
    try:
        result = ocr_engine.ocr(image_path, cls=True)
        if result and result[0]:
            # Combine all detected text lines
            text = ' '.join([line[1][0] for line in result[0]])
            return text
        return ""
    except Exception as e:
        print(f"OCR error for {Path(image_path).name}: {str(e)[:50]}")
        return ""

def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract text from PDF (works for PDFs with text layer)"""
    try:
        reader = PdfReader(pdf_path)
        # Combine text from all pages
        text = ' '.join([page.extract_text() for page in reader.pages])
        return text
    except Exception as e:
        print(f"PDF error for {Path(pdf_path).name}: {str(e)[:50]}")
        return ""

def extract_text_from_docx(docx_path: str) -> str:
    """Extract text from Word document"""
    try:
        doc = Document(docx_path)
        # Combine all paragraphs
        text = ' '.join([para.text for para in doc.paragraphs])
        return text
    except Exception as e:
        print(f"DOCX error for {Path(docx_path).name}: {str(e)[:50]}")
        return ""

print("OCR ready")

Initializing OCR engine...
[2025/11/17 18:13:39] ppocr DEBUG: Namespace(help='==SUPPRESS==', use_gpu=False, use_xpu=False, use_npu=False, ir_optim=True, use_tensorrt=False, min_subgraph_size=15, precision='fp32', gpu_mem=500, gpu_id=0, image_dir=None, page_num=0, det_algorithm='DB', det_model_dir='C:\\Users\\wel/.paddleocr/whl\\det\\en\\en_PP-OCRv3_det_infer', det_limit_side_len=960, det_limit_type='max', det_box_type='quad', det_db_thresh=0.3, det_db_box_thresh=0.6, det_db_unclip_ratio=1.5, max_batch_size=10, use_dilation=False, det_db_score_mode='fast', det_east_score_thresh=0.8, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_sast_score_thresh=0.5, det_sast_nms_thresh=0.2, det_pse_thresh=0, det_pse_box_thresh=0.85, det_pse_min_area=16, det_pse_scale=1, scales=[8, 16, 32], alpha=1.0, beta=1.0, fourier_degree=5, rec_algorithm='SVTR_LCNet', rec_model_dir='C:\\Users\\wel/.paddleocr/whl\\rec\\latin\\latin_PP-OCRv3_rec_infer', rec_image_inverse=True, rec_image_shape='3, 48, 320', 

In [6]:
def segment_album_page(image_path: str, output_dir: Path) -> List[str]:
    """Use computer vision to detect and extract individual photos from album pages"""
    img = cv2.imread(image_path)
    if img is None:
        return []
    
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    
    # Apply adaptive thresholding to find photo boundaries
    thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY_INV, 21, 10)
    
    # Morphological operations to clean up the image
    kernel = np.ones((5,5), np.uint8)
    morph = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel, iterations=2)
    
    # Find contours (photo boundaries)
    contours, _ = cv2.findContours(morph, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    # Filter contours by size and aspect ratio
    min_area = MIN_PHOTO_SIZE * MIN_PHOTO_SIZE
    max_area = img.shape[0] * img.shape[1] * 0.8 
    
    valid_contours = []
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if min_area < area < max_area:
            x, y, w, h = cv2.boundingRect(cnt)
            aspect_ratio = w / h if h > 0 else 0
            if 0.3 < aspect_ratio < 3.0:
                valid_contours.append((x, y, w, h))
    
    # Save each detected photo segment
    output_dir.mkdir(parents=True, exist_ok=True)
    segmented_paths = []
    base_name = Path(image_path).stem
    
    for idx, (x, y, w, h) in enumerate(valid_contours):
        cropped = img[y:y+h, x:x+w]
        output_path = output_dir / f"{base_name}_seg_{idx}.jpg"
        cv2.imwrite(str(output_path), cropped)
        segmented_paths.append(str(output_path))
    
    return segmented_paths

print("CV segmentation ready")

CV segmentation ready


In [7]:
# Configure Google Gemini AI
api_key = os.environ.get('GEMINI_API_KEY')
if not api_key or api_key == 'GEMINI_API_KEY_HERE':
    raise ValueError("Please set your Gemini API key in the first cell")

genai.configure(api_key=api_key)

# Create separate models for vision and text analysis
vision_model = genai.GenerativeModel('gemini-2.5-flash-image') 
text_model = genai.GenerativeModel('gemini-2.5-pro')   

def analyze_photo_with_gemini(image_path: str) -> Dict:
    """Use Gemini Vision AI to extract information from a photo"""
    img = Image.open(image_path)
    
    # Craft a detailed prompt to guide the AI
    prompt = """Analyze this historical photograph from early 1900s Turin, Italy.
Extract and return ONLY valid JSON with this structure:
{
  "people": [{"name": "estimated name or role", "description": "appearance/context"}],
  "places": [{"name": "location name", "type": "city/building/landmark"}],
  "objects": ["list of significant objects or products visible"],
  "date_estimate": "estimated year or decade",
  "scene_description": "brief description of what's happening",
  "text_visible": "any text visible in the image"
}

Output ONLY the JSON object, no other text."""
    
    try:
        response = vision_model.generate_content([prompt, img])
        result_text = response.text.strip()
        result_text = result_text.replace('```json', '').replace('```', '').strip()
        return json.loads(result_text)
    except Exception as e:
        print(f"Vision API error: {str(e)[:50]}")
        return {"people": [], "places": [], "objects": [], "date_estimate": "", 
                "scene_description": "", "text_visible": ""}

def analyze_document_with_gemini(text: str, filename: str) -> Dict:
    """Use Gemini Text AI to extract entities from document text"""
    prompt = f"""Analyze this historical document text related to the Martiny family of Turin, Italy (early 1900s).

Document: {filename}
Text: {text[:2000]}

Extract entities and return ONLY valid JSON:
{{
  "people": [{{"name": "full name", "role": "relationship or role"}}],
  "companies": [{{"name": "company name", "type": "industry"}}],
  "places": ["location names"],
  "dates": [{{"date": "YYYY or YYYY-MM-DD", "event": "what happened"}}],
  "events": ["significant events mentioned"],
  "products": ["products or brands mentioned"]
}}

Output ONLY the JSON object, no other text."""
    
    try:
        response = text_model.generate_content(prompt)
        result_text = response.text.strip()
        result_text = result_text.replace('```json', '').replace('```', '').strip()
        return json.loads(result_text)
    except Exception as e:
        print(f"Text API error: {str(e)[:50]}")
        return {"people": [], "companies": [], "places": [], "dates": [], 
                "events": [], "products": []}

print("Gemini models ready")

Gemini models ready


In [8]:
def normalize_name(name: str) -> str:
    """Clean up entity names (remove uncertainty markers, extra spaces, etc.)"""
    # Remove "(?)" uncertainty markers from client metadata
    name = re.sub(r'\s*\(\?\)\s*', '', name)
    # Normalize whitespace
    name = re.sub(r'\s+', ' ', name).strip()
    return name

def find_similar_entity(name: str, entity_type: str, threshold: int = 85) -> Optional[str]:
    """Check if we already have a similar entity (prevents duplicates)"""
    normalized = normalize_name(name)
    
    if entity_type in ["Photo", "Document"] and ("_" in normalized or "." in normalized):
        for entity_id, entity in entities_db.items():
            if entity.type == entity_type and entity.name == normalized:
                return entity_id
        return None
    
    for entity_id, entity in entities_db.items():
        if entity.type == entity_type:
            score = fuzz.ratio(normalized.lower(), entity.name.lower())
            if score >= threshold:
                return entity_id
    return None

def add_or_merge_entity(entity_type: str, name: str, properties: Dict, 
                        source_file: str, confidence: float = 1.0) -> str:
    """Add a new entity or merge with existing one if it's a duplicate"""
    existing_id = find_similar_entity(name, entity_type)
    
    if existing_id:
        # Entity already exists - merge properties
        existing = entities_db[existing_id]
        for k, v in properties.items():
            if k not in existing.properties:
                existing.properties[k] = v
        return existing_id
    else:
        # Create new entity
        entity_id = f"{entity_type}_{len([e for e in entities_db.values() if e.type == entity_type]) + 1}"
        entity = Entity(
            id=entity_id,
            type=entity_type,
            name=normalize_name(name),
            properties=properties,
            source_file=source_file,
            confidence=confidence
        )
        entities_db[entity_id] = entity
        return entity_id

print("Entity normalization ready")

Entity normalization ready


In [9]:
import time
from pathlib import Path

def relationship_exists(source_id: str, target_id: str, rel_type: str) -> bool:
    """Check if a relationship already exists (avoid creating duplicates)"""
    for rel in relationships_db:
        if (rel.source_id == source_id and 
            rel.target_id == target_id and 
            rel.type == rel_type):
            return True
    return False

def add_relationship(source_id: str, target_id: str, rel_type: str, properties: Dict):
    """Create a new relationship between two entities"""
    if not relationship_exists(source_id, target_id, rel_type):
        relationships_db.append(Relationship(
            source_id=source_id,
            target_id=target_id,
            type=rel_type,
            properties=properties
        ))
        return True
    return False

def process_file(file_path: str, category: str) -> None:
    """Main processing function - extracts entities and relationships from a single file"""
    path = Path(file_path)
    file_id = f"FILE_{len(files_db) + 1}"
    
    print(f"\n{'='*60}")
    print(f"Processing: {path.name} ({category})")
    
    # metadata record for this file
    metadata = FileMetadata(
        file_id=file_id,
        path=str(path),
        filename=path.name,
        category=category,
        file_type=path.suffix.lower(),
        size_bytes=path.stat().st_size
    )
    
    # entity for the file itself (Document or Photo)
    file_entity_id = add_or_merge_entity(
        entity_type="Document" if category != "album_page" else "Photo",
        name=path.stem,
        properties={"category": category, "file_type": path.suffix},
        source_file=file_id
    )
    
    # Process based on file category
    if category == "album_page":
        seg_dir = Path(OUTPUT_DIR) / "segmented" / path.stem
        segmented_photos = segment_album_page(str(path), seg_dir)
        print(f"  Segmented into {len(segmented_photos)} photos")
        
        for idx, seg_path in enumerate(segmented_photos[:3], 1):
            print(f"  Analyzing segment {idx}...")
            photo_data = analyze_photo_with_gemini(seg_path)
            print(f"    Gemini response: {json.dumps(photo_data, indent=2)}")
            
            # Extract people from the photo
            for person in photo_data.get('people', []):
                person_id = add_or_merge_entity(
                    "Person", person.get('name', 'Unknown'),
                    {"description": person.get('description', '')},
                    file_id
                )
                # "appears_in" relationship
                if add_relationship(person_id, file_entity_id, "appears_in", 
                                  {"context": photo_data.get('scene_description', '')}):
                    print(f"      ✓ {person.get('name')} appears_in {path.stem}")
            
            # Extract places from the photo
            for place in photo_data.get('places', []):
                place_id = add_or_merge_entity(
                    "Place", place.get('name', ''),
                    {"type": place.get('type', '')},
                    file_id
                )
                #  "located_at" relationship
                if add_relationship(file_entity_id, place_id, "located_at", {}):
                    print(f" {path.stem} located_at {place.get('name')}")
    
    elif category in ["newspaper", "pdf_document", "genealogy"]:
        # Extract text from PDF
        text = extract_text_from_pdf(str(path))
        print(f"  PDF text extracted: {len(text)} chars")
        
        # If PDF has very little text, it's probably scanned - use OCR
        if len(text) < 100:
            print(f"  PDF has little text, trying OCR...")
            ocr_text = extract_text_from_image(str(path))
            if ocr_text:
                text = ocr_text
                print(f"  OCR extracted: {len(text)} chars")
        
        metadata.ocr_text = text[:500]  
        
        # Only process if we got meaningful text
        if len(text) > 50:
            print(f"  Calling Gemini with {len(text)} chars...")
            doc_data = analyze_document_with_gemini(text, path.name)
            print(f"  Gemini response: {json.dumps(doc_data, indent=2)}")
            
            # Extract people mentioned in the document
            for person in doc_data.get('people', []):
                person_id = add_or_merge_entity(
                    "Person", person.get('name', ''),
                    {"role": person.get('role', '')},
                    file_id
                )
                if add_relationship(file_entity_id, person_id, "mentions", {}):
                    print(f" - mentions {person.get('name')}")
            
            # Extract companies mentioned
            for company in doc_data.get('companies', []):
                company_id = add_or_merge_entity(
                    "Company", company.get('name', ''),
                    {"type": company.get('type', '')},
                    file_id
                )
                if add_relationship(file_entity_id, company_id, "mentions", {}):
                    print(f"      - mentions {company.get('name')}")
        else:
            print(f" -- Skipping - not enough text ({len(text)} chars)")
    
    elif category == "client_tags":
        # Process Word documents with photo metadata
        text = extract_text_from_docx(str(path))
        print(f"  DOCX text extracted: {len(text)} chars")
        metadata.ocr_text = text[:500]
        
        if len(text) > 50:
            print(f"  Calling Gemini...")
            doc_data = analyze_document_with_gemini(text, path.name)
            print(f"  Gemini response: {json.dumps(doc_data, indent=2)}")
            
            for person in doc_data.get('people', []):
                person_id = add_or_merge_entity(
                    "Person", person.get('name', ''),
                    {"role": person.get('role', '')},
                    file_id
                )
                add_relationship(file_entity_id, person_id, "mentions", {})
    
    elif category == "single_photo":
        # Process individual photos
        print(f"  Analyzing photo...")
        photo_data = analyze_photo_with_gemini(str(path))
        print(f"  Gemini response: {json.dumps(photo_data, indent=2)}")
        
        for person in photo_data.get('people', []):
            person_id = add_or_merge_entity(
                "Person", person.get('name', 'Unknown'),
                {"description": person.get('description', '')},
                file_id
            )
            if add_relationship(person_id, file_entity_id, "appears_in", {}):
                print(f"       {person.get('name')} appears_in {path.stem}")
    
    elif category == "document":
        # For image-based documents (scanned certificates), use OCR
        print(f"  Running OCR on document image...")
        text = extract_text_from_image(str(path))
        print(f"  OCR extracted: {len(text)} chars")
        metadata.ocr_text = text[:500]
        
        if len(text) > 50:
            doc_data = analyze_document_with_gemini(text, path.name)
            print(f"  Gemini response: {json.dumps(doc_data, indent=2)}")
    
    # Save file metadata
    files_db[file_id] = metadata
    
    # Small delay to avoid hitting API rate limits
    time.sleep(2)
    print(f"  Completed")

print("Processing pipeline ready")

Processing pipeline ready


In [10]:
def create_document_to_document_relationships():
    """Create indirect relationships between documents that share entities"""
    print("\n" + "="*60)
    print("CREATING DOCUMENT-TO-DOCUMENT RELATIONSHIPS")
    print("="*60)
    
    # Get all document and photo entities
    all_docs = [e for e in entities_db.values() if e.type in ["Document", "Photo"]]
    created_count = 0
    
    # Compare each pair of documents
    for i, doc1 in enumerate(all_docs):
        for doc2 in all_docs[i+1:]:
            
            # 1. Check if documents share people
            doc1_people = set()
            doc2_people = set()
            for rel in relationships_db:
                # Document mentions person
                if rel.source_id == doc1.id and rel.target_id.startswith("Person_"):
                    doc1_people.add(rel.target_id)
                if rel.source_id == doc2.id and rel.target_id.startswith("Person_"):
                    doc2_people.add(rel.target_id)
                # Person appears in document (reverse direction)
                if rel.target_id == doc1.id and rel.source_id.startswith("Person_"):
                    doc1_people.add(rel.source_id)
                if rel.target_id == doc2.id and rel.source_id.startswith("Person_"):
                    doc2_people.add(rel.source_id)
            
            shared_people = doc1_people & doc2_people
            if shared_people:
                person_names = [entities_db[pid].name for pid in shared_people]
                relationships_db.append(Relationship(
                    source_id=doc1.id, target_id=doc2.id, type="shares_person",
                    properties={"person_ids": list(shared_people), "reason": f"Both mention {', '.join(person_names)}"},
                    confidence=0.92
                ))
                created_count += 1
                print(f"  {doc1.name} <-> {doc2.name} (shares_person)")
            
            # 2. Check if documents share places
            doc1_places = set()
            doc2_places = set()
            for rel in relationships_db:
                if rel.source_id == doc1.id and rel.target_id.startswith("Place_"):
                    doc1_places.add(rel.target_id)
                if rel.source_id == doc2.id and rel.target_id.startswith("Place_"):
                    doc2_places.add(rel.target_id)
            
            shared_places = doc1_places & doc2_places
            if shared_places:
                place_names = [entities_db[pid].name for pid in shared_places]
                relationships_db.append(Relationship(
                    source_id=doc1.id, target_id=doc2.id, type="shares_place",
                    properties={"place_ids": list(shared_places), "reason": f"Both reference {', '.join(place_names)}"},
                    confidence=0.88
                ))
                created_count += 1
                print(f"   {doc1.name} <-> {doc2.name} (shares_place)")
            
            # 3. Check if documents share companies
            doc1_companies = set()
            doc2_companies = set()
            for rel in relationships_db:
                if rel.source_id == doc1.id and rel.target_id.startswith("Company_"):
                    doc1_companies.add(rel.target_id)
                if rel.source_id == doc2.id and rel.target_id.startswith("Company_"):
                    doc2_companies.add(rel.target_id)
            
            shared_companies = doc1_companies & doc2_companies
            if shared_companies:
                company_names = [entities_db[cid].name for cid in shared_companies]
                relationships_db.append(Relationship(
                    source_id=doc1.id, target_id=doc2.id, type="shares_company",
                    properties={"company_ids": list(shared_companies), "reason": f"Both mention {', '.join(company_names)}"},
                    confidence=0.90
                ))
                created_count += 1
                print(f"  {doc1.name} <-> {doc2.name} (shares_company)")
            
            # 4. Check if photos are from the same album
            if doc1.type == "Photo" and doc2.type == "Photo":
                # Extract album name from filename (e.g., "Album A" from "Album A_001")
                album1 = doc1.name.rsplit('_', 1)[0] if '_' in doc1.name else None
                album2 = doc2.name.rsplit('_', 1)[0] if '_' in doc2.name else None
                if album1 and album2 and album1 == album2:
                    relationships_db.append(Relationship(
                        source_id=doc1.id, target_id=doc2.id, type="same_album",
                        properties={"album_id": album1, "reason": f"Both from {album1}"},
                        confidence=1.0
                    ))
                    created_count += 1
                    print(f"  {doc1.name} <-> {doc2.name} (same_album)")
    
    print(f"\n{'='*60}")
    print(f" Created {created_count} document-to-document relationships")
    print(f"{'='*60}\n")
    return created_count

# Main processing loop - process all selected files
print("="*60)
print("STARTING PIPELINE EXECUTION")
print("="*60)

for idx, file_path in enumerate(sample_files, 1):
    category = archive_df[archive_df['path'] == file_path]['category'].values[0]
    print(f"\n[{idx}/{len(sample_files)}]", end=' ')
    
    try:
        process_file(file_path, category)
    except Exception as e:
        print(f"  Error: {str(e)[:100]}")
        continue

print("\n" + "="*60)
print("PROCESSING COMPLETE")
print("="*60)

# Create document-to-document relationships
doc_doc_count = create_document_to_document_relationships()

print(f"\nFINAL STATISTICS:")
print(f"  Entities: {len(entities_db)}")
print(f"  Relationships: {len(relationships_db)} (including {doc_doc_count} doc-to-doc)")
print(f"  Files Processed: {len(files_db)}")

STARTING PIPELINE EXECUTION

[1/20] 
Processing: 2025 - 10 - 17 alberi geneologici famiglia Martiny.pdf (genealogy)
  PDF text extracted: 2 chars
  PDF has little text, trying OCR...
[2025/11/17 18:13:54] ppocr DEBUG: dt_boxes num : 43, elapsed : 4.9650022983551025
[2025/11/17 18:13:56] ppocr DEBUG: cls num  : 43, elapsed : 1.539111852645874
[2025/11/17 18:14:09] ppocr DEBUG: rec_res num  : 43, elapsed : 13.203935146331787
[2025/11/17 18:14:11] ppocr DEBUG: dt_boxes num : 17, elapsed : 1.7567613124847412
[2025/11/17 18:14:12] ppocr DEBUG: cls num  : 17, elapsed : 0.9484336376190186
[2025/11/17 18:14:13] ppocr DEBUG: rec_res num  : 17, elapsed : 1.192706823348999
[2025/11/17 18:14:14] ppocr DEBUG: dt_boxes num : 9, elapsed : 0.4354534149169922
[2025/11/17 18:14:14] ppocr DEBUG: cls num  : 9, elapsed : 0.24342989921569824
[2025/11/17 18:14:15] ppocr DEBUG: rec_res num  : 9, elapsed : 0.7516500949859619
  OCR extracted: 290 chars
  Calling Gemini with 290 chars...
  Gemini response: {
  "

In [11]:
# Display entity distribution
entity_types = defaultdict(int)
for entity in entities_db.values():
    entity_types[entity.type] += 1

print("\nENTITY DISTRIBUTION")
print("-" * 40)
for etype, count in sorted(entity_types.items(), key=lambda x: x[1], reverse=True):
    print(f"{etype:15} : {count:3}")

# Display relationship distribution
rel_types = defaultdict(int)
for rel in relationships_db:
    rel_types[rel.type] += 1

print("\nRELATIONSHIP DISTRIBUTION")
print("-" * 40)
for rtype, count in sorted(rel_types.items(), key=lambda x: x[1], reverse=True):
    print(f"{rtype:15} : {count:3}")

# Show sample entities
print("\nSAMPLE ENTITIES")
print("-" * 40)
for entity_type in ['Person', 'Place', 'Company']:
    samples = [e for e in entities_db.values() if e.type == entity_type][:3]
    if samples:
        print(f"\n{entity_type}s:")
        for e in samples:
            print(f"  - {e.name}")


ENTITY DISTRIBUTION
----------------------------------------
Person          :  21
Photo           :  10
Document        :   9
Place           :   4
Company         :   3

RELATIONSHIP DISTRIBUTION
----------------------------------------
same_album      :  45
mentions        :  25
located_at      :   7
shares_place    :   6
shares_person   :   3
appears_in      :   1

SAMPLE ENTITIES
----------------------------------------

Persons:
  - Giovanni Martiny
  - Johann
  - Alwine

Places:
  - Turin
  - Walter Martiny Industria Gomma
  - Società Anonima Walter Martiny Industria Gomma

Companys:
  - Bender e Martiny
  - LA STAMPA
  - PHILIPS


In [12]:
def networkx_graph() -> nx.MultiDiGraph:
    """Convert our entities and relationships into a NetworkX graph"""
    G = nx.MultiDiGraph()  # graph that allows multiple edges
    
    # Add all entities as nodes
    for entity_id, entity in entities_db.items():
        G.add_node(entity_id, 
                   label=entity.name,
                   entity_type=entity.type,
                   **entity.properties)
    
    # Add all relationships as edges
    for rel in relationships_db:
        if rel.source_id in G.nodes and rel.target_id in G.nodes:
            G.add_edge(rel.source_id, rel.target_id,
                      relation_type=rel.type,
                      **rel.properties)
    
    return G

# Build the knowledge graph
kg_graph = networkx_graph()

print(f"\nKnowledge Graph Built")
print(f"  Nodes: {kg_graph.number_of_nodes()}")
print(f"  Edges: {kg_graph.number_of_edges()}")
print(f"  Connected components: {nx.number_weakly_connected_components(kg_graph)}")


Knowledge Graph Built
  Nodes: 47
  Edges: 87
  Connected components: 8


In [13]:
def create_interactive_visualization(graph: nx.MultiDiGraph, output_path: str) -> None:
    """Create an interactive HTML visualization of the knowledge graph"""
    net = Network(height="900px", width="100%", directed=True, notebook=False, 
                  bgcolor="#ffffff", font_color="#000000")
    
    # Configure physics for better layout
    net.barnes_hut(
        gravity=-80000,
        central_gravity=0.3,
        spring_length=200,
        spring_strength=0.001,
        damping=0.09,
        overlap=0
    )
    
    # Color scheme for different entity types
    colors = {
        'Person': '#FF6B6B',   
        'Place': '#4ECDC4',  
        'Company': '#45B7D1',   
        'Document': '#96CEB4',   
        'Photo': '#FFEAA7',       
        'Event': '#DDA15E',       
        'Product': '#BC6C25'     
    }
    
    # Size scheme for different entity types
    sizes = {
        'Person': 30,
        'Company': 25,
        'Place': 20,
        'Document': 15,
        'Photo': 15,
        'Event': 20,
        'Product': 18
    }
    
    # Add nodes to the visualization
    for node_id, node_data in graph.nodes(data=True):
        entity_type = node_data.get('entity_type', 'Document')
        color = colors.get(entity_type, '#95a5a6')
        size = sizes.get(entity_type, 15)
        label = node_data.get('label', node_id)
        
        # Create tooltip with entity properties
        properties_str = ""
        for key, val in node_data.items():
            if key not in ['label', 'entity_type'] and val:
                properties_str += f"<br><b>{key}:</b> {val}"
        
        title = f"<b>{entity_type}: {label}</b>{properties_str}"
        
        net.add_node(
            node_id, 
            label=label,
            color=color,
            title=title,
            size=size,
            font={'size': 14, 'face': 'arial', 'color': '#000000'},
            shape='dot',
            borderWidth=2,
            borderWidthSelected=4
        )
    
    # Add edges to the visualization
    for source, target, edge_data in graph.edges(data=True):
        relation_type = edge_data.get('relation_type', 'related_to')
        
        # Create tooltip with edge properties
        props_str = ""
        for key, val in edge_data.items():
            if key != 'relation_type' and val:
                props_str += f"<br>{key}: {val}"
        
        edge_title = f"<b>{relation_type}</b>{props_str}"
        
        net.add_edge(
            source, 
            target,
            title=edge_title,
            label=relation_type,
            arrows='to',
            font={'size': 10, 'align': 'middle'},
            color={'color': '#848484', 'highlight': '#FF0000'},
            width=2,
            smooth={'type': 'dynamic'}
        )
    
    # Calculate statistics for the summary panel
    entity_counts = defaultdict(int)
    for _, node_data in graph.nodes(data=True):
        entity_counts[node_data.get('entity_type', 'Unknown')] += 1
    
    relation_counts = defaultdict(int)
    for _, _, edge_data in graph.edges(data=True):
        relation_counts[edge_data.get('relation_type', 'unknown')] += 1
    
    # Create HTML summary panel
    summary_html = f"""
    <div style="padding: 20px; background-color: #f8f9fa; border-bottom: 2px solid #dee2e6;">
        <h1 style="text-align: center; color: #333;">Martiny Family Archive - Knowledge Graph</h1>
        
        <div style="display: flex; justify-content: space-around; margin-top: 20px;">
            <div style="text-align: center;">
                <h3 style="color: #666;">Graph Statistics</h3>
                <p><strong>Total Nodes:</strong> {graph.number_of_nodes()}</p>
                <p><strong>Total Relationships:</strong> {graph.number_of_edges()}</p>
                <p><strong>Connected Components:</strong> {nx.number_weakly_connected_components(graph)}</p>
            </div>
            
            <div style="text-align: center;">
                <h3 style="color: #666;">Entity Types</h3>
                {''.join([f'<p><span style="color: {colors.get(etype, "#999")};">●</span> <strong>{etype}:</strong> {count}</p>' 
                          for etype, count in sorted(entity_counts.items(), key=lambda x: x[1], reverse=True)])}
            </div>
            
            <div style="text-align: center;">
                <h3 style="color: #666;">Relationship Types</h3>
                {''.join([f'<p><strong>{rtype}:</strong> {count}</p>' 
                          for rtype, count in sorted(relation_counts.items(), key=lambda x: x[1], reverse=True)])}
            </div>
        </div>
        
        <div style="text-align: center; margin-top: 15px; padding: 10px; background-color: #e9ecef; border-radius: 5px;">
            <p style="margin: 5px;"><strong>Controls:</strong> 
            Drag nodes to reposition | Scroll to zoom | Click and drag background to pan | Hover over nodes/edges for details</p>
        </div>
    </div>
    """
    
    # Save the graph
    net.save_graph(output_path)
    
    # Inject our custom summary HTML into the generated file
    with open(output_path, 'r', encoding='utf-8') as f:
        html_content = f.read()
    
    html_content = html_content.replace(
        '<div class="card" style="width: 100%">',
        f'{summary_html}<div class="card" style="width: 100%">'
    )
    
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(html_content)
    
    print(f"Enhanced visualization saved: {output_path}")

# Create the visualization
viz_path = str(Path(OUTPUT_DIR) / "knowledge_graph.html")
create_interactive_visualization(kg_graph, viz_path)
print(f"\nOpen {viz_path} in your browser")

# Print entity type summary
print(f"\nGraph Summary:")
print(f"  - People: {sum(1 for _, d in kg_graph.nodes(data=True) if d.get('entity_type') == 'Person')}")
print(f"  - Companies: {sum(1 for _, d in kg_graph.nodes(data=True) if d.get('entity_type') == 'Company')}")
print(f"  - Documents: {sum(1 for _, d in kg_graph.nodes(data=True) if d.get('entity_type') == 'Document')}")
print(f"  - Photos: {sum(1 for _, d in kg_graph.nodes(data=True) if d.get('entity_type') == 'Photo')}")

Enhanced visualization saved: C:\Users\wel\Desktop\opvn\outputs\knowledge_graph.html

Open C:\Users\wel\Desktop\opvn\outputs\knowledge_graph.html in your browser

Graph Summary:
  - People: 21
  - Companies: 3
  - Documents: 9
  - Photos: 10


In [14]:
# Export entities
entities_export = [asdict(e) for e in entities_db.values()]
with open(Path(OUTPUT_DIR) / "entities.json", 'w', encoding='utf-8') as f:
    json.dump(entities_export, f, indent=2, ensure_ascii=False)
print(f"Exported {len(entities_export)} entities to entities.json")

# Export relationships
relationships_export = [asdict(r) for r in relationships_db]
with open(Path(OUTPUT_DIR) / "relationships.json", 'w', encoding='utf-8') as f:
    json.dump(relationships_export, f, indent=2, ensure_ascii=False)
print(f"Exported {len(relationships_export)} relationships to relationships.json")

# Export file metadata
files_export = [asdict(f) for f in files_db.values()]
with open(Path(OUTPUT_DIR) / "file_metadata.json", 'w', encoding='utf-8') as f:
    json.dump(files_export, f, indent=2, ensure_ascii=False)
print(f"Exported {len(files_export)} file records to file_metadata.json")

# Export comprehensive statistics
stats = {
    "total_entities": len(entities_db),
    "total_relationships": len(relationships_db),
    "total_files_processed": len(files_db),
    "entity_types": dict(entity_types),
    "relationship_types": dict(rel_types),
    "graph_stats": {
        "nodes": kg_graph.number_of_nodes(),
        "edges": kg_graph.number_of_edges(),
        "density": nx.density(kg_graph),
        "connected_components": nx.number_weakly_connected_components(kg_graph)
    }
}

with open(Path(OUTPUT_DIR) / "statistics.json", 'w', encoding='utf-8') as f:
    json.dump(stats, f, indent=2)
print(f"Exported statistics to statistics.json")

Exported 47 entities to entities.json
Exported 87 relationships to relationships.json
Exported 19 file records to file_metadata.json
Exported statistics to statistics.json


In [15]:
def validate_knowledge_graph() -> Dict:
    """Run quality checks on the generated knowledge graph"""
    validation = {
        "extraction_quality": {},
        "graph_structure": {},
        "data_quality": {},
        "issues": []
    }
    
    # Check for orphaned nodes 
    orphaned = [n for n in kg_graph.nodes() if kg_graph.degree(n) == 0]
    validation["graph_structure"]["orphaned_nodes"] = len(orphaned)
    if len(orphaned) > 0:
        validation["issues"].append(f"{len(orphaned)} orphaned nodes found")
    
    # Check entity completeness (entities should have properties)
    entities_with_props = sum(1 for e in entities_db.values() if e.properties)
    validation["data_quality"]["entities_with_properties"] = entities_with_props
    validation["data_quality"]["completeness_ratio"] = entities_with_props / len(entities_db)
    
    # Check for potential duplicates
    names = [e.name for e in entities_db.values()]
    duplicates = len(names) - len(set(names))
    validation["data_quality"]["potential_duplicates"] = duplicates
    
    # Check file processing success rate
    files_with_entities = sum(1 for f in files_db.values() if f.extracted_entities)
    validation["extraction_quality"]["files_processed"] = len(files_db)
    validation["extraction_quality"]["files_with_entities"] = files_with_entities
    
    # Overall status
    validation["status"] = "PASS" if len(validation["issues"]) < 3 else "WARNING"
    
    return validation

# Run validation
validation_report = validate_knowledge_graph()

print("\nVALIDATION REPORT")
print("="*60)
print(f"Status: {validation_report['status']}")

print(f"\nGraph Structure:")
for k, v in validation_report['graph_structure'].items():
    print(f"  {k}: {v}")

print(f"\nData Quality:")
for k, v in validation_report['data_quality'].items():
    print(f"  {k}: {v}")

print(f"\nExtraction Quality:")
for k, v in validation_report['extraction_quality'].items():
    print(f"  {k}: {v}")

if validation_report['issues']:
    print(f"\nIssues Found:")
    for issue in validation_report['issues']:
        print(f"  - {issue}")

# Save validation report
with open(Path(OUTPUT_DIR) / "validation_report.json", 'w') as f:
    json.dump(validation_report, f, indent=2)

print(f"\nValidation report saved")
print(f"\nAll outputs saved to: {OUTPUT_DIR}")
print("\nPipeline complete.")


VALIDATION REPORT
Status: PASS

Graph Structure:
  orphaned_nodes: 4

Data Quality:
  entities_with_properties: 47
  completeness_ratio: 1.0
  potential_duplicates: 0

Extraction Quality:
  files_processed: 19
  files_with_entities: 0

Issues Found:
  - 4 orphaned nodes found

Validation report saved

All outputs saved to: C:\Users\wel\Desktop\opvn\outputs

Pipeline complete.
