In [None]:
#!git clone git@github.com:micheltokic/crawlingrobot.git
#!pip install -e crawlingrobot/

In [3]:
import sys
import numpy as np
import pygame
import os
#os.environ['SDL_VIDEODRIVER']='dummy'
import gymnasium as gym
import gym_crawlingrobot
import pickle

# 1) Manual

In [4]:
def manual_robot_control (env):
    
    done = False
    action = None
    obs, _ = env.reset()
    print(f"initial state: {obs}")
    cum_reward = 0
    step = 0

    while True:
        # process pygame event loop
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                return
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_ESCAPE:
                    pygame.quit()
                    return
                elif event.key == pygame.K_UP or event.key == pygame.K_w:
                    env.step(0)
                    action = 0
                elif event.key == pygame.K_RIGHT or event.key == pygame.K_d:
                    action = 1
                elif event.key == pygame.K_DOWN or event.key == pygame.K_s:
                    action = 2
                elif event.key == pygame.K_LEFT or event.key == pygame.K_a:
                    action = 3
                elif event.key == pygame.K_r:
                    env.reset()
                    action = 3
                elif event.key == pygame.K_SPACE:
                    env.robot.render_intermediate_steps = not env.robot.render_intermediate_steps

                if action:
                    obs, reward, terminated, truncated, info = env.step(action)
                    done = terminated or truncated
                    cum_reward += reward
                    print (f"step={step}, obs={obs}, action={action}, reward={reward:.2f}, cum_reward={cum_reward:.2f}, done={done}")

                    action = None 
                    step += 1
                if done:
                    env.reset()
                    action = 3
                    cum_reward = 0
                    step = 0

            env.render()


In [5]:
pygame.quit() # close any already opened simulation windows

env = gym.make('crawlingrobot-discrete-v1', rotation_angles=5, goal_distance=700, window_size=(640, 480), render_intermediate_steps=True, plot_steps_per_episode=True)
env.robot.mode = 2 # => Use WASD or Arrow Keys to control the robot's arms
manual_robot_control (env)

  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(


initial state: [0 0]


  logger.warn(


step=0, obs=[1 0], action=2, reward=0.99, cum_reward=0.99, done=False
step=1, obs=[2 0], action=2, reward=0.01, cum_reward=1.00, done=False
step=2, obs=[3 0], action=2, reward=7.24, cum_reward=8.24, done=False
step=3, obs=[4 0], action=2, reward=13.38, cum_reward=21.63, done=False
step=4, obs=[4 1], action=3, reward=22.67, cum_reward=44.29, done=False
step=5, obs=[4 2], action=3, reward=23.59, cum_reward=67.89, done=False
step=6, obs=[2 1], action=1, reward=-0.01, cum_reward=67.88, done=False
step=7, obs=[2 0], action=1, reward=0.01, cum_reward=67.89, done=False
step=8, obs=[2 0], action=1, reward=0.00, cum_reward=67.89, done=False
step=9, obs=[3 0], action=2, reward=6.79, cum_reward=74.69, done=False
step=10, obs=[4 0], action=2, reward=13.69, cum_reward=88.38, done=False
step=11, obs=[4 1], action=3, reward=22.72, cum_reward=111.09, done=False
step=12, obs=[4 2], action=3, reward=23.22, cum_reward=134.31, done=False


# 2) Q-Learning with discrete actions

In [6]:
# function maps the 2D observation (x, y) to a single state number n 
def obs_to_number(obs, obs_max):
    return int(obs[0] * obs_max + obs[1])

def q_agent(Q, obs_max, env, learn=True, render=False, alpha=1, gamma=0.95, epsilon=0.2, maxSteps=10000, episodes=200):
    
    print (f"Q.shape={Q.shape}")
    np.set_printoptions(threshold=sys.maxsize)

    for episode in range (episodes):
        done = False
        init_obs, _ = env.reset()
        init_obs = init_obs.tolist()
        state = obs_to_number(init_obs, obs_max)
        step = 0
        cum_reward =0 

        while not done and step < maxSteps:

            # action selection
            if np.random.rand() < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(Q[state])

            # perform action in environment
            nextObs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            nextObs = nextObs.tolist()
            nextState = obs_to_number(nextObs, obs_max)
            cum_reward += reward

            # environment rendering
            if render:
                env.render()

                # process pygame event loop
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        pygame.quit()
                        return
                    elif event.type == pygame.KEYDOWN:
                        if event.key == pygame.K_ESCAPE:
                            pygame.quit()
                            return
                        if event.key == pygame.K_SPACE:
                            env.robot.render_intermediate_steps = not env.robot.render_intermediate_steps

            # Q-learning
            if learn:
                Q[state, action] += alpha * (reward + gamma * np.max(Q[nextState]) - Q[state, action])

            # time transition
            state = nextState
            step += 1
            
        res = 0
        if len(env.robot.episode_time_results) > 0:
            res = env.robot.episode_time_results[-1]
        print(f"episode={episode} took {step} steps => cumulative reward: {cum_reward:.2f}")
        
    pygame.quit()
    return

### 2.1) Learn Q function (no GUI)

In [7]:
pygame.quit() # close any already opened simulation windows

# instantiate environment
env = gym.make('crawlingrobot-discrete-v1', rotation_angles=5, goal_distance=700)

# 2.1) Initialize Q function
obs_max = env.observation_space.high[0] + 1  # currently 5
Q = np.zeros([obs_max ** len(env.observation_space.high), env.action_space.n])
q_filename = "Qfunction.pkl"

# learn Q function
q_agent(Q=Q, obs_max=obs_max, env=env, gamma=0.9, epsilon=0.1, episodes=10, render=False, learn=True)

# write learned Q function to disc
pickle.dump( Q, open( q_filename, "wb" ) )
print ("Wrote Q function to file: ", q_filename)

Q.shape=(25, 4)
episode=0 took 560 steps => cumulative reward: 512.63
episode=1 took 168 steps => cumulative reward: 515.29
episode=2 took 97 steps => cumulative reward: 529.99
episode=3 took 116 steps => cumulative reward: 514.23
episode=4 took 96 steps => cumulative reward: 512.84
episode=5 took 99 steps => cumulative reward: 518.24
episode=6 took 107 steps => cumulative reward: 529.97
episode=7 took 103 steps => cumulative reward: 517.51
episode=8 took 105 steps => cumulative reward: 530.90
episode=9 took 92 steps => cumulative reward: 527.40
Wrote Q function to file:  Qfunction.pkl


### 2.2) Evaluate policy derived from Q function (with GUI)

In [8]:
pygame.quit() # close any already opened simulation windows

# load Q function
print ("Loading Q function from file: ", q_filename)
Q = pickle.load( open(q_filename, "rb" ) )

# evalue Q function
env = gym.make('crawlingrobot-discrete-v1', rotation_angles=5, goal_distance=700, window_size=(640, 480), plot_steps_per_episode=True)
q_agent(Q=Q, obs_max=obs_max, env=env, episodes=20, epsilon=0.1, render=True, learn=False)

Loading Q function from file:  Qfunction.pkl
Q.shape=(25, 4)
episode=0 took 114 steps => cumulative reward: 521.63
episode=1 took 112 steps => cumulative reward: 525.30


# 3) PPO control with continuous actions

In [9]:
import pygame
import sys
import os
import numpy as np
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.monitor import Monitor
from stable_baselines3 import PPO
import gymnasium as gym
import gym_crawlingrobot

In [10]:
# define callback class for event loop cleanup
from stable_baselines3.common.callbacks import BaseCallback

class PyGameEventLoopCallback(BaseCallback):
    
    render = False
    training_env = None
    
    def __init__(self, verbose=0, render=False):
        super(PyGameEventLoopCallback, self).__init__(verbose)
        self.render = render
        # Those variables will be accessible in the callback
        # (they are defined in the base class)
        # The RL model
        # self.model = None  # type: BaseRLModel
        # An alias for self.model.get_env(), the environment used for training
        # self.training_env = None  # type: Union[gym.Env, VecEnv, None]
        # Number of time the callback was called
        # self.n_calls = 0  # type: int
        # self.num_timesteps = 0  # type: int
        # local and global variables
        # self.locals = None  # type: Dict[str, Any]
        # self.globals = None  # type: Dict[str, Any]
        # The logger object, used to report things in the terminal
        # self.logger = None  # type: logger.Logger
        # # Sometimes, for event callback, it is useful
        # # to have access to the parent object
        # self.parent = None  # type: Optional[BaseCallback]
    
    def _on_step(self) -> bool:
        
        robot_env = self.training_env.venv.envs[0]
        
        # process pygame event loop
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                return False
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_ESCAPE:
                    pygame.quit()
                    return False
                if event.key == pygame.K_SPACE:
                    robot_env.robot.render_intermediate_steps = not robot_env.robot.render_intermediate_steps

        if self.render:
            robot_env.render()
        
        return True

In [11]:
log_dir = ""
ppo = "ppo"
os.makedirs(ppo, exist_ok=True)

def ppo_learn(env, render=False, total_timesteps=20000):
    env = VecNormalize(DummyVecEnv([lambda: Monitor(env, log_dir)]), norm_obs=True, norm_reward=True)
    model = PPO(env=env, policy="MlpPolicy", verbose=1)

    cb = PyGameEventLoopCallback(render=render)
    cb.training_env = env

    model.learn(total_timesteps=total_timesteps, callback=cb)
    model.save("ppo/ppo_crawling_robot")
    env.save("ppo/vec_normalize.pkl")
 
    del model, env


def ppo_run_policy(env, render=False, episodes=1, deterministic=True):
    env = DummyVecEnv([lambda: Monitor(env, log_dir)])
    env = VecNormalize.load("ppo/vec_normalize.pkl", env)
    env.training = False

    model = PPO.load("ppo/ppo_crawling_robot")

    # visualization callback
    cb = PyGameEventLoopCallback(render=render)
    cb.training_env = env
    
    for e in range (episodes): 

        obs = env.reset()
        done = False
        cum_reward = 0
        step = 0

        while not done:
            action, _states = model.predict(obs, deterministic=deterministic)
            #obs, _reward, terminated, truncated, info = env.step(action)
            #done = truncated or terminated
            obs, _reward, done, info = env.step(action)
            reward = env.get_original_reward() # returns the last unnormalized reward
            if not cb._on_step(): 
                return
            
            cum_reward += reward[0]
            step += 1
            print (f"episode={e}, step={step}, action={action}, reward={reward[0]:.2f}, cum_reward={cum_reward:.2f}, done={done}")


### Train policy for 20000 timesteps (no GUI)

In [12]:
pygame.quit() # close any already opened simulation windows

robot_env = gym.make('crawlingrobot-continuous-v1', goal_distance=2500)
ppo_learn(env=robot_env, render=False, total_timesteps=20000)


Using cpu device


  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")


-----------------------------
| time/              |      |
|    fps             | 421  |
|    iterations      | 1    |
|    time_elapsed    | 4    |
|    total_timesteps | 2048 |
-----------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 4.04e+03    |
|    ep_rew_mean          | 2.38e+03    |
| time/                   |             |
|    fps                  | 382         |
|    iterations           | 2           |
|    time_elapsed         | 10          |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.008621333 |
|    clip_fraction        | 0.108       |
|    clip_range           | 0.2         |
|    entropy_loss         | -2.83       |
|    explained_variance   | -0.379      |
|    learning_rate        | 0.0003      |
|    loss                 | 0.0608      |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.013

### Evaluate policy (with GUI)

In [13]:
pygame.quit() # close any already opened simulation windows

#robot_env_nogui = gym.make('crawlingrobot-continuous-v1', goal_distance=2500, plot_steps_per_episode=False, render_intermediate_steps=False)
robot_env_gui = gym.make('crawlingrobot-continuous-v1', goal_distance=700, window_size=(640, 480), plot_steps_per_episode=True, render_intermediate_steps=True)
ppo_run_policy(env=robot_env_gui, episodes=1, render=True, deterministic=True)


  logger.warn(
  logger.warn(


episode=0, step=1, action=[[ 0.9224521  -0.80447966]], reward=18.49, cum_reward=18.49, done=[False]
episode=0, step=2, action=[[0.90136594 0.9827521 ]], reward=63.71, cum_reward=82.20, done=[False]
episode=0, step=3, action=[[-0.8699516  0.5694554]], reward=0.00, cum_reward=82.20, done=[False]
episode=0, step=4, action=[[-0.0223683 -1.       ]], reward=0.00, cum_reward=82.20, done=[False]
episode=0, step=5, action=[[ 0.94116026 -0.76198816]], reward=18.61, cum_reward=100.81, done=[False]
episode=0, step=6, action=[[0.8892091 1.       ]], reward=62.75, cum_reward=163.57, done=[False]
episode=0, step=7, action=[[-0.9040554  0.5372033]], reward=0.01, cum_reward=163.57, done=[False]
episode=0, step=8, action=[[ 0.03220048 -1.        ]], reward=-0.00, cum_reward=163.57, done=[False]
episode=0, step=9, action=[[ 0.95794904 -0.723708  ]], reward=19.38, cum_reward=182.95, done=[False]
episode=0, step=10, action=[[0.8820782 1.       ]], reward=60.47, cum_reward=243.42, done=[False]
episode=0, s