In [1]:
%cat nornir_inventory.py

from nornir.core.deserializer.inventory import Inventory
import io


class MyInventory(Inventory):
    def __init__(self, **kwargs):
        hosts = {
            "sw1": {
                "data": {
                    "foo": "bar",
                    "log": io.BytesIO()
                },
                "hostname": "192.168.123.21",
                "username": "neops",
                "password": "cisco",
                "platform": "ios",
                "groups": [],
                "connection_options": {
                    "napalm": {
                        "extras": {
                            "optional_args": {}
                        }
                    },
                    "netmiko": {
                        "extras": {}
                    }
                }
            },
            "sw2": {
                "data": {
                    "foo": "foo",
                    "log": io.BytesIO()
                },
                "host

In [2]:
from nornir import InitNornir
from nornir.core.deserializer.inventory import Inventory
# Build the Nornir object used by the rest of the notebook: 20 workers,
# dry-run mode, nornir logging disabled, and hosts supplied by the
# MyInventory plugin from nornir_inventory.py.
nr = InitNornir(
    core=dict(num_workers=20),
    dry_run=True,
    logging=dict(enabled=False),
    inventory=dict(plugin="nornir_inventory.MyInventory"),
)

In [3]:
# Narrow the inventory to the host named "sw1" and display the hosts that remain.
nr.filter(name="sw1").inventory.hosts.items()

dict_items([('sw1', Host: sw1)])

In [4]:
from nornir.core.filter import F
# Combine two F filter objects with | (OR): keep hosts whose data "foo" is
# either "bar" or "foo", then display the names of the matching hosts.
nr.filter(F(data__foo="bar") | F(data__foo="foo")).inventory.hosts.keys()

dict_keys(['sw1', 'sw2'])

In [5]:
from nornir.plugins.tasks import networking

# Run napalm_cli with the single command "show version" through nr. The
# return value is kept in the notebook-global `result`, which the next cell
# passes to print_result.
result = nr.run(
    task=networking.napalm_cli,
    commands=["show version"]
)

In [6]:
from nornir.plugins.functions.text import print_result
# Pretty-print the napalm_cli results stored in `result` by the previous cell.
print_result(result)

[1m[36mnapalm_cli**********************************************************************[0m
[0m[1m[34m* sw1 ** changed : False *******************************************************[0m
[0m[1m[32mvvvv napalm_cli ** changed : False vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv INFO[0m
[0m{[0m [0m'show version'[0m: [0m'Cisco IOS Software, vios_l2 Software '[0m
                  [0m'(vios_l2-ADVENTERPRISEK9-M), Version '[0m
                  [0m'15.2(CML_NIGHTLY_20180619)FLO_DSGS7, EARLY DEPLOYMENT '[0m
                  [0m'DEVELOPMENT BUILD, synced to  V152_6_0_81_E\n'[0m
                  [0m'Technical Support: http://www.cisco.com/techsupport\n'[0m
                  [0m'Copyright (c) 1986-2018 by Cisco Systems, Inc.\n'[0m
                  [0m'Compiled Tue 19-Jun-18 06:06 by mmen\n'[0m
                  [0m'\n'[0m
                  [0m'\n'[0m
                  [0m'ROM: Bootstrap program is IOSv\n'[0m
                  [0m'\n'[0m
                  [

In [7]:
from nornir.core.task import Result, Task

def my_task(task: Task, **kwargs):
    """Print a notice for the current host and return that notice as the result.

    Args:
        task: Task object passed in by nornir; only ``task.host`` is read.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` whose ``result`` is the text
        ``"run task on <host>"``.
    """
    print(f"run task on {task.host}\n")
    # Build the payload locally instead of reading the notebook-global
    # `result` (the napalm_cli output of an earlier cell), so this task does
    # not depend on which other cells happen to have been executed.
    return Result(host=task.host, result=f"run task on {task.host}")

# Execute my_task through nr; the value returned by run() is shown as the
# cell output.
nr.run(
    task=my_task
)

run task on sw1
run task on sw2
[0m[0m

[0m[0m

AggregatedResult (my_task): {'sw1': MultiResult: [Result: "my_task"], 'sw2': MultiResult: [Result: "my_task"]}

In [8]:
def my_subtask(task: Task, **kwargs):
    """Print a notice for the current host and return the same text as the result.

    Args:
        task: Task object passed in by nornir; only ``task.host`` is read.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` carrying the notice text (without the
        trailing newline that is printed).
    """
    notice = f"run subtask on {task.host}"
    print(f"{notice}\n")
    return Result(result=notice, host=task.host)

def my_task(task: Task, **kwargs):
    """Print a notice for the host, run my_subtask, and pass its result through.

    Args:
        task: Task object passed in by nornir.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` whose ``result`` is the ``result`` attribute
        of what ``task.run`` returned for my_subtask.
    """
    print(f"run task on {task.host}\n")
    subtask_outcome = task.run(task=my_subtask)
    payload = subtask_outcome.result
    return Result(host=task.host, result=payload)

# Run the parent task through nr; my_subtask is started from inside my_task
# via task.run.
nr.run(
    task=my_task
)

run task on sw1
run task on sw2
[0m
[0m
[0mrun subtask on sw1
[0m[0m
run subtask on sw2
[0m[0m
[0m

AggregatedResult (my_task): {'sw1': MultiResult: [Result: "my_task", Result: "my_subtask"], 'sw2': MultiResult: [Result: "my_task", Result: "my_subtask"]}

In [9]:
from nornir.core.processor import Processor
from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task
import io

class PrintTaskState(Processor):
    """Processor that prints one line for every task event it is notified of.

    Each hook prints the task name (and, for per-host hooks, the host name)
    followed by an extra blank line. The result arguments are not inspected.
    """

    def task_started(self, task: Task) -> None:
        """Print that the task has started."""
        print(f"task {task.name} started\n")

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        """Print that the task has completed."""
        print(f"task {task.name} completed\n")

    def task_instance_started(self, task: Task, host: Host) -> None:
        """Print that the task has started on the given host."""
        print(f"task instance {task.name} on {host.name} started\n")

    def task_instance_completed(self, task: Task, host: Host, result: MultiResult) -> None:
        """Print that the task has completed on the given host."""
        print(f"task instance {task.name} on {host.name} completed\n")

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        """Print that a subtask has started on the given host."""
        print(f"subtask instance {task.name} on {host.name} started\n")

    def subtask_instance_completed(self, task: Task, host: Host, result: MultiResult) -> None:
        """Print that a subtask has completed on the given host."""
        print(f"subtask instance {task.name} on {host.name} completed\n")


class TaskLog(Processor):
    """Processor that prints and then empties a host's buffered log.

    The buffer is looked up under the ``'log'`` key of ``host.data`` and is
    only handled when it is an ``io.BytesIO``; any other value (including a
    missing key) is ignored. The buffer is dumped whenever a task instance or
    a subtask instance completes on a host; the remaining hooks do nothing.
    """

    @staticmethod
    def _flush_host_log(host: Host) -> None:
        """Print the bytes buffered in the host's log, then reset the buffer."""
        buffer = host.data.get('log')
        if not isinstance(buffer, io.BytesIO):
            return
        # The buffered bytes are not guaranteed to be valid UTF-8; replace
        # undecodable bytes instead of raising from inside a processor hook.
        last_log = buffer.getvalue().decode(errors="replace")
        print(f"log of {host.name}\n{last_log}\n")
        # Empty the buffer and rewind it so the next dump only contains
        # output written after this point.
        buffer.truncate(0)
        buffer.seek(0)

    def task_started(self, task: Task) -> None:
        """No-op."""

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        """No-op."""

    def task_instance_started(self, task: Task, host: Host) -> None:
        """No-op."""

    def task_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        """Dump and reset the host's log once the task has completed on it."""
        self._flush_host_log(host)

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        """No-op."""

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        """Dump and reset the host's log once a subtask has completed on it."""
        self._flush_host_log(host)

In [10]:
# Attach a PrintTaskState and a TaskLog processor; the following cells run
# their tasks through nr_pr so both processors produce output.
nr_pr = nr.with_processors([PrintTaskState(), TaskLog()])

In [11]:
from nornir.plugins.tasks import networking

def my_task(task: Task, **kwargs):
    """Print a notice for the host, then run two napalm_cli subtasks on it.

    The subtasks send "show version" and then "show int desc", one command
    per napalm_cli call. Their return values are not used here.

    Args:
        task: Task object passed in by nornir.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` whose ``result`` is always True.
    """
    print(f"run task on {task.host}\n")
    for command in ("show version", "show int desc"):
        task.run(task=networking.napalm_cli, commands=[command])
    return Result(result=True, host=task.host)


# Run my_task through nr_pr, the Nornir object that has the processors
# attached.
nr_pr.run(
    task=my_task
)

task my_task started
[0m
[0mtask instance my_task on sw1 started
task instance my_task on sw2 started
[0m
[0m
[0mrun task on sw1
[0mrun task on sw2
[0m
[0m
[0m[0msubtask instance napalm_cli on sw1 started
subtask instance napalm_cli on sw2 started
[0m
[0m
[0m[0msubtask instance napalm_cli on sw2 completed
subtask instance napalm_cli on sw1 completed
[0m
[0m
[0mlog of sw2

**************************************************************************
* IOSv is strictly limited to use for evaluation, demonstration and IOS  *
* education. IOSv is provided as-is and is not supported by Cisco's      *
* Technical Advisory Center. Any use or disclosure, in whole or in part, *
* of the IOSv Software or Documentation to any third party for any       *
* purposes is expressly prohibited except as otherwise authorized by     *
* Cisco in writing.                                                      *
**************************************************************************
sw02#
sw

[0m
[0m
[0msubtask instance napalm_cli on sw2 started
[0msubtask instance napalm_cli on sw1 started
[0m
[0m
[0m[0msubtask instance napalm_cli on sw1 completed
subtask instance napalm_cli on sw2 completed
[0m
[0m
[0m[0mlog of sw2

sw02#show int desc
Interface                      Status         Protocol Description
Gi0/0                          up             up       dist-sw01@gi1/0
Gi0/1                          down           down     
Gi0/2                          down           down     
Gi0/3                          down           down     
Gi1/0                          down           down     
Gi1/1                          down           down     
Gi1/2                          down           down     
Gi1/3                          down           down     
Gi2/0                          up             up       CLIENT-C
Gi2/1                          down           down     
Gi2/2                          down           down     
Gi2/3                          do

AggregatedResult (my_task): {'sw1': MultiResult: [Result: "my_task", Result: "napalm_cli", Result: "napalm_cli"], 'sw2': MultiResult: [Result: "my_task", Result: "napalm_cli", Result: "napalm_cli"]}

In [12]:
# Clear nornir's record of failed hosts before this run — presumably so that
# hosts which failed in an earlier cell are not skipped; confirm against the
# nornir documentation.
nr_pr.data.reset_failed_hosts()

def my_subtask(task: Task, **kwargs):
    """Raise for the host named "sw1"; return a True result for any other host.

    Args:
        task: Task object passed in by nornir; only ``task.host`` is read.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` whose ``result`` is True.

    Raises:
        Exception: with the message "failed" when the host name is "sw1".
    """
    if task.host.name != "sw1":
        return Result(host=task.host, result=True)
    raise Exception("failed")

def my_task(task: Task, **kwargs):
    """Run my_subtask twice, named "subtask1" then "subtask2".

    Nothing is caught here, so an exception raised by either ``task.run``
    call propagates out of this function and the remaining statements are
    not executed.

    Args:
        task: Task object passed in by nornir.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` whose ``result`` is the ``result`` attribute
        of what ``task.run`` returned for "subtask2".
    """
    task.run(task=my_subtask, name="subtask1")
    second_outcome = task.run(task=my_subtask, name="subtask2")
    return Result(result=second_outcome.result, host=task.host)

# Run my_task through nr_pr; my_subtask raises for host "sw1", so this run
# demonstrates an unhandled subtask failure.
nr_pr.run(
    task=my_task
)


task my_task started
[0m
[0mtask instance my_task on sw1 started
task instance my_task on sw2 started
[0m
[0m
[0msubtask instance subtask1 on sw1 started
[0msubtask instance subtask1 on sw2 started
[0m
[0m
[0m[0msubtask instance subtask1 on sw2 completed
[0m
[0mlog of sw2



Host 'sw1': task 'subtask1' failed with traceback:
Traceback (most recent call last):
  File "/Users/obi/.local/share/virtualenvs/pud-2020-A32_uhK5/lib/python3.8/site-packages/nornir/core/task.py", line 85, in start
    r = self.task(self, **self.params)
  File "<ipython-input-12-70c096714b66>", line 5, in my_subtask
    raise Exception("failed")
Exception: failed

[0m

[0m
subtask instance subtask1 on sw1 completed
[0msubtask instance subtask2 on sw2 started
[0m[0m

[0mlog of sw1

[0msubtask instance subtask2 on sw2 completed
[0m[0m

[0mlog of sw2



Host 'sw1': task 'my_task' failed with traceback:
Traceback (most recent call last):
  File "/Users/obi/.local/share/virtualenvs/pud-2020-A32_uhK5/lib/python3.8/site-packages/nornir/core/task.py", line 85, in start
    r = self.task(self, **self.params)
  File "<ipython-input-12-70c096714b66>", line 9, in my_task
    r = task.run(task=my_subtask, name="subtask1")
  File "/Users/obi/.local/share/virtualenvs/pud-2020-A32_uhK5/lib/python3.8/site-packages/nornir/core/task.py", line 147, in run
    raise NornirSubTaskError(task=task, result=r)
nornir.core.exceptions.NornirSubTaskError: Subtask: <function my_subtask at 0x104c94e50> (failed)




[0m[0m
[0mtask instance my_task on sw2 completed


[0m

[0mtask instance my_task on sw1 completed

[0mlog of sw2

[0m
[0m
[0mlog of sw1

[0m[0m
[0mtask my_task completed
[0m
[0m

AggregatedResult (my_task): {'sw1': MultiResult: [Result: "my_task", Result: "subtask1"], 'sw2': MultiResult: [Result: "my_task", Result: "subtask1", Result: "subtask2"]}

In [13]:
# Clear nornir's record of failed hosts before this run — presumably needed
# because "sw1" failed in the previous cell; confirm against the nornir
# documentation.
nr_pr.data.reset_failed_hosts()

def my_subtask(task: Task, **kwargs):
    """Fail on purpose for host "sw1"; succeed with a True result elsewhere.

    Args:
        task: Task object passed in by nornir; only ``task.host`` is read.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` whose ``result`` is True.

    Raises:
        Exception: with the message "failed" when the host name is "sw1".
    """
    is_failing_host = task.host.name == "sw1"
    if is_failing_host:
        raise Exception("failed")
    return Result(result=True, host=task.host)

def my_task(task: Task, **kwargs):
    """Run "subtask1" tolerating its failure, then run "subtask2".

    A failure of "subtask1" is swallowed and its entry is removed from
    ``task.results``. A failure of "subtask2" is not handled and propagates.

    Args:
        task: Task object passed in by nornir.
        **kwargs: Accepted but not used.

    Returns:
        Result for ``task.host`` whose ``result`` is the ``result`` attribute
        of what ``task.run`` returned for "subtask2".

    Raises:
        NornirSubTaskError: if "subtask2" fails.
    """
    # Function-scope import keeps the cell self-contained.
    from nornir.core.exceptions import NornirSubTaskError

    try:
        task.run(task=my_subtask, name="subtask1")
    except NornirSubTaskError:
        # Only a failed subtask is expected here. Its result is the last
        # entry of task.results; drop it so it does not appear in the
        # host's results. Any other exception is left to propagate rather
        # than popping an unrelated entry.
        task.results.pop()

    r = task.run(task=my_subtask, name="subtask2")
    return Result(host=task.host, result=r.result)

# Run my_task through nr_pr; this version of my_task catches the failure of
# "subtask1" and still attempts "subtask2".
nr_pr.run(
    task=my_task
)

task my_task started
[0m
[0mtask instance my_task on sw1 started
task instance my_task on sw2 started
[0m[0m

[0m[0msubtask instance subtask1 on sw2 started
subtask instance subtask1 on sw1 started
[0m[0m

[0m[0msubtask instance subtask1 on sw2 completed


Host 'sw1': task 'subtask1' failed with traceback:
Traceback (most recent call last):
  File "/Users/obi/.local/share/virtualenvs/pud-2020-A32_uhK5/lib/python3.8/site-packages/nornir/core/task.py", line 85, in start
    r = self.task(self, **self.params)
  File "<ipython-input-13-c2e5aaac46b9>", line 5, in my_subtask
    raise Exception("failed")
Exception: failed



[0m


[0m

[0mlog of sw2

subtask instance subtask1 on sw1 completed
[0m
[0m[0msubtask instance subtask2 on sw2 started

[0m[0mlog of sw1


[0m[0m
subtask instance subtask2 on sw2 completed
[0msubtask instance subtask2 on sw1 started
[0m
[0m
[0mlog of sw2



Host 'sw1': task 'subtask2' failed with traceback:
Traceback (most recent call last):
  File "/Users/obi/.local/share/virtualenvs/pud-2020-A32_uhK5/lib/python3.8/site-packages/nornir/core/task.py", line 85, in start
    r = self.task(self, **self.params)
  File "<ipython-input-13-c2e5aaac46b9>", line 5, in my_subtask
    raise Exception("failed")
Exception: failed



[0m[0m
[0mtask instance my_task on sw2 completed


[0m

[0m
subtask instance subtask2 on sw1 completed
[0mlog of sw2

[0m
[0m[0mlog of sw1


[0m


Host 'sw1': task 'my_task' failed with traceback:
Traceback (most recent call last):
  File "/Users/obi/.local/share/virtualenvs/pud-2020-A32_uhK5/lib/python3.8/site-packages/nornir/core/task.py", line 85, in start
    r = self.task(self, **self.params)
  File "<ipython-input-13-c2e5aaac46b9>", line 14, in my_task
    r = task.run(task=my_subtask, name="subtask2")
  File "/Users/obi/.local/share/virtualenvs/pud-2020-A32_uhK5/lib/python3.8/site-packages/nornir/core/task.py", line 147, in run
    raise NornirSubTaskError(task=task, result=r)
nornir.core.exceptions.NornirSubTaskError: Subtask: <function my_subtask at 0x107919dc0> (failed)


[0m

[0m[0mtask instance my_task on sw1 completed
[0m
[0mlog of sw1

[0m
[0mtask my_task completed
[0m
[0m

AggregatedResult (my_task): {'sw1': MultiResult: [Result: "my_task", Result: "subtask2"], 'sw2': MultiResult: [Result: "my_task", Result: "subtask1", Result: "subtask2"]}