# Bank Reconciliation Pipeline with PySpark and Data Lineage

This comprehensive notebook demonstrates a complete data pipeline for bank reconciliation using PySpark, featuring:

- 📁 **Interactive File Upload** - Upload CSV/Excel files with drag-and-drop interface
- ✅ **Dynamic Validation Rules** - Create custom validation rules for data quality
- 🔄 **Data Reconciliation** - Compare datasets and identify discrepancies  
- 🔗 **Data Lineage Tracking** - Complete audit trail of data transformations
- 📊 **Real-time Monitoring** - Live dashboards for pipeline status and logs
- 🚀 **End-to-end Pipeline** - Raw data to reportable format transformation

## Prerequisites
- PySpark installed and configured
- Java 8+ for Spark
- Required Python packages: pyspark, pandas, numpy, ipywidgets, plotly (plus an Excel engine such as openpyxl if you upload `.xlsx` files)

Let's start by importing libraries and setting up our environment!

In [None]:
# Import Required Libraries and Initialize Spark
import sys
import os
from pathlib import Path

# Add src directory to path for imports
# The location is derived from the current working directory, so this assumes
# the notebook is started from a folder that sits next to `src`
# (NOTE(review): confirm against the project layout).
notebook_dir = Path.cwd()
src_path = notebook_dir.parent / "src"
# Appended (not inserted first), so already-importable modules keep priority.
sys.path.append(str(src_path))

# Core libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import uuid
import logging

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Visualization libraries
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Interactive widgets
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

# Set up logging
# Root logger at INFO with a "timestamp - level - message" line format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger for this notebook.
logger = logging.getLogger(__name__)

# Progress banners for the user; the Spark session itself is created in the next cell.
print("✅ Libraries imported successfully!")
print("🚀 Initializing Spark session...")

In [None]:
# Initialize Spark Session with Optimized Configuration
def create_spark_session():
    """Build (or reuse) the Spark session used by the whole notebook.

    Returns:
        The configured ``SparkSession``, or ``None`` when anything goes wrong
        while building it (the failure is printed, not raised).
    """
    # Session options, applied in this order.
    settings = {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.driver.memory": "2g",
        "spark.executor.memory": "2g",
        "spark.sql.execution.arrow.pyspark.enabled": "true",
    }

    try:
        builder = SparkSession.builder.appName("BankReconciliationPipeline")
        for option_name, option_value in settings.items():
            builder = builder.config(option_name, option_value)
        session = builder.getOrCreate()

        # Reduce log verbosity
        session.sparkContext.setLogLevel("WARN")
    except Exception as e:
        print(f"❌ Failed to initialize Spark: {e}")
        return None

    return session

# Build the shared Spark session; create_spark_session() yields None on failure.
spark = create_spark_session()

if spark is None:
    print("❌ Failed to initialize Spark session. Please check your configuration.")
else:
    print("✅ Spark session initialized successfully!")
    print(f"🔥 Spark version: {spark.version}")
    print(f"💾 Available cores: {spark.sparkContext.defaultParallelism}")

# Notebook-wide mutable pipeline state, shared by the cells below.
uploaded_files, validation_rules, reconciliation_config = {}, {}, {}
lineage_events, pipeline_logs = [], []

## 📁 File Upload Widget Implementation

Interactive file upload interface for CSV and Excel files with automatic schema detection and preview capabilities.

In [None]:
# File Upload Widget Implementation
class FileUploadManager:
    """Upload CSV/Excel files through a widget and load them into Spark.

    Each successful upload is stored in ``uploaded_data`` under its dataset
    name and recorded in the module-level ``lineage_events`` and
    ``pipeline_logs`` lists.
    """

    def __init__(self, spark_session):
        # Spark session used to read the uploaded files.
        self.spark = spark_session
        # dataset name -> dict with keys 'dataframe', 'filename', 'temp_path',
        # 'row_count', 'column_count', 'columns', 'schema' and 'event_id'.
        self.uploaded_data = {}

    @staticmethod
    def _extract_upload(value):
        """Return ``(filename, content_bytes)`` for the first uploaded file.

        ``FileUpload.value`` differs between ipywidgets releases: version 7
        exposes ``{filename: {'content': bytes, ...}}`` while version 8
        exposes a tuple of dicts carrying ``'name'`` and a ``memoryview``
        under ``'content'``. Both shapes are accepted here.
        """
        if isinstance(value, dict):
            filename, file_info = next(iter(value.items()))
        else:
            file_info = value[0]
            filename = file_info['name']
        # bytes() normalises a memoryview so it can be measured and written.
        return filename, bytes(file_info['content'])

    def create_upload_widget(self, dataset_name):
        """Create file upload widget for a dataset.

        Args:
            dataset_name: Label of the dataset; used as the key in
                ``uploaded_data`` and in the temporary file name.

        Returns:
            A ``VBox`` holding a heading, the upload button and an output
            area that shows progress, errors and a data preview.
        """
        upload_widget = widgets.FileUpload(
            accept='.csv,.xlsx,.xls',
            multiple=False,
            description=f'Upload {dataset_name}',
            style={'button_color': '#4CAF50'}
        )

        output_widget = widgets.Output()

        def on_upload_change(change):
            with output_widget:
                clear_output()
                if not upload_widget.value:
                    return

                try:
                    filename, content = self._extract_upload(upload_widget.value)

                    print(f"📁 Processing file: {filename}")
                    print(f"📊 File size: {len(content) / 1024:.1f} KB")

                    # Save file temporarily. The file is intentionally left on
                    # disk: Spark reads lazily and may re-read it on later
                    # actions against the DataFrame.
                    temp_path = f"temp_{dataset_name}_{filename}"
                    with open(temp_path, "wb") as f:
                        f.write(content)

                    # Load data into Spark DataFrame
                    df = self.load_file_to_dataframe(temp_path, filename)
                    if df is None:
                        print("❌ Failed to load file")
                        return

                    # Count once: every count() is a separate Spark job.
                    row_count = df.count()
                    column_count = len(df.columns)

                    self.uploaded_data[dataset_name] = {
                        'dataframe': df,
                        'filename': filename,
                        'temp_path': temp_path,
                        'row_count': row_count,
                        'column_count': column_count,
                        'columns': df.columns,
                        'schema': df.dtypes
                    }

                    # Track in lineage
                    event_id = self.track_source_event(
                        dataset_name, filename, df, row_count=row_count
                    )
                    self.uploaded_data[dataset_name]['event_id'] = event_id

                    print(f"✅ Successfully loaded {row_count} rows, {column_count} columns")

                    # Display preview
                    self.display_data_preview(dataset_name)

                except Exception as e:
                    print(f"❌ Error processing file: {str(e)}")

        upload_widget.observe(on_upload_change, names='value')

        return widgets.VBox([
            widgets.HTML(f"<h4>📁 {dataset_name} Upload</h4>"),
            upload_widget,
            output_widget
        ])

    def load_file_to_dataframe(self, file_path, filename):
        """Load file into Spark DataFrame.

        Args:
            file_path: Path of the file on disk.
            filename: Original file name; its extension (matched
                case-insensitively) selects the reader.

        Returns:
            The loaded DataFrame, or ``None`` if the extension is not
            supported or reading fails (the error is printed).
        """
        lowered = filename.lower()
        try:
            if lowered.endswith('.csv'):
                df = self.spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
            elif lowered.endswith(('.xlsx', '.xls')):
                # Use pandas for Excel, then convert to Spark
                pandas_df = pd.read_excel(file_path)
                df = self.spark.createDataFrame(pandas_df)
            else:
                raise ValueError(f"Unsupported file format: {filename}")

            return df
        except Exception as e:
            print(f"Error loading file: {e}")
            return None

    def track_source_event(self, dataset_name, filename, df, row_count=None):
        """Track data source in lineage.

        Appends a ``SOURCE`` event to ``lineage_events`` and a log line to
        ``pipeline_logs``.

        Args:
            dataset_name: Label of the dataset.
            filename: Name of the uploaded file.
            df: DataFrame that was loaded.
            row_count: Optional pre-computed row count; when omitted,
                ``df.count()`` is evaluated here.

        Returns:
            The generated event id (a UUID4 string).
        """
        if row_count is None:
            row_count = df.count()

        event_id = str(uuid.uuid4())
        event = {
            'event_id': event_id,
            'event_type': 'SOURCE',
            'timestamp': datetime.now().isoformat(),
            'dataset_name': dataset_name,
            'filename': filename,
            'row_count': row_count,
            'column_count': len(df.columns),
            'columns': df.columns,
            'schema': df.dtypes
        }
        lineage_events.append(event)
        pipeline_logs.append(f"[{datetime.now().strftime('%H:%M:%S')}] SOURCE: Loaded {dataset_name} from {filename}")
        return event_id

    def display_data_preview(self, dataset_name):
        """Display data preview and schema information.

        Shows the first five rows and a column/type table for the dataset.
        Does nothing when ``dataset_name`` has not been uploaded.
        """
        if dataset_name not in self.uploaded_data:
            return

        data_info = self.uploaded_data[dataset_name]
        df = data_info['dataframe']

        print("\n📊 Data Preview:")
        preview_df = df.limit(5).toPandas()
        display(preview_df)

        print("\n📋 Column Information:")
        schema_df = pd.DataFrame(data_info['schema'], columns=['Column', 'Type'])
        display(schema_df)

# Create file upload manager
# Bound to the notebook-wide `spark` session; later cells reuse this instance.
upload_manager = FileUploadManager(spark)

# Create upload widgets for two datasets
# The labels double as the dataset keys inside upload_manager.uploaded_data.
dataset1_upload = upload_manager.create_upload_widget("Dataset 1 (Bank Statement)")
dataset2_upload = upload_manager.create_upload_widget("Dataset 2 (General Ledger)")

# Display upload widgets
# Side by side in a single row.
display(widgets.HBox([dataset1_upload, dataset2_upload]))

## 🔍 Data Schema Detection and Column Selection

Automatically detect data schemas and provide interactive column selection for validation and reconciliation operations.

In [None]:
# Data Schema Detection and Column Selection
class SchemaManager:
    """Analyse uploaded dataset schemas and capture reconciliation columns."""

    def __init__(self, upload_manager):
        # Source of the uploaded datasets (reads upload_manager.uploaded_data).
        self.upload_manager = upload_manager
        # Filled by the "Save Mapping" button with the keys 'join_keys',
        # 'compare_columns' and 'common_columns'.
        self.column_mappings = {}

    @staticmethod
    def _null_percentage(null_count, row_count):
        """Return the share of nulls in percent; 0.0 for an empty dataset."""
        if not row_count:
            return 0.0
        return (null_count / row_count) * 100

    @staticmethod
    def _common_columns(first_cols, second_cols):
        """Return columns present in both lists, in ``first_cols`` order.

        Ordering by the first dataset keeps the default join key stable
        instead of depending on set iteration order.
        """
        available = set(second_cols)
        return list(dict.fromkeys(name for name in first_cols if name in available))

    @staticmethod
    def _count_nulls(df):
        """Return the null count of every column, in ``df.columns`` order.

        All columns are counted in one aggregation, i.e. a single Spark job
        rather than one job per column.
        """
        if not df.columns:
            return []
        null_exprs = [
            # count() ignores nulls, so only rows where the column IS NULL
            # (and when() therefore yields 1) are counted.
            count(when(col(name).isNull(), 1)).alias(f"c{position}")
            for position, name in enumerate(df.columns)
        ]
        return list(df.agg(*null_exprs).first())

    def create_schema_analysis_widget(self):
        """Create widget for schema analysis and column selection.

        Returns:
            A ``VBox`` with a heading, an "Analyze Schemas" button and the
            output area the analysis is rendered into.
        """
        output_widget = widgets.Output()
        refresh_button = widgets.Button(
            description="🔄 Analyze Schemas",
            button_style='info',
            style={'button_color': '#2196F3'}
        )

        def analyze_schemas(button):
            with output_widget:
                clear_output()

                if not self.upload_manager.uploaded_data:
                    print("⚠️ Please upload datasets first!")
                    return

                print("🔍 Analyzing schemas...")

                for dataset_name, data_info in self.upload_manager.uploaded_data.items():
                    row_count = data_info['row_count']
                    print(f"\n📊 {dataset_name}:")
                    print(f"   Rows: {row_count:,}")
                    print(f"   Columns: {data_info['column_count']}")

                    # Detailed schema analysis
                    df = data_info['dataframe']

                    # Check for null values
                    null_counts = self._count_nulls(df)

                    # Create schema summary; 'schema' (df.dtypes) and the null
                    # counts both follow df.columns order.
                    schema_summary = [
                        {
                            'Column': col_name,
                            'Type': col_type,
                            'Null Count': null_count,
                            'Null %': f"{self._null_percentage(null_count, row_count):.1f}%"
                        }
                        for (col_name, col_type), null_count
                        in zip(data_info['schema'], null_counts)
                    ]

                    schema_df = pd.DataFrame(schema_summary)
                    display(schema_df)

                    # Sample values for each column
                    print(f"\n📋 Sample values for {dataset_name}:")
                    sample_data = df.limit(3).toPandas()
                    for col_name in sample_data.columns:
                        values = sample_data[col_name].tolist()
                        print(f"   {col_name}: {values}")

                # Create column mapping interface
                self.create_column_mapping_interface(output_widget)

        refresh_button.on_click(analyze_schemas)

        return widgets.VBox([
            widgets.HTML("<h4>🔍 Schema Analysis</h4>"),
            refresh_button,
            output_widget
        ])

    def create_column_mapping_interface(self, output_widget):
        """Create interface for column mapping and selection.

        Renders join-key and compare-column selectors for the first two
        uploaded datasets into ``output_widget``. Does nothing when fewer
        than two datasets are uploaded.

        Args:
            output_widget: ``Output`` widget that receives the interface and
                the save confirmation.
        """
        if len(self.upload_manager.uploaded_data) < 2:
            return

        datasets = list(self.upload_manager.uploaded_data.keys())
        dataset1_cols = self.upload_manager.uploaded_data[datasets[0]]['columns']
        dataset2_cols = self.upload_manager.uploaded_data[datasets[1]]['columns']

        with output_widget:
            print("\n🔗 Column Mapping for Reconciliation:")
            print("Select columns that should be matched between datasets")

            # Find potential matches
            common_cols = self._common_columns(dataset1_cols, dataset2_cols)

            if not common_cols:
                print("⚠️ No common columns found between datasets")
                return

            print(f"\n✅ Common columns found: {common_cols}")

            # Join key selection: defaults to the first shared column.
            join_keys_widget = widgets.SelectMultiple(
                options=common_cols,
                value=common_cols[:1],
                description='Join Keys:',
                disabled=False
            )

            # Compare columns selection: defaults to every other shared column.
            compare_cols_widget = widgets.SelectMultiple(
                options=common_cols,
                value=common_cols[1:],
                description='Compare Columns:',
                disabled=False
            )

            # Store selections
            def save_mapping(button):
                self.column_mappings = {
                    'join_keys': list(join_keys_widget.value),
                    'compare_columns': list(compare_cols_widget.value),
                    'common_columns': list(common_cols)
                }
                # Button callbacks run outside the display context, so the
                # confirmation is routed into the output widget explicitly.
                with output_widget:
                    print("\n✅ Column mapping saved!")

            save_button = widgets.Button(
                description="💾 Save Mapping",
                button_style='success'
            )
            save_button.on_click(save_mapping)

            display(widgets.VBox([
                widgets.HTML("<h5>🔗 Reconciliation Configuration</h5>"),
                join_keys_widget,
                compare_cols_widget,
                save_button
            ]))

# Create schema manager and display widget
# Shares the upload manager so the analysis sees whatever has been uploaded
# at the moment the button is clicked.
schema_manager = SchemaManager(upload_manager)
schema_widget = schema_manager.create_schema_analysis_widget()
display(schema_widget)

## ✅ Rule Creation Interface

Interactive interface for creating data validation and business rules with support for multiple rule types.

In [None]:
# Rule Creation Interface
class RuleManager:
    """Build, store, display and test validation rules per dataset."""

    def __init__(self, upload_manager):
        # Source of the uploaded datasets (reads upload_manager.uploaded_data).
        self.upload_manager = upload_manager
        # dataset name -> list of rule dicts ('name', 'dataset', 'column',
        # 'type' plus type-specific parameters).
        self.rules = {}

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` backtick-quoted for use in Spark SQL expressions.

        Quoting lets column names containing spaces or dots be referenced;
        embedded backticks are escaped by doubling them.
        """
        return "`" + str(name).replace("`", "``") + "`"

    def create_rule_builder_widget(self):
        """Create comprehensive rule builder interface.

        Returns:
            A ``VBox`` containing the dataset/column/rule-type selectors, the
            rule-name input, the type-specific parameter widgets, the action
            buttons and the list of current rules.
        """

        # Main container
        rule_container = widgets.VBox()

        # Dataset selection
        dataset_selector = widgets.Dropdown(
            options=[],
            description='Dataset:',
            disabled=True
        )

        # Column selection
        column_selector = widgets.Dropdown(
            options=[],
            description='Column:',
            disabled=True
        )

        # Rule type selection
        rule_type_selector = widgets.Dropdown(
            options=[
                ('Not Null', 'not_null'),
                ('Unique Values', 'unique'),
                ('Numeric Range', 'range'),
                ('String Format (Regex)', 'format'),
                ('Custom SQL Condition', 'custom')
            ],
            description='Rule Type:',
            disabled=True
        )

        # Rule name
        rule_name_input = widgets.Text(
            description='Rule Name:',
            placeholder='Enter rule name...'
        )

        # Dynamic rule parameters container
        rule_params_container = widgets.VBox()

        # Rule list display
        rules_display = widgets.Output()

        # Buttons
        add_rule_button = widgets.Button(
            description="➕ Add Rule",
            button_style='primary',
            disabled=True
        )

        clear_rules_button = widgets.Button(
            description="🗑️ Clear All",
            button_style='warning'
        )

        test_rules_button = widgets.Button(
            description="🧪 Test Rules",
            button_style='info'
        )

        # Update dataset options when data is uploaded
        def update_dataset_options():
            if self.upload_manager.uploaded_data:
                dataset_selector.options = list(self.upload_manager.uploaded_data.keys())
                dataset_selector.disabled = False
            else:
                dataset_selector.options = []
                dataset_selector.disabled = True

        # Update column options when dataset is selected
        def on_dataset_change(change):
            if change['new'] in self.upload_manager.uploaded_data:
                columns = self.upload_manager.uploaded_data[change['new']]['columns']
                column_selector.options = columns
                column_selector.disabled = False
                rule_type_selector.disabled = False
                add_rule_button.disabled = False
            else:
                column_selector.options = []
                column_selector.disabled = True
                rule_type_selector.disabled = True
                add_rule_button.disabled = True

        dataset_selector.observe(on_dataset_change, names='value')

        # Update rule parameters based on rule type
        def on_rule_type_change(change):
            rule_params_container.children = self.create_rule_parameters(change['new'])

        rule_type_selector.observe(on_rule_type_change, names='value')

        # Show the parameter widgets of the initially selected rule type; the
        # observer above only fires when the selection changes.
        rule_params_container.children = self.create_rule_parameters(rule_type_selector.value)

        # Add rule function
        def add_rule(button):
            dataset = dataset_selector.value
            column = column_selector.value
            rule_type = rule_type_selector.value
            rule_name = rule_name_input.value or f"{rule_type}_{column}"

            if not all([dataset, column, rule_type]):
                with rules_display:
                    print("⚠️ Please fill all required fields")
                return

            # Collect rule parameters
            rule_params = self.collect_rule_parameters(rule_params_container.children)

            # Create rule object
            rule = {
                'name': rule_name,
                'dataset': dataset,
                'column': column,
                'type': rule_type,
                **rule_params
            }

            # Add to rules dictionary
            self.rules.setdefault(dataset, []).append(rule)

            # Update display
            self.update_rules_display(rules_display)

            # Clear form
            rule_name_input.value = ""

            # Track in lineage
            event_id = str(uuid.uuid4())
            lineage_events.append({
                'event_id': event_id,
                'event_type': 'RULE_CREATION',
                'timestamp': datetime.now().isoformat(),
                'rule': rule
            })
            pipeline_logs.append(f"[{datetime.now().strftime('%H:%M:%S')}] RULE: Created {rule_name} for {dataset}.{column}")

        add_rule_button.on_click(add_rule)

        # Clear rules function
        def clear_rules(button):
            self.rules = {}
            self.update_rules_display(rules_display)
            pipeline_logs.append(f"[{datetime.now().strftime('%H:%M:%S')}] RULE: Cleared all rules")

        clear_rules_button.on_click(clear_rules)

        # Test rules function
        def test_rules(button):
            self.test_validation_rules(rules_display)

        test_rules_button.on_click(test_rules)

        # Refresh button to update dataset options
        refresh_button = widgets.Button(
            description="🔄 Refresh",
            button_style='info'
        )

        def refresh_datasets(button):
            update_dataset_options()

        refresh_button.on_click(refresh_datasets)

        # Initial update
        update_dataset_options()

        # Assemble widget
        rule_container.children = [
            widgets.HTML("<h4>✅ Validation Rule Builder</h4>"),
            widgets.HBox([dataset_selector, refresh_button]),
            column_selector,
            rule_type_selector,
            rule_name_input,
            rule_params_container,
            widgets.HBox([add_rule_button, clear_rules_button, test_rules_button]),
            widgets.HTML("<h5>📋 Current Rules</h5>"),
            rules_display
        ]

        return rule_container

    def create_rule_parameters(self, rule_type):
        """Create parameter widgets based on rule type.

        Args:
            rule_type: One of the rule type values offered by the builder
                ('range', 'format', 'custom', or anything else).

        Returns:
            A list of widgets: min/max inputs for 'range', a regex input for
            'format', a SQL text area for 'custom', otherwise a note that no
            parameters are needed.
        """
        if rule_type == 'range':
            return [
                widgets.FloatText(description='Min Value:', value=0.0),
                widgets.FloatText(description='Max Value:', value=100.0)
            ]
        elif rule_type == 'format':
            return [
                widgets.Text(
                    description='Regex Pattern:',
                    placeholder='e.g., ^[A-Z]{2}[0-9]{4}$',
                    value=''
                )
            ]
        elif rule_type == 'custom':
            return [
                widgets.Textarea(
                    description='SQL Condition:',
                    placeholder='e.g., column_name > 0 AND column_name < 1000',
                    value=''
                )
            ]
        else:
            return [widgets.HTML("<i>No additional parameters required</i>")]

    def collect_rule_parameters(self, param_widgets):
        """Collect parameters from parameter widgets.

        Widgets are recognised by their ``description`` text; objects lacking
        a ``description`` or ``value`` attribute are ignored.

        Args:
            param_widgets: Iterable of widgets as produced by
                ``create_rule_parameters``.

        Returns:
            A dict with any of the keys 'min_value', 'max_value', 'pattern'
            and 'condition'.
        """
        # Description fragment -> key under which the value is stored.
        # The first matching fragment wins.
        description_to_key = (
            ('Min Value', 'min_value'),
            ('Max Value', 'max_value'),
            ('Regex Pattern', 'pattern'),
            ('SQL Condition', 'condition'),
        )

        params = {}
        for widget in param_widgets:
            if not (hasattr(widget, 'description') and hasattr(widget, 'value')):
                continue
            for fragment, key in description_to_key:
                if fragment in widget.description:
                    params[key] = widget.value
                    break
        return params

    def update_rules_display(self, output_widget):
        """Update the rules display.

        Clears ``output_widget`` and lists every stored rule grouped by
        dataset, or a placeholder message when no rules exist.
        """
        with output_widget:
            clear_output()
            if not self.rules:
                print("📋 No rules defined yet")
                return

            for dataset, rules_list in self.rules.items():
                print(f"\n📊 {dataset} ({len(rules_list)} rules):")
                for i, rule in enumerate(rules_list, 1):
                    rule_desc = f"{i}. {rule['name']} ({rule['type']}) on {rule['column']}"
                    if rule['type'] == 'range':
                        rule_desc += f" [{rule.get('min_value', 'N/A')} - {rule.get('max_value', 'N/A')}]"
                    elif rule['type'] == 'format':
                        rule_desc += f" pattern: {rule.get('pattern', 'N/A')}"
                    elif rule['type'] == 'custom':
                        rule_desc += f" condition: {rule.get('condition', 'N/A')}"
                    print(f"   {rule_desc}")

    def test_validation_rules(self, output_widget):
        """Test validation rules on uploaded data.

        Runs every stored rule against its dataset (datasets that are not
        uploaded are skipped) and prints PASS/FAIL with the violation count,
        or the error message when a rule could not be evaluated.
        """
        if not self.rules:
            with output_widget:
                print("⚠️ No rules to test")
            return

        with output_widget:
            print("\n🧪 Testing validation rules...")

            for dataset, rules_list in self.rules.items():
                if dataset not in self.upload_manager.uploaded_data:
                    continue

                df = self.upload_manager.uploaded_data[dataset]['dataframe']
                print(f"\n📊 Testing {len(rules_list)} rules on {dataset}:")

                for rule in rules_list:
                    result = self.apply_validation_rule(df, rule)
                    if result['error'] is not None:
                        # Surface evaluation errors instead of reporting a
                        # misleading "FAIL (0 violations)".
                        print(f"   {rule['name']}: ⚠️ ERROR ({result['error']})")
                        continue
                    status = "✅ PASS" if result['passed'] else "❌ FAIL"
                    print(f"   {rule['name']}: {status} ({result['violations']} violations)")

    def _count_violations(self, df, rule):
        """Return the number of rows of ``df`` that violate ``rule``.

        Raises:
            ValueError: If the rule type is unknown or a custom rule has no
                condition. Errors raised by Spark propagate unchanged.
        """
        rule_type = rule['type']
        quoted = self._quote_identifier(rule['column'])

        if rule_type == 'not_null':
            return df.filter(col(quoted).isNull()).count()

        if rule_type == 'unique':
            total_rows = df.count()
            unique_rows = df.select(col(quoted)).distinct().count()
            return total_rows - unique_rows

        if rule_type == 'range':
            min_val = rule.get('min_value', float('-inf'))
            max_val = rule.get('max_value', float('inf'))
            return df.filter(
                (col(quoted) < min_val) | (col(quoted) > max_val)
            ).count()

        if rule_type == 'format':
            pattern = rule.get('pattern', '')
            return df.filter(~col(quoted).rlike(pattern)).count()

        if rule_type == 'custom':
            condition = rule.get('condition', '')
            if not condition.strip():
                raise ValueError("Custom rule requires a SQL condition")
            # NOTE: the condition is user-entered SQL evaluated as-is by
            # Spark; only use it with trusted notebook users. The
            # 'column_name' placeholder is replaced by the quoted column.
            return df.filter(~expr(condition.replace('column_name', quoted))).count()

        raise ValueError(f"Unsupported rule type: {rule_type}")

    def apply_validation_rule(self, df, rule):
        """Apply a single validation rule.

        Args:
            df: DataFrame to validate.
            rule: Rule dict with at least 'column' and 'type'.

        Returns:
            A dict with 'passed' (bool), 'violations' (int) and 'error'
            (``None`` on success, otherwise the error message; in that case
            'passed' is False and 'violations' is 0).
        """
        try:
            violations = self._count_violations(df, rule)
        except Exception as e:
            return {
                'passed': False,
                'violations': 0,
                'error': str(e)
            }

        return {
            'passed': violations == 0,
            'violations': violations,
            'error': None
        }

# Create rule manager and display widget
# Shares the upload manager; use the builder's "Refresh" button after
# uploading so the dataset dropdown picks up the new data.
rule_manager = RuleManager(upload_manager)
rule_builder_widget = rule_manager.create_rule_builder_widget()
display(rule_builder_widget)

## 🔍 Data Validation Engine

Comprehensive validation engine that applies rules to data and generates detailed validation reports.

In [None]:
# Data Validation Engine
class ValidationEngine:
    """Run the rule manager's validation rules against uploaded datasets.

    Results are kept in ``self.validation_results`` keyed by dataset name and
    can be summarised as text, charted with Plotly and exported to CSV.
    """

    def __init__(self, spark_session, rule_manager, upload_manager):
        """Store collaborators and start with an empty result set.

        Args:
            spark_session: Spark session, stored as ``self.spark``.
            rule_manager: Object exposing ``rules`` (dataset name -> list of
                rule dicts) and ``apply_validation_rule(df, rule)``.
            upload_manager: Object exposing ``uploaded_data`` (dataset name ->
                dict holding the data under the ``'dataframe'`` key).
        """
        self.spark = spark_session
        self.rule_manager = rule_manager
        self.upload_manager = upload_manager
        self.validation_results = {}

    def create_validation_widget(self):
        """Build the widget with "run" and "export" buttons and an output area.

        Returns:
            A ``widgets.VBox`` containing a heading, the two buttons and the
            output widget that receives all printed feedback.
        """
        output_widget = widgets.Output()

        # Validation control buttons
        run_validation_button = widgets.Button(
            description="🚀 Run Validation",
            button_style='success',
            style={'button_color': '#4CAF50'}
        )

        export_results_button = widgets.Button(
            description="📊 Export Results",
            button_style='info'
        )

        def run_validation(button):
            """Validate every dataset that has both rules and uploaded data."""
            with output_widget:
                clear_output()
                if not self.rule_manager.rules:
                    print("⚠️ No validation rules defined!")
                    return

                print("🚀 Starting validation process...")
                # Each run starts from a clean slate so stale results from a
                # previous run never leak into the report.
                self.validation_results = {}

                for dataset_name, rules_list in self.rule_manager.rules.items():
                    if dataset_name not in self.upload_manager.uploaded_data:
                        continue

                    print(f"\n📊 Validating {dataset_name}...")

                    df = self.upload_manager.uploaded_data[dataset_name]['dataframe']
                    dataset_results = self.validate_dataset(df, rules_list, dataset_name)
                    self.validation_results[dataset_name] = dataset_results

                    # Display results summary
                    self.display_validation_summary(dataset_name, dataset_results)

                # Create detailed validation report
                print("\n📋 Generating detailed validation report...")
                self.create_validation_report()

        run_validation_button.on_click(run_validation)

        def export_results(button):
            """Write the current results to CSV, if there are any."""
            with output_widget:
                if not self.validation_results:
                    print("⚠️ No validation results to export!")
                    return

                self.export_validation_results()
                print("✅ Validation results exported!")

        export_results_button.on_click(export_results)

        return widgets.VBox([
            widgets.HTML("<h4>🔍 Data Validation Engine</h4>"),
            widgets.HBox([run_validation_button, export_results_button]),
            output_widget
        ])

    def validate_dataset(self, df, rules_list, dataset_name):
        """Validate a dataset against a list of rules.

        Args:
            df: DataFrame to validate; ``count()`` is called on it and it is
                passed to ``rule_manager.apply_validation_rule``.
            rules_list: Rule dicts, each with ``name``, ``type`` and ``column``.
            dataset_name: Name recorded in the results and lineage entry.

        Returns:
            dict with ``dataset_name``, ``total_rows``, ``rules_applied``,
            ``rules_passed``, ``rules_failed``, ``total_violations``,
            ``validation_score`` (percentage of rules passed, 0 when no rules
            were applied) and ``rule_results`` (one enriched dict per rule).

        Side effects:
            Appends a ``VALIDATION`` event to the module-level
            ``lineage_events`` list and a line to ``pipeline_logs``.
        """
        total_rows = df.count()
        results = {
            'dataset_name': dataset_name,
            'total_rows': total_rows,
            'rules_applied': len(rules_list),
            'rules_passed': 0,
            'rules_failed': 0,
            'total_violations': 0,
            'rule_results': []
        }

        for rule in rules_list:
            rule_result = self.rule_manager.apply_validation_rule(df, rule)

            # Guard against division by zero on an empty dataset.
            if total_rows > 0:
                violation_percentage = (rule_result['violations'] / total_rows) * 100
            else:
                violation_percentage = 0

            # Enhance rule result with additional information
            results['rule_results'].append({
                **rule_result,
                'rule_name': rule['name'],
                'rule_type': rule['type'],
                'column': rule['column'],
                'dataset': dataset_name,
                'violation_percentage': violation_percentage
            })

            if rule_result['passed']:
                results['rules_passed'] += 1
            else:
                results['rules_failed'] += 1
                results['total_violations'] += rule_result['violations']

        # Calculate overall validation score
        if results['rules_applied'] > 0:
            results['validation_score'] = (results['rules_passed'] / results['rules_applied']) * 100
        else:
            results['validation_score'] = 0

        # Track validation in lineage
        event_id = str(uuid.uuid4())
        lineage_events.append({
            'event_id': event_id,
            'event_type': 'VALIDATION',
            'timestamp': datetime.now().isoformat(),
            'dataset_name': dataset_name,
            'validation_summary': {
                'total_rows': results['total_rows'],
                'rules_applied': results['rules_applied'],
                'rules_passed': results['rules_passed'],
                'rules_failed': results['rules_failed'],
                'validation_score': results['validation_score']
            }
        })

        pipeline_logs.append(f"[{datetime.now().strftime('%H:%M:%S')}] VALIDATION: {dataset_name} - {results['validation_score']:.1f}% passed")

        return results

    def display_validation_summary(self, dataset_name, results):
        """Print a text summary of one dataset's validation results.

        Args:
            dataset_name: Name shown in the summary heading.
            results: Result dict in the shape returned by ``validate_dataset``.

        Failed rules are listed individually. A rule whose result carries an
        ``error`` is shown with that error instead of a violation count, since
        its count of 0 only means the rule could not be evaluated.
        """
        print(f"\n📊 {dataset_name} Validation Summary:")
        print(f"   Total Rows: {results['total_rows']:,}")
        print(f"   Rules Applied: {results['rules_applied']}")
        print(f"   Rules Passed: {results['rules_passed']} ✅")
        print(f"   Rules Failed: {results['rules_failed']} ❌")
        print(f"   Total Violations: {results['total_violations']:,}")
        print(f"   Validation Score: {results['validation_score']:.1f}%")

        # Show failed rules
        if results['rules_failed'] > 0:
            print("\n❌ Failed Rules:")
            for rule_result in results['rule_results']:
                if rule_result['passed']:
                    continue
                error = rule_result.get('error')
                if error:
                    print(f"   • {rule_result['rule_name']}: could not be evaluated ({error})")
                else:
                    print(f"   • {rule_result['rule_name']}: {rule_result['violations']:,} violations ({rule_result['violation_percentage']:.1f}%)")

    def create_validation_report(self):
        """Create comprehensive validation report with visualizations.

        Renders a 2x2 Plotly figure (scores per dataset, overall pass/fail
        pie, violations per dataset, top 10 failing rules by violations) and
        then displays a per-rule results table. Does nothing when there are
        no validation results.
        """
        if not self.validation_results:
            return

        # Validation summary chart
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Validation Scores by Dataset', 'Rules Pass/Fail Distribution',
                          'Violations by Dataset', 'Rule Performance'),
            specs=[[{"type": "bar"}, {"type": "pie"}],
                   [{"type": "bar"}, {"type": "bar"}]]
        )

        # Dataset validation scores
        datasets = list(self.validation_results.keys())
        scores = [self.validation_results[ds]['validation_score'] for ds in datasets]

        fig.add_trace(
            go.Bar(x=datasets, y=scores, name="Validation Score",
                   marker_color=['green' if s >= 90 else 'orange' if s >= 70 else 'red' for s in scores]),
            row=1, col=1
        )

        # Overall pass/fail distribution
        total_passed = sum(r['rules_passed'] for r in self.validation_results.values())
        total_failed = sum(r['rules_failed'] for r in self.validation_results.values())

        fig.add_trace(
            go.Pie(labels=['Passed', 'Failed'], values=[total_passed, total_failed],
                   marker_colors=['green', 'red']),
            row=1, col=2
        )

        # Violations by dataset
        violations = [self.validation_results[ds]['total_violations'] for ds in datasets]

        fig.add_trace(
            go.Bar(x=datasets, y=violations, name="Total Violations", marker_color='red'),
            row=2, col=1
        )

        # Rule performance (top failing rules)
        all_rule_results = []
        for dataset_results in self.validation_results.values():
            all_rule_results.extend(dataset_results['rule_results'])

        failing_rules = [r for r in all_rule_results if not r['passed']]
        failing_rules.sort(key=lambda x: x['violations'], reverse=True)
        top_failing = failing_rules[:10]  # Top 10 failing rules

        if top_failing:
            rule_names = [f"{r['rule_name']} ({r['dataset']})" for r in top_failing]
            rule_violations = [r['violations'] for r in top_failing]

            fig.add_trace(
                go.Bar(x=rule_violations, y=rule_names, orientation='h',
                       name="Violations", marker_color='orange'),
                row=2, col=2
            )

        fig.update_layout(height=800, showlegend=False, title_text="Data Validation Report")
        fig.show()

        # Create detailed rule results table
        detailed_results = [
            {
                'Dataset': rule_result['dataset'],
                'Rule Name': rule_result['rule_name'],
                'Rule Type': rule_result['rule_type'],
                'Column': rule_result['column'],
                'Status': '✅ Pass' if rule_result['passed'] else '❌ Fail',
                'Violations': rule_result['violations'],
                'Violation %': f"{rule_result['violation_percentage']:.2f}%"
            }
            for dataset_results in self.validation_results.values()
            for rule_result in dataset_results['rule_results']
        ]

        if detailed_results:
            print("\n📋 Detailed Rule Results:")
            detailed_df = pd.DataFrame(detailed_results)
            display(detailed_df)

    def export_validation_results(self):
        """Export validation results to files.

        Writes ``validation_summary_<timestamp>.csv`` (one row per dataset)
        and ``validation_detailed_<timestamp>.csv`` (one row per rule) using
        relative paths, i.e. into the current working directory.
        """
        # Capture the time once so both file names and every row carry the
        # same export timestamp.
        now = datetime.now()
        timestamp = now.strftime("%Y%m%d_%H%M%S")
        exported_at = now.isoformat()

        # Export summary results
        summary_data = [
            {
                'dataset': dataset_name,
                'total_rows': results['total_rows'],
                'rules_applied': results['rules_applied'],
                'rules_passed': results['rules_passed'],
                'rules_failed': results['rules_failed'],
                'total_violations': results['total_violations'],
                'validation_score': results['validation_score'],
                'timestamp': exported_at
            }
            for dataset_name, results in self.validation_results.items()
        ]

        summary_df = pd.DataFrame(summary_data)
        summary_df.to_csv(f"validation_summary_{timestamp}.csv", index=False)

        # Export detailed results
        detailed_data = [
            {
                'dataset': rule_result['dataset'],
                'rule_name': rule_result['rule_name'],
                'rule_type': rule_result['rule_type'],
                'column': rule_result['column'],
                'passed': rule_result['passed'],
                'violations': rule_result['violations'],
                'violation_percentage': rule_result['violation_percentage'],
                'error': rule_result.get('error', ''),
                'timestamp': exported_at
            }
            for dataset_results in self.validation_results.values()
            for rule_result in dataset_results['rule_results']
        ]

        detailed_df = pd.DataFrame(detailed_data)
        detailed_df.to_csv(f"validation_detailed_{timestamp}.csv", index=False)

# Create validation engine and display widget
# The engine is wired to the notebook-level spark session, rule_manager and
# upload_manager; its run/export widget is then rendered in the cell output.
validation_engine = ValidationEngine(spark, rule_manager, upload_manager)
validation_widget = validation_engine.create_validation_widget()
display(validation_widget)