## Amazon SageMaker Feature Store


In [1]:
import pandas as pd
import time
import sagemaker
import boto3
from sagemaker_graph_fraud_detection import config
from sagemaker.feature_store.feature_group import FeatureGroup

# IAM role, SageMaker session, default bucket and region shared by every cell below.
role = config.role
sess = sagemaker.Session()
bucket = sess.default_bucket()
# Reuse the session created above instead of constructing a second one just to read its region.
region = sess.boto_region_name

# Read the prepared data from S3. Replace the key below with the S3 location of any
# processed file under the Results prefix.
source = f"s3://{bucket}/AutoInsuranceFraudDetection/Results/DataWrangler/output_1631613461/part-00000-a84a141e-2808-4012-86d5-9b8c43f33ce9-c000.csv"
df = pd.read_csv(source)

When creating a feature group, you can also provide metadata for it, such as a short description, the storage configuration, the features that identify each record and its event time, and tags that store information such as the author, data source, version, and more. Since our dataset has no columns that can serve as the record identifier or the event time, we add two extra columns called `Fraud_ID` and `Fraud_time`.

In [2]:
# Add the record identifier and event time columns used by the feature store.

# Row identifier derived from the dataframe index, offset by 1000
# (unique as long as the index itself is unique).
df['Fraud_ID'] = df.index + 1000

# One ingestion timestamp (seconds since the epoch, whole seconds stored as float64)
# shared by every row. Building the Series on df.index keeps it aligned with the
# frame regardless of what index the frame carries.
current_time_sec = int(round(time.time()))
df['Fraud_time'] = pd.Series(float(current_time_sec), index=df.index, dtype="float64")

# errors='ignore' keeps this cell re-runnable: on a second run '_c0' is already gone.
df = df.drop(columns=['_c0'], errors='ignore')
df.head()

Unnamed: 0,age,policy_number,policy_state,policy_deductable,policy_annual_premium,umbrella_limit,insured_sex,insured_education_level,insured_occupation,insured_relationship,...,witnesses,police_report_available,total_claim_amount,injury_claim,property_claim,vehicle_claim,auto_make,fraud_reported,Fraud_ID,Fraud_time
0,48,521585,2,1000,1406.91,0,1,4,2,0,...,2,2,71610,6510,13020,52080,10,1.0,1000,1631634000.0
1,42,342868,1,2000,1197.22,5000000,1,4,6,2,...,0,0,5070,780,780,3510,8,1.0,1001,1631634000.0
2,29,687698,2,2000,1413.14,5000000,0,6,11,3,...,3,1,34650,7700,3850,23100,4,0.0,1002,1631634000.0
3,41,227811,0,2000,1415.74,6000000,0,6,1,4,...,2,1,63400,6340,6340,50720,3,1.0,1003,1631634000.0
4,44,367455,0,1000,1583.91,6000000,1,0,11,4,...,1,1,6500,1300,650,4550,0,0.0,1004,1631634000.0


In [3]:
# Set the region on boto3's default session, then build an S3 client
# bound to that same region.
boto3.setup_default_session(region_name=region)
s3_client = boto3.client(service_name="s3", region_name=region)

### Configure the feature groups
The data type of each feature is set by passing a dataframe and inferring the proper data type from it. Feature data types can also be set via a config variable, but they must match the corresponding Python data types in the Pandas dataframe when it is ingested into the Feature Group.

In [4]:
# Configure the feature group and infer its feature definitions from the dataframe.

fraud_fg_name = "auto-fraud"
fraud_feature_group = FeatureGroup(name=fraud_fg_name, sagemaker_session=sess)
# Uncomment the next line to delete an existing feature group with this name
# before recreating it.
#fraud_feature_group.delete()
# Left as the last expression so the inferred feature definitions are displayed.
fraud_feature_group.load_feature_definitions(data_frame=df)

[FeatureDefinition(feature_name='age', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='policy_number', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='policy_state', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='policy_deductable', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='policy_annual_premium', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='umbrella_limit', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='insured_sex', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='insured_education_level', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='insured_occupation', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='insured_relationship'

### Create the feature groups
You must tell the Feature Group which columns in the dataframe correspond to the required record identifier and event time features.

In [5]:
# Columns of `df` that act as the record identifier and the event time.
record_identifier_feature_name = "Fraud_ID"
event_time_feature_name = "Fraud_time"
# Key prefix inside the default bucket passed to create() as the feature group's s3_uri.
fraud_feature_group_s3_prefix = "AutoInsuranceFraudDetection/FeatureStore"

try:
    fraud_feature_group.create(
        s3_uri=f"s3://{bucket}/{fraud_feature_group_s3_prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        enable_online_store=True,
    )
    print('Create "fraud" feature group: SUCCESS')
except Exception as e:
    # Only service errors carry a `response` dict with an error code. Any other
    # exception has no such attribute and must propagate untouched instead of
    # being masked by an AttributeError raised from this handler.
    error_response = getattr(e, "response", None)
    code = (
        error_response.get("Error", {}).get("Code")
        if isinstance(error_response, dict)
        else None
    )
    if code == "ResourceInUse":
        # A feature group with this name already exists; reuse it.
        print(f"Using existing feature group: {fraud_fg_name}")
    else:
        # Bare raise re-raises the active exception with its original traceback.
        raise

Create "fraud" feature group: SUCCESS


### Wait until feature group creation has fully completed

In [6]:
def wait_for_feature_group_creation_complete(feature_group, poll_interval_sec=5, timeout_sec=None):
    """Block until ``feature_group`` leaves the "Creating" status.

    Args:
        feature_group: Object exposing ``describe()`` (returning a dict with a
            "FeatureGroupStatus" key) and a ``name`` attribute.
        poll_interval_sec: Seconds to sleep between two ``describe()`` calls.
        timeout_sec: Maximum number of seconds to keep waiting, or ``None``
            (the default) to wait indefinitely.

    Raises:
        RuntimeError: If the final status is anything other than "Created".
            The message includes the status and the reported failure reason.
        TimeoutError: If ``timeout_sec`` is set and the group is still
            "Creating" once that much time has elapsed.
    """
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec

    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Feature group {feature_group.name} still in status 'Creating' "
                f"after {timeout_sec} seconds"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval_sec)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")

    if status != "Created":
        # Surface the terminal status and failure reason so the error is diagnosable.
        failure_reason = description.get("FailureReason", "no failure reason reported")
        raise RuntimeError(
            f"Failed to create feature group {feature_group.name}: "
            f"status={status!r}, reason={failure_reason}"
        )
    print(f"FeatureGroup {feature_group.name} successfully created.")


# Block here until the fraud feature group has finished being created.
wait_for_feature_group_creation_complete(fraud_feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup auto-fraud successfully created.


### Ingest records into the Feature Groups
After the Feature Groups have been created, we can put data into each store by using the PutRecord API. This API can handle high TPS and is designed to be called by different streams. The data from all of these Put requests is buffered and written to S3 in chunks. The files will be written to the offline store within a few minutes of ingestion.

In [8]:
# Ingest the dataframe into the feature group with 3 workers, waiting for
# ingestion to finish. Kept as a bare expression so the returned ingestion
# manager is shown as the cell output.
fraud_feature_group.ingest(
    data_frame=df,
    max_workers=3,
    wait=True,
)

IngestionManagerPandas(feature_group_name='auto-fraud', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f16da5b7490>, max_workers=3, max_processes=1, _async_result=<multiprocess.pool.MapResult object at 0x7f16db332b10>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

### Wait for offline store data to become available
This usually takes 5-8 minutes

In [9]:
# A FeatureGroup carries an OnlineStoreConfig and an OfflineStoreConfig that control where its data is stored.
# Walk the group's description down to the S3 URI resolved for the offline store.
fraud_fg_description = fraud_feature_group.describe()
fraud_feature_group_resolved_output_s3_uri = (
    fraud_fg_description.get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

# Remove the "s3://<bucket>/" part of the URI so that only the object key prefix remains.
bucket_uri_root = f"s3://{bucket}/"
fraud_feature_group_s3_prefix = fraud_feature_group_resolved_output_s3_uri.replace(
    bucket_uri_root, ""
)

245582852906

Data location AutoInsuranceFraudDetection/FeatureStore/245582852906/sagemaker/ap-south-1/offline-store/auto-fraud-1631634108/data


In [10]:
# Poll S3 until more than one object exists under the offline store prefix.
# The wait is bounded so that a failed ingestion surfaces as an error instead of
# leaving the notebook hanging forever.
OFFLINE_STORE_POLL_INTERVAL_SEC = 60
OFFLINE_STORE_MAX_WAIT_SEC = 30 * 60

offline_store_contents = None
wait_deadline = time.monotonic() + OFFLINE_STORE_MAX_WAIT_SEC
while offline_store_contents is None:
    # list_objects_v2 is the current revision of the S3 listing API; like the
    # legacy call, it omits "Contents" when nothing matches the prefix.
    objects_in_bucket = s3_client.list_objects_v2(
        Bucket=bucket, Prefix=fraud_feature_group_s3_prefix
    )
    listed_objects = objects_in_bucket.get("Contents", [])
    # NOTE(review): the "> 1" threshold is kept from the original; presumably a
    # single object is not considered enough evidence of data. Confirm.
    if len(listed_objects) > 1:
        offline_store_contents = listed_objects
    elif time.monotonic() >= wait_deadline:
        raise TimeoutError(
            f"No offline store data under s3://{bucket}/{fraud_feature_group_s3_prefix} "
            f"after {OFFLINE_STORE_MAX_WAIT_SEC} seconds"
        )
    else:
        print("Waiting for data in offline store...")
        time.sleep(OFFLINE_STORE_POLL_INTERVAL_SEC)

print("\nData is available now.")

Waiting for data in offline store...
Waiting for data in offline store...
Waiting for data in offline store...
Waiting for data in offline store...

Data available.
