#!/usr/bin/env python3
import os, sys, select, time, signal, subprocess as sp
import pyudev
### Example systemd unit file:
# ### NOTE: only suitable as a systemd --user unit!
# [Service]
# ExecStart=/usr/local/bin/notify-power
# [Install]
# ### Optional udev start rule:
# # SUBSYSTEM=="power_supply", TAG+="systemd", \
# # ENV{SYSTEMD_USER_WANTS}="notify-power.service"
def get_dbus_notify_func(**defaults):
import ctypes as ct
class sd_bus(ct.Structure): pass
class sd_bus_error(ct.Structure):
_fields_ = [('name', ct.c_char_p), ('message', ct.c_char_p), ('need_free', ct.c_int)]
class sd_bus_msg(ct.Structure): pass
lib = ct.CDLL('')
def run(call, *args, sig=None, check=True):
func = getattr(lib, call)
if sig: func.argtypes = sig
res = func(*args)
if check and res < 0: raise OSError(-res, os.strerror(-res))
return res
bus, error, reply = (
ct.POINTER(sd_bus)(), sd_bus_error(), ct.POINTER(sd_bus_msg)() )
kws, defaults = defaults, dict(
app='', replaces_id=0, icon='',
summary='', body='', actions=None, hints=None, timeout=-1 )
for k in defaults:
if k in kws: defaults[k] = kws.pop(k)
assert not kws, kws
bb = lambda v: v.encode() if isinstance(v, str) else v
def encode_array(k, v):
if not v: sig, args = [ct.c_void_p], [None]
elif k == 'actions':
sig, args = [ct.c_int, [ct.c_char_p] * len(v)], [len(v), *map(bb, v)]
elif k == 'hints':
sig, args = [ct.c_int], [len(v)]
for ak, av in v.items():
sig.extend([ct.c_char_p, ct.c_char_p]) # key, type
if isinstance(av, (str, bytes)):
av_sig, av_args = [ct.c_char_p], [b's', bb(av)]
elif isinstance(av, int): av_sig, av_args = [ct.c_int32], [b'i', av]
else: av_sig, av_args = av
else: raise ValueError(k)
return sig, args
def notify_func(
summary=None, body=None, app=None, icon=None,
replaces_id=None, actions=None, hints=None, timeout=None ):
args, kws, sig_arrays = list(), locals(), list()
for k, default in defaults.items():
v = kws.get(k)
if v is None: v = default
if k in ['actions', 'hints']:
arr_sig, arr_args = encode_array(k, v)
else: args.append(bb(v))
run( 'sd_bus_open_user', ct.byref(bus),
sig=[ct.POINTER(ct.POINTER(sd_bus))] )
run( 'sd_bus_call_method',
b'org.freedesktop.Notifications', # dst
b'/org/freedesktop/Notifications', # path
b'org.freedesktop.Notifications', # iface
b'Notify', # method
b'susssasa{sv}i', *args,
ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_char_p,
ct.c_char_p, ct.c_uint32,
ct.c_char_p, ct.c_char_p, ct.c_char_p,
*sig_arrays, ct.c_int32 ] )
note_id = ct.c_uint32()
n = run( 'sd_bus_message_read', reply, b'u', ct.byref(note_id),
sig=[ct.POINTER(sd_bus_msg), ct.c_char_p, ct.POINTER(ct.c_uint32)] )
assert n > 0, n
finally: run('sd_bus_flush_close_unref', bus, check=False)
return note_id.value
return notify_func
class PowerMonitor:
notify_opts = dict(icon='nwn', timeout=0, app='power', hints=dict(urgency=2))
notify_cmd = None
# notify_cmd = ( 'notify-net -d'
# ' -u critical -t0 -i nwn -n power -s' ).split()
udev = udev_mon = poller = notify_func = None
def __init__(self, debounce_delay=1.0):
self.debounce_delay = debounce_delay
def __enter__(self):
return self
def __exit__(self, *err): self.close()
def __del__(self): self.close()
def open(self):
self.udev = pyudev.Context()
self.udev_mon = pyudev.Monitor.from_netlink(self.udev)
self.poller, self.poller_hooks = select.epoll(), dict()
for pipe, hook in [(self.udev_mon, self.udev_event)]:
fd = pipe.fileno()
self.poller.register(fd, select.EPOLLIN)
self.poller_hooks[fd] = hook
if self.notify_opts or not self.notify_cmd:
self.notify_func = get_dbus_notify_func(**(self.notify_opts or dict()))
self.plugged_init = self.plugged = self.ts_notify = None
self.power_percent = None
def close(self):
if self.notify_func: self.notify_func = None
if self.poller: self.poller = self.poller.close()
if self.udev_mon: self.udev_mon = None
if self.udev: self.udev = None
def run(self):
err_mask = select.EPOLLHUP | select.EPOLLERR
while True:
ts = time.monotonic()
if self.ts_notify:
poll_time = max(0, self.ts_notify - ts)
if poll_time <= 0: self.notify()
else: poll_time = -1
for fd, ev in self.poller.poll(poll_time):
if ev & err_mask != 0: return
def udev_event(self):
ev, dev = self.udev_mon.receive_device()
plugged = None
# dev_info = dict( (k, getattr(dev, k)) for k in
# 'sys_name device_path driver device_type device_node'.split() )
# print(f'--- dev [{dev.action}]: {dev_info}')
# for k, v in dev.items(): print(f' {k} {v!r}')
if dev.sys_name.startswith('ADP'):
status = dev.get('POWER_SUPPLY_ONLINE')
if not status: return
plugged = bool(int(status))
if dev.sys_name.startswith('BAT'):
if pw: self.power_percent = float(pw)
# status = dev.get('POWER_SUPPLY_STATUS')
# if not status: return
# plugged = status.lower() in ['charging', 'full']
if plugged is None or plugged == self.plugged: return
if not self.ts_notify: self.plugged_init = self.plugged
self.ts_notify = time.monotonic() + self.debounce_delay
self.plugged = plugged
def notify(self):
if self.plugged_init != self.plugged:
state_head = ( 'online' if self.plugged else
'<span color="red">UNPLUGGED</span>' )
state_head = f'Charger state: {state_head}'
state_body = f'Battery state: <b>{self.power_percent:.0f}%</b>'
if self.notify_cmd:
cmd = self.notify_cmd.copy()
cmd =, input=state_body)
if cmd.returnstatus:
print( f'ERROR: notification cmd exited'
f' with code {cmd.returnstatus}', file=sys.stderr )
if self.notify_func: self.notify_func(state_head, state_body)
self.ts_notify = None
def main(args=None):
import argparse
parser = argparse.ArgumentParser(
description='Notification script for power supply events.')
parser.add_argument('-t', '--debounce-delay',
type=float, metavar='seconds', default=2.0,
help='Time to wait for any follow-up changes before issuing notification.')
parser.add_argument('--print-systemd-unit', action='store_true',
help='Print example systemd unit file to stdout and exit.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
if opts.print_systemd_unit:
import re
with open(__file__) as src:
echo = False
for line in iter(src.readline, ''):
if echo: print(re.sub(r'^#+\s*', '', line.strip()))
if'^### Example systemd unit file:', line):
echo = True
elif echo and not line.strip(): return
with PowerMonitor(opts.debounce_delay) as app:
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal.SIG_DFL)