forked from pyinfra-dev/pyinfra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_operations.py
171 lines (134 loc) · 5.93 KB
/
test_operations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json
import platform
import warnings
from importlib import import_module
from os import listdir, path
from unittest import TestCase
from unittest.mock import patch
from pyinfra.api import FileDownloadCommand, FileUploadCommand, FunctionCommand, StringCommand
from pyinfra.context import ctx_host, ctx_state
from pyinfra_cli.util import json_encode
from .util import FakeState, JsonTest, create_host, get_command_string, parse_value, patch_files
# Name of the operating system running the tests, as returned by
# platform.system(). Test data may carry a "require_platform" entry; the
# generated test returns early when this name is not in that entry.
PLATFORM_NAME = platform.system()
def parse_commands(commands):
    """
    Convert the commands produced by an operation into plain, comparable values.

    Each item of ``commands`` is mapped as follows:

    + ``str``: stripped and wrapped in ``StringCommand``, then handled as one
    + ``StringCommand``: the result of ``get_command_string(command)``
    + ``dict``: the same dict, with its ``"command"`` entry replaced by the
      stripped ``get_command_string`` output (the dict is modified in place)
    + ``FunctionCommand``: ``[function name, list(args), kwargs]``
    + ``FileUploadCommand``: ``["upload", data, str(dest)]`` where ``data`` is
      the content read from ``src`` when it has a ``read`` method (bytes are
      decoded), otherwise ``str(src)``
    + ``FileDownloadCommand``: ``["download", str(src), str(dest)]``

    If the command has truthy ``connector_arguments``, the parsed value is
    stored under their ``"command"`` key and the ``connector_arguments``
    mapping itself (modified in place) is emitted instead.

    Returns the list of parsed values, in input order. Raises ``Exception``
    for an item of any other type.
    """

    json_commands = []

    for command in commands:
        if isinstance(command, str):  # matches pyinfra/api/operation.py
            command = StringCommand(command.strip())

        if isinstance(command, StringCommand):
            json_command = get_command_string(command)

        elif isinstance(command, dict):
            command["command"] = get_command_string(command["command"]).strip()
            json_command = command

        elif isinstance(command, FunctionCommand):
            # The function is either the literal placeholder "__func__" or a
            # callable, in which case its name is what gets compared
            func_name = (
                command.function if command.function == "__func__" else command.function.__name__
            )
            json_command = [
                func_name,
                list(command.args),
                command.kwargs,
            ]

        elif isinstance(command, FileUploadCommand):
            if hasattr(command.src, "read"):
                # File-like source: rewind first so the whole content is read
                command.src.seek(0)
                data = command.src.read()
                if isinstance(data, bytes):
                    data = data.decode()
            else:
                data = str(command.src)
            json_command = ["upload", data, str(command.dest)]

        elif isinstance(command, FileDownloadCommand):
            json_command = ["download", str(command.src), str(command.dest)]

        else:
            raise Exception("{0} is not a valid command!".format(command))

        # Plain dict commands have no ``connector_arguments`` attribute, so look
        # it up defensively instead of raising AttributeError for them
        connector_arguments = getattr(command, "connector_arguments", None)
        if connector_arguments:
            connector_arguments["command"] = json_command
            json_command = connector_arguments

        json_commands.append(json_command)

    return json_commands
def assert_commands(commands, wanted_commands):
    """
    Assert that ``commands`` equals ``wanted_commands``.

    On a mismatch both values are printed as indented JSON (the actual output
    first, the expected value second) and the ``AssertionError`` is re-raised.
    Values that ``json`` cannot serialise natively go through ``json_encode``.
    """

    try:
        assert commands == wanted_commands
    except AssertionError as mismatch:
        print()
        for heading, payload in (
            ("--> COMMANDS OUTPUT:", commands),
            ("--> TEST WANTS:", wanted_commands),
        ):
            print(heading)
            print(json.dumps(payload, indent=4, default=json_encode))
        raise mismatch
# Build and return a TestCase subclass that exercises a single operation.
# `arg` is a "<module>.<operation>" string: it is split on "." into exactly
# two parts (any other shape fails the unpacking), the module is imported from
# pyinfra.operations and the operation is looked up on it by name.
# NOTE(review): this copy of the function has lost its indentation; restore the
# nesting (in particular of the statements after the try/except relative to the
# three `with` blocks) from the upstream file before relying on it.
def make_operation_tests(arg):
# Get the operation we're testing against
module_name, op_name = arg.split(".")
module = import_module("pyinfra.operations.{0}".format(module_name))
op = getattr(module, op_name)
# Generate a test class
# Both get_timestamp references are replaced by a function returning the fixed
# string "a-timestamp" - presumably so generated commands are reproducible.
@patch("pyinfra.operations.files.get_timestamp", lambda: "a-timestamp")
@patch("pyinfra.operations.util.files.get_timestamp", lambda: "a-timestamp")
class TestTests(TestCase, metaclass=JsonTest):
# Directory and method-name prefix for this operation's tests; presumably
# consumed by the JsonTest metaclass to generate one test method per JSON
# file, each calling jsontest_function - confirm against .util.
jsontest_files = path.join("tests", "operations", arg)
jsontest_prefix = "test_{0}_{1}_".format(module_name, op_name)
@classmethod
def setUpClass(cls):
# Create a global fake state that attach to context state
cls.state = FakeState()
# Run one test case: `test_name` identifies the case, `test_data` is its
# data dict (keys read here: "require_platform", "facts", "exception",
# "args", "kwargs", "local_files", "commands", "noop_description").
def jsontest_function(self, test_name, test_data):
# Cases restricted to other platforms return immediately (the test is
# reported as passing rather than skipped).
if (
"require_platform" in test_data
and PLATFORM_NAME not in test_data["require_platform"]
):
return
# Only used in the warning message emitted at the end
op_test_name = "{0}/{1}.json".format(arg, test_name)
# Create a host with this tests facts and attach to context host
host = create_host(facts=test_data.get("facts", {}))
allowed_exception = test_data.get("exception")
args = parse_value(test_data.get("args", []))
kwargs = parse_value(test_data.get("kwargs", {}))
# Make the fake state and host the current context, and patch in the
# case's "local_files", while the operation runs
with ctx_state.use(self.state):
with ctx_host.use(host):
with patch_files(test_data.get("local_files", {})):
try:
# Call the operation's inner function and collect everything it yields
output_commands = list(op._inner(*args, **kwargs))
except Exception as e:
# An exception is acceptable only when the case declares one: its class
# name must be in "names" (or equal the single "name") and its first
# argument must equal "message"; the case then ends successfully.
if allowed_exception:
allowed_exception_names = allowed_exception.get("names")
if not allowed_exception_names:
allowed_exception_names = [allowed_exception["name"]]
if e.__class__.__name__ not in allowed_exception_names:
print("Wrong exception raised!")
raise
assert e.args[0] == allowed_exception["message"]
return
# No exception was declared for this case: propagate it
raise
# Compare the parsed output with the case's expected "commands"
commands = parse_commands(output_commands)
assert_commands(commands, test_data["commands"])
# When no commands were produced, or the case declares a
# "noop_description", check the description recorded on the host
noop_description = test_data.get("noop_description")
if len(commands) == 0 or noop_description:
if noop_description is not None:
assert host.noop_description == noop_description
else:
# Nothing declared: require that some description was set and warn
# that the case does not pin it
assert host.noop_description is not None, "no noop description was set"
warnings.warn(
'No noop_description set for test: {0} (got "{1}")'.format(
op_test_name,
host.noop_description,
),
)
return TestTests
# Collect the names of the directories found directly under tests/operations;
# each name is handed to make_operation_tests below
operations = sorted(
    entry
    for entry in listdir(path.join("tests", "operations"))
    if path.isdir(path.join("tests", "operations", entry))
)

# Generate one test class per operation and bind it at module scope, under the
# operation's name, so nose picks it up
for operation_name in operations:
    globals()[operation_name] = make_operation_tests(operation_name)