<a href="https://colab.research.google.com/github/savaridas/Scripts/blob/master/Anomaly%20Detection%20using%20Deep%20Learning_study.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import os
import pandas as pd
from glob import glob
from spellpy import spell


def deeplog_df_transfer(df, event_id_map):
    df['datetime'] = pd.to_datetime(df['Date'] + ' ' + df['Time'])
    df = df[['datetime', 'EventId']]
    df['EventId'] = df['EventId'].apply(lambda e: event_id_map[e] if event_id_map.get(e) else -1)
    deeplog_df = df.set_index('datetime').resample('1min').apply(_custom_resampler).reset_index()
    # deeplog_df['EventId'] = deeplog_df['EventId'].apply(lambda x: list(filter(lambda e: e != -1, x)))
    return deeplog_df


def _custom_resampler(array_like):
    return list(array_like)


def deeplog_file_generator(filename, df):
    with open(filename, 'w') as f:
        for event_id_list in df['EventId']:
            for event_id in event_id_list:
                f.write(str(event_id) + ' ')
            f.write('\n')


if __name__ == '__main__': 
    ##########
    # Parser #
    ##########
    input_dir = './'
    output_dir = './OUTPUT'
    log_format = '<Logrecord> <Date> <Time> <Pid> <Level> <Component> \[<ADDR>\] <Content>'
    log_main = 'open_stack'
    tau = 0.5

    parser = spell.LogParser(
        indir=input_dir,
        outdir=output_dir,
        log_format=log_format,
        logmain=log_main,
        tau=tau,
    )

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for log_name in glob(input_dir + '*.log'):
        print('INPUT_DIR:', input_dir)
        log_name = os.path.basename(log_name)
        print('LOG_NAME:', log_name)
        parser.parse(log_name)

   ##################
    # Transformation #
    ##################
    df = pd.read_csv(output_dir + '/openstack_normal1.log_structured.csv')
    df_normal = pd.read_csv(output_dir+ '/openstack_normal2.log_structured.csv')
    df_abnormal = pd.read_csv(output_dir+ '/openstack_abnormal.log_structured.csv')

    event_id_map = dict()
    for i, event_id in enumerate(df['EventId'].unique(), 1):
        event_id_map[event_id] = i

    print('event_id_map:', len(event_id_map))

    #########
    # Train #
    #########
    deeplog_train = deeplog_df_transfer(df, event_id_map)
    deeplog_file_generator('train', deeplog_train)

    ###############
    # Test Normal #
    ###############
    deeplog_test_normal = deeplog_df_transfer(df_normal, event_id_map)
    deeplog_file_generator('test_normal', deeplog_test_normal)

    #################
    # Test Abnormal #
    #################
    deeplog_test_abnormal = deeplog_df_transfer(df_abnormal, event_id_map)
    deeplog_file_generator('test_abnormal', deeplog_test_abnormal)



[2020-06-24 20:21:43,874][INFO]: Parsing file: ./openstack_normal1.log


INPUT_DIR: ./
LOG_NAME: openstack_normal1.log


[2020-06-24 20:21:44,163][INFO]: Loaded 19.1% of log lines.
[2020-06-24 20:21:44,379][INFO]: Loaded 38.2% of log lines.
[2020-06-24 20:21:44,696][INFO]: Loaded 57.3% of log lines.
[2020-06-24 20:21:44,926][INFO]: Loaded 76.5% of log lines.
[2020-06-24 20:21:45,244][INFO]: Loaded 95.6% of log lines.
[2020-06-24 20:21:45,384][INFO]: load_data() finished!
[2020-06-24 20:21:45,429][INFO]: Load objects done, lastestLineId: 155347
[2020-06-24 20:21:50,258][INFO]: Processed 19.1% of log lines.
[2020-06-24 20:21:55,306][INFO]: Processed 38.2% of log lines.
[2020-06-24 20:22:00,568][INFO]: Processed 57.4% of log lines.
[2020-06-24 20:22:06,131][INFO]: Processed 76.5% of log lines.
[2020-06-24 20:22:12,190][INFO]: Processed 95.6% of log lines.
[2020-06-24 20:22:13,660][INFO]: Processed 100.0% of log lines.
[2020-06-24 20:22:18,051][INFO]: Output parse file
[2020-06-24 20:22:19,828][INFO]: lastestLindId: 155347
[2020-06-24 20:22:27,737][INFO]: rootNodePath: ./OUTPUT/rootNode.pkl
[2020-06-24 20:22

INPUT_DIR: ./
LOG_NAME: openstack_normal2.log


[2020-06-24 20:22:28,322][INFO]: Loaded 7.3% of log lines.
[2020-06-24 20:22:29,015][INFO]: Loaded 14.6% of log lines.
[2020-06-24 20:22:29,433][INFO]: Loaded 21.9% of log lines.
[2020-06-24 20:22:29,963][INFO]: Loaded 29.2% of log lines.
[2020-06-24 20:22:30,417][INFO]: Loaded 36.5% of log lines.
[2020-06-24 20:22:30,843][INFO]: Loaded 43.8% of log lines.
[2020-06-24 20:22:31,124][INFO]: Loaded 51.1% of log lines.
[2020-06-24 20:22:31,507][INFO]: Loaded 58.4% of log lines.
[2020-06-24 20:22:31,913][INFO]: Loaded 65.7% of log lines.
[2020-06-24 20:22:32,183][INFO]: Loaded 73.0% of log lines.
[2020-06-24 20:22:32,442][INFO]: Loaded 80.2% of log lines.
[2020-06-24 20:22:32,869][INFO]: Loaded 87.5% of log lines.
[2020-06-24 20:22:33,134][INFO]: Loaded 94.8% of log lines.
[2020-06-24 20:22:33,634][INFO]: load_data() finished!
[2020-06-24 20:22:34,072][INFO]: Load objects done, lastestLineId: 207636
[2020-06-24 20:22:36,508][INFO]: Processed 7.3% of log lines.
[2020-06-24 20:22:38,725][INFO

INPUT_DIR: ./
LOG_NAME: openstack_abnormal.log


[2020-06-24 20:23:41,363][INFO]: Loaded 54.2% of log lines.
[2020-06-24 20:23:41,560][INFO]: Loaded 100.0% of log lines.
[2020-06-24 20:23:41,653][INFO]: load_data() finished!
[2020-06-24 20:23:41,845][INFO]: Load objects done, lastestLineId: 344549
[2020-06-24 20:23:45,466][INFO]: Processed 54.2% of log lines.
[2020-06-24 20:23:48,536][INFO]: Processed 100.0% of log lines.
[2020-06-24 20:23:50,138][INFO]: Output parse file
[2020-06-24 20:23:52,499][INFO]: lastestLindId: 344549
[2020-06-24 20:23:58,833][INFO]: rootNodePath: ./OUTPUT/rootNode.pkl
[2020-06-24 20:23:58,877][INFO]: logCluLPath: ./OUTPUT/logCluL.pkl
[2020-06-24 20:23:58,907][INFO]: Store objects done.
[2020-06-24 20:23:58,909][INFO]: Parsing done. [Time taken: 0:00:17.822048]
[2020-06-24 20:24:00,363][INFO]: NumExpr defaulting to 2 threads.


event_id_map: 1142


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  # Remove the CWD from sys.path while we load stuff.


In [2]:
!pip install spellpy

Collecting spellpy
  Downloading https://files.pythonhosted.org/packages/c8/8a/f9962aefb51b251cfe42e07d2bebf7cd639acb4e35ebbe26990f5536cf0f/spellpy-0.0.9-py3-none-any.whl
Installing collected packages: spellpy
Successfully installed spellpy-0.0.9


In [None]:
import os
import sys
import json
import logging
import argparse

import boto3
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s][%(levelname)s]: %(message)s')
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))


class Model(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(Model, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, input):
        h0 = torch.zeros(self.num_layers, input.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, input.size(0), self.hidden_size)
        out, _ = self.lstm(input, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


class Generate():
    def __init__(self):
        self.init_obj = None

    def generate(self, name, window_size, local):
        num_sessions = 0
        inputs = []
        outputs = []

        line = self.init_line(local, name)
        while line:
            line = tuple(map(lambda n: n - 1, map(int, line.strip().split())))
            for i in range(len(line) - window_size):
                inputs.append(line[i:i+window_size])
                outputs.append(line[i+window_size])
            line = self.readline(local)
            num_sessions += 1
        logger.info('Number of session({}): {}'.format(name, len(inputs)))
        logger.info('Number of seqs({}): {}'.format(name, len(inputs)))
        dataset = TensorDataset(torch.tensor(inputs, dtype=torch.float), torch.tensor(outputs))
        return dataset

    def init_line(self, local, name):
        if local:
            f = open(name, 'r')
            self.init_obj = f
            line = self.init_obj.readline()
        else:
            client = boto3.client('s3')
            bucket = '$YOUR_S3_BUCKET'
            prefix = '$YOUR_S3_FOLDER_NAME'
            self.init_obj = client.get_object(Bucket=bucket, Key=prefix + name)
            line = self.init_obj.get('Body')._raw_stream.readline()
            line = line.decode().rstrip()  # decode from byte to string & right strip \n
        return line

    def readline(self, local):
        if local:
            line = self.init_obj.readline()
        else:
            line = self.init_obj.get('Body')._raw_stream.readline()
            line = line.decode().rstrip()
        return line


def _get_train_data_loader(batch_size, is_distributed, window_size, local, **kwargs):
    logger.info("Get train data loader")
    _generate = Generate()
    seq_dataset = _generate.generate(name='train', window_size=window_size, local=local)
    train_sampler = torch.utils.data.distributed.DistributedSampler(seq_dataset) if is_distributed else None
    dataloader = DataLoader(seq_dataset, batch_size=batch_size, shuffle=train_sampler is None,
                            sampler=train_sampler, **kwargs)
    return dataloader


def _average_gradients(model):
    # Gradient averaging
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size


def train(args):
    is_distributed = len(args.hosts) > 1 and args.backend is not None
    logger.debug("Distributed training - {}".format(is_distributed))
    use_cuda = args.num_gpus > 0
    logger.debug("Number of gpus available - {}".format(args.num_gpus))
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    device = torch.device("cuda" if use_cuda else "cpu")

    if is_distributed:
        logger.info('Initialize the distributed environment')
        world_size = len(args.hosts)
        os.environ['WORLD_SIZE'] = str(world_size)
        host_rank = args.hosts.index(args.current_host)
        dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
        logger.info('Initialized the distributed environment:\'{}\' backend on {} nodes. '.format(
            args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(
            dist.get_rank(), args.num_gpus))

    # set the seed for generating random numbers
    torch.manual_seed(args.seed)
    if use_cuda:
        logger.info('Use CUDA')
        torch.cuda.manual_seed(args.seed)

    train_loader = _get_train_data_loader(args.batch_size, is_distributed, args.window_size, args.local, **kwargs)

    logger.debug("Processes {}/{} ({:.0f}%) of train data".format(
        len(train_loader.sampler), len(train_loader.dataset),
        100. * len(train_loader.sampler) / len(train_loader.dataset)
    ))

    model = Model(args.input_size, args.hidden_size, args.num_layers, args.num_classes).to(device)
    if is_distributed and use_cuda:
        logger.info('multi-machine multi-gpu case')
        model = torch.nn.parallel.DistributedDataParallel(model)
    else:
        logger.info('single-machine multi-gpu case or single-machine or multi-machine cpu case')
        model = torch.nn.DataParallel(model)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())

    for epoch in range(1, args.epochs + 1):
        model.train()
        train_loss = 0
        for seq, label in train_loader:
            seq = seq.clone().detach().view(-1, args.window_size, args.input_size).to(device)
            optimizer.zero_grad()
            output = model(seq)
            loss = criterion(output, label.to(device))
            loss.backward()
            if is_distributed and not use_cuda:
                # average gradients manually for multi-machine cpu case only
                _average_gradients(model)
            optimizer.step()
            train_loss += loss.item()
        logger.debug('Epoch [{}/{}], Train_loss: {}'.format(
            epoch, args.epochs, round(train_loss/len(train_loader.dataset), 4)
        ))
    logger.debug('Finished Training')
    save_model(model, args.model_dir, args)


def save_model(model, model_dir, args):
    logger.info("Saving the model.")
    path = os.path.join(model_dir, 'model.pth')
    torch.save(model.cpu().state_dict(), path)
    # Save arguments used to create model for restoring the model later
    model_info_path = os.path.join(model_dir, 'model_info.pth')
    with open(model_info_path, 'wb') as f:
        model_info = {
            'input_size': args.input_size,
            'hidden_size': args.hidden_size,
            'num_layers': args.num_layers,
            'num_classes': args.num_classes,
            'num_candidates': args.num_candidates,
            'window_size': args.window_size,
        }
        torch.save(model_info, f)


def model_fn(model_dir):
    logger.info('Loading the model.')
    model_info = {}
    with open(os.path.join(model_dir, 'model_info.pth'), 'rb') as f:
        model_info = torch.load(f)
    logger.debug('model_info: {}'.format(model_info))
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info('Current device: {}'.format(device))
    model = torch.nn.DataParallel(Model(input_size=model_info['input_size'],
                                        hidden_size=model_info['hidden_size'],
                                        num_layers=model_info['num_layers'],
                                        num_classes=model_info['num_classes']))
    with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
        model.load_state_dict(torch.load(f))
    input_size = model_info['input_size']
    window_size = model_info['window_size']
    num_candidates = model_info['num_candidates']
    return {'model': model.to(device),
            'window_size': window_size,
            'input_size': input_size,
            'num_candidates': num_candidates}


def input_fn(request_body, request_content_type):
    logger.info('Deserializing the input data.')
    if request_content_type == 'application/json':
        input_data = json.loads(request_body)
        return input_data
    else:
        raise ValueError("{} not supported by script!".format(request_content_type))


def predict_fn(input_data, model_info):
    logger.info('Predict next template on this pattern series.')
    line = input_data['line']
    num_candidates = model_info['num_candidates']
    input_size = model_info['input_size']
    window_size = model_info['window_size']
    model = model_info['model']

    logger.info(line)
    logger.debug(num_candidates)
    logger.debug(input_size)
    logger.debug(window_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info('Current device: {}'.format(device))

    predict_cnt = 0
    anomaly_cnt = 0
    predict_list = [0] * len(line)
    for i in range(len(line) - window_size):
        seq = line[i:i + window_size]
        label = line[i + window_size]
        seq = torch.tensor(seq, dtype=torch.float).view(-1, window_size, input_size).to(device)
        label = torch.tensor(label).view(-1).to(device)
        output = model(seq)
        predict = torch.argsort(output, 1)[0][-num_candidates:]
        if label not in predict:
            anomaly_cnt += 1
            predict_list[i + window_size] = 1
        predict_cnt += 1
    return {'anomaly_cnt': anomaly_cnt, 'predict_cnt': predict_cnt, 'predict_list': predict_list}


def output_fn(prediction, accept):
    logger.info('Serializing the generated output.')
    if accept == "application/json":
        return json.dumps(prediction), accept
    raise ValueError("{} accept type is not supported by this script".format(accept))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Data and model checkpoints directories
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--epochs', type=int, default=50, metavar='N',
                        help='number of epochs to train (default: 50)')
    parser.add_argument('--window-size', type=int, default=10, metavar='N',
                        help='length of training window (default: 10)')
    parser.add_argument('--input-size', type=int, default=1, metavar='N',
                        help='model input size (default: 1)')
    parser.add_argument('--hidden-size', type=int, default=64, metavar='N',
                        help='hidden layer size (default: 64)')
    parser.add_argument('--num-layers', type=int, default=2, metavar='N',
                        help='number of model\'s layer (default: 2)')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--backend', type=str, default=None,
                        help='backend for distributed training (tcp, gloo on cpu and gloo, nccl on gpu)')

    parser.add_argument('--num-classes', type=int, metavar='N',
                        help='the number of model\'s output, must same as pattern size!')
    parser.add_argument('--num-candidates', type=int, metavar='N',
                        help='the number of predictors sequences as correct predict.')

    # Container environment
    parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))
    parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    parser.add_argument('--num-gpus', type=int, default=os.environ['SM_NUM_GPUS'])

    # Local mode
    parser.add_argument('--local', type=bool, default=False,
                        help='local training model.')

    train(parser.parse_args())


KeyError: ignored

In [None]:
!pip install -q torch==1.5.1 torchvision


[K     |████████████████████████████████| 753.2MB 21kB/s 
[?25h

In [None]:
import torch
print(torch.__version__)

1.5.1
