In [1]:
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import ipywidgets as widgets
from random import choice, random
import seaborn as sns
import cv2

In [2]:
# Constants

# Side length of the square world; cell coordinates run from 1 to GRID_SIZE.
GRID_SIZE = 5

# Every action an agent may select on its turn.
ACTIONS = [
    'north', 'south', 'east', 'west',  # movement
    'pickup', 'dropoff',               # block handling
]

# Agents, in the order they are processed within one simulation step.
AGENT_NAMES = ['red', 'blue', 'black']

# (x, y) cells where blocks can be collected / delivered.
PICKUP_LOCATIONS = {(1, 5), (2, 4), (5, 2)}
DROPOFF_LOCATIONS = {(1, 1), (3, 1), (4, 5)}

# NOTE(review): BLOCKS_INITIAL is not referenced by the code visible in this
# notebook; reset_environment() spells out the same starting count literally.
BLOCKS_INITIAL = 5

# Maximum number of blocks a dropoff cell accepts (checked in apply_action).
CAPACITY = 5

# Reward per action type; 'move' is used for every action that is neither
# 'pickup' nor 'dropoff'.
REWARDS = {
    'pickup': 13,
    'dropoff': 13,
    'move': -1,
}

# Initial State Setup
def reset_environment():
    """Return a brand-new environment state for the start of a run.

    Returns:
        dict with three entries:
          * 'positions' - agent name -> (x, y) start cell.
          * 'blocks'    - (x, y) cell -> block count; the three pickup cells
                          start with 5 blocks, the three dropoff cells with 0.
          * 'carrying'  - agent name -> False (nobody holds a block yet).

    Every call builds new dictionaries, so mutating one returned state never
    affects another.
    """
    agent_starts = {'red': (3, 3), 'blue': (5, 3), 'black': (1, 3)}

    # Stocked pickup cells first, then the empty dropoff cells.
    block_counts = dict.fromkeys([(1, 5), (2, 4), (5, 2)], 5)
    block_counts.update(dict.fromkeys([(1, 1), (3, 1), (4, 5)], 0))

    fresh_state = {}
    fresh_state['positions'] = agent_starts
    fresh_state['blocks'] = block_counts
    fresh_state['carrying'] = {name: False for name in agent_starts}
    return fresh_state

# Initialize state
# NOTE(review): in the code visible here, run_simulation_with_opencv builds its
# own local state via reset_environment(), so this module-level copy is not
# read by it - confirm whether any other cell still depends on this variable.
state = reset_environment()

# Initialize Q-table
def initialize_q_table():
    """Create a zero-filled Q-table covering every cell and carrying flag.

    Returns:
        dict keyed by ((x, y), carrying) for x and y in 1..GRID_SIZE and
        carrying in (False, True). Each value is its own dict mapping every
        name in ACTIONS to 0.0.
    """
    coords = range(1, GRID_SIZE + 1)
    return {
        ((col, row), has_block): {name: 0.0 for name in ACTIONS}
        for col in coords
        for row in coords
        for has_block in (False, True)
    }

# Shared Q-table; run_simulation_with_opencv passes it to update_q_table,
# which modifies it in place.
q_table = initialize_q_table()
# Learning rate: the fraction of (target - current estimate) applied by
# update_q_table on each update.
alpha = 0.3
# Discount factor: weight given to the best Q-value of the next state in
# update_q_table.
gamma = 0.5


In [3]:
def apply_action(agent, action, state):
    """Mutate ``state`` in place to reflect ``agent`` attempting ``action``.

    Args:
        agent: Key into state['positions'] and state['carrying'].
        action: One of 'north', 'south', 'east', 'west', 'pickup', 'dropoff'.
            Any other value leaves the state unchanged.
        state: Environment dictionary with 'positions', 'blocks' and
            'carrying' entries.

    Rules:
        * 'north' lowers y and 'south' raises y; 'east' raises x and 'west'
          lowers x. A move that would leave the 1..GRID_SIZE range is ignored.
        * 'pickup' succeeds only on a pickup cell that still has blocks while
          the agent is empty-handed.
        * 'dropoff' succeeds only on a dropoff cell holding fewer than
          CAPACITY blocks while the agent is carrying.
        * An action whose conditions are not met changes nothing.

    Returns:
        None.
    """
    col, row = state['positions'][agent]

    if action == 'pickup':
        cell = (col, row)
        if cell in PICKUP_LOCATIONS and not state['carrying'][agent] and state['blocks'][cell] > 0:
            state['carrying'][agent] = True
            state['blocks'][cell] -= 1
    elif action == 'dropoff':
        cell = (col, row)
        if cell in DROPOFF_LOCATIONS and state['carrying'][agent] and state['blocks'][cell] < CAPACITY:
            state['carrying'][agent] = False
            state['blocks'][cell] += 1
    elif action == 'north':
        if row > 1:
            row -= 1
    elif action == 'south':
        if row < GRID_SIZE:
            row += 1
    elif action == 'east':
        if col < GRID_SIZE:
            col += 1
    elif action == 'west':
        if col > 1:
            col -= 1

    # The position entry is rewritten even when the agent did not move.
    state['positions'][agent] = (col, row)

def select_action(q_table, state, agent, policy):
    """Choose the next action for ``agent`` under the named policy.

    Args:
        q_table: Mapping ((x, y), carrying) -> {action: Q-value}.
        state: Environment dictionary; the agent's entries in
            state['positions'] and state['carrying'] form the Q-table key.
        agent: Name of the acting agent.
        policy: One of
            * 'PRandom'  - uniformly random action.
            * 'PGreedy'  - an action with the highest Q-value.
            * 'PExploit' - the greedy choice with probability 0.8, otherwise
                           a uniformly random action.
            Any other value behaves like 'PRandom'.

    Returns:
        The selected action name.

    Ties for the highest Q-value are broken uniformly at random. Taking the
    first maximum instead would always favour the earliest action in the
    table (e.g. 'north' on an all-zero table), so a greedy agent would walk
    into the same wall until some other value happened to change.
    """
    if policy == 'PRandom':
        return choice(list(ACTIONS))

    current_pos = state['positions'][agent]
    carrying = state['carrying'][agent]
    q_values = q_table[(current_pos, carrying)]

    best_value = max(q_values.values())
    best_actions = [name for name, value in q_values.items() if value == best_value]

    if policy == 'PGreedy':
        return choice(best_actions)
    # Exploit with probability 0.8; the remaining draws explore at random.
    if policy == 'PExploit' and random() < 0.8:
        return choice(best_actions)
    return choice(list(ACTIONS))

def update_q_table(q_table, state, action, reward, new_state, agent):
    """Apply one Q-learning update for ``agent`` and write it into ``q_table``.

    The entry for (agent's cell and carrying flag in ``state``, ``action``)
    is moved toward ``reward + gamma * best`` by a fraction ``alpha``, where
    ``best`` is the largest Q-value stored for the agent's cell and carrying
    flag in ``new_state``. ``alpha`` and ``gamma`` are module-level values.

    Args:
        q_table: Mapping ((x, y), carrying) -> {action: Q-value}; modified in
            place.
        state: Environment dictionary describing the agent before the action.
        action: The action that was taken.
        reward: Reward received for the action.
        new_state: Environment dictionary describing the agent afterwards.
        agent: Name of the acting agent.

    Returns:
        None.
    """
    source_key = (state['positions'][agent], state['carrying'][agent])
    target_key = (new_state['positions'][agent], new_state['carrying'][agent])

    current_estimate = q_table[source_key][action]
    best_next = max(q_table[target_key].values())

    td_error = reward + gamma * best_next - current_estimate
    q_table[source_key][action] = current_estimate + alpha * td_error

def compute_reward(state, action, new_state, agent):
    """Return the reward for ``agent`` having attempted ``action``.

    Args:
        state: Environment dictionary before the action.
        action: The action that was attempted.
        new_state: Environment dictionary after the action.
        agent: Name of the acting agent.

    Returns:
        * REWARDS['move'] for any action other than 'pickup' / 'dropoff'.
        * REWARDS[action] for a 'pickup' / 'dropoff' that took effect, i.e.
          the agent's carrying flag differs between ``state`` and
          ``new_state``.
        * REWARDS['move'] for a 'pickup' / 'dropoff' that had no effect, so
          an agent is not paid for attempting the action where it is not
          allowed.

    When ``state`` and ``new_state`` are the same object, no before/after
    comparison is possible; the action's reward is then returned
    unconditionally.
    """
    if action not in ('pickup', 'dropoff'):
        return REWARDS['move']

    # Same object for both snapshots: success cannot be determined, so keep
    # the unconditional payout rather than silently withholding every reward.
    if state is new_state:
        return REWARDS[action]

    # A successful pickup or dropoff always flips the carrying flag.
    took_effect = state['carrying'][agent] != new_state['carrying'][agent]
    return REWARDS[action] if took_effect else REWARDS['move']


In [4]:
def plot_q_values(q_table, position, carrying):
    """Show a bar chart of the Q-value of every action for one table entry.

    Args:
        q_table: Mapping ((x, y), carrying) -> {action: Q-value}.
        position: (x, y) cell whose values are plotted.
        carrying: Carrying flag selecting which entry for that cell to plot.
    """
    entry_key = (position, carrying)
    bar_heights = [q_table[entry_key][name] for name in ACTIONS]

    figure, axes = plt.subplots(figsize=(8, 3))
    sns.barplot(x=ACTIONS, y=bar_heights, ax=axes)
    axes.set_title(f'Q-values at Position {position} Carrying: {"Yes" if carrying else "No"}')
    axes.set_ylabel('Q-value')
    plt.show()



In [5]:


def create_grid_image(state, cell_size=50):
    """Render the grid world as an image suitable for ``cv2.imshow``.

    Args:
        state: Environment dictionary; only state['positions'] is read. It
            must map every name in AGENT_NAMES to a 1-based (x, y) cell.
        cell_size: Edge length of one grid cell in pixels.

    Returns:
        A uint8 array of shape (GRID_SIZE * cell_size, GRID_SIZE * cell_size, 3)
        with a white background, black grid lines, a 'P' label on each pickup
        cell, a 'D' label on each dropoff cell and one filled circle per agent.

    Colours are given in OpenCV's B, G, R channel order. Each agent is drawn
    in the colour its name suggests so the three can be told apart; agents
    standing on the same cell are drawn on top of each other.
    """
    # Define the size of the image
    img_size = GRID_SIZE * cell_size
    # Create a white image
    grid_img = np.full((img_size, img_size, 3), 255, np.uint8)

    # Draw the grid lines. The last line would sit on pixel img_size, one past
    # the final valid index, and be clipped away entirely; clamping keeps the
    # bottom and right borders visible.
    last_pixel = img_size - 1
    for i in range(GRID_SIZE + 1):
        offset = min(i * cell_size, last_pixel)
        cv2.line(grid_img, (0, offset), (last_pixel, offset), (0, 0, 0), 1)
        cv2.line(grid_img, (offset, 0), (offset, last_pixel), (0, 0, 0), 1)

    # Add pickup and dropoff locations (green 'P', red 'D')
    for loc in PICKUP_LOCATIONS:
        cv2.putText(grid_img, 'P', ((loc[0] - 1) * cell_size + 15, loc[1] * cell_size - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    for loc in DROPOFF_LOCATIONS:
        cv2.putText(grid_img, 'D', ((loc[0] - 1) * cell_size + 15, loc[1] * cell_size - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

    # Draw agents, one colour per agent (B, G, R). Names without an entry fall
    # back to grey rather than raising.
    agent_colors = {'red': (0, 0, 255), 'blue': (255, 0, 0), 'black': (0, 0, 0)}
    fallback_color = (128, 128, 128)
    for agent in AGENT_NAMES:
        pos = state['positions'][agent]
        center = (
            (pos[0] - 1) * cell_size + cell_size // 2,
            (pos[1] - 1) * cell_size + cell_size // 2,
        )
        cv2.circle(grid_img, center, cell_size // 4, agent_colors.get(agent, fallback_color), -1)

    return grid_img

def run_simulation_with_opencv(steps, policy):
    """Run the multi-agent simulation, learning into the module-level q_table.

    Args:
        steps: Maximum number of simulation steps. In each step every agent
            in AGENT_NAMES acts once, in list order.
        policy: Policy name forwarded to select_action.

    Side effects:
        * Updates the module-level ``q_table`` in place.
        * Shows the world in an OpenCV window named 'Grid World', refreshed
          every 10th step. Pressing 'q' in that window ends the run early.
        * Closes all OpenCV windows on exit, including when an exception
          interrupts the run.

    Each agent's Q-update needs the world both before and after its action.
    apply_action edits the state in place, so a snapshot is taken first;
    otherwise the "before" and "after" arguments would be the same object and
    the update would be written to the entry of the state the agent arrived
    in instead of the one it acted from.
    """
    state = reset_environment()
    quit_key = ord('q')

    try:
        for step in range(steps):
            pressed = -1
            if step % 10 == 0:
                img = create_grid_image(state)
                cv2.imshow('Grid World', img)
                # Wait 100 ms so the frame is visible, and keep the key code:
                # discarding it would swallow a 'q' pressed during this wait.
                pressed = cv2.waitKey(100)

            for agent in AGENT_NAMES:
                # Positions are tuples and the other values are ints/bools,
                # so copying the three inner dicts gives a full snapshot.
                previous_state = {
                    'positions': dict(state['positions']),
                    'blocks': dict(state['blocks']),
                    'carrying': dict(state['carrying']),
                }
                action = select_action(q_table, state, agent, policy)
                apply_action(agent, action, state)
                reward = compute_reward(previous_state, action, state, agent)
                update_q_table(q_table, previous_state, action, reward, state, agent)

            # Press 'q' to quit
            if pressed & 0xFF == quit_key or cv2.waitKey(1) & 0xFF == quit_key:
                break
    finally:
        cv2.destroyAllWindows()

# To run the simulation


In [6]:
# Run up to 1000 steps with the purely random policy. This opens an OpenCV
# window; press 'q' in it to stop early.
run_simulation_with_opencv(1000, 'PRandom')

