Skip to content

Commit

Permalink
Blacify posix
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Feb 1, 2021
1 parent ad9eef2 commit 03672ed
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 37 deletions.
25 changes: 11 additions & 14 deletions src/shellingham/posix/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,24 @@ def _get_process_mapping():
except EnvironmentError:
continue
return mapping
raise ShellDetectionFailure('compatible proc fs or ps utility is required')
raise ShellDetectionFailure("compatible proc fs or ps utility is required")


def _iter_process_args(mapping, pid, max_depth):
"""Iterator to traverse up the tree, yielding each process's argument list.
"""
"""Traverse up the tree and yield each process's argument list."""
for _ in range(max_depth):
try:
proc = mapping[pid]
except KeyError: # We've reached the root process. Give up.
except KeyError: # We've reached the root process. Give up.
break
if proc.args: # Persumably the process should always have a name?
if proc.args: # Persumably the process should always have a name?
yield proc.args
pid = proc.ppid # Go up one level.
pid = proc.ppid # Go up one level.


def _get_login_shell(proc_cmd):
"""Form shell information from the SHELL environment variable if possible.
"""
login_shell = os.environ.get('SHELL', '')
"""Form shell information from SHELL environ if possible."""
login_shell = os.environ.get("SHELL", "")
if login_shell:
proc_cmd = login_shell
else:
Expand All @@ -45,7 +43,7 @@ def _get_login_shell(proc_cmd):


_INTERPRETER_SHELL_NAMES = [
(re.compile(r'^python(\d+(\.\d+)?)?$'), {'xonsh'}),
(re.compile(r"^python(\d+(\.\d+)?)?$"), {"xonsh"}),
]


Expand All @@ -70,10 +68,10 @@ def _get_interpreter_shell(proc_name, proc_args):


def _get_shell(cmd, *args):
if cmd.startswith('-'): # Login shell! Let's use this.
if cmd.startswith("-"): # Login shell! Let's use this.
return _get_login_shell(cmd)
name = os.path.basename(cmd).lower()
if name in SHELL_NAMES: # Command looks like a shell.
if name in SHELL_NAMES: # Command looks like a shell.
return (name, cmd)
shell = _get_interpreter_shell(name, args)
if shell:
Expand All @@ -82,8 +80,7 @@ def _get_shell(cmd, *args):


def get_shell(pid=None, max_depth=6):
"""Get the shell that the supplied pid or os.getpid() is running in.
"""
"""Get the shell that the supplied pid or os.getpid() is running in."""
pid = str(pid or os.getpid())
mapping = _get_process_mapping()
for proc_args in _iter_process_args(mapping, pid, max_depth):
Expand Down
2 changes: 1 addition & 1 deletion src/shellingham/posix/_core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import collections

Process = collections.namedtuple('Process', 'args pid ppid')
Process = collections.namedtuple("Process", "args pid ppid")
25 changes: 12 additions & 13 deletions src/shellingham/posix/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
STAT_PPID = 3
STAT_TTY = 6

STAT_PATTERN = re.compile(r'\(.+\)|\S+')
STAT_PATTERN = re.compile(r"\(.+\)|\S+")


def detect_proc():
Expand All @@ -22,42 +22,41 @@ def detect_proc():
* `status`: BSD-style, i.e. ``/proc/{pid}/status``.
"""
pid = os.getpid()
for name in ('stat', 'status'):
if os.path.exists(os.path.join('/proc', str(pid), name)):
for name in ("stat", "status"):
if os.path.exists(os.path.join("/proc", str(pid), name)):
return name
raise ProcFormatError('unsupported proc format')
raise ProcFormatError("unsupported proc format")


def _get_stat(pid, name):
path = os.path.join('/proc', str(pid), name)
with io.open(path, encoding='ascii', errors='replace') as f:
path = os.path.join("/proc", str(pid), name)
with io.open(path, encoding="ascii", errors="replace") as f:
# We only care about TTY and PPID -- all numbers.
parts = STAT_PATTERN.findall(f.read())
return parts[STAT_TTY], parts[STAT_PPID]


def _get_cmdline(pid):
path = os.path.join('/proc', str(pid), 'cmdline')
encoding = sys.getfilesystemencoding() or 'utf-8'
with io.open(path, encoding=encoding, errors='replace') as f:
path = os.path.join("/proc", str(pid), "cmdline")
encoding = sys.getfilesystemencoding() or "utf-8"
with io.open(path, encoding=encoding, errors="replace") as f:
# XXX: Command line arguments can be arbitrary byte sequences, not
# necessarily decodable. For Shellingham's purpose, however, we don't
# care. (pypa/pipenv#2820)
# cmdline appends an extra NULL at the end, hence the [:-1].
return tuple(f.read().split('\0')[:-1])
return tuple(f.read().split("\0")[:-1])


class ProcFormatError(EnvironmentError):
pass


def get_process_mapping():
"""Try to look up the process tree via the /proc interface.
"""
"""Try to look up the process tree via the /proc interface."""
stat_name = detect_proc()
self_tty = _get_stat(os.getpid(), stat_name)[0]
processes = {}
for pid in os.listdir('/proc'):
for pid in os.listdir("/proc"):
if not pid.isdigit():
continue
try:
Expand Down
16 changes: 7 additions & 9 deletions src/shellingham/posix/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ class PsNotAvailable(EnvironmentError):


def get_process_mapping():
"""Try to look up the process tree via the output of `ps`.
"""
"""Try to look up the process tree via the output of `ps`."""
try:
output = subprocess.check_output([
'ps', '-ww', '-o', 'pid=', '-o', 'ppid=', '-o', 'args=',
])
except OSError as e: # Python 2-compatible FileNotFoundError.
cmd = ["ps", "-ww", "-o", "pid=", "-o", "ppid=", "-o", "args="]
output = subprocess.check_output(cmd)
except OSError as e: # Python 2-compatible FileNotFoundError.
if e.errno != errno.ENOENT:
raise
raise PsNotAvailable('ps not found')
raise PsNotAvailable("ps not found")
except subprocess.CalledProcessError as e:
# `ps` can return 1 if the process list is completely empty.
# (sarugaku/shellingham#15)
Expand All @@ -30,15 +28,15 @@ def get_process_mapping():
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
output = output.decode(encoding)
processes = {}
for line in output.split('\n'):
for line in output.split("\n"):
try:
pid, ppid, args = line.strip().split(None, 2)
# XXX: This is not right, but we are really out of options.
# ps does not offer a sane way to decode the argument display,
# and this is "Good Enough" for obtaining shell names. Hopefully
# people don't name their shell with a space, or have something
# like "/usr/bin/xonsh is uber". (sarugaku/shellingham#14)
args = tuple(a.strip() for a in args.split(' '))
args = tuple(a.strip() for a in args.split(" "))
except ValueError:
continue
processes[pid] = Process(args=args, pid=pid, ppid=ppid)
Expand Down

0 comments on commit 03672ed

Please sign in to comment.