# Analyzing Netflow Data with xGT

This sample script loads raw NetFlow data in an xGT graph structure and queries for a graph pattern.

The dataset used is from the CTU-13 open source project:
https://mcfp.weebly.com/the-ctu-13-dataset-a-labeled-dataset-with-botnet-normal-and-background-traffic.html

Raw data example:

```
StartTime   SrcAddr       DstAddr       State  sTos  dTos  TotPkts  TotBytes
2011/08/16  147.32.86.58  77.75.73.9    SR_A   0.0   0.0   3        182
2011/08/16  147.32.3.51   147.32.84.46  S_RA   0.0   0.0   4        124
```

This notebook follows this sequence of steps:

1. Read the input netflow file
2. Enrich the IP nodes with geo-location
3. Create graph schema
4. Upload the data to the Trovares xGT server
5. Run a query

In [1]:
import numpy as np
import pandas as pd
import sys
import csv
import re
import os
import xgt

# For cloud instances, replace the localhost with the instance's IP address or use ssh tunneling
server = xgt.Connection(host='localhost', userid='xgtd')
server.set_default_namespace('ctu13')
xgt.__version__

'1.11.1'

## 1. Read the input netflow file

- Read the input netflow file from the file system into the pandas Dataframe.
- Do data transformations to align with Trovares xGT

In [2]:
%%time

def cleanup_data(x):
  if x == '':
    return pd.NA
  elif isinstance(x, str):
    return int(x, 16)
  return x

# Ingest data, translating datetime format to ISO standard.
input_filename = "https://mcfp.felk.cvut.cz/publicDatasets/CTU-Malware-Capture-Botnet-46/detailed-bidirectional-flow-labels/capture20110815-2.binetflow"
from datetime import datetime
ctu_date_parser = lambda x: datetime.strptime(x, '%Y/%m/%d %H:%M:%S.%f').strftime("%Y-%m-%dT%H:%M:%S.%f")
df = pd.read_csv(input_filename, parse_dates=['StartTime'], date_parser=ctu_date_parser, converters={"Sport": cleanup_data, "Dport": cleanup_data})

CPU times: user 2.14 s, sys: 290 ms, total: 2.43 s
Wall time: 5.94 s


## 2. Enrich the IP nodes with geo-location

- The enrichment uses the `maxminddb-geolite2` python module
- Installation is done with: `pip3 install maxminddb-geolite2`
- Create a data structure for IP nodes ready for Trovares xGT

In [3]:
%%time
# Collect all the IP address from source IP and target IP columns.
unique_ip_addrs = []
unique_ip_addrs.extend(df['SrcAddr'])
unique_ip_addrs.extend(df['DstAddr'])

# Remove duplicates.
unique_ip_addrs = list(set(unique_ip_addrs))

print("Number of unique IPs: " + str(len(unique_ip_addrs)))

Number of unique IPs: 41658
CPU times: user 21 ms, sys: 491 µs, total: 21.4 ms
Wall time: 22.9 ms


In [4]:
# Optional:  make sure you have the maminddb-geolite2 installed
# !pip3 install --upgrade maxminddb-geolite2

In [5]:
%%time
from geolite2 import geolite2

reader = geolite2.reader()

ip_nodes = []
for ip_addr in unique_ip_addrs:
    try:
        d = reader.get(ip_addr)
        if (isinstance(d, dict)):
            l = d.get('location')
            if (isinstance(l, dict)):
                ip_nodes.append([ip_addr, l.get('latitude'), l.get('longitude'),
                                 d['city']['names']['en'],
                                 d['country']['names']['en']])
    except:
        pass
geolite2.close()

print(f"Number of geo-located IP addresses: {len(ip_nodes):,}")

Number of geo-located IP addresses: 35,190
CPU times: user 454 ms, sys: 162 ms, total: 616 ms
Wall time: 886 ms


## 3. Create graph schema

In [6]:
# Create a vertex frame on the xGT server.
server.drop_frame('Netflow')
server.drop_frame('IP')

ip = server.create_vertex_frame(
    name = 'IP',
    schema = [['IPAddr', xgt.TEXT], ['Lat', xgt.FLOAT], ['Long', xgt.FLOAT],
              ['City', xgt.TEXT], ['Country', xgt.TEXT]],
    key = 'IPAddr',
)

In [7]:
# Create a netflow edge frame on the xGT server.
server.drop_frame('Netflow')

netflow = server.create_edge_frame(
    name = 'Netflow',
    schema = [
        ['StartTime', xgt.DATETIME], ['Dur', xgt.FLOAT], ['Proto', xgt.TEXT], ['SrcAddr', xgt.TEXT],
        ['Sport', xgt.INT], ['Dir', xgt.TEXT], ['DstAddr', xgt.TEXT], ['Dport', xgt.INT],
        ['State', xgt.TEXT], ['sTos', xgt.FLOAT], ['dTos', xgt.FLOAT],['TotPkts', xgt.INT],
        ['TotBytes', xgt.INT], ['SrcBytes', xgt.INT], ['Label', xgt.TEXT],
    ],
    source = ip,
    target = ip,
    source_key = 'SrcAddr',
    target_key = 'DstAddr', 
)

## 4. Upload the data to the Trovares xGT server


In [8]:
%%time
# First, pre-populate IP nodes with those IP addrs for which a geo-location is known
ip.insert(ip_nodes)
print(f"IP count: {ip.num_rows:,}")

IP count: 35,190
CPU times: user 70.1 ms, sys: 15.2 ms, total: 85.2 ms
Wall time: 114 ms


In [9]:
%%time
# Note that any IP address in the Netflow records that did not have any geo-location
# information will be automatically created in the xGT server with NULL values for 
# the non-key attributes.
netflow.insert(df)
print(f"IP count: {ip.num_rows:,}")
print(f"Netflow record (edges) count: {netflow.num_rows:,}")

IP count: 41,658
Netflow record (edges) count: 129,832
CPU times: user 83.1 ms, sys: 13.3 ms, total: 96.4 ms
Wall time: 214 ms


In [10]:
# Show memory footprint
max_memory = server.max_user_memory_size
print(f"Memory footprint: {max_memory - server.free_user_memory_size:,.3f} GiB used out of {max_memory:,.3f} GiB available.")

Memory footprint: 0.056 GiB used out of 16.000 GiB available.


## 5. Run a query

Run a `MATCH` query looking for a three-path (three edges connecting 4 nodes) that satisfy a bunch of constraints:

- The first three IP addresses are in different cities.
- The path starts and ends within the same country.
- The three edges are ordered by time.
- The durations are increasing throughout the path; the second edge has a much larger duration than the first.
- The three edges have these *OSI transport layer* protocols:  (tcp, udp, icmp)


In [11]:
%%time
job = server.run_job("""
    MATCH (a)-[e1]->(b)-[e2]->(c)-[e3]->(d)
    WHERE a.City <> b.City
      AND b.City <> c.City
      AND a.City <> c.City
      AND a.Country = d.Country
      AND e1.StartTime <= e2.StartTime
      AND e2.StartTime <= e3.StartTime
      AND e1.Dur < (e2.Dur / 10)  // e2 duration at least 10 times longer than e1
      AND e2.Dur < e3.Dur
      AND e1.Proto = 'tcp'
      AND e2.Proto = 'udp'
      AND e3.Proto = 'icmp'
    RETURN
      a.IPAddr AS A, a.City AS city1, e1.StartTime AS timestamp1, e1.Dur AS dur1,
      b.IPAddr AS B, b.City AS city2, e2.StartTime AS timestamp2, e2.Dur AS dur2,
      c.IPAddr AS C, c.City AS city3, e3.StartTime AS timestamp3, e3.Proto AS Proto3
""")

result_set = job.get_data_pandas()
print("Number of results: " + str(job.total_rows))
print(f"Total number of visited edges: {job.total_visited_edges:,}")

Number of results: 36
Total number of visited edges: 15,418,347
CPU times: user 11.1 ms, sys: 15.7 ms, total: 26.7 ms
Wall time: 209 ms


In [12]:
# See the actual answers
result_set

Unnamed: 0,A,city1,timestamp1,dur1,B,city2,timestamp2,dur2,C,city3,timestamp3,Proto3
0,93.181.102.24,Vysoke Myto,2011-08-15T16:43:55.804849,0.920507,147.32.84.118,Prague,2011-08-15T16:55:53.141447,22.003029,27.121.224.242,Ichinomiya,2011-08-15T17:10:56.946337,icmp
1,93.181.102.24,Vysoke Myto,2011-08-15T16:43:55.804849,0.920507,147.32.84.118,Prague,2011-08-15T16:55:53.141452,26.355053,122.57.10.203,Auckland,2011-08-15T16:55:56.496651,icmp
2,93.181.102.24,Vysoke Myto,2011-08-15T16:46:56.994198,1.008148,147.32.84.118,Prague,2011-08-15T16:55:53.141447,22.003029,27.121.224.242,Ichinomiya,2011-08-15T17:10:56.946337,icmp
3,93.181.102.24,Vysoke Myto,2011-08-15T16:46:56.994198,1.008148,147.32.84.118,Prague,2011-08-15T16:55:53.141452,26.355053,122.57.10.203,Auckland,2011-08-15T16:55:56.496651,icmp
4,78.45.122.46,Ostrava,2011-08-15T17:05:50.439640,0.801646,147.32.84.229,Prague,2011-08-15T17:11:02.638514,42.300133,85.101.162.126,Istanbul,2011-08-15T17:11:05.723377,icmp
5,78.45.122.46,Ostrava,2011-08-15T17:03:50.532438,0.913161,147.32.84.229,Prague,2011-08-15T17:11:02.638514,42.300133,85.101.162.126,Istanbul,2011-08-15T17:11:05.723377,icmp
6,78.45.122.46,Ostrava,2011-08-15T17:01:51.601500,1.196878,147.32.84.229,Prague,2011-08-15T17:11:02.638514,42.300133,85.101.162.126,Istanbul,2011-08-15T17:11:05.723377,icmp
7,78.45.122.46,Ostrava,2011-08-15T17:04:50.491271,0.986924,147.32.84.229,Prague,2011-08-15T17:11:02.638514,42.300133,85.101.162.126,Istanbul,2011-08-15T17:11:05.723377,icmp
8,78.45.122.46,Ostrava,2011-08-15T16:59:50.424783,0.643375,147.32.84.229,Prague,2011-08-15T17:11:02.638514,42.300133,85.101.162.126,Istanbul,2011-08-15T17:11:05.723377,icmp
9,213.195.224.3,Mělník,2011-08-15T16:43:22.342690,4.054811,147.32.84.229,Prague,2011-08-15T16:47:47.406019,1248.676147,90.199.159.220,Northampton,2011-08-15T16:47:50.468191,icmp


Copyright 2021-2022 Trovares Inc