In [1]:
import os

import pandas as pd
import numpy as np

import boto3
import sagemaker
import joblib

from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import seaborn as sns

from sagemaker.amazon.amazon_estimator import RecordSet # could be used if data fits in mem
import io
import sagemaker.amazon.common as smac

# Show every column when DataFrames are displayed instead of truncating wide frames.
pd.options.display.max_columns = None

In [2]:
# SageMaker session and IAM execution role shared by every job and pipeline this notebook creates.
sagemaker_session = sagemaker.Session()
# Reuse the session created above rather than letting get_execution_role() build a second one.
role = sagemaker.get_execution_role(sagemaker_session=sagemaker_session)
# S3 layout: raw data lives under `bucket`, and job artifacts go under `bucket/prefix`.
bucket = "wyatt-datalake"
prefix = "project-1"
# Derive the raw-data URI from `bucket` so the two cannot silently drift apart.
input_data = f"s3://{bucket}/data/terraform-aws-project-1/vehicles.csv"
# `__file__` is not defined inside a notebook kernel, so the directory holding
# sklearn_preprocess.py is configured explicitly. Set PROJECT_NOTEBOOK_DIR to
# point at a different checkout; otherwise the SageMaker notebook-instance path is used.
BASE_DIR = os.environ.get(
    "PROJECT_NOTEBOOK_DIR",
    "/home/ec2-user/SageMaker/terraform-aws-project-1/notebooks/",
)
# Where training jobs write model artifacts.
output_location = f"s3://{bucket}/{prefix}/output"

In [3]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

# Pipeline parameters: values that can be overridden per execution of the pipeline.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
)
# `input_data` arrives from the config cell as the raw S3 URI string and is rebound
# below to a pipeline parameter. Unwrap it first, so that re-running this cell passes
# the plain URI as the default instead of nesting the previous ParameterString.
input_data_uri = (
    input_data.default_value if isinstance(input_data, ParameterString) else input_data
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value=input_data_uri,  # Change the config cell to point at a different raw-data location.
)

# scikit-learn processing container that runs sklearn_preprocess.py. Instance type and
# count are bound to pipeline parameters so each execution can override them.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    # SageMaker job names allow only alphanumerics and hyphens (no "/" or "_").
    # A hyphenated base name keeps any job name derived from it valid, e.g. when
    # calling sklearn_processor.run() outside the pipeline.
    base_job_name=f"{prefix}-sklearn-preprocessor",
    sagemaker_session=sagemaker_session,
    role=role,
)

# Processing step: downloads the raw data (InputDataUrl parameter) into the container,
# runs sklearn_preprocess.py, and publishes one S3 output per data split.
step_process = ProcessingStep(
    name=f"{prefix}-ProcessingStep",
    processor=sklearn_processor,
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    # The script is expected to write each split under /opt/ml/processing/<split>.
    # Outputs are declared in the order train, validation, test.
    outputs=[
        ProcessingOutput(output_name=split_name, source=f"/opt/ml/processing/{split_name}")
        for split_name in ("train", "validation", "test")
    ],
    code=os.path.join(BASE_DIR, "sklearn_preprocess.py"),
    # NOTE(review): the input file name is fixed here, so overriding InputDataUrl with a
    # differently named object would not match this path — confirm before overriding.
    job_arguments=["--input-data", "/opt/ml/processing/input/vehicles.csv"],
)

In [4]:
# TRAINING STEP
from sagemaker.image_uris import retrieve

# Resolve the built-in Linear Learner image for the same region the SageMaker session uses.
ll_image = retrieve("linear-learner", sagemaker_session.boto_region_name)

# Linear Learner estimator streaming its training channels in Pipe mode.
ll_estimator = sagemaker.estimator.Estimator(
    ll_image,
    role,
    instance_count=1,
    # Bind to the TrainingInstanceType pipeline parameter so per-execution overrides
    # take effect (a hardcoded type made that parameter a no-op). Its default is
    # declared alongside the other pipeline parameters.
    instance_type=training_instance_type,
    volume_size=20,
    max_run=3600,
    input_mode="Pipe",
    output_path=output_location,
    sagemaker_session=sagemaker_session,
)

# Regression problem; small mini-batches.
ll_estimator.set_hyperparameters(predictor_type="regressor", mini_batch_size=32)

In [5]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Training step: feeds the processing step's train/validation outputs into the
# identically named Linear Learner channels.
step_train = TrainingStep(
    name=f"{prefix}-TrainingStep",
    estimator=ll_estimator,
    inputs={
        # content_type must be a MIME type the algorithm understands. "Pipe" is an
        # input mode (already set on the estimator), not a content type.
        # NOTE(review): assumes sklearn_preprocess.py writes header-less CSV with the
        # label in the first column. Use "application/x-recordio-protobuf" instead
        # if it writes RecordIO-protobuf.
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                channel
            ].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)

In [6]:
# Assemble the pipeline from the two steps and the declared parameters. Training
# consumes the processing outputs through `step_process.properties`, so the steps
# form a processing -> training chain.
pipeline = Pipeline(
    name=f"{prefix}-Pipeline",
    steps=[step_process, step_train],
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
    ],
    sagemaker_session=sagemaker_session,
)

In [7]:
# Create or update (upsert) the pipeline definition in SageMaker under the notebook's
# execution role; the response includes the pipeline ARN.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:761551243560:pipeline/project-1-pipeline',
 'ResponseMetadata': {'RequestId': 'f8a117a6-53cf-42cb-b5b0-37980696d1cf',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f8a117a6-53cf-42cb-b5b0-37980696d1cf',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Fri, 29 Oct 2021 19:09:14 GMT'},
  'RetryAttempts': 0}}

In [8]:
# Launch one execution using every parameter's default value; `execution` is the
# handle inspected in the following cells.
execution = pipeline.start()

In [9]:
# Point-in-time status of the execution; right after start() it reports "Executing".
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:761551243560:pipeline/project-1-pipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-2:761551243560:pipeline/project-1-pipeline/execution/n4qdz94iibt7',
 'PipelineExecutionDisplayName': 'execution-1635534555176',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2021, 10, 29, 19, 9, 15, 113000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 10, 29, 19, 9, 15, 113000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': 'd1d77a96-f3db-47e4-a368-475678be6d93',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd1d77a96-f3db-47e4-a368-475678be6d93',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '401',
   'date': 'Fri, 29 Oct 2021 19:09:14 GMT'},
  'RetryAttempts': 0}}

In [10]:
# Steps are listed only once they have started, so calling this immediately after
# start() can return an empty list. Run execution.wait() first to list finished steps.
execution.list_steps()

[]

In [11]:
# Clean-up: uncomment to delete the pipeline definition once it is no longer needed.
#pipeline.delete()