In [None]:
def process_evrp_dataset(raw_dataset):
    def calculate_distance_matrix_np(coords_np):
        if coords_np.size == 0 or coords_np.shape[1] != 2: return np.array([])
        diff = coords_np[:, np.newaxis, :] - coords_np[np.newaxis, :, :]
        return np.sqrt(np.sum(diff**2, axis=-1))

    instance_name = raw_dataset.get("instance_name", "unknown_instance")

    coordinates_tensor = raw_dataset.get("coordinates")
    coords_np = coordinates_tensor.numpy() if isinstance(coordinates_tensor, torch.Tensor) else np.array(coordinates_tensor)
    distance_matrix = calculate_distance_matrix_np(coords_np)

    pd_pairs_raw = raw_dataset.get("pd_pairs", [])
    pd_pairs_list = [tuple(pair) for pair in pd_pairs_raw]

    # Depot
    depot = int(raw_dataset.get("depot_node_idx", -1))

    max_distance = float(raw_dataset.get("battery_capacity", -1.0))

    charging_stations_raw = raw_dataset.get("charging_station_indices", [])
    charging_stations_list = [int(cs) for cs in charging_stations_raw]

    processed_data = {
        "instance_name": instance_name,
        "distance_matrix": distance_matrix,
        "depot": depot,
        "pickups_deliveries": pd_pairs_list,
        "charging_stations": charging_stations_list,
        "max_distance_per_charge": max_distance
    }
    return processed_data

In [None]:
def export_results_to_csv(results_data, file_name='aco_results.csv'):
    if not results_data:
        print("No results to export.")
        return
    fieldnames = ['instance_name', 'best_cost', 'time_secor', 'best_path_str']
    try:
        with open(file_name, mode='w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results_data)
        print(f"\nResults successfully exported to '{file_name}'")
    except IOError as e:
        print(f"Error writing to CSV file '{file_name}': {e}")

In [None]:
import torch
import numpy as np
import time
import random as rn
from numpy.random import choice
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.sparse import csr_matrix
import csv

In [None]:
class EVRP_ACS(object):
    def __init__(self, distances, ants, iterations, decay, q0, tau_min, tau_max,
                 current_data, max_distance_per_charge, alpha=1, beta=1):
        self.distances = np.array(distances)
        self.pheromone = np.ones(self.distances.shape) * 0.1
        self.all_inds = range(len(distances))
        self.ants = ants
        self.iterations = iterations
        self.decay = decay
        self.q0 = q0
        self.tau_min = tau_min
        self.tau_max = tau_max
        self.alpha = alpha
        self.beta = beta
        dist_handled = np.copy(self.distances)
        dist_handled[dist_handled == 0] = 1e-10
        self.heuristic = (1.0 / dist_handled) ** self.beta
        self.data = current_data
        self.depot = self.data["depot"]
        self.pickups_deliveries = self.data["pickups_deliveries"]
        self.pickup_to_delivery = {p: d for p, d in self.pickups_deliveries}
        self.charging_stations = set(self.data["charging_stations"])
        self.max_distance_per_charge = 10.0
        self.precedence = {d: p for p, d in self.pickups_deliveries}
        self.pickup_nodes = set(self.pickup_to_delivery.keys())
        self.delivery_nodes = set(self.precedence.keys())
        self.all_pd_nodes = self.pickup_nodes.union(self.delivery_nodes)
        self.global_best_path = None
        self.global_best_distance = np.inf


    def run(self):
        print("\\nStarting optimization search: looking for the BEST path...")
        stagnation_count = 0
        max_stagnation = 20
        for i in range(self.iterations):
            all_paths_data = self.allPaths()
            valid_paths_data = [(p, d) for p, d in all_paths_data if p and d < np.inf and self.is_valid_tour(p)]
            if valid_paths_data:
                iter_best_path, iter_best_dist = min(valid_paths_data, key=lambda x: x[1])
                if iter_best_dist < self.global_best_distance:
                    self.global_best_path = iter_best_path
                    self.global_best_distance = iter_best_dist
                    stagnation_count = 0
                else:
                    stagnation_count += 1
            else:
                stagnation_count += 1
            if self.global_best_path:
                self.dropPheromoneGlobalBest()
            self.pheromone = np.clip(self.pheromone * self.decay, self.tau_min, self.tau_max)
            if (i + 1) % 10 == 0:
                if self.global_best_path:
                    print(f"Iter {i+1}: Current Best Dist: {self.global_best_distance:.4f}")
                else:
                    print(f"Iter {i+1}: No valid path found yet.")
            if stagnation_count >= max_stagnation:
                print(f"Stopping early due to stagnation after {i+1} iterations.")
                break
        return self.global_best_path, self.global_best_distance


    def allPaths(self):
        all_paths_data = []
        for _ in range(self.ants):
            path = self.generatePath()
            if path:
                dist = self.pathDistance(path)
                all_paths_data.append((path, dist))
            else:
                all_paths_data.append(([], np.inf))
        return all_paths_data


    def _get_batch_delivery_mst_cost(self, current_node_idx, nodes_to_visit):
        if not nodes_to_visit:
            return 0
        node_indices = list(nodes_to_visit) + [current_node_idx]
        sub_dist_matrix = self.distances[np.ix_(node_indices, node_indices)]
        csr_mat = csr_matrix(sub_dist_matrix)
        mst = minimum_spanning_tree(csr_mat)
        return mst.sum()


    def generatePath(self):
        path = [self.depot]
        visited_nodes = {self.depot}
        current_node = self.depot
        distance_on_charge = 0.0
        cargo = set()

        while not (self.all_pd_nodes.issubset(visited_nodes) and current_node == self.depot):
            next_node = self.pickMove(current_node, visited_nodes, distance_on_charge, cargo)
            if next_node is None: return None

            dist_to_next = self.distances[current_node, next_node]
            distance_on_charge += dist_to_next
            path.append(next_node)

            if next_node in self.pickup_nodes:
                cargo.add(next_node)
            elif next_node in self.delivery_nodes:
                if self.precedence[next_node] in cargo:
                    cargo.remove(self.precedence[next_node])
            if next_node in self.charging_stations or next_node == self.depot:
                distance_on_charge = 0.0

            self.pheromone[current_node, next_node] = (1 - 0.1) * self.pheromone[current_node, next_node] + 0.1 * self.tau_min
            self.pheromone[next_node, current_node] = self.pheromone[current_node, next_node]
            current_node = next_node
            if next_node not in self.charging_stations:
                visited_nodes.add(next_node)
            if len(path) > (len(self.all_inds) * 2): return None
        return path


    def pickMove(self, current_node, visited_nodes, distance_on_charge, cargo):
        candidate_nodes = []
        all_tasks_done = self.all_pd_nodes.issubset(visited_nodes)

        for next_n in self.all_inds:
            if next_n == current_node: continue

            if (distance_on_charge + self.distances[current_node, next_n]) > self.max_distance_per_charge:
                continue

            if all_tasks_done:
                if next_n == self.depot:
                    candidate_nodes.append(next_n)
                continue

            if next_n == self.depot and next_n not in self.charging_stations:
                continue

            if next_n in self.precedence and self.precedence[next_n] not in visited_nodes:
                continue

            if next_n in self.pickup_nodes and next_n not in visited_nodes:
                dist_to_pickup = self.distances[current_node, next_n]
                remaining_charge_after_pickup = self.max_distance_per_charge - (distance_on_charge + dist_to_pickup)
                deliveries_to_complete = {self.pickup_to_delivery[p] for p in cargo}
                deliveries_to_complete.add(self.pickup_to_delivery[next_n])
                deliveries_to_complete = {d for d in deliveries_to_complete if d not in visited_nodes}
                estimated_delivery_cost = self._get_batch_delivery_mst_cost(next_n, deliveries_to_complete)
                if remaining_charge_after_pickup < estimated_delivery_cost:
                    continue

            if next_n in visited_nodes and next_n not in self.charging_stations:
                continue

            candidate_nodes.append(next_n)

        if not candidate_nodes:
            candidate_chargers = [cs for cs in self.charging_stations if (distance_on_charge + self.distances[current_node, cs]) <= self.max_distance_per_charge]
            if not candidate_chargers and (distance_on_charge + self.distances[current_node, self.depot]) <= self.max_distance_per_charge:
                 if self.all_pd_nodes.issubset(visited_nodes):
                    return self.depot
            if candidate_chargers:
                return min(candidate_chargers, key=lambda cs: self.distances[current_node, cs])
            return None

        pheromone_vals = np.array([self.pheromone[current_node, c] for c in candidate_nodes])
        heuristic_vals = np.array([self.heuristic[current_node, c] for c in candidate_nodes])
        selection_vals = (pheromone_vals ** self.alpha) * (heuristic_vals ** self.beta)
        if rn.random() < self.q0:
            chosen_node = candidate_nodes[np.argmax(selection_vals)]
        else:
            prob = selection_vals / np.sum(selection_vals)
            if np.any(np.isnan(prob)) or np.sum(prob) == 0:
                return rn.choice(candidate_nodes) if candidate_nodes else None
            chosen_node = choice(candidate_nodes, 1, p=prob)[0]
        return chosen_node

    def is_valid_tour(self, tour):
        if not tour or tour[0] != self.depot or tour[-1] != self.depot: return False

        tour_nodes_set = set(tour)
        if not self.all_pd_nodes.issubset(tour_nodes_set):
            return False

        dist_on_charge = 0.0
        visited_so_far_in_path = set()

        for i in range(len(tour)):
            v = tour[i]

            if i > 0:
                u = tour[i-1]
                dist_leg = self.distances[u, v]
                if dist_on_charge + dist_leg > self.max_distance_per_charge + 1e-9: \
                    return False
                dist_on_charge += dist_leg
            if v in self.delivery_nodes:
                if self.precedence[v] not in visited_so_far_in_path:
                    return False

            if v == self.depot or v in self.charging_stations:
                dist_on_charge = 0.0

            visited_so_far_in_path.add(v)

        return True

    def pathDistance(self, path):
        total_distance = 0
        for i in range(len(path) - 1):
            total_distance += self.distances[path[i], path[i+1]]
        return total_distance

    def dropPheromoneGlobalBest(self):
        if self.global_best_path is None: return
        self.dropPheromoneOnPath(self.global_best_path, self.global_best_distance)

    def dropPheromoneOnPath(self, path, distance):
        if distance == np.inf: return
        deposit = 1.0 / distance
        for k in range(len(path) - 1):
            u, v = path[k], path[k+1]
            self.pheromone[u, v] = (1 - self.decay) * self.pheromone[u,v] + self.decay * deposit
            self.pheromone[v, u] = self.pheromone[u, v]
        self.pheromone = np.clip(self.pheromone, self.tau_min, self.tau_max)

    def format_path(self, path):
        if not path: return "Empty Path"
        parts = []
        for node in path:
            if node == self.depot: parts.append(str(node))
            elif node in self.pickup_nodes: parts.append(f"{node}(P)")
            elif node in self.delivery_nodes: parts.append(f"{node}(D)")
            elif node in self.charging_stations: parts.append(f"{node}(C)")
            else: parts.append(str(node))
        return " -> ".join(parts)

if __name__ == '__main__':
    file_path = 'D:/evrp_pdr_dataset_final/test/evrp_pdr_n20.pt'
    raw_datasets = []
    try:
        print(f"Attempting to load data from '{file_path}'...")
        loaded_data = torch.load(file_path, map_location=torch.device('cpu'))
        print(f"File loaded successfully. Found {len(loaded_data)} datasets.")
        raw_datasets = loaded_data[:100]
    except FileNotFoundError:
        print(f"\nError: The file '{file_path}' was not found.")
    except Exception as e:
        print(f"\nAn error occurred during file loading: {e}")
    print(raw_datasets)
    total_cost_all_datasets = 0.0
    total_time_all_datasets = 0.0
    num_successful_runs = 0
    all_results_for_export = []

    if raw_datasets:
        for i, raw_data in enumerate(raw_datasets):
            print("\n" + "="*50)
            print(f"--- Processing and Running Dataset {i+1}/{len(raw_datasets)} ---")

            dataset_params = process_evrp_dataset(raw_data)

            print(f"Instance: {dataset_params['instance_name']}")
            print(f"  Depot: {dataset_params['depot']}, P/D Pairs: {len(dataset_params['pickups_deliveries'])}")
            print(f"  Charging Stations: {dataset_params['charging_stations']}")
            print(f"  Max Distance: {dataset_params['max_distance_per_charge']:.2f}")

            aco_solver = EVRP_ACS(
                distances=dataset_params["distance_matrix"], ants=20, iterations=50,
                decay=0.9, q0=0.3, beta=1.0, tau_min=0.01, tau_max=10.0,
                current_data=dataset_params,
                max_distance_per_charge=dataset_params['max_distance_per_charge'],
                alpha=1.0
            )

            start_time = time.time()
            best_path, best_distance = aco_solver.run()
            end_time = time.time()
            execution_time = end_time - start_time

            result_row = {
                'instance_name': dataset_params['instance_name'],
                'best_cost': 'inf',
                'time_secor': f"{execution_time:.6f}",
                'best_path_str': 'No valid path'
            }

            if best_path:
                print(f"\nResult for {dataset_params['instance_name']}: SUCCESS")
                print(f"  Best Distance: {best_distance:.4f}")

                result_row['best_cost'] = f"{best_distance:.6f}"
                formatted_path = aco_solver.format_path(best_path)
                result_row['best_path_str'] = formatted_path.replace(' -> ', '->')
                total_cost_all_datasets += best_distance
                total_time_all_datasets += execution_time
                num_successful_runs += 1
            else:
                print(f"\nResult for {dataset_params['instance_name']}: FAILURE - No valid path found.")

            all_results_for_export.append(result_row)

        output_filename = 'epdp_20n_acs_results.csv'
        export_results_to_csv(all_results_for_export, output_filename)
        print("\n\n--- Overall Test Set Results ---")
        print(f"Total datasets processed: {len(raw_datasets)}")
        print(f"Number of successful runs (valid paths found): {num_successful_runs}")
        if num_successful_runs > 0:
            average_cost = total_cost_all_datasets / num_successful_runs
            average_time = total_time_all_datasets / num_successful_runs
            print(f"Average cost (distance) for successful runs: {average_cost:.4f}")
            print(f"Average time for successful runs: {average_time:.4f} seconds")
        else:
            print("No successful runs to calculate an average cost.")

In [None]:
class EVRP_EAS(object):
    def __init__(self, distances, ants, iterations, decay,
                 tau_min, tau_max, e_weight,
                 current_data, max_distance_per_charge, alpha=1, beta=1):
        self.distances = np.array(distances)
        self.pheromone = np.ones(self.distances.shape) * 0.1
        self.all_inds = range(len(distances))
        self.ants = ants
        self.iterations = iterations
        self.decay = decay
        self.e_weight = e_weight
        self.tau_min = tau_min
        self.tau_max = tau_max
        self.alpha = alpha
        self.beta = beta
        dist_handled = np.copy(self.distances)
        dist_handled[dist_handled == 0] = 1e-10
        self.heuristic = (1.0 / dist_handled) ** self.beta
        self.data = current_data
        self.depot = self.data["depot"]
        self.pickups_deliveries = self.data["pickups_deliveries"]
        self.pickup_to_delivery = {p: d for p, d in self.pickups_deliveries}
        self.charging_stations = set(self.data["charging_stations"])
        self.max_distance_per_charge = 10.0
        self.precedence = {d: p for p, d in self.pickups_deliveries}
        self.pickup_nodes = set(self.pickup_to_delivery.keys())
        self.delivery_nodes = set(self.precedence.keys())
        self.all_pd_nodes = self.pickup_nodes.union(self.delivery_nodes)
        self.global_best_path = None
        self.global_best_distance = np.inf


    def run(self):
        print("\nStarting EAS optimization: looking for the BEST path...")
        stagnation_count = 0
        max_stagnation = 20

        for i in range(self.iterations):
            all_paths_data = self.allPaths()
            valid_paths_data = [(p, d) for p, d in all_paths_data if p and d < np.inf and self.is_valid_tour(p)]

            if valid_paths_data:
                iter_best_path, iter_best_dist = min(valid_paths_data, key=lambda x: x[1])
                if iter_best_dist < self.global_best_distance:
                    self.global_best_path = iter_best_path
                    self.global_best_distance = iter_best_dist
                    stagnation_count = 0
                else:
                    stagnation_count += 1
            else:
                stagnation_count += 1

            self.pheromone *= self.decay
            for path, dist in valid_paths_data:
                self.depositPheromone(path, dist, weight=1.0)
            if self.global_best_path:
                self.depositPheromone(self.global_best_path, self.global_best_distance, weight=self.e_weight)
            self.pheromone = np.clip(self.pheromone, self.tau_min, self.tau_max)

            if (i + 1) % 10 == 0:
                if self.global_best_path:
                    print(f"Iter {i+1}: Current Best Dist: {self.global_best_distance:.4f}")
                else:
                    print(f"Iter {i+1}: No valid path found yet.")
            if stagnation_count >= max_stagnation:
                print(f"Stopping early due to stagnation after {i+1} iterations.")
                break

        return self.global_best_path, self.global_best_distance


    def allPaths(self):
        all_paths_data = []
        for _ in range(self.ants):
            path = self.generatePath()
            if path:
                dist = self.pathDistance(path)
                all_paths_data.append((path, dist))
            else:
                all_paths_data.append(([], np.inf))
        return all_paths_data


    def generatePath(self):
        path = [self.depot]
        visited_nodes = {self.depot}
        current_node = self.depot
        distance_on_charge = 0.0
        cargo = set()

        while not (self.all_pd_nodes.issubset(visited_nodes) and current_node == self.depot):
            next_node = self.pickMove(current_node, visited_nodes, distance_on_charge, cargo)
            if next_node is None: return None

            dist_to_next = self.distances[current_node, next_node]
            distance_on_charge += dist_to_next
            path.append(next_node)

            if next_node in self.pickup_nodes:
                cargo.add(next_node)
            elif next_node in self.delivery_nodes:
                if self.precedence[next_node] in cargo:
                    cargo.remove(self.precedence[next_node])

            if next_node in self.charging_stations or next_node == self.depot:
                distance_on_charge = 0.0

            current_node = next_node
            if next_node not in self.charging_stations:
                visited_nodes.add(next_node)
            if len(path) > (len(self.all_inds) * 2): return None
        return path


    def pickMove(self, current_node, visited_nodes, distance_on_charge, cargo):
        candidate_nodes = []
        all_tasks_done = self.all_pd_nodes.issubset(visited_nodes)

        for next_n in self.all_inds:
            if next_n == current_node: continue
            if (distance_on_charge + self.distances[current_node, next_n]) > self.max_distance_per_charge: continue

            if all_tasks_done:
                if next_n == self.depot:
                    candidate_nodes.append(next_n)
                continue

            if next_n == self.depot and next_n not in self.charging_stations:
                continue

            if next_n in self.precedence and self.precedence[next_n] not in visited_nodes:
                continue

            if next_n in self.pickup_nodes and next_n not in visited_nodes:
                dist_to_pickup = self.distances[current_node, next_n]
                remaining_charge_after_pickup = self.max_distance_per_charge - (distance_on_charge + dist_to_pickup)
                deliveries_to_complete = {self.pickup_to_delivery[p] for p in cargo}
                deliveries_to_complete.add(self.pickup_to_delivery[next_n])
                deliveries_to_complete = {d for d in deliveries_to_complete if d not in visited_nodes}
                estimated_delivery_cost = self._get_batch_delivery_mst_cost(next_n, deliveries_to_complete)
                if remaining_charge_after_pickup < estimated_delivery_cost:
                    continue

            if next_n in visited_nodes and next_n not in self.charging_stations:
                continue
            candidate_nodes.append(next_n)

        if not candidate_nodes:
            candidate_chargers = [cs for cs in self.charging_stations if (distance_on_charge + self.distances[current_node, cs]) <= self.max_distance_per_charge]
            if not candidate_chargers and (distance_on_charge + self.distances[current_node, self.depot]) <= self.max_distance_per_charge:
                 if self.all_pd_nodes.issubset(visited_nodes):
                    return self.depot
            if candidate_chargers:
                return min(candidate_chargers, key=lambda cs: self.distances[current_node, cs])
            return None

        pheromone_vals = np.array([self.pheromone[current_node, c] for c in candidate_nodes])
        heuristic_vals = np.array([self.heuristic[current_node, c] for c in candidate_nodes])
        selection_vals = (pheromone_vals ** self.alpha) * (heuristic_vals ** self.beta)

        prob = selection_vals / np.sum(selection_vals)
        if np.any(np.isnan(prob)) or np.sum(prob) == 0:
            return rn.choice(candidate_nodes) if candidate_nodes else None
        chosen_node = choice(candidate_nodes, 1, p=prob)[0]
        return chosen_node


    def is_valid_tour(self, tour):
        if not tour or tour[0] != self.depot or tour[-1] != self.depot: return False
        tour_nodes_set = set(tour)
        if not self.all_pd_nodes.issubset(tour_nodes_set): return False

        dist_on_charge = 0.0
        visited_so_far_in_path = set()

        for i in range(len(tour)):
            v = tour[i]

            if i > 0:
                u = tour[i-1]
                dist_leg = self.distances[u, v]
                if dist_on_charge + dist_leg > self.max_distance_per_charge + 1e-9: return False
                dist_on_charge += dist_leg

            if v in self.delivery_nodes:
                if self.precedence[v] not in visited_so_far_in_path: return False

            if v == self.depot or v in self.charging_stations:
                dist_on_charge = 0.0

            visited_so_far_in_path.add(v)

        return True

    def depositPheromone(self, path, distance, weight):
        if distance == np.inf: return
        deposit = weight * (1.0 / distance)
        for k in range(len(path) - 1):
            u, v = path[k], path[k+1]
            self.pheromone[u, v] += deposit
            self.pheromone[v, u] += deposit

    def _get_batch_delivery_mst_cost(self, current_node_idx, nodes_to_visit):
        if not nodes_to_visit: return 0
        node_indices = list(nodes_to_visit) + [current_node_idx]
        sub_dist_matrix = self.distances[np.ix_(node_indices, node_indices)]
        csr_mat = csr_matrix(sub_dist_matrix)
        mst = minimum_spanning_tree(csr_mat)
        return mst.sum()

    def pathDistance(self, path):
        total_distance = 0
        for i in range(len(path) - 1):
            total_distance += self.distances[path[i], path[i+1]]
        return total_distance

    def format_path(self, path):
        if not path: return "Empty Path"
        parts = []
        for node in path:
            if node == self.depot: parts.append(str(node))
            elif node in self.pickup_nodes: parts.append(f"{node}(P)")
            elif node in self.delivery_nodes: parts.append(f"{node}(D)")
            elif node in self.charging_stations: parts.append(f"{node}(C)")
            else: parts.append(str(node))
        return " -> ".join(parts)
if __name__ == '__main__':
    file_path = 'D:/evrp_pdr_dataset_final/test/evrp_pdr_n20.pt'
    raw_datasets = []
    try:
        print(f"Attempting to load data from '{file_path}'...")
        loaded_data = torch.load(file_path, map_location=torch.device('cpu'))
        print(f"File loaded successfully. Found {len(loaded_data)} datasets.")
        raw_datasets = loaded_data[:100]
    except FileNotFoundError:
        print(f"\nError: The file '{file_path}' was not found.")
    except Exception as e:
        print(f"\nAn error occurred during file loading: {e}")

    total_cost_all_datasets = 0.0
    total_time_all_datasets = 0.0
    num_successful_runs = 0
    all_results_for_export = []
    if raw_datasets:
        for i, raw_data in enumerate(raw_datasets):
            print("\n" + "="*50)
            print(f"--- Processing and Running EAS for Dataset {i+1}/{len(raw_datasets)} ---")
            dataset_params = process_evrp_dataset(raw_data)

            print(f"Instance: {dataset_params['instance_name']}")
            print(f"  Max Distance: {dataset_params['max_distance_per_charge']:.2f}")
            eas_solver = EVRP_EAS(
                distances=dataset_params["distance_matrix"],
                ants=20,
                iterations=50,
                decay=0.9,
                e_weight=10.0,
                tau_min=0.01,
                tau_max=10.0,
                current_data=dataset_params,
                max_distance_per_charge=dataset_params['max_distance_per_charge'],
                alpha=1.0,
                beta=1.0
            )

            start_time = time.time()
            best_path, best_distance = eas_solver.run()
            end_time = time.time()
            execution_time = end_time - start_time

            result_row = {
                'instance_name': dataset_params['instance_name'],
                'best_cost': 'inf',
                'time_secor': f"{execution_time:.6f}",
                'best_path_str': 'No valid path'
            }
            if best_path:
                print(f"\nResult for {dataset_params['instance_name']}: SUCCESS")
                print(f"  Best Distance: {best_distance:.4f}")
                result_row['best_cost'] = f"{best_distance:.6f}"
                formatted_path = eas_solver.format_path(best_path)
                result_row['best_path_str'] = formatted_path.replace(' -> ', '->')
                total_cost_all_datasets += best_distance
                total_time_all_datasets += execution_time
                num_successful_runs += 1
            else:
                print(f"\nResult for {dataset_params['instance_name']}: FAILURE - No valid path found.")
            all_results_for_export.append(result_row)

        output_filename = 'epdp_20n_eas_results.csv' # Đổi tên file đầu ra
        export_results_to_csv(all_results_for_export, output_filename)
        print("\n\n--- Overall Test Set Results ---")
        print(f"Total datasets processed: {len(raw_datasets)}")
        print(f"Number of successful runs (valid paths found): {num_successful_runs}")
        if num_successful_runs > 0:
            average_cost = total_cost_all_datasets / num_successful_runs
            average_time = total_time_all_datasets / num_successful_runs
            print(f"Average cost (distance) for successful runs: {average_cost:.4f}")
            print(f"Average time for successful runs: {average_time:.4f} seconds")
        else:
            print("No successful runs to calculate an average cost.")

In [None]:
class EVRP_MMAS(object):
    def __init__(self, distances, ants, iterations, decay, q0,
                 tau_min, tau_max,
                 current_data, max_distance_per_charge, alpha=1, beta=1):
        self.distances = np.array(distances)
        self.tau_min = tau_min
        self.tau_max = tau_max
        self.pheromone = np.ones(self.distances.shape) * self.tau_max

        self.all_inds = range(len(distances))
        self.ants = ants
        self.iterations = iterations
        self.decay = decay
        self.q0 = q0
        self.alpha = alpha
        self.beta = beta
        dist_handled = np.copy(self.distances)
        dist_handled[dist_handled == 0] = 1e-10
        self.heuristic = (1.0 / dist_handled) ** self.beta
        self.data = current_data
        self.depot = self.data["depot"]
        self.pickups_deliveries = self.data["pickups_deliveries"]
        self.pickup_to_delivery = {p: d for p, d in self.pickups_deliveries}
        self.charging_stations = set(self.data["charging_stations"])
        self.max_distance_per_charge = 10.0
        self.precedence = {d: p for p, d in self.pickups_deliveries}
        self.pickup_nodes = set(self.pickup_to_delivery.keys())
        self.delivery_nodes = set(self.precedence.keys())
        self.all_pd_nodes = self.pickup_nodes.union(self.delivery_nodes)
        self.global_best_path = None
        self.global_best_distance = np.inf


    def run(self):
        print("\nStarting MMAS optimization: looking for the BEST path...")
        stagnation_count = 0
        max_stagnation_for_reset = 20

        for i in range(self.iterations):
            all_paths_data = self.allPaths()
            valid_paths_data = [(p, d) for p, d in all_paths_data if p and d < np.inf and self.is_valid_tour(p)]

            if valid_paths_data:
                iter_best_path, iter_best_dist = min(valid_paths_data, key=lambda x: x[1])
                if iter_best_dist < self.global_best_distance:
                    self.global_best_path = iter_best_path
                    self.global_best_distance = iter_best_dist
                    stagnation_count = 0
                else:
                    stagnation_count += 1
            else:
                stagnation_count += 1

            self.pheromone *= self.decay
            if self.global_best_path:
                deposit = 1.0 / self.global_best_distance
                for k in range(len(self.global_best_path) - 1):
                    u, v = self.global_best_path[k], self.global_best_path[k+1]
                    self.pheromone[u, v] += deposit
                    self.pheromone[v, u] += deposit
            self.pheromone = np.clip(self.pheromone, self.tau_min, self.tau_max)

            if stagnation_count >= max_stagnation_for_reset:
                print(f"  --- Stagnation detected! Resetting pheromone trails at iteration {i+1} ---")
                self.pheromone = np.ones(self.distances.shape) * self.tau_max
                stagnation_count = 0

            if (i + 1) % 10 == 0:
                if self.global_best_path:
                    print(f"Iter {i+1}: Current Best Dist: {self.global_best_distance:.4f}")
                else:
                    print(f"Iter {i+1}: No valid path found yet.")

        return self.global_best_path, self.global_best_distance


    def allPaths(self):
        all_paths_data = []
        for _ in range(self.ants):
            path = self.generatePath()
            if path:
                dist = self.pathDistance(path)
                all_paths_data.append((path, dist))
            else:
                all_paths_data.append(([], np.inf))
        return all_paths_data


    def generatePath(self):
        path = [self.depot]
        visited_nodes = {self.depot}
        current_node = self.depot
        distance_on_charge = 0.0
        cargo = set()

        while not (self.all_pd_nodes.issubset(visited_nodes) and current_node == self.depot):
            next_node = self.pickMove(current_node, visited_nodes, distance_on_charge, cargo)
            if next_node is None: return None

            dist_to_next = self.distances[current_node, next_node]
            distance_on_charge += dist_to_next
            path.append(next_node)

            if next_node in self.pickup_nodes:
                cargo.add(next_node)
            elif next_node in self.delivery_nodes:
                if self.precedence[next_node] in cargo:
                    cargo.remove(self.precedence[next_node])

            if next_node in self.charging_stations or next_node == self.depot:
                distance_on_charge = 0.0

            current_node = next_node
            if next_node not in self.charging_stations:
                visited_nodes.add(next_node)
            if len(path) > (len(self.all_inds) * 2): return None
        return path


    def pickMove(self, current_node, visited_nodes, distance_on_charge, cargo):
        candidate_nodes = []
        all_tasks_done = self.all_pd_nodes.issubset(visited_nodes)

        for next_n in self.all_inds:
            if next_n == current_node: continue
            if (distance_on_charge + self.distances[current_node, next_n]) > self.max_distance_per_charge: continue

            if all_tasks_done:
                if next_n == self.depot:
                    candidate_nodes.append(next_n)
                continue

            if next_n == self.depot and next_n not in self.charging_stations:
                continue

            if next_n in self.precedence and self.precedence[next_n] not in visited_nodes:
                continue

            if next_n in self.pickup_nodes and next_n not in visited_nodes:
                dist_to_pickup = self.distances[current_node, next_n]
                remaining_charge_after_pickup = self.max_distance_per_charge - (distance_on_charge + dist_to_pickup)
                deliveries_to_complete = {self.pickup_to_delivery[p] for p in cargo}
                deliveries_to_complete.add(self.pickup_to_delivery[next_n])
                deliveries_to_complete = {d for d in deliveries_to_complete if d not in visited_nodes}
                estimated_delivery_cost = self._get_batch_delivery_mst_cost(next_n, deliveries_to_complete)
                if remaining_charge_after_pickup < estimated_delivery_cost:
                    continue

            if next_n in visited_nodes and next_n not in self.charging_stations:
                continue
            candidate_nodes.append(next_n)

        if not candidate_nodes:
            candidate_chargers = [cs for cs in self.charging_stations if (distance_on_charge + self.distances[current_node, cs]) <= self.max_distance_per_charge]
            if not candidate_chargers and (distance_on_charge + self.distances[current_node, self.depot]) <= self.max_distance_per_charge:
                 if self.all_pd_nodes.issubset(visited_nodes):
                    return self.depot
            if candidate_chargers:
                return min(candidate_chargers, key=lambda cs: self.distances[current_node, cs])
            return None

        pheromone_vals = np.array([self.pheromone[current_node, c] for c in candidate_nodes])
        heuristic_vals = np.array([self.heuristic[current_node, c] for c in candidate_nodes])
        selection_vals = (pheromone_vals ** self.alpha) * (heuristic_vals ** self.beta)
        if rn.random() < self.q0:
            chosen_node = candidate_nodes[np.argmax(selection_vals)]
        else:
            prob = selection_vals / np.sum(selection_vals)
            if np.any(np.isnan(prob)) or np.sum(prob) == 0:
                return rn.choice(candidate_nodes) if candidate_nodes else None
            chosen_node = choice(candidate_nodes, 1, p=prob)[0]
        return chosen_node

    def is_valid_tour(self, tour):
        if not tour or tour[0] != self.depot or tour[-1] != self.depot: return False
        tour_nodes_set = set(tour)
        if not self.all_pd_nodes.issubset(tour_nodes_set): return False

        dist_on_charge = 0.0
        visited_so_far_in_path = set()

        for i in range(len(tour)):
            v = tour[i]

            if i > 0:
                u = tour[i-1]
                dist_leg = self.distances[u, v]
                if dist_on_charge + dist_leg > self.max_distance_per_charge + 1e-9: return False
                dist_on_charge += dist_leg

            if v in self.delivery_nodes:
                if self.precedence[v] not in visited_so_far_in_path: return False

            if v == self.depot or v in self.charging_stations:
                dist_on_charge = 0.0

            visited_so_far_in_path.add(v)

        return True


    def _get_batch_delivery_mst_cost(self, current_node_idx, nodes_to_visit):
        if not nodes_to_visit: return 0
        node_indices = list(nodes_to_visit) + [current_node_idx]
        sub_dist_matrix = self.distances[np.ix_(node_indices, node_indices)]
        csr_mat = csr_matrix(sub_dist_matrix)
        mst = minimum_spanning_tree(csr_mat)
        return mst.sum()

    def pathDistance(self, path):
        total_distance = 0
        for i in range(len(path) - 1):
            total_distance += self.distances[path[i], path[i+1]]
        return total_distance

    def format_path(self, path):
        if not path: return "Empty Path"
        parts = []
        for node in path:
            if node == self.depot: parts.append(str(node))
            elif node in self.pickup_nodes: parts.append(f"{node}(P)")
            elif node in self.delivery_nodes: parts.append(f"{node}(D)")
            elif node in self.charging_stations: parts.append(f"{node}(C)")
            else: parts.append(str(node))
        return " -> ".join(parts)

if __name__ == '__main__':
    file_path = 'D:/evrp_pdr_dataset_final/test/evrp_pdr_n20.pt'
    raw_datasets = []
    try:
        print(f"Attempting to load data from '{file_path}'...")
        loaded_data = torch.load(file_path, map_location=torch.device('cpu'))
        print(f"File loaded successfully. Found {len(loaded_data)} datasets.")
        raw_datasets = loaded_data[:100]
    except FileNotFoundError:
        print(f"\nError: The file '{file_path}' was not found.")
    except Exception as e:
        print(f"\nAn error occurred during file loading: {e}")

    total_cost_all_datasets = 0.0
    total_time_all_datasets = 0.0
    num_successful_runs = 0

    all_results_for_export = []
    if raw_datasets:
        for i, raw_data in enumerate(raw_datasets):
            print("\n" + "="*50)
            print(f"--- Processing and Running MMAS for Dataset {i+1}/{len(raw_datasets)} ---")
            dataset_params = process_evrp_dataset(raw_data)

            print(f"Instance: {dataset_params['instance_name']}")
            print(f"  Max Distance: {dataset_params['max_distance_per_charge']:.2f}")
            mmas_solver = EVRP_MMAS(
                distances=dataset_params["distance_matrix"],
                ants=20,
                iterations=50,
                decay=0.9,
                q0=0.3,
                tau_min=0.01,
                tau_max=10.0,
                current_data=dataset_params,
                max_distance_per_charge=dataset_params['max_distance_per_charge'],
                alpha=1.0,
                beta=1.0
            )

            start_time = time.time()
            best_path, best_distance = mmas_solver.run()
            end_time = time.time()
            execution_time = end_time - start_time

            result_row = {
                'instance_name': dataset_params['instance_name'],
                'best_cost': 'inf',
                'time_secor': f"{execution_time:.6f}",
                'best_path_str': 'No valid path'
            }
            if best_path:
                print(f"\nResult for {dataset_params['instance_name']}: SUCCESS")
                print(f"  Best Distance: {best_distance:.4f}")
                result_row['best_cost'] = f"{best_distance:.6f}"
                formatted_path = mmas_solver.format_path(best_path)
                result_row['best_path_str'] = formatted_path.replace(' -> ', '->')
                total_cost_all_datasets += best_distance
                total_time_all_datasets += execution_time
                num_successful_runs += 1
            else:
                print(f"\nResult for {dataset_params['instance_name']}: FAILURE - No valid path found.")
            all_results_for_export.append(result_row)

        output_filename = 'epdp_20n_mmas_results.csv'
        export_results_to_csv(all_results_for_export, output_filename)
        print("\n\n--- Overall Test Set Results ---")
        print(f"Total datasets processed: {len(raw_datasets)}")
        print(f"Number of successful runs (valid paths found): {num_successful_runs}")
        if num_successful_runs > 0:
            average_cost = total_cost_all_datasets / num_successful_runs
            average_time = total_time_all_datasets / num_successful_runs
            print(f"Average cost (distance) for successful runs: {average_cost:.4f}")
            print(f"Average time for successful runs: {average_time:.4f} seconds")
        else:
            print("No successful runs to calculate an average cost.")