# P01：最大熵 OD 矩阵（MaxEnt + IPF）

给定：
- 起点总出发量 \(O_i\)
- 终点总到达量 \(D_j\)

求一个非负矩阵 \(T_{ij}\) 满足边际约束，并使熵最大：
\[
\max_{T_{ij}\ge 0}\; -\sum_{ij} T_{ij}\ln T_{ij}
\]
（只给定边际时，这等价于 **IPF/RAS** 迭代缩放。）

本 Notebook 会：
1. 生成一组“合成”的 \(O, D\)
2. 用 IPF 推断最大熵 OD
3. 检查约束误差、画热力图
4. （可选）加入成本矩阵 \(c_{ij}\) 看重力/阻抗形式


In [None]:
from pathlib import Path
import sys

# Ensure repo root is on sys.path (so `import exercises` / `import projects` works)
ROOT = Path().resolve()
while ROOT != ROOT.parent and not (ROOT / "README.md").exists():
    ROOT = ROOT.parent
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
print("Repo root:", ROOT)


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from projects.p01_maxent_od.src.ipf import maxent_od, ipf


## 1) 合成边际（O, D）

In [None]:
rng = np.random.default_rng(0)

zones = [f"Z{i:02d}" for i in range(1, 11)]
m = n = len(zones)

O = rng.uniform(50, 150, size=m)   # 出发量
D = rng.uniform(50, 150, size=n)   # 到达量
D = D * (O.sum() / D.sum())        # 调整总量一致

O.sum(), D.sum()

## 2) 最大熵解（只给定边际）

In [None]:
res = maxent_od(O, D, tol=1e-10, max_iter=10_000)
T = res.T

row_err = np.max(np.abs(T.sum(axis=1)-O)/(O+1e-12))
col_err = np.max(np.abs(T.sum(axis=0)-D)/(D+1e-12))

res.n_iter, row_err, col_err, T.min(), T.max()

In [None]:
# 变成 DataFrame 方便查看
df = pd.DataFrame(T, index=zones, columns=zones)
df.iloc[:5, :5]

### 可视化（热力图）

In [None]:
plt.figure()
plt.imshow(T)
plt.title("MaxEnt OD matrix (IPF)")
plt.xlabel("destination j"); plt.ylabel("origin i")
plt.colorbar()
plt.show()


## 3) （可选）加入成本矩阵：\(T_{ij}=a_i b_j e^{-\beta c_{ij}}\)

如果你还知道“平均成本”之类的信息，一个常见的最大熵形式是
\[
T_{ij}\propto e^{-\beta c_{ij}}
\]
再通过 \(a_i,b_j\) 使边际满足。

这里我们演示一个简单做法：把 prior 设成 \(e^{-\beta c_{ij}}\)，再做 IPF。


In [None]:
# 构造一个“距离/成本”矩阵：这里用随机二维坐标的欧式距离当 toy
xy = rng.normal(size=(m, 2))
C = np.sqrt(((xy[:, None, :] - xy[None, :, :])**2).sum(axis=2))
C = C / (C.mean() + 1e-12)

beta = 2.0
prior = np.exp(-beta * C)

res2 = ipf(prior=prior, row_sums=O, col_sums=D, tol=1e-10, max_iter=10_000)
T2 = res2.T

# 看看与纯 MaxEnt（prior=1）相比，是否更“近距离集中”
avg_cost_1 = (T * C).sum() / T.sum()
avg_cost_2 = (T2 * C).sum() / T2.sum()

avg_cost_1, avg_cost_2

In [None]:
plt.figure()
plt.imshow(T2)
plt.title("OD with cost prior exp(-beta*C) + IPF")
plt.xlabel("destination j"); plt.ylabel("origin i")
plt.colorbar()
plt.show()


## 4) 你应该写进知识库的 3 张卡片（强制）
1. `最大熵`：约束 + 拉格朗日乘子 → 指数族
2. `边际约束`：为什么会出现 a_i b_j 的乘法结构
3. `IPF/RAS`：算法步骤、收敛与结构零（prior=0）的含义

完成后：把它们写在 `kb/` 里，并用双链连到这份项目笔记。


## 对比任务：成本敏感性 β 扫描（无成本 prior vs 带成本 prior）

我们把 prior 设为 \(Q_{ij}(\beta)=\exp(-\beta c_{ij})\)。  
当 \(\beta=0\) 时，prior 为常数（等价于“无成本 prior”）；当 \(\beta>0\) 时，prior 会偏向低成本流。

下面扫描若干 \(\beta\)，比较：
- 平均成本 \(\langle c\rangle\)
- 与 \(\beta=0\) 基线 OD 的差异（用 KL 作为一个简单指标）


In [None]:
betas = [0.0, 0.2, 0.5, 1.0, 2.0]

def normalize(T):
    T = T.astype(float)
    return T / T.sum()

def kl(P, Q, eps=1e-12):
    P = np.clip(P, eps, None)
    Q = np.clip(Q, eps, None)
    return float(np.sum(P * (np.log(P) - np.log(Q))))

results = []
T_beta0 = None
for beta in betas:
    Q = np.exp(-beta * cost)
    T = ipf(Q, O, D, max_iter=5000, tol=1e-10)
    avg_cost = float(np.sum(T * cost) / np.sum(T))
    if beta == 0.0:
        T_beta0 = T
    kld = kl(normalize(T), normalize(T_beta0)) if T_beta0 is not None else 0.0
    results.append((beta, avg_cost, kld))

results


In [None]:
import pandas as pd
df = pd.DataFrame(results, columns=["beta", "avg_cost", "KL_vs_beta0"])
df


In [None]:
plt.figure()
plt.plot(df["beta"], df["avg_cost"], marker="o")
plt.xlabel("beta (cost sensitivity)")
plt.ylabel("average cost")
plt.title("OD average cost vs beta")
plt.grid(True)
plt.show()


In [None]:
plt.figure()
plt.plot(df["beta"], df["KL_vs_beta0"], marker="o")
plt.xlabel("beta (cost sensitivity)")
plt.ylabel("KL(T_beta || T_beta0)")
plt.title("OD distribution shift vs beta (KL divergence)")
plt.grid(True)
plt.show()
