# Entwicklung und Evaluation eines DDQN-Agenten in der Gym-Umgebung CarRacing-v3

Notebook erstellt von Niklas Dziwisch (s221274)

Matrikelnummer: 8812125

E-Mail: niklas.dziwisch@sap.com

Dieses Notebook zeigt die komplette Entwicklung, das Training und die Auswertung eines RL-Agenten in der Umgebung 'CarRacing-v3' unter Verwendung eines Double Deep Q-Networks (DDQN).

# Abschnitt 1: Imports & Setup und Hilfsfunktionen

Setup vor dem Start:
1. venv erstellen (oder vorhandenen Python Kernel wählen)
2. Notwendige Packete installieren (pip install -r requirements.txt)

In [1]:
import gymnasium as gym
from gymnasium.wrappers import (
    ResizeObservation,                  # Skaliert die Beobachtungen
    FrameStackObservation,              # Stapelt mehrere Beobachtungen
    GrayscaleObservation,               # Konvertiert die Beobachtungen in Graustufen
)
import numpy as np 
from IPython.display import clear_output
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import time

Diese Zelle enthält die Funktion `moving_average()`, die dazu verwendet wird, den gleitenden Durchschnitt einer Zahlenreihe zu berechnen.

Dies ist nützlich, um Trends in den Trainingsmetriken wie Belohnung oder Verlust zu glätten und besser sichtbar zu machen.

Die Funktion nimmt eine Liste von Werten und eine Fenstergröße als Eingabe und gibt den geglätteten Verlauf zurück.

In [2]:
def moving_average(data, window_size=10):
    """
    Funktion zur Berechnung des gleitenden Durchschnitts der Eingabedaten.
    Nimmt eine Liste von Daten und eine Fenstergröße als Eingabe und gibt den gleitenden Durchschnitt zurück.
    Wenn die Länge der Daten kleiner als die Fenstergröße ist, wird die Originaldatenliste zurückgegeben.
    """
    if len(data) < window_size:
        return np.array(data)
    return np.convolve(data, np.ones(window_size) / window_size, mode="valid")

# Abschnitt 2: Agent-Definition (DDQN mit Replay Buffer)

Diese Zelle definiert das Deep Q-Network (DQN) Modell, das für die CarRacing-Umgebung verwendet wird.

Das Modell besteht aus mehreren Convolutional-Schichten zur Extraktion von Merkmalen aus den Bilddaten sowie voll verbundenen Schichten zur Berechnung der Q-Werte für jede mögliche Aktion.

Die forward-Methode beschreibt, wie eine Eingabe (Zustand) durch das Netzwerk verarbeitet wird, um die Q-Werte auszugeben.

In [3]:
class DQN(nn.Module):
    """
    Deep Q-Network (DQN) Modell für die CarRacing-Umgebung.
    Besteht aus mehreren Convolutional-Schichten gefolgt von voll verbundenen Schichten.
    """

    def __init__(self, action_dim):
        super(DQN, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=8, stride=4),                                  # Convolutional Layer zur Extraktion von Merkmalen
            nn.ReLU(),                                                                  # Aktivierungsfunktion ReLU
            nn.Conv2d(32, 64, kernel_size=4, stride=2),                                 # Weitere Convolutional Layer
            nn.ReLU(),                                                                  # Aktivierungsfunktion ReLU
            nn.Conv2d(64, 64, kernel_size=3, stride=1),                                 # Letzte Convolutional Layer
            nn.ReLU(),                                                                  # Aktivierungsfunktion ReLU
        )
        self.fc = nn.Sequential(
            nn.Linear(3136, 512),                                                       # Voll verbundene Schicht, die die Ausgabe der Convolutional-Schichten verarbeitet
            nn.ReLU(),                                                                  # Aktivierungsfunktion ReLU
            nn.Linear(512, 256),                                                        # Weitere voll verbundene Schicht
            nn.ReLU(),                                                                  # Aktivierungsfunktion ReLU
            nn.Linear(256, 128),                                                        # Letzte voll verbundene Schicht
            nn.ReLU(),                                                                  # Aktivierungsfunktion ReLU
            nn.Linear(128, action_dim),                                                 # Ausgabe-Schicht, die die Q-Werte für jede Aktion vorhersagt
        )

    def forward(self, x):
        x = x / 255.0                                                                   # Normalisierung der Eingabedaten
        x = self.conv(x)                                                                # Durchlaufen der Convolutional-Schichten
        x = torch.flatten(x, 1)                                                         # Flatten der Ausgabe für die voll verbundenen Schichten
        return self.fc(x)                                                               # Durchlaufen der voll verbundenen Schichten und Ausgabe der Q-Werte

Diese Zelle definiert die Klasse `ReplayBuffer`, die als Zwischenspeicher für Erfahrungen des Agenten dient.

Der Replay Buffer speichert vergangene Zustände, Aktionen, Belohnungen und Folgezustände, damit der Agent beim Training zufällig ausgewählte Erfahrungen erneut verwenden kann.

Dies hilft, die Trainingsdaten zu durchmischen und die Stabilität des Lernprozesses zu verbessern.

In [4]:
class ReplayBuffer:
    """
    Replay Buffer zur Speicherung von Erfahrungen für das Training des DQN-Agenten.
    Verwendet eine deque mit einer maximalen Kapazität, um die letzten Erfahrungen zu speichern.
    """

    def __init__(self, capacity):                                                       # Initialisiert den Replay Buffer mit einer bestimmten Kapazität.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):                            # Fügt eine Erfahrung zum Replay Buffer hinzu.
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):                                                       # Entnimmt eine Stichprobe von Erfahrungen aus dem Replay Buffer.
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)
        return np.stack(state), action, reward, np.stack(next_state), done

    def __len__(self):                                                                  # Gibt die aktuelle Größe des Replay Buffers zurück.
        return len(self.buffer)

Diese Zelle definiert die Klasse `DQNAgent`, die den Double Deep Q-Network (DDQN) Agenten für die CarRacing-Umgebung implementiert.

Der Agent verwaltet das neuronale Netzwerk, den Replay Buffer, die Auswahl von Aktionen sowie das Training und die Aktualisierung der Netzwerke.

Außerdem enthält die Klasse Methoden zur schrittweisen Reduzierung des Epsilon-Werts, um die Balance zwischen Exploration und Exploitation während des Trainings zu steuern.

In [5]:
class DQNAgent:
    """
    DQN-Agent für die CarRacing-Umgebung.
    Implementiert das DQN-Training mit einem Replay Buffer und einem Target-Netzwerk.
    Ermöglicht die Auswahl von Aktionen basierend auf dem aktuellen Zustand und das Training des Netzwerks.
    """

    def __init__(self, action_dim, ddqn=True):                                          # Initialisiert den DQN-Agenten mit der Anzahl der Aktionen und ob DDQN verwendet werden soll.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")      # Überprüft, ob CUDA verfügbar ist und setzt das Gerät entsprechend
        self.q_net = DQN(action_dim).to(self.device)                                    # Erstellt das Q-Netzwerk
        self.target_net = DQN(action_dim).to(self.device)                               # Erstellt das Target-Netzwerk
        self.target_net.load_state_dict(self.q_net.state_dict())                        # Kopiert die Gewichte des Q-Netzwerks in das Target-Netzwerk
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=1e-4)                   # Optimierer für das Q-Netzwerk
        self.replay_buffer = ReplayBuffer(800_000)                                      # Initialisiert den Replay Buffer mit einer Kapazität von 800.000 Erfahrungen
        self.batch_size = 64                                                            # Größe der Batch für das Training
        self.gamma = 0.99                                                               # Diskontfaktor für zukünftige Belohnungen
        self.epsilon = 0.8                                                              # Startwert für Epsilon, der die Exploration steuert
        self.epsilon_start = 0.8                                                        # Startwert für Epsilon, der die Exploration steuert
        self.epsilon_min = 0.1                                                          # Minimaler Wert für Epsilon, um die Exploration zu begrenzen
        self.ddqn = ddqn                                                                # Flag, ob Double DQN verwendet werden soll
        self.TAU = 0.005                                                                # Faktor für die weiche Aktualisierung des Target-Netzwerks
        self.criterion = nn.SmoothL1Loss()                                              # Verlustfunktion für das Training des DQN-Netzwerks (Huber-Loss)

    def select_action(self, state):
        """
        Wählt eine Aktion basierend auf dem aktuellen Zustand.
        Mit einer Wahrscheinlichkeit von epsilon wird eine zufällige Aktion ausgewählt (Exploration),
        ansonsten wird die Aktion mit dem höchsten Q-Wert aus dem DQN-Netzwerk ausgewählt (Exploitation).
        """
        if random.random() < self.epsilon:                                              # Mit einer Wahrscheinlichkeit von epsilon wird eine zufällige Aktion ausgewählt
            return random.randint(0, 4) 
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)                   # Konvertiert den Zustand in einen Tensor und fügt eine Batch-Dimension hinzu
        with torch.no_grad(): 
            q_values = self.q_net(state)                                                # Berechnet die Q-Werte für den aktuellen Zustand
        return q_values.argmax().item()                                                 # Gibt die Aktion mit dem höchsten Q-Wert zurück

    def train_step(self):
        """
        Führt einen Trainingsschritt durch, indem es eine Batch von Erfahrungen aus dem Replay Buffer sampelt,
        die Q-Werte berechnet und das DQN-Netzwerk aktualisiert.
        Verwendet das Target-Netzwerk für die Berechnung der Ziel-Q-Werte.
        Führt auch eine weiche Aktualisierung des Target-Netzwerks durch.
        """
        if len(self.replay_buffer) < self.batch_size:                                   # Überprüft, ob genügend Erfahrungen im Replay Buffer vorhanden sind
            return 0

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(
            self.batch_size
        ) # Entnimmt eine Stichprobe von Erfahrungen aus dem Replay Buffer

        states = torch.FloatTensor(states).to(self.device)                              # Konvertiert die Zustände in Tensoren
        next_states = torch.FloatTensor(next_states).to(self.device)                    # Konvertiert die nächsten Zustände in Tensoren
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)                # Konvertiert die Aktionen in Tensoren und fügt eine Batch-Dimension hinzu
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)               # Konvertiert die Belohnungen in Tensoren und fügt eine Batch-Dimension hinzu
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)                   # Konvertiert die "done"-Flags in Tensoren und fügt eine Batch-Dimension hinzu

        q_values = self.q_net(states).gather(1, actions)                                # Berechnet die Q-Werte für die ausgewählten Aktionen     

        with torch.no_grad():
            if self.ddqn:                                                               # Wenn DDQN verwendet wird, wird die Aktion mit dem höchsten Q-Wert aus dem Q-Netzwerk ausgewählt
                next_actions = self.q_net(next_states).argmax(1, keepdim=True)          # Wählt die Aktion mit dem höchsten Q-Wert aus dem Q-Netzwerk
                next_q_values = self.target_net(next_states).gather(1, next_actions)    # Berechnet die Q-Werte für die nächsten Zustände basierend auf der Aktion mit dem höchsten Q-Wert
            else:                                                                       # Wenn DDQN nicht verwendet wird, wird der maximale Q-Wert aus dem Target-Netzwerk für die nächsten Zustände verwendet
                next_q_values = self.target_net(next_states).max(1, keepdim=True)[0]    # Berechnet den maximalen Q-Wert für die nächsten Zustände
            target_q = rewards + (1 - dones) * self.gamma * next_q_values               # Berechnet die Ziel-Q-Werte

        loss = self.criterion(q_values, target_q)                                       # Berechnet den Verlust zwischen den Q-Werten und den Ziel-Q-Werten

        self.optimizer.zero_grad()                                                      # Setzt die Gradienten des Optimierers auf Null
        loss.backward()
        self.optimizer.step()                                                           # Führt einen Schritt des Optimierers aus, um die Gewichte des DQN-Netzwerks zu aktualisieren              


        for target_param, param in zip(                                                 # Soft Update für das Target-Netzwerk
            self.target_net.parameters(), self.q_net.parameters()                       # Durchläuft die Parameter des Target-Netzwerks und des Q-Netzwerks
        ):
            target_param.data.copy_(                                                    # Aktualisiert die Parameter des Target-Netzwerks mit einer Mischung aus den aktuellen Parametern des Q-Netzwerks und den aktuellen Parametern des Target-Netzwerks
                param.data * self.TAU + target_param.data * (1 - self.TAU)              # Mischung der Parameter
            )

        return loss.item()                                                              # Gibt den Verlustwert zurück                

    def decay_epsilon(self, episode, factor=0.5):
        """
        Reduziert den Epsilon-Wert basierend auf der Anzahl der Episoden.
        Der Epsilon-Wert wird exponentiell verringert, um die Exploration im Laufe
        der Zeit zu reduzieren.
        """
        self.epsilon = self.epsilon_min + (                                           # Berechnet den neuen Epsilon-Wert
            self.epsilon_start - self.epsilon_min                                     # Differenz zwischen Start- und Minimalwert von Epsilon
        ) * np.exp(-factor * episode)                                                 # Exponentielle Abnahme des Epsilon-Werts

# Abschnitt 3: Umgebung & Training

Diese Zelle definiert die Hilfsfunktion `discrete_to_continuous_action()`, die eine diskrete Aktionsnummer in eine kontinuierliche Aktionsdarstellung für die CarRacing-Umgebung umwandelt.

Dadurch kann der Agent mit diskreten Aktionen arbeiten, obwohl die Umgebung kontinuierliche Steuerbefehle erwartet.

In [6]:
def discrete_to_continuous_action(action):
    """
    Konvertiert eine diskrete Aktion in eine kontinuierliche Aktion für die CarRacing-Umgebung.
    Aktionen: [steering, gas, brake]
    - 0: Nichts tun
    - 1: Nach rechts lenken
    - 2: Nach links lenken
    - 3: Gas geben
    - 4: Bremsen
    """
    if action == 0:  # Nichts tun
        return np.array([0.0, 0.0, 0.0])
    elif action == 1:  # Rechts lenken
        return np.array([1.0, 0.0, 0.0])
    elif action == 2:  # Links lenken
        return np.array([-1.0, 0.0, 0.0])
    elif action == 3:  # Gas geben
        return np.array([0.0, 1.0, 0.0])
    elif action == 4:  # Bremsen
        return np.array([0.0, 0.0, 0.8])
    else:
        return np.array([0.0, 0.0, 0.0])  # Fallback, sollte nicht passieren

Diese Zelle definiert die Klasse `SkipFrame`, einen Wrapper für die Gym-Umgebung.

Mit diesem Wrapper kann der Agent mehrere Schritte in der Umgebung überspringen, indem eine Aktion für mehrere Frames wiederholt wird.

Dadurch wird das Training effizienter, da weniger Schritte pro Episode ausgeführt werden und die Lernumgebung beschleunigt wird.

Die Klasse summiert die Belohnungen der übersprungenen Schritte und gibt den letzten Beobachtungszustand zurück.

In [7]:
class SkipFrame(gym.Wrapper):
    """
    Wrapper, der es ermöglicht, mehrere Umgebungs-Schritte zu überspringen.
    Nützlich, um die Anzahl der Schritte zu reduzieren und die Trainingsgeschwindigkeit zu erhöhen
    """

    def __init__(self, env, skip):                                                      # Initialisiert den Wrapper mit der Umgebung und der Anzahl der zu überspringenden Schritte 
        super().__init__(env)                                                           
        self._skip = skip   
        self.observation_space = env.observation_space                                  # Skaliert die Beobachtungen
        self.action_space = env.action_space                                            # Setzt den Aktionsraum der Umgebung

    def step(self, action):
        """
        Führt mehrere Schritte in der Umgebung aus, um die Anzahl der Schritte zu reduzieren.
        Summiert die Belohnungen über die übersprungenen Schritte und gibt den letzten Beobachtungszustand zurück.
        """
        total_reward = 0.0                                                              # Initialisiert die Gesamtbelohnung                        
        terminated = False                                                              # Initialisiert das Flag für das Ende der Episode
        truncated = False                                                               # Initialisiert das Flag für das Truncation der Episode
        info = {}                                                                       # Initialisiert ein leeres Info-Dictionary
        for _ in range(self._skip):                                                     # Durchläuft die Anzahl der zu überspringenden Schritte
            obs, reward, terminated, truncated, info = self.env.step(action)            # Überprüft die Aktion in der Umgebung
            total_reward += reward                                                      # Summiert die Belohnungen über die übersprungenen Schritte             
            if terminated or truncated:                                                 # Wenn die Episode beendet oder getruncated ist, wird die Schleife abgebrochen
                break
        return obs, total_reward, terminated, truncated, info                           # Gibt den letzten Beobachtungszustand, die Gesamtbelohnung, das Ende-Flag, das Truncation-Flag und das Info-Dictionary zurück

Diese Zelle initialisiert und konfiguriert die CarRacing-Umgebung.

Dabei werden mehrere Wrapper verwendet, um die Beobachtungen in Graustufen umzuwandeln, die Bildgröße auf 84x84 Pixel zu skalieren, die letzten 4 Beobachtungen zu stapeln und mehrere Schritte pro Aktion zu überspringen.

Das Human-Rendering ist aktiviert, sodass die Umgebung visuell angezeigt wird.

In [8]:
env = gym.make(
    "CarRacing-v3", render_mode="human"
)                                                                                       # Human-Rendering aktiviert um die Umgebung zu sehen
env = GrayscaleObservation(
    env, keep_dim=False
)                                                                                       # Konvertiert die Beobachtungen in Graustufen
env = ResizeObservation(env, (84, 84))                                                  # Skaliert die Beobachtungen auf 84x84 Pixel
env = FrameStackObservation(env, stack_size=4)                                          # Stapelt die letzten 4 Beobachtungen
env = SkipFrame(env, skip=4)                                                            # Wrapper, um mehrere Schritte zu überspringen

  from pkg_resources import resource_stream, resource_exists


Diese Zelle initialisiert den DQN-Agenten für die CarRacing-Umgebung, legt die Anzahl der Trainingsepisoden fest und erstellt Listen zur Speicherung der Belohnungen pro Episode.

Außerdem wird die Anzahl der zu überspringenden Frames pro Schritt definiert.

In [9]:
agent = DQNAgent(
    action_dim=5
)                                                                                       # Initialisiert den DQN-Agenten mit der Anzahl der Aktionen in der CarRacing-Umgebung
num_episodes = 1000                                                                     # Anzahl der Episoden für das Training
episode_rewards = []                                                                    # Liste zur Speicherung der Belohnungen pro Episode
frame_skip = 4                                                                          # Anzahl der Schritte, die in der Umgebung übersprungen werden sollen

Diese Zelle richtet den Live-Plot für die Visualisierung der Trainingsmetriken ein.

Dabei werden die notwendigen Matplotlib-Objekte sowie Listen zur Speicherung von Belohnungen, Verlusten und Epsilon-Werten erstellt, um den Trainingsfortschritt in Echtzeit verfolgen zu können.

Ausgabe des Beobachtungsraums der Umgebung

In [10]:
print(
    "Observation Space:", env.observation_space
)

Observation Space: Box(0, 255, (4, 84, 84), uint8)


Diese Zelle erstellt Listen für die Daten

In [11]:
rewards_list = []
losses_list = []
epsilon_list = []

Diese Zelle enthält die Haupt-Trainingsschleife für den DQN-Agenten.

In jeder Episode wird die Umgebung zurückgesetzt und der Agent interagiert Schritt für Schritt mit der Umgebung, indem er Aktionen auswählt, Belohnungen sammelt und Erfahrungen im Replay Buffer speichert.

Nach jedem Schritt wird das Netzwerk trainiert und der Epsilon-Wert zur Steuerung der Exploration angepasst.

Die Trainingsmetriken (Belohnung, Verlust, Epsilon) werden laufend aktualisiert und visualisiert.

Zusätzlich wird das Modell regelmäßig gespeichert.

In [12]:
for episode in range(num_episodes):
    """Trainingsschleife für den DQN-Agenten.
    Jede Episode beginnt mit der Rücksetzung der Umgebung und endet, wenn der Agent entweder das Ziel erreicht oder die maximale Anzahl von Schritten überschritten hat.
    Der Agent wählt Aktionen basierend auf dem aktuellen Zustand, sammelt Belohnungen und speichert Erfahrungen im Replay Buffer.
    Nach jeder Episode wird der Epsilon-Wert reduziert, um die Exploration zu verringern.
    Die Trainingsmetriken werden in einem Live-Plot aktualisiert und die Belohnungen, Verluste und Epsilon-Werte werden gespeichert.
    Am Ende jeder 100. Episode wird das DQN-Netzwerk gespeichert.
    """
    state, _ = env.reset()                                                                      # Setzt die Umgebung zurück und erhält den Anfangszustand
    done = False                                                                                # Initialisiert das Ende-Flag für die Episode
    episode_reward = 0                                                                          # Initialisiert die Belohnung für die Episode
    step_num = 0                                                                                # Initialisiert die Schrittanzahl für die Episode             
    neg_count = 0                                                                               # Zähler für negative Belohnungen, um festzustellen, ob der Agent nicht vorwärts kommt

    start_time = time.time()
    show_render = (episode + 1) % 1 == 0                                                        # Jede 2. Episode zeigen

    while not done:                                                                             # Die Episode läuft, bis der Agent entweder das Ziel erreicht oder die maximale Anzahl von Schritten überschritten hat
        if show_render:
            env.render()                                                                        # Zeigt das Spiel-Fenster                                                                                   
            time.sleep(0.01)                                                                    # Optional: Kleine Pause für bessere Sichtbarkeit      

        action = agent.select_action(np.array(state))
        continuous_action = discrete_to_continuous_action(action)
        next_state, reward, terminated, truncated, info = env.step(continuous_action)

        if info.get("on_grass", False):
            reward -= 20                                                                        # Festlegen, wie viel Verhalten auf Gras fahren bestraft wird

        if info.get("track_direction", 1) < 0:
            reward -= 50                                                                        # Festlegen, wie viel Verhalten in die falsche Richtung fahren bestraft wird
            terminated = True

        episode_reward += reward                                                                # Summiert die Belohnungen für die Episode
        step_num += 1                                                                           # Zählt die Schritte in der Episode

        if episode_reward < 0:                                                                  # Wenn die Belohnung negativ ist, wird der Zähler für negative Belohnungen erhöht           
            break
        if step_num > 300:                                                                      # Wenn die maximale Schrittanzahl überschritten wird, wird die Episode abgebrochen
            if reward < 0:
                neg_count += 1                                                                  # Zählt die Anzahl der negativen Belohnungen
            if neg_count >= 25:                                                                 # Wenn der Agent 25 negative Belohnungen hintereinander erhält, wird die Episode abgebrochen
                break

        agent.replay_buffer.push(                                                               # Fügt die aktuelle Erfahrung zum Replay Buffer hinzu
            np.array(state), action, reward, np.array(next_state), terminated
        )
        loss = agent.train_step()                                                               # Führt einen Trainingsschritt durch, um das Netzwerk zu aktualisieren
        state = next_state                                                                      # Aktualisiert den Zustand für den nächsten Schritt             
        done = terminated or truncated                                                          # Aktualisiert das Ende-Flag für die Episode

    agent.decay_epsilon(episode, factor=0.7)                                                    # Reduziert den Epsilon-Wert basierend auf der Anzahl der Episoden

    end_time = time.time()                                                                      # Berechnet die Dauer der Episode
    episode_duration = end_time - start_time                                                   

    rewards_list.append(episode_reward)                                                         # Speichert die Belohnung für die Episode
    losses_list.append(loss)                                                                    # Speichert den Verlust für die Episode
    epsilon_list.append(agent.epsilon)                                                          # Speichert den Epsilon-Wert für die Episode

    clear_output(wait=True)

    # Erstelle Subplots: 1 Zeile, 3 Spalten
    fig = make_subplots(
        rows=1, cols=3,
        subplot_titles=("Reward", "Loss", "Epsilon")
    )

    # Reward-Plot (blau)
    fig.add_trace(go.Scatter(
        y=rewards_list,
        mode='lines',
        name='Reward',
        line=dict(color='blue')
    ), row=1, col=1)

    # Loss-Plot (rot)
    fig.add_trace(go.Scatter(
        y=losses_list,
        mode='lines',
        name='Loss',
        line=dict(color='red')
    ), row=1, col=2)

    # Epsilon-Plot (grün)
    fig.add_trace(go.Scatter(
        y=epsilon_list,
        mode='lines',
        name='Epsilon',
        line=dict(color='green')
    ), row=1, col=3)

    # Layout-Optionen
    fig.update_layout(
        title_text=f"Übersicht der Metriken",
        template="plotly_white",
        showlegend=False,  # Legende in Subplots eher verwirrend
    )

    fig.update_xaxes(title_text="Episode", row=1, col=1)
    fig.update_xaxes(title_text="Episode", row=1, col=2)
    fig.update_xaxes(title_text="Episode", row=1, col=3)

    fig.update_yaxes(title_text="Reward", row=1, col=1)
    fig.update_yaxes(title_text="Loss", row=1, col=2)
    fig.update_yaxes(title_text="Epsilon", row=1, col=3)

    # Zeigen
    fig.show()

    ma_reward = moving_average(rewards_list, window_size=10)                            # Berechnet den gleitenden Durchschnitt der Belohnungen
    print(
        f"Episode {episode+1}/{num_episodes} | Reward: {episode_reward:.2f} | "
        f"MA10: {ma_reward[-1]:.2f} | Epsilon: {agent.epsilon:.3f}, Loss: {loss:.4f}"
    )
    
    if (episode + 1) % 100 == 0:                                                        # Speichert das DQN-Netzwerk alle 100 Episoden
        torch.save(agent.q_net.state_dict(), f"checkpoint_episode_{episode+1}.pth")

env.close()

Episode 1000/1000 | Reward: 878.65 | MA10: 856.84 | Epsilon: 0.100, Loss: 3.6939


Speichern der Visualisierung:

In [13]:
fig.write_image("final_plot_combined.png", width=2400, height=800, scale=2)

Diese Zelle erstellt und speichert drei separate Diagramme für die Trainingsmetriken Reward, Loss und Epsilon.

Für jede Metrik wird ein Plot erzeugt, als PNG-Datei mit hoher Auflösung gespeichert und eine Bestätigung über den erfolgreichen Speichervorgang ausgegeben.

In [14]:
# Plot 1: Reward
fig_reward = go.Figure()
fig_reward.add_trace(go.Scatter(
    y=rewards_list,
    mode='lines',
    name='Reward',
    line=dict(color='blue')
))
fig_reward.update_layout(
    title='Reward pro Episode',
    xaxis_title='Episode',
    yaxis_title='Reward',
    template='plotly_white'
)
fig_reward.write_image("reward_plot.png", width=800, height=600, scale=2)
print("Reward Plot gespeichert: 'reward_plot.png'")

# Plot 2: Loss
fig_loss = go.Figure()
fig_loss.add_trace(go.Scatter(
    y=losses_list,
    mode='lines',
    name='Loss',
    line=dict(color='red')
))
fig_loss.update_layout(
    title='Loss pro Episode',
    xaxis_title='Episode',
    yaxis_title='Loss',
    template='plotly_white'
)
fig_loss.write_image("loss_plot.png", width=800, height=600, scale=2)
print("Loss Plot gespeichert: 'loss_plot.png'")

# Plot 3: Epsilon
fig_epsilon = go.Figure()
fig_epsilon.add_trace(go.Scatter(
    y=epsilon_list,
    mode='lines',
    name='Epsilon',
    line=dict(color='green')
))
fig_epsilon.update_layout(
    title='Epsilon pro Episode',
    xaxis_title='Episode',
    yaxis_title='Epsilon',
    template='plotly_white'
)
fig_epsilon.write_image("epsilon_plot.png", width=800, height=600, scale=2)
print("Epsilon Plot gespeichert: 'epsilon_plot.png'")


Reward Plot gespeichert: 'reward_plot.png'
Loss Plot gespeichert: 'loss_plot.png'
Epsilon Plot gespeichert: 'epsilon_plot.png'


Diese Zelle speichert das trainierte neuronale Netzwerk des Agenten in einer Datei namens `trained_agent.pth` ab.

Dadurch kann das Modell später geladen und weiterverwendet werden, ohne das Training erneut durchführen zu müssen.

Nach dem Speichern wird eine Bestätigung ausgegeben, dass der Agent erfolgreich gespeichert wurde.

In [15]:
torch.save(agent.q_net.state_dict(), "trained_agent.pth")
print("Agent saved as 'trained_agent.pth'")

Agent saved as 'trained_agent.pth'
