Skip to content

Commit

Permalink
Convert to and from bytes for telnet
Browse files Browse the repository at this point in the history
For #2.
  • Loading branch information
rshipp committed Apr 19, 2019
1 parent d7d52c1 commit 580bbea
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 162 deletions.
120 changes: 60 additions & 60 deletions nut2.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def __del__(self):
# Try to disconnect cleanly when class is deleted.
if self._srv_handler:
try:
self._srv_handler.write("LOGOUT\n")
self._srv_handler.write(b"LOGOUT\n")
self._srv_handler.close()
except (telnetlib.socket.error, AttributeError):
# The socket is already disconnected.
Expand All @@ -98,14 +98,14 @@ def _connect(self):
timeout=self._timeout)

if self._login is not None:
self._srv_handler.write("USERNAME %s\n" % self._login)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"USERNAME %s\n" % self._login.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if not result == "OK\n":
raise PyNUTError(result.replace("\n", ""))

if self._password is not None:
self._srv_handler.write("PASSWORD %s\n" % self._password)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"PASSWORD %s\n" % self._password.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if not result == "OK\n":
raise PyNUTError(result.replace("\n", ""))
except telnetlib.socket.error:
Expand All @@ -115,8 +115,8 @@ def description(self, ups):
"""Returns the description for a given UPS."""
logging.debug("description called...")

self._srv_handler.write("GET UPSDESC %s\n" % ups)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"GET UPSDESC %s\n" % ups.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
try:
return result.split('"')[1].strip()
except IndexError:
Expand All @@ -130,13 +130,13 @@ def list_ups(self):
"""
logging.debug("list_ups from server")

self._srv_handler.write("LIST UPS\n")
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"LIST UPS\n")
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "BEGIN LIST UPS\n":
raise PyNUTError(result.replace("\n", ""))

result = self._srv_handler.read_until("END LIST UPS\n",
self._timeout)
result = self._srv_handler.read_until(b"END LIST UPS\n",
self._timeout).decode('utf-8')

ups_dict = {}
for line in result.split("\n"):
Expand All @@ -154,13 +154,13 @@ def list_vars(self, ups):
"""
logging.debug("list_vars called...")

self._srv_handler.write("LIST VAR %s\n" % ups)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"LIST VAR %s\n" % ups.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "BEGIN LIST VAR %s\n" % ups:
raise PyNUTError(result.replace("\n", ""))

result = self._srv_handler.read_until("END LIST VAR %s\n" % ups,
self._timeout)
result = self._srv_handler.read_until(b"END LIST VAR %s\n" % ups.encode('utf-8'),
self._timeout).decode('utf-8')
offset = len("VAR %s " % ups)
end_offset = 0 - (len("END LIST VAR %s\n" % ups) + 1)

Expand All @@ -179,13 +179,13 @@ def list_commands(self, ups):
"""
logging.debug("list_commands called...")

self._srv_handler.write("LIST CMD %s\n" % ups)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"LIST CMD %s\n" % ups.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "BEGIN LIST CMD %s\n" % ups:
raise PyNUTError(result.replace("\n", ""))

result = self._srv_handler.read_until("END LIST CMD %s\n" % ups,
self._timeout)
result = self._srv_handler.read_until(b"END LIST CMD %s\n" % ups.encode('utf-8'),
self._timeout).decode('utf-8')
offset = len("CMD %s " % ups)
end_offset = 0 - (len("END LIST CMD %s\n" % ups) + 1)

Expand All @@ -195,8 +195,8 @@ def list_commands(self, ups):

# For each var we try to get the available description
try:
self._srv_handler.write("GET CMDDESC %s %s\n" % (ups, command))
temp = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"GET CMDDESC %s %s\n" % (ups.encode('utf-8'), command.encode('utf-8')))
temp = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if temp.startswith("CMDDESC"):
desc_offset = len("CMDDESC %s %s " % (ups, command))
commands[command] = temp[desc_offset:-1].split('"')[1]
Expand All @@ -219,15 +219,15 @@ def list_clients(self, ups=None):
raise PyNUTError("%s is not a valid UPS" % ups)

if ups:
self._srv_handler.write("LIST CLIENTS %s\n" % ups)
self._srv_handler.write(b"LIST CLIENTS %s\n" % ups.encode('utf-8'))
else:
self._srv_handler.write("LIST CLIENTS\n")
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"LIST CLIENTS\n")
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "BEGIN LIST CLIENTS\n":
raise PyNUTError(result.replace("\n", ""))

result = self._srv_handler.read_until("END LIST CLIENTS\n",
self._timeout)
result = self._srv_handler.read_until(b"END LIST CLIENTS\n",
self._timeout).decode('utf-8')

clients = {}
for line in result.split("\n"):
Expand All @@ -247,13 +247,13 @@ def list_rw_vars(self, ups):
"""
logging.debug("list_vars from '%s'...", ups)

self._srv_handler.write("LIST RW %s\n" % ups)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"LIST RW %s\n" % ups.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "BEGIN LIST RW %s\n" % ups:
raise PyNUTError(result.replace("\n", ""))

result = self._srv_handler.read_until("END LIST RW %s\n" % ups,
self._timeout)
result = self._srv_handler.read_until(b"END LIST RW %s\n" % ups.encode('utf-8'),
self._timeout).decode('utf-8')
offset = len("VAR %s" % ups)
end_offset = 0 - (len("END LIST RW %s\n" % ups) + 1)

Expand All @@ -271,13 +271,13 @@ def list_enum(self, ups, var):
"""
logging.debug("list_enum from '%s'...", ups)

self._srv_handler.write("LIST ENUM %s %s\n" % (ups, var))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"LIST ENUM %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "BEGIN LIST ENUM %s %s\n" % (ups, var):
raise PyNUTError(result.replace("\n", ""))

result = self._srv_handler.read_until("END LIST ENUM %s %s\n" % (ups, var),
self._timeout)
result = self._srv_handler.read_until(b"END LIST ENUM %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8')),
self._timeout).decode('utf-8')
offset = len("ENUM %s %s" % (ups, var))
end_offset = 0 - (len("END LIST ENUM %s %s\n" % (ups, var)) + 1)

Expand All @@ -294,13 +294,13 @@ def list_range(self, ups, var):
"""
logging.debug("list_range from '%s'...", ups)

self._srv_handler.write("LIST RANGE %s %s\n" % (ups, var))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"LIST RANGE %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "BEGIN LIST RANGE %s %s\n" % (ups, var):
raise PyNUTError(result.replace("\n", ""))

result = self._srv_handler.read_until("END LIST RANGE %s %s\n" % (ups, var),
self._timeout)
result = self._srv_handler.read_until(b"END LIST RANGE %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8')),
self._timeout).decode('utf-8')
offset = len("RANGE %s %s" % (ups, var))
end_offset = 0 - (len("END LIST RANGE %s %s\n" % (ups, var)) + 1)

Expand All @@ -318,17 +318,17 @@ def set_var(self, ups, var, value):
"""
logging.debug("set_var '%s' from '%s' to '%s'", var, ups, value)

self._srv_handler.write("SET VAR %s %s %s\n" % (ups, var, value))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"SET VAR %s %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8'), value.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "OK\n":
raise PyNUTError(result.replace("\n", ""))

def get_var(self, ups, var):
"""Get the value of a variable."""
logging.debug("get_var called...")

self._srv_handler.write("GET VAR %s %s\n" % (ups, var))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"GET VAR %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
try:
# result = 'VAR %s %s "%s"\n' % (ups, var, value)
return result.split('"')[1].strip()
Expand All @@ -344,8 +344,8 @@ def var_description(self, ups, var):
"""Get a variable's description."""
logging.debug("var_description called...")

self._srv_handler.write("GET DESC %s %s\n" % (ups, var))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"GET DESC %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
try:
# result = 'DESC %s %s "%s"\n' % (ups, var, description)
return result.split('"')[1].strip()
Expand All @@ -356,8 +356,8 @@ def var_type(self, ups, var):
"""Get a variable's type."""
logging.debug("var_type called...")

self._srv_handler.write("GET TYPE %s %s\n" % (ups, var))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"GET TYPE %s %s\n" % (ups.encode('utf-8'), var.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
try:
# result = 'TYPE %s %s %s\n' % (ups, var, type)
type_ = ' '.join(result.split(' ')[3:]).strip()
Expand All @@ -372,8 +372,8 @@ def command_description(self, ups, command):
"""Get a command's description."""
logging.debug("command_description called...")

self._srv_handler.write("GET CMDDESC %s %s\n" % (ups, command))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"GET CMDDESC %s %s\n" % (ups.encode('utf-8'), command.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
try:
# result = 'CMDDESC %s %s "%s"' % (ups, command, description)
return result.split('"')[1].strip()
Expand All @@ -384,23 +384,23 @@ def run_command(self, ups, command):
"""Send a command to the specified UPS."""
logging.debug("run_command called...")

self._srv_handler.write("INSTCMD %s %s\n" % (ups, command))
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"INSTCMD %s %s\n" % (ups.encode('utf-8'), command.encode('utf-8')))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "OK\n":
raise PyNUTError(result.replace("\n", ""))

def fsd(self, ups):
"""Send MASTER and FSD commands."""
logging.debug("MASTER called...")

self._srv_handler.write("MASTER %s\n" % ups)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"MASTER %s\n" % ups.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "OK MASTER-GRANTED\n":
raise PyNUTError(("Master level function are not available", ""))

logging.debug("FSD called...")
self._srv_handler.write("FSD %s\n" % ups)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"FSD %s\n" % ups.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
if result != "OK FSD-SET\n":
raise PyNUTError(result.replace("\n", ""))

Expand All @@ -410,8 +410,8 @@ def num_logins(self, ups):
"""
logging.debug("num_logins called on '%s'...", ups)

self._srv_handler.write("GET NUMLOGINS %s\n" % ups)
result = self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"GET NUMLOGINS %s\n" % ups.encode('utf-8'))
result = self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
try:
# result = "NUMLOGINS %s %s\n" % (ups, int(numlogins))
return int(result.split(' ')[2].strip())
Expand All @@ -422,12 +422,12 @@ def help(self):
"""Send HELP command."""
logging.debug("HELP called...")

self._srv_handler.write("HELP\n")
return self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"HELP\n")
return self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')

def ver(self):
"""Send VER command."""
logging.debug("VER called...")

self._srv_handler.write("VER\n")
return self._srv_handler.read_until("\n", self._timeout)
self._srv_handler.write(b"VER\n")
return self._srv_handler.read_until(b"\n", self._timeout).decode('utf-8')
Loading

0 comments on commit 580bbea

Please sign in to comment.