# Importing libraries

In [12]:
import numpy as np
import pickle
import os
import sys
import random

## Defining constants

In [11]:
STATE_SIZE = (10,10)
ACTION_SIZE = 4

## Defining loading and saving of files

In [3]:
def load(filename):
    with open(filename, 'rb') as f:
        return pickle.load(f)

In [4]:
def save(filename, Q_table):
    with open(filename, 'wb') as f:
        pickle.dump(Q_table, f)

## Defining the static environment



In [13]:
class DroneGrid():

    def __init__(self, grid):

        self.grid = grid
        self.grid_size = np.array(grid).shape
        self.observation_space = (self.grid_size[0]), (self.grid_size[1])
        self.action_space = [0, 1, 2, 3] # 4 discrete actions: 0 = up, 1 = down, 2 = left, 3 = right
        self.start_pos = (0, 0)  # Starting position at top left corner
        self.goal_pos = (self.grid_size[0] - 1, self.grid_size[1] - 1)  # Goal position at bottom right corner
        self.current_pos = self.start_pos  # Initialize current position

    def reset(self):
        self.current_pos = self.start_pos  # Reset current position to start position
        return self.current_pos  # Return initial state


In [14]:
class QLEnvironment(DroneGrid):
    def __init__(self, grid):
        super().__init__(grid)

    def step(self, action):

        assert action in self.action_space, f"Invalid action {action}"  # Check if action is valid

        # Define movement based on action
        if action == 0:  # Up
            new_pos = (self.current_pos[0], self.current_pos[1] - 1)
        elif action == 1:  # Down
            new_pos = (self.current_pos[0], self.current_pos[1] + 1)
        elif action == 2:  # Left
            new_pos = (self.current_pos[0] - 1, self.current_pos[1])
        elif action == 3:  # Right
            new_pos = (self.current_pos[0] + 1, self.current_pos[1])

        # Check if new position is within bounds and not an obstacle
        if 0 <= new_pos[0] < self.grid_size[0] and 0 <= new_pos[1] < self.grid_size[1] and self.grid[new_pos[1]][new_pos[0]] != 1:

            self.current_pos = new_pos  # Update current position

            # Check if goal state is reached
            done = (self.current_pos == self.goal_pos)

            # Calculate reward
            if done:
                reward = 1.0  # Positive reward for reaching the goal

            elif self.grid[new_pos[1]][new_pos[0]] == 1:
                done = True
                reward = -1 # Negative reward for going in a wall

            else:
                reward = 0 #Negative reward for non-goal state

        else:
            done = False
            reward = 0  # Negative reward for going out of bounds

        return self.current_pos, reward, done

Function to compute an action in function of the epsilon-greedy algorith

In [7]:
def compute_action(current_state, Q_table, epsilon, environment):

    if np.random.uniform(0,1) < epsilon:
        return np.random.choice(range(len(environment.action_space)))

    else:
        return np.argmax(Q_table[current_state])

Loading a personalized map

In [8]:
map_simple = load('map_simple.pkl')
print(map_simple)

map_mid = load('map_mid.pkl')
print(map_mid)

map_hard = load('map_hard.pkl')
print(map_hard)

[[0, 0, 0, 0, 0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 1, 1, 0, 0, 0, 0, 0, 0], [0, 0, 1, 1, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 1, 1, 0, 0, 0], [1, 0, 0, 0, 0, 1, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
[[0, 0, 0, 0, 0, 1, 1, 0, 0, 0], [0, 0, 0, 0, 0, 1, 1, 0, 0, 0], [1, 1, 0, 0, 0, 0, 0, 0, 0, 0], [1, 1, 0, 0, 1, 1, 0, 0, 0, 0], [0, 0, 0, 0, 1, 1, 0, 0, 0, 0], [1, 1, 0, 0, 0, 0, 0, 0, 0, 0], [1, 1, 0, 0, 0, 0, 0, 1, 1, 0], [0, 0, 0, 1, 1, 0, 0, 1, 1, 0], [0, 0, 0, 1, 1, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
[[0, 0, 0, 0, 1, 1, 0, 0, 0, 0], [0, 0, 0, 0, 1, 1, 0, 1, 1, 0], [0, 0, 0, 0, 0, 0, 0, 1, 1, 0], [1, 1, 0, 0, 0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0, 1, 1, 1], [0, 1, 0, 0, 1, 1, 1, 1, 1, 1], [0, 0, 0, 1, 1, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]


Creating an instance of the environment through the loaded map

In [9]:
environment_simple = QLEnvironment(map_simple)
print(environment_simple.observation_space)
print(environment_simple.action_space)

environment_mid = QLEnvironment(map_mid)
print(environment_mid.observation_space)
print(environment_mid.action_space)

environment_hard = QLEnvironment(map_hard)
print(environment_hard.observation_space)
print(environment_hard.action_space)

(10, 10)
[0, 1, 2, 3]
(10, 10)
[0, 1, 2, 3]
(10, 10)
[0, 1, 2, 3]


---
# Q-Learning
---


In [None]:
def q_learning(env, alpha=1, gamma=0.9,  epsilon=0.99, epsilon_decay=0.0001, episodes = 10000, max_iter_episode = 100):

    Q = np.zeros((env.grid_size[0]*env.grid_size[1], len(env.action_space)), dtype=np.float32) #Initialize the Q table to all 0s
    rewards = []

    for e in range(episodes): #Run 1k training runs

        state = env.reset() #Part of OpenAI where you need to reset at the start of each run
        total_reward = 0 #Set initial reward to 0
        iteration = 0

        while True: #Loop until done == True
            #IF random number is less than epsilon grab the random action else grab the argument max of Q[state]

            current_state_index = env.current_pos[0] + env.current_pos[1]*env.observation_space[0] # Obtain the index of the state

            action = compute_action(current_state_index, Q, epsilon, env) # Compute the action for the current state in function of the epsilon_greedy

            posp1, reward, done = env.step(action) #Send your action to OpenAI and get back the tuple

            state_tp1_index = posp1[0] + posp1[1]*env.observation_space[0]

            total_reward += reward #Increment your reward

            Q[current_state_index][action] = Q[current_state_index][action] + alpha * (reward + gamma * np.max(Q[state_tp1_index]) - Q[current_state_index][action])

             #Make sure to keep random at 10%

            if done:
                print(f"Episode: {e}, Reward: {total_reward}")
                break

            iteration += 1

            if iteration >= max_iter_episode:
                print(f"Episode: {e}, Reward: {total_reward}")
                break


        if epsilon>0.1:
            epsilon *= np.exp(-epsilon_decay)

        rewards.append(total_reward)

    return Q, rewards

Effective running of the Q-Learning and saving of the trained Q-Table

In [None]:
q_simple, _ = q_learning(environment_simple, epsilon_decay = 0.0003, episodes = 2000)
save('Simple - Q-Learning.pkl', q_simple)

q_mid, _ = q_learning(environment_mid, episodes = 3000, epsilon_decay = 0.0003, max_iter_episode = 200)
save('Mid - Q-Learning.pkl', q_mid)

q_hard, _ = q_learning(environment_hard, episodes = 5000, epsilon_decay = 0.00005, max_iter_episode = 300)
save('Hard - Q-Learning.pkl', q_hard)



[1;30;43mLe flux de sortie a été tronqué et ne contient que les 5000 dernières lignes.[0m
Episode: 0, Reward: 0
Episode: 1, Reward: 0
Episode: 2, Reward: 0
Episode: 3, Reward: 0
Episode: 4, Reward: 0
Episode: 5, Reward: 0
Episode: 6, Reward: 0
Episode: 7, Reward: 0
Episode: 8, Reward: 0
Episode: 9, Reward: 0
Episode: 10, Reward: 0
Episode: 11, Reward: 0
Episode: 12, Reward: 0
Episode: 13, Reward: 0
Episode: 14, Reward: 0
Episode: 15, Reward: 0
Episode: 16, Reward: 0
Episode: 17, Reward: 0
Episode: 18, Reward: 1.0
Episode: 19, Reward: 0
Episode: 20, Reward: 0
Episode: 21, Reward: 0
Episode: 22, Reward: 0
Episode: 23, Reward: 0
Episode: 24, Reward: 0
Episode: 25, Reward: 0
Episode: 26, Reward: 0
Episode: 27, Reward: 0
Episode: 28, Reward: 0
Episode: 29, Reward: 0
Episode: 30, Reward: 0
Episode: 31, Reward: 0
Episode: 32, Reward: 0
Episode: 33, Reward: 0
Episode: 34, Reward: 0
Episode: 35, Reward: 0
Episode: 36, Reward: 0
Episode: 37, Reward: 0
Episode: 38, Reward: 0
Episode: 39, Reward

In [None]:
from google.colab import files

files.download('Simple - Q-Learning.pkl')
files.download('Mid - Q-Learning.pkl')
files.download('Hard - Q-Learning.pkl')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Loading Q-Learning trained Q-Table and checking if successful

In [None]:
trained_q_simple = load('Simple - Q-Learning.pkl')
print('simple', trained_q_simple)

trained_q_mid = load('Mid - Q-Learning.pkl')
print('mid', trained_q_mid)

trained_q_hard = load('Hard - Q-Learning.pkl')
print('hard', trained_q_hard)

simple [[0.15009463 0.16677181 0.15009463 0.16677181]
 [0.16677181 0.18530202 0.15009463 0.18530202]
 [0.18530202 0.20589113 0.16677181 0.20589113]
 [0.20589113 0.22876792 0.18530202 0.22876792]
 [0.22876792 0.25418657 0.20589113 0.25418657]
 [0.25418657 0.28242952 0.22876792 0.25418657]
 [0.         0.         0.         0.        ]
 [0.3138106  0.34867844 0.3138106  0.34867844]
 [0.34867844 0.38742048 0.3138106  0.38742048]
 [0.38742048 0.4304672  0.34867844 0.38742048]
 [0.15009463 0.18530202 0.16677181 0.18530202]
 [0.16677181 0.20589113 0.16677181 0.20589113]
 [0.18530202 0.20589113 0.18530202 0.22876792]
 [0.20589113 0.22876792 0.20589113 0.25418657]
 [0.22876792 0.28242952 0.22876792 0.28242952]
 [0.25418657 0.3138106  0.25418657 0.3138106 ]
 [0.3138106  0.34867844 0.28242952 0.34867844]
 [0.3138106  0.38742048 0.3138106  0.38742048]
 [0.34867844 0.4304672  0.34867844 0.4304672 ]
 [0.38742048 0.47829688 0.38742048 0.4304672 ]
 [0.16677181 0.20589113 0.18530202 0.20589113]
 [0.18

---

# SARSA

---

In [None]:
def sarsa(env, alpha=0.8, gamma=0.9,  epsilon=0.99, epsilon_decay=0.0005, episodes = 2000, max_iter_episode = 100):

    Q = np.zeros((env.grid_size[0]*env.grid_size[1], len(env.action_space)), dtype=np.float32) #Initialize the Q table to all 0s
    rewards = []

    for e in range(episodes): #Run 1k training runs

        state = env.reset() #Part of OpenAI where you need to reset at the start of each run
        total_reward = 0 #Set initial reward to 0
        iteration = 0

        if e % 1000 == 0:
            print(f"Epsilon:{epsilon}")

        while True: #Loop until done == True
            #IF random number is less than epsilon grab the random action else grab the argument max of Q[state]

            current_state_index = env.current_pos[0] + env.current_pos[1]*env.observation_space[0] # Obtain the index of the state

            action = compute_action(current_state_index, Q, epsilon, env) # Compute the action for the current state using Q-Table

            posp1, reward, done = env.step(action) # Send the action to the environment and obtain the new position, the reward and the termination flag

            state_tp1_index = posp1[0] + posp1[1]*env.observation_space[0] # Compute the index of the state at t+1
            action_tp1 = compute_action(state_tp1_index, Q, epsilon, env) # Compute the action for the next state using Q-Table

            total_reward += reward # Increment the reward

            Q[current_state_index][action] = Q[current_state_index][action] + alpha * (reward + gamma*Q[state_tp1_index][action_tp1] - Q[current_state_index][action])

             #Make sure to keep random at 10%

            if done:
                print(f"Episode: {e}, Reward: {total_reward}")
                break

            iteration += 1

            if iteration >= max_iter_episode:
                print(f"Episode: {e}, Reward: {total_reward}")
                break

        if epsilon > 0.1:
            epsilon *= np.exp(-epsilon_decay)

    return Q, rewards

Effective running of SARSA and saving of the trained Q-Table

In [None]:
s_simple, _ = sarsa(environment_simple, epsilon_decay = 0.0005, episodes = 3000, gamma  = 0.99)
save('Simple - SARSA.pkl', s_simple)
'''
s_mid, _ = sarsa(environment_mid, epsilon_decay = 0.0001, episodes = 3000)
save('Mid - SARSA.pkl', s_mid)

s_hard, _ = sarsa(environment_hard, epsilon_decay = 0.00005, episodes = 8000, max_iter_episode = 300)
save('Hard - SARSA.pkl', s_hard)'''

Epsilon:0.99
Episode: 0, Reward: 1.0
Episode: 1, Reward: 0
Episode: 2, Reward: 0
Episode: 3, Reward: 0
Episode: 4, Reward: 0
Episode: 5, Reward: 0
Episode: 6, Reward: 0
Episode: 7, Reward: 0
Episode: 8, Reward: 0
Episode: 9, Reward: 0
Episode: 10, Reward: 0
Episode: 11, Reward: 0
Episode: 12, Reward: 1.0
Episode: 13, Reward: 0
Episode: 14, Reward: 0
Episode: 15, Reward: 0
Episode: 16, Reward: 0
Episode: 17, Reward: 1.0
Episode: 18, Reward: 0
Episode: 19, Reward: 0
Episode: 20, Reward: 0
Episode: 21, Reward: 0
Episode: 22, Reward: 0
Episode: 23, Reward: 0
Episode: 24, Reward: 0
Episode: 25, Reward: 0
Episode: 26, Reward: 0
Episode: 27, Reward: 0
Episode: 28, Reward: 0
Episode: 29, Reward: 0
Episode: 30, Reward: 0
Episode: 31, Reward: 0
Episode: 32, Reward: 0
Episode: 33, Reward: 1.0
Episode: 34, Reward: 0
Episode: 35, Reward: 0
Episode: 36, Reward: 1.0
Episode: 37, Reward: 0
Episode: 38, Reward: 0
Episode: 39, Reward: 0
Episode: 40, Reward: 0
Episode: 41, Reward: 0
Episode: 42, Reward: 

"\ns_mid, _ = sarsa(environment_mid, epsilon_decay = 0.0001, episodes = 3000)\nsave('Mid - SARSA.pkl', s_mid)\n\ns_hard, _ = sarsa(environment_hard, epsilon_decay = 0.00005, episodes = 8000, max_iter_episode = 300)\nsave('Hard - SARSA.pkl', s_hard)"

Loading SARSA trained Q-Table and checking if successful

In [None]:
trained_s_simple = load('Simple - SARSA.pkl')
print('simple', trained_s_simple)

trained_s_mid = load('Mid - SARSA.pkl')
print('mid', trained_s_mid)

trained_s_hard = load('Hard - SARSA.pkl')
print('hard',trained_s_hard)

simple [[6.52714372e-01 6.47494018e-01 7.23750412e-01 7.58664191e-01]
 [7.60195136e-01 7.64874637e-01 6.92775786e-01 7.23410845e-01]
 [7.45654404e-01 7.29906619e-01 7.23038316e-01 7.63762355e-01]
 [7.63710260e-01 7.45432079e-01 7.45864093e-01 7.62138367e-01]
 [8.16025615e-01 8.41918528e-01 7.66182601e-01 7.68533170e-01]
 [7.70974696e-01 7.83883214e-01 7.75534451e-01 7.72456050e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [7.46203065e-01 7.51334310e-01 7.98488259e-01 8.11325431e-01]
 [8.05480361e-01 8.43513548e-01 7.30618834e-01 6.11588776e-01]
 [7.14743555e-01 8.20641518e-01 7.66671777e-01 7.56867290e-01]
 [7.11885273e-01 3.55403751e-01 7.02281117e-01 7.06793249e-01]
 [7.62565076e-01 7.55830228e-01 7.25627601e-01 7.73490846e-01]
 [7.59242892e-01 7.53112972e-01 7.62546718e-01 7.71887779e-01]
 [7.65662134e-01 7.72811651e-01 7.67614424e-01 8.38021934e-01]
 [8.19480836e-01 8.27804029e-01 8.10175121e-01 7.76852787e-01]
 [7.75588453e-01 8.42435360e-01 7.74728417e-01 7

In [None]:
from google.colab import files

files.download('Simple - SARSA.pkl')
#files.download('Mid - SARSA.pkl')
#files.download('Hard - SARSA.pkl')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

---

# Alternating Q-Learning / SARSA

---

In [None]:
def alternating(env, alpha=1, gamma=0.9,  epsilon=1, epsilon_decay=0.005, episodes = 2000, max_iter_episode = 200):

    Q = np.zeros((env.grid_size[0]*env.grid_size[1], len(env.action_space)), dtype=np.float32) #Initialize the Q table to all 0s
    rewards = []

    for e in range(episodes): #Run 1k training runs

        state = env.reset() #Part of OpenAI where you need to reset at the start of each run
        total_reward = 0 #Set initial reward to 0
        iteration = 0
        while True: #Loop until done == True

            random_num = random.random() # Generate a random number between 0 and 1

            current_state_index = env.current_pos[0] + env.current_pos[1]*env.observation_space[0] # Obtain the index of the state

            action = compute_action(current_state_index, Q, epsilon, env) # Compute the action for the current state using Q-Table

            posp1, reward, done = env.step(action) # Send the action to the environment and obtain the new position, the reward and the termination flag

            state_tp1_index = posp1[0] + posp1[1]*env.observation_space[0] # Compute the index of the state at t+1
            action_tp1 = compute_action(state_tp1_index, Q, epsilon, env) # Compute the action for the next state using Q-Table

            total_reward += reward # Increment the reward

            if (random_num <= 0.5): # We use Q-learning
                Q[current_state_index][action] = Q[current_state_index][action] + alpha * (reward + gamma * np.max(Q[state_tp1_index]) - Q[current_state_index][action])

            else: # We use SARSA
                Q[current_state_index][action] = Q[current_state_index][action] + alpha * (reward + gamma*Q[state_tp1_index][action_tp1] - Q[current_state_index][action])

            if done:
                print(f"Episode: {e}, Reward: {total_reward}")
                break


            iteration += 1

            if iteration >= max_iter_episode:
                print(f"Episode: {e}, Reward: {total_reward}")
                break

        if epsilon > 0.1:
            epsilon *= np.exp(-epsilon_decay)

    return Q, rewards

Effective running of alternating Q-Learning/SARSA and saving of the trained Q-Table

In [None]:
a_simple, _ = alternating(environment_simple, epsilon_decay = 0.0003)
save('Simple - Alternating.pkl', a_simple)

a_mid, _ = alternating(environment_mid)
save('Mid - Alternating.pkl', a_mid)

a_hard, _ = alternating(environment_hard)
save('Hard - Alternating.pkl', a_hard)

[1;30;43mLe flux de sortie a été tronqué et ne contient que les 5000 dernières lignes.[0m
Episode: 1000, Reward: 1.0
Episode: 1001, Reward: 1.0
Episode: 1002, Reward: 1.0
Episode: 1003, Reward: 1.0
Episode: 1004, Reward: 1.0
Episode: 1005, Reward: 1.0
Episode: 1006, Reward: 1.0
Episode: 1007, Reward: 1.0
Episode: 1008, Reward: 1.0
Episode: 1009, Reward: 1.0
Episode: 1010, Reward: 1.0
Episode: 1011, Reward: 1.0
Episode: 1012, Reward: 1.0
Episode: 1013, Reward: 1.0
Episode: 1014, Reward: 1.0
Episode: 1015, Reward: 1.0
Episode: 1016, Reward: 1.0
Episode: 1017, Reward: 1.0
Episode: 1018, Reward: 1.0
Episode: 1019, Reward: 1.0
Episode: 1020, Reward: 1.0
Episode: 1021, Reward: 1.0
Episode: 1022, Reward: 1.0
Episode: 1023, Reward: 1.0
Episode: 1024, Reward: 0
Episode: 1025, Reward: 0
Episode: 1026, Reward: 1.0
Episode: 1027, Reward: 1.0
Episode: 1028, Reward: 1.0
Episode: 1029, Reward: 1.0
Episode: 1030, Reward: 1.0
Episode: 1031, Reward: 1.0
Episode: 1032, Reward: 1.0
Episode: 1033, Reward

Loading alternating trained Q-Table and checking if successful

In [None]:
trained_a_simple = load('Simple - Alternating.pkl')
print(trained_a_simple)

trained_a_mid = load('Mid - Alternating.pkl')
print(trained_a_mid)

trained_a_hard = load('Hard - Alternating.pkl')
print(trained_a_hard)

[[0.0717898  0.07976644 0.07976644 0.05814974]
 [0.06461082 0.0717898  0.0717898  0.07976644]
 [0.08862938 0.08862938 0.08862938 0.08862938]
 [0.10941899 0.09847709 0.10941899 0.12157665]
 [0.12157665 0.13508517 0.12157665 0.13508517]
 [0.13508517 0.16677181 0.10941899 0.15009463]
 [0.         0.         0.         0.        ]
 [0.15009463 0.25418657 0.16677181 0.16677181]
 [0.15009463 0.15009463 0.16677181 0.3138106 ]
 [0.15009463 0.06461082 0.28242952 0.16677181]
 [0.0717898  0.04239116 0.07976644 0.0717898 ]
 [0.0717898  0.08862938 0.07976644 0.07976644]
 [0.07976644 0.07976644 0.0717898  0.04239116]
 [0.10941899 0.09847709 0.09847709 0.15009463]
 [0.12157665 0.15009463 0.13508517 0.15009463]
 [0.15009463 0.18530202 0.09847709 0.16677181]
 [0.16677181 0.18530202 0.18530202 0.18530202]
 [0.15009463 0.25418657 0.18530202 0.08862938]
 [0.13508517 0.25418657 0.09847709 0.07976644]
 [0.25418657 0.25418657 0.0717898  0.16677181]
 [0.07976644 0.04710129 0.07976644 0.07976644]
 [0.08862938 

In [None]:
from google.colab import files

files.download('Simple - Alternating.pkl')
#files.download('Mid - Alternating.pkl')
#files.download('Hard - Alternating.pkl')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

---

# Deep Q-Learning

---

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

Checking availability of CUDA

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cpu


Defining the neural network

In [3]:
class DeepQNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(DeepQNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)

        return x

Defining the exploration-exploit function


In [None]:
def compute_action_torch(current_state, epsilon, environment, policy_net):

    if np.random.uniform(0,1) < epsilon:
        return np.random.choice(range(len(environment.action_space))) # Exploration

    else:
        current_state = torch.FloatTensor(current_state).unsqueeze(0)
        q = policy_net(current_state)
        return torch.argmax(q).item() # Exploit

Defining the optimizing function


In [4]:
def optimize(memory, policy_net, target_net, gamma, optimizer, batch_size = 64):

    if len(memory) < batch_size:
        return

    batch = random.sample(memory, batch_size)
    state_batch, action_batch, reward_batch, next_state_batch, done_batch = zip(*batch)

    state_batch = torch.FloatTensor(state_batch)
    action_batch = torch.LongTensor(action_batch).unsqueeze(1)
    reward_batch = torch.FloatTensor(reward_batch)
    next_state_batch = torch.FloatTensor(next_state_batch)
    done_batch = torch.FloatTensor(done_batch)

    # Compute Q-values for current states
    q_values = policy_net(state_batch).gather(1, action_batch).squeeze()

    # Compute target Q-values using the target network
    with torch.no_grad():
        max_next_q_values = target_net(next_state_batch).max(1)[0]
        target_q_values = reward_batch + gamma * max_next_q_values * (1 - done_batch)

    loss = nn.MSELoss()(q_values, target_q_values)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Defining the Deep Q-Learning function

# **IL FAUDRAIT ENVOYER LA GRILLE CONTENANT LA POSITION DU DRONE EN TANT QU'ETAT DANS LE RESEAU ET NON PAS SEULEMENT L'INDEX DE L'ETAT COMME POUR LE Q-LEARNING**

In [5]:
def deep_q_learning(env, memory, policy_net, target_net, optimizer, alpha=0.01, gamma=0.9,  epsilon=1, epsilon_decay=0.005, target_update_freq = 800, episodes = 3000):

    steps = 0
    rewards = []

    for e in range(episodes): #Run 1k training runs

        state = env.reset() #Part of OpenAI where you need to reset at the start of each run
        total_reward = 0 #Set initial reward to 0

        while True: #Loop until done == True
            #IF random number is less than epsilon grab the random action else grab the argument max of Q[state]

            current_state_index = env.current_pos[0] + env.current_pos[1]*env.observation_space[0] # Obtain the index of the state

            action = compute_action_torch(current_state_index, epsilon, env, policy_net) # Compute the action for the current state in function of the epsilon_greedy

            posp1, reward, done = env.step(action) #Send your action to OpenAI and get back the tuple

            state_tp1_index = posp1[0] + posp1[1]*env.observation_space[0]

            memory.append((current_state_index, action, reward, state_tp1_index, done))

            state = state_tp1_index

            total_reward += reward #Increment your reward

            optimize(memory, policy_net, target_net, gamma, optimizer)

            if steps % target_update_freq == 0:
                target_net.load_state_dict(policy_net.state_dict())

            if done:
                print(f"Episode: {e}, Reward: {total_reward}")
                break

            steps += 1


        if epsilon>0.1:
            epsilon *= np.exp(-epsilon_decay)

        rewards.append(total_reward)

    return rewards

Function to save the weights of the policy_net and the target_net

In [7]:
def save_weights(filename, neural_net):
    torch.save(neural_net.state_dict(), filename)

In [8]:
def load_weights(filename, blank_net):
    blank_net.load_state_dict(torch.load(filename))

Initialization of the agent

In [16]:
policy_net = DeepQNetwork(STATE_SIZE[0]*STATE_SIZE[1], 100, ACTION_SIZE)
target_net = DeepQNetwork(STATE_SIZE[0]*STATE_SIZE[1], 100, ACTION_SIZE)
target_net.load_state_dict(policy_net.state_dict())

learning_rate = 0.001
memory_size = 1000

optimizer = optim.Adam(policy_net.parameters(), lr = learning_rate)
memory = deque(maxlen = memory_size)

Running of the training

In [None]:
dql_simple = deep_q_learning(environment_simple, memory, policy_net, target_net, optimizer)
dql_mid = deep_q_learning(environment_mid, memory, policy_net, target_net, optimizer)
dql_hard = deep_q_learning(environment_hard, memory, policy_net, target_net, optimizer)

---

# Spatial Computing for Path Planning - SCPP - Personalized algorithm

---

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

Checking availability of CUDA

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cuda


Defining a new type of environment with a different step() method

In [None]:
class SCPPEnvironment(DroneGrid):
    def __init__(self, grid):
        super().__init__(grid)

    def step(self, action):
        pass

Defining the neural network

In [None]:
class NeuralAgent(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(NeuralAgent, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

Function to compute the input tensor during training

In [None]:
def calculate_input_tensor(env):
    # Get the dimensions of the grid
    height, width = env.grid.shape

    # Get the agent's position
    x, y = env.current_pos

    # Initialize the input tensor with zeros
    input_tensor = np.zeros(6)

    # Calculate the distance to the nearest obstacle in the left direction
    for i in range(x-1, -1, -1):
        if grid[i, y] == 1:
            input_tensor[0] = x - i
            break

    # Calculate the distance to the nearest obstacle in the right direction
    for i in range(x+1, height):
        if grid[i, y] == 1:
            input_tensor[1] = i - x
            break

    # Calculate the distance to the nearest obstacle in the up direction
    for j in range(y-1, -1, -1):
        if grid[x, j] == 1:
            input_tensor[2] = y - j
            break

    # Calculate the distance to the nearest obstacle in the down direction
    for j in range(y+1, width):
        if grid[x, j] == 1:
            input_tensor[3] = j - y
            break

    # Calculate the distance to the goal in the horizontal direction
    goal_position = np.argwhere(grid == 2)[0]
    input_tensor[4] = goal_position[1] - y

    # Calculate the distance to the goal in the vertical direction
    input_tensor[5] = goal_position[0] - x

    # Convert the input tensor to a PyTorch tensor
    input_tensor = torch.tensor(input_tensor, dtype=torch.float32).unsqueeze(0)

    return input_tensor

Definition of the training

In [None]:
def scpp(agent, env, num_episodes, criterion, optimizer):
    # Train the agent for a specified number of episodes
    for episode in range(num_episodes):
        # Reset the environment
        state = env.reset()

        # Initialize the episode reward
        episode_reward = 0

        # Loop through the episode
        while True:
            # Convert the state to a PyTorch tensor
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

            # Forward pass
            action_probs = agent(state_tensor)

            # Sample an action from the action probabilities
            action = torch.multinomial(action_probs, num_samples=1).item()

            # Take the action and observe the next state and reward
            next_state, reward, done, _ = env.step(action)

            # Update the episode reward
            episode_reward += reward

            # Convert the next state to a PyTorch tensor
            next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)

            # Calculate the target tensor
            with torch.no_grad():
                next_action_probs = agent(next_state_tensor)
                target = reward + 0.99 * torch.max(next_action_probs)

            # Calculate the loss
            loss = criterion(action_probs[0, action], target)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Update the state
            state = next_state

            # Check if the episode is done
            if done:
                break

        # Print the episode reward every 100 episodes
        if (episode + 1) % 100 == 0:
            print(f'Episode [{episode+1}/{num_episodes}], Episode Reward: {episode_reward:.2f}')