## **Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines**

In [2]:
!pip install virtualenv
venv_name="myEnvXimena"
!python3 -m venv "$venv_name"
!source "$venv_name/bin/activate"
!pip install -r requirements.txt
!pip install ipykernel
!python -m ipykernel install --user --name=myEnvXimena
!pip install -U sagemaker

[0mInstalled kernelspec myEnvXimena in /root/.local/share/jupyter/kernels/myenvximena
[0m

In [3]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"AbaloneModelPackageGroupName2"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [4]:
!mkdir -p data

In [5]:
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-example-files-prod-{region}").download_file(
    "datasets/tabular/uci_abalone/abalone.csv", local_path
)

base_uri = f"s3://{default_bucket}/abalone2"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
s3://sagemaker-us-east-1-714430189195/abalone2/abalone-dataset.csv


In [6]:
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

#DUDA: Tenemos que descargar siempre los archivos a la instancia donde está corriendo nuestro jupyter notebook?

base_uri = f"s3://{default_bucket}/abalone2"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
s3://sagemaker-us-east-1-714430189195/abalone2/abalone-dataset-batch


### **Define Parameters to Parametrize Pipeline Execution**

In [7]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

#Se cambió tipo de instancia ml.t3.medium por ml.m4.xlarge
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m4.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

### **Define a Processing Step for Feature Engineering**
This notebook cell writes a file preprocessing_abalone.py, which contains the preprocessing script. 

In [8]:
!mkdir -p code

In [9]:
%%writefile code/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file, we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    
    #Se definen pasos para los feature numericos. Imputación y estandarización.
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

     #Se definen pasos para los features categóricos.
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )
    
    #Se define un único proceso compuesto por los N preprocesos que se aplicarán al dataset. 
    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Writing code/preprocessing.py


In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.t3.medium",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="AbaloneProcess2", step_args=processor_args)



#### **Define a Training Step to Train a Model**
Finally, we use the output of the estimator’s .fit() method as arguments to the TrainingStep. By passing the pipeline_session to the sagemaker_session, calling .fit() does not launch the training job, it returns the arguments needed to run the job as a step in the pipeline.

In [12]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput


#Se cambió tipo de instancia ml.t3.medium por ml.m4.xlarge
model_path = f"s3://{default_bucket}/AbaloneTrain2"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m4.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [13]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain2",
    step_args=train_args,
)

#### **Define a Model Evaluation Step to Evaluate the Trained Model**
This script loads an XGBoost model, evaluates its performance on a test dataset, computes the mean squared error, and writes the evaluation results to a JSON file within a specified directory.

In [14]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Writing code/evaluation.py


Next, create an instance of a ScriptProcessor processor and use it in the ProcessingStep.

In [15]:
from sagemaker.processing import ScriptProcessor


#ScriptProcessor allows to run a script on docker image.
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

In [16]:
from sagemaker.workflow.properties import PropertyFile


# The purpose of PropertyFile is to capture the output generated by a processing step in a SageMaker Pipeline. 
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval2",
    step_args=eval_args,
    property_files=[evaluation_report],
)

#### **Define a Create Model Step to Create a Model**

In [17]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

In [18]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="AbaloneCreateModel2",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

#### **Define a Transform Step to Perform Batch Transformation**

In [19]:
from sagemaker.transformer import Transformer

#Se cambió el tipo de instancia a una opción que fuera admitida. antes se tenía ml.m4.xlarge y se cambió por ml.m5.xlarge
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform2",
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [20]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="AbaloneTransform2", transformer=transformer, inputs=TransformInput(data=batch_data)
)

#### **Define a Register Model Step to Create a Model Package**

In [21]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

#Las instancias para inferencia están limitadas a ciertas opciones

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="AbaloneRegisterModel2", step_args=register_args)



#### **Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed**

In [22]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="AbaloneMSEFail2",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

#### **Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State**

In [23]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="AbaloneMSECond2",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

#### **Define a Pipeline of Parameters, Steps, and Conditions**

In [24]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline3"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


#### **(Optional) Examining the pipeline definition**

In [25]:
import json


definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m4.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-714430189195/abalone2/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-714430189195/abalone2/abalone-dataset-batch'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 6.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess2',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.t3.medium',
      'InstanceCount': {'Get': 'P

#### **Submit the pipeline to SageMaker and start execution**

In [26]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:714430189195:pipeline/AbalonePipeline3',
 'ResponseMetadata': {'RequestId': 'd535674b-8c7a-4f86-8e84-a2e8b3a9972a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd535674b-8c7a-4f86-8e84-a2e8b3a9972a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Thu, 21 Sep 2023 20:53:34 GMT'},
  'RetryAttempts': 0}}

In [27]:
#Start the pipeline and accept all the default parameters.
execution = pipeline.start()

#### **Pipeline Operations: Examining and Waiting for Pipeline Execution**

In [28]:
#Describe the pipeline execution.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:714430189195:pipeline/AbalonePipeline3',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:714430189195:pipeline/AbalonePipeline3/execution/rb1ldcmhfe58',
 'PipelineExecutionDisplayName': 'execution-1695329616979',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline3',
  'TrialName': 'rb1ldcmhfe58'},
 'CreationTime': datetime.datetime(2023, 9, 21, 20, 53, 36, 849000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 9, 21, 20, 53, 36, 849000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:714430189195:user-profile/d-pbox5lou3d3h/ximena',
  'UserProfileName': 'ximena',
  'DomainId': 'd-pbox5lou3d3h'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:714430189195:user-profile/d-pbox5lou3d3h/ximena',
  'UserProfileName': 'ximena',
  'DomainId': 'd-pbox5lou3d3h'},
 'ResponseMetadata': {'RequestId': '11456052-efd2-483b-9209-

In [29]:
#Wait for the execution to complete.
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Max attempts exceeded

In [30]:
#List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.
execution.list_steps()

[{'StepName': 'AbaloneEval2',
  'StartTime': datetime.datetime(2023, 9, 21, 21, 10, 28, 959000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:714430189195:processing-job/pipelines-rb1ldcmhfe58-AbaloneEval2-c9vbgOGFYI'}}},
 {'StepName': 'AbaloneTrain2',
  'StartTime': datetime.datetime(2023, 9, 21, 21, 7, 2, 796000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 9, 21, 21, 10, 27, 959000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:714430189195:training-job/pipelines-rb1ldcmhfe58-AbaloneTrain2-bQpQBArXiL'}}},
 {'StepName': 'AbaloneProcess2',
  'StartTime': datetime.datetime(2023, 9, 21, 20, 53, 37, 948000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 9, 21, 21, 7, 1, 881000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn

In [None]:
#execution.stop()

#### **Examining the Evaluation**

In [31]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


NoSuchKey: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist.

#### **Lineage**
Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
{'StepName': 'AbaloneProcess2', 'StartTime': datetime.datetime(2023, 9, 21, 20, 53, 37, 948000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 9, 21, 21, 7, 1, 881000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:714430189195:processing-job/pipelines-rb1ldcmhfe58-AbaloneProcess2-MC36r2wCvi'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...a30a3835288eef3af1fc96e/preprocessing.py,Input,DataSet,ContributedTo,artifact
1,s3://...14430189195/abalone2/abalone-dataset.csv,Input,DataSet,ContributedTo,artifact
2,68331...com/sagemaker-scikit-learn:1.2-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...rb1ldcmhfe58/AbaloneProcess2/output/test,Output,DataSet,Produced,artifact
4,s3://...mhfe58/AbaloneProcess2/output/validation,Output,DataSet,Produced,artifact
5,s3://...b1ldcmhfe58/AbaloneProcess2/output/train,Output,DataSet,Produced,artifact


{'StepName': 'AbaloneTrain2', 'StartTime': datetime.datetime(2023, 9, 21, 21, 7, 2, 796000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 9, 21, 21, 10, 27, 959000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:714430189195:training-job/pipelines-rb1ldcmhfe58-AbaloneTrain2-bQpQBArXiL'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...mhfe58/AbaloneProcess2/output/validation,Input,DataSet,ContributedTo,artifact
1,s3://...b1ldcmhfe58/AbaloneProcess2/output/train,Input,DataSet,ContributedTo,artifact
2,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...oneTrain2-bQpQBArXiL/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'AbaloneEval2', 'StartTime': datetime.datetime(2023, 9, 21, 21, 10, 28, 959000, tzinfo=tzlocal()), 'StepStatus': 'Executing', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:714430189195:processing-job/pipelines-rb1ldcmhfe58-AbaloneEval2-c9vbgOGFYI'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...1efdc9e2d552feb7e97dadd942/evaluation.py,Input,DataSet,ContributedTo,artifact
1,s3://...rb1ldcmhfe58/AbaloneProcess2/output/test,Input,DataSet,ContributedTo,artifact
2,s3://...oneTrain2-bQpQBArXiL/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...023-09-21-20-52-59-166/output/evaluation,Output,DataSet,Produced,artifact
