Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR: Add two task managers and a base class for a worker #10

Merged
merged 12 commits into from
Oct 13, 2023
1 change: 1 addition & 0 deletions qtapputils/widgets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
# -----------------------------------------------------------------------------
from .range import RangeSpinBox, RangeWidget
from .waitingspinner import WaitingSpinner
from .taskmanagers import WorkerBase, TaskManagerBase, LIFOTaskManager
210 changes: 210 additions & 0 deletions qtapputils/widgets/taskmanagers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright © QtAppUtils Project Contributors
# https://github.com/jnsebgosselin/apputils
#
# This file is part of QtAppUtils.
# Licensed under the terms of the MIT License.
# -----------------------------------------------------------------------------
from __future__ import annotations
from typing import TYPE_CHECKING, Callable
if TYPE_CHECKING:
from uuid import UUID

# ---- Standard imports
from collections import OrderedDict
from time import sleep
import uuid

# ---- Third party imports
from qtpy.QtCore import QObject, QThread, Signal


class WorkerBase(QObject):
"""
A worker to execute tasks without blocking the GUI.
"""
sig_task_completed = Signal(object, object)

def __init__(self):
super().__init__()
self._tasks = OrderedDict()

def add_task(self, task_uuid4, task, *args, **kargs):
"""
Add a task to the stack that will be executed when the thread of
this worker is started.
"""
self._tasks[task_uuid4] = (task, args, kargs)

def run_tasks(self):
"""Execute the tasks that were added to the stack."""
for task_uuid4, (task, args, kargs) in self._tasks.items():
if task is not None:
method_to_exec = getattr(self, '_' + task)
returned_values = method_to_exec(*args, **kargs)
else:
returned_values = args
self.sig_task_completed.emit(task_uuid4, returned_values)
self._tasks = OrderedDict()
self.thread().quit()


class TaskManagerBase(QObject):
"""
A basic FIFO (First-In, First-Out) task manager.
"""
sig_run_tasks_finished = Signal()

def __init__(self, verbose: bool = False):
super().__init__()
self.verbose = verbose

self._worker = None

self._task_callbacks = {}
self._task_data = {}

self._running_tasks = []
self._queued_tasks = []
self._pending_tasks = []
# Queued tasks are tasks whose execution has not been requested yet.
# This happens when we want the Worker to execute a list of tasks
# in a single run. All queued tasks are dumped in the list of pending
# tasks when `run_task` is called.
#
# Pending tasks are tasks whose execution was postponed due to
# the fact that the worker was busy. These tasks are run as soon
# as the worker becomes available.
#
# Running tasks are tasks that are being executed by the worker.

def run_tasks(
self, callback: Callable = None, returned_values: tuple = None):
"""
Execute all the tasks that were added to the stack.

Parameters
----------
callback : Callable, optional
A callback that will be called with the provided returned_values
after the current queued tasks have been all executed.
returned_values : tuple, optional
A list of values that will be passed to the callback function when
it is called.
"""
if callback is not None:
self.add_task(None, callback, returned_values)
self._run_tasks()

def add_task(self, task: Callable, callback: Callable, *args, **kargs):
"""Add a new task at the end of the queued tasks stack."""
self._add_task(task, callback, *args, **kargs)

def worker(self) -> WorkerBase:
"""Return the worker that is installed on this manager."""
return self._worker

def set_worker(self, worker: WorkerBase):
""""Install the provided worker on this manager"""
self._worker = worker
self._thread = QThread()
self._worker.moveToThread(self._thread)
self._thread.started.connect(self._worker.run_tasks)

# Connect the worker signals to handlers.
self._worker.sig_task_completed.connect(self._handle_task_completed)

# ---- Private API
def _handle_task_completed(
self, task_uuid4: uuid.UUID, returned_values: tuple) -> None:
"""
Handle when a task has been completed by the worker.

This is the ONLY slot that should be called after a task is
completed by the worker.
"""
# Run the callback associated with the specified task UUID if any.
if self._task_callbacks[task_uuid4] is not None:
try:
self._task_callbacks[task_uuid4](*returned_values)
except TypeError:
# This means there is none 'returned_values'.
self._task_callbacks[task_uuid4]()

# Clean up completed task.
self._cleanup_task(task_uuid4)

# Execute pending tasks if worker is idle.
if len(self._running_tasks) == 0:
if len(self._pending_tasks) > 0:
self._run_pending_tasks()
else:
if self.verbose:
print('All pending tasks were executed.')
self.sig_run_tasks_finished.emit()

def _cleanup_task(self, task_uuid4: uuid.UUID):
"""Cleanup task associated with the specified UUID."""
del self._task_callbacks[task_uuid4]
del self._task_data[task_uuid4]
if task_uuid4 in self._running_tasks:
self._running_tasks.remove(task_uuid4)

def _add_task(self, task: Callable, callback: Callable, *args, **kargs):
"""Add a new task at the end of the stack of queued tasks."""
task_uuid4 = uuid.uuid4()
self._task_callbacks[task_uuid4] = callback
self._queued_tasks.append(task_uuid4)
self._task_data[task_uuid4] = (task, args, kargs)

def _run_tasks(self):
"""
Execute all the tasks that were added to the stack of queued tasks.
"""
self._pending_tasks.extend(self._queued_tasks)
self._queued_tasks = []
self._run_pending_tasks()

def _run_pending_tasks(self):
"""Execute all pending tasks."""
if len(self._running_tasks) == 0 and len(self._pending_tasks) > 0:
if self.verbose:
print('Executing {} pending tasks...'.format(
len(self._pending_tasks)))
# Even though the worker has executed all its tasks,
# we may still need to wait a little for it to stop properly.
i = 0
while self._thread.isRunning():
sleep(0.1)
i += 1
if i > 100:
print("Error: unable to stop {}'s working thread.".format(
self.__class__.__name__))

self._running_tasks = self._pending_tasks.copy()
self._pending_tasks = []
for task_uuid4 in self._running_tasks:
task, args, kargs = self._task_data[task_uuid4]
self._worker.add_task(task_uuid4, task, *args, **kargs)
self._thread.start()


class LIFOTaskManager(TaskManagerBase):
"""
A last-in, first out (LIFO) task manager manager, where there's always
at most one task in the queue, and if a new task is added, it overrides
or replaces the existing task.
"""

def _add_task(self, task: Callable, callback, *args, **kargs):
"""
Override method so that the tasks are managed as a LIFO
stack (Last-in, First out) instead of FIFO (First-In, First-Out).
"""
for task_uuid4 in self._pending_tasks:
self._cleanup_task(task_uuid4)
self._queued_tasks = []
self._pending_tasks = []
super()._add_task(task, callback, *args, **kargs)
self._run_tasks()
154 changes: 154 additions & 0 deletions qtapputils/widgets/tests/test_taskmanagers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright © SARDES Project Contributors
# https://github.com/cgq-qgc/sardes
#
# This file is part of SARDES.
# Licensed under the terms of the GNU General Public License.
# -----------------------------------------------------------------------------

"""
Tests for the TaskManagerBase class.
"""

# ---- Standard imports
from time import sleep

# ---- Third party imports
import pytest

# ---- Local imports
from qtapputils.widgets import WorkerBase, TaskManagerBase


# =============================================================================
# ---- Fixtures
# =============================================================================
@pytest.fixture
def DATA():
return [1, 2, 3, 4]


@pytest.fixture
def worker(DATA):
def _get_something():
sleep(0.5)
return DATA.copy(),

def _set_something(index, value):
sleep(0.5)
DATA[index] = value

worker = WorkerBase()
worker._get_something = _get_something
worker._set_something = _set_something
return worker


@pytest.fixture
def task_manager(worker, qtbot):
task_manager = TaskManagerBase()
task_manager.set_worker(worker)
yield task_manager

# We wait for the manager's thread to fully stop to avoid segfault error.
qtbot.waitUntil(lambda: not task_manager._thread.isRunning())


# =============================================================================
# ---- Tests
# =============================================================================
def test_run_tasks(task_manager, qtbot):
"""
Test that the task manager is managing queued tasks as expected.
"""
returned_values = []

def task_callback(data):
returned_values.append(data)

# Add some tasks to the manager.
task_manager.add_task('get_something', task_callback)
task_manager.add_task('get_something', task_callback)
task_manager.add_task('set_something', None, 2, -19.5)
task_manager.add_task('get_something', task_callback)

assert len(task_manager._queued_tasks) == 4
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 0
assert returned_values == []

# We then ask the manager to execute the queued tasks.
with qtbot.waitSignal(task_manager.sig_run_tasks_finished, timeout=5000):
task_manager.run_tasks()

# Assert that all queued tasks are now running tasks.
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 4

assert len(task_manager._running_tasks) == 0
assert len(returned_values) == 3
assert returned_values[0] == [1, 2, 3, 4]
assert returned_values[1] == [1, 2, 3, 4]
assert returned_values[2] == [1, 2, -19.5, 4]


def test_run_tasks_if_busy(task_manager, qtbot):
"""
Test that the manager is managing the queued tasks as expected
when adding new tasks while the worker is busy.
"""
returned_values = []

def task_callback(data):
returned_values.append(data)

# Add some tasks to the manager.
task_manager.add_task('get_something', task_callback)
task_manager.add_task('get_something', task_callback)
task_manager.add_task('set_something', None, 2, -19.5)
assert len(task_manager._queued_tasks) == 3
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 0

# We then ask the manager to execute the queued tasks.
with qtbot.waitSignal(task_manager.sig_run_tasks_finished, timeout=5000):
task_manager.run_tasks()

# Assert that all queued tasks are now running tasks.
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

# While the worker is running, we add two other tasks to the manager.
task_manager.add_task('set_something', None, 1, 0.512)
task_manager.add_task('get_something', task_callback)
assert len(task_manager._queued_tasks) == 2
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

# We then ask the manager to execute the tasks that we just added.
# These additional tasks should be run automatically after the first
# stack of tasks have been executed.
task_manager.run_tasks()
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 2
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

# We then assert that all tasks have been executed as expected.
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 0

assert len(returned_values) == 3
assert returned_values[0] == [1, 2, 3, 4]
assert returned_values[1] == [1, 2, 3, 4]
assert returned_values[2] == [1, 0.512, -19.5, 4]


if __name__ == "__main__":
pytest.main(['-x', __file__, '-v', '-rw'])