In [None]:
%pip install -r requirements.txt --quiet

In [None]:
import osmnx as ox
import networkx as nx
import folium
from folium import FeatureGroup
from folium.plugins import BeautifyIcon
from sklearn.cluster import KMeans
import random, numpy as np
from tqdm import tqdm
from functools import lru_cache
import branca.colormap as cm

# ---------------- CONFIG ----------------
random.seed(42)
np.random.seed(42)

REGION = "√éle-de-France, France"
ENTERPRISE_COORD = (2.3522, 48.8566)  # Paris coordinates

DEPOT_COORDS = {
    "Creteil": (2.4457, 48.7771),
    "Versailles": (2.1301, 48.8049),
    "Cergy": (2.0765, 49.0361),
    "Meaux": (2.8800, 48.9600)
}

# Resources available at each depot
DEPOT_RESOURCES = {
    "Creteil": [1, 2],
    "Versailles": [2, 3],
    "Cergy": [1, 3],
    "Meaux": [3, 4]
}

NUM_CLIENTS = 50
CLIENT_RADIUS_KM = 10
RESOURCES = [1, 2, 3, 4]
TRUCKS_PER_RESOURCE = {1: 3, 2: 3, 3: 3, 4: 3}  # Max trucks per resource type
MAX_DELIVERY_TIME = 180  # minutes, max acceptable delivery time
OUTPUT_DIR = "maps"
OUTPUT_HTML = "supply_chain_aco_advanced_en.html"

# ============================================
# FINANCIAL PARAMETERS
# ============================================
COSTS = {
    "truck_fixed": 150,        # ‚Ç¨/day
    "fuel_per_km": 0.85,
    "driver_per_hour": 25,
    "maintenance_per_km": 0.15
}

TRUCK_CAPACITY = 20

# ACO parameters
ACO_PARAMS = {
    "n_ants": 20,
    "n_iterations": 100,
    "alpha": 1.0,
    "beta": 2.0,
    "evaporation": 0.5,
    "Q": 100
}

# ---------------- LOAD MAP ----------------
print("‚è≥ Loading OSM graph for √éle-de-France...")
G = ox.graph_from_place(REGION, network_type="drive", simplify=True)
G = ox.add_edge_speeds(G)
G = ox.add_edge_travel_times(G)
print(f"‚úÖ Graph loaded ‚Äî {len(G.nodes)} nodes")

enterprise_node = ox.distance.nearest_nodes(G, *ENTERPRISE_COORD)
depot_nodes = {name: ox.distance.nearest_nodes(G, lon, lat) for name, (lon, lat) in DEPOT_COORDS.items()}

# ---------------- HELPERS ----------------
def gen_random_points_around(lon, lat, n, radius_km):
    """Generate random client points around a depot"""
    pts = []
    for _ in range(n):
        r = radius_km * (random.random() ** 0.5)
        theta = random.random() * 2 * np.pi
        dy = (r * np.cos(theta)) / 110.574
        dx = (r * np.sin(theta)) / (111.320 * np.cos(np.radians(lat)))
        pts.append((lon + dx, lat + dy))
    return pts

per_depot = [NUM_CLIENTS // len(depot_nodes)] * len(depot_nodes)
for i in range(NUM_CLIENTS % len(depot_nodes)):
    per_depot[i] += 1

client_info = []
depot_names = list(depot_nodes.keys())
for di, name in enumerate(depot_names):
    lon, lat = DEPOT_COORDS[name]
    pts = gen_random_points_around(lon, lat, per_depot[di], CLIENT_RADIUS_KM)
    for plon, plat in pts:
        nid = ox.distance.nearest_nodes(G, plon, plat)
        res = random.choice(RESOURCES)
        client_info.append({
            "node": nid,
            "lon": G.nodes[nid]["x"],
            "lat": G.nodes[nid]["y"],
            "resource": res,
            "assigned_depot": None
        })

@lru_cache(maxsize=None)
def travel_time_minutes(u, v):
    if u == v: return 0.0
    try:
        return nx.shortest_path_length(G, u, v, weight="travel_time") / 60.0
    except:
        return float("inf")

@lru_cache(maxsize=None)
def travel_distance_km(u, v):
    if u == v: return 0.0
    try:
        return nx.shortest_path_length(G, u, v, weight="length") / 1000.0
    except:
        return float("inf")

def is_route_valid(nodes_seq):
    """Check if all segments in a route are connected"""
    for i in range(len(nodes_seq)-1):
        if travel_time_minutes(nodes_seq[i], nodes_seq[i+1]) == float("inf"):
            return False
    return True

# Assign clients to nearest compatible depot
for c in client_info:
    res = c["resource"]
    best, bestt = None, float("inf")
    for dname, dnode in depot_nodes.items():
        if res not in DEPOT_RESOURCES[dname]: continue
        t = travel_time_minutes(dnode, c["node"])
        if t < bestt and t != float("inf"):
            best, bestt = dname, t
    c["assigned_depot"] = best
    c["is_reachable"] = (best is not None)

reachable_clients = [c for c in client_info if c.get("is_reachable", True)]
print(f"‚úÖ {len(reachable_clients)}/{len(client_info)} clients reachable")

# Organize clients by resource and depot
clients_by_res_depot = {r: {d: [] for d in depot_names} for r in RESOURCES}
for c in reachable_clients:
    if c["assigned_depot"]:
        clients_by_res_depot[c["resource"]][c["assigned_depot"]].append(c)

# ============================================
# ANT COLONY OPTIMIZATION
# ============================================

class AntColonyTSP:
    """ACO solver for TSP-like routes"""
    def __init__(self, nodes, distance_func, **params):
        self.nodes = nodes
        self.n = len(nodes)
        self.distance = distance_func
        self.alpha = params.get("alpha", 1.0)
        self.beta = params.get("beta", 2.0)
        self.evaporation = params.get("evaporation", 0.5)
        self.Q = params.get("Q", 100)
        self.n_ants = params.get("n_ants", 10)
        self.n_iterations = params.get("n_iterations", 50)
        self.pheromone = np.ones((self.n, self.n))
        self.best_path = None
        self.best_distance = float("inf")

    def _select_next(self, current, unvisited, pheromone_row):
        probabilities = []
        for node in unvisited:
            dist = self.distance(self.nodes[current], self.nodes[node])
            if dist == float("inf") or dist > 1000: probabilities.append(0.0001)
            else:
                tau = pheromone_row[node] ** self.alpha
                eta = (1.0 / max(dist, 0.001)) ** self.beta
                probabilities.append(tau * eta)
        probabilities = np.array(probabilities)
        prob_sum = probabilities.sum()
        if prob_sum == 0 or np.isnan(prob_sum) or np.isinf(prob_sum):
            probabilities = np.ones(len(unvisited)) / len(unvisited)
        else:
            probabilities /= prob_sum
        return np.random.choice(unvisited, p=probabilities)

    def _construct_solution(self):
        path = [0]
        unvisited = list(range(1, self.n))
        while unvisited:
            current = path[-1]
            next_node = self._select_next(current, unvisited, self.pheromone[current])
            path.append(next_node)
            unvisited.remove(next_node)
        path.append(0)
        return path

    def _path_distance(self, path):
        total = 0
        for i in range(len(path)-1):
            dist = self.distance(self.nodes[path[i]], self.nodes[path[i+1]])
            if dist == float("inf") or dist > 1000: return float("inf")
            total += dist
        return total

    def _update_pheromone(self, all_paths, all_distances):
        self.pheromone *= (1 - self.evaporation)
        for path, distance in zip(all_paths, all_distances):
            if distance == float("inf") or np.isnan(distance) or np.isinf(distance):
                continue
            deposit = self.Q / distance
            for i in range(len(path)-1):
                self.pheromone[path[i]][path[i+1]] += deposit
                self.pheromone[path[i+1]][path[i]] += deposit

    def solve(self):
        for iteration in range(self.n_iterations):
            all_paths = []
            all_distances = []
            for ant in range(self.n_ants):
                path = self._construct_solution()
                distance = self._path_distance(path)
                all_paths.append(path)
                all_distances.append(distance)
                if distance < self.best_distance:
                    self.best_distance = distance
                    self.best_path = path
            self._update_pheromone(all_paths, all_distances)
        return self.best_path, self.best_distance

# ============================================
# ROUTES AND COST/TIME CALCULATION
# ============================================

def build_real_route_geometry(G, nodes_seq):
    """Get real coordinates for visualization of a route"""
    coords = []
    for a, b in zip(nodes_seq[:-1], nodes_seq[1:]):
        try:
            spath = nx.shortest_path(G, a, b, weight="travel_time")
            for u, v in zip(spath[:-1], spath[1:]):
                data = min(G.get_edge_data(u, v).values(), key=lambda e: e.get("travel_time", 0))
                if "geometry" in data:
                    xy = [(pt[1], pt[0]) for pt in data["geometry"].coords]
                else:
                    xy = [(G.nodes[u]["y"], G.nodes[u]["x"]), (G.nodes[v]["y"], G.nodes[v]["x"])]
                if coords and coords[-1] == xy[0]: coords.extend(xy[1:])
                else: coords.extend(xy)
        except: pass
    return coords

def calculate_route_cost(nodes_seq):
    """Compute total cost, distance, and time of a route"""
    time_min = 0
    distance_km = 0
    for i in range(len(nodes_seq)-1):
        t = travel_time_minutes(nodes_seq[i], nodes_seq[i+1])
        d = travel_distance_km(nodes_seq[i], nodes_seq[i+1])
        if t == float("inf") or d == float("inf"): return {"time_min": float("inf"), "distance_km": float("inf"), "cost": float("inf")}
        time_min += t
        distance_km += d
    cost = COSTS["truck_fixed"] + distance_km*(COSTS["fuel_per_km"] + COSTS["maintenance_per_km"]) + (time_min/60)*COSTS["driver_per_hour"]
    return {"time_min": time_min, "distance_km": distance_km, "cost": cost}

# ============================================
# TRUCK ROUTE OPTIMIZATION
# ============================================

truck_routes = []
truck_id = 0
skipped_routes = 0
clients_assigned_to_trucks = set()

for res in RESOURCES:
    for depot in depot_names:
        clist = clients_by_res_depot[res][depot]
        if not clist: continue
        optimal_trucks = max(1, (len(clist)+TRUCK_CAPACITY-1)//TRUCK_CAPACITY)
        k = min(optimal_trucks, TRUCKS_PER_RESOURCE[res], len(clist))
        coords = np.array([[c["lon"], c["lat"]] for c in clist])
        if len(coords) <= k:
            labels = np.arange(len(coords))
        else:
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(coords)
        for cluster_idx in range(k):
            sub_clients = [clist[i] for i in range(len(clist)) if labels[i] == cluster_idx]
            if not sub_clients: continue
            depot_node = depot_nodes[depot]
            route_nodes = [depot_node] + [c["node"] for c in sub_clients]
            def dist_func(n1,n2): return travel_time_minutes(n1,n2)
            if len(route_nodes) > 2:
                aco = AntColonyTSP(route_nodes, dist_func, **ACO_PARAMS)
                best_path_indices, best_dist = aco.solve()
                if best_dist != float("inf") and not np.isnan(best_dist):
                    optimized_nodes = [route_nodes[i] for i in best_path_indices]
                    nodes_seq = [enterprise_node] + optimized_nodes
                else:
                    nodes_seq = [enterprise_node, depot_node] + [c["node"] for c in sub_clients] + [enterprise_node]
            else:
                nodes_seq = [enterprise_node, depot_node] + [c["node"] for c in sub_clients] + [enterprise_node]
            if not is_route_valid(nodes_seq):
                skipped_routes += 1
                continue
            metrics = calculate_route_cost(nodes_seq)
            if metrics["cost"] == float("inf"): skipped_routes += 1; continue
            route_geom = build_real_route_geometry(G, nodes_seq)
            for c in sub_clients: clients_assigned_to_trucks.add(id(c))
            truck_routes.append({
                "truck_id": truck_id,
                "resource": res,
                "depot": depot,
                "n_clients": len(sub_clients),
                "coords": route_geom,
                **metrics
            })
            truck_id += 1

valid_truck_routes = [r for r in truck_routes if r["cost"] != float("inf")]
optimized_cost = sum(r["cost"] for r in valid_truck_routes)
optimized_trucks = len(valid_truck_routes)
total_distance = sum(r["distance_km"] for r in valid_truck_routes)
total_time = sum(r["time_min"] for r in valid_truck_routes)
baseline_trucks = len(reachable_clients)
baseline_cost = baseline_trucks * 300  # Approximation baseline

# ============================================
# INTERACTIVE MAP
# ============================================

m = folium.Map(location=[48.8566, 2.35], zoom_start=9, tiles="cartodbpositron")

# Enterprise
folium.Marker(
    location=(G.nodes[enterprise_node]["y"], G.nodes[enterprise_node]["x"]),
    popup="üè¢ Enterprise (Paris)",
    icon=BeautifyIcon(icon="building", border_color="black", text_color="black")
).add_to(m)

# Depots
for d, n in depot_nodes.items():
    folium.Marker(
        location=(G.nodes[n]["y"], G.nodes[n]["x"]),
        popup=f"üì¶ Depot {d}<br>Resources: {DEPOT_RESOURCES[d]}",
        icon=BeautifyIcon(icon="archive", border_color="red", text_color="red")
    ).add_to(m)

# Clients
resource_colors = {1:"blue",2:"purple",3:"orange",4:"darkgreen"}
for i,c in enumerate(client_info):
    if c.get("is_reachable",True):
        color = resource_colors[c["resource"]]
        fill_opacity = 0.7
        popup_text = f"‚úÖ Client {i}<br>Resource {c['resource']}<br>Depot: {c['assigned_depot']}"
    else:
        color = "red"
        fill_opacity = 0.3
        popup_text = f"‚ùå Client {i} (NOT REACHABLE)<br>Resource {c['resource']}"
    folium.CircleMarker(
        location=(c["lat"],c["lon"]),
        radius=5,
        color=color,
        fill=True,
        fillOpacity=fill_opacity,
        popup=popup_text
    ).add_to(m)

# Truck routes
colors = ["green","darkred","darkblue","black","orange","pink","gray","teal","brown","cyan"]
for r in valid_truck_routes:
    fg = FeatureGroup(
        name=f"üöõ Truck {r['truck_id']} | Res {r['resource']} | {r['depot']} | {r['n_clients']} clients | {r['cost']:.0f}‚Ç¨ | {r['time_min']:.0f}min",
        show=False
    )
    folium.PolyLine(r["coords"], color=colors[r["truck_id"]%len(colors)], weight=5, opacity=0.8).add_to(fg)
    fg.add_to(m)

folium.LayerControl(collapsed=False).add_to(m)

# Legend
clients_served = sum(r["n_clients"] for r in valid_truck_routes)
savings = baseline_cost - optimized_cost
savings_pct = (savings / baseline_cost) * 100 if baseline_cost > 0 else 0
truck_reduction = baseline_trucks - optimized_trucks

legend_html = f"""
<div style="position: fixed; bottom: 30px; left: 30px; width: 280px;
            background-color: white; border:2px solid grey; z-index:9999;
            font-size:12px; padding:12px; border-radius:5px; box-shadow: 2px 2px 6px rgba(0,0,0,0.3);">
<b style="font-size:14px;">üí∞ Supply Chain - ROI Analysis</b><hr>
<b>üìä Performance:</b><br>
‚Ä¢ Trucks: {optimized_trucks} <span style="color:green;">(-{truck_reduction})</span><br>
‚Ä¢ Clients: {clients_served}/{len(client_info)}<br>
‚Ä¢ Distance: {total_distance:.0f} km<br>
‚Ä¢ Time: {total_time:.0f} min<br><hr>
<b>üí∏ Financial:</b><br>
‚Ä¢ Cost before: <span style="color:red;">{baseline_cost:,.0f}‚Ç¨</span><br>
‚Ä¢ Cost after: <span style="color:green;">{optimized_cost:,.0f}‚Ç¨</span><br>
‚Ä¢ Savings: {savings_pct:.1f}%</div>
"""
m.get_root().html.add_child(folium.Element(legend_html))
OUTPUT_HTML = os.join(OUTPUT_DIR, OUTPUT_HTML)
m.save(OUTPUT_HTML)
print(f"‚úÖ Map saved to {OUTPUT_HTML}")
