In [1]:
# Enable IPython's autoreload extension in mode 2, so imported modules
# (such as the project's `package.utils`) are reloaded before each cell executes.
%load_ext autoreload
%autoreload 2

# Import libraries

In [2]:
import yaml
import pandas as pd
import numpy as np
from package.utils import DotDict

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [3]:
# Parse the pipeline settings from YAML; the file handle is only needed for the parse itself.
with open('sagemaker_configuration.yaml', 'r') as config_file:
    input_dict = yaml.safe_load(config_file)

# Wrap the parsed mapping so settings are read as attributes (conf.BUCKET, conf.INSTANCE_TYPE, ...).
conf = DotDict(input_dict=input_dict)

# Set Environment

In [4]:
# Execution target switch read by the session-setup cell: 'local' makes it import
# LocalPipelineSession, any other value makes it import the regular PipelineSession.
ENV = 'local'
# ENV = 'cloud'

In [5]:
import sys
import json

import boto3
import sagemaker

# Bind the name PipelineSession to the session class that matches the chosen ENV,
# so the rest of the notebook is identical for local and cloud runs.
if ENV != 'local':
    from sagemaker.workflow.pipeline_context import PipelineSession
else:
    from sagemaker.workflow.pipeline_context import LocalPipelineSession as PipelineSession

pipeline_session = PipelineSession(default_bucket=conf.BUCKET)
region = pipeline_session.boto_region_name
role = sagemaker.get_execution_role()

# Echo the resolved region, bucket and role before any step is built.
print(f"""region: {region}
bucket: {conf.BUCKET}
role: {role}
""")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
region: ap-southeast-1
bucket: tli-crm-segmentation
role: arn:aws:iam::140663717579:role/service-role/AmazonSageMaker-ExecutionRole-20220606T212656



# Pipeline global parameters

In [6]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

from sagemaker.workflow.functions import Join

In [7]:
# Pipeline-level string parameters. BucketName defaults to the configured bucket;
# ModelName is declared without a default value.
bucket_param = ParameterString(name="BucketName", default_value=conf.BUCKET)
model_name_param = ParameterString(name="ModelName")

# Customer groups handled by the clustering steps, and the model settings used for each group.
customer_groups = ['99pct', '0anp', '1hold', 'other']
model_params_dict = {
    '99pct': {'n_clusters': 3, 'random_state': 0},
    '0anp': {'n_clusters': 4, 'random_state': 0},
    '1hold': {'n_clusters': 4, 'random_state': 0},
    'other': {'n_clusters': 3, 'random_state': 0},
}

# One CustomerGroup-<group> and one ModelParam-<group> parameter per group, keyed by group name.
customer_group_params = {
    group: ParameterString(name=f"CustomerGroup-{group}")
    for group in customer_groups
}
model_param_params = {
    group: ParameterString(name=f"ModelParam-{group}")
    for group in customer_groups
}

# Building a training pipeline

## Create a processing step

In [8]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Processor used for the processing step. Framework version and instance sizing come
# from the YAML configuration; the job name prefix is the processing step's name.
# NOTE: the name `sklearn_processor` is re-bound by the grouping and clustering sections
# further down; the three processors differ only in `base_job_name`.
sklearn_processor = SKLearnProcessor(
    framework_version=conf.FRAMEWORK_VERSION,
    instance_type=conf.INSTANCE_TYPE,
    instance_count=conf.INSTANCE_COUNT,
    base_job_name=conf.STEP_PROCESSING,
    role=role,
    sagemaker_session=pipeline_session,
)

In [9]:
# S3 URIs are assembled with Join so that the bucket part comes from the BucketName parameter.
input_raw_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/raw/'],
)
input_model_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/artifact/bene_model.pkl'],
)
output_processing_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/processed'],
)

# Inputs of the processing job: the raw data prefix, the model artifact, and the
# `package` source, each mapped to its own directory under /opt/ml/processing/input.
processing_inputs = [
    ProcessingInput(
        input_name="RawInput",
        source=input_raw_var,
        destination="/opt/ml/processing/input/raw",
    ),
    ProcessingInput(
        input_name="ModelInput",
        source=input_model_var,
        destination="/opt/ml/processing/input/model",
    ),
    ProcessingInput(
        input_name='Package',
        source='package',
        destination="/opt/ml/processing/input/code/package",
    ),
]

# Single output: whatever the job writes to /opt/ml/processing/output goes to the processed prefix.
processing_outputs = [
    ProcessingOutput(
        output_name="FeatureOutput",
        source="/opt/ml/processing/output",
        destination=output_processing_var,
    ),
]

In [10]:
from sagemaker.workflow.steps import ProcessingStep

# The value returned by run() is handed to the pipeline step as its arguments.
processing_args = sklearn_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code='01_processing.py',
)

# Pipeline step wrapping the processing job; its name comes from the configuration.
step_processing = ProcessingStep(
    name=conf.STEP_PROCESSING,
    step_args=processing_args,
)



## Create a grouping step

In [11]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Processor used for the grouping step. This re-binds `sklearn_processor`, replacing the
# processor created in the processing section; only `base_job_name` differs from it.
sklearn_processor = SKLearnProcessor(
    framework_version=conf.FRAMEWORK_VERSION,
    instance_type=conf.INSTANCE_TYPE,
    instance_count=conf.INSTANCE_COUNT,
    base_job_name=conf.STEP_GROUPING,
    role=role,
    sagemaker_session=pipeline_session,
)

In [12]:
# input_processing_var = Join(on='', values=['s3://', bucket_param, '/data-zones/neutral/processed'])
# output_grouping_feature_var = Join(on='', values=['s3://', bucket_param, '/data-zones/neutral/grouped'])
# output_grouping_scaler_var = Join(on='', values=['s3://', bucket_param, '/data-zones/neutral/artifact/scaler'])

# grouping_inputs = [
#     ProcessingInput(input_name="ProcessingInput", source=input_processing_var, destination="/opt/ml/processing/input/"),
#     ProcessingInput(input_name='Package', source='package', destination="/opt/ml/processing/input/code/package"),
# ]

# grouping_outputs = [
#     ProcessingOutput(output_name="FeatureOutput", source="/opt/ml/processing/output/feature", destination=output_grouping_feature_var),
#     ProcessingOutput(output_name="ScalerOutput", source="/opt/ml/processing/output/scaler", destination=output_grouping_scaler_var),
# ]

In [13]:
# Grouping reads the processed prefix written by the processing step plus one customer-profile CSV.
# NOTE: `input_raw_var` is re-bound here to a single file; in the processing section it
# pointed at the whole raw/ prefix.
input_processing_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/processed'],
)
input_raw_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/raw/tay_ds_customer_profile_202505091441.csv'],
)

# Destinations for the two outputs of the grouping job.
output_grouping_feature_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/grouped'],
)
output_grouping_scaler_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/artifact/scaler'],
)

grouping_inputs = [
    ProcessingInput(
        input_name="ProcessingInput",
        source=input_processing_var,
        destination="/opt/ml/processing/input/processed",
    ),
    ProcessingInput(
        input_name="CustomerProfileInput",
        source=input_raw_var,
        destination="/opt/ml/processing/input/raw",
    ),
    ProcessingInput(
        input_name='Package',
        source='package',
        destination="/opt/ml/processing/input/code/package",
    ),
]

# The container's feature/ and scaler/ output directories are sent to separate S3 prefixes.
grouping_outputs = [
    ProcessingOutput(
        output_name="FeatureOutput",
        source="/opt/ml/processing/output/feature",
        destination=output_grouping_feature_var,
    ),
    ProcessingOutput(
        output_name="ScalerOutput",
        source="/opt/ml/processing/output/scaler",
        destination=output_grouping_scaler_var,
    ),
]

In [14]:
from sagemaker.workflow.steps import ProcessingStep

# The value returned by run() is handed to the pipeline step as its arguments.
grouping_args = sklearn_processor.run(
    inputs=grouping_inputs,
    outputs=grouping_outputs,
    code='02_grouping.py',
)

# Grouping is explicitly ordered after the processing step via depends_on,
# so any pipeline containing this step must also contain step_processing.
step_grouping = ProcessingStep(
    name=conf.STEP_GROUPING,
    step_args=grouping_args,
    depends_on=[step_processing.name],
)

## Create a clustering step

In [15]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Processor used for the clustering steps. This re-binds `sklearn_processor` once more;
# only `base_job_name` differs from the processors created in the earlier sections.
sklearn_processor = SKLearnProcessor(
    framework_version=conf.FRAMEWORK_VERSION,
    instance_type=conf.INSTANCE_TYPE,
    instance_count=conf.INSTANCE_COUNT,
    base_job_name=conf.STEP_CLUSTERING,
    role=role,
    sagemaker_session=pipeline_session,
)

In [16]:
# Clustering reads the grouped prefix and writes model, log and prediction outputs.
input_grouping_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/grouped'],
)
output_clustering_model_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/artifact/model'],
)
output_clustering_log_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/log'],
)
output_clustering_predict_var = Join(
    on='',
    values=['s3://', bucket_param, '/data-zones/neutral/inferenced'],
)

clustering_inputs = [
    ProcessingInput(
        input_name="ClusteringInput",
        source=input_grouping_var,
        destination="/opt/ml/processing/input/",
    ),
    ProcessingInput(
        input_name='Package',
        source='package',
        destination="/opt/ml/processing/input/code/package",
    ),
]

# One S3 destination per container output directory (model/, log/, inferenced/).
clustering_outputs = [
    ProcessingOutput(
        output_name="ModelOutput",
        source="/opt/ml/processing/output/model",
        destination=output_clustering_model_var,
    ),
    ProcessingOutput(
        output_name="LogOutput",
        source="/opt/ml/processing/output/log",
        destination=output_clustering_log_var,
    ),
    ProcessingOutput(
        output_name="PredictOutput",
        source="/opt/ml/processing/output/inferenced",
        destination=output_clustering_predict_var,
    ),
]

In [17]:
# Build one clustering step per customer group. Every step shares the same processor,
# inputs and outputs; only the step name and the two per-group parameters differ.
# No depends_on is set here, so these steps are not ordered after the grouping step.
clustering_steps = []

for group in customer_groups:
    # Step arguments for this group: the shared model name plus this group's own
    # model-parameter and customer-group pipeline parameters.
    clustering_args = sklearn_processor.run(
        code='03_clustering.py',
        arguments=[
            "--model-name", model_name_param,
            "--model-param", model_param_params[group],
            "--customer-group", customer_group_params[group],
        ],
        inputs=clustering_inputs,
        outputs=clustering_outputs,
    )

    # The group is appended to the configured step name to keep step names unique.
    step_clustering = ProcessingStep(
        step_args=clustering_args,
        name=f"{conf.STEP_CLUSTERING}-{group}",
    )
    clustering_steps.append(step_clustering)

In [18]:
# from sagemaker.workflow.steps import ProcessingStep

# clustering_args = sklearn_processor.run(
#         code='0x_clustering_test-Copy1.py',
#         inputs=clustering_inputs,
#         outputs=clustering_outputs,
#         arguments=[
#             "--model-name", model_name_param,
#             "--model-param", model_param_params,
#             "--customer-group", customer_group_param
#         ]
#     )

# step_clustering = ProcessingStep(
#         name=conf.STEP_CLUSTERING,
#         step_args=clustering_args,
#         depends_on=[step_grouping.name],
#     )

# Create a training pipeline

In [19]:
# from sagemaker.workflow.pipeline import Pipeline
# import json

# # model_param = {'n_clusters': 3, 'random_state': 0}
# model_name = 'kmeans'

# train_pipeline = Pipeline(
#     name=f'{conf.PIPELINE_NAME}-neutral',
#     parameters=[
#         bucket_param,
#         model_name_param,
#         *model_param_params.values(),
#         *customer_group_params.values(),  # Add all customer group parameters here
#     ],
#     steps=
#     # [
#     #     step_processing, 
#     #     step_grouping] +
#     clustering_steps,
#     sagemaker_session=pipeline_session,
# )

# train_pipeline.upsert(role_arn=role)

# execution = train_pipeline.start(
#     parameters={
#         "ModelName": model_name,
#         **{f"ModelParam-{group}": json.dumps(model_params_dict[group]) for group in customer_groups},
#         **{f"CustomerGroup-{group}": group for group in customer_groups}  # Pass each customer group parameter dynamically
#     }
# )

In [20]:
from sagemaker.workflow.pipeline import Pipeline
import json

model_name = 'kmeans'

# Assemble the training pipeline.
# step_grouping is declared with depends_on=[step_processing.name], so step_processing
# must be part of the same pipeline; otherwise the dependency refers to a step the
# pipeline does not contain. The saved run log also shows PROCESSING running before GROUPING.
#
# To add the clustering stage, extend `steps` with `clustering_steps` and declare
# model_name_param, *model_param_params.values() and *customer_group_params.values()
# in `parameters`.
train_pipeline = Pipeline(
    name=f'{conf.PIPELINE_NAME}-neutral',
    parameters=[
        bucket_param,
    ],
    steps=[
        step_processing,
        step_grouping,
    ],
    sagemaker_session=pipeline_session,
)

# Create the pipeline definition, or update it if it already exists.
train_pipeline.upsert(role_arn=role)

# Start an execution with no overrides, so BucketName falls back to its default value.
# When the clustering stage is enabled, pass "ModelName" plus one
# "ModelParam-<group>" (json.dumps(model_params_dict[group])) and one
# "CustomerGroup-<group>" entry per customer group here.
execution = train_pipeline.start(
    parameters={}
)

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker.local.entities:Starting execution for pipeline CustomerSegmentation-neutral. Execution ID is 1b46e846-350e-4cd6-bf44-fe2410e76234
INFO:sagemaker.local.entities:Starting pipeline step: 'PROCESSING'
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview

 Container 8o9mmnn39u-algo-1-ktj2r  Creating
 Container 8o9mmnn39u-algo-1-ktj2r  Created
Attaching to 8o9mmnn39u-algo-1-ktj2r
8o9mmnn39u-algo-1-ktj2r  | Collecting pyyaml (from -r /opt/ml/processing/input/code/package/requirements.txt (line 2))
8o9mmnn39u-algo-1-ktj2r  |   Downloading PyYAML-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.1 kB)
8o9mmnn39u-algo-1-ktj2r  | Downloading PyYAML-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (737 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m737.4/737.4 kB[0m [31m50.7 MB/s[0m eta [36m0:00:00[0mta [36m-:--:--[0m
8o9mmnn39u-algo-1-ktj2r  | [?25hInstalling collected packages: pyyaml
8o9mmnn39u-algo-1-ktj2r  | Successfully installed pyyaml-6.0.2
8o9mmnn39u-algo-1-ktj2r  | function: get_config is starting...
8o9mmnn39u-algo-1-ktj2r  | function: get_config successfully executed at 0.004592180252075195s
8o9mmnn39u-algo-1-ktj2r  | current dir: ['bin', 'boot', 'dev', 'etc', 'home'

INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'PROCESSING' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'GROUPING'
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker.local.image:'Docker Compose' is not installed. Proceeding to check for 'docker-compose' CLI.
INFO:sagemaker.local.image:'Docker Compose' found using Docker Compose CLI.
INFO:sagemaker.local.local_session:Starting processing job
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:sagemaker.local.image:No AWS credentials fou

 Container nkmky5lolm-algo-1-m5qx5  Creating
 Container nkmky5lolm-algo-1-m5qx5  Created
Attaching to nkmky5lolm-algo-1-m5qx5
nkmky5lolm-algo-1-m5qx5  | current dir: ['bin', 'boot', 'dev', 'etc', 'home', 'lib', 'lib32', 'lib64', 'libx32', 'media', 'mnt', 'opt', 'proc', 'root', 'run', 'sbin', 'srv', 'sys', 'tmp', 'usr', 'var', '.dockerenv', 'miniconda3', 'libffi7_3.3-4_amd64.deb']
nkmky5lolm-algo-1-m5qx5  | package dir: ['.ipynb_checkpoints', '__pycache__', 'clustering', 'grouping', 'preprocessing', 'requirements.txt', 'utils.py']
nkmky5lolm-algo-1-m5qx5  | input processed dir: ['raw', 'code', 'processed']
nkmky5lolm-algo-1-m5qx5  | input raw dir: ['tay_ds_customer_profile_202505091441.csv']
nkmky5lolm-algo-1-m5qx5  | Collecting pyyaml (from -r /opt/ml/processing/input/code/package/requirements.txt (line 2))
nkmky5lolm-algo-1-m5qx5  |   Downloading PyYAML-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.1 kB)
nkmky5lolm-algo-1-m5qx5  | Downloading PyYAML-6.0.2-

INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'GROUPING' SUCCEEDED.
INFO:sagemaker.local.entities:Pipeline execution 1b46e846-350e-4cd6-bf44-fe2410e76234 SUCCEEDED


In [76]:
# Debug aid: show every attribute of the execution object (status, failure_reason, ...).
# NOTE(review): the saved output of this cell is for execution 22d71b13-... with status
# 'Failed', not the 1b46e846-... run logged by the previous cell; re-run before relying on it.
execution.__dict__

{'pipeline': <sagemaker.workflow.pipeline.Pipeline at 0x7f4fbdbf7be0>,
 'pipeline_execution_name': '22d71b13-7e91-4841-836e-cae6a9cd4938',
 'pipeline_execution_description': None,
 'pipeline_execution_display_name': None,
 'local_session': <sagemaker.workflow.pipeline_context.LocalPipelineSession at 0x7f4fbe16b4f0>,
 'status': 'Failed',
 'failure_reason': "Step 'GROUPING' failed with message: RuntimeError: Failed to run: ['docker-compose', '-f', '/tmp/tmpjqlgf4m5/docker-compose.yaml', 'up', '--build', '--abort-on-container-exit']. Process exited with code: 1",
 'creation_time': 1749103563.499863,
 'last_modified_time': 1749103583.34354,
 'step_execution': {'GROUPING': <sagemaker.local.entities._LocalPipelineExecutionStep at 0x7f4fbde778e0>},
 'pipeline_dag': <sagemaker.workflow.pipeline.PipelineGraph at 0x7f4fbdbf5960>,
 'pipeline_parameters': {'BucketName': 'tli-crm-segmentation'},
 '_blocked_steps': {}}

In [40]:
# definition['Parameters']

# Create an inference pipeline

In [41]:
# from sagemaker.workflow.pipeline import Pipeline

# inference_pipeline = Pipeline(
#     name=f'{conf.PIPELINE_NAME}-inference-sumas-neutral-ter',
#     parameters=[
#         bucket_param, train_dt_param
#     ],
#     steps=[
#         step_process,
#         step_inference,
#         step_pack,
#         step_strategy
#     ],
#     sagemaker_session=pipeline_session,
# )

# definition = json.loads(inference_pipeline.definition())
# inference_pipeline.upsert(role_arn=role)
# execution = inference_pipeline.start(parameters={
#     'train_dt':'202410'
# })

# Start Flow with StepFunctions

## Train flow

In [19]:
# import boto3
# import json

# load_dt = '202410'

# stepfunctions_client = boto3.client('stepfunctions')
# state_machine_arn = 'arn:aws:states:ap-southeast-1:140663717579:stateMachine:RecSysStemFunctionPipeline'

# run_input = {
#     "input_param": {
#         "flow":"train",
#         "load_dt":load_dt,
#         "sagemaker_pipeline": f'{conf.PIPELINE_NAME}-train-sumas-neutral-ter',
#     }
# }

# stepfunctions_client.start_execution(
#                 stateMachineArn=state_machine_arn, input=json.dumps(run_input)
#             )

## Inference flow

In [4]:
import yaml
from package.utils import DotDict

# The configuration is loaded again here, duplicating the setup at the top of the
# notebook (presumably so this cell can be run on a fresh kernel).
with open('sagemaker_configuration.yaml', 'r') as f:
    input_dict = yaml.safe_load(f)

conf = DotDict(input_dict=input_dict)

import boto3
import json

# Values forwarded to the state machine as "load_dt" and "train_dt".
load_dt = '202502'
train_dt = '202410'

state_machine_arn = 'arn:aws:states:ap-southeast-1:140663717579:stateMachine:RecSysStemFunctionPipeline'
stepfunctions_client = boto3.client('stepfunctions')

# Input document for the state machine: which flow to run and which SageMaker pipeline to use.
run_input = {
    "input_param": {
        "flow":"inference",
        "load_dt":load_dt,
        "sagemaker_pipeline": f'{conf.PIPELINE_NAME}-inference-sumas-neutral-ter',
        "train_dt":train_dt,
    }
}

# Start the execution; the response is left as the cell's last expression so it is displayed.
stepfunctions_client.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps(run_input),
)

{'executionArn': 'arn:aws:states:ap-southeast-1:140663717579:execution:RecSysStemFunctionPipeline:5107f11e-e173-4dad-9f9f-bc0d82772b34',
 'startDate': datetime.datetime(2025, 2, 26, 10, 7, 39, 24000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': '9cb3c6f2-b679-4da3-8b8a-0ee90ea60826',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9cb3c6f2-b679-4da3-8b8a-0ee90ea60826',
   'date': 'Wed, 26 Feb 2025 10:07:39 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '164',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}