diff --git a/lavapdu/drivers/devantech.py b/lavapdu/drivers/devantech.py new file mode 100644 index 0000000..56a08e3 --- /dev/null +++ b/lavapdu/drivers/devantech.py @@ -0,0 +1,159 @@ +#! /usr/bin/python + +# Copyright 2016 Quentin Schulz +# +# Based on PDUDriver: +# Copyright 2013 Linaro Limited +# Author Matt Hart +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +from lavapdu.drivers.driver import PDUDriver +import socket + +log = logging.getLogger(__name__) + +class DevantechBase(PDUDriver): + connection = None + port_count = 0 + + def __init__(self, hostname, settings): + self.hostname = hostname + log.debug(settings) + self.settings = settings + self.ip = settings["ip"] + self.port = settings.get("port", 17494) + log.debug("port: %d" % self.port) + self.password = settings.get("password") + + self.connect() + + super(DevantechBase, self).__init__() + + def connect(self): + self.connection = socket.create_connection((self.ip, self.port)) + if self.password: + log.debug("Attempting connection to %s with provided password." % self.hostname) + msg = '\x79' + self.password + ret = self.connection.sendall(msg) + if ret: + log.error("Failed to send message.") + raise RuntimeError("Failed to send message.") + ret = self.connection.recv(1) + if ret != '\x01': + log.error("Authentication failed.") + raise RuntimeError("Failed to authenticate. Verify your password.") + + def port_interaction(self, command, port_number): + if port_number > self.port_count: + log.error("There are only %d ports. Provide a port number lesser than %d." % (self.port_count, self.port_count)) + raise RuntimeError("There are only %d ports. Provide a port number lesser than %d." % (self.port_count, self.port_count)) + + if command == "on": + msg = '\x20' + elif command == "off": + msg = '\x21' + else: + log.error("Unknown command %s." % (command)) + return + msg += chr(port_number) + msg += '\x00' + log.debug("Attempting control: %s port: %d hostname: %s." % (command, port_number, self.hostname)) + ret = self.connection.sendall(msg) + if ret: + log.error("Failed to send message.") + raise RuntimeError("Failed to send message.") + ret = self.connection.recv(1) + if ret != '\x00': + log.error("Failed to send %s command on port %d of %s." % (command, port_number, self.hostname)) + raise RuntimeError("Failed to send %s command on port %d of %s." % (command, port_number, self.hostname)) + + def _close_connection(self): + # Logout + log.debug("Closing connection.") + if self.password: + log.debug("Attempting to logout.") + ret = self.connection.sendall('\x7B') + if ret: + log.error("Failed to send message.") + raise RuntimeError("Failed to send message.") + ret = self.connection.recv(1) + if ret != '\x00': + log.error("Failed to logout of %s." % self.hostname) + raise RuntimeError("Failed to logout of %s." % self.hostname) + self.connection.close() + + def _cleanup(self): + self._close_connection() + + def _bombout(self): + self._close_connection() + + @classmethod + def accepts(cls, drivername): + log.debug(drivername) + return False + +class DevantechETH002(DevantechBase): + port_count = 2 + + @classmethod + def accepts(cls, drivername): + log.debug(drivername) + if drivername == "devantech_eth002": + return True + return False + +class DevantechETH0621(DevantechBase): + port_count = 2 + + @classmethod + def accepts(cls, drivername): + log.debug(drivername) + if drivername == "devantech_eth0621": + return True + return False + +class DevantechETH484(DevantechBase): + port_count = 4 + + @classmethod + def accepts(cls, drivername): + log.debug(drivername) + if drivername == "devantech_eth484": + return True + return False + +class DevantechETH008(DevantechBase): + port_count = 8 + + @classmethod + def accepts(cls, drivername): + log.debug(drivername) + if drivername == "devantech_eth008": + return True + return False + +class DevantechETH8020(DevantechBase): + port_count = 20 + + @classmethod + def accepts(cls, drivername): + log.debug(drivername) + if drivername == "devantech_eth8020": + return True + return False diff --git a/lavapdu/drivers/strategies.py b/lavapdu/drivers/strategies.py index 10512b1..29dc44f 100644 --- a/lavapdu/drivers/strategies.py +++ b/lavapdu/drivers/strategies.py @@ -29,6 +29,11 @@ from lavapdu.drivers.localcmdline import LocalCmdline from lavapdu.drivers.ip9258 import IP9258 from lavapdu.drivers.sainsmart import Sainsmart +from lavapdu.drivers.devantech import DevantechETH002 +from lavapdu.drivers.devantech import DevantechETH0621 +from lavapdu.drivers.devantech import DevantechETH484 +from lavapdu.drivers.devantech import DevantechETH008 +from lavapdu.drivers.devantech import DevantechETH8020 assert ACME assert APC7932 @@ -40,3 +45,8 @@ assert Ubiquity6Port assert IP9258 assert Sainsmart +assert DevantechETH002 +assert DevantechETH0621 +assert DevantechETH484 +assert DevantechETH008 +assert DevantechETH8020