<a href="https://colab.research.google.com/github/maquico/IA-IDS330/blob/main/ids330_wumpus_parte2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mundo del Wumpus

**Reglas de Juego**

1. El agente solo se puede mover arriba, abajo, izquierda y derecha. No se puede mover en diagonal.
2. Si hay un agujero en una casilla adyacente (excluyendo las diagonales), **el agente siente brisa**.
3. Si el Wumpus está en una casilla adyacente (excluyendo las diagonales), **el agente siente un mal olor**.
4. El agente puede eliminar el Wumpus si sabe donde está.
5. Si el oro se encuentra en la casilla donde está el agente, **el agente siente un brillo**, y puede coger el oro.
6. El agente sabe cuando hay una pared. El agente no puede atravesar la pared.
7. Se gana cogiendo el oro y volviendo a la casilla original.

**Más específico**
- El mundo será 5x5.
- El agente siempre va a comenzar en la casilla (0, 0): la esquina superior izquierda.
- Solamente habrá un Wumpus.
- Habrá de 2 a 5 agujeros.
- Las casillas adyacentes al (0, 0) siempre estarán libres de agujeros o wumpus.
- Solamente un oro.


## Parte 2 | Aprendizaje por Refuerzo (Q-Learning)

In [None]:
# HYPERPARAMETERS

RANDOM_SEED = 2024

## WORLD SIZE
HEIGHT = 5
WIDTH = 5

## NUMBER OF ITEMS
NUM_PITS = 2
NUM_WUMPUS = 1
NUM_GOLD = 1
INITIAL_COORD_AGENT = (0, 0)

## LEARNING SETTINGS
EPOCHS = 5000       # Number of games the agent plays
MAX_STEPS = 50      # Maximum number of steps the agent may take in one game
LEARNING_RATE = 0.1 # Amount by which the agent adjusts a Q-table entry on each update

In [None]:
import random
import itertools
import numpy as np


def get_adjacent_caves(coord: tuple, height: int, width: int) -> list:
    """Return the in-grid cells orthogonally adjacent to ``coord``.

    Coordinates follow the convention used to index the world grid,
    ``world[coord[0]][coord[1]]``: the first component is bounded by
    ``height`` and the second by ``width``. Diagonal neighbours are never
    included.

    Args:
        coord: ``(i, j)`` position of the cell.
        height: Number of valid values for the first component.
        width: Number of valid values for the second component.

    Returns:
        A list of ``(i, j)`` tuples in the order: second component - 1,
        second component + 1, first component - 1, first component + 1,
        skipping any neighbour that would fall outside the grid.
    """
    row, col = coord[0], coord[1]
    adjacent_caves = []

    # The second component walks along a row, so it is limited by ``width``.
    if col - 1 >= 0:
        adjacent_caves.append((row, col - 1))

    if col + 1 < width:
        adjacent_caves.append((row, col + 1))

    # The first component selects the row, so it is limited by ``height``.
    if row - 1 >= 0:
        adjacent_caves.append((row - 1, col))

    if row + 1 < height:
        adjacent_caves.append((row + 1, col))

    return adjacent_caves


class WumpusWorld():
    """Grid world for the Wumpus game: an agent, one wumpus, one gold and pits.

    Coordinates are ``(i, j)`` tuples used as ``self.world[i][j]``, so the
    first component ranges over ``height`` and the second over ``width``.

    Scoring applied by ``move_agent``: -1 per step, -500 for bumping into a
    wall, +1000 for reaching the gold, -1000 for stepping on the wumpus or
    into a pit.
    """

    def __init__(self, initial_coord_agent, height, width, num_pits, num_wumpus, num_gold):
        """Create a random world.

        Args:
            initial_coord_agent: ``(i, j)`` cell where the agent starts.
            height: Number of rows of the grid.
            width: Number of columns of the grid.
            num_pits: Number of pits to place.
            num_wumpus: Counted when sampling item cells. Only the first
                sampled cell becomes the wumpus.
            num_gold: Counted when sampling item cells. Only the second
                sampled cell becomes the gold.

        NOTE(review): every sampled cell after the first two becomes a pit,
        so values other than 1 for ``num_wumpus``/``num_gold`` add pits.
        """
        self.initial_coord_agent = initial_coord_agent
        self.height = height
        self.width = width
        self.num_pits = num_pits
        self.num_wumpus = num_wumpus
        self.num_gold = num_gold

        # Every cell of the grid: (0, 0), (0, 1), (0, 2), ...
        self.coords = list(itertools.product(range(height), range(width)))

        # Items are never placed on the entrance nor on the cells next to it.
        coords_to_avoid = get_adjacent_caves(initial_coord_agent, height, width)
        coords_to_avoid.append(initial_coord_agent)
        coords_except_entrance = [c for c in self.coords if c not in coords_to_avoid]
        select_coords = random.sample(coords_except_entrance, num_pits + num_wumpus + num_gold)
        self.coord_agent = initial_coord_agent
        self.previous_position = (-1, -1)  # sentinel: the agent has not moved yet
        self.coord_wumpus = select_coords[0]
        self.coord_gold = select_coords[1]
        self.coord_pits = select_coords[2:]

        # Initial episode state
        self.is_agent_alive = True
        self.is_wumpus_alive = True
        self.breeze = False
        self.smell = False
        self.glitter = False
        self.bump = False
        self.score = 0.0
        self.last_score = 0.0
        self.steps = 0

        # Build the printable grid
        self.world = [['*'] * self.width for _ in range(self.height)]
        self._draw_world()

        self.breeze_coords = self.get_breeze_coords()
        self.smell_coords = self.get_smell_coords()

    def _draw_world(self):
        """Redraw every glyph of ``self.world`` in place from the current state."""
        for row in self.world:
            row[:] = ['*'] * self.width
        for c in self.coord_pits:
            self.world[c[0]][c[1]] = 'p'
        self.world[self.coord_wumpus[0]][self.coord_wumpus[1]] = 'w'
        self.world[self.coord_gold[0]][self.coord_gold[1]] = 'g'
        self.world[self.coord_agent[0]][self.coord_agent[1]] = '☻'

    def reset_agent(self):
        """Start a new episode in the same world.

        The agent goes back to ``initial_coord_agent``, all sensors and
        counters are cleared and the grid is redrawn.

        Returns:
            The score of the episode that just ended (also stored in
            ``self.last_score``).
        """
        self.coord_agent = self.initial_coord_agent
        self.previous_position = (-1, -1)
        self.is_agent_alive = True
        self.is_wumpus_alive = True
        self.breeze = False
        self.smell = False
        self.glitter = False
        self.bump = False
        self.last_score = self.score
        self.score = 0.0
        self.steps = 0

        # Redraw the whole grid instead of blanking individual cells: blanking
        # ``previous_position`` while it still holds the (-1, -1) sentinel
        # would wipe the glyph of the bottom-right cell.
        self._draw_world()

        return self.last_score

    def get_breeze_coords(self):
        """Return the non-pit cells adjacent to at least one pit (no duplicates)."""
        breeze_coords = set()

        for c in self.coord_pits:
            for neighbour in get_adjacent_caves(c, self.height, self.width):
                if neighbour not in self.coord_pits:
                    breeze_coords.add(neighbour)

        return list(breeze_coords)

    def get_smell_coords(self):
        """Return the cells adjacent to a living wumpus, or ``[]`` if it is dead."""
        if self.is_wumpus_alive:
            return get_adjacent_caves(self.coord_wumpus, self.height, self.width)
        # An empty list keeps the ``in`` membership test in move_agent valid.
        return []

    def print_world(self):
        """Print the grid, one text line per entry of ``self.world``."""
        for row in self.world:
            print(''.join(row))

    def move_agent(self, action: int, verbose: bool = False):
        """Apply one action and update score, position and sensors.

        Args:
            action: Index 0..3. The coordinate deltas are
                0 -> (0, -1), 1 -> (+1, 0), 2 -> (0, +1), 3 -> (-1, 0).
                NOTE(review): the original notes label these
                up/right/down/left, but on the printed grid the first
                component selects the text line.
            verbose: When true, print a message on win or death.

        Returns:
            ``'bump'`` when the move would leave the grid (the agent stays
            put), ``'win'`` when the gold is reached, the current score when
            the agent dies, and ``None`` after an ordinary move.
        """
        self.score -= 1
        self.steps += 1

        x_t = self.coord_agent[0] + [0, 1, 0, -1][action]
        y_t = self.coord_agent[1] + [-1, 0, 1, 0][action]

        # Is the move valid? The first component indexes the rows of
        # ``self.world`` (height) and the second the columns (width).
        if not (0 <= x_t < self.height and 0 <= y_t < self.width):
            self.score -= 500
            self.bump = True
            return 'bump'

        self.bump = False

        # Check for the gold
        if (x_t, y_t) == self.coord_gold:
            self.score += 1000
            self.glitter = True
            self.world[self.coord_agent[0]][self.coord_agent[1]] = '*'
            self.world[x_t][y_t] = '☻'
            # Keep the logical position in sync with the drawn glyph.
            self.previous_position = self.coord_agent
            self.coord_agent = (x_t, y_t)
            if verbose:
                print('Tesoro encontrado!')
            return 'win'

        # Moving onto the wumpus or into a pit kills the agent
        if ((x_t, y_t) == self.coord_wumpus) or ((x_t, y_t) in self.coord_pits):
            self.is_agent_alive = False
            self.score -= 1000
            if verbose:
                print("Bye-bye agente")
            return self.score

        self.world[self.coord_agent[0]][self.coord_agent[1]] = '*'
        self.previous_position = self.coord_agent
        self.coord_agent = (x_t, y_t)
        self.world[x_t][y_t] = '☻'

        # Sense the environment at the new cell
        self.breeze = (x_t, y_t) in self.breeze_coords
        self.smell = (x_t, y_t) in self.smell_coords

        return


In [None]:
# Seed the stdlib RNG so the sampled item positions are repeatable, then
# build one world from the hyperparameters above and show it.
random.seed(RANDOM_SEED)
wumpus_world = WumpusWorld(
    initial_coord_agent=INITIAL_COORD_AGENT,
    height=HEIGHT,
    width=WIDTH,
    num_pits=NUM_PITS,
    num_wumpus=NUM_WUMPUS,
    num_gold=NUM_GOLD,
)
wumpus_world.print_world()

☻****
***g*
**p**
***w*
*p***


In [None]:
# Agente

class Agent():
    """Tabular agent holding one action distribution per observed state.

    ``self.Q`` has shape ``(height, width, 2, 2, 2, 4)``: position (first and
    second coordinate), smell flag, breeze flag, bump flag, and finally the
    4 actions. The last axis is kept normalized so it can be used directly
    as sampling probabilities.
    """

    def __init__(self, width, height):
        """Create a uniform policy for a ``height`` x ``width`` grid.

        Args:
            width: Number of valid values of the second position coordinate.
            height: Number of valid values of the first position coordinate.
        """
        # The first position coordinate indexes the rows of the world grid
        # (bounded by height), so it must be the first axis of the table.
        self.Q = np.ones((height, width, 2, 2, 2, 4))
        self.normalizeQ()
        self.record = []  # [x, y, smell, breeze, bump, action] per step
        self.state_action_value_minimum = 0.1

    def get_action(self, x_t, y_t, smell, breeze, bump, exploration=1.0):
        """Pick an action for the given state and log it in ``self.record``.

        Args:
            x_t, y_t: Current position of the agent.
            smell, breeze, bump: Sensor flags as 0/1 integers.
            exploration: Probability of choosing uniformly at random
                instead of sampling from the learned distribution.

        Returns:
            The chosen action index in ``0..3``.
        """
        if np.random.random() < exploration:
            action = np.random.choice([0, 1, 2, 3])
        else:
            # Sample proportionally to the learned values, e.g.
            # np.random.choice([0, 1], p=[0.01, 0.99]) returns 1 in 99% of
            # the draws.
            action = np.random.choice(
                [0, 1, 2, 3],
                p=self.Q[x_t, y_t, smell, breeze, bump])

        self.record.append([x_t, y_t, smell, breeze, bump, action])
        return action

    def update_policy(self, is_alive, score, learning_rate=0.1):
        """Reinforce or penalize every state-action pair of the last episode.

        Args:
            is_alive: Whether the agent survived the episode.
            score: Final score of that episode.
            learning_rate: Amount added to (or subtracted from) each
                recorded state-action value before re-normalizing.

        The episode counts as a success when the agent is alive and the
        score is positive. With the scoring used by ``WumpusWorld`` in this
        file, only the +1000 gold reward can make a score positive, and the
        -1 step cost keeps it below 1000, so a ``score > 1000`` test can
        never succeed.
        """
        if is_alive and (score > 0):
            for x, y, smell, breeze, bump, a in self.record:
                self.Q[x, y, smell, breeze, bump, a] += learning_rate

        else:
            # Never subtract more than the entry holds: a negative entry
            # would make the row unusable as probabilities in get_action.
            threshold = max(self.state_action_value_minimum, learning_rate)
            for x, y, smell, breeze, bump, a in self.record:
                if self.Q[x, y, smell, breeze, bump, a] > threshold:
                    self.Q[x, y, smell, breeze, bump, a] -= learning_rate

        self.normalizeQ()
        self.record = []
        return

    def normalizeQ(self):
        """Rescale, in place, every action vector of ``self.Q`` to sum to 1.

        Example: ``[20, 10, 50]`` becomes ``[0.25, 0.125, 0.625]``.
        """
        self.Q /= self.Q.sum(axis=-1, keepdims=True)


In [None]:
from tqdm import tqdm

# LEARNING
random.seed(100)
# The agent draws its actions from NumPy's global RNG, so seed it as well to
# make the training run repeatable.
np.random.seed(100)
wumpus_world = WumpusWorld(INITIAL_COORD_AGENT, HEIGHT, WIDTH, NUM_PITS, NUM_WUMPUS, NUM_GOLD)
wumpus_world.print_world()
agent = Agent(width=WIDTH, height=HEIGHT)
agent_record = []
# Keep snapshots: the agent updates its table in place, so a plain reference
# would always mirror the latest table.
best_Q = agent.Q.copy()
num_wins = 0

for epoch in tqdm(range(EPOCHS), total=EPOCHS):
    wumpus_world.reset_agent()

    # An episode ends as soon as the agent dies, finds the gold, or runs out
    # of steps; all three conditions must hold to keep playing.
    while (wumpus_world.is_agent_alive
           and not wumpus_world.glitter
           and wumpus_world.steps < MAX_STEPS):
        action = agent.get_action(
            x_t=wumpus_world.coord_agent[0],
            y_t=wumpus_world.coord_agent[1],
            smell=int(wumpus_world.smell),
            breeze=int(wumpus_world.breeze),
            bump=int(wumpus_world.bump),
            # Linearly decays from 1.0 in the first epoch towards 0.
            exploration=(EPOCHS - epoch) / EPOCHS
        )

        wumpus_world.move_agent(action)

    if wumpus_world.glitter:
        best_Q = agent.Q.copy()

    num_wins += int(wumpus_world.glitter)

    # Store the trajectory before update_policy clears agent.record.
    agent_record.append(list(agent.record))

    # ``score`` is the episode that just finished; ``last_score`` still holds
    # the previous episode until the next reset_agent() call.
    agent.update_policy(
        is_alive=wumpus_world.is_agent_alive,
        score=wumpus_world.score,
        learning_rate=LEARNING_RATE)


☻****
**wp*
*****
p*g**
*****


100%|██████████| 5000/5000 [00:19<00:00, 253.65it/s]


In [None]:
# Share of training episodes in which the agent reached the gold, in percent.
win_percentage = 100 * num_wins / EPOCHS
print(f"win rate: {win_percentage}%")

win rate: 51.68%


In [None]:
# TESTING

wumpus_world.reset_agent()
agent.Q = best_Q

# Play one episode with the learned table. The step cap guarantees the cell
# finishes even if the policy never reaches the gold or a hazard.
while (wumpus_world.is_agent_alive
       and not wumpus_world.glitter
       and wumpus_world.steps < MAX_STEPS):
    action = agent.get_action(
        x_t=wumpus_world.coord_agent[0],
        y_t=wumpus_world.coord_agent[1],
        smell=int(wumpus_world.smell),
        breeze=int(wumpus_world.breeze),
        bump=int(wumpus_world.bump),
        # Evaluate the learned distribution only; do not rely on the
        # ``epoch`` variable left over from the training cell.
        exploration=0.0
    )
    wumpus_world.move_agent(action=action, verbose=True)
    wumpus_world.print_world()
    print()

*☻***
**wp*
*****
p*g**
*****

*☻***
**wp*
*****
p*g**
*****

☻****
**wp*
*****
p*g**
*****

*☻***
**wp*
*****
p*g**
*****

**☻**
**wp*
*****
p*g**
*****

***☻*
**wp*
*****
p*g**
*****

***☻*
**wp*
*****
p*g**
*****

****☻
**wp*
*****
p*g**
*****

****☻
**wp*
*****
p*g**
*****

***☻*
**wp*
*****
p*g**
*****

***☻*
**wp*
*****
p*g**
*****

**☻**
**wp*
*****
p*g**
*****

***☻*
**wp*
*****
p*g**
*****

***☻*
**wp*
*****
p*g**
*****

**☻**
**wp*
*****
p*g**
*****

*☻***
**wp*
*****
p*g**
*****

**☻**
**wp*
*****
p*g**
*****

***☻*
**wp*
*****
p*g**
*****

**☻**
**wp*
*****
p*g**
*****

Bye-bye agente
**☻**
**wp*
*****
p*g**
*****

