### Preprocessing

In [3]:
import sys 
import os
module_path = os.path.abspath(os.path.join('../..'))
if module_path not in sys.path:
    sys.path.append(module_path)
from pathlib import Path
os.environ['TF_CPP_MIN_LOG_LEVEL'] = "2"

import cdt
cdt.SETTINGS.rpath = "/usr/bin/Rscript"
from cdt.causality.graph import PC

import pandas as pd
from src.features import preprocessing
import numpy as np
import networkx as nx
import time
import datetime

In [4]:
# Directory holding the input CSV files. The default is the original author's
# checkout; set the DGM_DATA_PATH environment variable to run the notebook on
# another machine without editing this cell.
data_path = Path(
    os.environ.get(
        'DGM_DATA_PATH',
        '/home/ralmasri/projects/Thesis/Domain-Guided-Monitoring/data/',
    )
)
csv_path = data_path / "logs_aggregated_concurrent.csv"

# Preprocessor configuration pointing at the aggregated log file.
huawei_config = preprocessing.HuaweiPreprocessorConfig()
huawei_config.aggregated_log_file = csv_path
preprocessor = preprocessing.ConcurrentAggregatedLogsPreprocessor(huawei_config)

# huawei_df = preprocessor._load_log_only_data().fillna("")
# NOTE: unlike the commented-out variant above, read_csv leaves empty cells as
# NaN rather than "".
huawei_df = pd.read_csv(data_path / "log_only_data.csv")

In [5]:
# Row counts of the benchmark subsets; each subset is the first N rows of the full frame.
df_sizes = [100, 1000, 10000, 50000]
huawei_dfs = list(map(huawei_df.head, df_sizes))
# Method name -> list of result frames (one per subset).
results = dict()

In [6]:
# Columns kept for causal discovery. "@timestamp" is used for time handling and
# is skipped when events/indicator columns are built.
relevant_columns = [
    # origin of the log line
    "Hostname",
    "log_level",
    "programname",
    "python_module",
    # HTTP attributes
    "http_status",
    "http_method",
    # time of the log line
    "@timestamp",
    # clustered templates
    "fine_log_cluster_template",
    "coarse_log_cluster_template",
    "url_cluster_template",
]

### Transformation without filtering

In [21]:
from collections import namedtuple

# An event definition: the column an observation came from and the observed value.
EvDef = namedtuple('EvDef', ['type', 'value'])


class EventDefinitionMap:
    """Bidirectional mapping between integer event ids (eid) and event definitions.

    An event definition is an ``EvDef(type=<column name>, value=<cell value>)``.
    Ids are handed out in order of first appearance, starting at 0.

    Args:
        top_dt: start of the observed time range (stored, not used by this class).
        end_dt: end of the observed time range (stored, not used by this class).
    """

    def __init__(self, top_dt, end_dt):
        self.top_dt = top_dt
        self.end_dt = end_dt
        self._emap = {}   # key : eid, val : evdef
        self._ermap = {}  # key : evdef, val : eid

    def __len__(self):
        """Return the number of registered event definitions."""
        return len(self._emap)

    def _eids(self):
        """Return a view of all registered event ids."""
        return self._emap.keys()

    def _next_eid(self):
        """Return the smallest unused event id that is >= the current size."""
        eid = len(self._emap)
        while eid in self._emap:
            eid += 1
        return eid

    def get_evdef(self, eid) -> EvDef:
        """Return the event definition registered under ``eid`` (KeyError if unknown)."""
        return self._emap[eid]

    def get_eid(self, evdef):
        """Return the event id of ``evdef`` (KeyError if unknown)."""
        return self._ermap[evdef]

    @staticmethod
    def _is_missing(value):
        """Return True for cells that carry no value: None, NaN or the empty string."""
        if value is None:
            return True
        if isinstance(value, float):
            # NaN is the only float that is not equal to itself. Empty CSV cells
            # are read as NaN by pandas, and every NaN would otherwise become
            # its own, never-repeating event.
            return value != value
        return isinstance(value, str) and value == ""

    def process_row(self, columns, row):
        """Register the events found in one row and return their ids.

        Args:
            columns: iterable of column names to inspect; '@timestamp' is skipped.
            row: mapping-like object indexable by column name (e.g. a pandas row).

        Returns:
            List of event ids, one per inspected column holding a non-missing
            value, in column order. Unseen (column, value) pairs get a new id.
        """
        row_eids = []
        for column in columns:
            if column == '@timestamp':
                continue
            value = row[column]
            if self._is_missing(value):
                continue

            evdef = EvDef(type=column, value=value)
            eid = self._ermap.get(evdef)
            if eid is None:
                eid = self._next_eid()
                self._emap[eid] = evdef
                self._ermap[evdef] = eid
            row_eids.append(eid)
        return row_eids

In [22]:
def label(dt_range, duration):
    """Return the bin edges that split ``dt_range`` into steps of ``duration``.

    Args:
        dt_range: ``(top_dt, end_dt)`` pair delimiting the range.
        duration: step between consecutive edges; must move ``top_dt`` forward.

    Returns:
        List ``[top_dt, top_dt + duration, ..., end_dt]``. The last edge is
        always ``end_dt``, so the final bin may be shorter than ``duration``.

    Raises:
        ValueError: if the range is non-empty and ``duration`` is not positive
            (the loop below would otherwise never terminate).
    """
    top_dt, end_dt = dt_range
    if top_dt < end_dt and not (top_dt + duration > top_dt):
        raise ValueError(
            "duration must be positive, got {0!r}".format(duration))

    l_label = []
    temp_dt = top_dt
    while temp_dt < end_dt:
        l_label.append(temp_dt)
        temp_dt += duration
    l_label.append(end_dt)
    return l_label

In [23]:
def discretize(l_dt, l_label, method):
    """Distribute timestamps into the bins delimited by ``l_label``.

    Bin ``i`` covers the half-open interval ``[l_label[i], l_label[i + 1])``.
    Timestamps before the first edge or at/after the last edge are ignored.

    Args:
        l_dt: iterable of comparable timestamps (need not be sorted).
        l_label: ordered list of bin edges; yields ``len(l_label) - 1`` bins.
        method: ``"count"`` (number of timestamps per bin), ``"binary"``
            (1 if the bin holds any timestamp, else 0) or ``"datetime"``
            (list of the timestamps per bin).

    Returns:
        List with one entry per bin.

    Raises:
        NotImplementedError: if ``method`` is not one of the supported names.
    """
    if method not in ("count", "binary", "datetime"):
        raise NotImplementedError(
            "Invalid method name ({0})".format(method))

    bin_num = len(l_label) - 1

    def empty_result():
        # One "nothing happened" entry per bin.
        if method == "datetime":
            return [[] for _ in range(bin_num)]
        return [0] * bin_num

    l_dt_temp = sorted(l_dt)
    if not l_dt_temp:
        return empty_result()

    iterobj = iter(l_dt_temp)
    new_dt = next(iterobj)
    # Skip everything that lies before the first bin edge.
    while new_dt < l_label[0]:
        try:
            new_dt = next(iterobj)
        except StopIteration:
            # All timestamps precede the labelled range.
            return empty_result()

    ret = []
    exhausted = False  # set once every timestamp has been consumed
    for label_dt in l_label[1:]:
        temp = [] if method == "datetime" else 0
        if not exhausted:
            while new_dt < label_dt:
                if method == "count":
                    temp += 1
                elif method == "binary":
                    temp = 1
                else:
                    temp.append(new_dt)
                try:
                    new_dt = next(iterobj)
                except StopIteration:
                    # Remaining bins stay empty.
                    exhausted = True
                    break
        ret.append(temp)
    return ret

def autodiscretize_with_slide(l_dt, binsize, slide, dt_range):
    """Binarise timestamps over overlapping (sliding) windows.

    A window starts every ``slide`` within ``dt_range`` and ends ``binsize``
    later, clipped to the end of the range.

    Args:
        l_dt: timestamps of one event.
        binsize: window length (``datetime.timedelta``).
        slide: distance between consecutive window starts (``datetime.timedelta``).
        dt_range: ``(top_dt, end_dt)`` pair delimiting the observed range.

    Returns:
        List with one 0/1 flag per window start: 1 if any timestamp was
        collected for that window, otherwise 0.
    """
    top_dt, end_dt = dt_range
    # Number of consecutive slide-wide buckets inspected per window (at least one).
    buckets_per_window = max(int(binsize.total_seconds() / slide.total_seconds()), 1)
    window_starts = label((top_dt, end_dt), slide)[:-1]
    window_ends = [min(start + binsize, end_dt) for start in window_starts]

    # Timestamps grouped into non-overlapping buckets of width ``slide``.
    buckets = discretize(l_dt, window_starts + [end_dt], method='datetime')

    flags = []
    for idx, window_end in enumerate(window_ends):
        occupied = any(
            dt <= window_end
            for bucket in buckets[idx:idx + buckets_per_window]
            for dt in bucket
        )
        flags.append(1 if occupied else 0)
    return flags

    

In [24]:
# Assume we're using the G-squared test so everything is binary
bin_overlap = datetime.timedelta(seconds = 8)

def event2stat(evdict, top_dt, end_dt, dur):
    """Turn per-event timestamp lists into per-event binary time series.

    Args:
        evdict: mapping ``eid -> list of timestamps`` at which the event occurred.
        top_dt: start of the observed range.
        end_dt: end of the observed range.
        dur: bin width (``datetime.timedelta``).

    Returns:
        Mapping ``eid -> list`` describing in which time bins the event occurs.
        With a zero module-level ``bin_overlap`` the bins are disjoint;
        otherwise consecutive bins start ``dur - bin_overlap`` apart.

    Raises:
        ValueError: if ``bin_overlap`` is non-zero and ``dur`` is not larger
            than it (the bins would never advance).
    """
    disjoint_bins = bin_overlap == datetime.timedelta(seconds=0)
    if disjoint_bins:
        labels = label((top_dt, end_dt), dur)
    else:
        slide = dur - bin_overlap
        if slide <= datetime.timedelta(seconds=0):
            raise ValueError(
                "dur ({0}) must be larger than bin_overlap ({1})".format(dur, bin_overlap))

    d_stat = {}
    for eid, l_ev in evdict.items():
        if disjoint_bins:
            val = discretize(l_ev, labels, method="binary")
        else:
            val = autodiscretize_with_slide(l_ev, dur, slide, dt_range=(top_dt, end_dt))
        if val is not None:
            d_stat[eid] = val
    return d_stat

In [25]:
def create_maps(data_df: pd.DataFrame, top_dt, end_dt):
    """Build the event definition map and the per-event timestamp lists.

    Args:
        data_df: frame with a '@timestamp' column plus the attribute columns.
        top_dt: start of the observed range (passed to ``EventDefinitionMap``).
        end_dt: end of the observed range (passed to ``EventDefinitionMap``).

    Returns:
        ``(evmap, evdict)`` where ``evmap`` is the populated
        ``EventDefinitionMap`` and ``evdict`` maps each event id to the list of
        '@timestamp' values of the rows in which the event occurred.
    """
    evmap = EventDefinitionMap(top_dt=top_dt, end_dt=end_dt)
    evdict = {}  # Event id -> list(datetime.datetime)
    columns = data_df.columns
    for _, row in data_df.iterrows():
        timestamp = row['@timestamp']
        # Explicit loop: a bare map(...) is lazy in Python 3 and would never
        # run the append, leaving evdict empty.
        for eid in evmap.process_row(columns, row):
            evdict.setdefault(eid, []).append(timestamp)
    return evmap, evdict

def get_date_range(data_df):
    """Parse the '@timestamp' column and return the covering time range.

    Side effect: ``data_df['@timestamp']`` is replaced in place by the parsed
    datetimes.

    Args:
        data_df: frame whose '@timestamp' column holds strings in the format
            ``%Y-%m-%dT%H:%M:%S.%f000%z``, ordered so that the first row is
            the earliest and the last row the latest timestamp.

    Returns:
        ``(top_dt, end_dt)``: the first timestamp rounded down to the full hour
        and the last timestamp rounded down to the minute plus one minute.
    """
    date_format = '%Y-%m-%dT%H:%M:%S.%f000%z'
    data_df['@timestamp'] = data_df['@timestamp'].apply(
        lambda x: datetime.datetime.strptime(x, date_format))
    min_dt = data_df['@timestamp'].iloc[0].to_pydatetime()
    max_dt = data_df['@timestamp'].iloc[-1].to_pydatetime()
    print(f"Min: {min_dt}")
    print(f"Max: {max_dt}")
    top_dt = min_dt.replace(minute=0, second=0, microsecond=0)
    # Add the minute with timedelta arithmetic so that minute 59 rolls over
    # into the next hour/day instead of producing the invalid minute value 60.
    end_dt = max_dt.replace(second=0, microsecond=0) + datetime.timedelta(minutes=1)
    return top_dt, end_dt


def generate_transformation_knowledge(huawei_df: pd.DataFrame):
    """Run the PC algorithm on time-binned event occurrences.

    The frame is reduced to ``relevant_columns``, sorted by '@timestamp',
    converted into one occurrence series per (column, value) event with a bin
    width of 10 seconds, and handed to cdt's ``PC``.

    Args:
        huawei_df: log frame; it is copied, the argument itself is not modified.

    Returns:
        De-duplicated frame with one row per edge of the predicted graph and
        the columns parent_id, parent_name, child_id and child_name.
    """
    unused_columns = [name for name in huawei_df.columns if name not in relevant_columns]
    data_df = (
        huawei_df.copy(deep=True)
        .drop(columns=unused_columns)
        .sort_values(by='@timestamp')
        .reset_index(drop=True)
    )

    # get_date_range also replaces data_df['@timestamp'] with parsed datetimes.
    top_dt, end_dt = get_date_range(data_df)
    bin_width = datetime.timedelta(seconds=10)
    evmap, evdict = create_maps(data_df, top_dt, end_dt)
    stats_by_eid = event2stat(evdict, top_dt, end_dt, bin_width)

    # One column per event id (ascending), one row per time bin.
    series_in_eid_order = [stats_by_eid[eid] for eid in sorted(stats_by_eid)]
    binned_df = pd.DataFrame(np.array(series_in_eid_order).transpose())

    graph = PC(CItest="binary", method_indep="binary").predict(binned_df)

    def node_id(evdef):
        return evdef.type + '#' + str(evdef.value)

    causality_records = []
    for edge in graph.edges():  # (eid, eid) pairs
        parent = evmap.get_evdef(edge[0])
        child = evmap.get_evdef(edge[1])
        causality_records.append(
            {
                "parent_id": node_id(parent),
                "parent_name": parent.value,
                "child_id": node_id(child),
                "child_name": child.value,
            }
        )

    print(f"Number of nodes: {graph.number_of_nodes()}")
    print(f"Number of edges: {graph.number_of_edges()}")
    edges_df = pd.DataFrame.from_records(causality_records)
    return edges_df.drop_duplicates().reset_index(drop=True)

In [27]:
# Benchmark the transformation approach on every subset (one result frame per size).
# The loop variable is deliberately not called `huawei_df`: reusing that name
# would rebind the full data frame to the last subset.
results['transformation no filter'] = []
for size, subset_df in zip(df_sizes, huawei_dfs):
    start = time.time()
    print(f"Calculating for size {size}")
    results['transformation no filter'].append(generate_transformation_knowledge(subset_df))
    print(time.time() - start)

Calculating for size 100
Min: 2019-11-19 17:00:05+01:00
Max: 2019-11-19 17:04:20.472000+01:00
0.12584590911865234
Calculating for size 1000
Min: 2019-11-19 17:00:05+01:00
Max: 2019-11-19 17:17:31.496000+01:00
0.7679634094238281
Calculating for size 10000
Min: 2019-11-19 17:00:05+01:00
Max: 2019-11-19 21:03:02.049000+01:00
9.66567349433899
Calculating for size 50000
Min: 2019-11-19 17:00:05+01:00
Max: 2019-11-20 03:44:23.742000+01:00
238.5599546432495


### Heuristic

In [None]:
from src.features import sequences
from collections import Counter

# Preprocessor whose config and `_generate_counted_causality` helper are used by
# generate_heuristic_knowledge below.
causality_preprocessor = preprocessing.ConcurrentAggregatedLogsCausalityPreprocessor(huawei_config)
# NOTE(review): min_causality is set after the preprocessor was constructed. It is
# later read via `causality_preprocessor.config`, so this presumably relies on the
# preprocessor keeping a reference to `huawei_config` rather than a copy — confirm.
huawei_config.min_causality = 0.01

def collect_sequence_metadata(grouped_data):
    """Collect metadata for the "all_events" column of ``grouped_data``.

    Loads the project's sequence transformer and returns whatever its
    ``collect_metadata`` produces for the given data.
    """
    return sequences.load_sequence_transformer().collect_metadata(
        grouped_data, "all_events"
    )

def generate_heuristic_knowledge(huawei_df: pd.DataFrame):
    """Derive causal edges from counted co-occurrences (heuristic baseline).

    For every parent id returned by the causality preprocessor, a child is
    kept when its share among that parent's counted children exceeds
    ``causality_preprocessor.config.min_causality``.

    Args:
        huawei_df: log frame; it is not modified.

    Returns:
        De-duplicated frame with the columns parent_id, parent_name, child_id
        and child_name. Ids have the form ``<column>#<value>``; the name is
        everything after the first '#', so values that contain '#' themselves
        are kept intact.
    """
    # NOTE(review): the aggregation/metadata results are not used further down.
    # The calls are kept in case they have side effects in the project code —
    # confirm and remove if they do not.
    log_only_data = huawei_df.copy(deep=True)
    log_only_data["grouper"] = 1
    grouped_data = preprocessor._aggregate_per(log_only_data, aggregation_column="grouper")
    collect_sequence_metadata(grouped_data)

    config = causality_preprocessor.config
    # Named differently from the module-level `relevant_columns` list to avoid
    # shadowing it.
    causality_columns = {
        x
        for x in preprocessor.relevant_columns
        if not config.log_only_causality or "log" in x
    }
    counted_causality = causality_preprocessor._generate_counted_causality(
        huawei_df, causality_columns
    )

    def name_of(node_id):
        # Split only at the first '#': the value part may contain '#' as well.
        return node_id.split("#", 1)[1]

    causality_records = []
    for from_value, to_values in counted_causality.items():
        total_to_counts = len(to_values)
        for to_value, to_count in Counter(to_values).items():
            if to_count / total_to_counts > config.min_causality:
                causality_records.append(
                    {
                        "parent_id": from_value,
                        "parent_name": name_of(from_value),
                        "child_id": to_value,
                        "child_name": name_of(to_value),
                    },
                )

    return (
        pd.DataFrame.from_records(causality_records)
        .drop_duplicates()
        .reset_index(drop=True)
    )

In [None]:
# Heuristic baseline: one result frame per subset, in the order of huawei_dfs.
results['heuristic'] = list(map(generate_heuristic_knowledge, huawei_dfs))

### My method

In [None]:
def generate_my_method_knoweldge(huawei_df):
    """Run the PC algorithm on per-row indicator columns.

    For every column in ``relevant_columns`` (except '@timestamp') and every
    distinct value in it, an indicator column named ``<column>#<value>`` is
    built that is 1 in the rows holding that value and 0 elsewhere. Values are
    compared case-insensitively via their lower-cased string form; missing
    cells (NaN/None) and empty strings produce no indicator column.

    Args:
        huawei_df: log frame; it is not modified.

    Returns:
        De-duplicated frame with one row per edge of the predicted graph and
        the columns parent_id, parent_name, child_id and child_name. The name
        is the part of the id after the first '#'.
    """
    indicators = {}
    for column in relevant_columns:
        if column == '@timestamp':
            continue
        raw = huawei_df[column]
        present = ~(raw.isna() | (raw == ""))
        # Normalise once per column so that the indicator name and the
        # comparison use the same (lower-cased string) representation.
        normalized = raw.astype(str).str.lower()
        for value in normalized[present].unique():
            indicators[column + "#" + value] = (present & (normalized == value)).astype(int)

    alg_df = pd.DataFrame(indicators, index=huawei_df.index)
    obj = PC(CItest="binary", method_indep="binary")
    output = obj.predict(alg_df)

    def name_of(node_id):
        # Split only at the first '#': the value part may contain '#' as well.
        return str(node_id).split("#", 1)[-1]

    causality_records = []
    for from_value, to_value in output.edges():
        causality_records.append(
            {
                "parent_id": from_value,
                "parent_name": name_of(from_value),
                "child_id": to_value,
                "child_name": name_of(to_value),
            },
        )
    return pd.DataFrame.from_records(causality_records).drop_duplicates().reset_index(drop=True)

In [None]:
# Benchmark "my method" on every subset (one result frame per size).
# The loop variable is deliberately not called `huawei_df`: reusing that name
# would rebind the full data frame to the last subset.
results['my method'] = []
for size, subset_df in zip(df_sizes, huawei_dfs):
    start = time.time()
    print(f"Calculating for size {size}")
    results['my method'].append(generate_my_method_knoweldge(subset_df))
    print(time.time() - start)