# 铁路调度问题
公共交通运输业有许多可以从数学优化中受益的应用场景。这包括长期规划(运营哪些服务及其频率、服务时刻表编制)、中期规划(将车辆和乘务员分配给列车运行)以及短期规划(重新调度、调度)。本笔记本主要讨论铁路调度问题。给定一组列车及其当前位置和期望路线,我们需要决定列车运行的顺序,使每列车在遵守轨道和中间站点容量限制的前提下尽快到达终点。

## 加载所需的包
如果您已经有 Gurobi 许可证并安装了该软件,则可以跳过安装 `gurobipy`,但请始终确保您使用的是[最新版本](https://www.gurobi.com/downloads/gurobi-software)。

In [1]:
%pip install gurobipy pandas plotly.express nbformat

In [2]:
import itertools as it
import math
import pandas as pd
import plotly.express as px
import datetime as dt
from os import path
import gurobipy as gp
from gurobipy import GRB

## 获取数据
我们将从 CSV 文件加载场景数据。这些文件位于 `data` 文件夹中;每个场景在其子文件夹中包含多个 CSV 文件。这些文件包含以下数据:
- `stations.csv` 包含铁路站点信息。每个站点都有 `capacity`(可同时容纳的列车数量)和 `duration`(在站点停留所需的时间)。站点还有一个 `y` 坐标用于后续可视化时刻表。
- `tracks.csv` 包含连接站点对的铁路轨道。每条轨道都有 `capacity` 和 `duration` 属性,以及 `start` 和 `end` 站点。我们假设轨道可双向通行,所以 `start` 和 `end` 的顺序并不重要。
- `routes.csv` 包含网络中预定义的路线。每条路线是资源(站点和轨道)的列表。
- `trains.csv` 包含需要调度通过网络的列车。每列火车都遵循一条 `route`,但可以从路线上的任何点 `start` 开始。

In [3]:
dataset = 'linear'
folder = f'data/{dataset}'
df_stations = pd.read_csv(path.join(folder, 'stations.csv')).set_index('id')
df_tracks = pd.read_csv(path.join(folder, 'tracks.csv')).set_index('id')
df_routes = pd.read_csv(path.join(folder, 'routes.csv')).set_index('id')
df_trains = pd.read_csv(path.join(folder, 'trains.csv')).set_index('id')

我们将进行一些处理,以便更容易使用我们的数据。

In [None]:
# 对于每条路线，解析站点/轨道字符串
routes = df_routes['resources'].map(lambda route: [int(res) for res in route.split('-')]).to_dict()

# 对于每列列车，找到对应的路线并应用起始位置
trains = df_trains.apply(lambda row: list(it.dropwhile(lambda resource: resource!=row.start, routes[row.route])), axis=1).to_dict()

# 对于每个资源，找到持续时间和容量
duration = pd.concat([df_tracks['duration'], df_stations['duration']]).to_dict()
capacity = pd.concat([df_tracks['capacity'], df_stations['capacity']]).to_dict()

# 查找站点的Y坐标
station_y = df_stations['y'].to_dict()

# 对于每条轨道，找到轨道两侧的站点对
track_stations = df_tracks.apply(lambda track: (track.start, track.end), axis=1)

# 查找资源集合及其类型（S=站点，T=轨道）
resource_type = { key: 'S' for key in df_stations.index } | { key: 'T' for key in df_tracks.index }
resources = resource_type.keys()

我们可以将输入数据显示为一个简单的 Dataframe,将轨道和站点结合在一起。请注意,轨道没有 `y` 值,而站点没有 `endpoints` 值。

In [5]:
pd.DataFrame({'type': resource_type, 'duration': duration, 'capacity': capacity, 'y': station_y, 'endpoints': track_stations})

Unnamed: 0,type,duration,capacity,y,endpoints
0,S,2,2,0.0,
1,T,6,1,,"(0, 2)"
2,S,4,2,6.0,
3,T,8,1,,"(2, 4)"
4,S,2,2,14.0,
5,T,5,1,,"(4, 6)"
6,S,2,2,19.0,


为了使我们的生活更轻松,我们将定义一个小的辅助函数,该函数为特定列车和资源提供沿列车路线给定资源之后的资源。如果给定资源是路线中的最后一个,则返回 `None`。

In [6]:
def get_next_resource(train, resource):
    route = trains[train]
    index = route.index(resource)
    return route[index + 1] if index < len(route)-1 else None

本笔记本中包含的各种数据集如下所示。节点(圆圈)表示站点,而线条表示轨道。虽然 `linear` 模型不涉及分支,但 `yshape` 更复杂,而 `xshape` 增加了更多复杂性。

![](images/Slide2.png)

## 数学优化模型

### 时间方面
此用例中的主要决策是关于时间安排:每列列车何时访问每个资源。

让我们定义一些重要的集合。
- 令 $R$ 为资源集合;这些资源包括站点和轨道。
- 令 $I$ 为列车集合;我们定义 $R_i$ 为列车 $i \in I$ 要访问的(有序)资源集合

对于时间安排,我们引入两组决策变量。
- 主要变量是 $t_{i,r}$,表示列车 $i$ 访问(开始占用)资源 $r$ 的时间。
- 由于我们希望查看总的通过时间,我们还定义 $t^F_i \: \forall i \in I$ 为列车 $i$ 的完成时间

In [None]:
model = gp.Model()
events = gp.tuplelist([(train, resource) for train, route in trains.items() for resource in route])
t = model.addVars(events, name='t')
tf = model.addVars(trains.keys(), name='tf')

# 辅助函数，根据资源是否为None返回't'或'tf'
def tvar(train, resource):
    return t[train, resource] if resource is not None else tf[train]

Set parameter LicenseID to value 692374


如果我们考虑单列列车,我们知道资源是按特定顺序访问的,列车将花费已知的时间访问每个资源。
- 假设每个资源 $r \in R$ 的持续时间为 $dur_r$
- 然后对于列车 $i$ 连续访问的资源对 $(r, r')$,我们知道 $t_{i,r'} = t_{i,r} + dur_r$

因此,我们可以使用该方程链接每列列车的成对连续时间变量。

In [None]:
# 优先级约束
for train, route in trains.items():
    for train_i in range(len(route)-1):
        # 我们假设列车从一开始就存在于网络中，因此
        # 第一个事件必须等于0。
        if train_i==0:
            model.addConstr(t[train, route[train_i+1]] - t[train, route[train_i]] >= duration[route[train_i]])
        else:
            model.addConstr(t[train, route[train_i+1]] - t[train, route[train_i]] >= duration[route[train_i]])            
    model.addConstr(tf[train] - t[train, route[len(route)-1]] == duration[route[len(route)-1]])

# 起始位置
for train in trains:
    resource = trains[train][0]
    t[train, resource].UB=0

### 冲突检测
如果我们按现在的模型求解,由于列车之间没有相互关联的约束,我们将分别优化每列列车。列车之间的关系是由于它们共享某些有限容量的资源而产生的。

为了处理资源容量问题,第一步是关注共享特定资源 $r$ 的一对列车 $i$ 和 $j$。假设列车 $i$ 在访问资源 $r$ 后访问资源 $u$,而列车 $j$ 接下来访问资源 $v$。

现在根据 $t$ 变量的值,我们将面临以下三种情况之一:
- 列车 $i$ 先于 $j$,这意味着列车 $i$ 在列车 $j$ 到达资源 $r$ 之前就已到达 $u$。
- 同样,列车 $j$ 可能先于列车 $i$,当它在列车 $i$ 到达资源 $r$ 之前到达 $v$。
- 最后,列车 $i$ 和 $j$ 可能在某段时间内同时共享资源。

我们可以引入二进制变量来表示上述三种情况:
- 当 $i$ 先于 $j$ 时,$y_{i,j,r}=1$;为此添加约束 $t_{j,r} - t_{i,u} ≥ -M·(1-y_{i,j,r}$)
- 当 $j$ 先于 $i$ 时,$y_{j,i,r}=1$;为此添加约束 $t_{i,r} - t_{j,v} ≥ -M·(1-y_{j,i,r}$) 
- 当 $i$ 和 $j$ 在资源 $r$ 相遇时,$x_{i,j,r}=1$;为此添加一对约束 $t_{j,v} - t_{i,r} ≥ -M·(1-x_{i,j,r}$) 和 $t_{i,u} - t_{j,r} ≥ -M·(1-x_{i,j,r}$)
- 根据定义,必须满足 $y_{i,j,r} + y_{j,i,r} + x_{i,j,r} = 1$

![](images/Slide1.png)

左图显示了两列列车 i 和 i' 如何访问共享资源 r,然后分别继续前往资源 u 和 v。右图显示了资源 r 的冲突检测涉及的各个事件。每个箭头表示头节点必须跟随尾节点。P 箭头表示单列列车的优先级约束。X 约束适用于列车同时占用资源的情况。当一列列车在资源 r 上先于另一列时,应用 Y 箭头。

In [None]:
M = sum(duration) * len(trains)

def create_disjunct_constraints(resource, train_i, resource_u, train_j, resource_v, M):
    t_ir = tvar(train_i, resource)
    t_iu = tvar(train_i, resource_u)
    t_jr = tvar(train_j, resource)
    t_jv = tvar(train_j, resource_v)

    # A在B之前
    y_ab = model.addVar(vtype=GRB.BINARY, name=f'y[{resource},{train_i},{train_j}]')
    model.addConstr(t_jr - t_iu >= -M *(1-y_ab), name=f'y[{resource},{train_i},{train_j}]')

    # B在A之前
    y_ba = model.addVar(vtype=GRB.BINARY, name=f'y[{resource},{train_j},{train_i}]')
    model.addConstr(t_ir - t_jv >= -M *(1-y_ba), name=f'y[{resource},{train_j},{train_i}]')

    # A和B相遇
    x = model.addVar(vtype=GRB.BINARY, name=f'x[{resource},{train_i},{train_j}]')
    model.addConstr(t_jv - t_ir >= -M *(1-x), name=f'x1[{resource},{train_i},{train_j}]')
    model.addConstr(t_iu - t_jr >= -M *(1-x), name=f'x2[{resource},{train_i},{train_j}]')

    # 三种情况有且只有一种成立
    model.addConstr(y_ab + y_ba + x == 1, name=f'xy[{resource},{train_i},{train_j}]')
    return x

### 资源容量
有了上述约束,我们只能根据 t 的值推导出 x 和 y 的正确值,而不会影响 t 的可行域。缺失的部分是我们资源的有限容量。

假设我们有一个容量为 c 的资源,有 n 列列车要访问它。如果 $c ≥ n$,那么我们不需要做任何事。否则,我们需要约束来确保同时不会有超过 c 个资源。我们如何使用列车对之间的 x 变量来建模呢?

想象一下,我们有 c+1 列列车同时访问。这些列车会形成 ${c+1 \choose 2}$ 对列车 (i,j),并且所有对应的变量 $y_{i,j,r}$ 的值都将为 1,因为它们都在同一时间相遇。因此,这些变量的总和将等于 ${c+1 \choose 2}$。

所以要防止这种情况,需要为每个包含 $c+1$ 列列车的子集添加约束。该约束应确保子集中至少有一对列车不会在资源处相遇。换句话说,子集中 y 变量的总和需要小于 ${c+1 \choose 2}$。

生成这些约束的算法如下:
- 遍历所有资源
- 找出访问该资源的所有列车对
- 使用上面的辅助函数为每对列车生成 x 和 y 变量
- 如果列车数量不超过 c,则跳过
- 否则,如果资源容量 $c>1$:
  - 生成所有大小为 $c+1$ 的列车子集
  - 添加一个约束,确保该子集中列车对的 y 变量之和不超过 ${c+1 \choose 2}$
- 否则,我们可以简单地强制 $x=0$ 来防止重叠。

In [None]:
# 冲突
for resource in resources:
    # 查找访问该资源的列车
    resource_events = events.select('*', resource)
    resource_trains = [t for (t,r) in resource_events]
    trains_with_next = []

    # 对于每列列车，找到下一个连续事件
    for train in resource_trains:
        next_resource = get_next_resource(train, resource)
        trains_with_next.append((train, next_resource))

    # X变量列表，稍后用于热点约束
    xlist = {}

    # 考虑列车对
    pairs = it.combinations(trains_with_next, 2)
    for pair in pairs:
        ((train_i, resource_u), (train_j, resource_v)) = pair
        xlist[train_i,train_j] = create_disjunct_constraints(resource, train_i, resource_u, train_j, resource_v, M)

    # 添加热点约束
    if len(resource_trains) <= capacity[resource]:
        continue
    elif(capacity[resource] > 1):
        subsets = it.combinations(resource_trains, capacity[resource]+1)
        rhs = math.comb(capacity[resource]+1, 2) - 1
        for subset in subsets:
            pairs = it.combinations(subset, 2)
            xsel = [xlist[ta,tb] for (ta,tb) in pairs]
            suffix = ','.join(list(subset))
            model.addConstr(gp.quicksum(xsel) <= rhs, f'h[{resource},{suffix}]')            
    else:
        for x in xlist.values():
            x.UB = 0

### 额外的复杂性
如果需要,您可以通过在事件时间上添加约束来增加模型的复杂性。例如,在 `linear` 数据集中,您可能希望强制列车 `F` 在列车 `E` 之前到达。我们可以通过取消下面语句的注释来实现这一点。请注意这如何增加总延迟,因为列车 `E` 必须在车站等待列车 `F`。

In [11]:
#model.addConstr(tf['F'] <= tf['E'])

### 最小化延误
这就是大部分工作了! 我们已经定义了每列列车的时间变量和优先级约束;然后我们添加了列车共享资源时的关系,并确保这些列车永远不会超过资源容量。现在唯一缺少的是目标函数。为了优化乘客体验和运营效率,我们希望所有列车尽快到达终点。我们可以通过几种方式建模:
- 一个简单的目标可以是简单地将终点的到达时间相加。在这里,一列列车有较长的延误,或者相同的总延误分布在多列列车上并不重要。我们在这里实现的是确保有限容量的资源尽快得到使用。
- 另一个目标可以是最小化所有列车到达终点的时间。这类似于其他调度问题中的"完工时间"概念。
- 这些值不仅取决于我们的调度决策,还取决于每列列车无法避免的最小行程时间。我们可以通过从每列列车的计划到达时间中减去这个最小持续时间来补偿。请注意,由于我们只是减去一个常数值,我们将得到与前一点相同的最优解。
- 最后,为了使事情更有趣,我们可以尝试在列车之间分配延误。虽然这可能对运营效率产生负面影响(处理单个延误可能比处理多个小延误更容易),但它确实代表了一个可以被视为"公平性"的概念。

下面我们将采用第三种方法,但可以轻松修改目标函数以使用这里提到的任何想法。第四种方法需要完整的 Gurobi 许可证,因为它涉及二次目标。

In [12]:
def delay(train):
    min_duration = sum(duration[resource] for resource in trains[train])
    return tf[train] - min_duration
    
model.setObjective(gp.quicksum(delay(train) for train in trains))
model.ModelSense = GRB.MINIMIZE

## 求解模型
现在我们准备求解模型。Gurobi 在几秒钟内完成此操作:

In [13]:
model.optimize()

Gurobi Optimizer version 12.0.0 build v12.0.0rc1 (mac64[arm] - Darwin 24.1.0 24B91)

CPU model: Apple M1
Thread count: 8 physical cores, 8 logical processors, using up to 8 threads

Optimize a model with 367 rows, 218 columns and 1069 nonzeros
Model fingerprint: 0x55c46576
Variable types: 38 continuous, 180 integer (180 binary)
Coefficient statistics:
  Matrix range     [1e+00, 1e+02]
  Objective range  [1e+00, 1e+00]
  Bounds range     [1e+00, 1e+00]
  RHS range        [1e+00, 1e+02]
Presolve removed 154 rows and 108 columns
Presolve time: 0.00s
Presolved: 213 rows, 110 columns, 641 nonzeros
Variable types: 26 continuous, 84 integer (84 binary)
Found heuristic solution: objective 70.0000000

Root relaxation: objective 1.000000e+01, 39 iterations, 0.00 seconds (0.00 work units)

    Nodes    |    Current Node    |     Objective Bounds      |     Work
 Expl Unexpl |  Obj  Depth IntInf | Incumbent    BestBd   Gap | It/Node Time

     0     0   10.00000    0    9   70.00000   10.00000  85

## 可视化解决方案
### 检索解决方案

一旦计算出解决方案,我们希望通过多种方式进行可视化验证。为此,我们首先检索所有变量的值并以有用的方式构建此信息。对于每个列车和访问的资源组合,我们收集到达该资源的时间,以及下一个资源和到达下一个资源的时间。

In [14]:
intervals = {
    (train, resource): (tvar(train, resource).X, tvar(train, get_next_resource(train, resource)).X, get_next_resource(train, resource) )
    for (train, resource) in events
}
model.dispose()

### 显示每列列车和每个资源的解决方案

一种快速简便的方法是创建一个(透视)表,其中列表示资源(横轴表示*空间*),行表示列车。单元格显示列车开始占用资源的时间。可以从特定列车的时间从左到右或从右到左增加的事实中轻松看出行驶方向。

In [15]:
df_intervals = pd.DataFrame.from_dict(intervals, orient='index').rename(columns={0:'start',1:'end',2:'next'}).reset_index()
df_intervals['train'], df_intervals['resource'] = zip(*df_intervals['index'])
df_intervals.drop(columns='index', inplace=True)
df_intervals.pivot_table(values='start', columns='resource', index='train', fill_value='')

  df_intervals.pivot_table(values='start', columns='resource', index='train', fill_value='')


resource,0,1,2,3,4,5,6
train,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
A,0.0,6.0,12.0,24.0,32.0,34.0,39.0
B,,0.0,6.0,16.0,24.0,26.0,31.0
C,,,,0.0,8.0,10.0,15.0
D,,,,,0.0,2.0,7.0
E,26.0,20.0,16.0,8.0,0.0,,
F,50.0,44.0,40.0,32.0,26.0,15.0,0.0


### 使用时空图可视化时刻表

列车时刻表通常通过在*垂直*轴上显示空间,在*水平*轴上显示时间来进行可视化。站点在 y 轴上有特定的点,而轨道是这些站点之间的空间。单个列车行程是通过图表连接(位置,时间)对的路径。水平线段表示列车在站点停留。当两列列车在同一方向行驶并且它们的路径交叉时,这意味着较快的列车超过较慢的列车。要构建此类可视化,我们首先需要为每列列车的每个事件计算 Y 坐标。

In [None]:
# 对于每列列车，计算每个事件的Y坐标
train_coordinates = { }
for train, route in trains.items():
    current_resource = None
    for resource in route:
        if resource_type[resource] == 'S':
            train_coordinates[train, resource] = (station_y[resource], station_y[resource])
        else:
            endpoints = track_stations[resource]
            if current_resource != None:
                first = current_resource
                second = endpoints[0] if endpoints[1] == current_resource else endpoints[1]                
            else:
                second = route[1]
                first = endpoints[0] if endpoints[1] == second else endpoints[1]
            train_coordinates[train, resource] = (station_y[first], station_y[second])
        current_resource = resource

一旦我们有了这些坐标,我们可以计算每列列车的单个线段,然后将这些线段连接为路径。我们将站点绘制为虚线。通常线段的斜率表示列车的速度。

请注意,对于 `yshape` 数据集,轨道中有一个交叉点,但两个分支无法在一维中正确可视化。站点 6 被绘制在站点 4 和 8 之间,即使从站点 4 到 8 行驶的列车实际上并没有经过/通过站点 6。

In [17]:
df_diagram = it.chain(*[[{ 'train':train, 'y': train_coordinates[train, resource][0], 'x': start }, { 'train':train, 'y': train_coordinates[train, resource][1], 'x':end }] for (train, resource), (start, end, next) in intervals.items()])
fig = px.line(df_diagram, x='x', y='y', color='train', width=600, height=400)
fig.update_traces(line={'width': 4}, opacity=.7)
for station, y_ab in station_y.items():    
    fig.add_hline(y=y_ab, annotation_text=station, line_dash="dot", line_width=1, line_color='gray')
fig.show()

### 资源占用

最后,我们可以以甘特图的形式可视化每个资源的占用情况。行表示资源,横轴表示时间。当不同列车的两个条形图重叠时,它们在同一时间(部分)占用相同的资源。

In [18]:
t = dt.datetime.combine(dt.date.today(), dt.time())
df_timeline =  pd.DataFrame.from_records([{ 'train':train, 'resource':resource, 'start':t+dt.timedelta(minutes=start), 'end':t+dt.timedelta(minutes=end) } for (train, resource), (start, end, next) in intervals.items()])
fig = px.timeline(df_timeline, x_start='start', x_end='end', y='resource', color='train', height=400, opacity=0.5)
fig.update_yaxes(type='category', showticklabels=True, categoryorder='array', categoryarray=sorted(int(x) for x in resources))
fig.show() 

 # 结论

在本笔记本中,我们研究了铁路运营商的调度问题。我们将调度问题表述为一个 MIP,可以使用 Gurobi 在合理的时间内求解相对较小的实例。当然,随着列车和资源数量的增加,以及单个资源的容量增加,所选方法的模型规模将迅速增长。已经开发了更先进的技术来解决更大的实例。了解更多信息,请参阅我们与 Sintef 合作的[2024 年网络研讨会](https://www.gurobi.com/events/sintef-railway-optimization/)。