__author__ = "Johannes Köster"
__copyright__ = "Copyright 2023, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
import argparse
import sys
from typing import Optional, Set, Tuple
import configargparse
from snakemake import logging
import snakemake.common.argparse
from snakemake.api import SnakemakeApi, _get_executor_plugin_registry, resolve_snakefile
import os
import glob
from argparse import ArgumentDefaultsHelpFormatter
from pathlib import Path
import re
import shlex
from importlib.machinery import SourceFileLoader
from snakemake.settings import (
ChangeType,
ConfigSettings,
DAGSettings,
DeploymentMethod,
DeploymentSettings,
ExecutionSettings,
NotebookEditMode,
OutputSettings,
PreemptibleRules,
Quietness,
RemoteExecutionSettings,
ResourceSettings,
SchedulingSettings,
StorageSettings,
WorkflowSettings,
)
from snakemake_interface_executor_plugins.settings import ExecMode
from snakemake_interface_storage_plugins.registry import StoragePluginRegistry
from snakemake.target_jobs import parse_target_jobs_cli_args
from snakemake.workflow import Workflow
from snakemake.dag import Batch
from snakemake.exceptions import (
CliException,
ResourceScopesException,
print_exception,
)
from snakemake.utils import update_config, available_cpu_count
from snakemake.common import (
SNAKEFILE_CHOICES,
__version__,
async_run,
get_appdirs,
get_container_image,
parse_key_value_arg,
)
from snakemake.resources import ResourceScopes, parse_resources, DefaultResources
from snakemake.settings import RerunTrigger
def parse_set_threads(args):
return parse_set_ints(
args,
"Invalid threads definition: entries have to be defined as RULE=THREADS pairs "
"(with THREADS being a positive integer).",
)
def parse_set_resources(args):
errmsg = (
"Invalid resource definition: entries have to be defined as RULE:RESOURCE=VALUE, with "
"VALUE being a positive integer or a string."
)
from collections import defaultdict
assignments = defaultdict(dict)
if args is not None:
for entry in args:
key, value = parse_key_value_arg(entry, errmsg=errmsg)
key = key.split(":")
if len(key) != 2:
raise ValueError(errmsg)
rule, resource = key
try:
value = int(value)
except ValueError:
assignments[rule][resource] = value
continue
if value < 0:
raise ValueError(errmsg)
assignments[rule][resource] = value
return assignments
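# Illustrative sketch (comment only, not executed): given
#   --set-resources myrule:mem_mb=2000 myrule:partition=short
# parse_set_resources(["myrule:mem_mb=2000", "myrule:partition=short"]) is
# expected to return {"myrule": {"mem_mb": 2000, "partition": "short"}}:
# values that parse as non-negative integers become ints, anything else is
# kept as a string.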
def parse_set_scatter(args):
return parse_set_ints(
args,
"Invalid scatter definition: entries have to be defined as NAME=SCATTERITEMS pairs "
"(with SCATTERITEMS being a positive integer).",
)
def parse_set_resource_scope(args):
err_msg = (
"Invalid resource scopes: entries must be defined as RESOURCE=SCOPE pairs, "
"where SCOPE is either 'local', 'global', or 'excluded'"
)
if args is not None:
try:
return ResourceScopes(
parse_key_value_arg(entry, errmsg=err_msg) for entry in args
)
except ResourceScopesException as err:
invalid_resources = ", ".join(
f"'{res}={scope}'" for res, scope in err.invalid_resources.items()
)
raise ValueError(f"{err.msg} (got {invalid_resources})")
return ResourceScopes()
def parse_set_ints(arg, errmsg):
assignments = dict()
if arg is not None:
for entry in arg:
key, value = parse_key_value_arg(entry, errmsg=errmsg)
try:
value = int(value)
except ValueError:
raise ValueError(errmsg)
if value < 0:
raise ValueError(errmsg)
assignments[key] = value
return assignments
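# Illustrative sketch (comment only): parse_set_ints(["myrule=4"], errmsg)
# should yield {"myrule": 4}; non-integer or negative values raise ValueError
# with the given error message. parse_set_threads and parse_set_scatter above
# are thin wrappers around this helper.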
def parse_batch(args):
errmsg = "Invalid batch definition: batch entry has to be defined as RULE=BATCH/BATCHES (with integers BATCH <= BATCHES, BATCH >= 1)."
if args.batch is not None:
rule, batchdef = parse_key_value_arg(args.batch, errmsg=errmsg)
try:
batch, batches = batchdef.split("/")
batch = int(batch)
batches = int(batches)
except ValueError:
raise ValueError(errmsg)
if batch > batches or batch < 1:
raise ValueError(errmsg)
return Batch(rule, batch, batches)
return None
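# Example (sketch, assumed values): with args.batch == "myrule=1/3",
# parse_batch returns Batch("myrule", 1, 3), i.e. the first of three batches
# of the input files of rule "myrule"; "myrule=0/3" or "myrule=4/3" would
# raise ValueError.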
def parse_groups(args):
errmsg = "Invalid groups definition: entries have to be defined as RULE=GROUP pairs"
overwrite_groups = dict()
if args.groups is not None:
for entry in args.groups:
rule, group = parse_key_value_arg(entry, errmsg=errmsg)
overwrite_groups[rule] = group
return overwrite_groups
def parse_group_components(args):
errmsg = "Invalid group components definition: entries have to be defined as GROUP=COMPONENTS pairs (with COMPONENTS being a positive integer)"
group_components = dict()
if args.group_components is not None:
for entry in args.group_components:
group, count = parse_key_value_arg(entry, errmsg=errmsg)
try:
count = int(count)
except ValueError:
raise ValueError(errmsg)
if count <= 0:
raise ValueError(errmsg)
group_components[group] = count
return group_components
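# Example (sketch): --groups a=g1 b=g1 together with --group-components g1=2
# would be parsed into {"a": "g1", "b": "g1"} and {"g1": 2}, allowing up to
# two connected components of group "g1" to be scheduled together.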
def _bool_parser(value):
if value == "True":
return True
elif value == "False":
return False
raise ValueError
def parse_config(entries):
"""Parse config from args."""
import yaml
yaml_base_load = lambda s: yaml.load(s, Loader=yaml.loader.BaseLoader)
parsers = [int, float, _bool_parser, yaml_base_load, str]
config = dict()
if entries:
valid = re.compile(r"[a-zA-Z_]\w*$")
for entry in entries:
key, val = parse_key_value_arg(
entry,
errmsg="Invalid config definition: Config entries have to be defined as name=value pairs.",
)
if not valid.match(key):
raise ValueError(
"Invalid config definition: Config entry must start with a valid identifier."
)
v = None
if val == "":
update_config(config, {key: v})
continue
for parser in parsers:
try:
v = parser(val)
# avoid accidental interpretation as function
if not callable(v):
break
except:
pass
assert v is not None
update_config(config, {key: v})
return config
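# Illustrative sketch (comment only): parse_config(
#     ["samples=config/samples.tsv", "threshold=0.05", "paired=True"]
# ) should yield {"samples": "config/samples.tsv", "threshold": 0.05,
# "paired": True}; each value is tried as int, float, bool and plain YAML
# before falling back to str.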
def parse_cores(cores):
if cores == "all":
return available_cpu_count()
try:
return int(cores)
except ValueError:
raise CliException(
"Error parsing number of cores (--cores, -c): must be integer or 'all'."
)
def parse_jobs(jobs):
if jobs == "unlimited":
return sys.maxsize
try:
return int(jobs)
except ValueError:
raise CliException(
"Error parsing number of jobs (--jobs, -j): must be integer."
)
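# Example (sketch): parse_cores("all") returns the number of available CPU
# cores and parse_cores("8") returns 8; parse_jobs("unlimited") maps to
# sys.maxsize. Any other non-integer value raises a CliException.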
def get_profile_dir(profile: str) -> Optional[Tuple[Path, Path]]:
config_pattern = re.compile(r"config(.v(?P<min_major>\d+)\+)?.yaml")
def get_config_min_major(filename):
m = config_pattern.match(filename)
if m:
min_major = m.group("min_major")
if min_major is None:
return 0
min_major = int(min_major)
return min_major
return None
dirs = get_appdirs()
if os.path.exists(profile):
parent_dir = os.path.dirname(profile) or "."
search_dirs = [parent_dir]
profile = os.path.basename(profile)
else:
search_dirs = [os.getcwd(), dirs.user_config_dir, dirs.site_config_dir]
for d in search_dirs:
profile_candidate = Path(d) / profile
if profile_candidate.exists():
files = os.listdir(profile_candidate)
curr_major = int(__version__.split(".")[0])
config_files = {
f: min_major
for f, min_major in zip(files, map(get_config_min_major, files))
if min_major is not None and curr_major >= min_major
}
if config_files:
config_file = max(config_files, key=config_files.get)
return profile_candidate, profile_candidate / config_file
def get_profile_file(profile_dir: Path, file, return_default=False):
p = profile_dir / file
# "file" can actually be a full command. If so, `p` won't exist as the
# below would check if e.g. '/path/to/profile/script --arg1 val --arg2'
# exists. To fix this, we use shlex.split() to get the path to the
# script. We check for both, in case the path contains spaces or some
# other thing that would cause shlex.split() to mangle the path
# inaccurately.
if p.exists() or os.path.exists(shlex.split(str(p))[0]):
return p
if return_default:
return file
return None
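# Example (sketch, with assumed paths): for a profile directory
# ~/.config/snakemake/myprofile containing config.v8+.yaml,
# get_profile_dir("myprofile") returns the directory together with the config
# file that has the highest compatible minimum major version; afterwards
# get_profile_file(profile_dir, "config.yaml", return_default=True) returns
# the path inside the profile if it exists and otherwise falls back to the
# plain file name.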
def get_argument_parser(profiles=None):
"""Generate and return argument parser."""
from snakemake.profiles import ProfileConfigFileParser
dirs = get_appdirs()
config_files = []
profile_dir = None
if profiles:
for profile in profiles:
if profile == "":
print("Error: invalid profile name.", file=sys.stderr)
exit(1)
profile_entry = get_profile_dir(profile)
if profile_entry is not None:
profile_dir, config_file = profile_entry
config_files.append(config_file)
else:
print(
"Error: profile given but no config.yaml found. "
"Profile has to be given as either absolute path, relative "
"path or name of a directory available in either "
"{site} or {user}.".format(
site=dirs.site_config_dir, user=dirs.user_config_dir
),
file=sys.stderr,
)
exit(1)
parser = snakemake.common.argparse.ArgumentParser(
description="Snakemake is a Python based language and execution "
"environment for GNU Make-like workflows.",
formatter_class=ArgumentDefaultsHelpFormatter,
default_config_files=config_files,
config_file_parser_class=ProfileConfigFileParser,
)
group_exec = parser.add_argument_group("EXECUTION")
group_exec.add_argument(
"targets",
nargs="*",
default=set(),
help="Targets to build. May be rules or files.",
)
group_exec.add_argument(
"--dry-run",
"--dryrun",
"-n",
dest="dryrun",
action="store_true",
help="Do not execute anything, and display what would be done. "
"If you have a very large workflow, use --dry-run --quiet to just "
"print a summary of the DAG of jobs.",
)
group_exec.add_argument(
"--profile",
help=f"""
Name of profile to use for configuring
Snakemake. Snakemake will search for a corresponding
folder in {dirs.site_config_dir} and {dirs.user_config_dir}. Alternatively, this can be an
absolute or relative path.
The profile folder has to contain a file 'config.yaml'.
This file can be used to set default values for command
line options in YAML format. For example,
'--cluster qsub' becomes 'cluster: qsub' in the YAML
file. Profiles can be obtained from
https://github.com/snakemake-profiles.
The profile can also be set via the environment variable $SNAKEMAKE_PROFILE.
To override this variable and use no profile at all, provide the value 'none'
to this argument.
""",
env_var="SNAKEMAKE_PROFILE",
)
group_exec.add_argument(
"--workflow-profile",
help="""
Path (relative to current directory) to workflow specific profile
folder to use for configuring Snakemake with parameters specific for this
workflow (like resources).
If this flag is not used, Snakemake will by default use
'profiles/default' if present (searched both relative to current directory
and relative to Snakefile, in this order).
To skip any workflow specific profile, provide the special value 'none'.
Settings made in the workflow profile will override settings made in the
general profile (see --profile).
The profile folder has to contain a file 'config.yaml'.
This file can be used to set default values for command
line options in YAML format. For example,
'--executor slurm' becomes 'executor: slurm' in the YAML
file. It is advisable to use the workflow profile to set
or overwrite e.g. workflow specific resources like the number of threads
of a particular rule or the amount of memory needed.
Note that in such cases, the arguments may be given as nested YAML mappings
in the profile, e.g. 'set-threads: myrule: 4' instead of 'set-threads: myrule=4'.
""",
)
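# Example (sketch) of a workflow profile config.yaml using the nested mapping
# form mentioned in the help text above (rule and resource names are
# placeholders):
#   executor: slurm
#   set-threads:
#     myrule: 4
#   set-resources:
#     myrule:
#       mem_mb: 8000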
group_exec.add_argument(
"--cache",
nargs="*",
metavar="RULE",
help="Store output files of given rules in a central cache given by the environment "
"variable $SNAKEMAKE_OUTPUT_CACHE. Likewise, retrieve output files of the given rules "
"from this cache if they have been created before (by anybody writing to the same cache), "
"instead of actually executing the rules. Output files are identified by hashing all "
"steps, parameters and software stack (conda envs or containers) needed to create them.",
)
group_exec.add_argument(
"--snakefile",
"-s",
metavar="FILE",
type=Path,
help=(
"The workflow definition in form of a snakefile."
"Usually, you should not need to specify this. "
"By default, Snakemake will search for {} "
"beneath the current working "
"directory, in this order. "
"Only if you definitely want a different layout, "
"you need to use this parameter."
).format(", ".join(map("'{}'".format, SNAKEFILE_CHOICES))),
)
group_exec.add_argument(
"--cores",
"-c",
action="store",
metavar="N",
type=parse_cores,
help=(
"Use at most N CPU cores/jobs in parallel. "
"If N is omitted or 'all', the limit is set to the number of "
"available CPU cores. "
"In case of cluster/cloud execution, this argument sets the maximum number "
"of cores requested from the cluster or cloud scheduler. (See "
"https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#"
"resources-remote-execution for more info)"
"This number is available to rules via workflow.cores."
),
)
group_exec.add_argument(
"--jobs",
"-j",
metavar="N",
action="store",
type=parse_jobs,
help=(
"Use at most N CPU cluster/cloud jobs in parallel. For local execution this is "
"an alias for --cores (it is though recommended to use --cores in that case). "
"Note: Set to 'unlimited' to allow any number of parallel jobs."
),
)
group_exec.add_argument(
"--local-cores",
action="store",
default=available_cpu_count(),
metavar="N",
type=int,
help=(
"In cluster/cloud mode, use at most N cores of the host machine in parallel "
"(default: number of CPU cores of the host). The cores are used to execute "
"local rules. This option is ignored when not in cluster/cloud mode."
),
)
group_exec.add_argument(
"--resources",
"--res",
nargs="+",
metavar="NAME=INT",
default=dict(),
parse_func=parse_resources,
help=(
"Define additional resources that shall constrain the scheduling "
"analogously to --cores (see above). A resource is defined as "
"a name and an integer value. E.g. --resources mem_mb=1000. Rules can "
"use resources by defining the resource keyword, e.g. "
"resources: mem_mb=600. If now two rules require 600 of the resource "
"'mem_mb' they won't be run in parallel by the scheduler. In "
"cluster/cloud mode, this argument will also constrain the amount of "
"resources requested from the server. (See "
"https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#"
"resources-remote-execution for more info)"
),
)
group_exec.add_argument(
"--set-threads",
metavar="RULE=THREADS",
nargs="+",
default=dict(),
parse_func=parse_set_threads,
help="Overwrite thread usage of rules. This allows to fine-tune workflow "
"parallelization. In particular, this is helpful to target certain cluster nodes "
"by e.g. shifting a rule to use more, or less threads than defined in the workflow. "
"Thereby, THREADS has to be a positive integer, and RULE has to be the name of the rule.",
)
group_exec.add_argument(
"--max-threads",
type=int,
help="Define a global maximum number of threads available to any rule. Rules "
"requesting more threads (via the threads keyword) will have their values "
"reduced to the maximum. This can be useful when you want to restrict the "
"maximum number of threads without modifying the workflow definition or "
"overwriting rules individually with --set-threads.",
)
group_exec.add_argument(
"--set-resources",
metavar="RULE:RESOURCE=VALUE",
nargs="+",
default=dict(),
parse_func=parse_set_resources,
help="Overwrite resource usage of rules. This allows to fine-tune workflow "
"resources. In particular, this is helpful to target certain cluster nodes "
"by e.g. defining a certain partition for a rule, or overriding a temporary directory. "
"Thereby, VALUE has to be a positive integer or a string, RULE has to be the name of the "
"rule, and RESOURCE has to be the name of the resource.",
)
group_exec.add_argument(
"--set-scatter",
metavar="NAME=SCATTERITEMS",
nargs="+",
default=dict(),
parse_func=parse_set_scatter,
help="Overwrite number of scatter items of scattergather processes. This allows to fine-tune "
"workflow parallelization. Thereby, SCATTERITEMS has to be a positive integer, and NAME has to be "
"the name of the scattergather process defined via a scattergather directive in the workflow.",
)
group_exec.add_argument(
"--set-resource-scopes",
metavar="RESOURCE=[global|local]",
nargs="+",
default=dict(),
parse_func=parse_set_resource_scope,
help="Overwrite resource scopes. A scope determines how a constraint is "
"reckoned in cluster execution. With RESOURCE=local, a constraint applied to "
"RESOURCE using --resources will be considered the limit for each group "
"submission. With RESOURCE=global, the constraint will apply across all groups "
"cumulatively. By default, only `mem_mb` and `disk_mb` are considered local, "
"all other resources are global. This may be modified in the snakefile using "
"the `resource_scopes:` directive. Note that number of threads, specified via "
"--cores, is always considered local. (See "
"https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#"
"resources-remote-execution for more info)",
)
group_exec.add_argument(
"--default-resources",
"--default-res",
nargs="*",
metavar="NAME=INT",
parse_func=DefaultResources,
help=(
"Define default values of resources for rules that do not define their own values. "
"In addition to plain integers, python expressions over inputsize are allowed (e.g. '2*input.size_mb'). "
"The inputsize is the sum of the sizes of all input files of a rule. "
"By default, Snakemake assumes a default for mem_mb, disk_mb, and tmpdir (see below). "
"This option allows to add further defaults (e.g. account and partition for slurm) or to overwrite these default values. "
"The defaults are 'mem_mb=min(max(2*input.size_mb, 1000), 8000)', "
"'disk_mb=max(2*input.size_mb, 1000)' "
"(i.e., default disk and mem usage is twice the input file size but at least 1GB), and "
"the system temporary directory (as given by $TMPDIR, $TEMP, or $TMP) is used for the tmpdir resource. "
"The tmpdir resource is automatically used by shell commands, scripts and wrappers to store temporary data (as it is "
"mirrored into $TMPDIR, $TEMP, and $TMP for the executed subprocesses). "
"If this argument is not specified at all, Snakemake just uses the tmpdir resource as outlined above."
),
)
group_exec.add_argument(
"--preemptible-rules",
nargs="*",
parse_func=set,
help=(
"Define which rules shall use a preemptible machine which can be prematurely killed by e.g. a cloud provider (also called spot instances). "
"This is currently only supported by the Google Life Sciences executor and ignored by all other executors. "
"If no rule names are provided, all rules are considered to be preemptible. "
"The "
),
)
group_exec.add_argument(
"--preemptible-retries",
type=int,
help="Number of retries that shall be made in order to finish a job from of rule that has been marked as preemptible via the --preemptible-rules setting.",
)
group_exec.add_argument(
"--config",
"-C",
nargs="*",
metavar="KEY=VALUE",
default=dict(),
parse_func=parse_config,
help=(
"Set or overwrite values in the workflow config object. "
"The workflow config object is accessible as variable config inside "
"the workflow. Default values can be set by providing a JSON file "
"(see Documentation)."
),
)
group_exec.add_argument(
"--configfile",
"--configfiles",
nargs="+",
metavar="FILE",
default=list(),
type=Path,
help=(
"Specify or overwrite the config file of the workflow (see the docs). "
"Values specified in JSON or YAML format are available in the global config "
"dictionary inside the workflow. Multiple files overwrite each other in "
"the given order. Thereby missing keys in previous config files are extended by "
"following configfiles. Note that this order also includes a config file defined "
"in the workflow definition itself (which will come first)."
),
)
group_exec.add_argument(
"--envvars",
nargs="+",
metavar="VARNAME",
parse_func=set,
default=set(),
help="Environment variables to pass to cloud jobs.",
)
group_exec.add_argument(
"--directory",
"-d",
metavar="DIR",
type=Path,
help=(
"Specify working directory (relative paths in "
"the snakefile will use this as their origin)."
),
)
group_exec.add_argument(
"--touch",
"-t",
action="store_true",
help=(
"Touch output files (mark them up to date without really "
"changing them) instead of running their commands. This is "
"used to pretend that the rules were executed, in order to "
"fool future invocations of snakemake. Fails if a file does "
"not yet exist. Note that this will only touch files that would "
"otherwise be recreated by Snakemake (e.g. because their input "
"files are newer). For enforcing a touch, combine this with "
"--force, --forceall, or --forcerun. Note however that you lose "
"the provenance information when the files have been created in "
"reality. Hence, this should be used only as a last resort."
),
)
group_exec.add_argument(
"--keep-going",
"-k",
action="store_true",
help="Go on with independent jobs if a job fails.",
)
group_exec.add_argument(
"--rerun-triggers",
nargs="+",
choices=RerunTrigger.choices(),
default=RerunTrigger.all(),
parse_func=RerunTrigger.parse_choices_set,
help="Define what triggers the rerunning of a job. By default, "
"all triggers are used, which guarantees that results are "
"consistent with the workflow code and configuration. If you "
"rather prefer the traditional way of just considering "
"file modification dates, use '--rerun-trigger mtime'.",
)
group_exec.add_argument(
"--force",
"-f",
action="store_true",
help=(
"Force the execution of the selected target or the first rule "
"regardless of already created output."
),
)
group_exec.add_argument(
"--executor",
"-e",
help="Specify a custom executor, available via an executor plugin: snakemake_executor_<name>",
choices=_get_executor_plugin_registry().plugins.keys(),
)
group_exec.add_argument(
"--forceall",
"-F",
action="store_true",
help=(
"Force the execution of the selected (or the first) rule and "
"all rules it is dependent on regardless of already created "
"output."
),
)
group_exec.add_argument(
"--forcerun",
"-R",
nargs="*",
metavar="TARGET",
parse_func=set,
default=set(),
help=(
"Force the re-execution or creation of the given rules or files."
" Use this option if you changed a rule and want to have all its "
"output in your workflow updated."
),
)
group_exec.add_argument(
"--prioritize",
"-P",
nargs="+",
metavar="TARGET",
parse_func=set,
default=set(),
help=(
"Tell the scheduler to assign creation of given targets "
"(and all their dependencies) highest priority."
),
)
group_exec.add_argument(
"--batch",
metavar="RULE=BATCH/BATCHES",
type=parse_batch,
help=(
"Only create the given BATCH of the input files of the given RULE. "
"This can be used to iteratively run parts of very large workflows. "
"Only the execution plan of the relevant part of the workflow has to "
"be calculated, thereby speeding up DAG computation. "
"It is recommended to provide the most suitable rule for batching when "
"documenting a workflow. It should be some aggregating rule that "
"would be executed only once, and has a large number of input files. "
"For example, it can be a rule that aggregates over samples."
),
)
group_exec.add_argument(
"--until",
"-U",
nargs="+",
metavar="TARGET",
parse_func=set,
default=set(),
help=(
"Runs the pipeline until it reaches the specified rules or "
"files. Only runs jobs that are dependencies of the specified "
"rule or files, does not run sibling DAGs. "
),
)
group_exec.add_argument(
"--omit-from",
"-O",
nargs="+",
metavar="TARGET",
parse_func=set,
default=set(),
help=(
"Prevent the execution or creation of the given rules or files "
"as well as any rules or files that are downstream of these targets "
"in the DAG. Also runs jobs in sibling DAGs that are independent of the "
"rules or files specified here."
),
)
group_exec.add_argument(
"--rerun-incomplete",
"--ri",
action="store_true",
help=("Re-run all jobs the output of which is recognized as incomplete."),
)
group_exec.add_argument(
"--shadow-prefix",
metavar="DIR",
help=(
"Specify a directory in which the 'shadow' directory is created. "
"If not supplied, the value is set to the '.snakemake' directory relative "
"to the working directory."
),
)
try:
import pulp
lp_solvers = pulp.list_solvers(onlyAvailable=True)
except ImportError:
# Dummy list for the case that pulp is not available
# This only happens when building the docs.
lp_solvers = ["COIN_CMD"]
recommended_lp_solver = "COIN_CMD"
group_exec.add_argument(
"--scheduler",
default="greedy" if recommended_lp_solver not in lp_solvers else "ilp",
nargs="?",
choices=["ilp", "greedy"],
help=(
"Specifies if jobs are selected by a greedy algorithm or by solving an ilp. "
"The ilp scheduler aims to reduce runtime and hdd usage by best possible use of resources."
),
)
group_exec.add_argument(
"--wms-monitor",
action="store",
nargs="?",
help=(
"IP and port of workflow management system to monitor the execution of snakemake (e.g. http://127.0.0.1:5000)"
" Note that if your service requires an authorization token, you must export WMS_MONITOR_TOKEN in the environment."
),
)
group_exec.add_argument(
"--wms-monitor-arg",
nargs="*",
metavar="NAME=VALUE",
help=(
"If the workflow management service accepts extra arguments, provide."
" them in key value pairs with --wms-monitor-arg. For example, to run"
" an existing workflow using a wms monitor, you can provide the pair "
" id=12345 and the arguments will be provided to the endpoint to "
" first interact with the workflow"
),
)
group_exec.add_argument(
"--scheduler-ilp-solver",
default=recommended_lp_solver,
choices=lp_solvers,
help=("Specifies solver to be utilized when selecting ilp-scheduler."),
)
group_exec.add_argument(
"--scheduler-solver-path",
help="Set the PATH to search for scheduler solver binaries (internal use only).",
)
group_exec.add_argument(
"--conda-base-path",
help="Path of conda base installation (home of conda, mamba, activate) (internal use only).",
)
group_exec.add_argument(
"--no-subworkflows",
"--nosw",
action="store_true",
help=("Do not evaluate or execute subworkflows."),
)
group_exec.add_argument(
"--precommand",
help="Only used in case of remote execution. Command to be executed before "
"Snakemake executes each job on the remote compute node.",
)
group_group = parser.add_argument_group("GROUPING")
group_group.add_argument(
"--groups",
nargs="+",
parse_func=parse_groups,
help="Assign rules to groups (this overwrites any "
"group definitions from the workflow).",
)
group_group.add_argument(
"--group-components",
nargs="+",
parse_func=parse_group_components,
help="Set the number of connected components a group is "
"allowed to span. By default, this is 1, but this flag "
"allows to extend this. This can be used to run e.g. 3 "
"jobs of the same rule in the same group, although they "
"are not connected. It can be helpful for putting together "
"many small jobs or benefitting of shared memory setups.",
)
group_report = parser.add_argument_group("REPORTS")
group_report.add_argument(
"--report",
nargs="?",
const="report.html",
metavar="FILE",
type=Path,
help="Create an HTML report with results and statistics. "
"This can be either a .html file or a .zip file. "
"In the former case, all results are embedded into the .html (this only works for small data). "
"In the latter case, results are stored along with a file report.html in the zip archive. "
"If no filename is given, an embedded report.html is the default.",
)
group_report.add_argument(
"--report-stylesheet",
metavar="CSSFILE",
type=Path,
help="Custom stylesheet to use for report. In particular, this can be used for "
"branding the report with e.g. a custom logo, see docs.",
)
group_notebooks = parser.add_argument_group("NOTEBOOKS")
group_notebooks.add_argument(
"--draft-notebook",
metavar="TARGET",
help="Draft a skeleton notebook for the rule used to generate the given target file. This notebook "
"can then be opened in a jupyter server, executed and implemented until ready. After saving, it "
"will automatically be reused in non-interactive mode by Snakemake for subsequent jobs.",
)
group_notebooks.add_argument(
"--edit-notebook",
metavar="TARGET",
help="Interactively edit the notebook associated with the rule used to generate the given target file. "
"This will start a local jupyter notebook server. "
"Any changes to the notebook should be saved, and the server has to be stopped by "
"closing the notebook and hitting the 'Quit' button on the jupyter dashboard. "
"Afterwards, the updated notebook will be automatically stored in the path defined in the rule. "
"If the notebook is not yet present, this will create an empty draft. ",
)
group_notebooks.add_argument(
"--notebook-listen",
metavar="IP:PORT",
default="localhost:8888",
help="The IP address and PORT the notebook server used for editing the notebook (--edit-notebook) will listen on.",
)
group_utils = parser.add_argument_group("UTILITIES")
group_utils.add_argument(
"--lint",
nargs="?",
const="text",
choices=["text", "json"],
help="Perform linting on the given workflow. This will print snakemake "
"specific suggestions to improve code quality (work in progress, more lints "
"to be added in the future). If no argument is provided, plain text output is used.",
)
group_utils.add_argument(
"--generate-unit-tests",
nargs="?",
const=".tests/unit",
metavar="TESTPATH",
type=Path,
help="Automatically generate unit tests for each workflow rule. "
"This assumes that all input files of each job are already present. "
"Rules without a job with present input files will be skipped (a warning will be issued). "
"For each rule, one test case will be "
"created in the specified test folder (.tests/unit by default). After "
"successful execution, tests can be run with "
"'pytest TESTPATH'.",
)
group_utils.add_argument(
"--containerize",
action="store_true",
help="Print a Dockerfile that provides an execution environment for the workflow, including all "
"conda environments.",
)
group_utils.add_argument(
"--export-cwl",
action="store",
metavar="FILE",
help="Compile workflow to CWL and store it in given FILE.",
)
group_utils.add_argument(
"--list-rules",
"--list",
"-l",
action="store_true",
help="Show available rules in given Snakefile.",
)
group_utils.add_argument(
"--list-target-rules",
"--lt",
action="store_true",
help="Show available target rules in given Snakefile.",
)
group_utils.add_argument(
"--dag",
action="store_true",
help="Do not execute anything and print the directed "
"acyclic graph of jobs in the dot language. Recommended "
"use on Unix systems: snakemake --dag | dot | display. "
"Note print statements in your Snakefile may interfere "
"with visualization.",
)
group_utils.add_argument(
"--rulegraph",
action="store_true",
help="Do not execute anything and print the dependency graph "
"of rules in the dot language. This will be less "
"crowded than above DAG of jobs, but also show less information. "
"Note that each rule is displayed once, hence the displayed graph will be "
"cyclic if a rule appears in several steps of the workflow. "
"Use this if above option leads to a DAG that is too large. "
"Recommended use on Unix systems: snakemake --rulegraph | dot | display. "
"Note print statements in your Snakefile may interfere "
"with visualization.",
)
group_utils.add_argument(
"--filegraph",
action="store_true",
help="Do not execute anything and print the dependency graph "
"of rules with their input and output files in the dot language. "
"This is an intermediate solution between above DAG of jobs and the rule graph. "
"Note that each rule is displayed once, hence the displayed graph will be "
"cyclic if a rule appears in several steps of the workflow. "
"Use this if above option leads to a DAG that is too large. "
"Recommended use on Unix systems: snakemake --filegraph | dot | display. "
"Note print statements in your Snakefile may interfere "
"with visualization.",
)
group_utils.add_argument(
"--d3dag",
action="store_true",