## 柱状图

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
%config InlineBackend.figure_format = 'svg'
plt.style.use('default')  # 设置画图的风格
np.set_printoptions(suppress=True)
import gym
import numpy as np
import pandas as pd
import shap
import torch
import torch.nn as nn
from stable_baselines3 import DQN
import highway_env


def calculate_decision_change(mean=0, std=5, feature=1, obs=None):
    obs_noise = torch.clone(obs)
    for i in range(obs.shape[0]):
        noise = torch.normal(mean, std, size=(1, 1))
        obs_noise[i, feature] = obs[i, feature] + noise
    change_act = 0
    for i in range(obs.shape[0]):
        act, _ = model.predict(obs[i:i + 1, :], deterministic=True)
        act_noise, _ = model.predict(obs_noise[i:i + 1, :], deterministic=True)
        if act != act_noise:
            change_act = change_act + 1
    return change_act


# 配置参数
shap.initjs()
# TRAIN = False
Learn = True
version = 0
file_path_model = r'.\trained_model' + "\model_lambda=%d" % (version)
file_path_log = 'dqn_attri/' + "log_%d/" % (version)
data_path = 'dqn_attri/' + "data_%d.pth" % (version)

env = gym.make("highway-v0")  # "highway-fast-v0"是一个环境ID，定义为env
env.config["real_time_rendering"] = True
env.config["lanes_count"] = 1
env.config["vehicles_count"] = 2
env.config["vehicles_density"] = 1  #这个改动了
env.config["collision_reward"] = -1
env.config["right_lane_reward"] = 0
env.config["reward_speed_range"] = [15, 30]
env.config["lane_change_reward"] = 0
env.config["high_speed_reward"] = 0.2
# env.config["manual_control"] = True
env.config["simulation_frequency"] = 15
env.config["duration"] = 40
env.config["show_trajectories"] = False
env.config.update({
    "observation": {
        "type": "Kinematics"
    },
    "action": {
        "type": "DiscreteMetaAction",
        "longitudinal": True,  # 只有纵向动作
        "lateral": False
    }
})
env.reset()
env.seed(1)
# %% 测试部分
import statistics

data_value_list = []
data_deviation_list = []
right_number = 0
Total_number_step = 0  # 创建一个空的list   存储Tensor的容器  通过list.append存储tensor
version_list = [11, 14, 10, 16]
std_list = [2, 3, 5, 6]
for std in std_list:
    mean_value_list = []
    std_deviation_list = []
    for version in version_list:
        file_path_model = r'X:\project\PythonCode\ExplainGroup\DQN特征归因\有伪变量的情况\dqn_attri' + "\model_%d" % (version)
        model = DQN.load(file_path_model, env=env)
        calculate_decision = []
        for videos in range(10):  #每个range 为 40
            done = False
            Tensor_ini = []
            Action_ini = []
            obs = env.reset()
            while not done:
                obs_tensor = torch.from_numpy(obs)
                Tensor_ini.append(obs_tensor.squeeze())
                obs_flatten_tensor = torch.flatten(
                    obs_tensor)  # 原来是（7*2），但是神经网络拟合是3个输入状态（从q_net可以看出来）。一定用flatten函数把原来状态展开成1维
                action, _states = model.predict(obs, deterministic=True)
                obs, reward, done, info = env.step(action.item())
                # print(obs)
                Action_ini.append(action)
                # env.render()
            decision_change = calculate_decision_change(mean=0, std=std, feature=1, obs=torch.stack(Tensor_ini))
            calculate_decision.append(decision_change)
        mean_value = statistics.mean(calculate_decision)
        std_deviation = statistics.stdev(calculate_decision)
        mean_value_list.append(mean_value)
        std_deviation_list.append(std_deviation)
    data_value_list.append(np.array(mean_value_list))
    data_deviation_list.append(np.array(std_deviation_list))

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

labels = ['DQN', 'DQN with \n SVAP  (0.5)', 'DQN with \n SVAP  (2)' , 'DQN with \n SVAP  (1000)']
data = np.array(data_value_list)
deviation = np.array(data_deviation_list)
# data3 = np.array(data_deviation_list)
# data4 = np.array(mean_value_list)


In [None]:
# plt.subplots(figsize=(8, 6))
# 设置柱状图参数
bar_width = 0.2
x_pos1 = np.arange(len(labels))
x_pos2 = [x + bar_width for x in x_pos1]
x_pos3 = [x + bar_width * 2 for x in x_pos1]
x_pos4 = [x + bar_width * 3 for x in x_pos1]
# errors1 = np.array(data_deviation_list)
color_palette = sns.color_palette('colorblind', n_colors=4)
# color_palette = plt.cm.get_cmap('Set1', 4)
# # 绘制柱状图
# rects1=plt.bar(x_pos1, data[0,:], width=bar_width, color=color_palette[0], yerr=deviation[0,:]/10, capsize=6, label='mean:0, std:2')
# rects2=plt.bar(x_pos2, data[1,:], width=bar_width, color=color_palette[1], yerr=deviation[1,:]/10, capsize=6, label='mean:0, std:3')
# rects3=plt.bar(x_pos3, data[2,:], width=bar_width, color=color_palette[2], yerr=deviation[2,:]/10, capsize=6, label='mean:0, std:5')
# rects4=plt.bar(x_pos4, data[3,:], width=bar_width, color=color_palette[3], yerr=deviation[3,:]/10, capsize=6, label='mean:0, std:6')
colors = sns.color_palette("Blues", n_colors=20)
rects1=plt.bar(x_pos1, data[0,:], width=bar_width,  yerr=deviation[0,:]/10,color= colors[6], capsize=6, label='mean:0, std:2')
rects2=plt.bar(x_pos2, data[1,:], width=bar_width,  yerr=deviation[1,:]/10,color= colors[10], capsize=6, label='mean:0, std:3')
rects3=plt.bar(x_pos3, data[2,:], width=bar_width,  yerr=deviation[2,:]/10,color= colors[15], capsize=6, label='mean:0, std:5')
rects4=plt.bar(x_pos4, data[3,:], width=bar_width,  yerr=deviation[3,:]/10,color= colors[19], capsize=6, label='mean:0, std:6')

# rects1=plt.bar(x_pos1, data[0,:], width=bar_width, color=color_palette(0), yerr=deviation[0,:]/10, capsize=6, label='mean:0, std:2')
# rects2=plt.bar(x_pos2, data[1,:], width=bar_width, color=color_palette(1), yerr=deviation[1,:]/10, capsize=6, label='mean:0, std:3')
# rects3=plt.bar(x_pos3, data[2,:], width=bar_width, color=color_palette(2), yerr=deviation[2,:]/10, capsize=6, label='mean:0, std:5')
# rects4=plt.bar(x_pos4, data[3,:], width=bar_width, color=color_palette(3), yerr=deviation[3,:]/10, capsize=6, label='mean:0, std:6')
# for i, v in enumerate(data.flatten()):
#     plt.text(i, v + deviation.flatten()[i] + 0.5, str(v), ha='center', fontsize=10)
for rects in [rects1, rects2, rects3, rects4]:
    for rect in rects:
        height = rect.get_height()
        plt.annotate('{:.1f}'.format(round(height,1)),
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 5),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom', fontsize=10)
# plt.bar(x_pos1, data[0,:], width=bar_width,  yerr=deviation[0,:]/10, capsize=6, label='mean:0, std:2')
# plt.bar(x_pos2, data[1,:], width=bar_width,  yerr=deviation[1,:]/10, capsize=6, label='mean:0, std:3')
# plt.bar(x_pos3, data[2,:], width=bar_width,  yerr=deviation[2,:]/10, capsize=6, label='mean:0, std:5')
# plt.bar(x_pos4, data[3,:], width=bar_width,  yerr=deviation[3,:]/10, capsize=6, label='mean:0, std:6')
# plt.legend(fontsize = 14)
# plt.legend(loc='upper center', ncol=4, fontsize=14)
plt.ylabel('Number of decision changes', fontsize = 16)
plt.tick_params(axis='both', labelcolor='black', labelsize=14)
# plt.grid()
# plt.title('Multiple Indicators Bar Chart')
plt.xticks([x + bar_width for x in range(len(labels))], labels,fontsize = 16)
plt.show()