In [1]:
''' environment'''
import gym 

'''storage '''
import collections

''' writer '''
from tensorboardX import SummaryWriter 

**Env**\
[frozen lake description](https://gym.openai.com/envs/FrozenLake-v0/)

SFFF       (S: starting point, safe)\
FHFH       (F: frozen surface, safe)\
FFFH       (H: hole, fall to your doom)\
HFFG       (G: goal, where the frisbee is located)\

The episode ends when you reach the goal or fall in a hole. You receive a reward of 1 if you reach the goal, and zero otherwise. 

In [5]:
Gamma = 0.9 

class Agent():
    
    def __init__(self):
        self.env = gym.make('FrozenLake-v0')
        self.state = self.env.reset()
        self.rewards = collections.defaultdict(float) 
        self.transists = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float) 
        
        
    ## collect initial data 
    def play_n_random(self, count):
        for _ in range(count):
            # run on env 
            #print(self.env.render())
            action = self.env.action_space.sample()
            next_state, reward, is_done, _ = self.env.step(action)
            
            # populate the matrices
            self.rewards[(self.state, action, next_state)] = reward
            self.transists[(self.state, action)][next_state] += 1 
            
            if is_done:
                self.state = self.env.reset()
                break 
            self.state = next_state  
            
    def value_iteration(self):
        for state in range(self.env.observation_space.n): 
            for action in range(self.env.action_space.n):
                self.values[(state, action)] = self.calculate_action_value(state, action)
            

    def calculate_action_value(self, state, action):
        target_counts = self.transists[(state, action)]
        total = sum(target_counts.values())
        
        action_value = 0.0 
        for nxt_state, count in target_counts.items():
            reward = self.rewards[(state,action, nxt_state)]
            
            best_action = self.select_action(nxt_state)
            
            value = self.values[(nxt_state, best_action)]
            action_value += (count/total)*(reward + Gamma*value)
            
        return action_value 
    
    def select_action(self, state):
        best_action, best_value = None, None 
        
        for action in range(self.env.action_space.n):
            action_value = self.values[(state, action)]
            
            if best_value is None or best_value < action_value:
                best_value = action_value 
                best_action = action 
        return best_action  
    
    def play_episode(self, env, render = False):
        total_reward = 0.0 
        state = env.reset()
        
        while True:
            if render:
                env.render()
                
            action = self.select_action(state)
            new_state, reward, is_done, _ = env.step(action)
            
            self.rewards[(state, action, new_state)] = reward 
            self.transists[(state, action)][new_state] += 1 
            total_reward += reward 
            
            if is_done:
                break 
            state = new_state 
        return total_reward  

In [3]:
agent = Agent()
agent.play_n_random(100)

print(agent.rewards)
print()
print(agent.transists)
print()
print(agent.values)
print()
print(agent.state)
print()
agent.env.render()

defaultdict(<class 'float'>, {(0, 1, 0): 0.0, (0, 1, 4): 0.0, (4, 3, 4): 0.0, (4, 2, 0): 0.0, (0, 0, 0): 0.0, (0, 1, 1): 0.0, (1, 2, 1): 0.0, (1, 2, 2): 0.0, (2, 3, 3): 0.0, (3, 3, 3): 0.0, (3, 0, 3): 0.0, (3, 1, 7): 0.0})

defaultdict(<class 'collections.Counter'>, {(0, 1): Counter({0: 2, 4: 1, 1: 1}), (4, 3): Counter({4: 1}), (4, 2): Counter({0: 1}), (0, 0): Counter({0: 3}), (1, 2): Counter({1: 1, 2: 1}), (2, 3): Counter({3: 1}), (3, 3): Counter({3: 1}), (3, 0): Counter({3: 1}), (3, 1): Counter({7: 1})})

defaultdict(<class 'float'>, {})

0


[41mS[0mFFF
FHFH
FFFH
HFFG


In [10]:
agent = Agent()
writer = SummaryWriter('valueBased')
test_env = gym.make('FrozenLake-v0')
iter_no = 0 
best_reward = 0.0 
    
while True:
    iter_no += 1 
    agent.play_n_random(10000)
    agent.value_iteration()
    
    reward = 0.0 
    test_episodes = 10
    for _ in range(test_episodes):
        reward += agent.play_episode(test_env, render = False)
    
    reward /= test_episodes 
    writer.add_scalar("reward", reward, iter_no)
    
    if reward > best_reward:
        print("Best reward updated %.3f -> %.3f" % (best_reward, reward))
        best_reward = reward
    if reward > 0.80:
        print("Solved in %d iterations!" % iter_no)
        break
        
agent.play_episode(test_env, render = True) 
writer.close()

Best reward updated 0.000 -> 0.100
Best reward updated 0.100 -> 0.400
Best reward updated 0.400 -> 0.500
Best reward updated 0.500 -> 0.600
Best reward updated 0.600 -> 0.800
Best reward updated 0.800 -> 1.000
Solved in 186 iterations!

[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FF[41mF[0mH
HFFG
  (Down)
SFFF
FHFH
FFFH
HF[41mF[0mG


In [45]:
from __future__ import print_function 

def play_s(agent, env):
    state = env.reset()
    render_list = []
    steps = 0
    while True:
        render_list.append(env.render("ansi"))
        
        action = agent.select_action(state)
        next_state, reward, is_done, _ = env.step(action)
        steps += 1 
        if reward == 1:
            print("*"*10, end= "\n")
            render_list.append(env.render("ansi"))
            print("successfully played in steps ",steps)
            for t in render_list:
                print(t.replace("\n", " "), end = "\t=>")
            print()
            env.reset()
            print("*"*10, end = "\n")
            break 
            
        if is_done and reward != 1:
            print("*"*10, end = "\n")
            for t in render_list:
                print(t.replace("\n", " "), end = "\t") 
            print("\nnot solved: trying again ")
            print("*"*10, end = "\n")
            steps = 0
            state = env.reset() 
            render_list.clear()
            
        state = next_state 
        
    return reward 

reward = play_s(agent, test_env)
print("reward ", reward)

**********
 [41mS[0mFFF FHFH FFFH HFFG 	  (Left) SFFF [41mF[0mHFH FFFH HFFG 	  (Left) SFFF [41mF[0mHFH FFFH HFFG 	  (Left) SFFF FHFH [41mF[0mFFH HFFG 	  (Up) SFFF FHFH [41mF[0mFFH HFFG 	  (Up) SFFF FHFH F[41mF[0mFH HFFG 	  (Down) SFFF FHFH [41mF[0mFFH HFFG 	  (Up) SFFF FHFH F[41mF[0mFH HFFG 	  (Down) SFFF FHFH FF[41mF[0mH HFFG 	
not solved: trying again 
**********
**********
 [41mS[0mFFF FHFH FFFH HFFG 	  (Left) SFFF [41mF[0mHFH FFFH HFFG 	  (Left) SFFF [41mF[0mHFH FFFH HFFG 	  (Left) SFFF FHFH [41mF[0mFFH HFFG 	  (Up) SFFF FHFH [41mF[0mFFH HFFG 	  (Up) SFFF FHFH [41mF[0mFFH HFFG 	  (Up) SFFF FHFH F[41mF[0mFH HFFG 	  (Down) SFFF FHFH FFFH H[41mF[0mFG 	  (Right) SFFF FHFH F[41mF[0mFH HFFG 	  (Down) SFFF FHFH FF[41mF[0mH HFFG 	
not solved: trying again 
**********
**********
successfully played in steps  85
 [41mS[0mFFF FHFH FFFH HFFG 	=>  (Left) SFFF [41mF[0mHFH FFFH HFFG 	=>  (Left) [41mS[0mFFF FHFH FFFH HFFG 	=>  (Left) SFFF [41mF[0mHFH F

In [35]:
a = test_env.render("ansi")
b = test_env.render("ansi")

A = [a, b]

In [24]:
from __future__ import print_function

In [37]:
for t in A:
    print(t.replace("\n", " "), end = " \t")

 [41mS[0mFFF FHFH FFFH HFFG  	 [41mS[0mFFF FHFH FFFH HFFG  	

In [33]:
test_env.render()

  (Down)
SFFF
FHFH
FFFH
HFF[41mG[0m
