In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = '4'

import cv2  # OpenCV 用於處理圖片
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts
from tf_agents.agents.dqn import dqn_agent
from tf_agents.networks import q_network
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.drivers import dynamic_step_driver
from tf_agents.utils import common
from tf_agents.policies import random_tf_policy
from tf_agents.environments import tf_py_environment
from tf_agents.environments.wrappers import FlattenActionWrapper


2024-07-17 15:19:42.514875: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-17 15:19:42.705640: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-17 15:19:42.929642: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:479] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-17 15:19:43.179611: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:10575] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-17 15:19:43.181037: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1442] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-17 15:19:43.490332: I tensorflow/core/platform/cpu_feature_guard.cc:

In [2]:
# Show the installed TensorFlow version.
print(tf.__version__)

2.16.2


In [2]:
# v1
class MapDroneEnv(py_environment.PyEnvironment):
    """Grid environment in which a drone moves one pixel per step over a map image.

    The observation/state is the drone position as ``[row, col]``: index 0 is
    clipped to ``map_height - 1`` and index 1 to ``map_width - 1``.  An episode
    starts at the image centre and terminates when the state equals
    ``goal_position``.

    Actions (``int32`` scalar in ``[0, 3]``):
        0 = up (row - 1), 1 = down (row + 1), 2 = left (col - 1), 3 = right (col + 1).

    Args:
        map_image_path: Path of the map image, read with ``cv2.imread``.
        goal_position: Target position compared element-wise with the
            ``[row, col]`` state.  NOTE(review): the notebook names the default
            values ``goal_x, goal_y``; confirm that (955, 317) is really meant
            in ``[row, col]`` order.  A goal outside the image can never be
            reached because the state is clipped to the image bounds.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """

    # Row/column offsets indexed by action: up, down, left, right.
    _ACTION_MOVES = np.array([[-1, 0], [1, 0], [0, -1], [0, 1]], dtype=np.int32)

    def __init__(self, map_image_path, goal_position=(955, 317)):
        super().__init__()

        self.map_image = cv2.imread(map_image_path)  # load the map image
        if self.map_image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(
                'Could not read map image: {!r}'.format(map_image_path))
        self.map_height, self.map_width = self.map_image.shape[:2]

        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=3, name='action')  # up, down, left, right

        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(2,), dtype=np.int32, minimum=0,
            maximum=max(self.map_height, self.map_width), name='observation')  # drone position

        self._state = self._start_position()
        self._episode_ended = False
        # Public attribute: callers may reassign it after construction.
        self.goal_position = np.asarray(goal_position, dtype=np.int32)

    def _start_position(self):
        """Return the start state: the centre of the map as ``[row, col]``."""
        return np.array([self.map_height // 2, self.map_width // 2], dtype=np.int32)

    def action_spec(self):
        """Return the spec of the scalar discrete action."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of the ``[row, col]`` observation."""
        return self._observation_spec

    def _reset(self):
        """Move the drone back to the map centre and start a new episode."""
        self._state = self._start_position()
        self._episode_ended = False
        return ts.restart(self._state)

    def _discretize_action(self, action):
        """Convert a discrete action index into a ``[d_row, d_col]`` move.

        Args:
            action: Integer (or size-1 array) in ``[0, 3]``.

        Returns:
            ``np.int32`` array of shape ``(2,)`` with the one-pixel offset.

        Raises:
            ValueError: If ``action`` is outside ``[0, 3]``.
        """
        index = int(np.asarray(action).item())
        if not 0 <= index < len(self._ACTION_MOVES):
            raise ValueError('action must be in [0, 3], got {}'.format(index))
        return self._ACTION_MOVES[index].copy()

    def _step(self, action):
        """Apply ``action`` and return the resulting time step.

        If the previous step ended the episode, the action is ignored and the
        environment is reset, so the episode does not continue past the goal.
        """
        if self._episode_ended:
            return self.reset()

        move = self._discretize_action(action)

        # Keep the drone inside the image.
        upper_bound = [self.map_height - 1, self.map_width - 1]
        self._state = np.clip(self._state + move, 0, upper_bound).astype(np.int32)

        reward = self._compute_reward()
        if np.array_equal(self._state, self.goal_position):
            self._episode_ended = True
            return ts.termination(self._state, reward)
        return ts.transition(self._state, reward, discount=1.0)

    # reward function
    def _compute_reward(self):
        """Return the negative Euclidean distance from the state to the goal.

        The closer the drone is to the goal, the higher (less negative) the reward.
        """
        distance = np.linalg.norm(self._state - self.goal_position)
        return -distance

    def render(self):
        """Optional rendering hook (e.g. mark drone and goal on the image); not implemented."""
        pass

In [3]:
map_image_path = 'ccu_map.png'  # replace with the path of your own map image

# Goal position (e.g. the CCU library), in pixel coordinates of the map image.
# NOTE(review): MapDroneEnv compares this element-wise with its [row, col]
# state; confirm that (goal_x, goal_y) is given in that order.
goal_x, goal_y = 955, 317

py_env = MapDroneEnv(map_image_path)
# Set the goal on the Python environment itself. Assigning it on the
# TFPyEnvironment wrapper only creates an attribute on the wrapper, which the
# reward/termination logic inside MapDroneEnv never reads.
py_env.goal_position = np.array([goal_x, goal_y])

env = tf_py_environment.TFPyEnvironment(py_env)  # wrap as a TF environment

In [5]:
# Build the DQN agent.
fc_layer_params = (20,)  # hidden layer sizes of the Q-network

# Build the Q-network from the environment's own specs so that it always
# agrees with the specs handed to the agent below.
q_net = q_network.QNetwork(
    env.observation_spec(),
    env.action_spec(),
    fc_layer_params=fc_layer_params)


class MyAdamOptimizer(tf.keras.optimizers.Adam):
    """Adam optimizer used for the DQN agent.

    Kept as a named subclass so existing references keep working.  It adds
    nothing to ``tf.keras.optimizers.Adam``.
    NOTE(review): an earlier ``_zeros_slot`` override was dropped because it
    referenced ``slot_creator``, which is not imported in this notebook;
    confirm no code path relied on it.
    """


optimizer = MyAdamOptimizer(learning_rate=1e-3)
train_step_counter = tf.Variable(0)  # incremented by agent.train()

agent = dqn_agent.DqnAgent(
    env.time_step_spec(),
    env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter)

agent.initialize()

In [6]:
# Replay buffer holding the trajectories collected from the environment.
REPLAY_BUFFER_MAX_LENGTH = 100000

replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=env.batch_size,
    max_length=REPLAY_BUFFER_MAX_LENGTH,
)

In [7]:
# Observer: passed to the drivers below so that collected trajectories are
# appended to the replay buffer via add_batch.
replay_buffer_observer = replay_buffer.add_batch

In [8]:
# Random policy used to gather the initial experience.
initial_collect_policy = random_tf_policy.RandomTFPolicy(
    time_step_spec=env.time_step_spec(),
    action_spec=env.action_spec(),
)

In [9]:
# Fill the replay buffer with initial experience from the random policy.
INITIAL_COLLECT_STEPS = 200  # adjust to whatever initial amount suits you

init_driver = dynamic_step_driver.DynamicStepDriver(
    env,
    initial_collect_policy,
    observers=[replay_buffer_observer],
    num_steps=INITIAL_COLLECT_STEPS,
)
init_driver.run()

(TimeStep(
 {'step_type': <tf.Tensor: shape=(1,), dtype=int32, numpy=array([1], dtype=int32)>,
  'reward': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-923.06555], dtype=float32)>,
  'discount': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>,
  'observation': <tf.Tensor: shape=(1, 2), dtype=int32, numpy=array([[210, 862]], dtype=int32)>}),
 ())

In [10]:
# Driver that collects experience with the agent's collect policy during training.
COLLECT_STEPS_PER_ITERATION = 1

collect_driver = dynamic_step_driver.DynamicStepDriver(
    env,
    agent.collect_policy,
    observers=[replay_buffer_observer],
    num_steps=COLLECT_STEPS_PER_ITERATION,
)

In [11]:
# Training configuration and the sampling pipeline over the replay buffer.
num_iterations = 10  # number of training iterations
batch_size = 32

# Each sampled element holds num_steps=2 consecutive steps per batch entry.
sampled_trajectories = replay_buffer.as_dataset(
    num_parallel_calls=3,
    sample_batch_size=batch_size,
    num_steps=2,
)
dataset = sampled_trajectories.prefetch(3)
iterator = iter(dataset)

Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=False) instead.


In [12]:
LOG_INTERVAL = 1  # print the loss every LOG_INTERVAL training steps

for _ in range(num_iterations):
    # Collect one step of experience, then train on one sampled batch.
    collect_driver.run()
    experience, _ = next(iterator)
    loss_info = agent.train(experience)
    train_loss = loss_info.loss
    step = agent.train_step_counter.numpy()

    if step % LOG_INTERVAL != 0:
        continue
    print('step = {0}: loss = {1}'.format(step, train_loss))

Instructions for updating:
back_prop=False is deprecated. Consider using tf.stop_gradient instead.
Instead of:
results = tf.foldr(fn, elems, back_prop=False)
Use:
results = tf.nest.map_structure(tf.stop_gradient, tf.foldr(fn, elems))
step = 1: loss = 706221.375
step = 2: loss = 717196.375
step = 3: loss = 724086.8125
step = 4: loss = 717335.6875
step = 5: loss = 714578.75
step = 6: loss = 733973.5
step = 7: loss = 737691.5
step = 8: loss = 732823.0625
step = 9: loss = 718734.75
step = 10: loss = 730540.25


In [13]:
# Keep the training losses for later visualisation.
train_losses = []
episode_rewards = []  # total reward of each evaluation episode

In [None]:
# Train the agent and, after every training step, evaluate the greedy policy.
MAX_EVAL_STEPS = 500  # cap per evaluation episode; an untrained policy may never reach the goal
window_size = 10      # window size of the moving average

# Evaluate on a separate environment so evaluation steps do not move the
# environment that collect_driver is stepping.
eval_py_env = MapDroneEnv(map_image_path)
eval_py_env.goal_position = np.array([goal_x, goal_y])
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)

for _ in range(num_iterations):
    collect_driver.run()
    experience, _ = next(iterator)
    train_loss = agent.train(experience).loss
    step = agent.train_step_counter.numpy()

    # Run one evaluation episode from a fresh start, bounded by MAX_EVAL_STEPS.
    time_step = eval_env.reset()
    episode_reward = 0.0
    for _eval_step in range(MAX_EVAL_STEPS):
        if bool(tf.reduce_all(time_step.is_last())):
            break
        action_step = agent.policy.action(time_step)
        time_step = eval_env.step(action_step.action)
        # The environment is batched with batch size 1; store a plain float
        # so the rewards form a numeric DataFrame column.
        episode_reward += float(time_step.reward.numpy()[0])

    episode_rewards.append(episode_reward)
    train_losses.append(float(train_loss))  # record the loss as a plain float

    if step % 10 == 0:
        print('step = {0}: loss = {1}'.format(step, train_loss))

# Smooth the rewards once, after training; min_periods=1 avoids leading NaNs
# while fewer than window_size episodes are available.
df_rewards = pd.DataFrame({'episode': np.arange(len(episode_rewards)), 'reward': episode_rewards})
df_rewards['smoothed_reward'] = df_rewards['reward'].rolling(window=window_size, min_periods=1).mean()

In [None]:
# Two panels side by side: training loss (left) and episode reward (right).
fig, (loss_ax, reward_ax) = plt.subplots(1, 2, figsize=(12, 5))

# Loss curve.
loss_ax.plot(train_losses)
loss_ax.set_xlabel('Training Steps')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training Loss Over Time')

# Reward curve: raw and smoothed.
episodes = df_rewards['episode']
reward_ax.plot(episodes, df_rewards['reward'], label='Reward')
reward_ax.plot(episodes, df_rewards['smoothed_reward'], label='Smoothed Reward')
reward_ax.set_xlabel('Episode')
reward_ax.set_ylabel('Total Reward')
reward_ax.set_title('Total Reward per Episode')
reward_ax.legend()

plt.show()

In [None]:
# Policy heatmap v1: the greedy action chosen by the policy at every map pixel.
inner_env = env.pyenv.envs[0]
map_height, map_width = inner_env.map_height, inner_env.map_width

# Derive the action range from the environment instead of hard-coding it.
heatmap_action_spec = inner_env.action_spec()
action_min = int(heatmap_action_spec.minimum)
action_max = int(heatmap_action_spec.maximum)
num_actions = action_max - action_min + 1

# Evaluate one whole image row per policy call (batch size = map_width)
# instead of one call per pixel.
column_indices = np.arange(map_width, dtype=np.int32)
actions = np.zeros((map_height, map_width), dtype=np.int32)
for i in range(map_height):
    row_states = np.stack(
        [np.full(map_width, i, dtype=np.int32), column_indices], axis=1)  # [row, col] per pixel
    # batch_size gives step_type/reward/discount the same batch dimension
    # as the observation.
    time_step = ts.restart(tf.convert_to_tensor(row_states), batch_size=map_width)
    policy_step = agent.policy.action(time_step)
    actions[i, :] = policy_step.action.numpy()

# One discrete colour per action; plotting the action array itself (not
# pre-mapped RGBA colours) lets the colourbar show the action values.
cmap = plt.get_cmap('viridis', num_actions)
heatmap = plt.imshow(actions, cmap=cmap, vmin=action_min - 0.5, vmax=action_max + 0.5)
plt.title('Policy Heatmap')
plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.colorbar(heatmap, ticks=np.arange(action_min, action_max + 1), label='Action')
plt.show()

In [None]:
from matplotlib.pyplot import get_cmap
import threading  # no longer used by this cell; kept so existing references still resolve


def calculate_action(state, actions, row, col, agent):
    """Store the policy's action for a single ``state`` in ``actions[row, col]``.

    Args:
        state: Array of shape ``(2,)`` holding one ``[row, col]`` position.
        actions: 2-D array that receives the result (modified in place).
        row: Row index to write to.
        col: Column index to write to.
        agent: Agent whose ``policy`` is queried.
    """
    # batch_size=1 keeps step_type/reward/discount batched like the observation.
    time_step = ts.restart(tf.expand_dims(state, axis=0), batch_size=1)
    policy_step = agent.policy.action(time_step)
    actions[row, col] = policy_step.action.numpy()[0]


def calculate_row_actions(row, width, agent):
    """Return the policy's actions for every column of ``row`` in one batched call.

    Args:
        row: Row index of the map.
        width: Number of columns in the map.
        agent: Agent whose ``policy`` is queried.

    Returns:
        1-D array of length ``width`` with one action per column.
    """
    row_states = np.stack(
        [np.full(width, row, dtype=np.int32), np.arange(width, dtype=np.int32)], axis=1)
    time_step = ts.restart(tf.convert_to_tensor(row_states), batch_size=width)
    return agent.policy.action(time_step).action.numpy()


# Draw the policy heatmap.
inner_env = env.pyenv.envs[0]
map_height, map_width = inner_env.map_height, inner_env.map_width

heatmap_action_spec = inner_env.action_spec()
action_min = int(heatmap_action_spec.minimum)
action_max = int(heatmap_action_spec.maximum)
num_actions = action_max - action_min + 1

# One batched policy call per row replaces one thread per pixel, which would
# start map_height * map_width threads.
actions = np.zeros((map_height, map_width), dtype=np.int32)
for i in range(map_height):
    actions[i, :] = calculate_row_actions(i, map_width, agent)

# One discrete colour per action; plotting the action array itself lets the
# colourbar show the action values.
cmap = get_cmap('viridis', num_actions)
heatmap = plt.imshow(actions, cmap=cmap, vmin=action_min - 0.5, vmax=action_max + 0.5)
plt.title('Policy Heatmap')
plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.colorbar(heatmap, ticks=np.arange(action_min, action_max + 1), label='Action')
plt.show()

In [None]:
# Helper cell: click on the map image to print the pixel coordinates of a point.

import matplotlib.pyplot as plt
import numpy as np

def get_click_coordinates(event):
    """Print the data coordinates of a left mouse click.

    Args:
        event: Mouse event exposing ``xdata``, ``ydata`` and ``button``.

    Clicks that carry no data coordinates (``xdata``/``ydata`` is ``None``,
    e.g. a click outside the axes) and clicks with other buttons are ignored.
    The output is in ``(x, y)`` order.
    """
    x, y = event.xdata, event.ydata
    if x is None or y is None:
        # No data coordinates available; int(None) would raise TypeError.
        return
    if event.button == 1:  # 1 is the left mouse button
        print(f"Clicked coordinates: ({int(x)}, {int(y)})")

# Load the map image.
img = plt.imread('ccu_map.png')

# Display the image.
fig, ax = plt.subplots()
ax.imshow(img)

# Register the mouse-click handler.
# NOTE(review): click events presumably need an interactive Matplotlib backend
# (e.g. `%matplotlib widget`) when run inside a notebook — confirm.
fig.canvas.mpl_connect('button_press_event', get_click_coordinates)

plt.show()
