# SageMaker Pipeline for Feature Build and Load to Feature Store

In [None]:
import boto3
import sagemaker

# AWS context for this notebook: the region of the default boto3 session, a
# SageMaker session with its default bucket, and the execution role.
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
default_bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

# Plain literal: there are no placeholders, so no f-string prefix is needed.
model_package_group_name = "FeatureStorePackage"


In [None]:
# S3 location of the raw claims CSV: bucket name, object key, and the full
# URI assembled from the two.
claim_fraud_bucket_name = "demo-insurance-claims"
file_key = "claims_feature_store.csv"
input_data_uri = f"s3://{claim_fraud_bucket_name}/{file_key}"

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Pipeline parameters and their default values. All four are registered on
# the Pipeline object further down.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
storage_instance_type = ParameterString(name="FeatureStoreInstanceType", default_value="ml.m5.xlarge")

# Defaults to the S3 URI of the claims CSV assembled earlier in the notebook.
input_data = ParameterString(name="InputData", default_value=input_data_uri)


## Create the processing script

Four steps create the feature processing stage of the pipeline:

* Define the features as functions in the file ```fs_job/Features.py```
* Create an instance of SKLearnProcessor
* Define a script that processes the input file: ```fs_job/preprocessing.py```
* Create a pipeline step that executes the feature processing job


In [None]:
%%writefile fs_job/Features.py

"""
This file defines your custom features.
It will be used to populate a custom feature group.
The contents of this file will be inspected to determine
the number of features (number of functions).

These functions need to be aware of the column names in the dataframe
"""
import pandas as pd

def InjuryToVehicleClaimRatio(df):
    """Return the element-wise ratio of ``injury_claim`` to ``vehicle_claim``.

    Args:
        df: DataFrame with ``injury_claim`` and ``vehicle_claim`` columns.

    Returns:
        A Series holding ``injury_claim / vehicle_claim`` for every row.
    """
    injury = df["injury_claim"]
    vehicle = df["vehicle_claim"]
    return injury.div(vehicle)

def ReportIsStrange(df):
    """Flag rows whose ``report`` text contains a suspicious keyword.

    The search is a case-insensitive regex match; rows with a missing report
    count as no match.

    Args:
        df: DataFrame with a text ``report`` column.

    Returns:
        A Series of ints: 1 where a keyword was found, otherwise 0.
    """
    keyword_pattern = "weird|strange|inconsistent|unusual|suspicious"
    matches = df["report"].str.contains(
        keyword_pattern, case=False, na=False, regex=True
    )
    return matches.map(int)



In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Framework version passed to the SKLearnProcessor instances in this notebook.
framework_version = "0.23-1"

# Processor for the feature-engineering job. Instance type and count are
# taken from the pipeline parameters rather than fixed values.
claims_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    base_job_name="sklearn-feature-process",
)

In [None]:
%%writefile fs_job/preprocessing.py

from inspect import getmembers, isfunction

import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd
  
import sys
# Directory holding Features.py inside the container. It must be on sys.path
# before the import below can succeed.
src_path = "/opt/ml/processing/src"
sys.path.append(src_path)

# Diagnostics on stderr: the updated module search path, then the contents
# of the source directory.
sys.stderr.write("PATH UPDATED " + str(sys.path))

files = os.listdir(src_path)
sys.stderr.write(str(files))

import Features as features

# (name, function) pairs for every function found in the Features module;
# the main section adds one DataFrame column per pair.
funcs = getmembers(features, predicate=isfunction)

if __name__ == "__main__":
    # Root under which the input, output and sample directories live.
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(f"{base_dir}/input/claims_feature_store.csv")

    # Add one column per feature function, named after the function itself.
    for feature_name, function in funcs:
        df[feature_name] = function(df)

    # Full table including the new feature columns.
    df.to_csv(f"{base_dir}/output/claims_feature_store.csv", header=True, index=False)

    # Small sample: the first rows as returned by DataFrame.head().
    sample = df.head()
    sample.to_csv(f"{base_dir}/sample/sample.csv", header=True, index=False)
    

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Local file with the feature functions; uploaded as a second job input.
features_file = "fs_job/Features.py"

step_process = ProcessingStep(
    name="ClaimsProcess",
    processor=claims_processor,
    code="fs_job/preprocessing.py",
    inputs=[
        # Raw claims CSV (the InputData pipeline parameter).
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
        # Features.py lands in the directory preprocessing.py adds to sys.path.
        ProcessingInput(source=features_file, destination="/opt/ml/processing/src"),
    ],
    outputs=[
        # Full feature table and the small sample written by preprocessing.py.
        ProcessingOutput(output_name="output", source="/opt/ml/processing/output"),
        ProcessingOutput(output_name="sample", source="/opt/ml/processing/sample"),
    ],
)


### Create a Processing Job that Loads the results into the Feature Store

A second processing job loads the results of the previous step into SageMaker Feature Store as a predetermined Feature Group. The default behaviour is to overwrite any existing Feature Group of the same name.

Note: The script has to install the sagemaker package at run time to make this work. Ideally, you would create a custom image that has everything you need for this processing task.

In [None]:
%%writefile fs_job/feature_store.py

import pandas as pd
import subprocess
import sys
import os

def install(package):
    """Install ``package`` with pip, using the interpreter running this script.

    Args:
        package: Requirement string handed to ``pip install``.

    Raises:
        subprocess.CalledProcessError: If pip exits with a non-zero status.
    """
    pip_command = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_command)
    
# The sagemaker SDK is installed at run time, so this call has to come before
# the `import sagemaker` that follows.
install("sagemaker")

# boto3 resolves its region from AWS_DEFAULT_REGION. Keep a value that is
# already set, otherwise fall back to AWS_REGION (presumably provided by the
# SageMaker container -- TODO confirm), and only then to "eu-west-2". This
# keeps the job in the region it was launched in instead of forcing one.
os.environ.setdefault(
    "AWS_DEFAULT_REGION", os.environ.get("AWS_REGION", "eu-west-2")
)

import boto3
import sagemaker
from sagemaker.session import Session

# IAM role the job runs under. Looked up once and reused by the main section.
role = sagemaker.get_execution_role()

sys.stderr.write("ARN ROLE" + role)

# Region of the default boto3 session, and the sessions built on it.
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.Session()

# Offline store location: a fixed prefix inside the session's default bucket.
default_bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker-featurestore'
offline_feature_store_bucket = f"s3://{default_bucket}/{prefix}"

# SageMaker and Feature Store runtime clients, wrapped in a SageMaker Session
# that is handed to the feature-group helper.
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

s3_client = boto_session.client(service_name='s3', region_name=region)

# The helper module is read from this directory, so it must be on sys.path
# before the FeatureStoreUtils import that follows.
src_path = "/opt/ml/processing/src"
sys.path.append(src_path)

# Diagnostics on stderr: contents of the source and data directories.
sys.stderr.write("LISTING SRC DIR")
files = os.listdir(src_path)
sys.stderr.write(str(files))

sys.stderr.write("LISTING DATA DIR")
files = os.listdir("/opt/ml/processing/data/")
sys.stderr.write(str(files))

import FeatureStoreUtils as fsu

if __name__ == "__main__":

    # Feature table read from the job's data directory.
    claims_data = pd.read_csv("/opt/ml/processing/data/claims_feature_store.csv")

    # Feature group settings are fixed here; ideally they would be passed in
    # as parameters.
    feature_group_name = "Claims-Features"
    feature_group_desc = "Core Insurance Claims data for building various models"
    record_identifier_name = "policy_id"
    event_time_feature_name = "claim_date"

    # Delegate to the helper module. Judging by its name it recreates the
    # feature group from the DataFrame -- confirm in FeatureStoreUtils.py.
    fg = fsu.recreate_feature_store_from_dataframe(
        df=claims_data,
        role=role,
        fs_session=feature_store_session,
        sm_client=sagemaker_client,
        fg_name=feature_group_name,
        fg_descr=feature_group_desc,
        record_col=record_identifier_name,
        event_col=event_time_feature_name,
        s3_uri=offline_feature_store_bucket,
    )


In [None]:
# Processor for the Feature Store load job. Its instance type comes from the
# dedicated "FeatureStoreInstanceType" pipeline parameter, so this job can be
# sized independently of the feature-processing job. Both parameters share the
# same default ("ml.m5.xlarge").
fs_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=storage_instance_type,
    instance_count=processing_instance_count,
    base_job_name="feature-store-process",
    role=role,
)

In [None]:

# Helper module uploaded next to the load script.
fs_utils = "fs_job/FeatureStoreUtils.py"

# References to the outputs declared on the ClaimsProcess step.
process_outputs = step_process.properties.ProcessingOutputConfig.Outputs
sample_uri = process_outputs["sample"].S3Output.S3Uri
data_uri = process_outputs["output"].S3Output.S3Uri

step_feature_store = ProcessingStep(
    name="FeatureStoreStep",
    processor=fs_processor,
    code="fs_job/feature_store.py",
    inputs=[
        ProcessingInput(source=sample_uri, destination="/opt/ml/processing/sample"),
        # feature_store.py reads claims_feature_store.csv from this directory.
        ProcessingInput(source=data_uri, destination="/opt/ml/processing/data"),
        # FeatureStoreUtils.py is imported from this directory.
        ProcessingInput(source=fs_utils, destination="/opt/ml/processing/src"),
    ],
    outputs=[
        ProcessingOutput(output_name="output", source="/opt/ml/processing/output"),
    ],
)


In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Plain literal: there are no placeholders, so no f-string prefix is needed.
pipeline_name = "FeatureStorePipeline"

# Two steps: feature processing, then the Feature Store load, together with
# the four parameters declared earlier in the notebook.
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_feature_store],
    parameters=[
        processing_instance_count,
        processing_instance_type,
        storage_instance_type,
        input_data,
    ],
)

In [None]:
import json

# Parse the pipeline definition string as JSON. The bare name stays as the
# cell's last expression so the notebook displays the result.
definition_text = pipeline.definition()
definition = json.loads(definition_text)
definition

In [None]:
# Upsert the pipeline definition, passing the notebook's execution role as
# the pipeline's role ARN.
pipeline.upsert(role_arn=role)

In [None]:
# Start the pipeline and keep the returned handle; no arguments are passed,
# so no parameter overrides are supplied here.
execution = pipeline.start()