# Reinforcement Learning Basics for our project

[Open in Google Colab](https://colab.research.google.com/github/leonardotorresaltez/routing-model-2025/blob/main/notebooks/reinforcement_learning_basic.ipynb)

This Jupyter notebook is a proof of concept (PoC) for reinforcement learning applied to our use case. It covers the basic elements: the environment, the policy, the agent, and the training loop.

In this example:

- The nodes represent random (x, y) coordinates.

- The starting node is selected at random.

- The goal is to reach all remaining nodes while minimizing the total distance traveled.
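
As a minimal sketch of the objective described above, the total distance of a tour can be computed by summing the Euclidean distances between consecutive nodes. The helper name `tour_length` and the example coordinates are illustrative assumptions (the notebook samples coordinates in the unit square; a 3-4-5 triangle is used here only to keep the arithmetic easy to follow). The path is open, that is, it does not return to the starting node.

```python
import math

def tour_length(nodes, tour):
    """Sum of Euclidean distances between consecutive nodes of an open tour.

    nodes: list of (x, y) tuples
    tour:  list of node indices in visiting order
    """
    total = 0.0
    for a, b in zip(tour, tour[1:]):
        (xa, ya), (xb, yb) = nodes[a], nodes[b]
        total += math.hypot(xb - xa, yb - ya)
    return total

# Hypothetical coordinates chosen for easy arithmetic (not from the notebook)
nodes = [(0.0, 0.0), (3.0, 4.0), (3.0, 0.0)]

short = tour_length(nodes, [0, 2, 1])  # 3 + 4 = 7
long_ = tour_length(nodes, [0, 1, 2])  # 5 + 4 = 9
print(short, long_)
```

Both tours visit every node starting from node 0, but the visiting order changes the total distance, which is the quantity the agent is meant to reduce.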




## Configurations

In [16]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.2.3-py3-none-any.whl.metadata (10 kB)
Collecting cloudpickle>=1.2.0 (from gymnasium)
  Downloading cloudpickle-3.1.2-py3-none-any.whl.metadata (7.1 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.2.3-py3-none-any.whl (952 kB)
Downloading cloudpickle-3.1.2-py3-none-any.whl (22 kB)
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, cloudpickle, gymnasium
Successfully installed cloudpickle-3.1.2 farama-notifications-0.0.4 gymnasium-1.2.3


In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
import gymnasium as gym
from gymnasium import spaces

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


## The environment

The environment holds the state of the problem at a single point in time:

- nodes
- current node
- nodes already visited

In [18]:
class Environment:
    def __init__(self, num_nodes):
        self.num_nodes = num_nodes
        self.reset()

    def reset(self):
        # Node coordinates (x, y): a matrix with num_nodes rows and 2 columns
        self.nodes = torch.rand(self.num_nodes, 2)
        # Choose a random starting node
        self.current = random.randint(0, self.num_nodes - 1)
        # Track visited nodes; all are initially unvisited
        self.visited = torch.zeros(self.num_nodes, dtype=torch.bool)
        # Mark the starting node as visited
        self.visited[self.current] = True
        # Record the order in which nodes are visited
        self.tour = [self.current]
        return self.get_state()

    def get_state(self):
        return self.nodes, self.current, self.visited

    def step(self, action):
        prev = self.current
        self.current = action
        self.visited[action] = True
        self.tour.append(action)

        dist = torch.norm(self.nodes[prev] - self.nodes[action])
        reward = -dist

        done = self.visited.all()
        return self.get_state(), reward, done

## The policy (implemented with attention)

The policy is a neural network whose learnable parameters are optimized during the training loop.

In this example, the policy is attention-based: it scores every node against the current node and turns those scores into a probability distribution over the next node to visit.

In [19]:
class AttentionPolicy(nn.Module):
    def __init__(self, node_dim=2, embed_dim=128):
        super().__init__()
        self.node_embed = nn.Linear(node_dim, embed_dim)

        self.query = nn.Linear(embed_dim, embed_dim)
        self.key = nn.Linear(embed_dim, embed_dim)

    def forward(self, nodes, current_node_idx, visited_mask):
        """
        nodes: [N, node_dim]             node features (e.g. coordinates)
        current_node_idx: int            current position
        visited_mask: [N] (bool)         True = already visited
        """

        # Embed nodes
        h = self.node_embed(nodes)       # [N, embed_dim]

        # Query = embedding of current node
        q = self.query(h[current_node_idx])   # [embed_dim]

        # Keys = all nodes
        k = self.key(h)                       # [N, embed_dim]

        # Attention scores
        scores = torch.matmul(k, q)           # [N]

        # Mask visited nodes, the agent cannot revisit them
        scores = scores.masked_fill(visited_mask, float("-inf"))

        # Policy = probability of choosing each node next (stochastic policy)
        probs = F.softmax(scores, dim=0)

        return probs
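
The masking step in `forward` can be checked in isolation. The sketch below uses made-up scores and a made-up visited mask (both are assumptions for illustration, not values produced by the notebook) to show that visited nodes end up with probability zero and the remaining probabilities still sum to one.

```python
import torch
import torch.nn.functional as F

# Hypothetical attention scores for 4 nodes (illustrative values only)
scores = torch.tensor([2.0, 1.0, 0.5, 1.0])
# Nodes 0 and 3 are assumed to be already visited
visited = torch.tensor([True, False, False, True])

# Same two operations as in AttentionPolicy.forward
masked = scores.masked_fill(visited, float("-inf"))
probs = F.softmax(masked, dim=0)

print(probs)        # visited entries are exactly 0
print(probs.sum())  # remaining probabilities sum to 1
```

Note that if every node were masked, the softmax would be taken over `-inf` values only and would return NaN, so the policy should not be queried once all nodes have been visited. The training loops in this notebook stop the episode at that point.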
    

## Agent

The agent chooses each action by sampling from the policy (a neural network) and updates the policy with REINFORCE.

In [20]:
class Agent:
    def __init__(self, node_dim=2, embed_dim=128, lr=1e-3):
        self.policy = AttentionPolicy(node_dim, embed_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

        # Buffers for REINFORCE
        self.log_probs = []
        self.rewards = []

    def act(self, state):
        """
        state = (nodes, current_node, visited_mask)
        """
        nodes, current, visited = state

        probs = self.policy(nodes, current, visited.clone())
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()

        self.log_probs.append(dist.log_prob(action))
        return action.item()

    def store_reward(self, reward):
        self.rewards.append(reward)

    def update(self):
        """
        Policy Gradient (REINFORCE)
        """
        R = sum(self.rewards)
        loss = -torch.stack(self.log_probs).sum() * R

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Clear buffers
        self.log_probs.clear()
        self.rewards.clear()

## Training Loop

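The training loop runs one episode per iteration and updates the policy once at the end of each episode. Every episode samples a new random graph, so the per-episode rewards logged below are noisy: they improve on the first episode but do not increase steadily. The `visited` tensor must be cloned before it is passed to the policy (see `Agent.act`) because the environment modifies it in place on each step, and PyTorch raises an in-place modification error during `backward()` if a tensor saved for the gradient computation has changed.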
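
The in-place issue can be reproduced outside the training loop. The following is a minimal sketch with made-up scores and a made-up mask (assumptions for illustration): `masked_fill` keeps a reference to the mask for the backward pass, so mutating the mask afterwards makes `backward()` fail, while passing a clone does not.

```python
import torch

scores = torch.randn(4, requires_grad=True)

# Case 1: pass the mask itself, then mutate it (as env.step does with visited)
visited = torch.tensor([True, False, False, False])
masked = scores.masked_fill(visited, float("-inf"))
loss = torch.softmax(masked, dim=0)[1].log()
visited[1] = True  # in-place change after the forward pass
try:
    loss.backward()
    raised = False
except RuntimeError:
    raised = True  # autograd detects that a saved tensor was modified

# Case 2: pass a clone, so later mutations do not touch the saved tensor
visited = torch.tensor([True, False, False, False])
masked = scores.masked_fill(visited.clone(), float("-inf"))
loss = torch.softmax(masked, dim=0)[1].log()
visited[1] = True
loss.backward()
ok = scores.grad is not None

print(raised, ok)
```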

In [15]:

def train():
    env = Environment(num_nodes=10)
    agent = Agent()


    for episode in range(500):
        state = env.reset()
        done = False
        episode_reward = 0.0 

        while not done:
            action = agent.act(state)
            state, reward, done = env.step(action)
            
            agent.store_reward(reward)
            episode_reward += reward.item()

        # One REINFORCE update per finished episode
        agent.update()

        if episode % 50 == 0:
            print(
                f"Episode {episode:4d} | "
                f"Total reward: {episode_reward:.3f}"
            )



train()

Episode    0 | Total reward: -4.393
Episode   50 | Total reward: -3.913
Episode  100 | Total reward: -2.747
Episode  150 | Total reward: -2.726
Episode  200 | Total reward: -3.084
Episode  250 | Total reward: -2.901
Episode  300 | Total reward: -4.068
Episode  350 | Total reward: -3.821
Episode  400 | Total reward: -3.137
Episode  450 | Total reward: -3.439
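
Each logged value comes from a single episode on a freshly sampled graph, so individual numbers fluctuate. One option, sketched below, is to track a running mean of the episode rewards and print that instead. The `RunningMean` class and the window size are hypothetical additions, not part of the notebook, and the rewards in the example are made up.

```python
from collections import deque

class RunningMean:
    """Mean of the most recent `window` values."""

    def __init__(self, window=50):
        self.values = deque(maxlen=window)  # old values drop out automatically

    def add(self, value):
        self.values.append(float(value))
        return sum(self.values) / len(self.values)

# Made-up episode rewards, window of 3 for readability
tracker = RunningMean(window=3)
smoothed = [tracker.add(r) for r in [-4.0, -2.0, -3.0, -1.0]]
print(smoothed)  # → [-4.0, -3.0, -3.0, -2.0]
```

In the training loop, `tracker.add(episode_reward)` could be called once per episode and its return value printed alongside the raw reward.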


## The environment with Gymnasium

In [21]:
class GymnasiumEnvironment(gym.Env):
    metadata = {"render_modes": []}

    def __init__(self, num_nodes=10):
        super().__init__()
        self.num_nodes = num_nodes

        self.action_space = spaces.Discrete(num_nodes)

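        # Note: _get_state() returns a (nodes, current, visited) tuple of torch
        # tensors, so this Dict space documents the contents rather than the exact format.
        self.observation_space = spaces.Dict({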
            "nodes": spaces.Box(low=0.0, high=1.0, shape=(num_nodes, 2)),
            "current": spaces.Discrete(num_nodes),
            "visited": spaces.MultiBinary(num_nodes)
        })

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.nodes = torch.rand(self.num_nodes, 2)
        self.current = random.randrange(self.num_nodes)

        self.visited = torch.zeros(self.num_nodes, dtype=torch.bool)
        self.visited[self.current] = True

        self.tour = [self.current]

        return self._get_state(), {}

    def _get_state(self):
        return (
            self.nodes.clone(),
            self.current,
            self.visited.clone()
        )

    def step(self, action):
        prev = self.current
        self.current = action
        self.visited[action] = True
        self.tour.append(action)

        dist = torch.norm(self.nodes[prev] - self.nodes[action])
        reward = -dist

        # Gymnasium expects a plain bool, not a 0-dim tensor
        terminated = bool(self.visited.all())
        truncated = False

        return self._get_state(), reward, terminated, truncated, {}
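
The class above declares a `Dict` observation space but returns a tuple of tensors, which suits the `Agent` defined earlier. If observations that match the declared space are needed (for example, to use tools that validate observations against it), a conversion along the following lines is one possibility. The helper `to_observation` is a hypothetical name, and the dtypes assume the defaults of `spaces.Box` (float32) and `spaces.MultiBinary` (int8).

```python
import numpy as np
import torch

def to_observation(nodes, current, visited):
    """Convert the (nodes, current, visited) state into a dict of NumPy values."""
    return {
        "nodes": nodes.detach().cpu().numpy().astype(np.float32),
        "current": int(current),
        "visited": visited.detach().cpu().numpy().astype(np.int8),
    }

# Example state with 5 nodes, currently at node 2
nodes = torch.rand(5, 2)
visited = torch.zeros(5, dtype=torch.bool)
visited[2] = True

obs = to_observation(nodes, 2, visited)
print(obs["visited"])
```

The agent in this notebook would still need the tensor form, so such a conversion would sit at the boundary with external tooling rather than replace `_get_state`.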

## Training Loop with Gymnasium

In [23]:
def gymnasium_train():
    env = GymnasiumEnvironment(num_nodes=10)
    agent = Agent()

    for episode in range(500):
        state, _ = env.reset()
        terminated = False
        truncated = False

        episode_reward = 0.0

        while not (terminated or truncated):
            action = agent.act(state)
            state, reward, terminated, truncated, _ = env.step(action)

            agent.store_reward(reward)
            episode_reward += reward.item()

        agent.update()

        if episode % 50 == 0:
            print(
                f"Episode {episode:4d} | "
                f"Total reward: {episode_reward:.3f}"
            )
            
gymnasium_train()

Episode    0 | Total reward: -5.632
Episode   50 | Total reward: -2.818
Episode  100 | Total reward: -3.165
Episode  150 | Total reward: -3.110
Episode  200 | Total reward: -3.442
Episode  250 | Total reward: -4.044
Episode  300 | Total reward: -3.701
Episode  350 | Total reward: -1.989
Episode  400 | Total reward: -2.731
Episode  450 | Total reward: -2.976
