In [1]:
import logging
import os
import sys

# Make the local log-anomaly-detector checkout importable. Set LAD_HOME to use a
# different checkout instead of editing this path; the membership check keeps
# re-runs of this cell from appending duplicate entries.
LAD_HOME = os.environ.get("LAD_HOME", "/home/nadzya/Apps/log-anomaly-detector/")
if LAD_HOME not in sys.path:
    sys.path.append(LAD_HOME)

# Raise the root logger's threshold to CRITICAL so library logging stays out of cell output.
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)

In [2]:
import datetime
import json
import os
import re
from itertools import zip_longest

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from bson.json_util import dumps
from gensim.models import Word2Vec
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA

from anomaly_detector.config import Configuration
from anomaly_detector.storage.mongodb_storage import MongoDBStorage
from anomaly_detector.storage.storage_attribute import MGStorageAttribute

In [3]:
def config():
    """Build the Configuration used throughout this notebook.

    Connection settings can be overridden with environment variables
    (``LAD_MG_HOST``, ``LAD_MG_PORT``, ``LAD_MG_USER``, ``LAD_MG_CA_CERT``); the
    literals below are only defaults. The MongoDB password is read exclusively
    from ``LAD_MG_PASSWORD`` (empty string if unset) so no secret lives in the
    notebook or its saved outputs.

    Returns:
        Configuration: settings for MongoDB access and log aggregation.
    """
    cfg = Configuration()
    # MongoDB connection.
    cfg.MG_HOST = os.environ.get("LAD_MG_HOST", "172.17.18.83")
    cfg.MG_PORT = int(os.environ.get("LAD_MG_PORT", "27017"))
    cfg.MG_CERT_DIR = ""
    cfg.MG_CA_CERT = os.environ.get(
        "LAD_MG_CA_CERT",
        "/home/nadzya/Apps/log-anomaly-detector/config_files/LAD_CA.crt",
    )
    cfg.MG_USER = os.environ.get("LAD_MG_USER", "dbadmin")
    cfg.MG_PASSWORD = os.environ.get("LAD_MG_PASSWORD", "")
    # Source collection and the document fields used for filtering/sorting.
    cfg.MG_INPUT_DB = "anomalydb"
    cfg.MG_INPUT_COL = "utm_anomaly"
    cfg.HOSTNAME_INDEX = "hostname"
    cfg.DATETIME_INDEX = "timestamp"
    # Word2Vec (vector length, window) and DBSCAN (eps, min_samples) parameters.
    cfg.AGGR_VECTOR_LENGTH = 25
    cfg.AGGR_WINDOW = 5
    cfg.AGGR_EPS = 0.05
    cfg.AGGR_MIN_SAMPLES = 2
    return cfg

In [4]:
class DataCleaner:
    """Data cleaning utility functions."""

    @classmethod
    def _clean_message(cls, line):
        """Return the alphabetic words of ``line``, dropping all non-letter characters.

        Args:
            line: raw log message string.

        Returns:
            list[str]: runs of ASCII letters, in order of appearance.
        """
        return re.findall("[a-zA-Z]+", line)

    @classmethod
    def _preprocess(cls, data):
        """Prepare ``data`` in place for Word2Vec / SOM.

        Missing values are first replaced with ``"EMPTY"``. The ``message`` column
        is then tokenised with :meth:`_clean_message`, and every other column is
        converted to a string (list values are space-joined).

        Args:
            data: DataFrame to clean; it is modified in place.

        Returns:
            The same DataFrame object, for convenience.
        """
        def to_str(x):
            """Convert all non-str lists to string lists for Word2Vec."""
            return " ".join(str(y) for y in x) if isinstance(x, list) else str(x)

        # Fill before tokenising (re.findall rejects NaN) and in place, because
        # callers rely on the mutation rather than the return value.
        data.fillna("EMPTY", inplace=True)

        for col in data.columns:
            if col == "message":
                data[col] = data[col].apply(cls._clean_message)
            else:
                data[col] = data[col].apply(to_str)
        return data

    @classmethod
    def format_log(cls, config, es_dataset):
        """Strip the ``...] `` prefix from each record's ``message`` in place.

        Active only when ``config.LOG_FORMATTER == "strip_prefix"``. For each record
        whose message contains ``"] "``, the untouched text is kept under
        ``orig_message`` and ``message`` becomes everything after the first
        ``"] "``. Records without a usable string ``message`` are skipped and
        reported at debug level.

        Args:
            config: object exposing ``LOG_FORMATTER``.
            es_dataset: iterable of mutable dict records.
        """
        if config.LOG_FORMATTER != "strip_prefix":
            return
        for es_data in es_dataset:
            try:
                message = es_data["message"]
                _, sep, rest = message.partition("] ")
                if sep:
                    es_data["orig_message"] = message
                    es_data["message"] = rest
            except (KeyError, AttributeError, TypeError) as ex:
                logging.debug("Error %s in log formatter: %s", ex, config.LOG_FORMATTER)

In [5]:
class MongoDBDataStorageSource(DataCleaner, MongoDBStorage):
    """MongoDB data source implementation.

    Reads documents from ``config.MG_INPUT_DB`` / ``config.MG_INPUT_COL`` and
    returns them both as a cleaned, flattened DataFrame and as JSON records.
    """

    NAME = "mg.source"

    def __init__(self, config):
        """Initialize mongodb storage backend."""
        self.config = config
        MongoDBStorage.__init__(self, config)

    def _build_query(self, time_range):
        """Return a Mongo filter for documents from the last ``time_range`` seconds.

        The filter also matches ``config.HOSTNAME_INDEX`` against
        ``config.LOGSOURCE_HOSTNAME`` unless that hostname is ``"localhost"``,
        in which case documents from every host are selected.
        """
        # NOTE(review): naive local time; confirm it matches how DATETIME_INDEX is stored.
        now = datetime.datetime.now()
        query = {
            self.config.DATETIME_INDEX: {
                "$gte": now - datetime.timedelta(seconds=time_range),
                "$lt": now,
            }
        }
        if self.config.LOGSOURCE_HOSTNAME != "localhost":
            query[self.config.HOSTNAME_INDEX] = self.config.LOGSOURCE_HOSTNAME
        return query

    def retrieve(self, storage_attribute: MGStorageAttribute):
        """Retrieve data from MongoDB.

        Args:
            storage_attribute: provides ``time_range`` (look-back window in
                seconds) and ``number_of_entries`` (maximum number of documents,
                newest first).

        Returns:
            tuple: ``(frame, records)``. ``records`` is the list of documents
            converted to JSON-compatible dicts via ``bson.json_util.dumps``;
            ``frame`` is ``pd.json_normalize(records)`` after :meth:`_preprocess`.
            When no document matches, ``(pd.DataFrame(), [])``.
        """
        collection = self.mg[self.config.MG_INPUT_DB][self.config.MG_INPUT_COL]
        query = self._build_query(storage_attribute.time_range)
        try:
            # Materialise the cursor before closing the client: cursors are lazy
            # and fetch documents over the client's connection.
            documents = list(
                collection.find(query)
                .sort(self.config.DATETIME_INDEX, -1)
                .limit(storage_attribute.number_of_entries)
            )
        finally:
            self.mg.close()

        if not documents:
            return pd.DataFrame(), []

        records = json.loads(dumps(documents, sort_keys=False))
        mg_data_normalized = pd.json_normalize(records)
        self._preprocess(mg_data_normalized)
        return mg_data_normalized, records

In [6]:
def get_lad_word2vec(config, df):
    """Train LAD's ``W2VModel`` on ``df`` and return its combined and per-word vectors.

    NOTE(review): ``W2VModel`` is not imported anywhere in this notebook, so this
    raises NameError until it is imported from the anomaly_detector package.

    Args:
        config: supplies ``AGGR_VECTOR_LENGTH`` and ``AGGR_WINDOW`` for ``create``.
        df: DataFrame passed to ``W2VModel.create`` and ``one_vector``.

    Returns:
        tuple: ``(model.one_vector(df), model.model["message"].wv)``.
    """
    model = W2VModel(config)
    model.create(df, config.AGGR_VECTOR_LENGTH, config.AGGR_WINDOW)
    doc_vectors = model.one_vector(df)
    word_vectors = model.model["message"].wv
    return doc_vectors, word_vectors

In [7]:
def get_gensin_w2v(config, df):
    """Train a gensim Word2Vec model on ``df["message"]`` and look up every token.

    Args:
        config: exposes ``AGGR_VECTOR_LENGTH`` (embedding size) and
            ``AGGR_WINDOW`` (context window).
        df: DataFrame whose ``message`` column holds one list of word tokens per log.

    Returns:
        list[list[np.ndarray]]: for each log, one vector per token. Tokens that are
        not in the trained vocabulary map to a float32 zero vector, and a log with
        no tokens gets a single zero vector so every log has something to average.
    """
    sentences = list(df["message"])
    dim = config.AGGR_VECTOR_LENGTH
    try:
        model = Word2Vec(sentences=sentences, vector_size=dim, window=config.AGGR_WINDOW)
    except TypeError:
        # gensim < 4.0 names the embedding dimension ``size``.
        model = Word2Vec(sentences=sentences, size=dim, window=config.AGGR_WINDOW)
    word_vectors = model.wv

    vectors = []
    for words in sentences:
        row = [
            word_vectors[word] if word in word_vectors else np.zeros(dim, dtype=np.float32)
            for word in words
        ]
        vectors.append(row or [np.zeros(dim, dtype=np.float32)])
    return vectors

In [8]:
def log_words_to_vector(log_list):
    """Average one log's word vectors into a single vector.

    Args:
        log_list: sequence of equal-length 1-D word vectors for a single log.

    Returns:
        list[float]: per-dimension mean of the word vectors, or ``[]`` when
        ``log_list`` is empty.

    Raises:
        ValueError: if the word vectors have different lengths.
    """
    if len(log_list) == 0:
        return []
    return np.mean(np.asarray(log_list, dtype=float), axis=0).tolist()

In [9]:
def all_logs_to_vectors(logs):
    """Average every log's word vectors into one row.

    Args:
        logs: iterable of per-log word-vector lists.

    Returns:
        np.ndarray: one row per log, each produced by ``log_words_to_vector``.
    """
    return np.array([log_words_to_vector(words) for words in logs])

In [10]:
def add_cluster_label_to_df_dbscan(config, df, vectors):
    """Cluster ``vectors`` with DBSCAN and store the labels in ``df["cluster"]``.

    Args:
        config: exposes ``AGGR_EPS`` and ``AGGR_MIN_SAMPLES`` for DBSCAN.
        df: DataFrame with one row per vector; gains a ``cluster`` column (mutated in place).
        vectors: feature matrix passed to ``DBSCAN.fit_predict``.

    Returns:
        np.ndarray: cluster label per row; ``-1`` marks DBSCAN noise points.
    """
    # Read every hyper-parameter from ``config`` (not the notebook-global ``cfg``)
    # so the function honours whatever configuration it is given.
    dbscan = DBSCAN(eps=config.AGGR_EPS, min_samples=config.AGGR_MIN_SAMPLES)
    clusters = dbscan.fit_predict(vectors)
    df["cluster"] = clusters
    return clusters

In [11]:
def aggregate_logs(df, df_json, clusters):
    """Collapse each cluster of log messages into a single template.

    Args:
        df: DataFrame with a ``cluster`` column. Its index labels are used as
            positions into ``df_json``, so it must share ``df_json``'s ordering
            (e.g. the default RangeIndex).
        df_json: list of raw records, each with a ``message`` string.
        clusters: cluster labels; only their distinct values are used.

    Returns:
        list[tuple[str, int, list[str]]]: ``(template, message_count, messages)``
        per cluster, in sorted label order. Noise points (label ``-1``) produce
        one ``(message, 1, [])`` tuple each. In a template, any whitespace-
        separated token position where the messages disagree becomes ``***``.
        This includes positions that only some (longer) messages have.
    """
    aggregated = []
    for cluster in np.unique(clusters):
        row_labels = df.loc[df["cluster"] == cluster].index
        messages = [df_json[i]["message"] for i in row_labels]

        if cluster == -1:
            aggregated.extend((msg, 1, []) for msg in messages)
            continue

        # zip_longest (not zip) so tokens beyond the shortest message are not
        # silently dropped; a None filler makes that position a wildcard.
        token_columns = zip_longest(*(msg.split() for msg in messages))
        template = " ".join(
            column[0] if len(set(column)) == 1 else "***"
            for column in token_columns
        )
        aggregated.append((template, len(messages), messages))

    return aggregated

In [12]:
# Look-back window (seconds) and maximum number of documents to fetch (newest first).
# NOTE(review): assumes MGStorageAttribute takes (time_range, number_of_entries) in this order.
LOOKBACK_SECONDS = 30 * 86400  # 30 days
MAX_ENTRIES = 1000

cfg = config()
mg = MongoDBDataStorageSource(cfg)
df, json_logs = mg.retrieve(MGStorageAttribute(LOOKBACK_SECONDS, MAX_ENTRIES))

Loading Configuration
Conecting to MongoDB without SSL/TLS encryption.
  if not mg_data.count():   # if it equials 0:


ServerSelectionTimeoutError: connection closed, Timeout: 30s, Topology Description: <TopologyDescription id: 61b1dcc5c448cc63293b18ce, topology_type: Single, servers: [<ServerDescription ('172.17.18.83', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('connection closed')>]>

In [None]:
# Keep only the tokenised message column; the other metadata is not used below.
df = df[["message"]].copy()
vectors = get_gensin_w2v(cfg, df)

In [None]:
# Collapse each log's word vectors into one mean vector (one row per log).
mean_vectors = all_logs_to_vectors(logs=vectors)

In [None]:
clusters = add_cluster_label_to_df_dbscan(cfg, df, mean_vectors)
# Show cluster sizes rather than dumping every label (-1 = DBSCAN noise).
pd.Series(clusters, name="cluster").value_counts().sort_index()

In [None]:
# Project the mean log vectors to 2-D for a visual check of the DBSCAN clusters.
pca = PCA(n_components=2, random_state=0)
data_2d_pca = pca.fit_transform(mean_vectors)

fig, ax = plt.subplots(figsize=(10, 6))
scat = ax.scatter(data_2d_pca[:, 0], data_2d_pca[:, 1], c=list(df["cluster"]))
ax.legend(*scat.legend_elements(num="auto"), title="cluster")
ax.set(
    title="Log vectors in 2-D PCA space, coloured by DBSCAN cluster",
    xlabel="PC 1",
    ylabel="PC 2",
)
plt.show()

In [None]:
# Noise points (cluster -1), with their word lists rejoined into readable strings.
noise_mask = df["cluster"] == -1
[" ".join(words) for words in df.loc[noise_mask, "message"]]

In [None]:
# One (template, count, messages) tuple per cluster; noise logs are listed individually.
aggregate_logs(df=df, df_json=json_logs, clusters=clusters)