In [48]:
import json
import logging
import os
import pickle
import sys
from typing import List, NamedTuple

import joblib
from google.cloud import aiplatform
from kfp.components import InputPath, OutputPath
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import Dataset, Input, Model, Output
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from ucimlrepo import fetch_ucirepo

In [49]:
# Send INFO-and-above log records to stdout so they appear in the cell output.
logging.basicConfig(stream=sys.stdout, level="INFO")

In [50]:

# basicConfig is a no-op once the root logger has handlers; it is repeated here
# so this cell still logs to stdout when exported and run as a standalone script.
logging.basicConfig(stream=sys.stdout, level="INFO")

# Destination of the pickled, fitted GridSearchCV written by run_training_pipeline.
model_path = "./model.pkl"
def save_model_pickle(model, filename):
    """
    Serialize ``model`` with pickle and write the bytes to ``filename``.

    Args: model - Any picklable object (e.g. a fitted estimator)
          filename - Destination path; an existing file is overwritten
    """
    with open(filename, mode="wb") as target_file:
        target_file.write(pickle.dumps(model))


def get_iris_data():
    """
    Get iris dataset from UCI reposiory
    Returns: 
        X  - Dataframe containing 4 features regrding iris characteristics
        y - The target array for the iris classification 
    """
    data_iris = fetch_ucirepo(id=53) 
    X = data_iris.data.features 
    X.rename(columns = {
        'sepal length' : 'sepal_length',
        'sepal width' : 'sepal_width',
        'petal length' : 'petal_length',
        'petal width': 'petal_width'
    }, inplace=True)
    print(type(X))
    y = data_iris.data.targets['class']
    return X, y


def get_training_pipeline():
    """
    Build the unfitted training pipeline: mean imputation, standard scaling,
    then logistic regression (step name ``estimator``, targeted by the grid search).
    """
    steps = [
        ('Imputer', SimpleImputer(strategy='mean', keep_empty_features=True)),
        ('normalization', StandardScaler()),
        ('estimator', LogisticRegression()),
    ]
    return Pipeline(steps=steps)

def fit_model(X_train, y_train):
    """
    Grid-search the pipeline from get_training_pipeline with 5-fold CV.

    Candidates are scored by one-vs-rest ROC AUC, and the best one is refit on
    the full training data.

    Args: X_train - Training features
          y_train - Training target column
    Returns: The fitted GridSearchCV object
    """
    search_space = {
        'estimator__solver': ['newton-cg'],
        'estimator__tol': [0.0001, 0.003, 0.01],
        'estimator__penalty': [None, 'l2'],
    }
    search = GridSearchCV(
        estimator=get_training_pipeline(),
        param_grid=search_space,
        scoring={"AUC": "roc_auc_ovr"},
        refit="AUC",
        cv=5,
        verbose=1,
        error_score='raise',
    )
    # GridSearchCV.fit returns the search object itself.
    return search.fit(X_train, y_train)


def run_training_pipeline():
    """
    Train and persist an iris classifier end to end.

    The steps are:
    - fetch the data;
    - make a stratified 80/20 train/test split;
    - tune the pipeline with grid search;
    - log AUC and accuracy on the held-out split;
    - pickle the fitted search object to ``model_path``.
    """
    logging.info("Getting dataset")
    features, labels = get_iris_data()

    logging.info("Spliting dataset")
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, stratify=labels, test_size=0.2, random_state=14
    )

    logging.info("Fittig model with train data")
    search = fit_model(X_train, y_train)
    predictions = search.predict(X_test)

    logging.info("Computing scores")
    # With refit="AUC", GridSearchCV.score evaluates the AUC scorer.
    auc = search.score(X_test, y_test)
    logging.info(f"Model AUC Score: {auc}")

    accuracy = accuracy_score(y_test, predictions)
    logging.info(f"Accuracy test score: {accuracy}")

    logging.info("Saving model")
    save_model_pickle(search, model_path)


# Inside a notebook kernel __name__ is "__main__", so executing this cell trains the model.
if __name__ == '__main__':
    run_training_pipeline()

INFO:root:Getting dataset


<class 'pandas.core.frame.DataFrame'>
INFO:root:Spliting dataset
INFO:root:Fittig model with train data


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X.rename(columns = {


Fitting 5 folds for each of 6 candidates, totalling 30 fits
INFO:root:Computing scores
INFO:root:Model AUC Score: 1.0
INFO:root:Accuracy test score: 0.9666666666666667
INFO:root:Saving model


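## Creating the Kubeflow (KFP) pipeline

Each step of the local training script above is rewritten as a self-contained KFP component: imports live inside each function body because every component runs in its own container.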

In [72]:
@dsl.component(packages_to_install=["ucimlrepo==0.0.7", "fastparquet==2023.7.0"], base_image="python:3.9")
def load_data(dataset: Output[Dataset]):
    """
    Download the UCI iris dataset (id 53) and write it as a CSV artifact.

    The CSV contains the four feature columns (renamed to snake_case) plus a
    ``target`` column holding the class label. The DataFrame index is written
    as the first, unnamed column (``DataFrame.to_csv`` default).

    Args:
        dataset: Output artifact provided by KFP; the CSV is written to
            ``dataset.path``. A bare ``Dataset(...)`` built inside the
            component has no uri, so its ``.path`` cannot be resolved.
    """
    import logging
    from ucimlrepo import fetch_ucirepo

    # Without this, INFO records are dropped by the default WARNING root level.
    logging.basicConfig(level=logging.INFO)

    logging.info("Getting Dataset")
    data_iris = fetch_ucirepo(id=53)
    # rename() without inplace returns a new frame instead of mutating the
    # slice exposed by ucimlrepo (which raised SettingWithCopyWarning).
    iris_frame = data_iris.data.features.rename(columns={
        'sepal length': 'sepal_length',
        'sepal width': 'sepal_width',
        'petal length': 'petal_length',
        'petal width': 'petal_width',
    })
    iris_frame['target'] = data_iris.data.targets['class']

    # NOTE(review): fastparquet is installed but the data is written as CSV.
    iris_frame.to_csv(dataset.path)


In [103]:
@dsl.component(packages_to_install=["scikit-learn==1.5.2"], base_image="python:3.9")
def set_training_pipeline(pipeline_out: Output[Model]):
    """
    Build the unfitted training pipeline and persist it with joblib.

    The pipeline has three steps: mean imputation, standard scaling, and
    logistic regression (step name ``estimator``, which the grid search in
    ``train_model`` targets).

    Args:
        pipeline_out: Output artifact provided by KFP; the pipeline is dumped
            to ``pipeline_out.path``. The component returns nothing: the
            previous ``-> Model`` annotation declared a second output that was
            never returned.
    """
    import joblib
    from sklearn.impute import SimpleImputer
    from sklearn.linear_model import LogisticRegression
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    pipeline = Pipeline(steps=[
        ('Imputer', SimpleImputer(strategy='mean', keep_empty_features=True)),
        ('normalization', StandardScaler()),
        ('estimator', LogisticRegression()),
    ])
    joblib.dump(pipeline, pipeline_out.path)


In [98]:
@dsl.component(packages_to_install=["scikit-learn==1.5.2", "pandas==2.2.3"], base_image="python:3.9")
def train_model(
    dataset: Input[Dataset],
    pipeline: Input[Model],
    model: Output[Model],
):
    """
    Tune and fit the persisted sklearn pipeline on the iris CSV, then save it.

    The component follows the local training script:
    - stratified 80/20 split with random_state=14;
    - GridSearchCV over solver/tol/penalty, with 5-fold CV scored by
      one-vs-rest ROC AUC;
    - logging of held-out AUC and accuracy.

    Args:
        dataset: CSV artifact with the feature columns and a ``target``
            column; its first column is the DataFrame index written by
            ``load_data``.
        pipeline: joblib-serialized, unfitted sklearn ``Pipeline``.
        model: Output artifact; the fitted ``GridSearchCV`` is dumped to
            ``model.path`` with joblib.
    """
    import logging
    import joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import GridSearchCV, train_test_split

    # Without this, INFO records are dropped by the default WARNING root level.
    logging.basicConfig(level=logging.INFO)

    # Artifacts are files: read through .path rather than using the artifact object.
    data = pd.read_csv(dataset.path, index_col=0)
    features = data.drop(columns="target")
    target = data["target"]

    logging.info("Spliting dataset")
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, stratify=target, test_size=0.2, random_state=14)

    logging.info("Fittig model with train data")
    parameters = {
        'estimator__solver': ['newton-cg'],
        'estimator__tol': [0.0001, 0.003, 0.01],
        'estimator__penalty': [None, 'l2'],
    }
    # Load the pipeline first so GridSearchCV wraps the real estimator.
    base_pipeline = joblib.load(pipeline.path)
    search = GridSearchCV(estimator=base_pipeline,
                          param_grid=parameters,
                          scoring={"AUC": "roc_auc_ovr"},
                          refit="AUC",
                          cv=5,
                          verbose=1,
                          error_score='raise')
    fitted_model = search.fit(X_train, y_train)
    y_pred = fitted_model.predict(X_test)

    logging.info("Computing scores")
    # With refit="AUC", GridSearchCV.score evaluates the AUC scorer.
    model_score = fitted_model.score(X_test, y_test)
    logging.info(f"Model AUC Score: {model_score}")

    test_acc_score = accuracy_score(y_test, y_pred)
    logging.info(f"Accuracy test score: {test_acc_score}")

    logging.info("Saving model")
    joblib.dump(fitted_model, model.path)

In [121]:
PIPELINE_ROOT = ""


@dsl.pipeline(
    name="my-pipeline-",
    description="A class project",
    pipeline_root=PIPELINE_ROOT,
)
def my_pipeline_func():
    """
    Wire the components together: load data, build the sklearn pipeline, then train.

    ``set_training_pipeline`` is explicitly ordered after ``load_data`` even
    though it consumes none of its outputs. ``train_model`` consumes the
    outputs of both upstream tasks.
    """
    data_task = load_data()
    pipeline_task = set_training_pipeline().after(data_task)
    train_model(
        dataset=data_task.output,
        pipeline=pipeline_task.outputs['pipeline_out'],
    ).after(pipeline_task)

In [119]:
def compile_pipeline(pipeline_func, package_path="./temp/my_pipeline.json"):
    """
    Compile a KFP pipeline function into a JSON pipeline spec.

    Args:
        pipeline_func: Function decorated with ``@dsl.pipeline``.
        package_path: Where to write the compiled spec. The default is the
            ``template_path`` used by the Vertex AI submission cell.

    The parent directory is created if it is missing, so a fresh checkout
    without ``temp/`` can still compile.
    """
    parent_dir = os.path.dirname(package_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    compiler.Compiler().compile(
        pipeline_func=pipeline_func,
        package_path=package_path)

In [120]:
# Compile my_pipeline_func to the JSON spec consumed by the Vertex AI submission cell.
compile_pipeline(pipeline_func=my_pipeline_func)

TypeError: expected str, bytes or os.PathLike object, not PipelineParam

In [102]:
# Submit the compiled pipeline to Vertex AI Pipelines.
PIPELINE_ROOT = "temp"
GCP_PROJECT = "personal-448814"
GCP_REGION = "us-central1"
PIPELINE_ROOT_URI = f"gs://kfp_bucket_pipeline/{PIPELINE_ROOT}/"

aiplatform.init(
    project=GCP_PROJECT,
    location=GCP_REGION,
    staging_bucket=PIPELINE_ROOT_URI,
)
job = aiplatform.PipelineJob(
    display_name="A pipeline for a class project",
    template_path="./temp/my_pipeline.json",
    pipeline_root=PIPELINE_ROOT_URI,
    project=GCP_PROJECT,
    location=GCP_REGION,
)
# NOTE(review): run() appears to block until the job finishes (the captured
# output ends in a KeyboardInterrupt); consider job.submit() for a non-blocking call.
job.run(service_account="personal@personal-448814.iam.gserviceaccount.com")

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/367188317574/locations/us-central1/pipelineJobs/my-pipeline-20250124141646
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/367188317574/locations/us-central1/pipelineJobs/my-pipeline-20250124141646')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/my-pipeline-20250124141646?project=367188317574
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/367188317574/locations/us-central1/pipelineJobs/my-pipeline-20250124141646 current state:
3


KeyboardInterrupt: 

In [36]:
# Scratch check: the GCS pipeline root built from PIPELINE_ROOT.
"gs://kfp_bucket_pipeline/{}/".format(PIPELINE_ROOT)

'gs://kfp_bucket_pipeline/temp/'