Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions invokeai/app/services/invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def create_execution_state(self, graph: Graph | None = None) -> GraphExecutionSt
new_state = GraphExecutionState(graph=Graph() if graph is None else graph)
self.services.graph_execution_manager.set(new_state)
return new_state

def cancel(self, graph_execution_state_id: str) -> None:
    """Cancel the execution state identified by *graph_execution_state_id*.

    Delegates the cancellation to the queue service; no local state is touched.
    """
    queue_service = self.services.queue
    queue_service.cancel(graph_execution_state_id)
Expand All @@ -71,18 +71,12 @@ def _start(self) -> None:
for service in vars(self.services):
self.__start_service(getattr(self.services, service))

for service in vars(self.services):
self.__start_service(getattr(self.services, service))

def stop(self) -> None:
"""Stops the invoker. A new invoker will have to be created to execute further."""
# First stop all services
for service in vars(self.services):
self.__stop_service(getattr(self.services, service))

for service in vars(self.services):
self.__stop_service(getattr(self.services, service))

self.services.queue.put(None)


Expand Down
14 changes: 10 additions & 4 deletions invokeai/app/services/processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import traceback
from threading import Event, Thread
from threading import Event, Thread, BoundedSemaphore

from ..invocations.baseinvocation import InvocationContext
from .invocation_queue import InvocationQueueItem
Expand All @@ -10,8 +10,11 @@ class DefaultInvocationProcessor(InvocationProcessorABC):
__invoker_thread: Thread
__stop_event: Event
__invoker: Invoker
__threadLimit: BoundedSemaphore

def start(self, invoker) -> None:
# if we do want multithreading at some point, we could make this configurable
self.__threadLimit = BoundedSemaphore(1)
self.__invoker = invoker
self.__stop_event = Event()
self.__invoker_thread = Thread(
Expand All @@ -20,7 +23,7 @@ def start(self, invoker) -> None:
kwargs=dict(stop_event=self.__stop_event),
)
self.__invoker_thread.daemon = (
True # TODO: probably better to just not use threads?
True # TODO: make async and do not use threads
)
self.__invoker_thread.start()

Expand All @@ -29,6 +32,7 @@ def stop(self, *args, **kwargs) -> None:

def __process(self, stop_event: Event):
try:
self.__threadLimit.acquire()
while not stop_event.is_set():
queue_item: InvocationQueueItem = self.__invoker.services.queue.get()
if not queue_item: # Probably stopping
Expand Down Expand Up @@ -110,7 +114,7 @@ def __process(self, stop_event: Event):
)

pass

# Check queue to see if this is canceled, and skip if so
if self.__invoker.services.queue.is_canceled(
graph_execution_state.id
Expand All @@ -127,4 +131,6 @@ def __process(self, stop_event: Event):
)

except KeyboardInterrupt:
... # Log something?
pass # Log something? KeyboardInterrupt is probably not going to be seen by the processor
finally:
self.__threadLimit.release()