In [None]:
!pip install sagemaker==2.227.0

In [None]:
from steps.preprocess import preprocess_data
from steps.training_kmeans import train_kmeans
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterFloat, ParameterInteger, ParameterBoolean, ParameterString
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker import get_execution_role
import sagemaker
import os

In [None]:
# Resolve the execution role, the SageMaker session and its default bucket.
role = get_execution_role()
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
prefix = 'pipelines'

# SageMaker Python SDK defaults for the RemoteFunction module, rendered as YAML.
# The S3 root is built from the default bucket and the prefix defined above.
config_yaml = f"""
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        # role arn is not required if in SageMaker Notebook instance or SageMaker Studio
        # Uncomment the following line and replace with the right execution role if in a local IDE
        # RoleArn: <replace the role arn here>
        S3RootUri: s3://{bucket}/{prefix}
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
        PreExecutionCommands:
        - "sudo chmod -R 777 /opt/ml/model"
        CustomFileFilter:
          IgnoreNamePatterns:
          - "data/*"
          - "models/*"
          - "*.ipynb"
          - "__pycache__"

"""

# Write config.yaml (relative to the current working directory) through a
# context manager so the handle is flushed and closed deterministically instead
# of being left to garbage collection. print() keeps the trailing newline the
# file has always been written with.
with open('config.yaml', 'w') as config_file:
    print(config_yaml, file=config_file)

In [None]:
%store -r

env_variables={'MLFLOW_TRACKING_URI': mlflow_arn, 'MLFLOW_EXPERIMENT_NAME': 'anomaly-detection'}

In [None]:
# Define the SageMaker Pipeline and register it.

# Use the current working directory as the SageMaker SDK user-config override location.
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()

pipeline_name = "anomaly-detection"

# Pipeline parameters; each default can be overridden per execution.
training_instance_type = ParameterString(name="training_instance_type", default_value="ml.m5.xlarge")
kmeans_nclusters = ParameterInteger(name="kmeans_nclusters", default_value=2)
input_data_s3_uri = ParameterString(name="input_data_s3_uri", default_value=data_s3_uri)

# Keyword arguments that are identical for both steps.
_shared_step_settings = dict(
    keep_alive_period_in_seconds=300,
    environment_variables=env_variables,
    instance_type=training_instance_type,
)

# Preprocess step: receives the input data S3 URI parameter.
processing_step = step(
    preprocess_data,
    name="Preprocess",
    job_name_prefix=f"{pipeline_name}-Preprocess",
    **_shared_step_settings,
)(input_data_s3_uri)

# Train step: consumes the first output of the preprocess step and the cluster count.
training_step = step(
    train_kmeans,
    name="Train",
    job_name_prefix=f"{pipeline_name}-Train",
    **_shared_step_settings,
)(processing_step[0], kmeans_nclusters)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[training_instance_type, kmeans_nclusters, input_data_s3_uri],
    steps=[processing_step, training_step],
    pipeline_definition_config=PipelineDefinitionConfig(use_custom_job_prefix=True),
)

# Create or update the pipeline definition in SageMaker under the execution role.
pipeline.upsert(role_arn=role)