<a href="https://colab.research.google.com/github/oussemajelassi/embedded_ReinforcementLearning/blob/main/RL_RobotOrientation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# This project aims to perform robot orientation using RL

In [None]:
# Install required modules
!python -m pip install gymnasium==0.28.1
!python -m pip install stable-baselines3[extra]==2.1.0
!python -m pip install pyserial




[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
# Import Modules
import gymnasium as gym
import stable_baselines3 as sb3
from gymnasium import spaces


import numpy as np
import pygame
import time
import json

import serial
import serial.tools.list_ports
import time

In [None]:
# Vairbales and Constants :

RL_DECREASE_PWM_VALUE = 0
RL_PWM_VALUE_CONSTANT = 1
RL_INCREASE_PWM_VALUE = 2
RL_PWM_STOP_VALUE = 3

ROBOT_ACTIONS_MAP = {
    RL_DECREASE_PWM_VALUE: -100,
    RL_PWM_VALUE_CONSTANT : 10,
    RL_INCREASE_PWM_VALUE: 100,
    RL_PWM_STOP_VALUE: 0
                  }



STM_ResponseTimeout                     = 30
RL_TargetedOrientation                  = 90
RL_OrientationNorm                      = 180

RL_ROBOT_CRASH_REWARD                   = -200
RL_ROBOT_ORIENTATION_DIVERGENCE         = -500
RL_ROBOT_ORIENTED_REWARD                = 50

RL_THETA_REWARD_CONSTANT                = 1
RL_VELOCITY_REWARD_CONSTANT             = 0.1
RL_VELOCITY_DIFFERENCE_REWARD_CONSTANT  = 0.1
RL_ANGULAR_VELOCITY_REWARD_CONSTANT     = 0.25

RL_DESIRED_ANGLE_RANGE                  = [ RL_TargetedOrientation - 10 , RL_TargetedOrientation + 10 ]
RL_MINIMAL_ORIENTATION_ANGLE            = - 180
RL_MAXIMAL_ORIENTATION_ANGLE            =   180

RL_MAXIMUM_VELOCITY                     = 80
RL_LOW_VELOCITY                         = 10

# Test Block for Json parsing :


In [None]:
msg = "{ \"message\": \"Hello, world!\", \"sender\": \"John Doe\", \"timestamp\": \"2024-01-22T12:34:56\" }"
obs = json.loads(msg)
print (obs['message'])

Hello, world!


## Test for serial communcation  



In [None]:
serial_list = []
for port, desc, hwid in serial.tools.list_ports.comports():
  serial_attrs = (port, desc, hwid)
  serial_list.append(serial_attrs)
  print(serial_list)

[('COM13', 'STMicroelectronics STLink Virtual COM Port (COM13)', 'USB VID:PID=0483:374E SER=004900283137511339383538 LOCATION=1-2:x.2')]


## Low Level communication Class

In [None]:
class LowLevelCommunication :
  def __init__(self , Port , Baudrate) -> bool:
    self.STM32 = serial.Serial(dsrdtr=False)
    self.STM32.port = Port
    self.STM32.baudrate = Baudrate
    self.STM32.rts = False
    self.STM32.dtr = False
    self.STM32.close()
    time.sleep(1)
    self.STM32.open()
    print("Open")

  def sendCommand (self , Command) :
    cmd = f"{Command}"
    self.STM32.write(bytes(cmd,encoding='utf-8'))
  def waitResponse (self , Timeout) :
    timestamp = time.time()
    print(timestamp)
    while (self.STM32.in_waiting == 0) :
      time.sleep(1)
      if ( (time.time() - timestamp) > Timeout ) :

        print ("No return from Low Level")
        self.EndCommunication()
        return _ , False
    response = self.STM32.read_until('\n'.encode('utf-8'))
    self.STM32
    response =response.decode('utf-8')
    observation = json.loads(response)
    return observation , True
  def EndCommunication (self) :
    self.STM32.close()


In [None]:
class RobotOrientation (gym.Env) :
  def __init__(self , Port , Baudrate , WheelRadius , WheelBase) -> None:
    super().__init__()
    self.Port = Port
    self.Baudrate = Baudrate
    self.WheelRadius = WheelRadius
    self.WheelBase = WheelBase
    self.STM32 = LowLevelCommunication ( self.Port , self.Baudrate )
    self.StartTime = time.time()

# Test for raw data communication :
# Robot = RobotOrientation("COM13" , 115200 , 10 , 15)
# ===> Working Fine with Local runtime connected.

    self.action_space = gym.spaces.Discrete( len(ROBOT_ACTIONS_MAP) )
    # Observations are CurrentRobotAngle, CurrentRightWheelVelocity , CurrentLeftWheelvelocity
    self.observation_space = gym.spaces.Box(
            low=np.array([-180, -np.inf , -np.inf]),
            high=np.array([180,  np.inf ,  np.inf]),
            dtype=np.float32
        )

## Attributes :

    self.CurrentRightWheelVelocity  = 0
    self.CurrentLeftWheelVelocity   = 0
    self.CurrentAngle               = 0
    self.CurrentAngularVelocity     = 0.0
    self.CurrentVelocityDifference  = 0.0
    self.Timestamp                  = 0

  def __del__ (self) :
    self.STM32.EndCommunication()

  def reset (self) :
    self.STM32.sendCommand(RL_PWM_STOP_VALUE)
    self.CurrentAngle               = 0
    self.CurrentAngularVelocity     = 0
    self.CurrentLeftWheelVelocity   = 0
    self.CurrentRightWheelVelocity  = 0

  def step (self , action) :
    terminated = False
    truncated = False
    reward = 0
    info = {"dtime" : 0 , " elapsedTime" : 0 }
    self.STM32.sendCommand (action)
    observations , ret = self.STM32.waitResponse(STM_ResponseTimeout)
    print(observations)
    if ( ret == True ) :

      ElapsedTime , terminated , self.CurrentAngle , self.CurrentRightWheelVelocity ,self.CurrentLeftWheelVelocity = (observations['STM32TimeStamp'] , observations ['Terminated'] , observations ['CurrentRobotAngle'] , observations ['CurrentRightWheelVelocity'] , observations ['CurrentLeftWheelVelocity'] )
      info["dtime"] = ElapsedTime - self.Timestamp
      self.Timestamp = ElapsedTime
      self.CurrentVelocityDifference = abs ( self.CurrentRightWheelVelocity - self.CurrentLeftWheelVelocity )
      self.CurrentAngularVelocity = ( self.CurrentRightWheelVelocity - self.CurrentLeftWheelVelocity ) / self.WheelBase

      obs [0] = self.CurrentAngle
      obs [1] = self.CurrentRightWheelVelocity
      obs [2] = self.CurrentLeftWheelVelocity
      obs [3] = self.CurrentAngularVelocity
      obs [4] = self.CurrentVelocityDifference

      if (self.CurrentAngle > RL_MINIMAL_ORIENTATION_ANGLE ) or ( self.CurrentAngle < RL_MAXIMAL_ORIENTATION_ANGLE ) :
        print("dkhalt lenna")
        reward += -1 * (  RL_THETA_REWARD_CONSTANT            * pow( ( obs [0] - RL_TargetedOrientation ) , 2 )
                        + RL_VELOCITY_REWARD_CONSTANT         * pow( ( obs [1] ) , 2 )
                        + RL_VELOCITY_REWARD_CONSTANT         * pow( ( obs [2] ) , 2 )
                        + RL_ANGULAR_VELOCITY_REWARD_CONSTANT * pow( ( obs [3] ) , 2 ) )

        if ( abs(self.CurrentAngularVelocity) > RL_MAXIMUM_VELOCITY ) :
          print("dkhalt lenna 1 ")
          terminated = True
          self.STM32.sendCommand(RL_PWM_STOP_VALUE)
          reward += RL_ROBOT_CRASH_REWARD
          print("Robot Reached maximum Speed")

        elif (self.CurrentAngle in RL_DESIRED_ANGLE_RANGE ) and (self.CurrentAngularVelocity < RL_LOW_VELOCITY ):
          print("dkhalt lenna  2")
          terminated = True
          self.STM32.sendCommand(RL_PWM_STOP_VALUE)
          reward += RL_ROBOT_ORIENTED_REWARD
          print(" Robot in Desired Orientation ")

      else :
        print("dkhalt lenna 3 ")
        terminated = True
        reward = RL_ROBOT_ORIENTATION_DIVERGENCE
        self.STM32.sendCommand(RL_PWM_STOP_VALUE)
        print(" Robot orientation Diverged ")
      print("Elapsed Time : ",time.time() - self.StartTime )
      print ( "terminated :",terminated)



# Testing The data exchange :
Robot = RobotOrientation("COM13" , 115200 , 10 , 15)
Robot.step(10)
Robot.__del__()

Open
1708019195.8852253
{'STM32TimeStamp': 1000, 'Terminated': 0, 'CurrentRobotAngle': 0.0, 'CurrentRightWheelVelocity': 200.0, 'CurrentLeftWheelVelocity': 100.0}
dkhalt lenna
Elapsed Time :  1.0112566947937012
terminated : 0
