forked from nvbn/thefuck
-
Notifications
You must be signed in to change notification settings - Fork 0
/
types.py
284 lines (226 loc) · 8.9 KB
/
types.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
from imp import load_source
from subprocess import Popen, PIPE
import os
import sys
import six
from psutil import Process, TimeoutExpired
from . import logs, shells
from .conf import settings, DEFAULT_PRIORITY, ALL_ENABLED
from .exceptions import EmptyCommand
from .utils import compatibility_call
class Command(object):
"""Command that should be fixed."""
def __init__(self, script, stdout, stderr):
"""Initializes command with given values.
:type script: basestring
:type stdout: basestring
:type stderr: basestring
"""
self.script = script
self.stdout = stdout
self.stderr = stderr
@property
def script_parts(self):
if not hasattr(self, '_script_parts'):
try:
self._script_parts = shells.split_command(self.script)
except Exception:
logs.debug(u"Can't split command script {} because:\n {}".format(
self, sys.exc_info()))
self._script_parts = None
return self._script_parts
def __eq__(self, other):
if isinstance(other, Command):
return (self.script, self.stdout, self.stderr) \
== (other.script, other.stdout, other.stderr)
else:
return False
def __repr__(self):
return u'Command(script={}, stdout={}, stderr={})'.format(
self.script, self.stdout, self.stderr)
def update(self, **kwargs):
"""Returns new command with replaced fields.
:rtype: Command
"""
kwargs.setdefault('script', self.script)
kwargs.setdefault('stdout', self.stdout)
kwargs.setdefault('stderr', self.stderr)
return Command(**kwargs)
@staticmethod
def _wait_output(popen):
"""Returns `True` if we can get output of the command in the
`settings.wait_command` time.
Command will be killed if it wasn't finished in the time.
:type popen: Popen
:rtype: bool
"""
proc = Process(popen.pid)
try:
proc.wait(settings.wait_command)
return True
except TimeoutExpired:
for child in proc.children(recursive=True):
child.kill()
proc.kill()
return False
@staticmethod
def _prepare_script(raw_script):
"""Creates single script from a list of script parts.
:type raw_script: [basestring]
:rtype: basestring
"""
if six.PY2:
script = ' '.join(arg.decode('utf-8') for arg in raw_script)
else:
script = ' '.join(raw_script)
script = script.strip()
return shells.from_shell(script)
@classmethod
def from_raw_script(cls, raw_script):
"""Creates instance of `Command` from a list of script parts.
:type raw_script: [basestring]
:rtype: Command
:raises: EmptyCommand
"""
script = cls._prepare_script(raw_script)
if not script:
raise EmptyCommand
env = dict(os.environ)
env.update(settings.env)
with logs.debug_time(u'Call: {}; with env: {};'.format(script, env)):
result = Popen(script, shell=True, stdout=PIPE, stderr=PIPE, env=env)
if cls._wait_output(result):
stdout = result.stdout.read().decode('utf-8')
stderr = result.stderr.read().decode('utf-8')
logs.debug(u'Received stdout: {}'.format(stdout))
logs.debug(u'Received stderr: {}'.format(stderr))
return cls(script, stdout, stderr)
else:
logs.debug(u'Execution timed out!')
return cls(script, None, None)
class Rule(object):
"""Rule for fixing commands."""
def __init__(self, name, match, get_new_command,
enabled_by_default, side_effect,
priority, requires_output):
"""Initializes rule with given fields.
:type name: basestring
:type match: (Command) -> bool
:type get_new_command: (Command) -> (basestring | [basestring])
:type enabled_by_default: boolean
:type side_effect: (Command, basestring) -> None
:type priority: int
:type requires_output: bool
"""
self.name = name
self.match = match
self.get_new_command = get_new_command
self.enabled_by_default = enabled_by_default
self.side_effect = side_effect
self.priority = priority
self.requires_output = requires_output
def __eq__(self, other):
if isinstance(other, Rule):
return (self.name, self.match, self.get_new_command,
self.enabled_by_default, self.side_effect,
self.priority, self.requires_output) \
== (other.name, other.match, other.get_new_command,
other.enabled_by_default, other.side_effect,
other.priority, other.requires_output)
else:
return False
def __repr__(self):
return 'Rule(name={}, match={}, get_new_command={}, ' \
'enabled_by_default={}, side_effect={}, ' \
'priority={}, requires_output)'.format(
self.name, self.match, self.get_new_command,
self.enabled_by_default, self.side_effect,
self.priority, self.requires_output)
@classmethod
def from_path(cls, path):
"""Creates rule instance from path.
:type path: pathlib.Path
:rtype: Rule
"""
name = path.name[:-3]
with logs.debug_time(u'Importing rule: {};'.format(name)):
rule_module = load_source(name, str(path))
priority = getattr(rule_module, 'priority', DEFAULT_PRIORITY)
return cls(name, rule_module.match,
rule_module.get_new_command,
getattr(rule_module, 'enabled_by_default', True),
getattr(rule_module, 'side_effect', None),
settings.priority.get(name, priority),
getattr(rule_module, 'requires_output', True))
@property
def is_enabled(self):
"""Returns `True` when rule enabled.
:rtype: bool
"""
if self.name in settings.exclude_rules:
return False
elif self.name in settings.rules:
return True
elif self.enabled_by_default and ALL_ENABLED in settings.rules:
return True
else:
return False
def is_match(self, command):
"""Returns `True` if rule matches the command.
:type command: Command
:rtype: bool
"""
script_only = command.stdout is None and command.stderr is None
if script_only and self.requires_output:
return False
try:
with logs.debug_time(u'Trying rule: {};'.format(self.name)):
if compatibility_call(self.match, command):
return True
except Exception:
logs.rule_failed(self, sys.exc_info())
def get_corrected_commands(self, command):
"""Returns generator with corrected commands.
:type command: Command
:rtype: Iterable[CorrectedCommand]
"""
new_commands = compatibility_call(self.get_new_command, command)
if not isinstance(new_commands, list):
new_commands = (new_commands,)
for n, new_command in enumerate(new_commands):
yield CorrectedCommand(script=new_command,
side_effect=self.side_effect,
priority=(n + 1) * self.priority)
class CorrectedCommand(object):
"""Corrected by rule command."""
def __init__(self, script, side_effect, priority):
"""Initializes instance with given fields.
:type script: basestring
:type side_effect: (Command, basestring) -> None
:type priority: int
"""
self.script = script
self.side_effect = side_effect
self.priority = priority
def __eq__(self, other):
"""Ignores `priority` field."""
if isinstance(other, CorrectedCommand):
return (other.script, other.side_effect) == \
(self.script, self.side_effect)
else:
return False
def __hash__(self):
return (self.script, self.side_effect).__hash__()
def __repr__(self):
return u'CorrectedCommand(script={}, side_effect={}, priority={})'.format(
self.script, self.side_effect, self.priority)
def run(self, old_cmd):
"""Runs command from rule for passed command.
:type old_cmd: Command
"""
if self.side_effect:
compatibility_call(self.side_effect, old_cmd, self.script)
if settings.alter_history:
shells.put_to_history(self.script)
# This depends on correct setting of PYTHONIOENCODING by the alias:
print(self.script)