Skip to content

Commit

Permalink
feat(power): add driver for Anel ethernet power strips
Browse files Browse the repository at this point in the history
Anel sells several different ethernet power strips. The strips expose an
UDP api that allows to switch the plugs on an off.

Add a simple power driver for this UDP interface.

Signed-off-by: Tobias Schaffner <tobias.schaffner@siemens.com>
  • Loading branch information
TobiasSchaffner committed Mar 12, 2024
1 parent 9f7e69b commit af00a8c
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ACM
Allwinner
amd
Anel
api
APIs
ARCELI
Expand Down Expand Up @@ -121,6 +122,7 @@ Tizen's
TPM
tpmtool
ttyS
UDP
UI
unmuted
uncomment
Expand Down
4 changes: 2 additions & 2 deletions Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ config POWER_VARIANT
string "Power variant"
default "qemu"
help
Select a power variant from 'aviosys_8800', 'gpio', 'pduclient',
'qemu' and 'usbrelay'.
Select a power variant from 'aviosys_8800', 'anel', 'gpio',
'pduclient', 'qemu' and 'usbrelay'.
endmenu

menu "Remote Settings"
Expand Down
30 changes: 28 additions & 2 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ General settings
may be selected with ``variant``.

* ``variant``: string [required]
Select a power variant from ``aviosys_8800``, ``gpio``, ``pduclient``,
``qemu``, ``shellcmd`` and ``usbrelay``.
Select a power variant from ``aviosys_8800``, ``anel``, ``gpio``,
``pduclient``, ``qemu``, ``shellcmd`` and ``usbrelay``.

* ``remote``: section [optional]
Specify the host and ports to connect to when using a MTDA client (such as
Expand Down Expand Up @@ -236,6 +236,32 @@ Aviosys. The following settings are supported:
* ``vid``: integer [optional]
The USB vendor ID of the power outlet (defaults to ``067b``).

``anel`` driver settings
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``anel`` driver supports the UDP API of the Ethernet controlled power strips
from Anel. The following settings are supported:

* ``host``: string [required]
The IP or hostname of the power strip.

* ``plug``: integer [required]
The number of the plug used.

* ``user``: string [optional]
The username as configured in the web interface (defaults to ``admin``).

* ``password``: string [optional]
The password as configured in the web interface (defaults to ``amel``).

* ``port_in``: integer [optional]
The receive port of the UDP api as configured in the web interface
(defaults to ``77``).

* ``port_out``: integer [optional]
The send of the UDP api as configured in the web interface
(defaults to ``75``).

``docker`` driver settings
~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
9 changes: 9 additions & 0 deletions mtda.ini
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ time-until = login:
# ---------------------------------------------------------------------------
# Set "variant" to specify which power control device to use. Use one of:
# - aviosys_8800
# - anel
# - docker
# - gpio
# - pduclient
Expand All @@ -131,6 +132,14 @@ time-until = login:
[power]
variant=aviosys_8800

# variant=anel
# host=192.168.0.20
# plug=5
# user=admin
# password=anel
# port_in=77
# port_out=75

# variant=gpio
# gpio=gpiochip0@203
# enable=high
Expand Down
115 changes: 115 additions & 0 deletions mtda/power/anel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# ---------------------------------------------------------------------------
# Anel power strip driver for MTDA
# ---------------------------------------------------------------------------
#
# This software is a part of MTDA.
# Copyright (c) Siemens AG, 2024
#
# ---------------------------------------------------------------------------
# SPDX-License-Identifier: MIT
# ---------------------------------------------------------------------------

# System imports
import socket
from contextlib import contextmanager

# Local imports
from mtda.power.controller import PowerController


class AnelPowerController(PowerController):

def __init__(self, mtda):
self._host = None
self._plug = None
self._user = "admin"
self._password = "anel"
self._port_in = 77
self._port_out = 75
self._status = self.POWER_OFF
self.mtda = mtda

def configure(self, conf):
if 'host' in conf:
self._host = conf['host']
if 'plug' in conf:
self._plug = int(conf['plug'])
if 'user' in conf:
self._user = conf['user']
if 'password' in conf:
self.password = conf['password']
if 'port_in' in conf:
self.check_on = int(conf['port_in'])
if 'port_out' in conf:
self.check_on = int(conf['port_out'])

def probe(self):
if self._host is None:
raise ValueError("host not specified")
if self._plug is None:
raise ValueError("plug not specified")

def command(self, args):
return False

@contextmanager
def _in(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', self._port_in))
sock.settimeout(0.5)
yield sock
sock.close()

@contextmanager
def _out(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
yield sock
sock.close()

def _send(self, payload):
data = (payload + self._user + self._password).encode('latin')
with self._out() as connection:
connection.sendto(data, (self._host, self._port_out))

def _receive(self):
with self._in() as connection:
data, _ = connection.recvfrom(1024)
return data.decode('latin')

def _switch(self, state):
payload = f"Sw_{'on' if state else 'off'}{self._plug}"
self._send(payload)

try:
result = self._receive().split(':')[5+self._plug].rsplit(',', 1)[1]
except TimeoutError:
self.mtda.debug(3, "power.anel._switch(): TimeoutError")

if result == "0":
self._status = self.POWER_OFF
elif result == "1":
self._status = self.POWER_ON
else:
self._status = self.POWER_UNSURE

def on(self):
self._switch(True)
return self._status == self.POWER_ON

def off(self):
self._switch(False)
return self._status == self.POWER_OFF

def status(self):
return self._status

def toggle(self):
if self.status() == self.POWER_OFF:
self.on()
else:
self.off()
return self.status()


def instantiate(mtda):
return AnelPowerController(mtda)

0 comments on commit af00a8c

Please sign in to comment.