# Pakete installieren

In [1]:
! pip install gymnasium
! pip install gymnasium[classic-control]
! pip install -U tensorflow




[notice] A new release of pip available: 22.3.1 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip available: 22.3.1 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


# Einen Einblick bekommen

Um da Problem besser zu verstehen, ist es eine gute Idee zufällige Aktionen zu wählen und visuell zu inspizieren was passiert.

In [1]:
import gymnasium as gym

In [2]:
env = gym.make("CartPole-v1", render_mode="human")
obs, info = env.reset()  # Start Simulation

for _ in range(1000):
    action = env.action_space.sample()  # Choose random action
    obs, reward, terminated, truncated, info = env.step(action)  # perform random action in simulation

    # if our agent fails we reset the environment
    if terminated or truncated:
        obs, info = env.reset()

env.close()  # start the rendering for us to see

Außerdem schauen wir uns an, wie viele verschiedene Aktionen es gibt und wie die Beobachtungen aussehen.

In [2]:
GAME = "CartPole-v1"
env = gym.make(GAME)

print(f'Action space {env.action_space}')
print(f'Action space size: {env.action_space.n}')

obs = env.reset()

print(f'Observation space shape: {obs[0].shape}')

Action space Discrete(2)
Action space size: 2
Observation space shape: (4,)


Wir sehen wir haben 2 diskrete Aktionen (nach links, nach rechts) aus denen wir wählen können. Unser Agent erhält vier Observationen. Der [Dokumentation](https://gymnasium.farama.org/environments/classic_control/cart_pole/) zu Folge sind das *Position des Karts*, *Geschwindigkeit des Karts*, *Winkel der Stange* und *Winkelgeschwindigkeit der Stange*.

Jetzt haben wir alle Informationen, die wir brauchen also...

# Let's Start building our agent

## Replay Buffer
Wir starten in dem wir eine Klasse für den Replay Buffer definieren. Dieser speichert vorherige Zustandsübergänge und erlaubt es uns aus allen Übergängen in einem durch `buffer_size` vordefinierten Zeitfenster zufällig zu samplen. Für jeden Übergang speichern wir die vorherige Observation, die Aktion, die wir gewählt haben, den Reward den wir dafür bekommen haben, die Observation des nächsten Zustands in den wir durch Wahl der Aktion gekommen sind und die Information, ob die Simulation nach unserer Aktion beendet war.

In [3]:
from collections import deque
import random as rand

class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.memory = deque(maxlen=buffer_size)
    
    def add(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        if batch_size <= len(self.memory):
            return rand.sample(self.memory, batch_size)
        else:
            assert False
    
    def __len__(self):
        return len(self.memory)

## Neural Network class
Wir verwenden ein relativ simples fully-connected Neuronales Netz. Unser Netz hat eine frei wählbare Anzahl an hidden layern mit ReLU-nicht-Linearität und 64 Neuronen. Das Netz hat außerdem einen Ausgabe-Layer ohne nicht-Linearität und genau so vielen Ausgabeneuronen wie er wählbare Aktionen gibt.

Wir definieren außerdem eine `act` Methode, die es uns erlaubt direkt die Aktion zu erhalten, die den hächsten vorhergesahgten Q-Wert hatte.

Außerdem definieren wir eine `get_model` Methode die es uns erlaubt schnell ein bereits kompiliertes Modell zu erhalten. Zudem haben wir eine `get_copy` und eine `transfer_weights` Methode, die es vereinfachen ein Target Netz zu erzeugen und zu aktualisieren.

In [4]:
from tensorflow import keras
import numpy as np
from copy import deepcopy

class DQN(keras.Model):

    def __init__(self, obs_space, num_actions, num_hidden_layers, optimizer, lr, loss):
        super(DQN, self).__init__()

        self.obs_space = obs_space
        self.input_layer = keras.layers.InputLayer(input_shape=obs_space)

        self.hidden_layers = []
        for _ in range(num_hidden_layers):
            self.hidden_layers.append(keras.layers.Dense(64, activation='relu'))
        self.output_layer =  keras.layers.Dense(num_actions, activation='linear')

        self.optimizer = optimizer
        self.rate = lr
        self.loss = loss
        self.num_actions = num_actions
        self.num_hidden_layers = num_hidden_layers

    def call(self, inputs):
        
        x = self.input_layer(inputs)

        for l in self.hidden_layers:
            x = l(x)
        
        q_vals = self.output_layer(x)
        return q_vals

    def act(self, state):
        q_vals = self(state)
        action = np.argmax(q_vals)
        return action

    @staticmethod
    def get_model(input_shape, num_action, num_hidden_layers, optimizer=keras.optimizers.Adam, lr=1e-5, loss=keras.losses.MeanSquaredError):
        model = DQN(input_shape, num_action, num_hidden_layers, optimizer, lr, loss)
        optimizer = deepcopy(optimizer)
        optimizer = optimizer(learning_rate=lr)
        model.compile(
            optimizer=optimizer,
            loss=keras.losses.MeanSquaredError()
        )
        # model.build(input_shape)
        return model
    
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'obs_space': self.obs_space,
            'num_actions': self.num_actions,
            'num_hidden_layers': self.num_hidden_layers,
            'optimizer': self.optimizer,
            'lr': self.rate,
            'loss': self.loss
        })
        return config
    
    def get_copy(self):
        model_copy = keras.models.clone_model(self)
        model_copy.set_weights(self.get_weights())
        model_copy.compile(
            optimizer=self.optimizer,
            loss=keras.losses.MeanSquaredError()
        )
        return model_copy
    
    def transfer_weights(self, target_net):
        target_net.set_weights(self.get_weights)


## Hyperparameter
Wir müssen einige Hyperparameter definieren. Diese Werte sind frei wählbar aber manche, wie `GAMMA = 0.99` sind de-facto Standard.

In [5]:
# Hyperparams
GAMMA = 0.99                # weight of old (~1) vs new (~0) situations
BATCH_SIZE = 32             # we know this one
BUFFER_SIZE = 50000         # how large is my memory
MIN_REPLAY_SIZE = 1000      # how much must I have seen before I start training
EPSILON_START = 1.0         # in the beginning we do everything random
EPSILON_END = 0.01          # in the end we do 1% of actions randomly
EPSILON_DECAY = 100000      # we go from random to non-random over 100000 steps
TARGET_UPDATE_FREQ = 1000   # every 1000 steps we update the target net
TRAINING_FREQ = 1           # every steps we train our model

## Weitere Vorraussetzungen für das Training
Bevor wir anfangen können zu trainieren müssen wir Replay und Reward Buffer initialisieren, die Netzwerke erzeugen und den Replay Buffer soweit befüllen, dass wir sinnvoll aus diesem samplen können.

In [6]:
replay_buffer = ReplayBuffer(BUFFER_SIZE)
reward_buffer = deque([0.0], maxlen = 100)

episode_reward = 0.0

In [7]:
online_net = DQN.get_model(obs[0].shape, env.action_space.n, 1, keras.optimizers.Adam, 1e-4)
target_net = online_net.get_copy()

Auffüllen des Replay Buffers, so dass wir genug Daten haben, um mit dem Training zu beginnen:

In [8]:
def step_env(action, last_obs, is_training=True):
    global episode_reward, reward_buffer
    new_obs, reward, done, truncated, _ = env.step(action)
    replay_buffer.add(last_obs, action, reward, new_obs, done)
    obs = new_obs

    if is_training:
        episode_reward += reward

    if done or truncated:
        obs = env.reset()
        if is_training:
            reward_buffer.append(episode_reward)
            episode_reward = 0.
    return obs

In [9]:
obs = env.reset()
for _ in range(MIN_REPLAY_SIZE):
    action = env.action_space.sample()
    obs = step_env(action, obs, is_training=False)

## Epsilon-greedy Policy
Die Epsilon-greedy Policy besagt, dass wir eine zufällige Aktion mit Wahrscheinlichkeit Epsilon wählen und mit Wahrscheinlichkeit 1-Epsilon das gelernte Wissen unseres Netzes ausnutzen.

In [10]:
import random

def get_epsilon_greedy_action(obs, epsilon):
    rand_samp = random.random()
    if rand_samp <= epsilon:
        action = env.action_space.sample()
    else:
        if type(obs) == tuple:
            obs = obs[0]
        obs = np.expand_dims(obs, 0)
        action = online_net.act(obs)
    return action

## Training Loop
Für das Training samplen wir zufällig aus dem Replay Buffer. Dann wandeln wir die gesampleten Werte in np arrays um. Jetzt folgen wir dem Double Deep Q Learning Algorithmus und sagen Q Werte für die gesampleten nächsten Beobachtungen mit unserem Target Netz hervor. Mit Hilfe der Bellmann Gleichung erhalten wir so die Werte, die das Neuronale Netz lernen soll zu approximieren.

Das eigentliche Training erfolgt dann mit der Standard Keras fit API.

In [11]:
def training():
    transitions = replay_buffer.sample(BATCH_SIZE)
    # print(transitions[0])
    obses = [t[0] for t in transitions]
    for i, o in enumerate(obses):
        if type(o) == tuple:
            obses[i] = o[0]
            o = o[0]

    obses = np.stack(obses)
    actions = np.array([t[1] for t in transitions])
    rewards = np.array([t[2] for t in transitions])
    next_obses = np.stack([t[3] for t in transitions])
    dones = np.array([t[4] for t in transitions], dtype=np.int16)
    
    q_values = online_net(obses).numpy()
    next_q_values = target_net(next_obses).numpy()

    targets = []
    for i, q in enumerate(q_values):
      target = deepcopy(q)
      target[actions[i]] = rewards[i] + GAMMA * (1-dones[i]) * np.amax(next_q_values[i])
      targets.append(target)
    targets = np.stack(targets)
    online_net.fit(obses, targets, epochs=1, batch_size=BATCH_SIZE, verbose=0)

In [15]:
training()

Für das Training starten wir einen potenziell endlosen Loop. Zu jedem Zeitschritt berechnen wir das aktuelle Epsilon basierend auf linearer Extrapolation. Dann wählen wir eine Aktion basierend auf unserer Epsilon-greedy Policy. Wir trainieren das online Modell alle `TRAINING_FREQ` Zeitschritte. Außerdem kopieren wir alle `TARGET_UPDATE_FREQ` Zeitschritte die Gewichte des online Netzes in das target Netz. Außerdem printen wir regelmäßig den mittleren Reward über die letzten 100 Zeitschritte, um den Trainingsfortschritt zu beurteilen. Falls wir einen vordefinierten maximalen Reward erreichen sagen wir das Training ist beendet.

In [14]:
import itertools

obs = env.reset()
for step in itertools.count():
    epsilon = np.interp(step, [0, EPSILON_DECAY], [EPSILON_START, EPSILON_END])
    action = get_epsilon_greedy_action(obs, epsilon)
    obs = step_env(action=action, last_obs=obs)

    if (step+1) % TRAINING_FREQ == 0:
        training()

    if (step+1) % TARGET_UPDATE_FREQ == 0:
        target_net.set_weights(online_net.get_weights())

    if (step+1) % 100 == 0:
        print()
        print(f'Step: {step+1}')
        print(f'Avg Reward: {np.mean(reward_buffer)}')
    
    if len(reward_buffer) >= 100:
        if np.mean(reward_buffer) >= 195:
            print("Solved Problem")
            break

KeyboardInterrupt: 

: 

# Wie verhält sich unser trainierter Agent?

In [None]:
from tensorflow import keras
restored_net = keras.models.load_model("my_cartpole_agent")

In [None]:
import numpy as np
env = gym.make("CartPole-v1", render_mode="human")
obs, info = env.reset()

for _ in range(1000):
    action = np.argmax(restored_net.predict(np.expand_dims(obs, 0), verbose=0))
    obs, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        obs, info = env.reset()

env.close()