#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import itertools as it, operator as op, functools as ft
import os, sys, re, errno, collections as cs, pathlib as pl
def systemd_state_dump(machines=False):
    '''Fetch systemd state dump via libsystemd sd-bus API.

    Calls Dump() method of the systemd manager over the system bus of the
    host and - if "machines" is set - over the system bus of every machine
    returned by ListMachines() of systemd-machined.

    Returns dict of {name: dump_str}, where host dump is stored under
    the "_host" key and machine dumps under their machine names.
    Raises OSError if any checked sd-bus call returns negative errno.'''
    import os
    import ctypes as ct
    import ctypes.util

    class sd_bus(ct.Structure): pass
    class sd_bus_error(ct.Structure):
        _fields_ = [('name', ct.c_char_p), ('message', ct.c_char_p), ('need_free', ct.c_int)]
    class sd_bus_msg(ct.Structure): pass

    # Fall back to the runtime soname if linker lookup finds nothing
    lib = ct.CDLL(ct.util.find_library('systemd') or 'libsystemd.so.0')

    def run(call, *args, sig=None, check=True):
        'Call libsystemd function by name, raising OSError on negative result.'
        func = getattr(lib, call)
        if sig: func.argtypes = sig
        res = func(*args)
        if check and res < 0: raise OSError(-res, os.strerror(-res))
        return res

    bus = ct.POINTER(sd_bus)()
    error = sd_bus_error()
    reply = ct.POINTER(sd_bus_msg)()

    def call_method(dst, path, iface, method):
        'Run no-argument method call on the current bus, storing result in "reply".'
        run( 'sd_bus_call_method',
            bus, dst, path, iface, method,
            ct.byref(error), ct.byref(reply), b'',
            sig=[ ct.POINTER(sd_bus),
                ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_char_p,
                ct.POINTER(sd_bus_error), ct.POINTER(ct.POINTER(sd_bus_msg)),
                ct.c_char_p ] )

    def reply_unref():
        # Only called right after a successful call_method(), so that
        #  each returned message reference is dropped exactly once.
        run('sd_bus_message_unref', reply, check=False)

    def get_machine_names():
        'Return list of machine names registered with systemd-machined.'
        names = list()
        call_method(
            b'org.freedesktop.machine1', b'/org/freedesktop/machine1',
            b'org.freedesktop.machine1.Manager', b'ListMachines' )
        try:
            machine_info = [ct.c_char_p() for _ in range(4)]
            machine_info_refs = [ct.byref(v) for v in machine_info]
            run( 'sd_bus_message_enter_container',
                reply, b'a', b'(ssso)',
                sig=[ct.POINTER(sd_bus_msg), ct.c_char, ct.c_char_p] )
            while True:
                n = run( 'sd_bus_message_read',
                    reply, b'(ssso)', *machine_info_refs,
                    sig=[ ct.POINTER(sd_bus_msg), ct.c_char_p,
                        ct.POINTER(ct.c_char_p), ct.POINTER(ct.c_char_p),
                        ct.POINTER(ct.c_char_p), ct.POINTER(ct.c_char_p) ] )
                if not n: break # zero result = end of the array
                # Strings are owned by the message, so decode them before unref
                name, cls, unit, path = (v.value.decode() for v in machine_info)
                names.append(name)
        finally: reply_unref()
        return names

    def get_state_dump():
        'Return string from Dump() call to systemd manager on the current bus.'
        call_method(
            b'org.freedesktop.systemd1', b'/org/freedesktop/systemd1',
            b'org.freedesktop.systemd1.Manager', b'Dump' )
        try:
            dump = ct.c_char_p()
            run( 'sd_bus_message_read', reply, b's', ct.byref(dump),
                sig=[ct.POINTER(sd_bus_msg), ct.c_char_p, ct.POINTER(ct.c_char_p)] )
            return dump.value.decode()
        finally: reply_unref()

    def bus_close():
        run('sd_bus_flush_close_unref', bus, check=False)

    run( 'sd_bus_open_system', ct.byref(bus),
        sig=[ct.POINTER(ct.POINTER(sd_bus))] )
    try:
        machine_names = get_machine_names() if machines else list()
        state = dict(_host=get_state_dump())
    finally: bus_close()

    for name in machine_names:
        # NOTE(review): presumably skips pseudo-machines like ".host" - confirm
        if name.startswith('.'): continue
        try:
            run( 'sd_bus_open_system_machine',
                ct.byref(bus), name.encode(),
                sig=[ct.POINTER(ct.POINTER(sd_bus)), ct.c_char_p] )
        except OSError as err:
            if err.errno == errno.ENOENT: continue # old machine without such link
            raise OSError(err.errno, f'machine={name} :: {str(err)}') from None
        try: state[name] = get_state_dump()
        finally: bus_close()
    return state
class SDProps(cs.UserDict):
    '''Mapping of systemd unit properties with attribute-style access.

    Keys are normalized to lowercase snake_case on construction
    ("Unit Load State" and "UnitLoadState" both become "unit_load_state"),
    values are expected to be lists of parsed property values.

    Attribute access returns the single value stored for a key, e.g.
    props.unit_load_state, and fails with AssertionError if the key holds
    more than one value. Names with "_N" suffix are optional lookups,
    returning an empty string when the key is missing, instead of raising.'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.fix_camelcase()

    def fix_camelcase(self):
        'Rename all keys in-place to lowercase snake_case form.'
        for k in list(self.keys()):
            k_new = k
            if ' ' in k: k_new = k_new.replace(' ', '_')
            elif not (k.islower() or k.isupper()):
                # Split acronym from the following word first: CPUAccounting -> CPU_Accounting
                k_new = re.sub( r'([A-Z]{2,})([A-Z][a-z])',
                    lambda m: m.group(1) + '_' + m.group(2), k_new )
                # Prefix each uppercase run, duplicate underscores are squashed below
                k_new = re.sub(r'[A-Z]+', lambda m: '_' + m.group(0), k_new)
            k_new = re.sub(r'_+', '_', k_new.replace('/', '_').strip('_').lower())
            if k_new == k: continue
            assert k_new not in self, [k_new, self.get(k_new)]
            self[k_new] = self.pop(k)
        for k, v in list(self.items()):
            if len(v) == 1 and isinstance(v[0], dict):
                # NOTE(review): body of this branch was missing in the reviewed source,
                #  wrapping nested dict here to normalize its keys as well - confirm.
                self[k] = [SDProps(v[0])]

    def __getattr__(self, k):
        # Only called when regular attribute lookup fails. Private/dunder names
        #  and "data" (not set yet on a bare instance) must raise AttributeError,
        #  so that hasattr(), copy and pickle machinery work without recursion.
        if k == 'data' or k.startswith('_'): raise AttributeError(k)
        optional = k.endswith('_N')
        if optional: k = k[:-2]
        if k not in self:
            if optional: return ''
            raise AttributeError(k)
        v_list = self[k]
        assert len(v_list) == 1, v_list
        return v_list[0]
def systemd_state_dump_parse(state_map_str, prefix_fmt='{name}/', **parse_kws):
    '''Parse per-machine systemd state dumps into one flat unit mapping.

    state_map_str is a dict of {machine_name: dump_str}.
    prefix_fmt is formatted with name=machine_name and prepended to unit
    names from that machine to build keys of the returned dict.
    Any extra keywords are passed to ssd_parse_machine() as-is.

    Returns dict of {prefix + unit_name: props}.
    Raises AssertionError if two units end up with the same key.'''
    state = dict()
    for name, state_str in state_map_str.items():
        machine_state = ssd_parse_machine(state_str, **parse_kws)
        prefix = prefix_fmt.format(name=name)
        for unit, props in machine_state.items():
            unit_key = prefix + unit
            assert unit_key not in state, [name, unit, unit_key]
            state[unit_key] = props
    return state
def ssd_parse_machine(state_str, filter_prop=None, filter_unit=None):
    '''Parse systemd state dump string for one machine.

    filter_prop is an optional callable(key) -> bool, used to only keep
    properties (and "-> Exec...:" sub-blocks) for which it returns true.
    It is also called with None for "-> Job N:" sub-blocks of a unit.
    filter_unit is an optional callable(unit_name) -> bool,
    used to only keep units for which it returns true.

    Returns dict of {unit_name: SDProps}, where each property
    maps to a list of its values in the order they appear in the dump.
    Raises ValueError for any unindented line which is not a unit/job header.'''
    state_lines = iter(state_str.splitlines())
    state_lines = it.dropwhile(lambda line: line.startswith('Timestamp '), state_lines)
    if not filter_prop: filter_prop = lambda k: True
    if not filter_unit: filter_unit = lambda u: True

    state, unit, props, prop_sub = dict(), None, None, None
    # Extra header line at the end is to flush the last parsed unit into state
    for line in it.chain(state_lines, ['-> Unit END:']):
        line = re.sub(r'^Limit\t(\S+)', r'\tLimit\1', line) # issue-9846 in systemd-239

        # Lines of a nested sub-block, indented deeper than its header
        if prop_sub is not None:
            k, indent, v = prop_sub
            keep_sub = filter_prop(k)
            m = ( re.search(r'^{}\s+(.*)(?::\s*|=)(.*)$'.format(re.escape(indent)), line)
                if keep_sub else None )
            if m:
                sk, sv = m.groups()
                # NOTE(review): this checks against (key, indent, values) tuple, not
                #  collected values, so repeated sub-keys overwrite earlier ones silently.
                assert sk not in prop_sub, [prop_sub, sk]
                if filter_prop(sk): v[sk] = sv
                continue
            # Sub-block ended - this line gets processed below as a regular one
            if k and keep_sub: props.setdefault(k, list()).append(v)
            prop_sub = None

        # Property lines and sub-block headers of the current unit/job
        if props is not None:
            m = re.search(r'^(\s+)-> (Exec\w+):$', line)
            if m:
                prop_sub = m.group(2), m.group(1), dict()
                continue
            m = re.search(r'^(\s+)-> Job (\d+):$', line)
            if m:
                prop_sub = None, m.group(1), dict() # discarded later
                continue
            m = re.search(r'^\s+(.*?)(?::\s*|=)(.*)$', line)
            if m:
                k, v = m.groups()
                if filter_prop(k): props.setdefault(k, list()).append(v)
                continue
            # Unindented line - end of the current unit/job block
            if unit:
                assert unit not in state, unit
                if filter_unit(unit): state[unit] = SDProps(props)
            props = None

        # Header line for the next unit/job block
        if props is None:
            m = re.search(r'^-> Job (\d+):$', line)
            if m: unit, props = None, dict() # discarded later
        if props is None:
            m = re.search(r'^-> Unit (.*):$', line)
            if not m: raise ValueError(repr(line))
            unit, props = m.group(1), dict()
            # Decode \xNN escapes in unit names, processing these
            #  as bytes to handle escaped multibyte characters
            unit = re.sub( br'\\x([\da-f]{2})',
                lambda m: chr(int(m.group(1), 16)).encode(),
                unit.encode() ).decode()
    return state
def main(args=None):
    '''Command-line entry point of the script.

    args is a list of command-line arguments to parse, sys.argv[1:] if None.
    Without -u/--unit option, prints names of all active units that do not
    look like transient or auto-started/stopped ones, one per line.
    With that option, pretty-prints parsed state of the specified unit instead.'''
    import argparse
    parser = argparse.ArgumentParser(
        description='systemd state tracking tool.'
            ' Use "systemd-analyze dump" to get raw state dump.')
    parser.add_argument('-u', '--unit', metavar='name',
        help='Pretty-print state for specified unit name and exit.')
    parser.add_argument('-m', '--machines', action='store_true',
        help='Query and print units in each registered'
            ' machine running on this host, prefixed by "{name}/".'
            ' Connecting to system bus of these requires root uid.')
    parser.add_argument('-f', '--file', metavar='file',
        help='Process specified "systemd-analyze dump" file instead of getting it via dbus.')
    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    if not opts.unit:
        # These are purely for performance optimization purposes.
        # On hosts with containers, dumps can get pretty large and slow to parse.
        filter_prop_keys = { 'Type', 'Source Path',
            'Transient', 'Unit Load State', 'Unit Active State', 'TriggeredBy' }
        filters = dict(
            filter_prop=lambda k,_set=filter_prop_keys: k in _set,
            filter_unit=lambda u: not re.search(r'\.(device|slice|scope)$', u) )
    else: filters = dict()

    if not opts.file:
        state_map_str = systemd_state_dump(machines=opts.machines)
    else: state_map_str = dict(_host=pl.Path(opts.file).read_text())
    state = systemd_state_dump_parse(
        state_map_str, prefix_fmt='{name}/' if opts.machines else '', **filters )

    if opts.unit:
        # Checks are for None to not confuse unit without properties with a missing one
        unit = state.get(opts.unit)
        if unit is None: unit = state.get(opts.unit + '.service')
        if unit is None: parser.error(f'Unit {opts.unit!r} not found in systemd state dump')
        import pprint
        # NOTE(review): print/return lines were missing in the reviewed source,
        #  and are restored here according to the --unit option description.
        pprint.pprint(unit)
        return

    for unit, s in sorted(state.items()):
        if re.search(r'\.(device|slice|scope)$', unit): continue

        # Skip any known auto-started/stopped and transient units
        # Does not skip: Unit=X from timers/sockets
        # WARNING: be sure to add all props used there to filter_prop_keys above!
        if unit.endswith('.mount') and f'{unit[:-6]}.automount' in state: continue
        if unit.endswith('.service') and (
            f'{unit[:-8]}.timer' in state or f'{unit[:-8]}.socket' in state ): continue
        if s.type_N == 'dbus': continue
        if s.source_path_N == '/proc/self/mountinfo': continue
        if s.transient_N == 'yes': continue
        if re.search(r'(/|^)user(-runtime-dir)?@\d+\.service$', unit): continue
        if ( re.search(r'(/|^)[^/@]+@[^/]+\.service$', unit)
            and s.triggered_by_N.endswith('.socket') ): continue

        if ( s.unit_load_state not in ['error', 'not-found']
            and s.unit_active_state == 'active' ): print(unit)
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__': sys.exit(main())