Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 35 additions & 30 deletions ibllib/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import tempfile
import unittest
from unittest import mock
from pathlib import Path
from collections import OrderedDict

Expand Down Expand Up @@ -177,11 +178,13 @@ def test_pipeline_alyx(self):
self.assertTrue(len(tasks) == NTASKS)

# run them and make sure their statuses got updated appropriately
task_deck, datasets = pipeline.run(machine='testmachine')
check_statuses = [desired_statuses[t['name']] == t['status'] for t in task_deck]
# [(t['name'], t['status'], desired_statuses[t['name']]) for t in task_deck]
self.assertTrue(all(check_statuses))
self.assertTrue(set([d['name'] for d in datasets]) == set(desired_datasets))
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(self.td.name).joinpath('.gpu_lock')):
task_deck, datasets = pipeline.run(machine='testmachine')
check_statuses = [desired_statuses[t['name']] == t['status'] for t in task_deck]
# [(t['name'], t['status'], desired_statuses[t['name']]) for t in task_deck]
self.assertTrue(all(check_statuses))
self.assertTrue(set([d['name'] for d in datasets]) == set(desired_datasets))

# check logs
check_logs = [desired_logs in t['log'] if t['log'] else True for t in task_deck]
Expand All @@ -199,7 +202,9 @@ def test_pipeline_alyx(self):
self.assertTrue(all(check_statuses))

# test the rerun option
task_deck, dsets = pipeline.rerun_failed(machine='testmachine')
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(self.td.name).joinpath('.gpu_lock')):
task_deck, dsets = pipeline.rerun_failed(machine='testmachine')
task_02 = next(t for t in task_deck if t['name'] == 'Task02_error')
self.assertEqual('Complete', task_02['status'])
dep_task = next(x for x in task_deck if task_02['id'] in x['parents'])
Expand All @@ -213,17 +218,16 @@ def test_pipeline_alyx(self):
self.assertTrue(all(check_rerun))

# Rerun without clobber and check that logs are not overwritten
task_deck, dsets = pipeline.rerun_failed(machine='testmachine', clobber=False)
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(self.td.name).joinpath('.gpu_lock')):
task_deck, dsets = pipeline.rerun_failed(machine='testmachine', clobber=False)
check_logs = [t['log'].count(desired_logs) == desired_logs_rerun[t['name']] if t['log']
else t['log'] == desired_logs_rerun[t['name']] for t in task_deck]
check_rerun = ['===RERUN===' in t['log'] if desired_logs_rerun[t['name']] == 2
else True for t in task_deck]
self.assertTrue(all(check_logs))
self.assertTrue(all(check_rerun))

# Remove the lock file
Path.home().joinpath('.one', 'gpu.lock').unlink()


class GpuTask(ibllib.pipes.tasks.Task):
gpu = 1
Expand All @@ -238,28 +242,29 @@ class TestLocks(unittest.TestCase):

def test_gpu_lock_and_local_data_handler(self) -> None:
# Remove any existing locks first
if Path.home().joinpath('.one', 'gpu.lock').exists():
Path.home().joinpath('.one', 'gpu.lock').unlink()
with tempfile.TemporaryDirectory() as td:
session_path = Path(td).joinpath('algernon', '2021/02/12', '001')
session_path.joinpath('alf').mkdir(parents=True)
task = GpuTask(session_path, one=None, location='local')
assert task.is_locked() is False
task.run()
assert task.status == 0
assert task.is_locked() is False
# then make a lock file and make sure it fails and is still locked afterwards
task._make_lock_file()
task.run()
assert task.status == - 2
assert task.is_locked()
# test the time out feature
task.time_out_secs = - 1
task._make_lock_file()
assert not task.is_locked()
task.run()
assert task.status == 0


if __name__ == "__main__":
# Patch _lock_file_path method to point to different lock file location
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(td).joinpath('.gpu_lock')):
self.assertFalse(task.is_locked())
task.run()
self.assertEqual(0, task.status)
self.assertFalse(task.is_locked())
# then make a lock file and make sure it fails and is still locked afterwards
task._make_lock_file()
task.run()
self.assertEqual(-2, task.status)
self.assertTrue(task.is_locked())
# test the time out feature
task.time_out_secs = - 1
task._make_lock_file()
self.assertFalse(task.is_locked())
task.run()
self.assertEqual(0, task.status)


if __name__ == '__main__':
unittest.main(exit=False, verbosity=2)