# Hive-DITTO Integration Testing Notebook

This notebook lets you test the complete workflow (Hive data extraction, DITTO entity matching, and result storage) end to end before deploying it to Kubeflow.

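## Workflow Overview
1. **Setup & Configuration** - Install packages, configure connections and parameters, and test the Hive connection (sections 1–4)
2. **Data Extraction** - Extract a sample of rows from the Hive input table into a DataFrame (section 5)
3. **Data Preprocessing** - Serialize the rows into DITTO's `COL … VAL …` format and write them to JSONL (section 6)
4. **Entity Matching** - Check model availability and run the DITTO matcher (sections 7–8)
5. **Result Analysis** - Summarize and plot the matching results (section 9)
6. **Save to Hive** - Store the results back in a Hive table (section 10)
7. **Cleanup & Kubeflow Prep** - Write a run summary and the Kubeflow pipeline configuration (sections 11–12)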

## 1. Setup & Installation

In [None]:
# Install required packages
!pip install pyhive pandas thrift sasl jsonlines matplotlib seaborn

# Import libraries
import os
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import subprocess
import logging
from typing import List, Dict, Any
import warnings
warnings.filterwarnings('ignore')

# Configure root logging for the whole notebook session; helper code below
# reports progress through this module-level logger.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

print("✓ Setup completed!")

## 2. Configuration

In [None]:
# Hive Configuration
# Each connection setting can be overridden with an environment variable
# (HIVE_HOST, HIVE_PORT, ...) so the notebook runs unmodified across
# environments; the literal defaults apply when a variable is unset.
HIVE_CONFIG = {
    'host': os.environ.get('HIVE_HOST', 'localhost'),      # Change to your Hive host
    'port': int(os.environ.get('HIVE_PORT', '10000')),
    'database': os.environ.get('HIVE_DATABASE', 'default'),
    'username': os.environ.get('HIVE_USERNAME', 'hive'),   # Change to your username
    # pyhive accepts: 'NOSASL', 'NONE' (SASL PLAIN), 'LDAP', 'CUSTOM', 'KERBEROS'
    'auth': os.environ.get('HIVE_AUTH', 'NOSASL'),
    # Only for LDAP/CUSTOM auth (pyhive rejects a password in other modes)
    'password': os.environ.get('HIVE_PASSWORD'),
    # Only for KERBEROS auth (e.g. 'hive')
    'kerberos_service_name': os.environ.get('HIVE_KERBEROS_SERVICE_NAME'),
}

# Input/Output Configuration
# All intermediate files are derived from TEMP_DIR so changing the directory
# in one place keeps every path consistent.
TEMP_DIR = './temp_notebook'

DATA_CONFIG = {
    'input_table': 'base.table',              # Your Hive input table
    'output_table': 'results.ditto_matches',  # Hive output table
    'temp_dir': TEMP_DIR,                     # Temporary directory
    'input_jsonl': os.path.join(TEMP_DIR, 'input_data.jsonl'),
    'output_jsonl': os.path.join(TEMP_DIR, 'output_results.jsonl'),
    'sample_size': 1000                       # Number of records to test with
}

# DITTO Configuration
DITTO_CONFIG = {
    'task': 'person_records',
    'lm': 'bert',
    'max_len': 64,
    'checkpoint_path': 'checkpoints/',
    'use_gpu': True,   # Switched off automatically if no GPU is detected later
    'fp16': True
}

# Create temp directory
os.makedirs(DATA_CONFIG['temp_dir'], exist_ok=True)

print("📋 Configuration loaded:")
print(f"  Hive: {HIVE_CONFIG['host']}:{HIVE_CONFIG['port']}")
print(f"  Input table: {DATA_CONFIG['input_table']}")
print(f"  Output table: {DATA_CONFIG['output_table']}")
print(f"  DITTO model: {DITTO_CONFIG['lm']} ({DITTO_CONFIG['task']})")

## 3. Utility Functions

In [None]:
class HiveConnector:
    """Handle Hive database connections and operations.

    Config keys read: ``host``, ``port``, ``database`` (required); optional
    ``username``, ``auth`` (default ``'NOSASL'``), ``password`` (LDAP/CUSTOM
    auth) and ``kerberos_service_name`` (KERBEROS auth).
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.connection = None  # pyhive connection once connect() succeeds

    def connect(self):
        """Establish connection to Hive.

        Returns:
            True on success; False if the attempt raised (the error is logged).
        """
        try:
            from pyhive import hive
            self.connection = hive.Connection(
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['database'],
                username=self.config.get('username'),
                auth=self.config.get('auth', 'NOSASL'),
                # pyhive requires a password only for LDAP/CUSTOM and a service
                # name only for KERBEROS, and rejects them in other modes, so
                # both default to None when not configured.
                password=self.config.get('password'),
                kerberos_service_name=self.config.get('kerberos_service_name'),
            )
            logger.info(f"Connected to Hive: {self.config['host']}:{self.config['port']}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Hive: {e}")
            return False

    def execute_query(self, query: str) -> pd.DataFrame:
        """Execute SQL query and return results as DataFrame.

        Raises:
            RuntimeError: if there is no open connection.
            Exception: whatever the driver raises for a failing query (logged,
                then re-raised).
        """
        if not self.connection:
            raise RuntimeError("Not connected to Hive")

        try:
            df = pd.read_sql(query, self.connection)
            logger.info(f"Query executed successfully, returned {len(df)} rows")
            return df
        except Exception as e:
            logger.error(f"Query failed: {e}")
            raise

    def close(self):
        """Close the Hive connection if one is open; safe to call repeatedly."""
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                # Drop the handle so later calls report "Not connected"
                # instead of reusing a closed connection.
                self.connection = None
            logger.info("Hive connection closed")

def convert_to_ditto_format(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Convert DataFrame rows to DITTO-style JSONL records.

    Each row is serialized as ``"COL <name> VAL <value> ..."`` over its cells
    that are non-null and non-blank after ``str(...).strip()``.

    Args:
        df: Input rows; column names become the ``COL`` tags.

    Returns:
        One dict per row with ``left`` and ``right`` (the serialized text) and
        ``id`` (the row's index label).
    """
    records = []
    columns = list(df.columns)

    # itertuples keeps each column's own dtype. iterrows builds one Series per
    # row and upcasts ints to floats when the row mixes int and float columns
    # (e.g. 1 -> "1.0"), which would change the serialized values.
    for idx, values in zip(df.index, df.itertuples(index=False, name=None)):
        col_val_parts = []

        for col, raw in zip(columns, values):
            value = str(raw).strip() if pd.notna(raw) else ''
            if value:
                col_val_parts.append(f"COL {col} VAL {value}")

        record_text = " ".join(col_val_parts)

        # Create JSONL record (modify this logic based on your needs).
        # NOTE(review): stock DITTO matcher.py reads .jsonl lines as
        # [left_entry, right_entry] pairs of dicts; confirm the local
        # matcher.py accepts this {"left", "right", "id"} shape.
        record = {
            "left": record_text,
            "right": record_text,  # For self-matching, change as needed
            "id": idx
        }
        records.append(record)

    return records

def analyze_ditto_results(jsonl_path: str) -> Dict[str, Any]:
    """Summarize a DITTO matcher output file.

    Args:
        jsonl_path: JSONL file with one result object per line; blank lines
            are ignored. Reads the ``match`` flag (truthiness) and the
            ``match_confidence`` score (default 0.0) of each object.

    Returns:
        ``{"error": "No results found"}`` when the file has no results,
        otherwise counts, match rate, average score (plain ``float``) and the
        list of per-pair scores.
    """
    # Explicit UTF-8: the pipeline writes JSONL with ensure_ascii=False, so the
    # platform default encoding is not safe.
    with open(jsonl_path, 'r', encoding='utf-8') as f:
        results = [json.loads(line) for line in f if line.strip()]

    if not results:
        return {"error": "No results found"}

    # Calculate statistics (total_pairs > 0 is guaranteed past the guard above)
    total_pairs = len(results)
    matches = sum(1 for r in results if r.get('match', False))
    match_scores = [float(r.get('match_confidence', 0.0)) for r in results]

    return {
        'total_pairs': total_pairs,
        'matches': matches,
        'non_matches': total_pairs - matches,
        'match_rate': matches / total_pairs,
        # float(): plain Python value rather than a numpy scalar
        'avg_score': float(np.mean(match_scores)),
        'score_distribution': match_scores
    }

# Confirms the utility definitions in this cell executed without errors
print("✓ Utility functions defined!")

## 4. Test Hive Connection

In [None]:
# Test Hive connection
hive_conn = HiveConnector(HIVE_CONFIG)

if hive_conn.connect():
    print("✅ Hive connection successful!")

    # Test with a simple query. HiveQL's SHOW TABLES does not accept a LIMIT
    # clause (it is a parse error), so fetch the listing and trim it here.
    try:
        test_df = hive_conn.execute_query("SHOW TABLES")
        print("\n📋 Available tables (sample):")
        display(test_df.head(5))
    except Exception as e:
        print(f"⚠️  Could not list tables: {e}")
        print("This might be normal depending on your Hive permissions")

else:
    print("❌ Hive connection failed!")
    print("Please check your configuration and ensure Hive is accessible")

## 5. Data Extraction from Hive

In [None]:
# Extract data from Hive table
print(f"🔄 Extracting data from: {DATA_CONFIG['input_table']}")

try:
    # Extract sample data; int() keeps the LIMIT clause a plain integer
    query = f"""
    SELECT * FROM {DATA_CONFIG['input_table']}
    LIMIT {int(DATA_CONFIG['sample_size'])}
    """

    df = hive_conn.execute_query(query)

    # With Hive's default hive.resultset.use.unique.column.names=true,
    # SELECT * returns qualified names such as "table.firstname"; keep only the
    # bare column name so the DITTO "COL <name>" tags are not polluted.
    df.columns = [str(col).split('.', 1)[-1] for col in df.columns]

    print(f"✅ Extracted {len(df)} records")
    print(f"📊 Columns: {list(df.columns)}")

    # Display sample data
    print("\n📋 Sample data:")
    display(df.head())

    # Show data info. DataFrame.info() prints its own report and returns None,
    # so wrapping it in print() would emit a stray "None".
    print("\n📈 Data info:")
    df.info()

except Exception as e:
    print(f"❌ Data extraction failed: {e}")
    print("\n💡 Creating sample data for testing...")

    # Create sample data if Hive extraction fails
    df = pd.DataFrame({
        'id': range(1, 11),
        'firstname': ['Ahmed', 'Mohamed', 'Fatima', 'Omar', 'Aisha', 'Khalid', 'Layla', 'Hassan', 'Zeinab', 'Ali'],
        'lastname': ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Garcia', 'Miller', 'Davis', 'Rodriguez', 'Martinez'],
        'email': [f'user{i}@example.com' for i in range(1, 11)],
        'phone': [f'555-000{i:04d}' for i in range(1, 11)]
    })
    print(f"✅ Created sample dataset with {len(df)} records")
    display(df)

## 6. Convert to DITTO Format

In [None]:
# Serialize the extracted rows into DITTO records and persist them as JSONL
print("🔄 Converting data to DITTO format...")

ditto_records = convert_to_ditto_format(df)
print(f"✅ Converted {len(ditto_records)} records to DITTO format")

# One JSON object per line; ensure_ascii=False keeps non-ASCII text readable
jsonl_lines = (json.dumps(rec, ensure_ascii=False) + '\n' for rec in ditto_records)
with open(DATA_CONFIG['input_jsonl'], 'w', encoding='utf-8') as out_file:
    out_file.writelines(jsonl_lines)

print(f"💾 Saved to: {DATA_CONFIG['input_jsonl']}")

# Preview the first three records
print("\n📋 Sample DITTO records:")
for position, rec in enumerate(ditto_records[:3], start=1):
    print(f"\nRecord {position}:")
    print(f"  Left:  {rec['left'][:100]}...")
    print(f"  Right: {rec['right'][:100]}...")
    print(f"  ID:    {rec['id']}")
    print(f"  ID:    {record['id']}")

## 7. Check DITTO Model Status

In [None]:
# Check if DITTO models and checkpoints are available
print("🔍 Checking DITTO model status...")

# Check model files
model_checks = {
    'BERT Model': './models/bert-base-uncased/config.json',
    'RoBERTa Model': './models/roberta-base/config.json',
    'DistilBERT Model': './models/distilbert-base-uncased/config.json',
    'DITTO Matcher': './matcher.py',
    'Checkpoint Directory': DITTO_CONFIG['checkpoint_path']
}

for name, path in model_checks.items():
    if os.path.exists(path):
        print(f"✅ {name}: Found at {path}")
    else:
        print(f"❌ {name}: Not found at {path}")

# Set environment variables for offline models
os.environ['BERT_MODEL_PATH'] = './models/bert-base-uncased'
os.environ['ROBERTA_MODEL_PATH'] = './models/roberta-base'
os.environ['DISTILBERT_MODEL_PATH'] = './models/distilbert-base-uncased'
os.environ['NLTK_DATA'] = './nltk_data'

print("\n🔧 Environment variables set for offline model usage")

# Check GPU availability
gpu_available = False
try:
    # Timeout so a wedged driver cannot hang the notebook
    result = subprocess.run(['nvidia-smi'], capture_output=True, text=True, timeout=30)
except (OSError, subprocess.SubprocessError):
    # OSError: nvidia-smi missing or not executable.
    # SubprocessError: includes TimeoutExpired.
    # (A bare except would also swallow KeyboardInterrupt.)
    print("⚠️  Cannot detect GPU, will use CPU")
    DITTO_CONFIG['use_gpu'] = False
else:
    if result.returncode == 0:
        gpu_available = True
        print("🎮 GPU available for DITTO matching")
    else:
        print("⚠️  GPU not available, will use CPU")
        DITTO_CONFIG['use_gpu'] = False

## 8. Run DITTO Matching

In [None]:
# Run DITTO matching
import sys

print("🔄 Running DITTO entity matching...")

# Delete output left over from a previous run. Otherwise a failed run would
# leave the old file in place and the analysis/save cells would use stale results.
try:
    os.remove(DATA_CONFIG['output_jsonl'])
except FileNotFoundError:
    pass

# Build matcher command. sys.executable runs matcher.py under this kernel's
# interpreter; a bare 'python' on PATH may be a different environment.
cmd = [
    sys.executable, 'matcher.py',
    '--task', DITTO_CONFIG['task'],
    '--input_path', DATA_CONFIG['input_jsonl'],
    '--output_path', DATA_CONFIG['output_jsonl'],
    '--lm', DITTO_CONFIG['lm'],
    '--max_len', str(DITTO_CONFIG['max_len']),
    '--checkpoint_path', DITTO_CONFIG['checkpoint_path']
]

if DITTO_CONFIG['use_gpu']:
    cmd.append('--use_gpu')

if DITTO_CONFIG['fp16']:
    cmd.append('--fp16')

print(f"🚀 Command: {' '.join(cmd)}")

# Set environment for GPU, keeping any device selection the user already made
env = os.environ.copy()
if DITTO_CONFIG['use_gpu']:
    env.setdefault('CUDA_VISIBLE_DEVICES', '0')

try:
    # Run the matcher
    result = subprocess.run(
        cmd,
        env=env,
        capture_output=True,
        text=True,
        timeout=300  # 5 minute timeout
    )

    if result.returncode == 0:
        print("✅ DITTO matching completed successfully!")
        print("\n📋 Output:")
        print(result.stdout)
    else:
        print("❌ DITTO matching failed!")
        # stdout often holds the progress/log lines that explain the failure
        if result.stdout:
            print("\n📋 Output:")
            print(result.stdout)
        print("\n❌ Error:")
        print(result.stderr)

except subprocess.TimeoutExpired:
    print("⏰ DITTO matching timed out")
except Exception as e:
    print(f"❌ Error running DITTO: {e}")

# Check if output file exists
if os.path.exists(DATA_CONFIG['output_jsonl']):
    file_size = os.path.getsize(DATA_CONFIG['output_jsonl'])
    print(f"\n💾 Output file created: {DATA_CONFIG['output_jsonl']} ({file_size} bytes)")
else:
    print(f"\n❌ Output file not created: {DATA_CONFIG['output_jsonl']}")

## 9. Analyze Results

In [None]:
# Analyze DITTO results
if os.path.exists(DATA_CONFIG['output_jsonl']):
    print("📊 Analyzing DITTO matching results...")

    stats = analyze_ditto_results(DATA_CONFIG['output_jsonl'])

    if 'error' not in stats:
        print("\n✅ Results Summary:")
        print(f"  Total pairs processed: {stats['total_pairs']}")
        print(f"  Matches found: {stats['matches']}")
        print(f"  Non-matches: {stats['non_matches']}")
        print(f"  Match rate: {stats['match_rate']:.2%}")
        print(f"  Average confidence: {stats['avg_score']:.3f}")

        # Plot score distribution
        if stats['score_distribution']:
            plt.figure(figsize=(10, 6))

            plt.subplot(1, 2, 1)
            plt.hist(stats['score_distribution'], bins=20, alpha=0.7, edgecolor='black')
            plt.xlabel('Match Confidence Score')
            plt.ylabel('Frequency')
            plt.title('Distribution of Match Confidence Scores')

            plt.subplot(1, 2, 2)
            match_counts = [stats['matches'], stats['non_matches']]
            labels = ['Matches', 'Non-matches']
            plt.pie(match_counts, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title('Match vs Non-match Distribution')

            plt.tight_layout()
            plt.show()

        # Show the first 5 non-blank results. Blank lines are skipped the same
        # way analyze_ditto_results skips them (json.loads('') would raise).
        print("\n📋 Sample Results:")
        with open(DATA_CONFIG['output_jsonl'], 'r', encoding='utf-8') as f:
            shown = 0
            for line in f:
                if not line.strip():
                    continue
                if shown >= 5:
                    break
                result = json.loads(line)
                shown += 1
                match_status = "✅ MATCH" if result.get('match', False) else "❌ NO MATCH"
                confidence = float(result.get('match_confidence', 0.0))
                print(f"\nResult {shown}: {match_status} (confidence: {confidence:.3f})")
                # str(): the matcher may emit non-string left/right payloads
                print(f"  Left:  {str(result.get('left', ''))[:80]}...")
                print(f"  Right: {str(result.get('right', ''))[:80]}...")

    else:
        print(f"❌ Error analyzing results: {stats['error']}")

else:
    print("❌ No results file found to analyze")

## 10. Save Results to Hive

In [None]:
# Save results back to Hive
HIVE_INSERT_BATCH_SIZE = 100  # rows per INSERT statement; each statement runs a Hive job
MAX_RECORD_CHARS = 1000       # truncate long serialized records before inserting

if os.path.exists(DATA_CONFIG['output_jsonl']):
    print(f"🔄 Saving results to Hive table: {DATA_CONFIG['output_table']}")

    try:
        # Read results, skipping blank lines
        with open(DATA_CONFIG['output_jsonl'], 'r', encoding='utf-8') as f:
            results = [json.loads(line) for line in f if line.strip()]

        print(f"📋 Loaded {len(results)} results")

        # One timestamp identifies every row written by this run
        run_timestamp = datetime.now().isoformat()

        # Convert to DataFrame for easier handling
        results_df = pd.DataFrame([
            {
                'record_id': r.get('id', 0),
                'left_record': r.get('left', ''),
                'right_record': r.get('right', ''),
                'match_probability': r.get('match_confidence', 0.0),
                'is_match': r.get('match', False),
                'created_at': run_timestamp
            }
            for r in results
        ])

        print("\n📊 Results DataFrame:")
        display(results_df.head())

        # Connect to Hive for saving
        if hive_conn.connection:
            # Plain Python values for pyhive's parameter escaper, which does not
            # accept numpy scalars. Strings are truncated *before* escaping, so a
            # cut can never split an escape sequence.
            rows = [
                (
                    int(r.get('id', 0)),
                    str(r.get('left', ''))[:MAX_RECORD_CHARS],
                    str(r.get('right', ''))[:MAX_RECORD_CHARS],
                    float(r.get('match_confidence', 0.0)),
                    bool(r.get('match', False)),
                    run_timestamp,
                )
                for r in results
            ]

            cursor = hive_conn.connection.cursor()
            try:
                # Create table if doesn't exist
                create_table_sql = f"""
                CREATE TABLE IF NOT EXISTS {DATA_CONFIG['output_table']} (
                    record_id INT,
                    left_record STRING,
                    right_record STRING,
                    match_probability DOUBLE,
                    is_match BOOLEAN,
                    created_at STRING
                )
                STORED AS PARQUET
                """

                cursor.execute(create_table_sql)
                print(f"✅ Table {DATA_CONFIG['output_table']} created/verified")

                # Multi-row INSERT ... VALUES with %s placeholders. pyhive escapes
                # each parameter for HiveQL (backslashes, quotes, newlines).
                # Doubling single quotes by hand is wrong for Hive, which treats
                # 'a''b' as two concatenated literals.
                row_placeholder = "(%s, %s, %s, %s, %s, %s)"
                insert_count = 0
                for start in range(0, len(rows), HIVE_INSERT_BATCH_SIZE):
                    batch = rows[start:start + HIVE_INSERT_BATCH_SIZE]
                    insert_sql = (
                        f"INSERT INTO {DATA_CONFIG['output_table']} VALUES "
                        + ", ".join([row_placeholder] * len(batch))
                    )
                    params = tuple(value for row in batch for value in row)
                    cursor.execute(insert_sql, params)
                    insert_count += len(batch)
                    print(f"  Inserted {insert_count}/{len(rows)} records...")
            finally:
                cursor.close()

            print(f"✅ Successfully saved {insert_count} results to {DATA_CONFIG['output_table']}")

            # Verify insertion (positional access avoids depending on how the
            # driver names the aggregate column)
            verify_df = hive_conn.execute_query(f"SELECT COUNT(*) as count FROM {DATA_CONFIG['output_table']}")
            print(f"🔍 Verification: {verify_df.iloc[0, 0]} records in table")

        else:
            print("❌ Not connected to Hive, cannot save results")
            print("💡 Results are available in the DataFrame above and the JSONL file")

    except Exception as e:
        print(f"❌ Error saving to Hive: {e}")
        print("💡 Results are still available in the JSONL file for manual processing")

else:
    print("❌ No results file found to save")

## 11. Cleanup and Summary

In [None]:
# Cleanup and summary
print("🧹 Cleaning up and generating summary...")

# Close Hive connection
hive_conn.close()

# Results statistics, if the matching step produced analyzable output
run_stats = None
if os.path.exists(DATA_CONFIG['output_jsonl']):
    run_stats = analyze_ditto_results(DATA_CONFIG['output_jsonl'])
results_available = run_stats is not None and 'error' not in run_stats

# Generate workflow summary
summary = {
    'timestamp': datetime.now().isoformat(),
    'configuration': {
        'hive_table': DATA_CONFIG['input_table'],
        'output_table': DATA_CONFIG['output_table'],
        'ditto_model': DITTO_CONFIG['lm'],
        'sample_size': DATA_CONFIG['sample_size']
    },
    'files_created': [
        DATA_CONFIG['input_jsonl'],
        DATA_CONFIG['output_jsonl']
    ],
    # Report success only when matching actually produced results
    'status': 'completed' if results_available else 'incomplete'
}

if results_available:
    summary['results'] = run_stats

# Save summary
summary_path = os.path.join(DATA_CONFIG['temp_dir'], 'workflow_summary.json')
with open(summary_path, 'w', encoding='utf-8') as f:
    json.dump(summary, f, indent=2)

print(f"📋 Summary saved to: {summary_path}")

if results_available:
    print("\n🎉 Workflow Testing Complete!")
    print("\n📋 Next Steps for Kubeflow Deployment:")
    print("1. ✅ Test completed successfully - ready for Kubeflow packaging")
    print("2. 🔧 Update configuration parameters in the Kubeflow pipeline")
    print("3. 🐳 Build Docker image with all dependencies")
    print("4. 🚀 Deploy pipeline to Kubeflow")
else:
    print("\n⚠️  Workflow finished without DITTO results")
    print("Review the matching step output above before packaging for Kubeflow")

print("\n📁 Generated Files:")
for file_path in [DATA_CONFIG['input_jsonl'], DATA_CONFIG['output_jsonl'], summary_path]:
    if os.path.exists(file_path):
        size = os.path.getsize(file_path)
        print(f"  ✅ {file_path} ({size} bytes)")
    else:
        print(f"  ❌ {file_path} (not created)")

## 12. Generate Kubeflow Pipeline Configuration

In [None]:
# Derive the Kubeflow pipeline configuration from the settings exercised above
print("🔧 Generating Kubeflow pipeline configuration...")

pipeline_parameters = {
    "hive_host": HIVE_CONFIG['host'],
    "hive_port": HIVE_CONFIG['port'],
    "hive_database": HIVE_CONFIG['database'],
    "hive_username": HIVE_CONFIG['username'],
    "input_table": DATA_CONFIG['input_table'],
    "output_table": DATA_CONFIG['output_table'],
    "ditto_task": DITTO_CONFIG['task'],
    "ditto_lm": DITTO_CONFIG['lm'],
    "ditto_max_len": DITTO_CONFIG['max_len'],
    "checkpoint_path": DITTO_CONFIG['checkpoint_path'],
    "use_gpu": DITTO_CONFIG['use_gpu'],
    "fp16": DITTO_CONFIG['fp16'],
}

# The matching step requests one GPU only when GPU use survived detection
matching_gpus = "1" if DITTO_CONFIG['use_gpu'] else "0"
step_resources = {
    "extract_step": {"cpu": "2", "memory": "4Gi"},
    "matching_step": {"cpu": "4", "memory": "8Gi", "gpu": matching_gpus},
    "save_step": {"cpu": "2", "memory": "4Gi"},
}

pipeline_config = {
    "pipeline_name": "hive-ditto-entity-matching",
    "description": "Entity matching pipeline with Hive integration",
    "parameters": pipeline_parameters,
    "resources": step_resources,
    "volumes": {"data_volume": "50Gi", "models_volume": "10Gi"},
}

# Persist the pipeline configuration next to the other run artifacts
config_path = os.path.join(DATA_CONFIG['temp_dir'], 'kubeflow_pipeline_config.json')
with open(config_path, 'w') as config_file:
    json.dump(pipeline_config, config_file, indent=2)

print(f"📋 Pipeline configuration saved to: {config_path}")

# Step-by-step build/deploy instructions (plain text, no interpolation needed)
docker_commands = """
# Docker Build Commands for Kubeflow Deployment

# 1. Build the DITTO image with Hive support
docker build -t your-registry/ditto-hive:latest -f Dockerfile .

# 2. Push to your container registry
docker push your-registry/ditto-hive:latest

# 3. Update the pipeline image references
# Edit hive_ditto_pipeline.py and update base_image='your-registry/ditto-hive:latest'

# 4. Compile the pipeline
python hive_ditto_pipeline.py

# 5. Upload the compiled pipeline YAML to Kubeflow
"""

docker_script_path = os.path.join(DATA_CONFIG['temp_dir'], 'docker_build_commands.txt')
with open(docker_script_path, 'w') as script_file:
    script_file.write(docker_commands)

print(f"🐳 Docker build commands saved to: {docker_script_path}")

print("\n✅ Kubeflow deployment preparation complete!")
print("\n📋 Configuration Summary:")
print(json.dumps(pipeline_config, indent=2))