# nfstream: a flexible network data analysis framework

[**nfstream**][repo] is a Python package providing fast, flexible, and expressive data structures designed to make working with **online** or **offline** network data both easy and intuitive. It aims to be the fundamental high-level building block for doing practical, **real-world** network data analysis in Python. Additionally, it has the broader goal of becoming **a common network data processing framework for researchers**, providing data reproducibility across experiments.

* **Performance:** **nfstream** is designed to be fast (10x faster with pypy3 support) with a small CPU and memory footprint.
* **Layer-7 visibility:** the **nfstream** deep packet inspection engine is based on [**nDPI**][ndpi]. It allows nfstream to perform [**reliable**][reliable] identification of encrypted applications and metadata extraction (e.g. TLS, QUIC, TOR, HTTP, SSH, DNS).
* **Flexibility:** add a flow feature in 2 lines as an [**NFPlugin**][nfplugin].
* **Machine Learning oriented:** add your trained model as an [**NFPlugin**][nfplugin].

In this notebook, we demonstrate a subset of features provided by [**nfstream**][repo].

[documentation]: https://nfstream.readthedocs.io/en/latest/index.html
[ndpi]: https://github.com/ntop/nDPI
[nfplugin]: https://nfstream.readthedocs.io/en/latest/plugins.html
[reliable]: http://people.ac.upc.edu/pbarlet/papers/ground-truth.pam2014.pdf
[repo]: https://github.com/aouinizied/nfstream

In [1]:
from nfstream import NFStreamer, NFPlugin
import pandas as pd
pd.set_option('display.max_columns', 500)

## Flow aggregation made simple

In the following, we use the main object provided by nfstream, `NFStreamer`, which has the following parameters:

* `source` [default= `None` ]: Source of packets. Possible values: `live_interface_name` or `pcap_file_path`.
* `snaplen` [default= `65535` ]: Packet capture length.
* `idle_timeout` [default= `30` ]: Flows that are inactive for more than this value in seconds will be exported.
* `active_timeout` [default= `300` ]: Flows that are active for more than this value in seconds will be exported.
* `plugins` [default= `()` ]: Set of user-defined NFPlugins.
* `dissect` [default= `True` ]: Enable the nDPI deep packet inspection library for Layer 7 visibility.
* `max_tcp_dissections` [default= `10` ]: Maximum number of TCP packets to dissect per flow (ignored when dissect=False).
* `max_udp_dissections` [default= `16` ]: Maximum number of UDP packets to dissect per flow (ignored when dissect=False).
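
To make the two timeouts concrete, the sketch below models when a flow would be exported. It is a simplified illustration in plain Python and not nfstream's implementation: the function name `expiration_reason` and the returned labels are made up for this example, and all timestamps are assumed to be in seconds.

```python
from typing import Optional

IDLE_TIMEOUT = 30      # seconds, same value as the `idle_timeout` default
ACTIVE_TIMEOUT = 300   # seconds, same value as the `active_timeout` default


def expiration_reason(first_seen: float, last_seen: float, now: float) -> Optional[str]:
    """Return why a flow would be exported at time `now`, or None if it stays open.

    Hypothetical helper for illustration only.
    """
    if now - last_seen > IDLE_TIMEOUT:
        return "idle"      # no packet seen for longer than the idle timeout
    if now - first_seen > ACTIVE_TIMEOUT:
        return "active"    # flow has been running longer than the active timeout
    return None


# A flow that started at t=0 and saw its last packet at t=10:
print(expiration_reason(0, 10, now=20))    # None: still open
print(expiration_reason(0, 10, now=45))    # idle: silent for 35 s
# A long-lived flow that is still receiving packets:
print(expiration_reason(0, 299, now=301))  # active: running for 301 s
```

Under this model, a short DNS exchange is exported by the idle timeout, while a long download is cut into several flow records by the active timeout.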

`NFStreamer` returns a flow iterator. We can iterate over the flows or convert them directly to a pandas DataFrame using the `to_pandas()` method.

In [2]:
df = NFStreamer(source="pcaps/instagram.pcap").to_pandas()

In [3]:
df.head()

Unnamed: 0,id,first_seen,last_seen,version,src_port,dst_port,protocol,vlan_id,src_ip,dst_ip,total_packets,total_bytes,duration,src2dst_packets,src2dst_bytes,dst2src_packets,dst2src_bytes,expiration_id,master_protocol,app_protocol,application_name,category_name,client_info,server_info,j3a_client,j3a_server
0,27,1436720950909,1436720952614,4,80,58216,6,4,31.13.86.52,192.168.0.103,150,153558,1705,103,150456,47,3102,0,7,119,HTTP.Facebook,SocialNetwork,,,,
1,19,1436720908533,1436720908579,4,26540,53,17,4,192.168.0.103,8.8.8.8,2,298,46,1,89,1,209,0,5,211,DNS.Instagram,SocialNetwork,,igcdn-photos-g-a.akamaihd.net,,
2,6,1436720901182,1436720908544,4,33976,80,6,4,192.168.0.103,77.67.29.17,34,29039,7362,14,924,20,28115,0,0,7,HTTP,Web,,,,
3,23,1436720908581,1436720908769,4,60908,443,6,4,192.168.0.103,46.33.70.136,19,9340,188,10,1369,9,7971,0,91,211,TLS.Instagram,SocialNetwork,igcdn-photos-g-a.akamaihd.net,"a248.e.akamai.net,*.akamaihd.net,*.akamaihd-st...",54ae5fcb0159e2ddf6a50e149221c7c7,34d6f0ad0a79e4cfdf145e640cc93f78
4,29,1436720952561,1436720952561,4,58690,443,6,4,192.168.0.103,46.33.70.159,2,169,0,2,169,0,0,0,0,91,TLS,Web,,,,


Now that we have our DataFrame, we can analyze it like any other tabular data. For example, we can compute additional features:

* Compute the data ratio in both directions (src2dst and dst2src):

In [4]:
df["src2dst_bytes_data_ratio"] = df['src2dst_bytes'] / df['total_bytes']
df["dst2src_bytes_data_ratio"] = df['dst2src_bytes'] / df['total_bytes']

In [5]:
df.head()

Unnamed: 0,id,first_seen,last_seen,version,src_port,dst_port,protocol,vlan_id,src_ip,dst_ip,total_packets,total_bytes,duration,src2dst_packets,src2dst_bytes,dst2src_packets,dst2src_bytes,expiration_id,master_protocol,app_protocol,application_name,category_name,client_info,server_info,j3a_client,j3a_server,src2dst_bytes_data_ratio,dst2src_bytes_data_ratio
0,27,1436720950909,1436720952614,4,80,58216,6,4,31.13.86.52,192.168.0.103,150,153558,1705,103,150456,47,3102,0,7,119,HTTP.Facebook,SocialNetwork,,,,,0.979799,0.020201
1,19,1436720908533,1436720908579,4,26540,53,17,4,192.168.0.103,8.8.8.8,2,298,46,1,89,1,209,0,5,211,DNS.Instagram,SocialNetwork,,igcdn-photos-g-a.akamaihd.net,,,0.298658,0.701342
2,6,1436720901182,1436720908544,4,33976,80,6,4,192.168.0.103,77.67.29.17,34,29039,7362,14,924,20,28115,0,0,7,HTTP,Web,,,,,0.031819,0.968181
3,23,1436720908581,1436720908769,4,60908,443,6,4,192.168.0.103,46.33.70.136,19,9340,188,10,1369,9,7971,0,91,211,TLS.Instagram,SocialNetwork,igcdn-photos-g-a.akamaihd.net,"a248.e.akamai.net,*.akamaihd.net,*.akamaihd-st...",54ae5fcb0159e2ddf6a50e149221c7c7,34d6f0ad0a79e4cfdf145e640cc93f78,0.146574,0.853426
4,29,1436720952561,1436720952561,4,58690,443,6,4,192.168.0.103,46.33.70.159,2,169,0,2,169,0,0,0,0,91,TLS,Web,,,,,1.0,0.0


* Filter data according to some criteria:

In [6]:
df[df["dst_port"] == 443].head()

Unnamed: 0,id,first_seen,last_seen,version,src_port,dst_port,protocol,vlan_id,src_ip,dst_ip,total_packets,total_bytes,duration,src2dst_packets,src2dst_bytes,dst2src_packets,dst2src_bytes,expiration_id,master_protocol,app_protocol,application_name,category_name,client_info,server_info,j3a_client,j3a_server,src2dst_bytes_data_ratio,dst2src_bytes_data_ratio
3,23,1436720908581,1436720908769,4,60908,443,6,4,192.168.0.103,46.33.70.136,19,9340,188,10,1369,9,7971,0,91,211,TLS.Instagram,SocialNetwork,igcdn-photos-g-a.akamaihd.net,"a248.e.akamai.net,*.akamaihd.net,*.akamaihd-st...",54ae5fcb0159e2ddf6a50e149221c7c7,34d6f0ad0a79e4cfdf145e640cc93f78,0.146574,0.853426
4,29,1436720952561,1436720952561,4,58690,443,6,4,192.168.0.103,46.33.70.159,2,169,0,2,169,0,0,0,0,91,TLS,Web,,,,,1.0,0.0
11,32,1568796253770,1568796268061,4,49355,443,6,4,192.168.2.17,31.13.86.52,1366,1310382,14291,456,33086,910,1277296,0,91,211,TLS.Instagram,SocialNetwork,scontent-mxp1-1.cdninstagram.com,,7a29c223fb122ec64d10f0a159e07996,f4febc55ea12b31ae17cfb7e614afda8,0.025249,0.974751
13,33,1568796254514,1568796268054,4,49357,443,6,4,192.168.2.17,31.13.86.52,144,107306,13540,63,6340,81,100966,0,91,211,TLS.Instagram,SocialNetwork,scontent-mxp1-1.cdninstagram.com,,44dab16d680ef93487bc16ad23b3ffb1,,0.059083,0.940917
14,34,1568796254515,1568796268054,4,49358,443,6,4,192.168.2.17,31.13.86.52,388,309238,13539,165,14193,223,295045,0,91,211,TLS.Instagram,SocialNetwork,scontent-mxp1-1.cdninstagram.com,,44dab16d680ef93487bc16ad23b3ffb1,,0.045897,0.954103


## Extend nfstream

In some use cases, we need features that are computed at the packet level. nfstream handles such scenarios with [**NFPlugin**][nfplugin].

* Let's suppose that we want a per-flow counter of TCP ACK flags.

In [7]:
class ack_count(NFPlugin):
    def on_init(self, pkt): # flow creation with the first packet
        if pkt.tcpflags.ack == 1:
            return 1
        else:
            return 0
    def on_update(self, pkt, flow): # flow update with each packet belonging to the flow
        if pkt.tcpflags.ack == 1:
            flow.ack_count += 1

In [8]:
df = NFStreamer(source="pcaps/instagram.pcap", plugins=[ack_count()]).to_pandas()

In [9]:
df.head()

Unnamed: 0,id,first_seen,last_seen,version,src_port,dst_port,protocol,vlan_id,src_ip,dst_ip,total_packets,total_bytes,duration,src2dst_packets,src2dst_bytes,dst2src_packets,dst2src_bytes,expiration_id,master_protocol,app_protocol,application_name,category_name,client_info,server_info,j3a_client,j3a_server,ack_count
0,27,1436720950909,1436720952614,4,80,58216,6,4,31.13.86.52,192.168.0.103,150,153558,1705,103,150456,47,3102,0,7,119,HTTP.Facebook,SocialNetwork,,,,,150
1,19,1436720908533,1436720908579,4,26540,53,17,4,192.168.0.103,8.8.8.8,2,298,46,1,89,1,209,0,5,211,DNS.Instagram,SocialNetwork,,igcdn-photos-g-a.akamaihd.net,,,0
2,6,1436720901182,1436720908544,4,33976,80,6,4,192.168.0.103,77.67.29.17,34,29039,7362,14,924,20,28115,0,0,7,HTTP,Web,,,,,34
3,23,1436720908581,1436720908769,4,60908,443,6,4,192.168.0.103,46.33.70.136,19,9340,188,10,1369,9,7971,0,91,211,TLS.Instagram,SocialNetwork,igcdn-photos-g-a.akamaihd.net,"a248.e.akamai.net,*.akamaihd.net,*.akamaihd-st...",54ae5fcb0159e2ddf6a50e149221c7c7,34d6f0ad0a79e4cfdf145e640cc93f78,18
4,29,1436720952561,1436720952561,4,58690,443,6,4,192.168.0.103,46.33.70.159,2,169,0,2,169,0,0,0,0,91,TLS,Web,,,,,2


Our DataFrame has a new column named `ack_count`.
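
How the two callbacks cooperate can be pictured with a small stand-alone model. The sketch below is not nfstream's code: `AckCount`, `run_plugin` and the packet objects are hypothetical stand-ins written for this example. It assumes, as the plugin above and its output suggest, that the value returned by `on_init` is stored on the flow under the plugin's name, and that `on_update` is called for every later packet of the same flow. The packet is simplified to a single boolean `ack` attribute instead of `pkt.tcpflags.ack`.

```python
from types import SimpleNamespace


class AckCount:
    """Same counting logic as the `ack_count` plugin, without the nfstream base class."""
    name = "ack_count"

    def on_init(self, pkt):
        # First packet of the flow: the return value becomes the initial feature value.
        return 1 if pkt.ack else 0

    def on_update(self, pkt, flow):
        # Every following packet: update the feature stored on the flow.
        if pkt.ack:
            flow.ack_count += 1


def run_plugin(plugin, packets):
    """Hypothetical driver: feed the packets of ONE flow through a plugin."""
    flow = SimpleNamespace()
    for index, pkt in enumerate(packets):
        if index == 0:
            setattr(flow, plugin.name, plugin.on_init(pkt))
        else:
            plugin.on_update(pkt, flow)
    return flow


# A TCP handshake: SYN (no ACK flag), then SYN/ACK and ACK.
handshake = [
    SimpleNamespace(ack=False),
    SimpleNamespace(ack=True),
    SimpleNamespace(ack=True),
]
flow = run_plugin(AckCount(), handshake)
print(flow.ack_count)  # 2
```

This also matches the TLS flow with id 23 in the output above: 19 packets but an `ack_count` of 18, which is what one would expect if the first packet is a SYN without the ACK flag.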

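In some cases, we need volatile features: intermediate values that other plugins rely on but that should not appear in the exported flow.
Consider the following use case:

* We want to compute the maximum packet inter-arrival time (IAT) per flow.
* This feature is based on a per-packet `iat` value that we do not want to export as a feature.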

In [10]:
class iat(NFPlugin):
    def on_init(self, pkt):
        return [-1, pkt.time] # [iat value, last packet timestamp]
    def on_update(self, pkt, flow):
        flow.iat = [pkt.time - flow.iat[1], pkt.time]

class maximum_iat_ms(NFPlugin):
    def on_init(self, pkt):
        return -1 # we will set it as -1 as init value
    def on_update(self, pkt, flow):
        if flow.iat[0] > flow.maximum_iat_ms:
            flow.maximum_iat_ms = flow.iat[0]

In [11]:
df = NFStreamer(source="pcaps/instagram.pcap", plugins=[iat(volatile=True), maximum_iat_ms()]).to_pandas()

In [12]:
df.head()

Unnamed: 0,id,first_seen,last_seen,version,src_port,dst_port,protocol,vlan_id,src_ip,dst_ip,total_packets,total_bytes,duration,src2dst_packets,src2dst_bytes,dst2src_packets,dst2src_bytes,expiration_id,master_protocol,app_protocol,application_name,category_name,client_info,server_info,j3a_client,j3a_server,maximum_iat_ms
0,27,1436720950909,1436720952614,4,80,58216,6,4,31.13.86.52,192.168.0.103,150,153558,1705,103,150456,47,3102,0,7,119,HTTP.Facebook,SocialNetwork,,,,,1247
1,19,1436720908533,1436720908579,4,26540,53,17,4,192.168.0.103,8.8.8.8,2,298,46,1,89,1,209,0,5,211,DNS.Instagram,SocialNetwork,,igcdn-photos-g-a.akamaihd.net,,,46
2,6,1436720901182,1436720908544,4,33976,80,6,4,192.168.0.103,77.67.29.17,34,29039,7362,14,924,20,28115,0,0,7,HTTP,Web,,,,,7322
3,23,1436720908581,1436720908769,4,60908,443,6,4,192.168.0.103,46.33.70.136,19,9340,188,10,1369,9,7971,0,91,211,TLS.Instagram,SocialNetwork,igcdn-photos-g-a.akamaihd.net,"a248.e.akamai.net,*.akamaihd.net,*.akamaihd-st...",54ae5fcb0159e2ddf6a50e149221c7c7,34d6f0ad0a79e4cfdf145e640cc93f78,56
4,29,1436720952561,1436720952561,4,58690,443,6,4,192.168.0.103,46.33.70.159,2,169,0,2,169,0,0,0,0,91,TLS,Web,,,,,0


Our DataFrame has a new column named `maximum_iat_ms` containing the maximum observed packet inter-arrival time per flow, set to -1 when the flow contains only one packet. The volatile `iat` feature is not exported.
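
The chain of a helper plugin feeding a published one can be mimicked in plain Python. The function below is a hypothetical re-implementation for illustration (the name `max_iat` is ours and nfstream is not involved). It keeps the same `[iat value, last packet timestamp]` pair as the `iat` plugin in a local variable, which plays the role of the volatile feature, and returns only the maximum. Timestamps are assumed to be in milliseconds.

```python
def max_iat(timestamps):
    """Maximum inter-arrival time of one flow, or -1 with a single packet.

    `timestamps` holds the packet times of one flow in arrival order
    and must contain at least one entry.
    """
    iat = [-1, timestamps[0]]   # helper state: [iat value, last packet timestamp]
    maximum = -1                # published feature, stays -1 until a second packet arrives
    for t in timestamps[1:]:
        iat = [t - iat[1], t]   # same update as iat.on_update
        if iat[0] > maximum:    # same test as maximum_iat_ms.on_update
            maximum = iat[0]
    return maximum


print(max_iat([1000]))                    # -1
print(max_iat([1000, 1046]))              # 46
print(max_iat([1000, 1010, 1500, 1520]))  # 490
```

Note that the order matters in this model: the helper value is refreshed before the maximum is compared against it, which corresponds to listing `iat` before `maximum_iat_ms` in `plugins`.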