In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [35]:
class ChargingProbabilityCalculator:
    """
    基于 Yang et al. (2016) 的嵌套 Logit 模型计算充电概率。
    适用于 ChargingIntegratedEnvironment 的网格环境。
    """
    def __init__(self, grid_size):
        self.grid_size = grid_size
        
        # 模型系数 (参考 Yang et al. 2016, Table 3, Model M4) 
        self.coeffs = {
            # 下层: 充电站选择 (Route with charging)
            'beta_travel_time': -0.105,   # 行驶时间系数 (假设与距离成正比)
            'beta_travel_cost': -0.127,   # 行驶成本系数
            'beta_charge_time': -0.079,   # 充电时间系数
            'beta_distance_os': -0.130,   # 起点到充电站距离系数
            'beta_aocd': -0.387,          # 角度成本系数 (AOCD)
            
            # 上层: 是否充电 (Charging decision)
            'alpha_soc': 0.163,           # 初始 SOC 系数 (正值表示 SOC 越高越倾向于不充电)
            'asc_no_charge': -11.003,     # 不充电的替代特定常数 (注意：论文中符号可能根据基准不同而不同，这里参考 M4)
            'mu_charge': 0.468,           # 充电巢的包容值系数 (Inclusive Value Coefficient/Scale parameter)
            
            # 假设转换因子 (将网格距离转换为分钟/成本)
            'grid_to_minutes': 2.0,       # 假设网格每移动一格需要 2 分钟
            'grid_to_cost': 0.5           # 假设网格每移动一格花费 0.5 单位
        }

    def _get_coords(self, location_id):
        """将一维 ID 转换为 (x, y) 网格坐标"""
        x = location_id % self.grid_size
        y = location_id // self.grid_size
        return np.array([x, y])

    def _calculate_manhattan_distance(self, pos1, pos2):
        """计算网格上的曼哈顿距离"""
        return np.abs(pos1[0] - pos2[0]) + np.abs(pos1[1] - pos2[1])

    def _calculate_aocd(self, origin_coords, station_coords, dest_coords):
        """
        计算 AOCD (Angular formed by Origin, Charging Station, and Destination) [cite: 203]
        返回值为弧度 (radians)
        """
        # 向量 O->S
        vec_os = station_coords - origin_coords
        # 向量 O->D
        vec_od = dest_coords - origin_coords
        
        # 如果起点和终点重合，或起点和充电站重合，角度为 0
        norm_os = np.linalg.norm(vec_os)
        norm_od = np.linalg.norm(vec_od)
        
        if norm_os == 0 or norm_od == 0:
            return 0.0
            
        # 计算余弦相似度
        cos_theta = np.dot(vec_os, vec_od) / (norm_os * norm_od)
        # 截断以防止数值误差导致 acos 报错
        cos_theta = np.clip(cos_theta, -1.0, 1.0)
        
        return np.arccos(cos_theta)

    def calculate_probabilities(self, origin_id, dest_id, current_soc, charging_stations):
        """
        计算选择各个充电站的概率以及选择不充电的概率。
        
        Args:
            origin_id (int): 起点 ID
            dest_id (int): 终点 ID
            current_soc (float): 当前 SOC (0-100)
            charging_stations (list of dict): 充电站列表，每个元素形如 
                                            {'id': int, 'wait_time': float}
        
        Returns:
            dict: 包含 'prob_no_charge', 'prob_charge', 和每个站点的选择概率
        """
        origin_coords = self._get_coords(origin_id)
        dest_coords = self._get_coords(dest_id)
        
        # --- 1. 计算下层：各充电站路线的效用 (Utility of routes with charging) ---
        utilities_stations = []
        station_indices = []
        
        for station in charging_stations:
            station_id = station['id']
            # 获取充电时间 (Wait time + Service time) [cite: 202]
            # 假设 station['wait_time'] 已经包含了估计的排队和服务时间
            t_charge = station.get('estimated_time', 30.0) 
            
            station_coords = self._get_coords(station_id)
            
            # 计算路线属性
            # 路线：Origin -> Station -> Destination
            dist_os = self._calculate_manhattan_distance(origin_coords, station_coords)
            dist_sd = self._calculate_manhattan_distance(station_coords, dest_coords)
            total_dist = dist_os + dist_sd
            
            travel_time = total_dist * self.coeffs['grid_to_minutes']
            travel_cost = total_dist * self.coeffs['grid_to_cost']
            
            # 计算角度成本 AOCD
            angle = self._calculate_aocd(origin_coords, station_coords, dest_coords)
            
            # 应用公式 (1) [cite: 400, 422]
            # V_k = B_t*T + B_c*C + B_ct*T_charge + B_dist*Dist_OS + B_ang*Angle
            utility = (
                self.coeffs['beta_travel_time'] * travel_time +
                self.coeffs['beta_travel_cost'] * travel_cost +
                self.coeffs['beta_charge_time'] * t_charge +
                self.coeffs['beta_distance_os'] * dist_os +  # 注意这里用的是 O->S 的物理距离 [cite: 429]
                self.coeffs['beta_aocd'] * angle
            )
            
            utilities_stations.append(utility)
            station_indices.append(station_id)
        
        utilities_stations = np.array(utilities_stations)
        
        # --- 2. 计算上层：是否充电的效用 (Utility of Charging Decision) ---
        
        # 计算充电巢的包容值 (Inclusive Value)
        # IV_charge = ln(sum(exp(V_k)))
        # 使用 logsumexp 技巧防止溢出
        max_u = np.max(utilities_stations)
        sum_exp_stations = np.sum(np.exp(utilities_stations - max_u))
        iv_charge = max_u + np.log(sum_exp_stations)
        
        # 计算充电的总效用 V_charge
        # V_charge = Constant + Mu * IV_charge
        # 论文中没有给 V_charge 的显式 Constant (归一化为0或包含在 No-charge 中)，
        # 但提到了 driver characteristics，这里简化假设无特定用户画像
        v_charge = self.coeffs['mu_charge'] * iv_charge
        
        # 计算不充电的效用 V_no_charge [cite: 391, 392]
        # V_nc = Alpha_SOC * SOC + Constant
        v_no_charge = (self.coeffs['alpha_soc'] * current_soc) + self.coeffs['asc_no_charge']
        
        # --- 3. 计算最终概率 ---
        
        # 上层概率 P(Charge) vs P(No Charge)
        # P(Charge) = exp(V_c) / (exp(V_c) + exp(V_nc))
        # 同样使用数值稳定方法
        max_upper = max(v_charge, v_no_charge)
        exp_c = np.exp(v_charge - max_upper)
        exp_nc = np.exp(v_no_charge - max_upper)
        sum_upper = exp_c + exp_nc
        
        p_choose_charge = exp_c / sum_upper
        p_no_charge = exp_nc / sum_upper
        
        # 下层条件概率 P(k | Charge)
        # P(k|C) = exp(V_k) / sum(exp(V_j))
        p_station_given_charge = np.exp(utilities_stations - max_u) / sum_exp_stations
        
        # 联合概率 P(k) = P(k|C) * P(C)
        final_station_probs = p_station_given_charge * p_choose_charge
        
        result = {
            'action_no_charge': p_no_charge,
            'action_charge': p_choose_charge,
            'station_probs': dict(zip(station_indices, final_station_probs))
        }
        
        return result

In [42]:
grid_size = 15
calculator = ChargingProbabilityCalculator(grid_size)

# 模拟输入数据
origin_loc = 0      # (0, 0)
dest_loc = 224      # (14, 14) roughly
current_soc = 40.0  # 60% SOC (中等，可能倾向于充电) [cite: 244]

# 假设有三个充电站，带有预估的充电时间 (min)
stations_data = [
    {'id': 35, 'estimated_time': 20.0},  # 较近
    {'id': 112, 'estimated_time': 40.0}, # 中间
    {'id': 200, 'estimated_time': 15.0}  # 较远但快
]

probs = calculator.calculate_probabilities(origin_loc, dest_loc, current_soc, stations_data)

print(f"当前 SOC: {current_soc}%")
print(f"不充电概率: {probs['action_no_charge']:.4f}")
print(f"充电总概率: {probs['action_charge']:.4f}")
print("-" * 20)
for sid, p in probs['station_probs'].items():
    print(f"选择充电站 {sid} 的概率: {p:.4f}")

当前 SOC: 40.0%
不充电概率: 0.5413
充电总概率: 0.4587
--------------------
选择充电站 35 的概率: 0.3163
选择充电站 112 的概率: 0.0307
选择充电站 200 的概率: 0.1118


In [43]:
import numpy as np
import math

class RelocationManager:
    """
    基于 Ashkrof et al. (2024) 管理司机的重定位行为。
    处理 Zone 定义、Surge 区域以及概率计算。
    """
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.surge_zones = set()       # 存储属于 Surge 区域的 Grid ID
        self.high_demand_zones = set() # 存储属于 High Demand 区域的 Grid ID
        self.city_center_zones = set() # 存储属于市中心的 Grid ID (影响 Waiting 效用)
        
        # --- 模型系数 (基于 Table 3, Scenario 1 Full Model) [cite: 974] ---
        self.coeffs = {
            # Waiting (W)
            'asc_waiting': -0.283,             # 替代特定常数
            'beta_wait_time': -0.022,          # 已等待时间 (分钟)
            'beta_loc_city': 0.322,            # 若当前在市中心 (City Center)
            'beta_parking': 0.277,             # 若有停车位 (假设 Grid 环境部分点有)
            'beta_weekend': 0.350,             # 若是周末 (可作为环境全局变量)
            
            # Surge Area (S) & High Demand (H)
            'beta_trips_sh': 0.080,            # 已完成订单数 (对 S 和 H 均有效)
            
            # Surge Area Only (S)
            'beta_surge_price': 0.177,         # 溢价金额 ($)
            'beta_time_to_surge': -0.020,      # 去 Surge 区的行驶时间 (分钟)
            
            # High Demand Only (H)
            'beta_time_to_hd': -0.037,         # 去 HD 区的行驶时间 (分钟)
            
            # Cruising (C)
            # Cruising 通常作为基准 (Utility=0)，但如果有特定变量影响 C，如下：
            'beta_familiarity_c': -0.312,      # 若熟悉该区域，Cruising 的效用降低 (倾向于 Wait)
            
            # 辅助转换
            'grid_to_min': 2.0                 # 网格距离转分钟
        }

    def update_zone_info(self, surge_ids, hd_ids, city_center_ids):
        """更新网格的区域属性 (由环境每 Epoch 调用)"""
        self.surge_zones = set(surge_ids)
        self.high_demand_zones = set(hd_ids)
        self.city_center_zones = set(city_center_ids)

    def _get_coords(self, location_id):
        return np.array([location_id % self.grid_size, location_id // self.grid_size])

    def _manhattan_dist(self, id1, id2):
        pos1 = self._get_coords(id1)
        pos2 = self._get_coords(id2)
        return np.abs(pos1[0] - pos2[0]) + np.abs(pos1[1] - pos2[1])

    def _find_nearest_zone_dist(self, current_loc, target_zones):
        """找到最近的目标区域网格点的距离"""
        if not target_zones:
            return float('inf'), None
        
        min_dist = float('inf')
        nearest_id = None
        
        # 简单遍历 (可优化为 KDTree 或预计算矩阵)
        for zid in target_zones:
            d = self._manhattan_dist(current_loc, zid)
            if d < min_dist:
                min_dist = d
                nearest_id = zid
        return min_dist, nearest_id

    def calculate_relocation_utilities(self, agent_state, global_state):
        """
        计算四个重定位选项的效用
        """
        loc = agent_state['location']
        trips = agent_state['completed_trips']
        wait_t = agent_state['current_wait_time']
        
        # 1. Utility of Waiting (W)
        # 检查是否在市中心
        is_city = 1 if loc in self.city_center_zones else 0
        has_parking = 1 # 简化假设，或者从 grid 属性读取
        is_weekend = global_state.get('is_weekend', 0)
        
        u_wait = (self.coeffs['asc_waiting'] + 
                  self.coeffs['beta_wait_time'] * wait_t +
                  self.coeffs['beta_loc_city'] * is_city + 
                  self.coeffs['beta_parking'] * has_parking +
                  self.coeffs['beta_weekend'] * is_weekend)

        # 2. Utility of Driving to Surge Area (S)
        dist_s, nearest_s = self._find_nearest_zone_dist(loc, self.surge_zones)
        if nearest_s is not None:
            time_s = dist_s * self.coeffs['grid_to_min']
            surge_val = global_state.get('current_surge_price', 0.0)
            
            u_surge = (self.coeffs['beta_trips_sh'] * trips +
                       self.coeffs['beta_surge_price'] * surge_val +
                       self.coeffs['beta_time_to_surge'] * time_s)
        else:
            u_surge = -999.0 # 不可行选项

        # 3. Utility of Driving to High Demand Area (H)
        dist_h, nearest_h = self._find_nearest_zone_dist(loc, self.high_demand_zones)
        if nearest_h is not None:
            time_h = dist_h * self.coeffs['grid_to_min']
            
            u_hd = (self.coeffs['beta_trips_sh'] * trips +
                    self.coeffs['beta_time_to_hd'] * time_h)
        else:
            u_hd = -999.0

        # 4. Utility of Cruising (C)
        # 基础效用设为 0 (MNL 基准)。
        # 论文指出熟悉度高会降低 Cruising (倾向 Wait)。假设司机都熟悉当前区域(1)。
        is_familiar = 1 
        u_cruise = self.coeffs['beta_familiarity_c'] * is_familiar
        
        return {
            'Wait': u_wait,
            'Surge': u_surge,
            'HighDemand': u_hd,
            'Cruise': u_cruise
        }, {'Surge': nearest_s, 'HighDemand': nearest_h}

    def get_relocation_decision(self, agent_state, global_state):
        """
        返回具体的决策动作和目标位置
        """
        utils, targets = self.calculate_relocation_utilities(agent_state, global_state)
        
        # 计算概率 (Softmax)
        u_vec = np.array(list(utils.values()))
        exp_u = np.exp(u_vec)
        probs = exp_u / np.sum(exp_u)
        
        choices = list(utils.keys())
        choice = np.random.choice(choices, p=probs)
        
        target_loc = None
        if choice == 'Wait':
            target_loc = agent_state['location'] # 原地
        elif choice == 'Surge':
            target_loc = targets['Surge']
        elif choice == 'HighDemand':
            target_loc = targets['HighDemand']
        elif choice == 'Cruise':
            # 随机漫游：选择一个相邻网格
            # 这里简单模拟：随机选一个邻居
            target_loc = self._get_random_neighbor(agent_state['location'])
            
        return choice, target_loc, dict(zip(choices, probs))

    def _get_random_neighbor(self, loc_id):
        # 简单实现：返回 loc_id + 1 或 -1 (需处理边界)
        # 实际项目中应使用 grid 的 get_neighbors 方法
        return (loc_id + 1) % (self.grid_size**2)

In [45]:
# 假设已经实例化了两个计算器
charging_calculator = ChargingProbabilityCalculator(grid_size)
relocation_manager = RelocationManager(grid_size)

def process_driver_step(driver, grid_env):
    """
    处理单个司机在当前时间步的完整决策流程
    """
    
    # --- 步骤 1: 准备状态数据 ---
    current_soc = driver.soc
    loc = driver.location
    # 模拟环境状态
    global_state = {
        'current_surge_price': grid_env.current_surge_price, # e.g., $3.0
        'is_weekend': 0 # e.g., 0 for Weekday
    }
    
    driver_state = {
        'location': loc,
        'completed_trips': driver.trips_today,
        'current_wait_time': driver.idle_time
    }

    # --- 步骤 2: 充电决策 (Upper Level in Charging Model) ---
    # 获取充电站信息
    stations = grid_env.get_charging_stations()
    
    # 计算充电概率 [cite: 391, 401]
    charge_probs = charging_calculator.calculate_probabilities(
        origin_id=loc, 
        dest_id=loc, # 闲置时，终点暂定为当前点或预测点
        current_soc=current_soc, 
        charging_stations=stations
    )
    print("充电决策概率:", charge_probs)
    # 做出充电动作决策
    is_charging = np.random.random() < charge_probs['action_charge']
    
    if is_charging:
        # --> 进入充电流程
        # 根据下层概率选择具体充电站
        station_ids = list(charge_probs['station_probs'].keys())
        station_ps = list(charge_probs['station_probs'].values())
        # 归一化概率 (因为 station_probs 之和 = action_charge)
        norm_ps = [p / charge_probs['action_charge'] for p in station_ps]
        
        target_station = np.random.choice(station_ids, p=norm_ps)
        
        return {
            'status': 'charging',
            'target': target_station,
            'info': 'Heading to Charging Station'
        }
        
    else:
        # --- 步骤 3: 重定位决策 (Relocation Model) ---
        # 司机决定不充电，现在决定去哪里寻找乘客 [cite: 797, 936]
        
        # 确保 Zone 信息是最新的
        relocation_manager.update_zone_info(
            surge_ids=grid_env.get_surge_zones(),
            hd_ids=grid_env.get_high_demand_zones(),
            city_center_ids=grid_env.get_city_center_zones()
        )
        
        action, target_loc, probs = relocation_manager.get_relocation_decision(
            driver_state, global_state
        )
        
        # 更新司机状态
        if action == 'Wait':
            driver.idle_time += 1 # 增加等待时间，影响下一轮决策
        else:
            driver.idle_time = 0 # 移动会重置等待时间感受
            
        return {
            'status': 'relocating',
            'action': action,
            'target': target_loc,
            'probs': probs, # 用于日志分析
            'info': f'Decided to {action}'
        }

# --- 使用示例 ---
class MockDriver:
    def __init__(self):
        self.soc = 80       # SOC 较低，可能倾向于充电
        self.location = 50
        self.trips_today = 8  # 经验较多，更倾向于去 Surge/HD 
        self.idle_time = 10   # 已经等了一会了，可能倾向于移动 

class MockEnv:
    def __init__(self):
        self.current_surge_price = 4.5
        self.get_charging_stations = lambda: [{'id': 10, 'estimated_time': 20}]
        self.get_surge_zones = lambda: [100, 101, 115] # 远处的 Surge 区域
        self.get_high_demand_zones = lambda: [55, 56]  # 较近的高需求区域
        self.get_city_center_zones = lambda: [50, 51, 65, 66] # 当前就在市中心

# 运行
driver = MockDriver()
env = MockEnv()
decision = process_driver_step(driver, env)
print(decision)

充电决策概率: {'action_no_charge': 0.9950878925408455, 'action_charge': 0.004912107459154562, 'station_probs': {10: 0.004912107459154562}}
{'status': 'relocating', 'action': 'Cruise', 'target': 51, 'probs': {'Wait': 0.17763200418576916, 'Surge': 0.4928545095787014, 'HighDemand': 0.2113919552491005, 'Cruise': 0.11812153098642897}, 'info': 'Decided to Cruise'}
