# MountainCar DQN

In [2]:
import math
import copy
import time
import gym
import numpy as np
import matplotlib.pyplot as plt   
import torch
import torch.nn as nn

In [3]:
class DQN:
    """ DQN метод для дискретных действий """
    def __init__(self, env):
        self.env     = env                                  # среда с которой мы работаем
        self.obs_min = env.observation_space.low            # минимальные значения наблюдений
        self.obs_max = env.observation_space.high           # максимальные значения наблюдений
        self.nS      =  self.env.observation_space.shape[0] # число переменных состояния
        self.nA      =  self.env.action_space.n             # число дискретных действий

        self.gamma     = 0.99      # дисконтирующий множитель
        self.eps       = 1.        # эпсилон-жадная стратегия
        self.eps_decay = 0.995      # скорость распада эпсилон
        self.eps_min   = 1e-3      # после него eps обнуляется

        self.epochs    = 1         # число эпох обучения        
        self.batch     = 10        # размер батча для обучения
        self.capacity  = 1000      # величина памяти      
        self.learn_step = 5        # через сколько эпизодов учим

        print("obs_min", self.obs_min)  
        print("obs_max", self.obs_max)  

    #---------------------------------------------------------------------------------------    

    def init(self, nH = 32):
        """ Сформировать нейронную сеть c nH нейронами в скрытом слое """            
        nX = self.nS + self.nA
        self.model = nn.Sequential(
            nn.Linear(nX, 64),
            nn.Tanh(),
            nn.Linear(64, 1) )
              
        self.loss      = nn.MSELoss()
        #self.optimizer = torch.optim.SGD(self.model.parameters(), lr=0.00001, momentum=0.8)     
        self.optimizer  = torch.optim.Adam(self.model.parameters(), lr=0.00001)

        self.best_reward = -1e5

    #---------------------------------------------------------------------------------------

    def scale(self, obs):           
        return -1. + 2.*(obs - self.obs_min)/(self.obs_max-self.obs_min)

    #---------------------------------------------------------------------------------------

    def getQ(self, state):           
        """
        Значения Q при данном state и различных состояниях
        """
        actions = torch.eye(self.nA,  dtype=torch.float32)
        states  = torch.tensor(state, dtype=torch.float32).repeat(self.nA, 1)
        x = torch.cat([states, actions], axis=1)
        with torch.no_grad():
            y = self.model(x).detach().numpy()
        return y                                # (num_actions,)

    #---------------------------------------------------------------------------------------

    def policy(self, state):           
        """
        Вернуть действие в соответствии с epsilon-жадной стратегией 
        """
        if np.random.random() < self.eps:               # случайное действие: 
            action = np.random.randint(0, self.nA)
            #action = 2*int(state[1] > 0)          
            y = self.getQ(state)
            return action, y[action][0]
        
        y = self.getQ(state)
        action =  np.argmax(y)   
        Q      =  np.max(y)         
        return action, Q                                 # лучшее действие

    #---------------------------------------------------------------------------------------

    def maxQ(self, state):           
        """
        Вычислить максимальное значение ценности
        """
        y = self.getQ(state)
        Q =  np.max(y)
        return Q                    

    #---------------------------------------------------------------------------------------
    
    def test(self, episodes = 1000, ticks = 1000):
        """ 
        Тестирование с неизменной Q-функцией 
        """
        rews = []
        for _ in range(episodes):        
            obs =  self.env.reset()
            tot = 0
            for _ in range(ticks):
                obs = -1. + 2.*(obs - self.obs_min)/(self.obs_max-self.obs_min)
                action, _ = self.policy(obs)                
                obs, rew, done, _ = self.env.step(action) 

                tot += rew
                if done:                    
                    break
            rews.append(tot)

        print(f"Reward[{episodes},{ticks}]: {np.mean(rews):>7.2f} ± {np.std(rews):>.1f}")

    #---------------------------------------------------------------------------------------
    
    def learn(self, lm = 0.1, episodes = 100000, ticks = 200):
        """
        """
        self.memoX, self.memoR, self.memoQ = [], [], []
        rews, As, beg   = [], [], time.process_time()
        for episode in range(episodes):        
            s0 = self.env.reset()             
            s0 = self.scale(s0)
            a0, _ = self.policy(s0)  
            tot = 0
            for t in range(ticks):                                   
                As.append(float(a0))
                s1, r1, done, _ = self.env.step(a0)                  
                s1 = self.scale(s1)
                tot += r1
                
                actions = [0.]*self.nA;  actions[a0] = 1.               
                self.memoX.append(s0.tolist() + actions)
                if s1[0] > 0:
                    r1 += s1[0]
                self.memoR.append( r1 )
                
                a1, Q1 = self.policy(s1)  

                if done and t+1 < ticks:
                    Q1 = 0
                self.memoQ.append(Q1)                 

                if done:                    
                    break
                    
                s0, a0 = s1, a1                

            rews.append(tot)

            self.memoX  = self.memoX[-self.capacity:]     # обрезаем память
            self.memoR  = self.memoR[-self.capacity:]
            self.memoQ  = self.memoQ[-self.capacity:]            

            if episode == 0:
                print(f"R: {tot:7.2f}  Q: {np.mean(self.memoQ):8.2f} ± {np.std(self.memoQ):.0e}")

            if episode and episode % self.learn_step:
                self.learn_model()                         # обучаем модель

            self.eps *= self.eps_decay                     # epsilon-распад
            if self.eps < self.eps_min:
                self.eps = 0.
            
            if episode and episode % 100 == 0:
                mean, std = np.mean(rews[-100:]), np.std(rews[-100:])
                print(f"{episode:5d} R: {mean:7.2f} ± {std:.0e},  epsilon: {self.eps:.0e},  Q: {np.mean(self.memoQ):8.2f} ± {np.std(self.memoQ):.0e}, loss: {self.lastL:.1e}, As:{np.mean(As[-100:]):.2f} ± {np.std(As[-100:]):.0e} best: {self.best_reward:7.2f}, time: {(time.process_time() - beg):3.0f}s")
                beg = time.process_time()

                if mean > self.best_reward:
                    self.best_reward = mean                

    #---------------------------------------------------------------------------------------

    def learn_model(self):
        """
        Обучение модели
        """        
        X = torch.tensor(self.memoX, dtype=torch.float32)
        R = torch.tensor(self.memoR, dtype=torch.float32).view(-1,1)
        Q = torch.tensor(self.memoQ, dtype=torch.float32).view(-1,1)

        idx = torch.randperm( len(X) )     # перемешанный список индексов
        X, R, Q = X[idx], R[idx], Q[idx]
        
        for epoch in range(self.epochs):                     # эпоха - проход по всем примерам
            numB, sumL = int( len(X)/self.batch), 0.
            for i in range(0, numB*self.batch, self.batch):          
                xb, rb, qb = X[i: i+self.batch], R[i: i+self.batch], Q[i: i+self.batch]                        
                yb = rb + self.gamma*qb
            
                y = self.model(xb)                           # прямое распространение    # m = torch.mean(y)                                                                                     
                L = self.loss(y, yb)                         #  L = torch.mean((y-yb)**2) - 0.1*torch.mean((y-m)**2)          # вычисляем ошибку
                L += 0.01*torch.mean(y*y)
            
                self.optimizer.zero_grad()                   # обнуляем градиенты        
                L.backward()                                 # вычисляем градиенты            
                self.optimizer.step()                        # подправляем параметры 

                sumL += L.detach().item()
            self.lastL = sumL/numB

    #---------------------------------------------------------------------------------------


In [None]:
env = gym.make("MountainCar-v0")

model = DQN( env )
model.init()
model.learn()
#model.test(episodes = 10)
