In [18]:
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
import sagemaker
from sagemaker.workflow.parameters import ParameterInteger
from sagemaker.workflow.execution_variables import ExecutionVariables

# GLOBAL VARIABLES

In [None]:
# --- Pipeline / compute settings -------------------------------------------
pipeline_name = "pipeline-inference"
instance_type = "ml.m5.large"
role = sagemaker.get_execution_role()

# Pipeline parameter: load period as YYYYMM (e.g. 202501). It is declared
# without a default_value, so every pipeline.start() call should pass it.
cod_month = ParameterInteger(name="PeriodoCarga")

# --- MLflow tracking / model registry --------------------------------------
# NOTE(review): this ARN lives in account 654654589924, while the pipeline's
# upsert output shows account 635106763104 — confirm the step role can reach it.
tracking_server_arn = 'arn:aws:sagemaker:us-east-1:654654589924:mlflow-tracking-server/mlops-utec-mlflow-server'
experiment_name = "pipeline-inference-experiment"

# Registered model served by the ModelInference step ("latest" = newest version).
model_name = "credit-card-fraud-detection"
model_version = "latest"

# DATA PULL

In [6]:
%%writefile data_pull_requirements.txt
# Used by the DataPull and DataPush steps. Both log to a SageMaker MLflow
# tracking server by ARN, which needs mlflow plus the sagemaker-mlflow plugin
# (pinned to the same versions as model_training_requirements.txt).
awswrangler==3.12.0
mlflow==2.13.2
sagemaker-mlflow==0.1.0

Overwriting data_pull_requirements.txt


In [7]:
@step(
    name="DataPull",
    instance_type=instance_type,
    dependencies="./data_pull_requirements.txt"
)
def data_pull(experiment_name: str, run_name: str,
              cod_month: int) -> tuple[str, str, str]:
    """Extract one month of transactions with velocity features and stage them on S3.

    Runs remotely as the ``DataPull`` pipeline step. Opens a parent MLflow run
    named ``run_name`` plus a nested ``DataPull`` run, queries Athena, writes
    the result as CSV to S3 and logs it as an MLflow dataset input.

    Args:
        experiment_name: MLflow experiment to log under.
        run_name: Name for the parent MLflow run (the pipeline execution id).
        cod_month: Load period as ``YYYYMM`` (e.g. 202501).

    Returns:
        ``(inf_raw_s3_path, experiment_name, run_id)``: the S3 URI of the raw
        inference CSV, the experiment name (passed through) and the parent
        run id, so downstream steps can nest their runs under it.
    """
    import awswrangler as wr
    import mlflow

    # Only an integer may be spliced into the SQL text below; int() rejects
    # anything else before it reaches the query.
    cod_month = int(cod_month)

    # NOTE(review): `is_fraud is not null` keeps only labelled rows, which is
    # unusual for an inference feed — confirm this filter is intended.
    query = f"""
    WITH TRAIN as
    (
        SELECT  transaction_id
                ,customer_id
                ,amount
                ,merchant_category
                ,merchant_country
                ,cast(card_present as int) card_present
                ,cast(is_fraud as int) is_fraud
                ,cast(date_format(timestamp,'%Y%m') as int) as cod_month
                ,COUNT(1) OVER(PARTITION BY customer_id ORDER BY timestamp RANGE BETWEEN INTERVAL '1' MONTH PRECEDING AND CURRENT ROW) as trx_vel_last_1mths
                ,COUNT(1) OVER(PARTITION BY customer_id ORDER BY timestamp RANGE BETWEEN INTERVAL '2' MONTH PRECEDING AND CURRENT ROW) as trx_vel_last_2mths
                ,SUM(amount) OVER(PARTITION BY customer_id ORDER BY timestamp RANGE BETWEEN INTERVAL '1' MONTH PRECEDING AND CURRENT ROW) as amt_vel_last_1mths
                ,SUM(amount) OVER(PARTITION BY customer_id ORDER BY timestamp RANGE BETWEEN INTERVAL '2' MONTH PRECEDING AND CURRENT ROW) as amt_vel_last_2mths
        FROM    RISK_MANAGEMENT.CREDIT_CARD_TRANSACTIONS
        WHERE   is_fraud is not null
    )
    SELECT  *
    FROM    TRAIN
    WHERE   cod_month = {cod_month}
    """

    inf_raw_s3_path = f"s3://mlops-utec-rpa/fraud-detection/inf-raw-data/{cod_month}.csv"

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=run_name) as run:
        run_id = run.info.run_id
        with mlflow.start_run(run_name="DataPull", nested=True):
            df = wr.athena.read_sql_query(sql=query, database="risk_management")
            # Write through awswrangler's S3 client: pandas' own s3:// support
            # needs s3fs, which this step's requirements do not install.
            wr.s3.to_csv(df=df, path=inf_raw_s3_path, index=False)
            mlflow.log_input(
                mlflow.data.from_pandas(df, inf_raw_s3_path),
                context="DataPull"
            )
    return inf_raw_s3_path, experiment_name, run_id

# MODEL INFERENCE

In [8]:
%%writefile model_training_requirements.txt
# Used by the ModelInference step.
# NOTE(review): that step also calls mlflow.xgboost.load_model (needs the xgboost
# package) and reads/writes s3:// paths with pandas (needs s3fs). Confirm the step
# image provides both, or pin them here to the versions used at training time.
mlflow==2.13.2
sagemaker-mlflow==0.1.0

Overwriting model_training_requirements.txt


In [20]:
@step(
    name="ModelInference",
    instance_type=instance_type,
    dependencies="./model_training_requirements.txt"
)
def model_inference(inf_raw_s3_path: str, experiment_name: str,
                    run_id: str, cod_month: int) -> tuple[str, str, str]:
    """Score the raw inference data with the registered fraud model.

    Runs remotely as the ``ModelInference`` pipeline step. Loads
    ``models:/{model_name}/{model_version}`` from the MLflow registry, adds the
    model's column-1 ``predict_proba`` output as column ``prob``, writes the
    scored frame to S3 as CSV and logs it in a nested ``ModelInference`` run
    under the parent run ``run_id``.

    Args:
        inf_raw_s3_path: S3 URI of the CSV produced by ``DataPull``.
        experiment_name: MLflow experiment to log under.
        run_id: Parent MLflow run id created by ``DataPull``.
        cod_month: Load period as ``YYYYMM``; names the output file.

    Returns:
        ``(inf_proc_s3_path, experiment_name, run_id)``.
    """
    import mlflow
    import pandas as pd

    feature_cols = [
        'card_present',
        'trx_vel_last_1mths',
        'trx_vel_last_2mths',
        'amt_vel_last_1mths',
        'amt_vel_last_2mths',
    ]
    registry_uri = f"models:/{model_name}/{model_version}"
    scored_path = f"s3://mlops-utec-rpa/fraud-detection/inf-proc-data/{cod_month}.csv"

    # The models:/ URI is resolved against the tracking server, so set it first.
    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)

    # NOTE(review): pandas s3:// I/O relies on s3fs being present in the image.
    scored = pd.read_csv(inf_raw_s3_path)
    model_inputs = scored[feature_cols]
    classifier = mlflow.xgboost.load_model(registry_uri)
    # Presumably column 1 is the fraud class (is_fraud is cast to 0/1 upstream).
    scored["prob"] = classifier.predict_proba(model_inputs)[:, 1]

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelInference", nested=True):
            scored.to_csv(scored_path, index=False)
            mlflow.log_input(
                mlflow.data.from_pandas(scored, scored_path),
                context="ModelInference"
            )
    return scored_path, experiment_name, run_id

# DATA PUSH

In [25]:
@step(
    name="DataPush",
    instance_type=instance_type,
    dependencies="./data_pull_requirements.txt"
)
def data_push(inf_proc_s3_path: str, experiment_name: str, run_id: str, cod_month: int):
    """Band scored transactions by risk and publish them as an Athena table partition.

    Runs remotely as the ``DataPush`` pipeline step. Reads the scored CSV from
    ``ModelInference`` and adds ``fraud_profile``, ``model``, ``load_date`` and
    ``order``. It then writes Snappy Parquet under ``.../cod_month=<cod_month>/``
    and creates ``risk_management.fraud_detection`` if missing. It logs the
    output in a nested ``DataPush`` MLflow run and runs ``MSCK REPAIR TABLE`` so
    the new partition is registered.

    Args:
        inf_proc_s3_path: S3 URI of the scored CSV; must contain
            ``transaction_id``, ``prob`` and ``cod_month``.
        experiment_name: MLflow experiment to log under.
        run_id: Parent MLflow run id created by ``DataPull``.
        cod_month: Load period as ``YYYYMM``; used as the partition value.
    """
    import mlflow
    import numpy as np
    from datetime import datetime
    import pytz
    import awswrangler as wr

    ID_COL = "transaction_id"
    TIME_COL = "cod_month"
    PRED_COL = "prob"
    # Probability cut-offs for the risk bands:
    # >= HIGH -> 'High risk', >= MEDIUM -> 'Medium risk', otherwise 'Low risk'.
    HIGH_RISK_CUTOFF = 0.415
    MEDIUM_RISK_CUTOFF = 0.285

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)

    # Read through awswrangler: pandas' own s3:// support needs s3fs, which this
    # step's requirements do not install. The ID is forced to text so the Parquet
    # column matches the `string` type declared in the DDL even if IDs look numeric.
    df = wr.s3.read_csv(path=inf_proc_s3_path, dtype={ID_COL: str})
    df['fraud_profile'] = np.where(
        df[PRED_COL] >= HIGH_RISK_CUTOFF, 'High risk',
        np.where(df[PRED_COL] >= MEDIUM_RISK_CUTOFF, 'Medium risk', 'Low risk'),
    )
    df['model'] = model_name
    df['load_date'] = datetime.now(pytz.timezone("America/Lima")).strftime("%Y%m%d")
    # Rank 1 = highest probability; method='first' breaks ties by row order so ranks
    # are unique. int32 matches the `int` column in the DDL (an INT64 Parquet column
    # is not accepted by Athena for an `int` table column).
    df['order'] = df[PRED_COL].rank(method='first', ascending=False).astype('int32')

    inf_posproc_s3_path = "s3://mlops-utec-rpa/fraud-detection/inf-posproc-data"
    inf_posproc_s3_path_partition = f"{inf_posproc_s3_path}/{TIME_COL}={cod_month}/output.parquet"
    database = 'risk_management'
    table_name = f"{database}.fraud_detection"

    # Keep only the published columns and write this period's partition file.
    df = df[[ID_COL, PRED_COL, 'model', 'fraud_profile', 'load_date', 'order', TIME_COL]]
    wr.s3.to_parquet(df=df, path=inf_posproc_s3_path_partition, compression='snappy')

    # `order` is a reserved word in Athena DDL, so it must be back-quoted here
    # (and double-quoted in SELECT queries against this table).
    ddl = f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
    {ID_COL} string,
    {PRED_COL} double,
    model string,
    fraud_profile string,
    load_date string,
    `order` int
    )
    PARTITIONED BY ({TIME_COL} int)
    STORED AS parquet
    LOCATION '{inf_posproc_s3_path}'
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
    """
    query_exec_id = wr.athena.start_query_execution(sql=ddl, database=database)
    wr.athena.wait_query(query_execution_id=query_exec_id)

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="DataPush", nested=True):
            mlflow.log_input(
                mlflow.data.from_pandas(df, inf_posproc_s3_path_partition),
                context="DataPush"
            )

    # Register the partition directory written above so Athena can query it.
    dml = f"MSCK REPAIR TABLE {table_name}"
    query_exec_id = wr.athena.start_query_execution(sql=dml, database=database)
    wr.athena.wait_query(query_execution_id=query_exec_id)

# PIPELINE

In [26]:
# Chain the steps: each one returns (s3_path, experiment_name, run_id), and the
# next step consumes those three fields plus the shared PeriodoCarga parameter.
data_pull_step = data_pull(
    experiment_name=experiment_name,
    run_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
    cod_month=cod_month,
)

model_inference_step = model_inference(
    inf_raw_s3_path=data_pull_step[0],
    experiment_name=data_pull_step[1],
    run_id=data_pull_step[2],
    cod_month=cod_month,
)

data_push_step = data_push(
    inf_proc_s3_path=model_inference_step[0],
    experiment_name=model_inference_step[1],
    run_id=model_inference_step[2],
    cod_month=cod_month,
)

In [27]:
# Create or update the pipeline definition, running it under the notebook's role.
pipeline_steps = [data_pull_step, model_inference_step, data_push_step]
pipeline = Pipeline(
    name=pipeline_name,
    steps=pipeline_steps,
    parameters=[cod_month],
)
pipeline.upsert(role_arn=role)

2025-06-02 03:49:09,719 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-1-635106763104/pipeline-inference/DataPull/2025-06-02-03-49-09-458/function
2025-06-02 03:49:09,790 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-east-1-635106763104/pipeline-inference/DataPull/2025-06-02-03-49-09-458/arguments
2025-06-02 03:49:10,117 sagemaker.remote_function INFO     Copied dependencies file at './data_pull_requirements.txt' to '/tmp/tmph8a9fl_z/data_pull_requirements.txt'
2025-06-02 03:49:10,162 sagemaker.remote_function INFO     Successfully uploaded dependencies and pre execution scripts to 's3://sagemaker-us-east-1-635106763104/pipeline-inference/DataPull/2025-06-02-03-49-09-458/pre_exec_script_and_dependencies'
2025-06-02 03:49:10,165 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-1-635106763104/pipeline-inference/ModelInference/2025-06-02-03-49-0

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:635106763104:pipeline/pipeline-inference',
 'ResponseMetadata': {'RequestId': 'ba6c4b9e-e0b7-4db6-8d9e-bea5fab43155',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ba6c4b9e-e0b7-4db6-8d9e-bea5fab43155',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Mon, 02 Jun 2025 03:49:12 GMT'},
  'RetryAttempts': 0}}

In [28]:
# Launch a test execution for load period 202501 (YYYYMM -> January 2025).
pipeline.start(
    parameters={"PeriodoCarga": 202501},
    execution_display_name="test-inference-full2",
    execution_description="Testando inferece full2",
)

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:635106763104:pipeline/pipeline-inference/execution/8qem1zji5ut5', sagemaker_session=<sagemaker.session.Session object at 0x7fec8454a180>)

In [None]:
# Start an execution and block until it finishes.
# "PeriodoCarga" is declared without a default_value, so it must be passed here;
# a bare pipeline.start() would leave the load period unset.
# NOTE(review): the previous cell already starts an execution — running both
# cells launches two pipeline runs.
execution = pipeline.start(parameters={"PeriodoCarga": 202501})
display(execution.describe())
try:
    execution.wait()
finally:
    # Show per-step status even when wait() raises because a step failed.
    display(execution.list_steps())