Esta notebook contiene bloques de código útiles para realizar Q-learning en el entorno "Descent Env"

In [1]:
import numpy as np
from descent_env import DescentEnv
import random 


pygame 2.5.2 (SDL 2.28.3, Python 3.10.0)
Hello from the pygame community. https://www.pygame.org/contribute.html
Using Python-based geo functions


In [2]:
# Switch render_mode to 'rgb_array' for training/testing runs.
# The environment is created once here and reused by the cells below.
env = DescentEnv(render_mode='human')

Reading config from C:\Users\User\bluesky\settings.cfg
Reading magnetic variation data
Loading global navigation database...
Reading cache: C:\Users\User\bluesky\cache\navdata.p
Successfully loaded OpenAP performance model
Failed to load BADA performance model
Successfully loaded legacy performance model
Successfully loaded plugin AREA
Successfully loaded plugin DATAFEED


Observation Space

In [3]:
# Display the observation space so the discretization bins can be checked against it.
env.observation_space

Dict('altitude': Box(-inf, inf, (1,), float64), 'runway_distance': Box(-inf, inf, (1,), float64), 'target_altitude': Box(-inf, inf, (1,), float64), 'vz': Box(-inf, inf, (1,), float64))

Action Space

In [4]:
# Display the action space so the discretized action list can be checked against it.
env.action_space

Box(-1.0, 1.0, (1,), float64)

Discretización de los estados

**Nota:** es importante que chequeen el espacio de observación y el espacio de acción del entorno. Los números usados son ejemplos y pueden no ser correctos.

In [5]:
# Bin edges used to discretize each observation component.
# The ranges and bin counts are example values; verify them against the
# environment's real observation ranges before training.
altitude_space = np.linspace(start=0.0, stop=1.0, num=10)
vertical_velocity_space = np.linspace(start=-2.5, stop=2.5, num=10)
target_altitude_space = np.linspace(start=0.0, stop=1.0, num=5)
runway_distance_space = np.linspace(start=0.0, stop=1.0, num=5)
altitude_space

array([0.        , 0.11111111, 0.22222222, 0.33333333, 0.44444444,
       0.55555556, 0.66666667, 0.77777778, 0.88888889, 1.        ])

Obtener el estado a partir de la observación

In [6]:
def get_state(obs):
    """Map a raw observation to a discrete state usable as a Q-table index.

    Args:
        obs: Mapping with the keys 'altitude', 'vz', 'target_altitude' and
            'runway_distance'; the first element of each value is used.

    Returns:
        A 4-tuple ``(alt_idx, vz_idx, target_alt_idx, runway_dist_idx)``.
        Each index comes from ``np.digitize`` against the matching bin-edge
        array and is capped at ``len(bins) - 1``, so values at or above the
        last edge share the final index and values below the first edge map
        to index 0.
    """
    # Order matters: it defines the axis order of the Q-table.
    edges_by_key = (
        ('altitude', altitude_space),
        ('vz', vertical_velocity_space),
        ('target_altitude', target_altitude_space),
        ('runway_distance', runway_distance_space),
    )
    return tuple(
        min(np.digitize(obs[key][0], edges), len(edges) - 1)
        for key, edges in edges_by_key
    )

In [7]:
# Draw a random observation from the observation space and discretize it.
# Sampled values may fall outside the bin ranges; get_state caps the indices.
obs = env.observation_space.sample()
print(obs)
state = get_state(obs) # Example observation
state

OrderedDict([('altitude', array([-1.24606197])), ('runway_distance', array([-1.19115306])), ('target_altitude', array([0.44809907])), ('vz', array([0.79663088]))])


(0, 6, 2, 0)

Discretización de las acciones

In [8]:
# Discrete action set: 5 evenly spaced values covering [-1, 1].
# Kept as a Python list so that actions.index(...) can recover the Q-table column.
actions = [*np.linspace(start=-1.0, stop=1.0, num=5)]
actions

[-1.0, -0.5, 0.0, 0.5, 1.0]

In [9]:
def get_sample_action():
    """Return one element of the discrete ``actions`` list, chosen uniformly at random."""
    sampled_action = random.choice(actions)
    return sampled_action

Inicialización de la tabla Q

In [10]:
# Q-table: one axis per discretized state variable (in get_state order)
# plus a trailing axis with one entry per discrete action.
Q = np.zeros((
    len(altitude_space),
    len(vertical_velocity_space),
    len(target_altitude_space),
    len(runway_distance_space),
    len(actions),
))
Q

array([[[[[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]],

         [[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]],

         [[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]],

         [[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]],

         [[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]]],


        [[[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]],

         [[0., 0., 0., 0.,

Obtención de la acción a partir de la tabla Q

In [11]:
def optimal_policy(state, Q):
    """Return the greedy action for ``state`` according to the table ``Q``.

    Args:
        state: Index tuple into the state axes of ``Q`` (as produced by
            ``get_state``).
        Q: Q-table whose last axis enumerates the entries of ``actions``.

    Returns:
        The element of ``actions`` with the highest Q-value; ties resolve to
        the first maximum, as ``np.argmax`` does.
    """
    best_idx = np.argmax(Q[state])
    return actions[best_idx]

Epsilon-Greedy Policy

In [12]:
def epsilon_greedy_policy(state, Q, epsilon=0.1):
    """Choose an action with an epsilon-greedy rule.

    Args:
        state: Discrete state tuple used to index ``Q``.
        Q: Q-table passed through to ``optimal_policy``.
        epsilon: Probability of exploring (taking a random action).

    Returns:
        A random action from ``get_sample_action()`` with probability
        ``epsilon``, otherwise the greedy action from ``optimal_policy``.
    """
    # A single Bernoulli draw decides between exploring and exploiting.
    if np.random.binomial(1, epsilon):
        return get_sample_action()
    return optimal_policy(state, Q)

Ejemplo de episodio 

In [13]:
alpha = 0.1        # learning rate
gamma = 0.99       # discount factor
epsilon = 0.5      # exploration probability

# Run one episode, updating the Q-table in place with the Q-learning rule.
obs, _ = env.reset()
print(obs)
done = False
total_reward = 0
state = get_state(obs)
steps = 0
while not done:
    steps += 1
    # Model action: use the configured epsilon instead of a hard-coded literal
    action = epsilon_greedy_policy(state, Q, epsilon)

    # Index of the action along the last axis of Q
    action_idx = actions.index(action)

    # Environment action: step() is given a 1-element array
    real_action = np.array([action])

    # step() returns (obs, reward, terminated, truncated, info); the episode
    # is over when either flag is set, so the loop cannot run past a truncation.
    obs, reward, terminated, truncated, _ = env.step(real_action)
    done = terminated or truncated
    next_state = get_state(obs)

    next_action_idx = np.argmax(Q[next_state])

    # Update the Q-table. A terminal state has no future return, so only
    # bootstrap from next_state when the episode did not terminate.
    future_value = 0.0 if terminated else Q[next_state][next_action_idx]
    td_target = reward + gamma * future_value
    td_error = td_target - Q[state][action_idx]
    Q[state][action_idx] += alpha * td_error

    # Advance to the next state
    state = next_state

    total_reward += reward

    env.render()

env.close()
print('total_reward', total_reward)
print('steps', steps)

{'altitude': array([0.78366667]), 'vz': array([0.]), 'target_altitude': array([0.793]), 'runway_distance': array([0.5])}
total_reward -162.72664
steps 17


: 