Skip to content

Commit

Permalink
rmcp/session: add option to set priviledge level
Browse files Browse the repository at this point in the history
Signed-off-by: Heiko Thiery <heiko.thiery@gmail.com>
  • Loading branch information
hthiery committed Feb 29, 2024
1 parent 3a5ce6d commit fe0c186
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 10 deletions.
11 changes: 11 additions & 0 deletions pyipmi/interfaces/ipmitool.py
Expand Up @@ -213,6 +213,15 @@ def _build_ipmitool_target(target):

return cmd

def _build_ipmitool_priv_level(self, level):
LEVELS = {
Session.PRIV_LEVEL_USER: 'USER',
Session.PRIV_LEVEL_OPERATOR: 'OPERATOR',
Session.PRIV_LEVEL_ADMINISTRATOR: 'ADMINISTRATOR'
}

return (' -L %s' % LEVELS[level])

def _build_ipmitool_cmd(self, target, lun, netfn, raw_bytes):
if not hasattr(self, '_session'):
raise RuntimeError('Session needs to be set')
Expand All @@ -222,6 +231,8 @@ def _build_ipmitool_cmd(self, target, lun, netfn, raw_bytes):
cmd += (' -H %s' % self._session.rmcp_host)
cmd += (' -p %s' % self._session.rmcp_port)

cmd += self._build_ipmitool_priv_level(self._session.priv_level)

if self._session.auth_type == Session.AUTH_TYPE_NONE:
cmd += ' -P ""'
elif self._session.auth_type == Session.AUTH_TYPE_PASSWORD:
Expand Down
9 changes: 5 additions & 4 deletions pyipmi/interfaces/rmcp.py
Expand Up @@ -448,13 +448,13 @@ def ping(self):
self._send_asf_msg(ping)
self._receive_asf_msg(AsfPong)

def _get_channel_auth_cap(self):
def _get_channel_auth_cap(self, session):
CHANNEL_NUMBER_FOR_THIS = 0xe
# get channel auth cap
req = create_request_by_name('GetChannelAuthenticationCapabilities')
req.target = self.host_target
req.channel.number = CHANNEL_NUMBER_FOR_THIS
req.privilege_level.requested = Session.PRIV_LEVEL_ADMINISTRATOR
req.privilege_level.requested = session.priv_level
rsp = self.send_and_receive(req)
check_completion_code(rsp.completion_code)
caps = ChannelAuthenticationCapabilities(rsp)
Expand Down Expand Up @@ -509,7 +509,7 @@ def establish_session(self, session):

# 1 - Get Channel Authentication Capabilities
log().debug('Get Channel Authentication Capabilities')
caps = self._get_channel_auth_cap()
caps = self._get_channel_auth_cap(session)
log().debug('%s' % caps)

# 2 - Get Session Challenge
Expand All @@ -518,6 +518,7 @@ def establish_session(self, session):
rsp = self._get_session_challenge(session)
session_challenge = rsp.challenge_string
session.sid = rsp.temporary_session_id

self._session = session

# 3 - Activate Session
Expand All @@ -529,7 +530,7 @@ def establish_session(self, session):

log().debug('Set Session Privilege Level')
# 4 - Set Session Privilege Level
self._set_session_privilege_level(Session.PRIV_LEVEL_ADMINISTRATOR)
self._set_session_privilege_level(session.priv_level)

log().debug('Session opened')

Expand Down
10 changes: 9 additions & 1 deletion pyipmi/ipmitool.py
Expand Up @@ -458,6 +458,7 @@ def usage(toplevel=False):
-I <interface> Set interface (available: rmcp, aardvark, ipmitool, ipmbdev)
-H <host> Set RMCP host
-U <user> Set RMCP user
-L <level> Set RMCP priviledge level
-P <password> Set RMCP password
-o <options> Set interface specific options (name=value, separated
by commas, see below for available options).
Expand Down Expand Up @@ -527,7 +528,7 @@ def parse_interface_options(interface_name, options):

def main():
try:
opts, args = getopt.getopt(sys.argv[1:], 't:hvVI:H:U:P:o:b:p:r:')
opts, args = getopt.getopt(sys.argv[1:], 't:hvVI:H:U:P:L:o:b:p:r:')
except getopt.GetoptError as err:
print(str(err))
usage()
Expand All @@ -540,6 +541,7 @@ def main():
rmcp_port = 623
rmcp_user = ''
rmcp_password = ''
rmcp_priv_level = None
interface_options = list()
for o, a in opts:
if o == '-v':
Expand All @@ -564,6 +566,8 @@ def main():
rmcp_user = a
elif o == '-P':
rmcp_password = a
elif o == '-L':
rmcp_priv_level = a
elif o == '-I':
interface_name = a
elif o == '-o':
Expand Down Expand Up @@ -614,6 +618,10 @@ def main():
if rmcp_host is not None:
ipmi.session.set_session_type_rmcp(rmcp_host, rmcp_port)
ipmi.session.set_auth_type_user(rmcp_user, rmcp_password)

if rmcp_priv_level is not None:
ipmi.session.set_priv_level(rmcp_priv_level)

ipmi.session.establish()

try:
Expand Down
13 changes: 13 additions & 0 deletions pyipmi/session.py
Expand Up @@ -29,6 +29,7 @@ class Session(object):

session_id = None
_interface = None
_priv_level = PRIV_LEVEL_ADMINISTRATOR
_auth_type = AUTH_TYPE_NONE
_auth_username = None
_auth_password = None
Expand Down Expand Up @@ -81,6 +82,18 @@ def serial_port(self):
def serial_baudrate(self):
return self._serial_baudrate

@property
def priv_level(self):
return self._priv_level

def set_priv_level(self, level):
LEVELS = {
'user': self.PRIV_LEVEL_USER,
'operator': self.PRIV_LEVEL_OPERATOR,
'administrator': self.PRIV_LEVEL_ADMINISTRATOR,
}
self._priv_level = LEVELS[level.lower()]

def _set_auth_type(self, auth_type):
self._auth_type = auth_type

Expand Down
11 changes: 6 additions & 5 deletions tests/interfaces/test_ipmitool.py
Expand Up @@ -64,8 +64,8 @@ def test_send_and_receive_raw_valid(self):
self._interface.send_and_receive_raw(target, 0, 0x6, b'\x01')

mock.assert_called_once_with('ipmitool -I lan -H 10.0.1.1 -p 623 '
'-U "admin" -P "secret" -t 0x20 -l 0 '
'raw 0x06 0x01 2>&1')
'-L ADMINISTRATOR -U "admin" -P "secret" '
'-t 0x20 -l 0 raw 0x06 0x01 2>&1')

def test_send_and_receive_raw_lanplus(self):
interface = Ipmitool(interface_type='lanplus')
Expand All @@ -79,8 +79,8 @@ def test_send_and_receive_raw_lanplus(self):
interface.send_and_receive_raw(target, 0, 0x6, b'\x01')

mock.assert_called_once_with('ipmitool -I lanplus -H 10.0.1.1 -p 623 '
'-U "admin" -P "secret" -t 0x20 -l 0 '
'raw 0x06 0x01 2>&1')
'-L ADMINISTRATOR -U "admin" -P "secret" '
'-t 0x20 -l 0 raw 0x06 0x01 2>&1')

def test_send_and_receive_raw_no_auth(self):
mock = MagicMock()
Expand All @@ -93,7 +93,8 @@ def test_send_and_receive_raw_no_auth(self):
self._interface.send_and_receive_raw(target, 0, 0x6, b'\x01')

mock.assert_called_once_with('ipmitool -I lan -H 10.0.1.1 -p 623 '
'-P "" -t 0x20 -l 0 raw 0x06 0x01 2>&1')
'-L ADMINISTRATOR -P "" '
'-t 0x20 -l 0 raw 0x06 0x01 2>&1')

def test_send_and_receive_raw_return_value(self):
mock = MagicMock()
Expand Down

0 comments on commit fe0c186

Please sign in to comment.