In [14]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pygame

In [15]:
class RoboticArmEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    
    def __init__(self):
        super(RoboticArmEnv, self).__init__()

        self.steps_done = 0
        self.max_steps = 200
        self.action_space = spaces.Box(low=-np.pi, high=np.pi, shape=(2,), dtype=np.float32)
        self.observation_space = spaces.Box(low=-np.pi, high=np.pi, shape=(4,), dtype=np.float32)
        
        self.angle1 = 0.0  
        self.angle2 = 0.0  
        self.length1 = 1.0  #main arm
        self.length2 = 0.5  #upper arm
        self.prev_angle1 = self.angle1
        self.prev_angle2 = self.angle2
        self.end_effector_x = self.length1 * np.cos(self.angle1) + self.length2 * np.cos(self.angle1 + self.angle2)
        self.end_effector_y = self.length1 * np.sin(self.angle1) + self.length2 * np.sin(self.angle1 + self.angle2)
        
        self.last_action = np.array([0, 0]) 

        
        self.reward_history = [] 
        self.smoothing_window_size = 20 
        self.state = np.array([self.angle1, self.angle2, self.end_effector_x, self.end_effector_y], dtype=np.float32)

    def reset(self, seed=None, **kwargs):
        if seed is not None:
            self.np_random, _ = gym.utils.seeding.np_random(seed) 

        self.angle1 = np.random.uniform(0.5 * np.pi, 1.5 * np.pi)
        self.angle2 = np.random.uniform(0.5 * np.pi, 1.5 * np.pi) 

        self.angle1 = np.clip(self.angle1, 0, np.pi)
        self.angle2 = np.clip(self.angle2, 0, np.pi)

        self.prev_angle1 = self.angle1
        self.prev_angle2 = self.angle2
        self.last_action = np.array([0, 0])

        self.end_effector_x = self.length1 * np.cos(self.angle1) + self.length2 * np.cos(self.angle1 + self.angle2)
        self.end_effector_y = self.length1 * np.sin(self.angle1) + self.length2 * np.sin(self.angle1 + self.angle2)

        self.steps_done = 0 

        self.state = np.array([self.angle1, self.angle2, self.end_effector_x, self.end_effector_y], dtype=np.float32)
        
        return self.state, {} 


    
    def reward_function(self):
        angle1_deviation = abs(self.angle1 - np.pi / 2)
        combined_deviation = abs((self.angle1 + self.angle2) - np.pi / 2)
        
        angle1_penalty = -20 * (angle1_deviation ** 2)
        combined_penalty = -15 * (combined_deviation ** 2)
        stability_penalty = -5 * (abs(self.angle1 - self.prev_angle1) + abs(self.angle2 - self.prev_angle2))

        steady_state_bonus = 1.5 if angle1_deviation < 0.01 and combined_deviation < 0.01 and self.steps_done > 50 else 0

        action_penalty = -0.1 * (abs(self.last_action[0]) + abs(self.last_action[1]))

        time_penalty = -1.0 * (angle1_deviation + combined_deviation) if self.steps_done > 50 else 0
        hold_bonus = 1.0 if angle1_deviation < 0.02 and combined_deviation < 0.02 else 0
        
        total_reward = angle1_penalty + combined_penalty + stability_penalty + action_penalty + time_penalty + hold_bonus+steady_state_bonus
        
        scaled_reward = total_reward / (np.pi ** 2)
    
        self.reward_history.append(scaled_reward)
        smoothed_reward = self.smooth_reward(self.reward_history, self.smoothing_window_size)
        
        return smoothed_reward
    




    def smooth_reward(self, reward_history, window_size=10):
        if len(reward_history) < window_size:
            return np.mean(reward_history)

        return np.mean(reward_history[-window_size:])
    
    

    def step(self, action):
        action = np.clip(action, -0.05, 0.05)
        
        self.angle1 += action[0]
        self.angle2 += action[1]

        self.angle1 = np.clip(self.angle1, 0, np.pi)
        self.angle2 = np.clip(self.angle2, 0, np.pi)

        self.end_effector_x = self.length1 * np.cos(self.angle1) + self.length2 * np.cos(self.angle1 + self.angle2)
        self.end_effector_y = self.length1 * np.sin(self.angle1) + self.length2 * np.sin(self.angle1 + self.angle2)

        self.state = np.array([self.angle1, self.angle2, self.end_effector_x, self.end_effector_y], dtype=np.float32)
        
        reward = self.reward_function()

        self.steps_done +=1

        terminated = False
        truncated = self.steps_done >= self.max_steps
        
        self.prev_angle1 = self.angle1
        self.prev_angle2 = self.angle2

        self.last_action = action

        info = {}

        return self.state, reward, terminated, truncated, info
        

    def render(self, mode='human'):

        if mode != 'human':
            raise NotImplementedError("Only human mode is supported.")
            
        """Renders the environment using Pygame."""
        # Initialize Pygame if not already done
        if not hasattr(self, 'screen'):
            pygame.init()
            self.screen = pygame.display.set_mode((600, 600))
            pygame.display.set_caption('Robotic Arm Simulation')
            self.clock = pygame.time.Clock()

        # Clear the screen
        self.screen.fill((255, 255, 255))  # White background

        # Define the center of the base
        base_x, base_y = 300, 300  # Center of the window

        # Calculate joint positions
        joint1_x = base_x + int(self.length1 * 100 * np.cos(self.angle1))
        joint1_y = base_y - int(self.length1 * 100 * np.sin(self.angle1))

        joint2_x = joint1_x + int(self.length2 * 100 * np.cos(self.angle1 + self.angle2))
        joint2_y = joint1_y - int(self.length2 * 100 * np.sin(self.angle1 + self.angle2))

        # Draw the base
        pygame.draw.circle(self.screen, (0, 0, 0), (base_x, base_y), 5)  # Black circle for the base

        # Draw the first segment
        pygame.draw.line(self.screen, (0, 0, 255), (base_x, base_y), (joint1_x, joint1_y), 5)  # Blue line

        # Draw the second segment
        pygame.draw.line(self.screen, (255, 0, 0), (joint1_x, joint1_y), (joint2_x, joint2_y), 5)  # Red line

        # Draw the end-effector
        pygame.draw.circle(self.screen, (0, 255, 0), (joint2_x, joint2_y), 5)  # Green circle for the end-effector

        # Update the display
        pygame.display.flip()

        # Limit the frame rate
        self.clock.tick(30)

    def close(self):
        """Closes the Pygame window and releases resources."""
        if hasattr(self, 'screen'):
            pygame.quit()
            del self.screen



In [16]:
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3.common.monitor import Monitor
import pandas as pd

import gymnasium as gym
from gymnasium.wrappers import TimeLimit
from stable_baselines3 import SAC

In [17]:
env = RoboticArmEnv()
timed_env = TimeLimit(env, max_episode_steps=200)

In [18]:
from stable_baselines3 import SAC
from stable_baselines3.common.noise import NormalActionNoise
import numpy as np

model = SAC(
    "MlpPolicy",
    timed_env,
    verbose=1,
    learning_rate=0.00005,
    batch_size=128, 
    train_freq=2, 
    gradient_steps=5,
    gamma=0.95, 
    tau=0.005,
    ent_coef='auto_0.01'               
)

model.learn(total_timesteps=13000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/           |           |
|    ep_len_mean     | 200       |
|    ep_rew_mean     | -1.68e+03 |
| time/              |           |
|    episodes        | 4         |
|    fps             | 32        |
|    time_elapsed    | 24        |
|    total_timesteps | 800       |
| train/             |           |
|    actor_loss      | 46.6      |
|    critic_loss     | 7.79      |
|    ent_coef        | 0.0105    |
|    ent_coef_loss   | 9.64      |
|    learning_rate   | 5e-05     |
|    n_updates       | 1745      |
----------------------------------
----------------------------------
| rollout/           |           |
|    ep_len_mean     | 200       |
|    ep_rew_mean     | -1.16e+03 |
| time/              |           |
|    episodes        | 8         |
|    fps             | 30        |
|    time_elapsed    | 53        |
|    total_timesteps | 1600    

<stable_baselines3.sac.sac.SAC at 0x7d0393dd7b90>

In [19]:
model.save("rob_arm_SAC18")

In [20]:
del model

In [21]:
model = SAC.load("rob_arm_SAC18", env=timed_env)

for episode in range(5):
    obs, _ = timed_env.reset()
    score = 0
    offset =0.3
    done = False
    while not done:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = timed_env.step(action)
        timed_env.render()
        reward+=offset
        score += reward
        if terminated or truncated:
            print(f"Episode {episode + 1}: Score = {score:.2f}")
            break

timed_env.close()

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Episode 1: Score = -101.61
Episode 2: Score = -514.86
Episode 3: Score = -206.86
Episode 4: Score = -387.78
Episode 5: Score = -288.39
