# 1.0 Source Data Exploration and Schemas

## 1.1 Exploring Raw Data

Let's first explore what the raw data look like so we can assess the changes needed in the pipeline to build schema objects dynamically.

In [3]:
! ls /workspace/examples/data/dfp/azure-inference-data

AZUREAD_2022-08-30T00_17_05.561Z.json  AZUREAD_2022-08-31T00_21_46.153Z.json
AZUREAD_2022-08-30T03_14_27.626Z.json  AZUREAD_2022-08-31T03_08_27.951Z.json
AZUREAD_2022-08-30T06_15_21.422Z.json  AZUREAD_2022-08-31T06_20_09.178Z.json
AZUREAD_2022-08-30T09_21_58.312Z.json  AZUREAD_2022-08-31T09_01_27.089Z.json
AZUREAD_2022-08-30T12_05_53.775Z.json  AZUREAD_2022-08-31T12_02_02.230Z.json
AZUREAD_2022-08-30T15_05_34.679Z.json  AZUREAD_2022-08-31T15_03_06.756Z.json
AZUREAD_2022-08-30T18_39_54.214Z.json  AZUREAD_2022-08-31T18_03_06.102Z.json
AZUREAD_2022-08-30T21_01_48.448Z.json  AZUREAD_2022-08-31T21_13_44.759Z.json


In [4]:
import json
import pandas as pd

with open("../../../data/dfp/azure-training-data/AZUREAD_2022-08-01T00_03_56.207Z.json") as f:
    training_data_sample = json.load(f)
    
with open("../../../data/dfp/azure-inference-data/AZUREAD_2022-08-30T00_17_05.561Z.json") as f:
    inference_data_sample = json.load(f)
    
print(f"Training data has {len(training_data_sample)} rows and inference data has {len(inference_data_sample)} rows.")

Training data has 13 rows and inference data has 11 rows.


In [5]:
import pandas as pd
pd.DataFrame(training_data_sample).head()

|   | time | resourceId | operationName | operationVersion | category | tenantId | resultType | resultSignature | resultDescription | durationMs | callerIpAddress | correlationId | identity | Level | location | properties |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2022-08-01T00:03:56.207532Z | /tenants/d3e5a967-5657-4a42-afcc-6106b6c3c299/... | Sign-in activity | 1.0 | NonInteractiveUserSignInLogs | d3e5a967-5657-4a42-afcc-6106b6c3c299 | 50158 |  | External security challenge was not satisfied. | 0 | 44.22.19.201 | 84ca338d-f4ff-4f34-9f2a-5a6e23f78c0b | Thomas Price | 4 | XN | {'id': 'df70b726-7756-4baa-9a7d-5ac965198e00',... |
| 1 | 2022-08-01T00:19:37.909827Z | /tenants/d3e5a967-5657-4a42-afcc-6106b6c3c299/... | Sign-in activity | 1.0 | SignInLogs | d3e5a967-5657-4a42-afcc-6106b6c3c299 | 0 |  |  | 0 | 99.116.100.205 | 7641103c-1db3-4e14-9ebc-6a9555ba02b2 | Aaron Cole | 4 | XD | {'id': 'c98bb980-53fe-43a8-afd2-72b917706b00',... |
| 2 | 2022-08-01T00:25:38.530749Z | /tenants/d3e5a967-5657-4a42-afcc-6106b6c3c299/... | Sign-in activity | 1.0 | NonInteractiveUserSignInLogs | d3e5a967-5657-4a42-afcc-6106b6c3c299 | 0 |  |  | 0 | 86.154.193.190 | ef72b144-8295-493d-8231-c12e755a74d8 | Kristen Howell | 4 | XR | {'id': '4ef53074-987d-44ae-a8dd-b6e418929900',... |
| 3 | 2022-08-01T00:37:00.149031Z | /tenants/d3e5a967-5657-4a42-afcc-6106b6c3c299/... | Sign-in activity | 1.0 | NonInteractiveUserSignInLogs | d3e5a967-5657-4a42-afcc-6106b6c3c299 | 50158 |  | External security challenge was not satisfied. | 0 | 42.62.103.34 | 9901c16e-f768-4891-9b92-f1ab68223893 | Joseph Taylor | 4 | XF | {'id': '7f19788f-2e61-49ad-9601-4fe6e5b87200',... |
| 4 | 2022-08-01T00:44:19.056251Z | /tenants/d3e5a967-5657-4a42-afcc-6106b6c3c299/... | Sign-in activity | 1.0 | NonInteractiveUserSignInLogs | d3e5a967-5657-4a42-afcc-6106b6c3c299 | 0 |  |  | 0 | 42.62.103.34 | c25d344f-ea74-4470-94bb-ea652c630dd3 | Joseph Taylor | 4 | XF | {'id': '20f677f0-ddf9-46df-bf50-9e296caa9100',... |


We see a mix of data types in the dataframe, including timestamps, numeric fields, free-text descriptions, and nested JSON objects in the `properties` column. Our Morpheus pipeline requires us to cast the input dataframe into consistent types.
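A quick way to spot mixed types in raw JSON records, before any dataframe is built, is to collect the Python type names seen for each key. The helpers below are a hypothetical standard-library sketch (`types_per_key` and `mixed_type_keys` are illustrative names, not Morpheus functions), and the records are made up rather than taken from the Azure sample; in the notebook you could pass `training_data_sample` instead.

```python
from collections import defaultdict
from typing import Any, Dict, List, Set


def types_per_key(records: List[Dict[str, Any]]) -> Dict[str, Set[str]]:
    """Return the set of Python type names observed for each top-level key."""
    seen: Dict[str, Set[str]] = defaultdict(set)
    for record in records:
        for key, value in record.items():
            seen[key].add(type(value).__name__)
    return dict(seen)


def mixed_type_keys(records: List[Dict[str, Any]]) -> List[str]:
    """List the keys whose values do not all share one Python type."""
    return sorted(key for key, names in types_per_key(records).items() if len(names) > 1)


# Made-up records for illustration only
records = [
    {"code": 0, "note": None, "details": {"app": "App A"}},
    {"code": "50158", "note": "Challenge failed", "details": {}},
]
print(types_per_key(records))
print(mixed_type_keys(records))  # ['code', 'note']
```

Keys reported by `mixed_type_keys` are the ones whose schema entries most need an explicit `dtype`.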

## 1.2 An Overview of DataFrame Schemas

Recall that Morpheus requires us to create `DataFrameInputSchema` objects to pass to the `DFPFileToDataFrameStage` and `DFPPreprocessingStage`. The schema tells each stage which fields to extract from the JSON, how to name them, and which types to cast them to as cuDF dataframes are processed.

`DataFrameInputSchema` objects are created from lists of `ColumnInfo` objects. For example, here's the source-stage column list we want to build dynamically in the pipeline:

```python
from datetime import datetime

from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import DateTimeColumn
from morpheus.utils.column_info import RenameColumn
from morpheus.utils.column_info import StringCatColumn

# `config` is the Morpheus `Config` whose `config.ae` settings are defined later in this notebook
source_column_info = [
    DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"),
    RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="properties.userPrincipalName"),
    RenameColumn(name="appDisplayName", dtype=str, input_name="properties.appDisplayName"),
    ColumnInfo(name="category", dtype=str),
    RenameColumn(name="clientAppUsed", dtype=str, input_name="properties.clientAppUsed"),
    RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="properties.deviceDetail.browser"),
    RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="properties.deviceDetail.displayName"),
    RenameColumn(name="deviceDetailoperatingSystem",
                 dtype=str,
                 input_name="properties.deviceDetail.operatingSystem"),
    # Join city and country, separated by ", ", into a single "location" column
    StringCatColumn(name="location",
                    dtype=str,
                    input_columns=[
                        "properties.location.city",
                        "properties.location.countryOrRegion",
                    ],
                    sep=", "),
    RenameColumn(name="statusfailureReason", dtype=str, input_name="properties.status.failureReason"),
]
```

You'll notice a few things here. This schema definition uses four column types:
1. `ColumnInfo`: The base column type in Morpheus. It stores the name and data type of a single dataframe column. `ColumnInfo` objects are defined as Python `dataclasses` and provide helper methods for transforming data. Refer to the source in `morpheus/utils/column_info.py` for details.
2. `StringCatColumn`: A subclass of `ColumnInfo` that concatenates the values of multiple columns into a new string column, separated by `sep`. This is useful for creating a single derived categorical feature from several columns.
3. `RenameColumn`: A subclass of `ColumnInfo` that also renames the column in the pipeline. Notice above that some columns are renamed to match the names the autoencoder (AE) configuration expects, such as the entries in `config.ae.feature_columns` and `config.ae.userid_column_name`.
4. `DateTimeColumn`: A subclass of `RenameColumn` that casts values to UTC-localized datetimes. Values that contain a time-zone offset are converted to UTC, while values without a time zone are assumed to be UTC.
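The UTC rule described for `DateTimeColumn` can be illustrated with the standard library alone. This is a sketch of the described behavior, not Morpheus's implementation (which works on whole dataframe columns), and `to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def to_utc(value: str) -> datetime:
    """Parse an ISO-8601 timestamp and normalize it to UTC.

    Offset-aware values are converted to UTC; naive values are assumed to be UTC.
    """
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit +00:00 offset first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # No time zone: assume the value is already UTC
        return parsed.replace(tzinfo=timezone.utc)
    # Explicit offset: convert the instant to UTC
    return parsed.astimezone(timezone.utc)


samples = [
    "2022-08-01T00:03:56.207532Z",       # explicit UTC
    "2022-08-01T02:03:56.207532+02:00",  # same instant, +02:00 offset
    "2022-08-01T00:03:56.207532",        # no time zone
]
for s in samples:
    print(to_utc(s).isoformat())  # 2022-08-01T00:03:56.207532+00:00 each time
```

All three inputs normalize to the same instant, which is what makes mixed-source timestamps comparable downstream.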
    
Once the list is created, a `DataFrameInputSchema` object is created as follows:

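```python
from morpheus.utils.column_info import DataFrameInputSchema

# Flatten the nested JSON in "properties" before applying the column definitions
source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info)
```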

Note that Morpheus can also ***flatten nested JSON objects*** stored in dataframe columns. Here, `json_columns=["properties"]` declares that the `properties` column holds nested JSON. Its content is flattened into individual columns, with each element's hierarchy encoded as '.'-separated keys in the column name (for example, `properties.deviceDetail.browser`). Almost every `input_name` in the column list above relies on this flattening.
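To see where dotted names such as `properties.deviceDetail.browser` come from, here is a minimal standard-library sketch of the flattening idea. It illustrates the naming scheme only and is not Morpheus's code, which operates on dataframe columns; the `flatten` helper and the sample record are hypothetical.

```python
from typing import Any, Dict


def flatten(obj: Dict[str, Any], prefix: str = "", sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts into one level, joining key paths with `sep`."""
    flat: Dict[str, Any] = {}
    for key, value in obj.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, extending the dotted key path
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


# A hypothetical, trimmed-down sign-in record (values are made up)
record = {
    "time": "2022-08-01T00:03:56.207532Z",
    "category": "SignInLogs",
    "properties": {
        "userPrincipalName": "user@example.com",
        "deviceDetail": {"browser": "Edge 100.0", "operatingSystem": "Windows 10"},
        "location": {"city": "Springfield", "countryOrRegion": "US"},
    },
}

for column, value in flatten(record).items():
    print(f"{column}: {value}")
```

Top-level scalar fields such as `time` keep their names, while every leaf under `properties` gets a dotted path that a `RenameColumn` can then map to a friendlier name.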

## 1.3 Creating Schemas from JSON Definitions

Here, we explore whether schema definitions like the one above can be generated dynamically from a JSON schema definition that the user provides and that is interpreted at runtime. In this alpha version, we support five data types: `string`, `float`, `int`, `bool`, and `datetime`.

We also support the following `ColumnInfo` types: `ColumnInfo`, `RenameColumn`, `BoolColumn`, `DateTimeColumn`, `StringJoinColumn`, `StringCatColumn`, `IncrementColumn`, and `DistinctIncrementColumn`.
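One way to enforce these two lists is a pair of lookup tables that map the JSON names to Python types and reject anything else. This is a hypothetical sketch: `SUPPORTED_DTYPES`, `SUPPORTED_COLUMN_TYPES`, and `resolve_dtype` are illustrative names, and we are not claiming this is how `JSONSchemaBuilder` is implemented.

```python
from datetime import datetime

# Hypothetical lookup tables for the alpha feature set described above
SUPPORTED_DTYPES = {
    "string": str,
    "float": float,
    "int": int,
    "bool": bool,
    "datetime": datetime,
}
SUPPORTED_COLUMN_TYPES = {
    "ColumnInfo", "RenameColumn", "BoolColumn", "DateTimeColumn",
    "StringJoinColumn", "StringCatColumn", "IncrementColumn", "DistinctIncrementColumn",
}


def resolve_dtype(column_type: str, dtype: str) -> type:
    """Check a (type, dtype) pair from the JSON and return the matching Python type."""
    if column_type not in SUPPORTED_COLUMN_TYPES:
        raise ValueError(f"Unsupported column type: {column_type!r}")
    if dtype not in SUPPORTED_DTYPES:
        raise ValueError(f"Unsupported dtype {dtype!r}; expected one of {sorted(SUPPORTED_DTYPES)}")
    return SUPPORTED_DTYPES[dtype]


print(resolve_dtype("DateTimeColumn", "datetime"))  # <class 'datetime.datetime'>
```

Rejecting unknown names up front means a typo such as `"dtype": "str"` fails with a clear message before any data is read.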

The user provides a path to a JSON file (for example, `schema_definition.json`) describing the schema they want to build, and the pipeline verifies the consistency of the definition at runtime and constructs the schema. Let's look at an example JSON schema definition that would create the schema from the previous section.

```JSON
{
    "JSON_COLUMNS" : ["properties"],
    "PRESERVE_COLUMNS": [],
    "SCHEMA_COLUMNS" : [
        {
            "type" : "DateTimeColumn",
            "name" : "timestamp",
            "dtype": "datetime",
            "data_column" : "time"
        },
        {
            "type" : "RenameColumn",
            "name" : "username",
            "dtype": "string",
            "data_column" : "properties.userPrincipalName"
        },
        {
            "type" : "RenameColumn",
            "name" : "appDisplayName",
            "dtype": "string",
            "data_column" : "properties.appDisplayName"
        },
        {
            "type" : "ColumnInfo",
            "data_column" : "category",
            "dtype" : "string"
        },
        ... //Other Definitions
        {
            "type" : "StringCatColumn",
            "name" : "location",
            "dtype": "string",
            "data_columns" : ["properties.location.city", "properties.location.countryOrRegion"],
            "sep" : ", "
        }
    ]
}
```

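Most of this structure is intuitive. `JSON_COLUMNS` lists the columns containing nested JSON that needs to be flattened, and `PRESERVE_COLUMNS` lists input columns to carry through to the output dataframe. Each entry in `SCHEMA_COLUMNS` becomes one `ColumnInfo` definition in our schema list. One notable change is that the `input_name` argument in the Python code is called `data_column` in the JSON (and `input_columns` is called `data_columns`) to make its meaning clearer to the end user. For a plain `ColumnInfo`, which has no `input_name`, the `data_column` value becomes the column's `name`.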

Operationalizing this requires a class that can read the JSON file, validate its structure, and create the schema. Let's look at how to use one below.
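The builder's source is not shown in this notebook. As a rough, hypothetical sketch of what reading and validating such a file could involve, the standard-library function below parses the JSON, checks that each entry has the keys its type needs, and translates `data_column`/`data_columns` into `input_name`/`input_columns` keyword arguments. `parse_definition`, `KEY_MAP`, and `REQUIRED_KEYS` are illustrative names, not part of Morpheus or `JSONSchemaBuilder`, and only the four column types used in the example are covered.

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical rules: JSON key -> constructor keyword, plus the keys each type needs
KEY_MAP = {"data_column": "input_name", "data_columns": "input_columns"}
REQUIRED_KEYS = {
    "ColumnInfo": {"dtype", "data_column"},
    "RenameColumn": {"name", "dtype", "data_column"},
    "DateTimeColumn": {"name", "dtype", "data_column"},
    "StringCatColumn": {"name", "dtype", "data_columns", "sep"},
}

ColumnSpec = Tuple[str, Dict[str, Any]]


def parse_definition(text: str) -> Tuple[List[str], List[str], List[ColumnSpec]]:
    """Validate a schema definition; return (json_columns, preserve_columns, column specs)."""
    doc = json.loads(text)
    if "SCHEMA_COLUMNS" not in doc:
        raise ValueError("Missing required key 'SCHEMA_COLUMNS'")
    specs: List[ColumnSpec] = []
    for i, entry in enumerate(doc["SCHEMA_COLUMNS"]):
        col_type = entry.get("type")
        if col_type not in REQUIRED_KEYS:
            raise ValueError(f"Entry {i}: unsupported type {col_type!r}")
        missing = REQUIRED_KEYS[col_type] - set(entry)
        if missing:
            raise ValueError(f"Entry {i} ({col_type}): missing keys {sorted(missing)}")
        kwargs = {KEY_MAP.get(k, k): v for k, v in entry.items() if k != "type"}
        if col_type == "ColumnInfo":
            # A plain ColumnInfo has no input_name; its data_column becomes its name
            kwargs["name"] = kwargs.pop("input_name")
        specs.append((col_type, kwargs))
    return doc.get("JSON_COLUMNS", []), doc.get("PRESERVE_COLUMNS", []), specs


example = """
{
    "JSON_COLUMNS": ["properties"],
    "SCHEMA_COLUMNS": [
        {"type": "DateTimeColumn", "name": "timestamp", "dtype": "datetime", "data_column": "time"},
        {"type": "ColumnInfo", "dtype": "string", "data_column": "category"}
    ]
}
"""
json_columns, preserve_columns, specs = parse_definition(example)
print(json_columns)  # ['properties']
for col_type, kwargs in specs:
    print(col_type, kwargs)
```

A real builder would then look up each class by name and call it with these keyword arguments, after converting the `dtype` string into a Python type.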


In [33]:
# First, we look at an example input data schema
! cat /workspace/models/data/dfp_azure_input_schema_example.json

{
    
    "JSON_COLUMNS" : ["properties"],
    "SCHEMA_COLUMNS" : [
        {
            "type" : "DateTimeColumn",
            "dtype" : "datetime",
            "name" : "timestamp",
            "data_column" : "time"
        },
        {
            "type" : "RenameColumn",
            "dtype" : "string",
            "name" : "username",
            "data_column" : "properties.userPrincipalName"        
        },
        {
            "type" : "RenameColumn",
            "dtype" : "string",
            "name" : "appDisplayName",
            "data_column" : "properties.appDisplayName"        
        }, 
        {
            "type" : "ColumnInfo",
            "dtype" : "string",
            "data_column" : "category"
        },
        {
            "type" : "RenameColumn",
            "dtype" : "string",
            "name" : "clientAppUsed",
            "data_column" : "properties.clientAppUsed"       
        }, 
        {
            "type" : "RenameColumn",
            "dtype"

For an end user, specifying a schema in JSON like this is more intuitive than writing Python code. The provided parser also validates the data types declared in the JSON. Let's see how to use it.

In [10]:
import pprint

from morpheus.utils.json_schema_builder import JSONSchemaBuilder

schema_builder = JSONSchemaBuilder()
schema = schema_builder.build_schema("/workspace/models/data/dfp_azure_input_schema_example.json")

pprint.pprint(schema)

DataFrameInputSchema(json_columns=['properties'],
                     column_info=[DateTimeColumn(name='timestamp',
                                                 dtype='datetime64[ns]',
                                                 input_name='time'),
                                  RenameColumn(name='username',
                                               dtype='string',
                                               input_name='properties.userPrincipalName'),
                                  RenameColumn(name='appDisplayName',
                                               dtype='string',
                                               input_name='properties.appDisplayName'),
                                  ColumnInfo(name='category', dtype='str'),
                                  RenameColumn(name='clientAppUsed',
                                               dtype='string',
                                               input_name='properties.clientAppUsed'),
       

Voila! A schema was constructed dynamically from the JSON definition. 

# 2.0 Putting it Together in a Pipeline

Let's use the JSON schema definition above in a simple DFP Azure training pipeline to show how it simplifies our code.

## 2.1 Imports and Basic Configuration

In [11]:
%load_ext autoreload
%autoreload 2

# Ensure that the morpheus directory is in the python path. This may not need to be run depending on the environment setup
import sys
import os
sys.path.insert(0, os.path.abspath("../../../../../morpheus"))

In [12]:
import functools
import logging
import os
import pprint
import typing
from datetime import datetime

import mlflow

from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage
from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage
from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
from dfp.stages.dfp_training import DFPTraining
from dfp.stages.multi_file_source import MultiFileSource
from dfp.utils.regex_utils import iso_date_regex

from morpheus.common import FileTypes
from morpheus.cli.utils import get_log_levels
from morpheus.cli.utils import get_package_relative_file
from morpheus.cli.utils import load_labels_file
from morpheus.cli.utils import parse_log_level
from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import CppConfig
from morpheus.pipeline import LinearPipeline
from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.column_info import DateTimeColumn
from morpheus.utils.column_info import DistinctIncrementColumn
from morpheus.utils.column_info import IncrementColumn
from morpheus.utils.column_info import RenameColumn
from morpheus.utils.column_info import StringCatColumn
from morpheus.utils.file_utils import date_extractor
from morpheus.utils.logger import configure_logging

# Left align all tables
from IPython.core.display import HTML
table_css = 'table {align:left;display:block}'
HTML('<style>{}</style>'.format(table_css))

In [19]:
# Global options
train_users = "all"

# Enter any users to skip here
skip_users: typing.List[str] = []

# Location where cache objects will be saved
cache_dir = "/workspace/.cache/dfp"

# Input files to read from
input_files = [
    "../../../../data/dfp/azure-training-data/AZUREAD_2022-08-0*.json"
]

# The format to use for models
model_name_formatter = "DFP-azure-{user_id}"

# The format to use for experiment names
experiment_name_formatter = "dfp/azure/training/{reg_model_name}"

# === Derived Options ===
# To include the generic, we must be training all or generic
include_generic = train_users == "all" or train_users == "generic"

# To include individual, we must be either training or inferring
include_individual = train_users != "generic"

# "none" indicates we aren't training anything
is_training = train_users != "none"

# Tracking URI
tracking_uri = "http://mlflow:5000"

mlflow.set_tracking_uri(tracking_uri)
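The derived options above are plain boolean functions of `train_users`. The hypothetical helper below reproduces the same three expressions so you can see how each setting behaves; the four values it loops over are the ones we assume `train_users` may take in the DFP examples.

```python
from typing import Tuple


def derive_training_flags(train_users: str) -> Tuple[bool, bool, bool]:
    """Return (include_generic, include_individual, is_training), as computed above."""
    include_generic = train_users == "all" or train_users == "generic"
    include_individual = train_users != "generic"
    is_training = train_users != "none"
    return include_generic, include_individual, is_training


for option in ["all", "generic", "individual", "none"]:
    print(option, derive_training_flags(option))
```

For example, `"generic"` trains only the shared `generic_user` model, while `"all"` trains both the generic model and per-user models.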

In [22]:
# Create the Morpheus pipeline configuration
config = Config()

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()

config.ae = ConfigAutoEncoder()

config.ae.feature_columns = [
    "appDisplayName", "clientAppUsed", "deviceDetailbrowser", "deviceDetaildisplayName", "deviceDetailoperatingSystem", "statusfailureReason", "appincrement", "locincrement", "logcount", 
]
config.ae.userid_column_name = "username"
config.ae.timestamp_column_name = "timestamp"

## 2.2 Schema Definition (here's where we use the JSONSchemaBuilder)
First, we'll define the schema for the raw input data using the `JSONSchemaBuilder`. For comparison, we'll then define the preprocessing schema manually, without a schema builder, to show the difference in complexity.

In [34]:
# Using the JSONSchemaBuilder
from dfp.utils.json_schema_builder import JSONSchemaBuilder

schema_builder = JSONSchemaBuilder()
source_schema = schema_builder.build_schema("/workspace/models/data/dfp_azure_input_schema_example.json")

That's ***all we had to do*** to build an input schema. Let's compare that to how we define a preprocessing schema without a builder:

In [26]:
preprocess_column_info = [
    ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime),
    ColumnInfo(name=config.ae.userid_column_name, dtype=str),
    ColumnInfo(name="appDisplayName", dtype=str),
    ColumnInfo(name="clientAppUsed", dtype=str),
    ColumnInfo(name="deviceDetailbrowser", dtype=str),
    ColumnInfo(name="deviceDetaildisplayName", dtype=str),
    ColumnInfo(name="deviceDetailoperatingSystem", dtype=str),
    ColumnInfo(name="statusfailureReason", dtype=str),

    # Derived columns
    IncrementColumn(name="logcount",
                    dtype=int,
                    input_name=config.ae.timestamp_column_name,
                    groupby_column=config.ae.userid_column_name),
    DistinctIncrementColumn(name="locincrement",
                            dtype=int,
                            input_name="location",
                            groupby_column=config.ae.userid_column_name,
                            timestamp_column=config.ae.timestamp_column_name),
    DistinctIncrementColumn(name="appincrement",
                            dtype=int,
                            input_name="appDisplayName",
                            groupby_column=config.ae.userid_column_name,
                            timestamp_column=config.ae.timestamp_column_name)
]

preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"])

pprint.pprint(preprocess_schema)

DataFrameInputSchema(json_columns=[],
                     column_info=[ColumnInfo(name='timestamp',
                                             dtype='datetime64[ns]'),
                                  ColumnInfo(name='username', dtype='str'),
                                  ColumnInfo(name='appDisplayName',
                                             dtype='str'),
                                  ColumnInfo(name='clientAppUsed', dtype='str'),
                                  ColumnInfo(name='deviceDetailbrowser',
                                             dtype='str'),
                                  ColumnInfo(name='deviceDetaildisplayName',
                                             dtype='str'),
                                  ColumnInfo(name='deviceDetailoperatingSystem',
                                             dtype='str'),
                                  ColumnInfo(name='statusfailureReason',
                                             dtype='str'),
     

A schema builder simplifies this process in two ways: it reduces the code we have to write, and an end user no longer needs to understand the low-level schema classes, because they can describe the schema in a JSON object that is type-checked at runtime. **What if we re-implement the preprocessing schema using our `JSONSchemaBuilder`?** Let's see how below.

In [24]:
! cat dfp_azure_preprocess_schema_example.json

{
    "PRESERVE_COLUMNS" : ["_batch_id"],
    "SCHEMA_COLUMNS" : [
        {
            "type" : "ColumnInfo",
            "dtype" : "datetime",
            "data_column" : "timestamp"
        },
        {
            "type" : "ColumnInfo",
            "dtype" : "string",
            "data_column" : "username"
        },
        {
            "type" : "ColumnInfo",
            "dtype" : "string",
            "data_column" : "appDisplayName"
        },
        {
            "type" : "ColumnInfo",
            "dtype" : "string",
            "data_column" : "clientAppUsed"
        },
        {
            "type" : "ColumnInfo",
            "dtype" : "string",
            "data_column" : "deviceDetailbrowser"
        },
        {
            "type" : "ColumnInfo",
            "dtype" : "string",
            "data_column" : "deviceDetaildisplayName"
        },
        {
            "type" : "ColumnInfo",
            "dtype" : "string",
            "data_column" : "deviceDetailoperatingSyst

In [35]:
preprocess_schema = schema_builder.build_schema("/workspace/models/data/dfp_azure_preprocess_schema_example.json")
pprint.pprint(preprocess_schema)

DataFrameInputSchema(json_columns=[],
                     column_info=[ColumnInfo(name='timestamp',
                                             dtype='datetime64[ns]'),
                                  ColumnInfo(name='username', dtype='str'),
                                  ColumnInfo(name='appDisplayName',
                                             dtype='str'),
                                  ColumnInfo(name='clientAppUsed', dtype='str'),
                                  ColumnInfo(name='deviceDetailbrowser',
                                             dtype='str'),
                                  ColumnInfo(name='deviceDetaildisplayName',
                                             dtype='str'),
                                  ColumnInfo(name='deviceDetailoperatingSystem',
                                             dtype='str'),
                                  ColumnInfo(name='statusfailureReason',
                                             dtype='str'),
     

## 2.3 The Rest of the Pipeline

Now, we'll run the rest of the pipeline to show how the schemas fit into the larger picture.

In [31]:
# Create a linear pipeline object
configure_logging(log_level=logging.DEBUG)

pipeline = LinearPipeline(config)

# Source stage
pipeline.set_source(MultiFileSource(config, filenames=input_files))

# Batch files into batches by time. Use the default ISO date extractor from the filename
pipeline.add_stage(
    DFPFileBatcherStage(config,
                        period="D",
                        date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex)))

# Output is a list of fsspec files. Convert to DataFrames. This caches downloaded data
pipeline.add_stage(
    DFPFileToDataFrameStage(config,
                            schema=source_schema,
                            file_type=FileTypes.JSON,
                            parser_kwargs={
                                "lines": False, "orient": "records"
                            },
                            cache_dir=cache_dir))


# This will split users or just use one single user
pipeline.add_stage(
    DFPSplitUsersStage(config,
                       include_generic=include_generic,
                       include_individual=include_individual,
                       skip_users=skip_users))

# Next, have a stage that will create rolling windows
pipeline.add_stage(
    DFPRollingWindowStage(
        config,
        min_history=300 if is_training else 1,
        min_increment=300 if is_training else 0,
        # For inference, we only ever want 1 day max
        max_history="60d" if is_training else "1d",
        cache_dir=cache_dir))

# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

# Finally, perform training which will output a model
pipeline.add_stage(DFPTraining(config, validation_size=0.10))

# Write that model to MLFlow
pipeline.add_stage(
    DFPMLFlowModelWriterStage(config,
                              model_name_formatter=model_name_formatter,
                              experiment_name_formatter=experiment_name_formatter))

# Run the pipeline
await pipeline.run_async()

====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
====Registering Pipeline Complete!====
====Starting Pipeline====


W20240425 21:03:30.542733  2618 logging.cpp:67] MRC logger already initialized


====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-multi-file-24; MultiFileSource(filenames=['../../../data/dfp/azure-training-data/AZUREAD_2022-08-0*.json'], watch=False, watch_interval=1.0)>
  └─> fsspec.OpenFiles
Added stage: <dfp-file-batcher-25; DFPFileBatcherStage(date_conversion_func=functools.partial(<function date_extractor at 0x7f7e741e5510>, filename_regex=re.compile('(?P<year>\\d{4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2})(?:T(?P<hour>\\d{1,2})(?::|_|\\.)(?P<minute>\\d{1,2})(?::|_|\\.)(?P<second>\\d{1,2})(?:\\.(?P<microsecond>\\d{0,6}))?)?(?P<zulu>Z)?')), period=D, sampling_rate_s=None, start_time=None, end_time=None, sampling=None)>
  └─ fsspec.OpenFiles -> Tuple[fsspec.core.OpenFiles, int]
Added stage: <dfp-file-to-df-26; DFPFileToDataFrameStage(schema=DataFrameInputSchema(json_columns=['properties'], column_info=[DateTimeColumn(name='timestamp', dtype='datetime64[ns]', input_name='time'), RenameColumn(name='username', dt

2024/04/25 21:03:34 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-azure-generic_user, version 32


ML Flow model upload complete: generic_user:DFP-azure-generic_user:32
Training AE model for user: 'generic_user'... Complete.


2024/04/25 21:03:36 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-azure-generic_user, version 33


ML Flow model upload complete: generic_user:DFP-azure-generic_user:33
====Pipeline Complete====


#### That's all! Thanks for reading. 