/
shell.py
148 lines (113 loc) · 6.08 KB
/
shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Parser for a :class:`aiida_shell.ShellJob` job."""
from __future__ import annotations
import pathlib
import re
import typing as t
from aiida.engine import ExitCode
from aiida.orm import Data, FolderData, SinglefileData
from aiida.parsers.parser import Parser
from aiida_shell.calculations.shell import ShellJob
# Names exported by ``from <this module> import *``: only the parser class.
__all__ = ('ShellParser',)
class ShellParser(Parser):
    """Parser that turns the files retrieved for a :class:`aiida_shell.ShellJob` into output nodes."""

    def parse(self, **kwargs: t.Any) -> ExitCode:
        """Parse the files found in the directory passed as ``retrieved_temporary_folder``.

        The custom outputs are parsed first, then the default outputs, and finally the optional ``parser`` hook is
        called if that input is present on the node. The returned exit code is chosen with the following precedence:

        1. ``ERROR_PARSER_HOOK_EXCEPTED`` if the parser hook raised any exception.
        2. ``ERROR_OUTPUT_FILEPATHS_MISSING`` if any of the requested custom outputs was not found.
        3. The exit code produced by :meth:`parse_default_outputs`.

        :param kwargs: Keyword arguments, of which ``retrieved_temporary_folder`` is required.
        :returns: The exit code of the parsing.
        """
        retrieved_dir = pathlib.Path(kwargs['retrieved_temporary_folder'])

        missing = self.parse_custom_outputs(retrieved_dir)
        default_exit_code = self.parse_default_outputs(retrieved_dir)

        if 'parser' in self.node.inputs:
            try:
                self.call_parser_hook(retrieved_dir)
            except Exception as exception:
                # The hook is user-provided code, so any failure is reported through a dedicated exit code.
                return self.exit_code('ERROR_PARSER_HOOK_EXCEPTED', exception=exception)

        if not missing:
            return default_exit_code

        return self.exit_code('ERROR_OUTPUT_FILEPATHS_MISSING', missing_filepaths=', '.join(missing))

    def exit_code(self, key: str, **kwargs: t.Any) -> ExitCode:
        """Look up the exit code registered under ``key`` and format its message.

        :param key: Attribute name of the exit code on ``self.exit_codes``.
        :param kwargs: Keyword arguments forwarded to the ``format`` method of the exit code.
        :returns: The formatted exit code.
        """
        template = getattr(self.exit_codes, key)
        formatted: ExitCode = template.format(**kwargs)
        return formatted

    @staticmethod
    def format_link_label(filename: str) -> str:
        """Derive a link label from a filename.

        The conversion happens in three steps:

        * A filename that starts with a digit is prefixed with ``aiida_shell_``.
        * Every run of characters other than ASCII letters, digits and underscores is replaced by one underscore.
        * Every run of two or more underscores is collapsed into a single underscore.

        :param filename: The filename.
        :returns: The link label.
        """
        label = f'aiida_shell_{filename}' if re.match('^[0-9]+.*', filename) else filename
        label = re.sub('[^0-9a-zA-Z_]+', '_', label)
        # A replacement underscore may end up next to an underscore that was already there, so merge them last.
        return re.sub('_[_]+', '_', label)

    def parse_default_outputs(self, dirpath: pathlib.Path) -> ExitCode:
        """Attach the stderr and stdout files as outputs and evaluate the recorded exit status.

        A missing stderr file is tolerated and treated as empty stderr. A missing stdout file, or a missing or
        non-integer status file, makes the method return the corresponding error exit code immediately.

        :param dirpath: Directory containing the retrieved files.
        :returns: An exit code; a default ``ExitCode()`` when the status is zero and stderr is empty.
        """
        stderr_content = ''
        stderr_path = dirpath / ShellJob.FILENAME_STDERR

        try:
            with stderr_path.open(mode='rb') as stream:
                stderr_node = SinglefileData(stream, filename=ShellJob.FILENAME_STDERR)
        except FileNotFoundError:
            # No stderr file: keep the empty default and attach no stderr output.
            pass
        else:
            stderr_content = stderr_node.get_content()  # type: ignore[assignment]
            self.out(ShellJob.FILENAME_STDERR, stderr_node)

        stdout_name = self.node.get_option('output_filename') or ShellJob.FILENAME_STDOUT
        stdout_path = dirpath / stdout_name

        try:
            with stdout_path.open(mode='rb') as stream:
                stdout_node = SinglefileData(stream, filename=stdout_name)
        except FileNotFoundError:
            return self.exit_code('ERROR_OUTPUT_STDOUT_MISSING')

        self.out(self.format_link_label(stdout_name), stdout_node)

        status_path = dirpath / ShellJob.FILENAME_STATUS

        try:
            status = int(status_path.read_text())
        except FileNotFoundError:
            return self.exit_code('ERROR_OUTPUT_STATUS_MISSING')
        except ValueError:
            return self.exit_code('ERROR_OUTPUT_STATUS_INVALID')

        # A non-zero status takes precedence over a non-empty stderr.
        if status != 0:
            return self.exit_code('ERROR_COMMAND_FAILED', status=status, stderr=stderr_content)

        if stderr_content:
            return self.exit_code('ERROR_STDERR_NOT_EMPTY')

        return ExitCode()

    def parse_custom_outputs(self, dirpath: pathlib.Path) -> list[str]:
        """Attach the files and directories requested through the ``outputs`` input as output nodes.

        An entry containing ``*`` is expanded with :meth:`pathlib.Path.glob` relative to ``dirpath``; any other entry
        is looked up as a single path. Files become ``SinglefileData`` nodes, anything else that exists becomes a
        ``FolderData`` node. The link label is derived from the final path component only.

        :param dirpath: Directory containing the retrieved files.
        :returns: Names of the requested paths that do not exist. A glob without matches contributes nothing.
        """
        if 'outputs' not in self.node.inputs:
            return []

        missing: list[str] = []

        for pattern in self.node.inputs.outputs.get_list():
            if '*' in pattern:
                candidates: t.Iterable[pathlib.Path] = dirpath.glob(pattern)
            else:
                candidates = (dirpath / pattern,)

            for candidate in candidates:
                name = candidate.name

                if not candidate.exists():
                    missing.append(name)
                elif candidate.is_file():
                    self.out(self.format_link_label(name), SinglefileData(candidate, filename=name))
                else:
                    self.out(self.format_link_label(name), FolderData(tree=candidate))

        return missing

    def call_parser_hook(self, dirpath: pathlib.Path) -> None:
        """Call the custom ``parser`` hook of the ``ShellJob`` and attach the nodes it returns as outputs.

        The hook is called with this parser instance and ``dirpath``. A falsy return value is treated as an empty
        dictionary.

        :param dirpath: Directory containing the retrieved files.
        :raises TypeError: If the hook returns something other than a dictionary whose values are all ``Data`` nodes.
        """
        # NOTE(review): the variable name suggests that ``load()`` unpickles the stored callable. Unpickling can
        # execute arbitrary code, so confirm that the ``parser`` input can only originate from a trusted source.
        unpickled_parser = self.node.inputs.parser.load()
        results = unpickled_parser(self, dirpath) or {}

        is_valid = isinstance(results, dict) and all(isinstance(node, Data) for node in results.values())

        if not is_valid:
            raise TypeError(f'{unpickled_parser} did not return a dictionary of `Data` nodes but: {results}')

        for label, node in results.items():
            self.out(label, node)