Set up the environment

In [2]:
import boto3
import sagemaker
import sagemaker.session
import pandas as pd

# Shared AWS / SageMaker context reused by every later cell.
sagemaker_session = sagemaker.session.Session()
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

# Model package group name passed to the RegisterModel step.
model_package_group_name = "HateSpeechClassifier"

Download the dataset

In [3]:
!mkdir -p data
local_path = "data/labeled-data.csv"
#local_data = pd.read_csv()
#local_data.to_csv(local_path, index=False)
#del local_data
s3 = boto3.resource("s3")
#s3.Bucket("sagemaker-us-east-1-423321750767").download_file(
#    "hatespeech/labeled_data.csv",
#    local_path
#)


base_uri = f"s3://{default_bucket}/hatespeech"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-423321750767/hatespeech/labeled-data.csv


Define pipeline parameters

In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Pipeline parameters and their default values; all three are registered on
# the Pipeline object further down.
input_data = ParameterString(name="InputData", default_value=input_data_uri)
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

Create an SKLearnProcessor instance to pass to the processing step

In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor


# Framework version string handed to the SKLearnProcessor.
framework_version = "0.23-1"

# Processor that runs the preprocessing script; the instance count is a
# pipeline parameter rather than a fixed number.
processor_settings = {
    "framework_version": framework_version,
    "instance_type": "ml.t3.large",
    "instance_count": processing_instance_count,
    "base_job_name": "sklearn-hatespeech-process",
    "role": role,
}
sklearn_processor = SKLearnProcessor(**processor_settings)

In [6]:
# Display the InputData pipeline parameter to check its default S3 URI.
input_data

ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-1-423321750767/hatespeech/labeled-data.csv')

Create a processing step

In [7]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

# Inputs copied into the processing container: the raw dataset (taken from the
# InputData pipeline parameter) and the local helper package directory.
process_inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ProcessingInput(
        source='hatespeech/mypackage/',
        destination="/opt/ml/processing/input/code/my_package",
    ),
]

# One named output per data split, each read from /opt/ml/processing/<split>.
process_outputs = [
    ProcessingOutput(output_name=split, source=f"/opt/ml/processing/{split}")
    for split in ("train", "validation", "test")
]

# Preprocessing step: runs hatespeech/preprocessing.py on the SKLearn processor.
step_process = ProcessingStep(
    name="HatespeechProcess",
    processor=sklearn_processor,
    inputs=process_inputs,
    outputs=process_outputs,
    code="hatespeech/preprocessing.py",
)

Define the training step

In [8]:
from sagemaker.estimator import Estimator


# S3 prefix under which training jobs write their model artifacts.
model_path = f"s3://{default_bucket}/HatespeechTrain"

# Instance type shared by the image lookup and the estimator.
xgb_instance_type = "ml.m5.xlarge"

# Look up the built-in XGBoost container image for the current region.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=xgb_instance_type,
)

# Estimator that trains on a single instance and saves under `model_path`.
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=xgb_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)

# Multi-class setup: "multi:softprob" over 9 classes.
xgb_hyperparameters = {
    "objective": "multi:softprob",
    "num_class": 9,
    "num_round": 50,
    "max_depth": 5,
    "eta": 0.2,
    "gamma": 4,
    "min_child_weight": 6,
    "subsample": 0.7,
    "silent": 0,
}
xgb_train.set_hyperparameters(**xgb_hyperparameters)

In [9]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Outputs of the preprocessing step, looked up by output name below.
processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs

# Training step: the "train" and "validation" channels are fed as CSV from the
# matching preprocessing outputs.
step_train = TrainingStep(
    name="HatespeechTrain",
    estimator=xgb_train,
    inputs={
        channel: TrainingInput(
            s3_data=processed_outputs[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)

Define a processing step for evaluation

In [10]:
from sagemaker.processing import ScriptProcessor


# Processor for the evaluation script. It uses the same container image that
# was retrieved for training and runs the script with python3.
script_eval = ScriptProcessor(
    role=role,
    image_uri=image_uri,
    command=["python3"],
    base_job_name="script-hatespeech-eval",
    instance_type="ml.t3.xlarge",
    instance_count=1,
)


In [11]:
from sagemaker.workflow.properties import PropertyFile


# Property file pointing at evaluation.json inside the "evaluation" output, so
# a later step can read values from it.
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# The evaluation job receives the trained model artifact and the test split.
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/test",
    ),
]
eval_outputs = [
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
]

# Evaluation step: runs hatespeech/evaluation.py and exposes the report file.
step_eval = ProcessingStep(
    name="HatespeechEval",
    processor=script_eval,
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="hatespeech/evaluation.py",
    property_files=[evaluation_report],
)

Define a RegisterModel step to register the model

In [12]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


# S3 URI of the evaluation step's first (and only) declared output.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]

# Attach the evaluation report to the registered model as its statistics.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{evaluation_output_uri}/evaluation.json",
        content_type="application/json",
    )
)

# Register the trained model in the model package group, using the approval
# status supplied by the ModelApprovalStatus pipeline parameter.
step_register = RegisterModel(
    name="HatespeechRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.t2.xlarge"],
    transform_instances=["ml.m5.large"],
)

Define a condition step to verify the model's F1 score

In [13]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


# Value read from the evaluation report produced by the evaluation step.
reported_f1 = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="multiclass_classification_metrics.weighted_f1.value",
)

# NOTE: despite the "lte" in its name, this is a greater-than-or-equal check:
# the condition holds when the reported F1 value is at least 0.5.
cond_lte = ConditionGreaterThanOrEqualTo(left=reported_f1, right=0.5)

# Register the model only when the condition holds; otherwise do nothing.
step_cond = ConditionStep(
    name="HatespeechF1Cond",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)

Create a pipeline

In [14]:
from sagemaker.workflow.pipeline import Pipeline


# Assemble the pipeline from its parameters and steps. The register step is
# not listed directly; it is attached through the condition step.
pipeline_name = "HatespeechPipeline"
pipeline_parameters = [processing_instance_count, model_approval_status, input_data]
pipeline_steps = [step_process, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [15]:
import json

#json.loads(pipeline.definition())

In [16]:
# Submit the pipeline definition to the SageMaker Pipelines service, creating
# the pipeline (or updating it if it already exists) under the execution role.
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:423321750767:pipeline/hatespeechpipeline',
 'ResponseMetadata': {'RequestId': '4ecb7692-cec7-4a62-8be3-d9d08c6dc2fb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4ecb7692-cec7-4a62-8be3-d9d08c6dc2fb',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Thu, 16 Feb 2023 16:01:47 GMT'},
  'RetryAttempts': 0}}

In [17]:
# Start a pipeline execution. No parameter overrides are passed, so the
# parameter defaults defined earlier are used.
execution = pipeline.start()

In [18]:
# Inspect the execution's metadata (ARNs, status, timestamps, creator).
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:423321750767:pipeline/hatespeechpipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:423321750767:pipeline/hatespeechpipeline/execution/bc9ot0p5liks',
 'PipelineExecutionDisplayName': 'execution-1676563308216',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2023, 2, 16, 16, 1, 48, 99000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 2, 16, 16, 1, 48, 99000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:423321750767:user-profile/d-llegc57vyrs7/default-1676476367773',
  'UserProfileName': 'default-1676476367773',
  'DomainId': 'd-llegc57vyrs7'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:423321750767:user-profile/d-llegc57vyrs7/default-1676476367773',
  'UserProfileName': 'default-1676476367773',
  'DomainId': 'd-llegc57vyrs7'},
 'ResponseMetadata': {'RequestId': '24ba5271-6cfb-4529-9704-f9533df75764',
  'HTTPStatusCode': 200,


In [19]:
# Block until the pipeline execution has finished before listing its steps.
execution.wait()


In [20]:
# List the executed steps with their status, timings and metadata.
execution.list_steps()

[{'StepName': 'HatespeechRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2023, 2, 16, 16, 19, 46, 507000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 2, 16, 16, 19, 47, 359000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:423321750767:model-package/hatespeechclassifier/1'}}},
 {'StepName': 'HatespeechF1Cond',
  'StartTime': datetime.datetime(2023, 2, 16, 16, 19, 45, 47000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 2, 16, 16, 19, 45, 742000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Condition': {'Outcome': 'True'}}},
 {'StepName': 'HatespeechEval',
  'StartTime': datetime.datetime(2023, 2, 16, 16, 14, 57, 65000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 2, 16, 16, 19, 44, 441000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sag

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import f1_score



# Local check of the evaluation logic against the artifacts of one specific
# pipeline execution (8ngzzbgz4o7g): fetch the trained model and the test
# split, score the model, and build the report dictionary. The pipeline's
# condition step reads the "weighted_f1" value from a report of this shape.
model_path = "s3://sagemaker-us-east-1-423321750767/"
artifact_bucket = s3.Bucket("sagemaker-us-east-1-423321750767")

# The model archive is stored locally under the file name 'models'. Fetch it
# only when it is missing, so the cell works on a fresh kernel without
# downloading the archive again on every run.
# NOTE(review): this key comes from the download call that used to be
# commented out here -- confirm it still points at the intended training job.
if not pathlib.Path('models').is_file():
    artifact_bucket.download_file(
        "HatespeechTrain/pipelines-8ngzzbgz4o7g-HatespeechTrain-jMXTcCzMcM/output/model.tar.gz",
        'models'
    )

artifact_bucket.download_file(
    "HatespeechPipeline/8ngzzbgz4o7g/HatespeechProcess/output/test/test.csv",
    './data/test.csv'
)

# Unpack the archive into the working directory; the model is then read from
# the extracted "xgboost-model" file.
with tarfile.open('models') as tar:
    tar.extractall(path=".")

# NOTE: unpickling executes arbitrary code -- only load artifacts produced by
# your own training jobs. The context manager closes the file handle, which
# the previous bare open() call never did.
with open("xgboost-model", "rb") as model_file:
    model = pickle.load(model_file)

test_path = "./data/test.csv"
df = pd.read_csv(test_path, header=None)

# Column 0 is used as the label; the remaining columns are the features.
y_test = df.iloc[:, 0].to_numpy()
df = df.drop(df.columns[0], axis=1)

X_test = xgboost.DMatrix(df.values)

# Take the arg-max over axis 1 of the prediction matrix as the predicted class.
# The result is kept 1-D on purpose: reshaping it to (n, 1) made
# `y_test - predictions` broadcast to an (n, n) matrix, so the standard
# deviation was computed over every label/prediction pair instead of over the
# per-sample errors.
predictions = model.predict(X_test)
predictions = np.argmax(predictions, axis=1)

# The value is stored under the "weighted_f1" key, so compute the
# support-weighted average (this used to be the macro average).
f1 = f1_score(y_test, predictions, average='weighted')
std = np.std(y_test - predictions)
report_dict = {
    "multiclass_classification_metrics": {
        "weighted_f1": {
            "value": f1,
            "standard_deviation": std
        },
    },
}