## Predictive Maintenance Pipeline

In [2]:
import json
import time
import boto3
import string
import sagemaker
import pandas as pd
import numpy as np
import awswrangler as wr

from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.workflow.pipeline_context import PipelineSession

In [None]:
# Resolve the AWS region from a default SageMaker session
default_session = sagemaker.Session()
region = default_session.boto_region_name
print("Using AWS Region: {}".format(region))

In [4]:
# Build the boto3 / SageMaker sessions shared by the rest of the notebook
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

# Low-level service clients
s3_client = boto3.client("s3", region_name=region)
sagemaker_boto_client = boto_session.client("sagemaker")

# SageMaker session bound to the clients above (used by the monitoring cells)
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_boto_client,
)

# Caller identity and execution role
caller_identity = boto3.client("sts").get_caller_identity()
account_id = caller_identity["Account"]
sagemaker_role = sagemaker.get_execution_role()

# Processors and estimators for pipeline steps are created with this session
pipeline_session = PipelineSession()

In [None]:
# Display the execution role resolved for this notebook (same call that sets sagemaker_role above)
sagemaker.get_execution_role()

In [6]:
# S3 bucket where all the data will be stored
# NOTE(review): "BUCKET-NAME" looks like a placeholder; the scripts written by the
# %%writefile cells below define their own copies of bucket/prefix and must be kept in sync.
bucket = "BUCKET-NAME"
# Key prefix under which this project's objects are stored in the bucket
prefix = "mlops/predictive-maintenance"

### Upload raw data to S3

In [13]:
# Upload every raw dataset from ./datasets to <prefix>/data/raw/ in the bucket
raw_files = (
    "PdM_telemetry.csv",
    "PdM_errors.csv",
    "PdM_maint.csv",
    "PdM_failures.csv",
    "PdM_machines.csv",
)
for raw_file in raw_files:
    s3_client.upload_file(
        Filename=f"datasets/{raw_file}",
        Bucket=bucket,
        Key=f"{prefix}/data/raw/{raw_file}",
    )

### Define Parameters to Parametrize Pipeline Execution

In [7]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

In [8]:
# Pipeline parameters; each carries the default used when no override is supplied
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
# NOTE(review): the processor and estimator cells below pass the literal "ml.m5.xlarge";
# this parameter is only listed on the Pipeline -- confirm whether it should be wired in.
instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge",
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

### Define a Processing Step for Feature Engineering

### Preprocessing Script

In [9]:
%%writefile scripts/preprocessing.py
import numpy as np
import pandas as pd
import boto3
from io import StringIO
import awswrangler as wr

# Processing working directory; in this script it is only referenced by the
# commented-out export at the end of label_construct()
base_dir = "/opt/ml/processing"
# Bucket and key prefix the raw data is read from and the preprocessed data is written to
bucket = "BUCKET-NAME"
prefix = "mlops/predictive-maintenance"

# Upload the preprocessed data to S3
def upload_file_s3(df, name):
    """Write ``df`` as CSV to ``{prefix}/data/preprocessed/{name}.csv`` in the bucket.

    Args:
        df: DataFrame to upload; it is serialized without its index.
        name: Base name (no extension) of the object to create.

    Returns:
        None. The HTTP status of the upload is only printed.
    """
    aws_region = "us-east-1"
    boto3.setup_default_session(region_name=aws_region)
    client = boto3.client("s3", region_name=aws_region)
    object_key = f"{prefix}/data/preprocessed/{name}.csv"

    # Serialize in memory; nothing is written to local disk
    with StringIO() as buffer:
        df.to_csv(buffer, index=False)
        payload = buffer.getvalue()

    response = client.put_object(Bucket=bucket, Key=object_key, Body=payload)
    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

    if status != 200:
        print(f"Unsuccessful S3 put_object response. Status - {status}")
        return
    print(f"Successful S3 put_object response. Status - {status}")

# Convert to datetime datatype
def datetime_datatype(df):
    """Parse the ``datetime`` column of ``df`` in place and return the same frame.

    Values are expected in ``%Y-%m-%d %H:%M:%S`` form.
    """
    print("Converting to type datetime")
    parsed = pd.to_datetime(df['datetime'], format="%Y-%m-%d %H:%M:%S")
    df['datetime'] = parsed
    return df


# Convert to category datatype
def category_datatype(df, column_name):
    """Cast ``df[column_name]`` to the pandas ``category`` dtype in place and return ``df``."""
    print("Converting to type category")
    as_category = df[column_name].astype('category')
    df[column_name] = as_category
    return df


# Helper: one statistic for every telemetry field, sampled on the 3-hour grid
def _telemetry_window_stat(df, fields, stat, suffix, rolling_window=None):
    """Aggregate each telemetry field per machine onto 3-hour bins.

    Args:
        df: Telemetry frame with ``datetime``, ``machineID`` and the ``fields`` columns.
        fields: Names of the value columns to aggregate.
        stat: Name of the aggregation to apply (``'mean'`` or ``'std'``).
        suffix: Text appended to each field name to form the output column name.
        rolling_window: ``None`` takes the statistic inside each 3-hour bin.
            Otherwise the statistic is taken over a rolling window of this many
            rows of the per-machine series and then sampled once per bin.

    Returns:
        DataFrame holding the two key columns followed by one column per field,
        on a fresh positional index.
    """
    per_field = []
    for col in fields:
        # Rows = timestamps, one column per machine
        wide = pd.pivot_table(df, index='datetime', columns='machineID', values=col)
        if rolling_window is None:
            binned = getattr(wide.resample('3H', closed='left', label='right'), stat)()
        else:
            # Roll BEFORE resampling: the window then counts raw rows of one machine
            # (24 rows = 24 hours, assuming hourly telemetry -- TODO confirm) instead of
            # 24 three-hour bins of a series in which all machines are stacked end to end.
            rolled = getattr(wide.rolling(window=rolling_window, center=False), stat)()
            binned = rolled.resample('3H', closed='left', label='right').first()
        per_field.append(binned.unstack())
    result = pd.concat(per_field, axis=1)
    result.columns = [col + suffix for col in fields]
    result.reset_index(inplace=True)
    return result


# Lag Features from Telemetry
def telemetry_features(df):
    """Build 3-hour and 24-hour mean / standard-deviation telemetry features.

    The four statistic frames are produced on the same positional index, joined
    column-wise, and rows with any missing value (for example the bins before a
    full 24-hour window exists) are dropped. The result is uploaded as
    ``telemetry`` and returned.

    Fixes relative to the previous version:
      * the 24-hour window used to be applied after resampling, on the stacked
        series, i.e. over 24 three-hour bins and across machine boundaries;
      * the 24-hour standard deviation frame was filtered before its index was
        reset, so its rows were shifted against the other frames in the
        index-aligned concat.

    Args:
        df: Raw telemetry with ``datetime``, ``machineID``, ``volt``, ``rotate``,
            ``pressure`` and ``vibration`` columns. ``datetime`` is converted in place.

    Returns:
        DataFrame with one row per machine and 3-hour bin.
    """
    df = datetime_datatype(df)
    fields = ['volt', 'rotate', 'pressure', 'vibration']

    print("Calculate mean values for telemetry features -- 3 hours rolling window")
    telemetry_mean_3h = _telemetry_window_stat(df, fields, 'mean', 'mean_3h')

    print("Calculate standard deviation for telemetry features -- 3 hours rolling window")
    telemetry_sd_3h = _telemetry_window_stat(df, fields, 'std', 'sd_3h')

    print("Calculate mean values for telemetry features -- 24 hours rolling window")
    telemetry_mean_24h = _telemetry_window_stat(df, fields, 'mean', 'mean_24h', rolling_window=24)

    print("Calculate standard deviation for telemetry features -- 24 hours rolling window")
    telemetry_sd_24h = _telemetry_window_stat(df, fields, 'std', 'sd_24h', rolling_window=24)

    # All frames share the same index, so the concat lines rows up one-to-one;
    # columns 2..5 are the four feature columns (0 and 1 are the keys).
    telemetry_feat = pd.concat([telemetry_mean_3h,
                                telemetry_sd_3h.iloc[:, 2:6],
                                telemetry_mean_24h.iloc[:, 2:6],
                                telemetry_sd_24h.iloc[:, 2:6]], axis=1).dropna()

    upload_file_s3(telemetry_feat, "telemetry")

    return telemetry_feat


# Lag Features for Errors
def errors_lag_features(df):
    """Build 24-hour rolling error counts per machine and upload them as ``errors``.

    The error log is one-hot encoded, summed per machine and timestamp, aligned
    to every telemetry timestamp (no error = 0), summed over a rolling window
    and sampled once per 3-hour bin.

    Fix relative to the previous version: the rolling sum used to be applied
    after resampling, on the stacked series, so it covered 24 three-hour bins
    and ran across machine boundaries. It is now applied per machine on the
    timestamp-level counts before resampling.

    Relies on the module-level ``telemetry`` frame created in the ``__main__``
    section, with its ``datetime`` column already converted.

    Args:
        df: Raw error log with ``datetime``, ``machineID`` and ``errorID`` columns.

    Returns:
        DataFrame with ``error1count`` .. ``error5count`` per machine and 3-hour bin.
    """
    df = datetime_datatype(df)
    df = category_datatype(df, 'errorID')
    print("Lag features for errors")
    key_cols = ['machineID', 'datetime']

    # One indicator column per error type, then one row per machine and timestamp
    error_count = pd.get_dummies(df.set_index('datetime')).reset_index()
    error_count.columns = ['datetime', 'machineID', 'error1', 'error2', 'error3', 'error4', 'error5']
    error_count = error_count.groupby(key_cols).sum().reset_index()

    # Align to the telemetry timeline; timestamps without a logged error count as 0
    error_count = telemetry[['datetime', 'machineID']].merge(
        error_count, on=key_cols, how='left').fillna(0.0)

    fields = ['error%d' % i for i in range(1, 6)]
    per_field = []
    for col in fields:
        wide = pd.pivot_table(error_count, index='datetime', columns='machineID', values=col)
        # 24 rows = 24 hours, assuming hourly telemetry timestamps -- TODO confirm
        rolled = wide.rolling(window=24, center=False).sum()
        per_field.append(rolled.resample('3H', closed='left', label='right').first().unstack())
    error_count = pd.concat(per_field, axis=1)
    error_count.columns = [col + 'count' for col in fields]
    error_count.reset_index(inplace=True)
    error_count = error_count.dropna()

    upload_file_s3(error_count, "errors")

    return error_count


# Maintenance Features
def maintenance_features(df):
    """Compute, per component, the days elapsed since its last replacement.

    The maintenance log is one-hot encoded per component, merged onto every
    telemetry timestamp, and each component column is replaced by the time of
    the most recent replacement (forward-filled). Rows after 2015-01-01 are kept
    and the columns are converted to a day count. The result is uploaded as
    ``maint`` and returned.

    Same results as before, but without the deprecated ``fillna(method=...)``,
    without writing timestamps into a numeric column, and without assigning to
    a filtered slice.

    Relies on the module-level ``telemetry`` frame created in the ``__main__``
    section, with its ``datetime`` column already converted.

    Args:
        df: Raw maintenance log with ``datetime``, ``machineID`` and ``comp`` columns.

    Returns:
        DataFrame with ``datetime``, ``machineID`` and ``comp1`` .. ``comp4`` (days).
    """
    df = datetime_datatype(df)
    df = category_datatype(df, 'comp')
    print("Maintenance Features -- Days since last replacement")
    comp_rep = pd.get_dummies(df.set_index('datetime')).reset_index()
    comp_rep.columns = ['datetime', 'machineID', 'comp1', 'comp2', 'comp3', 'comp4']

    # combine repairs for a given machine in a given hour
    comp_rep = comp_rep.groupby(['machineID', 'datetime']).sum().reset_index()

    # add timepoints where no components were replaced
    comp_rep = telemetry[['datetime', 'machineID']].merge(comp_rep,
                                                          on=['datetime', 'machineID'],
                                                          how='outer').fillna(0).sort_values(by=['machineID', 'datetime'])
    components = ['comp1', 'comp2', 'comp3', 'comp4']
    for comp in components:
        # Keep the timestamp on rows where the component was replaced, then carry
        # it forward to the following rows.
        # NOTE(review): the fill runs down the whole machine-sorted frame, so rows
        # before a machine's first replacement inherit the previous machine's value.
        replaced = ~(comp_rep[comp] < 1)
        comp_rep[comp] = comp_rep['datetime'].where(replaced).ffill()

    # .copy() so the day-count assignment below does not target a slice view
    comp_rep = comp_rep.loc[comp_rep['datetime'] > pd.to_datetime('2015-01-01')].copy()
    for comp in components:
        comp_rep[comp] = (comp_rep["datetime"] - pd.to_datetime(comp_rep[comp])) / np.timedelta64(1, "D")

    upload_file_s3(comp_rep, "maint")

    return comp_rep


# Failures Features
def failure_features(df):
    """Type the failure log, upload it as ``failures`` and return it.

    ``datetime`` is parsed and ``failure`` is cast to category, both in place.
    """
    print("Failure features")
    typed = category_datatype(datetime_datatype(df), 'failure')
    upload_file_s3(typed, "failures")
    return typed


# Final Features
def final_features(telemetry_df, errors_df, maint_df, machines_df):
    """Join all feature frames onto the telemetry features.

    The machines frame is first uploaded as ``machines``. Error and maintenance
    features are left-joined on ``datetime`` and ``machineID``; machine
    attributes are left-joined on ``machineID`` only.

    Returns:
        The combined feature DataFrame.
    """
    upload_file_s3(machines_df, "machines")
    print("Final features")
    time_keys = ['datetime', 'machineID']
    with_errors = telemetry_df.merge(errors_df, on=time_keys, how='left')
    with_maint = with_errors.merge(maint_df, on=time_keys, how='left')
    return with_maint.merge(machines_df, on=['machineID'], how='left')


# Label Construction
def label_construct(tele_df, error_df, maint_df, machine_df, failure_df):
    """Assemble the final features, attach failure labels and upload the result.

    Each failure is propagated backwards over the 7 preceding rows of the same
    machine, so the records leading up to a failure carry its label; all other
    rows are labelled ``'none'``. The labelled frame is uploaded as
    ``preprocessed``.

    Fix relative to the previous version: the ``failure`` column was converted
    with ``astype(str)`` before the back-fill, which turned every missing value
    into the string ``'nan'`` and left the back-fill nothing to fill, so only
    the exact failure rows were labelled.

    Args:
        tele_df, error_df, maint_df, machine_df: Feature frames for final_features().
        failure_df: Failure log with ``datetime``, ``machineID`` and ``failure``.
    """
    print("----- Final Features -----")
    final_feat = final_features(tele_df, error_df, maint_df, machine_df)

    print("----- Label Construction -----")
    labeled_features = final_feat.merge(
        failure_df, on=['datetime', 'machineID'], how='left')
    # Work on object dtype so the missing values survive until they are filled
    failure = labeled_features['failure'].astype(object)
    # Back-fill within each machine so a label never leaks into another machine's rows
    failure = failure.groupby(labeled_features['machineID']).bfill(limit=7)
    labeled_features['failure'] = failure.fillna('none')
    print("----- Preprocessing completed -----")

    upload_file_s3(labeled_features, "preprocessed")
#     pd.DataFrame(labeled_features).to_csv(f"{base_dir}/preprocessed/final_data.csv", index = False)


# Script entry point: read the raw CSVs from S3, build every feature set and the labels
if __name__ == "__main__":

    # S3 locations of the raw CSV files
    telemetry_data_uri = f"s3://{bucket}/{prefix}/data/raw/PdM_telemetry.csv"
    errors_data_uri = f"s3://{bucket}/{prefix}/data/raw/PdM_errors.csv"
    maint_data_uri = f"s3://{bucket}/{prefix}/data/raw/PdM_maint.csv"
    failures_data_uri = f"s3://{bucket}/{prefix}/data/raw/PdM_failures.csv"
    machines_data_uri = f"s3://{bucket}/{prefix}/data/raw/PdM_machines.csv"
    
    # NOTE: `telemetry` is also read as a module-level global inside
    # errors_lag_features() and maintenance_features(); do not rename it.
    telemetry = wr.s3.read_csv(telemetry_data_uri)
    errors = wr.s3.read_csv(errors_data_uri)
    maint = wr.s3.read_csv(maint_data_uri)
    failures = wr.s3.read_csv(failures_data_uri)
    machines = wr.s3.read_csv(machines_data_uri)
    
    # Order matters: telemetry_features() converts telemetry['datetime'] in place,
    # and the two calls after it merge their datetime-typed data against that frame.
    telemetry_df = telemetry_features(telemetry)
    errors_df = errors_lag_features(errors)
    maint_df = maintenance_features(maint)
    failures_df = failure_features(failures)
    machines_df = category_datatype(machines, 'model')
    
    # Joins everything, labels it and uploads the final "preprocessed" dataset
    label_construct(telemetry_df, errors_df, maint_df, machines_df, failures_df)

Overwriting scripts/preprocessing.py


### Feature Store Creation Script

In [10]:
%%writefile scripts/featurestore.py
import time

import awswrangler as wr
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

# Processing working directory (not referenced by the visible part of this script)
base_dir = "/opt/ml/processing"
# Bucket and key prefix holding the preprocessed CSVs and the feature store data
bucket = "BUCKET-NAME"
prefix = "mlops/predictive-maintenance"

# Sessions and clients for SageMaker and the Feature Store runtime
boto_session = boto3.Session(region_name="us-east-1")
sagemaker_boto_client = boto_session.client("sagemaker")
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime",
    region_name="us-east-1",
)

# Use the execution role when it can be resolved; on ValueError fall back to the
# 'SAGEMAKER-ROLE' literal (NOTE(review): looks like a placeholder to be replaced)
try:
    sagemaker_role = sagemaker.get_execution_role()
except ValueError:
    sagemaker_role = 'SAGEMAKER-ROLE'
else:
    print(f"Sagemaker Role for Feature Store file: {sagemaker_role}")

feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_boto_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

# ------------------------------------ Read Data
# Locations of the CSVs written by the preprocessing script
preprocessed_root = f"s3://{bucket}/{prefix}/data/preprocessed"
telemetry_data_uri = f"{preprocessed_root}/telemetry.csv"
errors_data_uri = f"{preprocessed_root}/errors.csv"
maint_data_uri = f"{preprocessed_root}/maint.csv"
failures_data_uri = f"{preprocessed_root}/failures.csv"
machines_data_uri = f"{preprocessed_root}/machines.csv"

telemetry = wr.s3.read_csv(telemetry_data_uri)
errors = wr.s3.read_csv(errors_data_uri)
maint = wr.s3.read_csv(maint_data_uri)
failures = wr.s3.read_csv(failures_data_uri)
machines = wr.s3.read_csv(machines_data_uri)

# ------------------------------------ Add Timestamp
# Every frame gets an event_time column holding the current time as an epoch value
for frame in (telemetry, errors, maint, failures, machines):
    frame['event_time'] = pd.to_datetime("now").timestamp()

# ------------------------------------ Create Feature Group
# One FeatureGroup handle per dataset, all bound to the feature store session
telemetry_feature_group = FeatureGroup(name='telemetry_fg', sagemaker_session=feature_store_session)
errors_feature_group = FeatureGroup(name='errors_fg', sagemaker_session=feature_store_session)
maintenance_feature_group = FeatureGroup(name='maintenance_fg', sagemaker_session=feature_store_session)
failures_feature_group = FeatureGroup(name='failures_fg', sagemaker_session=feature_store_session)
machines_feature_group = FeatureGroup(name='machines_fg', sagemaker_session=feature_store_session)

# ------------------------------------ Loading Definitions
# Derive each group's feature definitions from its DataFrame
for feature_group, frame in (
    (telemetry_feature_group, telemetry),
    (errors_feature_group, errors),
    (maintenance_feature_group, maint),
    (failures_feature_group, failures),
    (machines_feature_group, machines),
):
    feature_group.load_feature_definitions(data_frame=frame)

# Columns used as record identifier and event time by every group
record_identifier_feature_name = "machineID"
event_time_feature_name = "event_time"

# ------------------------------------ Create Feature Stores
def create_feature_group(feature_group, label):
    """Create ``feature_group`` with the online store enabled, unless it already exists.

    Replaces five copies of the same block. An error whose ``response`` payload
    carries the code ``ResourceInUse`` is treated as "already exists"; every
    other exception, including ones without a ``response`` attribute, is
    re-raised unchanged instead of being masked by an ``AttributeError``.

    Args:
        feature_group: The FeatureGroup to create.
        label: Dataset name used in the success message.
    """
    try:
        feature_group.create(
            s3_uri=f"s3://{bucket}/{prefix}/feature_store_data",
            record_identifier_name=record_identifier_feature_name,
            event_time_feature_name=event_time_feature_name,
            role_arn=sagemaker_role,
            enable_online_store=True,
        )
    except Exception as e:
        error = (getattr(e, "response", None) or {}).get("Error") or {}
        if error.get("Code") != "ResourceInUse":
            raise
        print("Using existing feature group")
        return
    # Kept outside the try block so a failure here is never mistaken for a create error
    time.sleep(30)
    print(f'Create "{label}" feature group: SUCCESS')


for group_to_create, group_label in (
    (telemetry_feature_group, "telemetry"),
    (errors_feature_group, "errors"),
    (maintenance_feature_group, "maintenance"),
    (failures_feature_group, "failures"),
    (machines_feature_group, "machines"),
):
    create_feature_group(group_to_create, group_label)

# ------------------------------------ Ingesting Data
# Wait for every feature group, not only telemetry, to leave the "Creating" state,
# and stop if one of them did not end up "Created".
for pending_group in (
    telemetry_feature_group,
    errors_feature_group,
    maintenance_feature_group,
    failures_feature_group,
    machines_feature_group,
):
    group_status = pending_group.describe()['FeatureGroupStatus']
    while group_status == 'Creating':
        print("Feature Group Creating")
        time.sleep(60)
        group_status = pending_group.describe()['FeatureGroupStatus']
    if group_status != 'Created':
        raise RuntimeError(
            f"Feature group {pending_group.name} is in state {group_status!r}"
        )

print("Feature Group Created")
## Below code needs to run only once to ingest the data into feature store
#---------------------------------------------------------------
# telemetry_feature_group.ingest(data_frame = telemetry, max_workers = 3, wait = True)
# errors_feature_group.ingest(data_frame = errors, max_workers = 3, wait = True)
# maintenance_feature_group.ingest(data_frame = maint, max_workers = 3, wait = True)
# failures_feature_group.ingest(data_frame = failures, max_workers = 3, wait = True)
# machines_feature_group.ingest(data_frame = machines, max_workers = 3, wait = True)
# print("Feature Data Ingested")
#---------------------------------------------------------------

Overwriting scripts/featurestore.py


### Train Test Split Script

In [11]:
%%writefile scripts/train_test_split_data.py
import pandas as pd
import boto3
from io import StringIO
import awswrangler as wr

# Processing working directory; the train/ and test/ CSVs are written beneath it
base_dir = "/opt/ml/processing"
# Bucket and key prefix the preprocessed data is read from and the splits are uploaded to
bucket = "BUCKET-NAME"
prefix = "mlops/predictive-maintenance"

def upload_file_s3(df, name):
    """Write ``df`` as CSV to ``{prefix}/data/train-test/{name}.csv`` in the bucket.

    Args:
        df: DataFrame to upload; it is serialized without its index.
        name: Base name (no extension) of the object to create.

    Returns:
        None. The HTTP status of the upload is only printed.
    """
    aws_region = "us-east-1"
    boto3.setup_default_session(region_name=aws_region)
    client = boto3.client("s3", region_name=aws_region)

    # Serialize in memory; nothing is written to local disk here
    with StringIO() as buffer:
        df.to_csv(buffer, index=False)
        payload = buffer.getvalue()

    response = client.put_object(
        Bucket=bucket,
        Key=f"{prefix}/data/train-test/{name}.csv",
        Body=payload,
    )
    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

    if status != 200:
        print(f"Unsuccessful S3 put_object response. Status - {status}")
        return
    print(f"Successful S3 put_object response. Status - {status}")

def train_test_split_script(labeled_features):
    """Split the labelled data by time and write the train / test sets.

    Rows before the last-train timestamp form the training set and rows after
    the first-test timestamp form the test set; the rows in between are left
    out. Categorical features are one-hot encoded and the ``failure`` label is
    appended as the last column. Both sets are uploaded to S3 and written under
    ``base_dir`` for the processing-job outputs.

    Changes relative to the previous version:
      * only the last threshold pair is evaluated (the earlier loop computed
        every pair and kept just the last one);
      * test labels use the same cut-off as the test features;
      * test columns are aligned to the training columns so both files always
        share one layout;
      * the output directories are created when missing.

    Args:
        labeled_features: Labelled feature frame with a datetime-typed
            ``datetime`` column plus ``machineID`` and ``failure``.
    """
    import os  # local import: not part of this script's import header

    threshold_dates = [[pd.to_datetime('2015-07-31 01:00:00'), pd.to_datetime('2015-08-01 01:00:00')],
                       [pd.to_datetime('2015-08-31 01:00:00'), pd.to_datetime('2015-09-01 01:00:00')],
                       [pd.to_datetime('2015-09-30 01:00:00'), pd.to_datetime('2015-10-01 01:00:00')]]
    last_train_date, first_test_date = threshold_dates[-1]

    print(labeled_features['datetime'][0])
    non_feature_cols = ['datetime', 'machineID', 'failure']
    train_rows = labeled_features.loc[labeled_features['datetime'] < last_train_date]
    test_rows = labeled_features.loc[labeled_features['datetime'] > first_test_date]

    train_data = pd.get_dummies(train_rows.drop(non_feature_cols, axis=1))
    test_data = pd.get_dummies(test_rows.drop(non_feature_cols, axis=1))
    # Same columns in the same order, even if a category is missing from one side
    test_data = test_data.reindex(columns=train_data.columns, fill_value=0)

    train_data['failure'] = train_rows['failure']
    test_data['failure'] = test_rows['failure']

    upload_file_s3(train_data, "train")
    upload_file_s3(test_data, "test")

    for split_dir in ("train", "test"):
        os.makedirs(f"{base_dir}/{split_dir}", exist_ok=True)
    train_data.to_csv(f"{base_dir}/train/train.csv", index=False)
    test_data.to_csv(f"{base_dir}/test/test.csv", index=False)
    
# Script entry point: load the labelled dataset, parse its timestamps and split it
if __name__ == "__main__":
    final_data = wr.s3.read_csv(f"s3://{bucket}/{prefix}/data/preprocessed/preprocessed.csv")
    parsed_datetime = pd.to_datetime(final_data['datetime'], format="%Y-%m-%d %H:%M:%S")
    final_data['datetime'] = parsed_datetime
    train_test_split_script(final_data)

Overwriting scripts/train_test_split_data.py


### Create an instance of a FrameworkProcessor

In [12]:
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn import SKLearn

# Processor shared by all script-based steps of the pipeline (scikit-learn framework)
sklearn_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="1.2-1",
    role=sagemaker_role,
    instance_count=processing_instance_count,
    instance_type="ml.m5.xlarge",
    base_job_name="sklearn-pred-maint-process",
    sagemaker_session=pipeline_session,
)

### Creating Processing Step

In [13]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Build the processing-job arguments for scripts/preprocessing.py
# (the processor is bound to the pipeline session; the result is handed to the step)
processor_args = sklearn_processor.run(
    source_dir="scripts",
    code="preprocessing.py",
)

# Creates the pipeline step
preprocess_step = ProcessingStep(
    name="PdM-Data-Read-And-PreProcessing",
    step_args=processor_args,
)



In [14]:
# Build the processing-job arguments for scripts/train_test_split_data.py;
# the train/ and test/ directories written by the script become named outputs
split_job_outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
]
train_test_args = sklearn_processor.run(
    source_dir="scripts",
    code="train_test_split_data.py",
    outputs=split_job_outputs,
)

# Creates the pipeline step; it runs after preprocessing
train_test_split_step = ProcessingStep(
    name="PdM-Train-Test-Data-Split",
    step_args=train_test_args,
    depends_on=[preprocess_step.name],
)

In [15]:
# Build the processing-job arguments for scripts/featurestore.py
fs_data = sklearn_processor.run(
    source_dir="scripts",
    code="featurestore.py",
)

# Creates the pipeline step; it runs after preprocessing
feature_store_step = ProcessingStep(
    name="PdM-FeatureStore-Creation",
    step_args=fs_data,
    depends_on=[preprocess_step.name],
)

### Define a Training Step to Train a Model

In [16]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.sklearn.model import SKLearnModel

In [17]:
# Scikit-learn estimator that trains with scripts/rf_script-no-featurenames.py
rf_hyperparameters = {
    "n-estimators": 100,
    "min-samples-leaf": 3,
}
sklearn_estimator = SKLearn(
    entry_point="scripts/rf_script-no-featurenames.py",
    framework_version="1.2-1",
    role=sagemaker_role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="rf-scikit",
    hyperparameters=rf_hyperparameters,
    sagemaker_session=pipeline_session,
)

In [18]:
# Feed the split step's "train" and "test" outputs to the estimator as CSV channels
split_step_outputs = train_test_split_step.properties.ProcessingOutputConfig.Outputs
train_channel = TrainingInput(
    s3_data=split_step_outputs["train"].S3Output.S3Uri,
    content_type="text/csv",
)
test_channel = TrainingInput(
    s3_data=split_step_outputs["test"].S3Output.S3Uri,
    content_type="text/csv",
)
train_args = sklearn_estimator.fit(inputs={"train": train_channel, "test": test_channel})

In [19]:
# Training step; explicitly ordered after the train/test split step
training_step = TrainingStep(
    name="PdM-ModelTraininig",
    step_args=train_args,
    depends_on=[train_test_split_step.name],
)

### Define a Model Step to Create a Model

### Define a Register Model Step to Create a Model Package

In [20]:
from sagemaker.workflow.step_collections import RegisterModel

# Register the trained model artifacts as a model package in the
# "predictive-maintenance" group, with the approval status taken from the pipeline parameter
trained_model_artifacts = training_step.properties.ModelArtifacts.S3ModelArtifacts
register_step = RegisterModel(
    name="PdM-Register-Model",
    estimator=sklearn_estimator,
    model_data=trained_model_artifacts,
    model_package_group_name="predictive-maintenance",
    approval_status=model_approval_status,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

In [179]:
# Command-line arguments passed to scripts/deploy_model.py
deploy_arguments = [
    "--model-data",
    training_step.properties.ModelArtifacts.S3ModelArtifacts,
    "--region",
    region,
    "--endpoint-instance-type",
    "ml.m5.xlarge",
    "--endpoint-name",
    "PdM-SKLearn-Pipeline-Endpoint-ReTraining",
]
deploy_args = sklearn_processor.run(
    source_dir="scripts",
    code="deploy_model.py",
    arguments=deploy_arguments,
)

# Creates the pipeline step
deploy_step = ProcessingStep(name="PdM-DeployModel", step_args=deploy_args)



### Define a Pipeline of Parameters, Steps, and Conditions

In [180]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the pipeline from its parameters and steps
pipeline_name = "Predictive-Maintenance-Pipeline"
pipeline_parameters = [
    processing_instance_count,
    instance_type,
    model_approval_status,
]
pipeline_steps = [
    preprocess_step,
    feature_store_step,
    train_test_split_step,
    training_step,
    register_step,
    deploy_step,
]
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [None]:
# Parse the pipeline definition (a JSON string) and display it for inspection
definition = json.loads(pipeline.definition())
definition

In [None]:
# Create the pipeline, or update its definition if it already exists, using the execution role
pipeline.upsert(role_arn = sagemaker_role)

In [183]:
# Execute the pipeline (no parameter overrides are passed, so the parameter defaults are used)
execution = pipeline.start()

In [None]:
# List status of the pipeline steps for the execution started above
execution.list_steps()

### List and Check the Endpoint

In [None]:
# Look up the endpoint whose name is passed to deploy_model.py in the deploy step
# and display its current status
endpoint_info = sagemaker_boto_client.describe_endpoint(EndpointName = "PdM-SKLearn-Pipeline-Endpoint-ReTraining")
endpoint_info["EndpointStatus"]

In [None]:
# List the endpoints whose name contains the deployed endpoint's name
sagemaker_boto_client.list_endpoints(NameContains = "PdM-SKLearn-Pipeline-Endpoint-ReTraining")[
    "Endpoints"
]

### Model Monitoring - Data Drift

In [193]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Monitor used for baselining and for the scheduled data-drift checks
test_data_drift_monitor = DefaultModelMonitor(
    role=sagemaker_role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    volume_size_in_gb=1,
    max_runtime_in_seconds=360,
    sagemaker_session=sagemaker_session,
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


### Suggest Baseline

In [None]:
from datetime import datetime, timezone

# Local CSV (with header row) used as the baseline dataset, and where the results go
baseline_data = "datasets/train-test/test_data_with_headers.csv"
baseline_results_uri = f"s3://{bucket}/{prefix}/data/baselining/test-header-data-results"
# Timestamped job name. datetime.utcnow() is deprecated since Python 3.12;
# an aware UTC timestamp renders identically with this format.
baseline_job_name = f"PdM-Baseline-Job-Data-Monitor-{datetime.now(timezone.utc):%Y-%m-%d-%H%M}"

# Run the baselining job that suggests statistics and constraints for the dataset
test_data_drift_monitor.suggest_baseline(
    job_name = baseline_job_name,
    baseline_dataset = baseline_data,
    dataset_format = DatasetFormat.csv(header = True),
    output_s3_uri = baseline_results_uri,
)

In [None]:
# Fetch the latest baselining job recorded on the monitor
baseline_job = test_data_drift_monitor.latest_baselining_job

### Preprocessing Script for monitoring schedule data

In [233]:
%%writefile scripts/datacapture_preprocessing.py

import json

def preprocess_handler(inference_record):
    """Turn one captured inference request into a flat ``{feature name: value}`` dict.

    Keys are ``feature`` followed by the zero-padded (10 digits) position of the
    value in the request. The endpoint output is parsed and printed for
    debugging but is not part of the returned record.

    A request holding a single row wrapped in an outer list (``[[f0, f1, ...]]``,
    which is how the notebook invokes the endpoint) is unwrapped first.
    Previously the whole row ended up as the value of one feature.

    Args:
        inference_record: Captured record exposing ``endpoint_input.data`` and
            ``endpoint_output.data`` as JSON text.

    Returns:
        dict mapping generated feature names to the request values.
    """
    input_data = json.loads(inference_record.endpoint_input.data)
    single_wrapped_row = (
        isinstance(input_data, list)
        and len(input_data) == 1
        and isinstance(input_data[0], list)
    )
    if single_wrapped_row:
        input_data = input_data[0]
    input_data = {f"feature{str(i).zfill(10)}": val for i, val in enumerate(input_data)}

    output_data = json.loads(inference_record.endpoint_output.data)
    output_data = {"prediction0": output_data}

    print(input_data)
    print(type(input_data))
    print(output_data)
    return {**input_data}

Overwriting scripts/datacapture_preprocessing.py


In [234]:
# Upload the record preprocessor script so the monitoring schedule can reference it on S3
s3_client.upload_file(
    Filename="scripts/datacapture_preprocessing.py",
    Bucket=bucket,
    Key=f"{prefix}/code/datacapture_preprocessing.py",
)

### Create Monitoring Schedule

In [217]:
# S3 URI of the preprocessor script; same bucket/key as the upload in the previous cell
preprocessor_path = f"s3://{bucket}/{prefix}/code/datacapture_preprocessing.py"

In [None]:
# Name of the monitoring schedule to create
monitor_schedule_name = "PdM-DataDrift-Monitoring-Schedule-Header-Data"
# Endpoint to monitor; same name as the one passed to deploy_model.py in the deploy step
endpoint_name = "PdM-SKLearn-Pipeline-Endpoint-ReTraining"

In [218]:
from sagemaker.model_monitor import CronExpressionGenerator

# Baseline artifacts the hourly checks are compared against
baseline_statistics = test_data_drift_monitor.baseline_statistics()
baseline_constraints = test_data_drift_monitor.suggested_constraints()

# Hourly schedule on the endpoint, applying the uploaded record preprocessor
test_data_drift_monitor.create_monitoring_schedule(
    monitor_schedule_name=monitor_schedule_name,
    endpoint_input=endpoint_name,
    record_preprocessor_script=preprocessor_path,
    statistics=baseline_statistics,
    constraints=baseline_constraints,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    output_s3_uri=baseline_results_uri,
    enable_cloudwatch_metrics=True,
)

INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: PdM-DataDrift-Monitoring-Schedule-2


### Invoke Endpoint

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer, JSONSerializer

# Client for the deployed endpoint; request payloads are serialized as JSON
json_serializer = JSONSerializer()
predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=json_serializer,
    sagemaker_session=sagemaker_session,
)

In [None]:
import json
import time

# Send rows 100..199 of the test data to the endpoint, one request every half second.
# NOTE(review): `test_data` is not defined in any visible notebook cell -- it must be
# loaded as a DataFrame before this cell runs.
for sample in test_data.to_numpy()[100:200]:
    request_rows = [sample.tolist()]  # a single row wrapped in an outer list
    result = predictor.predict(request_rows)
    time.sleep(0.5)

### Check the Monitoring Schedule Execution Status

In [None]:
# Number of monitoring executions listed for the schedule so far
len(test_data_drift_monitor.list_executions())

In [None]:
# Processing job status of the last execution in the list (raises IndexError if the list is empty)
test_data_drift_monitor.list_executions()[-1].describe()["ProcessingJobStatus"]

In [None]:
# Exit message of the last execution in the list
test_data_drift_monitor.list_executions()[-1].describe()["ExitMessage"]

In [None]:
# Check for the latest violations if any generated
violations = test_data_drift_monitor.latest_monitoring_constraint_violations()
# None means "do not truncate"; the former value -1 is rejected by current pandas
pd.set_option("display.max_colwidth", None)
# pd.json_normalize is the public entry point; pd.io.json.json_normalize was removed in pandas 2.0
constraints_df = pd.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)