# Fraud Detection for Automobile Claims Demo

Auto insurance fraud ranges from misrepresenting facts on insurance applications and inflating insurance claims to staging accidents and submitting claim forms for injuries or damage that never occurred, to false reports of stolen vehicles. Fraud accounted for between 15 percent and 17 percent of total claims payments for auto insurance bodily injury in 2012, according to an Insurance Research Council (IRC) study. The study estimated that between $5.6 billion and $7.7 billion was fraudulently added to paid claims for auto insurance bodily injury payments in 2012, compared with a range of $4.3 billion to $5.8 billion in 2002.

- [Quick start with JumpStart and Autopilot](#Quick-Start-with-JumpStart-and-Autopilot)
- [Feature Engineering with Data Wrangler](#Exploration-&-feature-engineering)
- [Create Feature Store](#Create-Feature-Store-in-Code)
- [Train an XGBoost Model](#Train-A-XGBoost-Model)
- [Hyperparameter Tuning](#Hyperparameter-Tuning)
- [Deploy And Test](#Deploy-and-Serve-Model)
- [CI/CD Pipeline]

**This demo is optimized for SageMaker Studio, using a Studio notebook with the Data Science kernel.**

### Quick Start with JumpStart and Autopilot

---

#### SageMaker JumpStart
![JumpStart](statics/JumpStart.png)

#### SageMaker Autopilot
![Autopilot](statics/Autopilot.png)

## Setup

Install and/or update the required third-party libraries.

In [331]:
!python -m pip install -Uq pip
!python -m pip install -q awswrangler==2.2.0 imbalanced-learn==0.7.0 sagemaker==2.41.0 boto3==1.17.70
!python -m pip install -q sagemaker-experiments


In [363]:
import json
import time
import boto3
import string
import sagemaker
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import awswrangler as wr
from imblearn.over_sampling import SMOTE
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.serializers import CSVSerializer

from sagemaker.feature_store.feature_group import FeatureGroup

from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

sagemaker_session = sagemaker.Session()

region = sagemaker_session.boto_region_name
account_id = sagemaker_session.account_id()
sagemaker_role = sagemaker.get_execution_role()

bucket = sagemaker_session.default_bucket()

s3_client = boto3.client("s3", region_name=region)
prefix = "att-demo"

## Dataset
Load the raw claims and customers datasets into pandas DataFrames.

In [364]:
df_claims = pd.read_csv("./data/claims.csv", index_col=0)
df_customers = pd.read_csv("./data/customers.csv", index_col=0)

df_claims.head()

| policy_id | driver_relationship | incident_type | collision_type | incident_severity | authorities_contacted | num_vehicles_involved | num_injuries | num_witnesses | police_report_available | injury_claim | vehicle_claim | total_claim_amount | incident_month | incident_day | incident_dow | incident_hour | fraud |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | Spouse | Collision | Front | Minor |  | 2 | 0 | 0 | No | 71600 | 8913.668763 | 80513.668763 | 3 | 17 | 6 | 8 | 0 |
| 2 | Self | Collision | Rear | Totaled | Police | 3 | 4 | 0 | Yes | 6400 | 19746.724395 | 26146.724395 | 12 | 11 | 2 | 11 | 0 |
| 3 | Self | Collision | Front | Minor | Police | 2 | 0 | 1 | Yes | 10400 | 11652.969918 | 22052.969918 | 12 | 24 | 1 | 14 | 0 |
| 4 | Child | Collision | Side | Minor |  | 2 | 0 | 0 | No | 104700 | 11260.930936 | 115960.930936 | 12 | 23 | 0 | 19 | 0 |
| 5 | Self | Collision | Side | Major | Police | 2 | 1 | 0 | No | 3400 | 27987.704652 | 31387.704652 | 5 | 8 | 2 | 8 | 0 |


In [365]:
df_customers.head()

| policy_id | customer_age | months_as_customer | num_claims_past_year | num_insurers_past_5_years | policy_state | policy_deductable | policy_annual_premium | policy_liability | customer_zip | customer_gender | customer_education | auto_year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 54 | 94 | 0 | 1 | WA | 750 | 3000 | 25/50 | 99207 | Unkown | Associate | 2006 |
| 2 | 41 | 165 | 0 | 1 | CA | 750 | 2950 | 15/30 | 95632 | Male | Bachelor | 2012 |
| 3 | 57 | 155 | 0 | 1 | CA | 750 | 3000 | 15/30 | 93203 | Female | Bachelor | 2017 |
| 4 | 39 | 80 | 0 | 1 | AZ | 750 | 3000 | 30/60 | 85208 | Female | Advanced Degree | 2020 |
| 5 | 39 | 60 | 0 | 1 | CA | 750 | 3000 | 15/30 | 91792 | Female | High School | 2018 |


Before you can preprocess the raw data with Data Wrangler, it must exist in S3.

In [366]:
claims_key = f"{prefix}/data/raw/claims.csv"

s3_client.upload_file(
    Filename="data/claims.csv", Bucket=bucket, Key=claims_key
)

claims_s3_path = f's3://{bucket}/{claims_key}'
print(f'raw claims data is located at {claims_s3_path}')


customers_key = f"{prefix}/data/raw/customers.csv"
s3_client.upload_file(
    Filename="data/customers.csv", Bucket=bucket, Key=customers_key
)

customers_s3_path = f's3://{bucket}/{customers_key}'
print(f'raw customers data is located at {customers_s3_path}')

raw claims data is located at s3://sagemaker-us-west-2-987720697751/att-demo/data/raw/claims.csv
raw customers data is located at s3://sagemaker-us-west-2-987720697751/att-demo/data/raw/customers.csv


### Exploration & feature engineering
From here, let's jump into SageMaker Data Wrangler to preprocess our dataset. In this step, we perform the following:

1. ingest data from S3
2. visualize and analyze our data
3. process and transform to clean up and encode our dataset
4. combine customer and claims data to build one dataset
5. export data to S3 and feature store

![data_wrangler](statics/data_wrangler.png)
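
Step 3 above encodes categorical columns. As a rough, standard-library-only illustration of one-hot encoding (this is not the transform Data Wrangler itself runs; the helper name `one_hot` and the sample rows are hypothetical), a column such as `customer_gender` can be expanded into indicator columns like the `customer_gender_female` feature used later in this notebook:

```python
def one_hot(rows, column):
    """Expand a categorical column into 0/1 indicator columns (hypothetical helper)."""
    categories = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        # Copy every column except the one being encoded
        new_row = {k: v for k, v in row.items() if k != column}
        for category in categories:
            # Names follow the "<column>_<value>" lowercase pattern seen in this notebook
            name = f"{column}_{category}".lower().replace(" ", "_")
            new_row[name] = 1 if row[column] == category else 0
        encoded.append(new_row)
    return encoded


# Illustrative rows, not taken from the real dataset
sample_rows = [
    {"policy_id": 1, "customer_gender": "Female"},
    {"policy_id": 2, "customer_gender": "Male"},
]
encoded_rows = one_hot(sample_rows, "customer_gender")
print(encoded_rows[0])
```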

### Create Feature Store in Code
There are multiple ways to ingest data into Feature Store:
- Data Wrangler
- Stream using Kinesis Data Firehose
- Custom ingestion

In [367]:
# if the Data Wrangler job was not run, the claims and customers dataframes will be loaded from local copies
timestamp = pd.to_datetime("now").timestamp()

claims_preprocessed = pd.read_csv(filepath_or_buffer="data/claims_preprocessed.csv")

# a timestamp column is required by the feature store, so one is added with a current timestamp
claims_preprocessed["event_time"] = timestamp

customers_preprocessed = pd.read_csv(filepath_or_buffer="data/customers_preprocessed.csv")

customers_preprocessed["event_time"] = timestamp


combined_preprocessed = pd.read_csv(filepath_or_buffer="data/claims_customer.csv")

combined_preprocessed = combined_preprocessed.loc[:, ~combined_preprocessed.columns.str.contains("^Unnamed: 0")]

combined_preprocessed["event_time"] = timestamp
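
Feature Store accepts the event time either as a fractional Unix timestamp in seconds (the form used in the cell above) or as an ISO-8601 string. A small sketch of producing both forms with the standard library, assuming UTC and a hypothetical fixed instant chosen only so the example is reproducible:

```python
from datetime import datetime, timezone

# Hypothetical fixed instant (an assumption for illustration)
event_dt = datetime(2022, 5, 18, 14, 59, 19, tzinfo=timezone.utc)

# Fractional form: seconds since the Unix epoch
event_time_fractional = event_dt.timestamp()

# String form: ISO-8601 in UTC with a trailing "Z"
event_time_string = event_dt.strftime("%Y-%m-%dT%H:%M:%SZ")

print(event_time_fractional, event_time_string)
```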

Initialize the Feature Store runtime

In [368]:
boto_session = boto3.Session(region_name=region)

sagemaker_boto_client = boto_session.client("sagemaker")

featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_boto_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

#### Feature Store can infer the schema from Pandas DataFrame

In [369]:
claims_fg_name = f"{prefix}-claims-api"
customers_fg_name = f"{prefix}-customers-api"
combined_fg_name = f"{prefix}-combined-api"

record_identifier_feature_name = "policy_id"
event_time_feature_name = "event_time"

claims_feature_group = FeatureGroup(name=claims_fg_name, sagemaker_session=feature_store_session)

customers_feature_group = FeatureGroup(
    name=customers_fg_name, sagemaker_session=feature_store_session
)

combined_feature_group = FeatureGroup(
    name=combined_fg_name, sagemaker_session=feature_store_session
)

claims_feature_group.load_feature_definitions(data_frame=claims_preprocessed)
customers_feature_group.load_feature_definitions(data_frame=customers_preprocessed)
combined_feature_group.load_feature_definitions(data_frame=combined_preprocessed)

print('Schema Setup Complete')

Schema Setup Complete
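
Schema inference in the cell above is driven by each column's dtype: integer columns become `Integral` features, floating-point columns become `Fractional`, and string columns become `String`. The following standard-library sketch mimics that idea on plain Python values. The helper `infer_feature_type` and the sample columns are hypothetical, and this is not the SDK's implementation.

```python
def infer_feature_type(values):
    """Guess a Feature Store feature type from Python values (hypothetical helper)."""
    def is_int(v):
        return isinstance(v, int) and not isinstance(v, bool)

    def is_number(v):
        return isinstance(v, (int, float)) and not isinstance(v, bool)

    if all(is_int(v) for v in values):
        return "Integral"
    if all(is_number(v) for v in values):
        return "Fractional"
    # Anything else is treated as text in this sketch
    return "String"


# Made-up columns for illustration
columns = {
    "policy_id": [1, 2, 3],
    "total_claim_amount": [80513.67, 26146.72, 22052.97],
    "policy_state": ["WA", "CA", "CA"],
}
feature_types = {name: infer_feature_type(vals) for name, vals in columns.items()}
print(feature_types)
```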


#### Create Feature Group

In [None]:
claims_feature_group.create(
    s3_uri=f"s3://{bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=sagemaker_role,
    enable_online_store=True,
)

customers_feature_group.create(
    s3_uri=f"s3://{bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=sagemaker_role,
    enable_online_store=True,
)

combined_feature_group.create(
    s3_uri=f"s3://{bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=sagemaker_role,
    enable_online_store=True,
)

### Wait until feature group creation has fully completed

In [371]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")


wait_for_feature_group_creation_complete(feature_group=claims_feature_group)
wait_for_feature_group_creation_complete(feature_group=customers_feature_group)
wait_for_feature_group_creation_complete(feature_group=combined_feature_group)

FeatureGroup att-demo-claims-api successfully created.
FeatureGroup att-demo-customers-api successfully created.
FeatureGroup att-demo-combined-api successfully created.
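
The helper above polls until the status leaves `Creating`, with no upper bound on the wait. A hedged variant with a timeout might look like the following. Here `get_status` is a hypothetical callable standing in for `feature_group.describe().get("FeatureGroupStatus")`, and the fake status sequence exists only to make the sketch runnable offline.

```python
import time


def wait_for_status(get_status, pending="Creating", done="Created",
                    poll_seconds=5.0, timeout_seconds=600.0):
    """Poll get_status() until it leaves `pending`; raise on timeout or failure."""
    deadline = time.monotonic() + timeout_seconds
    status = get_status()
    while status == pending:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {pending!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
        status = get_status()
    if status != done:
        raise RuntimeError(f"Unexpected final status: {status!r}")
    return status


# Offline stand-in for the real describe() call (an assumption for illustration)
fake_statuses = iter(["Creating", "Creating", "Created"])
final_status = wait_for_status(lambda: next(fake_statuses), poll_seconds=0.01)
print(final_status)
```

The same pattern would apply to the endpoint-creation loop later in this notebook, with `InService` as the target status.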


### Ingesting data into Feature Store

In [50]:
claims_feature_group.ingest(data_frame=claims_preprocessed, max_workers=3, wait=True)

customers_feature_group.ingest(data_frame=customers_preprocessed, max_workers=3, wait=True)

combined_feature_group.ingest(data_frame=combined_preprocessed, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='att-demo-combined-api', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f34ee8f61d0>, max_workers=3, max_processes=1, _async_result=<multiprocess.pool.MapResult object at 0x7f34ee7b6ed0>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

#### While We Wait for the Data to Load, Let's Explore the Feature Store Console

![Feature Store](statics/feature_store.png)

#### Pulling data from offline feature store
----
We can now build our training and test datasets repeatedly and consistently from the feature groups we just created. In this example, we submit a SQL query to join the Claims and Customers features.

In [372]:
claims_query = claims_feature_group.athena_query()
customers_query = customers_feature_group.athena_query()

claims_table = claims_query.table_name
customers_table = customers_query.table_name
database_name = customers_query.database

feature_columns = list(set(claims_preprocessed.columns) ^ set(customers_preprocessed.columns))
feature_columns_string = ", ".join(f'"{c}"' for c in feature_columns)
feature_columns_string = f'"{claims_table}".policy_id as policy_id, ' + feature_columns_string

query_string = f"""
SELECT DISTINCT {feature_columns_string}
FROM "{claims_table}" LEFT JOIN "{customers_table}" 
ON "{claims_table}".policy_id = "{customers_table}".policy_id
"""

In [373]:
claims_query.run(query_string=query_string, output_location=f"s3://{bucket}/{prefix}/query_results")
claims_query.wait()
dataset = claims_query.as_dataframe()

INFO:sagemaker:Query 29047a70-656e-4c62-978c-02a4698e3d10 is being executed.
INFO:sagemaker:Query 29047a70-656e-4c62-978c-02a4698e3d10 successfully executed.
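
The `^` operator used when building `feature_columns` is a set symmetric difference: it keeps the columns that appear in exactly one of the two DataFrames, which drops the shared `policy_id` and `event_time` columns before `policy_id` is added back explicitly. A small standalone illustration with made-up column lists and a hypothetical table name:

```python
# Made-up column lists for illustration
claims_columns = ["policy_id", "fraud", "vehicle_claim", "event_time"]
customers_columns = ["policy_id", "customer_age", "event_time"]

# Columns present in exactly one of the two lists (sorted here for a stable order)
unique_columns = sorted(set(claims_columns) ^ set(customers_columns))

# Quote each column and prepend the join key, mirroring the query-building cell
select_list = ", ".join(f'"{c}"' for c in unique_columns)
select_list = '"claims_table".policy_id as policy_id, ' + select_list
print(select_list)
```

Because sets are unordered, the column order produced by the notebook cell can vary between runs; sorting, as in this sketch, is one way to make it deterministic.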


### Train A XGBoost Model
----
#### Move the target variable to the first column for our XGBoost model

Split the dataset into training and test sets.

In [374]:
col_order = ["fraud"] + list(dataset.drop(["fraud", "policy_id"], axis=1).columns)

train = dataset.sample(frac=0.80, random_state=0)[col_order]
test = dataset.drop(train.index)[col_order]

### Resolve class imbalance using SMOTE

To handle the imbalance, we can over-sample (i.e. upsample) the minority class using [SMOTE (Synthetic Minority Over-sampling Technique)](https://arxiv.org/pdf/1106.1813.pdf). After installing the imbalanced-learn module, if you receive an ImportError when importing SMOTE, then try restarting the kernel. 
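
The core idea of SMOTE is to create a synthetic minority sample on the line segment between a minority point and one of its nearest minority neighbours. The following is a simplified standard-library sketch of that interpolation step only. It is not the `imbalanced-learn` implementation, and `smote_like_sample` and the toy points are hypothetical.

```python
import math
import random


def smote_like_sample(minority, k=2, rng=None):
    """Create one synthetic point by interpolating toward a nearby minority point."""
    rng = rng or random.Random(0)
    base = rng.choice(minority)
    # k nearest minority neighbours of the base point (excluding the point itself)
    neighbours = sorted(
        (p for p in minority if p is not base),
        key=lambda p: math.dist(base, p),
    )[:k]
    neighbour = rng.choice(neighbours)
    gap = rng.random()  # interpolation factor in [0, 1)
    return tuple(b + gap * (n - b) for b, n in zip(base, neighbour))


# Toy two-dimensional minority points (an assumption for illustration)
minority_points = [(1.0, 1.0), (1.5, 1.2), (2.0, 1.8), (1.2, 2.0)]
synthetic = smote_like_sample(minority_points, rng=random.Random(42))
print(synthetic)
```

Because the new point is an interpolation, it always lies within the range spanned by existing minority samples.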

In [375]:
gender = train["customer_gender_female"]
gender.value_counts()

0.0    2793
1.0    1207
Name: customer_gender_female, dtype: int64

#### Gender balance after SMOTE

In [376]:
sm = SMOTE(random_state=42)
train_data_gender_upsampled, gender_res = sm.fit_resample(train, gender)
train_data_gender_upsampled["customer_gender_female"].value_counts()

1.0    2793
0.0    2793
Name: customer_gender_female, dtype: int64

#### Resolve class imbalance for positive and negative samples

In [377]:
target = train_data_gender_upsampled["fraud"]
target.value_counts()

0    5473
1     113
Name: fraud, dtype: int64

In [378]:
sm = SMOTE(random_state=42, sampling_strategy=0.5)
train_data_upsampled, fraudr_res = sm.fit_resample(train_data_gender_upsampled, target)
train_data_upsampled["fraud"].value_counts()

0    5473
1    2736
Name: fraud, dtype: int64
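
With a float `sampling_strategy`, the minority class is oversampled until its size is roughly that fraction of the majority class. The counts printed above are consistent with the following back-of-the-envelope check, which is a sketch of the arithmetic rather than the library's exact code path:

```python
majority_count = 5473   # non-fraud rows before resampling
minority_count = 113    # fraud rows before resampling
sampling_strategy = 0.5

# Target minority size: the requested fraction of the majority size, truncated
target_minority = int(majority_count * sampling_strategy)

# Number of synthetic fraud rows that must be generated to reach the target
synthetic_needed = target_minority - minority_count

print(target_minority, synthetic_needed)
```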

#### Upload new data to S3

In [379]:
train_data_upsampled.to_csv("data/upsampled_train.csv", index=False)
key = f"{prefix}/data/train/upsampled/train.csv"

s3_client.upload_file(
    Filename="data/upsampled_train.csv",
    Bucket=bucket,
    Key=key,
)

train_data_upsampled_s3_path = f"s3://{bucket}/{key}"
print(f"Upsampled training data is uploaded to {train_data_upsampled_s3_path}")

Upsampled training data is uploaded to s3://sagemaker-us-west-2-987720697751/att-demo/data/train/upsampled/train.csv


### Set the hyperparameters
These are the parameters which will be sent to our training script in order to train the model. Although they are all defined as "hyperparameters" here, they can encompass XGBoost's [Learning Task Parameters](https://xgboost.readthedocs.io/en/latest/parameter.html#learning-task-parameters), [Tree Booster Parameters](https://xgboost.readthedocs.io/en/latest/parameter.html#parameters-for-tree-booster), or any other parameters you'd like to configure for XGBoost.

In [385]:
hyperparameters = {
    "max_depth": "3",
    "eta": "0.2",
    "objective": "binary:logistic",
    "num_round": "100"
}

In [386]:
train_instance_count = 1
train_instance_type = "ml.m4.xlarge"

xgb_estimator = XGBoost(
    entry_point="xgboost_starter_script.py",
    source_dir="code",
    hyperparameters=hyperparameters,
    role=sagemaker_role,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    framework_version="1.0-1",
)

#### Create Experiment
SageMaker Experiments helps you organize, track, compare, and evaluate machine learning (ML) experiments and model versions. Since ML is a highly iterative process, Experiments helps data scientists and ML engineers explore thousands of different models in an organized manner. It is especially useful with tools such as Automatic Model Tuning and Amazon SageMaker Autopilot, which explore a large number of combinations automatically, because it lets you quickly zoom in on high-performance models.

![Experiment 1](statics/experiment_1.png) ![Experiment 2](statics/experiment_2.png)

In [387]:
demo_experiment = Experiment.create(
    experiment_name=f"att-demo-{int(time.time())}",
    description="Fraud Detection Demo",
    sagemaker_boto_client=sagemaker_boto_client,
)

#### Create a new experiment trial for training

In [388]:
trial_name = f"single-training-{int(time.time())}"
single_trial = Trial.create(
    trial_name=trial_name,
    experiment_name=demo_experiment.experiment_name,
    sagemaker_boto_client=sagemaker_boto_client,
)

experiment_config={
    "ExperimentName": demo_experiment.experiment_name,
    "TrialName": single_trial.trial_name,
    "TrialComponentDisplayName": "ManualTraining",
}

print(f"The experiment name is {demo_experiment.experiment_name}, and the trial name is {single_trial.trial_name}")

The experiment name is att-demo-1652885956, and the trial name is single-training-1652885958


In [389]:
xgb_estimator.fit(inputs={"train": train_data_upsampled_s3_path}, experiment_config=experiment_config)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2022-05-18-14-59-19-257


2022-05-18 14:59:20 Starting - Starting the training job...
2022-05-18 14:59:45 Starting - Preparing the instances for trainingProfilerReport-1652885959: InProgress
.........
2022-05-18 15:01:15 Downloading - Downloading input data...
2022-05-18 15:01:46 Training - Downloading the training image......
2022-05-18 15:02:43 Training - Training image download completed. Training in progress..
INFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training
INFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)
INFO:sagemaker_xgboost_container.training:Invoking user training script.
INFO:sagemaker-containers:Module xgboost_starter_script does not provide a setup.py.
Generating setup.py
INFO:sagemaker-containers:Generating setup.cfg
INFO:sagemaker-containers:Generating MANIFEST.in
INFO:sagemaker-containers:Installing module with the following command:
/miniconda3/bin/python3 -m pip

In [392]:
training_job_info = sagemaker_boto_client.describe_training_job(
    TrainingJobName=xgb_estimator.latest_training_job.job_name
)

model_name = f"{prefix}-xgboost-post-smote"
model = sagemaker_session.create_model_from_job(
    name=model_name,
    training_job_name=training_job_info['TrainingJobName'],
    role=sagemaker_role,
    image_uri=training_job_info['AlgorithmSpecification']['TrainingImage'])

INFO:sagemaker:Creating model with name: att-demo-xgboost-post-smote


### Hyperparameter Tuning

We will tune two hyperparameters in this example:

- eta: Step size shrinkage used in updates to prevent overfitting. After each boosting step, you can directly get the weights of new features. The eta parameter actually shrinks the feature weights to make the boosting process more conservative.

- max_depth: Maximum depth of a tree. Increasing this value makes the model more complex and likely to be overfitted.
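
To build intuition for `eta`, consider a toy booster in which every round fits the current residual exactly. Each round then adds only `eta` times that residual, so the remaining error shrinks by a factor of `(1 - eta)` per round. This is a deliberately simplified sketch with a hypothetical helper name, not XGBoost's tree-building algorithm:

```python
def boost_constant(target, eta, num_round):
    """Repeatedly add eta * residual to the prediction; return the residual history."""
    prediction = 0.0
    residuals = []
    for _ in range(num_round):
        residual = target - prediction   # what a perfect weak learner would predict
        prediction += eta * residual     # shrink the update by the learning rate
        residuals.append(target - prediction)
    return residuals


history_small_eta = boost_constant(target=1.0, eta=0.2, num_round=5)
history_large_eta = boost_constant(target=1.0, eta=0.9, num_round=5)
print(history_small_eta[-1], history_large_eta[-1])
```

A smaller `eta` leaves more residual after the same number of rounds, which is why it is usually paired with a larger `num_round`.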

In [393]:
hyperparameter_ranges = {
    "eta": ContinuousParameter(0, 1),
    "max_depth": IntegerParameter(1, 10),
}

In [394]:
objective_metric_name = "validation:auc"

In [395]:
tuner = HyperparameterTuner(
    xgb_estimator, 
    objective_metric_name, 
    hyperparameter_ranges, 
    max_jobs=12, 
    max_parallel_jobs=4
)

In [396]:
tuner.fit(inputs={"train": train_data_upsampled_s3_path})

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating hyperparameter tuning job with name: sagemaker-xgboost-220518-1510


................................................................................................................................................................!


#### Organize HPO training jobs into the Experiment and Trials

In [397]:
from smexperiments.search_expression import Filter, Operator, SearchExpression
from smexperiments.trial_component import TrialComponent

import pytz

# Get the most recently created tuning job

list_tuning_jobs_response = sagemaker_boto_client.list_hyper_parameter_tuning_jobs(
    SortBy="CreationTime", SortOrder="Descending"
)
print(f'Found {len(list_tuning_jobs_response["HyperParameterTuningJobSummaries"])} tuning jobs.')
tuning_jobs = list_tuning_jobs_response["HyperParameterTuningJobSummaries"]
most_recently_created_tuning_job = tuning_jobs[0]
tuning_job_name = most_recently_created_tuning_job["HyperParameterTuningJobName"]
trial_name = demo_experiment.experiment_name + "-hpo"

print(f"Associate all training jobs created by {demo_experiment.experiment_name} with trial {trial_name}")

# create the trial if it doesn't exist
try:
    trial = Trial.load(trial_name=trial_name)
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        trial = Trial.create(experiment_name=demo_experiment.experiment_name, trial_name=trial_name)
    else:
        # re-raise unexpected errors instead of leaving `trial` undefined
        raise
        
# Get the trial components derived from the training jobs

creation_time = most_recently_created_tuning_job["CreationTime"]
creation_time = creation_time.astimezone(pytz.utc)
creation_time = creation_time.strftime("%Y-%m-%dT%H:%M:%SZ")

created_after_filter = Filter(
    name="CreationTime",
    operator=Operator.GREATER_THAN_OR_EQUAL,
    value=str(creation_time),
)

# The training job names contain the tuning job name (and the training job name is in the source arn)
source_arn_filter = Filter(
    name="TrialComponentName", operator=Operator.CONTAINS, value=tuning_job_name
)
source_type_filter = Filter(
    name="Source.SourceType", operator=Operator.EQUALS, value="SageMakerTrainingJob"
)

search_expression = SearchExpression(
    filters=[created_after_filter, source_arn_filter, source_type_filter]
)

# Search iterates over every page of results by default
trial_component_search_results = list(
    TrialComponent.search(search_expression=search_expression, sagemaker_boto_client=sagemaker_boto_client)
)
print(f"Found {len(trial_component_search_results)} trial components.")


# Associate the trial components with the trial
for tc in trial_component_search_results:
    print(f"Associating trial component {tc.trial_component_name} with trial {trial.trial_name}.")
    trial.add_trial_component(tc.trial_component_name)
    # sleep to avoid throttling
    time.sleep(0.5)



Found 10 tuning jobs.
Associate all training jobs created by att-demo-1652885956 with trial att-demo-1652885956-hpo
Found 12 trial components.
Associating trial component sagemaker-xgboost-220518-1510-012-04b32da7-aws-training-job with trial att-demo-1652885956-hpo.
Associating trial component sagemaker-xgboost-220518-1510-009-e638790e-aws-training-job with trial att-demo-1652885956-hpo.
Associating trial component sagemaker-xgboost-220518-1510-011-91b5a381-aws-training-job with trial att-demo-1652885956-hpo.
Associating trial component sagemaker-xgboost-220518-1510-010-1dea75c3-aws-training-job with trial att-demo-1652885956-hpo.
Associating trial component sagemaker-xgboost-220518-1510-007-8282a4a8-aws-training-job with trial att-demo-1652885956-hpo.
Associating trial component sagemaker-xgboost-220518-1510-005-74bc7d98-aws-training-job with trial att-demo-1652885956-hpo.
Associating trial component sagemaker-xgboost-220518-1510-008-033b7545-aws-training-job with trial att-demo-16528


### Deploy and Serve Model
----

Now that we have trained a model, we can deploy and serve it for inference.

In [469]:
# variables used for parameterizing the notebook run
endpoint_name = f"{model_name}-endpoint-{int(time.time())}"
endpoint_instance_count = 1
endpoint_instance_type = "ml.m4.xlarge"

In [470]:
endpoint_config_name = f"{model_name}-endpoint-config-{int(time.time())}"

data_capture_s3_loaction = f"s3://{bucket}/{prefix}/monitoring/datacapture"


create_ep_config_response = sagemaker_boto_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": endpoint_instance_type,
            "InitialVariantWeight": 1,
            "InitialInstanceCount": endpoint_instance_count,
            "ModelName": model_name,
            "VariantName": "AllTraffic",
        }
    ],
    DataCaptureConfig={
        "EnableCapture": True,
        "InitialSamplingPercentage": 100,
        "DestinationS3Uri": data_capture_s3_loaction,
        "CaptureContentTypeHeader": { 
            "CsvContentTypes": [ "text/csv" ]
        },
        "CaptureOptions": [
            {
                "CaptureMode": "Input"
            },
            {
                "CaptureMode": "Output"
            }
        ],
    }
)

create_endpoint_response = sagemaker_boto_client.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)

endpoint_info = sagemaker_boto_client.describe_endpoint(EndpointName=endpoint_name)
endpoint_status = endpoint_info["EndpointStatus"]

while endpoint_status == "Creating":
    endpoint_info = sagemaker_boto_client.describe_endpoint(EndpointName=endpoint_name)
    endpoint_status = endpoint_info["EndpointStatus"]
    print("Endpoint status:", endpoint_status)
    if endpoint_status == "Creating":
        time.sleep(60)

Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: Creating
Endpoint status: InService


#### Run Inference

In [475]:
predictor = sagemaker.predictor.Predictor(
    endpoint_name=endpoint_name, serializer=CSVSerializer(), sagemaker_session=sagemaker_session
)

### Run inference on test data

In [476]:
def predict(data, rows=500):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ""
    for array in split_array:
        predictions = ",".join([predictions, predictor.predict(array).decode("utf-8")])

    return np.fromstring(predictions[1:], sep=",")

# run batch prediction
probabilities = predict(test.to_numpy()[:, 1:])

def calibrate(probabilities, cutoff=.2):
    predictions = []
    for p in probabilities:
        if p <= cutoff:
            predictions.append(0)
        else:
            predictions.append(1)
    return predictions

# run calibration and visualize the results
predictions = np.asarray(calibrate(probabilities, 0.15))

pd.crosstab(
    index=test.iloc[:, 0],
    columns=predictions,
    rownames=["actual"],
    colnames=["predictions"],
)

| actual (rows) / predictions (columns) | 0 | 1 |
|---|---|---|
| 0 | 971 | 6 |
| 1 | 21 | 2 |
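
Reading the table above, with rows as actual labels and columns as predicted labels, precision and recall for the fraud class follow directly from the four counts:

```python
# Counts taken from the confusion matrix above
true_negative, false_positive = 971, 6
false_negative, true_positive = 21, 2

# Share of predicted fraud cases that were actually fraud
precision = true_positive / (true_positive + false_positive)

# Share of actual fraud cases that the model caught
recall = true_positive / (true_positive + false_negative)

accuracy = (true_positive + true_negative) / (
    true_negative + false_positive + false_negative + true_positive
)

print(f"precision={precision:.3f} recall={recall:.3f} accuracy={accuracy:.3f}")
```

High accuracy combined with low recall is common on heavily imbalanced data, which is one reason to experiment with the cutoff passed to `calibrate`.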


### Sample from Online Feature Store

In [477]:
%%time
sample_policy_id = 1050

t1 = time.time()

combined_response = featurestore_runtime.get_record(
    FeatureGroupName=combined_fg_name, RecordIdentifierValueAsString=str(sample_policy_id)
)

t2 = time.time()

print(f"Online feature store query speed {(t2-t1)*1000} ms\n")

combined_record = combined_response["Record"]
combined_df = pd.DataFrame(combined_record).set_index("FeatureName")

sample_acutal = int(combined_df.loc['fraud'][0])

blended_df = combined_df.loc[col_order].drop(["fraud"])

data_input = ",".join([str(x) for x in blended_df["ValueAsString"]])

results = predictor.predict(data_input, initial_args={"ContentType": "text/csv"})
prediction = json.loads(results)
print(f"For policy {int(sample_policy_id)}: prediction is {np.round(prediction)} and actual is {sample_acutal}")
print("------------------\n")

Online feature store query speed 98.91629219055176 ms

For policy 1050: prediction is 0.0 and actual is 0
------------------

CPU times: user 18.8 ms, sys: 0 ns, total: 18.8 ms
Wall time: 134 ms
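
`get_record` returns the features as a list of `FeatureName`/`ValueAsString` pairs, so the values must be reordered to match the column order the model was trained on before being sent as CSV. A dependency-free sketch of that step, using a made-up record and a hypothetical `record_to_csv` helper:

```python
def record_to_csv(record, feature_order):
    """Turn a Feature Store record into a CSV row following feature_order."""
    values = {item["FeatureName"]: item["ValueAsString"] for item in record}
    missing = [name for name in feature_order if name not in values]
    if missing:
        raise KeyError(f"Record is missing features: {missing}")
    return ",".join(values[name] for name in feature_order)


# Made-up record in the shape returned under the "Record" key
sample_record = [
    {"FeatureName": "policy_id", "ValueAsString": "1050"},
    {"FeatureName": "num_injuries", "ValueAsString": "0"},
    {"FeatureName": "fraud", "ValueAsString": "0"},
    {"FeatureName": "vehicle_claim", "ValueAsString": "15000.0"},
]
payload = record_to_csv(sample_record, ["vehicle_claim", "num_injuries"])
print(payload)
```

Failing loudly on a missing feature avoids silently sending a row whose columns are shifted relative to the training data.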


### Model Monitoring

![Model Monitoring](statics/Model_monitoring.png)

**1. Constraint suggestion with baseline/training dataset**

The training dataset with which you trained the model is usually a good baseline dataset. Note that the training dataset data schema and the inference dataset schema should exactly match (i.e. the number and order of the features).

From the training dataset you can ask Amazon SageMaker to suggest a set of baseline constraints and generate descriptive statistics to explore the data. For this example, upload the training dataset that was used to train the pre-trained model included in this example. If you already have it in Amazon S3, you can directly point to it.

In [478]:
# this is our training dataset
baseline_data_uri = train_data_upsampled_s3_path
baseline_results_prefix = f"{prefix}/monitoring/baselining/results"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"

#### 1. Create a baselining job with training dataset
Now that you have the training data ready in Amazon S3, start a job to suggest constraints. DefaultModelMonitor.suggest_baseline(..) starts a ProcessingJob using an Amazon SageMaker provided Model Monitor container to generate the constraints.

In [479]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=sagemaker_role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2022-05-19-18-43-11-183



Job Name:  baseline-suggestion-job-2022-05-19-18-43-11-183
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-987720697751/att-demo/data/train/upsampled/train.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-987720697751/att-demo/monitoring/baselining/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
...........................
2022-05-19 18:47:28,366 - matplotlib.font_manager - INFO - Generating new fontManager, this may take some time...
2022-05-19 18:47:28.886681: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot op

<sagemaker.processing.ProcessingJob at 0x7f34c541df10>

In [480]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

Found Files:
att-demo/monitoring/baselining/results/constraints.json
 att-demo/monitoring/baselining/results/statistics.json


In [481]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)



|  | name | inferred_type | numerical_statistics.common.num_present | numerical_statistics.common.num_missing | numerical_statistics.mean | numerical_statistics.sum | numerical_statistics.std_dev | numerical_statistics.min | numerical_statistics.max | numerical_statistics.distribution.kll.buckets | numerical_statistics.distribution.kll.sketch.parameters.c | numerical_statistics.distribution.kll.sketch.parameters.k | numerical_statistics.distribution.kll.sketch.data |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | fraud | Integral | 8209 | 0 | 0.333293 | 2736.0 | 0.47139 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,... |
| 1 | driver_relationship_spouse | Fractional | 8209 | 0 | 0.097503 | 800.4006 | 0.272649 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.13... |
| 2 | policy_state_ca | Fractional | 8209 | 0 | 0.591132 | 4852.602 | 0.446117 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[1.0, 0.9318909752764775, 1.0, 0.935378161307... |
| 3 | incident_type_breakin | Fractional | 8209 | 0 | 0.128647 | 1056.066 | 0.298105 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.06810902472352254, 0.0, 0.0, 0.0, 0.0... |
| 4 | num_vehicles_involved | Integral | 8209 | 0 | 1.834206 | 15057.0 | 0.803161 | 1.0 | 7.0 | [{'lower_bound': 1.0, 'upper_bound': 1.6, 'cou... | 0.64 | 2048.0 | [[2.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0, 2.0, 1.0,... |
| 5 | incident_month | Integral | 8209 | 0 | 6.739432 | 55324.0 | 3.331625 | 1.0 | 12.0 | [{'lower_bound': 1.0, 'upper_bound': 2.1, 'cou... | 0.64 | 2048.0 | [[4.0, 10.0, 9.0, 9.0, 6.0, 6.0, 3.0, 3.0, 2.0... |
| 6 | incident_type_theft | Fractional | 8209 | 0 | 0.052531 | 431.2282 | 0.202187 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 0.2181743295932732, 0.0, 0.0, 0.44... |
| 7 | authorities_contacted_none | Fractional | 8209 | 0 | 0.279597 | 2295.215 | 0.411311 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.5035469815118861, 0.9318909752764775, 0.0,... |
| 8 | num_injuries | Integral | 8209 | 0 | 0.39274 | 3224.0 | 0.821638 | 0.0 | 4.0 | [{'lower_bound': 0.0, 'upper_bound': 0.4, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0,... |
| 9 | vehicle_claim | Integral | 8209 | 0 | 16518.060665 | 135596800.0 | 9252.604103 | 1000.0 | 51000.0 | [{'lower_bound': 1000.0, 'upper_bound': 6000.0... | 0.64 | 2048.0 | [[4248.0, 16000.0, 17890.0, 11000.0, 15826.0, ... |


In [482]:
# pd.json_normalize is the public entry point; pd.io.json.json_normalize is deprecated.
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)


| | name | inferred_type | completeness | num_constraints.is_non_negative |
|---|---|---|---|---|
| 0 | fraud | Integral | 1.0 | True |
| 1 | driver_relationship_spouse | Fractional | 1.0 | True |
| 2 | policy_state_ca | Fractional | 1.0 | True |
| 3 | incident_type_breakin | Fractional | 1.0 | True |
| 4 | num_vehicles_involved | Integral | 1.0 | True |
| 5 | incident_month | Integral | 1.0 | True |
| 6 | incident_type_theft | Fractional | 1.0 | True |
| 7 | authorities_contacted_none | Fractional | 1.0 | True |
| 8 | num_injuries | Integral | 1.0 | True |
| 9 | vehicle_claim | Integral | 1.0 | True |
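
The suggested constraints record two things per feature: a `completeness` value and an `is_non_negative` flag. As a rough illustration of what those two fields mean, the sketch below evaluates them against a handful of records in plain Python. It is not how Model Monitor implements its checks; the function name `check_constraints` and the sample records are hypothetical, and the constraint dictionaries assume the nested `num_constraints` layout implied by the flattened column name above.

```python
from typing import Dict, List, Optional


def check_constraints(
    records: List[Dict[str, Optional[float]]],
    constraints: List[dict],
) -> List[str]:
    """Return readable violations of the completeness and non-negativity constraints."""
    violations = []
    for constraint in constraints:
        name = constraint["name"]
        values = [record.get(name) for record in records]
        present = [value for value in values if value is not None]

        # Completeness: fraction of records in which the feature has a value.
        completeness = len(present) / len(values) if values else 0.0
        required = constraint.get("completeness", 0.0)
        if completeness < required:
            violations.append(f"{name}: completeness {completeness:.2f} < {required}")

        # Non-negativity: only enforced when the constraint asks for it.
        if constraint.get("num_constraints", {}).get("is_non_negative"):
            if any(value < 0 for value in present):
                violations.append(f"{name}: negative value found")
    return violations


# Hypothetical constraints and records, shaped like the baseline output above.
constraints = [
    {"name": "num_injuries", "inferred_type": "Integral",
     "completeness": 1.0, "num_constraints": {"is_non_negative": True}},
    {"name": "vehicle_claim", "inferred_type": "Integral",
     "completeness": 1.0, "num_constraints": {"is_non_negative": True}},
]
records = [
    {"num_injuries": 0, "vehicle_claim": 16000.0},
    {"num_injuries": -1, "vehicle_claim": None},
]
violations = check_constraints(records, constraints)
print(violations)
```

A completeness of 1.0 means the baseline data had no missing values for that feature, so any missing value in captured traffic would fall below the threshold.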


#### 2. Create a schedule to analyze collected data for data quality issues

In [None]:
my_default_monitor.delete_monitoring_schedule()

In [484]:
from time import gmtime, strftime
from sagemaker.model_monitor import CronExpressionGenerator

mon_schedule_name = f"{prefix}-monitoring-job-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"

s3_report_path = f"s3://{bucket}/{prefix}/monitoring/report"

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    # endpoint_name replaces the deprecated predictor.endpoint attribute in SageMaker SDK v2.
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: att-demo-monitoring-job-2022-05-19-18-49-55


In [486]:
desc_schedule_result = my_default_monitor.describe_schedule()
print("Schedule status: {}".format(desc_schedule_result["MonitoringScheduleStatus"]))

Schedule status: Scheduled
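
A schedule may not report `Scheduled` immediately after it is created. One way to handle that, sketched below, is to poll until the status settles. The helper `wait_for_schedule` is hypothetical and takes any callable that returns a dictionary containing `MonitoringScheduleStatus`, so it can be tried without AWS access; the timeout and polling interval are arbitrary defaults.

```python
import time
from typing import Callable


def wait_for_schedule(
    describe: Callable[[], dict],
    target: str = "Scheduled",
    timeout_s: float = 300.0,
    poll_s: float = 10.0,
) -> str:
    """Poll describe() until the schedule reaches `target` or fails.

    Raises TimeoutError if neither happens within `timeout_s` seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = describe()["MonitoringScheduleStatus"]
        # Stop on success, or on "Failed" since further polling will not help.
        if status in (target, "Failed"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"schedule still {status!r} after {timeout_s}s")
        time.sleep(poll_s)


# In this notebook the call would presumably look like:
# wait_for_schedule(my_default_monitor.describe_schedule)
```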


#### Generate some artificial traffic 

In [488]:
import random  # only needed if the optional perturbation block below is enabled

count = 0
for i in range(1020, 1060):
    # Fetch the stored record for this ID from the online feature store.
    combined_response = featurestore_runtime.get_record(
        FeatureGroupName=combined_fg_name, RecordIdentifierValueAsString=str(i)
    )

    combined_record = combined_response["Record"]
    combined_df = pd.DataFrame(combined_record).set_index("FeatureName")

    # Reorder the features to col_order and drop the label column.
    blended_df = combined_df.loc[col_order].drop(["fraud"])

    # Optional: uncomment to replace each value, with probability 0.5,
    # by a random integer in [0, 100] before sending it to the endpoint.
#     input_list = []
#     for x in blended_df["ValueAsString"]:
#         if bool(random.getrandbits(1)):
#             input_list.append(str(x))
#         else:
#             input_list.append(f"{str(random.randint(0, 100))}")
#     data_input = ",".join(input_list)

    # Serialize the feature values as a single CSV row.
    data_input = ",".join([str(x) for x in blended_df["ValueAsString"]])

    results = predictor.predict(data_input, initial_args={"ContentType": "text/csv"})

    count += 1

print(f"{count} artificial traffic predicted...")

40 artificial traffic predicted...
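
The commented-out block in the cell above swaps some feature values for random integers before building the CSV row. A minimal sketch of the same idea as a reusable function follows. The name `build_payload`, the `perturb_prob` parameter, and the seeded generator are assumptions added for illustration; the original code uses a fixed 50% chance per value.

```python
import random
from typing import Iterable, Optional


def build_payload(
    values: Iterable[str],
    perturb_prob: float = 0.0,
    rng: Optional[random.Random] = None,
) -> str:
    """Join feature values into one CSV row, optionally corrupting some of them.

    Each value is replaced by a random integer in [0, 100] with probability
    `perturb_prob`; with the default of 0.0 the values pass through unchanged.
    """
    rng = rng or random.Random()
    fields = []
    for value in values:
        if rng.random() < perturb_prob:
            fields.append(str(rng.randint(0, 100)))
        else:
            fields.append(str(value))
    return ",".join(fields)


# Unperturbed row, equivalent to the ",".join(...) call in the cell above.
clean_row = build_payload(["1", "2.5", "0"])

# Roughly half the values corrupted; the seed makes the run repeatable.
noisy_row = build_payload(["1", "2.5", "0"], perturb_prob=0.5, rng=random.Random(0))
```

Passing a seeded `random.Random` keeps a perturbed run reproducible, which helps when comparing monitoring reports across runs.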


In [466]:
mon_executions = my_default_monitor.list_executions()
mon_executions

[<sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f34c6118c90>]
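
The list above only shows object representations. A small helper along the following lines could pull out each execution's status instead. It is a sketch that assumes every execution object has a `describe()` method returning a dictionary with a `ProcessingJobStatus` key and, when available, an `ExitMessage` key, as SageMaker processing jobs do. The function name `summarize_executions` is hypothetical.

```python
from typing import Iterable, List, Tuple


def summarize_executions(executions: Iterable) -> List[Tuple[str, str]]:
    """Return (status, exit message) for each monitoring execution."""
    summary = []
    for execution in executions:
        description = execution.describe()
        summary.append(
            (
                description.get("ProcessingJobStatus", "Unknown"),
                description.get("ExitMessage", ""),
            )
        )
    return summary


# In this notebook the call would presumably look like:
# summarize_executions(my_default_monitor.list_executions())
```

With an hourly schedule, the list may stay empty for some time after the schedule is created, so an empty result does not by itself indicate a problem.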

## Clean up feature store

In [15]:
import boto3

client = boto3.client('sagemaker')

In [40]:
client.delete_feature_group(
    FeatureGroupName='fraud-detect-demo-customers-api'
)

{'ResponseMetadata': {'RequestId': '1e2226af-d8dc-4f44-a794-d71a43391ca8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1e2226af-d8dc-4f44-a794-d71a43391ca8',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Thu, 12 May 2022 19:16:29 GMT'},
  'RetryAttempts': 0}}
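
If the demo created more than one feature group, the same call can be repeated for each. The sketch below wraps it in a loop and collects the HTTP status code from each response, using the response shape shown above. The helper name `delete_feature_groups` is hypothetical, and the only feature group name confirmed by this notebook is `fraud-detect-demo-customers-api`; any other names would need to be supplied.

```python
from typing import Dict, Iterable


def delete_feature_groups(client, names: Iterable[str]) -> Dict[str, int]:
    """Delete each named feature group and return its HTTP status code."""
    statuses = {}
    for name in names:
        response = client.delete_feature_group(FeatureGroupName=name)
        statuses[name] = response["ResponseMetadata"]["HTTPStatusCode"]
    return statuses


# In this notebook the call would presumably look like:
# delete_feature_groups(client, ["fraud-detect-demo-customers-api"])
```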