Skip to content

Commit

Permalink
Arista support add and remove interface vlan ips
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoissan authored and lindycoder committed Oct 25, 2018
1 parent 5da721f commit 1a27b29
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 103 deletions.
142 changes: 114 additions & 28 deletions netman/adapters/switches/arista.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import re

import pyeapi
from pyeapi.api.vlans import Vlans, isvlan
from netaddr import IPNetwork
from pyeapi.api.vlans import isvlan
from pyeapi.eapilib import CommandError

from netman.core.objects.exceptions import VlanAlreadyExist, UnknownVlan, BadVlanNumber, BadVlanName, \
OperationNotCompleted
IPAlreadySet, IPNotAvailable, UnknownIP
from netman.core.objects.switch_base import SwitchBase
from netman.core.objects.vlan import Vlan

Expand All @@ -21,7 +22,7 @@ def __init__(self, switch_descriptor):

def _connect(self):
m = re.match('^(https?):\/\/(.*)$', self.switch_descriptor.hostname.lower())
transport, hostname = (m.group(1), m.group(2)) if m else ('https', self.switch_descriptor.hostname)
transport, hostname = (m.group(1), m.group(2)) if m else ('https', self.switch_descriptor.hostname)

self.node = pyeapi.connect(host=hostname,
username=self.switch_descriptor.username,
Expand All @@ -30,10 +31,8 @@ def _connect(self):
transport=transport,
return_node=True)

self.conn = self.node.connection

def _disconnect(self):
self.conn = None
self.node = None

def _end_transaction(self):
pass
Expand All @@ -43,47 +42,134 @@ def _start_transaction(self):

def get_vlan(self, number):
try:
vlans_info = self.conn.execute("show vlan {}".format(number))
except CommandError:
raise UnknownVlan(number)

return self._extract_vlan_list(vlans_info)[0]
vlans_result, interfaces_result = self.node.enable(
["show vlan {}".format(number), "show interfaces Vlan{}".format(number)], strict=True)
vlans_info = vlans_result['result']
interfaces_info = interfaces_result['result']
except CommandError as e:
if "not found in current VLAN database" in e.command_error:
raise UnknownVlan(number)
elif "Invalid input" in e.command_error:
raise BadVlanNumber()
elif "Interface does not exist" in e.command_error:
vlans_info = e.output[1]
interfaces_info = {"interfaces": {}}
else:
raise

vlans = _extract_vlans(vlans_info)
_apply_interface_data(interfaces_info, vlans)

return vlans[0]

def get_vlans(self):
vlans_info = self.conn.execute("show vlan")
vlans_result, interfaces_result = self.node.enable(["show vlan", "show interfaces"], strict=True)

return self._extract_vlan_list(vlans_info)
vlans = _extract_vlans(vlans_result['result'])
_apply_interface_data(interfaces_result['result'], vlans)

return sorted(vlans, key=lambda v: v.number)

def add_vlan(self, number, name=None):
if not isvlan(number):
raise BadVlanNumber()
try:
self.conn.execute("show vlan {}".format(number))
self.node.enable(["show vlan {}".format(number)], strict=True)
raise VlanAlreadyExist(number)
except CommandError:
pass

commands = ["name {}".format(name)] if name else []
commands = ["vlan {}".format(number)]
if name is not None:
commands.append("name {}".format(name))

vlan = Vlans(self.node)
if not vlan.configure_vlan(number, commands):
try:
self.node.config(commands)
except CommandError:
raise BadVlanName()

def remove_vlan(self, number):
try:
self.conn.execute("show vlan {}".format(number))
self.node.enable(["show vlan {}".format(number)], strict=True)
except CommandError:
raise UnknownVlan(number)

vlan = Vlans(self.node)
if not vlan.delete(number):
raise OperationNotCompleted("Unable to remove vlan {}".format(number))
self.node.config(["no interface Vlan{}".format(number), "no vlan {}".format(number)])

def add_ip_to_vlan(self, vlan_number, ip_network):
vlan = self.get_vlan(vlan_number)

ip_found = next((ip for ip in vlan.ips if ip.ip == ip_network.ip), False)
if ip_found:
raise IPAlreadySet(ip_network, ip_found)

has_ips = len(vlan.ips) > 0
add_ip_command = "ip address {}{}".format(str(ip_network), " secondary" if has_ips else "")

commands = [
"interface vlan {}".format(vlan_number),
add_ip_command
]
try:
self.node.config(commands)
except CommandError as e:
raise IPNotAvailable(ip_network, reason=str(e))

def remove_ip_from_vlan(self, vlan_number, ip_network):
vlan = self.get_vlan(vlan_number)
existing_ip = next((ip for ip in vlan.ips
if ip.ip == ip_network.ip and ip.netmask == ip_network.netmask),
False)

if existing_ip:
ip_index = vlan.ips.index(existing_ip)
if ip_index == 0:
if len(vlan.ips) == 1:
remove_ip_command = "no ip address {}".format(str(ip_network))
else:
remove_ip_command = "ip address {}".format(str(vlan.ips[1]))
else:
remove_ip_command = "no ip address {} secondary".format(str(ip_network))

commands = [
"interface Vlan{}".format(vlan_number),
remove_ip_command
]
self.node.config(commands)
else:
raise UnknownIP(ip_network)


def _extract_vlans(vlans_info):
vlan_list = []
for id, vlan in vlans_info['vlans'].items():
if vlan['name'] == "VLAN{:04d}".format(int(id)):
vlan['name'] = None

vlan_list.append(Vlan(number=int(id), name=vlan['name'], icmp_redirects=True, arp_routing=True, ntp=True))
return vlan_list


def _apply_interface_data(interfaces_result, vlans):
interfaces = interfaces_result["interfaces"]
for vlan in vlans:
interface_name = "Vlan{}".format(vlan.number)

interface_data = interfaces.get(interface_name)
if interface_data is not None and len(interface_data["interfaceAddress"]) > 0:
interface_address = interface_data["interfaceAddress"][0]

primary_ip_data = interface_address["primaryIp"]
secondary_ips_data = interface_address["secondaryIpsOrderedList"]

primary_ip = _to_ip(primary_ip_data)

if primary_ip.ip != IPNetwork("0.0.0.0/0").ip:
vlan.ips.append(primary_ip)

for addr in secondary_ips_data:
vlan.ips.append(_to_ip(addr))

def _extract_vlan_list(self, vlans_info):
vlan_list = []
for id, vlan in vlans_info['result'][0]['vlans'].items():
if vlan['name'] == ("VLAN{:04d}".format(int(id))):
vlan['name'] = None

vlan_list.append(Vlan(number=int(id), name=vlan['name'], icmp_redirects=True, arp_routing=True, ntp=True))
return vlan_list
def _to_ip(ip_data):
return IPNetwork("{}/{}".format(ip_data["address"], ip_data["maskLen"]))
2 changes: 1 addition & 1 deletion test-constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ cryptography==2.3.1 # via twisted
docutils==0.14 # via sphinx
ecdsa==0.13
enum34==1.1.6 # via cryptography, flake8
fake-switches==1.3.0
fake-switches==1.3.2
flake8==3.4.1
flask==1.0.2
flexmock==0.10.2
Expand Down
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ nose>=1.2.1
mock>=1.0.1
pyhamcrest>=1.6
flexmock>=0.10.2
fake-switches>=1.3.0
fake-switches>=1.3.2
MockSSH>=1.4.2,!=1.4.5 # 1.4.5 has fixed dependency and freezes paramiko
gunicorn>=19.4.5
flake8==3.4.1
Expand Down
4 changes: 1 addition & 3 deletions tests/adapters/compliance_tests/add_ip_to_vlan_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@


class AddIpToVlanTest(ComplianceTestCase):
_dev_sample = "brocade"
_dev_sample = "arista"

def setUp(self):
super(AddIpToVlanTest, self).setUp()
self.client.add_vlan(1000, name="vlan_name")
self.client.add_vlan(2000, name="vlan_name")

def tearDown(self):
self.janitor.remove_vlan(1000)
self.janitor.remove_vlan(2000)
super(AddIpToVlanTest, self).tearDown()

def test_assigns_the_ip(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


class RemoveIpFromVlanTest(ComplianceTestCase):
_dev_sample = "cisco"
_dev_sample = "arista"

def setUp(self):
super(RemoveIpFromVlanTest, self).setUp()
Expand Down

0 comments on commit 1a27b29

Please sign in to comment.