# Prepare Data in this Step
 - ## Data Wrangler: To Make transformation and process the data
 - ## Sagemaker Feature Store: To store the features
   
1. Use Data Wrangler UI to transform and preprocess the data, create .flow files.
2. Use .flow files to create the processed datasets.
3. Store the features (processed dataset) in feature store
4. FeatureGroup Creation, Ingest batch data and retrive records

References:
 - https://github.com/aws/amazon-sagemaker-examples/blob/4534bff4b5b5062af5789d98c4ddca01b0cb5d1f/end_to_end/fraud_detection/0-AutoClaimFraudDetection.ipynb
 - https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-introduction-notebook.html
 - https://boto3.amazonaws.com/v1/documentation/api/1.20.9/reference/services/sagemaker-featurestore-runtime.html



In [2]:
from sagemaker import get_execution_role
from sagemaker.session import Session
import sagemaker
import boto3
import pandas as pd
import numpy as np 
from time import strftime, gmtime
import time
from sagemaker.feature_store.feature_group import FeatureGroup

In [3]:
sagemaker_session = Session()
REGION = sagemaker_session.boto_region_name
BUCKET = sagemaker_session.default_bucket()
PREFIX = "FraudDetection_AutoInsurance"

s3_client = boto3.client("s3")
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
print(REGION)

us-east-1


In [4]:
"""
Note: if you are not running this notebook from SageMaker Studio or SageMaker Classic Notebooks you will need to instanatiate 
the sagemaker_execution_role_name with an AWS role that has SageMakerFullAccess and SageMakerFeatureStoreFullAccess
"""
try:
    ROLE = sagemaker.get_execution_role()
    print(ROLE)
except:
    print("Need to set role manually")
    exec_role_name = "AmazonSageMaker-ExecutionRole-20250401T145997"
    sagemaker_role = iam_client.get_role(RoleName=exec_role_name)["Role"]["Arn"]
    print(f"\n instantiating sagemaker_role with supplied role name : {sagemaker_role}")

account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

arn:aws:iam::205930620783:role/service-role/AmazonSageMaker-ExecutionRole-20250401T145997
205930620783


In [6]:
# Upload Files to S3 Buckets
#s3_client.upload_file(Filename="Data/customers.csv", Bucket=BUCKET, Key=f"{PREFIX}/data/raw/customers.csv")
#s3_client.upload_file(Filename="Data/claims.csv", Bucket=BUCKET, Key=f"{PREFIX}/data/raw/claims.csv")

### USE DATA WRANGLER UI TO TRANSFORM THE ABOVE RAW DATA AND GENERATE THE BELOW PROCESSED DATA
**I am skipping for this now. But will learn more about this step

In [9]:
s3_uri = f"s3://{BUCKET}"

claims_data = pd.read_csv(f"{s3_uri}/{PREFIX}/data/claims_preprocessed.csv")
customers_data = pd.read_csv(f"{s3_uri}/{PREFIX}/data/customers_preprocessed.csv")
print(claims_data.dtypes)
print(customers_data.dtypes)

policy_id                            int64
incident_severity                  float64
num_vehicles_involved                int64
num_injuries                         int64
num_witnesses                        int64
police_report_available            float64
injury_claim                         int64
vehicle_claim                        int64
total_claim_amount                   int64
incident_month                       int64
incident_day                         int64
incident_dow                         int64
incident_hour                        int64
fraud                                int64
driver_relationship_self           float64
driver_relationship_na             float64
driver_relationship_spouse         float64
driver_relationship_child          float64
driver_relationship_other          float64
incident_type_collision            float64
incident_type_breakin              float64
incident_type_theft                float64
collision_type_front               float64
collision_t

In [10]:
print(claims_data.columns, "\n")
print(customers_data.columns, "\n")
print()
print("Policy id is the unique identifier in both cases")
print(claims_data.shape[0], claims_data['policy_id'].nunique())
print(customers_data.shape[0], customers_data['policy_id'].nunique())

Index(['policy_id', 'incident_severity', 'num_vehicles_involved',
       'num_injuries', 'num_witnesses', 'police_report_available',
       'injury_claim', 'vehicle_claim', 'total_claim_amount', 'incident_month',
       'incident_day', 'incident_dow', 'incident_hour', 'fraud',
       'driver_relationship_self', 'driver_relationship_na',
       'driver_relationship_spouse', 'driver_relationship_child',
       'driver_relationship_other', 'incident_type_collision',
       'incident_type_breakin', 'incident_type_theft', 'collision_type_front',
       'collision_type_rear', 'collision_type_side', 'collision_type_na',
       'authorities_contacted_police', 'authorities_contacted_none',
       'authorities_contacted_fire', 'authorities_contacted_ambulance'],
      dtype='object') 

Index(['policy_id', 'customer_age', 'customer_education', 'months_as_customer',
       'policy_deductable', 'policy_annual_premium', 'policy_liability',
       'auto_year', 'num_claims_past_year', 'num_insurers_pa

### Create Two Feature Groups for claims and customers.
Feature Groups is equivalent to tables/datasets.

In [7]:
customers_feature_group_name = "customers-feature-group-" + strftime("%d-%H-%M-%S", gmtime())
claims_feature_group_name = "claims-feature-group-" + strftime("%d-%H-%M-%S", gmtime())
print(customers_feature_group_name, "\n", claims_feature_group_name)

customers-feature-group-25-12-17-38 
 claims-feature-group-25-12-17-38


In [9]:
# Initiate Feature Group objects. Each feature group reprsents a table.
customers_feature_group = FeatureGroup(customers_feature_group_name, sagemaker_session)
claims_feature_group = FeatureGroup(claims_feature_group_name, sagemaker_session)

In [10]:
# Add event time in the datasets, that is required
customers_data['EventTime'] = time.time()
claims_data['EventTime'] = time.time()
print(customers_data['EventTime'])

0       1.748176e+09
1       1.748176e+09
2       1.748176e+09
3       1.748176e+09
4       1.748176e+09
            ...     
4995    1.748176e+09
4996    1.748176e+09
4997    1.748176e+09
4998    1.748176e+09
4999    1.748176e+09
Name: EventTime, Length: 5000, dtype: float64


### Load Feature Definitions into the feature groups

In [11]:
customers_feature_group.load_feature_definitions(data_frame=customers_data)
claims_feature_group.load_feature_definitions(data_frame=claims_data)

[FeatureDefinition(feature_name='policy_id', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='incident_severity', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='num_vehicles_involved', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='num_injuries', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='num_witnesses', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='police_report_available', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='injury_claim', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='vehicle_claim', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>,

### The following calls create to create two feature groups, customers_feature_group and orders_feature_group, respectively:
Point to note, only the necessary structure will be created on "Feature Store" we still need to load the data yet.

In [12]:

customers_feature_group.create(
    s3_uri = f"{s3_uri}/{PREFIX}/data/feature_store",
    record_identifier_name='policy_id',
    event_time_feature_name="EventTime",
    role_arn=ROLE,
    enable_online_store=True
)


claims_feature_group.create(
    s3_uri = f"{s3_uri}/{PREFIX}/data/feature_store",
    record_identifier_name='policy_id',
    event_time_feature_name="EventTime",
    role_arn=ROLE,
    enable_online_store=True
)

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205930620783:feature-group/claims-feature-group-25-12-17-38',
 'ResponseMetadata': {'RequestId': '0dad4a61-3f7a-49d9-84f0-4b61842e6a13',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0dad4a61-3f7a-49d9-84f0-4b61842e6a13',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '109',
   'date': 'Sun, 25 May 2025 12:22:20 GMT'},
  'RetryAttempts': 1}}

In [17]:
print(customers_feature_group.describe())

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205930620783:feature-group/customers-feature-group-25-12-17-38', 'FeatureGroupName': 'customers-feature-group-25-12-17-38', 'RecordIdentifierFeatureName': 'policy_id', 'EventTimeFeatureName': 'EventTime', 'FeatureDefinitions': [{'FeatureName': 'policy_id', 'FeatureType': 'Integral'}, {'FeatureName': 'customer_age', 'FeatureType': 'Integral'}, {'FeatureName': 'customer_education', 'FeatureType': 'Integral'}, {'FeatureName': 'months_as_customer', 'FeatureType': 'Integral'}, {'FeatureName': 'policy_deductable', 'FeatureType': 'Integral'}, {'FeatureName': 'policy_annual_premium', 'FeatureType': 'Integral'}, {'FeatureName': 'policy_liability', 'FeatureType': 'Integral'}, {'FeatureName': 'auto_year', 'FeatureType': 'Integral'}, {'FeatureName': 'num_claims_past_year', 'FeatureType': 'Integral'}, {'FeatureName': 'num_insurers_past_5_years', 'FeatureType': 'Integral'}, {'FeatureName': 'customer_gender_male', 'FeatureType': 'Fractional'}, {'Featur

### List Feature Groups

In [11]:
print(sagemaker_session.boto_session.client('sagemaker').list_feature_groups())
print()
print(boto3.client('sagemaker').list_feature_groups())

{'FeatureGroupSummaries': [{'FeatureGroupName': 'customers-feature-group-25-12-17-38', 'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205930620783:feature-group/customers-feature-group-25-12-17-38', 'CreationTime': datetime.datetime(2025, 5, 25, 12, 22, 19, 134000, tzinfo=tzlocal()), 'FeatureGroupStatus': 'Created', 'OfflineStoreStatus': {'Status': 'Active'}}, {'FeatureGroupName': 'claims-feature-group-25-12-17-38', 'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205930620783:feature-group/claims-feature-group-25-12-17-38', 'CreationTime': datetime.datetime(2025, 5, 25, 12, 22, 20, 178000, tzinfo=tzlocal()), 'FeatureGroupStatus': 'Created', 'OfflineStoreStatus': {'Status': 'Active'}}], 'ResponseMetadata': {'RequestId': '233c2494-73ed-4108-8a4c-98cd057027d4', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '233c2494-73ed-4108-8a4c-98cd057027d4', 'content-type': 'application/x-amz-json-1.1', 'content-length': '569', 'date': 'Wed, 11 Jun 2025 12:07:12 GMT'}, 'RetryAttempts': 0

In [12]:
def check_feature_group_status(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group to be Created")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    print(f"FeatureGroup {feature_group.name} successfully created.")

check_feature_group_status(customers_feature_group)
check_feature_group_status(claims_feature_group)

### Upload data into feature groups

In [23]:
customers_feature_group.ingest(data_frame=customers_data, max_workers=3, wait=True) 
claims_feature_group.ingest(data_frame=claims_data, max_workers=3, wait=True) 

IngestionManagerPandas(feature_group_name='claims-feature-group-25-12-17-38', feature_definitions={'policy_id': {'FeatureName': 'policy_id', 'FeatureType': 'Integral'}, 'incident_severity': {'FeatureName': 'incident_severity', 'FeatureType': 'Fractional'}, 'num_vehicles_involved': {'FeatureName': 'num_vehicles_involved', 'FeatureType': 'Integral'}, 'num_injuries': {'FeatureName': 'num_injuries', 'FeatureType': 'Integral'}, 'num_witnesses': {'FeatureName': 'num_witnesses', 'FeatureType': 'Integral'}, 'police_report_available': {'FeatureName': 'police_report_available', 'FeatureType': 'Fractional'}, 'injury_claim': {'FeatureName': 'injury_claim', 'FeatureType': 'Integral'}, 'vehicle_claim': {'FeatureName': 'vehicle_claim', 'FeatureType': 'Integral'}, 'total_claim_amount': {'FeatureName': 'total_claim_amount', 'FeatureType': 'Integral'}, 'incident_month': {'FeatureName': 'incident_month', 'FeatureType': 'Integral'}, 'incident_day': {'FeatureName': 'incident_day', 'FeatureType': 'Integral'

### If feature group already exists then initialize the feature group object and get the data

In [16]:
print(sagemaker_session.boto_session.client('sagemaker').list_feature_groups())
print()
feature_groups = boto3.client('sagemaker').list_feature_groups()

{'FeatureGroupSummaries': [{'FeatureGroupName': 'customers-feature-group-25-12-17-38', 'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205930620783:feature-group/customers-feature-group-25-12-17-38', 'CreationTime': datetime.datetime(2025, 5, 25, 12, 22, 19, 134000, tzinfo=tzlocal()), 'FeatureGroupStatus': 'Created', 'OfflineStoreStatus': {'Status': 'Active'}}, {'FeatureGroupName': 'claims-feature-group-25-12-17-38', 'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205930620783:feature-group/claims-feature-group-25-12-17-38', 'CreationTime': datetime.datetime(2025, 5, 25, 12, 22, 20, 178000, tzinfo=tzlocal()), 'FeatureGroupStatus': 'Created', 'OfflineStoreStatus': {'Status': 'Active'}}], 'ResponseMetadata': {'RequestId': '35d08d8f-917b-407c-8bdf-a2c049753087', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '35d08d8f-917b-407c-8bdf-a2c049753087', 'content-type': 'application/x-amz-json-1.1', 'content-length': '569', 'date': 'Wed, 11 Jun 2025 12:12:27 GMT'}, 'RetryAttempts': 0

In [18]:
customers_feature_group_name = feature_groups['FeatureGroupSummaries'][0]['FeatureGroupName']
claims_feature_group_name = feature_groups['FeatureGroupSummaries'][1]['FeatureGroupName']

In [20]:
claims_feature_group = FeatureGroup(name=claims_feature_group_name, sagemaker_session=sagemaker_session)
customers_feature_group = FeatureGroup(name=customers_feature_group_name, sagemaker_session=sagemaker_session)
claims_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205930620783:feature-group/claims-feature-group-25-12-17-38',
 'FeatureGroupName': 'claims-feature-group-25-12-17-38',
 'RecordIdentifierFeatureName': 'policy_id',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'policy_id',
   'FeatureType': 'Integral'},
  {'FeatureName': 'incident_severity', 'FeatureType': 'Fractional'},
  {'FeatureName': 'num_vehicles_involved', 'FeatureType': 'Integral'},
  {'FeatureName': 'num_injuries', 'FeatureType': 'Integral'},
  {'FeatureName': 'num_witnesses', 'FeatureType': 'Integral'},
  {'FeatureName': 'police_report_available', 'FeatureType': 'Fractional'},
  {'FeatureName': 'injury_claim', 'FeatureType': 'Integral'},
  {'FeatureName': 'vehicle_claim', 'FeatureType': 'Integral'},
  {'FeatureName': 'total_claim_amount', 'FeatureType': 'Integral'},
  {'FeatureName': 'incident_month', 'FeatureType': 'Integral'},
  {'FeatureName': 'incident_day', 'FeatureType': 'Integral'},
  {'Fe

### Use Athena query to load the datasets

In [45]:
claims_query = claims_feature_group.athena_query()
print(claims_query.table_name, "\n", claims_query.database)
customers_query = customers_feature_group.athena_query()
print(customers_query.table_name, "\n", customers_query.database)

query1 = f"SELECT * FROM {customers_query.table_name}"
query2 = f"SELECT * FROM {claims_query.table_name}"

claims_query.run(query1, output_location=f"s3://{BUCKET}/FraudDetection_AutoInsurance/data/query_results")
claims_query.wait() 
customers_dataset = claims_query.as_dataframe()
claims_query.run(query2, output_location=f"s3://{BUCKET}/FraudDetection_AutoInsurance/data/query_results")
claims_query.wait() 
claims_dataset = claims_query.as_dataframe()

print(claims_dataset['is_deleted'].value_counts())
print(claims_dataset['policy_id'].nunique() == claims_data['policy_id'].shape[0])
print(customers_dataset['is_deleted'].value_counts())
print(customers_dataset['policy_id'].nunique() == claims_data['policy_id'].shape[0])

claims_feature_group_25_12_17_38_1748175740 
 sagemaker_featurestore
customers_feature_group_25_12_17_38_1748175739 
 sagemaker_featurestore


is_deleted
False    10000
Name: count, dtype: int64
True
is_deleted
False    10000
Name: count, dtype: int64
True


In [47]:
# Drop the logically deleted records, and keep the latest records only based on the metadata columns. Delete the metadata columns in last

keep_latest = lambda df: df.sort_values('write_time',ascending=False).drop_duplicates(['policy_id'], keep='first').reset_index(drop=True)
claims_dataset = keep_latest(claims_dataset[claims_dataset['is_deleted']==False])
customers_dataset = keep_latest(customers_dataset[customers_dataset['is_deleted']==False])

metadata_columns = ['write_time', 'api_invocation_time', 'is_deleted']
dataset = pd.merge(
    claims_dataset.drop(columns=metadata_columns),
    customers_dataset.drop(columns=metadata_columns),
    on=['policy_id']
)
print(claims_dataset.shape) 
print(customers_dataset.shape) 
print(dataset.shape)
print(dataset.dtypes)


(5000, 34)
(5000, 22)
(5000, 49)
policy_id                            int64
incident_severity                  float64
num_vehicles_involved                int64
num_injuries                         int64
num_witnesses                        int64
police_report_available            float64
injury_claim                         int64
vehicle_claim                        int64
total_claim_amount                   int64
incident_month                       int64
incident_day                         int64
incident_dow                         int64
incident_hour                        int64
fraud                                int64
driver_relationship_self           float64
driver_relationship_na             float64
driver_relationship_spouse         float64
driver_relationship_child          float64
driver_relationship_other          float64
incident_type_collision            float64
incident_type_breakin              float64
incident_type_theft                float64
collision_type_front 

In [49]:
dataset_col_order = ['fraud']  + list(dataset.drop(["fraud", "policy_id"], axis=1).columns)
train = dataset.sample(frac=.80, random_state=0)[dataset_col_order]
test = dataset.drop(train.index)[dataset_col_order]
train.to_csv("data/train.csv", index=False)
test.to_csv("data/test.csv", index=False)
dataset.to_csv("data/dataset.csv", index=True)

In [50]:
# Upload the data to S3
s3_client = sagemaker_session.boto_session.client("s3")
s3_client.upload_file('data/train.csv', Bucket=BUCKET, Key="FraudDetection_AutoInsurance/data/train.csv")
s3_client.upload_file('data/test.csv', Bucket=BUCKET, Key="FraudDetection_AutoInsurance/data/test.csv")
s3_client.upload_file('data/dataset.csv', Bucket=BUCKET, Key="FraudDetection_AutoInsurance/data/dataset.csv")


In [54]:
print(train.select_dtypes(['int','float']).shape)
print(train.shape)

(4000, 48)
(4000, 48)
