In [0]:
%pip install folium
%pip install -q --extra-index-url=https://pypi.nvidia.com cuopt-server-cu12 cuopt-sh-client cuopt-cu12==25.8.*
%restart_python

In [0]:
# ---------------------------------------------------------------------------
# Notebook configuration: problem size, fleet limits, depot and table names.
# ---------------------------------------------------------------------------
NUM_SHIPMENTS = 20_000
SHIPMENTS_PER_ROUTE = 250  # average number of shipments assigned to one truck
# Total trucks available. Clamped to at least 1 so that small NUM_SHIPMENTS
# values (where the rounded quotient is 0) still produce a usable fleet size.
NUM_ROUTES = max(1, int(round(NUM_SHIPMENTS / SHIPMENTS_PER_ROUTE)))
MAX_EV = 4000 # max capacity
MAX_VAN = 8000
DEPOT_LAT, DEPOT_LON = 39.7685, -86.1580 # start and end point for each route
SOLVER_MINUTES = 10  # converted to seconds where the solver time limit is set


# Table names are suffixed with NUM_SHIPMENTS, so each problem size reads and
# writes its own set of tables.
catalog = "default"
schema = "routing"
shipments_table = f"{catalog}.{schema}.raw_shipments_{NUM_SHIPMENTS}"
mapping_table = f"{catalog}.{schema}.shipment_ids_map_{NUM_SHIPMENTS}"
clustered_table = f"{catalog}.{schema}.shipment_clusters_gpu_{NUM_SHIPMENTS}"
distances_table = f"{catalog}.{schema}.distances_by_route_gpu_{NUM_SHIPMENTS}"
routing_table = f"{catalog}.{schema}.routing_unified_by_cluster_gpu_{NUM_SHIPMENTS}"

In [0]:
%sh nvidia-smi

In [0]:
import cudf
from cuopt import routing, distance_engine
import pandas as pd
import numpy as np
from pyspark.sql import functions as F

In [0]:
# Hello World: cuOpt with a small dense cost matrix.
# Problem dimensions for the toy example.
n_vehicles = 2
n_orders = 3  # one order per task node

# Dense 4x4 cost matrix; entry [i][j] is the cost of travelling from i to j.
cost = cudf.DataFrame(
    [
        [0, 3, 1, 2],
        [3, 0, 1, 2],
        [2, 3, 0, 2],
        [2, 3, 1, 0],
    ],
    dtype='float32',
)
n_locations = len(cost)  # one location per matrix row

# Build the data model; the same values are registered as cost and transit time.
dm = routing.DataModel(n_locations, n_vehicles, n_orders)
dm.add_cost_matrix(cost)
dm.add_transit_time_matrix(cost.copy(deep=True))  # pass a different matrix if times differ

# Solver settings: verbose logging, no explicit time limit.
ss = routing.SolverSettings()
ss.set_verbose_mode(True)

sol = routing.Solve(dm, ss)

print(sol.get_route())  # route table
sol.display_routes()    # pretty print

In [0]:
# Hello World: cuOpt with CSR Waypoint Graph (no dense N×N)

import numpy as np
import cudf
from cuopt import distance_engine, routing


# Dense reference costs; only used here to generate the edge list below.
base = np.array(
    [
        [0, 3, 1, 2],
        [3, 0, 1, 2],
        [2, 3, 0, 2],
        [2, 3, 1, 0],
    ],
    dtype=np.float32,
)
V = base.shape[0]

# ----- 1) Build CSR: every source i gets an edge to every j != i -----
# Boolean mask of the off-diagonal entries; numpy walks it row by row, so the
# edges come out grouped by source with destinations in ascending order.
off_diagonal = ~np.eye(V, dtype=bool)
indices = np.nonzero(off_diagonal)[1].astype(np.int32)    # size E = 12, destination of each edge
weights = base[off_diagonal].astype(np.float32)           # size E, cost of each edge
offsets = (np.arange(V + 1) * (V - 1)).astype(np.int32)   # size V+1, each source has V-1 edges

# ----- 2) Build Waypoint graph and compute compact matrix for targets -----
wg = distance_engine.WaypointMatrix(offsets, indices, weights)
targets = np.arange(V, dtype=np.int32)   # use all 4 nodes; 0 will be the depot
cost = wg.compute_cost_matrix(targets)   # 4x4 matrix over the targets

# ----- 3) Build a minimal routing model and solve -----
n_vehicles = 2
n_orders = 3
n_locations = len(targets)

dm = routing.DataModel(n_locations, n_vehicles, n_orders)

# Every vehicle starts and ends at the depot (index 0 in the target set).
depot_starts = cudf.Series([0]*n_vehicles)
depot_ends = cudf.Series([0]*n_vehicles)
dm.set_vehicle_locations(depot_starts, depot_ends)

# 3 orders at locations 1,2,3 (indices within the compact matrix).
dm.set_order_locations(cudf.Series([1,2,3]))

# Primary matrix.
dm.add_cost_matrix(cost)

# Optional: require both vehicles to be used (just to see multiple routes).
dm.set_min_vehicles(n_vehicles)

# Solver settings: verbose logging, no explicit time limit.
ss = routing.SolverSettings()
ss.set_verbose_mode(True)

sol = routing.Solve(dm, ss)
print(sol.get_route())   # route table
sol.display_routes()     # pretty print

In [0]:
DEPOT_ID = 0  # global index that identifies the depot node

# Edge list: one row per (source, destination) pair with its travel duration.
edge_columns = ["global_idx_source", "global_idx_dest", "duration_seconds"]
distances_df = spark.read.table(distances_table).select(*edge_columns)

# For every edge that ends at the depot, create the mirrored depot -> node edge
# carrying the same duration.
rev_to_depot = distances_df.filter(F.col("global_idx_dest") == DEPOT_ID).select(
    F.lit(DEPOT_ID).alias("global_idx_source"),
    F.col("global_idx_source").alias("global_idx_dest"),
    "duration_seconds",
)

# Append the mirrored edges; no ordering is applied to the combined edge list.
distances_df = distances_df.unionByName(rev_to_depot)
display(distances_df)

**Questions**
- Works for 20k packages on a 10 min solver time limit, but do longer solves take more memory?
- Do we need both cost and time matrices?
- Will it help memory constraints to round durations to ints?
- (tentatively done) How do we set a max duration per truck?

**Notes**
- OOMs at 40k packages
- TODO: run trials on CPU vs GPU
- Sparse matrices are not supported in cuOpt yet, but CuPy supports them: https://docs.cupy.dev/en/v9.6.0/reference/sparse.html

In [0]:
# ---------------------------
# 1) Pull edges and normalize
# ---------------------------
pdf = (
    distances_df
      .select("global_idx_source","global_idx_dest","duration_seconds")
      .toPandas()
)

# Build a stable 0..n-1 index space for ALL nodes seen in src or dest.
# The node list is SORTED by global index: the Spark result has no guaranteed
# row order, so first-appearance order would change from run to run. The
# visualization cell also rebuilds the compact-location -> global-index mapping
# by ascending global index, which is only correct if positions are assigned in
# that same order here.
all_nodes = pd.Index(
    pd.unique(pd.concat([pdf["global_idx_source"],
                         pdf["global_idx_dest"]], ignore_index=True))
).sort_values()
node2pos = {int(g): i for i, g in enumerate(all_nodes)}
n = len(all_nodes)

# Fail early with a readable message instead of a bare KeyError further down.
if DEPOT_ID not in node2pos:
    raise KeyError(f"Depot id {DEPOT_ID} does not appear in the distance edges")

pdf["src_idx"] = pdf["global_idx_source"].map(node2pos).astype(np.int32)
pdf["dst_idx"] = pdf["global_idx_dest"].map(node2pos).astype(np.int32)
pdf["cost"]    = pdf["duration_seconds"].astype(np.float32)

# ---------------------------
# 2) Build CSR (offsets/indices/weights)
# ---------------------------
# Edges must be grouped by source so each source owns one contiguous slice.
pdf = pdf.sort_values(["src_idx","dst_idx"], kind="mergesort")

indices = pdf["dst_idx"].to_numpy(dtype=np.int32)       # E-length array of neighbor dsts
weights = pdf["cost"].to_numpy(dtype=np.float32)        # E-length array of edge costs

# counts per src over the FULL 0..n-1 range (nodes with 0 out-edges still get an offset)
counts = (
    pdf.groupby("src_idx").size()
       .reindex(range(n), fill_value=0)
       .to_numpy(dtype=np.int32)
)

# offsets[v]..offsets[v+1]-1 slice into `indices`/`weights`
offsets = np.concatenate([[0], np.cumsum(counts, dtype=np.int64)]).astype(np.int32)

# ---------------------------
# 3) Waypoint graph + compact matrix for selected targets
# ---------------------------
wg = distance_engine.WaypointMatrix(offsets, indices, weights)
# Every non-depot node is an order; ascending global index because all_nodes is sorted.
order_globals = [int(x) for x in all_nodes if int(x) != DEPOT_ID]

# Targets are the nodes we want in the compact matrix: [depot] + orders.
# Compact location 0 is the depot; compact location k (k >= 1) is order_globals[k-1].
targets = np.array(
    [node2pos[DEPOT_ID]] + [node2pos[g] for g in order_globals],
    dtype=np.int32
)

cost = wg.compute_cost_matrix(targets)   # len(targets) x len(targets) matrix
time = cost.copy(deep=True)              # use cost as time for now

# ---------------------------
# 4) Routing model and solve
# ---------------------------
MAX_ROUTE_SECONDS = 60*60*9 # 9 hours max route time
n_locations = len(targets)
n_orders    = len(order_globals)

dm = routing.DataModel(n_locations, NUM_ROUTES, n_orders)
# All vehicles start and end at compact location 0 (the depot).
dm.set_vehicle_locations(cudf.Series([0]*NUM_ROUTES), cudf.Series([0]*NUM_ROUTES))
# One order per non-depot compact location 1..n_locations-1.
dm.set_order_locations(cudf.Series(np.arange(1, n_locations, dtype=np.int32)))
dm.set_vehicle_max_times(cudf.Series(np.full(NUM_ROUTES, MAX_ROUTE_SECONDS, dtype=np.float32)))

# Primary matrices
dm.add_cost_matrix(cost)
dm.add_transit_time_matrix(time)
dm.set_min_vehicles(NUM_ROUTES)

ss = routing.SolverSettings()
ss.set_verbose_mode(True)
ss.set_time_limit(SOLVER_MINUTES * 60)  # time limit in seconds

sol = routing.Solve(dm, ss)

# Surface solver problems before the routes are persisted by the next cell.
# NOTE(review): 0 is taken to be cuOpt's success status - confirm against the
# installed cuOpt version.
solve_status = sol.get_status()
if solve_status != 0:
    print(f"WARNING: cuOpt returned non-success status {solve_status}; routes may be missing or incomplete")

sol.display_routes()

In [0]:
# Move the solver's route table to pandas, then to Spark.
route_pdf = sol.get_route().to_pandas()
optimized_routes_df = spark.createDataFrame(route_pdf)

# Replace any previous contents of the routing table with this solution.
(
    optimized_routes_df
    .write
    .mode("overwrite")
    .saveAsTable(routing_table)
)

# Read the table back so the displayed frame reflects what was stored.
routing_df = spark.read.table(routing_table)
display(routing_df)

In [0]:
# Reload the saved routes by their literal table name.
# NOTE: this literal equals `routing_table` from the configuration cell only
# while NUM_SHIPMENTS is 20_000; update it if the problem size changes.
routing_df = spark.read.table("default.routing.routing_unified_by_cluster_gpu_20000")
display(routing_df)

Databricks visualization. Run in Databricks to view.

In [0]:
# Re-declares a subset of the configuration (same values as the configuration
# cell near the top of the notebook).
NUM_SHIPMENTS = 20_000
catalog, schema = "default", "routing"
shipments_table = ".".join((catalog, schema, f"raw_shipments_{NUM_SHIPMENTS}"))
In [0]:
from utils.plotter import plot_route_folium
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Recreate the compact-location -> global-index mapping used by the solve cell.
# NOTE(review): the mapping below numbers the non-depot nodes by ASCENDING
# global index. It is only correct if the solve cell assigned compact positions
# in that same order - verify before trusting the plotted stops.
DEPOT_ID = 0

# Read the raw edges under a dedicated name so the notebook-level `distances_df`
# (which carries the extra reverse depot edges needed by the solve cell) is not
# overwritten by this cell.
viz_distances_df = spark.read.table("default.routing.distances_by_route_gpu_20000")

# Every node that appears as a source or a destination; the single column keeps
# the name `global_idx_source` because union() matches columns by position.
all_nodes_df = (
    viz_distances_df
    .select("global_idx_source").distinct()
    .union(viz_distances_df.select("global_idx_dest").distinct())
    .distinct()
)

# Compact location k (k >= 1) is the k-th smallest non-depot global index.
# The window's own ordering defines the numbering, so no separate sort is needed.
order_globals_df = (
    all_nodes_df
    .where(F.col("global_idx_source") != DEPOT_ID)
    .withColumn("compact_location", F.row_number().over(Window.orderBy("global_idx_source")))
)

print("Order globals mapping (first 10):")
order_globals_df.show(10)

# Get a sample route
sample_rows = routing_df.select("route").limit(1).collect()
if not sample_rows:
    raise ValueError("routing_df is empty; there is no route to visualize")
route = sample_rows[0][0]
print(f"\nAnalyzing route: {route}")

# Add route_index column: 0-based stop position within the route, by arrival time.
window_spec = Window.partitionBy("route").orderBy("arrival_stamp")
single_route_df = (
    routing_df
    .where(F.col("route") == route)  # column comparison instead of an interpolated SQL string
    .withColumn("route_index", F.row_number().over(window_spec) - 1)
)

print("Route data:")
single_route_df.select("route", "location", "arrival_stamp", "route_index", "type").show()

# Handle depot (location=0). The coordinates repeat DEPOT_LAT / DEPOT_LON from
# the configuration cell.
depot_df = (
    single_route_df
    .where(F.col("location") == 0)
    .withColumn("latitude", F.lit(39.7685))
    .withColumn("longitude", F.lit(-86.1580))
    .withColumn("origin_id", F.lit("DEPOT"))
)

# For location>0, map compact location -> global index -> shipment attributes.
mapping_df = spark.read.table("default.routing.shipment_ids_map_20000")

shipment_df = (
    single_route_df
    .where(F.col("location") > 0)
    .join(
        order_globals_df,
        single_route_df["location"] == order_globals_df["compact_location"],
        "inner"
    )
    .join(
        mapping_df,
        order_globals_df["global_idx_source"] == mapping_df["global_idx"],
        "inner"
    )
    .select(
        single_route_df["route"],
        single_route_df["arrival_stamp"],
        single_route_df["truck_id"],
        single_route_df["location"],
        single_route_df["type"],
        single_route_df["route_index"],
        mapping_df["latitude"],
        mapping_df["longitude"],
        mapping_df["package_id"].alias("origin_id")
    )
)

# Count once: every count() call launches a separate Spark job.
shipment_count = shipment_df.count()
print(f"\nShipment data after proper mapping: {shipment_count} records")
if shipment_count > 0:
    print("Sample shipment data:")
    shipment_df.select("location", "latitude", "longitude", "origin_id").show(5)

# Combine depot and shipment data
combined_route_df = depot_df.unionByName(shipment_df)
single_route_pdf = combined_route_df.orderBy("route_index").toPandas()

print(f"\nFinal route visualization data: {len(single_route_pdf)} stops")
if len(single_route_pdf) > 0:
    print("All stops:")
    print(single_route_pdf[['route_index', 'location', 'latitude', 'longitude', 'origin_id']])

    # Plot the route
    m = plot_route_folium(single_route_pdf)
    display(m)
else:
    print("No route data to visualize!")

%md

&copy; 2025 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source).  All included or referenced third party libraries are subject to the licenses set forth below.

| library                | description                                                                                      | license      | source                                                    |
|------------------------|--------------------------------------------------------------------------------------------------|--------------|-----------------------------------------------------------|
| OSRM Backend Server    | High performance routing engine written in C++14 designed to run on OpenStreetMap data           | BSD 2-Clause "Simplified" License | https://github.com/Project-OSRM/osrm-backend              |
| osmnx                  | Download, model, analyze, and visualize street networks and other geospatial features from OpenStreetMap in Python | MIT License  | https://github.com/gboeing/osmnx                          |
| ortools                | Operations research tools developed at Google for combinatorial optimization                     | Apache License 2.0 | https://github.com/google/or-tools                        |
| folium                 | Visualize data in Python on interactive Leaflet.js maps                                          | MIT License  | https://github.com/python-visualization/folium            |
| dash                   | Python framework for building analytical web applications and dashboards; built on Flask, React, and Plotly.js | MIT License  | https://github.com/plotly/dash                            |
| branca                 | Library for generating complex HTML+JS pages in Python; provides non-map-specific features for folium | MIT License  | https://github.com/python-visualization/branca            |
| plotly                 | Open-source Python library for creating interactive, publication-quality charts and graphs        | MIT License  | https://github.com/plotly/plotly.py                       |
| ray                    | Flexible, high-performance distributed execution framework for scaling Python workflows         | Apache License 2.0 | https://github.com/ray-project/ray                        |