Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI development changes for PNAC HLD PR 1292 #2859

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
258 changes: 258 additions & 0 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import time
import itertools
import copy
import ast

from jsonpatch import JsonPatchConflict
from jsonpointer import JsonPointerException
Expand Down Expand Up @@ -6549,6 +6550,263 @@ def del_ntp_server(ctx, ntp_ip_address):
except SystemExit as e:
ctx.fail("Restart service ntp-config failed with error {}".format(e))

#
# 'nac' group ('config nac ...')
#
@config.group(cls=clicommon.AbbreviationGroup)
@click.pass_context
def nac(ctx):
"""NAC-related configuration tasks"""
config_db = ConfigDBConnector()
config_db.connect()
ctx.obj = {'db': config_db}

def init_nac_interface_to_enable():
"""initing nac interface ports to enable values"""
config_db = ConfigDBConnector()
config_db.connect()
try:
port_dict = config_db.get_table('PORT')
except Exception as e:
click.echo("PORT Table is not present in Config DB")
pass

ports = ast.literal_eval(json.dumps(port_dict))
nac_session_dict = config_db.get_table('NAC_SESSION')
portchannel_list = config_db.get_table("PORTCHANNEL")
subport_list = config_db.get_table("VLAN_SUB_INTERFACE")

for intf_name in ports.keys():
if nac_session_dict and intf_name in nac_session_dict:
config_db.mod_entry('NAC_SESSION', intf_name, {'admin_state': 'up', 'nac_status': 'unauthorized'})
else:
config_db.set_entry('NAC_SESSION', intf_name, {'admin_state': 'up', 'nac_status': 'unauthorized'})

intf_fs = parse_interface_in_filter(intf_name)
if intf_name in port_dict:
if intf_name in intf_fs:
config_db.mod_entry("PORT", intf_name, {"admin_status": "down"})

for po_name in portchannel_list:
if po_name in intf_fs:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "down"})

for sp_name in subport_list:
if sp_name in intf_fs:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "down"})

time.sleep(1)
config_db.mod_entry("PORT", intf_name, {"admin_status": "up"})

for po_name in portchannel_list:
if po_name in intf_fs:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "up"})

for sp_name in subport_list:
if sp_name in intf_fs:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "up"})
config_db.mod_entry('PORT', intf_name, {'learn_mode': 'drop'})
#click.echo("learn mode set to drop")


def init_nac_interface_to_default():
"""initing nac interface ports to default values"""
config_db = ConfigDBConnector()
config_db.connect()
try:
port_dict = config_db.get_table('PORT')
except Exception as e:
click.echo("PORT Table is not present in Config DB")
pass

ports = ast.literal_eval(json.dumps(port_dict))
nac_session_dict = config_db.get_table('NAC_SESSION')

for intf_name in ports.keys():
if nac_session_dict and intf_name in nac_session_dict:
config_db.mod_entry('NAC_SESSION', intf_name, {'admin_state': 'down', 'nac_status': 'unauthorized'})
else:
config_db.set_entry('NAC_SESSION', intf_name, {'admin_state': 'down', 'nac_status': 'unauthorized'})

if intf_name in port_dict:
config_db.mod_entry('PORT', intf_name, {'learn_mode': 'hardware'})

#
# 'nac' command ('config nac enable')
#
@nac.command()
@click.pass_context
def enable(ctx):
"""Enable NAC"""
config_db = ctx.obj['db']
nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
nac_tbl = {'global': {'admin_state': 'up', 'auth_type': 'local', 'nac_type':'port'}}
config_db.mod_entry('NAC', 'global', nac_tbl['global'])
init_nac_interface_to_default()
else:
config_db.mod_entry('NAC', 'global', {'admin_state':'up'})

#
# 'nac' command ('config nac disable')
#
@nac.command()
@click.pass_context
def disable(ctx):
"""Disable NAC"""
config_db = ctx.obj['db']
nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
nac_tbl = {'global': {'admin_state': 'down', 'auth_type': 'local', 'nac_type':'port'}}
config_db.mod_entry('NAC', 'global', nac_tbl['global'])
else:
nac_tbl['global']['admin_state'] = 'down'
config_db.mod_entry('NAC', 'global', {'admin_state':'down'})
init_nac_interface_to_default()

#
# 'nac' command ('config nac type <port|mac>')
#
@nac.command('type')
@click.argument('nac_type', type=click.Choice(['port', 'mac']))
@click.pass_context
def nac_type(ctx, nac_type):
"""Configure NAC type"""
config_db = ctx.obj['db']
nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to set NAC type.")
else:
if nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
nac_tbl['global']['nac_type'] = nac_type
config_db.mod_entry('NAC', 'global', {'nac_type': nac_type})
init_nac_interface_to_default()

def fdb_clear_all():
"""Clear All FDB entries"""
appl_db = SonicV2Connector(host="127.0.0.1")
if appl_db != None:
hndl_appl_db = appl_db.connect(appl_db.APPL_DB)
if hndl_appl_db != None:
opdata = ["ALL", "ALL"]
dbmsg = json.dumps(opdata, separators=(',',':'))
appl_db.publish('APPL_DB', 'FLUSHFDBREQUEST', dbmsg)

#
# 'nac interface' group
#
@nac.group(cls=clicommon.AbbreviationGroup)
@click.pass_context
def interface(ctx):
"""Configure NAC settings for an interface"""
pass

# 'nac' command ('config nac interface enable ...')
#
@interface.command()
@click.argument('ifname', metavar='<interface_name>', required=True, type=str)
@click.pass_context
def enable(ctx, ifname):
"""Enable NAC for an interface"""
config_db = ctx.obj['db']

if not interface_name_is_valid(config_db, ifname) and ifname != 'all':
click.echo("Invalid interface name")
return

nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['nac_type'] != 'port':
click.echo("NAC feature is not configured in port-based mode, cannot configure PNAC settings")
return

nac_session_dict = config_db.get_table('NAC_SESSION')
port_dict = config_db.get_table('PORT')
portchannel_list = config_db.get_table("PORTCHANNEL")
subport_list = config_db.get_table("VLAN_SUB_INTERFACE")

if ifname == 'all':
init_nac_interface_to_enable()
else:
if nac_session_dict and ifname in nac_session_dict:
config_db.mod_entry('NAC_SESSION', ifname, {'admin_state': 'up'})
else:
config_db.mod_entry('NAC_SESSION', ifname, {'admin_state': 'up', 'nac_status': 'unauthorized'})

if ifname in port_dict:
config_db.mod_entry("PORT", ifname, {"admin_status": "down"})

for po_name in portchannel_list:
if po_name in ifname:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "down"})

for sp_name in subport_list:
if sp_name in ifname:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "down"})

time.sleep(1)
config_db.mod_entry("PORT", ifname, {"admin_status": "up"})

for po_name in portchannel_list:
if po_name in ifname:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "up"})

for sp_name in subport_list:
if sp_name in ifname:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "up"})

config_db.mod_entry('PORT', ifname, {'learn_mode': 'drop'})
click.echo("restarted interface, NAC feature enabled on interface")
fdb_clear_all()

# 'nac' command ('config nac interface disable ...')
#
@interface.command()
@click.argument('ifname', metavar='<interface_name>', required=True, type=str)
@click.pass_context
def disable(ctx, ifname):
"""Disable NAC for an interface"""
config_db = ctx.obj['db']
if not interface_name_is_valid(config_db, ifname) and ifname != 'all':
click.echo("Invalid interface name")
return

nac_tbl = config_db.get_table('NAC')
if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['nac_type'] != 'port':
click.echo("NAC feature is not configured in port-based mode, cannot configure PNAC settings")
return

nac_session_dict = config_db.get_table('NAC_SESSION')
port_dict = config_db.get_table('PORT')

if ifname == 'all':
init_nac_interface_to_default()
else:
if nac_session_dict and ifname in nac_session_dict:
config_db.mod_entry('NAC_SESSION', ifname, {'admin_state': 'down', 'nac_status': 'unauthorized'})
else:
click.echo("NAC is not configured for this interface")

if ifname in port_dict:
config_db.mod_entry('PORT', ifname, {'learn_mode': 'hardware'})

#
# 'sflow' group ('config sflow ...')
#
Expand Down
2 changes: 2 additions & 0 deletions show/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from . import plugins
from . import syslog
from . import dns
from . import nac

# Global Variables
PLATFORM_JSON = 'platform.json'
Expand Down Expand Up @@ -300,6 +301,7 @@ def cli(ctx):

# syslog module
cli.add_command(syslog.syslog)
cli.add_command(nac.nac)

# Add greabox commands only if GEARBOX is configured
if is_gearbox_configured():
Expand Down
74 changes: 74 additions & 0 deletions show/nac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import json
import os

import click
import utilities_common.cli as clicommon
from natsort import natsorted
from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector
from portconfig import get_child_ports
from tabulate import tabulate
import ast

#
# 'nac group ("show nac...")
#
@click.group(invoke_without_command=True)
@clicommon.pass_db
@click.pass_context
def nac(ctx, db):
"""Show NAC related information"""
if ctx.invoked_subcommand is None:
show_nac_global(db.cfgdb)


def show_nac_global(config_db):
nac_info = config_db.get_table('NAC')
global_admin_state = 'down'
global_nac_type = 'port'
global_auth_type = 'local'
if nac_info:
global_admin_state = nac_info['global']['admin_state']
global_nac_type = nac_info['global']['nac_type']
global_auth_type = nac_info['global']['auth_type']

click.echo("\nNAC Global Information:")
click.echo(" NAC Admin State:".ljust(30) + "{}".format(global_admin_state))
click.echo(" NAC Type :".ljust(30) + "{}".format(global_nac_type))
click.echo(" NAC Authentication Type :".ljust(30) + "{}".format(global_auth_type))

#
# 'interface' command ("show nac interface <interface_name | all>")
#
@nac.command('interface')
@click.argument('interfacename', metavar='<interface_name>', required=True)
@clicommon.pass_db
def nac_interface(db, interfacename):
"""Show NAC interface information"""
config_db = db.cfgdb

nac_tbl = config_db.get_table('NAC')
if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
else:
if nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return

header = ['InterfaceName', 'NAC AdminState', 'Authorization State', 'Mapped Profile']
body = []

port_dict = config_db.get_table('PORT')
nac_session_dict = config_db.get_table('NAC_SESSION')

ports = ast.literal_eval(json.dumps(port_dict))
nac_sessions = ast.literal_eval(json.dumps(nac_session_dict))

if interfacename != 'all':
if interfacename in nac_sessions.keys():
body.append([interfacename, nac_sessions[interfacename].get('admin_state'), nac_sessions[interfacename].get('nac_status'), ports[interfacename].get('nac')])
else:
for intf_name in nac_sessions.keys():
if intf_name != 'all': #nac_sessions db table as well can have entry with nace 'all'
body.append([intf_name, nac_sessions[intf_name].get('admin_state'), nac_sessions[intf_name].get('nac_status'), ports[intf_name].get('nac')])
click.echo(tabulate(body, header, tablefmt="grid"))