In [14]:
%load_ext autoreload
%autoreload 2
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from IPython.display import clear_output

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## 1. Q-learning in the wild (3 pts)

Here we use the Q-learning agent on the Taxi environment from OpenAI Gym.
You will need to insert a few agent functions here.

In [15]:
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from IPython.display import clear_output

import warnings
warnings.filterwarnings('ignore')

import gym

# Build the Taxi environment and remember how many discrete actions it offers;
# the agent construction further down reads `n_actions`.
env = gym.make("Taxi-v3")
n_actions = env.action_space.n

# Single print call, one item per line.
print("Taxi-v3 created", f"Actions: {n_actions}", sep="\n")

class QLearningAgent():
    """Tabular Q-learning agent with epsilon-greedy action selection.

    Q-values live in a nested mapping ``state -> action -> value``; entries
    that were never written read as 0. States and actions are used as
    dictionary keys, so they must be hashable.

    Args:
        alpha: learning rate used in ``update``.
        epsilon: probability of picking a uniformly random legal action in
            ``getAction``.
        discount: factor applied to the value of the next state in ``update``.
        getLegalActions: callable ``state -> sequence of actions``. An empty
            sequence means no action is available in that state.
    """

    def __init__(self, alpha, epsilon, discount, getLegalActions):
        self.getLegalActions = getLegalActions
        self._qValues = defaultdict(lambda: defaultdict(lambda: 0))
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount

    def getQValue(self, state, action):
        """Return Q(state, action), or 0.0 if that pair was never set."""
        # Explicit membership tests: indexing the defaultdicts directly would
        # insert an entry on every read.
        if state not in self._qValues or action not in self._qValues[state]:
            return 0.0
        return self._qValues[state][action]

    def setQValue(self, state, action, value):
        """Store ``value`` as Q(state, action)."""
        self._qValues[state][action] = value

    def getValue(self, state):
        """Return max over legal actions of Q(state, action); 0.0 if none are legal."""
        possibleActions = self.getLegalActions(state)
        if not possibleActions:
            return 0.0
        return max(self.getQValue(state, a) for a in possibleActions)

    def getPolicy(self, state):
        """Return the legal action with the highest Q-value, or None if none are legal.

        Ties are resolved in favour of the action that comes first in the
        sequence returned by ``getLegalActions``.
        """
        possibleActions = self.getLegalActions(state)
        if not possibleActions:
            return None
        # max() keeps the first maximal element, matching a strict ">" scan.
        return max(possibleActions, key=lambda a: self.getQValue(state, a))

    def getAction(self, state):
        """Pick an action epsilon-greedily; returns None if no action is legal."""
        possibleActions = self.getLegalActions(state)
        if not possibleActions:
            return None

        if random.random() < self.epsilon:
            return random.choice(possibleActions)
        return self.getPolicy(state)

    def update(self, state, action, nextState, reward, done=False):
        """Apply one Q-learning update for the transition (state, action, reward, nextState).

        Q(s, a) <- Q(s, a) + alpha * (reward + discount * V(nextState) - Q(s, a))

        Args:
            done: set to True when ``nextState`` ends the episode. The target
                is then just ``reward``; nothing is bootstrapped from
                ``nextState``. Defaults to False, which reproduces the update
                used by callers that pass only four arguments.
        """
        old_q = self.getQValue(state, action)
        next_q = 0.0 if done else self.getValue(nextState)
        new_q = old_q + self.alpha * (reward + self.discount * next_q - old_q)
        self.setQValue(state, action, new_q)

def play_and_train(env, agent, t_max=1000):
    """Play one episode, training the agent after every step.

    Accepts both environment conventions:
    - classic Gym: ``reset() -> obs`` and ``step() -> (obs, reward, done, info)``
    - Gymnasium: ``reset() -> (obs, info)`` and
      ``step() -> (obs, reward, terminated, truncated, info)``

    Args:
        env: environment exposing ``reset()`` and ``step(action)``.
        agent: object exposing ``getAction(state)`` and
            ``update(state, action, next_state, reward)``.
        t_max: maximum number of steps before the episode is cut off.

    Returns:
        Sum of the rewards collected during the episode.
    """
    total_reward = 0.0

    reset_result = env.reset()
    # Gymnasium returns an (observation, info-dict) pair. The dict check keeps
    # genuine 2-tuple observations from classic Gym intact.
    if (isinstance(reset_result, tuple) and len(reset_result) == 2
            and isinstance(reset_result[1], dict)):
        s = reset_result[0]
    else:
        s = reset_result

    for _ in range(t_max):
        a = agent.getAction(s)
        step_result = env.step(a)
        next_s, r = step_result[0], step_result[1]
        # The end-of-episode flags sit between the reward and the trailing info:
        # (done,) for classic Gym, (terminated, truncated) for Gymnasium.
        done = any(step_result[2:-1])

        agent.update(s, a, next_s, r)

        s = next_s
        total_reward += r
        if done:
            break
    return total_reward

# Q-learning agent for Taxi; every action index is treated as legal in every state.
agent = QLearningAgent(
    getLegalActions=lambda s: range(n_actions),
    discount=0.95,
    epsilon=0.3,
    alpha=0.2,
)

rewards = []
for i in range(1000):
    reward = play_and_train(env, agent)
    rewards.append(reward)
    # Multiplicative epsilon decay, clamped from below at 0.01.
    agent.epsilon = max(0.01, agent.epsilon * 0.999)

    # Report progress only on every 100th episode.
    if i % 100 != 0:
        continue

    clear_output(wait=True)
    # A [-100:] slice already yields the whole list while it is shorter than 100.
    avg_reward = np.mean(rewards[-100:])
    print(f"Episode {i}")
    print(f"Reward: {reward:.2f}")
    print(f"Avg (last 100): {avg_reward:.2f}")
    print(f"Epsilon: {agent.epsilon:.3f}")

    plt.figure(figsize=(10, 4))
    plt.plot(rewards)
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('Training Progress')
    plt.show()

print("\nDone!")
print(f"Final avg: {np.mean(rewards[-100:]):.2f}")

ModuleNotFoundError: No module named 'gym'

In [None]:
import gymnasium as gym
# Create the Taxi environment through the module bound to `gym`
# (this cell imports gymnasium under that name).
env = gym.make("Taxi-v3")
# Number of discrete actions, read from the environment's action space.
n_actions = env.action_space.n

Устанавливаю gymnasium через conda...


In [None]:


from IPython.display import clear_output

from IPython.display import clear_output
import matplotlib.pyplot as plt

def play_and_train(env, agent, t_max=10**4):
    """Run one full game and train the agent along the way.

    - actions are chosen by ``agent.getAction(s)``
    - the agent is trained with ``agent.update(...)`` after every step
    - the total reward of the game is returned

    Accepts both environment conventions:
    - classic Gym: ``reset() -> obs`` and ``step() -> (obs, reward, done, info)``
    - Gymnasium: ``reset() -> (obs, info)`` and
      ``step() -> (obs, reward, terminated, truncated, info)``

    Args:
        env: environment exposing ``reset()`` and ``step(action)``.
        agent: object exposing ``getAction(state)`` and
            ``update(state, action, next_state, reward)``.
        t_max: maximum number of steps before the game is cut off.

    Returns:
        Sum of the rewards collected during the game.
    """
    total_reward = 0.0

    reset_result = env.reset()
    # Gymnasium returns an (observation, info-dict) pair. The dict check keeps
    # genuine 2-tuple observations from classic Gym intact.
    if (isinstance(reset_result, tuple) and len(reset_result) == 2
            and isinstance(reset_result[1], dict)):
        s = reset_result[0]
    else:
        s = reset_result

    for _ in range(t_max):
        a = agent.getAction(s)
        step_result = env.step(a)
        next_s, r = step_result[0], step_result[1]
        # The end-of-episode flags sit between the reward and the trailing info:
        # (done,) for classic Gym, (terminated, truncated) for Gymnasium.
        done = any(step_result[2:-1])

        agent.update(s, a, next_s, r)

        s = next_s
        total_reward += r
        if done:
            break

    return total_reward

rewards = []
for i in range(1000):
    rewards.append(play_and_train(env, agent))
    # Unclamped multiplicative decay of the exploration rate.
    agent.epsilon = agent.epsilon * 0.999

    # Refresh the report and the plot on every 100th episode only.
    if i % 100 != 0:
        continue

    clear_output(wait=True)
    print(f"Epsilon: {agent.epsilon:.4f}")
    print(f"Average reward: {np.mean(rewards[-100:]):.2f}")
    plt.plot(rewards)
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('Training Progress')
    plt.show()

NameError: name 'env' is not defined

## 3. Continuous state space (2 pt)

Чтобы использовать табличный Q-learning в среде с непрерывными состояниями, эти состояния нужно предварительно дискретизировать (разбить на бины). Придумайте способ разбиения непрерывных состояний на дискретные.

In [None]:
import gym

env = gym.make("CartPole-v1")
n_actions = env.action_space.n

# reset() yields the bare observation under classic Gym but an
# (observation, info-dict) pair under Gymnasium; keep only the observation.
first_state = env.reset()
if (isinstance(first_state, tuple) and len(first_state) == 2
        and isinstance(first_state[1], dict)):
    first_state = first_state[0]
# Wrap the value in a 1-tuple: "%s" % x fails with "not all arguments
# converted" whenever x itself is a tuple with more than one element.
print("first state:%s" % (first_state,))

### Play a few games

Постройте распределения различных частей состояния игры. Сыграйте несколько игр и запишите все состояния.

In [None]:
# Play 100 random-action episodes and record every observation returned by step().
states = []
for _ in range(100):
    env.reset()  # the initial observation is not recorded, so the return value is unused
    done = False
    while not done:
        action = env.action_space.sample()
        step_result = env.step(action)
        next_state = step_result[0]
        # End-of-episode flags sit between the reward and the trailing info:
        # (done,) for classic Gym, (terminated, truncated) for Gymnasium.
        done = any(step_result[2:-1])
        states.append(next_state)

states = np.array(states)
print(f"Collected {len(states)} states")
print(f"State ranges: min={states.min(axis=0)}, max={states.max(axis=0)}")

# One histogram per observation component on a 2x2 grid
# (row-major order: component i goes to axes[i // 2, i % 2]).
fig, axes = plt.subplots(2, 2, figsize=(10, 8))
for i, ax in enumerate(axes.flat):
    ax.hist(states[:, i], bins=30)
    ax.set_title(f'Dimension {i+1}')
    ax.set_xlabel('Value')
    ax.set_ylabel('Count')
plt.tight_layout()
plt.show()

## Binarize environment

In [None]:
from gym.core import ObservationWrapper

class Binarizer(ObservationWrapper):
    """Observation wrapper that replaces each of the first four observation
    components with the index of the bin it falls into.

    The result is a tuple of four bin indices, one per component.
    """

    def __init__(self, env):
        super().__init__(env)
        # Bin edges per component, listed in component order.
        position_edges = np.linspace(-2.4, 2.4, 8)          # position
        velocity_edges = np.linspace(-3.0, 3.0, 6)          # velocity
        angle_edges = np.linspace(-0.2, 0.2, 8)             # angle
        angular_velocity_edges = np.linspace(-3.0, 3.0, 6)  # angular velocity
        self.bins = [
            position_edges,
            velocity_edges,
            angle_edges,
            angular_velocity_edges,
        ]

    def to_bin(self, value, bins):
        """Return the bin index ``np.digitize`` assigns to ``value`` for the edges ``bins``."""
        bin_index = np.digitize(value, bins)
        return bin_index

    def observation(self, state):
        """Map components 0..3 of ``state`` to their bin indices and return them as a tuple."""
        indices = []
        for dim in range(4):
            indices.append(self.to_bin(state[dim], self.bins[dim]))
        return tuple(indices)

# Wrap CartPole-v1 in the Binarizer wrapper; the following cells train on this wrapped `env`.
env = Binarizer(gym.make("CartPole-v1"))
# Print whatever reset() returns for the wrapped environment as a sample.
print("Binarized state example:", env.reset())

## Learn

In [None]:
# Fresh agent for the binarized CartPole environment.
agent = QLearningAgent(
    getLegalActions=lambda s: range(n_actions),
    discount=0.95,
    epsilon=0.5,
    alpha=0.1,
)

rewards = []
rewBuf = []
for i in range(500):
    # Ten episodes per outer iteration, then one epsilon decay step.
    for _ in range(10):
        reward = play_and_train(env, agent)
        rewards.append(reward)
    agent.epsilon = agent.epsilon * 0.995

    # Mean over the most recent (at most 100) episode rewards; the slice
    # returns the whole list while it is still shorter than 100.
    rewBuf.append(np.mean(rewards[-100:]))

    if i % 20 != 0:
        continue

    clear_output(wait=True)
    print(f"Episode {i*10}")
    print(f"Epsilon: {agent.epsilon:.3f}")
    print(f"Recent reward: {rewBuf[-1]:.2f}")
    plt.plot(rewBuf)
    plt.xlabel('Episode (x10)')
    plt.ylabel('Average Reward (last 100)')
    plt.title('Training Progress - CartPole')
    plt.show()

if rewBuf[-1] > 195:
    print("Win!")

## 4. Experience replay (5 pts)

In [None]:
import random

class ReplayBuffer(object):
    """Bounded store of (obs_t, action, reward, obs_tp1, done) transitions.

    Up to ``size`` transitions are appended; after that each new transition
    overwrites the slot at a write cursor that advances by one (modulo
    ``size``) on every ``add``.
    """

    def __init__(self, size):
        self._maxsize = size
        self._storage = []
        self._next_idx = 0

    def __len__(self):
        """Number of transitions currently held."""
        return len(self._storage)

    def add(self, obs_t, action, reward, obs_tp1, done):
        """Store one transition, overwriting the slot at the cursor once the buffer is full."""
        transition = (obs_t, action, reward, obs_tp1, done)
        if len(self._storage) >= self._maxsize:
            self._storage[self._next_idx] = transition
        else:
            self._storage.append(transition)
        self._next_idx = (self._next_idx + 1) % self._maxsize

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly at random, with replacement.

        Returns:
            A 5-tuple of numpy arrays: states, actions, rewards, next states
            and done flags, each with one entry per sampled transition.
        """
        picks = np.random.randint(0, len(self._storage), size=batch_size)
        batch = [self._storage[j] for j in picks]
        # Build one array per transition field (columns of the batch).
        return tuple(
            np.array([transition[field] for transition in batch])
            for field in range(5)
        )

Some tests to make sure your buffer works correctly:

In [None]:
import numpy as np
# Sanity checks for ReplayBuffer with capacity 2. The order of the add() calls
# matters: each assertion relies on which slots have been overwritten so far.
replay = ReplayBuffer(2)
obj1 = tuple(range(5))       # (0, 1, 2, 3, 4)
obj2 = tuple(range(5, 10))   # (5, 6, 7, 8, 9)
replay.add(*obj1)
assert replay.sample(1)[0][0] == 0, "If there's just one object in buffer, it must be retrieved by buf.sample(1)"
replay.add(*obj2)
# Go through len(replay) so the __len__ method named in the message is actually exercised.
assert len(replay) == 2, "Please make sure __len__ methods works as intended."
replay.add(*obj2)
assert len(replay._storage)==2, "When buffer is at max capacity, replace objects instead of adding new ones."
# Both slots now hold obj2.
assert np.unique(replay.sample(100)[0])[0] == 5
replay.add(*obj1)
# One slot holds obj1 and the other obj2, so each sampled column has two distinct values.
assert max(len(np.unique(a)) for a in replay.sample(100))==2
replay.add(*obj1)
# Both slots now hold obj1.
assert np.unique(replay.sample(100)[0])[0] == 0
print("Success!")

Now let's use this buffer to improve training:

In [None]:
# Replay storage with room for 10000 transitions.
replay = ReplayBuffer(10000)

# Binarized CartPole environment and the size of its action space.
env = Binarizer(gym.make('CartPole-v1'))
n_actions = env.action_space.n

# Fresh agent; every action index is treated as legal in every state.
agent = QLearningAgent(
    getLegalActions=lambda s: range(n_actions),
    discount=0.95,
    epsilon=0.5,
    alpha=0.1,
)

def _hashable_key(value):
    """Convert a numpy array or numpy scalar into a plain hashable Python value.

    A 1-D array becomes a tuple of Python scalars, a 0-d array or numpy scalar
    becomes a Python scalar, and anything else is returned unchanged.
    """
    if isinstance(value, np.ndarray):
        return tuple(value.tolist()) if value.ndim else value.item()
    if isinstance(value, np.generic):
        return value.item()
    return value


def play_and_train_with_replay(env, agent, replay, t_max=500, batch_size=32):
    """Play one episode, storing every transition and training from replayed batches.

    After each environment step the transition is added to ``replay``. Once the
    buffer holds at least ``batch_size`` transitions, a batch is sampled and
    ``agent.update`` is called for every sampled transition.

    Accepts both environment conventions:
    - classic Gym: ``reset() -> obs`` and ``step() -> (obs, reward, done, info)``
    - Gymnasium: ``reset() -> (obs, info)`` and
      ``step() -> (obs, reward, terminated, truncated, info)``

    Args:
        env: environment exposing ``reset()`` and ``step(action)``.
        agent: object exposing ``getAction(state)`` and
            ``update(state, action, next_state, reward)``.
        replay: buffer exposing ``add(...)``, ``__len__`` and ``sample(n)``.
        t_max: maximum number of steps before the episode is cut off.
        batch_size: number of transitions replayed after each step.

    Returns:
        Sum of the rewards collected during the episode.
    """
    total_reward = 0.0

    reset_result = env.reset()
    # Gymnasium returns an (observation, info-dict) pair. The dict check keeps
    # genuine 2-tuple observations from classic Gym intact.
    if (isinstance(reset_result, tuple) and len(reset_result) == 2
            and isinstance(reset_result[1], dict)):
        s = reset_result[0]
    else:
        s = reset_result

    for _ in range(t_max):
        action = agent.getAction(s)
        step_result = env.step(action)
        next_s, r = step_result[0], step_result[1]
        # End-of-episode flags sit between the reward and the trailing info:
        # (done,) for classic Gym, (terminated, truncated) for Gymnasium.
        done = any(step_result[2:-1])

        # Fill the replay buffer.
        replay.add(s, action, r, next_s, done)

        # Train on a batch drawn from the replay buffer.
        if len(replay) >= batch_size:
            states, actions, rewards, next_states, dones = replay.sample(batch_size)
            for i in range(batch_size):
                # Sampled rows come back as numpy arrays, which cannot be used
                # as dictionary keys; convert them to hashable values first.
                agent.update(
                    _hashable_key(states[i]),
                    _hashable_key(actions[i]),
                    _hashable_key(next_states[i]),
                    rewards[i],
                )

        s = next_s
        total_reward += r
        if done:
            break

    return total_reward

rewards = []
rewBuf = []
for i in range(200):
    # Ten replay-trained episodes per outer iteration, then one epsilon decay step.
    for _ in range(10):
        reward = play_and_train_with_replay(env, agent, replay, batch_size=64)
        rewards.append(reward)
    agent.epsilon = agent.epsilon * 0.995

    # Mean over the most recent (at most 100) episode rewards; the slice
    # returns the whole list while it is still shorter than 100.
    rewBuf.append(np.mean(rewards[-100:]))

    # Reporting and the win check happen only on every 10th iteration.
    if i % 10 != 0:
        continue

    clear_output(wait=True)
    print(f"Iteration {i*10}")
    print(f"Epsilon: {agent.epsilon:.3f}")
    print(f"Recent reward: {rewBuf[-1]:.2f}")
    plt.plot(rewBuf)
    plt.xlabel('Iteration (x10)')
    plt.ylabel('Average Reward (last 100)')
    plt.title('Training with Experience Replay')
    if rewBuf[-1] > 195:
        print("Win!")
        break
    plt.show()