## Eigene Umgebung

Eine eigene Umgebung ist eine Klasse mit den Standardmethoden einer Umgebung (\_\_init__, step, reset, render, close)

Wir benötigen zusätzlich auch Spaces (observation_space, action_space)

### Typen von Spaces

In [38]:
from gymnasium.spaces import Discrete, Box, Dict, Tuple
import gymnasium as gym
import numpy as np

#### Discrete

Liste befüllt mit X Werten (hier 3)

Wird generell für den Action Space verwendet

In [4]:
Discrete(3)

Discrete(3)

In [6]:
d = Discrete(3)
for x in range(20):
    print(d.sample())

1
1
0
1
2
2
0
1
1
0
2
0
0
1
1
2
2
2
0
1


#### Box

Zwei Arrays welche kombiniert miteinander einen Bereich darstellen

Wird generell für den Observation Space verwendet

In [8]:
Box(0, 100)  # Box mit einem Bereich

Box(0.0, 100.0, (1,), float32)

In [12]:
Box(0, 100, shape=(4,))

Box(0.0, 100.0, (4,), float32)

In [13]:
Box(0, 100, shape=(4,)).sample()

array([12.862288, 87.91389 , 67.61081 , 85.43166 ], dtype=float32)

In [15]:
Box(0, 100, shape=(4,), dtype=int).sample()

array([12, 19, 61, 81])

#### Tuple, Dict

Tuple: Kombination von mehreren Spaces

Dict: Kombination von mehreren Spaces (mit Namen)

In [20]:
Tuple((Box(0, 1), Box(0, 1)))

Tuple(Box(0.0, 1.0, (1,), float32), Box(0.0, 1.0, (1,), float32))

In [28]:
Tuple((Box(0, 1), Box(0, 1))).sample()

(array([0.13346805], dtype=float32), array([0.01115223], dtype=float32))

In [22]:
Dict({ "B1": Box(0, 1), "B2": Box(0, 1)})

Dict('B1': Box(0.0, 1.0, (1,), float32), 'B2': Box(0.0, 1.0, (1,), float32))

In [26]:
Dict({ "B1": Box(0, 1), "B2": Box(0, 1)})["B1"].sample()

array([0.21768107], dtype=float32)

### Definition von eigener Umgebung

Eine eigene Umgebung muss eine Klasse sein und von gym.Env erben

Außerdem muss diese Umgebung die 4 Standardmethoden implementieren und action/observatio_space + State angeben

Aufgabe des Fahrers: Die Geschwindigkeit zwischen 45km/h und 55km/h zu halten
- Wenn der Fahrer im Bereich ist, bekommt er einen Punkt

In [81]:
from typing import Optional
class CarDriver(gym.Env):
    def __init__(self):
        self.action_space = Discrete(3)  # 3 verschiedene Aktionen (Nichts, Beschleunigen, Bremsen)
        self.observation_space = Box(0, 100, dtype=np.float32)  # Geschwindigkeitsgrenzen (zwischen 0 und 100)
        self.state = 50
        self.steps = 0
        self.maxLength = 60

    def step(self, action):
        # 0, Nichts: Geschwindigkeit += 0
        # 1, Beschleunigen: Geschwindigkeit += 3
        # 2, Bremsen: Geschwindigkeit -= 3
        if action == 1:
            self.state += 3
        if action == 2:
            self.state -= 3

        # reward vergeben
        if self.state >= 45 and self.state <= 55:
            reward = 1
        else:
            reward = 0

        self.steps += 1

        self.state += np.random.randint(-1, 1)  # Varianz der Fahrbahn (Bergauf, Bergab, Baustellen, Stau, ...)
        
        # state, reward, done, terminate, info
        # state muss immer ein Numpy Array sein
        return np.array([self.state], dtype=np.float32), reward, self.steps > self.maxLength, False, {}
    
    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        self.state = 50
        self.steps = 0
        return (np.array([self.state], dtype=np.float32), {})

    def render(self):
        pass

    def close(self):
        pass

### EnvChecker

Prüft, ob die Umgebung valide ist (-> korrekt implementiert wurde)

In [82]:
from stable_baselines3.common.env_checker import check_env

In [83]:
env = CarDriver()

In [84]:
check_env(env, warn=True)

In [87]:
durchgaenge = 10
for x in range(durchgaenge):
    state = env.reset()
    score = 0
    done = False

    while not done:
        action = env.action_space.sample()
        state, reward, done, term, info = env.step(action)
        score += reward
        # print(f"\tDurchgang {x + 1}, Geschwindigkeit: {state}")

    print(f"Durchgang {x + 1}, Score: {score}")

Durchgang 1, Score: 29
Durchgang 2, Score: 13
Durchgang 3, Score: 9
Durchgang 4, Score: 7
Durchgang 5, Score: 9
Durchgang 6, Score: 5
Durchgang 7, Score: 8
Durchgang 8, Score: 1
Durchgang 9, Score: 9
Durchgang 10, Score: 10


In [88]:
from stable_baselines3 import PPO

In [89]:
env = CarDriver()
model = PPO("MlpPolicy", env, verbose=1)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [90]:
model.learn(100_000)

---------------------------------
| rollout/           |          |
|    ep_len_mean     | 61       |
|    ep_rew_mean     | 13.3     |
| time/              |          |
|    fps             | 1501     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 61          |
|    ep_rew_mean          | 16.6        |
| time/                   |             |
|    fps                  | 840         |
|    iterations           | 2           |
|    time_elapsed         | 4           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.012602279 |
|    clip_fraction        | 0.186       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.09       |
|    explained_variance   | 0.0114      |
|    learning_rate        | 0.

<stable_baselines3.ppo.ppo.PPO at 0x25212175460>

In [92]:
model.save("Models/CarDriver")

In [94]:
env = CarDriver()
model = PPO.load("Models/CarDriver", env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [97]:
durchgaenge = 10
for x in range(durchgaenge):
    observation = env.reset()  # Umgebung auf den Anfangszustand zurücksetzen
    score = 0  # Variable um die Performance des Algorithmus' zu speichern
    done = False  # Beschreibt, ob der Agent noch "im Spiel" ist
    observation = observation[0]

    while not (done or term):
        env.render()
        
        # Jetzt nehmen wir anstatt einer Random Action die Vorhersage des Models
        action, _ = model.predict(observation)
        if action.ndim == 1:
            action = action[0]
        
        observation, reward, done, term, info = env.step(action)
        score += reward  # reward: Gibt dem Agenten für korrekte Aktionen +Punkte, oder keine Punkte

    print(f"Durchgang {x + 1}, Score: {score}")
env.close()

Durchgang 1, Score: 47
Durchgang 2, Score: 49
Durchgang 3, Score: 55
Durchgang 4, Score: 58
Durchgang 5, Score: 61
Durchgang 6, Score: 46
Durchgang 7, Score: 50
Durchgang 8, Score: 50
Durchgang 9, Score: 50
Durchgang 10, Score: 47
