Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(power): add driver for Anel ethernet power strips #412

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ACM
Allwinner
amd
Anel
api
APIs
ARCELI
Expand Down Expand Up @@ -121,6 +122,7 @@ Tizen's
TPM
tpmtool
ttyS
UDP
UI
unmuted
uncomment
Expand Down
4 changes: 2 additions & 2 deletions Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ config POWER_VARIANT
string "Power variant"
default "qemu"
help
Select a power variant from 'aviosys_8800', 'gpio', 'pduclient',
'qemu' and 'usbrelay'.
Select a power variant from 'aviosys_8800', 'anel', 'gpio',
'pduclient', 'qemu' and 'usbrelay'.
endmenu

menu "Remote Settings"
Expand Down
30 changes: 28 additions & 2 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ General settings
may be selected with ``variant``.

* ``variant``: string [required]
Select a power variant from ``aviosys_8800``, ``gpio``, ``pduclient``,
``qemu``, ``shellcmd`` and ``usbrelay``.
Select a power variant from ``aviosys_8800``, ``anel``, ``gpio``,
``pduclient``, ``qemu``, ``shellcmd`` and ``usbrelay``.

* ``remote``: section [optional]
Specify the host and ports to connect to when using a MTDA client (such as
Expand Down Expand Up @@ -236,6 +236,32 @@ Aviosys. The following settings are supported:
* ``vid``: integer [optional]
The USB vendor ID of the power outlet (defaults to ``067b``).

``anel`` driver settings
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``anel`` driver supports the UDP API of the Ethernet controlled power strips
from Anel. The following settings are supported:

* ``host``: string [required]
The IP or hostname of the power strip.

* ``plug``: integer [required]
The number of the plug used.

* ``user``: string [optional]
The username as configured in the web interface (defaults to ``admin``).

* ``password``: string [optional]
The password as configured in the web interface (defaults to ``amel``).

* ``port_in``: integer [optional]
The receive port of the UDP api as configured in the web interface
(defaults to ``77``).

* ``port_out``: integer [optional]
The send of the UDP api as configured in the web interface
(defaults to ``75``).

``docker`` driver settings
~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
9 changes: 9 additions & 0 deletions mtda.ini
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ time-until = login:
# ---------------------------------------------------------------------------
# Set "variant" to specify which power control device to use. Use one of:
# - aviosys_8800
# - anel
# - docker
# - gpio
# - pduclient
Expand All @@ -131,6 +132,14 @@ time-until = login:
[power]
variant=aviosys_8800

# variant=anel
# host=192.168.0.20
# plug=5
# user=admin
# password=anel
# port_in=77
# port_out=75

# variant=gpio
# gpio=gpiochip0@203
# enable=high
Expand Down
115 changes: 115 additions & 0 deletions mtda/power/anel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# ---------------------------------------------------------------------------
# Anel power strip driver for MTDA
# ---------------------------------------------------------------------------
#
# This software is a part of MTDA.
# Copyright (c) Siemens AG, 2024
#
# ---------------------------------------------------------------------------
# SPDX-License-Identifier: MIT
# ---------------------------------------------------------------------------

# System imports
import socket
from contextlib import contextmanager

# Local imports
from mtda.power.controller import PowerController


class AnelPowerController(PowerController):

def __init__(self, mtda):
self._host = None
self._plug = None
self._user = "admin"
self._password = "anel"
self._port_in = 77
self._port_out = 75
self._status = self.POWER_OFF
self.mtda = mtda

def configure(self, conf):
if 'host' in conf:
self._host = conf['host']
if 'plug' in conf:
self._plug = int(conf['plug'])
if 'user' in conf:
self._user = conf['user']
if 'password' in conf:
self.password = conf['password']
if 'port_in' in conf:
self.check_on = int(conf['port_in'])
if 'port_out' in conf:
self.check_on = int(conf['port_out'])

def probe(self):
if self._host is None:
raise ValueError("host not specified")
if self._plug is None:
raise ValueError("plug not specified")

def command(self, args):
return False

@contextmanager
def _in(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', self._port_in))
sock.settimeout(0.5)
yield sock
sock.close()

@contextmanager
def _out(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
yield sock
sock.close()

def _send(self, payload):
data = (payload + self._user + self._password).encode('latin')
with self._out() as connection:
connection.sendto(data, (self._host, self._port_out))

def _receive(self):
with self._in() as connection:
data, _ = connection.recvfrom(1024)
return data.decode('latin')

def _switch(self, state):
payload = f"Sw_{'on' if state else 'off'}{self._plug}"
self._send(payload)

try:
result = self._receive().split(':')[5+self._plug].rsplit(',', 1)[1]
except TimeoutError:
self.mtda.debug(3, "power.anel._switch(): TimeoutError")

if result == "0":
self._status = self.POWER_OFF
elif result == "1":
self._status = self.POWER_ON
else:
self._status = self.POWER_UNSURE

def on(self):
self._switch(True)
return self._status == self.POWER_ON

def off(self):
self._switch(False)
return self._status == self.POWER_OFF

def status(self):
return self._status

def toggle(self):
if self.status() == self.POWER_OFF:
self.on()
else:
self.off()
return self.status()


def instantiate(mtda):
return AnelPowerController(mtda)