In [1]:
# Load IPython's autoreload extension and use mode 2, which reloads imported
# modules before each cell executes, so edits to imported source files are
# picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import os
import sys

# Put ./morpheus (resolved against the current working directory) at the front
# of the module search path so the imports in the following cells can find it.
# The membership check keeps this cell idempotent: re-running it no longer
# stacks a duplicate entry at the front of sys.path on every execution.
morpheus_path = os.path.abspath("./morpheus")
if morpheus_path not in sys.path:
    sys.path.insert(0, morpheus_path)

print(sys.path)

['/work/examples/dfp_workflow/morpheus/morpheus', '/work/examples/dfp_workflow/morpheus', '/opt/conda/envs/morpheus/lib/python38.zip', '/opt/conda/envs/morpheus/lib/python3.8', '/opt/conda/envs/morpheus/lib/python3.8/lib-dynload', '', '/opt/conda/envs/morpheus/lib/python3.8/site-packages']


In [3]:
import logging
import os
import typing
from datetime import datetime
from datetime import timedelta

import click
import mlflow
import pandas as pd
from dfp.stages.dfp_inference_stage import DFPInferenceStage
from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage
from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
from dfp.stages.dfp_s3_batcher_stage import DFPS3BatcherStage
from dfp.stages.dfp_s3_to_df import DFPS3ToDataFrameStage
from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
from dfp.stages.dfp_training import DFPTraining
from dfp.stages.multi_file_source import MultiFileSource
from dfp.stages.s3_object_source_stage import S3BucketSourceStage
from dfp.stages.s3_object_source_stage import s3_filter_duo
from dfp.stages.s3_object_source_stage import s3_object_generator
from dfp.stages.write_to_s3_stage import WriteToS3Stage
from dfp.utils.column_info import BoolColumn
from dfp.utils.column_info import CustomColumn
from dfp.utils.column_info import DataFrameInputSchema
from dfp.utils.column_info import RenameColumn

import cudf

from morpheus._lib.file_types import FileTypes
from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import CppConfig
from morpheus.messages.message_meta import UserMessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.utils.logger import configure_logging

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Global options
# Selects which user models this run covers; the flags below compare it against
# the strings "all", "generic" and "none".
train_users = "all"

# The generic model is only included when training "all" or "generic"
include_generic = train_users in ("all", "generic")

# Individual users are included for everything except a generic-only run
# (i.e. both when training and when inferring)
include_individual = not (train_users == "generic")

# The string "none" indicates we aren't training anything
is_training = not (train_users == "none")

# Enter any users to skip here
skip_users: typing.List[str] = list()

# Location where cache objects will be saved
cache_dir = "./.cache/dfp"

In [5]:
# Enable the Morpheus logger
configure_logging(log_level=logging.DEBUG)

# Shared configuration object; it is passed to the pipeline and to every stage
# constructed later in this notebook.
config = Config()

# Turn off use of the C++ implementations
# NOTE(review): presumably this forces the Python code paths for all stages — confirm.
CppConfig.set_should_use_cpp(False)

# One thread per CPU reported by the OS
config.num_threads = os.cpu_count()

# Autoencoder-specific settings live under `config.ae`
config.ae = ConfigAutoEncoder()

# Columns used as the autoencoder's feature columns. "logcount" and
# "locincrement" are the derived columns produced by column_logcount and
# column_locincrement; the others are columns of the input schema.
config.ae.feature_columns = [
    'accessdevicebrowser', 'accessdeviceos', 'device', 'result', 'reason', 'logcount', "locincrement"
]
# Name of the column holding the user id
config.ae.userid_column_name = "username"

In [6]:
def s3_date_extractor_duo(s3_object):
    """Parse the timestamp embedded in an object's key.

    The key is split on '_' and its third piece is taken as the timestamp. Any
    text from the first '.json' onward is dropped, 'T' is replaced by a space
    and 'Z' is removed before parsing with the format '%Y-%m-%d %H:%M:%S.%f'.

    Parameters
    ----------
    s3_object
        Any object exposing a string `key` attribute.

    Returns
    -------
    datetime
        The parsed timestamp, without timezone information.

    Raises
    ------
    IndexError
        If the key has fewer than three '_'-separated pieces.
    ValueError
        If the extracted text does not match the expected format.
    """
    name_pieces = s3_object.key.split('_')

    # Keep only what precedes the '.json' suffix of the third piece
    stamp_text, _, _ = name_pieces[2].partition('.json')

    # Turn the ISO-like 'T'/'Z' markers into the plain layout strptime expects
    stamp_text = stamp_text.replace('T', ' ')
    stamp_text = stamp_text.replace('Z', '')

    return datetime.strptime(stamp_text, '%Y-%m-%d %H:%M:%S.%f')

In [7]:
# Specify the column names to ensure all data is uniform
column_info = [
    RenameColumn(name="accessdevicebrowser", dtype=str, input_name="access_device.browser"),
    RenameColumn(name="accessdeviceos", dtype=str, input_name="access_device.os"),
    RenameColumn(name="locationcity", dtype=str, input_name="auth_device.location.city"),
    RenameColumn(name="device", dtype=str, input_name="auth_device.name"),
    BoolColumn(name="result",
               dtype=bool,
               input_name="result",
               true_values=["success", "SUCCESS"],
               false_values=["denied", "DENIED", "FRAUD"]),
    RenameColumn(name="reason", dtype=str, input_name="reason"),
    RenameColumn(name="username", dtype=str, input_name="user.name"),
    RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
]

input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
                                    column_info=column_info)

In [8]:
# Specify the final set of columns necessary just before pre-processing
def column_logcount(df: cudf.DataFrame):
    """Compute the per-user, per-day log count column.

    Rows are grouped by the configured user-id column together with the
    configured timestamp column truncated to a daily period, and the
    `cumcount()` of that grouping is returned (in pandas, a running row
    number within each group that starts at 0).
    """
    # Bucket each row's timestamp into its calendar day
    day_bucket = df[config.ae.timestamp_column_name].dt.to_period("D")

    user_day_groups = df.groupby([config.ae.userid_column_name, day_bucket])

    return user_day_groups.cumcount()

def column_locincrement(df: cudf.DataFrame):
    """Compute the location-increment column.

    Rows are grouped by the configured user-id column, the configured timestamp
    column truncated to a daily period, and "locationcity"; the `ngroup()` of
    that grouping plus one is returned.

    NOTE(review): `ngroup()` numbers groups across the whole frame rather than
    within a user/day, which is presumably why the original author flagged the
    calculation as probably incorrect. The behaviour is left unchanged here.
    """
    # Bucket each row's timestamp into its calendar day
    day_bucket = df[config.ae.timestamp_column_name].dt.to_period("D")
    grouping_keys = [config.ae.userid_column_name, day_bucket, "locationcity"]

    # Simple but probably incorrect calculation
    group_numbers = df.groupby(grouping_keys).ngroup()

    return group_numbers + 1

# Columns handed to the pre-processing stage: the already-renamed input columns
# are passed through under the same names, and two derived columns are added.
model_column_info = [
    # Input columns
    RenameColumn(name="accessdevicebrowser", dtype=str, input_name="accessdevicebrowser"),
    RenameColumn(name="accessdeviceos", dtype=str, input_name="accessdeviceos"),
    RenameColumn(name="device", dtype=str, input_name="device"),
    RenameColumn(name="result", dtype=bool, input_name="result"),
    RenameColumn(name="reason", dtype=str, input_name="reason"),
    # Derived columns
    # Each is produced by calling the given function (defined above in this cell)
    CustomColumn(name="logcount", dtype=int, process_column_fn=column_logcount),
    CustomColumn(name="locincrement", dtype=int, process_column_fn=column_locincrement),
    # Extra columns
    # Not listed in config.ae.feature_columns, but the derived-column functions
    # group by the user id and timestamp columns
    RenameColumn(name="username", dtype=str, input_name="username"),
    RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
]

# "_batch_id" is listed in preserve_columns
# NOTE(review): presumably it is carried through untouched alongside the columns above — confirm.
model_schema = DataFrameInputSchema(column_info=model_column_info, preserve_columns=["_batch_id"])

In [9]:
from dfp.stages.dfp_inference_stage import DFPInferenceStage
from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage
from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
from dfp.stages.dfp_s3_batcher_stage import DFPS3BatcherStage
from dfp.stages.dfp_s3_to_df import DFPS3ToDataFrameStage
from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
from dfp.stages.dfp_training import DFPTraining
from dfp.stages.multi_file_source import MultiFileSource
from dfp.stages.s3_object_source_stage import S3BucketSourceStage
from dfp.stages.s3_object_source_stage import s3_filter_duo
from dfp.stages.s3_object_source_stage import s3_object_generator
from dfp.stages.write_to_s3_stage import WriteToS3Stage

# Create a linear pipeline object
pipeline = LinearPipeline(config)

# Source stage uses 
pipeline.set_source(
    MultiFileSource(config,
                    input_schema=input_schema,
                    filenames=["/work/examples/data/dfp/duo/duotest_pt1.json", "/work/examples/data/dfp/duo/duotest_pt2.json", "/work/examples/data/dfp/duo/duotest_pt3.json", "/work/examples/data/dfp/duo/duotest_pt4.json"],
                    parser_kwargs={
                        "lines": False, "orient": "records"
                    }))

# This will split users or just use one single user
pipeline.add_stage(
    DFPSplitUsersStage(config,
                       include_generic=include_generic,
                       include_individual=include_individual,
                       skip_users=skip_users))

# Next, have a stage that will create rolling windows
pipeline.add_stage(
    DFPRollingWindowStage(
        config,
        min_history=300 if is_training else 1,
        min_increment=300 if is_training else 1,
        # For inference, we only ever want 1 day max
        max_history="5d" if is_training else "1d",
        cache_dir=cache_dir))

# Specify the final set of columns necessary just before pre-processing
model_column_info = [
    # Input columns
    RenameColumn(name="accessdevicebrowser", dtype=str, input_name="accessdevicebrowser"),
    RenameColumn(name="accessdeviceos", dtype=str, input_name="accessdeviceos"),
    RenameColumn(name="device", dtype=str, input_name="device"),
    RenameColumn(name="result", dtype=bool, input_name="result"),
    RenameColumn(name="reason", dtype=str, input_name="reason"),
    # Derived columns
    CustomColumn(name="logcount", dtype=int, process_column_fn=column_logcount),
    CustomColumn(name="locincrement", dtype=int, process_column_fn=column_locincrement),
    # Extra columns
    RenameColumn(name="username", dtype=str, input_name="username"),
    RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
]

model_schema = DataFrameInputSchema(column_info=model_column_info, preserve_columns=["_batch_id"])

# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=model_schema, only_last_batch=not is_training))

if (is_training):

    pipeline.add_stage(DFPTraining(config))

    pipeline.add_stage(DFPMLFlowModelWriterStage(config))
else:
    pipeline.add_stage(DFPInferenceStage(config))

    pipeline.add_stage(DFPPostprocessingStage(config))

    pipeline.add_stage(WriteToFileStage(config, filename="dfp_detections.csv", overwrite=True))

# Run the pipeline
await pipeline._do_run()

====Registering Pipeline====[0m
====Registering Pipeline Complete!====[0m
====Starting Pipeline====[0m
====Pipeline Started====[0m


W20220819 23:30:26.056663  5630 thread.cpp:138] unable to set memory policy - if using docker use: --cap-add=sys_nice to allow membind


====Building Pipeline====[0m
Added source: <from-multi-file-0; MultiFileSource(input_schema=DataFrameInputSchema(json_columns=['access_device', 'application', 'auth_device', 'user'], column_info=[RenameColumn(name='accessdevicebrowser', dtype=<class 'str'>, input_name='access_device.browser'), RenameColumn(name='accessdeviceos', dtype=<class 'str'>, input_name='access_device.os'), RenameColumn(name='locationcity', dtype=<class 'str'>, input_name='auth_device.location.city'), RenameColumn(name='device', dtype=<class 'str'>, input_name='auth_device.name'), BoolColumn(name='result', dtype=<class 'bool'>, input_name='result', value_map={'success': True, 'SUCCESS': True, 'denied': False, 'DENIED': False, 'FRAUD': False}), RenameColumn(name='reason', dtype=<class 'str'>, input_name='reason'), RenameColumn(name='username', dtype=<class 'str'>, input_name='user.name'), RenameColumn(name='timestamp', dtype=<class 'datetime.datetime'>, input_name='timestamp')], preserve_columns=None), filenames

[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

[2mTraining AE model for user: 'badguy'... Complete.[0m
[2mTraining AE model for user: 'maliciousactor'...[0m


[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

[2mTraining AE model for user: 'maliciousactor'... Complete.[0m
[2mTraining AE model for user: 'userc'...[0m


[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

[2mTraining AE model for user: 'userc'... Complete.[0m
[2mTraining AE model for user: 'userd'...[0m


[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

[2mTraining AE model for user: 'userd'... Complete.[0m
[2mTraining AE model for user: 'userb'...[0m


[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

[2mTraining AE model for user: 'userb'... Complete.[0m
[2mTraining AE model for user: 'userh'...[0m


[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

[2mTraining AE model for user: 'userh'... Complete.[0m
[2mTraining AE model for user: 'userf'...[0m


[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

[2mTraining AE model for user: 'userf'... Complete.[0m


[31mError trying to upload ML Flow model
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/registry.py", line 76, in get_store_builder
    store_builder = self._registry[scheme]
KeyError: 'file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/client.py", line 95, in _get_registry_client
    registry_client = ModelRegistryClient(self._registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 36, in __init__
    self.store  # pylint: disable=pointless-statement
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/client.py", line 40, in store
    return utils._get_store(self.registry_uri)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/mlflow/tracking/_model_registry/utils.py",

====Pipeline Complete====[0m
