/
execute.py
109 lines (96 loc) · 4.47 KB
/
execute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# ----------------------------------------------------------------------
# rainbow, a terminal colorizer - https://github.com/nicoulaj/rainbow
# copyright (c) 2010-2018 rainbow contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ----------------------------------------------------------------------
import codecs
import errno
import os
import pty
import re
import signal
import subprocess
import sys
from select import select

from rainbow.ansi import ANSI_RESET_ALL
from rainbow.transformer import IdentityTransformer
# Matches one line terminator, either LF or CRLF; used to split the child's output into lines.
NEW_LINE = re.compile("\n|\r\n")
class ExecuteCommand(object):
    """Run a command and pass its stdout/stderr through transformers.

    Each standard stream of the child is connected to a pseudo-terminal when
    the matching stream of this process is a TTY, and to a plain pipe
    otherwise. Bytes read from this process' stdin are forwarded to the child;
    text produced by the child is split into lines, transformed line by line
    and written to this process' stdout/stderr.
    """

    def __init__(self,
                 args,
                 stdout_transformer=IdentityTransformer(),
                 stderr_transformer=IdentityTransformer(),
                 encoding=sys.getdefaultencoding()):
        """Store the command and how its output must be processed.

        :param args: command line, passed as-is to ``subprocess.Popen``.
        :param stdout_transformer: object whose ``transform(line)`` is applied
            to every line of the child's stdout.
        :param stderr_transformer: same, for the child's stderr.
        :param encoding: codec used to decode the child's output and to encode
            what is written back.
        """
        self.args = args
        self.stdout_transformer = stdout_transformer
        self.stderr_transformer = stderr_transformer
        self.encoding = encoding

    def encode(self, string):
        """Encode text to bytes, replacing characters the codec cannot represent."""
        return string.encode(self.encoding, 'replace')

    def decode(self, bytes):
        """Decode bytes to text, replacing sequences the codec cannot decode."""
        # The parameter name shadows the builtin but is kept: it is part of
        # the public signature and may be passed by keyword.
        return bytes.decode(self.encoding, 'replace')

    def _write_lines(self, write_fd, transformer, pending, text):
        """Write newly received text to write_fd and return the unfinished line.

        ``pending`` is the incomplete last line that was already displayed by
        the previous call. It is re-rendered together with ``text`` (after a
        carriage return) so the transformer always sees the whole line.
        """
        lines = NEW_LINE.split(pending + text)

        # Go back to the start of the line before redrawing the part of it
        # that has already been written.
        if lines[0] and pending:
            os.write(write_fd, self.encode('\r'))

        for line in lines[:-1]:
            os.write(write_fd, self.encode(transformer.transform(line) + '\n'))

        # Show the trailing partial line right away (e.g. a prompt) and
        # remember it so that it can be completed by the next chunk.
        if lines[-1]:
            os.write(write_fd, self.encode(transformer.transform(lines[-1])))

        return lines[-1]

    def run(self):
        """Execute the command and relay its streams until it closes its outputs.

        :return: the child's exit status, as returned by ``Popen.wait()``.

        A ``KeyboardInterrupt`` raised while relaying is turned into a SIGINT
        sent to the child, whose exit status is then returned.
        """
        stdin_fd = sys.stdin.fileno()
        stdout_fd = sys.stdout.fileno()
        stderr_fd = sys.stderr.fileno()

        # Descriptors created here that are still open; whatever is left in
        # this set is closed when the method exits, whichever way it exits.
        owned_fds = set()

        def open_channel(stream):
            pair = pty.openpty() if stream.isatty() else os.pipe()
            owned_fds.update(pair)
            return pair

        def close_owned(fd):
            owned_fds.discard(fd)
            os.close(fd)

        try:
            in_master, in_slave = open_channel(sys.stdin)
            out_master, out_slave = open_channel(sys.stdout)
            err_master, err_slave = open_channel(sys.stderr)

            # For stdin the child receives the first descriptor of the pair and
            # this process writes to the second one.
            p = subprocess.Popen(args=self.args, stdin=in_master, stdout=out_slave, stderr=err_slave)

            readables = [stdin_fd, out_master, err_master]
            writables = {stdin_fd: in_slave, out_master: stdout_fd, err_master: stderr_fd}
            buffers = {out_master: '', err_master: ''}
            transformers = {out_master: self.stdout_transformer, err_master: self.stderr_transformer}

            # Incremental decoders keep the bytes of a character that is split
            # across two reads instead of turning them into replacement chars.
            new_decoder = codecs.getincrementaldecoder(self.encoding)
            decoders = {out_master: new_decoder('replace'), err_master: new_decoder('replace')}

            try:
                # The child holds its own copies; ours must be closed so that
                # end-of-file is seen on the masters once the child is done.
                close_owned(out_slave)
                close_owned(err_slave)

                while True:

                    for read_fd in select(readables, [], [])[0]:

                        try:
                            data = os.read(read_fd, 4096)
                        except OSError as e:
                            # EIO is treated like end-of-file; anything else
                            # is a real error.
                            if e.errno != errno.EIO:
                                raise  # no cover
                            data = None

                        write_fd = writables[read_fd]

                        if data and read_fd == stdin_fd:
                            # Forward input untouched: a decode/encode round
                            # trip would corrupt binary or split characters.
                            os.write(write_fd, data)
                            # Forget the partial lines so that they are not
                            # redrawn after the user's input.
                            for fd in buffers:
                                buffers[fd] = ''

                        elif data:
                            text = decoders[read_fd].decode(data)
                            if text:
                                buffers[read_fd] = self._write_lines(
                                    write_fd, transformers[read_fd], buffers[read_fd], text)

                        else:
                            if read_fd == stdin_fd:
                                # Propagate end-of-input to the child. The
                                # process' own stdin descriptor is not ours to
                                # close, it is only dropped from the select set.
                                close_owned(write_fd)
                            else:
                                # Flush bytes of an unfinished character, if any.
                                tail = decoders[read_fd].decode(b'', True)
                                if tail:
                                    buffers[read_fd] = self._write_lines(
                                        write_fd, transformers[read_fd], buffers[read_fd], tail)
                                os.write(write_fd, self.encode(ANSI_RESET_ALL))
                                close_owned(read_fd)

                            readables.remove(read_fd)

                    if out_master not in readables and err_master not in readables:
                        return p.wait()

            except KeyboardInterrupt:
                os.kill(p.pid, signal.SIGINT)
                return p.wait()

        finally:
            # Best-effort cleanup: a failure to close must not hide the return
            # value or the exception that is already propagating.
            for fd in owned_fds:
                try:
                    os.close(fd)
                except OSError:
                    pass