In [2]:
from dataclasses import dataclass, field
from typing import List, Tuple, List, Dict
from collections import defaultdict
import numpy as np
import random
import pandas as pd

In [3]:
Coordinate = Tuple[int, int]  # (row, column); move() keeps both within 1..grid_size


@dataclass
class PDWorldEnvironment:
    """Two-agent pickup/drop-off grid world with strictly alternating turns.

    A female agent ('F') and a male agent ('M') share a square grid. On its
    turn an agent either moves one cell (N/S/E/W), picks up a block ('P') or
    drops one off ('D'). Each agent carries at most one block. The episode is
    terminal once every drop-off cell has received as many blocks as its
    capacity.

    The instance is reset automatically after construction, so it can be
    used right away; call ``reset()`` to start a new episode.
    """

    grid_size: int = 5
    # Blocks initially available at each pickup cell.
    # NOTE(review): (4, 10) lies outside the default 5x5 grid, so its blocks
    # can never be picked up and the default episode cannot fill all
    # drop-offs. Looks like a typo -- confirm the intended cell.
    pickup_locations: Dict[Coordinate, int] = field(default_factory=lambda: {(3,5): 10, (4,10): 10})
    # Maximum number of blocks each drop-off cell accepts.
    dropoff_locations_capacity: Dict[Coordinate, int] = field(default_factory=lambda: {(1,1): 5, (2,5): 5, (3,3): 5, (5,5): 5})

    female_agent_start: Coordinate = (1, 3)
    male_agent_start: Coordinate = (5, 3)

    # Reward parameters. step_cost is charged on every step; the others are
    # added on top of it when the corresponding event happens.
    step_cost: float = -1.0
    pickup_reward: float = 1.0
    dropoff_reward: float = 10.0
    invalid_penalty: float = -5.0
    terminal_bonus: float = 20.0

    # When True the state contains the exact per-cell block counts; when
    # False it only contains the coarse bucket from bucket_remaining().
    use_counts_in_state: bool = False

    # Internal episode state, (re)initialised by reset().
    f_agent_pos: Coordinate = field(init=False)
    m_agent_pos: Coordinate = field(init=False)
    carryingF: int = field(init=False, default=0)
    carryingM: int = field(init=False, default=0)
    pickups: Dict[Coordinate, int] = field(init=False)   # blocks left per pickup cell
    dropoffs: Dict[Coordinate, int] = field(init=False)  # blocks delivered per drop-off cell
    turn: str = field(init=False)  # 'F' or 'M': whose move is next
    episode_steps: int = field(init=False, default=0)

    # Plain class attribute (not a dataclass field), shared by all instances.
    actions = ['N', 'S', 'E', 'W', 'P', 'D']  # North, South, East, West, Pickup, Dropoff

    def __post_init__(self):
        # Populate the init=False fields so every method works before the
        # caller's first explicit reset().
        self.reset()

    def reset(self):
        """Start a new episode and return the initial state tuple."""
        self.f_agent_pos = self.female_agent_start
        self.m_agent_pos = self.male_agent_start
        self.carryingF = 0
        self.carryingM = 0
        self.pickups = dict(self.pickup_locations)
        # Delivered counters start at zero. Copying the capacities here would
        # make every drop-off look full: is_terminal() would be True at once
        # and no 'D' action could ever succeed.
        self.dropoffs = {pos: 0 for pos in self.dropoff_locations_capacity}
        self.turn = 'F'
        self.episode_steps = 0

        return self.get_state()

    def is_terminal(self) -> bool:
        """Return True when every drop-off cell has reached its capacity."""
        return all(self.dropoffs[pos] >= cap for pos, cap in self.dropoff_locations_capacity.items())

    def total_remaining(self) -> int:
        """Return how many blocks can still be delivered across all drop-offs."""
        total_capacity = sum(self.dropoff_locations_capacity.values())
        total_dropped = sum(self.dropoffs.values())
        return max(0, total_capacity - total_dropped)

    def bucket_remaining(self) -> int:
        """Return the remaining deliveries in buckets of five, capped at 4.

        This is ceil(total_remaining() / 5) clamped to at most 4.
        """
        remaining = self.total_remaining()
        return min(4, (remaining + 4) // 5)

    def get_state(self):
        """Return the hashable state tuple.

        Layout: (female position, male position, carryingF, carryingM,
        pickup part, drop-off part, turn). With ``use_counts_in_state`` the
        pickup/drop-off parts are the sorted (cell, count) items; otherwise
        the pickup part is empty and the drop-off part is the 1-tuple
        ``(bucket_remaining(),)``.
        """
        if self.use_counts_in_state:
            pick = tuple(sorted(self.pickups.items()))
            drop = tuple(sorted(self.dropoffs.items()))
        else:
            pick = ()
            drop = (self.bucket_remaining(),)

        return (self.f_agent_pos, self.m_agent_pos, self.carryingF, self.carryingM, pick, drop, self.turn)

    def other_agent(self, who: str) -> str:
        """Return 'M' if ``who`` is 'F', otherwise 'F'."""
        return 'M' if who == 'F' else 'F'

    def move(self, pos: Coordinate, action: str) -> Coordinate:
        """Return the cell reached from ``pos`` by ``action``.

        'N' decreases the row, 'S' increases it, 'E' increases the column and
        'W' decreases it. A move that would leave the 1..grid_size range (or
        any non-movement action) returns ``pos`` unchanged.
        """
        r, c = pos

        if action == 'N' and r > 1: r -= 1
        elif action == 'S' and r < self.grid_size: r += 1
        elif action == 'E' and c < self.grid_size: c += 1
        elif action == 'W' and c > 1: c -= 1

        return (r, c)

    def step(self, action: str):
        """Apply ``action`` for the agent whose turn it is.

        Returns ``(state, reward, done)``. The reward starts at ``step_cost``;
        a successful pickup or drop-off adds its reward, an invalid action
        adds ``invalid_penalty``, and reaching the terminal condition adds
        ``terminal_bonus``. The turn passes to the other agent afterwards,
        even if the action was invalid.
        """
        who = self.turn
        other = self.other_agent(who)
        reward = self.step_cost
        done = False
        invalid = False

        agent_pos = self.f_agent_pos if who == 'F' else self.m_agent_pos  # acting agent's position
        other_pos = self.m_agent_pos if who == 'F' else self.f_agent_pos  # the other agent's position
        carrying = self.carryingF if who == 'F' else self.carryingM

        if action in ['N', 'S', 'E', 'W']:
            new_pos = self.move(agent_pos, action)
            if new_pos == other_pos:
                # Agents may not share a cell; the mover stays in place.
                invalid = True
            else:
                # Bumping into a wall leaves new_pos == agent_pos and is not
                # penalised beyond the step cost.
                agent_pos = new_pos
        elif action == 'P':
            if agent_pos in self.pickups and self.pickups[agent_pos] > 0 and carrying == 0:
                self.pickups[agent_pos] -= 1
                carrying = 1
                reward += self.pickup_reward
            else:
                invalid = True
        elif action == 'D':
            if agent_pos in self.dropoff_locations_capacity and carrying == 1:
                if self.dropoffs[agent_pos] < self.dropoff_locations_capacity[agent_pos]:
                    self.dropoffs[agent_pos] += 1
                    carrying = 0
                    reward += self.dropoff_reward
                else:
                    invalid = True  # this drop-off is already full
            else:
                invalid = True
        else:
            invalid = True  # unknown action symbol

        if invalid:
            reward += self.invalid_penalty

        # Write the (possibly updated) position and load back to the actor.
        if who == 'F':
            self.f_agent_pos = agent_pos
            self.carryingF = carrying
        else:
            self.m_agent_pos = agent_pos
            self.carryingM = carrying

        self.episode_steps += 1

        if self.is_terminal():
            reward += self.terminal_bonus
            done = True

        self.turn = other
        return self.get_state(), reward, done

    def applicable_ops(self, who: str) -> List[str]:
        """Return the operations offered to agent ``who`` in the current state.

        If a pickup and/or drop-off is possible at the agent's cell, only
        those are returned; otherwise all four movement actions are returned
        (without filtering out walls or the other agent's cell).
        """
        agent_pos = self.f_agent_pos if who == 'F' else self.m_agent_pos
        carrying = self.carryingF if who == 'F' else self.carryingM

        ops = []
        if agent_pos in self.pickups and self.pickups[agent_pos] > 0 and carrying == 0:
            ops.append('P')
        if agent_pos in self.dropoff_locations_capacity and carrying == 1 and self.dropoffs[agent_pos] < self.dropoff_locations_capacity[agent_pos]:
            ops.append('D')

        if not ops:
            ops = ['N', 'S', 'E', 'W']

        return ops

    def manhattan_distance(self) -> int:
        """Return the Manhattan distance between the two agents."""
        r1, c1 = self.f_agent_pos
        r2, c2 = self.m_agent_pos
        return abs(r1 - r2) + abs(c1 - c2)
        

In [None]:
class TabularQ:
    """Tabular action-value store keyed by hashable state.

    Each state maps to a float array with one entry per action; rows are
    created on first access and initialised to zero.
    """

    def __init__(self, actions: List[str]):
        """Create an empty table for the given list of action symbols."""
        self.actions = actions
        self.Q = defaultdict(lambda: np.zeros(len(actions), dtype=float))

    def best_action(self, state) -> int:
        """Return the index of a highest-valued action, ties broken at random."""
        q = self.Q[state]
        max_val = np.max(q)
        # tolist() yields plain Python ints, so the result matches the
        # annotated return type instead of being a NumPy integer.
        idxs = np.flatnonzero(q == max_val).tolist()
        return random.choice(idxs)

    def update_q(self, state, action_idx, target, alpha):
        """Move Q[state][action_idx] towards ``target`` with step size ``alpha``.

        New value: (1 - alpha) * old + alpha * target.
        """
        q = self.Q[state]
        # Assign, do not accumulate: using += here would add the blended
        # value on top of the old estimate and make the table diverge.
        q[action_idx] = (1 - alpha) * q[action_idx] + alpha * target

In [None]:
@dataclass
class AgentConfig:
    """Learning settings for a single agent.

    All four values are stored as given; nothing is validated here.
    """

    # Step size (learning rate).
    alpha: float
    # Discount factor.
    gamma: float
    # Policy name. The original note lists 'PRANDOM', 'PGREEDY' and
    # 'PEXploit' -- confirm the exact spelling against the code that reads it.
    policy: str
    # Exploration rate; defaults to 0.2.
    epsilon: float = 0.2

In [None]:
class IndependentLearner:
    """Placeholder for the learner that picks an agent's action from a policy.

    No behaviour is implemented yet.
    """