In [1]:
import ast
import os
import random
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import product
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from var import rsa_lib

# Use seaborn's white-grid theme for every figure in this notebook.
sns.set_style('whitegrid')

# Thread-pool size: one worker per logical CPU. os.cpu_count() returns None
# when the count cannot be determined, hence the fallback of 4.
# (`os` is imported with the other modules at the top of this cell.)
N_WORKERS = os.cpu_count() or 4
print(f"Using {N_WORKERS} parallel workers")

Using 10 parallel workers


## Load Test Cases

In [2]:
# Load the encrypted texts (variants 1-8) used as cross-validation folds.
# Layout of each `var/<i>_encrypted_text.txt`:
#   Line 0: len of text: <number>
#   Line 1: [encrypted array]
#   Line 2: public key:<number>
#   Line 3: n:<number>
# Only line 1 (the ciphertext) is read here.
test_cases = []
var_path = Path('var')

for i in range(1, 9):  # Load variants 1-8
    file_path = var_path / f'{i}_encrypted_text.txt'
    if not file_path.exists():
        continue  # Missing variants are skipped, exactly as before.

    with open(file_path, 'r') as f:
        lines = f.readlines()

    if len(lines) < 2:
        # Previously this raised IndexError on lines[1]; skip the file instead.
        print(f"  Skipping {file_path}: no ciphertext line found")
        continue

    # ast.literal_eval only accepts Python literals, so a tampered or
    # malformed data file cannot execute arbitrary code the way eval() would.
    ciphertext = ast.literal_eval(lines[1].strip())
    test_cases.append({
        'id': i,
        'ciphertext': ciphertext,
        'length': len(ciphertext)
    })

print(f"Loaded {len(test_cases)} test cases")
for tc in test_cases:
    print(f"  Case {tc['id']}: length={tc['length']}")

Loaded 8 test cases
  Case 1: length=24
  Case 2: length=41
  Case 3: length=43
  Case 4: length=40
  Case 5: length=38
  Case 6: length=35
  Case 7: length=43
  Case 8: length=42


In [3]:
# RSA parameters passed to every encryption call in this notebook.
public_key, n = 65537, 33227

# Characters a candidate plaintext may contain: a-z, A-Z, comma and space.
ALPHABET = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    ", "
)

## GA Implementation Functions

In [4]:
def random_char(alphabet):
    """Return one element of `alphabet`, picked with `random.choice`."""
    picked = random.choice(alphabet)
    return picked

def random_text(length, alphabet):
    """Build a string of `length` characters, each drawn via `random_char`."""
    chars = []
    for _ in range(length):
        chars.append(random_char(alphabet))
    return "".join(chars)

def encrypt_candidate(text, public_key, n):
    """Encrypt `text` by delegating to `rsa_lib.encrypt_text(text, public_key, n)`.

    The result is returned unchanged. NOTE(review): callers in this notebook
    index it and take its length, so it is presumably a sequence of numbers --
    confirm against rsa_lib.
    """
    cipher = rsa_lib.encrypt_text(text, public_key, n)
    return cipher

def error_P(candidate_text, target_cipher, public_key, n):
    """Mean squared difference between the candidate's ciphertext and the target.

    The candidate is encrypted with `encrypt_candidate` and compared with
    `target_cipher` element by element over the first `min(len, len)`
    positions; any extra elements in the longer sequence are ignored.

    Returns:
        The average of the squared element-wise differences. When there is
        nothing to compare (one or both ciphertexts are empty) the result is
        0.0 if both are empty and `float('inf')` otherwise, instead of
        raising ZeroDivisionError.
    """
    cand_cipher = encrypt_candidate(candidate_text, public_key, n)
    L = min(len(cand_cipher), len(target_cipher))
    if L == 0:
        # Two empty ciphertexts match trivially; an empty one can never match
        # a non-empty one, so report the worst possible error.
        return 0.0 if len(cand_cipher) == len(target_cipher) else float('inf')
    s = sum((cand_cipher[i] - target_cipher[i]) ** 2 for i in range(L))
    return s / L

def fitness(candidate_text, target_cipher, public_key, n):
    """Turn the error from `error_P` into a score: fitness = 1 / (1 + error).

    A lower error therefore gives a higher fitness; an error of 0 gives 1.0.
    """
    mse = error_P(candidate_text, target_cipher, public_key, n)
    denominator = 1.0 + mse
    return 1.0 / denominator

def tournament_select(population, fitnesses, tournament_size):
    """Pick one individual by tournament selection.

    Draws `tournament_size` indices uniformly at random (with replacement)
    and returns the individual whose fitness is highest among them.

    Args:
        population: sequence of individuals.
        fitnesses: fitness values, index-aligned with `population`.
        tournament_size: number of random draws that compete.
    """
    champion = random.randrange(len(population))
    for _ in range(tournament_size - 1):
        challenger = random.randrange(len(population))
        # Strict comparison: on a tie the earlier draw keeps its place.
        if fitnesses[champion] < fitnesses[challenger]:
            champion = challenger
    return population[champion]

def one_point_crossover(parent1, parent2):
    """Swap the tails of two parents at one random cut point.

    The cut is drawn from 1..len(parent1)-1, so each child takes at least one
    position from each parent. Parents shorter than 2 are returned unchanged.

    Returns:
        (child1, child2): child1 is parent1's head plus parent2's tail,
        child2 is parent2's head plus parent1's tail.
    """
    size = len(parent1)
    if size >= 2:
        pivot = random.randint(1, size - 1)
        head1, tail1 = parent1[:pivot], parent1[pivot:]
        head2, tail2 = parent2[:pivot], parent2[pivot:]
        return head1 + tail2, head2 + tail1
    return parent1, parent2

def mutate(text, mutation_rate, alphabet):
    """Return a copy of `text` with some positions replaced at random.

    For every position one `random.random()` draw is made; when it falls
    below `mutation_rate` the character is replaced by `random_char(alphabet)`
    (which may pick the same character again).
    """
    mutated = []
    for original_char in text:
        if random.random() < mutation_rate:
            mutated.append(random_char(alphabet))
        else:
            mutated.append(original_char)
    return "".join(mutated)

In [5]:
def run_ga(target_cipher, text_length, pop_size, mutation_rate, crossover_rate,
           tournament_size, elite_count, max_generations=2000, target_error=0.0,
           stagnation_limit=100, verbose=True):
    """
    Run the genetic algorithm with given parameters.

    Candidates are strings over the module-level ALPHABET; they are scored
    with `fitness` / `error_P` using the module-level `public_key` and `n`.

    Args:
        target_cipher: ciphertext the best candidate should encrypt to.
        text_length: length of every candidate string.
        pop_size: number of individuals per generation (must be >= 1).
        mutation_rate: per-character mutation probability passed to `mutate`.
        crossover_rate: probability that a selected pair is crossed over.
        tournament_size: number of draws per tournament selection.
        elite_count: number of top individuals copied unchanged into the
            next generation.
        max_generations: maximum number of generations to evaluate.
        target_error: stop as soon as the best error is <= this value.
        stagnation_limit: Stop if no improvement in error for this many generations
        verbose: print progress every 100 generations and on early stop.

    Returns:
        (generations_to_converge, best_individual, best_error).
        On convergence or early stop, the first element is the index of the
        generation at which the run stopped; if the generation budget runs
        out it is `max_generations`. When `max_generations <= 0` nothing is
        evaluated and `(max_generations, None, float('inf'))` is returned.

    Raises:
        ValueError: if `pop_size` is smaller than 1.
    """
    if pop_size < 1:
        # An empty population previously failed later with an opaque IndexError.
        raise ValueError(f"pop_size must be at least 1, got {pop_size}")
    if max_generations <= 0:
        # Without this guard the final return hit unbound best_ind/best_error.
        return max_generations, None, float('inf')

    # Initialize population
    population = [random_text(text_length, ALPHABET) for _ in range(pop_size)]

    # Track stagnation
    best_error_ever = float('inf')
    generations_without_improvement = 0

    for generation in range(max_generations):
        # Evaluate current population
        fitnesses = [fitness(ind, target_cipher, public_key, n) for ind in population]

        # Sort by fitness (best first)
        pop_fit = list(zip(population, fitnesses))
        pop_fit.sort(key=lambda x: x[1], reverse=True)

        best_ind, best_fit = pop_fit[0]
        best_error = error_P(best_ind, target_cipher, public_key, n)

        if verbose and generation % 100 == 0:
            print(f"Gen {generation:4d} | error = {best_error:.6f} | stagnation = {generations_without_improvement}")

        # Check convergence
        if best_error <= target_error:
            return generation, best_ind, best_error

        # Check for improvement
        if best_error < best_error_ever - 1e-10:  # Small tolerance for floating point
            best_error_ever = best_error
            generations_without_improvement = 0
        else:
            generations_without_improvement += 1

        # Check stagnation - early stopping
        if generations_without_improvement >= stagnation_limit:
            if verbose:
                print(f"Early stop: no improvement for {stagnation_limit} generations")
            return generation, best_ind, best_error

        # Create new generation, starting with the unchanged elites
        new_population = [ind for ind, fit in pop_fit[:elite_count]]

        while len(new_population) < pop_size:
            parent1 = tournament_select(population, fitnesses, tournament_size)
            parent2 = tournament_select(population, fitnesses, tournament_size)

            if random.random() < crossover_rate:
                child1, child2 = one_point_crossover(parent1, parent2)
            else:
                child1, child2 = parent1, parent2

            child1 = mutate(child1, mutation_rate, ALPHABET)
            child2 = mutate(child2, mutation_rate, ALPHABET)

            new_population.append(child1)
            # The second child is dropped when only one slot is left.
            if len(new_population) < pop_size:
                new_population.append(child2)

        population = new_population

    # Did not converge: report the best individual of the last evaluated generation
    return max_generations, best_ind, best_error

## Parallel Execution Functions

In [6]:
def run_single_fold_wrapper(test_case, params, fold_idx, seed_offset=0):
    """
    Run the GA on one test case (one cross-validation fold).

    Args:
        test_case: dict with keys 'id', 'ciphertext' and 'length'.
        params: GA keyword arguments forwarded to `run_ga`.
        fold_idx: index of the fold; also part of the seed.
        seed_offset: extra value added to the seed.

    Returns: dict with keys 'fold_idx', 'test_case_id', 'generations',
    'error' and 'converged'. If `run_ga` raises, the error is printed and a
    placeholder result (1000 generations, infinite error, not converged) is
    returned instead.

    NOTE(review): `random.seed` / `np.random.seed` reseed process-wide
    generators. When several folds run at the same time in a thread pool they
    share those generators, so per-fold reproducibility is not guaranteed.
    """
    # Seed is 42 + fold index + offset, so every fold gets a different one.
    fold_seed = 42 + fold_idx + seed_offset
    random.seed(fold_seed)
    np.random.seed(fold_seed)

    try:
        generations, best_ind, best_error = run_ga(
            target_cipher=test_case['ciphertext'],
            text_length=test_case['length'],
            max_generations=1000,
            target_error=0.0,
            verbose=False,  # Keep worker output quiet in parallel mode
            **params
        )
        return {
            'fold_idx': fold_idx,
            'test_case_id': test_case['id'],
            'generations': generations,
            'error': best_error,
            'converged': best_error == 0.0
        }
    except Exception as e:
        print(f"Error in fold {fold_idx}: {e}")
        failed_fold = {
            'fold_idx': fold_idx,
            'test_case_id': test_case['id'],
            'generations': 1000,
            'error': float('inf'),
            'converged': False
        }
        return failed_fold


def evaluate_params_parallel(params, test_cases, n_cv_folds, seed_offset=0):
    """
    Evaluate parameters across multiple folds in parallel using threads.

    Fold `i` runs `run_single_fold_wrapper` on `test_cases[i]`; the number of
    folds is `min(n_cv_folds, len(test_cases))`.

    Args:
        params: GA keyword arguments forwarded to every fold.
        test_cases: list of test-case dicts (each needs at least 'id').
        n_cv_folds: maximum number of folds to run.
        seed_offset: forwarded to every fold.

    Returns:
        List of per-fold result dicts sorted by 'fold_idx'. Empty when there
        is nothing to run (no test cases or `n_cv_folds <= 0`).
    """
    n_folds = min(n_cv_folds, len(test_cases))
    if n_folds <= 0:
        # ThreadPoolExecutor rejects max_workers <= 0, so bail out early.
        return []

    fold_results = []

    # Size the pool by the folds actually submitted, not by the requested count.
    with ThreadPoolExecutor(max_workers=min(N_WORKERS, n_folds)) as executor:
        # Submit all tasks
        future_to_fold = {
            executor.submit(
                run_single_fold_wrapper,
                test_cases[i],
                params,
                i,
                seed_offset
            ): i
            for i in range(n_folds)
        }

        # Collect results as they complete
        for future in as_completed(future_to_fold):
            fold_idx = future_to_fold[future]
            try:
                fold_results.append(future.result())
            except Exception as e:
                # Record a failed fold instead of aborting the whole evaluation.
                print(f"Exception in fold {fold_idx}: {e}")
                fold_results.append({
                    'fold_idx': fold_idx,
                    'test_case_id': test_cases[fold_idx]['id'],
                    'generations': 1000,
                    'error': float('inf'),
                    'converged': False
                })

    # Completion order is arbitrary; restore fold order for the caller.
    return sorted(fold_results, key=lambda x: x['fold_idx'])

## Grid Search Cross-Validation

In [7]:
# Candidate values for each GA hyper-parameter
param_grid = dict(
    pop_size=[100],
    mutation_rate=[0.01, 0.1, 0.5],
    crossover_rate=[0.7, 0.8, 0.9],
    tournament_size=[3, 10, 50],
    elite_count=[2, 10, 50],
)

print("Parameter grid:")
for param in param_grid:
    print(f"  {param}: {param_grid[param]}")

# Size of the full grid: product of the number of options per parameter
options_per_param = [len(options) for options in param_grid.values()]
total_combinations = np.prod(options_per_param)
print(f"\nTotal parameter combinations: {total_combinations}")

Parameter grid:
  pop_size: [100]
  mutation_rate: [0.01, 0.1, 0.5]
  crossover_rate: [0.7, 0.8, 0.9]
  tournament_size: [3, 10, 50]
  elite_count: [2, 10, 50]

Total parameter combinations: 81


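## Run Cross-Validation (random search over the parameter grid)

Evaluating every combination in the grid is too expensive, so we evaluate a random sample of parameter combinations, each cross-validated on all 8 loaded test cases.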

In [None]:
# For practical reasons, we'll do random search instead of full grid search
# Sample random parameter combinations with parallel execution
n_random_samples = 50  # Adjust based on computational budget
n_cv_folds = 8  # Use all 8 test cases for cross-validation

random.seed(42)
np.random.seed(42)

# Generate all parameter combinations upfront.
# Each parameter is drawn independently, so the same combination can be
# drawn more than once.
param_combinations = []
for sample_idx in range(n_random_samples):
    params = {
        'pop_size': random.choice(param_grid['pop_size']),
        'mutation_rate': random.choice(param_grid['mutation_rate']),
        'crossover_rate': random.choice(param_grid['crossover_rate']),
        'tournament_size': random.choice(param_grid['tournament_size']),
        'elite_count': random.choice(param_grid['elite_count'])
    }
    param_combinations.append(params)

print(f"Running cross-validation with {n_random_samples} parameter combinations")
print(f"Using {n_cv_folds} test cases per combination")
print(f"Parallelizing across {N_WORKERS} workers (threads)")
print(f"Total evaluations: {n_random_samples * n_cv_folds}")
print("\nStarting parallel cross-validation...")

# One summary dict per parameter combination
results = []

# Process each parameter combination
for sample_idx, params in enumerate(param_combinations):
    print(f"\n{'='*60}")
    print(f"Sample {sample_idx + 1}/{n_random_samples}")
    print(f"Parameters: {params}")
    print('='*60)

    # Run folds in parallel using threads; the offset gives every sample its
    # own block of seeds.
    fold_results = evaluate_params_parallel(params, test_cases, n_cv_folds, seed_offset=sample_idx*1000)

    # Display fold results
    for fold_result in fold_results:
        # The failure marker used to be the mojibake of U+2717 ("âœ—").
        status = "+" if fold_result['converged'] else "✗"
        print(f"  {status} Fold {fold_result['fold_idx'] + 1}/{n_cv_folds} (Case {fold_result['test_case_id']}): "
              f"{fold_result['generations']} gen, error={fold_result['error']:.6f}")

    # Aggregate results over the folds of this combination
    avg_generations = np.mean([r['generations'] for r in fold_results])
    avg_error = np.mean([r['error'] for r in fold_results])
    convergence_rate = np.mean([r['converged'] for r in fold_results])

    result = {
        **params,
        'avg_generations': avg_generations,
        'avg_error': avg_error,
        'convergence_rate': convergence_rate,
        'fold_results': fold_results
    }
    results.append(result)

    print(f"\nSummary: {avg_generations:.1f} avg generations, {convergence_rate*100:.1f}% convergence rate")

print("\n" + "="*60)
print("Cross-validation completed!")
print("="*60)

Running cross-validation with 50 parameter combinations
Using 8 test cases per combination
Parallelizing across 10 workers (threads)
Total evaluations: 400

Starting parallel cross-validation...

Sample 1/50
Parameters: {'pop_size': 100, 'mutation_rate': 0.01, 'crossover_rate': 0.9, 'tournament_size': 10, 'elite_count': 2}


## Analyze Results

In [None]:
# Convert to DataFrame (the per-fold details are left out of the table)
df_results = pd.DataFrame([{k: v for k, v in r.items() if k != 'fold_results'} for r in results])

# Keep combinations that converged on at least one fold and rank them by
# convergence rate first, then by average generations.
# 'avg_generations' is averaged over ALL folds, including folds that stopped
# without converging, so ranking on it alone can put a combination that
# rarely converges ahead of one that always does.
df_converged = (
    df_results[df_results['convergence_rate'] > 0]
    .sort_values(['convergence_rate', 'avg_generations'], ascending=[False, True])
)

print("Top 5 parameter combinations (by convergence rate, then average generations):")
print(df_converged.head(5))

In [None]:
# Report the top-ranked row of df_converged, if any combination converged
if len(df_converged) == 0:
    print("No converged solutions found. Try increasing max_generations or adjusting parameter ranges.")
else:
    best_params = df_converged.iloc[0]
    banner = "="*60
    print("\n" + banner)
    print("BEST PARAMETERS:")
    print(banner)
    # The row holds floats, so integer-valued parameters are cast for display.
    print(f"Population size:     {int(best_params['pop_size'])}")
    print(f"Mutation rate:       {best_params['mutation_rate']:.3f}")
    print(f"Crossover rate:      {best_params['crossover_rate']:.2f}")
    print(f"Tournament size:     {int(best_params['tournament_size'])}")
    print(f"Elite count:         {int(best_params['elite_count'])}")
    print(f"\nAvg. generations:    {best_params['avg_generations']:.1f}")
    print(f"Convergence rate:    {best_params['convergence_rate']*100:.1f}%")

## Visualize Parameter Effects

In [None]:
# One panel per GA parameter: mean avg_generations (+/- std) per parameter value,
# computed over the combinations in df_converged
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()

params_to_plot = ['pop_size', 'mutation_rate', 'crossover_rate', 'tournament_size', 'elite_count']

for ax, param in zip(axes, params_to_plot):
    pretty_name = param.replace('_', ' ').title()

    # Group by parameter and calculate mean generations
    grouped = df_converged.groupby(param)['avg_generations'].agg(['mean', 'std', 'count'])

    ax.errorbar(
        grouped.index,
        grouped['mean'],
        yerr=grouped['std'],
        marker='o',
        capsize=5,
        capthick=2,
        linewidth=2,
    )
    ax.set_xlabel(pretty_name)
    ax.set_ylabel('Avg. Generations')
    ax.set_title(f'Effect of {pretty_name}')
    ax.grid(True, alpha=0.3)

# Five parameters on a 2x3 grid: drop the unused sixth panel
fig.delaxes(axes[-1])

plt.tight_layout()
plt.savefig('cv_parameter_effects.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Mean convergence rate per parameter value, over all sampled combinations
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

rate_params = ['pop_size', 'mutation_rate', 'tournament_size']
for ax, param in zip(axes, rate_params):
    pretty_name = param.replace('_', ' ').title()
    grouped = df_results.groupby(param)['convergence_rate'].mean()

    # Bars sit at evenly spaced positions and are labelled with the values
    positions = range(len(grouped))
    ax.bar(positions, grouped.values)
    ax.set_xticks(positions)
    ax.set_xticklabels(grouped.index)
    ax.set_xlabel(pretty_name)
    ax.set_ylabel('Convergence Rate')
    ax.set_title(f'Convergence Rate vs {pretty_name}')
    ax.set_ylim([0, 1.1])
    ax.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('cv_convergence_rates.png', dpi=150, bbox_inches='tight')
plt.show()

## Save Results

In [None]:
# Write the summary table (one row per sampled combination) to CSV
df_results.to_csv('cv_results.csv', index=False)
print("Results saved to 'cv_results.csv'")

# Write the best parameters as a small text report, only if something converged
if len(df_converged) > 0:
    with open('best_parameters.txt', 'w') as f:
        report_lines = [
            "BEST GA PARAMETERS (from cross-validation)\n",
            "="*50 + "\n\n",
            f"POP_SIZE = {int(best_params['pop_size'])}\n",
            f"MUTATION_RATE = {best_params['mutation_rate']}\n",
            f"CROSSOVER_RATE = {best_params['crossover_rate']}\n",
            f"TOURNAMENT_SIZE = {int(best_params['tournament_size'])}\n",
            f"ELITE_COUNT = {int(best_params['elite_count'])}\n",
            f"\nAverage generations to convergence: {best_params['avg_generations']:.1f}\n",
            f"Convergence rate: {best_params['convergence_rate']*100:.1f}%\n",
        ]
        f.writelines(report_lines)
    print("Best parameters saved to 'best_parameters.txt'")