In [1]:
import numpy as np
import gymnasium as gym
import random
from tqdm import tqdm

In [2]:

class QLearn:
    """Turn continuous environment observations into integer Q-table indices.

    The discretiser is configured in two steps: construct it with an
    environment, then call ``set_num_of_increments`` with the number of bins
    per observation component.  ``regulate_obs`` can only be used after that
    second call because it reads ``self.increment``.

    The value ranges used for binning are the hard-coded ``high``/``low``
    arrays in ``_obs_increment`` (eight components), not the bounds reported
    by ``env.observation_space``.
    """

    def __init__(self, env: "gym.Env"):
        # The annotation is a string so that defining the class does not
        # require the `gym` name to be resolvable at definition time.
        self.env = env
        self.obs_space = env.observation_space

    def set_num_of_increments(self, num_of_increments: np.ndarray):
        """Store the bin count per observation component and derive bin widths.

        Args:
            num_of_increments: Number of bins for each observation component
                (array-like, broadcast against the eight hard-coded ranges).
        """
        self.num_of_increments = np.asarray(num_of_increments)
        self.increment = self._obs_increment()

    def _obs_increment(self) -> np.ndarray:
        """Return the bin width of every observation component."""
        high = np.array([1.5, 1.5, 3, 3, 1.5, 3, 1, 1])
        low = np.array([-1.5, -1.5, -3, -3, -1.5, -3, 0, 0])
        # range = self.obs_space.high - self.obs_space.low
        span = high - low  # not called `range`: that would shadow the builtin
        return span / self.num_of_increments

    def regulate_obs(self, obs: np.ndarray, time_step: int = None) -> np.ndarray:
        """Map a raw observation to integer bin indices.

        All components except the last two are floor-divided by their bin
        width and shifted by half the bin count so that zero falls in the
        middle bin.  The last two components are passed through unchanged
        (they are treated as 0/1 flags).  Every index is clipped to
        ``[0, num_of_increments - 1]``.

        Args:
            obs: Raw observation with one entry per configured component.
            time_step: Optional step counter; when given, its time bin is
                prepended via ``add_step_dim``.

        Returns:
            Integer index array, with the time bin in front when
            ``time_step`` is not ``None``.
        """
        obs = np.asarray(obs)
        shifted = obs // self.increment + (self.num_of_increments // 2)
        raw_bins = np.concatenate((shifted[:-2], obs[-2:]))

        # Clip while still floating point, then cast: casting a huge float to
        # int first could overflow and land in the wrong bin.
        top_bin = self.num_of_increments - 1
        reg_obs = np.clip(raw_bins, 0, top_bin).astype(int)

        if time_step is not None:
            return self.add_step_dim(reg_obs, time_step)
        return reg_obs

    def add_step_dim(self, obs: np.ndarray, time_step: int,
                     max_step: int = 300, time_incre: int = 16):
        """Prepend the time bin of ``time_step`` to ``obs``.

        ``max_step`` steps are split evenly into ``time_incre`` bins; steps at
        or beyond ``max_step`` fall into the last bin and negative steps into
        the first, so the result is always a valid index.

        Args:
            obs: Already discretised observation.
            time_step: Step counter to convert into a bin.
            max_step: Number of steps covered by the bins (default 300).
            time_incre: Number of time bins (default 16); must match the size
                of the Q-table's first axis.

        Returns:
            ``obs`` with the integer time bin inserted at position 0.
        """
        steps_per_bin = max_step / time_incre
        time_bin = int(min(time_incre - 1, max(0, time_step // steps_per_bin)))
        return np.insert(obs, 0, time_bin)


def create_Q_table(*dim):
    """Allocate a zero-filled array whose shape is the given dimensions.

    Args:
        *dim: Size of each axis, in order.

    Returns:
        A NumPy array of zeros with shape ``dim``.
    """
    table = np.zeros(shape=dim)
    return table

# Bellman target: reward + gamma * max over next actions of Q(next_state, action)
def Q_observed(s2_reward:np.float64, s2_obs: np.ndarray, q_table: np.ndarray, gamma:float):
    """Return the one-step TD target for a transition.

    Args:
        s2_reward: Reward received for the transition.
        s2_obs: Discretised next state, used as an index into ``q_table``.
        q_table: Q-value table whose last axis enumerates actions.
        gamma: Discount factor applied to the bootstrapped value.

    Returns:
        ``s2_reward + gamma * max(q_table[s2_obs])``.
    """
    # The bootstrap term is the best next-state *value* (np.max); np.argmax
    # would yield the index of the best action, which is not a return.
    best_next_value = np.max(q_table[tuple(s2_obs)])
    return s2_reward + gamma * best_next_value

# def Q_observed(reward:np.float64, s2_obs: np.ndarray, q_table: np.ndarray, gamma:float):
#     arg_max_Q = np.argmax(q_table[tuple(s2_obs)])
#     return reward + gamma * arg_max_Q

def Q_expected(s1_obs: np.ndarray, s1_action: np.int64, q_table: np.ndarray):
    """Look up the current Q-value of a state/action pair.

    Args:
        s1_obs: Discretised state used as the leading indices.
        s1_action: Action index, used as the last index.
        q_table: Q-value table.

    Returns:
        The entry ``q_table[*s1_obs, s1_action]``.
    """
    cell = tuple(s1_obs) + (s1_action,)
    return q_table[cell]

def update_Q(Q_observed: float, Q_expected: float, alpha: float, q_table: np.ndarray, s1_obs: np.ndarray, s1_action: np.int64):
    """Move one Q-table entry towards the observed target, in place.

    Args:
        Q_observed: TD target for the transition.
        Q_expected: Current estimate the update starts from (the passed
            value is used, not a fresh table lookup).
        alpha: Learning rate.
        q_table: Q-value table; the addressed entry is overwritten.
        s1_obs: Discretised state used as the leading indices.
        s1_action: Action index, used as the last index.

    Returns:
        The same ``q_table`` object that was passed in.
    """
    td_error = Q_observed - Q_expected
    q_table[(*s1_obs, s1_action)] = Q_expected + alpha * td_error
    return q_table

def action_policy(q_table: np.ndarray, s1_obs: np.ndarray):
    """Pick the greedy action for a state.

    Args:
        q_table: Q-value table whose last axis enumerates actions.
        s1_obs: Discretised state used as an index into ``q_table``.

    Returns:
        Index of the largest Q-value for that state (the first one on ties).
    """
    action_values = q_table[tuple(s1_obs)]
    return action_values.argmax()


def expo_decay(large_epsilon, small_epsilon, epoch, steps):
    """Decay a value from ``large_epsilon`` towards ``small_epsilon``.

    With ``z = 1 - steps / epoch`` the result is
    ``z * ((large_epsilon - small_epsilon) / e) * e**z + small_epsilon``,
    i.e. approximately ``large_epsilon`` at ``steps == 0`` and exactly
    ``small_epsilon`` at ``steps == epoch``.

    Args:
        large_epsilon: Starting value.
        small_epsilon: Final value.
        epoch: Total number of steps in the schedule.
        steps: Current step.

    Returns:
        The scheduled value for ``steps``.
    """
    scale = (large_epsilon - small_epsilon) / np.e
    remaining = 1 - steps / epoch
    return remaining * scale * np.e ** remaining + small_epsilon






In [3]:
# Number of bins on the Q-table's time axis.  It has to equal the number of
# time bins QLearn.add_step_dim produces (16), otherwise the prepended time
# index would not fit the table's first axis.
time_incre = 16
# Bins per observation component; the last two components are 0/1 flags.
states = np.array([16,16,16,16,16,16,2,2])
# Size of the action axis.
n_actions = 4
# Table axes: (time bin, *observation bins, action).
# NOTE(review): this is 16 * 16**6 * 2 * 2 * 4 = 2**32 float64 cells
# (32 GiB if every cell is touched) — confirm the machine can back that.
q_table = create_Q_table(time_incre,*states,n_actions)

In [4]:
# Build the environment and wrap it in the observation discretiser.
# Both names are assigned again by the configuration cell that follows.
env = gym.make(id="LunarLander-v2")
agent = QLearn(env=env)

In [5]:

# Seed every source of randomness used by the training loop so that a
# restart-and-run-all reproduces the same run.
SEED = 0
random.seed(SEED)  # drives the epsilon-greedy coin flip

env = gym.make("LunarLander-v2")
env.action_space.seed(SEED)  # drives env.action_space.sample()
observation, info = env.reset(seed=SEED)

# Learning rate and exploration rate both decay from max to min over `epoch`.
max_alpha = 0.1
min_alpha = 0.00001
gamma = 0.55  # discount factor
max_epsilon = 0.3
min_epsilon = 0.00001
epoch = 4_000_000  # total number of environment steps used for training

agent = QLearn(env)
agent.set_num_of_increments(states)
game_step = 0  # step counter within the current episode
obs = agent.regulate_obs(observation,0)



In [6]:
# One iteration is one environment step; episodes are chained back to back.
for step in tqdm(range(epoch)):
    # Exploration and learning rate are scheduled over the whole run.
    epsilon = expo_decay(max_epsilon,min_epsilon,epoch,step)
    alpha = expo_decay(max_alpha,min_alpha,epoch,step)

    # Epsilon-greedy action selection.
    policy_action = action_policy(q_table,obs)
    action = policy_action if random.random() > epsilon else env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)

    # The time axis of the state is the step *within the episode*, not the
    # global iteration counter (which would sit in the last time bin for
    # almost the entire run).
    game_step += 1
    new_obs = agent.regulate_obs(observation,game_step)

    # A terminal state has no successor, so its target is the reward alone.
    # Truncation is not terminal, so it still bootstraps.
    q_observe = reward if terminated else Q_observed(reward,new_obs,q_table,gamma)
    q_expected = Q_expected(obs,action,q_table)

    update_Q(q_observe,q_expected,alpha,q_table,obs,action)

    if terminated or truncated:
        observation, info = env.reset()
        game_step = 0
        obs = agent.regulate_obs(observation,game_step)
    else:
        obs = new_obs


100%|██████████| 4000000/4000000 [08:59<00:00, 7417.05it/s]


In [9]:
# Play a local WAV file; presumably an audible cue that the long training
# cell above has finished — confirm intent.
# NOTE(review): this needs the third-party `playsound` package and the file
# './yakemashita.wav' next to the notebook; either one missing stops a full run here.
from playsound import playsound
playsound('./yakemashita.wav')

In [10]:
# Watch the greedy policy in a rendered window.  The loop is long on purpose
# and is normally stopped by interrupting the kernel.
env = gym.make("LunarLander-v2",render_mode="human")
try:
    observation, info = env.reset()
    game_step = 0  # step counter within the current episode
    obs = agent.regulate_obs(observation,game_step)

    for _ in range(epoch):
        game_step += 1
        action = action_policy(q_table,obs)
        observation, reward, terminated, truncated, info = env.step(action)
        # The time axis of the state is the per-episode step, not the loop index.
        new_obs = agent.regulate_obs(observation,game_step)

        if terminated or truncated:
            observation, info = env.reset()
            game_step = 0
            obs = agent.regulate_obs(observation,game_step)
        else:
            obs = new_obs
finally:
    # Runs on normal exit and on KeyboardInterrupt, so the window is released.
    env.close()

KeyboardInterrupt: 

In [None]:
# Headless evaluation of the greedy policy over a fixed number of steps.
env = gym.make("LunarLander-v2")
observation, info = env.reset()

game_step = 0  # step counter within the current episode
obs = agent.regulate_obs(observation,game_step)

eval_steps = 100000
temp = []  # observation[3] of every non-final step; averaged by the later `mean(temp)` cell
term_count = 0  # finished episodes
survival_count = 0  # episodes whose last reward exceeded 99
for _ in range(eval_steps):
    game_step += 1

    policy_action = action_policy(q_table,obs)
    action = policy_action
    observation, reward, terminated, truncated, info = env.step(action)
    # The time axis of the state is the per-episode step, not the loop index.
    new_obs = agent.regulate_obs(observation,game_step)

    if terminated or truncated:
        observation, info = env.reset()
        game_step = 0
        obs = agent.regulate_obs(observation,game_step)
        term_count += 1
        # NOTE(review): reward > 99 presumably identifies a successful
        # landing — confirm against the environment's reward definition.
        if reward > 99:
            survival_count +=1
            print(f"term: {term_count}, survival rate: {survival_count/term_count}")
    else:
        obs = new_obs
        temp.append(observation[3])

# Average episode length in steps: uses the real step count of the loop and
# avoids dividing by zero when no episode finished.
if term_count:
    print(eval_steps/term_count)
else:
    print("no episode finished during evaluation")

In [None]:
def mean(data):
    """Arithmetic mean of a sized collection of numbers.

    Raises:
        ValueError: If ``data`` is empty.
    """
    if not len(data):
        raise ValueError('mean requires at least one data point')
    return sum(data) / len(data)

def _ss(data):
    """Sum of squared deviations of ``data`` from its own mean.

    ``data`` is traversed twice (once for the mean, once for the sum), so it
    must be a re-iterable collection.
    """
    centre = mean(data)
    return sum((value - centre) ** 2 for value in data)

def stddev(data, ddof=0):
    """Standard deviation of ``data``.

    Computes the population standard deviation by default; pass ``ddof=1``
    for the sample standard deviation.

    Args:
        data: Sized, re-iterable collection of numbers.
        ddof: Delta degrees of freedom; the divisor is ``len(data) - ddof``.

    Returns:
        Square root of the sum of squared deviations divided by
        ``len(data) - ddof``.

    Raises:
        ValueError: If ``data`` has fewer than two points, or if ``ddof`` is
            not smaller than ``len(data)``.
    """
    n = len(data)
    if n < 2:
        raise ValueError('variance requires at least two data points')
    # A zero divisor would raise ZeroDivisionError and a negative one would
    # make the variance negative, whose 0.5 power is a complex number.
    if n - ddof <= 0:
        raise ValueError('ddof must be smaller than the number of data points')
    variance = _ss(data) / (n - ddof)
    return variance ** 0.5

# Display the average of the observation[3] samples that the evaluation cell
# collected in `temp`.
mean(temp)

In [None]:
# Actually call close(); a bare `env.close` only evaluates to the bound
# method and releases nothing.
env.close()

In [None]:
# Persist the learned Q-table next to the notebook in NumPy's .npy format.
np.save(file='my_array.npy', arr=q_table)