### Loading configuration parameters from config.json

In [3]:
# Load the notebook's build parameters from config.json in the current working directory.
# Later cells read buckets, instance settings, and the monitoring code file name from it.
import json

CONFIG_PATH = "config.json"

with open(CONFIG_PATH) as config_file:
    build_parameters = json.load(config_file)

In [None]:
import boto3
import sagemaker

# NOTE(review): `region` is not referenced by the other cells in this notebook.
region = boto3.Session().region_name

# Bucket where the monitoring step writes its metrics (see the Monitoring section).
pipeline_output_bucket = build_parameters["output_bucket"]
# NOTE(review): this session is only used by the print below; the processor and the
# pipeline are created without `sagemaker_session=`, so they presumably fall back to
# the SDK's own default session/bucket. Confirm that is intended.
sagemaker_session = sagemaker.session.Session(default_bucket=pipeline_output_bucket)

# Execution role used by the processing job and by `pipeline.upsert`.
# It can now be supplied as "role_arn" in config.json. The previously hard-coded ARN
# stays as the default, so existing configs behave exactly as before.
DEFAULT_ROLE_ARN = "arn:aws:iam::852619674999:role/service-role/AmazonSageMaker-ExecutionRole-20220427T124311"
role = build_parameters.get("role_arn", DEFAULT_ROLE_ARN)

print(role)
print(sagemaker_session.default_bucket())

### Handling Input

In [5]:
## Getting the subfolder pattern
import datetime

# Date stamp (YYYYMMDD) used to match today's scoring-pipeline output folders
# ("Scoring_Pipeline_Output/<YYYYMMDD>-*/..."). strftime zero-pads month and day
# itself. The previous hand-rolled padding left month/day as a mix of str and int.
# NOTE(review): datetime.now() is the kernel's local time. If the scoring pipeline
# stamps folders in UTC, the two can disagree around midnight — confirm.
now = datetime.datetime.now()
subfolder_pattern = now.strftime("%Y%m%d")

In [14]:
input_bucket = build_parameters["input_bucket"]

# Actual values that the monitoring step compares against the model's predictions.
y_actual = f"s3://{input_bucket}/churn-bigml-20.csv"

# The predictions are not resolved to a concrete file here. The monitoring step
# hands an S3 wildcard for today's scoring output directly to its script.

### Monitoring

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# scikit-learn container version for the monitoring job (e.g. "0.23-1").
framework_version = build_parameters["sklearn_processor_framework_version"]

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=build_parameters["processing_instance_type"],
    instance_count=build_parameters["processing_instance_count"],
    # NOTE(review): the "-preprocessing" suffix does not describe this monitoring
    # job. It is kept unchanged so existing job-name filters still match.
    base_job_name=f"{build_parameters['usecase']}-preprocessing",
    role=role,
)

# Container-side paths, shared by the step definition and the script arguments.
ACTUAL_LOCAL_DIR = "/opt/ml/processing/input/actual"
METRICS_OUTPUT_LOCAL_DIR = "/opt/ml/processing/input/metrics_output"

# Predictions are not staged as a ProcessingInput. This wildcard is passed to the
# script as a raw S3 glob, so the script must resolve it itself
# (presumably with s3fs/boto3 — TODO confirm against the monitoring script).
y_predicted_glob = (
    f"s3://{build_parameters['output_bucket']}/Scoring_Pipeline_Output/"
    f"{subfolder_pattern}-*/InferenceOutput/*.csv"
)
# Existing metrics file, read directly from S3 by the script. It lives under the
# same Monitoring_Output prefix this step writes to (presumably earlier runs' results).
metrics_input_location = f"s3://{pipeline_output_bucket}/Monitoring_Output/Monitor.csv"

step_monitoring = ProcessingStep(
    name="comparing-actual-vs-predicted",
    description="Comparing actual and predicted to generate model performance metrics",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=y_actual, destination=ACTUAL_LOCAL_DIR),
    ],
    outputs=[
        ProcessingOutput(
            output_name="metrics",
            source=METRICS_OUTPUT_LOCAL_DIR,
            destination=f"s3://{pipeline_output_bucket}/Monitoring_Output",
        )
    ],
    code=f"s3://{build_parameters['input_bucket']}/codes/{build_parameters['monitoring_code_file_name']}",
    job_arguments=[
        "--y_actual_location", ACTUAL_LOCAL_DIR,
        "--y_predicted_location", y_predicted_glob,
        # Fixed: this flag was missing its "--" prefix, unlike every other option,
        # so an option parser would not bind the S3 path to --metrics_input_location.
        "--metrics_input_location", metrics_input_location,
        "--metrics_output_location", METRICS_OUTPUT_LOCAL_DIR,
    ],
)


### Sending Mail

In [None]:
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

# Notification targets can be overridden from config.json. The defaults are the
# previously hard-coded ARNs, so existing configs behave exactly as before.
notification_lambda_arn = build_parameters.get(
    "notification_lambda_arn",
    "arn:aws:lambda:ap-south-1:852619674999:function:model_performance_notification",
)
notification_topic_arn = build_parameters.get(
    "notification_sns_topic_arn",
    "arn:aws:sns:ap-south-1:852619674999:Approvals",
)

# Refers to an already-deployed Lambda by ARN.
# NOTE(review): `handler` is presumably only used if the SDK has to create/update
# the function — confirm.
func = Lambda(
    function_arn=notification_lambda_arn,
    handler="monitoring.lambda_handler",
)

# Despite its variable name, this step sends the performance notification ("SendMail").
step_deploy_lambda = LambdaStep(
    name="SendMail",
    lambda_func=func,
    inputs={
        # S3 URI of the "metrics" output produced by step_monitoring.
        "performance_file_location": step_monitoring.properties.ProcessingOutputConfig.Outputs["metrics"].S3Output.S3Uri,
        # The value is an SNS topic ARN despite the key name. The key is kept as-is
        # because the Lambda handler presumably looks it up by this exact name.
        "sns_topic_name": notification_topic_arn,
    },
)

### Assembling the steps into the pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Only the monitoring step is registered. The SendMail step (step_deploy_lambda)
# is currently left out of the pipeline.
pipeline_steps = [step_monitoring]

pipeline = Pipeline(
    name="Churn-Monitoring",
    steps=pipeline_steps,
)

### Uploading the pipeline

In [None]:
# Create or update (upsert) the pipeline definition in SageMaker under the execution role.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response