# Train Pipeline (SageMaker Pipelines) - Arch One

## Overview

The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train the model.

![](images/pipelines.PNG)

Train Pipeline consists of the following steps:

1. Preprocess data to build features required and split data into train, validation, and test datasets.
2. Apply hyperparameter tuning based on the ranges provided with the SageMaker LightGBM framework to give the best model, which is determined based on AUC score.

In [None]:
%%sh
# Upgrade the SageMaker Python SDK and boto3 to their latest releases.
pip install --upgrade sagemaker
pip install --upgrade boto3

#### Import data

In [None]:
import pandas as pd

# Read the raw churn file and write it back out under a .csv name (no index column).
df = pd.read_csv("churn.txt")
df.to_csv("churn.csv", index=False)

# S3 bucket that receives the training data and backs the SageMaker session.
bucket = "fairstone"  # Bucket Name

#### Upload training data to S3

In [None]:
# Copy churn.csv to s3://<bucket>/ml_training/ with the AWS CLI; IPython substitutes {bucket} from the notebook variable.
!aws s3 cp churn.csv s3://{bucket}/ml_training/

### Step 1: Import statements

In [None]:
import boto3 
import pandas as pd 
import sagemaker 
from sagemaker.workflow.pipeline_context import PipelineSession 
import json
# Name under which the pipeline is created or updated.
pipeline_name = "LightGBM-ML-Pipeline-Test"

# boto3 S3 handle (a *resource* object, despite the variable name).
s3_client = boto3.resource("s3")

# Regular SageMaker session bound to the bucket chosen above; used here to look up the region.
sagemaker_session = sagemaker.session.Session(default_bucket=bucket)
region = sagemaker_session.boto_region_name

# IAM role used by the jobs and by the pipeline itself.
role = sagemaker.get_execution_role()

# Session handed to the processor and estimators that make up the pipeline steps.
# NOTE(review): unlike sagemaker_session, this session is not given default_bucket=bucket - confirm that is intended.
pipeline_session = PipelineSession()

# Step 1b: Modify Config File

#### Here we use a configuration file to set the defaults for our Pipeline parameters

Change the values in this config file (bucket name, MLflow tracking server, etc.) to ones that match your account resources.

In [None]:
# Print the parsed contents of the pipeline configuration file.
with open("train_config.json", "r") as config_file:
    print(f"Here is a preview of the configuration file:\n\n {json.load(config_file)}")

## Step 2: Define SageMaker Pipeline Parameters 

SageMaker Pipelines supports parameterization. This allows users to change the value of each parameter for every pipeline execution they start. You can add or remove parameters.

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)
from sagemaker.workflow.functions import Join

# Load the default value of every pipeline parameter from the configuration file
with open("train_config.json", "r") as f:
    training_config = json.load(f)


def _float_param(key):
    """Return a ParameterFloat named *key* whose default is the config value cast to float."""
    return ParameterFloat(name=key, default_value=float(training_config[key]))


def _int_param(key):
    """Return a ParameterInteger named *key* whose default is the config value cast to int."""
    return ParameterInteger(name=key, default_value=int(training_config[key]))


def _str_param(key, enum_values=None):
    """Return a ParameterString named *key* with the config value as default and optional allowed values."""
    return ParameterString(name=key, default_value=training_config[key], enum_values=enum_values)


# LightGBM tunable parameters: lower/upper bound of each hyperparameter range
learning_rate_min = _float_param("LearningRateMin")
learning_rate_max = _float_param("LearningRateMax")

num_boost_round_min = _int_param("NumberOfBoostRoundMin")
num_boost_round_max = _int_param("NumberOfBoostRoundMax")

num_leaves_min = _int_param("NumLeavesMin")
num_leaves_max = _int_param("NumLeavesMax")

feature_fraction_min = _float_param("FeatureFractionMin")
feature_fraction_max = _float_param("FeatureFractionMax")

bagging_fraction_min = _float_param("BaggingFractionMin")
bagging_fraction_max = _float_param("BaggingFractionMax")

bagging_freq_min = _int_param("BaggingFreqMin")
bagging_freq_max = _int_param("BaggingFreqMax")

max_depth_min = _int_param("MaxDepthMin")
max_depth_max = _int_param("MaxDepthMax")

min_data_in_leaf_min = _int_param("MinDataInLeafMin")
min_data_in_leaf_max = _int_param("MinDataInLeafMax")

# Metric names / definitions used by the algorithm and the tuner
tuner_objective_metric = _str_param("TunerObjectiveMetric")
tuner_metric_definition = _str_param("TunerMetricDefinition")
algo_metric = _str_param("AlgorithmMetric")

# Tuning job shape and task selection
max_tuning_jobs = _int_param("MaxTuningJobs")
max_tuning_parallel_job = _int_param("TuningParallelJobs")
tuning_strategy = _str_param("TuningStrategy", enum_values=["Bayesian", "Random", "Grid", "Hyperband"])
optimization_direction = _str_param("OptimizationDirection", enum_values=["Maximize", "Minimize"])
supervised_training_task = _str_param("TrainingTask", enum_values=["classification", "regression"])

# Infra Parameters
processing_instance_count = _int_param("ProcessingInstanceCount")
processing_instance_type = _str_param("ProcessingInstanceType")
training_instance_type = _str_param("TrainingInstanceType")
training_instance_count = _int_param("TrainingInstanceCount")
training_volume_size = _int_param("TrainingVolumeSize")
processing_volume_size = _int_param("ProcessingVolumeSize")

# Artifacts location Parameters
model_approval_status = _str_param("ModelApprovalStatus")
model_output_bucket = _str_param("ModelOutput")
train_output_bucket = _str_param("TrainOutput")
validation_output_bucket = _str_param("ValidationOutput")
test_output_bucket = _str_param("TestOutput")
s3_input_data_location = _str_param("S3InputDataURI")

# MLflow
ml_flow_arn = _str_param("MLflow")

# Evaluation threshold and the split ratio forwarded to the preprocessing script
model_evaluation_threshold = _float_param("EvalThreshold")
data_split_ratio = _str_param("DataSplitRatio")


### Step 3: Define Processing Step

#### Create the logic for processing the data

In [None]:
%%writefile "pipeline_scripts/churn_preprocess.py"

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
import glob
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import io
import sys
import time
import json
from time import strftime, gmtime
from sklearn import preprocessing

def load_and_combine_csv_files(directory):
    """
    Load all CSV files from a directory and combine them into a single DataFrame.

    Files are read in sorted path order so that the row order of the result
    (and therefore any seeded split made from it) is the same on every run.
    A file that cannot be read is reported on stdout and skipped.

    Args:
    directory (str): Path to the directory containing CSV files.

    Returns:
    pandas.DataFrame: Combined DataFrame of all CSV files, re-indexed from 0.

    Raises:
    ValueError: If no CSV file in the directory could be loaded.
    """
    # glob returns paths in arbitrary order; sort them to keep the output deterministic
    csv_files = sorted(glob.glob(os.path.join(directory, "*.csv")))

    # List to hold individual DataFrames
    df_list = []

    for file in csv_files:
        try:
            # Read each file into a DataFrame
            df = pd.read_csv(file)
        except Exception as e:
            # Best effort: one unreadable file should not discard the others
            print(f"Error loading {file}: {str(e)}")
        else:
            df_list.append(df)
            print(f"Loaded {file}: {len(df)} rows")

    # pd.concat fails with an unhelpful message on an empty list; explain what went wrong instead
    if not df_list:
        raise ValueError(f"No CSV files could be loaded from {directory}")

    # Combine all DataFrames in the list
    combined_df = pd.concat(df_list, ignore_index=True)

    print(f"\nTotal files processed: {len(csv_files)}")
    print(f"Total rows in combined DataFrame: {len(combined_df)}")

    return combined_df



def detect_and_encode_categorical(df, max_categories=10, include_dates=True):
    """
    Detect categorical columns (including object, int, and datetime), label-encode them,
    and create a mapping of their indexes. Excludes the first column (assumed to be the target).

    A column is treated as categorical when it has object dtype, has any integer dtype,
    has at most ``max_categories`` unique values and is not float64, or is a datetime
    column while ``include_dates`` is True. Each such column is converted to strings and
    replaced by integer codes. The input DataFrame is modified in place.

    Args:
    df (pandas.DataFrame): Input DataFrame; its first column is the target
    max_categories (int): Maximum number of unique values to consider a column categorical
    include_dates (bool): Whether to treat date columns as categorical; when False,
        datetime columns are left untouched and are not reported

    Returns:
    tuple: (preprocessed DataFrame, dict mapping each categorical column name to its
        position in the DataFrame, where the target column is position 0)
    """
    categorical_columns = []
    categorical_indexes = {}

    # Get the name of the first column (assumed to be the target)
    target_column = df.columns[0]

    for idx, (col, dtype) in enumerate(df.dtypes.items()):
        # Skip the first column (target)
        if col == target_column:
            continue

        is_datetime = pd.api.types.is_datetime64_any_dtype(dtype)

        # Decide about excluded datetime columns BEFORE registering anything, so a
        # column is never reported as categorical without also being encoded
        if is_datetime and not include_dates:
            continue

        is_categorical = (
            dtype == 'object'
            or (df[col].nunique() <= max_categories and dtype != 'float64')
            or pd.api.types.is_integer_dtype(dtype)
            or is_datetime
        )
        if not is_categorical:
            continue

        categorical_columns.append(col)
        # idx counts from the start of the frame, i.e. the target column is index 0
        categorical_indexes[col] = idx

        # Handle datetime columns: collapse to a day-level string before encoding
        if is_datetime:
            df[col] = df[col].dt.strftime('%Y-%m-%d')

        # Encode categorical variables (values are compared as strings)
        le = LabelEncoder()
        df[col] = le.fit_transform(df[col].astype(str))

    print(f"Detected {len(categorical_columns)} categorical columns: {categorical_columns}")
    return df, categorical_indexes




if __name__ == "__main__":
    # Entry point of the processing job: read the raw CSVs, build the label and
    # features, and write train/validation/test CSVs plus the categorical index list.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--split-ratio', type=str, default="0.3", dest='split_ratio')
    args = parser.parse_args()
    # The ratio arrives as a string; convert it once and reuse it for both splits
    split_ratio = float(args.split_ratio)

    base_dir_input = "/opt/ml/processing/input"
    # No trailing slash, so the joined output paths below contain single separators
    base_dir = "/opt/ml/processing"

    # Make sure the output folders exist before anything is written into them
    for split_name in ("train", "validation", "test"):
        os.makedirs(f"{base_dir}/{split_name}", exist_ok=True)

    # Read Data
    df = load_and_combine_csv_files(base_dir_input)

    # Sample Analysis
    df = df.drop("Phone", axis=1)
    # Store Area Code as object dtype so it is picked up as a categorical column
    df["Area Code"] = df["Area Code"].astype(object)

    # Build the binary label; any value other than "True."/"False." maps to NaN
    df["target"] = df["Churn?"].map({"True.": 1, "False.": 0})
    unmapped = df.loc[df["target"].isna(), "Churn?"]
    if not unmapped.empty:
        # Fail here with the offending values rather than later inside the stratified split
        raise ValueError(
            f"Unexpected values in 'Churn?' column: {sorted(unmapped.astype(str).unique())}"
        )
    df = df.drop(columns=["Churn?"])

    # "target" was appended last; move it to the front so it is the first column
    df = df[["target"] + df.columns.tolist()[:-1]]
    df, cat_columns = detect_and_encode_categorical(df, max_categories=10, include_dates=True)
    cat_idx = list(cat_columns.values())

    # Save categorical information next to the training data
    with open(f"{base_dir}/train/cat_idx.json", "w") as outfile:
        json.dump({"cat_idx": cat_idx}, outfile)

    # train, test, validation: the same ratio is applied twice, first to carve the
    # hold-out set from the full data, then to carve the test set out of the hold-out
    train, val_n_test = train_test_split(
        df, test_size=split_ratio, random_state=42, stratify=df["target"]
    )
    validation, test = train_test_split(
        val_n_test, test_size=split_ratio, random_state=42, stratify=val_n_test["target"]
    )

    # Save datasets without header or index
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

In [None]:
# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Version tag of the scikit-learn processing container.
framework_version = "1.0-1"

# Processor for the feature-engineering script; instance type, count and volume size are pipeline parameters.
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    volume_size_in_gb=processing_volume_size,
    base_job_name="sklearn-pre-process",
    sagemaker_session=pipeline_session,
)

# Describe the processing job: raw data in, one output per dataset split out.
processor_args = sklearn_processor.run(
    code="pipeline_scripts/churn_preprocess.py",
    arguments=["--split-ratio", data_split_ratio],
    inputs=[
        ProcessingInput(
            source=s3_input_data_location,
            destination="/opt/ml/processing/input",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=train_output_bucket,
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=validation_output_bucket,
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=test_output_bucket,
        ),
    ],
)

# Wrap the job description as the pre-processing step of the pipeline.
step_process = ProcessingStep(name="LightGBMDataPreProcess", step_args=processor_args)

### Step 4: Define HyperParameter Tuning  Step for Classification

Here we define the training and tuning estimator for LightGBM using SageMaker Pipeline parameters. This lets us modify the structure of the tuning job, such as `instance_count`, `instance_type`, and `volume_size`, as well as the hyperparameters for the LightGBM algorithm.

In [None]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)
from sagemaker.workflow.steps import TuningStep
from sagemaker.estimator import Estimator
import random
from sagemaker import image_uris, model_uris, script_uris
from sagemaker.workflow.steps import TrainingStep
train_model_id, train_model_version, train_scope = "lightgbm-classification-model", "*", "training"
from sagemaker import hyperparameters

# Retrieve the docker image.
# image_uris.retrieve returns a concrete URI while this cell runs, so it is given the
# configured default instance type (a plain string) rather than the TrainingInstanceType
# pipeline parameter, whose value only exists at pipeline execution time.
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_config["TrainingInstanceType"],
)

# Retrieve the pre-trained model tarball to further fine-tune
train_model_uri = model_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, model_scope=train_scope
)
# Retrieve the default hyper-parameters for fine-tuning the model.
# Note: from here on the name `hyperparameters` refers to this dict, not the imported module.
hyperparameters = hyperparameters.retrieve_default(
    model_id=train_model_id, model_version=train_model_version
)

# [Optional] Override default hyperparameters with custom values
hyperparameters["num_boost_round"] = "200"
hyperparameters["metric"] = algo_metric  # pipeline parameter

# Recommended for distributed training
hyperparameters["tree_learner"] = "voting"
# The model version is "*", so the default set may differ between releases;
# pop() removes the key when present without failing when it is absent.
hyperparameters.pop("early_stopping_rounds", None)

print(hyperparameters)

# Create SageMaker Estimator instance
tabular_estimator = Estimator(
    role=role,
    image_uri=train_image_uri,
    source_dir="model_cat",
    model_uri=train_model_uri,
    entry_point="train.py",
    instance_count=training_instance_count,  # pipeline parameter
    volume_size=training_volume_size,  # pipeline parameter
    instance_type=training_instance_type,  # pipeline parameter
    max_run=360000,
    hyperparameters=hyperparameters,
    output_path=model_output_bucket,
    sagemaker_session=pipeline_session,  # Marks the estimator as part of a SageMaker Pipeline so it is not executed individually
    environment={"MLFLOW_TRACKING_ARN": ml_flow_arn},  # pipeline parameter
    keep_alive_period_in_seconds=1000,  # Keep instances warm between trials to avoid cold starts (warm instances are billed)
)

Here we also parameterize the hyperparameter tuning ranges for the job so that we can modify them for each SageMaker Pipeline execution.

In [None]:
from sagemaker.tuner import ContinuousParameter, IntegerParameter, HyperparameterTuner

# Search space for the tuner; every bound is a pipeline parameter
hyperparameter_ranges_lgb = dict(
    learning_rate=ContinuousParameter(learning_rate_min, learning_rate_max, scaling_type="Auto"),
    num_boost_round=IntegerParameter(num_boost_round_min, num_boost_round_max),
    num_leaves=IntegerParameter(num_leaves_min, num_leaves_max),
    feature_fraction=ContinuousParameter(feature_fraction_min, feature_fraction_max),
    bagging_fraction=ContinuousParameter(bagging_fraction_min, bagging_fraction_max),
    bagging_freq=IntegerParameter(bagging_freq_min, bagging_freq_max),
    max_depth=IntegerParameter(max_depth_min, max_depth_max),
    min_data_in_leaf=IntegerParameter(min_data_in_leaf_min, min_data_in_leaf_max),
)

# Tuner around the classification estimator. The metric regex is assembled from the
# objective metric name: "<metric>: <number>".
tuner = HyperparameterTuner(
    estimator=tabular_estimator,
    objective_metric_name=tuner_objective_metric,  # pipeline parameter
    objective_type=optimization_direction,  # pipeline parameter
    strategy=tuning_strategy,  # pipeline parameter
    hyperparameter_ranges=hyperparameter_ranges_lgb,  # built from pipeline parameters
    metric_definitions=[
        {
            "Name": tuner_objective_metric,
            "Regex": Join(on=':', values=[tuner_objective_metric, " ([0-9\\.]+)"]),
        }
    ],
    max_jobs=max_tuning_jobs,  # pipeline parameter
    max_parallel_jobs=max_tuning_parallel_job,  # pipeline parameter
)

# Feeding the processing step's outputs into the tuner creates an implicit
# dependency between the processing step and the tuning step
hpo_args = tuner.fit(
    inputs={
        "train": TrainingInput(
            content_type="text/csv",
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        ),
        "validation": TrainingInput(
            content_type="text/csv",
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
        ),
    }
)

step_tuning = TuningStep(name="LightGBMClassifierHyperParameterTuning", step_args=hpo_args)

### Step 5: Define Tuning Step for Regression

In [None]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)
from sagemaker.workflow.steps import TuningStep
from sagemaker.estimator import Estimator
import random
from sagemaker import image_uris, model_uris, script_uris
from sagemaker.workflow.steps import TrainingStep
train_model_id, train_model_version, train_scope = "lightgbm-regression-model", "*", "training"
from sagemaker import hyperparameters

# Retrieve the docker image.
# image_uris.retrieve returns a concrete URI while this cell runs, so it is given the
# configured default instance type (a plain string) rather than the TrainingInstanceType
# pipeline parameter, whose value only exists at pipeline execution time.
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_config["TrainingInstanceType"],
)

# Retrieve the pre-trained model tarball to further fine-tune
train_model_uri = model_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, model_scope=train_scope
)
# Retrieve the default hyper-parameters for fine-tuning the model.
# Note: from here on the name `hyperparameters` refers to this dict, not the imported module.
hyperparameters = hyperparameters.retrieve_default(
    model_id=train_model_id, model_version=train_model_version
)

# [Optional] Override default hyperparameters with custom values
hyperparameters["num_boost_round"] = "200"
hyperparameters["metric"] = algo_metric  # pipeline parameter

# Recommended for distributed training
hyperparameters["tree_learner"] = "voting"
# The model version is "*", so the default set may differ between releases;
# pop() removes the key when present without failing when it is absent.
hyperparameters.pop("early_stopping_rounds", None)
print(hyperparameters)

# Create SageMaker Estimator instance
reg_estimator = Estimator(
    role=role,
    image_uri=train_image_uri,
    source_dir="model_reg",
    model_uri=train_model_uri,
    entry_point="train.py",
    instance_count=training_instance_count,  # pipeline parameter
    volume_size=training_volume_size,  # pipeline parameter
    instance_type=training_instance_type,  # pipeline parameter
    max_run=360000,
    hyperparameters=hyperparameters,
    output_path=model_output_bucket,
    sagemaker_session=pipeline_session,  # Marks the estimator as part of a SageMaker Pipeline so it is not executed individually
    environment={"MLFLOW_TRACKING_ARN": ml_flow_arn},  # pipeline parameter
    keep_alive_period_in_seconds=1000,  # Keep instances warm between trials to avoid cold starts (warm instances are billed)
)

In [None]:
from sagemaker.tuner import ContinuousParameter, IntegerParameter, HyperparameterTuner


# Search space for the regression tuner; every bound is a pipeline parameter
hyperparameter_ranges_lgb = dict(
    learning_rate=ContinuousParameter(learning_rate_min, learning_rate_max, scaling_type="Auto"),
    num_boost_round=IntegerParameter(num_boost_round_min, num_boost_round_max),
    num_leaves=IntegerParameter(num_leaves_min, num_leaves_max),
    feature_fraction=ContinuousParameter(feature_fraction_min, feature_fraction_max),
    bagging_fraction=ContinuousParameter(bagging_fraction_min, bagging_fraction_max),
    bagging_freq=IntegerParameter(bagging_freq_min, bagging_freq_max),
    max_depth=IntegerParameter(max_depth_min, max_depth_max),
    min_data_in_leaf=IntegerParameter(min_data_in_leaf_min, min_data_in_leaf_max),
)

# Tuner around the regression estimator. The metric regex is assembled from the
# objective metric name: "<metric>: <number>".
tuner_reg = HyperparameterTuner(
    estimator=reg_estimator,
    objective_metric_name=tuner_objective_metric,
    objective_type=optimization_direction,
    strategy=tuning_strategy,
    hyperparameter_ranges=hyperparameter_ranges_lgb,
    metric_definitions=[
        {
            "Name": tuner_objective_metric,
            "Regex": Join(on=':', values=[tuner_objective_metric, " ([0-9\\.]+)"]),
        }
    ],
    max_jobs=max_tuning_jobs,
    max_parallel_jobs=max_tuning_parallel_job,
)

# Feeding the processing step's outputs into the tuner creates an implicit
# dependency between the processing step and the tuning step
hpo_args_reg = tuner_reg.fit(
    inputs={
        "train": TrainingInput(
            content_type="text/csv",
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        ),
        "validation": TrainingInput(
            content_type="text/csv",
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
        ),
    }
)

step_tuning_reg = TuningStep(name="LightGBMHyperParameterTuningRegression", step_args=hpo_args_reg)

### Step 6: Define a condition step to route to the appropriate Hyperparameter Tuning Step (Regression vs Classification)

In [None]:
from sagemaker.workflow.conditions import ConditionGreaterThan,ConditionEquals
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Define the condition. It is evaluated against the training task passed as a pipeline parameter
cond_task = ConditionEquals(
    left=supervised_training_task,
    right="classification",
)

# Condition step: switches the branch based on the training task type.
# Runs the classifier tuner for a classification model, otherwise the regression tuner.
step_cond = ConditionStep(
    depends_on=[step_process],  # Runs after the processing step
    name="TrainingTaskTypes",
    conditions=[cond_task],
    if_steps=[step_tuning],  # Condition is true: classification
    else_steps=[step_tuning_reg],  # Condition is false: regression
)


## Build and Trigger the pipeline run

After defining all of the component steps, you can assemble them into a Pipeline object. You don’t need to specify the order of the steps because Pipelines automatically infers the execution order from the dependencies between the steps.

In [None]:
import json
from sagemaker.workflow.pipeline import Pipeline

# Assemble the pipeline from its parameters and steps
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # LightGBM tunable parameters
        learning_rate_min,
        learning_rate_max,
        num_leaves_min,
        num_leaves_max,
        feature_fraction_min,
        feature_fraction_max,
        bagging_fraction_min,
        bagging_fraction_max,
        bagging_freq_min,
        bagging_freq_max,
        max_depth_min,
        max_depth_max,
        min_data_in_leaf_min,
        min_data_in_leaf_max,
        num_boost_round_max,
        num_boost_round_min,

        # Other parameters
        processing_volume_size,
        training_volume_size,
        tuner_metric_definition,
        tuner_objective_metric,
        algo_metric,
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        training_instance_count,
        model_approval_status,
        model_output_bucket,
        train_output_bucket,
        validation_output_bucket,
        test_output_bucket,
        max_tuning_jobs,
        max_tuning_parallel_job,
        tuning_strategy,
        optimization_direction,
        ml_flow_arn,
        supervised_training_task,
        model_evaluation_threshold,
        s3_input_data_location,
        data_split_ratio,
    ],
    # The processing step is listed explicitly: it is only *referenced* by the condition
    # step (depends_on) and by the tuning steps (input properties), and a referenced step
    # still has to be part of the pipeline. The two tuning steps are nested inside the
    # condition step, so they must not be listed again here.
    steps=[step_process, step_cond],
)

# Parse the generated pipeline definition (a JSON string) and show it
definition = json.loads(pipeline.definition())
print(definition)

In [None]:
# Create a new or update existing Pipeline
# Create the pipeline, or update it if it already exists, using the notebook's execution role
pipeline.upsert(role_arn=role)
# Start a pipeline execution; no parameters are passed, so the defaults apply
pipeline.start()

## Create a new pipeline execution with different parameters

In [None]:
# Switch the objective metric to "binary_error" and start a new pipeline execution.
# Overrides are keyed by the pipeline parameter names, not by the Python variable names.
pipeline.start(
    parameters={
        "AlgorithmMetric": "binary_error",
        "TunerObjectiveMetric": "binary_error",
        "TunerMetricDefinition": "binary_error: ([0-9\\.]+)",
        "OptimizationDirection": "Minimize",
    }
)