-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathdriver.py
More file actions
103 lines (82 loc) · 3.26 KB
/
driver.py
File metadata and controls
103 lines (82 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/python3
# Copyright 2013 Linaro Limited
# Author Matt Hart <matthew.hart@linaro.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import logging
import os
from importlib import metadata
log = logging.getLogger("pdud.drivers")
def get_named_entry_point(group, name):
eps = metadata.entry_points()
matches = list(eps.select(group=group, name=name))
if len(matches) > 1:
raise RuntimeError(f"Multiple entry points for {group} under {name}")
return matches[0] if matches else None
class PDUDriver(object):
connection = None
hostname = ""
def __init__(self):
super(PDUDriver, self).__init__()
@classmethod
def select(cls, drivername):
ep = get_named_entry_point('pdudaemon.driver', drivername)
if ep:
# Not clear why a driver would reject the driver
# it is registered for but check anyway:
c = ep.load()
if not c.accepts(drivername):
raise Exception('pdudaemon.driver entry_point {} did not accept configuration'.format(c))
return c
candidates = cls.__subclasses__() # pylint: disable=no-member
for subc in cls.__subclasses__(): # pylint: disable=no-member
candidates = candidates + (subc.__subclasses__())
for subsubc in subc.__subclasses__():
candidates = candidates + (subsubc.__subclasses__())
willing = [c for c in candidates if c.accepts(drivername)]
if len(willing) == 0:
log.error("No driver accepted the configuration '%s'", drivername)
os._exit(1)
log.debug("%s accepted the request", willing[0])
return willing[0]
def handle(self, request, port_number):
log.debug("Driving PDU hostname: %s "
"PORT: %s REQUEST: %s",
self.hostname, port_number, request)
if request == "on":
self.port_on(port_number)
elif request == "off":
self.port_off(port_number)
else:
log.debug("Unknown request to handle - oops")
raise UnknownCommandException(
"Driver doesn't know how to %s " % request
)
self._cleanup()
def port_on(self, port_number):
self.port_interaction("on", port_number)
def port_off(self, port_number):
self.port_interaction("off", port_number)
def port_interaction(self, command, port_number):
pass
def _bombout(self):
pass
def _cleanup(self):
pass
class UnknownCommandException(Exception):
pass
class FailedRequestException(Exception):
pass