In [39]:
import random
import numpy as np
import enum
import time
from termcolor import colored

In [150]:
class LiftsQueueCommand(enum.Enum):
    UP = 'up'
    DOWN = 'down'
    END1 = 'end1'
    END2 = 'end2'
    
class LiftsPassState(enum.Enum):
    NONE = 0
    SELECTED1 = 1
    SELECTED2 = 2

In [160]:
# пример для тестирования
floors = 8
commands = {
    (2, "up"): [(5, "end1")],
    (3, "up"): [(4, "end1")],
    (2, "down"): [(1, "end2")],
    (1, "up"): [(3, "end1",), (2, "end1")]
}
#логика: лифт-1: 1, 2, 3, 4, 5
#        лифт-2: 2, 1

GLOBAL_QUEUE = [(command[0], LiftsQueueCommand(command[1])) for command in commands.keys()] + [None]

def global_queue_add_end_commands(command_key):
    global GLOBAL_QUEUE
    command_key = (command_key[0], command_key[1].value)
    if command_key in commands.keys():
        for command in commands[command_key][::-1]:
            add_command = (command[0], LiftsQueueCommand(command[1]))
            if command not in GLOBAL_QUEUE:
                GLOBAL_QUEUE.insert(0, add_command)

In [161]:
class LiftsQueueHandler:

    def __init__(self, floors: int):
        self.floors = floors
        self.states_dca = {}
        self.state = None
        self.commands_queue = None
        self.queue_iterator = None
        self.queue_selector = None
        self.last_popped_command = None

    def set_global_queue(self, global_queue):
        self.commands_queue = global_queue

    def run(self, start_state: tuple):
        self.state = (*start_state, LiftsPassState.NONE, 0)
        for command_idx in range(len(self.commands_queue)):
            self.queue_iterator = command_idx
            item = self.states_dca[self.state][self.commands_queue[command_idx]]
            self.state = item["state"]
            item["select_action"]()
        self.last_popped_command = self.commands_queue.pop(self.queue_selector)
        return self.state
        #

    def select_current_item(self):
        self.queue_selector = self.queue_iterator

    def generate_easy_dca(self):
        for floor1 in range(1, self.floors + 1):
            for floor2 in range(1, self.floors + 1):
                for pass_state in LiftsPassState:
                    for selected_floor in range(0, self.floors + 1):
                        state_dict = self.states_dca[floor1, floor2, pass_state, selected_floor] = {}
                        state_dict[None] = {
                            'state': (floor1, floor2, pass_state, selected_floor),
                            'select_action': lambda: None
                        }

                        for command_floor in range(1, self.floors + 1):

                            # Если лифт не выбран
                            if pass_state == LiftsPassState.NONE:
                                state_dict[command_floor, LiftsQueueCommand.END1] = {
                                    'state': (floor1, floor2, LiftsPassState.SELECTED1, command_floor),
                                    'select_action': self.select_current_item
                                }
                                state_dict[command_floor, LiftsQueueCommand.END2] = {
                                    'state': (floor1, floor2, LiftsPassState.SELECTED2, command_floor),
                                    'select_action': self.select_current_item
                                }
                                if abs(floor1 - command_floor) <= abs(floor2 - command_floor):
                                    for commands_action in (LiftsQueueCommand.UP, LiftsQueueCommand.DOWN):
                                        state_dict[command_floor, commands_action] = {
                                            'state': (floor1, floor2, LiftsPassState.SELECTED1, command_floor),
                                            'select_action': self.select_current_item
                                        }
                                else:
                                    for commands_action in (LiftsQueueCommand.UP, LiftsQueueCommand.DOWN):
                                        state_dict[command_floor, commands_action] = {
                                            'state': (floor1, floor2, LiftsPassState.SELECTED2, command_floor),
                                            'select_action': self.select_current_item
                                        }

                            else:
                                used_floor = floor1 if pass_state == LiftsPassState.SELECTED1 else floor2

                                # при движении вверх; если этаж команды лежит по пути
                                if used_floor <= command_floor < selected_floor:
                                    if command_floor != self.floors:
                                        state_dict[command_floor, LiftsQueueCommand.UP] = {
                                            'state': (floor1, floor2, pass_state, command_floor),
                                            'select_action': self.select_current_item
                                        }
                                    if command_floor != 1:
                                        state_dict[command_floor, LiftsQueueCommand.DOWN] = {
                                            'state': (floor1, floor2, pass_state, selected_floor),
                                            'select_action': lambda: None
                                        }
                                    state_dict[command_floor, LiftsQueueCommand.END1] = {
                                            'state': (floor1, floor2, pass_state, command_floor if pass_state == LiftsPassState.SELECTED1 else selected_floor),
                                            'select_action': self.select_current_item
                                    }
                                    state_dict[command_floor, LiftsQueueCommand.END2] = {
                                            'state': (floor1, floor2, pass_state, command_floor if pass_state == LiftsPassState.SELECTED2 else selected_floor),
                                            'select_action': self.select_current_item
                                    }
                                # при движении вниз; если этаж команды лежит по пути
                                elif floor1 >= command_floor > selected_floor:
                                    if command_floor != self.floors:
                                        state_dict[command_floor, LiftsQueueCommand.UP] = {
                                            'state': (floor1, floor2, pass_state, selected_floor),
                                            'select_action': lambda: None
                                        }
                                    if command_floor != 1:
                                        state_dict[command_floor, LiftsQueueCommand.DOWN] = {
                                            'state': (floor1, floor2, pass_state, command_floor),
                                            'select_action': self.select_current_item
                                        }
                                    state_dict[command_floor, LiftsQueueCommand.END1] = {
                                            'state': (floor1, floor2, pass_state, command_floor if pass_state == LiftsPassState.SELECTED1 else selected_floor),
                                            'select_action': self.select_current_item
                                    }
                                    state_dict[command_floor, LiftsQueueCommand.END2] = {
                                            'state': (floor1, floor2, pass_state, command_floor if pass_state == LiftsPassState.SELECTED2 else selected_floor),
                                            'select_action': self.select_current_item
                                    }

                                else:
                                    if command_floor != self.floors:
                                        state_dict[command_floor, LiftsQueueCommand.UP] = {
                                            'state': (floor1, floor2, pass_state, selected_floor),
                                            'select_action': lambda: None
                                        }
                                    if command_floor != 1:
                                        state_dict[command_floor, LiftsQueueCommand.DOWN] = {
                                            'state': (floor1, floor2, pass_state, selected_floor),
                                            'select_action': lambda: None
                                        }
                                    state_dict[command_floor, LiftsQueueCommand.END1] = {
                                            'state': (floor1, floor2, pass_state, selected_floor),
                                            'select_action': lambda: None
                                    }   
                                    state_dict[command_floor, LiftsQueueCommand.END2] = {
                                            'state': (floor1, floor2, pass_state, selected_floor),
                                            'select_action': lambda: None
                                    }                              

                                

In [153]:
class LiftActions(enum.Enum):
    START = 0
    UP1 = 1
    UP2 = 2
    DOWN1 = 3
    DOWN2 = 4
    OPEN1 = 5
    OPEN2 = 6
    CLOSE1 = 7
    CLOSE2 = 8

    UPDATE_COMMANDS = 9
    NEXT_COMMAND = 10


In [169]:
class TwoLifts:
    def __init__(self, floors: int, start_state: tuple, global_queue: list = [], print_movments: bool = True):
        self.floors = floors
        self.states_dca = {}
        self.state = (*start_state, LiftsPassState.NONE)
        self.current_command = None
        self.lifts_queue_handler = LiftsQueueHandler(self.floors)
        self.lifts_queue_handler.set_global_queue(global_queue)
        
        self.print_movments = print_movments
        self.move_count = 0

        self.actions = {
            LiftActions.UP1: lambda: (self.plus_move_count(), print(f"Лифты: [{self.state[0]}, {self.state[1]}]->[{self.state[0] + 1}, {self.state[1]}]") if self.print_movments else None),
            LiftActions.DOWN1: lambda: (self.plus_move_count(), print(f"Лифты: [{self.state[0]}, {self.state[1]}]->[{self.state[0] - 1}, {self.state[1]}]") if self.print_movments else None),
            LiftActions.OPEN1: lambda: (print("Лифт 1: открыт") if self.print_movments else None),
            LiftActions.CLOSE1: lambda: (print("Лифт 1: закрыт") if self.print_movments else None),
            
            LiftActions.UP2: lambda: (self.plus_move_count(), print(f"Лифты: [{self.state[0]}, {self.state[1]}]->[{self.state[0]}, {self.state[1] + 1}]") if self.print_movments else None),
            LiftActions.DOWN2: lambda: (self.plus_move_count(), print(f"Лифты: [{self.state[0]}, {self.state[1]}]->[{self.state[0]}, {self.state[1] - 1}]") if self.print_movments else None),
            LiftActions.OPEN2: lambda: (print("Лифт 2: открыт") if self.print_movments else None),
            LiftActions.CLOSE2: lambda: (print("Лифт 2: закрыт") if self.print_movments else None),

            LiftActions.UPDATE_COMMANDS: lambda: global_queue_add_end_commands(self.lifts_queue_handler.last_popped_command),
            LiftActions.NEXT_COMMAND: lambda: self.set_current_command(self.lifts_queue_handler.run((self.state[0], self.state[1])))
        }
    def plus_move_count(self):
        self.move_count += 1

    def set_current_command(self, new_command):
        self.current_command = new_command

    def run(self):
        
        self.current_command = self.lifts_queue_handler.run((self.state[0], self.state[1]))
        while self.current_command[3]:
            next_item = self.states_dca[self.state][self.current_command[2], self.current_command[3]]

            next_item["action"]()
            self.state = next_item["state"]
            
    def exit(self):
        return 


    def generate_dca(self):
        self.lifts_queue_handler.generate_easy_dca()

        for floor1 in range(1, self.floors + 1):
            for floor2 in range(1, self.floors + 1):
                for pass_state in LiftsPassState:
                    self.states_dca[floor1, floor2, pass_state] = {}

                    self.states_dca[floor1, floor2, pass_state][LiftsPassState.NONE, 0] = {
                        "state": self.state,
                        "action": self.exit()
                    }
                    for next_floor in range(1, self.floors + 1):
                        #NONE
                        self.states_dca[floor1, floor2, pass_state][LiftsPassState.NONE, next_floor] = {
                            "state": None,
                            "action": lambda: None
                        }
                        #SELECTED1
                        if floor1 < next_floor:
                            self.states_dca[floor1, floor2, pass_state][LiftsPassState.SELECTED1, next_floor] = {
                                "state": (floor1 + 1, floor2, LiftsPassState.SELECTED1),
                                "action": self.actions[LiftActions.UP1]
                            }
                        elif floor1 > next_floor:
                            self.states_dca[floor1, floor2, pass_state][LiftsPassState.SELECTED1, next_floor] = {
                                "state": (floor1 - 1, floor2, LiftsPassState.SELECTED1),
                                "action": self.actions[LiftActions.DOWN1]
                            }
                        else:
                            self.states_dca[floor1, floor2, pass_state][LiftsPassState.SELECTED1, next_floor] = {
                                "state": (floor1, floor2, LiftsPassState.NONE),
                                "action": lambda: (self.actions[LiftActions.OPEN1](), self.actions[LiftActions.NEXT_COMMAND](), self.actions[LiftActions.UPDATE_COMMANDS](), self.actions[LiftActions.CLOSE1]())
                            }
                        #SELECTED2
                        if floor2 < next_floor:
                            self.states_dca[floor1, floor2, pass_state][LiftsPassState.SELECTED2, next_floor] = {
                                "state": (floor1, floor2 + 1, LiftsPassState.SELECTED2),
                                "action": self.actions[LiftActions.UP2]
                            }
                        elif floor2 > next_floor:
                            self.states_dca[floor1, floor2, pass_state][LiftsPassState.SELECTED2, next_floor] = {
                                "state": (floor1, floor2 - 1, LiftsPassState.SELECTED2),
                                "action": self.actions[LiftActions.DOWN2]
                            }
                        else:
                            self.states_dca[floor1, floor2, pass_state][LiftsPassState.SELECTED2, next_floor] = {
                                "state": (floor1, floor2, LiftsPassState.NONE),
                                "action": lambda: (self.actions[LiftActions.OPEN2](), self.actions[LiftActions.NEXT_COMMAND](), self.actions[LiftActions.UPDATE_COMMANDS](), self.actions[LiftActions.CLOSE2]())
                            }



In [170]:
floors = 8
commands = {
    (2, "up"): [(5, "end1")],
    (3, "up"): [(4, "end1")],
    (2, "down"): [(1, "end2")],
    (1, "up"): [(3, "end1",), (2, "end1")]
}
# 1: 1 2 3 4 5
# 2: 2 1
GLOBAL_QUEUE = [(command[0], LiftsQueueCommand(command[1])) for command in commands.keys()] + [None]

def global_queue_add_end_commands(command_key):
    global GLOBAL_QUEUE
    if command_key:
        command_key = (command_key[0], command_key[1].value)
        if command_key in commands.keys():
            for command in commands[command_key][::-1]:
                add_command = (command[0], LiftsQueueCommand(command[1]))
                if command not in GLOBAL_QUEUE:
                    GLOBAL_QUEUE.insert(0, add_command)

In [141]:
liftsQH = LiftsQueueHandler(floors)
liftsQH.generate_easy_dca()
liftsQH.set_global_queue(GLOBAL_QUEUE)

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((1, 1))[2:])
global_queue_add_end_commands((1, "up"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((1, 1))[2:])
# global_queue_add_end_commands((1, "up"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((2, 1))[2:])
global_queue_add_end_commands((2, "up"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((2, 1))[2:])
# global_queue_add_end_commands((2, "up"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((3, 1))[2:])
global_queue_add_end_commands((3, "up"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((3, 1))[2:])
# global_queue_add_end_commands((3, "up"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((4, 1))[2:])
# global_queue_add_end_commands((2, "up"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((5, 1))[2:])
global_queue_add_end_commands((2, "down"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((5, 2))[2:])
# global_queue_add_end_commands((2, "down"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print(liftsQH.run((5, 2))[1:])
# global_queue_add_end_commands((2, "down"))
print([(command[0], command[1].value) if command else None for command in GLOBAL_QUEUE])
print("----")

[(2, 'up'), (3, 'up'), (2, 'down'), (1, 'up'), None]
(<LiftsPassState.SELECTED1: 1>, 1)
[(3, 'end1'), (2, 'end1'), (2, 'up'), (3, 'up'), (2, 'down'), None]
----
[(3, 'end1'), (2, 'end1'), (2, 'up'), (3, 'up'), (2, 'down'), None]
(<LiftsPassState.SELECTED1: 1>, 2)
[(3, 'end1'), (2, 'up'), (3, 'up'), (2, 'down'), None]
----
[(3, 'end1'), (2, 'up'), (3, 'up'), (2, 'down'), None]
(<LiftsPassState.SELECTED1: 1>, 2)
[(5, 'end1'), (3, 'end1'), (3, 'up'), (2, 'down'), None]
----
[(5, 'end1'), (3, 'end1'), (3, 'up'), (2, 'down'), None]
(<LiftsPassState.SELECTED1: 1>, 3)
[(5, 'end1'), (3, 'up'), (2, 'down'), None]
----
[(5, 'end1'), (3, 'up'), (2, 'down'), None]
(<LiftsPassState.SELECTED1: 1>, 3)
[(4, 'end1'), (5, 'end1'), (2, 'down'), None]
----
[(4, 'end1'), (5, 'end1'), (2, 'down'), None]
(<LiftsPassState.SELECTED1: 1>, 4)
[(5, 'end1'), (2, 'down'), None]
----
[(5, 'end1'), (2, 'down'), None]
(<LiftsPassState.SELECTED1: 1>, 5)
[(2, 'down'), None]
----
[(2, 'down'), None]
(<LiftsPassState.SELE

In [171]:
lifts = TwoLifts(floors=6, start_state=(1, 1), global_queue=GLOBAL_QUEUE)
lifts.generate_dca()
lifts.run()



Лифт 1: открыт
Лифт 1: закрыт
Лифты: [1, 1]->[2, 1]
Лифт 1: открыт
Лифт 1: закрыт
Лифты: [2, 1]->[3, 1]
Лифт 1: открыт
Лифт 1: закрыт
Лифты: [3, 1]->[4, 1]
Лифт 1: открыт
Лифт 1: закрыт
Лифты: [4, 1]->[5, 1]
Лифт 1: открыт
Лифт 1: закрыт
Лифты: [5, 1]->[5, 2]
Лифт 2: открыт
Лифт 2: закрыт
Лифты: [5, 2]->[5, 1]
Лифт 2: открыт
Лифт 2: закрыт


[1, 2, 3, 4]

In [119]:
a = {}
a[1, 2] = 1
a[None] = 2
a[1, 2]


    
    

1