In [2]:
pip install --upgrade sagemaker

[0mNote: you may need to restart the kernel to use updated packages.


In [3]:
import boto3
import sagemaker
import sagemaker.session


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
# Ask the SDK for the session's default bucket instead of hard-coding an
# account-specific name. The SDK names it "sagemaker-<region>-<account-id>",
# so in the account/region this notebook was written for it should resolve to
# the previously hard-coded "sagemaker-us-west-2-557980067422".
default_bucket = sagemaker_session.default_bucket()
# NOTE(review): not referenced by any cell in this notebook.
model_package_group_name = "Stock"

In [4]:
import os

# Local copy of the dataset; the pipeline cells below do not read it.
os.makedirs("data", exist_ok=True)
local_path = "data/dataset.csv"

# Object keys, relative to `default_bucket`, of the training dataset and the
# batch-inference input.
dataset_key = "dataset/dataset.csv"
inference_key = "dataset/inference.csv"

s3 = boto3.resource("s3")
s3.Bucket(default_bucket).download_file(dataset_key, local_path)

# Build every S3 URI from `default_bucket` so the bucket name is defined once.
input_data_uri = f"s3://{default_bucket}/{dataset_key}"
inference_data_uri = f"s3://{default_bucket}/{inference_key}"
inference_data_output_uri = f"s3://{default_bucket}/stock_out"

In [5]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# Pipeline parameters: the values below are the defaults an execution uses
# when it does not override them.

# Instance settings. The instance type is shared by all three processing
# steps; the count is consumed only by the scikit-learn preprocessing processor.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

# NOTE(review): registered on the pipeline, but no step in this notebook reads it.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# S3 locations of the training dataset and of the CSV to score.
input_data = ParameterString(name="InputData", default_value=input_data_uri)
inference_data = ParameterString(name="InferenceData", default_value=inference_data_uri)

In [6]:
from sagemaker.workflow.steps import CacheConfig

# Allow reuse of an earlier run of an identical step for up to three days.
# "P3D" is the ISO 8601 duration for 3 days (not "PT3D": day components must
# come before the "T" time designator).
# NOTE(review): no step in this notebook is given `cache_config=cache_config`,
# so caching is currently inactive.
cache_config = CacheConfig(enable_caching=True, expire_after="P3D")

In [7]:
import os

# Directory the %%writefile cells below write the step scripts into.
os.makedirs("code", exist_ok=True)

In [8]:
%%writefile code/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd


from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/dataset.csv",
        index_col=0,
    )
    if 'y' in df:
        df = df.drop('y', axis=1)
        
    df['date'] = [i.split('$')[0] for i in df.index]
    dates = sorted(set(df['date']))
    
    train_dates, validation_dates, test_dates = np.split(dates, [len(dates) - 2 * 7, len(dates) - 7])
    
    train = df[df["date"].isin(train_dates)].drop('date', axis=1)
    validation = df[df["date"].isin(validation_dates)].drop('date', axis=1)
    test = df[df["date"].isin(test_dates)].drop('date', axis=1)

    
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting code/preprocessing.py


In [9]:
# Intentionally empty: the code/ directory is already created by an earlier
# cell, before preprocessing.py is written. This cell can be deleted.

In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Version of the SageMaker-managed scikit-learn image used for preprocessing.
framework_version = "0.23-1"

# Instance type and count come from the pipeline parameters declared above.
sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name="sklearn-stock-process",
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)



In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# The raw dataset (InputData parameter) is mounted at /opt/ml/processing/input.
process_input = ProcessingInput(source=input_data, destination="/opt/ml/processing/input")

# One named output per split; each output name matches its container directory.
process_outputs = [
    ProcessingOutput(output_name=split, source=f"/opt/ml/processing/{split}")
    for split in ("train", "validation", "test")
]

step_process = ProcessingStep(
    name="StockProcess",
    processor=sklearn_processor,
    inputs=[process_input],
    outputs=process_outputs,
    code="code/preprocessing.py",
)

In [12]:
# S3 prefix under which the training job writes its model artifacts.
model_path = "s3://{}/StockTrain".format(default_bucket)

In [16]:
from sagemaker.estimator import Estimator


# SageMaker built-in XGBoost image (version 1.0-1) for the current region.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
# Training job definition; model artifacts are written under `model_path`.
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)
# `binary:logistic` makes the booster output probabilities, which the
# evaluation script thresholds at 0.5.
xgb_train.set_hyperparameters(
    objective="binary:logistic",
    num_round=500,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    # `silent` is deprecated from XGBoost 1.0; verbosity=1 (warnings) is the
    # non-deprecated equivalent of the former silent=0.
    verbosity=1,
)



In [17]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Train on the "train" split and validate on the "validation" split from
# StockProcess. The "test" split is reserved for StockEval, so the reported
# evaluation metrics are computed on data the training job never received.
step_train = TrainingStep(
    name="StockTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)

In [27]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import classification_report

if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)
    y_pred = 1*(predictions>0.5)

    report_dict = metrics.classification_report(y_test, y_pred, labels=[1], output_dict=True)['1']

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluation.py


In [28]:
from sagemaker.processing import ScriptProcessor


# Processor for the evaluation step: runs a script with `python3` inside the
# same XGBoost image used for training (presumably so the pickled booster
# loads with a matching xgboost version).
script_eval = ScriptProcessor(
    role=role,
    base_job_name="script-stock-eval",
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
)

In [29]:
# Evaluation inputs: the model archive produced by StockTrain and the
# held-out "test" split produced by StockProcess.
eval_model_input = ProcessingInput(
    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)
eval_test_input = ProcessingInput(
    source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)

# evaluation.py writes evaluation.json into this directory.
eval_output = ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")

step_eval = ProcessingStep(
    name="StockEval",
    processor=script_eval,
    inputs=[eval_model_input, eval_test_input],
    outputs=[eval_output],
    code="code/evaluation.py",
)

In [30]:
%%writefile code/inference.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import os


from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    
    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/input/inference.csv"
    
    df = pd.read_csv(test_path, index_col=0)
    
    X_test = xgboost.DMatrix(df.values)
    predictions = model.predict(X_test)
    
    df.insert(0, "prediction", predictions)
    
    output_dir = "/opt/ml/processing/output"
    os.makedirs(output_dir, exist_ok=True)
    
    output_path = f"{output_dir}/inference.csv"
    df.to_csv(output_path, index=True)
    print("save ......")

Overwriting code/inference.py


In [31]:
from sagemaker.processing import ScriptProcessor


# Processor for the batch-inference step: same XGBoost image and `python3`
# entry command as the evaluation processor, under its own base job name.
script_inference = ScriptProcessor(
    role=role,
    base_job_name="script-stock-inference",
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
)

In [23]:
# Batch inference: score the InferenceData CSV with the model trained in
# StockTrain and upload the scored CSV to `inference_data_output_uri`.
step_inference = ProcessingStep(
    name="StockInference",
    # Use the processor built for inference, not the evaluation one, so the
    # job runs under the "script-stock-inference" base job name.
    processor=script_inference,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=inference_data,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output", destination=inference_data_output_uri),
    ],
    code="code/inference.py"
)

In [24]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "StockPipeline"

# Parameters exposed by the pipeline (order matches the rendered definition).
pipeline_parameters = [
    processing_instance_type,
    processing_instance_count,
    training_instance_type,
    model_approval_status,
    input_data,
    inference_data,
]

# Step dependencies come from the property references between steps
# (e.g. step_train reads step_process outputs).
pipeline_steps = [step_process, step_train, step_eval, step_inference]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [25]:
import json

# Parse the JSON definition into a dict so the notebook renders it structurally.
pipeline_definition = json.loads(pipeline.definition())
pipeline_definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-2-557980067422/dataset/dataset.csv'},
  {'Name': 'InferenceData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-2-557980067422/dataset/inference.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'StockProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'}

In [26]:
# Create the pipeline, or update it if one with this name already exists.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:557980067422:pipeline/stockpipeline',
 'ResponseMetadata': {'RequestId': '6d15cd04-df13-4a21-81e6-240336edd387',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6d15cd04-df13-4a21-81e6-240336edd387',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '81',
   'date': 'Tue, 26 Jul 2022 07:14:01 GMT'},
  'RetryAttempts': 0}}