# Digital Fingerprinting (DFP) with Morpheus - Duo Inference
## Introduction

In this notebook, we will build and run a DFP pipeline that performs inference on Duo authentication logs. The goal is to use the pretrained models produced by the Duo Training notebook to generate an anomaly score for each log. Security teams can use these scores to detect abnormal behavior as it happens and take the appropriate action.

<div class="alert alert-block alert-info">
<b>Note:</b> For more information on DFP, the Morpheus pipeline, and the setup steps needed to run this notebook, please see the corresponding DFP training materials.
</div>

In [1]:
# Ensure that the morpheus directory is in the Python path. Depending on the environment setup, this may not be needed
import sys
import os
sys.path.insert(0, os.path.abspath("./morpheus"))

In [2]:
import logging
import os
import typing
from datetime import datetime
from datetime import timedelta

import click
import mlflow
import pandas as pd
from dfp.stages.dfp_inference_stage import DFPInferenceStage
from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage
from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
from dfp.stages.dfp_s3_batcher_stage import DFPS3BatcherStage
from dfp.stages.dfp_s3_to_df import DFPS3ToDataFrameStage
from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
from dfp.stages.dfp_training import DFPTraining
from dfp.stages.multi_file_source import MultiFileSource
from dfp.stages.s3_object_source_stage import S3BucketSourceStage
from dfp.stages.s3_object_source_stage import s3_filter_duo
from dfp.stages.s3_object_source_stage import s3_object_generator
from dfp.stages.write_to_s3_stage import WriteToS3Stage
from dfp.utils.column_info import BoolColumn
from dfp.utils.column_info import CustomColumn
from dfp.utils.column_info import DataFrameInputSchema
from dfp.utils.column_info import RenameColumn

import cudf

from morpheus._lib.file_types import FileTypes
from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import CppConfig
from morpheus.messages.message_meta import UserMessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.utils.logger import configure_logging

# Left align all tables
from IPython.display import HTML
table_css = 'table {align:left;display:block}'
HTML('<style>{}</style>'.format(table_css))

## High Level Configuration

The following options significantly alter the functionality of the pipeline. They are kept separate from the individual stage options because they may affect more than one stage. The Python script that corresponds to this notebook, `dfp_pipeline_duo.py`, exposes these same options as command-line arguments.

### Options

| Name | Type | Description |
| --- | --- | :-- |
| `train_users` | One of `["none"]` | For inference, this option should always be `"none"` |
| `skip_users` | List of strings | Any user in this list will be dropped from the pipeline. Useful for debugging to remove automated accounts with many logs. |
| `cache_dir` | string | The location to store cached files. To aid with development and reduce bandwidth, the Morpheus pipeline will cache data from several stages of the pipeline. This option configures the location for those caches. |

In [3]:
# Global options
train_users = "none"

# Enter any users to skip here
skip_users: typing.List[str] = []

# Location where cache objects will be saved
cache_dir = "./.cache/dfp"

# === Derived Options ===
# Include the generic user only when training "all" or "generic"
include_generic = train_users == "all" or train_users == "generic"

# Include individual users unless only the generic user is being trained
include_individual = train_users != "generic"

# "none" means nothing is being trained (inference only)
is_training = train_users != "none"

### Global Config Object
Before creating the pipeline, we need to set up logging and set the parameters for the Morpheus config object. This config object is responsible for the following:
 - Indicating whether to use C++ or Python stages
    - C++ stages are not supported for the DFP pipeline. This should always be `False`
 - Setting the number of threads to use in the pipeline. Defaults to the thread count of the OS.
 - Setting the feature column names that will be used by the model
    - This option allows extra columns to be used in the pipeline that will not be part of the training algorithm.
    - The final features that the model will be trained on will be the intersection of this list with the log columns.
 - Setting the column name that holds the user's unique identifier
    - DFP requires a user ID column
 - Setting the column name that holds the timestamp for the log
    - DFP requires knowing when each log occurred

In [4]:
# Enable the Morpheus logger
configure_logging(log_level=logging.DEBUG)

config = Config()

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()

config.ae = ConfigAutoEncoder()

config.ae.feature_columns = [
    'accessdevicebrowser', 'accessdeviceos', 'device', 'result', 'reason', 'logcount', "locincrement"
]
config.ae.userid_column_name = "username"
config.ae.timestamp_column_name = "timestamp"

In [5]:
def s3_date_extractor_duo(s3_object):
    key_object = s3_object.key

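    # Extract the timestamp from the file name. This assumes the third '_'-separated field of the key
    # holds an ISO-8601 timestamp ending in 'Z', followed by '.json'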
    ts_object = key_object.split('_')[2].split('.json')[0].replace('T', ' ').replace('Z', '')
    ts_object = datetime.strptime(ts_object, '%Y-%m-%d %H:%M:%S.%f')

    return ts_object

In [6]:
# Specify the column names to ensure all data is uniform
column_info = [
    RenameColumn(name="accessdevicebrowser", dtype=str, input_name="access_device.browser"),
    RenameColumn(name="accessdeviceos", dtype=str, input_name="access_device.os"),
    RenameColumn(name="locationcity", dtype=str, input_name="auth_device.location.city"),
    RenameColumn(name="device", dtype=str, input_name="auth_device.name"),
    BoolColumn(name="result",
               dtype=bool,
               input_name="result",
               true_values=["success", "SUCCESS"],
               false_values=["denied", "DENIED", "FRAUD"]),
    RenameColumn(name="reason", dtype=str, input_name="reason"),
    RenameColumn(name="username", dtype=str, input_name="user.name"),
    RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
]

input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
                                    column_info=column_info)

In [7]:
# Specify the final set of columns necessary just before pre-processing
def column_logcount(df: cudf.DataFrame):
    per_day = df[config.ae.timestamp_column_name].dt.to_period("D")

    # Create the per-user, per-day log count
    return df.groupby([config.ae.userid_column_name, per_day]).cumcount()

def column_locincrement(df: cudf.DataFrame):
    per_day = df[config.ae.timestamp_column_name].dt.to_period("D")

    # Simple but probably incorrect calculation
    return df.groupby([config.ae.userid_column_name, per_day, "locationcity"]).ngroup() + 1

model_column_info = [
    # Input columns
    RenameColumn(name="accessdevicebrowser", dtype=str, input_name="accessdevicebrowser"),
    RenameColumn(name="accessdeviceos", dtype=str, input_name="accessdeviceos"),
    RenameColumn(name="device", dtype=str, input_name="device"),
    RenameColumn(name="result", dtype=bool, input_name="result"),
    RenameColumn(name="reason", dtype=str, input_name="reason"),
    # Derived columns
    CustomColumn(name="logcount", dtype=int, process_column_fn=column_logcount),
    CustomColumn(name="locincrement", dtype=int, process_column_fn=column_locincrement),
    # Extra columns
    RenameColumn(name="username", dtype=str, input_name="username"),
    RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
]

model_schema = DataFrameInputSchema(column_info=model_column_info, preserve_columns=["_batch_id"])
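The `column_locincrement` function above is flagged as a simplified calculation: `ngroup()` numbers every (user, day, city) group in the whole `DataFrame`, so the value does not restart each day and follows the sort order of city names rather than the order in which logs arrived. Assuming the intended feature is a per-user count of the distinct cities seen so far within each day, a minimal pandas sketch could look like the following. `locincrement_sketch` is a hypothetical helper, not part of the DFP codebase, and it uses pandas rather than cuDF.

```python
import pandas as pd


def locincrement_sketch(df: pd.DataFrame, user_col: str = "username",
                        ts_col: str = "timestamp", city_col: str = "locationcity") -> pd.Series:
    """Hypothetical alternative: 1-based count of distinct cities seen so far per user per day."""
    # Walk the logs in time order (a stable sort keeps ties in input order)
    ordered = df.sort_values(ts_col, kind="stable")
    keys = pd.DataFrame({
        "user": ordered[user_col],
        "day": ordered[ts_col].dt.floor("D"),
        "city": ordered[city_col],
    })
    # 1 the first time a (user, day, city) combination appears, otherwise 0
    first_seen = (~keys.duplicated()).astype(int)
    # Running total of new cities within each (user, day) group
    increment = first_seen.groupby([keys["user"], keys["day"]]).cumsum()
    # Return the values in the caller's original row order
    return increment.reindex(df.index)
```

Because it takes the `DataFrame` as its first argument, a function like this could presumably be used in place of `column_locincrement` as a `process_column_fn`, but that has not been verified against the DFP stages.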

## Pipeline Construction
From this point on, we construct the stages that make up the pipeline. To make testing easier, constructing the pipeline object, adding the stages, and running the pipeline are all done in a single cell, which can be rerun as many times as needed for debugging.

### Source Stage (`MultiFileSource`)

This pipeline reads input logs from one or more input files. The source stage reads all of the specified log files, combines them into a single `DataFrame`, and passes it into the pipeline. Once all of the logs have been read, the source completes.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `input_schema` | `DataFrameInputSchema` | | After the raw `DataFrame` is read from each file, this schema is applied to ensure a consistent output from the source. |
| `filenames` | List of strings | | The files to read into the pipeline. All files will be combined into a single `DataFrame`. |
| `parser_kwargs` | `dict` | `{}` | This dictionary will be passed to the `DataFrame` parser class. Allows for customization of log parsing. |
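The read-and-combine behavior described above can be sketched in plain pandas. This is a hypothetical stand-in, not the `MultiFileSource` implementation: it assumes `parser_kwargs` are passed straight to `pandas.read_json`, and it omits the `input_schema` step (such as flattening the `json_columns`) that the real stage applies afterwards.

```python
import typing

import pandas as pd


def read_and_combine(filenames: typing.List[str],
                     parser_kwargs: typing.Optional[dict] = None) -> pd.DataFrame:
    """Hypothetical sketch of reading several log files into one DataFrame."""
    parser_kwargs = parser_kwargs or {}
    # Parse every file with the same options, e.g. {"lines": False, "orient": "records"}
    frames = [pd.read_json(path, **parser_kwargs) for path in filenames]
    # Stack all files into a single DataFrame with a fresh 0..N-1 index
    return pd.concat(frames, ignore_index=True)
```

With `{"lines": False, "orient": "records"}`, as in the pipeline cell below, each file is expected to hold one JSON array of log objects rather than one object per line.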


### Split Users Stage (`DFPSplitUsersStage`)

Once the input logs have been read into a `DataFrame`, this stage is responsible for breaking that single multi-user `DataFrame` into one `DataFrame` per user. This is also where the pipeline chooses whether to process individual users, the generic user, or both.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `include_generic` | `bool` | | Whether or not to combine all user logs into a single `DataFrame` with the username 'generic_user' |
| `include_individual` | `bool` | | Whether or not to output individual `DataFrame` objects for each user |
| `skip_users` | List of `str` | `[]` | Any users to remove from the `DataFrame`. Useful for debugging to remove automated accounts with many logs. |
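The splitting rules in the table above can be illustrated with a short pandas sketch. `split_users_sketch` is hypothetical and only mirrors the documented options; in particular, relabeling every row of the combined frame as `generic_user` is one reading of the `include_generic` description, not confirmed stage behavior.

```python
import typing

import pandas as pd


def split_users_sketch(df: pd.DataFrame, include_generic: bool, include_individual: bool,
                       skip_users: typing.Sequence[str] = (),
                       userid_column: str = "username") -> typing.Dict[str, pd.DataFrame]:
    """Hypothetical sketch: split a multi-user DataFrame into per-user DataFrames."""
    # Drop skipped users (e.g. noisy automated accounts) before splitting
    df = df[~df[userid_column].isin(list(skip_users))]
    out: typing.Dict[str, pd.DataFrame] = {}
    if include_generic:
        # One combined frame for all remaining users, relabeled as the generic user
        out["generic_user"] = df.assign(**{userid_column: "generic_user"})
    if include_individual:
        # One frame per distinct user ID
        for user_id, user_df in df.groupby(userid_column, sort=True):
            out[user_id] = user_df
    return out
```

For inference in this notebook, `include_generic` is `False` and `include_individual` is `True`, so only the per-user frames would be produced.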

### Rolling Window Stage (`DFPRollingWindowStage`)

The Rolling Window Stage performs several key pieces of functionality for DFP.
1. This stage keeps a moving window of logs on a per-user basis
   1. Between messages for the same user, these logs are saved to disk to reduce memory requirements
1. It only emits logs when the window history requirements are met
   1. Until all of the window history requirements are met, no messages will be sent to the rest of the pipeline.
   1. See the options below for configuring the window history requirements
1. It repeats the necessary historical logs to properly calculate log-dependent features.
   1. To support all column feature types, incoming log messages can be combined with existing history and sent to downstream stages.
   1. For example, to calculate a feature that counts the number of logs a particular user has generated in a single day, we must have the user's log history for the past 24 hours. To support this, this stage combines new logs with existing history into a single `DataFrame`.
   1. It is the responsibility of downstream stages to distinguish between new logs and existing history.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `min_history` | `int` | `1` | The minimum number of logs a user must have before emitting any messages. Logs below this threshold will be saved to disk. |
| `min_increment` | `int` or `str` | `0` | Once the min history requirement is met, this stage must receive `min_increment` *new* logs before emitting another message. Logs received before this threshold is met will be saved to disk. Can be specified as an integer count or a string duration. |
| `max_history` | `int` or `str` | `"1d"` | Once the `min_history` and `min_increment` requirements have been met, this puts an upper bound on the number of logs forwarded into the pipeline and on the number of logs retained in the history. Can be specified as an integer count or a string duration. |
| `cache_dir` | `str` | `./.cache/dfp` | The location to write log history to disk. |
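The window rules above can be sketched with an in-memory class. `RollingWindowSketch` is hypothetical and simplified compared with `DFPRollingWindowStage`: history stays in memory instead of on disk, `min_increment` is an integer count only, `max_history` is a duration only, and logs are assumed to arrive in timestamp order so that the newest rows are the new ones.

```python
import typing

import pandas as pd


class RollingWindowSketch:
    """Hypothetical, in-memory illustration of the rolling-window rules."""

    def __init__(self, min_history: int = 1, min_increment: int = 0,
                 max_history: str = "1d", ts_col: str = "timestamp"):
        self.min_history = min_history
        self.min_increment = min_increment
        self.max_history = pd.Timedelta(max_history)
        self.ts_col = ts_col
        self.history: typing.Dict[str, pd.DataFrame] = {}  # user -> retained logs
        self.pending: typing.Dict[str, int] = {}  # user -> new rows since last emit

    def append(self, user: str,
               new_rows: pd.DataFrame) -> typing.Optional[typing.Tuple[pd.DataFrame, int]]:
        """Add logs for one user; return (window, n_new) once the rules are met, else None."""
        prev = self.history.get(user)
        window = new_rows if prev is None else pd.concat([prev, new_rows], ignore_index=True)

        # Retain only logs within max_history of the newest log
        cutoff = window[self.ts_col].max() - self.max_history
        window = window[window[self.ts_col] >= cutoff].reset_index(drop=True)
        self.history[user] = window
        self.pending[user] = self.pending.get(user, 0) + len(new_rows)

        # Hold back output until both history requirements are satisfied
        if len(window) < self.min_history or self.pending[user] < self.min_increment:
            return None

        # The newest n_new rows are the new logs; the rest is repeated history
        n_new = min(self.pending[user], len(window))
        self.pending[user] = 0
        return window, n_new
```

With the inference settings used in the pipeline cell below (`min_history=1`, `min_increment=0`, `max_history="1d"`), this sketch would emit every incoming batch immediately, together with up to one day of earlier history.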

### Preprocessing Stage (`DFPPreprocessingStage`)

This stage performs the final, row-dependent feature calculations specified by the input schema object. Once they are calculated, this stage can forward all received logs or, optionally, only the new logs, removing any history.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `input_schema` | `DataFrameInputSchema` | | The final, row-dependent schema to apply to the incoming columns |
| `only_new_batches` | `bool` | | Whether to forward all received logs or only the new logs. |
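The reason history is forwarded to this stage, and then optionally dropped, can be shown with a small sketch. `preprocess_sketch` is hypothetical: it computes a per-user, per-day log count (the same idea as `column_logcount` earlier in this notebook) over history plus new logs, and assumes the newest `n_new` rows at the end of the frame are the new logs.

```python
import pandas as pd


def preprocess_sketch(window: pd.DataFrame, n_new: int, only_new_batches: bool = True,
                      user_col: str = "username", ts_col: str = "timestamp") -> pd.DataFrame:
    """Hypothetical illustration: compute a history-dependent feature, then optionally drop history."""
    out = window.copy()
    per_day = out[ts_col].dt.floor("D")
    # Per-user, per-day running log count (0-based), computed over history + new logs
    out["logcount"] = out.groupby([user_col, per_day]).cumcount()
    # Keep only the newest n_new rows when history should not be forwarded
    return out.tail(n_new) if only_new_batches else out
```

For a window of three same-day logs where only the last one is new, the new log gets `logcount == 2`; without the two historical rows it would have been counted as the day's first log.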

### Inference Stage (`DFPInferenceStage`)

This stage performs several tasks to aid in performing inference. This stage will:
1. Download models as needed from MLFlow
1. Cache previously downloaded models to improve performance
   1. Models in the cache will be periodically refreshed from MLFlow at a configured rate
1. Perform inference using the downloaded model

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `model_name_formatter` | `str` | `""` | A format string used to build the model name. The model name is constructed by calling `model_name_formatter.format(user_id=user_id)`. For example, `model_name_formatter="my_model-{user_id}"` with a user ID of `"first:last"` results in the model name `"my_model-first:last"`. This should match the value used in `DFPMLFlowModelWriterStage`. |
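The naming and caching behavior described above can be sketched as a small time-based cache. `ModelCacheSketch` is hypothetical: the `loader` callable stands in for the MLflow download that the real stage performs, and the refresh interval and injectable clock are assumptions made so the logic is easy to test.

```python
import time
import typing


class ModelCacheSketch:
    """Hypothetical per-user model cache with periodic refresh."""

    def __init__(self, loader: typing.Callable[[str], typing.Any], model_name_formatter: str,
                 refresh_seconds: float = 3600.0,
                 clock: typing.Callable[[], float] = time.monotonic):
        self._loader = loader
        self._formatter = model_name_formatter
        self._refresh = refresh_seconds
        self._clock = clock
        self._cache: typing.Dict[str, typing.Tuple[float, typing.Any]] = {}

    def model_name(self, user_id: str) -> str:
        # Same construction as the table above: formatter.format(user_id=...)
        return self._formatter.format(user_id=user_id)

    def get(self, user_id: str) -> typing.Any:
        name = self.model_name(user_id)
        now = self._clock()
        entry = self._cache.get(name)
        if entry is None or now - entry[0] >= self._refresh:
            # Cache miss or stale entry: (re)load the model and record when it was loaded
            entry = (now, self._loader(name))
            self._cache[name] = entry
        return entry[1]
```

With the formatter used in the pipeline cell below, `"AE-duo-{user_id}"`, a user named `usera` maps to the model name `AE-duo-usera`.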

### Post Processing Stage (`DFPPostprocessingStage`)

This stage filters the output from the inference stage, keeping only anomalous logs. Logs whose Z-Score exceeds the specified threshold are passed on to the next stage; all logs below the threshold are dropped.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `z_score_threshold` | `float` | `3.0` | The Z-Score used to separate anomalous logs from normal logs. All normal logs will be filtered out and anomalous logs will be passed on. |
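As an illustration of Z-Score filtering, the sketch below standardizes a batch of anomaly scores as z = (score - mean) / std and keeps rows with z above the threshold. `filter_anomalies_sketch` and the `anomaly_score` column name are hypothetical; the real stage may compute or read its Z-Scores differently.

```python
import pandas as pd


def filter_anomalies_sketch(df: pd.DataFrame, score_col: str = "anomaly_score",
                            z_score_threshold: float = 3.0) -> pd.DataFrame:
    """Hypothetical sketch: keep rows whose score's Z-Score exceeds the threshold."""
    scores = df[score_col]
    std = scores.std(ddof=0)
    if std == 0:
        # All scores are identical, so no row stands out
        return df.iloc[0:0]
    # Standardize each score against the batch mean and population standard deviation
    z = (scores - scores.mean()) / std
    # Keep only rows above the threshold; everything else is dropped
    return df[z > z_score_threshold]
```

For example, in a batch of 20 identical scores plus one outlier, the outlier's Z-Score is the square root of 20 (about 4.47), so it survives the default threshold of `3.0` but not a threshold of `5.0`.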

### Write to File Stage (`WriteToFileStage`)

This final stage will write all received messages to a single output file in either CSV or JSON format.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `filename` | `str` | | The file to write anomalous log messages to. |
| `overwrite` | `bool` | `False` | If the file specified in `filename` already exists, it will be overwritten if this option is set to `True` |
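The `overwrite` option can be illustrated with a short pandas sketch. `write_detections_sketch` is hypothetical: raising `FileExistsError` when the file exists and `overwrite=False`, and choosing JSON or CSV from the file extension, are assumptions made for this sketch rather than documented `WriteToFileStage` behavior.

```python
import os

import pandas as pd


def write_detections_sketch(df: pd.DataFrame, filename: str, overwrite: bool = False) -> None:
    """Hypothetical sketch of writing detections to a CSV or JSON file."""
    # Refuse to replace an existing file unless overwrite=True
    if os.path.exists(filename) and not overwrite:
        raise FileExistsError(f"{filename} already exists; pass overwrite=True to replace it")
    # Pick the output format from the extension (an assumption of this sketch)
    if filename.endswith(".json"):
        df.to_json(filename, orient="records", lines=True)
    else:
        df.to_csv(filename, index=False)
```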

In [8]:
# Create a linear pipeline object
pipeline = LinearPipeline(config)

# Source stage reads the Duo JSON log files and combines them into a single DataFrame
pipeline.set_source(
    MultiFileSource(config,
                    input_schema=input_schema,
                    filenames=[
                        "/work/examples/data/dfp/duo/duotest_pt1.json",
                        "/work/examples/data/dfp/duo/duotest_pt2.json",
                        "/work/examples/data/dfp/duo/duotest_pt3.json",
                        "/work/examples/data/dfp/duo/duotest_pt4.json",
                    ],
                    parser_kwargs={
                        "lines": False, "orient": "records"
                    }))

# This will split users or just use one single user
pipeline.add_stage(
    DFPSplitUsersStage(config,
                       include_generic=include_generic,
                       include_individual=include_individual,
                       skip_users=skip_users))

# Next, have a stage that will create rolling windows
pipeline.add_stage(
    DFPRollingWindowStage(
        config,
        min_history=300 if is_training else 1,
        min_increment=300 if is_training else 0,
        # For inference, we only ever want 1 day max
        max_history="60d" if is_training else "1d",
        cache_dir=cache_dir))

# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=model_schema, only_new_batches=not is_training))

# Perform inference on the preprocessed data
pipeline.add_stage(DFPInferenceStage(config, model_name_formatter="AE-duo-{user_id}"))

# Filter for only the anomalous logs
pipeline.add_stage(DFPPostprocessingStage(config, z_score_threshold=3.0))

# Write all anomalies to a CSV file
pipeline.add_stage(WriteToFileStage(config, filename="dfp_detections.csv", overwrite=True))

# Run the pipeline
await pipeline._do_run()

====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Pipeline====
Added source: <from-multi-file-0; MultiFileSource(input_schema=DataFrameInputSchema(json_columns=['access_device', 'application', 'auth_device', 'user'], column_info=[RenameColumn(name='accessdevicebrowser', dtype=<class 'str'>, input_name='access_device.browser'), RenameColumn(name='accessdeviceos', dtype=<class 'str'>, input_name='access_device.os'), RenameColumn(name='locationcity', dtype=<class 'str'>, input_name='auth_device.location.city'), RenameColumn(name='device', dtype=<class 'str'>, input_name='auth_device.name'), BoolColumn(name='result', dtype=<class 'bool'>, input_name='result', value_map={'success': True, 'SUCCESS': True, 'denied': False, 'DENIED': False, 'FRAUD': False}), RenameColumn(name='reason', dtype=<class 'str'>, input_name='reason'), RenameColumn(name='username', dtype=<class 'str'>, input_name='use

W20220827 06:58:36.361177   496 thread.cpp:138] unable to set memory policy - if using docker use: --cap-add=sys_nice to allow membind


Sending 4000 rows
Batch split users complete. Input: 4000 rows from 2021-09-14 00:52:14 to 2022-01-24 15:23:41. Output: 10 users, rows/user min: 176, max: 700, avg: 400.00. Duration: 5.55 ms
Rolling window complete for badguy in 16.03 ms. Input: 490 rows from 2021-09-14 00:52:14 to 2022-01-24 09:30:26. Output: 490 rows from 2021-09-14 00:52:14 to 2022-01-24 09:30:26
Preprocessed 490 data for logs in 2021-09-14 00:52:14 to 2022-01-24 09:30:26 in 34.24859046936035 ms
Rolling window complete for maliciousactor in 37.56 ms. Input: 700 rows from 2021-09-14 16:58:15 to 2022-01-24 15:23:41. Output: 700 rows from 2021-09-14 16:58:15 to 2022-01-24 15:23:41
Rolling window complete for usera in 24.63 ms. Input: 215 rows from 2021-10-07 10:53:23 to 2022-01-24 15:21:18. Output: 215 rows from 2021-10-07 10:53:23 to 2022-01-24 15:21:18
Rolling window complete for userb in 28.06 ms. Input: 457 rows from 2021-10-07 04:02:07 to 2022-01-24 15:22:10. Output: 457