Skip to content

Commit

Permalink
CLI development changes for PNAC HLD PR 1292
Browse files Browse the repository at this point in the history
  • Loading branch information
thovikeerthi committed Jun 2, 2023
1 parent b316fc2 commit 8072107
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 0 deletions.
253 changes: 253 additions & 0 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import time
import itertools
import copy
import ast

from jsonpatch import JsonPatchConflict
from jsonpointer import JsonPointerException
Expand Down Expand Up @@ -6470,6 +6471,258 @@ def del_ntp_server(ctx, ntp_ip_address):
except SystemExit as e:
ctx.fail("Restart service ntp-config failed with error {}".format(e))

#
# 'nac' group ('config nac ...')
#
@config.group(cls=clicommon.AbbreviationGroup)
@click.pass_context
def nac(ctx):
"""NAC-related configuration tasks"""
config_db = ConfigDBConnector()
config_db.connect()
ctx.obj = {'db': config_db}

def init_nac_interface_to_enable():
"""initing nac interface ports to enable values"""
config_db = ConfigDBConnector()
config_db.connect()
try:
port_dict = config_db.get_table('PORT')
except Exception as e:
click.echo("PORT Table is not present in Config DB")
pass

ports = ast.literal_eval(json.dumps(port_dict))
nac_session_dict = config_db.get_table('NAC_SESSION')
portchannel_list = config_db.get_table("PORTCHANNEL")
subport_list = config_db.get_table("VLAN_SUB_INTERFACE")

for intf_name in ports.keys():
if nac_session_dict and intf_name in nac_session_dict:
config_db.mod_entry('NAC_SESSION', intf_name, {'admin_state': 'up', 'nac_status': 'unauthorized'})
else:
config_db.set_entry('NAC_SESSION', intf_name, {'admin_state': 'up', 'nac_status': 'unauthorized'})

if intf_name in port_dict:
config_db.mod_entry("PORT", intf_name, {"admin_status": "down"})

for po_name in portchannel_list:
if po_name in intf_name:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "down"})

for sp_name in subport_list:
if sp_name in intf_name:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "down"})

time.sleep(1)
config_db.mod_entry("PORT", intf_name, {"admin_status": "up"})

for po_name in portchannel_list:
if po_name in intf_name:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "up"})

for sp_name in subport_list:
if sp_name in intf_name:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "up"})
config_db.mod_entry('PORT', intf_name, {'learn_mode': 'drop'})
click.echo("learn mode set to drop")


def init_nac_interface_to_default():
"""initing nac interface ports to default values"""
config_db = ConfigDBConnector()
config_db.connect()
try:
port_dict = config_db.get_table('PORT')
except Exception as e:
click.echo("PORT Table is not present in Config DB")
pass

ports = ast.literal_eval(json.dumps(port_dict))
nac_session_dict = config_db.get_table('NAC_SESSION')

for intf_name in ports.keys():
if nac_session_dict and intf_name in nac_session_dict:
config_db.mod_entry('NAC_SESSION', intf_name, {'admin_state': 'down', 'nac_status': 'unauthorized'})
else:
config_db.set_entry('NAC_SESSION', intf_name, {'admin_state': 'down', 'nac_status': 'unauthorized'})

if intf_name in port_dict:
config_db.mod_entry('PORT', intf_name, {'learn_mode': 'hardware'})

#
# 'nac' command ('config nac enable')
#
@nac.command()
@click.pass_context
def enable(ctx):
"""Enable NAC"""
config_db = ctx.obj['db']
nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
nac_tbl = {'global': {'admin_state': 'up', 'auth_type': 'local', 'nac_type':'port'}}
config_db.mod_entry('NAC', 'global', nac_tbl['global'])
init_nac_interface_to_default()
else:
config_db.mod_entry('NAC', 'global', {'admin_state':'up'})

#
# 'nac' command ('config nac disable')
#
@nac.command()
@click.pass_context
def disable(ctx):
"""Disable NAC"""
config_db = ctx.obj['db']
nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
nac_tbl = {'global': {'admin_state': 'down'}}
config_db.mod_entry('NAC', 'global', nac_tbl['global'])
else:
nac_tbl['global']['admin_state'] = 'down'
config_db.mod_entry('NAC', 'global', {'admin_state':'down'})
init_nac_interface_to_default()

#
# 'nac' command ('config nac type <port|mac>')
#
@nac.command('type')
@click.argument('nac_type', type=click.Choice(['port', 'mac']))
@click.pass_context
def nac_type(ctx, nac_type):
"""Configure NAC type"""
config_db = ctx.obj['db']
nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to set NAC type.")
else:
if nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
nac_tbl['global']['nac_type'] = nac_type
config_db.mod_entry('NAC', 'global', {'nac_type': nac_type})
init_nac_interface_to_default()

def fdb_clear_all():
appl_db = SonicV2Connector(host="127.0.0.1")
appl_db.connect(appl_db.APPL_DB)
opdata = ["ALL", "ALL"]
dbmsg = json.dumps(opdata, separators=(',',':'))
appl_db.publish('APPL_DB', 'FLUSHFDBREQUEST', dbmsg)

#
# 'nac interface' group
#
@nac.group(cls=clicommon.AbbreviationGroup)
@click.pass_context
def interface(ctx):
"""Configure NAC settings for an interface"""
pass

# 'nac' command ('config nac interface enable ...')
#
@interface.command()
@click.argument('ifname', metavar='<interface_name>', required=True, type=str)
@click.pass_context
def enable(ctx, ifname):
"""Enable NAC for an interface"""
config_db = ctx.obj['db']

if not interface_name_is_valid(config_db, ifname) and ifname != 'all':
click.echo("Invalid interface name")
return

nac_tbl = config_db.get_table('NAC')

if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['nac_type'] != 'port':
click.echo("NAC feature is not configured in port-based mode, cannot configure PNAC settings")
return

nac_session_dict = config_db.get_table('NAC_SESSION')
port_dict = config_db.get_table('PORT')
portchannel_list = config_db.get_table("PORTCHANNEL")
subport_list = config_db.get_table("VLAN_SUB_INTERFACE")

if ifname == 'all':
init_nac_interface_to_enable()
else:
if nac_session_dict and ifname in nac_session_dict:
config_db.mod_entry('NAC_SESSION', ifname, {'admin_state': 'up'})
else:
config_db.mod_entry('NAC_SESSION', ifname, {'admin_state': 'up', 'nac_status': 'unauthorized'})

if ifname in port_dict:
config_db.mod_entry("PORT", ifname, {"admin_status": "down"})

for po_name in portchannel_list:
if po_name in ifname:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "down"})

for sp_name in subport_list:
if sp_name in ifname:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "down"})

time.sleep(1)
config_db.mod_entry("PORT", ifname, {"admin_status": "up"})

for po_name in portchannel_list:
if po_name in ifname:
config_db.mod_entry("PORTCHANNEL", po_name, {"admin_status": "up"})

for sp_name in subport_list:
if sp_name in ifname:
config_db.mod_entry("VLAN_SUB_INTERFACE", sp_name, {"admin_status": "up"})

config_db.mod_entry('PORT', ifname, {'learn_mode': 'drop'})
click.echo("restarted interface, NAC feature enabled on interface")
fdb_clear_all()

# 'nac' command ('config nac interface disable ...')
#
@interface.command()
@click.argument('ifname', metavar='<interface_name>', required=True, type=str)
@click.pass_context
def disable(ctx, ifname):
"""Disable NAC for an interface"""
config_db = ctx.obj['db']
if not interface_name_is_valid(config_db, ifname) and ifname != 'all':
click.echo("Invalid interface name")
return

nac_tbl = config_db.get_table('NAC')
if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return
elif nac_tbl['global']['nac_type'] != 'port':
click.echo("NAC feature is not configured in port-based mode, cannot configure PNAC settings")
return

nac_session_dict = config_db.get_table('NAC_SESSION')
port_dict = config_db.get_table('PORT')

if ifname == 'all':
init_nac_interface_to_default()
else:
if nac_session_dict and ifname in nac_session_dict:
config_db.mod_entry('NAC_SESSION', ifname, {'admin_state': 'down', 'nac_status': 'unauthorized'})
else:
click.echo("NAC is not configured for this interface")

if ifname in port_dict:
config_db.mod_entry('PORT', ifname, {'learn_mode': 'hardware'})

#
# 'sflow' group ('config sflow ...')
#
Expand Down
2 changes: 2 additions & 0 deletions show/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from . import warm_restart
from . import plugins
from . import syslog
from . import nac

# Global Variables
PLATFORM_JSON = 'platform.json'
Expand Down Expand Up @@ -298,6 +299,7 @@ def cli(ctx):

# syslog module
cli.add_command(syslog.syslog)
cli.add_command(nac.nac)

# Add greabox commands only if GEARBOX is configured
if is_gearbox_configured():
Expand Down
83 changes: 83 additions & 0 deletions show/nac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import json
import os

import click
import utilities_common.cli as clicommon
from natsort import natsorted
from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector
from portconfig import get_child_ports
from tabulate import tabulate
import ast

#
# 'nac group ("show nac...")
#
@click.group(invoke_without_command=True)
@clicommon.pass_db
@click.pass_context
def nac(ctx, db):
"""Show NAC related information"""
if ctx.invoked_subcommand is None:
show_nac_global(db.cfgdb)


def show_nac_global(config_db):
nac_info = config_db.get_table('NAC')
global_admin_state = 'down'
global_nac_type = 'port'
global_auth_type = 'local'
if nac_info:
global_admin_state = nac_info['global']['admin_state']
global_nac_type = nac_info['global']['nac_type']
global_auth_type = nac_info['global']['auth_type']

click.echo("\nNAC Global Information:")
click.echo(" NAC Admin State:".ljust(30) + "{}".format(global_admin_state))
click.echo(" NAC Type :".ljust(30) + "{}".format(global_nac_type))
click.echo(" NAC Authentication Type :".ljust(30) + "{}".format(global_auth_type))

#
# 'interface' command ("show nac interface <interface_name | all>")
#
@nac.command('interface')
@click.argument('interfacename', metavar='<interface_name>', required=True)
@clicommon.pass_db
def nac_interface(db, interfacename):
"""Show NAC interface information"""
config_db = db.cfgdb

nac_tbl = config_db.get_table('NAC')
if not nac_tbl:
click.echo("NAC feature not enabled. Enable feature to set NAC type.")
return
else:
if nac_tbl['global']['admin_state'] == 'down':
click.echo("NAC feature not enabled. Enable feature to configure NAC settings")
return

header = ['InterfaceName', 'NAC AdminState', 'Authorization State', 'Mapped Profile']
body = []

try:
port_dict = config_db.get_table('PORT')
except Exception as e:
click.echo("PORT Table is not present in Config DB")
raise click.Abort()

try:
nac_session_dict = config_db.get_table('NAC_SESSION')
except Exception as e:
click.echo("NAC_SESSION Table is not present in Config DB")
raise click.Abort()

ports = ast.literal_eval(json.dumps(port_dict))
nac_sessions = ast.literal_eval(json.dumps(nac_session_dict))

if interfacename != 'all':
if interfacename in nac_sessions.keys():
body.append([interfacename, nac_sessions[interfacename].get('admin_state'), nac_sessions[interfacename].get('nac_status'), ports[interfacename].get('nac')])
else:
for intf_name in nac_sessions.keys():
if intf_name != 'all': #nac_sessions db table as well can have entry with nace 'all'
body.append([intf_name, nac_sessions[intf_name].get('admin_state'), nac_sessions[intf_name].get('nac_status'), ports[intf_name].get('nac')])
click.echo(tabulate(body, header, tablefmt="grid"))

0 comments on commit 8072107

Please sign in to comment.