In [1]:
# %pip install mlflow==1.30.0 boto3==1.26.118

In [2]:
import getpass
import os

import mlflow

from kfp import dsl, compiler
from kfp.components import load_component_from_url, InputPath, OutputPath, create_component_from_func

from kubernetes.client.models import V1EnvVar

In [3]:
# Connection settings for the MLflow tracking server and its S3-compatible artifact store.
MLFLOW_TRACKING_URI = "http://mlflow.mlflow.svc.cluster.local:5000"
MLFLOW_S3_ENDPOINT_URL = "http://10.10.6.141:9190"

# Artifact-store credentials are taken from the environment; prompt for any that are
# missing so that no secrets are stored in the notebook source (or its git history).
for _cred_var in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    if not os.environ.get(_cred_var):
        os.environ[_cred_var] = getpass.getpass(f"{_cred_var}: ")
os.environ["MLFLOW_S3_ENDPOINT_URL"] = MLFLOW_S3_ENDPOINT_URL

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
tracking_uri = mlflow.get_tracking_uri()
print("Current tracking uri: {}".format(tracking_uri))

Current tracking uri: http://mlflow.mlflow.svc.cluster.local:5000


### Download data

In [4]:
# Prebuilt KFP component that downloads a file over HTTP(S); the pipeline consumes its
# "data" output.
# NOTE(review): this URL tracks the ``master`` branch, so the component definition can
# change between runs; consider pinning it to a release tag or commit for reproducibility.
WEB_DOWNLOADER_COMPONENT_URL = (
    "https://raw.githubusercontent.com/kubeflow/pipelines/master/"
    "components/contrib/web/Download/component.yaml"
)
web_downloader_op = load_component_from_url(WEB_DOWNLOADER_COMPONENT_URL)

### Pre-process

In [5]:
def preprocess(file_path: InputPath("CSV"), output_file: OutputPath("CSV")):
    """Encode the raw Census Income (Adult) CSV into an all-numeric table.

    This function is turned into a standalone KFP component, so every import it
    needs is done inside the function body.

    Args:
        file_path: Path to the raw, header-less CSV with the 15 Adult columns.
        output_file: Path where the encoded CSV (with a header row, no index) is written.
    """
    import pandas as pd

    header = [
        "age",
        "workclass",
        "fnlwgt",
        "education",
        "education_num",
        "marital_status",
        "occupation",
        "relationship",
        "race",
        "sex",
        "capital_gain",
        "capital_loss",
        "hours_per_week",
        "native_country",
        "income",
    ]
    df = pd.read_csv(file_path, header=None, names=header)
    # Encode the string-valued columns (including the "income" label) as integer
    # category codes. Continuous features such as "age", "fnlwgt" and "hours_per_week"
    # are left numeric; "age" used to be rank-encoded here, which discarded its scale.
    categorical_columns = [
        "workclass",
        "education",
        "marital_status",
        "occupation",
        "relationship",
        "race",
        "sex",
        "native_country",
        "income",
    ]
    df[categorical_columns] = df[categorical_columns].apply(
        lambda col: col.astype("category").cat.codes, axis=0
    )
    df.to_csv(output_file, index=False)

# Wrap `preprocess` as a lightweight KFP component; pandas is pip-installed into the
# component's container before the function runs.
preprocess_op = create_component_from_func(
    preprocess,
    packages_to_install=["pandas==1.2.4"],
)

### Train

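Train an MLP classifier and log the run to MLflow: the experiment and run names, the hyperparameters (`alpha`, `hidden_layers`), the test accuracy, and the trained model, which is also registered in the MLflow Model Registry.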

In [6]:
def train(
    file_path: InputPath("CSV"),
    experiment_name: str = "Income",
    run_name: str = "still_income_training",
    registered_model_name: str = "still_income_model",
) -> str:
    """Train an MLP classifier on the preprocessed Census Income data and log it to MLflow.

    This function is turned into a standalone KFP component, so all imports are local.
    The MLflow tracking URI and artifact-store credentials are presumably supplied via
    environment variables (the pipeline sets them on this step's container).

    Args:
        file_path: Path to the encoded CSV produced by ``preprocess``; it must contain
            an ``income`` label column.
        experiment_name: MLflow experiment to log the run under.
        run_name: Name of the MLflow run.
        registered_model_name: Name under which the model is registered in the
            MLflow Model Registry.

    Returns:
        URI of the logged model: the run's artifact root followed by the model's
        artifact path ("model").
    """
    import mlflow
    # Import the flavor module explicitly rather than relying on ``mlflow`` exposing
    # ``mlflow.sklearn`` lazily as an attribute.
    import mlflow.sklearn
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neural_network import MLPClassifier

    df = pd.read_csv(file_path)

    labels_column = "income"
    # Default 75/25 split with a fixed seed so runs are reproducible.
    train_x, test_x, train_y, test_y = train_test_split(
        df.drop([labels_column], axis=1),
        df[labels_column],
        random_state=69,
    )

    mlflow.set_experiment(experiment_name=experiment_name)
    with mlflow.start_run(run_name=run_name):
        alpha, hidden_layers = 2e-3, (6, 4)
        mlp = MLPClassifier(
            solver="lbfgs",
            alpha=alpha,
            hidden_layer_sizes=hidden_layers,
            random_state=69,
        )

        mlflow.log_param("alpha", alpha)
        mlflow.log_param("hidden_layers", hidden_layers)

        mlp.fit(train_x, train_y)

        preds = mlp.predict(test_x)
        # Fraction of held-out rows classified correctly.
        accuracy = (test_y == preds).sum() / preds.shape[0]
        mlflow.log_metric("accuracy", accuracy)

        result = mlflow.sklearn.log_model(
            sk_model=mlp,
            artifact_path="model",
            registered_model_name=registered_model_name,
        )
        return f"{mlflow.get_artifact_uri()}/{result.artifact_path}"

# Wrap `train` as a KFP component; these pinned packages are pip-installed into the
# component's container before the function runs.
train_op = create_component_from_func(
    train,
    packages_to_install=[
        "mlflow==1.30.0",
        "pandas==1.2.4",
        "scikit-learn==0.24.2",
        "boto3==1.26.118",
    ],
)

### Pipeline

In [7]:
@dsl.pipeline(
    name="income_pipeline",
    description="Pipeline for training and deploying a model trained on Census Income dataset",
)
def income_pipeline():
    """Download the Adult dataset, preprocess it, and train/register a model in MLflow.

    The training step talks to MLflow and its S3-compatible artifact store, so its
    container receives the connection settings as environment variables. The
    artifact-store credentials are read from this notebook's environment when the
    pipeline is compiled, instead of being hard-coded here; compilation fails with a
    KeyError if they are not set.

    NOTE(review): the values, including AWS_SECRET_ACCESS_KEY, still end up in plain
    text in the compiled YAML. Prefer a Kubernetes Secret referenced through
    ``V1EnvVarSource(secret_key_ref=...)``.
    """
    downloader_task = web_downloader_op(
        url="https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
    )

    # Component input/output names drop the ``_path``/``_file`` suffix of the Python
    # parameter names: ``file_path`` -> "file", ``output_file`` -> "output".
    preprocess_task = preprocess_op(file=downloader_task.outputs["data"])

    train_env = {
        "MLFLOW_TRACKING_URI": "http://mlflow.mlflow.svc.cluster.local:5000",
        "MLFLOW_S3_ENDPOINT_URL": "http://10.10.6.141:9190",
        "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
    }
    train_task = train_op(file=preprocess_task.outputs["output"])
    for env_name, env_value in train_env.items():
        train_task.add_env_variable(V1EnvVar(name=env_name, value=env_value))

### Run

Compile the pipeline into a KFP package: a YAML file written to *package_path* that can be uploaded to Kubeflow Pipelines.

In [8]:
# Output file for the compiled pipeline package (Argo workflow YAML).
PIPELINE_PACKAGE_PATH = "income_2.1.yaml"
compiler.Compiler().compile(
    pipeline_func=income_pipeline,
    package_path=PIPELINE_PACKAGE_PATH,
)