This code was borrowed from AAI 540 labs

https://github.com/mechristenson/aai-540-labs.git

## Create S3 Bucket

In [81]:
import pandas as pd
import numpy as np
import boto3
import sagemaker
import datetime
import time
import os, re, sys, json, warnings
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, f1_score, accuracy_score
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from time import gmtime, strftime
from tqdm import tqdm

warnings.filterwarnings("ignore", category=UserWarning)

In [None]:
#!pip install --upgrade boto3 botocore awscli

In [2]:
session = boto3.session.Session()
region = session.region_name
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

s3 = boto3.Session().client(service_name="s3", region_name=region)

In [3]:
print("Default bucket: {}".format(bucket))

Default bucket: sagemaker-us-east-1-046536196377


Verify S3_BUCKET Bucket Creation

In [4]:
from botocore.client import ClientError

response = None

try:
    response = s3.head_bucket(Bucket=bucket)
    print(response)
    setup_s3_bucket_passed = True
except ClientError as e:
    print("[ERROR] Cannot find bucket {} in {} due to {}.".format(bucket, response, e))

{'ResponseMetadata': {'RequestId': 'T46HY3T5EEXZJ5S7', 'HostId': 'a5FUXsiIcKNBHnVbpCx/DKx+6IA5AbTdasWP6h8/R7AIyyfK16sf1abGzWQm/Y+O05OuEDEG8BHRk6eIvQEx1A==', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'a5FUXsiIcKNBHnVbpCx/DKx+6IA5AbTdasWP6h8/R7AIyyfK16sf1abGzWQm/Y+O05OuEDEG8BHRk6eIvQEx1A==', 'x-amz-request-id': 'T46HY3T5EEXZJ5S7', 'date': 'Thu, 09 Oct 2025 00:41:40 GMT', 'x-amz-bucket-region': 'us-east-1', 'x-amz-access-point-alias': 'false', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'BucketRegion': 'us-east-1', 'AccessPointAlias': False}


## Set up Data lake

In [5]:
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
account_id = boto3.client("sts").get_caller_identity().get("Account")

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

#### Set S3 Destination Folder

In [6]:
s3_private_path_parquet = "s3://{}/toxicity_pds/parquet".format(bucket)
print(s3_private_path_parquet)

s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet


### Copy data from github to S3 bucket

In [7]:
!aws s3 cp --recursive /home/sagemaker-user/aai540_toxicity_classification/civil $s3_private_path_parquet/ --include "*"


upload: civil/validation-00000-of-00001.parquet to s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/validation-00000-of-00001.parquet
upload: civil/test-00000-of-00001.parquet to s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/test-00000-of-00001.parquet
upload: civil/train-00000-of-00001.parquet to s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/train-00000-of-00001.parquet


### List the files

In [8]:
print(s3_private_path_parquet)

s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet


In [9]:
!aws s3 ls $s3_private_path_parquet/

                           PRE 046536196377/
                           PRE output/
                           PRE query_results/
                           PRE test/
                           PRE train/
                           PRE validation/
2025-10-09 00:41:42   34099179 test-00000-of-00001.parquet
2025-10-09 00:41:42   68844404 train-00000-of-00001.parquet
2025-10-09 00:41:42   11697541 validation-00000-of-00001.parquet


## Set up Sagemaker Feature store

In [10]:
from sagemaker.session import Session

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)

featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

### S3 Bucket set up for offline store

In [11]:
# You can modify the following to use a bucket of your choosing
default_s3_bucket_name = feature_store_session.default_bucket()
print(default_s3_bucket_name)

sagemaker-us-east-1-046536196377


In [12]:
# set up IAM role
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print(role)

arn:aws:iam::046536196377:role/LabRole


In [13]:
s3_private_path_parquet

's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet'

In [14]:
s3_private_path_parquet.replace("s3://", "").split("/", 1)

['sagemaker-us-east-1-046536196377', 'toxicity_pds/parquet']

In [15]:
!aws s3 ls $s3_private_path_parquet/

                           PRE 046536196377/
                           PRE output/
                           PRE query_results/
                           PRE test/
                           PRE train/
                           PRE validation/
2025-10-09 00:41:42   34099179 test-00000-of-00001.parquet
2025-10-09 00:41:42   68844404 train-00000-of-00001.parquet
2025-10-09 00:41:42   11697541 validation-00000-of-00001.parquet


In [16]:
# This is an offline version and not required to be executed in aws
'''
import pandas as pd
train = "civil/train-00000-of-00001.parquet"
validation = "civil/validation-00000-of-00001.parquet"
test = "civil/test-00000-of-00001.parquet"

train_data = pd.read_parquet(train)
validation_data = pd.read_parquet(validation)
test_data = pd.read_parquet(test)

print(train_data.shape)
print(validation_data.shape)
print(test_data.shape)
'''

'\nimport pandas as pd\ntrain = "civil/train-00000-of-00001.parquet"\nvalidation = "civil/validation-00000-of-00001.parquet"\ntest = "civil/test-00000-of-00001.parquet"\n\ntrain_data = pd.read_parquet(train)\nvalidation_data = pd.read_parquet(validation)\ntest_data = pd.read_parquet(test)\n\nprint(train_data.shape)\nprint(validation_data.shape)\nprint(test_data.shape)\n'

In [17]:
import boto3
import pandas as pd
import io

# Your region & private bucket
s3_client = boto3.client("s3", region_name=region)

# Split the s3 path into bucket and prefix
# Example: if $s3_private_path_csv="my-private-bucket/housing-data"
bucket_name, prefix = s3_private_path_parquet.replace("s3://", "").split("/", 1)

# Define the file keys (relative to prefix)
train_file_key = f"{prefix}/train-00000-of-00001.parquet"
val_file_key = f"{prefix}/validation-00000-of-00001.parquet"
test_file_key = f"{prefix}/test-00000-of-00001.parquet"


# Load the files
train_data_object = s3_client.get_object(Bucket=bucket_name, Key=train_file_key)
validation_data_object = s3_client.get_object(Bucket=bucket_name, Key=val_file_key)
test_data_object = s3_client.get_object(Bucket=bucket_name, Key=test_file_key)

# Convert to pandas DataFrames
train_data = pd.read_parquet(io.BytesIO(train_data_object["Body"].read()))
validation_data = pd.read_parquet(io.BytesIO(validation_data_object["Body"].read()))
test_data = pd.read_parquet(io.BytesIO(test_data_object["Body"].read()))


print("Train Data Shape:", train_data.shape)
print("Train Data Shape:", validation_data.shape)
print("Test Data Shape:", test_data.shape)



Train Data Shape: (267516, 21)
Train Data Shape: (45047, 21)
Test Data Shape: (132730, 21)


In [18]:
train_data['toxicity'].value_counts()

toxicity
0    237173
1     30343
Name: count, dtype: int64

In [19]:
# Sample 10,000 records where toxicity == 0
train_tox0 = train_data[train_data['toxicity'] == 0].sample(n=30000, random_state=42)
train_tox1 = train_data[train_data['toxicity'] == 1].sample(n=30000, random_state=42)

val_tox0 = validation_data[validation_data['toxicity'] == 0].sample(n=3000, random_state=42)
val_tox1 = validation_data[validation_data['toxicity'] == 1].sample(n=3000, random_state=42)

test_tox0 = test_data[test_data['toxicity'] == 0].sample(n=3000, random_state=42)
test_tox1 = test_data[test_data['toxicity'] == 1].sample(n=3000, random_state=42)

# Concatenate to create a balanced dataframe
train_balanced = pd.concat([train_tox0, train_tox1], ignore_index=True)
val_balanced = pd.concat([val_tox0, val_tox1], ignore_index=True)
test_balanced = pd.concat([test_tox0, test_tox1], ignore_index=True)

# Shuffle the resulting dataframe
train_balanced = train_balanced.sample(frac=1, random_state=42).reset_index(drop=True)
val_balanced = val_balanced.sample(frac=1, random_state=42).reset_index(drop=True)
test_balanced = test_balanced.sample(frac=1, random_state=42).reset_index(drop=True)

print(train_balanced['toxicity'].value_counts())
print(val_balanced['toxicity'].value_counts())
print(test_balanced['toxicity'].value_counts())

toxicity
0    30000
1    30000
Name: count, dtype: int64
toxicity
0    3000
1    3000
Name: count, dtype: int64
toxicity
0    3000
1    3000
Name: count, dtype: int64


### Feature Engineering & Preprocessing

In [20]:
import re

TAG_RE = re.compile(r'<[^>]+>')

def re_tags(text_list): #define remove tag funtion
    return [TAG_RE.sub('', str(word)).lower() for word in text_list]

In [21]:
# Remove Accented Characters
!pip install unidecode
import unidecode
def re_accented_char(text_list):
    return [unidecode.unidecode(word.encode().decode('utf-8')) for word in text_list]

Collecting unidecode
  Using cached Unidecode-1.4.0-py3-none-any.whl.metadata (13 kB)
Using cached Unidecode-1.4.0-py3-none-any.whl (235 kB)
Installing collected packages: unidecode
Successfully installed unidecode-1.4.0


In [22]:
# Extended contractions
def ex_contractions(text_list):
    result=[]
    for word in text_list:
        # replace contracting withoutsignal
        word = word.replace("wont","won't")
        word = word.replace("cant","can't")
        word = word.replace("its","it's")
        word = word.replace("youre","you're")
        word = word.replace("hes","he's")
        word = word.replace("shes","she's")
        word = word.replace("its","it's")
        word = word.replace("weare","we're")
        word = word.replace("theyre","they're")

        # specific
        word = re.sub(r"won\'t", "will not", str(word))
        word = re.sub(r"can\'t", "can not", str(word))

        # general
        word = re.sub(r"n\'t", " not", str(word))
        word = re.sub(r"\'re", " are", str(word))
        word = re.sub(r"\'s", " is", str(word))
        word = re.sub(r"\'d", " would", str(word))
        word = re.sub(r"\'ll", " will", str(word))
        word = re.sub(r"\'t", " not", str(word))
        word = re.sub(r"\'ve", " have", str(word))
        word = re.sub(r"\'m", " am", str(word))
        result.append(word)
    return result

In [24]:
# Removing Special Characters
def re_special_chars(text_list):
    return [re.sub("[^a-zA-Z0-9]"," ",word) for word in text_list]

#### Lemmatization

In [25]:
from nltk.stem import WordNetLemmatizer

def lemmatize_text(text_list):
    wnl = WordNetLemmatizer()
    lemmatizer_sentence = []  
    tokenizer=nltk.tokenize.WhitespaceTokenizer()
    for word in tokenizer.tokenize(text_list):
        lemmatizer_sentence.append(wnl.lemmatize(word,'v'))
        lemmatizer_sentence.append(" ")
    
    return("".join(lemmatizer_sentence))

#### Removing Stop Words

In [26]:
import nltk
from nltk.corpus import stopwords

def stopwords_text(text_list):
    stop = stopwords.words('english')
    sentence_without = []
    tokenizer=nltk.tokenize.WhitespaceTokenizer()
    for word in tokenizer.tokenize(text_list):
        if word not in stop:
            sentence_without.append(word)
            sentence_without.append(" ")
            
    return("".join(sentence_without))

In [27]:
def re_whitespaces(text_list): 
    result=[]
    for word in text_list:
        word=(re.sub(r'\d','',str(word))) #remove numbers 
        word = (re.sub(r'\s+',' ', str(word))) #remove duplicates white spacces
        result.append(word)
    return result

### Pipeline

In [28]:
import nltk
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package stopwords to /home/sagemaker-
[nltk_data]     user/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /home/sagemaker-
[nltk_data]     user/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [29]:
# Class for regular expressions application
class ApplyRegex(BaseEstimator, TransformerMixin):
    
    def __init__(self, regex_transformers):
        self.regex_transformers = regex_transformers
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        # Applying all regex functions in the regex_transformers dictionary
        for regex_name, regex_function in self.regex_transformers.items():
            X = regex_function(X)
            
        return X

In [30]:
class StopWordsRemoval(BaseEstimator, TransformerMixin):
    
    def __init__(self, text_stopwords):
        self.text_stopwords = text_stopwords
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        return [self.text_stopwords(comment) for comment in X]

In [31]:
class LemmatizeProcess(BaseEstimator, TransformerMixin):
    
    def __init__(self, Lemmatize):
        self.Lemmatizer = Lemmatize
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        return [self.Lemmatizer(comment) for comment in X]

In [32]:
# Defining regex transformers to be applied
regex_transformers = {
    'remove_tags': re_tags,
    'remove_accents': re_accented_char,
    'decontracted': ex_contractions,
    're_sc': re_special_chars,
    'whitespaces': re_whitespaces
}

# Building a text prep pipeline
text_prep_pipeline = Pipeline([
    ('regex', ApplyRegex(regex_transformers)),
    ('stopwords', StopWordsRemoval(stopwords_text)),
    ('lemmatize', LemmatizeProcess(lemmatize_text)),
])

In [33]:
train_balanced[train_balanced.columns[2:3]]

Unnamed: 0,comment_text
0,"""And two of the candidates — fully half of thi..."
1,How did this pathetic slam against another pos...
2,It is not a religious sect! It is just a made ...
3,This is a great ball club that shapiro has put...
4,I believe the issue is one of bribery- paying ...
...,...
59995,What they are NOT entitled to is to expand the...
59996,Oliver is the funniest guy on Television at th...
59997,"Lol, well by your own reasoning, somewhere in ..."
59998,Pope Francis is an authoritarian and a tyrant....


In [34]:
train_balanced['comment_text'] = text_prep_pipeline.fit_transform(train_balanced[train_balanced.columns[2:3]].values)

In [35]:
test_balanced['comment_text'] = text_prep_pipeline.transform(test_balanced[test_balanced.columns[2:3]].values)



In [36]:
val_balanced['comment_text'] = text_prep_pipeline.transform(val_balanced[val_balanced.columns[2:3]].values)

In [37]:
val_balanced['toxicity'].value_counts()

toxicity
0    3000
1    3000
Name: count, dtype: int64

#### Define FeatureGroups¶

In [23]:
train_feature_group_name = "train-feature-group"
validation_feature_group_name = "validation-feature-group"
test_feature_group_name = "test-feature-group"

In [24]:
'''
import boto3

client = boto3.client('sagemaker')
client.delete_feature_group(FeatureGroupName=train_feature_group_name)
client.delete_feature_group(FeatureGroupName=validation_feature_group_name)
client.delete_feature_group(FeatureGroupName=test_feature_group_name)
'''

"\nimport boto3\n\nclient = boto3.client('sagemaker')\nclient.delete_feature_group(FeatureGroupName=train_feature_group_name)\nclient.delete_feature_group(FeatureGroupName=validation_feature_group_name)\nclient.delete_feature_group(FeatureGroupName=test_feature_group_name)\n"

In [25]:
from sagemaker.feature_store.feature_group import FeatureGroup

train_feature_group = FeatureGroup(name=train_feature_group_name, sagemaker_session=feature_store_session)
validation_feature_group = FeatureGroup(name=validation_feature_group_name, sagemaker_session=feature_store_session)
test_feature_group = FeatureGroup(name=test_feature_group_name, sagemaker_session=feature_store_session)


In [26]:
train_df = train_balanced[['id','comment_text','toxicity']]
val_df = val_balanced[['id','comment_text','toxicity']]
test_df = test_balanced[['id','comment_text','toxicity']]

In [27]:
import time

current_time_sec = int(round(time.time()))
record_identifier_feature_name = "id"
event_time_feature_name = "EventTime"

def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == "object":
            data_frame[label] = data_frame[label].astype("str").astype("string")
    return data_frame


def appendEventTime(df):
    # append EventTime feature
    df[event_time_feature_name] = pd.Series(
        [current_time_sec] * len(df), dtype="float64"
    )
    return df

In [28]:
# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
train_df = cast_object_to_string(train_df)
val_df = cast_object_to_string(val_df)
test_df = cast_object_to_string(test_df)

# append event time

train_df = appendEventTime(train_df)
val_df = appendEventTime(val_df)
test_df = appendEventTime(test_df)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data_frame[label] = data_frame[label].astype("str").astype("string")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data_frame[label] = data_frame[label].astype("str").astype("string")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data_frame[label] = data_frame[label].astype("str").astype("string"

In [29]:
# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
train_feature_group.load_feature_definitions(data_frame=train_df)
validation_feature_group.load_feature_definitions(data_frame=val_df)
test_feature_group.load_feature_definitions(data_frame=test_df)

[FeatureDefinition(feature_name='id', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='comment_text', feature_type=<FeatureTypeEnum.STRING: 'String'>, collection_type=None),
 FeatureDefinition(feature_name='toxicity', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='EventTime', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None)]

In [30]:
def featureGroupExists(feature_group_name):
    print(feature_group_name)
    # SageMaker client (not session)
    sm_client = boto3.client("sagemaker", region_name=region)
    featureGroupExists = False
    try:
        response = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)
        featureGroupExists = True
        print(f"✅ Feature group exists. Status: {response['FeatureGroupStatus']}")
    except sm_client.exceptions.ResourceNotFound:
        featureGroupExists=False
        print(f"❌ {feature_group_name} does not exist.")
    return featureGroupExists

train_feature_group_exists = featureGroupExists(train_feature_group_name)
validation_feature_group_exists = featureGroupExists(validation_feature_group_name)
test_feature_group_exists = featureGroupExists(test_feature_group_name)

train-feature-group
✅ Feature group exists. Status: Created
validation-feature-group
✅ Feature group exists. Status: Created
test-feature-group
✅ Feature group exists. Status: Created


In [31]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

def create_feature_group(feature_group, fg_Exists):
    fg_Exists = featureGroupExists(feature_group.name)
    if not fg_Exists:
        feature_group.create(
            s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
            record_identifier_name=record_identifier_feature_name,
            event_time_feature_name=event_time_feature_name,
            role_arn=role,
            enable_online_store=True,
        )
        wait_for_feature_group_creation_complete(feature_group)
    else:
        print(f"{feature_group.name} already exists")

create_feature_group(train_feature_group,train_feature_group_exists)
create_feature_group(validation_feature_group, validation_feature_group_exists)
create_feature_group(test_feature_group,test_feature_group_exists)



train-feature-group
✅ Feature group exists. Status: Created
train-feature-group already exists
validation-feature-group
✅ Feature group exists. Status: Created
validation-feature-group already exists
test-feature-group
✅ Feature group exists. Status: Created
test-feature-group already exists


In [32]:
train_df

Unnamed: 0,id,comment_text,toxicity,EventTime
0,303179,"""And two of the candidates — fully half of thi...",0,1.759971e+09
1,5400711,How did this pathetic slam against another pos...,1,1.759971e+09
2,5843611,It is not a religious sect! It is just a made ...,1,1.759971e+09
3,5431675,This is a great ball club that shapiro has put...,0,1.759971e+09
4,5969175,I believe the issue is one of bribery- paying ...,0,1.759971e+09
...,...,...,...,...
59995,5795427,What they are NOT entitled to is to expand the...,1,1.759971e+09
59996,981527,Oliver is the funniest guy on Television at th...,1,1.759971e+09
59997,601614,"Lol, well by your own reasoning, somewhere in ...",0,1.759971e+09
59998,901829,Pope Francis is an authoritarian and a tyrant....,0,1.759971e+09


In [33]:
test_df

Unnamed: 0,id,comment_text,toxicity,EventTime
0,7142993,"Well, poor Harvey. If only he was a President...",0,1.759971e+09
1,7128635,Save your outrage for a topic where it actuall...,1,1.759971e+09
2,7128357,Conservatives also tend to be unfunny. Perhap...,0,1.759971e+09
3,702837,So you'd establish a precedent and have to bre...,0,1.759971e+09
4,5738083,All the knowledge in the world at your fingert...,1,1.759971e+09
...,...,...,...,...
5995,5455377,Oh FFS. Can we talk about the atrocities commi...,1,1.759971e+09
5996,418931,HAH---this newspaper is almost a bigger joke t...,1,1.759971e+09
5997,5461894,The only reason they are doing this is because...,1,1.759971e+09
5998,5318889,Christian extremist.\nWhen body slamming is ju...,1,1.759971e+09


In [49]:
train_feature_group.ingest(data_frame=train_df, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='train-feature-group', feature_definitions={'id': {'FeatureName': 'id', 'FeatureType': 'Integral'}, 'comment_text': {'FeatureName': 'comment_text', 'FeatureType': 'String'}, 'toxicity': {'FeatureName': 'toxicity', 'FeatureType': 'Integral'}, 'EventTime': {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f609fb91af0>, sagemaker_session=<sagemaker.session.Session object at 0x7f609e8bb8c0>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7f609bf44290>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

In [50]:
validation_feature_group.ingest(data_frame=val_df, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='validation-feature-group', feature_definitions={'id': {'FeatureName': 'id', 'FeatureType': 'Integral'}, 'comment_text': {'FeatureName': 'comment_text', 'FeatureType': 'String'}, 'toxicity': {'FeatureName': 'toxicity', 'FeatureType': 'Integral'}, 'EventTime': {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f609fb91af0>, sagemaker_session=<sagemaker.session.Session object at 0x7f609e8bb8c0>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7f609c2cc050>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

In [51]:
test_feature_group.ingest(data_frame=test_df, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='test-feature-group', feature_definitions={'id': {'FeatureName': 'id', 'FeatureType': 'Integral'}, 'comment_text': {'FeatureName': 'comment_text', 'FeatureType': 'String'}, 'toxicity': {'FeatureName': 'toxicity', 'FeatureType': 'Integral'}, 'EventTime': {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f609fb91af0>, sagemaker_session=<sagemaker.session.Session object at 0x7f609e8bb8c0>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7f609c2ce480>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

#### Query Feature values

In [31]:
validation_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:046536196377:feature-group/validation-feature-group',
 'FeatureGroupName': 'validation-feature-group',
 'RecordIdentifierFeatureName': 'id',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'id', 'FeatureType': 'Integral'},
  {'FeatureName': 'comment_text', 'FeatureType': 'String'},
  {'FeatureName': 'toxicity', 'FeatureType': 'Integral'},
  {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}],
 'CreationTime': datetime.datetime(2025, 10, 7, 23, 42, 40, 197000, tzinfo=tzlocal()),
 'OnlineStoreConfig': {'EnableOnlineStore': True},
 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet',
   'ResolvedOutputS3Uri': 's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/046536196377/sagemaker/us-east-1/offline-store/validation-feature-group-1759880560/data'},
  'DisableGlueTableCreation': False,
  'DataCatalogConfig': {'TableName': 'validation_fe

In [32]:
train_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:046536196377:feature-group/train-feature-group',
 'FeatureGroupName': 'train-feature-group',
 'RecordIdentifierFeatureName': 'id',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'id', 'FeatureType': 'Integral'},
  {'FeatureName': 'comment_text', 'FeatureType': 'String'},
  {'FeatureName': 'toxicity', 'FeatureType': 'Integral'},
  {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}],
 'CreationTime': datetime.datetime(2025, 10, 7, 23, 42, 18, 521000, tzinfo=tzlocal()),
 'OnlineStoreConfig': {'EnableOnlineStore': True},
 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet',
   'ResolvedOutputS3Uri': 's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/046536196377/sagemaker/us-east-1/offline-store/train-feature-group-1759880538/data'},
  'DisableGlueTableCreation': False,
  'DataCatalogConfig': {'TableName': 'train_feature_group_17598805

In [33]:
test_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:046536196377:feature-group/test-feature-group',
 'FeatureGroupName': 'test-feature-group',
 'RecordIdentifierFeatureName': 'id',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'id', 'FeatureType': 'Integral'},
  {'FeatureName': 'comment_text', 'FeatureType': 'String'},
  {'FeatureName': 'toxicity', 'FeatureType': 'Integral'},
  {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}],
 'CreationTime': datetime.datetime(2025, 10, 7, 23, 43, 17, 285000, tzinfo=tzlocal()),
 'OnlineStoreConfig': {'EnableOnlineStore': True},
 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet',
   'ResolvedOutputS3Uri': 's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/046536196377/sagemaker/us-east-1/offline-store/test-feature-group-1759880597/data'},
  'DisableGlueTableCreation': False,
  'DataCatalogConfig': {'TableName': 'test_feature_group_1759880597',

In [34]:
import pandas as pd
import s3fs

train_s3_resolved_uri = (
    train_feature_group.describe()
        .get("OfflineStoreConfig")
        .get('S3StorageConfig')
        .get('ResolvedOutputS3Uri')
)

train_feature_data = pd.read_parquet(train_s3_resolved_uri, storage_options={'anon': False})
print(train_feature_data.shape)
train_feature_data[['comment_text', 'toxicity']]

(60000, 11)


Unnamed: 0,comment_text,toxicity
0,iso mother silver spoon parasites,1
1,hey loser gun laws failure typical leave wing ...,1
2,realize older generation sad showcase year old...,0
3,iso ncm proceed fee educate n nmore ad hominem...,1
4,allie please call president poor fool really c...,1
...,...,...
59995,live legally pay property tax guess kid afford...,0
59996,richlynd ketah guest transgender take hormone ...,1
59997,nothe democrats far perfect dummy republicans ...,1
59998,corrupt always corrupt,1


In [35]:
val_s3_resolved_uri = (
    validation_feature_group.describe()
        .get("OfflineStoreConfig")
        .get('S3StorageConfig')
        .get('ResolvedOutputS3Uri')
)

validation_feature_data = pd.read_parquet(val_s3_resolved_uri, storage_options={'anon': False})
print(validation_feature_data.shape)
validation_feature_data[['comment_text', 'toxicity']]

(6000, 11)


Unnamed: 0,comment_text,toxicity
0,clinton cash bunch lie mastermind trump new ca...,1
1,saudi arabia travel ban n nodd dismiss fact ea...,1
2,disagree mr bruno show true respect america in...,0
3,alcohol invasive chemical us first americans a...,1
4,april st,0
...,...,...
5995,point could sa edit hell curious would guess l...,0
5996,nothat crazy harper absolute disdain terrorist...,1
5997,bad apples every profession include police exc...,0
5998,trump slip badly time nnazi evil white suprema...,1


In [36]:
test_s3_resolved_uri = (
    test_feature_group.describe()
        .get("OfflineStoreConfig")
        .get('S3StorageConfig')
        .get('ResolvedOutputS3Uri')
)

test_feature_data = pd.read_parquet(test_s3_resolved_uri, storage_options={'anon': False})
print(test_feature_data.shape)
test_feature_data[['comment_text', 'toxicity']]

(6000, 11)


Unnamed: 0,comment_text,toxicity
0,bdpapa would rather work stink drink get news ...,1
1,ishe think shoot time,1
2,difference right wrong conservatives know diff...,0
3,find amaze anchorage taxi operators ask suppor...,0
4,notice type crap little hand little device say...,1
...,...,...
5995,instead try unite country prime minister try d...,1
5996,evangelical another word interchangeable hypoc...,1
5997,year resident propose borough would expect opp...,0
5998,wonder uh president david lassner would say pr...,1


In [37]:
X_train, y_train = train_feature_data["comment_text"], train_feature_data["toxicity"]
X_val, y_val = validation_feature_data["comment_text"], validation_feature_data["toxicity"]
X_test, y_test = test_feature_data["comment_text"], test_feature_data["toxicity"]

### Model Training

In [51]:
import numpy as np
# Vectorize text
vectorizer = TfidfVectorizer(max_features=1000)
X_train_tfidf = vectorizer.fit_transform(X_train)
X_val_tfidf = vectorizer.transform(X_val)

In [38]:
# Combine features and labels
train_combined = np.hstack([y_train.values.reshape(-1, 1), X_train_tfidf.toarray()])
val_combined = np.hstack([y_val.values.reshape(-1, 1), X_val_tfidf.toarray()])


# Save to CSV (without index and header)
train_df = pd.DataFrame(train_combined)
val_df = pd.DataFrame(val_combined)
train_df.to_csv('train.csv', index=False, header=False)
val_df.to_csv('validation.csv', index=False, header=False)

# Upload to S3
s3 = boto3.client('s3')
s3.upload_file('train.csv', bucket, f"{prefix}/train/train.csv")
s3.upload_file('validation.csv', bucket, f"{prefix}/validation/validation.csv")

In [40]:
%%time
from time import gmtime, strftime

job_name = "xgb-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, job_name)
image = sagemaker.image_uris.retrieve(
    framework="xgboost", region=boto3.Session().region_name, version="1.7-1"
)

sm_estimator = sagemaker.estimator.Estimator(
    image,
    role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=50,
    input_mode="File",
    output_path=output_location,
    sagemaker_session=sess,
)

sm_estimator.set_hyperparameters(
    objective="binary:logistic",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    verbosity=0,
    num_round=100,
    eval_metric= "auc"
)

train_vect_data = sagemaker.inputs.TrainingInput(
    "s3://{}/{}/train/".format(bucket, prefix),
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)
validation_vect_data = sagemaker.inputs.TrainingInput(
    "s3://{}/{}/validation/".format(bucket, prefix),
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)

data_channels = {"train": train_vect_data, "validation": validation_vect_data}

# Start training by calling the fit method in the estimator
sm_estimator.fit(inputs=data_channels, job_name=job_name, logs=True)

INFO:sagemaker:Creating training-job with name: xgb-2025-10-08-01-20-20


2025-10-08 01:20:22 Starting - Starting the training job...
2025-10-08 01:20:54 Downloading - Downloading input data......
2025-10-08 01:21:29 Downloading - Downloading the training image...
  import pkg_resources[0m
[34m[2025-10-08 01:22:19.868 ip-10-2-204-181.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2025-10-08 01:22:19.935 ip-10-2-204-181.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2025-10-08:01:22:20:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2025-10-08:01:22:20:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2025-10-08:01:22:20:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2025-10-08:01:22:20:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2025-10-08:01:22:20:INFO] Running XGBoost Sagemaker in algorithm mode[0m


### Real time AI Inference

In [43]:
from datetime import datetime

# S3 prefixes
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"

ground_truth_upload_path = (
    f"s3://{bucket}/{prefix}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
)

reports_prefix = f"{prefix}/reports"
s3_report_path = f"s3://{bucket}/{reports_prefix}"

##Get the model monitor image
monitor_image_uri = sagemaker.image_uris.retrieve(framework="model-monitor", region=region)

print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")

Image URI: 156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer
Capture path: s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/datacapture
Ground truth path: s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/ground_truth_data/2025-10-09-00-51-41
Report path: s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/reports


In [None]:
from sagemaker.estimator import Estimator

job_name = "xgb-2025-10-08-01-20-20"
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, job_name)

from sagemaker.estimator import Estimator
xgb_model = Estimator.attach(job_name, sagemaker_session=sess)
print(xgb_model.model_data)  # same S3 path



#### Deploy the model with data capture enabled.

In [45]:
from sagemaker.model_monitor import DataCaptureConfig

# Deploy Model
endpoint_name = f"Toxicity-xgb-churn-model-quality-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

predictor = xgb_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    serializer=sagemaker.serializers.CSVSerializer(),
    deserializer=sagemaker.deserializers.JSONDeserializer(),
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)


  endpoint_name = f"Toxicity-xgb-churn-model-quality-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"


EndpointName = Toxicity-xgb-churn-model-quality-monitor-2025-10-09-0106
------!

### Create the SageMaker Predictor object from the endpoint to be used for invoking the model

In [77]:
from sagemaker.predictor import Predictor

predictor = Predictor(
    endpoint_name=endpoint_name, sagemaker_session=sess, serializer=sagemaker.serializers.CSVSerializer()
)

In [59]:
test_input = test_feature_data[['comment_text','toxicity']]
test_input.head()

Unnamed: 0,comment_text,toxicity
0,bdpapa would rather work stink drink get news ...,1
1,ishe think shoot time,1
2,difference right wrong conservatives know diff...,0
3,find amaze anchorage taxi operators ask suppor...,0
4,notice type crap little hand little device say...,1


In [104]:
test_input[:500]

Unnamed: 0,comment_text,toxicity
0,bdpapa would rather work stink drink get news ...,1
1,ishe think shoot time,1
2,difference right wrong conservatives know diff...,0
3,find amaze anchorage taxi operators ask suppor...,0
4,notice type crap little hand little device say...,1
...,...,...
495,nothe people call pope francis marxist either ...,1
496,christ water teach lure young rich man back pe...,0
497,thief fraud forgerer liar horrible family memb...,1
498,isince loren believe earth thousands yrs old s...,0


In [60]:
#X_test_tfidf = vectorizer.transform(X_test)

#test_combined = np.hstack([y_test.values.reshape(-1, 1), X_test_tfidf.toarray()])

# Save to CSV (without index and header)
#test_df = pd.DataFrame(test_combined)
test_input[:500].to_csv('test_input.csv', index=False, header=False)

# Upload to S3
s3 = boto3.client('s3')
s3.upload_file('test_input.csv', bucket, f"{prefix}/test/test_input.csv")

In [57]:
test_feature_data.head()

Unnamed: 0,id,comment_text,toxicity,EventTime,write_time,api_invocation_time,is_deleted,year,month,day,hour
0,1062561,bdpapa would rather work stink drink get news ...,1,1759881000.0,2025-10-07 23:52:42.770000+00:00,2025-10-07 23:47:47+00:00,False,2025,10,7,23
1,5996417,ishe think shoot time,1,1759881000.0,2025-10-07 23:52:42.770000+00:00,2025-10-07 23:47:48+00:00,False,2025,10,7,23
2,5866571,difference right wrong conservatives know diff...,0,1759881000.0,2025-10-07 23:52:42.770000+00:00,2025-10-07 23:47:48+00:00,False,2025,10,7,23
3,7175103,find amaze anchorage taxi operators ask suppor...,0,1759881000.0,2025-10-07 23:52:42.770000+00:00,2025-10-07 23:47:49+00:00,False,2025,10,7,23
4,6023076,notice type crap little hand little device say...,1,1759881000.0,2025-10-07 23:52:42.770000+00:00,2025-10-07 23:47:49+00:00,False,2025,10,7,23


In [66]:
test_location = "s3://{}/{}/test/test_input.csv".format(bucket, prefix)
df = pd.read_csv(test_location)
df.head()

Unnamed: 0,bdpapa would rather work stink drink get news braddah motor skills less affect marijuana alcohol try see,1
0,ishe think shoot time,1
1,difference right wrong conservatives know diff...,0
2,find amaze anchorage taxi operators ask suppor...,0
3,notice type crap little hand little device say...,1
4,many time trump condemn david duke kkk white s...,1


In [68]:
test_location

's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/test/test_input.csv'

In [79]:
toxicity_cutoff = 0.75

In [83]:
def preprocess(text_list):
    tfidf_features = vectorizer.transform(text_list)
    # Convert to dense array and CSV string format (no headers)
    csv_features = ",".join(map(str, tfidf_features.toarray().flatten()))
    return csv_features

limit = 200  # Need at least 200 samples to compute standard deviations
i = 0
with open("test_predictions.csv", "w") as baseline_file:
    baseline_file.write("probability,prediction,label\n")  # our header
    with open("test_input.csv", "r") as f:
        for row in f:
            (input_cols, label) = row.split(",", 1)
            input_texts = [input_cols]
            payload = "\n".join([preprocess([text]) for text in input_texts])
            probability = float(predictor.predict(payload))
            prediction = "1" if probability > toxicity_cutoff else "0"
            baseline_file.write(f"{probability},{prediction},{label}\n")
            i += 1
            if i > limit:
                break
            print(".", end="", flush=True)
            time.sleep(0.5)
print()
print("Done!")

........................................................................................................................................................................................................
Done!


In [86]:
!head test_predictions.csv

probability,prediction,label
0.2784499526023865,0,1

0.39159175753593445,0,1

0.5909994840621948,0,0

0.24004068970680237,0,0

0.8099843859672546,1,1


### Generate a baseline for model quality performance <a id='generate-baseline'></a>

In [89]:
from sagemaker.s3 import S3Downloader, S3Uploader

baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = f"s3://{bucket}/{baseline_data_prefix}"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

Baseline data uri: s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/baselining/data
Baseline results uri: s3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/baselining/results


In [90]:
baseline_dataset_uri = S3Uploader.upload(f"test_predictions.csv", baseline_data_uri)
baseline_dataset_uri

's3://sagemaker-us-east-1-046536196377/toxicity_pds/parquet/baselining/data/test_predictions.csv'

### Create a baselining job with validation dataset predictions

In [91]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

In [92]:
# Create the model quality monitoring object
churn_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=sess,
)

In [97]:
from datetime import datetime, timezone

# Name of the model quality baseline job
baseline_job_name = f"Toxicity-xgb-churn-model-baseline-job-{datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M-%S")}"

In [98]:
# Execute the baseline suggestion job.
job = churn_model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    problem_type="BinaryClassification",
    inference_attribute="prediction",
    probability_attribute="probability",
    ground_truth_attribute="label",
)
job.wait(logs=False)

INFO:sagemaker:Creating processing-job with name Toxicity-xgb-churn-model-baseline-job-2025-10-09-02-19-29


...........................................................!

#### Explore the results of the baselining job

In [99]:
baseline_job = churn_model_quality_monitor.latest_baselining_job

#### View the metrics generated

In [101]:
binary_metrics = baseline_job.baseline_statistics().body_dict["binary_classification_metrics"]
pd.json_normalize(binary_metrics).T

Unnamed: 0,0
confusion_matrix.0.0,101
confusion_matrix.0.1,4
confusion_matrix.1.0,68
confusion_matrix.1.1,28
recall.value,0.291667
recall.standard_deviation,0.023252
precision.value,0.875
precision.standard_deviation,0.024915
accuracy.value,0.641791
accuracy.standard_deviation,0.019236


#### View the constraints generated

In [102]:
pd.DataFrame(baseline_job.suggested_constraints().body_dict["binary_classification_constraints"]).T

Unnamed: 0,threshold,comparison_operator
recall,0.291667,LessThanThreshold
precision,0.875,LessThanThreshold
accuracy,0.641791,LessThanThreshold
true_positive_rate,0.291667,LessThanThreshold
true_negative_rate,0.961905,LessThanThreshold
false_positive_rate,0.038095,GreaterThanThreshold
false_negative_rate,0.708333,GreaterThanThreshold
auc,0.829563,LessThanThreshold
f0_5,0.625,LessThanThreshold
f1,0.4375,LessThanThreshold


### Setup continuous model monitoring to identify model quality drift <a id='analyze-model-quality-drift'></a>

#### Generate prediction data for Model Quality  Monitoring

In [None]:
def invoke_endpoint(ep_name, file_name):
    with open(file_name, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = session.sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()
            i += 1
            sleep(1)


def invoke_endpoint_forever():
    while True:
        try:
            invoke_endpoint(endpoint_name, "test-predictions.csv")
        except session.sagemaker_runtime_client.exceptions.ValidationError:
            pass


thread = Thread(target=invoke_endpoint_forever)
thread.start()

#### View captured data

In [None]:
print("Waiting for captures to show up", end="")
for _ in range(120):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

In [None]:
print("\n".join(capture_file[-3:-1]))

In [None]:
import re


def get_csv_output_from_s3(s3uri, batch_file):
    file_name = "{}.out".format(batch_file)
    match = re.match("s3://([^/]+)/(.*)", "{}/{}".format(s3uri, file_name))
    output_bucket, output_prefix = match.group(1), match.group(2)
    s3.download_file(output_bucket, output_prefix, file_name)
    return pd.read_csv(file_name, sep=",", header=None)

In [43]:
def trainmodel(pipeline):
    # Train
    pipeline.fit(X_train, y_train)

    # Predict and evaluate
    y_pred = pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    return y_pred, accuracy
    

In [44]:
# Build pipeline
pipeline_svc = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('svm', LinearSVC())
])

# Train and Predict
y_pred, acc_score = trainmodel(pipeline_svc) 

# Display score
print('Accuracy:', acc_score)

Accuracy: 0.8121666666666667


In [77]:
# Build pipeline
pipeline_nb = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('nb', MultinomialNB())
])

# Train and Predict
y_pred, acc_score = trainmodel(pipeline_nb) 

# Display score
print('Accuracy:', acc_score)

In [46]:
# Build pipeline
pipeline_rf = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('rf', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Train and Predict
y_pred, acc_score = trainmodel(pipeline_rf) 

# Display score
print('Accuracy:', acc_score)

Accuracy: 0.795


In [47]:
# Build pipeline
pipeline_xgb = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('rf', xgb.XGBClassifier(objective='multi:softmax', num_class=y_train.nunique(), use_label_encoder=False, eval_metric='mlogloss'))
])

# Train and Predict
y_pred, acc_score = trainmodel(pipeline_xgb) 

# Display score
print('Accuracy:', acc_score)

Accuracy: 0.8018333333333333


### Release Resources

In [37]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [1]:

%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}

<IPython.core.display.Javascript object>