# Digital Finger Printing (DFP) with Morpheus - DUO Training
## Introduction

In this notebook, we will build and run a DFP pipeline that performs training on Duo authentication logs. The goal is to train a PyTorch autoencoder model to recognize the patterns of users in the sample data. The model will then be used by a second Morpheus pipeline to generate an anomaly score for each individual log. Security teams can use these anomaly scores to detect abnormal behavior as it happens so that the proper action can be taken.

<div class="alert alert-block alert-info">
<b>Note:</b> For more information on DFP, the Morpheus pipeline, and the setup steps required to run this notebook, please refer to the corresponding DFP training materials.
</div>

In [1]:
%load_ext autoreload
%autoreload 2

# Ensure that the morpheus directory is in the python path. This may not need to be run depending on the environment setup
import sys
import os
sys.path.insert(0, os.path.abspath("../../morpheus"))

In [2]:
import functools
import logging
import os
import mlflow
import typing
from datetime import datetime

from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage
from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage
from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
from dfp.stages.dfp_training import DFPTraining
from dfp.stages.multi_file_source import MultiFileSource
from dfp.utils.regex_utils import iso_date_regex

from morpheus.common import FileTypes
from morpheus.cli.utils import get_log_levels
from morpheus.cli.utils import get_package_relative_file
from morpheus.cli.utils import load_labels_file
from morpheus.cli.utils import parse_log_level
from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import CppConfig
from morpheus.pipeline import LinearPipeline
from morpheus.utils.column_info import BoolColumn
from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.column_info import DateTimeColumn
from morpheus.utils.column_info import DistinctIncrementColumn
from morpheus.utils.column_info import IncrementColumn
from morpheus.utils.column_info import RenameColumn
from morpheus.utils.column_info import StringCatColumn
from morpheus.utils.file_utils import date_extractor
from morpheus.utils.logger import configure_logging

# Left align all tables
from IPython.core.display import HTML
table_css = 'table {align:left;display:block}'
HTML('<style>{}</style>'.format(table_css))

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")


## High Level Configuration

The following options significantly alter the functionality of the pipeline. These options are separated from the individual stage options since they may affect more than one stage. Additionally, the Python script that matches this notebook, `dfp_pipeline_duo.py`, configures these options via command line arguments.

### Options

| Name | Type | Description |
| --- | --- | :-- |
| `train_users` | One of `["all", "generic", "individual"]` | This indicates which users to train for this pipeline:<ul><li>`"generic"`: Combine all users into a single model with the username 'generic_user'. Skips individual users.</li><li>`"individual"`: Trains a separate model for each individual user. Skips 'generic_user'.</li><li>`"all"`: Combination of `"generic"` and `"individual"`. Both the 'generic_user' and individual users are trained in the same pipeline.</li></ul>|
| `skip_users` | List of strings | Any user in this list will be dropped from the pipeline. Useful for debugging to remove automated accounts with many logs. |
| `cache_dir` | string | The location to store cached files. To aid with development and reduce bandwidth, the Morpheus pipeline will cache data from several stages of the pipeline. This option configures the location for those caches. |
| `input_files` | List of strings | List of files to process. Can specify multiple arguments for multiple files. Also accepts glob (\*) wildcards and schema prefixes such as `s3://`. For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. Refer to `fsspec` documentation for list of possible options. |
| `model_name_formatter` | string | A format string to use when building the model name. The model name is constructed by calling `model_name_formatter.format(user_id=user_id)`. For example, `model_name_formatter="my_model-{user_id}"` with a user ID of `"first:last"` results in the model name `"my_model-first:last"`. This should match the value used in `DFPMLFlowModelWriterStage`. Available keyword arguments: `user_id`, `user_md5`. |
| `experiment_name_formatter` | string | A format string (without the `f` prefix) that will be used when creating an experiment in MLFlow. Available keyword arguments: `user_id`, `user_md5`, `reg_model_name`. |
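
As a rough illustration of how the two format strings combine, the sketch below expands both with plain `str.format`. The helper `build_names` and the `example_` variables are hypothetical, and deriving `user_md5` from an MD5 hex digest of the user ID is an assumption made for illustration, not a confirmed detail of the Morpheus stages.

```python
import hashlib
import typing

# Example format strings (the same values this notebook uses for its options)
example_model_name_formatter = "DFP-duo-{user_id}"
example_experiment_name_formatter = "dfp/duo/training/{reg_model_name}"


def build_names(user_id: str) -> typing.Tuple[str, str]:
    """Hypothetical helper: expand both format strings for one user."""
    # Assumption: user_md5 is the MD5 hex digest of the user ID
    user_md5 = hashlib.md5(user_id.encode("utf-8")).hexdigest()

    # Keyword arguments that a format string does not reference are ignored
    reg_model_name = example_model_name_formatter.format(user_id=user_id, user_md5=user_md5)
    experiment_name = example_experiment_name_formatter.format(user_id=user_id,
                                                               user_md5=user_md5,
                                                               reg_model_name=reg_model_name)
    return reg_model_name, experiment_name


model_name, experiment_name = build_names("generic_user")
print(model_name)
print(experiment_name)
```

Because `reg_model_name` is itself the output of the model name formatter, changing `model_name_formatter` also changes every experiment name that references `{reg_model_name}`.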


In [3]:
# Global options
train_users = "all"

# Enter any users to skip here
skip_users: typing.List[str] = []

# Location where cache objects will be saved
cache_dir = "/workspace/.cache/dfp"

# Input files to read from
input_files = [
    "../../../../data/dfp/duo-training-data/DUO_2022-08-*.json"
]

# The format to use for models
model_name_formatter = "DFP-duo-{user_id}"

# The format to use for experiment names
experiment_name_formatter = "dfp/duo/training/{reg_model_name}"

# === Derived Options ===
# To include the generic, we must be training all or generic
include_generic = train_users == "all" or train_users == "generic"

# Individual users are included unless only the generic user is being trained
include_individual = train_users != "generic"

# "none" indicates we aren't training anything
is_training = train_users != "none"

# Tracking URI
tracking_uri = "http://mlflow:5000"

### Set MLFlow Tracking URI
Set the MLFlow tracking URI so that the pipeline can reach the MLFlow server where trained models are uploaded.

In [4]:
mlflow.set_tracking_uri(tracking_uri)

### Global Config Object
Before creating the pipeline, we need to set up logging and set the parameters for the Morpheus config object. This config object is responsible for the following:
 - Indicating whether to use C++ or Python stages
    - C++ stages are not supported for the DFP pipeline. This should always be `False`
 - Setting the number of threads to use in the pipeline. Defaults to the thread count of the OS.
 - Setting the feature column names that will be used in model training
    - This option allows extra columns to be used in the pipeline that will not be part of the training algorithm.
    - The final features that the model will be trained on will be an intersection of this list with the log columns.
 - The column name that indicates the user's unique identifier
    - It is required for DFP to have a user ID column
 - The column name that indicates the timestamp for the log
    - It is required for DFP to know when each log occurred
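
To make the intersection rule above concrete, here is a minimal sketch in plain Python. The function `select_training_features` and the example log columns are hypothetical and only illustrate the idea that configured features missing from the logs are dropped, while extra log columns are never trained on.

```python
import typing

# Feature columns as configured for the autoencoder in this notebook
example_feature_columns = [
    "accessdevicebrowser", "accessdeviceos", "authdevicename", "result", "reason", "logcount", "locincrement"
]

# Hypothetical columns present in one batch of logs ("authdevicename" is missing here)
example_log_columns = [
    "timestamp", "username", "accessdevicebrowser", "accessdeviceos", "location", "result", "reason", "logcount",
    "locincrement"
]


def select_training_features(features: typing.List[str], columns: typing.List[str]) -> typing.List[str]:
    """Keep only the configured features that actually exist in the logs, preserving the configured order."""
    available = set(columns)
    return [name for name in features if name in available]


training_features = select_training_features(example_feature_columns, example_log_columns)
print(training_features)
```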

In [5]:
# Enable the Morpheus logger
configure_logging(log_level=logging.DEBUG)

config = Config()

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()

config.ae = ConfigAutoEncoder()

config.ae.feature_columns = [
    'accessdevicebrowser', 'accessdeviceos', 'authdevicename', 'result', 'reason', 'logcount', "locincrement"
]
config.ae.userid_column_name = "username"
config.ae.timestamp_column_name = "timestamp"

In [6]:
# Specify the column names to ensure all data is uniform
source_column_info = [
    DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="timestamp"),
    RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="user.name"),
    RenameColumn(name="accessdevicebrowser", dtype=str, input_name="access_device.browser"),
    RenameColumn(name="accessdeviceos", dtype=str, input_name="access_device.os"),
    StringCatColumn(name="location",
                    dtype=str,
                    input_columns=[
                        "access_device.location.city",
                        "access_device.location.state",
                        "access_device.location.country"
                    ],
                    sep=", "),
    RenameColumn(name="authdevicename", dtype=str, input_name="auth_device.name"),
    BoolColumn(name="result",
                dtype=bool,
                input_name="result",
                true_values=["success", "SUCCESS"],
                false_values=["denied", "DENIED", "FRAUD"]),
    ColumnInfo(name="reason", dtype=str),
    # CustomColumn(name="user.groups", dtype=str, process_column_fn=partial(column_listjoin, col_name="user.groups"))
]

source_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
                                        column_info=source_column_info)


In [7]:
# Preprocessing schema
preprocess_column_info = [
    ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime),
    ColumnInfo(name=config.ae.userid_column_name, dtype=str),
    ColumnInfo(name="accessdevicebrowser", dtype=str),
    ColumnInfo(name="accessdeviceos", dtype=str),
    ColumnInfo(name="authdevicename", dtype=str),
    ColumnInfo(name="result", dtype=bool),
    ColumnInfo(name="reason", dtype=str),
    # Derived columns
    IncrementColumn(name="logcount",
                    dtype=int,
                    input_name=config.ae.timestamp_column_name,
                    groupby_column=config.ae.userid_column_name),
    DistinctIncrementColumn(name="locincrement",
                            dtype=int,
                            input_name="location",
                            groupby_column=config.ae.userid_column_name,
                            timestamp_column=config.ae.timestamp_column_name)
]

preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"])


## Pipeline Construction
From this point on, we begin constructing the stages that will make up the pipeline. To make testing easier, the code that constructs the pipeline object, adds the stages, and runs the pipeline is provided as a single cell. That cell can be rerun as many times as needed for debugging.

### Source Stage (`MultiFileSource`)

This pipeline reads input logs from one or more input files. The source stage constructs a list of files to be processed and passes it to downstream stages. It is capable of reading files from many different source types, both local and remote. This is possible by utilizing the `fsspec` library (similar to `pandas`). Refer to the [`fsspec`](https://filesystem-spec.readthedocs.io/) documentation for more information on the supported file types. Once all of the logs have been read, the source completes.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `filenames` | List of strings | | Any files to read into the pipeline. All files will be combined into a single `DataFrame` |

### File Batcher Stage (`DFPFileBatcherStage`)

To improve performance, multiple small input files can be batched together into a single DataFrame for processing. This stage is responsible for determining the timestamp of input files, grouping input files into batches by time, and sending the batches to be processed into a single DataFrame. Repeated batches of files will be loaded from cache, resulting in increased performance. For example, when performing a 60 day training run with a period of `"D"` and retraining once per day, 59 of the 60 days can be loaded from cache.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `period` | `str` | `"D"` | The period to create batches. Refer to `pandas` windowing frequency documentation for available options.  |
| `date_conversion_func` | Function of `typing.Callable[[fsspec.core.OpenFile], datetime]` | | A callback which is responsible for determining the date for a specified file. |
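
The grouping step can be sketched with the standard library alone. The regular expression below is copied from the pattern that the pipeline prints for `iso_date_regex` in its log output. The helpers `extract_date` and `batch_by_day` and the example file names are hypothetical. This is not the Morpheus implementation, which receives `fsspec` file objects rather than plain strings.

```python
import re
import typing
from collections import defaultdict
from datetime import datetime
from datetime import timezone

# Pattern copied from the `iso_date_regex` value shown in the pipeline log
example_date_regex = re.compile(r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
                                r"T(?P<hour>\d{1,2})(:|_)(?P<minute>\d{1,2})(:|_)(?P<second>\d{1,2})"
                                r"(?P<microsecond>\.\d{1,6})?Z")


def extract_date(filename: str) -> datetime:
    """Hypothetical stand-in for a date conversion callback that works on a file name."""
    match = example_date_regex.search(filename)
    if match is None:
        raise ValueError(f"No ISO timestamp found in: {filename}")

    parts = match.groupdict()
    # Drop the leading "." and pad to six digits so ".5" becomes 500000 microseconds
    fraction = (parts["microsecond"] or ".0")[1:].ljust(6, "0")

    return datetime(int(parts["year"]), int(parts["month"]), int(parts["day"]),
                    int(parts["hour"]), int(parts["minute"]), int(parts["second"]),
                    int(fraction), tzinfo=timezone.utc)


def batch_by_day(filenames: typing.List[str]) -> typing.Dict[str, typing.List[str]]:
    """Group file names into one batch per calendar day, similar in spirit to period="D"."""
    batches = defaultdict(list)
    for name in sorted(filenames, key=extract_date):
        batches[extract_date(name).date().isoformat()].append(name)
    return dict(batches)


# Hypothetical file names that follow the timestamp pattern
example_files = [
    "DUO_2022-08-02T00_37_03.000Z.json",
    "DUO_2022-08-01T03_02_04.000Z.json",
    "DUO_2022-08-01T23_46_51.500Z.json",
]

batches = batch_by_day(example_files)
for day, names in batches.items():
    print(day, names)
```

Keying each batch by day is what makes caching effective: a batch whose set of files has not changed since the previous run does not need to be parsed again.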

### File to DataFrame Stage (`DFPFileToDataFrameStage`)

After files have been batched into groups, this stage is responsible for reading the files and converting them into a DataFrame. The specified input schema converts the raw DataFrame into one suitable for caching and processing. Any columns that are not needed should be excluded from the schema.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `schema` | `DataFrameInputSchema` | | After the raw `DataFrame` is read from each file, this schema will be applied to ensure a consistent output from the source. |
| `file_type` | `FileTypes` | `FileTypes.Auto` | Allows overriding the file type. When set to `Auto`, the file extension will be used. Options are `CSV`, `JSON`, `PARQUET`, `Auto`. |
| `parser_kwargs` | `dict` | `{}` | This dictionary will be passed to the `DataFrame` parser class. Allows for customization of log parsing. |
| `cache_dir` | `str` | `./.cache/dfp` | The location to write cached input files to. |
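
To give a feel for what the source schema in this notebook does to a single record, the sketch below applies the same renames, string concatenation, and boolean mapping to a nested dictionary. The functions `flatten` and `apply_example_schema` and the sample record are hypothetical. Timestamp parsing is left out, unknown `result` values are mapped to `None` as a simplifying assumption, and the real stage operates on whole DataFrames rather than on one dictionary at a time.

```python
import typing

# Mapping that mirrors the true/false values of the "result" column in the source schema
RESULT_VALUES = {"success": True, "SUCCESS": True, "denied": False, "DENIED": False, "FRAUD": False}


def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested dictionaries into dotted keys such as "access_device.browser"."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


def apply_example_schema(record: dict) -> dict:
    """Hypothetical single-record version of the source schema."""
    raw = flatten(record)
    location_parts = [
        raw.get("access_device.location.city"),
        raw.get("access_device.location.state"),
        raw.get("access_device.location.country"),
    ]
    return {
        "timestamp": raw.get("timestamp"),  # conversion to datetime omitted in this sketch
        "username": raw.get("user.name"),
        "accessdevicebrowser": raw.get("access_device.browser"),
        "accessdeviceos": raw.get("access_device.os"),
        "location": ", ".join(str(part) for part in location_parts),
        "authdevicename": raw.get("auth_device.name"),
        "result": RESULT_VALUES.get(raw.get("result")),
        "reason": raw.get("reason"),
    }


# Hypothetical Duo-style log record
example_record = {
    "timestamp": "2022-08-01T03:02:04Z",
    "user": {"name": "anthony"},
    "access_device": {
        "browser": "Chrome",
        "os": "Windows",
        "location": {"city": "Austin", "state": "Texas", "country": "United States"},
    },
    "auth_device": {"name": "phone-1"},
    "result": "SUCCESS",
    "reason": "User approved",
}

row = apply_example_schema(example_record)
print(row)
```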

### Split Users Stage (`DFPSplitUsersStage`)

Once the input logs have been read into a `DataFrame`, this stage is responsible for breaking that single `DataFrame` with many users into multiple `DataFrame`s for each user. This is also where the pipeline chooses whether to train individual users or the generic user (or both).

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `include_generic` | `bool` | | Whether or not to combine all user logs into a single `DataFrame` with the username 'generic_user' |
| `include_individual` | `bool` | | Whether or not to output individual `DataFrame` objects for each user |
| `skip_users` | List of `str` | `[]` | Any users to remove from the `DataFrame`. Useful for debugging to remove automated accounts with many logs. Mutually exclusive with `only_users`. |
| `only_users` | List of `str` | `[]` | Only allow these users in the final `DataFrame`. Useful for debugging to focus on specific users. Mutually exclusive with `skip_users`. |
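
The splitting behavior can be approximated in a few lines of plain Python. The function `split_users` below is a hypothetical sketch that works on lists of dictionaries instead of DataFrames and does not implement `only_users`. It shows how every retained row can land in both the `generic_user` group and the group of its own user.

```python
import typing
from collections import defaultdict


def split_users(rows: typing.List[dict],
                include_generic: bool,
                include_individual: bool,
                skip_users: typing.Iterable[str] = (),
                userid_column: str = "username") -> typing.Dict[str, typing.List[dict]]:
    """Hypothetical sketch: group rows per user and optionally under 'generic_user'."""
    skipped = set(skip_users)
    output = defaultdict(list)

    for row in rows:
        user = row[userid_column]
        if user in skipped:
            # Skipped users are dropped entirely, including from the generic group
            continue
        if include_generic:
            output["generic_user"].append(row)
        if include_individual:
            output[user].append(row)

    return dict(output)


# Hypothetical rows
example_rows = [
    {"username": "anthony", "result": True},
    {"username": "benjamin", "result": True},
    {"username": "anthony", "result": False},
    {"username": "automation-bot", "result": True},
]

groups = split_users(example_rows, include_generic=True, include_individual=True, skip_users=["automation-bot"])
for user, user_rows in groups.items():
    print(user, len(user_rows))
```

With both flags enabled, the largest group is always the generic one, which matches the `rows/user` maximum equaling the input row count in the pipeline log.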

### Rolling Window Stage (`DFPRollingWindowStage`)

The Rolling Window Stage performs several key pieces of functionality for DFP.
1. This stage keeps a moving window of logs on a per user basis
   1. These logs are saved to disk to reduce memory requirements between logs from the same user
1. It only emits logs when the window history requirements are met
   1. Until all of the window history requirements are met, no messages will be sent to the rest of the pipeline.
   1. Configuration options for defining the window history requirements are detailed below.
1. It repeats the necessary logs to properly calculate log dependent features.
   1. To support all column feature types, incoming log messages can be combined with existing history and sent to downstream stages.
   1. For example, to calculate a feature that increments a counter for the number of logs a particular user has generated in a single day, we must have the user's log history for the past 24 hours. To support this, this stage will combine new logs with existing history into a single `DataFrame`.
   1. It is the responsibility of downstream stages to distinguish between new logs and existing history.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `min_history` | `int` | `300` | The minimum number of logs a user must have before emitting any messages. Logs below this threshold will be saved to disk. |
| `min_increment` | `int` or `str` | `300` | Once the min history requirement is met, this stage must receive `min_increment` *new* logs before emitting another message. Logs received before this threshold is met will be saved to disk. Can be specified as an integer count or a string duration. |
| `max_history` | `int` or `str` | `"60d"` | Once `min_history` and `min_increment` requirements have been met, this puts an upper bound on the maximum number of messages to forward into the pipeline and also the maximum amount of messages to retain in the history. Can be specified as an integer count or a string duration. |
| `cache_dir` | `str` | `./.cache/dfp` | The location to write log history to disk. |
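
The interaction between `min_history`, `min_increment`, and `max_history` is easier to follow with a small simulation. The class `RollingWindowSketch` below is hypothetical: it tracks a single user, supports integer counts only (no string durations such as `"60d"`), and keeps history in memory instead of on disk.

```python
import typing


class RollingWindowSketch:
    """Hypothetical single-user rolling window that uses integer thresholds only."""

    def __init__(self, min_history: int = 300, min_increment: int = 300, max_history: int = 1000):
        self.min_history = min_history
        self.min_increment = min_increment
        self.max_history = max_history
        self._history: typing.List[typing.Any] = []
        self._pending = 0  # new logs received since the last emitted window

    def add(self, logs: typing.List[typing.Any]) -> typing.Optional[typing.List[typing.Any]]:
        """Store new logs and return the current window if the requirements are met, otherwise None."""
        self._history.extend(logs)
        self._pending += len(logs)

        # Retain at most `max_history` of the most recent logs
        if len(self._history) > self.max_history:
            self._history = self._history[-self.max_history:]

        if len(self._history) < self.min_history or self._pending < self.min_increment:
            return None

        # Emit new logs together with the retained history
        self._pending = 0
        return list(self._history)


window = RollingWindowSketch(min_history=3, min_increment=2, max_history=4)
first = window.add([1, 2])  # below min_history, nothing emitted
second = window.add([3])  # history requirement met, window emitted
third = window.add([4])  # only one new log since the last emit, nothing emitted
fourth = window.add([5])  # two new logs, window emitted and trimmed to max_history
print(first, second, third, fourth)
```

In this sketch the emitted window always contains history in addition to the new logs, which is why downstream stages need a way to tell the two apart.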

### Preprocessing Stage (`DFPPreprocessingStage`)

This stage performs the final, row dependent, feature calculations as specified by the input schema object. Once calculated, this stage can forward on all received logs, or optionally can only forward on new logs, removing any history information.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `input_schema` | `DataFrameInputSchema` | | The final, row dependent, schema to apply to the incoming columns |
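
The two derived columns in this notebook's preprocessing schema, `logcount` and `locincrement`, are examples of row-dependent features. The sketch below shows one plausible reading of them: a running count of logs, and a running count of distinct locations, per user and per calendar day. The function `add_increment_features` is hypothetical, the per-day grouping follows the 24 hour example given for the rolling window stage, and starting both counters at 1 is an assumption made for illustration.

```python
import typing
from collections import defaultdict
from datetime import datetime


def add_increment_features(rows: typing.List[dict]) -> typing.List[dict]:
    """Hypothetical sketch of per-user, per-day running counters."""
    log_counts = defaultdict(int)
    seen_locations = defaultdict(set)
    output = []

    # Counters depend on ordering, so process rows by timestamp
    for row in sorted(rows, key=lambda r: r["timestamp"]):
        key = (row["username"], row["timestamp"].date())
        log_counts[key] += 1
        seen_locations[key].add(row["location"])
        output.append({**row, "logcount": log_counts[key], "locincrement": len(seen_locations[key])})

    return output


# Hypothetical rows for a single user across two days
example_rows = [
    {"username": "anthony", "timestamp": datetime(2022, 8, 1, 9, 0), "location": "Austin, Texas, United States"},
    {"username": "anthony", "timestamp": datetime(2022, 8, 1, 12, 0), "location": "Austin, Texas, United States"},
    {"username": "anthony", "timestamp": datetime(2022, 8, 1, 18, 0), "location": "Dallas, Texas, United States"},
    {"username": "anthony", "timestamp": datetime(2022, 8, 2, 9, 0), "location": "Austin, Texas, United States"},
]

features = add_increment_features(example_rows)
for row in features:
    print(row["timestamp"], row["logcount"], row["locincrement"])
```

Because each value depends on the rows that came before it, these features can only be computed correctly when the stage receives the relevant history along with the new logs.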

### Training Stage (`DFPTraining`)

This stage is responsible for performing the actual training calculations. Training is performed on all received data. The resulting message contains the input data paired with the trained model.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `model_kwargs` | `dict` | `{}` | The options to use when creating a new model instance. Refer to `DFPAutoEncoder` for information on the available options. |

### MLFlow Model Writer Stage (`DFPMLFlowModelWriterStage`)

This stage is the last step in training. It will upload the trained model from the previous stage to MLFlow. The tracking URI for which MLFlow instance to use is configured using the static method `mlflow.set_tracking_uri()`.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `model_name_formatter` | `str` | `""` | A format string to use when building the model name. The model name is constructed by calling `model_name_formatter.format(user_id=user_id)`. For example, `model_name_formatter="my_model-{user_id}"` with a user ID of `"first:last"` results in the model name `"my_model-first:last"`. |
| `experiment_name_formatter` | `str` |  | A format string used to build the name of the MLFlow experiment that each model is created in. Experiments allow metrics to be saved with each model. Available keyword arguments: `user_id`, `user_md5`, `reg_model_name`. In this notebook, `"dfp/duo/training/{reg_model_name}"` produces experiment names such as `dfp/duo/training/DFP-duo-generic_user`. |

In [8]:
# Create a linear pipeline object
pipeline = LinearPipeline(config)

# Source stage
pipeline.set_source(MultiFileSource(config, filenames=input_files))

# Batch files into buckets by time. Use the default ISO date extractor from the filename
pipeline.add_stage(
    DFPFileBatcherStage(config,
                        period="D",
                        date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex)))

# Output is S3 Buckets. Convert to DataFrames. This caches downloaded S3 data
pipeline.add_stage(
    DFPFileToDataFrameStage(config,
                            schema=source_schema,
                            file_type=FileTypes.JSON,
                            parser_kwargs={
                                "lines": False, "orient": "records"
                            },
                            cache_dir=cache_dir))


# This will split users or just use one single user
pipeline.add_stage(
    DFPSplitUsersStage(config,
                        include_generic=include_generic,
                        include_individual=include_individual,
                        skip_users=skip_users))

# Next, have a stage that will create rolling windows
pipeline.add_stage(
    DFPRollingWindowStage(
        config,
        min_history=300 if is_training else 1,
        min_increment=300 if is_training else 0,
        # For inference, we only ever want 1 day max
        max_history="60d" if is_training else "1d",
        cache_dir=cache_dir))

# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

# Finally, perform training which will output a model
pipeline.add_stage(DFPTraining(config, validation_size=0.10))

# Write that model to MLFlow
pipeline.add_stage(
    DFPMLFlowModelWriterStage(config,
                                model_name_formatter=model_name_formatter,
                                experiment_name_formatter=experiment_name_formatter))

# Run the pipeline
await pipeline.run_async()

====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
Starting! Time: 1689788486.0775218
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-multi-file-0; MultiFileSource(filenames=['../../../../data/dfp/duo-training-data/DUO_2022-08-*.json'], watch=False, watch_interval=1.0)>
  └─> fsspec.OpenFiles
Added stage: <dfp-file-batcher-1; DFPFileBatcherStage(date_conversion_func=functools.partial(<function date_extractor at 0x7fa005d6dc60>, filename_regex=re.compile('(?P<year>\\d{4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2})T(?P<hour>\\d{1,2})(:|_)(?P<minute>\\d{1,2})(:|_)(?P<second>\\d{1,2})(?P<microsecond>\\.\\d{1,6})?Z')), period=D, sampling_rate_s=None, start_time=None, end_time=None, sampling=None)>
  └─ fsspec.OpenFiles -> Tuple[fsspec.core.OpenFiles, int]
Added stage: <dfp-s3-to-df-2; DFPFileToDataFrameStage(s

2023-07-19 17:41:28,044 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-07-19 17:41:28,044 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")


S3 objects to DF complete. Rows: 79, Cache: miss, Duration: 11666.966438293457 ms, Rate: 6.771254585999772 rows/s
Batch split users complete. Input: 79 rows from 2022-08-01 03:02:04 to 2022-08-01 23:46:51. Output: 17 users, rows/user min: 2, max: 79, avg: 9.29. Duration: 3.57 ms
S3 objects to DF complete. Rows: 102, Cache: miss, Duration: 418.95532608032227 ms, Rate: 243.46271225215196 rows/s
Batch split users complete. Input: 102 rows from 2022-08-02 00:37:03 to 2022-08-02 23:30:48. Output: 17 users, rows/user min: 1, max: 102, avg: 12.00. Duration: 3.08 ms
S3 objects to DF complete. Rows: 100, Cache: miss, Duration: 626.4846324920654 ms, Rate: 159.62083475569773 rows/s
Batch split users complete. Input: 100 rows from 2022-08-03 00:01:48 to 2022-08-03 23:58:52. Output: 16 users, rows/user min: 2, max: 100, avg: 12.50. Duration: 2.78 ms
S3 objects to DF complete. Rows: 88, Cache: miss, Duration: 429.6708106994629 ms, Rate: 204.8079548

Preprocessed 369 data for logs in 2022-08-01 03:02:04 to 2022-08-04 23:50:20 in 460.75963973999023 ms
Training AE model for user: 'generic_user'...
S3 objects to DF complete. Rows: 79, Cache: miss, Duration: 656.3653945922852 ms, Rate: 120.3597883905389 rows/s
Batch split users complete. Input: 79 rows from 2022-08-05 00:39:29 to 2022-08-05 23:48:15. Output: 17 users, rows/user min: 1, max: 79, avg: 9.29. Duration: 4.51 ms
S3 objects to DF complete. Rows: 111, Cache: miss, Duration: 914.5395755767822 ms, Rate: 121.3725496023444 rows/s
Batch split users complete. Input: 111 rows from 2022-08-06 00:04:07 to 2022-08-06 23:55:21. Output: 20 users, rows/user min: 1, max: 111, avg: 11.10. Duration: 3.46 ms
S3 objects to DF complete. Rows: 72, Cache: miss, Duration: 508.5406303405762 ms, Rate: 141.58160765203888 rows/s
Batch split users complete. Input: 72 rows from 2022-08-07 01:30:43 to 2022-08-07 23:13:44. Output: 16 users, rows/u

Preprocessed 718 data for logs in 2022-08-01 03:02:04 to 2022-08-08 23:53:14 in 595.3989028930664 ms
S3 objects to DF complete. Rows: 85, Cache: miss, Duration: 714.8773670196533 ms, Rate: 118.90151223330476 rows/s
Batch split users complete. Input: 85 rows from 2022-08-09 00:02:38 to 2022-08-09 23:36:30. Output: 17 users, rows/user min: 1, max: 85, avg: 10.00. Duration: 14.45 ms
Training AE model for user: 'generic_user'... Complete.
Training AE model for user: 'generic_user'...
S3 objects to DF complete. Rows: 105, Cache: miss, Duration: 599.5631217956543 ms, Rate: 175.12751565762005 rows/s
Batch split users complete. Input: 105 rows from 2022-08-10 00:06:51 to 2022-08-10 23:49:15. Output: 19 users, rows/user min: 1, max: 105, avg: 11.05. Duration: 4.68 ms
S3 objects to DF complete. Rows: 103, Cache: miss, Duration: 487.03455924987793 ms, Rate: 211.48396565253765 rows/s
Batch split users complete. Input: 103 rows fro

2023/07/19 17:41:44 INFO mlflow.tracking.fluent: Experiment with name 'dfp/duo/training/DFP-duo-generic_user' does not exist. Creating a new experiment.


Rolling window complete for generic_user in 15.27 ms. Input: 93 rows from 2022-08-12 00:00:36 to 2022-08-12 23:48:12. Output: 1104 rows from 2022-08-01 03:02:04 to 2022-08-12 23:48:12

Preprocessed 1104 data for logs in 2022-08-01 03:02:04 to 2022-08-12 23:48:12 in 734.6303462982178 ms
S3 objects to DF complete. Rows: 89, Cache: miss, Duration: 874.0179538726807 ms, Rate: 101.8285718338513 rows/s
Batch split users complete. Input: 89 rows from 2022-08-13 00:01:19 to 2022-08-13 23:34:01. Output: 18 users, rows/user min: 1, max: 89, avg: 9.89. Duration: 2.93 ms

Training AE model for user: 'generic_user'... Complete.
Training AE model for user: 'generic_user'...
S3 objects to DF complete. Rows: 87, Cache: miss, Duration: 487.9343509674072 ms, Rate: 178.30267499615206 rows/s
Batch split users complete. Input: 87 rows from 2022-08-14 00:28:59 to 2022-08-14 23:42:24. Output: 18 users, rows/user min: 1, max: 87, avg: 9.67. Duration: 5.19 ms
S3 objects to DF complete. Rows: 68, Cache: miss, Duration: 510.03265380859375 ms, Rate: 133.32479693647065 rows/s
Batch split users complete. Input: 68 rows from 2022-08-15 00:15:04 to 2022-08-15 23:14:56. Output: 15 users, rows/user min: 1, max: 68, avg: 9.07. Duration: 2.86 ms
S3 objects to DF complete. Rows: 89, Cache: miss, Duration: 498.5921382904053 ms, Rate: 178.50261399059985 rows/s
Batch split users complete. Input: 89 rows from 2022-08-16 00:18:15 to 2022-08-16 23:58:07. Output: 17 users, rows/user min: 1, max: 89, avg: 10.47. Duration: 3.25

S3 objects to DF complete. Rows: 90, Cache: miss, Duration: 954.7185897827148 ms, Rate: 94.26861586562714 rows/s
Batch split users complete. Input: 90 rows from 2022-08-17 00:04:52 to 2022-08-17 23:48:15. Output: 18 users, rows/user min: 1, max: 90, avg: 10.00. Duration: 3.43 ms

Preprocessed 1437 data for logs in 2022-08-01 03:02:04 to 2022-08-16 23:58:07 in 994.2903518676758 ms
Training AE model for user: 'generic_user'... Complete.
Training AE model for user: 'generic_user'...
S3 objects to DF complete. Rows: 101, Cache: miss, Duration: 583.3799839019775 ms, Rate: 173.12901159970298 rows/s
Batch split users complete. Input: 101 rows from 2022-08-18 00:00:50 to 2022-08-18 23:53:08. Output: 17 users, rows/user min: 1, max: 101, avg: 11.88. Duration: 3.13 ms
Successfully registered model 'DFP-duo-generic_user'.


2023/07/19 17:41:48 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-generic_user, version 1


ML Flow model upload complete: generic_user:DFP-duo-generic_user:1
S3 objects to DF complete. Rows: 90, Cache: miss, Duration: 738.9256954193115 ms, Rate: 121.7984440897383 rows/s
Batch split users complete. Input: 90 rows from 2022-08-19 00:01:58 to 2022-08-19 23:21:45. Output: 17 users, rows/user min: 1, max: 90, avg: 10.59. Duration: 2.75 ms


2023/07/19 17:41:49 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-generic_user, version 2


ML Flow model upload complete: generic_user:DFP-duo-generic_user:2
S3 objects to DF complete. Rows: 108, Cache: miss, Duration: 549.2737293243408 ms, Rate: 196.6232758534626 rows/s
Batch split users complete. Input: 108 rows from 2022-08-20 00:03:51 to 2022-08-20 23:54:30. Output: 17 users, rows/user min: 1, max: 108, avg: 12.71. Duration: 3.80 ms
Rolling window complete for generic_user in 22.29 ms. Input: 108 rows from 2022-08-20 00:03:51 to 2022-08-20 23:54:30. Output: 1826 rows from 2022-08-01 03:02:04 to 2022-08-20 23:54:30


2023/07/19 17:41:49 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-generic_user, version 3


ML Flow model upload complete: generic_user:DFP-duo-generic_user:3

Preprocessed 1826 data for logs in 2022-08-01 03:02:04 to 2022-08-20 23:54:30 in 823.347806930542 ms
S3 objects to DF complete. Rows: 100, Cache: miss, Duration: 952.2974491119385 ms, Rate: 105.00920704266785 rows/s
Batch split users complete. Input: 100 rows from 2022-08-21 00:12:40 to 2022-08-21 23:47:09. Output: 19 users, rows/user min: 1, max: 100, avg: 10.53. Duration: 3.04 ms
S3 objects to DF complete. Rows: 92, Cache: miss, Duration: 530.3006172180176 ms, Rate: 173.48650371677184 rows/s
Batch split users complete. Input: 92 rows from 2022-08-22 00:00:32 to 2022-08-22 23:46:41. Output: 17 users, rows/user min: 2, max: 92, avg: 10.82. Duration: 3.49 ms
Training AE model for user: 'generic_user'... Complete.
Training AE model for user: 'generic_user'...
S3 objects to DF complete. Rows: 65, Cache: miss, Duration: 685.1587295532227 ms, Rate: 94.86852782622373 rows/s
Batch split users complete. Input: 65 rows from 202

2023/07/19 17:41:51 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-generic_user, version 4


ML Flow model upload complete: generic_user:DFP-duo-generic_user:4
S3 objects to DF complete. Rows: 97, Cache: miss, Duration: 529.3879508972168 ms, Rate: 183.23046422874293 rows/s
Batch split users complete. Input: 97 rows from 2022-08-24 00:35:12 to 2022-08-24 23:44:42. Output: 17 users, rows/user min: 1, max: 97, avg: 11.41. Duration: 4.45 ms
Rolling window complete for generic_user in 21.36 ms. Input: 97 rows from 2022-08-24 00:35:12 to 2022-08-24 23:44:42. Output: 2180 rows from 2022-08-01 03:02:04 to 2022-08-24 23:44:42

S3 objects to DF complete. Rows: 88, Cache: miss, Duration: 956.1178684234619 ms, Rate: 92.03886142730788 rows/s
Batch split users complete. Input: 88 rows from 2022-08-25 00:38:52 to 2022-08-25 23:28:42. Output: 18 users, rows/user min: 1, max: 88, avg: 9.78. Duration: 3.11 ms

Rolling window complete for attacktarget in 21.85 ms. Input: 16 rows from 2022-08-25 03:23:50 to 2022-08-25 22:31:15. Output: 302 rows from 2022-08-01 04:38:16 to 2022-08-25 22:31:15
Preprocessed 2180 data for logs in 2022-08-01 03:02:04 to 2022-08-24 23:44:42 in 1008.3956718444824 ms

Preprocessed 302 data for logs in 2022-08-01 04:38:16 to 2022-08-25 22:31:15 in 419.0716743469238 ms
S3 objects to DF complete. Rows: 90, Cache: miss, Duration: 637.6986503601074 ms, Rate: 141.13249251692338 rows/s
Batch split users complete. Input: 90 rows from 2022-08-26 00:03:13 to 2022-08-26 23:33:39. Output: 18 users, rows/user min: 1, max: 90, avg: 10.00. Duration: 9.87 ms
S3 objects to DF complete. Rows: 88, Cache: miss, Duration: 565.8867359161377 ms, Rate: 155.50815103932968 rows/s
Batch split users complete. Input: 88 rows from 2022-08-27 00:17:07 to 2022-08-27 23:50:48. Output: 16 users, rows/user min: 1, max: 88, avg: 11.00. Duration: 2.94 ms
S3 objects to DF complete. Rows: 99, Cache: miss, Duration: 480.7703495025635 ms, Rate: 205.91952083241384 rows/s
Batch split users complete. Input: 99 rows from 2022-08-28 00:26:28 to 2022-08-28 23:45:18. Output: 17 users, rows/user min: 1, max: 99, avg: 11.65. Duration: 2.98 ms

Training AE model for user: 'generic_user'...


2023/07/19 17:41:55 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-generic_user, version 5


ML Flow model upload complete: generic_user:DFP-duo-generic_user:5
S3 objects to DF complete. Rows: 99, Cache: miss, Duration: 1086.2226486206055 ms, Rate: 91.14153541699774 rows/s
Batch split users complete. Input: 99 rows from 2022-08-29 00:07:32 to 2022-08-29 23:44:47. Output: 17 users, rows/user min: 1, max: 99, avg: 11.65. Duration: 3.85 ms
Rolling window complete for anthony in 13.22 ms. Input: 9 rows from 2022-08-29 04:07:05 to 2022-08-29 23:33:24. Output: 303 rows from 2022-08-01 04:54:59 to 2022-08-29 23:33:24
Rolling window complete for benjamin in 16.98 ms. Input: 11 rows from 2022-08-29 00:07:32 to 2022-08-29 23:17:15. Output: 300 rows from 2022-08-01 04:52:20 to 2022-08-29 23:17:15
Preprocessed 2545 data for logs in 2022-08-01 03:02:04 to 2022-08-28 23:45:18 in 1105.8740615844727 ms
Preprocessed 306 data for logs in 2022-08-01 04:18:06 to 2022-08-28 22:28:05 in 305.36842346191406 ms

Preprocessed 303 data for logs in 2022-08-01 04:54:59 to 2022-08-29 23:33:24 in 288.36750984191895 ms

Preprocessed 300 data for logs in 2022-08-01 04:52:20 to 2022-08-29 23:17:15 in 321.4404582977295 ms

Training AE model for user: 'generic_user'... Complete.
Training AE model for user: 'attacktarget'...


2023/07/19 17:41:58 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-generic_user, version 6


ML Flow model upload complete: generic_user:DFP-duo-generic_user:6
Training AE model for user: 'attacktarget'... Complete.
Training AE model for user: 'generic_user'...


2023/07/19 17:41:59 INFO mlflow.tracking.fluent: Experiment with name 'dfp/duo/training/DFP-duo-attacktarget' does not exist. Creating a new experiment.


Successfully registered model 'DFP-duo-attacktarget'.


2023/07/19 17:41:59 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-attacktarget, version 1


ML Flow model upload complete: attacktarget:DFP-duo-attacktarget:1
Training AE model for user: 'generic_user'... Complete.
Training AE model for user: 'juan'...


2023/07/19 17:42:01 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-generic_user, version 7


ML Flow model upload complete: generic_user:DFP-duo-generic_user:7
Training AE model for user: 'juan'... Complete.
Training AE model for user: 'anthony'...


2023/07/19 17:42:02 INFO mlflow.tracking.fluent: Experiment with name 'dfp/duo/training/DFP-duo-juan' does not exist. Creating a new experiment.


Successfully registered model 'DFP-duo-juan'.


2023/07/19 17:42:02 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-juan, version 1


ML Flow model upload complete: juan:DFP-duo-juan:1
Training AE model for user: 'anthony'... Complete.
Training AE model for user: 'benjamin'...


2023/07/19 17:42:03 INFO mlflow.tracking.fluent: Experiment with name 'dfp/duo/training/DFP-duo-anthony' does not exist. Creating a new experiment.


Successfully registered model 'DFP-duo-anthony'.


2023/07/19 17:42:03 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-anthony, version 1


ML Flow model upload complete: anthony:DFP-duo-anthony:1
Training AE model for user: 'benjamin'... Complete.


2023/07/19 17:42:04 INFO mlflow.tracking.fluent: Experiment with name 'dfp/duo/training/DFP-duo-benjamin' does not exist. Creating a new experiment.


Successfully registered model 'DFP-duo-benjamin'.


2023/07/19 17:42:04 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-duo-benjamin, version 1


ML Flow model upload complete: benjamin:DFP-duo-benjamin:1
====Pipeline Complete====
