# Data Fidelity and SQL Fidelity Correlation Analysis

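This notebook compares two low-fidelity strategies: reducing the data volume, and selecting a subset of SQL queries. It measures how closely each one tracks the full-fidelity results:
1. Merge the two experiment result files
2. Calculate data-volume fidelity correlations against full fidelity
3. Perform SQL subset selection analysis
4. Visualize a comparison between the two approaches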


## Step 1: Data Merging

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
import os
import sys
from scipy.stats import spearmanr, pearsonr, kendalltau
import warnings

# Announce the start of the notebook run so cell output is easy to follow.
_start_banner = "Starting data merging and analysis..."
print(_start_banner)

In [None]:
# Inputs: the two experiment result CSVs to combine.
# Output: where the merged CSV is cached.
file1, file2 = (
    "./20251022_033707/config_validation_results.csv",
    "./20251022_033810/config_validation_results.csv",
)
output_path = "./merged_results.csv"

def merge_csv_files(file1, file2, output_path):
    """Concatenate two result CSVs, renumbering the second file's config_ids.

    The ``config_id`` values read from *file2* are shifted by
    ``max(config_id of file1) + 1`` so the two runs do not collide after
    concatenation. The merged frame is written to *output_path* (without the
    index) and returned.

    Args:
        file1: Path to the first CSV; its config_ids are kept unchanged.
        file2: Path to the second CSV; its config_ids are offset.
        output_path: Destination path for the merged CSV.

    Returns:
        The merged ``pandas.DataFrame``.
    """
    print(f"Reading file: {file1}")
    df1 = pd.read_csv(file1)
    print(f"File 1: {len(df1)} records")

    print(f"Reading file: {file2}")
    df2 = pd.read_csv(file2)
    print(f"File 2: {len(df2)} records")

    max_config_id_df1 = df1['config_id'].max()
    print(f"File 1 max config_id: {max_config_id_df1}")

    # An empty first file has no maximum (NaN). Adding NaN would turn every
    # config_id of the second file into NaN, so leave those ids untouched instead.
    offset = 0 if pd.isna(max_config_id_df1) else max_config_id_df1 + 1
    df2['config_id'] = df2['config_id'] + offset
    print(f"File 2 config_id range after adjustment: {df2['config_id'].min()} - {df2['config_id'].max()}")

    merged_df = pd.concat([df1, df2], ignore_index=True)
    print(f"Total records after merging: {len(merged_df)}")

    merged_df.to_csv(output_path, index=False)
    print(f"Merged results saved to: {output_path}")

    return merged_df

# Reuse the cached merge when it exists; otherwise build it from the two raw files.
if os.path.exists(output_path):
    merged_df = pd.read_csv(output_path)
    print(f"Loaded cached merged results from: {output_path}")
else:
    merged_df = merge_csv_files(file1, file2, output_path)
    print(f"\nMerging complete! Total records: {len(merged_df)}")

# Summarize in both branches so an unexpected or stale cache is visible.
print(f"Fidelity levels: {sorted(merged_df['fidelity'].unique())}")
print(f"Databases: {merged_df['database'].unique()}")


## Step 2: Data Volume Fidelity Correlation Analysis


In [None]:
def calculate_fidelity_correlations(df, method='spearman'):
    full_fidelity = 1.0
    full_data = df[df['fidelity'] == full_fidelity].copy()
    print(f"Full fidelity data: {len(full_data)} configurations")
    
    fidelity_levels = sorted([f for f in df['fidelity'].unique() if f != full_fidelity])
    print(f"All fidelity levels: {fidelity_levels}")
    
    correlations = {}
    
    for fidelity in fidelity_levels:
        fidelity_data = df[df['fidelity'] == fidelity].copy()
        
        common_configs = set(full_data['config_id']) & set(fidelity_data['config_id'])
        print(f"fidelity={fidelity}: Found {len(common_configs)} common configurations")
        
        if len(common_configs) < 3:
            print(f"fidelity={fidelity}: Insufficient common configurations ({len(common_configs)})")
            continue
        
        full_subset = full_data[full_data['config_id'].isin(common_configs)].copy()
        fidelity_subset = fidelity_data[fidelity_data['config_id'].isin(common_configs)].copy()
        
        full_subset = full_subset.reset_index(drop=True)
        fidelity_subset = fidelity_subset.reset_index(drop=True)
        
        full_subset_copy = full_subset.copy()
        fidelity_subset_copy = fidelity_subset.copy()
        
        incomplete_records = []
        if 'successful_files' in full_subset_copy.columns and 'total_files' in full_subset_copy.columns:
            for idx, row in full_subset_copy.iterrows():
                if row['successful_files'] < row['total_files']:
                    completion_ratio = row['successful_files'] / row['total_files']
                    penalty_factor = np.exp(2.0 * (1.0 - completion_ratio))
                    original_obj = row['objective']
                    full_subset_copy.loc[idx, 'objective'] = original_obj * penalty_factor
                    incomplete_records.append((row['config_id'], completion_ratio, original_obj, row['objective']))
        
        if 'successful_files' in fidelity_subset_copy.columns and 'total_files' in fidelity_subset_copy.columns:
            for idx, row in fidelity_subset_copy.iterrows():
                if row['successful_files'] < row['total_files']:
                    completion_ratio = row['successful_files'] / row['total_files']
                    penalty_factor = np.exp(2.0 * (1.0 - completion_ratio))
                    original_obj = row['objective']
                    fidelity_subset_copy.loc[idx, 'objective'] = original_obj * penalty_factor
                    incomplete_records.append((row['config_id'], completion_ratio, original_obj, row['objective']))
        
        if incomplete_records:
            print(f"  {len(incomplete_records)} records incomplete, applying penalty factor")
            for config_id, ratio, orig, new in incomplete_records[:3]:
                print(f"    Config{config_id}: completion={ratio:.3f}, original={orig:.1f}, penalized={new:.1f}")
        
        full_valid = full_subset_copy['objective'].values
        fidelity_valid = fidelity_subset_copy['objective'].values
        
        valid_mask = ~(np.isnan(full_valid) | np.isnan(fidelity_valid) | 
                    np.isinf(full_valid) | np.isinf(fidelity_valid))
        
        if valid_mask.sum() < 3:
            print(f"fidelity={fidelity}: Insufficient valid data points ({valid_mask.sum()})")
            continue
        
        full_valid = full_valid[valid_mask]
        fidelity_valid = fidelity_valid[valid_mask]
        
        if method == 'spearman':
            corr, pvalue = spearmanr(full_valid, fidelity_valid)
        elif method == 'pearson':
            corr, pvalue = pearsonr(full_valid, fidelity_valid)
        else:  # kendall
            corr, pvalue = kendalltau(full_valid, fidelity_valid)
        
        full_avg_spark_time = full_subset_copy['spark_time'].mean()
        fidelity_avg_spark_time = fidelity_subset_copy['spark_time'].mean()
        time_ratio = fidelity_avg_spark_time / full_avg_spark_time if full_avg_spark_time > 0 else 0.0
        
        full_avg_exec_time = full_subset_copy['elapsed_time'].replace(float('inf'), pd.NA).mean()
        fidelity_avg_exec_time = fidelity_subset_copy['elapsed_time'].replace(float('inf'), pd.NA).mean()
        exec_time_ratio = fidelity_avg_exec_time / full_avg_exec_time if full_avg_exec_time > 0 and not pd.isna(full_avg_exec_time) else 0.0
        
        correlations[fidelity] = {
            'correlation': corr,
            'pvalue': pvalue,
            'n_samples': len(full_valid),
            'database': fidelity_data['database'].iloc[0] if 'database' in fidelity_data.columns else 'unknown',
            'time_stats': {
                'full_avg_spark_time': full_avg_spark_time,
                'fidelity_avg_spark_time': fidelity_avg_spark_time,
                'time_ratio': time_ratio,
                'full_avg_exec_time': full_avg_exec_time,
                'fidelity_avg_exec_time': fidelity_avg_exec_time,
                'exec_time_ratio': exec_time_ratio
            }
        }
        
        print(f"\nfidelity={fidelity} ({correlations[fidelity]['database']}):")
        print(f"  Correlation: {corr:.4f}")
        print(f"  P-value: {pvalue:.4e}")
        print(f"  Samples: {len(full_valid)}")
        print(f"  Time stats:")
        print(f"    Full fidelity avg spark_time: {full_avg_spark_time:.2f}s")
        print(f"    Current fidelity avg spark_time: {fidelity_avg_spark_time:.2f}s")
        print(f"    spark_time ratio: {time_ratio:.4f}")
        print(f"    Full fidelity avg elapsed_time: {full_avg_exec_time:.2f}s")
        print(f"    Current fidelity avg elapsed_time: {fidelity_avg_exec_time:.2f}s")
        print(f"    elapsed_time ratio: {exec_time_ratio:.4f}")
    
    return correlations

print("=" * 60)
print("Data Volume Fidelity Correlation Analysis")
print("=" * 60)

# Correlation method for the data-volume study.
method = 'kendall'
data_volume_correlations = calculate_fidelity_correlations(merged_df, method=method)
correlation_file = f"./dv_{method}_corr.json"
# ensure_ascii=False writes raw non-ASCII text, so pin the encoding instead of
# relying on the platform default.
with open(correlation_file, 'w', encoding='utf-8') as f:
    json.dump(data_volume_correlations, f, indent=2, ensure_ascii=False)
print(f"\nCorrelation results saved to: {correlation_file}")


## Step 3: SQL Subset Selection Analysis

In [None]:
import sys
import pandas as pd
# Add the working directory to the module search path so the local
# sql_subset_selection module can be imported; this must run before the import below.
sys.path.append('.')

# SQL subset-selection helpers, presumably from a sql_subset_selection.py module
# next to this notebook (not shown here) — verify location.
from sql_subset_selection import (
    compute_subset_correlation,
    multi_fidelity_sql_selection,
    multi_fidelity_sql_selection_incremental,
    analyze_fidelity_subsets
)

print("SQL analysis functions imported")


In [None]:
def perform_sql_analysis(df, method='spearman', time_type='spark_time', sql_type='qt', tolerance=0.1,
                         fidelity_levels=None, lambda_penalty=0.1):
    """Run non-incremental and incremental SQL subset selection on full-fidelity data.

    Both selections are analysed with ``analyze_fidelity_subsets`` and then
    persisted with ``save_sql_selection_results``.

    Args:
        df: Results frame with a 'fidelity' column; only rows with fidelity == 1.0 are used.
        method: Correlation method forwarded to the selection helpers.
        time_type: Timing column name forwarded to the selection helpers.
        sql_type: SQL set label forwarded to the selection helpers.
        tolerance: Tolerance forwarded to the selection helpers.
        fidelity_levels: Target fidelity levels; defaults to ``[1/27, 1/9, 1/3, 3/5, 1.0]``.
        lambda_penalty: Penalty weight forwarded to the selection helpers.

    Returns:
        ``(fidelity_subsets_non_inc, fidelity_subsets_inc, sql_stats)``.
    """
    print("=" * 60)
    print("SQL Subset Selection Analysis")
    print("=" * 60)

    df_full = df[df['fidelity'] == 1.0].copy()
    print(f"Using full fidelity data: {len(df_full)} configurations")
    print(f"Time type: {time_type}, SQL type: {sql_type}, Tolerance: {tolerance}")

    if fidelity_levels is None:
        fidelity_levels = [1/27, 1/9, 1/3, 3/5, 1.0]
    else:
        fidelity_levels = list(fidelity_levels)

    # Both selection strategies share exactly the same configuration.
    selection_kwargs = dict(
        fidelity_levels=fidelity_levels,
        lambda_penalty=lambda_penalty,
        correlation_method=method,
        time_type=time_type,
        sql_type=sql_type,
        tolerance=tolerance,
    )

    print("\n--- Non-incremental SQL Selection ---")
    fidelity_subsets_non_inc, sql_stats = multi_fidelity_sql_selection(df_full, **selection_kwargs)

    print("\n--- Incremental SQL Selection ---")
    # NOTE(review): the incremental run's own stats are discarded and the
    # non-incremental sql_stats are reused below — confirm they are identical.
    fidelity_subsets_inc, _ = multi_fidelity_sql_selection_incremental(df_full, **selection_kwargs)

    print("\n--- Non-incremental Analysis ---")
    analyze_fidelity_subsets(fidelity_subsets_non_inc, sql_stats, df_full, method)

    print("\n--- Incremental Analysis ---")
    analyze_fidelity_subsets(fidelity_subsets_inc, sql_stats, df_full, method)

    save_sql_selection_results(fidelity_subsets_non_inc, fidelity_subsets_inc, sql_stats, method, time_type, sql_type)

    return fidelity_subsets_non_inc, fidelity_subsets_inc, sql_stats

def save_sql_selection_results(fidelity_subsets_non_inc, fidelity_subsets_inc, sql_stats, method, time_type, sql_type,
                               df_full=None):
    """Save SQL selection results with detailed per-subset statistics to JSON.

    The output file is ``./sql_{method}_{sql_type}.json``.

    Args:
        fidelity_subsets_non_inc: Mapping fidelity -> selected SQL names (non-incremental).
        fidelity_subsets_inc: Mapping fidelity -> selected SQL names (incremental).
        sql_stats: Mapping SQL name -> stats dict with at least 'correlation',
            'estimated_time', 'avg_time' and 'total_time'.
        method: Correlation method; recorded in metadata and used in the file name.
        time_type: Timing column used for the analysis (metadata only).
        sql_type: SQL set label; recorded in metadata and used in the file name.
        df_full: Full-fidelity rows used for subset correlations. Defaults to the
            ``fidelity == 1.0`` rows of the notebook-global ``merged_df``.
    """
    if df_full is None:
        df_full = merged_df[merged_df['fidelity'] == 1.0]

    def summarize(values):
        """Return mean/std/min/max of *values*, or all zeros when it is empty."""
        if not values:
            return {'mean': 0, 'std': 0, 'min': 0, 'max': 0}
        return {
            'mean': np.mean(values),
            'std': np.std(values),
            'min': np.min(values),
            'max': np.max(values),
        }

    def calculate_subset_statistics(fidelity_subsets):
        """Calculate detailed statistics for each non-empty fidelity subset."""
        subset_stats = {}
        for fidelity, sqls in fidelity_subsets.items():
            if not sqls:
                continue

            # Only SQLs with recorded stats contribute to time and stats, consistently everywhere.
            known = [sql for sql in sqls if sql in sql_stats]
            total_estimated_time = sum(sql_stats[sql]['estimated_time'] for sql in known)
            subset_corr = compute_subset_correlation(df_full, sqls, sql_stats, method)

            sql_details = {
                sql: {key: sql_stats[sql][key]
                      for key in ('correlation', 'estimated_time', 'avg_time', 'total_time')}
                for sql in known
            }

            subset_stats[fidelity] = {
                'sql_count': len(sqls),
                'sql_list': list(sqls),
                'time_ratio': total_estimated_time,
                'subset_correlation': subset_corr,
                'sql_details': sql_details,
                'correlation_stats': summarize([sql_stats[sql]['correlation'] for sql in known]),
                'time_stats': summarize([sql_stats[sql]['estimated_time'] for sql in known]),
            }
        return subset_stats

    def to_json_compatible(obj):
        """json.dump fallback: unwrap NumPy scalars/arrays and list-ify sets."""
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    results = {
        'metadata': {
            'method': method,
            'time_type': time_type,
            'sql_type': sql_type,
            'generation_time': pd.Timestamp.now().isoformat(),
            'total_configs': len(df_full),
            'total_sqls': len(sql_stats)
        },
        'non_incremental': {
            'fidelity_subsets': {k: list(v) for k, v in fidelity_subsets_non_inc.items()},
            'statistics': calculate_subset_statistics(fidelity_subsets_non_inc)
        },
        'incremental': {
            'fidelity_subsets': {k: list(v) for k, v in fidelity_subsets_inc.items()},
            'statistics': calculate_subset_statistics(fidelity_subsets_inc)
        },
        'sql_stats': sql_stats
    }

    output_path = f"./sql_{method}_{sql_type}.json"
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False, default=to_json_compatible)

    print(f"\nSQL selection results saved to: {output_path}")
    print(f"  - Non-incremental: {len(fidelity_subsets_non_inc)} fidelity levels")
    print(f"  - Incremental: {len(fidelity_subsets_inc)} fidelity levels")
    print(f"  - Total SQLs analyzed: {len(sql_stats)}")

# Parameters for the SQL subset-selection run.
method, time_type, sql_type = 'kendall', 'spark_time', 'qt'
tolerance = 0.05
fidelity_subsets_non_inc, fidelity_subsets_inc, sql_stats = perform_sql_analysis(
    merged_df,
    method=method,
    time_type=time_type,
    sql_type=sql_type,
    tolerance=tolerance,
)


## Step 4: Visualization and Comparison


In [None]:
def prepare_visualization_data(data_volume_correlations, sql_stats_path):
    """Build plot-ready point lists for the data-volume and SQL-selection strategies.

    Args:
        data_volume_correlations: Mapping fidelity -> stats dict with 'correlation'
            and ``['time_stats']['time_ratio']``.
        sql_stats_path: Path to a JSON file with ``non_incremental.statistics`` and
            ``incremental.statistics`` mappings (fidelity -> dict with
            'time_ratio' and 'subset_correlation').

    Returns:
        ``(data_volume_data, sql_non_inc_data, sql_inc_data)``: lists of dicts with
        keys 'fidelity' (float), 'time_ratio', 'correlation' and 'type', each
        sorted by fidelity. The data-volume list always includes a
        (1.0, 1.0, 1.0) reference point.
    """
    def by_fidelity(point):
        return point['fidelity']

    def points_from_statistics(statistics, label):
        # JSON object keys are always strings; convert them back to float so the
        # fidelity column is numeric and sorts numerically.
        points = [
            {
                'fidelity': float(key),
                'time_ratio': entry['time_ratio'],
                'correlation': entry['subset_correlation'],
                'type': label,
            }
            for key, entry in statistics.items()
        ]
        return sorted(points, key=by_fidelity)

    data_volume_data = [
        {
            'fidelity': float(fidelity),
            'time_ratio': stats['time_stats']['time_ratio'],
            'correlation': stats['correlation'],
            'type': 'Data Volume',
        }
        for fidelity, stats in data_volume_correlations.items()
    ]
    # Full fidelity is the reference itself: perfect correlation at full cost.
    if not any(point['fidelity'] == 1.0 for point in data_volume_data):
        data_volume_data.append({
            'fidelity': 1.0,
            'time_ratio': 1.0,
            'correlation': 1.0,
            'type': 'Data Volume'
        })
    data_volume_data.sort(key=by_fidelity)

    with open(sql_stats_path, 'r', encoding='utf-8') as f:
        sql_stats = json.load(f)

    sql_non_inc_data = points_from_statistics(
        sql_stats['non_incremental']['statistics'], 'SQL Selection (Non-incremental)')
    sql_inc_data = points_from_statistics(
        sql_stats['incremental']['statistics'], 'SQL Selection (Incremental)')

    return data_volume_data, sql_non_inc_data, sql_inc_data

method, time_type, sql_type = 'kendall', 'spark_time', 'qt'
sql_stats_path = f"./sql_{method}_{sql_type}.json"
data_volume_data, sql_non_inc_data, sql_inc_data = prepare_visualization_data(
    data_volume_correlations, sql_stats_path
)

print("Data preparation complete")
print(f"Data volume points: {len(data_volume_data)}")
print(f"SQL non-incremental points: {len(sql_non_inc_data)}")
print(f"SQL incremental points: {len(sql_inc_data)}")

# Debug: dump every point, ordered by fidelity, to sanity-check the correlation values.
for _title, _points in (
    ("Data volume data", data_volume_data),
    ("SQL non-incremental data", sql_non_inc_data),
    ("SQL incremental data", sql_inc_data),
):
    print(f"\nDebug - {_title}:")
    for _pt in sorted(_points, key=lambda p: p['fidelity']):
        print(f"  Fidelity: {_pt['fidelity']}, Time ratio: {_pt['time_ratio']}, Correlation: {_pt['correlation']}")


In [None]:
def create_comparison_plot(data_volume_data, sql_non_inc_data, sql_inc_data):
    """Plot correlation against time ratio for each fidelity strategy and save the figure.

    Each strategy is drawn as a line of points, annotated with its correlation
    value. Strategies with no points are skipped rather than raising. The figure
    is saved to ``fidelity_comparison.png`` (300 dpi) and then shown.

    Args:
        data_volume_data, sql_non_inc_data, sql_inc_data: Lists of point dicts
            with at least 'time_ratio' and 'correlation' keys.

    Returns:
        ``(df_volume, df_sql_non_inc, df_sql_inc)``: the inputs as DataFrames.
    """
    df_volume = pd.DataFrame(data_volume_data)
    df_sql_non_inc = pd.DataFrame(sql_non_inc_data)
    df_sql_inc = pd.DataFrame(sql_inc_data)

    # (points, line/marker style, legend label, colour, vertical offset of value labels)
    series = [
        (df_volume, 'o-', 'Data Volume', 'blue', 10),
        (df_sql_non_inc, 's-', 'SQL Selection (Non-incremental)', 'red', -15),
        (df_sql_inc, '^-', 'SQL Selection (Incremental)', 'green', 10),
    ]
    # An empty point list yields a DataFrame without columns; skip it instead of a KeyError.
    series = [entry for entry in series if not entry[0].empty]

    plt.figure(figsize=(12, 8))
    for frame, style, label, color, label_offset in series:
        plt.plot(frame['time_ratio'], frame['correlation'],
                 style, linewidth=2, markersize=8, label=label, color=color)
        for _, row in frame.iterrows():
            plt.annotate(f'{row["correlation"]:.2f}',
                         (row['time_ratio'], row['correlation']),
                         textcoords="offset points", xytext=(0, label_offset), ha='center')

    plt.xlabel('Average Time Ratio', fontsize=14)
    plt.ylabel('Correlation with Total Execution Time', fontsize=14)
    plt.title('Data Volume vs SQL Selection Correlation Comparison', fontsize=16, fontweight='bold')
    plt.grid(True, alpha=0.3)

    # Adaptive y-axis range from the finite correlations only (NaN would poison min/max).
    correlations = np.array([c for frame, *_ in series for c in frame['correlation']], dtype=float)
    correlations = correlations[np.isfinite(correlations)]
    if correlations.size:
        min_corr, max_corr = correlations.min(), correlations.max()
        # 10% margin; when all values coincide, use a fixed margin so the axis is not degenerate.
        y_margin = (max_corr - min_corr) * 0.1 or 0.05
        # Keep the [0, 1] window for non-negative data, but do not clip negative rank correlations.
        y_floor = 0.0 if min_corr >= 0 else -1.0
        plt.ylim(max(y_floor, min_corr - y_margin), min(1.0, max_corr + y_margin))

    time_ratios = np.array([t for frame, *_ in series for t in frame['time_ratio']], dtype=float)
    time_ratios = time_ratios[np.isfinite(time_ratios)]
    plt.xlim(0, max(1.1, time_ratios.max() * 1.05) if time_ratios.size else 1.1)

    # Reference lines for interpretation
    plt.axhline(y=0.8, color='gray', linestyle='--', alpha=0.5, label='High Correlation Threshold (0.8)')
    plt.axvline(x=0.5, color='gray', linestyle='--', alpha=0.5, label='Medium Time Ratio (0.5)')
    # Build the legend only after every labelled artist exists, so the reference lines appear in it.
    plt.legend(fontsize=12)

    plt.tight_layout()

    output_path = "fidelity_comparison.png"
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    print(f"Comparison plot saved to: {output_path}")

    plt.show()

    return df_volume, df_sql_non_inc, df_sql_inc

# Draw and save the comparison figure, keeping the per-strategy frames it returns.
df_volume, df_sql_non_inc, df_sql_inc = create_comparison_plot(
    data_volume_data=data_volume_data,
    sql_non_inc_data=sql_non_inc_data,
    sql_inc_data=sql_inc_data,
)


## Results Summary


In [None]:
def _mean_efficiency(frame):
    """Average correlation per unit of time ratio over the rows of *frame*.

    Rows whose time ratio is not positive are ignored rather than producing an
    infinite efficiency. Non-finite quotients are ignored as well. Returns NaN
    when *frame* is empty or no row qualifies.
    """
    if frame.empty or 'correlation' not in frame or 'time_ratio' not in frame:
        return float('nan')
    correlation = pd.to_numeric(frame['correlation'], errors='coerce')
    time_ratio = pd.to_numeric(frame['time_ratio'], errors='coerce')
    efficiency = correlation / time_ratio.where(time_ratio > 0)
    return efficiency.replace([np.inf, -np.inf], np.nan).mean()


def print_analysis_summary(df_volume, df_sql_non_inc, df_sql_inc):
    """Print per-strategy result tables, average efficiencies and a recommendation.

    Efficiency is ``correlation / time_ratio`` per point, averaged per strategy
    (see ``_mean_efficiency``). The recommendation picks the highest average.
    Ties prefer data volume, then non-incremental SQL selection. A strategy
    without usable points never wins.

    Args:
        df_volume, df_sql_non_inc, df_sql_inc: Frames with 'fidelity',
            'time_ratio' and 'correlation' columns. Fidelity may be numeric or a
            numeric string (e.g. keys read back from JSON).
    """
    print("=" * 80)
    print("Analysis Results Summary")
    print("=" * 80)

    sections = [
        ("\n1. Data Volume Results:", df_volume),
        ("\n2. SQL Selection Results (Non-incremental):", df_sql_non_inc),
        ("\n3. SQL Selection Results (Incremental):", df_sql_inc),
    ]
    for heading, frame in sections:
        print(heading)
        print("   Fidelity | Time Ratio | Correlation")
        print("   ---------|------------|------------")
        for _, row in frame.iterrows():
            print(f"   {float(row['fidelity']):8.2f} | {float(row['time_ratio']):10.4f} | {float(row['correlation']):10.4f}")

    print("\n4. Efficiency Analysis:")

    volume_efficiency = _mean_efficiency(df_volume)
    print(f"   Data Volume Average Efficiency: {volume_efficiency:.4f}")

    sql_non_inc_efficiency = _mean_efficiency(df_sql_non_inc)
    sql_inc_efficiency = _mean_efficiency(df_sql_inc)

    print(f"   SQL Selection (Non-incremental) Average Efficiency: {sql_non_inc_efficiency:.4f}")
    print(f"   SQL Selection (Incremental) Average Efficiency: {sql_inc_efficiency:.4f}")

    def rank(value):
        # NaN compares False against everything; rank it below any real efficiency instead.
        return value if pd.notna(value) else float('-inf')

    print("\n5. Strategy Recommendation:")
    if rank(volume_efficiency) > max(rank(sql_non_inc_efficiency), rank(sql_inc_efficiency)):
        print("   Recommended: Data Volume Strategy")
    elif rank(sql_inc_efficiency) > rank(sql_non_inc_efficiency):
        print("   Recommended: SQL Selection (Incremental) Strategy")
    else:
        print("   Recommended: SQL Selection (Non-incremental) Strategy")

    print("\nAnalysis Complete!")

print_analysis_summary(df_volume=df_volume, df_sql_non_inc=df_sql_non_inc, df_sql_inc=df_sql_inc)


In [None]:
# For each strategy, record its plotted points and an aggregate efficiency:
# total correlation divided by total time ratio across its points.
_analysis_frames = (
    ('data_volume_analysis', 'data_volume_efficiency', df_volume),
    ('sql_non_incremental_analysis', 'sql_non_inc_efficiency', df_sql_non_inc),
    ('sql_incremental_analysis', 'sql_inc_efficiency', df_sql_inc),
)
results_summary = {records_key: frame.to_dict('records') for records_key, _, frame in _analysis_frames}
results_summary['summary'] = {
    efficiency_key: frame['correlation'].sum() / frame['time_ratio'].sum()
    for _, efficiency_key, frame in _analysis_frames
}

summary_file = "fidelity_analysis_summary.json"
with open(summary_file, 'w', encoding='utf-8') as fh:
    json.dump(results_summary, fh, indent=2, ensure_ascii=False)

print(f"Analysis results saved to: {summary_file}")
print("\nAll analysis complete!")
