Skip to content

Commit

Permalink
Add test_lifo_run_tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsebgosselin committed Oct 17, 2023
1 parent 83719ff commit 6d1e072
Showing 1 changed file with 106 additions and 37 deletions.
143 changes: 106 additions & 37 deletions qtapputils/widgets/tests/test_taskmanagers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright © SARDES Project Contributors
# https://github.com/cgq-qgc/sardes
# Copyright © QtAppUtils Project Contributors
# https://github.com/jnsebgosselin/apputils
#
# This file is part of SARDES.
# Licensed under the terms of the GNU General Public License.
# This file is part of QtAppUtils.
# Licensed under the terms of the MIT License.
# -----------------------------------------------------------------------------

"""
Tests for the TaskManagerBase class.
Tests for the taskmanagers module.
"""

# ---- Standard imports
Expand All @@ -19,7 +19,7 @@
from qtpy.QtTest import QSignalSpy

# ---- Local imports
from qtapputils.widgets import WorkerBase, TaskManagerBase
from qtapputils.widgets import WorkerBase, TaskManagerBase, LIFOTaskManager


# =============================================================================
Expand Down Expand Up @@ -56,6 +56,16 @@ def task_manager(worker, qtbot):
qtbot.waitUntil(lambda: not task_manager._thread.isRunning())


@pytest.fixture
def lifo_task_manager(worker, qtbot):
task_manager = LIFOTaskManager()
task_manager.set_worker(worker)
yield task_manager

# We wait for the manager's thread to fully stop to avoid segfault error.
qtbot.waitUntil(lambda: not task_manager._thread.isRunning())


# =============================================================================
# ---- Tests
# =============================================================================
Expand Down Expand Up @@ -84,13 +94,14 @@ def task_callback(data):
assert returned_values == []

# We then ask the manager to execute the queued tasks.
with qtbot.waitSignal(task_manager.sig_run_tasks_finished, timeout=5000):
task_manager.run_tasks()
task_manager.run_tasks()

# Assert that all queued tasks are now running tasks.
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 4
# Assert that all queued tasks are now running tasks.
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 4

qtbot.waitUntil(lambda: len(end_signal_spy) == 1, timeout=5000)

assert len(task_manager._running_tasks) == 0
assert len(returned_values) == 3
Expand Down Expand Up @@ -126,31 +137,32 @@ def task_callback(data):
assert len(task_manager._running_tasks) == 0

# We then ask the manager to execute the queued tasks.
with qtbot.waitSignal(task_manager.sig_run_tasks_finished, timeout=5000):
task_manager.run_tasks()

# Assert that all queued tasks are now running tasks.
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

# While the worker is running, we add two other tasks to the manager.
task_manager.add_task('set_something', None, 1, 0.512)
task_manager.add_task('get_something', task_callback)
assert len(task_manager._queued_tasks) == 2
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

# We then ask the manager to execute the tasks that we just added.
# These additional tasks should be run automatically after the first
# stack of tasks have been executed.
task_manager.run_tasks()
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 2
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()
task_manager.run_tasks()

# Assert that all queued tasks are now running tasks.
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

# While the worker is running, we add two other tasks to the manager.
task_manager.add_task('set_something', None, 1, 0.512)
task_manager.add_task('get_something', task_callback)
assert len(task_manager._queued_tasks) == 2
assert len(task_manager._pending_tasks) == 0
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

# We then ask the manager to execute the tasks that we just added.
# These additional tasks should be run automatically after the first
# stack of tasks have been executed.
task_manager.run_tasks()
assert len(task_manager._queued_tasks) == 0
assert len(task_manager._pending_tasks) == 2
assert len(task_manager._running_tasks) == 3
assert task_manager._thread.isRunning()

qtbot.waitUntil(lambda: len(end_signal_spy) == 1, timeout=5000)

# We then assert that all tasks have been executed as expected.
assert len(task_manager._queued_tasks) == 0
Expand All @@ -167,5 +179,62 @@ def task_callback(data):
assert len(end_signal_spy) == 1


def test_lifo_run_tasks(lifo_task_manager, qtbot, DATA):
"""
Test that the LIFO tasks manager is working as expected.
"""
# Add spy to the signals.
start_signal_spy = QSignalSpy(lifo_task_manager.sig_run_tasks_started)
end_signal_spy = QSignalSpy(lifo_task_manager.sig_run_tasks_finished)

# Add some tasks to the manager. Note that only the last
# task added will be effectively added to the queue.
lifo_task_manager.add_task('set_something', None, 0, 11)
lifo_task_manager.add_task('set_something', None, 1, 22)
lifo_task_manager.add_task('set_something', None, 2, 33)
lifo_task_manager.add_task('set_something', None, 3, 44)

# We assert that each signal were not called.
assert len(start_signal_spy) == 0
assert len(end_signal_spy) == 0

assert len(lifo_task_manager._queued_tasks) == 1
assert len(lifo_task_manager._pending_tasks) == 0
assert len(lifo_task_manager._running_tasks) == 0

# We then ask the manager to execute the queued task while adding
# more task.
lifo_task_manager.run_tasks()

# Assert that all queued tasks are now running tasks.
assert len(lifo_task_manager._queued_tasks) == 0
assert len(lifo_task_manager._pending_tasks) == 0
assert len(lifo_task_manager._running_tasks) == 1

# While the worker is running, we add two other tasks to the manager.
# Note that the second task should override the first one
lifo_task_manager.add_task('set_something', None, 1, 222)
lifo_task_manager.run_tasks()
lifo_task_manager.add_task('set_something', None, 2, 333)
lifo_task_manager.run_tasks()
assert len(lifo_task_manager._queued_tasks) == 0
assert len(lifo_task_manager._pending_tasks) == 1
assert len(lifo_task_manager._running_tasks) == 1
assert lifo_task_manager._thread.isRunning()

qtbot.waitUntil(lambda: len(end_signal_spy) == 1, timeout=5000)

assert len(lifo_task_manager._queued_tasks) == 0
assert len(lifo_task_manager._pending_tasks) == 0
assert len(lifo_task_manager._running_tasks) == 0

# Assert that DATA was modified as expected.
assert DATA == [1, 2, 333, 44]

# We assert that each signal were called only once.
assert len(start_signal_spy) == 1
assert len(end_signal_spy) == 1


if __name__ == "__main__":
pytest.main(['-x', __file__, '-v', '-rw'])

0 comments on commit 6d1e072

Please sign in to comment.