Skip to content

Commit

Permalink
Add parsing and setting methods for network configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
nocarryr committed Apr 5, 2018
1 parent 9d4e244 commit c240422
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 1 deletion.
16 changes: 16 additions & 0 deletions tests/test_telnet_backend.py
@@ -1,3 +1,4 @@
import ipaddress
import asyncio
import pytest

Expand Down Expand Up @@ -42,6 +43,11 @@ def smartview_backend(request, mocked_vidhub_telnet_device):
'device_id':'a10203040506',
'device_model':'SmartView Something',
'device_name':'SmartView Something',
'using_dhcp':True,
'static_ip':ipaddress.ip_interface('192.168.0.3/255.255.255.0'),
'static_gateway':ipaddress.ip_address('192.168.0.1'),
'current_ip':ipaddress.ip_interface('192.168.1.101/255.255.255.0'),
'current_gateway':ipaddress.ip_address('192.168.1.1'),
})
elif request.param == 'smartscope':
d['cls'] = SmartScopeTelnetBackend
Expand All @@ -50,6 +56,11 @@ def smartview_backend(request, mocked_vidhub_telnet_device):
'device_id':'0a1b2c3d4e5f',
'device_model':'SmartScope Duo 4K',
'device_name':'SmartScope Duo',
'using_dhcp':True,
'static_ip':ipaddress.ip_interface('192.168.0.2/255.255.255.0'),
'static_gateway':ipaddress.ip_address('192.168.0.1'),
'current_ip':ipaddress.ip_interface('192.168.1.100/255.255.255.0'),
'current_gateway':ipaddress.ip_address('192.168.1.1'),
})
return d

Expand All @@ -66,6 +77,11 @@ async def test_telnet_smartscope(smartview_backend):
assert backend.device_model == smartview_backend['device_model']
assert backend.device_name == smartview_backend['device_name']
assert backend.device_id.lower() == smartview_backend['device_id']
assert backend.using_dhcp == smartview_backend['using_dhcp']
assert backend.static_ip == smartview_backend['static_ip']
assert backend.static_gateway == smartview_backend['static_gateway']
assert backend.current_ip == smartview_backend['current_ip']
assert backend.current_gateway == smartview_backend['current_gateway']
assert not backend.inverted
assert backend.num_monitors == len(backend.monitors) == 2

Expand Down
80 changes: 79 additions & 1 deletion vidhubcontrol/backends/telnet.py
Expand Up @@ -2,6 +2,7 @@
import logging
import string
import errno
import ipaddress

from pydispatch import Property

Expand All @@ -18,6 +19,11 @@
class TelnetBackendBase(object):
hostaddr = Property()
hostport = Property()
using_dhcp = Property(False)
static_ip = Property()
static_gateway = Property()
current_ip = Property()
current_gateway = Property()
def _telnet_init(self, **kwargs):
self.read_enabled = False
self.current_section = None
Expand Down Expand Up @@ -125,12 +131,78 @@ async def wait_for_ack_or_nak(self):
self.ack_or_nak = None
self.ack_or_nak_event.clear()
return resp.startswith('ACK')
def _parse_network_line(self, line):
value = line.split(':')[1].strip(' ')
if line.startswith('Dynamic IP:'):
self.using_dhcp = 'true' in value
return True

if line.startswith('Static'):
prop = 'static'
elif line.startswith('Current'):
prop = 'current'
else:
return False

if 'gateway' in line:
prop = '_'.join([prop, 'gateway'])
setattr(self, prop, ipaddress.ip_address(value))
return True

if 'address:' in line:
prop = '_'.join([prop, 'ip'])
if getattr(self, prop) is not None:
value = '{}/{}'.format(value, getattr(self, prop).netmask)
elif 'netmask:' in line:
prop = '_'.join([prop, 'ip'])
if getattr(self, prop) is not None:
value = '{}/{}'.format(getattr(self, prop).ip, value)
setattr(self, prop, ipaddress.ip_interface(value))
return True
async def _reconfigure_device_network(self, address, gateway, use_dhcp):
address_changed = False
tx_lines = [
'NETWORK:',
'Dynamic IP: {}'.format(str(use_dhcp).lower()),
]
if not self.using_dhcp and using_dhcp:
address_changed = True
if not use_dhcp:
if not isinstance(address, ipaddress.IPv4Interface):
address = ipaddress.ip_interface(address)
if not isinstance(gateway, ipaddress.IPv4Address):
gateway = ipaddress.ip_address(gateway)
if address.ip != self.static_ip:
address_changed = True
self.static_ip = address
self.static_gateway = gateway
self.using_dhcp = False
tx_lines.extend([
'Static address: {}'.format(address.ip),
'Static netmask: {}'.format(address.netmask),
'Static gateway: {}'.format(gateway),
])
tx_bfr = bytes('\n'.join(tx_lines), 'UTF-8')
tx_bfr += b'\n\n'
await self.send_to_client(tx_bfr)
return address_changed
async def set_dhcp(self):
address_changed = await self._reconfigure_device_network(None, None, True)
if address_changed:
await self.disconnect()
async def set_device_static_ip(self, address, gateway):
address_changed = await self._reconfigure_device_network(address, gateway, False)
if address_changed:
await self.disconnect()
self.hostaddr = str(self.static_ip.ip)
await self.connect()

class TelnetBackend(TelnetBackendBase, VidhubBackendBase):
DEFAULT_PORT = 9990
SECTION_NAMES = [
'PROTOCOL PREAMBLE:',
'VIDEOHUB DEVICE:',
'NETWORK:',
'INPUT LABELS:',
'OUTPUT LABELS:',
'VIDEO OUTPUT LOCKS:',
Expand Down Expand Up @@ -181,6 +253,10 @@ def split_value(line):
self.num_outputs = int(split_value(line))
elif line.startswith('Video inputs:'):
self.num_inputs = int(split_value(line))
elif self.current_section == 'NETWORK:':
r = self._parse_network_line(line)
if r is False:
section_parsed = True
elif self.current_section == 'OUTPUT LABELS':
i = int(line.split(' ')[0])
self.output_labels[i] = ' '.join(line.split(' ')[1:])
Expand Down Expand Up @@ -320,7 +396,9 @@ def split_value(line):
elif line.startswith('Inverted:'):
self.inverted = split_value(line) == 'true'
elif self.current_section == 'NETWORK':
pass
r = self._parse_network_line(line)
if r is False:
section_parsed = True
elif self.current_section.startswith('MONITOR '):
monitor_name = self.current_section
await self.parse_monitor_line(monitor_name, line, split_value(line))
Expand Down

0 comments on commit c240422

Please sign in to comment.