# Advanced Simulation

## Imports
Importing required libraries (also applying future annotations for Python 3.8).

In [21]:
from __future__ import annotations
from typing import Any, Optional
from re import findall, sub

import numpy as np
import numpy.typing as npt
import pandas as pd

from tqdm import tqdm

## Set constants
Set ```CURRENT_TIME``` to detect the current time, set ```FLOORS``` to set the number of floors in the building.

In [22]:
# Set current time
CURRENT_TIME: float = 0.0


# Set constants
FLOORS = 6
DWELL_TIME = 10.0
MAX_RUNTIME = 86400
ELEVATOR_TRAVEL_TIME = 8.0
MAX_ELEVATOR_CAPACITY = 10

## Create State
Create a state class that keeps track of the current state. The state contains the current direction of the elevator (```False``` for down, ```True``` for up), the current floor, the state of the door (```False``` for closed, ```True``` for open), a matrix of people who are waiting and a vector that keeps track of people waiting in the elevator.

In [23]:
class State:
    direction: bool
    current_floor: int
    door: bool
    waiting: np.ndarray
    elevator: np.ndarray
    rest: bool # Nieuwe om de lift niets te laten doen als er niemand is

    def __init__(self):
        self.direction = False # True: up, False: down
        self.current_floor = 5
        self.door = False # True: open, False: closed
        self.waiting = np.zeros((FLOORS, FLOORS), np.int_)
        self.elevator = np.zeros(FLOORS, np.int_)
        self.rest = True

    def copy(self) -> State:
        new_state = State()
        new_state.direction = self.direction
        new_state.current_floor = self.current_floor
        new_state.door = self.door
        new_state.waiting = self.waiting.copy()
        new_state.elevator = self.elevator.copy()
        new_state.rest = self.rest
        return new_state

    def __str__(self) -> str:
        return f"\n{{ direction: {self.direction}, current_floor: {self.current_floor}, door: {self.door}, waiting: {self.waiting}, elevator: {self.elevator}, rest: {self.rest} }}"

    def __repr__(self) -> str:
        return self.__str__()

example_state = State()

## Get active events
This function returns a set of events that are currently active. It always returns the arrival events, if the door is open it returns the close door event, if it is not it returns the elevator arrival event. (Ik heb besloten om door open weg te laten, want anders kreeg ik een rare bug dat de deur soms random open ging).

In [24]:
# Get set of active events, FIXME events could probably be changed in some sort of class
def get_events(state: State) -> set[str]:
    events = set()

    for i in range(FLOORS):
        for j in range(FLOORS):
            if i == j:
                continue
            events.add(f"Arrival{i},{j}")

    # FIXME ik  moet nog wat bedenken voor DoorOpen, misschien laat ik die wel gewoon weg
    if state.door:
        events.add("DoorClose")
    elif not state.rest: # If rest state is off then continue to next floor
        events.add(f"ElevatorArrival{state.current_floor + (2*state.direction - 1)}") 
    
    return events

get_events(example_state)

{'Arrival0,1',
 'Arrival0,2',
 'Arrival0,3',
 'Arrival0,4',
 'Arrival0,5',
 'Arrival1,0',
 'Arrival1,2',
 'Arrival1,3',
 'Arrival1,4',
 'Arrival1,5',
 'Arrival2,0',
 'Arrival2,1',
 'Arrival2,3',
 'Arrival2,4',
 'Arrival2,5',
 'Arrival3,0',
 'Arrival3,1',
 'Arrival3,2',
 'Arrival3,4',
 'Arrival3,5',
 'Arrival4,0',
 'Arrival4,1',
 'Arrival4,2',
 'Arrival4,3',
 'Arrival4,5',
 'Arrival5,0',
 'Arrival5,1',
 'Arrival5,2',
 'Arrival5,3',
 'Arrival5,4'}

## Transition function
The ```new_state``` function returns the new state in case an event happens. It has two parameters, the ```old_state``` and the ```event```. In case of an unknown event it raises an ```Exception```.

In [25]:
def new_state(old_state: State, event: str):
    if "Elevator" in event:
        return parse_elevator_arrival(old_state, event)
    elif "Arrival" in event:
        return parse_arrival(old_state, event)
    elif "DoorClose" in event:
        return parse_door_close(old_state, event)

    raise Exception("Unknown event")

# Called in case of an arrival
def parse_arrival(old_state: State, event: str):
    state = old_state.copy()

    # Get event arrival and target floor
    current_floor, target_floor = sub(r"[a-zA-Z]*", r"", event).split(",")
    current_floor = int(current_floor)
    target_floor = int(target_floor)

    # If elevator open and on current floor enter directly, however only if the elevator is not full
    if current_floor == state.current_floor and state.door and state.elevator.sum() < MAX_ELEVATOR_CAPACITY:
        state.elevator[target_floor] += 1

    # Else enter waiting
    else:
        state.waiting[current_floor, target_floor] += 1

    # If elevator in rest mode, set in right direction and remove rest mode FIXME not in model
    if state.rest:
        state.direction = current_floor > state.current_floor
        state.rest = False

        # If elevator in rest on current floor and door is closed, then open door and enter elevator
        if state.current_floor == current_floor and not state.door:
            state.door = True
            state.waiting[current_floor, target_floor] -= 1
            state.elevator[target_floor] += 1

    return state

def should_continue_in_direction(waiting: np.ndarray, elevator: np.ndarray, current_floor: int, direction: bool) -> bool:
    if direction is True:
        # people in direction = people waiting in floors in current direction + people in elevator wanting to go in current direction
        people_in_direction = waiting[current_floor:,:].sum() + elevator[current_floor:].sum()
    else:
        people_in_direction = waiting[:current_floor,:].sum() + elevator[:current_floor].sum()
    
    return people_in_direction > 0 # Returns true if there are people needing to go to current direction

def update_elevator(elevator: np.ndarray, waiting: np.ndarray, arrival_floor: int):
    if elevator.sum() + waiting[arrival_floor, :].sum() <= MAX_ELEVATOR_CAPACITY: # If elevator does not have 10 people use fast method
        elevator += waiting[arrival_floor, :] # Everyone at this level enters elevator
        waiting[arrival_floor,:] = 0 # Everyone who has entered the elevator is not waiting anymore

    else: # If elevator would become full, use random order (since we do not know arrival time), this seems very slow
        while elevator.sum() < 10:
            # Get person that wants to go to random floor
            random_floor = np.random.randint(0, FLOORS)
            
            # If randomly generated number has no waiting, then run again
            if waiting[arrival_floor, random_floor] == 0:
                continue
            
            # This person leaves the waiting line, and enters the elevator
            waiting[arrival_floor, random_floor] -= 1
            elevator[random_floor] += 1

# Called in case of elevator arrival
def parse_elevator_arrival(old_state: State, event: str):
    state = old_state.copy()
    arrival_floor = int(findall(r"[\d+]", event)[0])

    state.current_floor = arrival_floor
    
    # If calls open door, or if people wanting to leave
    if state.waiting[arrival_floor,:].sum() + state.elevator[arrival_floor] > 0:
        state.door = True # Open door
        state.elevator[arrival_floor] = 0 # Everyone who has to be at this level leaves
        update_elevator(state.elevator, state.waiting, arrival_floor)

    # After arrival we need to set new direction (i.e. if we do not want to go up any more we go down etc.)
    
    # If at either limit, change direction
    if state.current_floor in {0, FLOORS - 1}:
        state.direction = not state.direction

    # If no more calls in this direction change direction
    elif not should_continue_in_direction(state.waiting, state.elevator, state.current_floor, state.direction):
        state.direction = not state.direction

    # If no more people enter rest mode
    if state.waiting.sum() + state.elevator.sum() == 0:
        state.rest = True

    return state

# Called in case of door close
def parse_door_close(old_state: State, event: str):
    state = old_state.copy()

    state.door = False

    # If no more calls in this direction change direction
    if not should_continue_in_direction(state.waiting, state.elevator, state.current_floor, state.direction):
        state.direction = not state.direction
    
    return state

## Simulate events
This function simulates the events that are currently active, these are stored in a Pandas DataFrame (i.e. some sort of Python table). All new events are generated, all old events are copied to the new row, then finally we return the table with all the currently scheduled events.

In [26]:
def simulate_new_events(new_events: set[str], old_events: Optional[set[str]] = None, simulation_times: Optional[pd.DataFrame] = None):
    # If we haven't yet created a schedule table, create one
    if simulation_times is None:
        simulation_times =  pd.DataFrame({}, index=[CURRENT_TIME])
    
    # Copy the table s.t. we do not accedentially overwrite the current one
    # simulation_times = simulation_times.copy() FIXME misschien niet nodig, ik heb geen idee, maar als ik het weghaal dan is het 20% sneller

    # Simulate all new events, add them to the schedule table
    for event in new_events:
        simulation_times.loc[CURRENT_TIME, event] = CURRENT_TIME + simulate_event(event)

    # If there are old events copy them to the new row
    if old_events:
        simulation_times.loc[CURRENT_TIME, list(old_events)] = simulation_times.iloc[-2][list(old_events)]

    return simulation_times


# This function ensures that we generate the right data for the right event
def simulate_event(event: str) -> float:
    if "Elevator" in event:
        return simulate_elevator_arrival(event)
    elif "Arrival" in event:
        return simulate_arrival(event)
    elif "Door" in event:
        return simulate_door_close(event)

    raise Exception("Unrecognized event")

# The three functions below simulate the values
def simulate_arrival(event: str) -> float:
    # Voor nu simuleren we voor alles gewoon een arrival time van gemiddeld 5 seconden, dat slaat nog nergens op maar dan kon ik ff snel testen hoe snel het allemaal was
    return np.random.exponential(1200)

def simulate_elevator_arrival(event: str) -> float:
    # For now we're just returning the travel time
    return ELEVATOR_TRAVEL_TIME

def simulate_door_close(event: str) -> float:
    # For now we're just returning the dwell time
    return DWELL_TIME

## Total simulation
This function runs the total simulation for the set dwell time.

In [27]:
def simulate_elevator(dwell_time: float):
    global CURRENT_TIME, DWELL_TIME
    CURRENT_TIME = 0.0
    DWELL_TIME = dwell_time
    state = State()
    # state_history = [state]
    active_events = get_events(state)
    scheduled_events = simulate_new_events(active_events)

    while CURRENT_TIME < MAX_RUNTIME: # Run for 60 seconds
        try:
            next_event = scheduled_events.loc[CURRENT_TIME].idxmin() #type:ignore # Get next event
            CURRENT_TIME = scheduled_events.loc[CURRENT_TIME, next_event] #type:ignore # Get new time
            state = new_state(state, next_event) # Get the new state
            # state_history.append(state)
            new_active_events = get_events(state) # Get all events that are now active
            old_events = (active_events - {next_event}).intersection(new_active_events) # Get all events that were already active
            new_events = new_active_events - (active_events - {next_event}) # Get all new events
            scheduled_events = simulate_new_events(new_events, old_events, scheduled_events) # Add all new events to table
            active_events = new_active_events # Set active events are new active events
        

        # If there is an error show all the data s.t. we can see what happened
        except Exception as err:
            print(f"{next_event=}")
            print(f"{CURRENT_TIME=}")
            print(f"{state=}")
            # print(f"{state_history=}")
            print(f"{new_active_events=}")
            print(f"{old_events=}")
            print(f"{new_events=}")
            display(scheduled_events)
            raise err

        # print(f"Next event: {next_event}, at: {CURRENT_TIME}")
        # print(f"New state: {state}\n")

In [28]:
import cProfile
import pstats

profile = cProfile.Profile()
profile.runcall(simulate_elevator, 10)
ps = pstats.Stats(profile)
ps.print_stats()

         36712348 function calls (36456693 primitive calls) in 37.128 seconds

   Random listing order was used

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   121979    0.019    0.000    0.019    0.000 {method 'append' of 'list' objects}
    11012    0.002    0.000    0.002    0.000 {method 'get' of 'dict' objects}
    22025    0.007    0.000    0.007    0.000 {method 'keys' of 'dict' objects}
    11012    0.003    0.000    0.003    0.000 {method 'values' of 'dict' objects}
    33277    0.031    0.000    0.031    0.000 {method 'clear' of 'dict' objects}
   341124    0.052    0.000    0.052    0.000 {method 'add' of 'set' objects}
    11012    0.029    0.000    0.029    0.000 {method 'intersection' of 'set' objects}
   110442    0.038    0.000    0.038    0.000 {built-in method __new__ of type object at 0x106d32808}
    24214    0.017    0.000    0.017    0.000 {method 'split' of 'str' objects}
        8    0.000    0.000    0.000    0.000 {method 'upper' of

<pstats.Stats at 0x7f8d69751640>