In [1]:
import mlrun
import os
import sagemaker
import boto3

In [None]:
# Set the base project name
project_name = 'sagemaker-payment'

In [None]:
project = mlrun.get_or_create_project("sagemaker-payment", "./")

In [3]:
mlrun.set_env_from_file("env.var")

In [4]:
# Getting our model training function
project.set_function('./src/train.py', requirements=['sagemaker'],
                     name='train', kind='job', image='mlrun/mlrun')

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f2b7d4a6310>

In [5]:
evaluate_function = project.set_function(
    "src/evaluate.py",
    "evaluate",
    handler="evaluate",
    image="mlrun/mlrun",
    requirements=["xgboost"],
)

In [6]:
%%writefile src/workflow.py
import mlrun
from kfp import dsl
from mlrun.model import HyperParamOptions
from mlrun import get_or_create_ctx

from mlrun import (
    build_function,
    deploy_function,
    import_function,
    run_function,
)

    
@dsl.pipeline(
    name="Fraud Detection Pipeline",
    description="Detecting fraud from a transactions dataset"
)

def kfpipeline(evaluate_path):    

    project = mlrun.get_current_project()  

    project.get_function('train',sync=True)    
    

    train = project.run_function(name='train',
                               function='train',
                               handler='train',
                               params={},
                               outputs=["model"])
    
    ctx = get_or_create_ctx("kfp")
    ctx.logger.info(train.outputs)
    
    
    # evaluating the model
    evaluate_function = project.get_function("evaluate")
    
    #running
    evaluate_run = project.run_function(
    handler="evaluate",
    params={
        "model_path": train.outputs['model_path'],
        "model_name": "xgboost-model",
        "test_set": evaluate_path,
        "label_column": "transaction_category",        
    },
    returns=["classification_report: dataset"])
    
    
                                          
    # deploying serving function
    serving_function = project.get_function("serving")    
    

    if serving_function.spec.graph is None:

        # Set the topology and get the graph object:
        graph = serving_function.set_topology("flow", engine="async")

        # Add the steps:
        graph.to("XGBModelServer",
                 name="xgboost-model",
                 model_path=train.outputs['model_path']) \
             .to(handler="postprocess", name="postprocess").respond()


    # Set the desired requirements:
 
    # Deploy the serving function:
    project.deploy_function("serving")


Writing src/workflow.py


In [7]:
workflow_name = "workflow"
project.set_workflow(name=workflow_name, workflow_path="src/workflow.py")

In [8]:
sagemaker_role = os.environ["SAGEMAKER-ROLE"]
region = sagemaker.Session().boto_region_name
sm_client = boto3.client("sagemaker")
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sm_client)
role = sagemaker_role
bucket_prefix = "payment-classification"
s3_bucket = sagemaker_session.default_bucket()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [9]:
s3_data = "s3://{}/{}/test/test.csv".format(s3_bucket, bucket_prefix)

### 1. Run the pipeline local <a class="anchor" id="Setup"></a>

In [11]:
project.run(workflow_name ,watch = True ,local = True, arguments={"evaluate_path": s3_data})



> 2024-01-24 08:46:57,206 [info] Storing function: {'name': 'train', 'uid': '4371cd9a7205467c83a198dd9e2d7cb1', 'db': None}
> 2024-01-24 08:46:58,577 [info] arn:aws:iam::934638699319:role/service-role/AmazonSageMaker-ExecutionRole-20231207T170664


INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2024-01-24-08-47-03-622



2024-01-24 08:47:04 Starting - Starting the training job...
2024-01-24 08:47:26 Starting - Preparing the instances for training............
2024-01-24 08:48:34 Downloading - Downloading input data.....
2024-01-24 08:49:04 Downloading - Downloading the training image..........
2024-01-24 08:50:00 Training - Training image download completed. Training in progress.................................
2024-01-24 08:52:46 Uploading - Uploading generated training model..
2024-01-24 08:53:02 Completed - Training job completed


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
sagemaker-payment,...9e2d7cb1,0,Jan 24 08:46:57,completed,train,workflow=5621091aa0034eb0bf5d58b8712f89d1v3io_user=adminkind=localowner=adminhost=default,,,,model_path





> 2024-01-24 08:53:02,772 [info] Run execution finished: {'status': 'completed', 'name': 'train'}
> 2024-01-24 08:53:02,772 [info] {'model_path': 'store://artifacts/sagemaker-payment/train_model_path@5621091aa0034eb0bf5d58b8712f89d1'}
> 2024-01-24 08:53:02,773 [error] Workflow run failed: {'exc_info': ['Traceback (most recent call last):\n', '  File "/home/sagemaker-user/.conda/envs/smdemo/lib/python3.9/site-packages/mlrun/projects/pipelines.py", line 746, in run\n    workflow_handler(**workflow_spec.args)\n', '  File "./src/workflow.py", line 40, in kfpipeline\n    evaluate_run = project.run_function(\n', "TypeError: run_function() missing 1 required positional argument: 'function'\n"]}


uid,start,state,name,parameters,results
...9e2d7cb1,Jan 24 08:46:57,completed,train,,


5621091aa0034eb0bf5d58b8712f89d1

### 1. Run the pipeline remotely <a class="anchor" id="Setup"></a>

In [12]:
project.run(workflow_name,         
            watch=True,arguments= {"evaluate_path": s3_data})

> 2024-01-24 08:53:02,965 [info] {'model': {{pipelineparam:op=train;name=model}}, 'run_id': {{pipelineparam:op=train;name=run_id}}}


KeyError: 'model_path'