-
Notifications
You must be signed in to change notification settings - Fork 545
/
common.py
249 lines (212 loc) · 8.09 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
__authors__ = ["Tobias Marschall", "Marcel Martin", "Johannes Köster"]
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
import os
import signal
import sys
import shlex
import shutil
import time
from os.path import join
import tempfile
import hashlib
import urllib
import pytest
import glob
import subprocess
from snakemake import snakemake
from snakemake.shell import shell
from snakemake.common import ON_WINDOWS
def dpath(path):
"""get the path to a data file (relative to the directory this
test lives in)"""
return os.path.realpath(join(os.path.dirname(__file__), path))
def md5sum(filename, ignore_newlines=False):
if ignore_newlines:
with open(filename, "r", encoding="utf-8", errors="surrogateescape") as f:
data = f.read().encode("utf8", errors="surrogateescape")
else:
data = open(filename, "rb").read()
return hashlib.md5(data).hexdigest()
# test skipping
def is_connected():
try:
urllib.request.urlopen("http://www.google.com", timeout=1)
return True
except urllib.request.URLError:
return False
def is_ci():
return "CI" in os.environ
def has_gcloud_service_key():
return "GCP_AVAILABLE" in os.environ
def has_zenodo_token():
return os.environ.get("ZENODO_SANDBOX_PAT")
gcloud = pytest.mark.skipif(
not is_connected() or not has_gcloud_service_key(),
reason="Skipping GCLOUD tests because not on "
"CI, no inet connection or not logged "
"in to gcloud.",
)
connected = pytest.mark.skipif(not is_connected(), reason="no internet connection")
ci = pytest.mark.skipif(not is_ci(), reason="not in CI")
not_ci = pytest.mark.skipif(is_ci(), reason="skipped in CI")
zenodo = pytest.mark.skipif(
not has_zenodo_token(), reason="no ZENODO_SANDBOX_PAT provided"
)
def copy(src, dst):
if os.path.isdir(src):
shutil.copytree(src, os.path.join(dst, os.path.basename(src)))
else:
shutil.copy(src, dst)
def get_expected_files(results_dir):
"""Recursively walk through the expected-results directory to enumerate
all expected files."""
return [
os.path.relpath(f, results_dir)
for f in glob.iglob(os.path.join(results_dir, "**/**"), recursive=True)
if not os.path.isdir(f)
]
def run(
path,
shouldfail=False,
snakefile="Snakefile",
subpath=None,
no_tmpdir=False,
tmpdir=None,
check_md5=True,
check_results=True,
cores=3,
nodes=1,
set_pythonpath=True,
cleanup=True,
conda_frontend="mamba",
config=dict(),
targets=None,
container_image=os.environ.get("CONTAINER_IMAGE", "snakemake/snakemake:main"),
shellcmd=None,
sigint_after=None,
**params,
):
"""
Test the Snakefile in the path.
There must be a Snakefile in the path and a subdirectory named
expected-results. If cleanup is False, we return the temporary
directory to the calling test for inspection, and the test should
clean it up.
"""
if set_pythonpath:
# Enforce current workdir (the snakemake source dir) to also be in PYTHONPATH
# when subprocesses are invoked in the tempdir defined below.
os.environ["PYTHONPATH"] = os.getcwd()
elif "PYTHONPATH" in os.environ:
del os.environ["PYTHONPATH"]
results_dir = join(path, "expected-results")
original_snakefile = join(path, snakefile)
assert os.path.exists(original_snakefile)
assert os.path.exists(results_dir) and os.path.isdir(
results_dir
), "{} does not exist".format(results_dir)
# If we need to further check results, we won't cleanup tmpdir
if not tmpdir:
tmpdir = next(tempfile._get_candidate_names())
tmpdir = os.path.join(tempfile.gettempdir(), "snakemake-%s" % tmpdir)
os.mkdir(tmpdir)
config = dict(config)
# handle subworkflow
if subpath is not None:
# set up a working directory for the subworkflow and pass it in `config`
# for now, only one subworkflow is supported
assert os.path.exists(subpath) and os.path.isdir(
subpath
), "{} does not exist".format(subpath)
subworkdir = os.path.join(tmpdir, "subworkdir")
os.mkdir(subworkdir)
# copy files
for f in os.listdir(subpath):
copy(os.path.join(subpath, f), subworkdir)
config["subworkdir"] = subworkdir
# copy files
for f in os.listdir(path):
copy(os.path.join(path, f), tmpdir)
# Snakefile is now in temporary directory
snakefile = join(tmpdir, snakefile)
# run snakemake
if shellcmd:
if not shellcmd.startswith("snakemake"):
raise ValueError("shellcmd does not start with snakemake")
shellcmd = "{} -m {}".format(sys.executable, shellcmd)
try:
if sigint_after is None:
subprocess.check_output(
shellcmd, cwd=path if no_tmpdir else tmpdir, shell=True
)
success = True
else:
with subprocess.Popen(
shlex.split(shellcmd), cwd=path if no_tmpdir else tmpdir
) as process:
time.sleep(sigint_after)
process.send_signal(signal.SIGINT)
time.sleep(2)
success = process.returncode == 0
except subprocess.CalledProcessError as e:
success = False
print(e.stderr, file=sys.stderr)
else:
assert sigint_after is None, "Cannot sent SIGINT when calling directly"
success = snakemake(
snakefile=original_snakefile if no_tmpdir else snakefile,
cores=cores,
nodes=nodes,
workdir=path if no_tmpdir else tmpdir,
stats="stats.txt",
config=config,
verbose=True,
targets=targets,
conda_frontend=conda_frontend,
container_image=container_image,
**params,
)
if shouldfail:
assert not success, "expected error on execution"
else:
assert success, "expected successful execution"
if check_results:
for resultfile in get_expected_files(results_dir):
if resultfile in [".gitignore", ".gitkeep"] or not os.path.isfile(
os.path.join(results_dir, resultfile)
):
# this means tests cannot use directories as output files
continue
targetfile = join(tmpdir, resultfile)
expectedfile = join(results_dir, resultfile)
if ON_WINDOWS:
if os.path.exists(join(results_dir, resultfile + "_WIN")):
continue # Skip test if a Windows specific file exists
if resultfile.endswith("_WIN"):
targetfile = join(tmpdir, resultfile[:-4])
elif resultfile.endswith("_WIN"):
# Skip win specific result files on Posix platforms
continue
assert os.path.exists(
targetfile
), 'expected file "{}" not produced'.format(resultfile)
if check_md5:
md5expected = md5sum(expectedfile, ignore_newlines=ON_WINDOWS)
md5target = md5sum(targetfile, ignore_newlines=ON_WINDOWS)
if md5target != md5expected:
with open(expectedfile) as expected:
expected_content = expected.read()
with open(targetfile) as target:
content = target.read()
assert (
False
), "wrong result produced for file '{resultfile}':\n------found------\n{content}\n-----expected-----\n{expected_content}\n-----------------".format(
resultfile=resultfile,
content=content,
expected_content=expected_content,
)
if not cleanup:
return tmpdir
shutil.rmtree(tmpdir, ignore_errors=ON_WINDOWS)