# Implementierung eines Reinforcement-Learning-Projekts

In [None]:
%%HTML
<style>
.container { width:100% }
</style>

## Das Problem
Wir sind Betreiber eine Spedition und sind zuständig für die Belieferung von Supermärkten. Dafür muss Ware zwischen Lagern und den Märkten innerhalb unserer Stadt transportiert werden. Unser innovativer LKW ist selbstfahrend und bekommt lediglich den Auftrag an einer Station (Lager oder Supermarkt) Ware einzusammeln und an einer anderen Station wieder abzuliefern.

## Definitionen
Wir bedienen zwei Lager und zwei Supermärkte.

Lager:
0. A-Lager (A)
1. B-Lager (B)

Supermarkt:
2. C-Markt (C)
3. D-Markt (D)

Diese verteilen sich folgendermaßen in unserer Stadt:

<img src="img/city.png" alt="stadt" width="400"/>

Die Stadt ist eine 6x6-Quadratestadt und mit 36 Positionen, die die Koordinaten $(0,0)$ bis $(5,5)$ haben. Der LKW kann sich frei in der Stadt bewegen, aber nicht durch die Grünstreifen fahren.

Die Anzahl der Zustände ergibt sich folgendermaßen:
* 6 x 6 Positionen
* 4 Orte zu denen die Ware gebracht werden kann (A bis D bzw. 0 bis 3)
* 5 Orte, an denen sich die Ware befindet (A bis D bzw. 0 bis 3 und im LKW (Position Nr.4))

\\[ 6 \cdot 6 \cdot 4 \cdot 5 = 720 \texttt{ mögliche Zustände}\\]

Die Aktionen, die der LKW ausführen kann sind:
0. nach Norden fahren
1. nach Osten fahren
2. nach Süden fahren
3. nach Westen fahren
4. Ware einsammeln
5. Ware abladen

Dabei kann er folgende Belohnungen (und Abzüge) erhalten:
* Ware korrekt abliefern: +20
* Ware falsch einsammeln/abliefern: -10
* Pro Schritt: -1

## Definitionen implementieren

In [None]:
import copy
import random
import time
import numpy as np

Im Folgenden werden die zuvor getroffenen Spezifikationen in ein für den Rechner verständliches Format überführt. Es gibt jeweils sechs Spalten und Reihen (0 bis 5) und eine Menge von Aktionen (ebenfalls 0 bis 5). Für die Ausgabe wird noch eine Überführung in eine Beschreibung vorgenommen. Außerdem werden die Koordinaten der Lager und Supermärlte festgehalten. Die Koordianten haben dabei die Form `(Reihe, Spalte)`.

Die Grünstreifen werden in einer Menge gespeichert. Ein Grünstreifen liegt immer zwischen zwei Feldern. Diese werden in einem Tupel in der Reihenfolge `(Links, Rechts)` bzw. `(Oben, Unten)` angegeben. Daraus ergibt sich für jedes Stück eines Grünstreifens ein Tupel der Form `((Reihe Zelle links, Spalte Zelle links), (Reihe Zelle rechts, Spalte Zelle rechts))` bzw. mit "oben" und "unten", falls es sich um einen horizontalen Streifen handelt.

In [None]:
rows = [row for row in range(0, 6)]
cols = [col for col in range(0, 6)]
actions = {action for action in range(0, 6)}
actions_description = ["Drive north", "Drive east", "Drive south", "Drive west", "Pickup goods", "Dropoff goods"]
stations = [(0,0), (0,3), (5,0), (4,5)]
stations_descriptions = ["Warehouse A", "Warehouse B", "Supermarket C", "Supermarket D"]
position_goods_descriptions = stations_descriptions + ["In the truck"]
walls = {
    ((0,2), (0,3)), #vertikal
    ((1,2), (1,3)),
    ((3,1), (3,2)),
    ((4,0), (4,1)),
    ((5,0), (5,1)),
    ((5,2), (5,3)),
    ((1,0), (2,0)), # horizonal
    ((1,5), (2,5)),
    ((3,4), (4,4)),
    ((3,5), (4,5))
}

Als nächstes soll für jedes Feld im Stadtplan festgehalten werden, welche Aktionen von diesem Feld ausgehend möglich sind. Zunächst wird allen Feldern alle Aktionen zugewiesen. Den am Rand liegenden Feldern wird die Aktion aberkannt, die aus der Stadt heraus führen würde. Einsammel- und Ablieferaktionen sind nur an den zuvor definierten Stationen möglich, weshalb den anderen Feldern diese Aktion ebenfalls entzogen wird. Als nächstes werden Felder betrachtet, die in der direkten Nachbarschaft eines Grünstreifens liegen und dort die Aktionen entfernt, die den LKW dazu veranlassen würden, die Grünanlage zu zerstören. Daraus ergibt sich dann das gewünschte Dictionary mit dem feld als Key und einer Menge möglicher Aktionen als Value.

In [None]:
possible_actions = dict()

for row in rows:
    for col in cols:
        possible_actions[(row, col)] = copy.deepcopy(actions)
for key in possible_actions:
    (row, col) = key
    # Ränder
    if row == 0:
        possible_actions[key].remove(0)
    elif row == 5:
        possible_actions[key].remove(2)
    if col == 0:
        possible_actions[key].remove(3)
    elif col == 5:
        possible_actions[key].remove(1)
    # Einsammeln, Abliefern
    if (row, col) not in stations:
        possible_actions[key].remove(4)
        possible_actions[key].remove(5)
    # Grünstreifen
    if ((row, col), (row, col + 1)) in walls:
        possible_actions[key].remove(1)
    if ((row, col - 1), (row, col)) in walls:
        possible_actions[key].remove(3)
    if ((row, col), (row + 1, col)) in walls:
        possible_actions[key].remove(2)
    if ((row - 1, col), (row, col)) in walls:
        possible_actions[key].remove(0)

## Die Transport-Klasse

Die Transportklasse speichert den aktuellen Zustand (`state`), die grafische Darstellung (`canvas`) von diesem sowie den aktuell erreichten Wert (`current_value`). Außerdem speichert sie, wie viele Schritte bereits ausgeführt wurden (`steps`). Ebenfalls ist eine Methode zur Ermittlung der nächsten Aktion, die `action_method`, zu übergeben. Dies kann eine Methode sein, die eine zufällige Aktion zurückgibt (`random_action`) oder eine Methode, die die durch Q-Learning erlernten Werte ausnutzt (`q_learning_action`). Außerdem werden alle besuchten Zustände in `visited_states` aufgelistet. Ist die Variable `stepwise` auf `True` gesetzt, wird zur besseren Sichtbarkeit der Aktionen in der Darstellung nach jeder Aktion eine kurze Pause durchgeführt. Die graphische Darstellung erfolgt nur, wenn `visualize` auch `True` ist. Die Variable `done` markiert, ob der Transport der Waren erfolgreich abgeschlossen wurde. Für eine spätere Berechnung wird die Vorgängerposition der Waren benötigt, weshalb diese hier ebenfalls abgespeichert wird.

In [None]:
class Transport():
    def __init__(self, state, action_method, stepwise = False, visualize = False):
        self.state = state
        self.canvas = init_canvas(self.state)
        self.action_method = action_method
        self.current_value = 0
        self.steps = 0
        self.visited_states = [self.state]
        self.stepwise = stepwise
        self.visualize = visualize
        self.pause = True
        self.done = False
        self.position_goods_old = self.state[1]
        
        self.canvas[3].on_mouse_down(self.handle_mouse_down)

Die Transition function gibt für eine Aktion den Zustand zurück der folgt, wenn man auf den aktuellen Zustand die gewünschte Aktion anwendet. Falls die Aktion nicht möglich ist, wird der alte Zustand zurückgegeben. Ein Zustand ist dabei folgendermaßen aufgebaut:

`(Position LKW, Position Ware, Position Ziel)`.

Wobei gilt:
* Position LKW = (Reihe LKW, Spalte LKW)
* Position Ware $\in$ \[0, 4\] $\rightarrow$ \[Lager A, Lager B, Supermarkt C, Supermarkt D, im LKW\]
* Position Ziel $\in$ \[0, 3\] $\rightarrow$ \[Lager A, Lager B, Supermarkt C, Supermarkt D\]

Bei der Funktion handelt es sich um eine Funktion der Klasse `Transport`.

In [None]:
def transition_function(self, action):
    position_truck, position_goods, position_goal = self.state
    if action not in possible_actions[position_truck]:
        return self.state
    
    row, col = position_truck
    if action == 0:
        position_truck = (row - 1, col)
    elif action == 1:
        position_truck = (row, col + 1) 
    elif action == 2:
        position_truck = (row + 1, col)
    elif action == 3:
        position_truck = (row, col - 1)
    elif action == 4:
        if position_goods == stations.index(position_truck):
            position_goods = 4
    elif action == 5:
        if position_goods != 4:
            return self.state
        position_goods = stations.index(position_truck)
            
    return (position_truck, position_goods, position_goal) 
Transport.transition_function = transition_function
del transition_function

`reward_function()` gibt die Belohnung zurück, die furch die Aktion im aktuellen Zustand erzielt wird.
Bei der Funktion handelt es sich ebenfalls um eine Funktion der Klasse `Transport`.

In [None]:
def reward_function(self, action):
    reward = -1 # Pro Schritt
    position_truck, position_goods, position_goal = self.state
    
    if action not in possible_actions[position_truck]: return reward
    
    if action == 4:
        station_truck = stations.index(position_truck)
        if position_goods != station_truck: # Ware falsch einsammeln
            reward -= 10
    elif action == 5:
        station_truck = stations.index(position_truck)
        if position_goal != station_truck and position_goods == 4: # Ware falsch abliefern  
            reward -= 10
        elif position_goal == station_truck and position_goods == 4: # Ware korrekt abliefern
            reward += 20
        else: # Abliefern ohne geladene Ware
            reward -= 10
            
    return reward
Transport.reward_function = reward_function
del reward_function

Die Funktion `random_action()` gibt eine zufällige Aktion zurück.

In [None]:
def random_action(state_num):
    return random.randrange(0, 6, 1)

`handle_mouse_down()` wird durch Klicken auf die Karte aufgerufen. Die Funktion ruft lediglich `transport_goods()` auf. Diese Funktion führt dann so oft eine Aktion, die von der gewünschten `action_method()` ausgewählt wird, aus, bis die Ware an ihrem Zielort eingetroffen ist. Nach dem Ausführen wird die Darstellung falls gewünscht aktualisiert. Die Konstante `MAX_VISITS_STATE` gibt an, wie oft ein Zustand besucht werden kann, bevor ausgehend von diesem Zustand statt der eigentlich gewünschten Aktion eine zufällige Aktion gestartet wird. Diese Randomisierung ist notwendig, um zu verhindern, dass sich Aktions-Schleifen bilden, in denen der Agent festhängt. Im idealen Fall besucht der Agenten einen State nur ein mal (Position zweimal, einmal mit Paktet, einmal ohne). DEr Wert wird trotzdem höher angesetzt, um keine zu starke Einschränkung zu bieten.

Bei der Funktion handelt es sich auch um eine Funktion der Klasse `Transport`.

In [None]:
def handle_mouse_down(self, x, y):
    self.transport_goods()
Transport.handle_mouse_down = handle_mouse_down
del handle_mouse_down

In [None]:
MAX_VISITS_STATE = 10

In [None]:
def transport_goods(self):
    position_truck, position_goods, position_goal = self.state
    while not self.done:    
        self.steps += 1
        state_num = state_to_state_num(self.state)
        action = self.action_method(state_num)
        if action not in possible_actions[position_truck]:
            action = random.sample(possible_actions[position_truck], 1)[0]
        
        value = self.reward_function(action)
        new_state = self.transition_function(action)
        if self.visited_states.count(new_state) > MAX_VISITS_STATE:
            action = random.sample(possible_actions[position_truck], 1)[0]
            value  = self.reward_function(action)
            new_state = self.transition_function(action)
        
        self.current_value += value
        self.state = new_state
        self.visited_states.append(self.state)
        position_truck, position_goods, position_goal = self.state
        
        if position_goods == position_goal: self.done = True
        if self. visualize:
            action_label.value = 'Action: ' + actions_description[action]
            update_canvas(self)
        if self.stepwise: time.sleep(0.1)
Transport.transport_goods = transport_goods
del transport_goods

Im folgenden werden alle möglichen Zustände in einer Liste gespeichert, sodass sie nummerierbar und damit eindeutig identifizierbar sind. Die nachfolgenden Funktionen `state_to_state_num()` und `state_num_to_state()` dienen dazu, zwischen den beiden Darstellungsweisen zu wechseln.

In [None]:
states = []
for row in range (0, 6):
    for col in range (0, 6):
        for position_goods in range (0, 5):
            for position_goal in range (0, 4):
                states.append(((row, col), position_goods, position_goal))

In [None]:
def state_to_state_num(state):
    return states.index(state)

def state_num_to_state(num):
    return states[num]

## Q-learning

Zunächst wird eine Tabelle mit einer Zeile für jeden Zustand und einer Spalte für jede Aktion angelegt. In jede Zelle wird zu Beginn der Wert Null geschrieben.

In [None]:
q_table = np.zeros([len(states), len(actions)])

Die `q_function()` nimmt einen Zustand und eine Aktion sowie die Lernrate $\alpha$ und den Discount-Faktor $\gamma$. Darauf basierend berechnet sie mit Hilfe der `q_table` den neuen Wert für den Zustand und die Aktion und trägt diesen an der entsprechenden Stelle in der Tabelle ein. Zurückgegeben wird außerdem der nächste Zustand. Die Formel für die Berechnung des neuen Wertes lautet folgendermaßen:

\\[Q_{\texttt{new}}(s_t, a_t) \leftarrow Q_{\texttt{old}}(s_t, a_t) + \alpha \cdot (r_t + \gamma \cdot max_q(s{t+1}, a) - Q_{\texttt{old}}(s_t, a_t))\\]

mit:
* $s_t$, $s_{t+1} \in$ len(states)
* a $\in$ actions
* $Q_{\texttt{old}}(s_t, a_t)$ = Tabellenwert in Zeile $s_t$ und Spalte $a_t$
* $\alpha$ = Lernrate
* $\gamma$ = Discount-Faktor
* $r_t$ = Reward
* max_q = Schätzung des optimalen Zukunftswertes

Der Wert in der Klammer wird dabei auch als "temporal difference" bezeichnet.

In [None]:
def q_function(state, action, alpha, gamma):
    transport  = Transport(state, stepwise = False, visualize = False)
    
    reward     = transport.reward_function(action)
    next_state = transport.transition_function(action)
    
    state_num      = state_to_state_num(state)
    next_state_num = state_to_state_num(next_state)
    
    old_value  = q_table[state_num][action]
    max_q      = np.max(q_table[next_state_num])
    new_value  = old_value + alpha * (reward + gamma * max_q - old_value)
    
    q_table[state_num, action] = new_value
    return next_state

Die Funktion `q_learning()` führt für eine zu übergebende Anzahl an Startzuständen (`episodes`) den Transport der Waren zum Ziel durch. Der Startzustand ist dabei ein zufälliger. Dies dient dem Training und somit der Verbesserung der `q_table`. Denn in jedem Schritt wird `q_function()` aufgerufen.  In rund 10% (= `epsilon`) der Schritte ist auch die Aktion zufällig ausgewählt. In den anderen Fällen wird die am besten bewertete Aktion für den aktuellen Zustand genutzt.

In [None]:
def q_learning(episodes):
    alpha   = 0.1
    gamma   = 0.6
    epsilon = 0.1
    
    for episode in range(episodes):
        state = states[random.randrange(0, len(states), 1)]
        done  = False
        
        while not done:
            state_num = state_to_state_num(state)
            if random.uniform(0, 1) < epsilon:
                action = random.sample(possible_actions[state[0]], 1)[0] # Aktionsbereich erkunden
            else:
                action = np.argmax(q_table[state_num]) # Gelernte Werte ausnutzen
                
            state = q_function(state, action, alpha, gamma)
            _, position_goods, position_goal = state
            if position_goods == position_goal: done = True
        print(round(episode + 1/episodes*100, 1), '%', end='\r')

Zum Trainieren muss die Funktion `q_learning()` mit einem relativ hohen Wert aufgerufen werden. Damit dies nicht nach jedem Neustart des Kernels notwendig ist, kann die Tabelle abgespeichert und beim nächsten mal einfach gelade werden.

In [None]:
#q_learning(10000) # Trainieren
#np.savetxt('q_table.txt', q_table) # Speichern der Tabelle
q_table = np.loadtxt('q_table.txt') # Laden der Tabelle

Die folgende Methode kann dann nach dem Trainieren in der tatsächlichen Anwendung genutzt werden, üm für jeden Zustand die beste Aktion zu ermitteln. Dazu muss sie der Klasse `Transport` bei der Initialisierung übergeben werden und wird später dann von `transport_goods()` aufgerufen.

In [None]:
def q_learning_action(state_num):
    return np.argmax(q_table[state_num]) 

## Visualisierung

Das Notebook, das im Folgenden geladen wird, beinhaltet Funktionen für die grafische Darstellung des aktuellen Zustands.

In [None]:
%run ./04_implementierung_gui.ipynb

Folgendermaßen kann dann eine Episode ausgeführt werden. Als Methoden für die Auswahl der Aktionen sind `random_action` und `q_learning_action` möglich.

In [None]:
#state = (((3, 0), 0, 2))
#transport = Transport(state, action_method = q_learning_action, stepwise = True, visualize = True)
#update_canvas(transport)

## Evaluation
Um die Performance des trainierten Agenten beurteilen zu können, wird für eine große Menge an zufälligen Startzuständen einmal zur Auswahl der Aktionen der zufällige Algorithmus genutzt und einmal jener, der auf den erlernten Werten basiert:

In [None]:
def evaluation(number_of_tests, action_method):
    total_value = 0
    total_steps = 0

    for i in range (number_of_tests):
        state = states[random.randrange(0, len(states), 1)]
        transport = Transport(state, action_method = action_method, stepwise = False, visualize = False)
        transport.transport_goods()

        total_value += transport.current_value
        total_steps += transport.steps
        #if i%10 == 0:
        print(round((i+1)/number_of_tests * 100, 1), '%', end='\r')

    print('Result after ' + str(number_of_tests) + ' tests:')
    print('Average value: ' + str(total_value/number_of_tests))
    print('Average number of steps: ' + str(total_steps/number_of_tests))
    print('Average reward per step: ' + str((total_value/number_of_tests)/(total_steps/number_of_tests)))

In [None]:
evaluation(2000, random_action)

Ergebnis nach 2000 Durchläufen ohne Reinforcement Learning:
* Average value: -1404.6395
* Average number of steps: 890.9995
* Average reward per step: -1.5764761933087503

In [None]:
evaluation(2000, q_learning_action)

Ergebnis nach 2000 Durchläufen mit Reinforcement Learning und 10.000 Trainingsdurchläufen für das Q-Learning:
* Average value: 1.2275
* Average number of steps: 12.8825
* Average reward per step: 0.0952843004075296

Das Ergebnis zeigt deutlich, dass sich das Trainieren gelohnt hat und sich die Anzahl der benötigten Schritte pro Episode deutlich reduziert hat. Schaut man sich die Episoden in der Visualisierung an, sieht man auch, dass der trainierte Agent meistens den optimalen, also kürzesten, Weg nimmt.