# Reinforcement Learning
### Deep Q-learning: Q-learning with deep learning (an MLP for regression)

**Deep Q-Learning** is a *reinforcement learning* algorithm that combines **Q-learning** with **deep neural networks** to handle environments with high-dimensional state spaces (e.g., images). Instead of using a **Q-table**, a deep neural network approximates the Q-value function, predicting the value of each action for a given state. The agent learns by minimizing the difference between predicted and target Q-values using **experience replay** and a separate **target network** to stabilize training. 
<br> In fact, ϵ-greedy action selection uses a **policy network** (an MLP for regression), and this policy network is updated with transitions collected from the environment. From time to time, the parameters of the policy network are copied to the target network. The target network is used only to form the q-target for each transition (s,a,r,s',done). The flag **done** is true when s' is a terminal state; otherwise, it is false.
<br>Hint: The policy network approximates $q(s,a)$ such that:
<br> $\large q(s,a)=F_a(\boldsymbol{x}(s))$
<br>where $\boldsymbol{x}(s)$ is the feature vector extracted from state $s$, and $F_a(\boldsymbol{x}(s))$ is the $a$th component of the output vector $F(\boldsymbol{x}(s))$ of the policy network.
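
The q-target described above can be sketched for a single transition. This is only a minimal illustration, not the notebook's implementation: `q_target`, `next_q_values`, and the numbers are hypothetical, and a discount factor $\gamma=0.99$ is assumed. It evaluates $r + \gamma\,(1-\text{done})\,\max_{a'} \hat{q}(s',a')$, where $\hat{q}$ stands for the output of the target network.

```python
def q_target(reward, next_q_values, done, gamma=0.99):
    """One-step Q-learning target for a transition (s, a, r, s', done).

    next_q_values: target-network outputs for s', one value per action
    (hypothetical numbers here; a real agent would read them from the network).
    """
    # A terminal s' has no future return, so the bootstrap term is masked out.
    bootstrap = 0.0 if done else gamma * max(next_q_values)
    return reward + bootstrap

# Non-terminal step: the target is -1 + 0.99 * 5.0
print(q_target(-1.0, [2.0, 5.0, -3.0, 0.5], done=False))
# Terminal step into the goal: the target is just the reward
print(q_target(10.0, [2.0, 5.0, -3.0, 0.5], done=True))
```

Masking the bootstrap term with **done** keeps the value of a terminal state from leaking into the target.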
<hr>

The example in this Notebook is the same **Grid World** we used for SARSA with an RBF network. In general, the grid can have any size:
 - **States:** A size×size grid (size*size states), labeled (0,0) to (size-1,size-1).
 - **Actions:** Up, Down, Left, Right.
 - **Rewards:**
    - Reaching the goal state (size-1,size-1) gives a reward of +10.
    - Reaching a "pit" state (size//2,size//2) gives a reward of −10.
    - All other transitions give a reward of −1.
 - **Terminal States:** (size-1,size-1) (goal) and (size//2,size//2) (pit).
 - **Transition Probabilities:**
    - Moving in the intended direction succeeds with probability 0.8.
    - With probability 0.2, the agent moves in a direction chosen uniformly at random from the four actions.
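
As a small worked example of these transition probabilities: if the random direction is drawn uniformly from all four actions, so that it can coincide with the intended one (an assumption made here), the agent actually moves as intended slightly more often than 0.8. The helper `effective_move_probs` below is hypothetical and only illustrates the arithmetic.

```python
ACTIONS = ['up', 'down', 'left', 'right']

def effective_move_probs(intended, p_intended=0.8):
    """Probability that each direction is actually executed.

    Assumes the random fallback picks uniformly among all four actions,
    including the intended one.
    """
    p_random = (1.0 - p_intended) / len(ACTIONS)  # share of each random pick
    return {a: p_random + (p_intended if a == intended else 0.0)
            for a in ACTIONS}

probs = effective_move_probs('right')
for action, p in probs.items():
    print(f"{action}: {p:.2f}")
```

Under this assumption, the intended direction is executed with probability of about 0.85, and each of the other three with about 0.05.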

<hr>
https://github.com/ostad-ai/Reinforcement-Learning
<br> Explanation: https://www.pinterest.com/HamedShahHosseini/Reinforcement-Learning

In [1]:
# Import required modules
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

In [2]:
# The GridWorld environment
class GridWorld:
    def __init__(self, size=5):
        self.size = size
        self.actions = ['up', 'down', 'left', 'right']
        self.terminal = {(size-1, size-1): 10}  # Goal at bottom-right
        self.pits = {(size//2, size//2): -10}   # Pit at center
        self.current_state = None

    def reset(self):
        self.current_state = (0, 0)
        return self._state_to_features(self.current_state)

    def step(self, action_idx):
        action = self.actions[action_idx]
        i, j = self.current_state

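        # Stepping from a terminal state (goal or pit) is a no-op with zero reward
        if self.current_state in self.terminal or self.current_state in self.pits:
            return self._state_to_features(self.current_state), 0, True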

        # Movement with stochasticity
        if random.random() < 0.8:
            next_state = self._move(action, i, j)
        else:
            next_state = self._move(random.choice(self.actions), i, j)

        self.current_state = next_state
        reward = self.terminal.get(next_state, self.pits.get(next_state, -1))
        done = next_state in self.terminal or next_state in self.pits
        return self._state_to_features(next_state), reward, done

    def _move(self, action, i, j):
        if action == 'up': return (max(i-1, 0), j)
        elif action == 'down': return (min(i+1, self.size-1), j)
        elif action == 'left': return (i, max(j-1, 0))
        elif action == 'right': return (i, min(j+1, self.size-1))

    def _state_to_features(self, state):
        """Enhanced features for larger grids"""
        i, j = state
        features = [
            i / (self.size-1),  # Normalized row index
            j / (self.size-1),  # Normalized column index
            i / (self.size-1)*j / (self.size-1),  # Row-column interaction term
            (self.size-1 - i) / (self.size-1),  # Remaining rows to the goal
            (self.size-1 - j) / (self.size-1),  # Remaining columns to the goal
            abs(i - self.size//2) / self.size,  # Row distance to the pit
            abs(j - self.size//2) / self.size,  # Column distance to the pit
            float(i in [0, self.size-1]),  # On the top or bottom edge
            float(j in [0, self.size-1])   # On the left or right edge
        ]
        return torch.FloatTensor(features)

In [3]:
# Network architecture shared by the policy and target networks
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )
        
    def forward(self, x):
        return self.net(x)
    
# The class of the deep q-learning agent
class DQNAgent:
    def __init__(self, state_dim, action_dim, lr=0.005, gamma=0.99):
        self.policy_net = DQN(state_dim, action_dim)
        self.target_net = DQN(state_dim, action_dim)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.gamma = gamma
        self.action_dim = action_dim
        
    # Update weights and biases of policy network by batch
    def update(self, batch):
        states, actions, rewards, next_states, dones = batch
        
        # Current Q values for chosen actions
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(-1))

        # Next Q values from target network (using max)
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0].unsqueeze(-1)
            targets = rewards.unsqueeze(-1) + self.gamma * next_q * (1 - dones.unsqueeze(-1))

        # Compute loss
        loss = nn.MSELoss()(current_q, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
# epsilon-greedy action selection implementation
def epsilon_greedy(net, state, epsilon, action_dim):
    if random.random() < epsilon:
        return random.randint(0, action_dim-1)
    else:
        with torch.no_grad():
            return net(state.unsqueeze(0)).argmax().item()

In [4]:
# Train the policy network on q-targets and periodically sync the target network
def train_dqn(env, episodes=2000, batch_size=32, gamma=0.99):
    state_dim = len(env._state_to_features((0,0)))
    action_dim = len(env.actions)
    # Pass gamma through so the function argument is actually used
    agent = DQNAgent(state_dim, action_dim, gamma=gamma)
    buffer = deque(maxlen=10000)  # replay buffer
    epsilon = 1.0
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        
        while True:
            action = epsilon_greedy(agent.policy_net, state, epsilon, action_dim)
            next_state, reward, done = env.step(action)
            
            # Store experience in buffer (no next_action needed for Q-learning)
            buffer.append((
                state, 
                torch.LongTensor([action]), 
                torch.FloatTensor([reward]), 
                next_state, 
                torch.FloatTensor([done])
            ))
            
            episode_reward += reward
            
            # Train if enough samples in buffer
            if len(buffer) >= batch_size:
                batch_samples = random.sample(buffer, batch_size)
                batch = [
                    torch.stack([t[0] for t in batch_samples]),  # states
                    torch.cat([t[1] for t in batch_samples]),    # actions
                    torch.cat([t[2] for t in batch_samples]),    # rewards
                    torch.stack([t[3] for t in batch_samples]),  # next_states
                    torch.cat([t[4] for t in batch_samples])     # dones
                ]
                agent.update(batch)
            
            if done:
                break
                
            state = next_state
        
        # Update target network
        if episode % 10 == 0:
            agent.target_net.load_state_dict(agent.policy_net.state_dict())
        
        # Decay epsilon linearly, with a floor of 0.01
        epsilon = max(0.01, 1 - episode / episodes)
        
        if episode % 50 == 0:
            print(f"Episode {episode}, Reward: {episode_reward}, Epsilon: {epsilon:.2f}")
    
    return agent

In [5]:
# Test the learnt policy with greedy action selection
def test_policy(env, agent):
    state = env.reset()
    done = False
    total_reward = 0
    steps = 0
    
    print("\nTesting trained policy:")
    while not done:
        with torch.no_grad():
            action = agent.policy_net(state.unsqueeze(0)).argmax().item()
        state, reward, done = env.step(action)
        total_reward += reward
        steps += 1
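        print(f"Step {steps}: took {env.actions[action]}, now at {env.current_state}, Reward: {reward}")
    
    print(f"\nTotal reward: {total_reward}")
    # The episode can also end in the pit, so report the actual outcome
    outcome = "Reached goal" if env.current_state in env.terminal else "Fell into pit"
    print(f"{outcome} in {steps} steps")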

# The main function to run
if __name__ == "__main__":
    env = GridWorld(size=5)
    trained_agent = train_dqn(env, episodes=4000)
    test_policy(env, trained_agent)

Episode 0, Reward: -15, Epsilon: 1.00
Episode 50, Reward: -46, Epsilon: 0.99
Episode 100, Reward: -19, Epsilon: 0.97
Episode 150, Reward: -35, Epsilon: 0.96
Episode 200, Reward: -55, Epsilon: 0.95
Episode 250, Reward: -26, Epsilon: 0.94
Episode 300, Reward: -37, Epsilon: 0.93
Episode 350, Reward: -17, Epsilon: 0.91
Episode 400, Reward: -15, Epsilon: 0.90
Episode 450, Reward: -26, Epsilon: 0.89
Episode 500, Reward: -23, Epsilon: 0.88
Episode 550, Reward: -43, Epsilon: 0.86
Episode 600, Reward: -16, Epsilon: 0.85
Episode 650, Reward: -40, Epsilon: 0.84
Episode 700, Reward: -32, Epsilon: 0.82
Episode 750, Reward: -6, Epsilon: 0.81
Episode 800, Reward: -21, Epsilon: 0.80
Episode 850, Reward: -13, Epsilon: 0.79
Episode 900, Reward: -25, Epsilon: 0.78
Episode 950, Reward: -15, Epsilon: 0.76
Episode 1000, Reward: -6, Epsilon: 0.75
Episode 1050, Reward: -25, Epsilon: 0.74
Episode 1100, Reward: -13, Epsilon: 0.72
Episode 1150, Reward: -17, Epsilon: 0.71
Episode 1200, Reward: -19, Epsilon: 0.70


In [6]:
# The greedy policy learnt for each state of the environment. 
for i in range(env.size):
    for j in range(env.size):
        state=env._state_to_features((i,j))
        with torch.no_grad():
            action=trained_agent.policy_net(state.unsqueeze(0)).argmax().item()
        print(f'state({i},{j}): {env.actions[action]}',end=',')
    print()

state(0,0): right,state(0,1): right,state(0,2): right,state(0,3): down,state(0,4): down,
state(1,0): down,state(1,1): right,state(1,2): right,state(1,3): right,state(1,4): down,
state(2,0): down,state(2,1): down,state(2,2): up,state(2,3): right,state(2,4): down,
state(3,0): right,state(3,1): right,state(3,2): down,state(3,3): right,state(3,4): down,
state(4,0): right,state(4,1): right,state(4,2): right,state(4,3): right,state(4,4): down,
