In [1]:
import numpy as np
import random
import pandas as pd
import time as T
from math import floor
import tkinter as tk
from tkinter import ttk
from PIL import Image, ImageTk
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import (
    FigureCanvasTkAgg, NavigationToolbar2Tk)
# Implement the default Matplotlib key bindings.

from matplotlib.backend_bases import key_press_handler
from matplotlib.figure import Figure
from matplotlib.backends import _backend_tk
from matplotlib.backends.backend_agg import FigureCanvasAgg
import matplotlib.gridspec as gridspec

## 1. 车辆与道路

### 1.1 车辆Object与道路网格初始化

In [2]:
# 定义车辆，包含Cell是否存在车辆(exist)、是否自动驾驶车辆(auto)、速度（speed）以及行驶方向（direct）
class car(object):
    def __init__(self, exist = 0, auto = 0, speed = 0, accel = 0, direct = "S"):
        self.exist = exist
        self.auto = auto
        self.speed = speed
        self.accel = accel
        self.direct = direct


# 创建道路网格
def make2DArray(cols, rows):
    arr = np.ndarray((cols, rows), dtype=np.object)
    for i in range(0, cols):
        for j in range(0, rows): 
            arr[i][j] = car() 
    return arr

def generateUpdateArray(cols, rows):
    arr = np.zeros((cols, rows))
    return arr


### 1.2 车流规律

In [3]:
# 车流抵达规律（暂时不用）
def rule_arrival(grid):

    lane_oneside = int(lane_number / 2)
    
    for i in range(0, lane_number): # 左侧（南向）车流
        if i < lane_oneside:
            d_current, yf = check_forward((i, 0))
            if d_current >= (vm * dt + car_size) and grid[i][0].exist != 1:
                arrival = np.random.poisson(lam_S / 3600 / lane_oneside * dt, 1)
                grid[i][0].exist = min(1, arrival) # 单位时间每个车道抵达车辆数目为0或1
                grid[i][0].direct = "S"
                if grid[i][0].exist == 1:
                    grid[i][0].auto = generate_auto(ratio_auto)
                    grid[i][0].speed = int(random.gauss(vm, dv)) # 初始速度
        
    return grid


# 车流生成规律
def rule_generate(grid):

    n_cell = grid.shape[0] * grid.shape[1]
    n_car = road_density * road_length / 1000  * 2 # two side
    spacing_ini = (n_cell / n_car) - 1 # 初始间距，经验值
    
    if spacing_ini <= 1:
        raise ValueError('The density setting is too high. Please lower the density.') 
        
    auto_choices = [0, 1]
    p_auto_choices = [1 - ratio_auto, ratio_auto]
    count_car = 0
    
    p_auto = [ratio_auto, 1 - ratio_auto]
    choice_auto = [1, 0]
    
    for i in range(0, lane_number): 
        x = i
        j = 0
        count_car = 0

        while count_car < int(n_car / lane_number):    
            y = int(spacing_ini * j)
            grid[x][y].exist = 1
            grid[x][y].auto = np.random.choice(choice_auto, 1, p = p_auto)[0]
            grid[x][y].speed = int(random.gauss(vm, dv)) # 初始速度
            
            j += 1
            count_car += 1
    return grid


# 车流前进规律
def rule_forward(grid):
                    
    total_speed = 0
    total_traffic = 0
    
    for i in range(0, lane_number): 
        x = i
        for j in range(0, rows):
            y = rows - j - 1 # 自下而上更新

            if grid[x][y].exist == 1 and update[x][y] == 0: # 当前Cell有车，并且未更新

                d_current, yf = check_forward((x, y)) # 计算与前车之间的距离, 以及前车的位置

                # 根据当前车辆是否自动驾驶，计算安全距离
                if grid[x][y].auto == 0:
                    d_safe = grid[x][y].speed * th
                else:
                    d_safe = grid[x][y].speed * ta

                # 根据安全距离，计算前进的距离
                if (d_current - grid[x][y].speed * dt - car_size) >= d_safe:
                    displacement = int(grid[x][y].speed * dt)
                else:
                    displacement = min(int(grid[x][y].speed * dt), d_current - car_size)

                # 将车辆更新至新的Cell(注意越过边界再返回的处理)
                if (y + displacement) >= rows:
                    y_new = y + displacement - rows
                else:
                    y_new = y + displacement

                grid[x][y_new] = update_cell(grid[x][y], grid[x][yf], d_safe, d_current)
                update[x][y_new] = 1

                # 顺路作统计
                total_speed += grid[x][y_new].speed
                total_traffic += 1


                # 如果发生了实际的移动，将当前车辆注销
                if displacement != 0:
                    grid[x][y].exist = 0
        
    return grid, total_speed, total_traffic

# 变道规律
def rule_changeLane(grid):

    for i in range(0, lane_number): 
        x = i
        for j in range(0, rows):
            y = rows - j - 1 # 自下而上更新
            if grid[x][y].exist == 1:

                left, right = check_changability((x, y))  # 检查左右两侧是否可变道

                if left and right: # 左右皆可变道
                    directs = [-1, 0, 1]
                    ps = [pc / 2, 1 - pc, pc /2]

                elif left and (not right): # 仅左侧可变道
                    directs = [-1, 0]
                    ps = [pc, 1 - pc]

                elif (not left) and right: # 仅右侧可变道
                    directs = [1, 0]
                    ps = [pc, 1 - pc]

                else: # 不可变道
                    directs = [0]
                    ps = [1]

                dx = np.random.choice(directs, 1, p=ps)
                x_new = x + dx[0]
                grid[x_new][y] = copy_cell(grid[x][y])

                # 如果确实发生变道，将当前Cell注销
                if dx[0] != 0:
                    grid[x][y].exist = 0
    
    return grid

### 1.3 辅助功能

In [4]:
# 生成自动驾驶车辆
def generate_auto(ratio_auto):
    inter_auto = 2 - 1/ratio_auto
    auto = floor(random.uniform(inter_auto,2))
    if auto < 0:
        auto = int(0)
        
    return auto

# 复制Cell，用于变道
def copy_cell(car_to_copy):
    car_new = car()
    car_new.exist = car_to_copy.exist
    car_new.auto = car_to_copy.auto
    car_new.speed = car_to_copy.speed
    car_new.accel = car_to_copy.accel
    car_new.direct = car_to_copy.direct
    
    return car_new

# 用于前进时更新Cell
def update_cell(car_self, car_ahead, d_safe, d_current):
    car_new = car()
    car_new.exist = 1
    car_new.auto = car_self.auto
    car_new.direct = car_self.direct
    
    # 根据现车加速度更新速度
    if car_self.accel >= 0:
        car_new.speed = min(car_self.speed + car_self.accel * dt, vmax)
    else:
        car_new.speed = max(car_self.speed + car_self.accel * dt, vmin)
    
    # 如果前方无车，则设置前车参考速度为vmax, 参考加速度为0
    if car_ahead.exist == 0:
        car_ahead.speed == vmax
        car_ahead.accel = 0
    
    # 根据现车与前车是否自动驾驶，更新加速度
    if car_self.auto == 0: # 现车为人工驾驶
        car_new.accel = alpha * (d_current - d_safe)
    else: # 现车为自动驾驶
        if car_ahead.auto == 0: # 前车为人工驾驶
            car_new.accel = alpha * (d_current - d_safe) + beta * (car_ahead.speed - car_self.speed)
        else: # 前车为自动驾驶
            car_new.accel = alpha * (d_current - d_safe) + beta * (car_ahead.speed - car_self.speed) + gamma * car_ahead.accel
    
    return car_new

# 探测车辆前方是否有车，并返回与前车之间的距离
def check_forward(position):
    x = position[0]
    y = position[1]
    
    ymax = y + max_forward + 1 # 往前探测的最大距离
    d_current = max_forward
    
    # 往前探测到最大可前进距离
    for yf in range(y + 1, ymax):            
        if yf >= rows:
            yf_temp = yf - rows
        else:
            yf_temp = yf
        # 如果存在前车    
        if grid[x][yf_temp].exist == 1:
            d_current = max(car_size, yf - y) # current distance, which can not be smaller than the car size
            return d_current, yf_temp
    
    return d_current, yf_temp

# 探测车辆是否具备变道条件，并返回可变道的方向
def check_changability(position):
    x = position[0]
    y = position[1]
    left = 0
    right = 0
    
    d_current, yf = check_forward(position)
    
    # 根据当前车辆是否自动驾驶，计算安全距离
    if grid[x][y].auto == 0:
        d_safe = grid[x][y].speed * th
    else:
        d_safe = grid[x][y].speed * ta
    
    cond1 = (d_current < d_safe) # 当前跟车距离小于安全距离
    cond2 = ((x - 1) >= 0) # 左侧未超出边界
    cond3 = ((x + 1) <= int((lane_number / 2) - 1)) # 右侧未超出边界
    
    if cond1: # 当前前方安全距离过小，需要变道
        if cond2: 
            if grid[x - 1][y].exist == 0: # 左侧无车
                safe = check_safe((x - 1, y), grid[x][y].speed)
                if safe: # 符合安全距离
                    left = 1
        if cond3:
            if grid[x + 1][y].exist == 0: # 右侧无车
                safe = check_safe((x + 1, y), grid[x][y].speed)
                if safe:
                    right = 1
    
    return left, right

# 探测前后方车辆是否符合安全距离，返回符合与否
def check_safe(position, speed):
    x = position[0]
    y = position[1]
    
    safe = 1 # 1 means safe
    safe_margin = 0
    
    # 根据当前车辆是否自动驾驶，计算与前车安全距离
    if grid[x][y].auto == 0:
        d_safe_f = speed * th + safe_margin
    else:
        d_safe_f = speed * ta + safe_margin
    
    ymax = y + int(d_safe_f) # 往前探测 d_safe_f
    ymin = y - int(speed * th) # 往后探测 d_safe_b
    
    # 往前探测
    for yf in range(y + 1, ymax):
        # 如果越过道路边界
        if yf >= rows:
            yf = yf - rows
        # 如果存在前车
        if grid[x][yf].exist == 1:
            safe = 0 # not safe
            return safe
    
    # 往后探测
    for yb in range(ymin, y):
        if yb < 0:
            yb = rows + yb
        if grid[x][yb].exist == 1:
            
            # 先找到一辆人工驾驶的后车
            if grid[x][yb].auto == 0:
                d_safe_b = grid[x][yb].speed * th + safe_margin # 计算与后车安全距离（与后车安全距离取决于后车是否自动驾驶）
                if abs(yb - y) < d_safe_b:
                    safe = 0 # not safe
                    return safe
                
            # 先找到一辆人工驾驶的后车
            if grid[x][yb].auto == 1:
                d_safe_b = grid[x][yb].speed * ta + safe_margin
                if abs(yb - y) < d_safe_b:
                    safe = 0 # not safe
                    return safe
                
    return safe

## 2. 可视化

In [5]:
# 绘制车辆与道路
def draw(canvas, grid):
    # 更新窗口
    canvas.delete("all")
    
    # 绘制道路网格
    y_offset = 20
    dx = 0.2 * w_cell
    
    lane_oneside = int(lane_number / 2)

    for i in range(0, cols):
        for j in range(0, rows):
            if grid[i][j].exist == 1:
                x = i * w_cell + w_cell
                
                if grid[i][j].auto == 1: # 采用颜色区分自动驾驶与人工驾驶车辆
                    if i < lane_oneside:
                        y = j * h_cell + h_cell + y_offset
                        car_image = dict_car_images['auto_L']
                    else:
                        y = (rows - j) * h_cell + h_cell + y_offset
                        car_image = dict_car_images['auto_R']
                else:
                    if i < lane_oneside:
                        y = j * h_cell + h_cell + y_offset
                        car_image = dict_car_images['human_L']
                    else:
                        y = (rows - j) * h_cell + h_cell + y_offset
                        car_image = dict_car_images['human_R']               
                

                canvas.create_image(x + dx, y - 2.5 * h_cell, image=car_image, anchor = 'nw')
    
    currentTime = 'Time: ' + str(round(step * dt, 2)) + ' seconds' # 记录时间步
    canvas.create_text(w_cell, y_offset/2, text=currentTime, anchor = 'w')
    
    xc = w_cell * cols/2 + w_cell # 绘制马路中线
    yl_1 = h_cell + y_offset
    yl_2 = h_cell * (rows + 1) + y_offset
    delta = w_cell / 20
    canvas.create_line(xc + delta, yl_1, xc + delta, yl_2, fill = 'orange', width = w_cell / 20) # 绘制马路中线
    canvas.create_line(xc - delta, yl_1, xc - delta, yl_2, fill = 'orange', width = w_cell / 20) 
    canvas.create_line(xc, yl_1, xc, yl_2, fill = 'white', width = w_cell / 20) 
    
    len_dash1 = max(int(w_cell / 6), 1) # 绘制车道线
    len_dash2 = max(int(w_cell / 7), 1)
    for i in range(0, cols + 1): 
        xi = i * w_cell + w_cell
        if i == 0 or i == cols:
            canvas.create_line(xi, yl_1, xi, yl_2, fill = 'black')
        elif xi != xc:
            canvas.create_line(xi, yl_1, xi, yl_2, fill = 'black', dash=(len_dash1, len_dash2))
    
    
    # 绘制显示数据
    y = [average_speed, time_passing, flow, density]
    y_label = ['m/s', 's', 'veh/h', 'veh/km']
    titles = ['Average Speed', 'Passing Time', 'Flow', 'Density']
    for i in range(len(y)):
        lines[i].set_xdata(time) # 采用set_xdata, set_ydata可以避免重复生成figure, 从而避免占用的内存无限扩大
        lines[i].set_ydata(y[i])
        axs[i].set_xlim(min(time), max(time))
        axs[i].set_ylim(min(y[i]), max(y[i]) * 1.1)
        title_new = titles[i] + ': ' + str(round(y[i][-1],2)) + ' ' + y_label[i]
        axs[i].set_title(title_new)
        axs[i].relim()
        axs[i].autoscale_view(True, True, True)
        
    fig.canvas.draw()
    
    canvas.pack()


# 退出可视化
def _quit():
    root.quit()     # stops mainloop
    root.destroy() 
 

def init_everything():
    global step, grid, average_speed, time_passing, density, flow, time, update
    step = 0
    grid = make2DArray(cols, rows)
    time = [0]
    average_speed = [0]
    time_passing = [0]
    flow = [0]
    density = [0]
    update = generateUpdateArray(cols, rows)

def average(data, steps_trace_back):
    step = len(data)
    start = step - steps_trace_back
    data_len = steps_trace_back + 1
    
    recent_average = sum(data[start:]) / data_len
    
    return recent_average
    

def check_stable():
    
    min_step = 50
    stable_threshold = 0.005
    
    stable = False
    if step > min_step:
        if abs((average_speed[step] - average_speed[step-min_step])/average_speed[step]) < stable_threshold:
            stable = True
    return stable

def get_new_record():
    min_step = 50
    new_record = [average(density, min_step), \
                  average(average_speed, min_step), \
                  average(time_passing, min_step), \
                  average(flow, min_step)]
    
    return new_record

def export_data(output):
    # Write data to data frame, then to CSV file.
    file_name = "auto_%.2f_lane_%d_%s_%s.csv" % (ratio_auto, lane_number, str(T.strftime("%Y-%m-%d")), 
                               str(T.strftime("%H%M%S")))
    columns = ["density", "average_speed", "time_passing", "flow"]
    pd.DataFrame(output, columns = columns).drop_duplicates().to_csv(
        'output/' + file_name, index = False, encoding = "UTF-8")


# 用于实时更新图像的函数
def draw_after():
    
    global step, grid, flow_count, average_flow, average_speed, time, update, road_density, count_loop
    global draw_handle
    
    stable = False
    step += 1
    new_time = step * dt
    time.append(new_time)
    update = generateUpdateArray(cols, rows) # 设置所有Cell为未更新
    
    # 汽车生成
    if step == 1 and (not stable):
        grid = rule_generate(grid)
    
    # 变道
    grid = rule_changeLane(grid)
    
    # 前进
    grid, total_speed, total_traffic = rule_forward(grid)
    
    # 统计参数
    #total_traffic = max(total_traffic, 1)
    average_speed.append(total_speed / total_traffic) # m/s
    time_passing.append(road_length / average_speed[-1]) # s
    density.append((total_traffic / 2) / road_length * 1000) # veh/km
    flow.append(density[-1] *  average_speed[-1] / 1000 * 3600) # veh/h
 

    draw(canvas_road, grid)
    draw_handle = canvas_road.after(int(dt * 1000), draw_after)  # canvas.after的时间单位为ms，所以需乘1000
    
    stable = check_stable()
    if stable:
        print('Density: ', road_density)
        print('Time to stable: ', step * dt)
        new_record = get_new_record()
        output.append(new_record)
        init_everything()
        count_loop += 1
        if count_loop < len(road_densities):
            road_density = road_densities[count_loop]
        else:
            export_data(output)
            #root.after_cancel(draw_handle)
            root.after(100, root.destroy)

## 3. 主函数

In [6]:
if __name__ == "__main__":
    global step, grid, average_speed, time_passing, density, flow, time, update, output, road_density, count_loop
    global draw_handle
    step = 0 # 运行总步长，全局变量
    
# 以下参数为研究中有必要调整的参数
##########################################################    
    # 1. 参数定义
    # 1.1 道路参数
    lane_number = 6 # 双向车道数
    road_length = 2000 # 道路长度，单位m
    road_densities = list(range(100, 500, 20)) # veh/km, a list of densities to study
    road_density = road_densities[0] # veh/km
    count_loop = 0
    
    # 1.2 汽车参数
    vmax = 30 # 最大车速，单位m/s，~100km/h
    vmin = 0
    vm = 20 # 初始平均车速，单位m/s
    dv = 6 # 初始车速标准差，单位m/s
#     lam_S = 12000 # 南向（左侧）车流到达率, veh / hr
#     lam_N = 0.3 # 北向（右侧）车流到达率
    ratio_auto = 0.3 # 自动驾驶汽车比例
    car_size = 5 # 汽车长度， 5m
    
    # 1.3 跟车参数
    th = 2 # 驾驶员反应时间
    ta = 1 # 自动驾驶车辆反应时间
    max_forward = 100 # 车子最远受到其前方100m处前车的影响 
    alpha = 1 # 加速度更新，距离参数
    beta = 1 # 加速度更新，速度参数
    gamma = 1 # 加速度更新，加速度参数
    
    # 1.4 变道参数
    pc = 0.5 # 变道概率
    
    # 1.5 时间步
    dt = 0.2 # 定义每个时间步对应真实等待时长，单位s
#########################################################   
    

    # 1.5 显示参数
    h_cell = 6 # Cell的高度，代表1m
    w_cell = 4 * h_cell # Cell的宽度, 代表4m (实际车道宽3.7m)
    road_w_gui = (lane_number + 2) * w_cell  # 显示宽度
    road_h_gui = 600 # 显示长度
    road_h_gui_scroll = road_length * h_cell + 100
    data_w_gui = 500
    
    # 2. 实体定义
    # 2.1 路网
    cols = lane_number # 路网矩阵列数
    rows = int(road_length) # 路网矩阵行数
    grid = make2DArray(cols, rows) # 路网，全局变量
    update = generateUpdateArray(cols, rows) # 与路网一一对应，判断当次时间步是否已更新
    
    # 2.2 界面
    root = tk.Tk()
    canvas_r = tk.Canvas(root, width=road_w_gui, height = road_h_gui)
    canvas_r.pack(side = 'left')
    
    scrollbar = ttk.Scrollbar(root, orient="vertical", command=canvas_r.yview)
    scrollable_frame = ttk.Frame(canvas_r)

    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas_r.configure(
            scrollregion=canvas_r.bbox("all")
        )
    )

    canvas_r.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas_r.configure(yscrollcommand=scrollbar.set)
    canvas_road = tk.Canvas(scrollable_frame, width=road_w_gui, height=road_h_gui_scroll)
    canvas_road.pack()
    scrollbar.place(x = road_w_gui - 15, y = 0, relheight=1)
    
    car_w = int(2 / 3 * w_cell) # 汽车尺寸，单位m
    car_h = int(car_size * h_cell) # 汽车尺寸，单位m
    images = ['auto_L', 'auto_R', 'human_L','human_R']
    dict_car_images = dict()
    for image in images:
        car_image = ImageTk.PhotoImage(Image.open('image/' + image + '.png').resize((car_w, car_h))) # 调用车辆图片
        dict_car_images[image] =  car_image
    
    # 3. 统计参数
    # 3.1 统计参数初始化
    time = [0]
    average_speed = [0]
    time_passing = [0]
    flow = [0]
    density = [0]
    
    # 3.2 统计参数实时显示
    fig = Figure(figsize=(6, 8), dpi=80)
    gs1 = gridspec.GridSpec(4, 1)
    gs1.update(wspace=0.1, hspace=0.5)
    axs = list()
    lines = list()
    txts = list()
    for i in range(4):
        axs.append(fig.add_subplot(gs1[i]))
        line, = axs[i].plot([0], [0])
        lines.append(line)
        
    axs[3].set_xlabel('Time (s)')
    
    canvas_data = FigureCanvasTkAgg(fig, master=root)  # A tk.DrawingArea.
    canvas_data.draw()
    canvas_data.get_tk_widget().pack()
    
    button = tk.Button(master=root, text="Quit", command=_quit)
    button.pack()
    
    # 4. 开始运行并动态显示
    output = list()
    draw_after()

    tk.mainloop()
