In [3]:
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np

def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load and preprocess the fleet data from CSV file."""
    df = pd.read_csv(csv_path)
    
    # Rename columns properly
    column_mapping = {
        'Unnamed: 0': 'Index',
        'Distance_x': 'Distance_x',  # Keeping as requested
        'Demand (km)': 'Demand',
        'Cost ($)': 'Cost',
        'Yearly range (km)': 'Yearly_Range',
        'insurance_cost': 'Insurance_Cost',
        'maintenance_cost': 'Maintenance_Cost',
        'fuel_costs': 'Fuel_Costs',
        'Fuel': 'Fuel',
        'Total_Cost': 'Total_Cost',
        'Topsis_Score': 'Topsis_Score',
    }
    
    df.rename(columns=column_mapping, inplace=True, errors='ignore')

    # Calculate Total Cost if missing
    if 'Total_Cost' not in df.columns:
        df['Total_Cost'] = df['Insurance_Cost'] + df['Maintenance_Cost'] + df['Fuel_Costs']

    return df

class FleetOptimizer:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.vehicles_by_size_distance = self._group_vehicles()
        
    def _group_vehicles(self) -> Dict:
        """Group vehicles by (Size, Distance_x) combination"""
        groups = {}
        for _, row in self.data.iterrows():
            key = (row['Size'], row['Distance_x'])
            if key not in groups:
                groups[key] = []
            groups[key].append({
                'vehicle_type': row['Vehicle'],
                'yearly_range': row['Yearly_Range'],
                'topsis_score': row['Topsis_Score'],
                'insurance_cost': row['Insurance_Cost'],
                'maintenance_cost': row['Maintenance_Cost'],
                'fuel_costs': row['Fuel_Costs'],
                'fuel': row['Fuel'],
                'demand': row['Demand']
            })
        return groups

    def calculate_required_vehicles(self, vehicle: Dict) -> int:
        """Calculate required number of vehicles based on demand and yearly range"""
        return int(np.ceil(vehicle['demand'] / vehicle['yearly_range']))

    def calculate_total_cost(self, num_vehicles: int, vehicle: Dict) -> float:
        """Calculate total cost including insurance, maintenance, and fuel costs"""
        return num_vehicles * (
            vehicle['insurance_cost'] + 
            vehicle['maintenance_cost'] + 
            vehicle['fuel_costs']
        )

    def generate_initial_population(self, size_distance: Tuple, population_size: int = 50) -> List[Dict]:
        """Generate initial population of fleet combinations"""
        population = []
        vehicles = self.vehicles_by_size_distance[size_distance]
        
        for _ in range(population_size):
            solution = {
                v['vehicle_type']: random.randint(0, self.calculate_required_vehicles(v))
                for v in vehicles
            }
            population.append(solution)
        
        return population

    def fitness_function(self, solution: Dict, size_distance: Tuple) -> float:
        """
        Calculate fitness of a solution based on:
        1. TOPSIS score
        2. Meeting demand requirements
        3. Total cost (insurance + maintenance + fuel)
        """
        vehicles = self.vehicles_by_size_distance[size_distance]
        total_cost = 0
        total_capacity = 0
        weighted_topsis = 0
        
        for vehicle in vehicles:
            num_vehicles = solution[vehicle['vehicle_type']]
            total_cost += self.calculate_total_cost(num_vehicles, vehicle)
            total_capacity += num_vehicles * vehicle['yearly_range']
            weighted_topsis += num_vehicles * vehicle['topsis_score']
        
        # Penalize solutions that don't meet demand
        demand = vehicles[0]['demand']
        demand_penalty = max(0, demand - total_capacity) * 1000
        
        # Normalize costs (lower is better)
        cost_score = 1 / (total_cost + 1)
        
        # Fitness score combining TOPSIS, cost, and demand penalty
        return weighted_topsis + cost_score - demand_penalty

    def crossover(self, parent1: Dict, parent2: Dict) -> Tuple[Dict, Dict]:
        """Perform crossover between two parent solutions"""
        crossover_point = random.randint(1, len(parent1) - 1)
        child1 = {}
        child2 = {}

        for i, vehicle_type in enumerate(parent1.keys()):
            if i < crossover_point:
                child1[vehicle_type] = parent1[vehicle_type]
                child2[vehicle_type] = parent2[vehicle_type]
            else:
                child1[vehicle_type] = parent2[vehicle_type]
                child2[vehicle_type] = parent1[vehicle_type]

        return child1, child2

    def mutate(self, solution: Dict, mutation_rate: float = 0.1, size_distance: Tuple = None) -> Dict:
        """Mutate a solution by randomly adjusting the number of vehicles"""
        mutated_solution = solution.copy()

        # Ensure size_distance is passed, as it is needed for vehicle grouping
        if size_distance is None:
            raise ValueError("size_distance must be provided for mutation")

        vehicles = self.vehicles_by_size_distance[size_distance]
        
        for vehicle_type in mutated_solution.keys():
            if random.random() < mutation_rate:
                # Get the maximum allowed number of vehicles for this vehicle
                max_vehicles = self.calculate_required_vehicles(next(v for v in vehicles if v['vehicle_type'] == vehicle_type))
                mutated_solution[vehicle_type] = max(0, min(max_vehicles, mutated_solution[vehicle_type] + random.choice([-1, 1])))

        return mutated_solution

    def optimize(self, size_distance: Tuple, generations: int = 100) -> Dict:
        """Run genetic algorithm to find optimal fleet combination"""
        population = self.generate_initial_population(size_distance)
        best_solution = None
        best_fitness = float('-inf')
        
        for generation in range(generations):
            fitness_scores = [(solution, self.fitness_function(solution, size_distance))
                              for solution in population]
            
            fitness_scores.sort(key=lambda x: x[1], reverse=True)
            
            # Track best fitness score
            print(f"Generation {generation + 1}: Best Fitness Score = {fitness_scores[0][1]}")

            if fitness_scores[0][1] > best_fitness:
                best_solution = fitness_scores[0][0]
                best_fitness = fitness_scores[0][1]
            
            parents = [score[0] for score in fitness_scores[:len(population)//2]]
            
            next_generation = parents.copy()
            while len(next_generation) < len(population):
                parent1, parent2 = random.sample(parents, 2)
                child1, child2 = self.crossover(parent1, parent2)
                
                # Pass size_distance during mutation
                child1 = self.mutate(child1, size_distance=size_distance)
                child2 = self.mutate(child2, size_distance=size_distance)
                
                next_generation.extend([child1, child2])
            
            population = next_generation[:len(population)]
        
        return best_solution

    def get_optimized_results(self) -> pd.DataFrame:
        """Run optimization and return results in the required format"""
        results = []
        
        for size_distance in self.vehicles_by_size_distance.keys():
            best_solution = self.optimize(size_distance)
            
            for vehicle_type, num_vehicles in best_solution.items():
                if num_vehicles > 0:
                    vehicle_data = next(v for v in self.vehicles_by_size_distance[size_distance] if v['vehicle_type'] == vehicle_type)
                    total_cost = self.calculate_total_cost(num_vehicles, vehicle_data)

                    results.append({
                        "Allocation": f"Size {size_distance[0]}, Distance {size_distance[1]}",
                        "Vehicle": vehicle_type,
                        "Cost ($)": round(total_cost, 2),
                        "Fuel": vehicle_data['fuel'],
                        "no_of_vehicles": num_vehicles
                    })

        return pd.DataFrame(results)

def main(csv_path: str):
    """Main function to run the optimization"""
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df)
    optimized_results = optimizer.get_optimized_results()
    print("\nOptimized Fleet Allocation:")
    print(optimized_results)
    return optimized_results

if __name__ == "__main__":
    csv_path = "data/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df = main(csv_path)
    print(results_df)
    # results_df.to_csv("optimized_fleet_allocation.csv", index=False)


Generation 1: Best Fitness Score = 27.484541431041233
Generation 2: Best Fitness Score = 27.484541431041233
Generation 3: Best Fitness Score = 28.40313207481445
Generation 4: Best Fitness Score = 28.40313207481445
Generation 5: Best Fitness Score = 28.40313207481445
Generation 6: Best Fitness Score = 28.40313207481445
Generation 7: Best Fitness Score = 28.40313207481445
Generation 8: Best Fitness Score = 28.40313207481445
Generation 9: Best Fitness Score = 28.40313207481445
Generation 10: Best Fitness Score = 28.40313207481445
Generation 11: Best Fitness Score = 28.40313207481445
Generation 12: Best Fitness Score = 28.40313207481445
Generation 13: Best Fitness Score = 28.40313207481445
Generation 14: Best Fitness Score = 28.40313207481445
Generation 15: Best Fitness Score = 28.40313207481445
Generation 16: Best Fitness Score = 28.40313207481445
Generation 17: Best Fitness Score = 28.40313207481445
Generation 18: Best Fitness Score = 28.40313207481445
Generation 19: Best Fitness Score =

In [4]:
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np

def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load and preprocess the fleet data from CSV file."""
    df = pd.read_csv(csv_path)
    
    # Rename columns properly
    column_mapping = {
        'Unnamed: 0': 'Index',
        'Distance_x': 'Distance_x',  # Keeping as requested
        'Demand (km)': 'Demand',
        'Cost ($)': 'Cost',
        'Yearly range (km)': 'Yearly_Range',
        'insurance_cost': 'Insurance_Cost',
        'maintenance_cost': 'Maintenance_Cost',
        'fuel_costs': 'Fuel_Costs',
        'Fuel': 'Fuel',
        'Total_Cost': 'Total_Cost',
        'Topsis_Score': 'Topsis_Score',
    }
    
    df.rename(columns=column_mapping, inplace=True, errors='ignore')

    # Calculate Total Cost if missing
    if 'Total_Cost' not in df.columns:
        df['Total_Cost'] = df['Insurance_Cost'] + df['Maintenance_Cost'] + df['Fuel_Costs']

    return df

class FleetOptimizer:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.vehicles_by_size_distance = self._group_vehicles()
        
    def _group_vehicles(self) -> Dict:
        """Group vehicles by (Size, Distance_x) combination"""
        groups = {}
        for _, row in self.data.iterrows():
            key = (row['Size'], row['Distance_x'])
            if key not in groups:
                groups[key] = []
            groups[key].append({
                'vehicle_type': row['Vehicle'],
                'yearly_range': row['Yearly_Range'],
                'topsis_score': row['Topsis_Score'],
                'insurance_cost': row['Insurance_Cost'],
                'maintenance_cost': row['Maintenance_Cost'],
                'fuel_costs': row['Fuel_Costs'],
                'fuel': row['Fuel'],
                'demand': row['Demand']
            })
        return groups

    def calculate_required_vehicles(self, vehicle: Dict) -> int:
        """Calculate required number of vehicles based on demand and yearly range"""
        return int(np.ceil(vehicle['demand'] / vehicle['yearly_range']))

    def calculate_total_cost(self, num_vehicles: int, vehicle: Dict) -> float:
        """Calculate total cost including insurance, maintenance, and fuel costs"""
        return num_vehicles * (
            vehicle['insurance_cost'] + 
            vehicle['maintenance_cost'] + 
            vehicle['fuel_costs']
        )

    def generate_initial_population(self, size_distance: Tuple, population_size: int = 50) -> List[Dict]:
        """Generate initial population of fleet combinations"""
        population = []
        vehicles = self.vehicles_by_size_distance[size_distance]
        
        for _ in range(population_size):
            solution = {
                v['vehicle_type']: random.randint(0, self.calculate_required_vehicles(v))
                for v in vehicles
            }
            population.append(solution)
        
        return population

    def fitness_function(self, solution: Dict, size_distance: Tuple) -> float:
        """
        Calculate fitness of a solution based on:
        1. TOPSIS score
        2. Meeting demand requirements
        3. Total cost (insurance + maintenance + fuel)
        """
        vehicles = self.vehicles_by_size_distance[size_distance]
        total_cost = 0
        total_capacity = 0
        weighted_topsis = 0
        
        for vehicle in vehicles:
            num_vehicles = solution[vehicle['vehicle_type']]
            total_cost += self.calculate_total_cost(num_vehicles, vehicle)
            total_capacity += num_vehicles * vehicle['yearly_range']
            weighted_topsis += num_vehicles * vehicle['topsis_score']
        
        # Penalize solutions that don't meet demand
        demand = vehicles[0]['demand']
        demand_penalty = max(0, demand - total_capacity) * 1000
        
        # Normalize costs (lower is better)
        cost_score = 1 / (total_cost + 1)
        
        # Fitness score combining TOPSIS, cost, and demand penalty
        return weighted_topsis + cost_score - demand_penalty

    def crossover(self, parent1: Dict, parent2: Dict) -> Tuple[Dict, Dict]:
        """Perform crossover between two parent solutions"""
        crossover_point = random.randint(1, len(parent1) - 1)
        child1 = {}
        child2 = {}

        for i, vehicle_type in enumerate(parent1.keys()):
            if i < crossover_point:
                child1[vehicle_type] = parent1[vehicle_type]
                child2[vehicle_type] = parent2[vehicle_type]
            else:
                child1[vehicle_type] = parent2[vehicle_type]
                child2[vehicle_type] = parent1[vehicle_type]

        return child1, child2

    def mutate(self, solution: Dict, mutation_rate: float = 0.1, size_distance: Tuple = None) -> Dict:
        """Mutate a solution by randomly adjusting the number of vehicles"""
        mutated_solution = solution.copy()

        # Ensure size_distance is passed, as it is needed for vehicle grouping
        if size_distance is None:
            raise ValueError("size_distance must be provided for mutation")

        vehicles = self.vehicles_by_size_distance[size_distance]
        
        for vehicle_type in mutated_solution.keys():
            if random.random() < mutation_rate:
                # Get the maximum allowed number of vehicles for this vehicle
                max_vehicles = self.calculate_required_vehicles(next(v for v in vehicles if v['vehicle_type'] == vehicle_type))
                new_vehicle_count = mutated_solution[vehicle_type] + random.randint(-1, 1)
                # Ensure vehicle count is within the feasible range
                mutated_solution[vehicle_type] = max(0, min(max_vehicles, new_vehicle_count))

        return mutated_solution

    def optimize(self, size_distance: Tuple, generations: int = 100) -> Dict:
        """Run genetic algorithm to find optimal fleet combination"""
        population = self.generate_initial_population(size_distance)
        best_solution = None
        best_fitness = float('-inf')
        
        for generation in range(generations):
            fitness_scores = [(solution, self.fitness_function(solution, size_distance))
                              for solution in population]
            
            fitness_scores.sort(key=lambda x: x[1], reverse=True)
            
            # Track best fitness score
            print(f"Generation {generation + 1}: Best Fitness Score = {fitness_scores[0][1]}")

            if fitness_scores[0][1] > best_fitness:
                best_solution = fitness_scores[0][0]
                best_fitness = fitness_scores[0][1]
            
            parents = [score[0] for score in fitness_scores[:len(population)//2]]
            
            next_generation = parents.copy()
            while len(next_generation) < len(population):
                parent1, parent2 = random.sample(parents, 2)
                child1, child2 = self.crossover(parent1, parent2)
                
                # Pass size_distance during mutation
                child1 = self.mutate(child1, size_distance=size_distance)
                child2 = self.mutate(child2, size_distance=size_distance)
                
                next_generation.extend([child1, child2])
            
            population = next_generation[:len(population)]
        
        return best_solution

    def get_optimized_results(self) -> pd.DataFrame:
        """Run optimization and return results in the required format"""
        results = []
        
        for size_distance in self.vehicles_by_size_distance.keys():
            best_solution = self.optimize(size_distance)
            
            for vehicle_type, num_vehicles in best_solution.items():
                if num_vehicles > 0:
                    vehicle_data = next(v for v in self.vehicles_by_size_distance[size_distance] if v['vehicle_type'] == vehicle_type)
                    total_cost = self.calculate_total_cost(num_vehicles, vehicle_data)

                    results.append({
                        "Allocation": f"Size {size_distance[0]}, Distance {size_distance[1]}",
                        "Vehicle": vehicle_type,
                        "Cost ($)": round(total_cost, 2),
                        "Fuel": vehicle_data['fuel'],
                        "no_of_vehicles": num_vehicles
                    })

        return pd.DataFrame(results)

def main(csv_path: str):
    """Main function to run the optimization"""
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df)
    optimized_results = optimizer.get_optimized_results()
    print("\nOptimized Fleet Allocation:")
    print(optimized_results)
    return optimized_results

if __name__ == "__main__":
    csv_path = "data/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df = main(csv_path)
    results_df.to_csv("optimized_fleet_allocation.csv", index=False)


Generation 1: Best Fitness Score = 22.56099210915077
Generation 2: Best Fitness Score = 24.72876950374236
Generation 3: Best Fitness Score = 28.40313207481445
Generation 4: Best Fitness Score = 28.40313207481445
Generation 5: Best Fitness Score = 28.40313207481445
Generation 6: Best Fitness Score = 28.40313207481445
Generation 7: Best Fitness Score = 28.40313207481445
Generation 8: Best Fitness Score = 28.40313207481445
Generation 9: Best Fitness Score = 28.40313207481445
Generation 10: Best Fitness Score = 28.40313207481445
Generation 11: Best Fitness Score = 28.40313207481445
Generation 12: Best Fitness Score = 28.40313207481445
Generation 13: Best Fitness Score = 28.40313207481445
Generation 14: Best Fitness Score = 28.40313207481445
Generation 15: Best Fitness Score = 28.40313207481445
Generation 16: Best Fitness Score = 28.40313207481445
Generation 17: Best Fitness Score = 28.40313207481445
Generation 18: Best Fitness Score = 28.40313207481445
Generation 19: Best Fitness Score = 2

In [1]:
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np

def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load and preprocess the fleet data from CSV file."""
    df = pd.read_csv(csv_path)
    
    # Rename columns properly
    column_mapping = {
        'Unnamed: 0': 'Index',
        'Distance_x': 'Distance_x',  # Keeping as requested
        'Demand (km)': 'Demand',
        'Cost ($)': 'Cost',
        'Yearly range (km)': 'Yearly_Range',
        'insurance_cost': 'Insurance_Cost',
        'maintenance_cost': 'Maintenance_Cost',
        'fuel_costs': 'Fuel_Costs',
        'Fuel': 'Fuel',
        'Total_Cost': 'Total_Cost',
        'Topsis_Score': 'Topsis_Score',
    }
    
    df.rename(columns=column_mapping, inplace=True, errors='ignore')

    # Calculate Total Cost if missing
    if 'Total_Cost' not in df.columns:
        df['Total_Cost'] = df['Insurance_Cost'] + df['Maintenance_Cost'] + df['Fuel_Costs']

    return df

class FleetOptimizer:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.vehicles_by_size_distance = self._group_vehicles()
        
    def _group_vehicles(self) -> Dict:
        """Group vehicles by (Size, Distance_x) combination"""
        groups = {}
        for _, row in self.data.iterrows():
            key = (row['Size'], row['Distance_x'])
            if key not in groups:
                groups[key] = []
            groups[key].append({
                'vehicle_type': row['Vehicle'],
                'yearly_range': row['Yearly_Range'],
                'topsis_score': row['Topsis_Score'],
                'insurance_cost': row['Insurance_Cost'],
                'maintenance_cost': row['Maintenance_Cost'],
                'fuel_costs': row['Fuel_Costs'],
                'fuel': row['Fuel'],
                'demand': row['Demand']
            })
        return groups

    def calculate_required_vehicles(self, vehicle: Dict) -> int:
        """Calculate required number of vehicles based on demand and yearly range"""
        return int(np.ceil(vehicle['demand'] / vehicle['yearly_range']))

    def calculate_total_cost(self, num_vehicles: int, vehicle: Dict) -> float:
        """Calculate total cost including insurance, maintenance, and fuel costs"""
        return num_vehicles * (
            vehicle['insurance_cost'] + 
            vehicle['maintenance_cost'] + 
            vehicle['fuel_costs']
        )

    def generate_initial_population(self, size_distance: Tuple, population_size: int = 50) -> List[Dict]:
        """Generate initial population of fleet combinations"""
        population = []
        vehicles = self.vehicles_by_size_distance[size_distance]
        
        for _ in range(population_size):
            solution = {
                v['vehicle_type']: random.randint(0, self.calculate_required_vehicles(v))
                for v in vehicles
            }
            population.append(solution)
        
        return population

    def fitness_function(self, solution: Dict, size_distance: Tuple) -> float:
        """
        Calculate fitness of a solution based on:
        1. TOPSIS score
        2. Meeting demand requirements
        3. Total cost (insurance + maintenance + fuel)
        """
        vehicles = self.vehicles_by_size_distance[size_distance]
        total_cost = 0
        total_capacity = 0
        weighted_topsis = 0
        
        for vehicle in vehicles:
            num_vehicles = solution[vehicle['vehicle_type']]
            total_cost += self.calculate_total_cost(num_vehicles, vehicle)
            total_capacity += num_vehicles * vehicle['yearly_range']
            weighted_topsis += num_vehicles * vehicle['topsis_score']
        
        # Penalize solutions that don't meet demand
        demand = vehicles[0]['demand']
        demand_penalty = max(0, demand - total_capacity) * 1000
        
        # Normalize costs (lower is better)
        cost_score = 1 / (total_cost + 1)
        
        return weighted_topsis + cost_score - demand_penalty

    def crossover(self, parent1: Dict, parent2: Dict) -> Tuple[Dict, Dict]:
        """Perform crossover between two parent solutions"""
        crossover_point = random.randint(1, len(parent1) - 1)
        child1 = {}
        child2 = {}

        for i, vehicle_type in enumerate(parent1.keys()):
            if i < crossover_point:
                child1[vehicle_type] = parent1[vehicle_type]
                child2[vehicle_type] = parent2[vehicle_type]
            else:
                child1[vehicle_type] = parent2[vehicle_type]
                child2[vehicle_type] = parent1[vehicle_type]

        return child1, child2

    def mutate(self, solution: Dict, mutation_rate: float = 0.1) -> Dict:
        """Mutate a solution by randomly adjusting the number of vehicles"""
        mutated_solution = solution.copy()
        for vehicle_type in mutated_solution.keys():
            if random.random() < mutation_rate:
                mutated_solution[vehicle_type] = max(0, mutated_solution[vehicle_type] + random.choice([-1, 1]))

        return mutated_solution

    def optimize(self, size_distance: Tuple, generations: int = 100) -> Dict:
        """Run genetic algorithm to find optimal fleet combination"""
        population = self.generate_initial_population(size_distance)
        best_solution = None
        best_fitness = float('-inf')
        
        for _ in range(generations):
            fitness_scores = [(solution, self.fitness_function(solution, size_distance))
                              for solution in population]
            
            fitness_scores.sort(key=lambda x: x[1], reverse=True)
            
            if fitness_scores[0][1] > best_fitness:
                best_solution = fitness_scores[0][0]
                best_fitness = fitness_scores[0][1]
            
            parents = [score[0] for score in fitness_scores[:len(population)//2]]
            
            next_generation = parents.copy()
            while len(next_generation) < len(population):
                parent1, parent2 = random.sample(parents, 2)
                child1, child2 = self.crossover(parent1, parent2)
                child1 = self.mutate(child1)
                child2 = self.mutate(child2)
                next_generation.extend([child1, child2])
            
            population = next_generation[:len(population)]
        
        return best_solution

    def get_optimized_results(self) -> pd.DataFrame:
        """Run optimization and return results in the required format"""
        results = []
        
        for size_distance in self.vehicles_by_size_distance.keys():
            best_solution = self.optimize(size_distance)
            
            for vehicle_type, num_vehicles in best_solution.items():
                if num_vehicles > 0:
                    vehicle_data = next(v for v in self.vehicles_by_size_distance[size_distance] if v['vehicle_type'] == vehicle_type)
                    total_cost = self.calculate_total_cost(num_vehicles, vehicle_data)

                    results.append({
                        "Allocation": f"Size {size_distance[0]}, Distance {size_distance[1]}",
                        "Vehicle": vehicle_type,
                        "Cost ($)": round(total_cost, 2),
                        "Fuel": vehicle_data['fuel'],
                        "no_of_vehicles": num_vehicles
                    })

        return pd.DataFrame(results)

def main(csv_path: str):
    """Main function to run the optimization"""
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df)
    optimized_results = optimizer.get_optimized_results()
    print("\nOptimized Fleet Allocation:")
    print(optimized_results)
    return optimized_results

if __name__ == "__main__":
    csv_path = "data/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df = main(csv_path)
    print(results_df)
    # results_df.to_csv("optimized_fleet_allocation.csv", index=False)



Optimized Fleet Allocation:
              Allocation Vehicle    Cost ($)         Fuel  no_of_vehicles
0   Size S1, Distance D1     BEV   946780.44  Electricity              33
1   Size S1, Distance D1  Diesel   854651.70          B20              26
2   Size S1, Distance D1     LNG   784647.90          LNG              34
3   Size S1, Distance D2  Diesel  1512076.08          B20              46
4   Size S1, Distance D2     LNG  1153893.98          LNG              50
5   Size S1, Distance D3  Diesel  1709303.39          B20              52
6   Size S1, Distance D3     LNG  1338517.01          LNG              58
7   Size S1, Distance D4  Diesel   887522.91          B20              27
8   Size S1, Distance D4     LNG   623102.75          LNG              27
9   Size S2, Distance D1     BEV  1105657.79  Electricity              32
10  Size S2, Distance D1  Diesel   809794.87          B20              23
11  Size S2, Distance D1     LNG   796664.73          LNG              31
12  Size 

In [4]:
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np

def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load and preprocess the fleet data from CSV file."""
    df = pd.read_csv(csv_path)
    
    # Rename columns properly
    column_mapping = {
        'Unnamed: 0': 'Index',
        'Distance_x': 'Distance_x',
        'Demand (km)': 'Demand',
        'Cost ($)': 'Cost',
        'Yearly range (km)': 'Yearly_Range',
        'insurance_cost': 'Insurance_Cost',
        'maintenance_cost': 'Maintenance_Cost',
        'fuel_costs': 'Fuel_Costs',
        'Fuel': 'Fuel',
        'Total_Cost': 'Total_Cost',
        'Topsis_Score': 'Topsis_Score',
    }
    
    df.rename(columns=column_mapping, inplace=True, errors='ignore')

    # Calculate Total Cost if missing
    if 'Total_Cost' not in df.columns:
        df['Total_Cost'] = df['Insurance_Cost'] + df['Maintenance_Cost'] + df['Fuel_Costs']

    return df

class FleetOptimizer:
    def __init__(self, data: pd.DataFrame, max_total_vehicles_per_bucket: int = 10):
        self.data = data
        self.max_total_vehicles_per_bucket = max_total_vehicles_per_bucket
        self.vehicles_by_size_distance = self._group_vehicles()
    
    def _group_vehicles(self) -> Dict:
        """Group vehicles by (Size, Distance_x) combination"""
        groups = {}
        for _, row in self.data.iterrows():
            key = (row['Size'], row['Distance_x'])
            if key not in groups:
                groups[key] = []
            groups[key].append({
                'vehicle_type': row['Vehicle'],
                'yearly_range': row['Yearly_Range'],
                'topsis_score': row['Topsis_Score'],
                'insurance_cost': row['Insurance_Cost'],
                'maintenance_cost': row['Maintenance_Cost'],
                'fuel_costs': row['Fuel_Costs'],
                'fuel': row['Fuel'],
                'demand': row['Demand']
            })
        return groups

    def is_valid_solution(self, solution: Dict) -> bool:
        """Check if the total number of vehicles in the solution is within the maximum limit"""
        return sum(solution.values()) <= self.max_total_vehicles_per_bucket

    def calculate_total_cost(self, num_vehicles: int, vehicle: Dict) -> float:
        """Calculate total cost including insurance, maintenance, and fuel costs"""
        return num_vehicles * (
            vehicle['insurance_cost'] + 
            vehicle['maintenance_cost'] + 
            vehicle['fuel_costs']
        )

    def generate_initial_population(self, size_distance: Tuple, population_size: int = 50) -> List[Dict]:
        """Generate initial population of fleet combinations"""
        population = []
        vehicles = self.vehicles_by_size_distance[size_distance]
        
        while len(population) < population_size:
            # Generate a solution
            solution = {v['vehicle_type']: 0 for v in vehicles}
            remaining_vehicles = self.max_total_vehicles_per_bucket
            
            # Randomly distribute vehicles while respecting the total maximum
            vehicle_types = list(solution.keys())
            while remaining_vehicles > 0 and vehicle_types:
                vehicle_type = random.choice(vehicle_types)
                if random.random() < 0.5:  # 50% chance to add a vehicle
                    solution[vehicle_type] += 1
                    remaining_vehicles -= 1
                else:
                    vehicle_types.remove(vehicle_type)
            
            if self.is_valid_solution(solution):
                population.append(solution)
        
        return population

    def crossover(self, parent1: Dict, parent2: Dict) -> Tuple[Dict, Dict]:
        """Perform crossover between two parent solutions while respecting total vehicle limit"""
        attempts = 0
        max_attempts = 10
        
        while attempts < max_attempts:
            crossover_point = random.randint(1, len(parent1) - 1)
            child1 = {}
            child2 = {}
            
            for i, vehicle_type in enumerate(parent1.keys()):
                if i < crossover_point:
                    child1[vehicle_type] = parent1[vehicle_type]
                    child2[vehicle_type] = parent2[vehicle_type]
                else:
                    child1[vehicle_type] = parent2[vehicle_type]
                    child2[vehicle_type] = parent1[vehicle_type]
            
            if self.is_valid_solution(child1) and self.is_valid_solution(child2):
                return child1, child2
            
            attempts += 1
        
        # If we can't create valid children, return copies of parents
        return parent1.copy(), parent2.copy()

    def mutate(self, solution: Dict, mutation_rate: float = 0.1) -> Dict:
        """Mutate a solution while respecting the total vehicle limit"""
        attempts = 0
        max_attempts = 10
        
        while attempts < max_attempts:
            mutated_solution = solution.copy()
            total_vehicles = sum(mutated_solution.values())
            
            for vehicle_type in mutated_solution.keys():
                if random.random() < mutation_rate:
                    change = random.choice([-1, 1])
                    if (change == 1 and total_vehicles < self.max_total_vehicles_per_bucket) or \
                       (change == -1 and mutated_solution[vehicle_type] > 0):
                        mutated_solution[vehicle_type] += change
                        total_vehicles += change
            
            if self.is_valid_solution(mutated_solution):
                return mutated_solution
            
            attempts += 1
        
        return solution  # Return original if no valid mutation found

    def fitness_function(self, solution: Dict, size_distance: Tuple) -> float:
        """
        Calculate fitness of a solution based on:
        1. TOPSIS score
        2. Meeting demand requirements
        3. Total cost
        4. Penalty for exceeding maximum vehicles
        """
        if not self.is_valid_solution(solution):
            return float('-inf')
            
        vehicles = self.vehicles_by_size_distance[size_distance]
        total_cost = 0
        total_capacity = 0
        weighted_topsis = 0
        
        for vehicle in vehicles:
            num_vehicles = solution[vehicle['vehicle_type']]
            total_cost += self.calculate_total_cost(num_vehicles, vehicle)
            total_capacity += num_vehicles * vehicle['yearly_range']
            weighted_topsis += num_vehicles * vehicle['topsis_score']
        
        # Penalize solutions that don't meet demand
        demand = vehicles[0]['demand']
        demand_penalty = max(0, demand - total_capacity) * 1000
        
        # Normalize costs (lower is better)
        cost_score = 1 / (total_cost + 1)
        
        return weighted_topsis + cost_score - demand_penalty

    def optimize(self, size_distance: Tuple, generations: int = 100) -> Dict:
        """Run genetic algorithm to find optimal fleet combination"""
        population = self.generate_initial_population(size_distance)
        best_solution = None
        best_fitness = float('-inf')
        
        for _ in range(generations):
            # Calculate fitness for all solutions
            fitness_scores = [(solution, self.fitness_function(solution, size_distance))
                            for solution in population]
            
            # Sort by fitness (descending)
            fitness_scores.sort(key=lambda x: x[1], reverse=True)
            
            # Update best solution if needed
            if fitness_scores[0][1] > best_fitness:
                best_solution = fitness_scores[0][0]
                best_fitness = fitness_scores[0][1]
            
            # Select parents for next generation
            parents = [score[0] for score in fitness_scores[:len(population)//2]]
            
            # Create next generation
            next_generation = parents.copy()
            while len(next_generation) < len(population):
                parent1, parent2 = random.sample(parents, 2)
                child1, child2 = self.crossover(parent1, parent2)
                child1 = self.mutate(child1)
                child2 = self.mutate(child2)
                next_generation.extend([child1, child2])
            
            population = next_generation[:len(population)]
        
        return best_solution

    def get_optimized_results(self) -> pd.DataFrame:
        """Run optimization and return results in the required format"""
        results = []
        
        for size_distance in self.vehicles_by_size_distance.keys():
            best_solution = self.optimize(size_distance)
            
            for vehicle_type, num_vehicles in best_solution.items():
                if num_vehicles > 0:
                    vehicle_data = next(v for v in self.vehicles_by_size_distance[size_distance] 
                                     if v['vehicle_type'] == vehicle_type)
                    total_cost = self.calculate_total_cost(num_vehicles, vehicle_data)

                    results.append({
                        "Allocation": f"Size {size_distance[0]}, Distance {size_distance[1]}",
                        "Vehicle": vehicle_type,
                        "Cost ($)": round(total_cost, 2),
                        "Fuel": vehicle_data['fuel'],
                        "no_of_vehicles": num_vehicles
                    })

        return pd.DataFrame(results)

def main(csv_path: str, max_vehicles: int = 10):
    """Main function to run the optimization"""
    print(f"\nRunning optimization with maximum {max_vehicles} vehicles per bucket...")
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df, max_total_vehicles_per_bucket=max_vehicles)
    optimized_results = optimizer.get_optimized_results()
    print("\nOptimized Fleet Allocation:")
    print(optimized_results)
    return optimized_results

if __name__ == "__main__":
    csv_path = "data/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df = main(csv_path, max_vehicles=10)
    # results_df.to_csv("optimized_fleet_allocation.csv", index=False)


Running optimization with maximum 10 vehicles per bucket...

Optimized Fleet Allocation:
              Allocation Vehicle   Cost ($) Fuel  no_of_vehicles
0   Size S1, Distance D1     LNG  230778.80  LNG              10
1   Size S1, Distance D2     LNG  230778.80  LNG              10
2   Size S1, Distance D3  Diesel   65742.44  B20               2
3   Size S1, Distance D3     LNG  184623.04  LNG               8
4   Size S1, Distance D4     LNG  230778.80  LNG              10
5   Size S2, Distance D1     LNG  256988.62  LNG              10
6   Size S2, Distance D2     LNG  256988.62  LNG              10
7   Size S2, Distance D3     LNG  256988.62  LNG              10
8   Size S2, Distance D4     LNG  256988.62  LNG              10
9   Size S3, Distance D1  Diesel   54122.69  B20               2
10  Size S3, Distance D1     LNG  169920.41  LNG               8
11  Size S3, Distance D2     LNG  212400.52  LNG              10
12  Size S3, Distance D3     LNG  212400.52  LNG              10


In [9]:
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np

class FleetEvaluator:
    def __init__(self, solution: Dict, vehicles_data: List[Dict], size_distance: Tuple):
        self.solution = solution
        self.vehicles_data = vehicles_data
        self.size_distance = size_distance
        self.demand = vehicles_data[0]['demand']  # Demand is same for all vehicles in bucket
        
    def evaluate_demand_coverage(self) -> Dict:
        """Evaluate how well the solution meets the demand"""
        total_capacity = 0
        for vehicle in self.vehicles_data:
            num_vehicles = self.solution[vehicle['vehicle_type']]
            total_capacity += num_vehicles * vehicle['yearly_range']
        
        coverage_ratio = total_capacity / self.demand if self.demand > 0 else 0
        
        return {
            'total_capacity': total_capacity,
            'demand': self.demand,
            'coverage_ratio': coverage_ratio,
            'is_demand_met': coverage_ratio >= 1,
            'capacity_surplus': max(0, total_capacity - self.demand),
            'capacity_deficit': max(0, self.demand - total_capacity)
        }
    
    def evaluate_costs(self) -> Dict:
        """Evaluate cost metrics of the solution"""
        total_cost = 0
        cost_breakdown = {
            'insurance': 0,
            'maintenance': 0,
            'fuel': 0
        }
        
        for vehicle in self.vehicles_data:
            num_vehicles = self.solution[vehicle['vehicle_type']]
            cost_breakdown['insurance'] += num_vehicles * vehicle['insurance_cost']
            cost_breakdown['maintenance'] += num_vehicles * vehicle['maintenance_cost']
            cost_breakdown['fuel'] += num_vehicles * vehicle['fuel_costs']
            
        total_cost = sum(cost_breakdown.values())
        
        return {
            'total_cost': total_cost,
            'cost_breakdown': cost_breakdown,
            'cost_per_km': total_cost / self.demand if self.demand > 0 else 0
        }
    
    def evaluate_fleet_composition(self) -> Dict:
        """Evaluate the composition of the fleet"""
        total_vehicles = sum(self.solution.values())
        composition = {}
        
        for vehicle in self.vehicles_data:
            vehicle_type = vehicle['vehicle_type']
            num_vehicles = self.solution[vehicle_type]
            composition[vehicle_type] = {
                'count': num_vehicles,
                'percentage': (num_vehicles / total_vehicles * 100) if total_vehicles > 0 else 0,
                'topsis_score': vehicle['topsis_score'],
                'weighted_topsis': vehicle['topsis_score'] * num_vehicles
            }
            
        return {
            'total_vehicles': total_vehicles,
            'composition': composition,
            'average_topsis': sum(v['weighted_topsis'] for v in composition.values()) / total_vehicles if total_vehicles > 0 else 0
        }
    
    def evaluate_sustainability(self) -> Dict:
        """Evaluate sustainability metrics"""
        fuel_types = {}
        for vehicle in self.vehicles_data:
            num_vehicles = self.solution[vehicle['vehicle_type']]
            fuel_type = vehicle['fuel']
            if fuel_type not in fuel_types:
                fuel_types[fuel_type] = 0
            fuel_types[fuel_type] += num_vehicles
            
        total_vehicles = sum(self.solution.values())
        
        return {
            'fuel_distribution': {
                fuel: {
                    'count': count,
                    'percentage': (count / total_vehicles * 100) if total_vehicles > 0 else 0
                }
                for fuel, count in fuel_types.items()
            }
        }
    
    def get_comprehensive_evaluation(self) -> Dict:
        """Get comprehensive evaluation of all metrics"""
        demand_eval = self.evaluate_demand_coverage()
        cost_eval = self.evaluate_costs()
        fleet_eval = self.evaluate_fleet_composition()
        sustainability_eval = self.evaluate_sustainability()
        
        solution_score = self._calculate_solution_score(
            demand_eval, cost_eval, fleet_eval, sustainability_eval
        )
        
        return {
            'size_distance': self.size_distance,
            'solution_score': solution_score,
            'demand_evaluation': demand_eval,
            'cost_evaluation': cost_eval,
            'fleet_evaluation': fleet_eval,
            'sustainability_evaluation': sustainability_eval
        }
    
    def _calculate_solution_score(self, demand_eval, cost_eval, fleet_eval, sustainability_eval) -> float:
        """Calculate an overall solution score (0-100)"""
        score = 0
        
        # Demand coverage (40 points)
        coverage_ratio = min(demand_eval['coverage_ratio'], 1.2)  # Cap at 120%
        score += (coverage_ratio / 1.2) * 40
        
        # Cost efficiency (30 points)
        # Assuming a benchmark cost_per_km (you may need to adjust this)
        benchmark_cost_per_km = 1.0
        cost_ratio = min(benchmark_cost_per_km / cost_eval['cost_per_km'], 1)
        score += cost_ratio * 30
        
        # TOPSIS score (20 points)
        score += (fleet_eval['average_topsis'] / 100) * 20
        
        # Sustainability (10 points)
        # Assuming electric/hybrid vehicles are more sustainable
        sustainable_fuels = {'Electric', 'Hybrid'}
        sustainable_percentage = sum(
            data['percentage'] 
            for fuel, data in sustainability_eval['fuel_distribution'].items()
            if fuel in sustainable_fuels
        )
        score += (sustainable_percentage / 100) * 10
        
        return round(score, 2)

def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load and preprocess the fleet data from CSV file."""
    df = pd.read_csv(csv_path)
    
    # Rename columns properly
    column_mapping = {
        'Unnamed: 0': 'Index',
        'Distance_x': 'Distance_x',
        'Demand (km)': 'Demand',
        'Cost ($)': 'Cost',
        'Yearly range (km)': 'Yearly_Range',
        'insurance_cost': 'Insurance_Cost',
        'maintenance_cost': 'Maintenance_Cost',
        'fuel_costs': 'Fuel_Costs',
        'Fuel': 'Fuel',
        'Total_Cost': 'Total_Cost',
        'Topsis_Score': 'Topsis_Score',
    }
    
    df.rename(columns=column_mapping, inplace=True, errors='ignore')

    # Calculate Total Cost if missing
    if 'Total_Cost' not in df.columns:
        df['Total_Cost'] = df['Insurance_Cost'] + df['Maintenance_Cost'] + df['Fuel_Costs']

    return df

class FleetOptimizer:
    def __init__(self, data: pd.DataFrame, max_total_vehicles_per_bucket: int = 10):
        self.data = data
        self.max_total_vehicles_per_bucket = max_total_vehicles_per_bucket
        self.vehicles_by_size_distance = self._group_vehicles()
    
    def _group_vehicles(self) -> Dict:
        """Group vehicles by (Size, Distance_x) combination"""
        groups = {}
        for _, row in self.data.iterrows():
            key = (row['Size'], row['Distance_x'])
            if key not in groups:
                groups[key] = []
            groups[key].append({
                'vehicle_type': row['Vehicle'],
                'yearly_range': row['Yearly_Range'],
                'topsis_score': row['Topsis_Score'],
                'insurance_cost': row['Insurance_Cost'],
                'maintenance_cost': row['Maintenance_Cost'],
                'fuel_costs': row['Fuel_Costs'],
                'fuel': row['Fuel'],
                'demand': row['Demand']
            })
        return groups

    def is_valid_solution(self, solution: Dict) -> bool:
        """Check if the total number of vehicles in the solution is within the maximum limit"""
        return sum(solution.values()) <= self.max_total_vehicles_per_bucket

    def calculate_total_cost(self, num_vehicles: int, vehicle: Dict) -> float:
        """Calculate total cost including insurance, maintenance, and fuel costs"""
        return num_vehicles * (
            vehicle['insurance_cost'] + 
            vehicle['maintenance_cost'] + 
            vehicle['fuel_costs']
        )

    def generate_initial_population(self, size_distance: Tuple, population_size: int = 50) -> List[Dict]:
        """Generate initial population of fleet combinations"""
        population = []
        vehicles = self.vehicles_by_size_distance[size_distance]
        
        while len(population) < population_size:
            # Generate a solution
            solution = {v['vehicle_type']: 0 for v in vehicles}
            remaining_vehicles = self.max_total_vehicles_per_bucket
            
            # Randomly distribute vehicles while respecting the total maximum
            vehicle_types = list(solution.keys())
            while remaining_vehicles > 0 and vehicle_types:
                vehicle_type = random.choice(vehicle_types)
                if random.random() < 0.5:  # 50% chance to add a vehicle
                    solution[vehicle_type] += 1
                    remaining_vehicles -= 1
                else:
                    vehicle_types.remove(vehicle_type)
            
            if self.is_valid_solution(solution):
                population.append(solution)
        
        return population

    def crossover(self, parent1: Dict, parent2: Dict) -> Tuple[Dict, Dict]:
        """Perform crossover between two parent solutions while respecting total vehicle limit"""
        attempts = 0
        max_attempts = 10
        
        while attempts < max_attempts:
            crossover_point = random.randint(1, len(parent1) - 1)
            child1 = {}
            child2 = {}
            
            for i, vehicle_type in enumerate(parent1.keys()):
                if i < crossover_point:
                    child1[vehicle_type] = parent1[vehicle_type]
                    child2[vehicle_type] = parent2[vehicle_type]
                else:
                    child1[vehicle_type] = parent2[vehicle_type]
                    child2[vehicle_type] = parent1[vehicle_type]
            
            if self.is_valid_solution(child1) and self.is_valid_solution(child2):
                return child1, child2
            
            attempts += 1
        
        # If we can't create valid children, return copies of parents
        return parent1.copy(), parent2.copy()

    def mutate(self, solution: Dict, mutation_rate: float = 0.1) -> Dict:
        """Mutate a solution while respecting the total vehicle limit"""
        attempts = 0
        max_attempts = 10
        
        while attempts < max_attempts:
            mutated_solution = solution.copy()
            total_vehicles = sum(mutated_solution.values())
            
            for vehicle_type in mutated_solution.keys():
                if random.random() < mutation_rate:
                    change = random.choice([-1, 1])
                    if (change == 1 and total_vehicles < self.max_total_vehicles_per_bucket) or \
                       (change == -1 and mutated_solution[vehicle_type] > 0):
                        mutated_solution[vehicle_type] += change
                        total_vehicles += change
            
            if self.is_valid_solution(mutated_solution):
                return mutated_solution
            
            attempts += 1
        
        return solution  # Return original if no valid mutation found

    def fitness_function(self, solution: Dict, size_distance: Tuple) -> float:
        """
        Calculate fitness of a solution based on:
        1. TOPSIS score
        2. Meeting demand requirements
        3. Total cost
        4. Penalty for exceeding maximum vehicles
        """
        if not self.is_valid_solution(solution):
            return float('-inf')
            
        vehicles = self.vehicles_by_size_distance[size_distance]
        total_cost = 0
        total_capacity = 0
        weighted_topsis = 0
        
        for vehicle in vehicles:
            num_vehicles = solution[vehicle['vehicle_type']]
            total_cost += self.calculate_total_cost(num_vehicles, vehicle)
            total_capacity += num_vehicles * vehicle['yearly_range']
            weighted_topsis += num_vehicles * vehicle['topsis_score']
        
        # Penalize solutions that don't meet demand
        demand = vehicles[0]['demand']
        demand_penalty = max(0, demand - total_capacity) * 1000
        
        # Normalize costs (lower is better)
        cost_score = 1 / (total_cost + 1)
        
        return weighted_topsis + cost_score - demand_penalty

    def optimize(self, size_distance: Tuple, generations: int = 100) -> Dict:
        """Run genetic algorithm to find optimal fleet combination"""
        population = self.generate_initial_population(size_distance)
        best_solution = None
        best_fitness = float('-inf')
        
        for _ in range(generations):
            # Calculate fitness for all solutions
            fitness_scores = [(solution, self.fitness_function(solution, size_distance))
                            for solution in population]
            
            # Sort by fitness (descending)
            fitness_scores.sort(key=lambda x: x[1], reverse=True)
            
            # Update best solution if needed
            if fitness_scores[0][1] > best_fitness:
                best_solution = fitness_scores[0][0]
                best_fitness = fitness_scores[0][1]
            
            # Select parents for next generation
            parents = [score[0] for score in fitness_scores[:len(population)//2]]
            
            # Create next generation
            next_generation = parents.copy()
            while len(next_generation) < len(population):
                parent1, parent2 = random.sample(parents, 2)
                child1, child2 = self.crossover(parent1, parent2)
                child1 = self.mutate(child1)
                child2 = self.mutate(child2)
                next_generation.extend([child1, child2])
            
            population = next_generation[:len(population)]
        
        return best_solution

    def evaluate_solution(self, solution: Dict, size_distance: Tuple) -> Dict:
    # """Evaluate a given solution using the FleetEvaluator"""
        vehicles = self.vehicles_by_size_distance[size_distance]
        evaluator = FleetEvaluator(solution, vehicles, size_distance)
        return evaluator.get_comprehensive_evaluation()

    def get_optimized_results(self) -> pd.DataFrame:
    # """Run optimization and return results in the required format"""
        results = []
        
        for size_distance in self.vehicles_by_size_distance.keys():
            # Get vehicle data for this bucket
            vehicles = self.vehicles_by_size_distance[size_distance]
            
            # Initialize solution with all vehicle types set to 0
            initial_solution = {v['vehicle_type']: 0 for v in vehicles}
            
            # Get optimized solution
            best_solution = self.optimize(size_distance)
            
            # Update initial solution with optimized values
            initial_solution.update(best_solution)
            
            # Create results entries
            for vehicle_type, num_vehicles in initial_solution.items():
                if num_vehicles > 0:
                    vehicle_data = next(v for v in vehicles if v['vehicle_type'] == vehicle_type)
                    total_cost = self.calculate_total_cost(num_vehicles, vehicle_data)

                    results.append({
                        "Allocation": f"Size {size_distance[0]}, Distance {size_distance[1]}",
                        "Vehicle": vehicle_type,
                        "Cost ($)": round(total_cost, 2),
                        "Fuel": vehicle_data['fuel'],
                        "no_of_vehicles": num_vehicles
                    })

        return pd.DataFrame(results)

def main(csv_path: str, max_vehicles: int = 10):
    # """Main function to run the optimization with evaluation"""
    print(f"\nRunning optimization with maximum {max_vehicles} vehicles per bucket...")
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df, max_total_vehicles_per_bucket=max_vehicles)
    optimized_results = optimizer.get_optimized_results()
    
    # Evaluate solutions for each size-distance bucket
    evaluations = []
    for size_distance in optimizer.vehicles_by_size_distance.keys():
        # Get all vehicles for this bucket
        vehicles = optimizer.vehicles_by_size_distance[size_distance]
        
        # Create solution dictionary with all vehicle types initialized to 0
        solution = {v['vehicle_type']: 0 for v in vehicles}
        
        # Update with optimized results
        bucket_results = optimized_results[
            optimized_results['Allocation'] == f"Size {size_distance[0]}, Distance {size_distance[1]}"
        ]
        for _, row in bucket_results.iterrows():
            solution[row['Vehicle']] = row['no_of_vehicles']
        
        evaluation = optimizer.evaluate_solution(solution, size_distance)
        evaluations.append(evaluation)
        
        print(f"\nEvaluation for Size {size_distance[0]}, Distance {size_distance[1]}:")
        print(f"Solution Score: {evaluation['solution_score']}/100")
        print(f"Demand Coverage: {evaluation['demand_evaluation']['coverage_ratio']:.2f}")
        print(f"Cost per km: ${evaluation['cost_evaluation']['cost_per_km']:.2f}")
        print(f"Average TOPSIS Score: {evaluation['fleet_evaluation']['average_topsis']:.2f}")
    
    return optimized_results, evaluations

# 6. Existing if __name__ == "__main__" block
if __name__ == "__main__":
    csv_path = "data/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df, evaluations = main(csv_path, max_vehicles=10)
    print(results_df)
    print(evaluations)


Running optimization with maximum 10 vehicles per bucket...

Evaluation for Size S1, Distance D1:
Solution Score: 70.18/100
Demand Coverage: 2.35
Cost per km: $0.55
Average TOPSIS Score: 0.91

Evaluation for Size S1, Distance D2:
Solution Score: 56.38/100
Demand Coverage: 0.79
Cost per km: $0.18
Average TOPSIS Score: 0.98

Evaluation for Size S1, Distance D3:
Solution Score: 50.85/100
Demand Coverage: 0.62
Cost per km: $0.14
Average TOPSIS Score: 0.98

Evaluation for Size S1, Distance D4:
Solution Score: 66.42/100
Demand Coverage: 4.92
Cost per km: $1.14
Average TOPSIS Score: 0.98

Evaluation for Size S2, Distance D1:
Solution Score: 70.18/100
Demand Coverage: 2.02
Cost per km: $0.51
Average TOPSIS Score: 0.91

Evaluation for Size S2, Distance D2:
Solution Score: 70.19/100
Demand Coverage: 1.53
Cost per km: $0.41
Average TOPSIS Score: 0.94

Evaluation for Size S2, Distance D3:
Solution Score: 70.19/100
Demand Coverage: 2.72
Cost per km: $0.68
Average TOPSIS Score: 0.97

Evaluation for

In [None]:
# 1. Existing imports
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np

# 2. Add FleetEvaluator class here
class FleetEvaluator:
    # [Paste the entire FleetEvaluator class code here]

# 3. Existing load_and_preprocess_data function
def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    # [Your existing function code]

# 4. Existing FleetOptimizer class
class FleetOptimizer:
    # [All your existing FleetOptimizer methods]
    
    # Add this new method inside the FleetOptimizer class
    def evaluate_solution(self, solution: Dict, size_distance: Tuple) -> Dict:
        """Evaluate a given solution using the FleetEvaluator"""
        vehicles = self.vehicles_by_size_distance[size_distance]
        evaluator = FleetEvaluator(solution, vehicles, size_distance)
        return evaluator.get_comprehensive_evaluation()

    # Your existing get_optimized_results method
    def get_optimized_results(self):
        # [Your existing method code]

# 5. Replace your existing main function with this version
def main(csv_path: str, max_vehicles: int = 10):
    """Main function to run the optimization with evaluation"""
    print(f"\nRunning optimization with maximum {max_vehicles} vehicles per bucket...")
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df, max_total_vehicles_per_bucket=max_vehicles)
    optimized_results = optimizer.get_optimized_results()
    
    # Evaluate solutions for each size-distance bucket
    evaluations = []
    for size_distance in optimizer.vehicles_by_size_distance.keys():
        solution = {row['Vehicle']: row['no_of_vehicles'] 
                   for _, row in optimized_results.iterrows() 
                   if row['Allocation'] == f"Size {size_distance[0]}, Distance {size_distance[1]}"}
        evaluation = optimizer.evaluate_solution(solution, size_distance)
        evaluations.append(evaluation)
        
        print(f"\nEvaluation for Size {size_distance[0]}, Distance {size_distance[1]}:")
        print(f"Solution Score: {evaluation['solution_score']}/100")
        print(f"Demand Coverage: {evaluation['demand_evaluation']['coverage_ratio']:.2f}")
        print(f"Cost per km: ${evaluation['cost_evaluation']['cost_per_km']:.2f}")
        print(f"Average TOPSIS Score: {evaluation['fleet_evaluation']['average_topsis']:.2f}")
    
    return optimized_results, evaluations

# 6. Existing if __name__ == "__main__" block
if __name__ == "__main__":
    csv_path = "data/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df, evaluations = main(csv_path, max_vehicles=10)
    # results_df.to_csv("optimized_fleet_allocation.csv", index=False)

In [11]:
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np
import math

def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load and preprocess the fleet data from CSV file."""
    df = pd.read_csv(csv_path)
    
    # Rename columns properly
    column_mapping = {
        'Unnamed: 0': 'Index',
        'Distance_x': 'Distance_x',
        'Demand (km)': 'Demand',
        'Cost ($)': 'Cost',
        'Yearly range (km)': 'Yearly_Range',
        'insurance_cost': 'Insurance_Cost',
        'maintenance_cost': 'Maintenance_Cost',
        'fuel_costs': 'Fuel_Costs',
        'Fuel': 'Fuel',
        'Total_Cost': 'Total_Cost',
        'Topsis_Score': 'Topsis_Score',
    }
    
    df.rename(columns=column_mapping, inplace=True, errors='ignore')

    # Calculate Total Cost if missing
    if 'Total_Cost' not in df.columns:
        df['Total_Cost'] = df['Insurance_Cost'] + df['Maintenance_Cost'] + df['Fuel_Costs']

    return df

class FleetOptimizer:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.vehicles_by_size_distance = self._group_vehicles()
        self.max_vehicles_by_group = self._calculate_max_vehicles()
    
    def _group_vehicles(self) -> Dict:
        """Group vehicles by (Size, Distance_x) combination"""
        groups = {}
        for _, row in self.data.iterrows():
            key = (row['Size'], row['Distance_x'])
            if key not in groups:
                groups[key] = []
            groups[key].append({
                'vehicle_type': row['Vehicle'],
                'yearly_range': row['Yearly_Range'],
                'topsis_score': row['Topsis_Score'],
                'insurance_cost': row['Insurance_Cost'],
                'maintenance_cost': row['Maintenance_Cost'],
                'fuel_costs': row['Fuel_Costs'],
                'fuel': row['Fuel'],
                'demand': row['Demand']
            })
        return groups
    
    def _calculate_max_vehicles(self) -> Dict:
        """Calculate maximum vehicles for each size-distance combination"""
        max_vehicles = {}
        for key, vehicles in self.vehicles_by_size_distance.items():
            # Get demand for this combination
            demand = vehicles[0]['demand']  # All vehicles in the same group have the same demand
            
            # Calculate max vehicles based on the vehicle with highest yearly range
            max_yearly_range = max(v['yearly_range'] for v in vehicles)
            
            # Calculate max vehicles needed and round up to ensure demand is met
            max_vehicles[key] = math.ceil(demand / max_yearly_range)
        
        return max_vehicles

    def is_valid_solution(self, solution: Dict, size_distance: Tuple) -> bool:
        """Check if the total number of vehicles in the solution is within the dynamic maximum limit"""
        return sum(solution.values()) <= self.max_vehicles_by_group[size_distance]

    def calculate_total_cost(self, num_vehicles: int, vehicle: Dict) -> float:
        """Calculate total cost including insurance, maintenance, and fuel costs"""
        return num_vehicles * (
            vehicle['insurance_cost'] + 
            vehicle['maintenance_cost'] + 
            vehicle['fuel_costs']
        )

    def generate_initial_population(self, size_distance: Tuple, population_size: int = 50) -> List[Dict]:
        """Generate initial population of fleet combinations"""
        population = []
        vehicles = self.vehicles_by_size_distance[size_distance]
        max_vehicles = self.max_vehicles_by_group[size_distance]
        
        while len(population) < population_size:
            # Generate a solution
            solution = {v['vehicle_type']: 0 for v in vehicles}
            remaining_vehicles = max_vehicles
            
            # Randomly distribute vehicles while respecting the total maximum
            vehicle_types = list(solution.keys())
            while remaining_vehicles > 0 and vehicle_types:
                vehicle_type = random.choice(vehicle_types)
                if random.random() < 0.5:  # 50% chance to add a vehicle
                    solution[vehicle_type] += 1
                    remaining_vehicles -= 1
                else:
                    vehicle_types.remove(vehicle_type)
            
            if self.is_valid_solution(solution, size_distance):
                population.append(solution)
        
        return population

    def crossover(self, parent1: Dict, parent2: Dict, size_distance: Tuple) -> Tuple[Dict, Dict]:
        """Perform crossover between two parent solutions while respecting dynamic vehicle limit"""
        attempts = 0
        max_attempts = 10
        
        while attempts < max_attempts:
            crossover_point = random.randint(1, len(parent1) - 1)
            child1 = {}
            child2 = {}
            
            for i, vehicle_type in enumerate(parent1.keys()):
                if i < crossover_point:
                    child1[vehicle_type] = parent1[vehicle_type]
                    child2[vehicle_type] = parent2[vehicle_type]
                else:
                    child1[vehicle_type] = parent2[vehicle_type]
                    child2[vehicle_type] = parent1[vehicle_type]
            
            if (self.is_valid_solution(child1, size_distance) and 
                self.is_valid_solution(child2, size_distance)):
                return child1, child2
            
            attempts += 1
        
        # If we can't create valid children, return copies of parents
        return parent1.copy(), parent2.copy()

    def mutate(self, solution: Dict, size_distance: Tuple, mutation_rate: float = 0.1) -> Dict:
        """Mutate a solution while respecting the dynamic vehicle limit"""
        attempts = 0
        max_attempts = 10
        max_vehicles = self.max_vehicles_by_group[size_distance]
        
        while attempts < max_attempts:
            mutated_solution = solution.copy()
            total_vehicles = sum(mutated_solution.values())
            
            for vehicle_type in mutated_solution.keys():
                if random.random() < mutation_rate:
                    change = random.choice([-1, 1])
                    if (change == 1 and total_vehicles < max_vehicles) or \
                       (change == -1 and mutated_solution[vehicle_type] > 0):
                        mutated_solution[vehicle_type] += change
                        total_vehicles += change
            
            if self.is_valid_solution(mutated_solution, size_distance):
                return mutated_solution
            
            attempts += 1
        
        return solution  # Return original if no valid mutation found

    def fitness_function(self, solution: Dict, size_distance: Tuple) -> float:
        """
        Calculate fitness of a solution based on:
        1. TOPSIS score
        2. Meeting demand requirements
        3. Total cost
        4. Penalty for exceeding maximum vehicles
        """
        if not self.is_valid_solution(solution, size_distance):
            return float('-inf')
            
        vehicles = self.vehicles_by_size_distance[size_distance]
        total_cost = 0
        total_capacity = 0
        weighted_topsis = 0
        
        for vehicle in vehicles:
            num_vehicles = solution[vehicle['vehicle_type']]
            total_cost += self.calculate_total_cost(num_vehicles, vehicle)
            total_capacity += num_vehicles * vehicle['yearly_range']
            weighted_topsis += num_vehicles * vehicle['topsis_score']
        
        # Penalize solutions that don't meet demand
        demand = vehicles[0]['demand']
        demand_penalty = max(0, demand - total_capacity) * 1000
        
        # Normalize costs (lower is better)
        cost_score = 1 / (total_cost + 1)
        
        return weighted_topsis + cost_score - demand_penalty

    def optimize(self, size_distance: Tuple, generations: int = 100) -> Dict:
        """Run genetic algorithm to find optimal fleet combination"""
        population = self.generate_initial_population(size_distance)
        best_solution = None
        best_fitness = float('-inf')
        
        for _ in range(generations):
            # Calculate fitness for all solutions
            fitness_scores = [(solution, self.fitness_function(solution, size_distance))
                            for solution in population]
            
            # Sort by fitness (descending)
            fitness_scores.sort(key=lambda x: x[1], reverse=True)
            
            # Update best solution if needed
            if fitness_scores[0][1] > best_fitness:
                best_solution = fitness_scores[0][0]
                best_fitness = fitness_scores[0][1]
            
            # Select parents for next generation
            parents = [score[0] for score in fitness_scores[:len(population)//2]]
            
            # Create next generation
            next_generation = parents.copy()
            while len(next_generation) < len(population):
                parent1, parent2 = random.sample(parents, 2)
                child1, child2 = self.crossover(parent1, parent2, size_distance)
                child1 = self.mutate(child1, size_distance)
                child2 = self.mutate(child2, size_distance)
                next_generation.extend([child1, child2])
            
            population = next_generation[:len(population)]
        
        return best_solution

    def get_optimized_results(self) -> pd.DataFrame:
        """Run optimization and return results in the required format"""
        results = []
        
        for size_distance in self.vehicles_by_size_distance.keys():
            best_solution = self.optimize(size_distance)
            max_vehicles = self.max_vehicles_by_group[size_distance]
            
            for vehicle_type, num_vehicles in best_solution.items():
                if num_vehicles > 0:
                    vehicle_data = next(v for v in self.vehicles_by_size_distance[size_distance] 
                                     if v['vehicle_type'] == vehicle_type)
                    total_cost = self.calculate_total_cost(num_vehicles, vehicle_data)

                    results.append({
                        "Allocation": f"Size {size_distance[0]}, Distance {size_distance[1]}",
                        "Vehicle": vehicle_type,
                        "Cost ($)": round(total_cost, 2),
                        "Fuel": vehicle_data['fuel'],
                        "no_of_vehicles": num_vehicles,
                        "Max Vehicles": max_vehicles,
                        "Demand": vehicle_data['demand'],
                        "Yearly Range": vehicle_data['yearly_range']
                    })

        return pd.DataFrame(results)

def main(csv_path: str):
    """Main function to run the optimization"""
    print("\nRunning optimization with dynamic maximum vehicles...")
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df)
    optimized_results = optimizer.get_optimized_results()
    print("\nOptimized Fleet Allocation:")
    print(optimized_results)
    return optimized_results

if __name__ == "__main__":
    csv_path = "data/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df = main(csv_path)
    # results_df.to_csv("optimized_fleet_allocation.csv", index=False)


Running optimization with dynamic maximum vehicles...

Optimized Fleet Allocation:
              Allocation Vehicle   Cost ($)         Fuel  no_of_vehicles  \
0   Size S1, Distance D1     LNG  207700.92          LNG               9   
1   Size S1, Distance D2  Diesel  262969.75          B20               8   
2   Size S1, Distance D2     LNG  415401.83          LNG              18   
3   Size S1, Distance D3  Diesel  525939.50          B20              16   
4   Size S1, Distance D3     LNG  392323.95          LNG              17   
5   Size S1, Distance D4     LNG  115389.40          LNG               5   
6   Size S2, Distance D1     LNG  256988.62          LNG              10   
7   Size S2, Distance D2     LNG  359784.07          LNG              14   
8   Size S2, Distance D3     LNG  205590.90          LNG               8   
9   Size S2, Distance D4     LNG   51397.72          LNG               2   
10  Size S3, Distance D1     BEV   93354.33  Electricity               3   
11  

In [1]:
import pandas as pd
import random
from typing import List, Dict, Tuple
import numpy as np
import math

def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load and preprocess the fleet data from CSV file."""
    df = pd.read_csv(csv_path)
    
    # Rename columns properly
    column_mapping = {
        'Unnamed: 0': 'Index',
        'Distance_x': 'Distance_x',
        'Demand (km)': 'Demand',
        'Cost ($)': 'Cost',
        'Yearly range (km)': 'Yearly_Range',
        'insurance_cost': 'Insurance_Cost',
        'maintenance_cost': 'Maintenance_Cost',
        'fuel_costs': 'Fuel_Costs',
        'Fuel': 'Fuel',
        'Total_Cost': 'Total_Cost',
        'Topsis_Score': 'Topsis_Score',
    }
    
    df.rename(columns=column_mapping, inplace=True, errors='ignore')

    # Calculate Total Cost if missing
    # if 'Total_Cost' not in df.columns:
    #     df['Total_Cost'] = df['Insurance_Cost'] + df['Maintenance_Cost'] + df['Fuel_Costs']

    return df

class FleetOptimizer:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.vehicles_by_size_distance = self._group_vehicles()
        self.max_vehicles_by_group = self._calculate_max_vehicles()
    
    def _group_vehicles(self) -> Dict:
        """Group vehicles by (Size, Distance_x) combination"""
        groups = {}
        for _, row in self.data.iterrows():
            key = (row['Size'], row['Distance_x'])
            if key not in groups:
                groups[key] = []
            groups[key].append({
                'vehicle_type': row['Vehicle'],
                'yearly_range': row['Yearly_Range'],
                'topsis_score': row['Topsis_Score'],
                'insurance_cost': row['Insurance_Cost'],
                'maintenance_cost': row['Maintenance_Cost'],
                'Cost ($)':row['Cost ($)'],
                'fuel_costs': row['Fuel_Costs'],
                'fuel': row['Fuel'],
                'demand': row['Demand']
            })
        return groups
    
    def _calculate_max_vehicles(self) -> Dict:
        """Calculate maximum vehicles for each size-distance combination"""
        max_vehicles = {}
        for key, vehicles in self.vehicles_by_size_distance.items():
            # Get demand for this combination
            demand = vehicles[0]['demand']  # All vehicles in the same group have the same demand
            
            # Calculate max vehicles based on the vehicle with highest yearly range
            max_yearly_range = max(v['yearly_range'] for v in vehicles)
            
            # Calculate max vehicles needed and round up to ensure demand is met
            max_vehicles[key] = math.ceil(demand / max_yearly_range)
        
        return max_vehicles

    def is_valid_solution(self, solution: Dict, size_distance: Tuple) -> bool:
        """Check if the total number of vehicles in the solution is within the dynamic maximum limit"""
        return sum(solution.values()) <= self.max_vehicles_by_group[size_distance]

    def calculate_total_cost(self, num_vehicles: int, vehicle: Dict) -> float:
        """Calculate total cost including insurance, maintenance, and fuel costs"""
        return num_vehicles * (
            vehicle['insurance_cost'] + 
            vehicle['maintenance_cost'] + 
            vehicle['fuel_costs']+
            vehicle['Cost ($)']
        )

    def generate_initial_population(self, size_distance: Tuple, population_size: int = 50) -> List[Dict]:
        """Generate initial population of fleet combinations"""
        population = []
        vehicles = self.vehicles_by_size_distance[size_distance]
        max_vehicles = self.max_vehicles_by_group[size_distance]
        
        while len(population) < population_size:
            # Generate a solution
            solution = {v['vehicle_type']: 0 for v in vehicles}
            remaining_vehicles = max_vehicles
            
            # Randomly distribute vehicles while respecting the total maximum
            vehicle_types = list(solution.keys())
            while remaining_vehicles > 0 and vehicle_types:
                vehicle_type = random.choice(vehicle_types)
                if random.random() < 0.5:  # 50% chance to add a vehicle
                    solution[vehicle_type] += 1
                    remaining_vehicles -= 1
                else:
                    vehicle_types.remove(vehicle_type)
            
            if self.is_valid_solution(solution, size_distance):
                population.append(solution)
        
        return population

    def crossover(self, parent1: Dict, parent2: Dict, size_distance: Tuple) -> Tuple[Dict, Dict]:
        """Perform crossover between two parent solutions while respecting dynamic vehicle limit"""
        attempts = 0
        max_attempts = 10
        
        while attempts < max_attempts:
            crossover_point = random.randint(1, len(parent1) - 1)
            child1 = {}
            child2 = {}
            
            for i, vehicle_type in enumerate(parent1.keys()):
                if i < crossover_point:
                    child1[vehicle_type] = parent1[vehicle_type]
                    child2[vehicle_type] = parent2[vehicle_type]
                else:
                    child1[vehicle_type] = parent2[vehicle_type]
                    child2[vehicle_type] = parent1[vehicle_type]
            
            if (self.is_valid_solution(child1, size_distance) and 
                self.is_valid_solution(child2, size_distance)):
                return child1, child2
            
            attempts += 1
        
        # If we can't create valid children, return copies of parents
        return parent1.copy(), parent2.copy()

    def mutate(self, solution: Dict, size_distance: Tuple, mutation_rate: float = 0.1) -> Dict:
        """Mutate a solution while respecting the dynamic vehicle limit"""
        attempts = 0
        max_attempts = 10
        max_vehicles = self.max_vehicles_by_group[size_distance]
        
        while attempts < max_attempts:
            mutated_solution = solution.copy()
            total_vehicles = sum(mutated_solution.values())
            
            for vehicle_type in mutated_solution.keys():
                if random.random() < mutation_rate:
                    change = random.choice([-1, 1])
                    if (change == 1 and total_vehicles < max_vehicles) or \
                       (change == -1 and mutated_solution[vehicle_type] > 0):
                        mutated_solution[vehicle_type] += change
                        total_vehicles += change
            
            if self.is_valid_solution(mutated_solution, size_distance):
                return mutated_solution
            
            attempts += 1
        
        return solution  # Return original if no valid mutation found

    def fitness_function(self, solution: Dict, size_distance: Tuple) -> float:
        """
        Calculate fitness of a solution based on:
        1. TOPSIS score
        2. Meeting demand requirements
        3. Total cost
        4. Penalty for exceeding maximum vehicles
        """
        if not self.is_valid_solution(solution, size_distance):
            return float('-inf')
            
        vehicles = self.vehicles_by_size_distance[size_distance]
        total_cost = 0
        total_capacity = 0
        weighted_topsis = 0
        
        for vehicle in vehicles:
            num_vehicles = solution[vehicle['vehicle_type']]
            total_cost += self.calculate_total_cost(num_vehicles, vehicle)
            total_capacity += num_vehicles * vehicle['yearly_range']
            weighted_topsis += num_vehicles * vehicle['topsis_score']
        
        # Penalize solutions that don't meet demand
        demand = vehicles[0]['demand']
        demand_penalty = max(0, demand - total_capacity) * 1000
        
        # Normalize costs (lower is better)
        cost_score = 1 / (total_cost + 1)
        
        return weighted_topsis + cost_score - demand_penalty

    def optimize(self, size_distance: Tuple, generations: int = 100) -> Dict:
        """Run genetic algorithm to find optimal fleet combination"""
        population = self.generate_initial_population(size_distance)
        best_solution = None
        best_fitness = float('-inf')
        
        for _ in range(generations):
            # Calculate fitness for all solutions
            fitness_scores = [(solution, self.fitness_function(solution, size_distance))
                            for solution in population]
            
            # Sort by fitness (descending)
            fitness_scores.sort(key=lambda x: x[1], reverse=True)
            
            # Update best solution if needed
            if fitness_scores[0][1] > best_fitness:
                best_solution = fitness_scores[0][0]
                best_fitness = fitness_scores[0][1]
            
            # Select parents for next generation
            parents = [score[0] for score in fitness_scores[:len(population)//2]]
            
            # Create next generation
            next_generation = parents.copy()
            while len(next_generation) < len(population):
                parent1, parent2 = random.sample(parents, 2)
                child1, child2 = self.crossover(parent1, parent2, size_distance)
                child1 = self.mutate(child1, size_distance)
                child2 = self.mutate(child2, size_distance)
                next_generation.extend([child1, child2])
            
            population = next_generation[:len(population)]
        
        return best_solution

    def get_optimized_results(self) -> pd.DataFrame:
        """Run optimization and return results in the required format"""
        results = []
        
        for size_distance in self.vehicles_by_size_distance.keys():
            best_solution = self.optimize(size_distance)
            max_vehicles = self.max_vehicles_by_group[size_distance]
            
            for vehicle_type, num_vehicles in best_solution.items():
                if num_vehicles > 0:
                    vehicle_data = next(v for v in self.vehicles_by_size_distance[size_distance] 
                                     if v['vehicle_type'] == vehicle_type)
                    total_cost = self.calculate_total_cost(num_vehicles, vehicle_data)

                    results.append({
                        "Allocation": f"Size {size_distance[0]}, Distance {size_distance[1]}",
                        "Vehicle": vehicle_type,
                        "Cost ($)": round(total_cost, 2),
                        "Fuel": vehicle_data['fuel'],
                        "no_of_vehicles": num_vehicles,
                        "Max Vehicles": max_vehicles,
                        "Demand": vehicle_data['demand'],
                        "Yearly Range": vehicle_data['yearly_range'],
                        "Total_Cost":vehicle_data['Total_Cost']
                    })

        return pd.DataFrame(results)

def main(csv_path: str):
    """Main function to run the optimization"""
    print("\nRunning optimization with dynamic maximum vehicles...")
    data_df = load_and_preprocess_data(csv_path)
    optimizer = FleetOptimizer(data_df)
    optimized_results = optimizer.get_optimized_results()
    print("\nOptimized Fleet Allocation:")
    print(optimized_results)
    return optimized_results

if __name__ == "__main__":
    csv_path = "topsis_result/topsis_results_2023.csv"  # Replace with your CSV file path
    results_df = main(csv_path)
    # results_df.to_csv("optimized_fleet_allocation.csv", index=False)


Running optimization with dynamic maximum vehicles...


KeyError: 'Distance_x'