#Modeling

In [None]:
import itertools
class AutonomousVacuum:
    """Grid-world MDP model of an autonomous vacuum cleaner.

    The robot moves on a ``grid_size`` x ``grid_size`` grid whose cells all
    start dirty.  Each move costs battery and fills the dust storage; when
    either resource is exhausted the only available transition sends the
    robot back to its base station.

    Attributes:
        grid_size: Side length of the square grid.
        position: Current ``(row, col)`` of the robot.
        base_position: ``(row, col)`` of the charging / emptying station.
        initial_position: Position the robot was created at.
        battery: Remaining battery (starts at 100, a move costs 5).
        dust_storage: Collected dust (starts at 0, a move adds 10).
        dirty_cells: Set of ``(row, col)`` cells that still need cleaning.
    """

    cleaningCost = 20  # reward for cleaning a dirty cell
    wallCost = -5  # penalty for hitting a wall
    batteryCost = -7  # penalty for running out of battery
    dustStorageCost = -10  # penalty for running out of dust storage
    objectCost = -5  # penalty for hitting a moving object in the room
    # The two base-station rewards carry a "Reward" suffix on purpose: naming
    # them ``recharge`` / ``emptyStorage`` would let the methods of the same
    # name (defined below) overwrite them in the class namespace.
    rechargeReward = 20  # reward for going back to recharge the vacuum
    emptyStorageReward = 15  # reward for emptying the dust storage
    hitWallProb = 0.3  # probability that a move fails because a wall is hit
    hitObject = 0.4  # probability of bumping into a moving object

    # Row/column offset of every action name.
    _MOVES = {
        'down': (1, 0),
        'up': (-1, 0),
        'left': (0, -1),
        'right': (0, 1),
    }

    def __init__(self, grid_size, initial_position, base_position):
        """Create a vacuum with a full battery on a completely dirty grid.

        Args:
            grid_size: Side length of the square grid.
            initial_position: Starting ``(row, col)`` of the robot.
            base_position: ``(row, col)`` of the base station.
        """
        self.grid_size = grid_size
        self.position = initial_position
        self.base_position = base_position
        self.initial_position = initial_position
        self.battery = 100
        self.dust_storage = 0
        # Every cell starts dirty, including the one the robot starts on.
        self.dirty_cells = {(i, j) for i in range(grid_size) for j in range(grid_size)}

    def is_end(self):
        """Return True once no dirty cell is left."""
        return not self.dirty_cells

    def states(self):
        """Return every ``(row, col)`` position of the grid in row-major order."""
        return list(itertools.product(range(self.grid_size), repeat=2))

    def actions(self):
        """Return the moves that keep the robot inside the grid.

        The order is always ``down``, ``up``, ``left``, ``right`` (restricted
        to the moves that are possible from ``self.position``).
        """
        x, y = self.position
        last = self.grid_size - 1
        candidates = (
            ('down', x < last),   # (x + 1, y)
            ('up', x > 0),        # (x - 1, y)
            ('left', y > 0),      # (x, y - 1)
            ('right', y < last),  # (x, y + 1)
        )
        return [name for name, allowed in candidates if allowed]

    def succReward(self, action):
        """Return the possible outcomes of taking ``action`` from the current state.

        Each outcome is a tuple ``(newPosition, newBattery, newStorage,
        wallFactor, objectFactor, reward)``; the probability of an outcome is
        ``wallFactor * objectFactor``.

        Note that the target cell is cleaned (removed from ``dirty_cells``)
        as a side effect of asking for a regular move.

        Args:
            action: One of ``'down'``, ``'up'``, ``'left'``, ``'right'``.

        Returns:
            A list of outcome tuples; empty when ``action`` is not available
            from the current position.
        """
        if action not in self.actions():
            return []

        x, y = self.position

        if self.battery >= 5 and self.dust_storage <= 10:
            newBattery = self.battery - 5
            newStorage = self.dust_storage + 10
            dx, dy = self._MOVES[action]
            newPosition = (x + dx, y + dy)

            # A wall is hit only when the move would leave the grid; border
            # cells themselves are ordinary cells that can be entered.
            insideGrid = (0 <= newPosition[0] < self.grid_size
                          and 0 <= newPosition[1] < self.grid_size)
            if insideGrid:
                reward = self.clean(newPosition)
            else:
                newPosition = self.position  # stay put when the wall is hit
                reward = self.wallCost

            moveOk = 1 - self.hitWallProb
            noObject = 1 - self.hitObject
            return [
                # Move succeeds, nothing in the way.
                (newPosition, newBattery, newStorage, moveOk, noObject, reward),
                # Move succeeds but an object is hit on the way.
                (newPosition, newBattery, newStorage, moveOk, self.hitObject,
                 reward + self.objectCost),
                # Move fails against a wall: battery is spent, no dust gained.
                (self.position, newBattery, self.dust_storage, self.hitWallProb,
                 noObject, self.wallCost),
                # Move fails against a wall and an object is hit as well.
                (self.position, newBattery, self.dust_storage, self.hitWallProb,
                 self.hitObject, self.wallCost + self.objectCost),
            ]

        if self.battery <= 5:
            # Battery too low: the vacuum returns to base and recharges.
            return [(self.base_position, 100, self.dust_storage, 1, 1,
                     self.rechargeReward + self.batteryCost)]

        if self.dust_storage >= 10:
            # Dust storage full: the vacuum returns to base and empties it.
            return [(self.base_position, self.battery, 0, 1, 1,
                     self.emptyStorageReward + self.dustStorageCost)]

        return []

    def clean(self, position):
        """Clean ``position`` and return the reward earned for doing so.

        Returns ``cleaningCost`` when the cell was dirty (it is removed from
        ``dirty_cells``) and 0 when it was already clean.
        """
        if position in self.dirty_cells:
            self.dirty_cells.remove(position)
            return self.cleaningCost
        return 0

    def recharge(self):
        """Refill the battery to 100 and return the recharge reward."""
        self.battery = 100
        return self.rechargeReward

    def emptyStorage(self):
        """Empty the dust storage and return the emptying reward."""
        self.dust_storage = 0
        return self.emptyStorageReward



In [None]:
# Quick check of the transition model on a 10x10 grid: start at (3, 0),
# base station at (0, 1), and list the outcomes of moving up.
ip, bp = (3, 0), (0, 1)
state = (0, 1)
av = AutonomousVacuum(grid_size=10, initial_position=ip, base_position=bp)

print(av.succReward(action='up'))

[((3, 0), 95, 10, 0.7, 0.6, -5), ((3, 0), 95, 10, 0.7, 0.4, -10), ((3, 0), 95, 0, 0.3, 0.6, -5), ((3, 0), 95, 0, 0.3, 0.4, -10)]


##Policy Evaluation


In [None]:
def evaluatePolicy(vacuum, policy, discount=0.9, tolerance=1e-10, max_iterations=10000):
    """Iteratively evaluate a fixed policy over the grid positions.

    The value table is keyed by ``(row, col)`` only.  For each position the
    cell the robot stands on is treated as already cleaned before the
    terminal test and the Bellman backup are performed.

    Args:
        vacuum: Model exposing ``grid_size``, ``position``, ``dirty_cells``,
            ``is_end()``, ``actions()`` and ``succReward(action)``.
        policy: Mapping ``position -> action``.  Positions that are missing
            fall back to the first available action at that position.
        discount: Discount factor applied to successor values.  A value
            below 1 is needed for the iteration to converge when the policy
            never reaches a terminal state.
        tolerance: Stop once the largest change in a sweep is below this.
        max_iterations: Hard cap on the number of sweeps.

    Returns:
        Dict mapping every grid position to its estimated value.
    """
    positions = [(i, j) for i in range(vacuum.grid_size) for j in range(vacuum.grid_size)]
    V = dict.fromkeys(positions, 0)

    # The sweeps move the vacuum around and succReward() may clean cells as a
    # side effect, so remember the real state and put it back afterwards.
    saved_position = vacuum.position
    saved_dirty = set(vacuum.dirty_cells)

    def Q(position, action):
        vacuum.position = position
        outcomes = vacuum.succReward(action)
        return sum(
            hitWallProb * hitObjectProb * (reward + discount * V.get(newPosition, 0))
            for newPosition, _battery, _dust, hitWallProb, hitObjectProb, reward in outcomes
        )

    try:
        for _ in range(max_iterations):
            delta = 0
            newV = {}

            for position in positions:
                # Position must be set before actions() is consulted below.
                vacuum.position = position
                # Fresh copy per position: the current cell counts as clean,
                # and nothing leaks from one position to the next.
                vacuum.dirty_cells = saved_dirty - {position}

                if vacuum.is_end():
                    newV[position] = 0
                else:
                    action = policy.get(position)
                    if action is None:
                        available = vacuum.actions()
                        action = available[0] if available else None
                    newV[position] = Q(position, action)

                delta = max(delta, abs(newV[position] - V[position]))

            V = newV

            if delta < tolerance:
                break
    finally:
        vacuum.position = saved_position
        vacuum.dirty_cells = saved_dirty

    return V

In [None]:
# 3x3 room with the robot starting on its base station in the top-left corner
grid_size = 3
initial_position = base_position = (0, 0)

# Build the vacuum model for that room
vacuum = AutonomousVacuum(
    grid_size=grid_size,
    initial_position=initial_position,
    base_position=base_position,
)

# Simple policy: keep moving 'down'; along the bottom row move 'right'.
# (2, 2) is a terminating state, so the 'down' stored there should not be needed.
policy = {
    (row, col): 'right' if (row == 2 and col < 2) else 'down'
    for row in range(3)
    for col in range(3)
}

# Evaluate the policy
V = evaluatePolicy(vacuum, policy)

# Report the value of every cell in row-major order
for i, j in itertools.product(range(grid_size), repeat=2):
    print(f"V({i}, {j}) = {V[(i, j)]:.2f}")


KeyboardInterrupt: 

##Optimal Policy

In [None]:
def valueIteration(vacuum, discount=0.9, tolerance=1e-10, max_iterations=10000):
    """Run value iteration over the vacuum's positions.

    Args:
        vacuum: Model exposing ``position``, ``dirty_cells``, ``states()``,
            ``is_end()``, ``actions()`` and ``succReward(action)``.
        discount: Discount factor applied to successor values; a value below
            1 keeps the iteration from diverging on non-terminating models.
        tolerance: Stop once the largest change in a sweep is below this.
        max_iterations: Hard cap on the number of sweeps.

    Returns:
        ``(V, pi)`` where ``V`` maps each state to its estimated optimal
        value and ``pi`` maps each state to the chosen action (``None`` for
        terminal states and states without any available action).
    """
    states = vacuum.states()
    V = dict.fromkeys(states, 0)  # state -> estimate of V_opt(state)
    pi = dict.fromkeys(states)    # state -> estimate of pi_opt(state)

    # The sweeps move the vacuum around and succReward() may clean cells as a
    # side effect, so remember the real state and put it back afterwards.
    saved_position = vacuum.position
    saved_dirty = set(vacuum.dirty_cells)

    # Based on the current estimate of V
    def Q(position, action):
        vacuum.position = position
        # Every backup must see the same dirty cells, otherwise the reward of
        # an action would depend on which actions were queried before it.
        vacuum.dirty_cells = set(saved_dirty)
        outcomes = vacuum.succReward(action)
        return sum(
            hitWallProb * hitObjectProb * (reward + discount * V.get(newPosition, 0))
            for newPosition, _battery, _dust, hitWallProb, hitObjectProb, reward in outcomes
        )

    try:
        for _ in range(max_iterations):
            delta = 0
            newV = {}

            for state in states:
                vacuum.position = state
                vacuum.dirty_cells = set(saved_dirty)
                available = [] if vacuum.is_end() else vacuum.actions()

                if not available:
                    newV[state] = 0
                    pi[state] = None
                else:
                    max_action_value = float('-inf')
                    optimal_action = None
                    for action in available:
                        action_value = Q(state, action)
                        if action_value > max_action_value:
                            max_action_value = action_value
                            optimal_action = action
                    newV[state] = max_action_value
                    pi[state] = optimal_action

                delta = max(delta, abs(newV[state] - V[state]))

            # Adopt the new table before testing for convergence so that the
            # returned values match the returned policy.
            V = newV

            if delta < tolerance:
                break
    finally:
        vacuum.position = saved_position
        vacuum.dirty_cells = saved_dirty

    return V, pi


In [None]:
# 3x3 room with the robot starting on its base station in the top-left corner
grid_size = 3
initial_position = base_position = (0, 0)

# Build the vacuum model for that room
vacuum = AutonomousVacuum(
    grid_size=grid_size,
    initial_position=initial_position,
    base_position=base_position,
)

# Compute the optimal values and the matching policy
V, pi = valueIteration(vacuum)

# Report value and chosen action for every cell in row-major order
for i, j in itertools.product(range(grid_size), repeat=2):
    position = (i, j)
    print(f"V({position}) = {V[position]:.2f}, pi({position}) = {pi[position]}")

KeyboardInterrupt: 

##Optimizing by Monte Carlo

In [None]:
import random

def generate_episode(vacuum, policy, max_steps=10000):
    """Simulate one episode by following ``policy`` on ``vacuum``.

    At every step a single outcome of ``vacuum.succReward(action)`` is
    sampled with probability proportional to the product of its two
    probability factors; the sampled outcome's position, battery and dust
    storage are written back to the vacuum and its reward is recorded.
    Dirty-cell bookkeeping is left to ``vacuum.succReward`` itself.

    Args:
        vacuum: Model exposing ``position``, ``battery``, ``dust_storage``,
            ``is_end()``, ``actions()`` and ``succReward(action)``.
        policy: Mapping ``state -> action``.  States without an entry get a
            uniformly random available action.
        max_steps: Upper bound on the episode length, so that a policy which
            never finishes cleaning cannot loop forever.  ``None`` disables
            the bound.

    Returns:
        List of ``(state, action, reward)`` tuples in the order visited.  The
        episode ends early when no action is available or when the chosen
        action yields no outcome.
    """
    episode = []
    while not vacuum.is_end():
        if max_steps is not None and len(episode) >= max_steps:
            break

        state = vacuum.position
        action = policy.get(state)
        if action is None:
            available = vacuum.actions()
            if not available:
                break
            action = random.choice(available)

        outcomes = vacuum.succReward(action)
        if not outcomes:
            # Action is not applicable here; random.choices() cannot sample
            # from an empty population.
            break

        weights = [outcome[3] * outcome[4] for outcome in outcomes]
        # Sample ONE outcome so that the recorded reward belongs to the
        # transition that is actually taken.
        next_position, battery, dust_storage, _, _, reward = random.choices(
            outcomes, weights=weights
        )[0]

        episode.append((state, action, reward))
        vacuum.position = next_position
        vacuum.battery = battery
        vacuum.dust_storage = dust_storage
    return episode

def monte_carlo_control(vacuum, episodes=1000, epsilon=0.1, discount=0.9):
    """Learn a policy with every-visit Monte Carlo control.

    Episodes are generated with an epsilon-greedy behaviour policy; the
    action values are running averages of the discounted returns observed
    after each state-action pair.

    Args:
        vacuum: Model exposing ``position``, ``battery``, ``dust_storage``,
            ``dirty_cells``, ``states()`` and ``actions()``; it is also
            handed to ``generate_episode``.
        episodes: Number of episodes to simulate.
        epsilon: Probability of storing a random (exploratory) action in the
            behaviour policy after an update.
        discount: Discount factor used when accumulating returns.

    Returns:
        Dict mapping states to actions: the greedy action for every state
        that was visited, and the behaviour policy's action for the rest.
    """
    Q = {}  # expected return for taking an action in a state (action value)
    N = {}  # number of times each state-action pair was visited

    # Snapshot of the vacuum so that every episode starts from the same state.
    start_position = vacuum.position
    start_battery = vacuum.battery
    start_dust = vacuum.dust_storage
    start_dirty = set(vacuum.dirty_cells)

    def reset():
        vacuum.position = start_position
        vacuum.battery = start_battery
        vacuum.dust_storage = start_dust
        vacuum.dirty_cells = set(start_dirty)

    def actions_at(state):
        # actions() depends on vacuum.position, so move there first.
        vacuum.position = state
        return vacuum.actions()

    # The behaviour policy needs an entry for every state before the first
    # episode, otherwise episode generation has nothing to look up.
    behaviour = {}
    for state in vacuum.states():
        available = actions_at(state)
        if available:
            behaviour[state] = random.choice(available)
    greedy = {}

    try:
        for _ in range(episodes):
            reset()
            episode = generate_episode(vacuum, behaviour)
            G = 0
            for state, action, reward in reversed(episode):
                G = discount * G + reward
                key = (state, action)
                N[key] = N.get(key, 0) + 1
                Q[key] = Q.get(key, 0) + (1 / N[key]) * (G - Q.get(key, 0))

                available = actions_at(state)
                best_action = max(available, key=lambda a: Q.get((state, a), 0))
                greedy[state] = best_action
                # Exploration only affects how future episodes are generated,
                # never the policy that is returned.
                if random.random() > epsilon:
                    behaviour[state] = best_action
                else:
                    behaviour[state] = random.choice(available)
    finally:
        reset()

    return {**behaviour, **greedy}
