# Bufory doświadczeń

In [1]:
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=FutureWarning)

import ptan
import gym
from typing import List, Optional, Tuple, Any
import torch
import torch.nn as nn

  from torch.distributed.optim import ZeroRedundancyOptimizer


In [2]:
class ToyEnv(gym.Env):
    def __init__(self):
        super(ToyEnv, self).__init__()
        self.observation_space = gym.spaces.Discrete(n=5)
        self.action_space = gym.spaces.Discrete(n=3)
        self.step_index = 0

    def reset(self):
        self.step_index = 0
        return self.step_index, {}

    def step(self, action):
        is_done = self.step_index == 10
        if is_done:
            return self.step_index % self.observation_space.n, \
                   0.0, is_done, False, {}
        self.step_index += 1
        return self.step_index % self.observation_space.n, \
               float(action), self.step_index == 10, False, {}


class DullAgent(ptan.agent.BaseAgent):
    def __init__(self, action: int):
        self.action = action

    def __call__(self, observations: List[Any],
                 state: Optional[List] = None) \
            -> Tuple[List[int], Optional[List]]:
        return [self.action for _ in observations], state

  and should_run_async(code)


In [3]:
# W głębokich sieciach Q korzystamy z fragmentów doświadczeń zebranych w buforach
# Odczytywane są one w celu uzyskania paczki treningowej, losowo lub przy użyciu wag priorytetowych
# Implementując je, powinniśmy zwrócić uwagę na:
# 1. Efektywne pobieranie danych z dużego bufora
# 2. Sposób usuwania starych danych
# 3. Zarządzanie priorytetami w buforze priorytetowym
# Kwestie te istotne są dla wydajności procesu treningowego

# Biblioteka PTAN udostępnia kilka rodzajów buforów
# Najprostszy jest ExperienceReplayBuffer o predefiniowanym rozmiarze i jednolitym próbkowaniu
# Mamy też PrioritizedReplayBuffer i PrioReplayBufferNaive

env = ToyEnv()
agent = DullAgent(action=1)
exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=1.0, steps_count=1)
buffer = ptan.experience.ExperienceReplayBuffer(exp_source, buffer_size=100)

In [4]:
# pętla treningowa

for step in range(6):
    buffer.populate(1) # metoda pozwalająca na pobranie N próbek ze źródła doświadczenia i umieszczenie ich w buforze
    if len(buffer) < 5:
        continue
    batch = buffer.sample(4) # metoda pozwalająca na pozyskanie paczki z N obiektami doświadczenia
    print("Train time, %d batch samples:" % len(batch))
    for s in batch:
        print(s)

# po powyższych operacjach - we właściwej pętli:
# 1. obliczenie straty
# 2. użycie propagacji wstecznej
# 3. powtórzenie kroków (łącznie z metodami populate() i sample()), aż do uzyskania stabilnych wyników

Train time, 4 batch samples:
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
ExperienceFirstLast(state=0, action=1, reward=1.0, last_state=1)
ExperienceFirstLast(state=3, action=1, reward=1.0, last_state=4)
Train time, 4 batch samples:
ExperienceFirstLast(state=4, action=1, reward=1.0, last_state=0)
ExperienceFirstLast(state=0, action=1, reward=1.0, last_state=1)
ExperienceFirstLast(state=0, action=1, reward=1.0, last_state=1)
ExperienceFirstLast(state=0, action=1, reward=1.0, last_state=1)


# Klasa TargetNet

In [5]:
# Klasa ta pozwala na synchronizację dwóch sieci neuronowych o tej samej architekturze
# Dzięki temu stabilność procesu trenowania poprawia się
# Dwa tryby synchronizacji:
# 1. sync() - wagi sieci źródłowej są kopiowane do sieci docelowej
# 2. alpha_sync() - wagi sieci źródłowej są łączone z wagami sieci docelowej przy użyciu współczynnika alfa (zakres 0 - 1)

class DQNNet(nn.Module): # sieć źródłowa
    def __init__(self, obs_size: int, n_actions: int):
        super(DQNNet, self).__init__()
        self.ff = nn.Linear(5, 3)

    def forward(self, x):
        return self.ff(x)

net = DQNNet(5, 3)
net

DQNNet(
  (ff): Linear(in_features=5, out_features=3, bias=True)
)

In [6]:
tgt_net = ptan.agent.TargetNet(net) # sieć docelowa
print(net.ff.weight)
print(tgt_net.target_model.ff.weight)

Parameter containing:
tensor([[-0.0190, -0.4170, -0.2922, -0.1017, -0.1230],
        [-0.4086, -0.2317,  0.2755, -0.1093, -0.0145],
        [ 0.2008, -0.4454, -0.2162,  0.0386, -0.1118]], requires_grad=True)
Parameter containing:
tensor([[-0.0190, -0.4170, -0.2922, -0.1017, -0.1230],
        [-0.4086, -0.2317,  0.2755, -0.1093, -0.0145],
        [ 0.2008, -0.4454, -0.2162,  0.0386, -0.1118]], requires_grad=True)


In [7]:
# Wagi sieci źródłowej i docelowej są takie same
# Sieci, mimo takiej samej architektury, są od siebie niezależne

net.ff.weight.data += 1.0
print(net.ff.weight)
print(tgt_net.target_model.ff.weight)

Parameter containing:
tensor([[0.9810, 0.5830, 0.7078, 0.8983, 0.8770],
        [0.5914, 0.7683, 1.2755, 0.8907, 0.9855],
        [1.2008, 0.5546, 0.7838, 1.0386, 0.8882]], requires_grad=True)
Parameter containing:
tensor([[-0.0190, -0.4170, -0.2922, -0.1017, -0.1230],
        [-0.4086, -0.2317,  0.2755, -0.1093, -0.0145],
        [ 0.2008, -0.4454, -0.2162,  0.0386, -0.1118]], requires_grad=True)


In [8]:
# Aby zsynchronizować sieci, możemy użyć metody sync()

tgt_net.sync()
print(tgt_net.target_model.ff.weight)

Parameter containing:
tensor([[0.9810, 0.5830, 0.7078, 0.8983, 0.8770],
        [0.5914, 0.7683, 1.2755, 0.8907, 0.9855],
        [1.2008, 0.5546, 0.7838, 1.0386, 0.8882]], requires_grad=True)
