In [1]:
import numpy as np
import math
import dimod
from neal import SimulatedAnnealingSampler
from typing import Dict, Tuple, List, Union
import matplotlib.pyplot as plt
from collections import defaultdict

In [2]:
def generate_qkp_instace (n:int = 20, capacity:int = 30, seed: int = None) -> Tuple[np.ndarray, np.ndarray]:
    if seed is not None:
        np.random.seed(seed)

    weights = np.random.randint(1, 11, size = n)
    profits = np.zeros((n, n))

    for i in range(n):
        for j in range(i, n):
            if i == j:
                profits[i, j] = np.random.randint(0, 11)
            else:
                profits[i, j] = np.random.randint(0, 11)
    
    return weights, profits

In [3]:
class QKP:
    """
    Fixed implementation for proper QKP encoding comparison.
    
    Addresses all issues: constraint scaling, penalty parameters, feasibility checking,
    and proper profit calculation.
    """
    
    def __init__(self, weights: List[float], profits: np.ndarray, capacity: float):
        self.weights = np.array(weights)
        self.profits = np.array(profits)
        self.capacity = capacity
        self.n = len(weights)
        
        # Validate inputs
        assert self.profits.shape == (self.n, self.n), "Profits matrix must be n×n"
        assert capacity > 0, "Capacity must be positive"
    
    def _get_encoding_params(self, encoding_type: str) -> Tuple[int, callable]:
        """Get bit depth D and encoding function f(d) for each encoding type."""
        if encoding_type == "one_hot":
            D = int(self.capacity) + 1
            f = lambda d: d
        elif encoding_type == "binary":
            D = math.ceil(math.log2(self.capacity)) if self.capacity > 1 else 1
            f = lambda d: 2**d
        elif encoding_type == "unary":
            D = int(self.capacity)
            f = lambda d: 1
        else:
            raise ValueError("encoding_type must be 'one_hot', 'binary', or 'unary'")
        return D, f
    
    def formulate_qubo(self, encoding_type: str, A: float = 1.0) -> Tuple[np.ndarray, Dict]:
        """Formulate the QUBO matrix for the given encoding type."""
        D, f = self._get_encoding_params(encoding_type)
        N = self.n + D
        Q = np.zeros((N, N))
        
        var_info = {
            'n_items': self.n,
            'bit_depth': D,
            'total_vars': N,
            'x_indices': list(range(self.n)),
            'y_indices': list(range(self.n, N)),
            'encoding_type': encoding_type
        }
        
        # Add objective function: -∑∑ p_{ij} x_i x_j
        for i in range(self.n):
            for j in range(i, self.n):
                if i == j:
                    Q[i, i] -= self.profits[i, i]
                else:
                    Q[i, j] -= self.profits[i, j]
        
        # Add constraint based on encoding type
        if encoding_type == "one_hot":
            self._add_one_hot_constraint(Q, A, D, f)
        elif encoding_type == "binary":
            self._add_binary_constraint(Q, A, D, f)
        elif encoding_type == "unary":
            self._add_unary_constraint(Q, A, D, f)
        
        return Q, var_info
    
    def _add_one_hot_constraint(self, Q: np.ndarray, A: float, D: int, f: callable):
        """Add one-hot encoding constraint terms to QUBO matrix."""
        # First term: (∑ d*y_d - ∑ w_i*x_i)²
        for d1 in range(D):
            for d2 in range(D):
                y1_idx = self.n + d1
                y2_idx = self.n + d2
                if d1 == d2:
                    Q[y1_idx, y1_idx] += A * (f(d1)**2)
                else:
                    Q[y1_idx, y2_idx] += A * f(d1) * f(d2)
        
        for d in range(D):
            for i in range(self.n):
                y_idx = self.n + d
                Q[i, y_idx] += A * (-2 * f(d) * self.weights[i])
        
        for i1 in range(self.n):
            for i2 in range(self.n):
                if i1 == i2:
                    Q[i1, i1] += A * (self.weights[i1]**2)
                else:
                    Q[i1, i2] += A * self.weights[i1] * self.weights[i2]
        
        # Second term: (∑ y_d - 1)²
        for d1 in range(D):
            for d2 in range(D):
                y1_idx = self.n + d1
                y2_idx = self.n + d2
                if d1 == d2:
                    Q[y1_idx, y1_idx] += A
                else:
                    Q[y1_idx, y2_idx] += A
        
        for d in range(D):
            y_idx = self.n + d
            Q[y_idx, y_idx] += A * (-2)
    
    def _add_binary_constraint(self, Q: np.ndarray, A: float, D: int, f: callable):
        """Add binary encoding constraint terms to QUBO matrix."""
        offset = 2**D - 1 - self.capacity
        
        for d1 in range(D):
            for d2 in range(D):
                y1_idx = self.n + d1
                y2_idx = self.n + d2
                if d1 == d2:
                    Q[y1_idx, y1_idx] += A * (f(d1)**2)
                else:
                    Q[y1_idx, y2_idx] += A * f(d1) * f(d2)
        
        for d in range(D):
            y_idx = self.n + d
            Q[y_idx, y_idx] += A * (-2 * f(d) * offset)
        
        for d in range(D):
            for i in range(self.n):
                y_idx = self.n + d
                Q[i, y_idx] += A * (-2 * f(d) * self.weights[i])
        
        for i in range(self.n):
            Q[i, i] += A * (2 * offset * self.weights[i])
        
        for i1 in range(self.n):
            for i2 in range(self.n):
                if i1 == i2:
                    Q[i1, i1] += A * (self.weights[i1]**2)
                else:
                    Q[i1, i2] += A * self.weights[i1] * self.weights[i2]
    
    def _add_unary_constraint(self, Q: np.ndarray, A: float, D: int, f: callable):
        """Add unary encoding constraint terms to QUBO matrix."""
        for d1 in range(D):
            for d2 in range(D):
                y1_idx = self.n + d1
                y2_idx = self.n + d2
                if d1 == d2:
                    Q[y1_idx, y1_idx] += A
                else:
                    Q[y1_idx, y2_idx] += A
        
        for d in range(D):
            for i in range(self.n):
                y_idx = self.n + d
                Q[i, y_idx] += A * (-2 * self.weights[i])
        
        for i1 in range(self.n):
            for i2 in range(self.n):
                if i1 == i2:
                    Q[i1, i1] += A * (self.weights[i1]**2)
                else:
                    Q[i1, i2] += A * self.weights[i1] * self.weights[i2]
    
    def to_bqm(self, encoding_type: str, A: float = 1.0) -> dimod.BinaryQuadraticModel:
        """Convert QUBO to BQM."""
        Q, var_info = self.formulate_qubo(encoding_type, A)
        
        # Create variable names
        var_names = []
        for i in range(var_info['n_items']):
            var_names.append(f"x{i}")
        for i in range(var_info['bit_depth']):
            var_names.append(f"y{i}")
        
        # Convert to QUBO dictionary
        qubo_dict = {}
        n = Q.shape[0]
        
        for i in range(n):
            for j in range(i, n):
                if i == j:
                    coeff = Q[i, i]
                else:
                    coeff = Q[i, j] + Q[j, i]
                
                if abs(coeff) > 1e-12:
                    qubo_dict[(var_names[i], var_names[j])] = coeff
        
        return dimod.BinaryQuadraticModel.from_qubo(qubo_dict)
    
    def analyze_solution(self, sample: Dict, encoding_type: str) -> Dict:
        """Analyze a solution for feasibility and true profit."""
        _, var_info = self.formulate_qubo(encoding_type, 1.0)  # A doesn't matter for analysis
        
        # Extract selected items
        selected_items = []
        for i in range(self.n):
            var_name = f"x{i}"
            if var_name in sample and sample[var_name] == 1:
                selected_items.append(i)
        
        # Calculate actual weight and profit
        total_weight = sum(self.weights[i] for i in selected_items)
        
        # Calculate true profit (objective function value)
        true_profit = 0
        for i in selected_items:
            for j in selected_items:
                if i <= j:  # Upper triangular
                    true_profit += self.profits[i, j]
        
        # Calculate auxiliary variable value
        aux_value = 0
        D = var_info['bit_depth']
        
        if encoding_type == "unary":
            for i in range(D):
                var_name = f"y{i}"
                if var_name in sample and sample[var_name] == 1:
                    aux_value += 1
        elif encoding_type == "binary":
            for i in range(D):
                var_name = f"y{i}"
                if var_name in sample and sample[var_name] == 1:
                    aux_value += 2**i
            offset = 2**D - 1 - self.capacity
            aux_value -= offset
        elif encoding_type == "one_hot":
            aux_bits_set = []
            for i in range(D):
                var_name = f"y{i}"
                if var_name in sample and sample[var_name] == 1:
                    aux_bits_set.append(i)
            
            if len(aux_bits_set) == 1:
                aux_value = aux_bits_set[0]
            else:
                aux_value = -1  # Invalid one-hot encoding
        
        # Check constraints
        knapsack_feasible = total_weight <= self.capacity
        encoding_consistent = abs(total_weight - aux_value) < 1e-10
        
        # One-hot specific check
        if encoding_type == "one_hot":
            one_hot_valid = len([i for i in range(D) if f"y{i}" in sample and sample[f"y{i}"] == 1]) == 1
            encoding_consistent = encoding_consistent and one_hot_valid
        
        return {
            'selected_items': selected_items,
            'total_weight': total_weight,
            'true_profit': true_profit,
            'aux_value': aux_value,
            'knapsack_feasible': knapsack_feasible,
            'encoding_consistent': encoding_consistent,
            'is_feasible': knapsack_feasible and encoding_consistent,
            'constraint_violation': abs(total_weight - aux_value)
        }
    
    def find_optimal_penalty(self, encoding_type: str, 
                           A_candidates: List[float] = None,
                           target_feasible_rate: float = 0.8,
                           num_samples: int = 200) -> Tuple[float, Dict]:
        """Find optimal penalty parameter for an encoding."""
        
        if A_candidates is None:
            if encoding_type == "binary":
                A_candidates = [0.1, 0.5, 1, 2, 5, 10, 20]
            elif encoding_type == "unary":
                A_candidates = [10, 20, 50, 100, 200]
            else:  # one_hot
                A_candidates = [50, 100, 200, 500, 1000]
        
        sampler = SimulatedAnnealingSampler()
        best_A = None
        best_score = -1
        results = {}
        
        print(f"\n=== Finding optimal A for {encoding_type.upper()} ===")
        
        for A in A_candidates:
            try:
                bqm = self.to_bqm(encoding_type, A)
                sampleset = sampler.sample(bqm, num_reads=num_samples)
                
                feasible_count = 0
                profits = []
                constraint_violations = []
                
                for sample in sampleset.samples():
                    analysis = self.analyze_solution(sample, encoding_type)
                    constraint_violations.append(analysis['constraint_violation'])
                    
                    if analysis['is_feasible']:
                        feasible_count += 1
                        profits.append(analysis['true_profit'])
                
                feasible_rate = feasible_count / num_samples
                avg_profit = np.mean(profits) if profits else 0
                avg_violation = np.mean(constraint_violations)
                
                # Score: prioritize feasible rate, then profit
                if feasible_rate >= target_feasible_rate:
                    score = feasible_rate + avg_profit / 10000  # Normalize profit
                else:
                    score = feasible_rate * 0.5  # Penalize low feasible rate
                
                results[A] = {
                    'feasible_rate': feasible_rate,
                    'avg_profit': avg_profit,
                    'avg_violation': avg_violation,
                    'num_feasible': feasible_count
                }
                
                print(f"A={A:6.1f}: feasible={feasible_rate:.2f}, profit={avg_profit:6.1f}, violation={avg_violation:.3f}")
                
                if score > best_score:
                    best_score = score
                    best_A = A
                    
            except Exception as e:
                print(f"A={A:6.1f}: Error - {e}")
                results[A] = None
        
        print(f"Best A = {best_A} (score = {best_score:.3f})")
        return best_A, results
    
    def compare_encodings(self, num_trials: int = 20, num_samples_per_trial: int = 1000) -> Dict:
        """Compare all three encodings properly."""
        
        print("=== AUTOMATED ENCODING COMPARISON ===")
        print(f"Problem: {self.n} items, capacity = {self.capacity}")
        print()
        
        # Find optimal penalty parameters
        optimal_A = {}
        penalty_results = {}
        
        for encoding in ["unary", "binary", "one_hot"]:
            A, results = self.find_optimal_penalty(encoding)
            optimal_A[encoding] = A
            penalty_results[encoding] = results
        
        print(f"\n=== Optimal penalty parameters ===")
        for encoding, A in optimal_A.items():
            print(f"{encoding:8s}: A = {A}")
        
        # Performance comparison with optimal parameters
        sampler = SimulatedAnnealingSampler()
        comparison_results = {}
        
        print(f"\n=== Performance comparison ({num_trials} trials) ===")
        
        for encoding in ["unary", "binary", "one_hot"]:
            A = optimal_A[encoding]
            bqm = self.to_bqm(encoding, A)
            
            trial_results = {
                'feasible_profits': [],
                'feasible_weights': [],
                'feasible_rates': [],
                'best_profits': [],
                'constraint_violations': []
            }
            
            print(f"\n--- {encoding.upper()} (A={A}) ---")
            
            for trial in range(num_trials):
                sampleset = sampler.sample(bqm, num_reads=num_samples_per_trial)
                
                trial_feasible_profits = []
                trial_violations = []
                
                for sample in sampleset.samples():
                    analysis = self.analyze_solution(sample, encoding)
                    trial_violations.append(analysis['constraint_violation'])
                    
                    if analysis['is_feasible']:
                        trial_feasible_profits.append(analysis['true_profit'])
                        trial_results['feasible_weights'].append(analysis['total_weight'])
                
                trial_results['feasible_profits'].extend(trial_feasible_profits)
                trial_results['constraint_violations'].extend(trial_violations)
                
                feasible_rate = len(trial_feasible_profits) / num_samples_per_trial
                trial_results['feasible_rates'].append(feasible_rate)
                
                if trial_feasible_profits:
                    trial_results['best_profits'].append(max(trial_feasible_profits))
                
                if (trial + 1) % 5 == 0:
                    print(f"  Trial {trial + 1:2d}: {len(trial_feasible_profits):3d} feasible solutions")
            
            # Statistics
            if trial_results['feasible_profits']:
                profits = trial_results['feasible_profits']
                weights = trial_results['feasible_weights']
                rates = trial_results['feasible_rates']
                bests = trial_results['best_profits']
                violations = trial_results['constraint_violations']
                
                print(f"  Feasible rate: {np.mean(rates):.1%} ± {np.std(rates):.1%}")
                print(f"  Profit: {np.mean(profits):.1f} ± {np.std(profits):.1f}")
                print(f"  Best profit: {np.mean(bests):.1f} ± {np.std(bests):.1f}")
                print(f"  Weight: {np.mean(weights):.1f} ± {np.std(weights):.1f}")
                print(f"  Constraint violation: {np.mean(violations):.4f} ± {np.std(violations):.4f}")
                
                comparison_results[encoding] = {
                    'optimal_A': A,
                    'feasible_rate_mean': np.mean(rates),
                    'feasible_rate_std': np.std(rates),
                    'profit_mean': np.mean(profits),
                    'profit_std': np.std(profits),
                    'best_profit_mean': np.mean(bests),
                    'best_profit_std': np.std(bests),
                    'weight_mean': np.mean(weights),
                    'weight_std': np.std(weights),
                    'violation_mean': np.mean(violations),
                    'violation_std': np.std(violations),
                    'all_profits': profits,
                    'all_rates': rates,
                    'all_bests': bests
                }
            else:
                print(f"  No feasible solutions found!")
                comparison_results[encoding] = None
        
        # Summary
        print(f"\n=== FINAL COMPARISON SUMMARY ===")
        print(f"{'Encoding':<10} {'Feasible%':<10} {'Avg Profit':<12} {'Best Profit':<12} {'Opt A':<8}")
        print("-" * 55)
        
        for encoding in ["unary", "binary", "one_hot"]:
            if comparison_results[encoding] is not None:
                r = comparison_results[encoding]
                print(f"{encoding:<10} {r['feasible_rate_mean']:.1%}±{r['feasible_rate_std']:.1%}   "
                      f"{r['profit_mean']:6.1f}±{r['profit_std']:4.1f}   "
                      f"{r['best_profit_mean']:6.1f}±{r['best_profit_std']:4.1f}   "
                      f"{r['optimal_A']:<8}")
            else:
                print(f"{encoding:<10} {'FAILED':<10} {'N/A':<12} {'N/A':<12} {'N/A':<8}")
        
        return {
            'optimal_penalties': optimal_A,
            'penalty_search_results': penalty_results,
            'performance_results': comparison_results
        }

In [4]:
weights, profits = generate_qkp_instace(n = 20, capacity = 50)
print(f"Generated QKP with {len(weights)} items, capacity = 30")
print(f"Weights: {weights}")
print(f"Weight sum: {sum(weights)} (capacity utilization: {sum(weights)/30:.1%})")
    

Generated QKP with 20 items, capacity = 30
Weights: [4 1 9 4 9 7 3 5 6 6 7 2 3 7 4 2 3 7 6 8]
Weight sum: 103 (capacity utilization: 343.3%)


In [5]:
comparison = QKP(weights, profits, 50)

In [6]:
results = comparison.compare_encodings(num_trials=10, num_samples_per_trial=500)

=== AUTOMATED ENCODING COMPARISON ===
Problem: 20 items, capacity = 50


=== Finding optimal A for UNARY ===
A=  10.0: feasible=1.00, profit= 314.5, violation=0.000
A=  20.0: feasible=1.00, profit= 253.3, violation=0.000
A=  50.0: feasible=1.00, profit= 187.8, violation=0.000
A= 100.0: feasible=1.00, profit= 151.2, violation=0.000
A= 200.0: feasible=1.00, profit= 132.8, violation=0.000
Best A = 10 (score = 1.031)

=== Finding optimal A for BINARY ===
A=   0.1: feasible=0.00, profit=   0.0, violation=44.000
A=   0.5: feasible=0.00, profit=   0.0, violation=14.000
A=   1.0: feasible=0.00, profit=   0.0, violation=3.375
A=   2.0: feasible=0.01, profit= 468.0, violation=2.425
A=   5.0: feasible=0.26, profit= 412.6, violation=1.370
A=  10.0: feasible=0.43, profit= 340.2, violation=0.805
A=  20.0: feasible=0.61, profit= 298.0, violation=0.385
Best A = 20 (score = 0.307)

=== Finding optimal A for ONE_HOT ===
A=  50.0: feasible=0.00, profit=   0.0, violation=59.305
A= 100.0: feasible=0.00, pr