First, install the ntopng Python package into the current Jupyter kernel.

In [None]:
import sys
# Run pip through the interpreter at sys.executable so the package is
# installed for the Python that is executing this notebook.
!{sys.executable} -m pip install ntopng

Then we need to import libraries and set some defaults.

In [None]:
import os
import sys
import getopt
import time

from ntopng.ntopng import Ntopng
from ntopng.interface import Interface
from ntopng.host import Host
from ntopng.historical import Historical
from ntopng.flow import Flow

# Defaults

# Connection settings handed to Ntopng() further down.
ntopng_url = "http://localhost:3000"
username = "admin"
password = "admin"
auth_token = None  # presumably an alternative to username/password -- confirm

# Interface queried by the helper functions and debug switch for the client.
iface_id = 0
enable_debug = False

# Query window: the 3600 seconds (one hour) ending now, as integer epochs.
epoch_end = int(time.time())
epoch_begin = epoch_end - 60 * 60

# Value forwarded as `maxhits` to every get_flows() call.
maxhits = 10

Then we define a few functions for playing with data stored in the ntopng ClickHouse database.

In [None]:
def format_rsp(rsp):
    """Print each element of the iterable ``rsp`` on its own line."""
    for line in map(str, rsp):
        print(line)

def top_x_remote_ipv4_hosts(my_historical, epoch_begin, epoch_end, maxhits):
    """Print destination IPv4 addresses ranked by summed TOTAL_BYTES.

    Queries ``my_historical.get_flows`` on the module-level ``iface_id`` for
    the ``epoch_begin``..``epoch_end`` window and prints every returned row
    with ``format_rsp``.  ``maxhits`` is forwarded to ``get_flows``
    (presumably the row limit -- confirm against the ntopng API).
    """
    # NOTE(review): SERVER_LOCATION=1 presumably selects remote servers,
    # going by the function name -- confirm against the ntopng schema.
    flows = my_historical.get_flows(
        iface_id,
        epoch_begin,
        epoch_end,
        "IPV4_DST_ADDR,SUM(TOTAL_BYTES) TOT",  # select
        "(SERVER_LOCATION=1)",                 # where
        maxhits,
        "IPV4_DST_ADDR_FORMATTED",             # group by
        "TOT DESC",                            # order by
    )
    format_rsp(flows)

def top_x_remote_ipv4_hosts_ports(my_historical, epoch_begin, epoch_end, maxhits):
    """Print destination IPv4 address / port pairs ranked by summed TOTAL_BYTES.

    Queries ``my_historical.get_flows`` on the module-level ``iface_id`` for
    the ``epoch_begin``..``epoch_end`` window and prints every returned row
    with ``format_rsp``.  ``maxhits`` is forwarded to ``get_flows``
    (presumably the row limit -- confirm against the ntopng API).
    """
    # NOTE(review): SERVER_LOCATION=1 presumably selects remote servers,
    # going by the function name -- confirm against the ntopng schema.
    flows = my_historical.get_flows(
        iface_id,
        epoch_begin,
        epoch_end,
        "IPV4_DST_ADDR,SUM(TOTAL_BYTES) TOT,IP_DST_PORT",  # select
        "(SERVER_LOCATION=1)",                             # where
        maxhits,
        "IPV4_DST_ADDR_FORMATTED,IP_DST_PORT",             # group by
        "TOT DESC",                                        # order by
    )
    format_rsp(flows)

def top_x_remote_ports(my_historical, epoch_begin, epoch_end, maxhits):
    """Print destination ports ranked by summed TOTAL_BYTES.

    Queries ``my_historical.get_flows`` on the module-level ``iface_id`` for
    the ``epoch_begin``..``epoch_end`` window and prints every returned row
    with ``format_rsp``.  ``maxhits`` is forwarded to ``get_flows``
    (presumably the row limit -- confirm against the ntopng API).
    """
    # NOTE(review): SERVER_LOCATION=1 presumably selects remote servers,
    # going by the function name -- confirm against the ntopng schema.
    flows = my_historical.get_flows(
        iface_id,
        epoch_begin,
        epoch_end,
        "SUM(TOTAL_BYTES) TOT,IP_DST_PORT",  # select
        "(SERVER_LOCATION=1)",               # where
        maxhits,
        "IP_DST_PORT",                       # group by
        "TOT DESC",                          # order by
    )
    format_rsp(flows)

Finally, we define our main program, which connects to the ntopng instance we have defined.


In [None]:
try:
    # Build the ntopng client from the default settings defined earlier.
    my_ntopng = Ntopng(username, password, auth_token, ntopng_url)

    if enable_debug:
        my_ntopng.enable_debug()
except ValueError as e:
    print(e)
    # Use sys.exit() rather than os._exit(): os._exit() terminates the
    # process immediately without flushing stdio buffers or running cleanup
    # handlers, so the message printed above could be lost and a Jupyter
    # kernel would be killed outright.
    sys.exit(-1)


Then we execute the functions defined above.

In [None]:
try:
    # Historical wraps the connected client and exposes get_flows().
    my_historical = Historical(my_ntopng)

    # Run the three reports over the same time window and hit limit.
    print("\n\n==========================\nTop X Remote Hosts Traffic")
    top_x_remote_ipv4_hosts(my_historical, epoch_begin, epoch_end, maxhits)
    print("\n\n==========================\nTop X Remote Host/Ports Traffic")
    top_x_remote_ipv4_hosts_ports(my_historical, epoch_begin, epoch_end, maxhits)
    print("\n\n==========================\nTop X Remote Ports Traffic")
    top_x_remote_ports(my_historical, epoch_begin, epoch_end, maxhits)

except ValueError as e:
    print(e)
    # Use sys.exit() rather than os._exit(): os._exit() terminates the
    # process immediately without flushing stdio buffers or running cleanup
    # handlers, so the output printed above could be lost and a Jupyter
    # kernel would be killed outright.
    sys.exit(-1)