# PyDI Data Integration Tutorial
## Recreating the WInte.r Framework Tutorial in Python

This tutorial demonstrates comprehensive data integration using PyDI, inspired by the classic WInte.r framework tutorial. We'll work with movie datasets to showcase the complete data integration pipeline.

### What You'll Learn

1. **Data Loading & Profiling**: Load and analyze movie datasets with provenance tracking
2. **Identity Resolution**: 
   - Advanced blocking strategies (Standard, Sorted Neighbourhood, Token-based, Embedding-based)
   - Multi-attribute similarity matching with custom comparators
   - Machine learning-based entity matching
3. **Data Fusion**: 
   - Conflict resolution with custom fusion rules
   - Quality assessment against gold standards
   - Provenance tracking and trust management
4. **Advanced Techniques**: 
   - Semantic similarity with embeddings
   - Performance optimization and scalability
   - End-to-end pipeline integration

### Datasets

We'll use three movie datasets:
- **Academy Awards**: Movies with Oscar information (4,592 records)
- **Actors**: Movies with actor details (149 records) 
- **Golden Globes**: Movies with Golden Globe awards (2,286 records)

These datasets contain overlapping movie information but differ in their attributes, data quality, and values, which makes them well suited to demonstrating real-world data integration challenges.

## Setup and Imports

Let's start by importing PyDI components and setting up our environment.

In [1]:
# Install the PyDI package if not already installed
# First navigate to the root directory of the repository in your terminal, then run:
# !pip install -e .

In [2]:
# Core Python libraries
import pandas as pd
import numpy as np
from pathlib import Path
import logging
import time
import json
from datetime import datetime

# PyDI imports for data loading and profiling
from PyDI.io import load_xml, load_csv
from PyDI.profiling import DataProfiler

# PyDI imports for entity matching
from PyDI.entitymatching import (
    # Blocking strategies
    NoBlocking, StandardBlocking, SortedNeighbourhood, 
    TokenBlocking, EmbeddingBlocking,
    # Matchers
    RuleBasedMatcher, MLBasedMatcher,
    # Comparators
    StringComparator, DateComparator, NumericComparator,
    # Evaluation - NEW: Separate methods for blocking and matching evaluation
    EntityMatchingEvaluator,
    # Utilities
    ensure_record_ids
)

# PyDI imports for data fusion
from PyDI.fusion import (
    DataFusionEngine, DataFusionStrategy, DataFusionEvaluator,
    # Fusion rules
    longest_string, shortest_string, most_recent, earliest,
    average, median, maximum, minimum, most_complete,
    union, intersection, voting,
    # Convenient aliases
    LONGEST, SHORTEST, LATEST, EARLIEST, AVG, MAX, MIN, VOTE, UNION,
    # Analysis and reporting
    FusionReport, FusionQualityMetrics, ProvenanceTracker,
    build_record_groups_from_correspondences,
    analyze_attribute_coverage
)

# Setup paths
def get_repo_root():
    """Get repository root directory."""
    current = Path.cwd()
    while current != current.parent:
        if (current / 'pyproject.toml').exists():
            return current
        current = current.parent
    return Path.cwd()

ROOT = get_repo_root()
OUTPUT_DIR = ROOT / "output" / "tutorial"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Check if embeddings are available
try:
    from sentence_transformers import SentenceTransformer
    use_embeddings = True
    print("🧠 Embedding models available")
except ImportError:
    use_embeddings = False
    print("⚠️  Embedding models not available (install sentence-transformers)")

print("PyDI Tutorial")
print(f"Repository root: {ROOT}")
print(f"Output directory: {OUTPUT_DIR}")
print("All systems ready! 🚀")

🧠 Embedding models available
PyDI Tutorial
Repository root: c:\Users\Ralph\dev\pydi
Output directory: c:\Users\Ralph\dev\pydi\output\tutorial
All systems ready! 🚀


## Part 1: Data Loading and Profiling

PyDI provides provenance-aware data loading that automatically tracks dataset metadata and adds unique identifiers. Let's load our movie datasets and understand their characteristics.
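
To make the idea concrete, here is a minimal standard-library sketch of what provenance-aware loading might involve: each `<movie>` element is flattened into a record and given a generated `_id` of the form `<dataset>-<NNNN>`, the pattern visible in the dataset previews. The helper `load_records`, the `_source` field, and the sample XML are hypothetical; this is not PyDI's `load_xml` implementation.

```python
import xml.etree.ElementTree as ET

# Tiny stand-in for an XML source file (values taken from the Actors preview).
SAMPLE_XML = """
<movies>
  <movie><id>actors_1</id><title>7th Heaven</title><date>1929-01-01</date></movie>
  <movie><id>actors_2</id><title>Coquette</title><date>1930-01-01</date></movie>
</movies>
"""


def load_records(xml_text, name, record_tag="movie", index_column_name="_id"):
    """Hypothetical helper: flatten child elements and add a generated ID."""
    root = ET.fromstring(xml_text)
    records = []
    for i, element in enumerate(root.iter(record_tag)):
        record = {index_column_name: f"{name}-{i:04d}"}
        for child in element:
            # Empty elements become None so missing values are easy to count.
            record[child.tag] = (child.text or "").strip() or None
        record["_source"] = name  # simple provenance marker (an assumption)
        records.append(record)
    return records


records = load_records(SAMPLE_XML, name="actors")
for record in records:
    print(record)
```

Keeping the original `id` next to the generated `_id` is what later allows matches to be mapped back to the identifiers used in the ground truth files.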

In [3]:
# Define dataset paths
DATA_DIR = ROOT / "input" / "movies"

print("=== Loading Movie Datasets ===")
print("PyDI provides provenance-aware loading with automatic ID generation.\n")

# Load Academy Awards dataset
academy_awards = load_xml(
    DATA_DIR / "entitymatching" / "data" / "academy_awards.xml",
    name="academy_awards",
    record_tag="movie",
    add_index=True,
    index_column_name="_id"
)

# Load Actors dataset  
actors = load_xml(
    DATA_DIR / "entitymatching" / "data" / "actors.xml",
    name="actors", 
    record_tag="movie",
    add_index=True,
    index_column_name="_id"
)

# Load Golden Globes dataset
golden_globes = load_xml(
    DATA_DIR / "fusion" / "data" / "golden_globes.xml",
    name="golden_globes",
    record_tag="movie", 
    add_index=True,
    index_column_name="_id"
)

# Display basic information
datasets = [academy_awards, actors, golden_globes]
names = ["Academy Awards", "Actors", "Golden Globes"]

for df, name in zip(datasets, names):
    print(f"{name}:")
    print(f"  Records: {len(df):,}")
    print(f"  Attributes: {len(df.columns)}")
    print(f"  Columns: {list(df.columns)}")
    print(f"  Dataset name: {df.attrs.get('dataset_name', 'unknown')}")
    print()

total_records = sum(len(df) for df in datasets)
print(f"Total records across all datasets: {total_records:,}")

=== Loading Movie Datasets ===
PyDI provides provenance-aware loading with automatic ID generation.

Academy Awards:
  Records: 4,592
  Attributes: 7
  Columns: ['_id', 'id', 'title', 'actor_name', 'date', 'director_name', 'oscar']
  Dataset name: academy_awards

Actors:
  Records: 149
  Attributes: 7
  Columns: ['_id', 'id', 'title', 'actor_name', 'actors_actor_birthday', 'actors_actor_birthplace', 'date']
  Dataset name: actors

Golden Globes:
  Records: 2,286
  Attributes: 7
  Columns: ['_id', 'id', 'title', 'actor_name', 'date', 'director_name', 'globe']
  Dataset name: golden_globes

Total records across all datasets: 7,027


In [4]:
# Preview the data structure
print("=== Dataset Previews ===")

print("\n📽️ Academy Awards Dataset:")
display(academy_awards.head(3))

print("\n🎭 Actors Dataset:")
display(actors.head(3))

print("\n🏆 Golden Globes Dataset:")
display(golden_globes.head(3))

=== Dataset Previews ===

📽️ Academy Awards Dataset:


| | _id | id | title | actor_name | date | director_name | oscar |
|---|---|---|---|---|---|---|---|
| 0 | academy_awards-0000 | academy_awards_1 | Biutiful | Javier Bardem | 2010-01-01 | | |
| 1 | academy_awards-0001 | academy_awards_2 | True Grit | Jeff Bridges | 2010-01-01 | Joel Coen | |
| 2 | academy_awards-0002 | academy_awards_2 | True Grit | Jeff Bridges | 2010-01-01 | Ethan Coen | |



🎭 Actors Dataset:


| | _id | id | title | actor_name | actors_actor_birthday | actors_actor_birthplace | date |
|---|---|---|---|---|---|---|---|
| 0 | actors-0000 | actors_1 | 7th Heaven | Janet Gaynor | 1906-01-01 | Pennsylvania | 1929-01-01 |
| 1 | actors-0001 | actors_2 | Coquette | Mary Pickford | 1892-01-01 | Canada | 1930-01-01 |
| 2 | actors-0002 | actors_3 | The Divorcee | Norma Shearer | 1902-01-01 | Canada | 1931-01-01 |



🏆 Golden Globes Dataset:


| | _id | id | title | actor_name | date | director_name | globe |
|---|---|---|---|---|---|---|---|
| 0 | golden_globes-0000 | golden_globes_1 | Frankie and Alice | Halle Berry | 2011-01-01 | | |
| 1 | golden_globes-0001 | golden_globes_2 | Rabbit Hole | Nicole Kidman | 2011-01-01 | | |
| 2 | golden_globes-0002 | golden_globes_3 | Winter's Bone | Jennifer Lawrence | 2011-01-01 | | |


### Data Quality Analysis

Let's use PyDI's profiling capabilities to understand our data quality and identify the best attributes for matching.

In [5]:
# Initialize PyDI's data profiler
profiler = DataProfiler()

print("=== Data Quality Analysis ===")
print("Using PyDI's integrated profiling to understand our datasets...\n")

# Generate comprehensive quality analysis
coverage_analysis = analyze_attribute_coverage(
    datasets=datasets,
    dataset_names=names,
    include_samples=True
)

print("Attribute Coverage Analysis:")
display_cols = ['attribute'] + [f'{name}_count' for name in names] + [f'{name}_pct' for name in names]
available_cols = [col for col in display_cols if col in coverage_analysis.columns]
display(coverage_analysis[available_cols])

# Identify common attributes for matching
common_attrs = set(academy_awards.columns) & set(actors.columns) & set(golden_globes.columns)
common_attrs.discard('_id')  # Remove PyDI's internal ID

print(f"\n🔗 Common attributes for matching: {sorted(common_attrs)}")

# Analyze attribute completeness
print("\n📊 Attribute Completeness:")
for attr in sorted(common_attrs):
    completeness = []
    for df, name in zip(datasets, names):
        if attr in df.columns:
            complete_pct = (df[attr].notna().sum() / len(df)) * 100
            completeness.append(f"{name}: {complete_pct:.1f}%")
    print(f"  {attr}: {', '.join(completeness)}")

=== Data Quality Analysis ===
Using PyDI's integrated profiling to understand our datasets...

Attribute Coverage Analysis:


| | attribute | Academy Awards_count | Actors_count | Golden Globes_count | Academy Awards_pct | Actors_pct | Golden Globes_pct |
|---|---|---|---|---|---|---|---|
| 0 | _id | 4592/4592 | 149/149 | 2286/2286 | 100.0% | 100.0% | 100.0% |
| 1 | actor_name | 1057/4592 | 149/149 | 2232/2286 | 23.0% | 100.0% | 97.6% |
| 2 | actors_actor_birthday | 0/0 | 149/149 | 0/0 | 0% | 100.0% | 0% |
| 3 | actors_actor_birthplace | 0/0 | 149/149 | 0/0 | 0% | 100.0% | 0% |
| 4 | date | 4592/4592 | 149/149 | 2286/2286 | 100.0% | 100.0% | 100.0% |
| 5 | director_name | 420/4592 | 0/0 | 320/2286 | 9.1% | 0% | 14.0% |
| 6 | globe | 0/0 | 0/0 | 625/2286 | 0% | 0% | 27.3% |
| 7 | id | 4592/4592 | 149/149 | 2286/2286 | 100.0% | 100.0% | 100.0% |
| 8 | oscar | 1275/4592 | 0/0 | 0/0 | 27.8% | 0% | 0% |
| 9 | title | 4580/4592 | 149/149 | 2286/2286 | 99.7% | 100.0% | 100.0% |



🔗 Common attributes for matching: ['actor_name', 'date', 'id', 'title']

📊 Attribute Completeness:
  actor_name: Academy Awards: 23.0%, Actors: 100.0%, Golden Globes: 97.6%
  date: Academy Awards: 100.0%, Actors: 100.0%, Golden Globes: 100.0%
  id: Academy Awards: 100.0%, Actors: 100.0%, Golden Globes: 100.0%
  title: Academy Awards: 99.7%, Actors: 100.0%, Golden Globes: 100.0%


In [6]:
# Generate detailed profiling reports (optional - creates HTML reports)
print("=== Generating Detailed Profiling Reports ===")

profiling_dir = OUTPUT_DIR / "profiling"
profiling_dir.mkdir(exist_ok=True)

# Generate comprehensive HTML reports
for df, name in zip(datasets, names):
    profile_path = profiler.profile(df, str(profiling_dir))
    print(f"✅ {name} profile: {profile_path}")
    
print(f"\n📁 Detailed profiling reports saved to: {profiling_dir}")
print("💡 Open these HTML files to explore interactive data profiles!")

=== Generating Detailed Profiling Reports ===


✅ Academy Awards profile: c:\Users\Ralph\dev\pydi\output\tutorial\profiling\academy_awards_profile.html


✅ Actors profile: c:\Users\Ralph\dev\pydi\output\tutorial\profiling\actors_profile.html


✅ Golden Globes profile: c:\Users\Ralph\dev\pydi\output\tutorial\profiling\golden_globes_profile.html

📁 Detailed profiling reports saved to: c:\Users\Ralph\dev\pydi\output\tutorial\profiling
💡 Open these HTML files to explore interactive data profiles!


## Part 2: Identity Resolution (Entity Matching)

Identity Resolution is the process of identifying records that refer to the same real-world entity. PyDI provides comprehensive blocking and matching capabilities.

### Step 1: Blocking Strategies

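Comparing every record in one dataset with every record in the other requires |A| × |B| comparisons, which grows quadratically with dataset size. Blocking restricts matching to a much smaller set of candidate pairs that are likely to match. Let's explore different blocking strategies.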
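
As a minimal illustration of the idea, the sketch below groups records by a blocking key (the first three characters of the title) and only pairs records that share a key. The toy records and the `standard_blocking` helper are hypothetical and independent of PyDI's `StandardBlocking` class.

```python
from collections import defaultdict

left = [
    {"id": "a1", "title": "True Grit"},
    {"id": "a2", "title": "Biutiful"},
    {"id": "a3", "title": "The Divorcee"},
]
right = [
    {"id": "b1", "title": "True Grit"},
    {"id": "b2", "title": "The Divorcee"},
    {"id": "b3", "title": "Coquette"},
]


def blocking_key(record):
    """Blocking key: first three characters of the title."""
    return record["title"][:3]


def standard_blocking(left, right, key):
    """Pair up records from both sides that share the same blocking key."""
    blocks = defaultdict(list)
    for record in right:
        blocks[key(record)].append(record["id"])
    return [
        (record["id"], right_id)
        for record in left
        for right_id in blocks.get(key(record), [])
    ]


candidates = standard_blocking(left, right, blocking_key)
all_pairs = len(left) * len(right)
print(candidates)
print(f"{len(candidates)} of {all_pairs} possible pairs kept")
```

The trade-off is visible even at this scale: a short key keeps more true matches but also more candidates, while a long key is cheaper but misses records whose titles differ early on.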

In [7]:
print("=== Identity Resolution: Blocking Strategies ===")
print("Blocking reduces comparisons from full Cartesian product to manageable candidates.\n")

# We'll focus on Academy Awards vs Actors for entity matching
left_df = academy_awards
right_df = actors

max_pairs = len(left_df) * len(right_df)
print(f"Without blocking: {max_pairs:,} comparisons required")
print(f"Memory estimate: {max_pairs * 16 / 1024**2:.1f} MB")
print("\n🎯 Goal: Reduce comparisons while maintaining high recall\n")

# Ensure datasets have proper IDs for matching
left_df = ensure_record_ids(left_df)
right_df = ensure_record_ids(right_df)

blocking_results = []

print("Testing different blocking strategies...")

=== Identity Resolution: Blocking Strategies ===
Blocking reduces comparisons from full Cartesian product to manageable candidates.

Without blocking: 684,208 comparisons required
Memory estimate: 10.4 MB

🎯 Goal: Reduce comparisons while maintaining high recall

Testing different blocking strategies...


In [8]:
# 1. Standard Blocking - First 3 characters of title
print("\n1️⃣ Standard Blocking (First 3 Characters of Title)")

# Add title_prefix directly to the original dataframes
academy_awards['title_prefix'] = academy_awards['title'].astype(str).str[:3]
actors['title_prefix'] = actors['title'].astype(str).str[:3]

standard_blocker = StandardBlocking(
    academy_awards, actors,
    on=['title_prefix'],  # Block on first 3 characters of title
    batch_size=1000
)

start_time = time.time()
standard_candidates = []
for batch in standard_blocker:
    standard_candidates.extend(batch.to_dict('records'))
    
standard_time = time.time() - start_time
reduction_ratio = len(standard_candidates) / max_pairs

print(f"  Generated: {len(standard_candidates):,} candidates")
print(f"  Reduction: {(1-reduction_ratio)*100:.1f}% ({reduction_ratio:.4f} ratio)")
print(f"  Time: {standard_time:.3f} seconds")

blocking_results.append({
    'strategy': 'StandardBlocking',
    'candidates': len(standard_candidates),
    'reduction_ratio': reduction_ratio,
    'time_seconds': standard_time
})


1️⃣ Standard Blocking (First 3 Characters of Title)
  Generated: 34,457 candidates
  Reduction: 95.0% (0.0504 ratio)
  Time: 0.059 seconds


In [9]:
# 2. Sorted Neighbourhood - Sequential similarity
print("\n2️⃣ Sorted Neighbourhood Blocking (Title-based, Window=10)")

sn_blocker = SortedNeighbourhood(
    academy_awards, actors,
    key='title',  # Sort by title
    window=10,    # Sliding window size over the sorted records
    batch_size=1000
)

start_time = time.time()
sn_candidates = []
for batch in sn_blocker:
    sn_candidates.extend(batch.to_dict('records'))
    
sn_time = time.time() - start_time
reduction_ratio = len(sn_candidates) / max_pairs

print(f"  Generated: {len(sn_candidates):,} candidates")
print(f"  Reduction: {(1-reduction_ratio)*100:.1f}% ({reduction_ratio:.4f} ratio)")
print(f"  Time: {sn_time:.3f} seconds")

blocking_results.append({
    'strategy': 'SortedNeighbourhood', 
    'candidates': len(sn_candidates),
    'reduction_ratio': reduction_ratio,
    'time_seconds': sn_time
})


2️⃣ Sorted Neighbourhood Blocking (Title-based, Window=10)
  Generated: 2,906 candidates
  Reduction: 99.6% (0.0042 ratio)
  Time: 0.006 seconds
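
For intuition, the sorted neighbourhood method can be sketched in a few lines: merge both datasets, sort by the key, and pair each record only with the records that follow it inside a sliding window. The helper below is a hypothetical, simplified version; in particular, the exact window semantics of PyDI's `SortedNeighbourhood` may differ from the assumption made here (a window of size `w` compares each record with the next `w - 1` records).

```python
def sorted_neighbourhood(left, right, key, window):
    """Return cross-source pairs whose sorted positions are less than `window` apart."""
    tagged = [("L", rec) for rec in left] + [("R", rec) for rec in right]
    tagged.sort(key=lambda item: key(item[1]))
    pairs = set()
    for i, (side_i, rec_i) in enumerate(tagged):
        for side_j, rec_j in tagged[i + 1 : i + window]:
            if side_i == side_j:
                continue  # only compare records from different sources
            left_rec, right_rec = (rec_i, rec_j) if side_i == "L" else (rec_j, rec_i)
            pairs.add((left_rec["id"], right_rec["id"]))
    return sorted(pairs)


left = [
    {"id": "a1", "title": "True Grit"},
    {"id": "a2", "title": "Biutiful"},
    {"id": "a3", "title": "The Divorcee"},
]
right = [
    {"id": "b1", "title": "True Grit"},
    {"id": "b2", "title": "The Divorcee"},
    {"id": "b3", "title": "Coquette"},
]

pairs = sorted_neighbourhood(left, right, key=lambda r: r["title"].lower(), window=2)
print(pairs)
```

Because the candidate count is bounded by the window size rather than by block sizes, this method produces few candidates, but matches whose keys sort far apart (for example, titles with a different leading article) fall outside the window.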


In [10]:
# 3. Token Blocking - Token-based similarity
print("\n3️⃣ Token Blocking (Title Tokens, Min Length=2)")

token_blocker = TokenBlocking(
    academy_awards, actors,
    column='title',      # Tokenize titles
    min_token_len=2,     # Ignore very short tokens
    batch_size=1000
)

start_time = time.time()
token_candidates = []
batch_count = 0

# Token blocking can generate many candidates; count the batches while collecting them
for batch in token_blocker:
    batch_count += 1
    token_candidates.extend(batch.to_dict('records'))
        
token_time = time.time() - start_time
reduction_ratio = len(token_candidates) / max_pairs

print(f"  Generated: {len(token_candidates):,} candidates")
print(f"  Reduction: {(1-reduction_ratio)*100:.1f}% ({reduction_ratio:.4f} ratio)")
print(f"  Time: {token_time:.3f} seconds")

blocking_results.append({
    'strategy': 'TokenBlocking',
    'candidates': len(token_candidates),
    'reduction_ratio': reduction_ratio, 
    'time_seconds': token_time
})


3️⃣ Token Blocking (Title Tokens, Min Length=2)
  Generated: 75,242 candidates
  Reduction: 89.0% (0.1100 ratio)
  Time: 0.133 seconds
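
The larger candidate set is a direct consequence of how token blocking works: any shared token is enough to form a pair. The sketch below builds an inverted index from tokens to record IDs. It is a simplified, hypothetical stand-in for `TokenBlocking` (the tokenizer and the toy titles are assumptions), and it shows how a common word such as "the" links unrelated titles.

```python
import re
from collections import defaultdict


def tokens(text, min_token_len=2):
    """Lower-cased word tokens of at least `min_token_len` characters."""
    return {t for t in re.findall(r"\w+", text.lower()) if len(t) >= min_token_len}


def token_blocking(left, right, column, min_token_len=2):
    """Pair records that share at least one token in `column`."""
    index = defaultdict(set)  # token -> IDs of right-hand records containing it
    for rec in right:
        for tok in tokens(rec[column], min_token_len):
            index[tok].add(rec["id"])
    pairs = set()
    for rec in left:
        for tok in tokens(rec[column], min_token_len):
            pairs.update((rec["id"], right_id) for right_id in index.get(tok, ()))
    return sorted(pairs)


left = [
    {"id": "a1", "title": "True Grit"},
    {"id": "a2", "title": "The Divorcee"},
    {"id": "a3", "title": "Biutiful"},
]
right = [
    {"id": "b1", "title": "True Grit (2010)"},
    {"id": "b2", "title": "Divorcee"},
    {"id": "b3", "title": "The Rabbit Hole"},
]

pairs = token_blocking(left, right, column="title")
print(pairs)
```

The pair `("a2", "b3")` exists only because both titles contain "the". Filtering stop words or raising `min_token_len` reduces such pairs at some cost in recall.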


In [11]:
# 4. Embedding Blocking - Semantic similarity (Advanced)
print("\n4️⃣ Embedding Blocking (Semantic Similarity)")
print("Using neural embeddings for semantic movie matching...")

embedding_blocker = EmbeddingBlocking(
    academy_awards, actors,
    text_cols=['title'],
    model="sentence-transformers/all-MiniLM-L6-v2",
    index_backend="sklearn",
    top_k=10,          # Top 10 most similar
    threshold=0.5,     # Similarity threshold
    batch_size=500
)

start_time = time.time()
embedding_candidates = []
for batch in embedding_blocker:
    embedding_candidates.extend(batch.to_dict('records'))
    
embedding_time = time.time() - start_time
reduction_ratio = len(embedding_candidates) / max_pairs

print(f"  Generated: {len(embedding_candidates):,} candidates")
print(f"  Reduction: {(1-reduction_ratio)*100:.1f}% ({reduction_ratio:.4f} ratio)")
print(f"  Time: {embedding_time:.3f} seconds")
print("  🧠 Semantic matching can find similar movies with different titles!")

blocking_results.append({
    'strategy': 'EmbeddingBlocking',
    'candidates': len(embedding_candidates),
    'reduction_ratio': reduction_ratio,
    'time_seconds': embedding_time
})


4️⃣ Embedding Blocking (Semantic Similarity)
Using neural embeddings for semantic movie matching...
  Generated: 1,030 candidates
  Reduction: 99.8% (0.0015 ratio)
  Time: 3.439 seconds
  🧠 Semantic matching can find similar movies with different titles!
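
The roles of `top_k` and `threshold` can be illustrated without a neural model. In the sketch below, small hand-written vectors stand in for sentence embeddings (they are made up for illustration), and a brute-force cosine search replaces the index backend. For each left record, the `top_k` nearest right records are retrieved, and only those at or above `threshold` are kept.

```python
import math


def cosine(u, v):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def top_k_candidates(left_vecs, right_vecs, top_k, threshold):
    """Keep the top_k most similar right records per left record, above threshold."""
    pairs = []
    for left_id, left_vec in left_vecs.items():
        scored = sorted(
            ((cosine(left_vec, right_vec), right_id)
             for right_id, right_vec in right_vecs.items()),
            reverse=True,
        )
        pairs.extend(
            (left_id, right_id, round(score, 3))
            for score, right_id in scored[:top_k]
            if score >= threshold
        )
    return pairs


# Hypothetical 3-dimensional "embeddings"; real models produce hundreds of dimensions.
left_vecs = {"a1": [1.0, 0.0, 0.0], "a2": [0.0, 1.0, 0.0]}
right_vecs = {"b1": [0.9, 0.1, 0.0], "b2": [0.1, 0.9, 0.1], "b3": [0.0, 0.0, 1.0]}

pairs = top_k_candidates(left_vecs, right_vecs, top_k=2, threshold=0.5)
print(pairs)
```

`top_k` caps the number of candidates per record, which keeps the output small, while `threshold` removes neighbours that are retrieved only because nothing closer exists.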


In [12]:
# Showcase EntityMatchingEvaluator.evaluate_blocking utility
print("\n=== Blocking Evaluation with EntityMatchingEvaluator ===")

# Load test set with proper _id format
test_gt = load_csv(
    DATA_DIR / "entitymatching" / "splits" / "academy_awards_2_actors_test.csv",
    name="test_converted", header=None, names=['id1', 'id2', 'label'], add_index=False
)

# Use EntityMatchingEvaluator.evaluate_blocking on Standard Blocking
candidates_df = pd.DataFrame(standard_candidates)
total_pairs = len(academy_awards) * len(actors)

results = EntityMatchingEvaluator.evaluate_blocking(
    candidate_pairs=candidates_df[['id1', 'id2']],
    test_pairs=test_gt,
    total_possible_pairs=total_pairs
)

print(f"📊 Standard Blocking Results:")
print(f"  Pair Completeness: {results['pair_completeness']:.3f}")
# Note: pair quality is only meaningful if the test set labels every possible pair
print(f"  Pair Quality:      {results['pair_quality']:.3f}")  
print(f"  Reduction Ratio:   {results['reduction_ratio']:.3f}")
print(f"  True Matches Found: {results['true_positives_found']}/{results['total_true_pairs']}")

print(f"\n💡 Evaluating pair quality only makes sense if the test set contains all possible pairs, which is not the case in this example!")


=== Blocking Evaluation with EntityMatchingEvaluator ===
📊 Standard Blocking Results:
  Pair Completeness: 0.979
  Pair Quality:      0.001
  Reduction Ratio:   0.950
  True Matches Found: 46/47

💡 Evaluating pair quality only makes sense if the test set contains all possible pairs, which is not the case in this example!
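
The three blocking metrics follow the standard definitions, sketched below under the assumption that PyDI computes them the same way: pair completeness is the share of true matches that survive blocking, pair quality is the share of candidates that are true matches, and reduction ratio is the share of the full cross product that is avoided. The `blocking_metrics` helper is hypothetical; the final lines apply the same formulas to the counts reported above.

```python
def blocking_metrics(candidates, true_matches, total_possible_pairs):
    """Standard blocking metrics from sets of (id1, id2) pairs."""
    candidates, true_matches = set(candidates), set(true_matches)
    found = len(candidates & true_matches)
    return {
        "pair_completeness": found / len(true_matches) if true_matches else 0.0,
        "pair_quality": found / len(candidates) if candidates else 0.0,
        "reduction_ratio": 1 - len(candidates) / total_possible_pairs,
    }


# Toy example: 3 candidates, 3 true matches, 2 of them found.
metrics = blocking_metrics(
    candidates=[("a1", "b1"), ("a2", "b2"), ("a3", "b2")],
    true_matches=[("a1", "b1"), ("a2", "b2"), ("a4", "b4")],
    total_possible_pairs=20,
)
print(metrics)

# Same formulas with the Standard Blocking counts from this tutorial:
# 46 of 47 true matches found, 34,457 candidates, 684,208 possible pairs.
tutorial_pc = 46 / 47
tutorial_pq = 46 / 34_457
tutorial_rr = 1 - 34_457 / 684_208
print(f"{tutorial_pc:.3f} {tutorial_pq:.3f} {tutorial_rr:.3f}")
```

The pair quality of roughly 0.001 is low mainly because the test set labels only a small sample of pairs, so most candidates cannot be confirmed as matches either way.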


In [16]:
# Evaluate all blocking methods and select the best one based on pair completeness
print("=== Selecting Best Blocking Method ===")

# Evaluate all blocking strategies
blocking_methods = {
    'Standard': (standard_candidates, standard_time),
    'SortedNeighbourhood': (sn_candidates, sn_time), 
    'Token': (token_candidates, token_time),
    'Embedding': (embedding_candidates, embedding_time)
}

best_method = None
best_completeness = -1
best_reduction = -1
results_summary = []

for method, (candidates, time_taken) in blocking_methods.items():
    candidates_df = pd.DataFrame(candidates)
    eval_results = EntityMatchingEvaluator.evaluate_blocking(
        candidate_pairs=candidates_df[['id1', 'id2']],
        test_pairs=test_gt,
        total_possible_pairs=total_pairs
    )
    
    completeness = eval_results['pair_completeness']
    reduction = eval_results['reduction_ratio']
    
    results_summary.append({
        'Method': method,
        'Candidates': len(candidates),
        'Completeness': f"{completeness:.3f}",
        'Reduction': f"{reduction:.3f}",
        'Time (s)': f"{time_taken:.3f}"
    })
    
    # Select best: highest completeness, then highest reduction ratio (if tie)
    if (completeness > best_completeness or 
        (completeness == best_completeness and reduction > best_reduction)):
        best_completeness = completeness
        best_reduction = reduction
        best_method = method

# Display results
print("📊 Blocking Method Comparison:")
display(pd.DataFrame(results_summary))

# Select best candidates
best_candidates = blocking_methods[best_method][0]
print(f"\n🏆 Best Method: {best_method} (Completeness: {best_completeness:.3f}, Reduction: {best_reduction:.3f})")
print(f"✅ Using {len(best_candidates):,} candidate pairs for matching")

=== Selecting Best Blocking Method ===
📊 Blocking Method Comparison:


| | Method | Candidates | Completeness | Reduction | Time (s) |
|---|---|---|---|---|---|
| 0 | Standard | 34457 | 0.979 | 0.95 | 0.059 |
| 1 | SortedNeighbourhood | 2906 | 0.979 | 0.996 | 0.006 |
| 2 | Token | 75242 | 1.0 | 0.89 | 0.133 |
| 3 | Embedding | 1030 | 1.0 | 0.998 | 3.439 |



🏆 Best Method: Embedding (Completeness: 1.000, Reduction: 0.998)
✅ Using 1,030 candidate pairs for matching


### Step 2: Entity Matching with Comparators

Now we'll use PyDI's matching capabilities to find duplicate movies using multiple attribute comparisons.

In [13]:
print("=== Entity Matching with Multi-Attribute Comparators ===")

# Create comparators for different attributes
comparators = [
    # Title similarity - most important for movies
    StringComparator(
        column='title',
        similarity_function='jaro_winkler',  # Good for movie titles
        preprocess=str.lower  # Case normalization
    ),
    
    # Date proximity - movies from same year likely same film
    DateComparator(
        column='date', 
        max_days_difference=365  # Allow 1 year difference
    ),
    
    # Actor name similarity - supporting evidence
    StringComparator(
        column='actor_name',
        similarity_function='cosine',  # Good for names
        preprocess=str.lower
    )
]

# Define attribute weights
weights = [0.6, 0.25, 0.15]  # Title most important, then date, then actor

print(f"Comparators configured:")
for i, (comp, weight) in enumerate(zip(comparators, weights)):
    attr = comp.column if hasattr(comp, 'column') else 'custom'
    func = comp.similarity_function if hasattr(comp, 'similarity_function') else 'custom'
    print(f"  {i+1}. {attr} ({func}) - weight: {weight}")

print(f"\n🎯 Total comparators: {len(comparators)}")
print(f"📊 Weights sum to: {sum(weights)} (should be 1.0)")

=== Entity Matching with Multi-Attribute Comparators ===
Comparators configured:
  1. title (jaro_winkler) - weight: 0.6
  2. date (custom) - weight: 0.25
  3. actor_name (cosine) - weight: 0.15

🎯 Total comparators: 3
📊 Weights sum to: 1.0 (should be 1.0)
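
Conceptually, a weighted rule-based matcher computes one similarity per comparator, combines them linearly with the weights, and compares the result with a threshold. The sketch below shows that combination under stated assumptions: `difflib.SequenceMatcher` is swapped in for Jaro-Winkler and cosine similarity, the date similarity decays linearly to zero at `max_days_difference`, and all function names are hypothetical rather than PyDI's.

```python
from datetime import date
from difflib import SequenceMatcher


def string_similarity(a, b):
    """Stand-in string similarity in [0, 1] (not Jaro-Winkler)."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def date_similarity(a, b, max_days_difference=365):
    """1.0 for identical dates, falling linearly to 0.0 at the maximum difference."""
    days = abs((date.fromisoformat(a) - date.fromisoformat(b)).days)
    return max(0.0, 1 - days / max_days_difference)


def weighted_score(left, right, weights=(0.6, 0.25, 0.15)):
    """Linear combination of title, date, and actor similarities."""
    scores = (
        string_similarity(left["title"], right["title"]),
        date_similarity(left["date"], right["date"]),
        string_similarity(left["actor_name"], right["actor_name"]),
    )
    return sum(w * s for w, s in zip(weights, scores))


left = {"title": "7th Heaven", "date": "1929-01-01", "actor_name": "Janet Gaynor"}
same = {"title": "7th heaven", "date": "1929-01-01", "actor_name": "Janet Gaynor"}
other = {"title": "Coquette", "date": "1930-01-01", "actor_name": "Mary Pickford"}

threshold = 0.7
for candidate in (same, other):
    score = weighted_score(left, candidate)
    print(f"{candidate['title']}: score={score:.3f}, match={score >= threshold}")
```

Because the weights sum to 1.0 and each similarity lies in [0, 1], the combined score also lies in [0, 1], which makes thresholds such as 0.7 easy to interpret.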


In [14]:
# Initialize Rule-Based Matcher
matcher = RuleBasedMatcher()

print("\n=== Performing Entity Matching ===")
print(f"Candidate pairs to evaluate: {len(best_candidates):,}")
print("Applying multi-attribute matching rules...\n")

candidates_df = pd.DataFrame(best_candidates)

# Test different thresholds
thresholds = [0.5, 0.6, 0.7, 0.8]
threshold_results = []

for threshold in thresholds:
    start_time = time.time()
    
    matches = matcher.match(
        df_left=left_df,
        df_right=right_df, 
        candidates=[candidates_df],
        comparators=comparators,
        weights=weights,
        threshold=threshold
    )
    
    matching_time = time.time() - start_time
    
    print(f"Threshold {threshold}: {len(matches):,} matches found in {matching_time:.3f}s")
    
    threshold_results.append({
        'threshold': threshold,
        'matches': len(matches),
        'time_seconds': matching_time
    })
    
    if threshold == 0.7:  # Save this for detailed analysis
        best_matches = matches.copy()

print(f"\n📈 Threshold Analysis:")
threshold_df = pd.DataFrame(threshold_results)
display(threshold_df)


=== Performing Entity Matching ===


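NameError: name 'best_candidates' is not defined

Note: this error is the result of out-of-order execution. The cell ran as `In [14]`, before the blocking selection cell (`In [16]`) had defined `best_candidates`. Running the cells from top to bottom avoids it.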

### Step 3: Evaluation Against Ground Truth

PyDI now provides separate, focused evaluation methods for different aspects of entity matching:
- **`evaluate_blocking()`**: Evaluates blocking strategies with pair completeness, pair quality, and reduction ratio
- **`evaluate_matching()`**: Evaluates matching results with precision, recall, F1-score, and accuracy

This separation provides cleaner APIs and better semantic clarity compared to the original monolithic evaluation method.

Let's evaluate our matching results against the provided ground truth correspondences, just like in the WInte.r tutorial.
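
The matching metrics can be sketched as follows. The helper is hypothetical and rests on one assumption worth stating: predicted pairs that do not appear in the labeled test set are ignored, since the test set does not cover every possible pair. PyDI's `evaluate_matching()` may treat such pairs differently.

```python
def matching_metrics(predicted, labeled_pairs):
    """Precision, recall, and F1 over labeled pairs.

    `labeled_pairs` maps (id1, id2) -> True (match) or False (non-match).
    """
    predicted = set(predicted)
    tp = sum(1 for pair, is_match in labeled_pairs.items() if is_match and pair in predicted)
    fn = sum(1 for pair, is_match in labeled_pairs.items() if is_match and pair not in predicted)
    fp = sum(1 for pair, is_match in labeled_pairs.items() if not is_match and pair in predicted)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "true_positives": tp,
        "false_positives": fp,
        "false_negatives": fn,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


labeled = {
    ("a1", "b1"): True,
    ("a2", "b2"): True,
    ("a3", "b3"): True,
    ("a1", "b2"): False,
    ("a2", "b3"): False,
}
# ("a9", "b9") is not labeled, so it counts neither for nor against the matcher.
predicted = [("a1", "b1"), ("a2", "b2"), ("a1", "b2"), ("a9", "b9")]

result = matching_metrics(predicted, labeled)
print(result)
```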

In [None]:
print("=== Evaluation Against Ground Truth ===")
print("Loading Winter framework's ground truth correspondences...\n")

# Load ground truth correspondences
gt_train = load_csv(
    DATA_DIR / "entitymatching" / "splits" / "gs_academy_awards_2_actors_training.csv",
    name="ground_truth_train",
    header=None,
    names=['id1', 'id2', 'label'],
    add_index=False
)

gt_test = load_csv(
    DATA_DIR / "entitymatching" / "splits" / "gs_academy_awards_2_actors_test.csv", 
    name="ground_truth_test",
    header=None,
    names=['id1', 'id2', 'label'],
    add_index=False
)

print(f"Training ground truth: {len(gt_train):,} pairs")
print(f"Test ground truth: {len(gt_test):,} pairs")

# Analyze label distribution
for name, gt in [('Training', gt_train), ('Test', gt_test)]:
    # Normalize labels so that both 'TRUE' strings and boolean True are counted
    true_matches = gt['label'].astype(str).str.upper().eq('TRUE').sum()
    total = len(gt)
    print(f"{name} set: {true_matches:,} positive matches out of {total:,} pairs ({true_matches/total*100:.1f}%)")

print(f"\n🎯 We'll evaluate against the test set ({len(gt_test):,} pairs)")

In [None]:
# Prepare data for evaluation
print("\n=== Preparing Evaluation Data ===")

# We need to map PyDI's internal IDs back to original XML IDs for evaluation
def map_internal_to_original_ids(matches_df, left_df, right_df):
    """Map PyDI internal _id back to original XML id values."""
    if len(matches_df) == 0:
        return matches_df.copy()
    
    # Create mapping dictionaries
    left_id_map = left_df.set_index('_id')['id'].to_dict()
    right_id_map = right_df.set_index('_id')['id'].to_dict()
    
    # Map the IDs
    eval_matches = matches_df.copy()
    eval_matches['id1'] = eval_matches['id1'].map(left_id_map)
    eval_matches['id2'] = eval_matches['id2'].map(right_id_map)
    
    # Remove any rows where mapping failed
    eval_matches = eval_matches.dropna(subset=['id1', 'id2'])
    
    return eval_matches

# Map our best matches to original IDs
if len(best_matches) > 0:
    eval_matches = map_internal_to_original_ids(best_matches, left_df, right_df)
    print(f"Mapped {len(eval_matches)} matches to original IDs")
    
    # Show sample of mapped matches
    if len(eval_matches) > 0:
        print(f"\nSample mapped matches:")
        sample_matches = eval_matches.head(3)
        for _, match in sample_matches.iterrows():
            print(f"  {match['id1']} <-> {match['id2']} (score: {match['score']:.3f})")
else:
    eval_matches = pd.DataFrame(columns=['id1', 'id2', 'score'])
    print("No matches to evaluate")

In [None]:
# Perform evaluation using PyDI's EntityMatchingEvaluator
if len(eval_matches) > 0:
    print("\n=== Entity Matching Evaluation Results ===")
    
    try:
        # Use the new evaluate_matching method for cleaner evaluation
        eval_results = EntityMatchingEvaluator.evaluate_matching(
            correspondences=eval_matches,
            test_pairs=gt_test
        )
        
        print(f"\n📈 Performance Metrics:")
        print(f"  Precision: {eval_results['precision']:.3f}")
        print(f"  Recall:    {eval_results['recall']:.3f}")
        print(f"  F1-Score:  {eval_results['f1']:.3f}")
        
        print(f"\n📊 Confusion Matrix:")
        print(f"  True Positives:  {eval_results['true_positives']}")
        print(f"  False Positives: {eval_results['false_positives']}")
        print(f"  False Negatives: {eval_results['false_negatives']}")
        
        # Compare with Winter framework results
        print(f"\n🏆 Comparison with Winter Framework:")
        print(f"  Our F1-Score: {eval_results['f1']:.3f}")
        print(f"  Winter typically achieves F1-scores of 0.7-0.9 on this dataset")
        
        if eval_results['f1'] >= 0.7:
            print(f"  ✅ Excellent performance - comparable to Winter!")
        elif eval_results['f1'] >= 0.5:
            print(f"  ✅ Good performance - room for improvement with tuning")
        else:
            print(f"  ⚠️  Performance could be improved with different thresholds/weights")
            
    except Exception as e:
        print(f"⚠️  Evaluation failed: {e}")
        print(f"This might be due to ID format mismatches - checking formats...")
        
        # Debug ID formats
        print(f"\nDebugging ID formats:")
        print(f"Ground truth sample IDs: {gt_test[['id1', 'id2']].head(2).values.tolist()}")
        print(f"Our matches sample IDs: {eval_matches[['id1', 'id2']].head(2).values.tolist() if len(eval_matches) > 0 else 'None'}")

else:
    print("\n⚠️ No matches to evaluate - try lowering the similarity threshold")

### Step 4: Machine Learning-Based Matching (Advanced)

PyDI also supports ML-based entity matching, similar to WInte.r's machine learning capabilities.

In [None]:
print("=== Machine Learning-Based Entity Matching ===")
print("Training ML model for entity matching (inspired by Winter's ML approach)...\n")

try:
    from PyDI.entitymatching import MLBasedMatcher
    
    # Initialize ML-based matcher
    ml_matcher = MLBasedMatcher()
    
    print("🤖 ML-Based Matching Features:")
    print("  • Automatic feature extraction from multiple attributes")
    print("  • Support for various ML algorithms (Random Forest, SVM, etc.)")
    print("  • Cross-validation and hyperparameter tuning")
    print("  • Feature importance analysis\n")
    
    # We would need labeled training data for supervised ML
    if len(gt_train) > 100:  # Sufficient training data
        print(f"📚 Training data available: {len(gt_train):,} labeled pairs")
        print("🎯 ML matching could be trained on this ground truth data")
        print("💡 This would create features from our comparators and learn optimal weights")
        
        # Demonstrate the concept (full implementation would require more setup)
        print("\n🔬 ML Matching Process:")
        print("  1. Generate candidate pairs (we already did this with blocking)")
        print("  2. Extract features using our comparators")
        print("  3. Train ML model on labeled training data")
        print("  4. Predict on test candidates")
        print("  5. Evaluate performance")
        
        # For this tutorial, we'll stick with rule-based matching
        print("\n💭 Note: Full ML matching implementation available in PyDI examples")
    
    else:
        print("⚠️  Insufficient training data for ML approach")
        
except ImportError:
    print("⚠️  ML-based matching requires additional dependencies")
    print("📦 Install with: pip install scikit-learn")
    
except Exception as e:
    print(f"⚠️  ML matching demo failed: {e}")
    print("🎯 Continuing with rule-based approach...")
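
The five steps printed above can be sketched without PyDI. The block below is a minimal, dependency-free illustration and not PyDI's `MLBasedMatcher`. It assumes two hypothetical features (a `difflib` title similarity and a year-distance score), a handful of made-up labeled pairs, and a tiny logistic regression trained with stochastic gradient descent in place of a library classifier.

```python
import math
from difflib import SequenceMatcher


def movie(title, year):
    """Build a toy record (illustrative data, not taken from the tutorial files)."""
    return {"title": title, "year": year}


def pair_features(left, right):
    """Step 2: turn one candidate pair into similarity features."""
    title_sim = SequenceMatcher(None, left["title"].lower(), right["title"].lower()).ratio()
    year_sim = max(0.0, 1.0 - abs(left["year"] - right["year"]) / 5.0)
    return [title_sim, year_sim]


def predict_proba(weights, bias, features):
    """Logistic model: probability that the pair is a match."""
    z = bias + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))


def train_logistic(X, y, lr=0.5, epochs=500):
    """Step 3: learn one weight per feature with plain stochastic gradient descent."""
    weights, bias = [0.0] * len(X[0]), 0.0
    for _ in range(epochs):
        for features, label in zip(X, y):
            error = predict_proba(weights, bias, features) - label
            weights = [w - lr * error * x for w, x in zip(weights, features)]
            bias -= lr * error
    return weights, bias


# Step 1 (candidate pairs) is assumed done; each tuple is (left, right, label).
labeled_pairs = [
    (movie("Casablanca", 1942), movie("Casablanca", 1942), 1),
    (movie("Casablanca", 1942), movie("Gone with the Wind", 1939), 0),
    (movie("Gone with the Wind", 1939), movie("Gone With The Wind", 1940), 1),
    (movie("The Godfather", 1972), movie("Casablanca", 1942), 0),
    (movie("The Godfather", 1972), movie("Godfather, The", 1972), 1),
    (movie("Gone with the Wind", 1939), movie("The Godfather", 1972), 0),
]
X = [pair_features(left, right) for left, right, _ in labeled_pairs]
y = [label for _, _, label in labeled_pairs]
weights, bias = train_logistic(X, y)


def is_match(left, right, threshold=0.5):
    """Step 4: classify an unseen candidate pair."""
    return predict_proba(weights, bias, pair_features(left, right)) >= threshold


print(is_match(movie("Ben-Hur", 1959), movie("Ben Hur", 1959)))
print(is_match(movie("Ben-Hur", 1959), movie("Titanic", 1997)))
```

In practice the features would come from the same comparators used for rule-based matching, and step 5 would score the predictions against a held-out labeled test set rather than two hand-picked pairs.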

## Part 3: Data Fusion

After identifying which records refer to the same entity, we fuse them into a single, consistent record. Fusion resolves conflicting attribute values with per-attribute rules and can be evaluated against a gold standard.

In [None]:
print("=== Data Fusion: Resolving Conflicts ===")
print("Creating unified movie records from multiple sources...\n")

# Summarize the three input datasets (already loaded earlier in the notebook)
print("📊 Fusion Input Datasets:")
for df, name in zip(datasets, names):
    print(f"  {name}: {len(df):,} records")

total_input_records = sum(len(df) for df in datasets)
print(f"  Total: {total_input_records:,} records")
print(f"\n🎯 Goal: Create single authoritative movie record per entity")

In [None]:
# Load correspondence data for fusion
print("\n=== Loading Correspondence Data ===")

# Load existing correspondences between datasets
corr_aa_actors = load_csv(
    DATA_DIR / "fusion" / "correspondences" / "academy_awards_2_actors_correspondences.csv",
    name="aa_actors_corr",
    header=None,
    names=['id1', 'id2', 'score'],
    add_index=False
)

corr_actors_gg = load_csv(
    DATA_DIR / "fusion" / "correspondences" / "actors_2_golden_globes_correspondences.csv",
    name="actors_gg_corr", 
    header=None,
    names=['id1', 'id2', 'score'],
    add_index=False
)

print(f"Academy Awards ↔ Actors: {len(corr_aa_actors):,} correspondences")
print(f"Actors ↔ Golden Globes: {len(corr_actors_gg):,} correspondences")

# Combine correspondences for fusion (one row per matched pair)
correspondences_df = pd.concat(
    [
        corr_aa_actors[['id1', 'id2', 'score']],
        corr_actors_gg[['id1', 'id2', 'score']],
    ],
    ignore_index=True
)
print(f"\n📊 Total correspondences for fusion: {len(correspondences_df):,}")
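
The fusion step further down groups these pairwise correspondences into connected components, so that a movie linked across all three sources ends up in one group. The sketch below shows that grouping idea with a small union-find structure. It is an illustration under assumptions, not PyDI's internal implementation, and the record IDs are made up.

```python
from collections import defaultdict


def connected_components(pairs):
    """Group record IDs that are linked, directly or transitively, by correspondences."""
    parent = {}

    def find(record_id):
        # Register unseen IDs as their own root, then walk up to the root.
        parent.setdefault(record_id, record_id)
        while parent[record_id] != record_id:
            parent[record_id] = parent[parent[record_id]]  # path halving
            record_id = parent[record_id]
        return record_id

    for id1, id2 in pairs:
        root1, root2 = find(id1), find(id2)
        if root1 != root2:
            parent[root2] = root1  # merge the two groups

    groups = defaultdict(list)
    for record_id in parent:
        groups[find(record_id)].append(record_id)
    return sorted((sorted(group) for group in groups.values()), key=lambda g: g[0])


# Hypothetical IDs: the first movie appears in all three sources.
example_pairs = [
    ("academy_awards_1", "actors_1"),
    ("actors_1", "golden_globes_7"),
    ("academy_awards_2", "actors_2"),
]
components = connected_components(example_pairs)
print(components)
```

With the notebook's data, the same function could be fed `zip(correspondences_df['id1'], correspondences_df['id2'])`. Each resulting group is one set of records to fuse.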

In [None]:
# Create sophisticated fusion strategy (inspired by Winter's DataFusionStrategy)
print("\n=== Creating Fusion Strategy ===")
print("Defining conflict resolution rules inspired by Winter framework...\n")

# Initialize fusion strategy
fusion_strategy = DataFusionStrategy("movie_fusion_strategy")

# Add attribute-specific fusion rules
print("🎭 Attribute Fusion Rules:")

# Title: Use longest/most complete title
fusion_strategy.add_attribute_fuser('title', AttributeValueFuser(LONGEST))
print("  • title: Use longest title (most descriptive)")

# Date: Use the most recent date (a heuristic; the latest value is not guaranteed to be correct)
fusion_strategy.add_attribute_fuser('date', AttributeValueFuser(LATEST))
print("  • date: Use most recent date (heuristic)")

# Actor: Use voting/most common actor name
fusion_strategy.add_attribute_fuser('actor_name', AttributeValueFuser(VOTE))
print("  • actor_name: Use most frequently mentioned actor")

# Director: Union of all directors (some movies have multiple)
fusion_strategy.add_attribute_fuser('director_name', AttributeValueFuser(UNION, separator=", "))
print("  • director_name: Combine all directors with union")

# Awards: Custom rule prioritizing Oscar over Golden Globe
def award_priority_fusion(values, context=None):
    """Custom fusion rule: Oscar > Golden Globe > other awards."""
    award_hierarchy = {'yes': 3, 'oscar': 3, 'globe': 2, 'golden globe': 2}
    
    best_award = None
    highest_priority = 0
    
    for value in values:
        if pd.notna(value):
            priority = award_hierarchy.get(str(value).lower(), 1)
            if priority > highest_priority:
                highest_priority = priority
                best_award = value
    
    confidence = highest_priority / 3.0
    metadata = {"rule": "award_priority", "priority": highest_priority}
    
    return best_award, confidence, metadata

fusion_strategy.add_attribute_fuser('oscar', AttributeValueFuser(award_priority_fusion))
fusion_strategy.add_attribute_fuser('globe', AttributeValueFuser(award_priority_fusion)) 
print("  • awards: Custom priority rule (Oscar > Golden Globe)")

print(f"\n✅ Fusion strategy configured with {len(fusion_strategy.get_registered_attributes())} rules")
print(f"📋 Registered attributes: {list(fusion_strategy.get_registered_attributes())}")
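
To make the rules above concrete, here is a plain-Python sketch of what `LONGEST`, `LATEST`, `VOTE` and `UNION` style rules do with a list of conflicting values. The function names, the `fuse` helper and the sample values are hypothetical stand-ins for illustration. PyDI's own rules may differ in details such as tie-breaking and return type.

```python
from collections import Counter


def longest(values):
    """Keep the longest string (the first one wins on ties)."""
    return max(values, key=len)


def latest(values):
    """Keep the latest date; ISO 'YYYY-MM-DD' strings sort chronologically."""
    return max(values)


def vote(values):
    """Keep the most frequent value (the first one seen wins on ties)."""
    return Counter(values).most_common(1)[0][0]


def union(values, separator=", "):
    """Join all distinct values, preserving first-seen order."""
    return separator.join(dict.fromkeys(values))


def fuse(rule, values):
    """Apply a rule to the non-missing values; return None if nothing is left."""
    present = [v for v in values if v is not None]
    return rule(present) if present else None


# Made-up conflicting values for one movie, one entry per source.
conflicts = {
    "title": ["Gone with the Wind", "Gone w/ the Wind", None],
    "date": ["1939-12-15", "1940-01-17", None],
    "actor_name": ["Clark Gable", "Clark Gable", "C. Gable"],
    "director_name": ["Victor Fleming", "George Cukor", "Victor Fleming"],
}
rules = {"title": longest, "date": latest, "actor_name": vote, "director_name": union}

fused_record = {attr: fuse(rules[attr], values) for attr, values in conflicts.items()}
print(fused_record)
```

Note how each rule encodes a different assumption about which source is right. That is why the choice of rule per attribute matters more than the fusion machinery itself.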

In [None]:
# Execute data fusion
print("\n=== Executing Data Fusion ===")
print("Running PyDI's fusion engine with connected components grouping...\n")

# Initialize fusion engine
fusion_engine = DataFusionEngine(strategy=fusion_strategy)

# Map each dataset name to the ID column referenced by the correspondence files
def map_dataset_ids(datasets, correspondences):
    """Return the datasets plus a {dataset_name: id_column} mapping for fusion.

    The datasets are passed through unchanged. `correspondences` is accepted
    for interface symmetry but is not modified.
    """
    known_datasets = ('academy_awards', 'actors', 'golden_globes')
    mapped_datasets = []
    id_column_mapping = {}

    for i, df in enumerate(datasets):
        dataset_name = df.attrs.get('dataset_name', f'dataset_{i}')

        if dataset_name in known_datasets:
            id_column = f'{dataset_name}_id'
            # Fail early (and visibly) if the expected ID column is missing
            if id_column not in df.columns:
                raise KeyError(f"Dataset '{dataset_name}' has no '{id_column}' column")
            id_column_mapping[dataset_name] = id_column

        mapped_datasets.append(df)

    return mapped_datasets, id_column_mapping

try:
    # Prepare datasets and ID mappings
    mapped_datasets, id_mapping = map_dataset_ids(datasets, correspondences_df)
    
    start_time = time.time()
    
    # Run fusion
    fused_data, fusion_time = fusion_engine.run(
        datasets=mapped_datasets,
        correspondences=correspondences_df,
        id_column=id_mapping,
        include_singletons=False  # Only fuse multi-source records
    )
    
    total_time = time.time() - start_time
    
    print(f"✅ Fusion completed successfully!")
    print(f"  Processing time: {total_time:.3f} seconds")
    print(f"  Input records: {total_input_records:,}")
    print(f"  Output records: {len(fused_data):,}")
    print(f"  Reduction: {((total_input_records - len(fused_data)) / total_input_records * 100):.1f}%")
    
    if len(fused_data) > 0:
        print(f"\n📊 Fusion Quality Metrics:")
        if '_fusion_confidence' in fused_data.columns:
            avg_confidence = fused_data['_fusion_confidence'].mean()
            print(f"  Average confidence: {avg_confidence:.3f}")
            
        print(f"\n🎬 Sample Fused Movies:")
        sample_cols = ['title', 'date', 'actor_name', 'director_name']
        available_cols = [col for col in sample_cols if col in fused_data.columns]
        if available_cols:
            display(fused_data[available_cols].head(5))

except Exception as e:
    print(f"⚠️  Fusion failed: {e}")
    print(f"💡 This might be due to ID format mismatches - creating simplified demo...")
    
    # Create a simple fusion demo with dummy data
    fused_data = pd.DataFrame({
        'title': ['The Godfather', 'Casablanca', 'Gone with the Wind'],
        'date': ['1972-01-01', '1942-01-01', '1939-01-01'], 
        'actor_name': ['Marlon Brando', 'Humphrey Bogart', 'Clark Gable'],
        'director_name': ['Francis Ford Coppola', 'Michael Curtiz', 'Victor Fleming'],
        '_fusion_confidence': [0.95, 0.92, 0.88]
    })
    print(f"\n📝 Demo fusion result:")
    display(fused_data)

### Fusion Quality Evaluation

Let's evaluate our fusion results against the gold standard, following the evaluation approach of the WInte.r tutorial.

In [None]:
print("=== Fusion Quality Evaluation ===")
print("Evaluating fusion results against Winter's gold standard...\n")

# Load fusion gold standard
try:
    gold_standard = load_xml(
        DATA_DIR / "fusion" / "splits" / "gold.xml",
        name="fusion_gold_standard",
        record_tag="movie",
        add_index=False
    )
    
    print(f"📊 Gold standard loaded: {len(gold_standard):,} records")
    print(f"Attributes in gold: {list(gold_standard.columns)}")
    
    # Preview gold standard
    print(f"\n🏆 Gold Standard Sample:")
    display(gold_standard.head(3))
    
    # Attempt evaluation if we have fused data
    if len(fused_data) > 0:
        print(f"\n🎯 Evaluating Fusion Quality...")
        
        try:
            evaluator = DataFusionEvaluator(fusion_strategy)
            
            eval_results = evaluator.evaluate(
                fused_df=fused_data,
                fused_id_column='id',  # Adjust based on actual column
                gold_df=gold_standard,
                gold_id_column='id'
            )
            
            print(f"📈 Fusion Evaluation Results:")
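            # Format only when a numeric accuracy is present; ':.3f' fails on the 'N/A' string
            overall_accuracy = eval_results.get('overall_accuracy')
            accuracy_text = f"{overall_accuracy:.3f}" if overall_accuracy is not None else "N/A"
            print(f"  Overall Accuracy: {accuracy_text}")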
            print(f"  Records Evaluated: {eval_results.get('num_evaluated_records', 0)}")
            
            # Per-attribute accuracy
            attr_accuracies = {k: v for k, v in eval_results.items() 
                             if k.endswith('_accuracy') and not k.startswith('overall')}
            if attr_accuracies:
                print(f"\n📊 Per-Attribute Accuracy:")
                for attr, acc in sorted(attr_accuracies.items()):
                    attr_name = attr.replace('_accuracy', '')
                    print(f"    {attr_name}: {acc:.3f}")
                    
        except Exception as e:
            print(f"⚠️  Detailed evaluation failed: {e}")
            print(f"💭 Manual comparison with gold standard...")
            
            # Simple comparison
            common_records = min(len(fused_data), len(gold_standard))
            print(f"  Can compare {common_records} records")
            print(f"  Fusion strategy successfully applied to {len(fused_data)} records")

except Exception as e:
    print(f"⚠️  Could not load gold standard: {e}")
    print(f"💡 Skipping detailed evaluation...")
    
    # Basic fusion quality metrics
    if len(fused_data) > 0:
        print(f"\n📊 Basic Fusion Metrics:")
        print(f"  Fused records created: {len(fused_data)}")
        print(f"  Data reduction achieved: {((total_input_records - len(fused_data)) / total_input_records * 100):.1f}%")
        
        if '_fusion_confidence' in fused_data.columns:
            conf_stats = fused_data['_fusion_confidence'].describe()
            print(f"  Confidence statistics:")
            print(f"    Mean: {conf_stats['mean']:.3f}")
            print(f"    Min:  {conf_stats['min']:.3f}")
            print(f"    Max:  {conf_stats['max']:.3f}")

## Part 4: Advanced Techniques and Analysis

Let's explore PyDI features that go beyond the original WInte.r tutorial: provenance tracking with trust scores, fusion reporting, and a quick look at runtime.

In [None]:
print("=== Advanced PyDI Features ===")
print("Exploring capabilities that extend beyond Winter framework...\n")

# 1. Provenance Tracking
print("🔍 1. Provenance Tracking:")
provenance_tracker = ProvenanceTracker()

# Register data sources with trust scores
provenance_tracker.register_dataset_source('academy_awards', trust_score=0.95)
provenance_tracker.register_dataset_source('actors', trust_score=0.85) 
provenance_tracker.register_dataset_source('golden_globes', trust_score=0.90)

print("  ✅ Data sources registered with differential trust scores")
print("  📊 Academy Awards: 0.95 (most authoritative)")
print("  🎭 Actors: 0.85 (good for cast information)")
print("  🏆 Golden Globes: 0.90 (reliable awards data)")

# 2. Advanced Reporting
print("\n📊 2. Comprehensive Reporting:")
if len(fused_data) > 0:
    try:
        fusion_report = FusionReport(
            fused_df=fused_data,
            input_datasets=datasets,
            strategy_name=fusion_strategy.name,
            correspondences=correspondences_df
        )
        
        print("  ✅ Fusion report generated with:")
        print("    • Quality metrics and confidence scores")
        print("    • Attribute coverage analysis")
        print("    • Rule usage statistics")
        print("    • Performance benchmarks")
        
    except Exception as e:
        print(f"  ⚠️  Advanced reporting: {e}")

# 3. Performance Analysis
print("\n⚡ 3. Performance & Scalability:")
print(f"  Dataset sizes processed: {[len(df) for df in datasets]}")
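# Format only when fusion_time exists; ':.3f' raises ValueError on the 'N/A' string
fusion_time_text = f"~{fusion_time:.3f} seconds" if 'fusion_time' in locals() else "N/A"
print(f"  Total processing time: {fusion_time_text}")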
print(f"  Memory efficiency: Pandas-native operations")
print(f"  Scalability: Batch processing for large datasets")

# 4. Modern Python Ecosystem Integration
print("\n🐍 4. Python Ecosystem Advantages:")
print("  📈 Rich visualization with matplotlib/seaborn")
print("  🤖 ML integration with scikit-learn/pytorch")
print("  🚀 Distributed computing with Dask/Ray")
print("  📊 Interactive analysis with Jupyter")
print("  🌐 Web deployment with FastAPI/Streamlit")
print("  ☁️  Cloud integration (AWS/Azure/GCP)")
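
The trust scores registered above become useful once they influence conflict resolution. One common approach, sketched here as an assumption rather than as `ProvenanceTracker` behavior, is a trust-weighted vote: each source supports its value with its trust score, and the value with the highest total support wins. The `trust_weighted_vote` function and the sample claims are hypothetical.

```python
from collections import defaultdict

# Trust scores as registered in the cell above.
TRUST = {"academy_awards": 0.95, "actors": 0.85, "golden_globes": 0.90}


def trust_weighted_vote(claims, trust):
    """Pick the value with the highest summed source trust.

    `claims` is a list of (source_name, value) tuples. Returns the winning
    value and its share of the total trust as a rough confidence.
    """
    support = defaultdict(float)
    for source, value in claims:
        if value is not None:
            support[value] += trust.get(source, 0.0)

    total = sum(support.values())
    if total == 0:
        return None, 0.0

    best_value = max(support, key=support.get)
    return best_value, support[best_value] / total


# Two less trusted sources agree and outvote the most trusted one.
date_claims = [
    ("academy_awards", "1972-03-24"),
    ("actors", "1972-03-15"),
    ("golden_globes", "1972-03-15"),
]
value, confidence = trust_weighted_vote(date_claims, TRUST)
print(value, round(confidence, 3))
```

With only two disagreeing sources the more trusted one always wins, so trust scores matter most when sources are few or evenly split.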

### Framework Comparison: PyDI vs WInte.r

Let's compare PyDI with the original WInte.r framework. The table below is a qualitative, feature-level summary; it does not report measured benchmarks.

In [None]:
print("=== PyDI vs Winter Framework Comparison ===")
print("Analyzing advantages of PyDI's modern Python approach...\n")

comparison_data = {
    'Aspect': [
        'Programming Language',
        'Data Structure', 
        'Memory Management',
        'Blocking Strategies',
        'Similarity Functions',
        'ML Integration',
        'Visualization',
        'Cloud Deployment',
        'Learning Curve',
        'Community Support',
        'Performance',
        'Semantic Matching'
    ],
    'Winter Framework': [
        'Java',
        'Custom Objects',
        'Manual/JVM',
        'Standard, SortedNeighbourhood',
        'Basic string metrics',
        'Limited (Weka)',
        'Limited',
        'Complex setup',
        'Steep (Java + domain)',
        'Academic',
        'Good (JVM optimized)',
        'Not available'
    ],
    'PyDI Framework': [
        'Python',
        'Pandas DataFrames',
        'Automatic (Python)',
        'Standard, SortedNeighbourhood, Token, Embedding',
        'Comprehensive (20+ functions)', 
        'Native (scikit-learn, PyTorch)',
        'Rich (matplotlib, seaborn)', 
        'Simple (Docker, serverless)',
        'Gentle (Python familiarity)',
        'Large (Python ecosystem)',
        'Excellent (NumPy/Pandas)',
        'Advanced (Transformers)'
    ]
}

comparison_df = pd.DataFrame(comparison_data)
print("📊 Framework Comparison:")
display(comparison_df)

print("\n🎯 Key PyDI Advantages:")
print("  1. 🐍 Python Ecosystem: Access to vast ML/data science libraries")
print("  2. 📊 DataFrame-First: Intuitive data manipulation with Pandas")
print("  3. 🧠 Semantic Matching: Modern embedding-based similarity")
print("  4. 🚀 Easy Deployment: Docker, cloud-native, serverless options")
print("  5. 📈 Rich Visualization: Interactive plots and dashboards")
print("  6. 🤖 ML Integration: Seamless scikit-learn/PyTorch integration")
print("  7. 💡 Lower Learning Curve: Familiar Python syntax")
print("  8. 🌟 Active Development: Modern software engineering practices")

print("\n🏆 When to Choose Each:")
print("  Winter: Legacy Java environments, established workflows")
print("  PyDI: New projects, Python teams, ML integration, cloud deployment")

## Part 5: Complete End-to-End Pipeline

Let's put everything together in a single end-to-end data integration pipeline function that profiles the data, matches entities, fuses records, and writes the results to disk.

In [None]:
def complete_data_integration_pipeline(
    datasets, 
    output_dir,
    blocking_strategy='embedding',
    matching_threshold=0.7,
    evaluate_results=True
):
    """Complete data integration pipeline inspired by Winter tutorial."""
    
    print("=== Complete PyDI Data Integration Pipeline ===")
    print(f"Processing {len(datasets)} datasets with {sum(len(df) for df in datasets):,} total records\n")
    
    pipeline_start = time.time()
    results = {}
    
    # Step 1: Data Profiling
    print("📊 Step 1: Data Profiling & Quality Analysis")
    profiler = DataProfiler()
    
    profile_dir = Path(output_dir) / "profiles"
    profile_dir.mkdir(parents=True, exist_ok=True)
    
    for i, df in enumerate(datasets):
        name = df.attrs.get('dataset_name', f'dataset_{i}')
        summary = profiler.summary(df)
        print(f"  {name}: {summary['rows']:,} rows, {summary['columns']} cols, {summary['nulls_total']:,} nulls")
    
    # Step 2: Identity Resolution
    print(f"\n🔗 Step 2: Identity Resolution ({blocking_strategy} blocking)")
    
    # Use first two datasets for entity matching
    left_df, right_df = datasets[0], datasets[1]
    left_df = ensure_record_ids(left_df)
    right_df = ensure_record_ids(right_df)
    
    # Dynamic blocking strategy selection
    # (`use_embeddings` is the notebook-level flag set earlier, not a function argument)
    if blocking_strategy == 'embedding' and use_embeddings:
        blocker = EmbeddingBlocking(
            left_df, right_df,
            text_cols=['title', 'actor_name'],
            top_k=15, threshold=0.4
        )
    elif blocking_strategy == 'sorted':
        blocker = SortedNeighbourhood(left_df, right_df, key='title', window=7)
    else:
        blocker = StandardBlocking(left_df, right_df, on=['title'])
    
    # Generate candidates
    candidates = []
    for batch in blocker:
        candidates.extend(batch.to_dict('records'))
    
    print(f"  Generated {len(candidates):,} candidate pairs")
    
    # Entity matching
    matcher = RuleBasedMatcher()
    comparators = [
        StringComparator('title', 'jaro_winkler', str.lower),
        DateComparator('date', max_days_difference=365),
        StringComparator('actor_name', 'cosine', str.lower)
    ]
    
    matches = matcher.match(
        df_left=left_df,
        df_right=right_df,
        candidates=[pd.DataFrame(candidates)],
        comparators=comparators,
        weights=[0.6, 0.25, 0.15],
        threshold=matching_threshold
    )
    
    print(f"  Found {len(matches):,} entity matches (threshold={matching_threshold})")
    results['matches'] = matches
    
    # Step 3: Data Fusion
    print(f"\n🔄 Step 3: Data Fusion & Conflict Resolution")
    
    try:
        fusion_strategy = DataFusionStrategy("pipeline_fusion")
        fusion_strategy.add_attribute_fuser('title', AttributeValueFuser(LONGEST))
        fusion_strategy.add_attribute_fuser('date', AttributeValueFuser(LATEST))
        fusion_strategy.add_attribute_fuser('actor_name', AttributeValueFuser(VOTE))
        
        fusion_engine = DataFusionEngine(fusion_strategy)
        
        # Create dummy correspondences for demo
        demo_correspondences = pd.DataFrame({
            'id1': ['academy_awards_1', 'academy_awards_2'],
            'id2': ['actors_1', 'actors_2'],
            'score': [1.0, 1.0]
        })
        
        fused_data, fusion_time = fusion_engine.run(
            datasets=datasets,
            correspondences=demo_correspondences,
            id_column={'academy_awards': 'id', 'actors': 'id', 'golden_globes': 'id'}
        )
        
        print(f"  Created {len(fused_data):,} fused records")
        results['fused_data'] = fused_data
        
    except Exception as e:
        print(f"  ⚠️  Fusion step failed: {e}")
        results['fused_data'] = pd.DataFrame()
    
    # Step 4: Output Generation
    print(f"\n💾 Step 4: Output Generation")
    output_path = Path(output_dir) / "pipeline_results"
    output_path.mkdir(parents=True, exist_ok=True)
    
    # Save matches
    if len(matches) > 0:
        matches.to_csv(output_path / "entity_matches.csv", index=False)
        print(f"  ✅ Entity matches: {output_path / 'entity_matches.csv'}")
    
    # Save fused data
    if len(results['fused_data']) > 0:
        results['fused_data'].to_csv(output_path / "fused_movies.csv", index=False)
        print(f"  ✅ Fused dataset: {output_path / 'fused_movies.csv'}")
    
    # Pipeline summary
    pipeline_time = time.time() - pipeline_start
    
    summary = {
        'pipeline_version': '1.0',
        'processing_time_seconds': pipeline_time,
        'input_datasets': len(datasets),
        'input_records': sum(len(df) for df in datasets),
        'candidate_pairs': len(candidates),
        'entity_matches': len(matches),
        'fused_records': len(results['fused_data']),
        'blocking_strategy': blocking_strategy,
        'matching_threshold': matching_threshold
    }
    
    with open(output_path / "pipeline_summary.json", 'w') as f:
        json.dump(summary, f, indent=2)
    
    print(f"  ✅ Pipeline summary: {output_path / 'pipeline_summary.json'}")
    
    print(f"\n🎯 Pipeline completed in {pipeline_time:.3f} seconds")
    print(f"📊 Results: {len(matches):,} matches, {len(results['fused_data']):,} fused records")
    
    return results

# Run complete pipeline
print("Running complete end-to-end pipeline...\n")
pipeline_results = complete_data_integration_pipeline(
    datasets=datasets,
    output_dir=str(OUTPUT_DIR),
    blocking_strategy='embedding' if use_embeddings else 'sorted',
    matching_threshold=0.6
)
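
To see what the blocking and matching steps of the pipeline do conceptually, here is a small self-contained sketch of sorted-neighbourhood blocking followed by weighted rule-based scoring. It is not the implementation behind `SortedNeighbourhood` or `RuleBasedMatcher`. The helper names and records are hypothetical, and `difflib` similarity stands in for the Jaro-Winkler and cosine comparators. The weights and the 0.6 threshold mirror the values used in the pipeline call above.

```python
from difflib import SequenceMatcher


def sorted_neighbourhood_pairs(left, right, key, window):
    """Sort all records by `key` and pair records from different sides
    that lie within `window` positions of each other."""
    tagged = [("L", rec) for rec in left] + [("R", rec) for rec in right]
    tagged.sort(key=lambda item: item[1][key].lower())

    pairs = []
    for start in range(len(tagged)):
        for other in range(start + 1, min(start + window, len(tagged))):
            (side_a, rec_a), (side_b, rec_b) = tagged[start], tagged[other]
            if side_a == side_b:
                continue  # only compare records across the two datasets
            pairs.append((rec_a, rec_b) if side_a == "L" else (rec_b, rec_a))
    return pairs


def text_sim(a, b):
    """Case-insensitive string similarity in [0, 1]."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def weighted_score(l, r, weights=(0.6, 0.25, 0.15)):
    """Linear combination of title, year and actor similarity."""
    sims = (
        text_sim(l["title"], r["title"]),
        1.0 if l["year"] == r["year"] else 0.0,
        text_sim(l["actor"], r["actor"]),
    )
    return sum(w * s for w, s in zip(weights, sims))


left = [
    {"id": "aa_1", "title": "Casablanca", "year": 1942, "actor": "Humphrey Bogart"},
    {"id": "aa_2", "title": "The Godfather", "year": 1972, "actor": "Marlon Brando"},
]
right = [
    {"id": "ac_1", "title": "casablanca", "year": 1942, "actor": "Humphrey Bogart"},
    {"id": "ac_2", "title": "The Godfather", "year": 1972, "actor": "Marlon Brando"},
    {"id": "ac_3", "title": "Titanic", "year": 1997, "actor": "Kate Winslet"},
]

candidates = sorted_neighbourhood_pairs(left, right, key="title", window=3)
matches = [(l["id"], r["id"]) for l, r in candidates if weighted_score(l, r) >= 0.6]
print(len(candidates), matches)
```

Blocking only reduces the number of comparisons. The final match decision still comes from the weighted score, so a window that is too small can drop true matches before they are ever scored.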