Skip to content

Commit

Permalink
Merge pull request #7 from adam-trhon/feature/systemd-get-services-json
Browse files Browse the repository at this point in the history
Feature/systemd get services json
  • Loading branch information
Emantor committed Sep 2, 2020
2 parents 9cec7bc + 5c3f3f1 commit 3cfef85
Showing 1 changed file with 93 additions and 30 deletions.
123 changes: 93 additions & 30 deletions labgridhelper/linux.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,102 @@
from labgrid.protocol import CommandProtocol
import json
import re

def get_systemd_version(command):
"""Returns systemd version retrieved by parsing output of `systemd --version`
Args:
command (CommandProtocol): An instance of a Driver implementing the CommandProtocol
Returns:
int: systemd version number
"""
assert isinstance(command, CommandProtocol), "command must be a CommandProtocol"

out = command.run_check("systemctl --version")
out = out[0]

parsed = re.search(r'^systemd\s+(?P<version>\d+)\s+', out)
if not parsed:
raise ValueError("Systemd version output changed")
return int(parsed.group("version"))

def get_systemd_status(command):
"""Returns parsed output of systemd Manager's ListUnits DBus command
Args:
command (CommandProtocol): An instance of a Driver implementing the CommandProtocol
Returns:
dict: dictionary of service names to their properties
"""
assert isinstance(command, CommandProtocol), "command must be a CommandProtocol"
# TODO: Use busctl --json if systemd>239
array_notation = "a(ssssssouso)"
out = command.run_check(
"busctl call --no-pager org.freedesktop.systemd1 \
/org/freedesktop/systemd1 org.freedesktop.systemd1.Manager ListUnits"
)

out = out[0]
if array_notation not in out:
raise ValueError("Systemd ListUnits output changed")
out = out[len(array_notation):]
array_length = int(out[:out.index('"')].strip(" "))
out = out[out.index('"')+1:-1]
out = out.split('\" \"')
data = iter(out)
services = {}
for _ in range(array_length):
name = next(data)
services[name] = {}
services[name]["description"] = next(data)
services[name]["load"] = next(data)
services[name]["active"] = next(data)
services[name]["sub"] = next(data)
services[name]["follow"] = next(data)
path_and_id = next(data)
pos = path_and_id.index('"')
services[name]["path"] = path_and_id[:pos]
services[name]["id"] = int(path_and_id[pos+1:-1].strip(" "))
services[name]["type"] = path_and_id[path_and_id.rfind('"'):]
services[name]["objpath"] = next(data)

return services
def get_systemd_status_json(command):
out = command.run_check(
"busctl call --json=short --no-pager org.freedesktop.systemd1 \
/org/freedesktop/systemd1 org.freedesktop.systemd1.Manager ListUnits"
)
out = out[0]
out = json.loads(out)
if out["type"] != array_notation:
raise ValueError("Systemd ListUnits output changed")

services = {}
for record in out["data"][0]:
data = iter(record)
name = next(data)
services[name] = {}
services[name]["description"] = next(data)
services[name]["load"] = next(data)
services[name]["active"] = next(data)
services[name]["sub"] = next(data)
services[name]["follow"] = next(data)
services[name]["path"] = next(data)
services[name]["id"] = int(next(data))
services[name]["type"] = next(data)
services[name]["objpath"] = next(data)

return services

def get_systemd_status_raw(command):
out = command.run_check(
"busctl call --no-pager org.freedesktop.systemd1 \
/org/freedesktop/systemd1 org.freedesktop.systemd1.Manager ListUnits"
)

out = out[0]
if array_notation not in out:
raise ValueError("Systemd ListUnits output changed")
out = out[len(array_notation):]
array_length = int(out[:out.index('"')].strip(" "))
out = out[out.index('"')+1:-1]
out = out.split('\" \"')
data = iter(out)
services = {}
for _ in range(array_length):
name = next(data)
services[name] = {}
services[name]["description"] = next(data)
services[name]["load"] = next(data)
services[name]["active"] = next(data)
services[name]["sub"] = next(data)
services[name]["follow"] = next(data)
path_and_id = next(data)
pos = path_and_id.index('"')
services[name]["path"] = path_and_id[:pos]
services[name]["id"] = int(path_and_id[pos+1:-1].strip(" "))
services[name]["type"] = path_and_id[path_and_id.rfind('"'):]
services[name]["objpath"] = next(data)

return services

if get_systemd_version(command) > 239:
return get_systemd_status_json(command)
else:
return get_systemd_status_raw(command)


def get_commands(command, directories=None):
"""Returns the commands of a running linux system
Expand Down

0 comments on commit 3cfef85

Please sign in to comment.