# Master Data Management (MDM) - Spanner Native Streaming Processing

This notebook demonstrates a complete end-to-end streaming Master Data Management pipeline using Spanner's native capabilities:

- **Golden Record Bootstrap**: Load existing golden records from BigQuery batch processing
- **Spanner Infrastructure**: Set up minimal Spanner instance for real-time processing
- **Data Migration**: Transfer golden records to Spanner for real-time matching
- **Streaming Data Generation**: Create 100 new customer records for processing
- **4-Way Real-time Matching**: Exact, fuzzy, vector, and business rules matching
- **Synchronous Processing**: Sub-second processing with immediate feedback
- **Golden Record Updates**: Apply survivorship rules and update master entities
- **Live Performance Tracking**: Real-time metrics and Spanner transaction logging

## Architecture Overview

This implementation follows the streaming processing path:
1. **BigQuery Golden Records** → **Spanner Migration**
2. **Kafka-like Stream** → **Real-time Standardization**
3. **Spanner Vector Search** → **4-Way Matching Engine**
4. **Confidence Scoring** → **AUTO_MERGE/CREATE_NEW Decisions**
5. **Golden Record Updates** → **Spanner Transaction Logging**
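
Steps 3 and 4 above can be sketched as a weighted combination of the four strategy scores followed by a threshold check. This is only an illustration: the weights below are hypothetical (the real ones are exposed as `processor.weights`), the thresholds are the values configured later in this notebook, and the `HUMAN_REVIEW` label for the middle band is an assumption, since the notebook only names `AUTO_MERGE` and `CREATE_NEW`.

```python
# Hypothetical weights for illustration; the real values live in processor.weights.
ASSUMED_WEIGHTS = {"exact": 0.4, "fuzzy": 0.3, "vector": 0.2, "business": 0.1}

AUTO_MERGE_THRESHOLD = 0.85  # same value as the notebook configuration
CREATE_NEW_THRESHOLD = 0.65  # same value as the notebook configuration


def combine_scores(scores, weights=ASSUMED_WEIGHTS):
    """Weighted average of per-strategy scores, each expected in [0, 1]."""
    total_weight = sum(weights.values())
    weighted = sum(weights[name] * scores.get(name, 0.0) for name in weights)
    return weighted / total_weight


def decide(combined_score):
    """Map a combined score to a decision label."""
    if combined_score >= AUTO_MERGE_THRESHOLD:
        return "AUTO_MERGE"
    if combined_score < CREATE_NEW_THRESHOLD:
        return "CREATE_NEW"
    return "HUMAN_REVIEW"  # assumed label for scores between the two thresholds


scores = {"exact": 1.0, "fuzzy": 0.9, "vector": 0.8, "business": 1.0}
print(decide(combine_scores(scores)))
```

Dividing by the total weight keeps the combined score in [0, 1] even if the weights do not sum exactly to one.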

## 1. Setup and Configuration

In [None]:
# Import required libraries
import os
import random
import sys
import time
import warnings
from datetime import datetime

# Make the parent directory importable BEFORE importing the project modules;
# otherwise the batch_mdm_gcp imports below can fail with ModuleNotFoundError
sys.path.append('..')

import matplotlib.pyplot as plt
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import seaborn as sns
from plotly.subplots import make_subplots

from batch_mdm_gcp.bigquery_utils import BigQueryMDMHelper
from batch_mdm_gcp.data_generator import MDMDataGenerator
from spanner_utils import SpannerMDMHelper
from streaming_processor import StreamingMDMProcessor

warnings.filterwarnings('ignore')

# Set display options
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("✅ Libraries imported successfully")

In [None]:
# =============================================================================
# CONFIGURATION CONSTANTS - Centralized Settings
# =============================================================================

# GCP Configuration
PROJECT_ID = "your-project-id"  # Replace with your GCP project ID
DATASET_ID = "mdm_demo"  # BigQuery dataset (from batch processing)
INSTANCE_ID = "mdm-streaming-demo"  # Spanner instance
DATABASE_ID = "mdm_streaming"  # Spanner database
LOCATION = "US"

# Processing Configuration
NUM_STREAMING_RECORDS = 100
PROCESSING_DELAY_SEC = 0.1  # 10 records per second for the demo
TARGET_LATENCY_MS = 400  # Target processing time per record

# Decision Thresholds
AUTO_MERGE_THRESHOLD = 0.85
CREATE_NEW_THRESHOLD = 0.65

print("📋 Configuration loaded:")
print(f"  Target records: {NUM_STREAMING_RECORDS}")
print(f"  Target latency: <{TARGET_LATENCY_MS}ms")
print(f"  Auto-merge threshold: ≥{AUTO_MERGE_THRESHOLD}")
print(f"  Create new threshold: <{CREATE_NEW_THRESHOLD}")

In [None]:
# Initialize helpers
try:
    # BigQuery helper (for loading golden records)
    bq_helper = BigQueryMDMHelper(PROJECT_ID, DATASET_ID)
    print(f"✅ Connected to BigQuery project: {PROJECT_ID}")
    print(f"📊 BigQuery dataset: {bq_helper.dataset_ref}")

    # Spanner helper (for streaming processing)
    spanner_helper = SpannerMDMHelper(PROJECT_ID, INSTANCE_ID, DATABASE_ID)
    print(f"✅ Connected to Spanner project: {PROJECT_ID}")
    print(f"🗃️ Spanner instance: {INSTANCE_ID}")
    print(f"🗃️ Spanner database: {DATABASE_ID}")

except Exception as e:
    print(f"❌ Error connecting: {e}")
    print("Please ensure you have:")
    print("1. Set up Google Cloud authentication")
    print("2. Enabled BigQuery and Spanner APIs")
    print("3. Updated PROJECT_ID above")

## 2. Helper Functions

In [None]:
def update_statistics(result, action_counts, confidence_counts):
    """Update running statistics with current result."""
    action = result.get('action', 'ERROR')
    confidence = result.get('confidence', 'LOW')

    action_counts[action] = action_counts.get(action, 0) + 1
    confidence_counts[confidence] = confidence_counts.get(confidence, 0) + 1


print("✅ Helper functions defined")

## 3. Spanner Infrastructure Setup

Create minimal Spanner infrastructure for the streaming demo.
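
For orientation, the two tables used in this notebook might be declared roughly as follows. This is a sketch, not the schema that `create_or_replace_schema()` actually applies: the column names are taken from the queries later in this notebook, while the types, lengths, and primary keys are assumptions.

```python
# Assumed GoogleSQL DDL. Column names come from the queries in this notebook;
# the types, lengths, and primary keys are illustrative guesses.
GOLDEN_ENTITIES_DDL = """
CREATE TABLE golden_entities (
    entity_id           STRING(36) NOT NULL,
    master_name         STRING(MAX),
    master_email        STRING(MAX),
    master_phone        STRING(MAX),
    source_record_count INT64,
    processing_path     STRING(32),
    updated_at          TIMESTAMP
) PRIMARY KEY (entity_id)
"""

MATCH_RESULTS_DDL = """
CREATE TABLE match_results (
    match_id           STRING(36) NOT NULL,
    record1_id         STRING(MAX),
    source1            STRING(MAX),
    exact_score        FLOAT64,
    fuzzy_score        FLOAT64,
    vector_score       FLOAT64,
    business_score     FLOAT64,
    combined_score     FLOAT64,
    confidence_level   STRING(16),
    match_decision     STRING(32),
    processing_time_ms FLOAT64,
    matched_at         TIMESTAMP
) PRIMARY KEY (match_id)
"""

# One statement per table, stripped of surrounding whitespace
DDL_STATEMENTS = [ddl.strip() for ddl in (GOLDEN_ENTITIES_DDL, MATCH_RESULTS_DDL)]
print(len(DDL_STATEMENTS), "DDL statements prepared")
```

If the helper wraps the `google-cloud-spanner` client, a list like this could be passed to `Database.update_ddl`.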

In [None]:
print("🔄 Setting up Spanner infrastructure...")
print("💰 Cost estimate: ~$65/month for 100 processing units (regional)")
print("⚠️ Remember to delete the instance after demo to avoid charges")
print()

try:
    # Create Spanner instance (minimal configuration)
    spanner_helper.create_instance_if_needed(processing_units=100)

    # Create database
    spanner_helper.create_database_if_needed()

    # Create schema (aligned with BigQuery golden_records)
    spanner_helper.create_or_replace_schema()

    print("\n✅ Spanner infrastructure ready!")
    print(f"📊 Instance: {INSTANCE_ID} (100 processing units)")
    print(f"🗃️ Database: {DATABASE_ID}")
    print(f"📋 Schema: golden_entities, match_results tables created")

except Exception as e:
    print(f"❌ Error setting up Spanner infrastructure: {e}")
    print("Please check your GCP permissions and try again.")

## 4. Load Golden Records from BigQuery

Bootstrap the streaming system with existing golden records from batch processing.
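
A migration like this is usually written in batches rather than in one large commit. The sketch below shows only the batching step; `chunked` is a hypothetical helper and the batch size is arbitrary. How `load_golden_records_from_bigquery` actually writes rows is not shown in this notebook.

```python
def chunked(rows, batch_size):
    """Yield consecutive slices of `rows`, each at most `batch_size` long."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Example rows shaped like (entity_id, master_name); purely illustrative data
rows = [(f"entity-{i}", f"Customer {i}") for i in range(5)]
for batch in chunked(rows, batch_size=2):
    # In a real migration, each batch would be written in its own commit
    print(len(batch), "rows in batch")
```

With the `google-cloud-spanner` client, each batch could be written inside a `with database.batch() as batch:` block using `batch.insert_or_update(...)`.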

In [None]:
print("🔄 Loading golden records from BigQuery batch processing...")

try:
    # Load golden records from BigQuery
    golden_count = spanner_helper.load_golden_records_from_bigquery(bq_helper)

    if golden_count > 0:
        print(
            f"\n✅ Successfully migrated {golden_count} golden records to Spanner")

        # Verify the migration
        current_count = spanner_helper.get_table_count("golden_entities")
        print(f"📊 Current golden entities in Spanner: {current_count}")

        # Show sample records
        sample_query = """
        SELECT entity_id, master_name, master_email, master_phone,
               source_record_count, processing_path
        FROM golden_entities
        LIMIT 5
        """

        sample_df = spanner_helper.execute_sql(sample_query)
        if not sample_df.empty:
            print("\n🔍 Sample Golden Records in Spanner:")
            sample_df.columns = ['entity_id', 'master_name',
                                 'master_email', 'master_phone', 'source_count', 'path']
            display(sample_df)
    else:
        print("⚠️ No golden records found in BigQuery")
        print("💡 Run the batch processing notebook first to create golden records")

except Exception as e:
    print(f"❌ Error loading golden records: {e}")
    print("💡 Make sure you've run the batch processing notebook first")

## 5. Generate New Streaming Data

Create new customer records to simulate streaming data.
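
The pooling, shuffling, and truncation performed in the next cell can be sketched with plain lists. The `sample_stream` helper and its `seed` parameter are assumptions added for illustration; the notebook itself shuffles with the global `random` module and no seed.

```python
import random


def sample_stream(datasets, limit, seed=None):
    """Pool records from every source, shuffle them, and keep the first `limit`."""
    pooled = [record for records in datasets.values() for record in records]
    rng = random.Random(seed)  # a local generator keeps the shuffle reproducible
    rng.shuffle(pooled)
    return pooled[:limit]


# Illustrative input: source name -> list of record dicts
datasets = {
    "crm": [{"record_id": "c1"}, {"record_id": "c2"}],
    "erp": [{"record_id": "e1"}, {"record_id": "e2"}],
}
print(sample_stream(datasets, limit=3, seed=42))
```

Passing a fixed seed makes repeated runs process the records in the same order, which helps when comparing results between runs.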

In [None]:
print(f"🔄 Generating {NUM_STREAMING_RECORDS} new streaming records...")

try:
    # Generate new streaming data (different from batch data)
    generator = MDMDataGenerator(num_unique_customers=NUM_STREAMING_RECORDS)
    streaming_datasets = generator.generate_all_datasets()

    # Combine all streaming records
    all_streaming_records = []
    for df in streaming_datasets.values():
        all_streaming_records.extend(df.to_dict('records'))

    # Shuffle to simulate random streaming order
    random.shuffle(all_streaming_records)

    # Take exactly NUM_STREAMING_RECORDS
    streaming_records = all_streaming_records[:NUM_STREAMING_RECORDS]

    print(f"\n📈 Streaming Data Summary:")
    print(f"  Total streaming records: {len(streaming_records)}")

    # Show source distribution
    source_counts = {}
    for record in streaming_records:
        source = record.get('source_system', 'unknown')
        source_counts[source] = source_counts.get(source, 0) + 1

    for source, count in source_counts.items():
        print(f"  {source.upper()}: {count} records")

    print(f"\n🔍 Sample Streaming Records:")
    sample_streaming = pd.DataFrame(streaming_records[:3])
    display(sample_streaming[['record_id', 'full_name',
            'email', 'phone', 'source_system']].head(3))

    print("\n✅ Streaming data ready for processing!")

except Exception as e:
    print(f"❌ Error generating streaming data: {e}")
    streaming_records = []

## 6. Initialize Streaming Processor

Set up the 4-way matching processor.
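
To make the four strategies concrete, here is one minimal way each score could be computed using only the standard library. These functions are illustrative stand-ins, not the logic inside `StreamingMDMProcessor`: the choice of fields, the `difflib` similarity, and the phone-based business rule are all assumptions.

```python
import math
from difflib import SequenceMatcher


def exact_score(record, golden):
    """1.0 when the normalized emails are equal and non-empty, else 0.0."""
    a = (record.get("email") or "").strip().lower()
    b = (golden.get("master_email") or "").strip().lower()
    return 1.0 if a and a == b else 0.0


def fuzzy_score(record, golden):
    """Character-level similarity of the two names, in [0, 1]."""
    a = (record.get("full_name") or "").strip().lower()
    b = (golden.get("master_name") or "").strip().lower()
    if not a or not b:
        return 0.0  # two empty strings would otherwise score 1.0
    return SequenceMatcher(None, a, b).ratio()


def vector_score(vec_a, vec_b):
    """Cosine similarity of two embedding vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(vec_a, vec_b))
    norm = math.sqrt(sum(x * x for x in vec_a)) * math.sqrt(sum(y * y for y in vec_b))
    return dot / norm if norm else 0.0


def business_score(record, golden):
    """Assumed business rule: same phone number once punctuation is removed."""
    def digits(value):
        return "".join(ch for ch in (value or "") if ch.isdigit())

    a, b = digits(record.get("phone")), digits(golden.get("master_phone"))
    return 1.0 if a and a == b else 0.0


record = {"full_name": "Jon Smith", "email": "JON@example.com", "phone": "(555) 123-4567"}
golden = {"master_name": "John Smith", "master_email": "jon@example.com", "master_phone": "555-123-4567"}
print(exact_score(record, golden), round(fuzzy_score(record, golden), 2), business_score(record, golden))
```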

In [None]:
print("🔄 Initializing 4-way streaming processor...")

try:
    # Initialize the streaming processor
    processor = StreamingMDMProcessor(spanner_helper)

    print("\n📊 Processor Configuration:")
    print(f"  Matching strategies: 4 (exact, fuzzy, vector, business)")
    print(f"  Strategy weights:")
    for strategy, weight in processor.weights.items():
        print(f"    {strategy}: {weight*100:.0f}%")

    print(f"\n⚖️ Decision Thresholds:")
    print(f"  Auto-merge: ≥{processor.auto_merge_threshold}")
    print(f"  Create new: <{processor.create_new_threshold}")

    print("\n✅ Streaming processor ready!")
    print(f"🎯 Target: <{TARGET_LATENCY_MS}ms processing time per record")

except Exception as e:
    print(f"❌ Error initializing processor: {e}")
    processor = None

## 7. Streaming Processing Loop

Process each record in turn, sleeping between records to simulate a real-time pipeline.
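
The pacing logic amounts to subtracting the time already spent on a record from the target interval and sleeping for whatever remains. The sketch below isolates that calculation; `remaining_delay` and `run_paced` are hypothetical names, and the injectable `clock` and `sleep` parameters exist only to make the loop easy to test.

```python
import time


def remaining_delay(record_start, delay_sec, now):
    """Seconds left to wait so that records start `delay_sec` apart (never negative)."""
    return max(0.0, delay_sec - (now - record_start))


def run_paced(records, handle, delay_sec, clock=time.monotonic, sleep=time.sleep):
    """Call `handle` on each record, pausing so records start about `delay_sec` apart."""
    for record in records:
        started = clock()
        handle(record)
        pause = remaining_delay(started, delay_sec, clock())
        if pause > 0:
            sleep(pause)


run_paced(["r1", "r2", "r3"], handle=print, delay_sec=0.01)
```

`time.monotonic` is used here instead of `time.time` because it is not affected by system clock adjustments, which makes it a safer choice for measuring intervals.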

In [None]:
print(
    f"🚀 Starting Streaming MDM Simulation ({NUM_STREAMING_RECORDS} records, one every {PROCESSING_DELAY_SEC * 1000:.0f}ms)")
print("=" * 80)
print()

# Validate prerequisites
if not streaming_records:
    print("❌ No streaming records available. Please run data generation first.")
elif not processor:
    print("❌ Processor not initialized. Please run processor setup first.")
else:
    # Track overall statistics
    start_time = time.time()
    total_processing_time = 0
    action_counts = {}
    confidence_counts = {}

    # Process each record
    for i, record in enumerate(streaming_records, 1):
        record_start = time.time()

        # Process the record with match details
        result = processor.process_record(
            record, i, NUM_STREAMING_RECORDS, include_match_details=True)

        # Store match result in Spanner
        try:
            match_id = processor.store_match_result(record, result)
            print(
                f"  🗃️ → Stored match result in Spanner (match_id: {match_id[:8]}...)")
        except Exception as e:
            print(f"  ⚠️ → Failed to store match result: {e}")

        # Update statistics
        total_processing_time += result.get('processing_time_ms', 0)
        update_statistics(result, action_counts, confidence_counts)

        # Sleep to maintain processing pace
        elapsed = time.time() - record_start
        sleep_time = max(0, PROCESSING_DELAY_SEC - elapsed)
        if sleep_time > 0:
            print(f"  ⏱️ Next record in {sleep_time * 1000:.0f}ms...")
            time.sleep(sleep_time)

        print()  # Empty line for readability

    # Calculate final statistics from the records actually processed
    # (this can be fewer than NUM_STREAMING_RECORDS if generation produced less)
    total_time = time.time() - start_time
    records_processed = len(streaming_records)

    print("🎉 Streaming Simulation Complete!")
    print("=" * 50)
    print(f"📊 Processing Summary:")
    print(f"  Records processed: {records_processed}")
    print(f"  Total time: {total_time:.1f} seconds")
    print(
        f"  Average processing time: {total_processing_time/records_processed:.0f}ms")
    print(
        f"  Throughput: {records_processed/total_time:.1f} records/second")

    print(f"\n⚖️ Decision Distribution:")
    for action, count in action_counts.items():
        percentage = (count / records_processed) * 100
        print(f"  {action}: {count} ({percentage:.1f}%)")

    print(f"\n🎯 Confidence Distribution:")
    for confidence, count in confidence_counts.items():
        percentage = (count / records_processed) * 100
        print(f"  {confidence}: {count} ({percentage:.1f}%)")

    print(f"\n📁 Results stored in Spanner table: match_results")
    print("💡 Use Section 8 to analyze the results from Spanner")

## 8. Analysis and Visualization

Analyze the streaming processing results.
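
The latency figures reported below (mean, median, 95th percentile, maximum) can also be computed without pandas. This sketch uses the nearest-rank definition of a percentile, which is an assumption made for simplicity; `Series.quantile` interpolates linearly by default, so its result can differ slightly.

```python
import math
import statistics


def percentile(values, pct):
    """Nearest-rank percentile: the smallest value with at least pct% of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def latency_summary(latencies_ms, target_ms):
    """Summarize per-record latencies and the share that beat the target."""
    return {
        "mean": statistics.mean(latencies_ms),
        "median": statistics.median(latencies_ms),
        "p95": percentile(latencies_ms, 95),
        "max": max(latencies_ms),
        "within_target": sum(1 for v in latencies_ms if v < target_ms) / len(latencies_ms),
    }


print(latency_summary([120, 180, 250, 310, 950], target_ms=400))
```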

In [None]:
print("📊 Analyzing streaming processing results from Spanner...")

# Query transaction data from Spanner
transactions_query = """
SELECT
    record1_id, source1,
    exact_score, fuzzy_score, vector_score, business_score,
    combined_score, confidence_level, match_decision,
    processing_time_ms, matched_at
FROM match_results
WHERE matched_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY matched_at DESC
"""

transactions_df = spanner_helper.execute_sql(transactions_query)

# Rename columns for compatibility with existing analysis
transactions_df.columns = [
    'record_id', 'source_system',
    'exact_score', 'fuzzy_score', 'vector_score', 'business_score',
    'combined_score', 'confidence', 'action',
    'processing_time_ms', 'matched_at'
]

# Add calculated columns for match counts
transactions_df['exact_matches'] = (
    transactions_df['exact_score'] > 0).astype(int)
transactions_df['fuzzy_matches'] = (
    transactions_df['fuzzy_score'] > 0).astype(int)
transactions_df['vector_matches'] = (
    transactions_df['vector_score'] > 0).astype(int)
transactions_df['business_matches'] = (
    transactions_df['business_score'] > 0).astype(int)


# Performance analysis
print("\n⚡ Performance Analysis:")
print(
    f"  Average processing time: {transactions_df['processing_time_ms'].mean():.0f}ms")
print(
    f"  Median processing time: {transactions_df['processing_time_ms'].median():.0f}ms")
print(
    f"  95th percentile: {transactions_df['processing_time_ms'].quantile(0.95):.0f}ms")
print(
    f"  Max processing time: {transactions_df['processing_time_ms'].max():.0f}ms")

# Matching effectiveness
print("\n🎯 Matching Effectiveness:")
print(
    f"  Average combined score: {transactions_df['combined_score'].mean():.3f}")
print(
    f"  Records with exact matches: {(transactions_df['exact_matches'] > 0).sum()}")
print(
    f"  Records with fuzzy matches: {(transactions_df['fuzzy_matches'] > 0).sum()}")
print(
    f"  Records with vector matches: {(transactions_df['vector_matches'] > 0).sum()}")
print(
    f"  Records with business matches: {(transactions_df['business_matches'] > 0).sum()}")

In [None]:
# Create visualizations
print("📈 Creating performance visualizations...")

# Processing time distribution
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('Processing Time Distribution', 'Action Distribution',
                    'Confidence Distribution', 'Combined Score Distribution'),
    specs=[[{'type': 'histogram'}, {'type': 'pie'}],
           [{'type': 'pie'}, {'type': 'histogram'}]]
)

# Processing time histogram
fig.add_trace(
    go.Histogram(
        x=transactions_df['processing_time_ms'], name='Processing Time (ms)'),
    row=1, col=1
)

# Action distribution pie chart
# (separate names so the dicts built in the processing loop are not overwritten)
action_dist = transactions_df['action'].value_counts()
fig.add_trace(
    go.Pie(labels=action_dist.index,
           values=action_dist.values, name='Actions'),
    row=1, col=2
)

# Confidence distribution pie chart
confidence_dist = transactions_df['confidence'].value_counts()
fig.add_trace(
    go.Pie(labels=confidence_dist.index,
           values=confidence_dist.values, name='Confidence'),
    row=2, col=1
)

# Combined score histogram
fig.add_trace(
    go.Histogram(x=transactions_df['combined_score'], name='Combined Score'),
    row=2, col=2
)

fig.update_layout(
    title_text="Streaming MDM Performance Analysis", showlegend=False)
fig.show()

print("✅ Visualizations created!")

In [None]:
# Strategy effectiveness analysis
print("🎯 4-Strategy Effectiveness Analysis:")

strategy_stats = pd.DataFrame({
    'Strategy': ['Exact', 'Fuzzy', 'Vector', 'Business'],
    'Records_with_Matches': [
        (transactions_df['exact_matches'] > 0).sum(),
        (transactions_df['fuzzy_matches'] > 0).sum(),
        (transactions_df['vector_matches'] > 0).sum(),
        (transactions_df['business_matches'] > 0).sum()
    ],
    'Average_Score': [
        transactions_df['exact_score'].mean(),
        transactions_df['fuzzy_score'].mean(),
        transactions_df['vector_score'].mean(),
        transactions_df['business_score'].mean()
    ]
})

display(strategy_stats)

# Strategy effectiveness chart
fig = px.bar(
    strategy_stats,
    x='Strategy',
    y='Records_with_Matches',
    title='4-Strategy Matching Effectiveness',
    labels={'Records_with_Matches': 'Records with Matches'}
)
fig.show()

print("✅ Strategy analysis complete!")

## 9. Final Golden Record Analysis

Analyze the final state of golden records in Spanner.
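
The `source_record_count` and `processing_path` columns examined below change whenever a streaming record is merged into an existing golden record. The sketch shows one possible survivorship rule for such a merge. It is an assumption for illustration, not the rule the processor applies: existing master values are kept, gaps are filled from the incoming record, and the bookkeeping columns are updated.

```python
from datetime import datetime, timezone

# Assumed mapping from golden record columns to streaming record fields
FIELD_MAP = {"master_name": "full_name", "master_email": "email", "master_phone": "phone"}


def apply_survivorship(golden, record, now=None):
    """Return a merged copy of `golden`; the input dict is left untouched."""
    merged = dict(golden)
    for master_field, source_field in FIELD_MAP.items():
        # Keep an existing master value; only fill gaps from the new record
        if not merged.get(master_field) and record.get(source_field):
            merged[master_field] = record[source_field]
    merged["source_record_count"] = golden.get("source_record_count", 0) + 1
    merged["processing_path"] = "stream"
    merged["updated_at"] = now or datetime.now(timezone.utc)
    return merged


golden = {"entity_id": "e1", "master_name": "Ann Lee", "master_email": None, "source_record_count": 2}
record = {"full_name": "Anne Lee", "email": "ann@example.com", "phone": "555-0100"}
print(apply_survivorship(golden, record))
```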

In [None]:
print("🏆 Analyzing final golden record state...")

# Get final golden record count
final_count = spanner_helper.get_table_count("golden_entities")
print(f"\n📊 Final golden entities count: {final_count}")

# Analyze processing paths
path_query = """
SELECT processing_path, COUNT(*) as count
FROM golden_entities
GROUP BY processing_path
ORDER BY count DESC
"""

path_df = spanner_helper.execute_sql(path_query)
if not path_df.empty:
    path_df.columns = ['processing_path', 'count']
    print("\n🔄 Processing Path Distribution:")
    display(path_df)

# Analyze source record counts
source_query = """
SELECT source_record_count, COUNT(*) as entities
FROM golden_entities
GROUP BY source_record_count
ORDER BY source_record_count
"""

source_df = spanner_helper.execute_sql(source_query)
if not source_df.empty:
    source_df.columns = ['source_record_count', 'entities']
    print("\n📈 Source Record Count Distribution:")
    display(source_df)

# Show sample updated records
updated_query = """
SELECT entity_id, master_name, master_email, source_record_count,
       processing_path, updated_at
FROM golden_entities
WHERE processing_path = 'stream'
ORDER BY updated_at DESC
LIMIT 10
"""

updated_df = spanner_helper.execute_sql(updated_query)
if not updated_df.empty:
    updated_df.columns = ['entity_id', 'master_name',
                          'master_email', 'source_count', 'path', 'updated_at']
    print("\n🔄 Sample Updated Records (Streaming):")
    display(updated_df)

print("\n✅ Golden record analysis complete!")

## 10. Performance Metrics and Summary

Calculate key performance indicators for the 4-strategy streaming MDM pipeline.
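
The entity consolidation rate reported below is the share of streaming records that were absorbed into existing golden records instead of creating new ones. A minimal sketch of that formula, with a hypothetical function name and made-up example numbers:

```python
def consolidation_rate(initial_golden, final_golden, records_processed):
    """Percentage of processed records that did not create a new golden entity."""
    if records_processed <= 0:
        return 0.0  # avoid dividing by zero when nothing was processed
    new_entities = final_golden - initial_golden
    return (records_processed - new_entities) * 100 / records_processed


# Example: 100 records processed, golden entities grew from 500 to 530
print(f"{consolidation_rate(500, 530, 100):.1f}%")
```

In this example 30 of the 100 records created new entities, so the remaining 70 were consolidated into existing ones.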

In [None]:
print("📈 Calculating 4-Strategy Streaming MDM Performance Metrics...")

# Overall pipeline statistics
initial_golden_count = golden_count if 'golden_count' in locals() else 0
final_golden_count = spanner_helper.get_table_count("golden_entities")
new_entities_created = final_golden_count - initial_golden_count

print(f"\n📊 Pipeline Statistics:")
print(f"  Initial golden records (from BigQuery): {initial_golden_count}")
print(f"  Streaming records processed: {NUM_STREAMING_RECORDS}")
print(f"  Final golden records: {final_golden_count}")
print(f"  Net new entities created: {new_entities_created}")
print(
    f"  Entity consolidation rate: {((NUM_STREAMING_RECORDS - new_entities_created) / NUM_STREAMING_RECORDS * 100):.1f}%")

# Performance metrics from transactions
if 'transactions_df' in locals() and not transactions_df.empty:
    times = transactions_df['processing_time_ms']
    total = len(transactions_df)
    sub_second = (times < 1000).sum()
    on_target = (times < TARGET_LATENCY_MS).sum()

    print(f"\n⚡ Performance Metrics:")
    print(f"  Average processing time: {times.mean():.0f}ms")
    print(
        f"  Sub-second records: {sub_second}/{total} ({sub_second / total * 100:.1f}%)")
    print(
        f"  Target <{TARGET_LATENCY_MS}ms: {on_target}/{total} ({on_target / total * 100:.1f}%)")

    # Derive decision rates from the queried rows so they use the same
    # denominator as the latency metrics above
    auto_merged = (transactions_df['action'] == 'AUTO_MERGE').sum()
    created_new = (transactions_df['action'] == 'CREATE_NEW').sum()

    print(f"\n🎯 4-Strategy Matching Results:")
    print(
        f"  Auto-merge rate: {auto_merged}/{total} ({auto_merged / total * 100:.1f}%)")
    print(
        f"  New entity rate: {created_new}/{total} ({created_new / total * 100:.1f}%)")
    print(
        f"  Average combined score: {transactions_df['combined_score'].mean():.3f}")

print("\n✅ Performance analysis complete!")

## 11. Cleanup and Cost Management

Optional cleanup to avoid ongoing Spanner charges.

In [None]:
print("🧹 Demo Cleanup Options:")
print("=" * 50)
print(f"💰 Current Spanner instance cost: ~$65/month for {INSTANCE_ID}")
print(f"📊 Processing units: 100 (regional)")
print(f"🗃️ Database: {DATABASE_ID}")
print()
print("⚠️ To avoid ongoing charges, you can delete the Spanner instance:")
print(f"   gcloud spanner instances delete {INSTANCE_ID} --quiet")
print()
print("💡 The BigQuery golden records remain unchanged for future use.")
print("✅ Streaming MDM demo completed successfully!")