In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [1]:
netwrok_trace_type_mapping = {
    'timestamp': (1, 'Int64'),
    'size': (2, 'Int32'),
    'src_ip': (3, 'Int16'),
    'dest_ip': (4, 'Int16'),
    'src_port': (5, 'Int32'),
    'dest_port': (6, 'Int32'),
}
REGRESSION = 0
CLASSIFICATION = 1
B = 1
MB = 1024 * 1024
KB = 1024
GB = 1024 * 1024 * 1024
NANO_TO_MICRO = 1000
PACKETS = "packets"
SENDS = "sends"
ALLOCS = "cpu_allocations"
DISK_READS = "disk_read"
DISK_WRITES = "disk_write"
VIRTUAL_MEMORY = "memory"
RSS_MEMORY = "rss_memory"
DATA_MEMORY = "data_memory"
S_TIME = "s_time"
U_TIME = "u_time"
START_TIME = 'start_time'
END_TIME = 'end_time'
SIZE = 'size'
GAP = 'gap'
NETWORK_IN = 'networkin'
NETWORK_OUT = 'networkout'
NODE_1 = 237
NODE_2 = 229
NODE_3 = 212
NODE_4 = 144
MASTER = NODE_1
TARGET_IP = NODE_4
SGD_PREFIX = "../../data/4/node-1"
NSDI_FEATURES = [SENDS, DISK_READS, DISK_WRITES, VIRTUAL_MEMORY, S_TIME, START_TIME, END_TIME, GAP, NETWORK_IN, SIZE, NETWORK_OUT]
ALL_FEATURES = NSDI_FEATURES + [ALLOCS, RSS_MEMORY, DATA_MEMORY, U_TIME]


In [4]:
def force_to_type(df, names, dtype):
    df[names] = df[names].apply(pandas.to_numeric, errors='coerce')
    df = df.astype(dtype)
    df = df.dropna()
    return df

In [5]:
source_ip, destination_ip = MASTER, TARGET_IP
names = sorted(list(netwrok_trace_type_mapping.keys()), key=lambda x: netwrok_trace_type_mapping[x][0])
df = pd.read_csv(f"{SGD_PREFIX}/packets", header=None, index_col=False, names=names, sep=',', on_bad_lines='skip')
df = df[
    df['src_ip'].isin([source_ip, destination_ip]) &
    df['dest_ip'].isin([source_ip, destination_ip])
]
df = df.sort_values(by='timestamp')

In [6]:
def get_flow_idx(packets_df, time_delta, s_ip, d_ip, s_p, d_p):
    trace = packets_df.values
    time_stamp = trace[:, 0]
    value_trace = trace[:, 1]
    time_stamp_next = np.roll(time_stamp, -1)
    diffs = (time_stamp_next - time_stamp)
    diffs_high = np.argwhere(diffs > time_delta).squeeze()
    diffs_high_rolled = np.roll(diffs_high, -1).squeeze()
    return np.column_stack((diffs_high, diffs_high_rolled))[:-1], time_stamp, value_trace

def get_flow_sizes(ff, v):
    return np.array(
        [np.sum(v[slice(*f)]) for f in ff]
    )

def get_flow_times(f, t):
    return np.column_stack(
        (t[f[:, 0]],
         t[f[:, 1] - 1])
    )

def get_flow_gaps(t):
    return np.roll(
        np.roll(t[:, 0], -1) - t[:, 1], 1
    )

In [7]:
s_ip = MASTER
d_ip = TARGET_IP
df = df[(df['src_ip'] == s_ip) & (df['dest_ip'] == d_ip)]
ports = list(set(map(tuple, df[['src_port', 'dest_port']].values)))
flows = []
for sp, dp in ports:
    print(sp, dp)
    flow_idx, time_stamp, value_trace = get_flow_idx(df, 500 * NANO_TO_MICRO, MASTER, TARGET_IP, sp, dp)
    flow_times = get_flow_times(flow_idx, time_stamp)
    flow_sizes = get_flow_sizes(flow_idx, value_trace)
    flow_gaps = get_flow_gaps(flow_times)
    flows.append(pd.DataFrame({'start_time': flow_times[:, 0], 'end_time': flow_times[:, 0], 'size': flow_sizes, 'gap': flow_gaps}))

53694 2225
2222 58374


In [8]:
flows_pd = pd.concat(flows, ignore_index=True).sort_values(by='start_time')[:20000]

In [9]:
len(flows_pd)

20000

In [10]:
from model.flux import FluxRegression
FluxRegression(flows_pd).train()

{'train': {'mse': 3.2868984211513617e-05,
  'mae': 0.002220623993855103,
  'r2': 0.9784388778325026},
 'test': {'mse': 0.0006906998899660411,
  'mae': 0.005239354647428278,
  'r2': 0.3603799544847288}}