## Libraries and Variables setup

In [3]:
import sys

sys.path.append("../")

# Basic libraries
import numpy as np
import os
import matplotlib.pyplot as plt
import time
import random
from collections import deque

# PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal

# System libraries
from lib.data.dataplot import *
from lib.system.cart import *
from lib.system.controllers import *
from lib.system.trajectory import *
from lib.system.polar import *
from lib.dds.dds import *
from lib.utils.time import *

# Check available compute devices
if torch.backends.mps.is_available():
    device = torch.device("mps")  # Apple Silicon
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Initialize DDS communication
dds = DDS()
dds.start()

class PolarWheelSpeedControl:
    
    def __init__(self, _wheelbase, _kp, _ki, _kd, _sat):
        self.wheelbase = _wheelbase
        self.left_pid = PID_Controller(_kp, _ki, _kd, _sat)
        self.right_pid = PID_Controller(_kp, _ki, _kd, _sat)
        
    def evaluate(self, delta_t, target_linear, target_angular, current_left, current_right):
        self.target_left = target_linear - (target_angular * self.wheelbase) / 2.0
        self.target_right = target_linear + (target_angular * self.wheelbase) / 2.0
        out_left = self.left_pid.evaluate(delta_t, self.target_left - current_left)
        out_right = self.right_pid.evaluate(delta_t, self.target_right - current_right)
        return (out_left, out_right)

# Initialize robot controllers
wheel_speed_control = PolarWheelSpeedControl(0.5,    # wheelbase
                                         1.0, 5.0, 0.0, None)  # PID parameters

# Initialize robot model - physical properties
cart2d = TwoWheelsCart2DEncodersOdometry(1.0,        # 1 kg
                                     0.3,        # 30cm radius
                                     0.9, 0.8,   # linear and angular friction
                                     0.04, 0.04, # traction wheels radius 4cm
                                     0.4,        # traction wheels timebase 40cm
                                     0.03, 0.03, # encoder wheels radius 3cm
                                     0.5,        # encoder wheels timebase 50cm
                                     4096)       # resolution 4096 tick/rev

# Position controller - parameters for linear and angular movement
polar_position = Polar2DController(8, 10.0,        # kp = 8, vmax = 10 m/s
                                 8, 6.0)         # kp = 8, wmax = 6 rad/s

Using device: mps


## Implementation of Actor-Critic RL

In [12]:
# Actor Network: Generates target positions (x,y)
class ActorNetwork(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=64):
        super(ActorNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = torch.tanh(self.fc3(x))  # Output in range [-1, 1]
        return x

# Critic Network: Evaluates state values
class CriticNetwork(nn.Module):
    def __init__(self, state_size, hidden_size=64):
        super(CriticNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 1)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Actor-Critic Agent Implementation
class A2CAgent:
    def __init__(self, state_size, action_size=2):
        self.state_size = state_size
        self.action_size = action_size
        
        # Algorithm parameters
        self.gamma = 0.99       # Discount factor
        self.actor_lr = 0.0001  # Actor learning rate
        self.critic_lr = 0.001  # Critic learning rate
        self.action_std_dev = 0.5  # Exploration noise standard deviation
        self.min_action_std_dev = 0.1  # Minimum exploration
        self.std_dev_decay = 0.9995  # Decay rate for exploration
        
        # Experience replay memory
        self.memory = deque(maxlen=10000)
        self.batch_size = 32
        
        # Initialize Actor and Critic networks
        self.actor = ActorNetwork(state_size, action_size).to(device)
        self.critic = CriticNetwork(state_size).to(device)
        
        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=self.actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=self.critic_lr)
        
        # Training metrics
        self.episode_rewards = []
    
    def select_action(self, state):
        # Convert state to PyTorch tensor
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        
        # Get action from actor network (no gradient tracking)
        with torch.no_grad():
            action_mean = self.actor(state_tensor).cpu().numpy()[0]
        
        # Add exploration noise
        noise = np.random.normal(0, self.action_std_dev, size=self.action_size)
        action = action_mean + noise
        
        # Clip actions to valid range
        action = np.clip(action, -1, 1)
        
        return action, action_mean
    
    def remember(self, state, action, reward, next_state, done):
        # Store experience in replay memory
        self.memory.append((state, action, reward, next_state, done))
    
    def decay_exploration(self):
        # Reduce exploration over time
        self.action_std_dev = max(self.min_action_std_dev, 
                                 self.action_std_dev * self.std_dev_decay)
    
    def train(self):
        # Skip if not enough samples
        if len(self.memory) < self.batch_size:
            return
        
        # Sample batch from memory
        minibatch = random.sample(self.memory, self.batch_size)
        
        # Convert to PyTorch tensors
        states = torch.FloatTensor([experience[0] for experience in minibatch]).to(device)
        actions = torch.FloatTensor([experience[1] for experience in minibatch]).to(device)
        rewards = torch.FloatTensor([experience[2] for experience in minibatch]).to(device)
        next_states = torch.FloatTensor([experience[3] for experience in minibatch]).to(device)
        dones = torch.FloatTensor([experience[4] for experience in minibatch]).to(device)
        
        # Get current value estimates
        current_values = self.critic(states).squeeze()
        
        # Compute target values (TD targets)
        with torch.no_grad():
            next_values = self.critic(next_states).squeeze()
            target_values = rewards + self.gamma * next_values * (1 - dones)
        
        # Compute critic loss (MSE)
        critic_loss = F.mse_loss(current_values, target_values)
        
        # Update critic network
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        # Compute advantages for actor update
        advantage = (target_values - current_values).detach()
        
        # Update actor network - policy gradient approach
        predicted_actions = self.actor(states)
        actor_loss = -torch.mean(
            torch.sum(
                -torch.square(actions - predicted_actions) * advantage.unsqueeze(1),
                dim=1
            )
        )
        
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        # Reduce exploration
        self.decay_exploration()
    
    def save_models(self, actor_path='actor_model.pth', critic_path='critic_model.pth'):
        # Save model weights

        actor_path = os.path.join("models", actor_path)
        critic_path = os.path.join("models", critic_path)

        # Ensure directory exists
        os.makedirs(os.path.dirname(actor_path), exist_ok=True)       
        os.makedirs(os.path.dirname(critic_path), exist_ok=True) 

        torch.save(self.actor.state_dict(), actor_path)
        torch.save(self.critic.state_dict(), critic_path)
        print(f"Models saved to {actor_path} and {critic_path}")
    
    def load_models(self, actor_path='actor_model.pth', critic_path='critic_model.pth'):
        # Load model weights
        self.actor.load_state_dict(torch.load(actor_path))
        self.critic.load_state_dict(torch.load(critic_path))
        self.actor.eval()
        self.critic.eval()
        print(f"Models loaded from {actor_path} and {critic_path}")

# Robot Environment wrapper for RL
class RobotEnvironment:
    def __init__(self, dds, cart2d, polar_position):
        # References to existing objects
        self.dds = dds
        self.cart2d = cart2d
        self.polar_position = polar_position
        
        # Goal position
        self.goal_x = 0.0
        self.goal_y = 1.5
        
        # Environment boundaries
        self.env_min_x = -5.0
        self.env_max_x = 5.0
        self.env_min_y = -5.0
        self.env_max_y = 5.0
        
        # Current target position
        self.target_x = 0.0
        self.target_y = 0.0
        
        # Previous distance for reward calculation
        self.prev_distance = None
        
        # Timer for movement control
        self.t = Time()
    
    def get_state(self):
        # Get current robot pose
        pose = self.cart2d.get_pose()
        x, y, theta = pose[0], pose[1], pose[2]
        
        # Calculate distance to final goal
        distance = np.sqrt((x - self.goal_x)**2 + (y - self.goal_y)**2)
        
        # Calculate angle to goal relative to robot orientation
        angle_to_goal = np.arctan2(self.goal_y - y, self.goal_x - x) - theta
        angle_to_goal = np.arctan2(np.sin(angle_to_goal), np.cos(angle_to_goal))  # Normalize to [-π, π]
        
        # Calculate distance to current target
        dist_to_target = np.sqrt((x - self.target_x)**2 + (y - self.target_y)**2)
        
        # Construct state vector with normalized values
        state = np.array([
            x / self.env_max_x,            # Normalized x position
            y / self.env_max_y,            # Normalized y position
            np.cos(theta),                 # Orientation (cos)
            np.sin(theta),                 # Orientation (sin)
            distance / 10.0,               # Distance to goal (normalized)
            np.cos(angle_to_goal),         # Angle to goal (cos)
            np.sin(angle_to_goal),         # Angle to goal (sin)
            dist_to_target / 5.0           # Distance to current target (normalized)
        ])
        
        return state, pose
    
    def set_target(self, action, pose):
        # Scale actions from [-1,1] to position offsets
        x, y = pose[0], pose[1]
        
        # Convert action to target coordinates
        self.target_x = x + action[0] * 1.0  # Max movement of 1 unit
        self.target_y = y + action[1] * 1.0
        
        # Clip target coordinates to environment boundaries
        self.target_x = np.clip(self.target_x, self.env_min_x, self.env_max_x)
        self.target_y = np.clip(self.target_y, self.env_min_y, self.env_max_y)
        
        return self.target_x, self.target_y
    
    def step(self, action, duration=0.5):
        # Get current state and position
        state, pose = self.get_state()
        
        # Set target position based on action
        x_target, y_target = self.set_target(action, pose)
        
        # Calculate initial distance to goal
        distance_to_goal = np.sqrt((pose[0] - self.goal_x)**2 + (pose[1] - self.goal_y)**2)
        
        # Initialize previous distance if first step
        if self.prev_distance is None:
            self.prev_distance = distance_to_goal
        
        # Move robot towards target for 'duration' seconds
        self.t.start()
        start_time = self.t.get()
        
        while self.t.get() - start_time < duration:
            delta_t = self.t.elapsed()
            
            # Get current position
            pose = self.cart2d.get_pose()
            
            # Calculate target velocities using polar controller
            target_v, target_w = self.polar_position.evaluate(delta_t, x_target, y_target, pose)
            
            # Apply wheel control
            current_left, current_right = self.cart2d.get_wheel_speed()
            torque_left, torque_right = wheel_speed_control.evaluate(
                delta_t, target_v, target_w, current_left, current_right
            )
            
            # Update robot simulation
            self.cart2d.evaluate(delta_t, torque_left, torque_right)
            
            # Update position in DDS
            pose = self.cart2d.get_pose()
            self.dds.publish('X', pose[0], DDS.DDS_TYPE_FLOAT)
            self.dds.publish('Y', pose[1], DDS.DDS_TYPE_FLOAT)
            self.dds.publish('Theta', pose[2], DDS.DDS_TYPE_FLOAT)
            
            # Small delay for simulation
            self.t.sleep(0.005)
        
        # Get new state after movement
        next_state, new_pose = self.get_state()
        
        # Calculate new distance to goal
        new_distance = np.sqrt((new_pose[0] - self.goal_x)**2 + (new_pose[1] - self.goal_y)**2)
        
        # Calculate reward
        reward = 0
        done = False
        
        # Goal reached
        if new_distance < 0.2:
            reward = 100
            done = True
        else:
            # Reward for getting closer to goal
            progress = self.prev_distance - new_distance
            reward += progress * 20
            
            # Time penalty
            reward -= 0.1
            
            # Penalty for moving away from goal
            if progress < -0.05:
                reward -= 5
        
        # Update previous distance
        self.prev_distance = new_distance
        
        return next_state, reward, done

    def reset(self):
        # Initialize a new cart2d object instead of using reset()
        global cart2d  # Use the global variable cart2d
        
        # Recreate cart2d with the same parameters
        cart2d = TwoWheelsCart2DEncodersOdometry(1.0,        # 1 kg
                                                0.3,        # 30cm radius
                                                0.9, 0.8,   # linear and angular friction
                                                0.04, 0.04, # traction wheels radius 4cm
                                                0.4,        # traction wheels timebase 40cm
                                                0.03, 0.03, # encoder wheels radius 3cm
                                                0.5,        # encoder wheels timebase 50cm
                                                4096)       # resolution 4096 tick/rev
        
        # Update the reference in the environment
        self.cart2d = cart2d
        
        # Reset previous distance
        self.prev_distance = None
        
        # Small delay for stabilization
        time.sleep(0.1)
        
        # Return initial state
        state, _ = self.get_state()
        return state

## Training Functions

In [13]:
# Main training function
def train_rl_navigation(agent, env, episodes=300, steps_per_episode=50):
    rewards_history = []
    
    for episode in range(episodes):
        # Reset environment e ottieni lo stato iniziale
        state = env.reset()  # Aggiungi questa riga per inizializzare state
        episode_reward = 0
        
        for step in range(steps_per_episode):
            # Select action
            action, _ = agent.select_action(state)
            
            # Execute action
            next_state, reward, done = env.step(action)
            
            # Store experience
            agent.remember(state, action, reward, next_state, done)
            
            # Train agent
            agent.train()
            
            # Update state and reward
            state = next_state
            episode_reward += reward
            
            # End episode if done
            if done:
                print(f"Episode {episode+1}: Goal reached in {step+1} steps!")
                break
        
        # Store episode reward
        rewards_history.append(episode_reward)
        
        # Output information
        print(f"Episode {episode+1}/{episodes}, Reward: {episode_reward:.2f}, Steps: {step+1}")
        
        # Save model periodically
        if (episode + 1) % 50 == 0:
            agent.save_models(f"actor_ep{episode+1}.pth", f"critic_ep{episode+1}.pth")
            
            # Plot rewards
            plt.figure(figsize=(10, 5))
            plt.plot(rewards_history)
            plt.title('Reward per Episode')
            plt.xlabel('Episode')
            plt.ylabel('Total Reward')
            plt.savefig(f'rewards_ep{episode+1}.png')
            plt.close()
    
    # Save final model
    agent.save_models()
    return rewards_history

# Function to use a pre-trained agent
def use_trained_agent(env, agent, episodes=5, steps_per_episode=100):
    # Load pre-trained models
    agent.load_models()
    
    for episode in range(episodes):
        # Reset e ottieni lo stato iniziale
        state = env.reset()  # Aggiungi questa riga
        total_reward = 0
        
        for step in range(steps_per_episode):
            # Use deterministic action (no exploration)
            _, action_mean = agent.select_action(state)
            next_state, reward, done = env.step(action_mean)
            
            state = next_state
            total_reward += reward
            
            if done:
                print(f"Goal reached in {step+1} steps!")
                break
        
        print(f"Episode {episode+1}: Total reward {total_reward:.2f}")

## Main Loop

In [14]:
def main_rl_loop():
    # Initialize data plotters for visualization
    left_w = DataPlotter()
    left_w.set_x("time (seconds)")
    left_w.add_y("target_speed", "Target Left Speed")
    left_w.add_y("current_speed", "Current Left Speed")

    right_w = DataPlotter()
    right_w.set_x("time (seconds)")
    right_w.add_y("target_speed", "Target Right Speed")
    right_w.add_y("current_speed", "Current Right Speed")
    
    # Initialize reward plotter
    reward_plotter = DataPlotter()
    reward_plotter.set_x("time (seconds)")
    reward_plotter.add_y("reward", "Cumulative Reward")
    
    # Initialize RL components
    state_size = 8  # State dimension
    action_size = 2  # (x,y) target
    rl_agent = A2CAgent(state_size, action_size)
    
    # Initialize RL environment
    rl_env = RobotEnvironment(dds, cart2d, polar_position)
    
    # Performance tracking
    episode = 0
    total_reward = 0
    
    # Initialize simulation
    t = Time()
    t.start()
    pose = cart2d.get_pose()
    
    # Get initial state
    state = rl_env.get_state()[0]
    
    # Main loop
    try:
        while t.get() < 60:  # Extended time window for learning
            # RL agent selects action (new target position)
            action, _ = rl_agent.select_action(state)
            
            # Update target x,y based on agent's action
            x_target, y_target = rl_env.set_target(action, pose)
            
            # Inner loop for low-level robot control
            step_start_time = t.get()
            step_duration = 0.3  # Duration of each RL step
            
            while t.get() - step_start_time < step_duration:
                t.sleep(0.005)
                delta_t = t.elapsed()
                
                # Get current position
                pose = cart2d.get_pose()
                
                # Calculate target velocities using polar controller
                target_v, target_w = polar_position.evaluate(delta_t, x_target, y_target, pose)
                
                # Apply wheel control
                current_left, current_right = cart2d.get_wheel_speed()
                torque_left, torque_right = wheel_speed_control.evaluate(delta_t,
                                                                       target_v, target_w,
                                                                       current_left, current_right)
                
                # Update robot simulation
                cart2d.evaluate(delta_t, torque_left, torque_right)
                
                # Get new position
                pose = cart2d.get_pose()
                
                # Publish to DDS
                dds.publish('X', pose[0], DDS.DDS_TYPE_FLOAT)
                dds.publish('Y', pose[1], DDS.DDS_TYPE_FLOAT)
                dds.publish('Theta', pose[2], DDS.DDS_TYPE_FLOAT)
                
                # Update plots
                left_w.append_x(t.get())
                left_w.append_y("current_speed", current_left)
                left_w.append_y("target_speed", wheel_speed_control.target_left)
                
                right_w.append_x(t.get())
                right_w.append_y("current_speed", current_right)
                right_w.append_y("target_speed", wheel_speed_control.target_right)
            
            # Get new state and calculate reward
            next_state, reward, done = rl_env.step(action, 0)  # 0 duration because movement already happened
            
            # Store experience for learning
            rl_agent.remember(state, action, reward, next_state, done)
            
            # Train agent
            rl_agent.train()
            
            # Update state
            state = next_state
            
            # Accumulate reward
            total_reward += reward
            
            # Update reward plot
            reward_plotter.append_x(t.get())
            reward_plotter.append_y("reward", total_reward)
            
            # Check if episode ended
            if done or t.get() >= 60:
                # Print episode information
                print(f"Episode {episode} completed. Total reward: {total_reward}")
                
                # Periodically save model
                if episode % 10 == 0:
                    rl_agent.save_models(f"actor_ep{episode}.pth", f"critic_ep{episode}.pth")
                
                # Reset for next episode
                total_reward = 0
                episode += 1
    
    except KeyboardInterrupt:
        print("Manual interruption. Saving model and data...")
        rl_agent.save_models()
        
    finally:
        # Clean up DDS connection
        dds.stop()
        
        # Display plots
        left_w.plot()
        right_w.plot()
        reward_plotter.plot()
        
        # Save final model
        rl_agent.save_models("actor_final.pth", "critic_final.pth")

In [15]:
# Choose execution mode
mode = "main"  # options: "main", "train", "use_trained"
mode = "train"

if mode == "main":
    main_rl_loop()
elif mode == "train":
    # Initialize agent and environment
    state_size = 8
    action_size = 2
    agent = A2CAgent(state_size, action_size)
    env = RobotEnvironment(dds, cart2d, polar_position)
    
    # Train agent
    rewards = train_rl_navigation(agent, env, episodes=300)
    
    # Plot final rewards
    plt.figure(figsize=(10, 5))
    plt.plot(rewards)
    plt.title('Reward per Episode')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.savefig('final_rewards.png')
    plt.show()
elif mode == "use_trained":
    # Initialize agent and environment
    state_size = 8
    action_size = 2
    agent = A2CAgent(state_size, action_size)
    env = RobotEnvironment(dds, cart2d, polar_position)
    
    # Use trained agent
    use_trained_agent(env, agent, episodes=5)

# Cleanup resources when done
dds.stop()
print("DDS connection closed")

  states = torch.FloatTensor([experience[0] for experience in minibatch]).to(device)


Episode 1/300, Reward: -217.70, Steps: 50
Episode 2/300, Reward: -181.98, Steps: 50
Episode 3/300, Reward: -164.99, Steps: 50
Episode 4/300, Reward: -195.20, Steps: 50
Episode 5/300, Reward: -196.15, Steps: 50
Episode 6/300, Reward: -156.32, Steps: 50
Episode 7/300, Reward: -160.11, Steps: 50
Episode 8/300, Reward: -210.74, Steps: 50
Episode 9/300, Reward: -172.60, Steps: 50
Episode 10/300, Reward: -189.07, Steps: 50
Episode 11/300, Reward: -206.15, Steps: 50


ZeroDivisionError: float division by zero