diff --git a/invokeai/app/services/invoker.py b/invokeai/app/services/invoker.py index e3fa6da8513..a7c9ae444d6 100644 --- a/invokeai/app/services/invoker.py +++ b/invokeai/app/services/invoker.py @@ -49,7 +49,7 @@ def create_execution_state(self, graph: Graph | None = None) -> GraphExecutionSt new_state = GraphExecutionState(graph=Graph() if graph is None else graph) self.services.graph_execution_manager.set(new_state) return new_state - + def cancel(self, graph_execution_state_id: str) -> None: """Cancels the given execution state""" self.services.queue.cancel(graph_execution_state_id) @@ -71,18 +71,12 @@ def _start(self) -> None: for service in vars(self.services): self.__start_service(getattr(self.services, service)) - for service in vars(self.services): - self.__start_service(getattr(self.services, service)) - def stop(self) -> None: """Stops the invoker. A new invoker will have to be created to execute further.""" # First stop all services for service in vars(self.services): self.__stop_service(getattr(self.services, service)) - for service in vars(self.services): - self.__stop_service(getattr(self.services, service)) - self.services.queue.put(None) diff --git a/invokeai/app/services/processor.py b/invokeai/app/services/processor.py index c6229067503..35cbcd5068c 100644 --- a/invokeai/app/services/processor.py +++ b/invokeai/app/services/processor.py @@ -1,5 +1,5 @@ import traceback -from threading import Event, Thread +from threading import Event, Thread, BoundedSemaphore from ..invocations.baseinvocation import InvocationContext from .invocation_queue import InvocationQueueItem @@ -10,8 +10,11 @@ class DefaultInvocationProcessor(InvocationProcessorABC): __invoker_thread: Thread __stop_event: Event __invoker: Invoker + __threadLimit: BoundedSemaphore def start(self, invoker) -> None: + # if we do want multithreading at some point, we could make this configurable + self.__threadLimit = BoundedSemaphore(1) self.__invoker = invoker self.__stop_event = Event() self.__invoker_thread = Thread( @@ -20,7 +23,7 @@ def start(self, invoker) -> None: kwargs=dict(stop_event=self.__stop_event), ) self.__invoker_thread.daemon = ( - True # TODO: probably better to just not use threads? + True # TODO: make async and do not use threads ) self.__invoker_thread.start() @@ -29,6 +32,7 @@ def stop(self, *args, **kwargs) -> None: def __process(self, stop_event: Event): try: + self.__threadLimit.acquire() while not stop_event.is_set(): queue_item: InvocationQueueItem = self.__invoker.services.queue.get() if not queue_item: # Probably stopping @@ -110,7 +114,7 @@ def __process(self, stop_event: Event): ) pass - + # Check queue to see if this is canceled, and skip if so if self.__invoker.services.queue.is_canceled( graph_execution_state.id @@ -127,4 +131,6 @@ def __process(self, stop_event: Event): ) except KeyboardInterrupt: - ... # Log something? + pass # Log something? KeyboardInterrupt is probably not going to be seen by the processor + finally: + self.__threadLimit.release()