In [149]:
import pandas as pd 
from zat.log_to_dataframe import LogToDataFrame
from pathlib import Path
from matplotlib import pyplot as plt
from collections import defaultdict

In [2]:
data_dir = Path('../../data/Raw').resolve() # change this to match your data directory 
normal_dir = data_dir / 'Normal'
malware_dir = data_dir / 'Malware'
user_dir = normal_dir / 'CTU-Normal-44'
day_dir = user_dir / 'Day1'

In [41]:
conn_df = LogToDataFrame().create_dataframe(day_dir / 'conn.log.labeled')
ssl_df = LogToDataFrame().create_dataframe(day_dir / 'ssl.log.labeled')
x509_df = LogToDataFrame().create_dataframe(day_dir / 'x509.log')

In [62]:
dt_index = pd.DatetimeIndex(ssl_df.index)
ssl_conn_df = ssl_df.set_index('uid').join(conn_df.set_index('uid'), rsuffix='_conn')
ssl_conn_df = ssl_conn_df[[col for col in ssl_conn_df.columns if col[-5:] != '_conn']].set_index(dt_index)

In [153]:
def extract_duration_mean(df_window):
    duration_ms = df_window.duration.transform(lambda x: x.total_seconds()) * 1000
    return duration_ms.mean()

def extract_duration_std(df_window):
    duration_ms = df_window.duration.transform(lambda x: x.total_seconds()) * 1000
    return duration_ms.std()

def extract_ratio_established(df_window):
    # Established states from https://docs.zeek.org/en/master/scripts/base/protocols/conn/main.zeek.html
    connection_states = defaultdict(lambda: 0, df_window.conn_state.value_counts())
    connections_total = sum(connection_states.values())
    connections_established = connection_states['S1'] + connection_states['S2'] \
    + connection_states['S3'] + connection_states['SF'] + connection_states['RSTO']
    return connections_established / connections_total

In [154]:
aggregation_period = '5min'
time_window_groups = ssl_conn_df.groupby(pd.Grouper(freq=aggregation_period))

In [155]:
processed_df = pd.DataFrame(time_window_groups.apply(len), columns=['count_flows'])
processed_df['duration_mean'] = time_window_groups.apply(extract_duration_mean)
processed_df['duration_std'] = time_window_groups.apply(extract_duration_std)
# the paper also has percent sd of duration which I would interpret as percentage of flows, 
# with duration with more than 1 sd from mean (z-score > 1). I don't know if it makes sense.
# Might implement it later
processed_df['originator_sent_bytes'] = time_window_groups['orig_ip_bytes'].sum() # log this might make more sense
processed_df['responder_sent_bytes'] = time_window_groups['resp_ip_bytes'].sum() 
processed_df['responder_bytes_ratio'] = processed_df['responder_sent_bytes'] / (processed_df['originator_sent_bytes'] +  processed_df['responder_sent_bytes']) 
processed_df['connection_established_ratio'] = time_window_groups.apply(extract_ratio_established)

processed_df

TypeError: first argument must be callable or None

In [145]:
conn_df.columns


ts
2022-06-20 21:59:21.882370048    OTH
2022-06-20 22:01:23.278179840     S0
2022-06-20 22:01:53.666543104     S0
2022-06-20 22:02:02.465136128     S0
2022-06-20 22:02:16.730578944     S0
                                ... 
2022-06-21 21:56:24.286155008     S0
2022-06-21 21:57:11.715123968     S0
2022-06-21 21:58:03.645000960     S0
2022-06-21 21:58:18.419059968     S0
2022-06-21 21:58:45.058067968     S0
Name: conn_state, Length: 6081, dtype: category
Categories (13, object): ['OTH', 'REJ', 'RSTO', 'RSTOS0', ..., 'S3', 'SF', 'SH', 'SHR']

In [148]:
dict(conn_df.conn_state.value_counts())

{'S0': 3351,
 'SF': 1291,
 'RSTR': 411,
 'OTH': 308,
 'S1': 161,
 'S3': 146,
 'S2': 106,
 'RSTO': 100,
 'SH': 88,
 'SHR': 63,
 'RSTRH': 27,
 'RSTOS0': 19,
 'REJ': 10}