In [1]:
import sys

# NOTE(review): hard-coded, machine-specific absolute path — it only resolves on
# the machine that has this exact conda env. Presumably a workaround for the
# kernel not running inside the `gym` env; confirm and prefer selecting that
# env's kernel instead of patching sys.path.
sys.path.append("/Users/kanishkjain/opt/anaconda3/envs/gym/lib/python3.9/site-packages")

import random
import collections
from pprint import pprint

import gym
import gym_toytext
import numpy as np
import matplotlib.pyplot as plt

In [38]:
class Agent:
    """Tabular control algorithms for a Gym environment with discrete spaces.

    Each training method creates its own environment from ``self.environment``
    with ``gym.make`` and expects the classic Gym API: ``reset()`` returns the
    state and ``step()`` returns ``(state, reward, done, info)``. States and
    actions are used directly as indices into ``(Num_S, Num_A)`` tables.
    """

    # Training loops print one progress line every this many episodes.
    _PROGRESS_EVERY = 5000

    def __init__(
        self, environment="Roulette-v0"
    ) -> None:
        """Store the Gym environment id; no environment is created here."""
        self.environment = environment

    # ------------------------------------------------------------------ #
    # Private helpers shared by the algorithms below
    # ------------------------------------------------------------------ #

    def _make_env(self):
        """Create the environment; return ``(env, number of states, number of actions)``."""
        env = gym.make(self.environment)
        return env, env.observation_space.n, env.action_space.n

    def _report_progress(self, it):
        """Print the progress line when ``it`` is a multiple of the reporting period."""
        if it % self._PROGRESS_EVERY == 0:
            print(f"Generating Episode Number: {it}")

    @staticmethod
    def _random_argmax(values):
        """Return the index of a maximal entry of ``values``; ties are broken uniformly at random."""
        return np.random.choice(np.flatnonzero(values == np.max(values)))

    @staticmethod
    def _set_epsilon_soft(Pi, S, greedy_action, epsilon):
        """Overwrite row ``S`` of ``Pi`` with the epsilon-soft distribution around ``greedy_action``."""
        Num_A = Pi.shape[1]
        Pi[S, :] = epsilon / Num_A
        Pi[S, greedy_action] = 1 - epsilon + (epsilon / Num_A)

    @staticmethod
    def _is_true_terminal(terminal, info):
        """Tell a real terminal transition apart from a time-limit cut-off.

        Gym's ``TimeLimit`` wrapper reports ``done=True`` when it cuts an
        episode short and marks that case with ``info["TimeLimit.truncated"]``.
        A cut-off state still has a future, so TD targets should keep
        bootstrapping from it.
        """
        truncated = isinstance(info, dict) and bool(info.get("TimeLimit.truncated", False))
        return bool(terminal) and not truncated

    def _generate_episode(self, env, select_action, gamma):
        """Run one episode, choosing actions with ``select_action(state)``.

        Returns:
            ``(episode, G)`` where ``episode`` is the list of ``(S, A, R)``
            triples and ``G`` is the discounted return from the first step.
        """
        S = env.reset()
        episode = []
        G = 0
        step = 0
        while True:
            A = select_action(S)
            S_, R, terminal, _ = env.step(A)

            # The reward of the terminal transition is zeroed on CliffWalking.
            if terminal and self.environment == 'CliffWalking-v0':
                R = 0

            episode.append((S, A, R))
            G = G + (gamma ** step) * R
            S = S_
            step += 1

            if terminal:
                return episode, G

    # ------------------------------------------------------------------ #
    # Action selection
    # ------------------------------------------------------------------ #

    def epsilon_greedy_policy(self, Pi, epsilon):
        """Pick an action index epsilon-greedily from one state's action values.

        Args:
            Pi: 1-D array of action values (or preferences), one per action.
            epsilon: probability of returning a uniformly random action.

        Returns:
            The chosen action index.
        """
        # Values are compared after rounding to one decimal, so estimates that
        # agree to that precision count as ties and are broken at random.
        # NOTE(review): presumably deliberate (keeps exploring among
        # near-equal actions) - confirm before changing.
        Pi = np.around(Pi, decimals=1)

        if np.random.rand() < epsilon:
            return np.random.choice(len(Pi))
        return self._random_argmax(Pi)

    def soft_policy(self, Num_A):
        """Return a uniformly random action index in ``range(Num_A)``."""
        return np.random.choice(Num_A)

    # ------------------------------------------------------------------ #
    # Monte Carlo control
    # ------------------------------------------------------------------ #

    def on_policy_monte_carlo(self, num_iter=30000, alpha=0.5, gamma=0.6, epsilon=0.7):
        """On-policy first-visit Monte Carlo control with an epsilon-soft policy.

        Args:
            num_iter: number of episodes to generate.
            alpha: unused; kept so all training methods share one signature.
            gamma: discount factor, applied both to the reported episode
                returns and to the returns averaged into ``Q``.
            epsilon: exploration rate of the epsilon-soft policy.

        Returns:
            ``(Q, rewards, Pi)``: the ``(Num_S, Num_A)`` action-value table,
            the discounted return of every episode, and the final policy table.
        """
        env, Num_S, Num_A = self._make_env()

        Q = np.zeros((Num_S, Num_A))
        C = np.zeros((Num_S, Num_A))  # first-visit counts per (state, action)
        rewards = np.zeros(num_iter)
        Pi = np.ones((Num_S, Num_A)) / Num_A

        def behave(S):
            # Equivalent to sampling from the epsilon-soft row Pi[S]:
            # uniform with probability epsilon, otherwise its most likely action.
            if np.random.rand() < epsilon:
                return np.random.choice(Num_A)
            return self._random_argmax(Pi[S])

        try:
            for it in range(1, num_iter + 1):
                self._report_progress(it)

                episode, rewards[it - 1] = self._generate_episode(env, behave, gamma)

                # Index of the first occurrence of every (state, action) pair.
                first_visit = {}
                for t, (S, A, _) in enumerate(episode):
                    first_visit.setdefault((S, A), t)

                G = 0
                for t in range(len(episode) - 1, -1, -1):
                    S, A, R = episode[t]
                    # Discounted return from step t onward.
                    G = gamma * G + R
                    if first_visit[(S, A)] != t:
                        continue

                    # Incremental form of "mean of all first-visit returns".
                    C[S, A] += 1
                    Q[S, A] += (G - Q[S, A]) / C[S, A]

                    self._set_epsilon_soft(Pi, S, self._random_argmax(Q[S]), epsilon)
        finally:
            env.close()

        return Q, rewards, Pi

    def off_policy_monte_carlo(self, num_iter=50000, alpha=0.05, gamma=0.0, epsilon=0.1):
        """Off-policy every-visit Monte Carlo control with weighted importance sampling.

        Episodes are generated by a uniformly random behaviour policy; the
        target policy is epsilon-greedy with respect to ``Q``.

        Args:
            num_iter: number of episodes to generate.
            alpha: unused; kept so all training methods share one signature.
            gamma: discount factor.
            epsilon: exploration rate of the target policy.

        Returns:
            ``(Q, rewards)``: the ``(Num_S, Num_A)`` action-value table and the
            discounted return of every (behaviour-policy) episode.
        """
        env, Num_S, Num_A = self._make_env()

        Q = np.zeros((Num_S, Num_A))
        C = np.zeros((Num_S, Num_A))  # cumulative importance weights
        rewards = np.zeros(num_iter)
        Pi = np.ones((Num_S, Num_A)) / Num_A

        def behave(S):
            return np.random.choice(Num_A)

        try:
            for it in range(1, num_iter + 1):
                self._report_progress(it)

                episode, rewards[it - 1] = self._generate_episode(env, behave, gamma)

                G = 0
                W = 1
                for S, A, R in reversed(episode):
                    G = gamma * G + R

                    C[S, A] += W
                    Q[S, A] += (W / C[S, A]) * (G - Q[S, A])

                    self._set_epsilon_soft(Pi, S, self._random_argmax(Q[S]), epsilon)

                    # Importance ratio target/behaviour = Pi[S, A] / (1 / Num_A).
                    W = W * (Pi[S, A] * Num_A)

                    # Earlier steps would get zero weight; nothing left to learn.
                    if W == 0:
                        break
        finally:
            env.close()

        return Q, rewards

    # ------------------------------------------------------------------ #
    # Temporal-difference control
    # ------------------------------------------------------------------ #

    def q_learning(self, num_iter=30000, alpha=0.1, gamma=0.9, epsilon=0.05):
        """Tabular Q-learning with epsilon-greedy exploration.

        Args:
            num_iter: number of episodes.
            alpha: step size.
            gamma: discount factor.
            epsilon: exploration rate passed to ``epsilon_greedy_policy``.

        Returns:
            ``(Q, rewards)``: the ``(Num_S, Num_A)`` action-value table and the
            discounted return of every episode.
        """
        env, Num_S, Num_A = self._make_env()

        Q = np.zeros((Num_S, Num_A))
        rewards = np.zeros(num_iter)

        try:
            for it in range(1, num_iter + 1):
                self._report_progress(it)

                S = env.reset()
                G = 0
                step = 0
                while True:
                    A = self.epsilon_greedy_policy(Q[S], epsilon)
                    S_, R, terminal, info = env.step(A)

                    G = G + (gamma ** step) * R

                    # A terminal state has no future value. Bootstrapping from
                    # Q[S_] there is wrong, and visibly so when the environment
                    # reports the same state index on termination.
                    if self._is_true_terminal(terminal, info):
                        target = R
                    else:
                        target = R + gamma * np.max(Q[S_])
                    Q[S, A] += alpha * (target - Q[S, A])

                    S = S_
                    step += 1

                    if terminal:
                        break

                rewards[it - 1] = G
        finally:
            env.close()

        return Q, rewards

    def sarsa(self, num_iter=30000, alpha=0.1, gamma=0.9, epsilon=0.05):
        """Tabular SARSA with epsilon-greedy exploration.

        Args:
            num_iter: number of episodes.
            alpha: step size.
            gamma: discount factor.
            epsilon: exploration rate passed to ``epsilon_greedy_policy``.

        Returns:
            ``(Q, rewards)``: the ``(Num_S, Num_A)`` action-value table and the
            discounted return of every episode.
        """
        env, Num_S, Num_A = self._make_env()

        Q = np.zeros((Num_S, Num_A))
        rewards = np.zeros(num_iter)

        try:
            for it in range(1, num_iter + 1):
                self._report_progress(it)

                S = env.reset()
                A = self.epsilon_greedy_policy(Q[S], epsilon)

                G = 0
                step = 0
                while True:
                    S_, R, terminal, info = env.step(A)
                    A_ = self.epsilon_greedy_policy(Q[S_], epsilon)

                    G = G + (gamma ** step) * R

                    # No bootstrap from a true terminal state (see q_learning).
                    if self._is_true_terminal(terminal, info):
                        target = R
                    else:
                        target = R + gamma * Q[S_, A_]
                    Q[S, A] += alpha * (target - Q[S, A])

                    S = S_
                    A = A_
                    step += 1

                    if terminal:
                        break

                rewards[it - 1] = G
        finally:
            env.close()

        return Q, rewards

    # ------------------------------------------------------------------ #
    # Inspection
    # ------------------------------------------------------------------ #

    def show_policy(self, Q):
        """Roll out the greedy policy of ``Q`` for one episode and print each step.

        The rollout stops when the environment reports ``done`` or after
        ``MAX_STEPS`` steps. Non-terminal steps are printed as they happen; the
        terminal step is printed once after the loop.

        Args:
            Q: ``(Num_S, Num_A)`` action-value table indexed by state.
        """
        MAX_STEPS = 51

        env = gym.make(self.environment)
        try:
            S = env.reset()
            print(f"Starting state: {S}")

            step = 0
            done = False
            while step < MAX_STEPS:
                A = np.argmax(Q[S])
                S_, R, done, _ = env.step(A)
                if done:
                    break
                print(
                    f"Current State: {S}, action: {A}, reward: {R}, terminal: {done}, step: {step}"
                )
                S = S_
                step += 1
            if done:
                print(
                    f"Current State: {S}, action: {A}, reward: {R}, terminal: {done}, step: {step}"
                )
        finally:
            # Release the environment even if stepping raised.
            env.close()
        print("Finished", done)

In [39]:
# Single agent instance bound to the Roulette environment; the training cells
# below all call methods on it.
agent = Agent(environment='Roulette-v0')

In [40]:
# Train with on-policy first-visit Monte Carlo (default hyper-parameters), then
# print one greedy rollout of the learned action values.
# NOTE: `rewards` is reassigned by every training cell in this notebook.
q_value, rewards, policy = agent.on_policy_monte_carlo()
agent.show_policy(q_value)

Generating Episode Number: 5000
Generating Episode Number: 10000
Generating Episode Number: 15000
Generating Episode Number: 20000
Generating Episode Number: 25000
Generating Episode Number: 30000
Starting state: 0
Current State: 0, action: 37, reward: 0, terminal: True, step: 0
Finished True


In [41]:
# Train with off-policy Monte Carlo (uniform random behaviour policy, default
# hyper-parameters), then print one greedy rollout. `off_policy` holds the
# returned action-value table, not a policy.
off_policy, rewards = agent.off_policy_monte_carlo()
agent.show_policy(off_policy)

Generating Episode Number: 5000
Generating Episode Number: 10000
Generating Episode Number: 15000
Generating Episode Number: 20000
Generating Episode Number: 25000
Generating Episode Number: 30000
Generating Episode Number: 35000
Generating Episode Number: 40000
Generating Episode Number: 45000
Generating Episode Number: 50000
Starting state: 0
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 0
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 1
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 2
Current State: 0, action: 28, reward: -1.0, terminal: False, step: 3
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 4
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 5
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 6
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 7
Current State: 0, action: 28, reward: 1.0, terminal: False, step: 8
Current State: 0, action: 28, reward: 1.

In [42]:
# Learned action values for state 0, one entry per action.
off_policy[0]

array([-0.3072497 , -0.99996614, -0.90028068,  0.54515249,  0.21347303,
        0.34593049, -0.78845437,  0.41948918, -0.38611758, -0.22053396,
       -0.3294432 ,  0.08780172, -0.19371107, -0.99994617,  0.0109279 ,
       -0.19209121,  0.20451046, -0.81455185, -0.4230577 , -0.90168653,
       -0.14390406,  0.15154567, -0.00395848,  0.00953864, -0.27278296,
       -0.26963759,  0.9995746 , -0.9960765 ,  0.99978178,  0.99462403,
       -0.02002793, -0.88427229,  0.14254751, -0.13860719,  0.00392699,
        0.6087428 ,  0.65780813,  0.        ])

In [37]:
# Train with tabular Q-learning (default hyper-parameters), then print one
# greedy rollout. `q_policy` holds the returned action-value table.
# NOTE(review): this cell's execution count is lower than the cells above it,
# so its saved output comes from an earlier run - re-execute in order.
q_policy, rewards = agent.q_learning()
agent.show_policy(q_policy)

Generating Episode Number: 5000
Generating Episode Number: 10000
Generating Episode Number: 15000
Generating Episode Number: 20000
Generating Episode Number: 25000
Generating Episode Number: 30000
Starting state: 0
Current State: 0, action: 37, reward: 0, terminal: True, step: 0
Finished True


In [26]:
# Train with tabular SARSA (default hyper-parameters), then print one greedy
# rollout. `sarsa_policy` holds the returned action-value table.
# NOTE(review): this cell's execution count is lower than the cells above it,
# so its saved output comes from an earlier run - re-execute in order.
sarsa_policy, rewards = agent.sarsa()
agent.show_policy(sarsa_policy)

Generating Episode Number: 5000
Generating Episode Number: 10000
Generating Episode Number: 15000
Generating Episode Number: 20000
Generating Episode Number: 25000
Generating Episode Number: 30000
Starting state: 0
Current State: 0, action: 37, reward: 0, terminal: True, step: 0
Finished True
