## Eigene Umgebung

### Spaces

In [84]:
import gymnasium as gym
from gymnasium.spaces import Discrete, Box, Tuple, Dict, MultiBinary, MultiDiscrete, Sequence
from typing import Optional
import numpy as np
from stable_baselines3 import PPO

In [5]:
Discrete(3)  # Liste mit X aufsteigenden Werten (bei 0 beginnend)

Discrete(3)

In [9]:
Box(0, 1, shape=(3, 3)).sample()  # 3x3 großes Feld mit Werten von X bis Y

array([[0.37702125, 0.4845982 , 0.2960506 ],
       [0.14492843, 0.14549544, 0.2020971 ],
       [0.61733496, 0.8103528 , 0.05600352]], dtype=float32)

In [10]:
Box(0, 1, shape=(3, 3), dtype=int).sample()  # Es kann hier auch noch ein Datentyp angegeben werden

array([[0, 0, 0],
       [1, 1, 0],
       [0, 1, 1]])

In [11]:
Box(0, 100, shape=(3, 3), dtype=int).sample()

array([[46, 16, 36],
       [73, 32, 50],
       [42, 44, 56]])

In [16]:
Tuple((Discrete(3), Box(0, 1, shape=(3, 3), dtype=int)))  # Tuple: Baut beliebig viele Einzelne Spaces in einen gemeinsamen Space zusammen

Tuple(Discrete(3), Box(0, 1, (3, 3), int32))

In [28]:
Tuple((Discrete(3), Box(0, 1, shape=(3, 3), dtype=int))).sample()

(0,
 array([[0, 0, 1],
        [1, 1, 0],
        [0, 1, 1]]))

In [20]:
Dict({"X": Box(0, 100, dtype=int), "Y": Box(0, 100, dtype=int)})  # Effektiv ein Tupel nur mit Namen pro Element

Dict('X': Box(0, 100, (1,), int32), 'Y': Box(0, 100, (1,), int32))

In [19]:
Dict({"X": Box(0, 100, dtype=int), "Y": Box(0, 100, dtype=int)}).sample()

OrderedDict([('X', array([13])), ('Y', array([41]))])

In [35]:
Sequence(Box(-100, 100)).sample()  # Beliebige Anzahl an Werten

(array([-57.240025], dtype=float32),
 array([-29.75645], dtype=float32),
 array([78.677925], dtype=float32),
 array([-8.643945], dtype=float32),
 array([-17.959154], dtype=float32),
 array([-1.9174975], dtype=float32),
 array([70.89043], dtype=float32))

In [38]:
MultiBinary(10).sample()  # Gibt X Bits (1 oder 0) zurück

array([0, 0, 1, 0, 1, 1, 1, 1, 1, 0], dtype=int8)

In [43]:
MultiDiscrete([5, 2, 2]).sample()

array([2, 0, 0], dtype=int64)

## Eigene Umgebung

Die eigene Umgebung ist eine Klasse, die von der Env Klasse erbt.

Danach können die Standardfunktionen implementiert werden

Wir wollen jetzt eine Umgebung bauen, die einen Autofahrer darstellt

Es soll hierbei die Geschwindigkeit im grünen Bereich gehalten werden (~50km/h)

In [94]:
class CarDriver(gym.Env):
    def __init__(self):
        # Spaces definieren
        self.action_space = Discrete(3)  # 3 Aktionen: Beschleunigen, bremsen, nichts tun
        self.observation_space = Box(0, 100, dtype=int)  # Box die die Geschwindigkeitsgrenzen definiert
        self.state = 40 + np.random.randint(-10, 10)  # State: Der derzeitige Status, kann ein beliebiges Objekt sein. Hier: Startgeschwindigkeit

        # Weitere Variablen (optional)
        self.drive_length = 60
        self.min_speed = 45
        self.max_speed = 55

    def step(self, action):
        # Actions: 0, 1, 2
        # 0: Nichts tun -> +-0km/h
        # 1: Bremsen -> -3 km/h
        # 2: Beschleunigen -> +3 km/h
        self.state += -3 if action == 1 else 3 if action == 2 else 0
        self.drive_length -= 1

        # Belohnung verteilen
        if self.state >= self.min_speed and self.state <= self.max_speed:
            reward = 1
        else:
            reward = -1

        # Fertig setzen
        done = self.drive_length <= 0

        # Noise (künstlich Auto verlangsamen und beschleunigen)
        self.state += np.random.randint(-1, 1)

        return np.array(self.state).reshape(1,), reward, done, False, {}  # State, Reward für diesen Step, Fertig Ja/Nein, Termination (kann ignoriert werden), Info (optional)

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None,):
        self.state = 40 + np.random.randint(-10, 10)
        self.drive_length = 60
        return (np.array(self.state, dtype=int).reshape(1,), {})

    def render(self):  # Keine UI
        pass

    def close(self):  # Keine End-Events
        pass

## Environment Checker

Damit kann geprüft werden, ob die Umgebung für RL funktioniert

In [95]:
env = CarDriver()

In [96]:
from stable_baselines3.common.env_checker import check_env

In [97]:
check_env(env, warn=True)

In [100]:
durchgänge = 5  # Anzahl der Durchgänge
for i in range(durchgänge):  # Teste die Umgebung X mal
    state = env.reset()  # Setze die Umgebung für den nächsten Durchlauf zurück
    done = False  # Brich den Prozess ab, wenn z.B. die Zeit abläuft
    score = 0

    while not done:
        env.render()  # Zeichne die Umgebung
        action = env.action_space.sample()  # Nimmt ein Random Element aus dem Action Space (Links oder Rechts)
        state, reward, done, term, info = env.step(action)  # state: Observation Space, reward: Belohnung, done: Fertig Ja/Nein, term: Termination (vorzeitiges Beenden), info: Extra Infos
        score += reward
    print(f"Durchgang {i}, Score: {score}")
env.close()

Durchgang 0, Score: -60
Durchgang 1, Score: -60
Durchgang 2, Score: -60
Durchgang 3, Score: -58
Durchgang 4, Score: -60


In [101]:
env = CarDriver()

In [102]:
model = PPO("MlpPolicy", env, verbose=1)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [103]:
model.learn(total_timesteps=100000)

---------------------------------
| rollout/           |          |
|    ep_len_mean     | 60       |
|    ep_rew_mean     | -42.5    |
| time/              |          |
|    fps             | 719      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 60           |
|    ep_rew_mean          | -42.4        |
| time/                   |              |
|    fps                  | 452          |
|    iterations           | 2            |
|    time_elapsed         | 9            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0117158685 |
|    clip_fraction        | 0.179        |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.09        |
|    explained_variance   | -0.00349     |
|    learning_r

<stable_baselines3.ppo.ppo.PPO at 0x144cf8d24a0>

In [104]:
model.save("Models/CarDriver")

In [105]:
env = CarDriver()

In [106]:
model = model.load("Models/CarDriver", env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [108]:
durchgänge = 5  # Anzahl der Durchgänge
for i in range(durchgänge):  # Teste die Umgebung X mal
    observation = env.reset()  # Setze die Umgebung für den nächsten Durchlauf zurück
    done = False  # Brich den Prozess ab, wenn z.B. die Zeit abläuft
    score = 0

    while not done:
        env.render()  # Zeichne die Umgebung
        if len(observation) > 1:
            observation = observation[0]
        action, _ = model.predict(observation)  # Nimmt den State nach Reset
        observation, reward, done, term, info = env.step(action)  # state: Observation Space, reward: Belohnung, done: Fertig Ja/Nein, term: Termination (vorzeitiges Beenden), info: Extra Infos
        score += reward
    print(f"Durchgang {i}, Score: {score}")
env.close()

Durchgang 0, Score: 46
Durchgang 1, Score: 60
Durchgang 2, Score: 50
Durchgang 3, Score: 48
Durchgang 4, Score: 54
