## Robo Cleaner

In [340]:
from collections import defaultdict
import time
import math
from IPython.display import HTML, display, clear_output
import random
import gym
from gym import spaces
import numpy as np
import pandas as pd

ATTEMPTS = 200
MAX_PENALTY = -10.0

# C  - Cleaner
# .  - Not visited cell
# #  - Wall
# V  - Visited cell
ROOM = [
    ['.', '.', '.', '.', '.'],
    ['.', '#', '#', '.', 'C'],
    ['.', '#', '.', '.', '.'],
    ['.', '.', '.', '#', '.'],
    ['.', '.', '.', '.', '.'],
]

In [341]:
%%HTML
<style type="text/css">
    table.room, table.room td{
        border: 1px solid black;
        font-size: 19px;
        text-align: center !important;
        vertical-align: middle;
        width: 20px;
        height: 20px
    }
    table.room td.not_visited {
        background: lightgray
    }
    table.room td.visited {
        background: purple
    }
    table.room td.wall {
        background: lightblue
    }
    table.room td.cleaner {
        background: yellow
    }
    table.room tr {
        background: none !important
    }
</style>

### Classes And Functions

In [342]:
class RoomEnv(gym.Env):
    def __init__(self, room):
        super(RoomEnv, self).__init__()
        self.room_orig = room
        self.reset()

    def reset(self):
        self.room = np.array(self.room_orig)
        self.start_pos = np.where(self.room == 'C')
        self.current_pos = self.start_pos
        self.rows, self.cols = self.room.shape

        self.visited = 0
        self.to_visit = len(np.where(self.room == '.')[0])

        self.visited_map = defaultdict(lambda :0)

        # 4 possible actions: 0=up, 1=down, 2=left, 3=right
        self.action_space = spaces.Discrete(4)

        # Observation space is grid of size:rows x columns
        self.observation_space = spaces.Tuple((spaces.Discrete(self.rows), spaces.Discrete(self.cols)))

        self.epoch = 0
        self.penalties = 0

        return self.current_pos
    def step(self, action):
        self.epoch += 1
        new_pos = np.array(self.current_pos)

        if action == 0:  # Up
            new_pos[0] -= 1
        elif action == 1:  # Down
            new_pos[0] += 1
        elif action == 2:  # Left
            new_pos[1] -= 1
        elif action == 3:  # Right
            new_pos[1] += 1

        done = False
        reward = 0.0

        if self._is_valid_position(new_pos):
            old_pos = np.array(self.current_pos)
            self.current_pos = new_pos

            r_new, c_new = new_pos[0][0], new_pos[1][0]
            r_old, c_old = old_pos[0][0], old_pos[1][0]

            ch = self.room[r_new, c_new]
            key = "_".join([str(r_new), str(c_new)])

            self.room[r_old, c_old] = 'V'
            self.room[r_new, c_new] = 'C'

            if ch == '.':
                reward = 1.0
                self.visited = self.visited + 1

                self.visited_map[key] = 1
                if self.visited == self.to_visit:
                    done = True
            else:
                old_val = self.visited_map[key]
                reward = -0.5 - old_val * 0.5
                self.visited_map[key] = old_val + 1
        else:
            reward = MAX_PENALTY
            self.penalties += 1

        return self.current_pos, reward, done, {}
        
    def _is_valid_position(self, pos):
        row, col = pos

        # If agent goes out of the grid
        if row < 0 or col < 0 or row >= self.rows or col >= self.cols:
            return False

        # If the agent hits an obstacle
        if self.room[row, col] == '#':
            return False
        return True

    def render(self):
        header = "<b>Epoch</b>: %d; <b>Penalties</b>: %d" % (self.epoch, self.penalties) 
        trs = []
        classes = {
            '.': 'not_visited',
            '#': 'wall',
            'C': 'cleaner',
            'V': 'visited'
        }
        for row in self.room:
            tr = "<tr>"
            for col in row:
                tr += "<td class='" + classes[col] + "'>" + col + "</td>"
            tr += "</tr>"
            trs.append(tr)
        display(HTML(header + "<table class='room'>" + "".join(trs) + "</table>"))
        # print(self.room)


class Attempts:
    def __init__(self, name):
        self.name = name

        self.attempts = []

    def add_attempt(self):
        self.attempts.append({"epochs": 0, "penalties": 0, "actions": []})
        return len(self.attempts) - 1

    def add_epoch(self, attempt=None):
        if attempt is None:
            attempt = self._last_attempt()
        self.attempts[attempt]["epochs"] += 1

    def add_penalty(self, attempt=None):
        if attempt is None:
            attempt = self._last_attempt()
        self.attempts[attempt]["penalties"] += 1

    def add_action(self, action, attempt=None):
        if attempt is None:
            attempt = self._last_attempt()
        self.attempts[attempt]["actions"].append(action)

    def best_attempt(self):
        return sorted(self.attempts, key=lambda a: a["epochs"])[0]

    def _last_attempt(self):
        return len(self.attempts) - 1

    def metrics(self):
        metrics = {
            "min_epochs": math.inf,
            "avg_epochs": 0,
            "avg_penalties": 0,
            "min_penalties": math.inf,
            "name": self.name
        }
        penalties = 0
        epochs = 0

        for attempt in self.attempts:
            penalties += attempt["penalties"]
            epochs += attempt["epochs"]
            if attempt["epochs"] < metrics["min_epochs"]:
                metrics["min_epochs"] = attempt["epochs"]
            if attempt["penalties"] < metrics["min_penalties"]:
                metrics["min_penalties"] = attempt["penalties"]
        metrics["avg_epochs"] = epochs / len(self.attempts)
        metrics["avg_penalties"] = penalties / len(self.attempts)

        return metrics

In [343]:
def state_to_index(state, rows):
    return rows * state[0][0] + state[1][0]

def show_attempt(actions, delay=0.2):
    env = RoomEnv(ROOM)

    env.render()
    for action in actions:
        next_state, reward, done, info = env.step(action)
        clear_output(wait=True)
        env.render()
        time.sleep(delay)

### Bruteforce

In [344]:
def brute_force():
    env = RoomEnv(ROOM)
    attempts = Attempts('BRUTE FORCE')
    
    for attempt in range(ATTEMPTS):
        env.reset()
        attempt = attempts.add_attempt()
        while True:
            action = env.action_space.sample()  # Random action
            attempts.add_action(action)
            next_state, reward, done, info = env.step(action)
                
            attempts.add_epoch()           
            if reward == MAX_PENALTY:
                attempts.add_penalty()

            if done:
                break
    return attempts

In [345]:
bf_attempts = brute_force()

In [346]:
show_attempt(bf_attempts.best_attempt()["actions"])

0,1,2,3,4
V,V,C,V,V
V,#,#,V,V
V,#,V,V,V
V,V,V,#,V
V,V,V,V,V


### Q-Learning

```Q(s,a)=Q(s,a)+α[r+γmaxuQ(s′,u)−Q(s,a))]```

In [347]:
def q_learning():
    env = RoomEnv(ROOM)
    rows = len(ROOM)
    cols = len(ROOM[0])
    
    states = rows * cols

    q_table = np.zeros([states, env.action_space.n])

    alpha = 0.1
    gamma = 0.6
    epsilon = 0.1

    attempts = Attempts('Q-Learning')

    for i in range(ATTEMPTS):
        attempt = attempts.add_attempt()

        state = env.reset()
        done = False

        while not done:
            state_index = state_to_index(state, rows)
            if random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(q_table[state_index])

            attempts.add_action(action)

            next_state, reward, done, info = env.step(action)
            next_state_index = state_to_index(next_state, rows)

            old_value = q_table[state_index, action]
            next_max = np.max(q_table[next_state_index])

            new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
            q_table[state_index, action] = new_value

            if reward == MAX_PENALTY:
                attempts.add_penalty()

            state = next_state
            attempts.add_epoch()
    return attempts, q_table

In [348]:
q_attempts, q_table = q_learning()

In [349]:
q_table

array([[-24.99461641, -25.15960456, -24.99461355, -24.70345943],
       [-24.98793789, -24.98772482, -28.49364701, -24.90694594],
       [-24.88861338, -24.948332  , -27.92108728, -22.30951079],
       [-24.87948273, -26.21395823, -22.50928442, -24.68475699],
       [-24.95402576, -29.09904811, -29.3986481 , -24.95522776],
       [-22.58482846, -26.08456367, -24.85331642, -24.81955292],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [-30.40291482, -24.53116156, -24.98296382, -32.09705985],
       [-25.59083641, -24.88283777, -22.31568262, -24.29565215],
       [-19.7463742 , -23.31093052, -24.37461335, -24.71967203],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [-24.93321984, -26.59861545, -24.9334239 , -25.6109588 ],
       [-25.00280408, -24.99807739, -23.86178549, -24.96433329],
       [-24.43257973, -23.55037875, -24.74952115, -24.54423674],
       [-22.42916846, -22

In [350]:
show_attempt(q_attempts.best_attempt()["actions"])

0,1,2,3,4
V,V,V,V,V
V,#,#,V,V
V,#,V,V,V
V,C,V,#,V
V,V,V,V,V


### SARSA

```Q(s,a)=Q(s,a)+α[r+γQ(s′,a′)−Q(s,a))]```

In [351]:
def sarsa():
    env = RoomEnv(ROOM)
    rows = len(ROOM)
    cols = len(ROOM[0])
    
    states = rows * cols

    q_table = np.zeros([states, env.action_space.n])

    alpha = 0.1
    gamma = 0.9
    epsilon = 0.1

    attempts = Attempts('SARSA')
    
    def get_action(state_index):
        if random.uniform(0, 1) < epsilon:
            return env.action_space.sample()
        else:
            return np.argmax(q_table[state_index])

    def update(state_index, next_state_index, reward, action, next_action):
        predict = q_table[state_index, action]
        target = reward + gamma * q_table[next_state_index, next_action]
        q_table[state_index, action] = q_table[state_index, action] + alpha * (target - predict)

    for i in range(200):
        attempt = attempts.add_attempt()

        state = env.reset()
        done = False
        state_index = state_to_index(state, rows)
        action = get_action(state_index)

        while not done:
        #for i in range(1000):
            next_state, reward, done, info = env.step(action)
            next_state_index = state_to_index(next_state, rows)
            attempts.add_action(action)
 
            #Choosing the next action
            next_action = get_action(next_state_index)
         
            #Learning the Q-value
            update(state_index, next_state_index, reward, action, next_action)

            if reward == MAX_PENALTY:
                attempts.add_penalty()
                
            state = next_state
            action = next_action
            state_index = state_to_index(state, rows)
            attempts.add_epoch()
    return attempts, q_table

In [352]:
sarsa_attempts, sq_table = sarsa()

In [353]:
sq_table

array([[-62.39856826, -62.60198127, -62.63374392, -58.21312737],
       [-60.43106867, -60.29236654, -59.95040824, -57.47856956],
       [-59.55923325, -60.11349289, -59.07219543, -57.09036331],
       [-59.10707031, -58.53275412, -58.47858927, -59.1765153 ],
       [-59.4198352 , -58.36798312, -60.27344734, -60.00261553],
       [-58.34473075, -59.19844933, -60.2932356 , -59.63148457],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [-57.88689463, -56.08800162, -57.74937923, -57.67053345],
       [-58.10230592, -57.62145665, -57.88337102, -57.97032561],
       [-59.00728578, -60.20628484, -60.33037524, -60.22360135],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [-60.432629  , -58.39045109, -60.56449153, -59.68140329],
       [-57.94289549, -58.2702175 , -58.24433027, -58.82123677],
       [-59.57352727, -58.42526584, -58.94209806, -60.28794258],
       [-57.63632625, -59

In [354]:
show_attempt(sarsa_attempts.best_attempt()["actions"])

0,1,2,3,4
V,V,V,V,C
V,#,#,V,V
V,#,V,V,V
V,V,V,#,V
V,V,V,V,V


### Metrics

In [355]:
df = pd.DataFrame([bf_attempts.metrics(), 
                   q_attempts.metrics(),
                   sarsa_attempts.metrics()]).set_index("name")
df

Unnamed: 0_level_0,min_epochs,avg_epochs,avg_penalties,min_penalties
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
BRUTE FORCE,72,263.87,99.845,23
Q-Learning,23,685.875,323.005,0
SARSA,68,227.995,19.085,0
