<a href="https://colab.research.google.com/github/udsey/SATO_RL/blob/main/Train_models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import plotly.graph_objects as go
import gc

# Дискретная модель поезда, состояниее - позиция, действие - ускорение

In [5]:
class Train:

    def __init__(self, time_limit=2000, v_limit=100, s_limit=300, dt=10):
        self.time_limit = time_limit # s
        self.v_limit = v_limit # km/h
        self.s_limit = s_limit # km
        self.state_space = np.arange(0, self.s_limit, 0.001) # m
        self.action_space = np.arange(-0.4, 0.5, 0.1) # m/s^2
        self.dt = dt / 3600 # h
        self.df_limit = pd.DataFrame({'start': [0],
                                      'stop': [self.s_limit],
                                      'limit': self.v_limit,})

# Добавление ограничений скорости на участке

    def append_velocity_limit(self, limit, start, stop):
        self.df_limit = self.df_limit.append({'start': start,
                                              'stop': stop,
                                              'limit': limit,}, ignore_index=True)
        return self.df_limit
# Сброс скоростного ограничения к одному на всем участке
    def reset_velocity_limit(self):
        self.append_velocity_limit(self, limit=self.v_limit, 
                                   position=range(0, self.s_limit))
# Сброс окружения к начальному значению
    def reset(self):
        self.s = 0
        self.v = 0
        self.a = 0
        self.done = False
        self.total_time = 0
        self.reward = 0
        self.v_list = []
        self.s_list = []
        self.t_list = []
        self.a_list = []
        return self.s

# Возвращает скоростное ограничение на текущем участке 
    def speed_limit(self):
        if self.s < 0:
            return self.df_limit.iloc[0, 2]
    
        for i in range(self.df_limit.shape[0]):
            min_p = self.df_limit.iloc[i, 0]
            max_p = self.df_limit.iloc[i, 1]
            if min_p <= self.s <= max_p:
                return self.df_limit.iloc[i, 2]
            else:
                return min_p
# Возвращает награду в соответствии с текущим состоянием
    def reward_func(self):
        if self.s < 0: # Уехал в обратную сторону
            return -int(self.time_limit), True
        if self.s >= self.s_limit: # Доехал до точки назначения
            self.s = self.s_limit
            return int(self.time_limit - self.total_time), True
        if self.total_time > self.time_limit/3600: # Превысил время
            return -int(self.time_limit), True
        if self.v > self.speed_limit()*1.5: # Превысил скорость
            return -int(self.time_limit), True
        if self.speed_limit()/2 <= self.v <= self.speed_limit():
            return 1, False
        else:
            return 0, False
        
        

# Шаг для заданного действия
    def step(self, action):
        
        self.total_time += self.dt # h
        self.a = self.action_space[action] * (3600**2 / 1000) # km/h^2
        self.s += self.v * self.dt + (self.a * (self.dt ** 2))/2 #km
        self.v += (self.a * self.dt) # km/h
        if self.v < 0 :
            self.v = 0

        self.reward, self.done = self.reward_func()

        self.v_list.append(self.v)
        self.t_list.append(self.total_time)
        self.s_list.append(self.s)
        self.a_list.append(self.a)

        return  int(np.round(self.s, 3)*1000)-1, self.reward, self.done

# Справочная информация о движении
    def action_info(self, action):
        print('*'*20)
        print('Ускорение {:.2f} м/с^2'.format(self.action_space[action]))
        print('Скорость {:.2f} км/ч' .format(self.v)),
        print('Пройденный путь {:.6f} км'.format(self.s))
        print('Штраф', self.reward)
        print('*'*20)

# Возвращает данные о скорости на каждом участке        
    def speed_legend(self):
        df = pd.DataFrame(columns=['time', 'position', 'speed', 'acceleration'])
        df.time = self.t_list
        df.position = self.s_list
        df.speed = self.v_list
        df.acceleration = self.a_list
        return df

 

In [7]:
train_pos_st = Train()
train_pos_st.reset()
train_pos_st.step(1)

(-16, -2000, True)

# Дискретная модель, состояние - время, действие - ускорение

In [8]:
class Train:


    def __init__(self, time_limit=2000, v_limit=100, s_limit=300, dt=10):
        self.time_limit = time_limit # s
        self.v_limit = v_limit # km/h
        self.s_limit = s_limit # km
        self.state_space = np.arange(0, self.time_limit, dt)  # sec
        self.action_space = np.arange(-0.4, 0.5, 0.1) # m/s^2
        self.dt = dt / 3600 # h
        self.df_limit = pd.DataFrame({'start': [0],
                                      'stop': [self.s_limit],
                                      'limit': self.v_limit,})
        self.total_time = 0

# Добавление ограничений скорости на участке

    def append_velocity_limit(self, limit, start, stop):
        self.df_limit = self.df_limit.append({'start': start,
                                              'stop': stop,
                                              'limit': limit,}, ignore_index=True)
        return self.df_limit
# Сброс скоростного ограничения к одному на всем участке
    def reset_velocity_limit(self):
        self.append_velocity_limit(self, limit=self.v_limit, 
                                   position=range(0, self.s_limit))
# Сброс окружения к начальному значению
    def reset(self):
        self.s = 0
        self.v = 0
        self.a = 0
        self.done = False
        self.total_time = 0
        self.reward = 0
        self.v_list = []
        self.s_list = []
        self.t_list = []
        self.a_list = []
        self.state = -1
        return self.s
# Возвращает скоростное ограничение на текущем участке 
    def speed_limit(self):
        if self.s < 0:
            return self.df_limit.iloc[0, 2]
    
        for i in range(self.df_limit.shape[0]):
            min_p = self.df_limit.iloc[i, 0]
            max_p = self.df_limit.iloc[i, 1]
            if min_p <= self.s <= max_p:
                return self.df_limit.iloc[i, 2]
            else:
                return min_p
# Шаг для заданного действия
    def step(self, action):
        self.state += 1
        self.total_time += self.dt # h
        self.a = self.action_space[action] * (3600**2 / 1000) # km/h^2
        self.s += self.v * self.dt + (self.a * (self.dt ** 2))/2 #km
        self.v += (self.a * self.dt) # km/h
        
        
        self.v_list.append(self.v)
        self.t_list.append(self.total_time)
        if self.s>=self.s_limit:
            self.s_list.append(self.s_limit)
        else:
            self.s_list.append(self.s)
        self.a_list.append(self.a)



        if self.s >= self.s_limit:
                self.s = self.s_limit
                self.reward = 100
                self.done = True
                #print('Done')
                return  self.state, self.reward, self.done
        elif self.state  <  self.state_space.shape[0]-1:
            if self.v > self.speed_limit(): # если превышает скорость
                self.reward = -10
            elif self.s < 0:  # Движение в обратном направлении
                self.reward = -1000
                self.done = True
            elif self.v < 0:
                self.reward = -20
            elif 0 <= self.v < self.speed_limit()*0.5:  # если слишком маленькая скорость
                self.reward = -10
                #print('Wrong direction')
                return  self.state, self.reward, self.done  
            else :
                self.reward = 0
        else:
                self.reward = -100
                self.done = True
                
                #print('Time limit')
                return  self.state, self.reward, self.done
        #print('Next step')
        return  self.state, self.reward, self.done

# Справочная информация о движении
    def action_info(self, action):
        print('*'*20)
        print('Ускорение {:.2f} м/с^2'.format(self.action_space[action]))
        print('Скорость {:.2f} км/ч' .format(self.v)),
        print('Пройденный путь {:.6f} км'.format(self.s))
        print('Штраф', self.reward)
        print('*'*20)

# Возвращает данные о скорости на каждом участке        
    def speed_legend(self):
        df = pd.DataFrame(columns=['time', 'position', 'speed', 'acceleration'])
        df.time = self.t_list
        df.position = self.s_list
        df.speed = self.v_list
        df.acceleration = self.a_list
        return df

 

In [9]:
train_time_st = Train()
train_time_st.reset()
train_time_st.step(1)

(0, -1000, True)

# Непрерывная модель, состояние - позиция, действий - ускорение

In [10]:
class Train:


    def __init__(self, time_limit=2000, v_limit=100, s_limit=300, dt=10):
        self.time_limit = time_limit # s
        self.v_limit = v_limit # km/h
        self.s_limit = s_limit # km
        self.state_space = np.arange(0, self.s_limit, 0.001) # m
        self.action_space = np.arange(-0.4, 0.5, 0.1) # m/s^2
        self.action_space_high = 0.5
        self.action_space_low = -0.5
        self.dt = dt / 3600 # h
        self.df_limit = pd.DataFrame({'start': [0],
                                      'stop': [self.s_limit],
                                      'limit': self.v_limit,})

# Добавление ограничений скорости на участке

    def append_velocity_limit(self, limit, start, stop):
        self.df_limit = self.df_limit.append({'start': start,
                                              'stop': stop,
                                              'limit': limit,}, ignore_index=True)
        return self.df_limit
# Сброс скоростного ограничения к одному на всем участке
    def reset_velocity_limit(self):
        self.append_velocity_limit(self, limit=self.v_limit, 
                                   position=range(0, self.s_limit))
# Сброс окружения к начальному значению
    def reset(self):
        self.s = 0
        self.v = 0
        self.a = 0
        self.done = False
        self.total_time = 0
        self.reward = 0
        self.v_list = []
        self.s_list = []
        self.t_list = []
        self.a_list = []
        self.reward_list = []
        self.state = self.state = self.s*1000
        return self.state

# Возвращает скоростное ограничение на текущем участке 
    def speed_limit(self):
        if self.s < 0:
            return self.df_limit.iloc[0, 2]
    
        for i in range(self.df_limit.shape[0]):
            min_p = self.df_limit.iloc[i, 0]
            max_p = self.df_limit.iloc[i, 1]
            if min_p <= self.s <= max_p:
                return self.df_limit.iloc[i, 2]
            else:
                return min_p
# Возвращает награду в соответствии с текущим состоянием
    def reward_func(self):
        if self.s < 0: # Уехал в обратную сторону
            return -1000, True
        if self.s >= self.s_limit: # Доехал до точки назначения
            self.s = self.s_limit
            return 100, True
        if self.total_time > self.time_limit/3600: # Превысил время
            return -100, True
        if self.v > self.speed_limit(): # Превысил скорость
            return -500, True
        if self.speed_limit()/2 <= self.v <= self.speed_limit():
            return 1, False
        else:
            return 0, False
        
        

# Шаг для заданного действия
    def step(self, action):
        
        self.total_time += self.dt # h
        self.a = action * (3600**2 / 1000) # km/h^2
        self.s += self.v * self.dt + (self.a * (self.dt ** 2))/2 #km
        self.v += (self.a * self.dt) # km/h
        if self.v < 0 :
            self.v = 0

        self.reward, self.done = self.reward_func()

        self.v_list.append(self.v)
        self.t_list.append(self.total_time)
        self.s_list.append(self.s)
        self.a_list.append(self.a)
        self.reward_list.append(self.reward)
        self.state = self.s*1000
        return  self.state, self.reward, self.done

# Справочная информация о движении
    def action_info(self, action):
        print('*'*20)
        print('Ускорение {:.2f} м/с^2'.format(action))
        print('Скорость {:.2f} км/ч' .format(self.v)),
        print('Пройденный путь {:.6f} км'.format(self.s))
        print('Штраф', self.reward)
        print('*'*20)

# Возвращает данные о скорости на каждом участке        
    def speed_legend(self):
        df = pd.DataFrame(columns=['time', 'position', 'speed', 'acceleration', 'reward'])
        df.time = self.t_list
        df.position = self.s_list
        df.speed = self.v_list
        df.acceleration = self.a_list
        df.reward = self.reward_list
        return df



 

In [15]:
train_cont_pos_st = Train()
train_cont_pos_st.reset()
train_cont_pos_st.step(-0.5)

(-25.0, -1000, True)

# Непрерывная модель, состояние - позиция, скорость, время, действий - ускорение

In [16]:
class Train:


    def __init__(self, time_limit=2000, v_limit=100, s_limit=300, dt=10):
        self.time_limit = time_limit # s
        self.v_limit = v_limit # km/h
        self.s_limit = s_limit # km
        self.state_space = np.arange(0, self.s_limit, 0.001) # m
        self.action_space = np.arange(-0.4, 0.5, 0.1) # m/s^2
        self.action_space_high = 0.5
        self.action_space_low = -0.5
        self.dt = dt / 3600 # h
        self.df_limit = pd.DataFrame({'start': [0],
                                      'stop': [self.s_limit],
                                      'limit': self.v_limit,})

# Добавление ограничений скорости на участке

    def append_velocity_limit(self, limit, start, stop):
        self.df_limit = self.df_limit.append({'start': start,
                                              'stop': stop,
                                              'limit': limit,}, ignore_index=True)
        return self.df_limit
# Сброс скоростного ограничения к одному на всем участке
    def reset_velocity_limit(self):
        self.append_velocity_limit(self, limit=self.v_limit, 
                                   position=range(0, self.s_limit))
# Сброс окружения к начальному значению
    def reset(self):
        self.s = 0
        self.v = 0
        self.a = 0
        self.done = False
        self.total_time = 0
        self.reward = 0
        self.v_list = []
        self.s_list = []
        self.t_list = []
        self.a_list = []
        self.reward_list = []
        self.state = np.array([self.s*1000, self.v, self.total_time])
        return self.state

# Возвращает скоростное ограничение на текущем участке 
    def speed_limit(self):
        if self.s < 0:
            return self.df_limit.iloc[0, 2]
    
        for i in range(self.df_limit.shape[0]):
            min_p = self.df_limit.iloc[i, 0]
            max_p = self.df_limit.iloc[i, 1]
            if min_p <= self.s <= max_p:
                return self.df_limit.iloc[i, 2]
            else:
                return min_p
# Возвращает награду в соответствии с текущим состоянием
    def reward_func(self):
        if self.s < 0: # Уехал в обратную сторону
            return -1000, True
        if self.s >= self.s_limit: # Доехал до точки назначения
            self.s = self.s_limit
            return 100, True
        if self.total_time > self.time_limit/3600: # Превысил время
            return -100, True
        if self.v > self.speed_limit(): # Превысил скорость
            return -500, True
        if self.speed_limit()/2 <= self.v <= self.speed_limit():
            return 1, False
        else:
            return 0, False
        
        

# Шаг для заданного действия
    def step(self, action):
        
        self.total_time += self.dt # h
        self.a = action * (3600**2 / 1000) # km/h^2
        self.s += self.v * self.dt + (self.a * (self.dt ** 2))/2 #km
        self.v += (self.a * self.dt) # km/h
        if self.v < 0 :
            self.v = 0

        self.reward, self.done = self.reward_func()

        self.v_list.append(self.v)
        self.t_list.append(self.total_time)
        self.s_list.append(self.s)
        self.a_list.append(self.a)
        self.reward_list.append(self.reward)
        self.state = np.array([self.s*1000, self.v, self.total_time])
        return  self.state, self.reward, self.done

# Справочная информация о движении
    def action_info(self, action):
        print('*'*20)
        print('Ускорение {:.2f} м/с^2'.format(action))
        print('Скорость {:.2f} км/ч' .format(self.v)),
        print('Пройденный путь {:.6f} км'.format(self.s))
        print('Штраф', self.reward)
        print('*'*20)

# Возвращает данные о скорости на каждом участке        
    def speed_legend(self):
        df = pd.DataFrame(columns=['time', 'position', 'speed', 'acceleration', 'reward'])
        df.time = self.t_list
        df.position = self.s_list
        df.speed = self.v_list
        df.acceleration = self.a_list
        df.reward = self.reward_list
        return df



 

In [17]:
train_cont_pos_time_vel_st = Train()
train_cont_pos_time_vel_st.reset()
train_cont_pos_time_vel_st.step(-0.5)

(array([-2.50000000e+01,  0.00000000e+00,  2.77777778e-03]), -1000, True)