Skip to content

Commit

Permalink
Clean up and blackify nt.py
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Feb 1, 2021
1 parent 03672ed commit b5a9cdd
Showing 1 changed file with 95 additions and 77 deletions.
172 changes: 95 additions & 77 deletions src/shellingham/nt.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
"""
Get name and full path of most recent ancestor process that is a shell.
# Code based on the winappdbg project http://winappdbg.sourceforge.net/
# (BSD License) - adapted from Celery by Dan Ryan (dan@danryan.co)
# https://github.com/celery/celery/blob/2.5-archived/celery/concurrency/processes/_win.py
More Credits: https://stackoverflow.com/a/65955496/33264
"""
import contextlib
import ctypes
import ctypes.wintypes
import os

from ctypes.wintypes import (
BOOL,
CHAR,
DWORD,
HANDLE,
LONG,
LPWSTR,
MAX_PATH,
PDWORD,
ULONG,
)

from shellingham._core import SHELL_NAMES

k32 = ctypes.windll.kernel32

INVALID_HANDLE_VALUE = ctypes.wintypes.HANDLE(-1).value
INVALID_HANDLE_VALUE = HANDLE(-1).value
ERROR_NO_MORE_FILES = 18
ERROR_INSUFFICIENT_BUFFER = 122
TH32CS_SNAPPROCESS = 2
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000


kernel32 = ctypes.windll.kernel32


def _check_handle(error_val=0):
def check(ret, func, args):
if ret == error_val:
Expand All @@ -44,101 +50,113 @@ def check(ret, func, args):

class ProcessEntry32(ctypes.Structure):
_fields_ = (
('dwSize', ctypes.wintypes.DWORD),
('cntUsage', ctypes.wintypes.DWORD),
('th32ProcessID', ctypes.wintypes.DWORD),
('th32DefaultHeapID', ctypes.POINTER(ctypes.wintypes.ULONG)),
('th32ModuleID', ctypes.wintypes.DWORD),
('cntThreads', ctypes.wintypes.DWORD),
('th32ParentProcessID', ctypes.wintypes.DWORD),
('pcPriClassBase', ctypes.wintypes.LONG),
('dwFlags', ctypes.wintypes.DWORD),
('szExeFile', ctypes.wintypes.CHAR * ctypes.wintypes.MAX_PATH),
("dwSize", DWORD),
("cntUsage", DWORD),
("th32ProcessID", DWORD),
("th32DefaultHeapID", ctypes.POINTER(ULONG)),
("th32ModuleID", DWORD),
("cntThreads", DWORD),
("th32ParentProcessID", DWORD),
("pcPriClassBase", LONG),
("dwFlags", DWORD),
("szExeFile", CHAR * MAX_PATH),
)


k32.CloseHandle.argtypes = \
(ctypes.wintypes.HANDLE,)
k32.CloseHandle.restype = ctypes.wintypes.BOOL
kernel32.CloseHandle.argtypes = [HANDLE]
kernel32.CloseHandle.restype = BOOL

k32.CreateToolhelp32Snapshot.argtypes = \
(ctypes.wintypes.DWORD, ctypes.wintypes.DWORD)
k32.CreateToolhelp32Snapshot.restype = ctypes.wintypes.HANDLE
k32.CreateToolhelp32Snapshot.errcheck = _check_handle(INVALID_HANDLE_VALUE)
kernel32.CreateToolhelp32Snapshot.argtypes = [DWORD, DWORD]
kernel32.CreateToolhelp32Snapshot.restype = HANDLE
kernel32.CreateToolhelp32Snapshot.errcheck = _check_handle( # type: ignore
INVALID_HANDLE_VALUE,
)

k32.Process32First.argtypes = \
(ctypes.wintypes.HANDLE, ctypes.POINTER(ProcessEntry32))
k32.Process32First.restype = ctypes.wintypes.BOOL
k32.Process32First.errcheck = _check_expected(ERROR_NO_MORE_FILES)
kernel32.Process32First.argtypes = [HANDLE, ctypes.POINTER(ProcessEntry32)]
kernel32.Process32First.restype = BOOL
kernel32.Process32First.errcheck = _check_expected( # type: ignore
ERROR_NO_MORE_FILES,
)

k32.Process32Next.argtypes = \
(ctypes.wintypes.HANDLE, ctypes.POINTER(ProcessEntry32))
k32.Process32Next.restype = ctypes.wintypes.BOOL
k32.Process32Next.errcheck = _check_expected(ERROR_NO_MORE_FILES)
kernel32.Process32Next.argtypes = [HANDLE, ctypes.POINTER(ProcessEntry32)]
kernel32.Process32Next.restype = BOOL
kernel32.Process32Next.errcheck = _check_expected( # type: ignore
ERROR_NO_MORE_FILES,
)

k32.GetCurrentProcessId.argtypes = ()
k32.GetCurrentProcessId.restype = ctypes.wintypes.DWORD
kernel32.GetCurrentProcessId.argtypes = []
kernel32.GetCurrentProcessId.restype = DWORD

k32.OpenProcess.argtypes = \
(ctypes.wintypes.DWORD, ctypes.wintypes.BOOL, ctypes.wintypes.DWORD)
k32.OpenProcess.restype = ctypes.wintypes.HANDLE
k32.OpenProcess.errcheck = _check_handle(INVALID_HANDLE_VALUE)
kernel32.OpenProcess.argtypes = [DWORD, BOOL, DWORD]
kernel32.OpenProcess.restype = HANDLE
kernel32.OpenProcess.errcheck = _check_handle( # type: ignore
INVALID_HANDLE_VALUE,
)

k32.QueryFullProcessImageNameW.argtypes = \
(ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.wintypes.LPWSTR,
ctypes.wintypes.PDWORD)
k32.QueryFullProcessImageNameW.restype = ctypes.wintypes.BOOL
k32.QueryFullProcessImageNameW.errcheck = _check_expected(ERROR_INSUFFICIENT_BUFFER)
kernel32.QueryFullProcessImageNameW.argtypes = [HANDLE, DWORD, LPWSTR, PDWORD]
kernel32.QueryFullProcessImageNameW.restype = BOOL
kernel32.QueryFullProcessImageNameW.errcheck = _check_expected( # type: ignore
ERROR_INSUFFICIENT_BUFFER,
)


@contextlib.contextmanager
def Win32Handle(handle):
def _handle(f, *args, **kwargs):
handle = f(*args, **kwargs)
try:
yield handle
finally:
k32.CloseHandle(handle)
kernel32.CloseHandle(handle)


def enum_processes():
with Win32Handle(k32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)) as snap:
def _iter_processes():
f = kernel32.CreateToolhelp32Snapshot
with _handle(f, TH32CS_SNAPPROCESS, 0) as snap:
entry = ProcessEntry32()
entry.dwSize = ctypes.sizeof(entry)
ret = k32.Process32First(snap, entry)
ret = kernel32.Process32First(snap, entry)
while ret:
yield entry
ret = k32.Process32Next(snap, entry)
ret = kernel32.Process32Next(snap, entry)


def get_full_path(proch):
size = ctypes.wintypes.DWORD(ctypes.wintypes.MAX_PATH)
def _get_full_path(proch):
size = DWORD(MAX_PATH)
while True:
path_buff = ctypes.create_unicode_buffer('', size.value)
if k32.QueryFullProcessImageNameW(proch, 0, path_buff, size):
path_buff = ctypes.create_unicode_buffer("", size.value)
if kernel32.QueryFullProcessImageNameW(proch, 0, path_buff, size):
return path_buff.value
size.value *= 2


SHELLS = frozenset((_.encode() + b".exe" for _ in SHELL_NAMES))


def get_shell(pid=None, max_depth=6):
proc_map = {
proc.th32ProcessID: (proc.th32ParentProcessID, proc.szExeFile)
for proc in enum_processes()
for proc in _iter_processes()
}
if not pid:
pid = proc_map[k32.GetCurrentProcessId()][0]
proc = proc_map[pid]
depth = 0
while proc:
depth += 1

ppid, name = proc

if name in SHELLS:
with Win32Handle(k32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0,
pid)) as proch:
return (name.decode(), get_full_path(proch))
pid, proc = ppid, proc_map.get(ppid)
if depth > max_depth:
return None
pid = pid or os.getpid()

for _ in range(0, max_depth + 1):
try:
ppid, executable = proc_map[pid]
except KeyError: # No such process? Give up.
break

# The executable name would be encoded with the current code page if
# we're in ANSI mode (usually). Try to decode it into str/unicode,
# replacing invalid characters to be safe (not thoeratically necessary,
# I think). Note that we need to use 'mbcs' instead of encoding
# settings from sys because this is from the Windows API, not Python
# internals (which those settings reflect). (pypa/pipenv#3382)
if isinstance(executable, bytes):
executable = executable.decode("mbcs", "replace")

if executable.rpartition(".")[0].lower() not in SHELL_NAMES:
pid = ppid
continue

key = PROCESS_QUERY_LIMITED_INFORMATION
with _handle(kernel32.OpenProcess, key, 0, pid) as proch:
return (executable, _get_full_path(proch))

return None

0 comments on commit b5a9cdd

Please sign in to comment.