In [1]:
import kfp
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, component
from azure.storage.blob import BlobServiceClient

In [2]:
@component(
    base_image='python:3.12.3',
    packages_to_install=['pandas', 'azure-storage-blob']
)
def download_data_from_azure(
    x_train: Output[Dataset],
    x_test: Output[Dataset],
    y_train: Output[Dataset],
    y_test: Output[Dataset],
    azure_connection_string: str,
    container_name: str = 'csvstorage',
    x_train_blob: str = "X_train.csv",
    x_test_blob: str = "X_test.csv",
    y_train_blob: str = "y_train.csv",
    y_test_blob: str = "y_test.csv"
):
    import pandas as pd
    from azure.storage.blob import BlobServiceClient

    client = BlobServiceClient.from_connection_string(azure_connection_string)

    def download_blob(blob_name, path):
        blob = client.get_blob_client(container_name, blob_name)
        with open(path, 'wb') as f:
            f.write(blob.download_blob().readall())

    download_blob(x_train_blob, x_train.path)
    download_blob(x_test_blob, x_test.path)
    download_blob(y_train_blob, y_train.path)
    download_blob(y_test_blob, y_test.path)


In [3]:
@component(
    base_image='python:3.12.3',
    packages_to_install=['minio']
)
def check_model_exists(model_exists: Output[Dataset], model_name: str = 'best_model.pkl'):
    from minio import Minio
    import os

    client = Minio('minio-service:9000', access_key='minio', secret_key='minio123', secure=False)
    exists = client.bucket_exists("models") and client.stat_object("models", model_name) is not None
    with open(model_exists.path, 'w') as f:
        f.write("true" if exists else "false")


In [4]:
@component(
    base_image='python:3.12.3',
    packages_to_install=['pandas', 'joblib', 'scikit-learn']
)
def evaluate_model(
    model_exists: Input[Dataset],
    x_test: Input[Dataset],
    y_test: Input[Dataset],
    eval_score: Output[Dataset],
    model_name: str = 'best_model.pkl'
):
    import pandas as pd
    import joblib
    from sklearn.metrics import r2_score
    from minio import Minio
    import os

    with open(model_exists.path, 'r') as f:
        if f.read().strip() != 'true':
            with open(eval_score.path, 'w') as out:
                out.write("0")
            return

    client = Minio('minio-service:9000', access_key='minio', secret_key='minio123', secure=False)
    client.fget_object("models", model_name, "downloaded_model.pkl")
    model = joblib.load("downloaded_model.pkl")

    X_test = pd.read_csv(x_test.path)
    y_test_data = pd.read_csv(y_test.path).values.ravel()

    y_pred = model.predict(X_test)
    r2 = r2_score(y_test_data, y_pred)

    with open(eval_score.path, 'w') as out:
        out.write(str(r2))


In [5]:
@component(
    base_image='python:3.12.3',
    packages_to_install=[
        'pandas',
        'mlflow',
        'scikit-learn',
        'joblib',
        'minio',
        'dagshub',
        'requests'
    ]
)
def train_model(
    x_train: Input[Dataset],
    y_train: Input[Dataset],
    x_test: Input[Dataset],
    y_test: Input[Dataset],
    trained_model: Output[Model],
    pushgateway_url: str = "http://pushgateway-prometheus-pushgateway.monitoring.svc.cluster.local:9091"
):
    import pandas as pd
    import mlflow
    import joblib
    import requests
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import r2_score
    from minio import Minio
    import dagshub

    # Authenticate to DagsHub MLflow
    dagshub.auth.add_app_token("c1b64f0e0a5268dae2ca62d0ae4bec20fdecb445")
    dagshub.init(repo_owner='manish-bagdwal1', repo_name='MLOps-Pipeline-Local-Batch-Training', mlflow=True)

    mlflow.set_tracking_uri('https://dagshub.com/manish-bagdwal1/MLOps-Pipeline-Local-Batch-Training.mlflow')
    mlflow.set_experiment('kubeflow_experiment')

    X_train = pd.read_csv(x_train.path)
    y_train_data = pd.read_csv(y_train.path).values.ravel()
    X_test = pd.read_csv(x_test.path)
    y_test_data = pd.read_csv(y_test.path).values.ravel()

    model = RandomForestRegressor(n_estimators=100, random_state=42)

    with mlflow.start_run(run_name="RandomForest"):
        model.fit(X_train, y_train_data)
        predictions = model.predict(X_test)
        r2 = r2_score(y_test_data, predictions)

        mlflow.log_param("n_estimators", 100)
        mlflow.log_metric("r2", r2)
        mlflow.sklearn.log_model(model, "random_forest_model")

        # Save model locally
        joblib.dump(model, trained_model.path)

        # Upload model to MinIO
        client = Minio('minio-service:9000', access_key='minio', secret_key='minio123', secure=False)
        if not client.bucket_exists("models"):
            client.make_bucket("models")
        client.fput_object("models", "best_model.pkl", trained_model.path)

        # Push R2 to Prometheus Pushgateway
        job_name = "fuel_model_training"
        metric_name = "model_r2"
        metric_value = r2
        payload = f"{metric_name} {metric_value}\n"

        response = requests.post(
            f"{pushgateway_url}/metrics/job/{job_name}",
            data=payload,
            headers={"Content-Type": "text/plain"}
        )

        if response.status_code == 202:
            print("R2 pushed to Prometheus Pushgateway")
        else:
            print(f"Failed to push metric. Status: {response.status_code}")


In [6]:
@dsl.pipeline(name='mlops-kubeflow-pipeline')
def mlops_pipeline(
    azure_connection_string: str,
    r2_threshold: float = 0.8
):
    download_step = download_data_from_azure(
        azure_connection_string=azure_connection_string
    )

    check_model = check_model_exists()

    evaluate = evaluate_model(
        model_exists=check_model.outputs['model_exists'],
        x_test=download_step.outputs['x_test'],
        y_test=download_step.outputs['y_test']
    )

    with dsl.If(evaluate.outputs['eval_score'] == '0', name='NoModelCondition'):
        train_model(
            x_train=download_step.outputs['x_train'],
            y_train=download_step.outputs['y_train'],
            x_test=download_step.outputs['x_test'],
            y_test=download_step.outputs['y_test']
        )

    with dsl.If(evaluate.outputs['eval_score'].as_number() < r2_threshold, name='LowR2Condition'):
        train_model(
            x_train=download_step.outputs['x_train'],
            y_train=download_step.outputs['y_train'],
            x_test=download_step.outputs['x_test'],
            y_test=download_step.outputs['y_test']
        )


AttributeError: 'PipelineArtifactChannel' object has no attribute 'as_number'

In [None]:
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(mlops_pipeline, 'kubeflow_mlflow_pipeline_v16.yaml')