# Crossentropy method

This notebook will teach you to solve reinforcement learning problems with the crossentropy method. We'll follow up by scaling everything up and using a neural network policy.

In [None]:
import os

In [None]:
import gym
import numpy as np, pandas as pd

# Create the Taxi environment, reset it to a starting state and draw that state.
# NOTE(review): calling render() with no arguments follows the older Gym API — confirm the installed version.
env = gym.make("Taxi-v3")
env.reset()
env.render()

In [None]:
# Sizes of the discrete observation and action spaces; later cells use them
# to shape the [n_states, n_actions] policy table.
n_states = env.observation_space.n
n_actions = env.action_space.n

print("n_states=%i, n_actions=%i"%(n_states,n_actions))

# Create stochastic policy

This time our policy should be a probability distribution.

```policy[s,a] = P(take action a | in state s)```

Since we still use integer state and action representations, you can use a 2-dimensional array to represent the policy.

Please initialize the policy __uniformly__, that is, the probabilities of all actions should be equal.


# Initialize policy (0.5pts)

In [None]:
# Uniform starting policy: every action is equally likely in every state.
policy = np.full((n_states, n_actions), 1.0 / n_actions)

# Sanity checks: array type, uniform entries, and rows that sum to one.
assert type(policy) in (np.ndarray, np.matrix)
assert np.allclose(policy, 1./n_actions)
row_totals = np.sum(policy, axis=1)
assert np.allclose(row_totals, 1)
print("Ok!")

# Play the game (0.5pts)

Just like before, but we also record all states and actions we took.

In [None]:
def generate_session(policy,t_max=10**4):
    """
    Play one episode in the notebook-level ``env`` until it ends or for t_max ticks.

    Actions are sampled from ``policy[s]`` for the current state ``s``.
    Both Gym calling conventions are accepted: ``reset()`` may return either
    the observation or an ``(observation, info)`` tuple, and ``step()`` may
    return either ``(obs, reward, done, info)`` or
    ``(obs, reward, terminated, truncated, info)``.

    :param policy: an array of shape [n_states,n_actions] with action probabilities
    :param t_max: maximum number of steps to play before giving up
    :returns: list of states, list of actions and sum of rewards
    """
    states, actions = [], []
    total_reward = 0.

    reset_result = env.reset()
    # Newer Gym versions return (observation, info); older ones return the observation alone.
    s = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    for t in range(t_max):
        a = np.random.choice(n_actions, p=policy[s])

        step_result = env.step(a)
        if len(step_result) == 5:
            # Newer API splits the end-of-episode flag in two; either one ends the session.
            new_s, r, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            new_s, r, done, info = step_result

        # Record the state the action was taken in, not the resulting one.
        states.append(s)
        actions.append(a)
        total_reward += r

        s = new_s
        if done:
            break
    return states, actions, total_reward
        

In [None]:
# Smoke test: one session must yield two equally long lists and a float total reward.
s,a,r = generate_session(policy)
assert type(s) == type(a) == list
assert len(s) == len(a)
# `np.float` was removed from NumPy (it was only an alias of the builtin float);
# np.floating covers NumPy float scalars as well.
assert isinstance(r, (float, np.floating))

In [None]:
#let's see the initial reward distribution
import matplotlib.pyplot as plt
%matplotlib inline

# Total rewards of 200 sessions played with the current (untrained) policy.
sample_rewards = [
    session_reward
    for _states, _actions, session_reward in (
        generate_session(policy, t_max=1000) for _ in range(200)
    )
]

plt.hist(sample_rewards, bins=20)
# Mark the median and the 90th percentile of the sampled rewards.
for pct, pct_label, pct_color in (
    (50, "50'th percentile", 'green'),
    (90, "90'th percentile", 'red'),
):
    plt.vlines([np.percentile(sample_rewards, pct)], [0], [100], label=pct_label, color=pct_color)
plt.legend()

### Crossentropy method steps (1pts)

In [None]:
def select_elites(states_batch, actions_batch, rewards_batch, percentile=50):
    """
    Keep the sessions whose total reward reaches the given percentile of the batch.

    :param states_batch: sequence of per-session state sequences
    :param actions_batch: sequence of per-session action sequences
    :param rewards_batch: sequence with one total reward per session
    :param percentile: percentile of rewards_batch used as the (inclusive) threshold
    :returns: elite_states, elite_actions - flat arrays holding the states and
        actions of the selected sessions, in session order and then step order
    """
    reward_threshold = np.percentile(rewards_batch, percentile)

    # Indices of the sessions that pass the threshold, in their original order.
    chosen = [idx for idx in range(len(rewards_batch))
              if rewards_batch[idx] >= reward_threshold]
    picked_states = [states_batch[idx] for idx in chosen]
    picked_actions = [actions_batch[idx] for idx in chosen]

    if picked_states:
        elite_states = np.concatenate(picked_states)
    else:
        elite_states = np.array([])

    if picked_actions:
        elite_actions = np.concatenate(picked_actions)
    else:
        elite_actions = np.array([])

    return elite_states, elite_actions

In [None]:
# Sessions have different lengths, so keep them in plain lists: wrapping ragged
# sequences in np.array(...) raises ValueError on recent NumPy versions.
states_batch = [
    np.array([1,2,3]),   #game1
    np.array([4,2,0,2]), #game2
    np.array([3,1])      #game3
]

actions_batch = [
    np.array([0,2,4]),   #game1
    np.array([3,2,0,1]), #game2
    np.array([3,3])      #game3
]
rewards_batch = np.array([
    3,         #game1
    4,         #game2
    5,         #game3
])

test_result_0 = select_elites(states_batch,actions_batch,rewards_batch,percentile=0)
test_result_30 = select_elites(states_batch,actions_batch,rewards_batch,percentile=30)
test_result_90 = select_elites(states_batch,actions_batch,rewards_batch,percentile=90)
test_result_100 = select_elites(states_batch,actions_batch,rewards_batch,percentile=100)

assert np.all(test_result_0[0] == [1, 2, 3, 4, 2, 0, 2, 3, 1])  \
   and np.all(test_result_0[1] == [0, 2, 4, 3, 2, 0, 1, 3, 3]),\
        "For percentile 0 you should return all states and actions in chronological order"
# The 30th percentile of rewards [3, 4, 5] is 3.6, so games 2 and 3 are kept.
assert np.all(test_result_30[0] == [4, 2, 0, 2, 3, 1]) and \
        np.all(test_result_30[1] ==[3, 2, 0, 1, 3, 3]),\
        "For percentile 30 you should only select states/actions from the two best games"
assert np.all(test_result_90[0] == [3,1]) and \
        np.all(test_result_90[1] == [3,3]),\
        "For percentile 90 you should only select states/actions from one game"
assert np.all(test_result_100[0] == [3,1]) and\
       np.all(test_result_100[1] == [3,3]),\
        "Please make sure you use >=, not >. Also double-check how you compute percentile."
print("Ok!")

In [None]:
def update_policy(elite_states, elite_actions, state_count=None, action_count=None):
    """
    Build a new tabular policy from the elite state/action pairs.

    policy[s, a] is the fraction of visits to state ``s`` in which action
    ``a`` was taken. States that never occur in ``elite_states`` get a
    uniform distribution over actions.

    :param elite_states: 1D sequence (list or array) of integer states
    :param elite_actions: 1D sequence of integer actions, same length as elite_states
    :param state_count: number of rows of the table; defaults to the
        notebook-level ``n_states``
    :param action_count: number of columns of the table; defaults to the
        notebook-level ``n_actions``
    :returns: float array of shape [state_count, action_count] whose rows sum to 1
    :raises ValueError: if the two sequences differ in length
    """
    state_count = n_states if state_count is None else state_count
    action_count = n_actions if action_count is None else action_count

    # Coerce to integer arrays first: with plain lists, `states * n + actions`
    # would repeat and concatenate the lists instead of computing indices.
    states = np.asarray(elite_states, dtype=np.int64).ravel()
    actions = np.asarray(elite_actions, dtype=np.int64).ravel()
    if states.shape != actions.shape:
        raise ValueError("elite_states and elite_actions must have the same length")

    # Count each (state, action) pair through its flattened table index.
    linear_indices = states * action_count + actions
    counts = np.bincount(linear_indices, minlength=state_count * action_count)
    counts = counts.reshape(state_count, action_count).astype(float)

    visits = counts.sum(axis=1)
    visited = visits > 0

    # Start from the uniform policy and overwrite only the visited rows,
    # so no division by zero ever happens.
    new_policy = np.full((state_count, action_count), 1.0 / action_count)
    new_policy[visited] = counts[visited] / visits[visited, None]

    return new_policy

In [None]:

# Fixed elite sample used to check update_policy against a known answer.
elite_states = [1, 2, 3, 4, 2, 0, 2, 3, 1]
elite_actions = [0, 2, 4, 3, 2, 0, 1, 3, 3]

new_policy = update_policy(elite_states, elite_actions)

assert np.isfinite(new_policy).all(), "Your new policy contains NaNs or +-inf. Make sure you don't divide by zero."
assert np.all(new_policy>=0), "Your new policy can't have negative action probabilities"
assert np.allclose(new_policy.sum(axis=-1),1), "Your new policy should be a valid probability distribution over actions"

# Expected probabilities for states 0-3 and actions 0-4.
reference_answer = np.array([
    [1.0, 0.0,        0.0,        0.0, 0.0],
    [0.5, 0.0,        0.0,        0.5, 0.0],
    [0.0, 0.33333333, 0.66666667, 0.0, 0.0],
    [0.0, 0.0,        0.0,        0.5, 0.5],
])
policy_corner = new_policy[:4, :5]
print(policy_corner)
assert np.allclose(policy_corner, reference_answer)
print("Ok!")

# Training loop (1pts)
Generate sessions, select N best and fit to those.

In [None]:
from IPython.display import clear_output
import matplotlib.pyplot as plt
%matplotlib inline

def show_progress(rewards_batch,log, reward_range=(-990,+10), threshold_percentile=None):
    """
    A convenience function that displays training progress.
    No cool math here, just charts.

    Appends ``[mean_reward, threshold]`` to ``log``, clears the cell output,
    prints both numbers and draws two panels: the history stored in ``log``
    and a histogram of the current batch with the threshold marked.

    :param rewards_batch: total rewards of the sessions in the current batch
    :param log: list that accumulates [mean_reward, threshold] pairs (mutated in place)
    :param reward_range: (low, high) range of the histogram
    :param threshold_percentile: percentile used for the threshold; when None,
        the notebook-level ``percentile`` variable is used, as before
    """
    # Fall back to the notebook-level setting so existing calls keep working.
    pct = percentile if threshold_percentile is None else threshold_percentile

    mean_reward = np.mean(rewards_batch)
    threshold = np.percentile(rewards_batch, pct)
    log.append([mean_reward,threshold])

    clear_output(True)
    print("mean reward = %.3f, threshold=%.3f"%(mean_reward,threshold))
    plt.figure(figsize=[8,4])

    # Left panel: mean reward and threshold over the logged iterations.
    plt.subplot(1,2,1)
    mean_history, threshold_history = zip(*log)
    plt.plot(mean_history,label='Mean rewards')
    plt.plot(threshold_history,label='Reward thresholds')
    plt.legend()
    plt.grid()

    # Right panel: reward histogram of this batch with the threshold marked.
    plt.subplot(1,2,2)
    plt.hist(rewards_batch,range=reward_range);
    plt.vlines([threshold],[0],[100],label="percentile",color='red')
    plt.legend()
    plt.grid()

    plt.show()


In [None]:

# Start again from the uniform policy.
policy = np.ones([n_states, n_actions]) / n_actions

# Hyperparameters: sessions sampled per iteration, elite percentile, and the
# weight of the freshly estimated policy when blending with the current one.
n_sessions = 250  
percentile = 70   
learning_rate = 0.5  

# History of [mean_reward, threshold] pairs filled in by show_progress.
log = []

for i in range(100):
    
    # Sample a batch of sessions with the current policy (timed with the %time line magic).
    %time sessions = [generate_session(policy) for _ in range(n_sessions)]
    
    states_batch, actions_batch, rewards_batch = zip(*sessions)

    elite_states, elite_actions = select_elites(states_batch, actions_batch, rewards_batch, percentile=percentile)
    
    new_policy = update_policy(elite_states, elite_actions)
    
    # Move only part of the way towards the new estimate.
    policy = learning_rate * new_policy + (1 - learning_rate) * policy
    

    show_progress(rewards_batch, log)

# Tabular crossentropy method

You may have noticed that the taxi problem quickly converges from -100 to a near-optimal score and then descends back into -50/-100. This is in part because the environment has some innate randomness. Namely, the starting points of passenger/driver change from episode to episode.

### Tasks
- __1.1__ (1 pts) Find out how the algorithm's performance changes as you vary the percentile and n_samples. Show all percentiles on one graph and all n_samples on another.
- __1.2__ (1 pts) Tune the algorithm to end up with positive average score.

It's okay to modify the existing code.


In [None]:


import gym
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Re-create the Taxi environment and re-read the table sizes so this cell
# does not depend on the state left behind by earlier cells.
env = gym.make("Taxi-v3")
env.reset()
n_states = env.observation_space.n
n_actions = env.action_space.n



def train_with_params(n_sessions_val, percentile_val, steps=40, learning_rate=0.5):
    """
    Train a tabular policy from a uniform start and record the mean reward per iteration.

    :param n_sessions_val: number of sessions sampled in every iteration
    :param percentile_val: elite percentile passed to select_elites
    :param steps: number of training iterations
    :param learning_rate: weight of the newly estimated policy when blending it
        with the current one (0.5 reproduces the previous hard-coded behaviour)
    :returns: list with the mean batch reward of each iteration
    """
    policy_local = np.ones([n_states, n_actions]) / n_actions
    mean_rewards = []

    for _ in range(steps):
        sessions = [generate_session(policy_local) for _ in range(n_sessions_val)]
        states_batch, actions_batch, rewards_batch = zip(*sessions)

        elite_states, elite_actions = select_elites(states_batch, actions_batch, rewards_batch, percentile_val)
        new_policy = update_policy(elite_states, elite_actions)

        # Blend instead of replacing, so one batch cannot overwrite the whole policy.
        policy_local = learning_rate * new_policy + (1 - learning_rate) * policy_local
        mean_rewards.append(np.mean(rewards_batch))

    return mean_rewards

# Task 1.1, part 1: vary the elite percentile with n_sessions fixed at 250.
percentiles = [50, 60, 70, 80, 90]
results_percentile = {p: train_with_params(250, p, 40) for p in percentiles}

plt.figure(figsize=(10, 5))
for p in results_percentile:
    rewards = results_percentile[p]
    plt.plot(rewards, label=f'Percentile {p}')
plt.xlabel('Iteration')
plt.ylabel('Mean Reward')
plt.title('Effect of Percentile (n_sessions=250)')
plt.legend()
plt.grid(True)
plt.show()

# Task 1.1, part 2: vary the batch size with the percentile fixed at 70.
n_sessions_list = [100, 250, 500, 1000]
results_n_sessions = {n: train_with_params(n, 70, 40) for n in n_sessions_list}

plt.figure(figsize=(10, 5))
for n in results_n_sessions:
    rewards = results_n_sessions[n]
    plt.plot(rewards, label=f'n_sessions {n}')
plt.xlabel('Iteration')
plt.ylabel('Mean Reward')
plt.title('Effect of n_sessions (percentile=70)')
plt.legend()
plt.grid(True)
plt.show()


def train_positive():
    """
    Train a tabular policy with fixed hyperparameters and policy averaging.

    Each iteration samples 300 sessions, fits a new policy to the sessions at
    or above the 80th reward percentile, and blends it into the current policy
    with weight 0.3. Once more than 20 blended policies have been stored, the
    current policy is replaced by the row-normalised mean of the last 20.
    Training stops after 100 iterations, or as soon as a batch's mean reward
    exceeds 5.

    :returns: the final policy array of shape [n_states, n_actions]
    """
    batch_size, elite_percentile, blend = 300, 80, 0.3

    policy = np.ones([n_states, n_actions]) / n_actions
    policy_history = []

    for i in range(100):
        states_batch, actions_batch, rewards_batch = zip(
            *[generate_session(policy) for _ in range(batch_size)]
        )

        elite_states, elite_actions = select_elites(
            states_batch, actions_batch, rewards_batch, elite_percentile
        )
        fitted = update_policy(elite_states, elite_actions)
        policy = blend * fitted + (1 - blend) * policy

        # Store the blended policy, then smooth over the most recent ones.
        policy_history.append(policy.copy())
        if len(policy_history) > 20:
            avg_policy = np.mean(policy_history[-20:], axis=0)
            row_totals = np.sum(avg_policy, axis=1, keepdims=True)
            policy = avg_policy / row_totals

        mean_reward = np.mean(rewards_batch)
        if i % 10 == 0:
            print(f"Iteration {i}: mean reward = {mean_reward:.1f}")

        if mean_reward > 5:
            print(f"Success! Reward = {mean_reward:.1f}")
            break

    return policy

final_policy = train_positive()

# Final test: average total reward over 100 fresh sessions with the trained policy.
test_rewards = []
for _ in range(100):
    test_rewards.append(generate_session(final_policy)[-1])
final_mean = np.mean(test_rewards)
print(f"Ft: mean reward = {final_mean:.1f}")

# Task 1.2 asks for a positive average score.
print("sucsee tune" if final_mean > 0 else "fail succes")

# How do different percentiles affect training efficiency?

In [None]:
# 1.1:
# percentiles:
# - roughly 0 to 60: fast learning, but unstable results
# - 80 and above: slow learning, but good results
# - 70 to 80: the best overall trade-off between speed and results
# 1.2
#
# We took roughly optimal parameters:
#     n_sessions = 300
#     percentile = 80
#     learning_rate = 0.3
# With them we started reaching decent rewards (+5, +10, etc.) instead of the -100 obtained without training.

SyntaxError: invalid syntax (904953148.py, line 1)

In [None]:
# Compare training curves for several batch sizes at a fixed percentile.
n_sessions_ar = [50, 100, 250, 500]
perc = 70
learning_rate = 0.5
# Training iterations per batch size. This name was used below without ever
# being defined, which raised NameError on a fresh kernel; 40 matches the
# `steps` default of train_with_params.
stepCounter = 40

log = []
mean_results = []

for n_sessions in n_sessions_ar:
    curMean = []
    # Every batch size starts from the same uniform policy.
    policy = np.ones([n_states, n_actions]) / n_actions

    for i in range(stepCounter):
        sessions = [generate_session(policy) for _ in range(n_sessions)]
        states_batch, actions_batch, rewards_batch = zip(*sessions)

        elite_states, elite_actions = select_elites(states_batch, actions_batch, rewards_batch, percentile=perc)
        new_policy = update_policy(elite_states, elite_actions)

        policy = learning_rate * new_policy + (1 - learning_rate) * policy

        mean_reward = np.mean(rewards_batch)
        curMean.append(mean_reward)

        clear_output(True)
        print(f"n_sessions: {n_sessions}, iteration: {i}, mean: {mean_reward:.1f}")

    mean_results.append(curMean)


plt.figure(figsize=(10, 5))
for i, n in enumerate(n_sessions_ar):
    plt.plot(mean_results[i], label=f'n_sessions={n}')
plt.xlabel('Iteration')
plt.ylabel('Mean Reward')
plt.title('Effect of n_sessions on Training')
plt.legend()
plt.grid(True)
plt.show()

# Stabilize positive rewards by averaging policy across 10 games (2 pts)

In [None]:


def generate_session(policy, t_max=1000):
    """
    Play one episode in the notebook-level ``env`` until it ends or for t_max ticks.

    This redefines the earlier tabular ``generate_session`` with a shorter
    default ``t_max``. Both Gym calling conventions are accepted: ``reset()``
    may return either the observation or an ``(observation, info)`` tuple, and
    ``step()`` may return either a 4-tuple or a 5-tuple.

    :param policy: an array of shape [n_states,n_actions] with action probabilities
    :param t_max: maximum number of steps to play before giving up
    :returns: list of states, list of actions and sum of rewards
    """
    states, actions = [], []
    total_reward = 0.

    reset_result = env.reset()
    # Newer Gym versions return (observation, info); older ones return the observation alone.
    s = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    for t in range(t_max):
        a = np.random.choice(n_actions, p=policy[s])

        step_result = env.step(a)
        if len(step_result) == 5:
            # Newer API splits the end-of-episode flag in two; either one ends the session.
            new_s, r, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            new_s, r, done, info = step_result

        states.append(s)
        actions.append(a)
        total_reward += r

        s = new_s
        if done:
            break

    return states, actions, total_reward


# Hyperparameters for the averaged-policy run.
n_sessions = 100
percentile = 70
learning_rate = 0.3

log = []
# Blended policies from every iteration; the last 20 are averaged below.
policy_history = []
policy = np.ones([n_states, n_actions]) / n_actions

for i in range(100):
    sessions = [generate_session(policy) for _ in range(n_sessions)]
    states_batch, actions_batch, rewards_batch = zip(*sessions)
    
    elite_states, elite_actions = select_elites(states_batch, actions_batch, rewards_batch, percentile)
    new_policy = update_policy(elite_states, elite_actions)
    

    # Move only part of the way towards the newly fitted policy.
    policy = learning_rate * new_policy + (1 - learning_rate) * policy
    

    # Once more than 20 policies are stored, replace the current policy with
    # the row-normalised mean of the 20 most recent ones.
    policy_history.append(policy.copy())
    if len(policy_history) > 20:
        avg_policy = np.mean(policy_history[-20:], axis=0)
        policy = avg_policy / np.sum(avg_policy, axis=1, keepdims=True)
    

    # Report every 10th iteration and stop early once a batch averages above 5.
    mean_reward = np.mean(rewards_batch)
    if i % 10 == 0:
        print(f"Итерация {i}: ср. награда = {mean_reward:.1f}")
    
    if mean_reward > 5:
        print(f"Награда = {mean_reward:.1f}")
        break

# Evaluate the resulting policy on 50 fresh sessions.
test_rewards = [generate_session(policy)[-1] for _ in range(50)]
print(f"тест: среднее = {np.mean(test_rewards):.1f}")

# Digging deeper: approximate crossentropy with neural nets (2 pts)

In this section we will train a neural network policy for continuous state space game

In [None]:
import gym
import numpy as np, pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
# Switch to CartPole; `.env` takes the environment object held inside the one returned by gym.make.
env = gym.make("CartPole-v0").env  #if you see "<classname> has no attribute .env", remove .env or update gym

env.reset()
# n_actions is rebound here, so it no longer refers to the Taxi action count.
n_actions = env.action_space.n

# Show the current frame.
# NOTE(review): render("rgb_array") follows the older Gym API — confirm the installed version.
plt.imshow(env.render("rgb_array"))

In this case, use the neural network as a black box. All you should know is that it is more complex than a tabular method.

In [None]:
#create agent
from sklearn.neural_network import MLPClassifier
# Small two-hidden-layer classifier used as the policy: it is fitted on
# (state, action) pairs and its predict_proba output gives action probabilities.
agent = MLPClassifier(hidden_layer_sizes=(20,20),
                      activation='tanh',
                      warm_start=True, #keep progress between .fit(...) calls
                      max_iter=1 #make only 1 iteration on each .fit(...)
                     )
# Initialize the agent with the state dimension and the number of actions:
# one dummy sample per action, all using the initial observation.
agent.fit([env.reset()]*n_actions,range(n_actions));


In [None]:
def generate_session(t_max=1000):
    """
    Play one episode in the notebook-level ``env`` using the notebook-level ``agent``.

    At every step the action is sampled from ``agent.predict_proba`` for the
    current state. Both Gym calling conventions are accepted: ``reset()`` may
    return either the observation or an ``(observation, info)`` tuple, and
    ``step()`` may return either a 4-tuple or a 5-tuple.

    :param t_max: maximum number of steps to play before giving up
    :returns: list of states, list of actions and sum of rewards
    """
    states,actions = [],[]
    total_reward = 0

    reset_result = env.reset()
    # Newer Gym versions return (observation, info); older ones return the observation alone.
    s = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    for t in range(t_max):

        # predict_proba expects a batch, so wrap the state and take the only row.
        # NOTE(review): assumes the agent was fitted on labels 0..n_actions-1,
        # so the columns line up with action indices — confirm.
        probs = agent.predict_proba([s])[0]

        a = np.random.choice(np.arange(n_actions), p=probs)

        step_result = env.step(a)
        if len(step_result) == 5:
            # Newer API splits the end-of-episode flag in two; either one ends the session.
            new_s, r, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            new_s, r, done, info = step_result

        # Record the state the action was taken in, the action and the reward.
        states.append(s)
        actions.append(a)
        total_reward += r

        s = new_s
        if done: break
    return states,actions,total_reward
        

In [None]:
def select_elites(states_batch,actions_batch,rewards_batch,percentile=50):
    """
    Select states and actions from games that have rewards >= percentile
    :param states_batch: sequence of per-session state sequences, states_batch[session_i][t]
    :param actions_batch: sequence of per-session action sequences, actions_batch[session_i][t]
    :param rewards_batch: sequence of total rewards, rewards_batch[session_i]
    :param percentile: percentile of rewards_batch used as the (inclusive) threshold

    :returns: elite_states,elite_actions - arrays of states and respective actions from elite sessions.
        Scalar states give a 1D array; vector states give one row per state.

    Elite states and actions are returned in their original order
    [i.e. sorted by session number and timestep within session].

    States are not assumed to be integers, and the batches may be lists,
    tuples or arrays (sessions may have different lengths).
    """
    rewards = np.asarray(rewards_batch)
    reward_threshold = np.percentile(rewards, percentile)

    elite_states = []
    elite_actions = []

    # Walk the sessions in order instead of boolean-mask indexing, which only
    # works when every batch is already a NumPy array.
    for session_states, session_actions, session_reward in zip(states_batch, actions_batch, rewards):
        if session_reward >= reward_threshold:
            elite_states.extend(session_states)
            elite_actions.extend(session_actions)

    return np.array(elite_states), np.array(elite_actions)
    

To train this classification model, use fit(states, actions). Here we train a classifier to predict the correct class (action) for a given state.

In [None]:
# Hyperparameters: sessions per iteration and the elite percentile.
n_sessions = 100
percentile = 70
log = []

for i in range(100):
    #generate new sessions
    sessions = [generate_session() for _ in range(n_sessions)]
    session_states, session_actions, session_rewards = zip(*sessions)

    # Sessions differ in length, so np.array(...) on the raw tuples would build
    # a ragged array (an error on recent NumPy). Store each session as one
    # element of a 1-D object array instead.
    states_batch = np.empty(n_sessions, dtype=object)
    actions_batch = np.empty(n_sessions, dtype=object)
    for k in range(n_sessions):
        states_batch[k] = session_states[k]
        actions_batch[k] = session_actions[k]
    rewards_batch = np.array(session_rewards)

    # Use the configured percentile so the selection matches the threshold drawn by show_progress.
    elite_states, elite_actions = select_elites(states_batch,actions_batch,rewards_batch,percentile=percentile)

    #fit agent: train the classifier to predict the elite action for each elite state
    agent.fit(elite_states, elite_actions)

    show_progress(rewards_batch,log,reward_range=[0,np.max(rewards_batch)])

    if np.mean(rewards_batch)> 190:
        print("You Win! You may stop training now via KeyboardInterrupt.")
        break

# Report (1 pts)

In [None]:
# Describe what you did here.  Preferably with plot/report to support it