Import di base

In [1]:
from gym import Env
from gym.spaces import Discrete, Box, MultiDiscrete
import copy
from IPython.display import clear_output
import time
import numpy as np
import random
import socket
import math
from scipy.spatial.transform import Rotation as R
import struct

In [11]:
# definizioni variabili globali ===============================================
HOST = "127.0.0.1"
PORT = 7001
PACKET_SIZE = 64
#target
TargetDistanceForReward = 3

# definizioni funzioni =======================================================
#creo socket e lo faccio partire
def CreaSocket():
    # creo socket server per UE5
    MySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    #Bindo il socket 
    MySocket.bind((HOST,PORT))

    #accendo il socket in ascolto
    MySocket.listen()

    return MySocket

#ciclo di attesa connesione
def AttesaConnesione(MySocket):
    while True:
        RemoteConnection , RemoteAddress = MySocket.accept()
        MySocket.close()
        break
    return RemoteConnection,RemoteAddress

#funzione per sampolare il robot di unreal
def MandaRicevi_Robot(MySocket, R_Angles = [0.0]*5 ):
    """
    Args:
        Socket: socket verso UE5
        R_XXX: angoli da applicare ai bracci del robot, in rad
    Return:
        Target_Coor: X Y Z
        EE_Coor: X Y Z
        EE_Rotator: X Y Z W
    - Converte angoli da rad a gradi
    - crea il messaggio binario
    - manda e riceve
    - scompatta il messaggio binario di ritorno in un buffer
    - separa il buffer negli oggetti ricevuti
    """
    Angoli = copy.deepcopy(R_Angles) # questo perchè python è cattivo e le liste le porta in giro come ref
    
    #converto angoli in gradi
    for i in range(5):
        Angoli[i] = math.degrees(Angoli[i] * math.pi *2)

    # preparo messaggio binario
    Messaggio =    (struct.pack("f",Angoli[0])+
                    struct.pack("f",Angoli[1])+
                    struct.pack("f",Angoli[2])+
                    struct.pack("f",Angoli[3])+
                    struct.pack("f",Angoli[4])+
                    struct.pack("f",0.0)+
                    struct.pack("f",0.0) # questo è un "flag" di comandi. 0: comando angoli, 1: comando reset target
                    )

    # mando e ricevo
    MySocket.sendall(Messaggio)
    ReceivedData = MySocket.recv(PACKET_SIZE)

    ParsedData = [] # lista dei valori float ricevuti parsati
    for i in range(0,44,4):
        ParsedData.append( struct.unpack("f",ReceivedData[i:i+4])[0])
    
    # arrivati: Target Coordinate, EE Coordinate, EE Rotator Quaternion
    Target_Coor = ParsedData[0:3]
    EE_Coor     = ParsedData[3:6]
    EE_Rotator  = ParsedData[6:10]
    CollisionDetect = ParsedData[10]

    return Target_Coor, EE_Coor, EE_Rotator, CollisionDetect

# classe environment di gioco =============================================

class RobotEnv(Env):
    def __init__(self):
        # azioni che possiamo fare: 5 angoli normalizzati
        self.action_space = Box(low=-1.0,high=1.0,shape=(5,))
        self.last_action = [0.0] * 5

        # Observation space: Target_Coor (3), Effector Coor (3), Effector to Target Polar (3), angoli Attuali (5), velocità (5), last action (5)
        self.observation_space = Box(low=-10000,high=10000,shape=(24,))
        self.state = [0.0] * 24
        
        # Randomizzo angoli
        for i in range(10,15):
            self.state[i] = random.uniform( -1.0 , +1.0)

        # buffers
        self.BufferAngoliAttuali = self.state[10:15]
        self.BufferSpeed = [0.0]*5
        
        # lunghezza esercizio
        self.Exercise_length = 1000

    def step(self, action):
        global UE5_Socket
        global TargetDistanceForReward
        done = False
        
        # -------------applichiamo azione 
        self.BufferAngoliAttuali , self.BufferSpeed = RobotSpeedApplication(self.BufferAngoliAttuali, self.BufferSpeed, action)


        ''' ------- vecchio modo
        # velocità discretizzata 0-1.5 gradi/step ( 0.0261799 rad/step )
        # prendo il nuovo target proposto da Action, e calcolo un aumento in quella direzione di un grado per step
        # se Action corrisponde ad angolo attuale, velocità passa a zero
        NuoviAngoli = [0.0]*5
        for i in range(5):

            if self.BufferAngoliAttuali[i] < action[i]:
                
                NuoviAngoli[i] = self.BufferAngoliAttuali[i] + math.radians(0.5)
                self.BufferSpeed[i] = +0.0261799

            elif self.BufferAngoliAttuali[i] > action[i]:

                NuoviAngoli[i] = self.BufferAngoliAttuali[i] - math.radians(0.5)
                self.BufferSpeed[i] = -0.0261799

            else:
                self.BufferSpeed[i] = 0
        
        self.BufferAngoliAttuali = NuoviAngoli
        '''
        # fine applicazione
           
        # ---------------- Mando ricevo!
        Target_Coor, EE_Coor, EE_Rotator, CollisionDetected = MandaRicevi_Robot(UE5_Socket, R_Angles = self.BufferAngoliAttuali)
        
            
        # creo polari 
        EE_to_Base_Rotation = R.from_quat(EE_Rotator)
        Base_to_EE_Rotation = EE_to_Base_Rotation.inv()
        # to invert the homogeneous transformation
        # R' = R^-1
        Base_to_EE_Quat = Base_to_EE_Rotation.as_quat()
        # t' = - R^-1 * t
        Base_to_EE_Translation = - Base_to_EE_Rotation.apply(EE_Coor)

        EE_to_Target_Coord = change_reference_frame(Target_Coor,Base_to_EE_Translation,Base_to_EE_Quat)
        EE_to_Target_Polar = cartesian_to_polar_3d(EE_to_Target_Coord)


        # ----------------compilo lo state
        self.state[0:3]   = Target_Coor
        self.state[3:6]   = EE_Coor
        self.state[6:9]   = EE_to_Target_Polar

        self.state[9:14]  = self.BufferAngoliAttuali
        self.state[14:19] = self.BufferSpeed

        self.state[19:24] = self.last_action 
        
        # mi segno attuale azione, per futuro
        self.last_action = action

        # riduzione tempo 
        self.Exercise_length -= 1
        
        #---------------calcolo reward 
        reward = 0

        # reward base:
        reward += -0.005 * EE_to_Target_Polar[0]

        # reward aggiuntivo:
        if EE_to_Target_Polar[0] <= TargetDistanceForReward:
            reward = 2
            done = True
            print("NICE")
        
        #se rilevo collisione, tutti a casa
        if CollisionDetected == 1.0:
            reward = -1
            done = True
            print("Collision Detected")

        #-----------------verfica se tempo finito
        if done == False:  # solo se non ho già finito per altri motivi
            if self.Exercise_length <= 0:
                done = True
            else:
                done = False
                     
        # segnaposto per info
        info = {}
        # ritorniamo informazioin sullo step
        return np.array(self.state), reward, done, info

    def render(self,mode=0):
        print("Tempo: "+str(self.Exercise_length) + "-- state: " + str(self.state))

    def reset(self):
        print("RESETTTT")
        # mando segnale ad UE5 di resettare il target:
        # preparo messaggio binario

        Messaggio =(struct.pack("f",0.0)+
                    struct.pack("f",0.0)+
                    struct.pack("f",0.0)+
                    struct.pack("f",0.0)+
                    struct.pack("f",0.0)+
                    struct.pack("f",0.0)+
                    struct.pack("f",1.0)
                    )
        
        # mando
        UE5_Socket.sendall(Messaggio)

        #resettiamo 
        self.last_action = [0.0] * 5
        self.state = [0.0] * 24

        # Randomizzo angoli
        for i in range(10,15):
            self.state[i] = random.uniform( -1.0 , +1.0)

        # buffers
        self.BufferAngoliAttuali = self.state[10:15]
        self.BufferSpeed = [0.0]*5

        # resettiamo tempo
        self.Exercise_length = 1000

        return np.array(self.state)

def RobotSpeedApplication(arg_JointActualPos,arg_JointActualSpeed,arg_JointTargetPos):
    '''Given actual position and speed of the joints, and a target position, this func return the next step position.
    Using a prefixed acceleration curve hardcoded here. ( linear ramp for now )

    Args:
        JointActualPos: list of the actual angle of all joints
        JointActualSpeed: list the actual speed of all joints
        JointTargetPos: list of the target position of all joints

    Returns:
        JointNewPos: list of the new position of all joints
        JointNewSpeed: list of the new speed    
    
    '''
    JointActualPos      = copy.deepcopy(arg_JointActualPos)
    JointActualSpeed    = copy.deepcopy(arg_JointActualSpeed)
    JointTargetPos      = copy.deepcopy(arg_JointTargetPos)
    JointNewPos         = [0.0]*5
    JointNewSpeed       = [0.0]*5

    JOINT_MAX_SPEED = math.radians(120) # 120 degree/s in rads/s
    SYSTEM_TARGET_FREQ = 60 # Hz ideal realtime comunication 
    # quindi max gradi per Hz = 2°
    MaxJointSpeed_PerStep = JOINT_MAX_SPEED/SYSTEM_TARGET_FREQ

    for i in range(5):
        if JointActualSpeed[i] >= MaxJointSpeed_PerStep and (JointTargetPos[i]-JointActualPos[i]) > 1 :     # velocità sopra 2°/s e delta angolo maggiore di 1°
            JointNewSpeed[i] = MaxJointSpeed_PerStep                                                        # aggiorno velocità
            JointNewPos[i] = JointActualPos[i] + MaxJointSpeed_PerStep                                      # nuovo angolo = angolo attuale + max speed per singolo step

        elif JointActualSpeed[i] < MaxJointSpeed_PerStep and (JointTargetPos[i]-JointActualPos[i]) > 1 :    # velocità sotto 2°/s e delta angolo maggiore di 1°
            JointNewSpeed[i] = (JointActualSpeed[i] + MaxJointSpeed_PerStep/SYSTEM_TARGET_FREQ)                # nuova velocità = vecchia + minima accelerazione possibile
            JointNewPos[i] = JointActualPos[i] + JointNewSpeed[i]                                           # nuovo angolo = angolo attuale + nuova velocità

        elif JointActualSpeed[i] < MaxJointSpeed_PerStep and (JointTargetPos[i]-JointActualPos[i]) <= 1 :   # velocità sotto i 2°/s e delta angolo minore di 1°
            JointNewSpeed[i] = (JointActualSpeed[i] - MaxJointSpeed_PerStep/SYSTEM_TARGET_FREQ)             # decelero
            JointNewPos[i] = JointActualPos[i] + JointNewSpeed[i]                                           # applico velocità abbassata
            

    return JointNewPos, JointNewSpeed


def change_reference_frame(point, translation, quaternion):
    """Transform a point from one reference frame to another, given
        the translation vector between the two frames and the quaternion
        between  the two frames.

    Args:
        point (array_like,shape(3,) or shape(N,3)): x,y,z coordinates of the point in the original frame
        translation (array_like,shape(3,)): translation vector from the original frame to the new frame 
        quaternion (array_like,shape(4,)): quaternion from the original frame to the new frame

    Returns:
        ndarray,shape(3,): x,y,z coordinates of the point in the new frame.
        
    """

    #point = [1,2,3]
    #point = np.array([1,2,3])
    #point = np.array([[11,12,13],[21,22,23]]) # point.shape = (2,3) # point (11,12,13)  and point (21,22,23)

    # Apply rotation
    r = R.from_quat(quaternion)
    rotated_point = r.apply(np.array(point))
    # Apply translation
    translated_point = np.add(rotated_point, np.array(translation))

    return translated_point

def cartesian_to_polar_3d(cartesian_coordinates):
    """Transform 3D cartesian coordinates to 3D polar coordinates.

    Args:
        cartesian_coordinates (list): [x,y,z] coordinates of target point.

    Returns:
        list: [r,phi,theta] polar coordinates of point.

    """

    x = cartesian_coordinates[0]
    y = cartesian_coordinates[1]
    z = cartesian_coordinates[2]
    r =  np.sqrt(x**2+y**2+z**2)
    #? phi is defined in [-pi, +pi]
    phi = np.arctan2(y,x)
    #? theta is defined in [0, +pi]
    theta = np.arccos(z/r)

    return [r,theta,phi]

# codice ==================================================================

#creo e ascolto
Server_Socket = CreaSocket()

#attendo e connetto con UE5 
UE5_Socket , UE5_Address = AttesaConnesione(Server_Socket)

#confermo connesione aperta
print(f"Aperto socket con UE5: {UE5_Address}")

SimpleRobotEnv = RobotEnv()


Aperto socket con UE5: ('127.0.0.1', 50245)


In [7]:
from stable_baselines3.common.env_checker import check_env
check_env(SimpleRobotEnv)

RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT


In [9]:
# giro di prova dell'exercise
episodes = 1
for episode in range(1, episodes+1):
    state = SimpleRobotEnv.reset()
    done = False
    score = 0 
    
    while not done:
        SimpleRobotEnv.render(mode=0)
        action = SimpleRobotEnv.action_space.sample()
        n_state, reward, done, info = SimpleRobotEnv.step(action)
        score+=reward
    #print('Episode:{} Score:{}'.format(episode, score))

RESETTTT
Tempo: 1000-- state: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -0.04555779221459244, 0.8012257295229785, -0.9147689430067543, 0.12799763582218437, 0.3029277914249233, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
Tempo: 999-- state: [-0.38395240902900696, 0.0313226617872715, 92.36895751953125, -2.963913917541504, 6.856786251068115, 142.79635620117188, 50.95258278430186, 1.5294280142819634, 0.16909528422972633, -0.07889112554792577, 0.7678923961896452, -0.8814356096734209, 0.09466430248885105, 0.26959445809159, -0.03333333333333333, -0.03333333333333333, 0.03333333333333333, -0.03333333333333333, -0.03333333333333333, 0.0, 0.0, 0.0, 0.0, 0.0]
Tempo: 998-- state: [-0.38395240902900696, 0.0313226617872715, 92.36895751953125, 70.84556579589844, 106.1899642944336, 12.521162986755371, 150.72813854334936, 2.751092894953853, 0.04038959636666245, -0.14555779221459242, 0.7012257295229786, -0.8814356096734209, 0.027997635822184383, 0.20292779142492334, -0.06666666666666667, -0.06

In [12]:
from stable_baselines3 import TD3
from stable_baselines3.td3.policies import MlpPolicy
from stable_baselines3.common.noise import NormalActionNoise

n_actions = SimpleRobotEnv.action_space.shape[-1]

TargetDistanceForReward = 25
action_noise = NormalActionNoise( mean=np.zeros(n_actions) , sigma=0.9 * np.ones(n_actions) )

model = TD3(MlpPolicy, SimpleRobotEnv, action_noise=action_noise, verbose=1, learning_rate=0.001, buffer_size=1000000, batch_size=256,gamma=0.99,seed=123
            ,train_freq=(1,"episode"), tensorboard_log="./TD3_Madness_TB2/")

model.learn( log_interval = 7 , total_timesteps = 2500000 )

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
RESETTTT
Logging to ./TD3_Madness_TB2/TD3_19
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 8.86     |
|    ep_rew_mean     | -4.89    |
| time/              |          |
|    episodes        | 7        |
|    fps             | 51       |
|    time_elapsed    | 1        |
|    total_timesteps | 62       |
---------------------------------
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
Collision Detected
RESETTTT
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 4.93     |
|    ep_rew_mean     | -2.9

error: unpack requires a buffer of 4 bytes

Allenamento approfondito:

In [149]:
TargetDistanceForReward = 20

model.set_env(SimpleRobotEnv)

model.action_noise = NormalActionNoise( mean=np.zeros(n_actions) , sigma=0.5 * np.ones(n_actions) )
model.learning_rate = 0.001

model.learn(total_timesteps=2500000, log_interval=5)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
RESETTTT
Logging to ./TD3_Madness_TB2/TD3_4
RESETTTT
RESETTTT
RESETTTT


KeyboardInterrupt: 

Piccolo test del modello:

In [205]:
obs = SimpleRobotEnv.reset()
TargetDistanceForReward = 5
while True:
    action, _states = model.predict(obs)
    obs, rewards, dones, info = SimpleRobotEnv.step(action)
    if dones:
        break
    SimpleRobotEnv.render()

RESETTTT
Tempo: 999-- state: [110.2102279663086, -29.162033081054688, 8.014104843139648, 6.30468225479126, 16.422828674316406, -8.567770957946777, 114.67040008627849, 0.3658657875376487, -0.8929039330440408, 0.8797154585055792, -0.9055188109139741, -0.21443704343511671, -0.7763415458413266, 0.15578729037391736, -0.0261799, -0.0261799, 0.0261799, 0.0261799, 0.0261799, 0.0, 0.0, 0.0, 0.0, 0.0]
Tempo: 998-- state: [110.2102279663086, -29.162033081054688, 8.014104843139648, 10.592447280883789, -29.614614486694336, -84.32867431640625, 135.83481115504523, 1.361354534669676, 2.682882920037832, 0.8884421047655509, -0.8967921646540025, -0.20571039717514505, -0.767614899581355, 0.16451393663388902, 0.0261799, 0.0261799, 0.0261799, 0.0261799, 0.0261799, -0.9999994, -1.0, 0.89289105, 1.0, 0.99975514]
Tempo: 997-- state: [110.2102279663086, -29.162033081054688, 8.014104843139648, 6.170560359954834, -30.706195831298828, -84.71000671386719, 139.37143802901682, 1.3707657434046225, 2.7392694847848853, 

KeyboardInterrupt: 

In [162]:
model= TD3.load("TD3_madness_working")


In [16]:
model.save(path="TD3_madness_working")

Chiusure varie

In [None]:
# spengo socket prima di uscire
UE5_Socket.shutdown(socket.SHUT_RDWR)
UE5_Socket.close()