/
run.py
129 lines (104 loc) · 4.52 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import configparser
import os
import sys
from configparser import ConfigParser
from logging import getLogger
from typing import List, Optional
import luigi
import luigi.cmdline
import luigi.retcodes
from luigi.cmdline_parser import CmdlineParser
import gokart
import gokart.slack
from gokart.object_storage import ObjectStorage
logger = getLogger(__name__)
def _read_environ():
    """Copy every process environment variable into luigi's configuration.

    Each variable is written with ``section=None`` through the ``set``
    method of ``ConfigParser``'s parent class, and every ``%`` in the value
    is doubled to ``%%`` before it is stored.
    """
    luigi_config = luigi.configuration.get_config()
    for env_name, env_value in os.environ.items():
        # Presumably doubled so configparser interpolation treats the
        # character as a literal percent sign -- confirm against luigi usage.
        escaped_value = env_value.replace('%', '%%')
        # ``super(ConfigParser, ...)`` skips ``ConfigParser.set`` itself and
        # dispatches to the next class in the MRO.
        parent_view = super(ConfigParser, luigi_config)
        parent_view.set(section=None, option=env_name, value=escaped_value)
def _check_config():
    """Verify that every section of luigi's configuration can be read.

    Reads the items of each section of the ``LuigiConfigParser`` singleton
    and converts an unresolved interpolation reference into a luigi error.

    Raises:
        luigi.parameter.MissingParameterException: if reading a section
            raises ``configparser.InterpolationMissingOptionError``; the
            message names the missing reference and the original error is
            attached as ``__cause__``.
    """
    parser = luigi.configuration.LuigiConfigParser.instance()
    for section in parser.sections():
        try:
            parser.items(section)
        except configparser.InterpolationMissingOptionError as e:
            # ``e.reference`` is the unresolved interpolation key (the same
            # value the exception stores as ``e.args[3]``).
            raise luigi.parameter.MissingParameterException(
                f'Environment variable "{e.reference}" must be set.') from e
def _run_tree_info(cmdline_args, details):
    """Build the tree info of the task given on the command line and dump it.

    Args:
        cmdline_args: command-line arguments handed to ``CmdlineParser``.
        details: forwarded to ``gokart.make_tree_info`` as ``details``.
    """
    with CmdlineParser.global_instance(cmdline_args) as parser:
        # Resolve the dump target first, then the task, mirroring the
        # evaluation order of a single chained call.
        target = gokart.tree_info().output()
        root_task = parser.get_task_obj()
        rendered_tree = gokart.make_tree_info(root_task, details=details)
        target.dump(rendered_tree)
def _try_tree_info(cmdline_args):
    """Output tree info and terminate the process when a tree-info mode is set.

    Args:
        cmdline_args: command-line arguments handed to ``CmdlineParser``.

    Returns:
        None when ``mode`` is the empty string; otherwise the function does
        not return because it raises ``SystemExit``.

    Raises:
        ValueError: if ``mode`` is neither empty, ``"simple"`` nor ``"all"``.
        SystemExit: after the tree info has been dumped.
    """
    with CmdlineParser.global_instance(cmdline_args):
        mode = gokart.tree_info().mode
        output_path = gokart.tree_info().output().path()

    # do nothing if `mode` is empty.
    if mode == '':
        return

    # Map each accepted mode to the `details` flag used for the dump.
    details_by_mode = {'simple': False, 'all': True}
    if mode not in details_by_mode:
        raise ValueError(f'--tree-info-mode must be "simple" or "all", but "{mode}" is passed.')

    # output tree info and exit.
    _run_tree_info(cmdline_args, details=details_by_mode[mode])
    logger.info('output tree info: %s', output_path)
    # `sys.exit` rather than the `exit` builtin: the latter is injected by
    # the `site` module for interactive use and is not always available.
    sys.exit()
def _try_to_delete_unnecessary_output_file(cmdline_args: List[str]):
    """Delete unnecessary local outputs and terminate, if the task asks for it.

    Nothing happens unless the task built from ``cmdline_args`` has
    ``delete_unnecessary_output_files`` set. When it is set, local outputs
    are deleted via ``gokart.delete_local_unnecessary_outputs`` (object
    storage workspaces are only logged as unsupported) and the process then
    exits in either case.

    Args:
        cmdline_args: command-line arguments handed to ``CmdlineParser``.

    Raises:
        SystemExit: whenever ``delete_unnecessary_output_files`` is set.
    """
    with CmdlineParser.global_instance(cmdline_args) as cp:
        task = cp.get_task_obj()  # type: gokart.TaskOnKart
        if not task.delete_unnecessary_output_files:
            return

        if ObjectStorage.if_object_storage_path(task.workspace_directory):
            logger.info('delete-unnecessary-output-files is not support s3/gcs.')
        else:
            gokart.delete_local_unnecessary_outputs(task)
        # `sys.exit` rather than the `exit` builtin: the latter is injected
        # by the `site` module for interactive use and is not always available.
        sys.exit()
def _try_get_slack_api(cmdline_args: List[str]) -> Optional[gokart.slack.SlackAPI]:
    """Create a Slack client when both a token and a channel are configured.

    The token is read from the environment variable named by
    ``SlackConfig.token_name``; channel and recipient come from
    ``SlackConfig``.

    Args:
        cmdline_args: command-line arguments handed to ``CmdlineParser``.

    Returns:
        A ``gokart.slack.SlackAPI`` when the token and the channel are both
        truthy, otherwise ``None``.
    """
    slack_api = None
    with CmdlineParser.global_instance(cmdline_args):
        slack_config = gokart.slack.SlackConfig()
        token = os.getenv(slack_config.token_name, '')
        channel = slack_config.channel
        to_user = slack_config.to_user
        if not token or not channel:
            pass
        else:
            logger.info('Slack notification is activated.')
            slack_api = gokart.slack.SlackAPI(token=token, channel=channel, to_user=to_user)

    if slack_api is None:
        logger.info('Slack notification is not activated.')
    return slack_api
def _try_to_send_event_summary_to_slack(slack_api: Optional[gokart.slack.SlackAPI],
                                        event_aggregator: gokart.slack.EventAggregator, cmdline_args: List[str]):
    """Send the aggregated event report of a run to Slack as a snippet.

    Args:
        slack_api: Slack client, or ``None`` to skip sending entirely.
        event_aggregator: source of the summary and the event list.
        cmdline_args: command-line arguments handed to ``CmdlineParser``.
    """
    if slack_api is None:
        # do nothing
        return

    with CmdlineParser.global_instance(cmdline_args) as cp:
        # Instantiate SlackConfig inside the parser context, the same way
        # `_try_get_slack_api` does, so both read the config under the same
        # conditions (presumably this is what lets command-line overrides
        # of `send_tree_info` take effect -- confirm against luigi).
        options = gokart.slack.SlackConfig()
        task = cp.get_task_obj()
        if options.send_tree_info:
            tree_info = gokart.make_tree_info(task, details=True)
        else:
            tree_info = 'Please add SlackConfig.send_tree_info to include tree-info'
        task_name = type(task).__name__

    comment = f'Report of {task_name}' + os.linesep + event_aggregator.get_summary()
    content = os.linesep.join([
        '===== Event List ====',
        event_aggregator.get_event_list(),
        os.linesep,
        '==== Tree Info ====',
        tree_info,
    ])
    slack_api.send_snippet(comment=comment, title='event.txt', content=content)
def run(cmdline_args=None, set_retcode=True):
    """Entry point: prepare configuration, run luigi and report to Slack.

    Args:
        cmdline_args: command-line arguments; ``sys.argv[1:]`` is used when
            this is ``None`` or otherwise falsy.
        set_retcode: when true, assigns the exit codes 10/20/30/40/50 to the
            corresponding attributes of ``luigi.retcodes.retcode``.

    Raises:
        SystemExit: re-raised with the same code after the Slack summary has
            been attempted, when the luigi run raises ``SystemExit``. The
            pre-run helpers may also exit the process on their own.
    """
    if not cmdline_args:
        cmdline_args = sys.argv[1:]

    if set_retcode:
        retcode_overrides = (
            ('already_running', 10),
            ('missing_data', 20),
            ('not_run', 30),
            ('task_failed', 40),
            ('scheduling_error', 50),
        )
        for attribute, code in retcode_overrides:
            setattr(luigi.retcodes.retcode, attribute, code)

    # Pre-run steps, in order; each one receives the same argument list.
    _read_environ()
    _check_config()
    _try_tree_info(cmdline_args)
    _try_to_delete_unnecessary_output_file(cmdline_args)
    gokart.testing.try_to_run_test_for_empty_data_frame(cmdline_args)

    slack_api = _try_get_slack_api(cmdline_args)
    event_aggregator = gokart.slack.EventAggregator()
    try:
        event_aggregator.set_handlers()
        luigi.cmdline.luigi_run(cmdline_args)
    except SystemExit as exit_signal:
        # The summary is only sent on the SystemExit path.
        _try_to_send_event_summary_to_slack(slack_api, event_aggregator, cmdline_args)
        sys.exit(exit_signal.code)