In [15]:
import gym
from gym import spaces
import numpy as np

class ElevatorEnv(gym.Env):
    def __init__(self, num_floors=5, max_capacity=5):
        super(ElevatorEnv, self).__init__()

        self.num_floors = num_floors
        self.max_capacity = max_capacity

        # Actions: 0: move up, 1: move down, 2: open doors
        self.action_space = spaces.Discrete(3)

        # Observations: current floor, direction, num passengers, queue at each floor (up and down)
        self.observation_space = spaces.Tuple((
            spaces.Discrete(self.num_floors),  # current floor
            spaces.Discrete(3),  # direction: 0: stationary, 1: up, 2: down
            spaces.Discrete(self.max_capacity + 1),  # num passengers in elevator
            spaces.MultiBinary(self.num_floors * 2)  # queue at each floor (up and down)
        ))

        self.reset()

    def reset(self):
        self.current_floor = np.random.randint(0, self.num_floors)
        self.direction = 0  # stationary
        self.passengers = 0
        # Randomly add passengers to the queue
        self.queue = np.random.randint(0, 2, size=self.num_floors * 2)  # 0 or 1 passenger for each direction on each floor
        return self._get_obs()


    def _get_obs(self):
        return (self.current_floor, self.direction, self.passengers, self.queue)

    def step(self, action):
        reward = 0

        # Move elevator based on action
        if action == 0 and self.current_floor < self.num_floors - 1:  # move up
            self.current_floor += 1
            self.direction = 1
        elif action == 1 and self.current_floor > 0:  # move down
            self.current_floor -= 1
            self.direction = 2
        elif action == 2:  # open doors
            self.direction = 0
            # Handle passenger pickup/dropoff logic here
            # For simplicity, let's just clear the queue at the current floor
            # and adjust the number of passengers
            idx_up = self.current_floor * 2
            idx_down = idx_up + 1
            reward += self.queue[idx_up] + self.queue[idx_down]  # reward for picking up
            self.passengers += self.queue[idx_up] + self.queue[idx_down]
            self.queue[idx_up] = 0
            self.queue[idx_down] = 0
            # Ensure we don't exceed capacity
            if self.passengers > self.max_capacity:
                excess = self.passengers - self.max_capacity
                reward -= excess  # penalty for overloading
                self.passengers = self.max_capacity

        # Negative reward for waiting passengers
        reward -= np.sum(self.queue)
        # Penalize unnecessary movements or stops
        if action == 2 and np.sum(self.queue[self.current_floor * 2: self.current_floor * 2 + 2]) == 0:
            reward -= 2  # penalty for opening doors when no passengers are waiting


        # Check if done (for simplicity, let's not set a termination condition)
        done = False
        # Random chance to add a passenger to a random floor and direction
        if np.random.rand() < 0.00001:  # 5% chance
            random_floor = np.random.randint(0, self.num_floors)
            random_direction = np.random.randint(0, 2)  # up or down
            self.queue[random_floor * 2 + random_direction] += 1

        return self._get_obs(), reward, done, {}

    def render(self, mode='human'):
        # For simplicity, just print the current state
        floor_str = ["[ ]"] * self.num_floors
        floor_str[self.current_floor] = "[E]"
        print("Floors:", " ".join(floor_str))
        print("Queue:", self.queue)
        print("Passengers in elevator:", self.passengers)

    def close(self):
        pass


In [16]:
import numpy as np
from collections import defaultdict

class MonteCarloAgent:
  def __init__(self, env, gamma=0.9, epsilon=0.9, epsilon_decay=0.995, epsilon_min=0.1):
        # ... [existing code]
      self.epsilon_decay = epsilon_decay
      self.epsilon_min = epsilon_min
      self.env = env
      self.gamma = gamma
      self.epsilon = epsilon
      self.Q = defaultdict(lambda: np.zeros(self.env.action_space.n))
      self.returns = defaultdict(list)

  def policy(self, state):
      # Convert state to a string representation
      state_str = str(state)

      # ε-greedy policy
      probs = np.ones(self.env.action_space.n) * self.epsilon / self.env.action_space.n
      best_action = np.argmax(self.Q[state_str])
      probs[best_action] = 1.0 - self.epsilon + (self.epsilon / self.env.action_space.n)
      return np.random.choice(np.arange(self.env.action_space.n), p=probs)

  def generate_episode(self):
      episode = []
      state = self.env.reset()
      for _ in range(100):  # fixed episode length
          action = self.policy(state)
          next_state, reward, _, _ = self.env.step(action)
          episode.append((state, action, reward))
          state = next_state

      return episode


  def update(self):
      episode = self.generate_episode()
      G = 0
      visited_state_actions = set()  # To keep track of state-action pairs we've seen

      for t in reversed(range(len(episode))):
          state, action, reward = episode[t]
          state_str = str(state)
          G = self.gamma * G + reward

          # Check if the state-action pair is visited for the first time in this episode
          if not (state_str, action) in visited_state_actions:
              visited_state_actions.add((state_str, action))  # Mark this state-action as visited
              self.returns[(state_str, action)].append(G)
              self.Q[state_str][action] = np.mean(self.returns[(state_str, action)])

      # Decay ε after each episode
      self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

  def train(self, num_episodes):
      for _ in range(num_episodes):
          self.update()

  def get_best_action(self, state):
          """Return the best action for a given state."""
          state_str = str(state)  # Convert state to string representation
          return np.argmax(self.Q[state_str])

In [17]:
env = ElevatorEnv()
agent = MonteCarloAgent(env)

agent.train(50)  # train for 5000 episodes


In [20]:
agent.Q

defaultdict(<function __main__.MonteCarloAgent.__init__.<locals>.<lambda>()>,
            {'(2, 0, 0, array([0, 0, 1, 1, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(1, 2, 0, array([0, 0, 1, 1, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(1, 0, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(0, 2, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(1, 1, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(0, 0, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(2, 1, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(2, 0, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(1, 2, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(3, 1, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))': array([0., 0., 0.]),
             '(3, 0, 2, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1]))

In [21]:
# Define a specific state
state = (3, 0, 0, np.array([0, 0, 1, 1, 0, 0, 0, 0, 0, 0]))
'''

Elevator's Current Floor: 3
This indicates that the elevator is currently on the 3rd floor.
Elevator's Direction: 0
This value represents the direction of the elevator. In the context provided:
  0: The elevator is stationary.
  1: The elevator is moving up.
  2: The elevator is moving down.
In this case, the elevator is stationary.
Number of Passengers in the Elevator: 0
This indicates that there are currently no passengers inside the elevator.
Queue of Passengers on Each Floor: np.array([0, 1, 0, 1, 0, 0, 0, 0, 0, 0])
  This array represents the queue of passengers waiting on each floor, with two entries for each floor: one for passengers waiting to go up and one for passengers waiting to go down.
For any given floor i:
The index 2 * i represents passengers waiting to go up.
The index 2 * i + 1 represents passengers waiting to go down.
Breaking it down:
Floor 0:
0 passengers waiting to go up.
1 passenger waiting to go down.
Floor 1:
0 passengers waiting to go up.
1 passenger waiting to go down.
Floor 2 (and all subsequent floors):
0 passengers waiting in both directions.
'''
# Get the agent's best action for the state
action = agent.get_best_action(state)

print(f"For state {state}, the agent's best action is: {action}")


For state (3, 0, 0, array([0, 0, 1, 1, 0, 0, 0, 0, 0, 0])), the agent's best action is: 2


In [22]:
env = ElevatorEnv()
agent = MonteCarloAgent(env)

agent.train(5000)  # train for 5000 episodes

In [None]:
# Get the agent's best action for the state
state = (3, 0, 0, np.array([1, 0, 0, 1, 0, 1, 0, 0, 0, 0]))

action = agent.get_best_action(state)

print(f"For state {state}, the agent's best action is: {action}")


For state (3, 0, 0, array([1, 0, 0, 1, 0, 1, 0, 0, 0, 0])), the agent's best action is: 1


In [None]:
EnvironmentError = ElevatorEnv()
agent = MonteCarloAgent(env)

agent.train(5000)  # train for 5000 episodes

In [None]:
# Get the agent's best action for the state
action = agent.get_best_action(state)

print(f"For state {state}, the agent's best action is: {action}")

For state (3, 0, 0, array([0, 1, 0, 1, 0, 0, 0, 0, 0, 0])), the agent's best action is: 0
