# AI agents for Network deployment, configuration and monitoring

## Import relevant libraries

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
from crewai import Agent, Task, Crew, LLM, Process
from crewai_tools import BaseTool
import os
import requests
import paramiko
from typing import List
import time
import io
from netmiko import ConnectHandler
from netmiko import redispatch

## Add API keys

In [None]:
# Default OpenAI model name and API key placeholder, exported through the
# process environment. Replace the key placeholder before running.
os.environ.update(
    {
        "OPENAI_MODEL_NAME": "gpt-4o-mini",
        "OPENAI_API_KEY": "Enter your OpenAI API key here",
    }
)


In [None]:
# Claude model handle passed explicitly (via `llm=`) to the agents that should
# not use the default model. Replace the key placeholder before running.
llm = LLM(
    api_key="Enter your Anthropic API key here",
    model="anthropic/claude-3-5-sonnet-20241022",
)

## Define Document Specialist (Agent, Custom Tool and Tasks)

In [None]:
# Custom tool to extract content from a given webpage
class QuickstartExtractor(BaseTool):
    """Tool that downloads the SR Linux "get started" lab page."""

    name: str = "WebPage Content extractor"
    description: str = "Get all the content from a webpage"

    def _run(self) -> str:
        """Fetch the quickstart page and return its body as text.

        Returns:
            str: The response body of the quickstart page.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If the server does not answer within the timeout.
        """
        url = "https://learn.srlinux.dev/get-started/lab/"
        # requests has no default timeout; without one a stalled server would
        # block the whole crew run indefinitely.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.text

In [None]:
# Create doc specialist Agent
# Documentation Specialist agent: its only tool is QuickstartExtractor and it
# is not allowed to delegate work to other agents.
doc_specialist = Agent(
    role="Documentation Specialist",
    goal="Extract and organize containerlab quickstart steps",
    backstory="""Expert in technical documentation with focus on clear, 
			actionable installation and setup instructions.""",
    tools=[QuickstartExtractor()],
    allow_delegation=False,
    verbose=True,
)

In [None]:
# Task for the doc specialist Agent
# The description is one prompt string built from adjacent literals; the
# agent is asked to return the extracted steps as a list of commands.
doc_task = Task(
    agent=doc_specialist,
    expected_output="List of commands",
    description=(
        "From the containerlab quickstart guide:\n"
        "1. Extract installation steps \n"
        "2. Identify topology deployment steps \n"
        "3. Find node connection instructions \n"
        "Present in a clear, sequential format.\n"
    ),
)

## Define Linux Configuration Agent (Agent, Custom Tool and Tasks)

In [None]:
# Tool for Linux Configuration agent to log into sandbox vm and deploy containerlab network topology
class ExecuteRemoteCommandTool(BaseTool):
    """Tool that runs a single shell command on a remote host over SSH."""

    name: str = "execute_remote_command"
    description: str = """
        Executes a Linux command on a remote host via SSH. 
        Provide hostname, port, username, password or private key,
        and the command to execute. Returns the output or error message.
    """

    def _run(self, hostname: str, port: int = 22, username: str = "",
             password: str = None, private_key: str = None, command: str = "") -> str:
        """
        Run ``command`` on ``hostname`` and return what it printed.

        Failures are reported as an ``"Error: ..."`` string instead of being
        raised, so the calling agent always receives text.

        Args:
            hostname (str): Remote server hostname or IP.
            port (int): SSH port (default is 22).
            username (str): SSH username.
            password (str, optional): SSH password.
            private_key (str, optional): SSH private key as a string (RSA).
                Takes precedence over ``password`` when both are given.
            command (str): Command to execute.

        Returns:
            str: The command's stdout; its stderr when stdout is empty; or an
            ``"Error: ..."`` message when authentication data is missing or
            the SSH session fails.
        """
        # Fail before touching the network when there is nothing to
        # authenticate with.
        if not private_key and not password:
            return "Error: Either password or private key must be provided for authentication."

        ssh = paramiko.SSHClient()
        # NOTE(security): unknown host keys are accepted automatically.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            if private_key:
                key = paramiko.RSAKey.from_private_key(io.StringIO(private_key))
                ssh.connect(hostname, port, username, pkey=key)
            else:
                ssh.connect(hostname, port, username, password)

            _stdin, stdout, stderr = ssh.exec_command(command)
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            return output if output else error
        except Exception as e:
            return f"Error: {str(e)}"
        finally:
            # Always release the connection, including when connect() or
            # exec_command() raised.
            ssh.close()

    async def _arun(self, *args, **kwargs):
        """Async execution is not supported by this tool."""
        raise NotImplementedError("This tool does not support async execution.")

            

In [None]:
# Create Linux Configuration Agent which will execute linux commands and deploy containers

# Agent that runs shell commands on the sandbox VM through
# ExecuteRemoteCommandTool.
linux_configuration_agent = Agent(
    verbose=True,
    tools=[ExecuteRemoteCommandTool()],
    role="Linux Command Executor and Docker Orchestrator",
    goal="To execute Linux commands efficiently and manage Docker containers to streamline application deployment and environment management.\n",
    backstory="""
    You are a highly skilled systems operator with expertise in Linux and containerization. 
    Born from a fusion of administrative automation and DevOps, your purpose is to enhance productivity by handling shell commands, automating routine tasks, and orchestrating Docker containers. 
    You thrive in managing complex environments with precision, ensuring smooth operations for developers and administrators alike.
    """,
)

In [None]:
# Task for Linux Configuration agent. It will basically get the output of Doc Specialist Agent
# and execute those commands on the sandbox VM
# Prompt for the Linux configuration agent: sandbox VM credentials, a set of
# health checks, then containerlab installation and topology deployment.
remote_ssh_task = Task(
    agent=linux_configuration_agent,
    expected_output="The output of the executed command or an error message if the command fails.",
    description=(
        "Use execute_remote_command for connecting to remote host with \n"
        "hostname: 192.168.64.2, \n"
        "username: debian, \n"
        "password: debian \n"
        "and execute system checks:\n"
        "1. Check system uptime\n"
        "2. Monitor disk usage\n"
        "3. Check running processes\n"
        "4. Verify network connectivity\n"
        "If all of these commands pass, then execute the commands obtained from {doc_task_output} to install containerlab and deploy the topology. For any commands requiring sudo, the sudo password is: debian.\n"
        "IMPORTANT: If containerlab is already installed then you can go to the next task of topology deployment.\n"
        "Finally list all the docker containers running.\n"
    ),
)

## Senior Network Administrator (Agent, tools and tasks)

In [None]:
# This agent does not require any tool as it needs to generate the right topo info and config only.

In [None]:
# Define Senior Network Administrator Agent

# Tool-less agent that only generates the addressing plan and device
# configuration; it uses the explicitly configured `llm`.
# NOTE: a PDFSearchTool over a local SR Linux configuration guide PDF was
# previously attached here and is currently disabled.
network_admin_agent = Agent(
    llm=llm,
    verbose=True,
    role="Senior Network Administrator",
    goal="Execute and manage commands on remote systems",
    backstory="""Experienced network administrator skilled in remote system 
    management and automation. Expert in network routing, Linux/Unix systems and SSH-based operations.""",
)

In [None]:
# Task for this agent: it is given the topology and the device connections.
# The topology information could also live in a file that the agent fetches;
# for demonstration purposes it is included in the prompt itself.
# A sample SR Linux configuration is also included as a few-shot prompting example.
# It could instead be fetched from a configuration guide PDF (if required), but in this
# use case it is added directly to the prompt.

# The description is ONE prompt string built from adjacent literals, so every
# fragment must end in "\n". A missing newline glues a "#" comment line to the
# following line and turns an example command into part of the comment.
prepare_device_config_task = Task(
    description=(
        "You need to design a network and generate configurationg for network devices in a given topology by configuring ip on subinterfaces, bgp on srlinux nodes and interfaces, static routes on linux nodes.\n"
        "##Topology Information\n"
        "You are given a network topology of client1 ----leaf1----spine1----leaf2----client2 where following is the interface connectivity:\n"
        "# inter-switch links\n"
        "- endpoints: [\"leaf1:ethernet-1/49\", \"spine1:ethernet-1/1\"]\n"
        "- endpoints: [\"leaf2:ethernet-1/49\", \"spine1:ethernet-1/2\"]\n"
        "# client links\n"
        "- endpoints: [\"client1:eth1\", \"leaf1:ethernet-1/1\"]\n"
        "- endpoints: [\"client2:eth1\", \"leaf2:ethernet-1/1\"]\n"
        "##Generate Configs:\n"
        "Generate configuration for each node. Client1 and Client2 are linux machines.\n"
        "Use absolute path of the command for example /sbin/ip instead of ip.\n"
        "You can choose your own ip allocation scheme for providing ip to all the node's interfaces.\n"
        "IMPORTANT: Each connection pair must be in its own unique subnet. For example:\n"
        "- client1-leaf1 must use a different subnet than leaf1-spine1\n"
        "- leaf1-spine1 must use a different subnet than spine1-leaf2\n"
        "- spine1-leaf2 must use a different subnet than leaf2-client2\n"
        "DO NOT place client1 and client2 in the same subnet.\n"
        "For end to end connectivity, Client1 should have a specific route pointing to Client2 ip with the next-hop as interface ip of leaf1.\n"
        "Similarly Client2 should have a specific route pointing to Client1 ip with the next-hop as interface ip of leaf2.\n"
        "leaf1, spine1 and leaf2, refer the srlinux documentation pdfs\n"
        "In a typical example you would have to configure ip on a subinterface of an ethernet interface which is obtained from interface connectivity given above,\n"
        "ensuring that ip is allocated within a same subnet to each given endpoints pair.\n"
        "Then configure specific routes for client1 and client2 followed by bgp configs on leaf1, spine1 and leaf2.\n"
        "Srlinux configs require to go in config mode first via \"enter candidate private\" cli command and is then followed by the interface or bgp config that needs to be applied, each of which needs to be prepended by \"set /\".\n"
        "Also a routing-policy with default acction accept would be required for exporting and importing all routes in bgp.\n"
        "and in order to commit all the configs it requires a \"commit now\" cli command.\n"
        "Following is an example of interface and bgp configs on srlinux devices:\n"
        "# enter candidate datastore\n"
        "enter candidate private\n"
        "# configure data interfaces\n"
        "set / interface ethernet-1/1 admin-state enable\n"
        "set / interface ethernet-1/1 subinterface 0 admin-state enable\n"
        "set / interface ethernet-1/1 subinterface 0 ipv4 address 192.168.1.1/24\n"
        "set / interface ethernet-1/1 subinterface 0 ipv4 admin-state enable\n"
        "set / interface ethernet-1/1 subinterface 0 admin-state enable\n"
        "# add interfaces to the default namespace\n"
        "set / network-instance default interface ethernet-1/1.0\n"
        "# configure BGP\n"
        "set / network-instance default protocols bgp admin-state enable\n"
        "set / network-instance default protocols bgp router-id 10.10.10.1\n"
        "set / network-instance default protocols bgp autonomous-system 65001\n"
        "set / network-instance default protocols bgp afi-safi ipv4-unicast admin-state enable\n"
        "set / network-instance default protocols bgp group ebgp admin-state enable\n"
        "set / network-instance default protocols bgp group ebgp export-policy [ export-lo ]\n"
        "set / network-instance default protocols bgp group ebgp import-policy [ export-lo ]\n"
        "set / network-instance default protocols bgp neighbor 192.168.1.2 admin-state enable\n"
        "set / network-instance default protocols bgp neighbor 192.168.1.2 peer-group ebgp\n"
        "set / network-instance default protocols bgp neighbor 192.168.1.2 peer-as 65002\n"
        "# create policy to export and import routes to bgp\n"
        "set / routing-policy policy export-lo default-action policy-result accept\n"
        "# commit config\n"
        "commit now\n"
        "Make sure that when you generate the config for srlinux devices, it absolutely adheres to the example configuration provided above. Also ensure that first command is \"enter candidate private\"\n"
    ),
    expected_output="List of finalized commands for each node.\n",
    agent=network_admin_agent,
)

## Network Configuration Specialist (Agent, tools and tasks)

In [None]:
# Since the containers are deployed within the sandbox vm, the tool for this agent needs to ssh into the sandbox vm
# (as its jumphost) and then ssh to the relevant linux containers.
class SSHViaJumpHostTool(BaseTool):
    """Tool that runs one command on a host reachable only through a jump host."""

    name: str = "ssh_via_jump_host"
    description: str = """Executes commands on a remote host via an SSH jump host.
        Takes jump host details (hostname, username, password/private key),
        remote host details (hostname, username, password/private key),
        and the command to execute."""

    def _run(
        self,
        jump_host: str,
        jump_username: str,
        jump_password: str = None,
        remote_host: str = "",
        remote_username: str = "",
        remote_password: str = None,
        command: str = "",
    ) -> str:
        """
        Executes a command on a remote host via an SSH jump host.

        Only password authentication is wired up; the signature has no
        private-key parameters. Failures are reported as an ``"Error: ..."``
        string instead of being raised.

        Args:
            jump_host (str): Jump host IP or hostname.
            jump_username (str): Username for the jump host.
            jump_password (str, optional): Password for the jump host.
            remote_host (str): Remote host IP or hostname (port 22 is used).
            remote_username (str): Username for the remote host.
            remote_password (str, optional): Password for the remote host.
            command (str): Command to execute on the remote host.

        Returns:
            str: The command's stdout; its stderr when stdout is empty; or an
            ``"Error: ..."`` message when a password is missing or any SSH
            step fails.
        """
        # Validate both credentials before opening any connection so that a
        # missing remote password does not leave a jump-host session behind.
        if not jump_password:
            return "Error: Either password or private key must be provided for the jump host."
        if not remote_password:
            return "Error: Either password or private key must be provided for the remote host."

        # NOTE(security): unknown host keys are accepted automatically on
        # both hops.
        jump_client = paramiko.SSHClient()
        jump_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        remote_client = paramiko.SSHClient()
        remote_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            jump_client.connect(jump_host, username=jump_username, password=jump_password)

            # Open a TCP channel from the jump host to the remote host's SSH
            # port and use it as the socket for the second SSH session.
            jump_transport = jump_client.get_transport()
            dest_addr = (remote_host, 22)
            local_addr = (jump_host, 22)
            jump_channel = jump_transport.open_channel("direct-tcpip", dest_addr, local_addr)

            remote_client.connect(
                remote_host,
                username=remote_username,
                password=remote_password,
                sock=jump_channel,
            )

            _stdin, stdout, stderr = remote_client.exec_command(command)
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
        except Exception as e:
            return f"Error: {str(e)}"
        finally:
            # Close both sessions on every path, including failures.
            remote_client.close()
            jump_client.close()

        print("sleeping for 5 secs for configs to get applied")
        time.sleep(5)
        return output if output else error

    async def _arun(self, *args, **kwargs):
        """Async execution is not supported by this tool."""
        raise NotImplementedError("This tool does not support async execution.")

In [None]:
# Similarly, the following tool is used to send commands to SR Linux containers
# while using the sandbox VM as the jump host.
class SendCommandsSrLinux(BaseTool):
    """Tool that pushes a list of CLI commands to an SR Linux node through a jump host."""

    name: str = "Execute commands on Srlinux containers"
    description: str = """Executes commands on a remote srlinux container via an SSH jump host.
        Takes jump host details (hostname, username, password),
        remote srlinux host details (hostname, username, password),
        and the list of commands to execute."""

    def _run(
        self,
        jump_host: str,
        jump_username: str,
        jump_password: str,
        remote_host_name: str,
        remote_username: str,
        remote_password: str,
        command_list: List[str],
    ) -> str:
        """
        Executes a list of commands on a remote srlinux node via an SSH jump host.

        The jump host is opened as a netmiko ``terminal_server`` session, an
        ``ssh`` command is typed into it to reach the node, and the session is
        then redispatched to the ``nokia_srl`` driver to send the commands.
        Failures are reported as an ``"Error: ..."`` string instead of being
        raised.

        Args:
            jump_host (str): Jump host IP or hostname.
            jump_username (str): Username for the jump host.
            jump_password (str): Password for the jump host.
            remote_host_name (str): Remote srlinux host IP or hostname.
            remote_username (str): Username for the remote host.
            remote_password (str): Password for the remote host.
            command_list (List[str]): List of commands to execute on the remote host.

        Returns:
            str: Output of the configuration session, or an error message.
        """
        # Bound up front so the except/finally clauses can refer to them even
        # when the failure happens before they are assigned.
        new_output = ""
        net_connect = None

        try:
            device = {
                'device_type': 'terminal_server',
                'host': jump_host,
                'username': jump_username,
                'password': jump_password,
            }

            # Never echo the real password into the notebook output.
            redacted_device = {**device, 'password': '********'}
            print(f"device dict is: {redacted_device}")

            net_connect = ConnectHandler(**device)
            net_connect.write_channel(f"ssh {remote_username}@{remote_host_name}\n")
            time.sleep(5)
            print("slept for 5 secs")
            output = net_connect.read_channel()
            print(f"output is:{output}")
            # Matches both "Password" and "password" prompts.
            if "assword" in output:
                net_connect.write_channel(f"{remote_password}\n")

            print(f"Router prompt is: {net_connect.find_prompt()}")

            redispatch(net_connect, device_type='nokia_srl')
            print(f"command list is:\n {command_list}")
            print(f"Router prompt is: {net_connect.find_prompt()}")
            new_output = net_connect.send_config_set(
                config_commands=command_list,
                exit_config_mode=False,
                config_mode_command="enter candidate private",
                cmd_verify=False,
            )
            print(f"new output: {new_output}")
        except Exception as e:
            if new_output:
                print(f"new output: {new_output}")
            return f"Error: {str(e)}"
        finally:
            # Release the session on every path, including failures.
            if net_connect is not None:
                net_connect.disconnect()

        print("sleeping for 5 secs in order for peering to be established")
        time.sleep(5)
        return new_output

    async def _arun(self, *args, **kwargs):
        """Async execution is not supported by this tool."""
        raise NotImplementedError("This tool does not support async execution.")

In [None]:
# Define the Network Configuration Specialist Agent
# Agent that applies configuration to the lab nodes. It has one tool for the
# Linux clients (SSHViaJumpHostTool) and one for the SR Linux routers
# (SendCommandsSrLinux), and uses the explicitly configured `llm`.
network_configuration_agent = Agent(
    llm=llm,
    verbose=True,
    tools=[SSHViaJumpHostTool(), SendCommandsSrLinux()],
    role="Network Configuration Specialist",
    goal=(
        "Design a given network topology and provide the configuration of each device.\n"
        "This includes configuring ip on interfaces, static routes and BGP on the specified nodes based on the topology."
    ),
    backstory="""
    You are a seasoned network configuration specialist with years of experience configuring and optimizing high-performance networks in mission-critical environments. 
    Renowned for your expertise in dynamic routing protocols, especially BGP, you have an innate ability to understand complex topologies and establish seamless connectivity
    """,
)

In [None]:
# Now, this agent needs to ssh into the linux and SR Linux containers and configure them 
# according to the configuration generated by Senior Network Administrator Agent.
# Here also the prompt has login information for all devices for demonstrative purposes.
# They can be stored in a file which can be fetched by the agent.

# Prompt for the configuration agent: per-node login details (all reached via
# the same jump host), which tool to use for which node type, and the
# verification steps (BGP state, then client-to-client ping).
network_config_task = Task(
    agent=network_configuration_agent,
    expected_output="A report in the markdown format containing the topology information (devices, interfaces, ip addresses and bgp info), configuration commands that were executed and result of ping output",
    description=(
        "Use ssh_via_jump_host tool to login by using following login credentials and execute cli commands obtained from {bgp_config_task_output} for each of the node:\n"
        "client1:\n"
        "jump_host: 192.168.64.2, \n"
        "jump_username: debian, \n"
        "jump_password: debian \n"
        "remote_host: client1, \n"
        "remote_username: user, \n"
        "remote_password: multit00l, \n"
        "client2:\n"
        "jump_host: 192.168.64.2, \n"
        "jump_username: debian, \n"
        "jump_password: debian \n"
        "remote_host: client2, \n"
        "remote_username: user, \n"
        "remote_password: multit00l, \n"
        "leaf1:\n"
        "jump_host: 192.168.64.2, \n"
        "jump_username: debian, \n"
        "jump_password: debian \n"
        "remote_host_name: leaf1, \n"
        "remote_username: admin, \n"
        "remote_password: NokiaSrl1!, \n"
        "leaf2:\n"
        "jump_host: 192.168.64.2, \n"
        "jump_username: debian, \n"
        "jump_password: debian \n"
        "remote_host_name: leaf2, \n"
        "remote_username: admin, \n"
        "remote_password: NokiaSrl1!, \n"
        "spine1:\n"
        "jump_host: 192.168.64.2, \n"
        "jump_username: debian, \n"
        "jump_password: debian \n"
        "remote_host_name: spine1, \n"
        "remote_username: admin, \n"
        "remote_password: NokiaSrl1!, \n"
        "You need to execute the cli commands  on the respective node. For any commands requiring sudo, the sudo password is: debian\n"
        "Client1 and Client2 are linux machines.\n"
        "Use SSHViaJumpHostTool to configure the linux machines.\n"
        "leaf1, spine1 and leaf2 are srlinux routers.\n"
        "Use SendCommandsSrLinux tool to configure or send any command to srlinux routers.\n"
        "Verify that bgp peering \"State\" is \"established\" bgp peers on all leaf1, spine1 and leaf2.\n"
        "Wait for 10 secs so that routes can converge.\n"
        "Finally check the connectivity from client1 ip to client2 ip.\n"
        "BGP routes take time to converge. So if ping fails then retry after 10secs.\n"
    ),
)

## Define the Crew

In [None]:
# Agents are listed in the same order as the tasks they own. The second entry
# must be linux_configuration_agent (the agent of remote_ssh_task); it was
# previously missing while network_configuration_agent appeared twice.
crew = Crew(
    agents=[doc_specialist, linux_configuration_agent, network_admin_agent, network_configuration_agent],
    tasks=[doc_task, remote_ssh_task, prepare_device_config_task, network_config_task],
    process=Process.sequential,
    verbose=True
)

## Now kickoff the Crew to do its job!

In [None]:
# Start the crew run with an empty inputs mapping and keep whatever
# kickoff() returns in `result`.
result = crew.kickoff(inputs={})