forked from aiidateam/aiida-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_execmanager.py
514 lines (430 loc) · 21.2 KB
/
test_execmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for the :mod:`aiida.engine.daemon.execmanager` module."""
import io
import os
import pathlib
import typing
import pytest
from aiida.common.datastructures import CalcInfo, CodeInfo, FileCopyOperation
from aiida.common.folders import Folder, SandboxFolder
from aiida.engine.daemon import execmanager
from aiida.orm import CalcJobNode, FolderData, RemoteData, SinglefileData
from aiida.transports.plugins.local import LocalTransport
def serialize_file_hierarchy(dirpath: pathlib.Path) -> typing.Dict:
    """Serialize the file hierarchy at ``dirpath`` into a nested mapping.

    .. note:: empty directories are ignored, since entries are only created for directories that contain files.

    :param dirpath: the base path.
    :return: a mapping representing the file hierarchy, where keys are file or directory names. Directories map to
        nested mappings and the leafs correspond to files, whose values are their text contents.
    """
    serialized: dict = {}

    for root, _, files in os.walk(dirpath):
        if not files:
            # Directories without files of their own contribute nothing; their subdirectories are visited separately.
            continue

        # All files in ``root`` share the same location, so resolve the nested mapping once per directory instead of
        # once per file.
        dirpath_root = pathlib.Path(root)
        subdir = serialized
        for part in dirpath_root.relative_to(dirpath).parts:
            subdir = subdir.setdefault(part, {})

        for filename in files:
            subdir[filename] = (dirpath_root / filename).read_text()

    return serialized
def create_file_hierarchy(hierarchy: typing.Dict, target: typing.Union[pathlib.Path, Folder]) -> None:
    """Write the nested mapping ``hierarchy`` as files and directories into ``target``.

    Entries whose value is a mapping are treated as directories and are handled recursively, all other entries are
    written as text files with the value as content.

    .. note:: for a ``Path`` target, directories are only created when a file is written into them, so empty
        directories are not created explicitly.

    :param hierarchy: mapping with directory structure, e.g. returned by ``serialize_file_hierarchy``.
    :param target: the target where the hierarchy should be created.
    :raises TypeError: if ``target`` is neither a ``Path`` nor a ``Folder`` instance.
    """
    for name, content in hierarchy.items():
        is_directory = isinstance(content, dict)

        if isinstance(target, pathlib.Path):
            if is_directory:
                create_file_hierarchy(content, target / name)
            else:
                # The parent is created lazily, right before the first file is written into it.
                target.mkdir(parents=True, exist_ok=True)
                (target / name).write_text(content)
        elif isinstance(target, Folder):
            if is_directory:
                create_file_hierarchy(content, target.get_subfolder(name, create=True))
            else:
                with target.open(name, 'w') as handle:
                    handle.write(content)
        else:
            raise TypeError('target must be either a `Path` or a `Folder` instance.')
@pytest.fixture
def file_hierarchy():
    """Return a sample file hierarchy with one top-level file and two levels of nested directories."""
    nested = {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}
    return {
        'file_a.txt': 'file_a',
        'path': {'file_b.txt': 'file_b', 'sub': nested},
    }
@pytest.fixture
def file_hierarchy_simple():
    """Return a minimal file hierarchy: one file in a subdirectory and one top-level file."""
    return dict(sub={'b': 'file_b'}, a='file_a')
@pytest.fixture
def node_and_calc_info(aiida_localhost, aiida_code_installed):
    """Return a stored ``CalcJobNode`` and a ``CalcInfo`` that references it and a single installed code.

    :return: tuple of the node and the ``CalcInfo`` instance.
    """
    node = CalcJobNode(computer=aiida_localhost).store()
    code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash').store()

    code_info = CodeInfo()
    code_info.code_uuid = code.uuid

    calc_info = CalcInfo()
    calc_info.codes_info = [code_info]
    calc_info.uuid = node.uuid

    return node, calc_info
def test_hierarchy_utility(file_hierarchy, tmp_path):
    """Test the ``create_file_hierarchy`` and ``serialize_file_hierarchy`` utilities through a round-trip.

    Writing a hierarchy to disk and serializing it again should return the original mapping.
    """
    create_file_hierarchy(file_hierarchy, tmp_path)
    roundtrip = serialize_file_hierarchy(tmp_path)

    assert roundtrip == file_hierarchy
@pytest.mark.parametrize(
    'retrieve_list, expected_hierarchy',
    (
        # Single file or folder, either toplevel or nested
        (['file_a.txt'], {'file_a.txt': 'file_a'}),
        (['path/sub/file_c.txt'], {'file_c.txt': 'file_c'}),
        (['path'], {'path': {'file_b.txt': 'file_b', 'sub': {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}}}),
        (['path/sub'], {'sub': {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}}),
        (['*.txt'], {'file_a.txt': 'file_a'}),
        (['*/*.txt'], {'file_b.txt': 'file_b'}),
        # Single nested file that is retrieved keeping a varying level of depth of original hierarchy
        ([('path/sub/file_c.txt', '.', 3)], {'path': {'sub': {'file_c.txt': 'file_c'}}}),
        ([('path/sub/file_c.txt', '.', 2)], {'sub': {'file_c.txt': 'file_c'}}),
        ([('path/sub/file_c.txt', '.', 1)], {'file_c.txt': 'file_c'}),
        ([('path/sub/file_c.txt', '.', 0)], {'file_c.txt': 'file_c'}),
        # Single nested folder that is retrieved keeping a varying level of depth of original hierarchy
        ([('path/sub', '.', 2)], {'path': {'sub': {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}}}),
        ([('path/sub', '.', 1)], {'sub': {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}}),
        # Using globbing patterns
        ([('path/*', '.', 0)], {'file_b.txt': 'file_b', 'sub': {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}}),
        (
            [('path/sub/*', '.', 0)],
            {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'},
        ),  # Only the contents of ``path/sub`` are retrieved, without the ``sub`` directory itself
        ([('path/sub/*c.txt', '.', 2)], {'sub': {'file_c.txt': 'file_c'}}),
        ([('path/sub/*c.txt', '.', 0)], {'file_c.txt': 'file_c'}),
        # Using globbing with depth `None` should maintain exact folder hierarchy
        ([('path/*.txt', '.', None)], {'path': {'file_b.txt': 'file_b'}}),
        ([('path/sub/*.txt', '.', None)], {'path': {'sub': {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}}}),
        # Different target directory
        ([('path/sub/file_c.txt', 'target', 3)], {'target': {'path': {'sub': {'file_c.txt': 'file_c'}}}}),
        ([('path/sub', 'target', 1)], {'target': {'sub': {'file_c.txt': 'file_c', 'file_d.txt': 'file_d'}}}),
        ([('path/sub/*c.txt', 'target', 2)], {'target': {'sub': {'file_c.txt': 'file_c'}}}),
        # Missing files should be ignored and not cause the retrieval to except
        (['file_a.txt', 'file_u.txt', 'path/file_u.txt', ('path/sub/file_u.txt', '.', 3)], {'file_a.txt': 'file_a'}),
    ),
)
def test_retrieve_files_from_list(
    tmp_path_factory, generate_calculation_node, file_hierarchy, retrieve_list, expected_hierarchy
):
    """Test the ``retrieve_files_from_list`` function for various forms of ``retrieve_list`` entries.

    The sample ``file_hierarchy`` is written to a source directory, which serves as the working directory of the
    transport, and the content that ends up in a separate target directory is compared to ``expected_hierarchy``.
    """
    dirpath_source = tmp_path_factory.mktemp('source')
    dirpath_target = tmp_path_factory.mktemp('target')
    create_file_hierarchy(file_hierarchy, dirpath_source)

    with LocalTransport() as transport:
        node = generate_calculation_node()
        # The entries in the retrieve list are relative paths, hence the transport has to operate from the source.
        transport.chdir(dirpath_source)
        execmanager.retrieve_files_from_list(node, transport, dirpath_target, retrieve_list)

    retrieved_hierarchy = serialize_file_hierarchy(dirpath_target)
    assert retrieved_hierarchy == expected_hierarchy
@pytest.mark.parametrize(
    ('local_copy_list', 'expected_hierarchy', 'pre_create_target_dir'),
    (
        ([None, None], {'sub': {'b': 'file_b'}, 'a': 'file_a'}, False),
        (['.', None], {'sub': {'b': 'file_b'}, 'a': 'file_a'}, False),
        ([None, '.'], {'sub': {'b': 'file_b'}, 'a': 'file_a'}, False),
        (['.', '.'], {'sub': {'b': 'file_b'}, 'a': 'file_a'}, False),
        ([None, ''], {'sub': {'b': 'file_b'}, 'a': 'file_a'}, False),
        (['sub', None], {'b': 'file_b'}, False),
        ([None, 'target'], {'target': {'sub': {'b': 'file_b'}, 'a': 'file_a'}}, False),
        (['sub', 'target'], {'target': {'b': 'file_b'}}, False),
        (['a', 'target/filename'], {'target': {'filename': 'file_a'}}, True),
    ),
)
def test_upload_local_copy_list(
    fixture_sandbox,
    node_and_calc_info,
    file_hierarchy_simple,
    tmp_path,
    local_copy_list,
    expected_hierarchy,
    pre_create_target_dir,
):
    """Test the ``local_copy_list`` functionality in ``upload_calculation``.

    Each parametrized ``local_copy_list`` holds the source and target path of a single entry; the UUID of a stored
    ``FolderData`` containing ``file_hierarchy_simple`` is prepended to form the complete entry.
    """
    create_file_hierarchy(file_hierarchy_simple, tmp_path)

    source_node = FolderData()
    source_node.base.repository.put_object_from_tree(tmp_path)
    source_node.store()

    node, calc_info = node_and_calc_info
    calc_info.local_copy_list = [[source_node.uuid, *local_copy_list]]

    if pre_create_target_dir:
        # This is a regression test for a bug that was introduced in 6898ff4d8c263cf08707c61411a005f6a7f731dd. The
        # implementation copying files from the sandbox and the ``local_copy_list`` were changed.
        # See https://github.com/aiidateam/aiida-core/pull/6348 for detailed discussion.
        # A situation is simulated where the directory of a nested target filename in a ``local_copy_list`` already
        # exists in the sandbox.
        fixture_sandbox.get_subfolder('target', create=True)

    with LocalTransport() as transport:
        execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)

    # Files communicated through the ``local_copy_list`` should not be written to the repository of the calculation
    # node itself.
    assert node.base.repository.list_object_names() == []

    # All contents should however have been written to the remote working directory.
    dirpath_workdir = pathlib.Path(node.get_remote_workdir())
    assert serialize_file_hierarchy(dirpath_workdir) == expected_hierarchy
def test_upload_local_copy_list_files_folders(fixture_sandbox, node_and_calc_info, file_hierarchy, tmp_path):
    """Test the ``local_copy_list`` functionality in ``upload_calculation``.

    Specifically, verify that files in the ``local_copy_list`` do not end up in the repository of the node. The list
    combines two ``SinglefileData`` nodes, copied to a nested ``files`` directory, with a ``FolderData`` node whose
    entire content is copied to the root of the working directory.
    """
    create_file_hierarchy(file_hierarchy, tmp_path)
    folder = FolderData()
    folder.base.repository.put_object_from_tree(tmp_path)

    inputs = {
        'file_x': SinglefileData(io.BytesIO(b'content_x')).store(),
        'file_y': SinglefileData(io.BytesIO(b'content_y')).store(),
        'folder': folder.store(),
    }

    node, calc_info = node_and_calc_info
    calc_info.local_copy_list = [
        (inputs['file_x'].uuid, inputs['file_x'].filename, './files/file_x'),
        (inputs['file_y'].uuid, inputs['file_y'].filename, './files/file_y'),
        (inputs['folder'].uuid, None, '.'),
    ]

    with LocalTransport() as transport:
        execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)

    # Check that none of the files were written to the repository of the calculation node, since they were communicated
    # through the ``local_copy_list``.
    assert node.base.repository.list_object_names() == []

    # Now check that all contents were successfully written to the remote working directory
    written_hierarchy = serialize_file_hierarchy(pathlib.Path(node.get_remote_workdir()))

    # Build the expectation as a new mapping instead of aliasing and mutating the dictionary provided by the fixture.
    expected_hierarchy = {**file_hierarchy, 'files': {'file_x': 'content_x', 'file_y': 'content_y'}}
    assert expected_hierarchy == written_hierarchy
def test_upload_remote_symlink_list(fixture_sandbox, node_and_calc_info, file_hierarchy, tmp_path):
    """Test the ``remote_symlink_list`` functionality in ``upload_calculation``.

    A directory and a file are symlinked into the working directory. The target of the directory link is nested, so
    its parent directory should be created automatically.
    """
    create_file_hierarchy(file_hierarchy, tmp_path)
    node, calc_info = node_and_calc_info

    computer_uuid = node.computer.uuid
    calc_info.remote_symlink_list = [
        (computer_uuid, str(tmp_path / 'path' / 'sub'), 'path/sub'),
        (computer_uuid, str(tmp_path / 'file_a.txt'), 'file_a.txt'),
    ]

    with LocalTransport() as transport:
        execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)

    dirpath_workdir = pathlib.Path(node.get_remote_workdir())
    link_file = dirpath_workdir / 'file_a.txt'
    link_directory = dirpath_workdir / 'path' / 'sub'

    # Both targets should be actual symlinks and not copies of the source.
    assert link_file.is_symlink()
    assert link_directory.is_symlink()

    # The content of the sources should be readable through the links.
    assert link_file.read_text() == 'file_a'
    assert (link_directory / 'file_c.txt').read_text() == 'file_c'
@pytest.mark.parametrize(
    'order, expected',
    (
        (None, 'remote'),  # Default order should have remote last
        (
            [
                FileCopyOperation.SANDBOX,
                FileCopyOperation.REMOTE,
                FileCopyOperation.LOCAL,
            ],
            'local',
        ),
        (
            [
                FileCopyOperation.REMOTE,
                FileCopyOperation.LOCAL,
                FileCopyOperation.SANDBOX,
            ],
            'sandbox',
        ),
    ),
)
def test_upload_file_copy_operation_order(node_and_calc_info, aiida_localhost, tmp_path, order, expected):
    """Test the ``CalcInfo.file_copy_operation_order`` controls the copy order.

    The sandbox, the ``local_copy_list`` and the ``remote_copy_list`` each provide a ``file.txt`` whose content is the
    name of its origin. The content that ends up in the working directory reveals which operation was performed last.
    """
    # One directory per origin, all below the temporary directory of the test.
    dirpath_remote, dirpath_local, dirpath_sandbox = (tmp_path / name for name in ('remote', 'local', 'sandbox'))
    for dirpath in (dirpath_remote, dirpath_local, dirpath_sandbox):
        dirpath.mkdir()

    filepath_remote = dirpath_remote / 'file.txt'
    filepath_remote.write_text('remote')
    (dirpath_local / 'file.txt').write_text('local')

    remote_data = RemoteData(remote_path=str(dirpath_remote), computer=aiida_localhost)
    # The local file has to exist before the ``FolderData`` is constructed from the directory tree.
    folder_data = FolderData(tree=dirpath_local)
    sandbox = SandboxFolder(dirpath_sandbox)
    sandbox.create_file_from_filelike(io.BytesIO(b'sandbox'), 'file.txt')

    inputs = {'local': folder_data, 'remote': remote_data}

    node, calc_info = node_and_calc_info
    calc_info.remote_copy_list = ((aiida_localhost.uuid, str(filepath_remote), 'file.txt'),)
    calc_info.local_copy_list = ((folder_data.uuid, 'file.txt', 'file.txt'),)

    # Leave the attribute untouched for ``None`` such that the default order is exercised.
    if order is not None:
        calc_info.file_copy_operation_order = order

    with LocalTransport() as transport:
        execmanager.upload_calculation(node, transport, calc_info, sandbox, inputs)

    filepath_written = pathlib.Path(node.get_remote_workdir()) / 'file.txt'
    assert filepath_written.is_file()
    assert filepath_written.read_text() == expected
@pytest.mark.parametrize(
    'sandbox_hierarchy, local_copy_list, remote_copy_list, expected_hierarchy, expected_exception',
    [
        ## INTUITIVE BEHAVIOUR
        # Only Sandbox
        ({'pseudo': {'Ba.upf': 'Ba pseudo'}}, (), (), {'pseudo': {'Ba.upf': 'Ba pseudo'}}, None),
        # Sandbox creates folder; Local copy of a single file to target file in folder
        # This is the QE use case for the `PwCalculation` plugin
        (
            {'pseudo': {}},
            ((SinglefileData, 'Ba pseudo', 'Ba.upf', 'pseudo/Ba.upf'),),
            (),
            {'pseudo': {'Ba.upf': 'Ba pseudo'}},
            None,
        ),
        # Sandbox creates folder; Remote copy of a single file to target folder
        (
            {'pseudo': {}},
            (),
            (({'pseudo': {'Ba.upf': 'Ba pseudo'}}, 'pseudo/Ba.upf', 'pseudo'),),
            {'pseudo': {'Ba.upf': 'Ba pseudo'}},
            None,
        ),
        # Sandbox creates folder; Remote copy of a single file to target file in folder
        (
            {'pseudo': {}},
            (),
            (({'pseudo': {'Ba.upf': 'Ba pseudo'}}, 'pseudo/Ba.upf', 'pseudo/Ba.upf'),),
            {'pseudo': {'Ba.upf': 'Ba pseudo'}},
            None,
        ),
        # Remote copy of a folder to the target "current directory"
        ({}, (), (({'pseudo': {'Ba.upf': 'Ba pseudo'}}, 'pseudo', '.'),), {'pseudo': {'Ba.upf': 'Ba pseudo'}}, None),
        # Sandbox creates folder with nested folder; Remote copy of nested folder to target nested folder
        # -> Copies the remote nested folder into target nested folder
        (
            {'folder': {'nested_folder': {'file': 'content'}}},
            (),
            (
                (
                    {'folder': {'nested_folder': {'file': 'new_content'}}},
                    'folder/nested_folder',
                    'folder/nested_folder',
                ),
            ),
            {'folder': {'nested_folder': {'file': 'content', 'nested_folder': {'file': 'new_content'}}}},
            None,
        ),
        ## COUNTER-INTUITIVE BEHAVIOUR
        # Sandbox creates folder; Local copy of a single file to target folder
        # -> Fails outright since target folder exists
        (
            {'pseudo': {}},
            ((SinglefileData, 'Ba pseudo', 'Ba.upf', 'pseudo'),),
            (),
            {'pseudo': {'Ba.upf': 'Ba pseudo'}},
            IsADirectoryError,
        ),
        # Sandbox creates folder; Local copy of a folder -> Copies contents of folder to target folder
        # Effectively this emulates the behaviour of `cp` with forward slash: `cp -r pseudo/ target/pseudo`
        (
            {'pseudo': {}},
            ((FolderData, {'pseudo': {'Ba.upf': 'Ba pseudo'}}, 'pseudo', 'pseudo'),),
            (),
            {'pseudo': {'Ba.upf': 'Ba pseudo'}},
            None,
        ),
        # Local copy of a folder to the "current directory"
        # -> Copies the contents of the folder to the target current directory
        ({}, ((FolderData, {'pseudo': {'Ba.upf': 'Ba pseudo'}}, 'pseudo', '.'),), (), {'Ba.upf': 'Ba pseudo'}, None),
        # Sandbox creates folder with nested folder; Local copy of nested folder to target nested folder
        # -> Copies contents of nested folder to target nested folder
        (
            {'folder': {'nested_folder': {'file': 'content'}}},
            (
                (
                    FolderData,
                    {'folder': {'nested_folder': {'file': 'new_content'}}},
                    'folder/nested_folder',
                    'folder/nested_folder',
                ),
            ),
            (),
            {'folder': {'nested_folder': {'file': 'new_content'}}},
            None,
        ),
    ],
)
def test_upload_combinations(
    fixture_sandbox,
    node_and_calc_info,
    tmp_path,
    sandbox_hierarchy,
    local_copy_list,
    remote_copy_list,
    expected_hierarchy,
    expected_exception,
):
    """Test the ``upload_calculation`` functions for various combinations of sandbox folders and copy lists.

    The `local_copy_list` is formatted as a list of tuples, where each tuple contains the following elements:

    - The class of the data node to be copied.
    - The content of the data node to be copied. This can be either a string in case of a file, or a dictionary
      representing the file hierarchy in case of a folder.
    - The name of the file or directory to be copied.
    - The relative path the data should be copied to.

    The `remote_copy_list` is formatted as a list of tuples, where each tuple contains the following elements:

    - A dictionary representing the file hierarchy that should be in the remote directory.
    - The path of the file or directory to be copied, relative to the root of that hierarchy.
    - The relative path the data should be copied to.

    If ``expected_exception`` is not ``None``, the test only verifies that the upload raises that exception and the
    ``expected_hierarchy`` is not checked.
    """
    create_file_hierarchy(sandbox_hierarchy, fixture_sandbox)

    node, calc_info = node_and_calc_info

    calc_info.local_copy_list = []

    for copy_id, (data_class, content, filename, target_path) in enumerate(local_copy_list):
        # Create a sub directory in the temporary folder for each copy to avoid conflicts
        sub_tmp_path_local = tmp_path / f'local_{copy_id}'

        if issubclass(data_class, SinglefileData):
            create_file_hierarchy({filename: content}, sub_tmp_path_local)
            copy_node = SinglefileData(sub_tmp_path_local / filename).store()

            calc_info.local_copy_list.append((copy_node.uuid, copy_node.filename, target_path))

        elif issubclass(data_class, FolderData):
            create_file_hierarchy(content, sub_tmp_path_local)
            folder = FolderData()
            folder.base.repository.put_object_from_tree(sub_tmp_path_local)
            folder.store()

            calc_info.local_copy_list.append((folder.uuid, filename, target_path))

        else:
            # Fail loudly instead of silently dropping the entry, which would test something other than intended.
            raise TypeError(f'unsupported data class in `local_copy_list`: {data_class}')

    calc_info.remote_copy_list = []

    for copy_id, (hierarchy, source_path, target_path) in enumerate(remote_copy_list):
        # Create a sub directory in the temporary folder for each copy to avoid conflicts
        sub_tmp_path_remote = tmp_path / f'remote_{copy_id}'

        create_file_hierarchy(hierarchy, sub_tmp_path_remote)

        calc_info.remote_copy_list.append(
            (node.computer.uuid, (sub_tmp_path_remote / source_path).as_posix(), target_path)
        )

    if expected_exception is not None:
        # Nothing can be verified after the upload excepted: statements following the call inside the
        # ``pytest.raises`` context would never be executed.
        with pytest.raises(expected_exception), LocalTransport() as transport:
            execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
        return

    with LocalTransport() as transport:
        execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)

    filepath_workdir = pathlib.Path(node.get_remote_workdir())

    assert serialize_file_hierarchy(filepath_workdir) == expected_hierarchy