# Iris Classification Pipeline with ZenML

This notebook demonstrates a ZenML pipeline for iris classification, including data loading, model training, evaluation, explainability, and data drift detection.

In [None]:
# Notebook shell command: log the ZenML CLI in to the server at the URL below.
!zenml login https://d13d987c-zenml.cloudinfra.zenml.io

In [None]:
from zenml.client import Client

In [None]:
# Make "default_with_s3" the active ZenML stack.
# NOTE(review): a later cell activates 'local-aws-step-operator'; when the
# cells run top to bottom that later call presumably wins — confirm which
# stack is actually intended.
Client().activate_stack("default_with_s3")

In [None]:
# Print the configuration of the 'local-aws-step-operator' stack (shell
# command), then activate that same stack through the Python client.
!zenml stack describe 'local-aws-step-operator'
Client().activate_stack('local-aws-step-operator')

In [None]:
# !zenml stack describe 'aws-sagemaker-pipelines'
# Client().activate_stack('aws-sagemaker-pipelines')

In [None]:
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from zenml import step, log_metadata
from typing import Tuple, Dict, Any
from typing_extensions import Annotated

def safe_metadata(data: Any) -> Dict[str, Any]:
    """Summarise a dataset-like object as a small metadata dictionary.

    Args:
        data: Object exposing a ``shape`` attribute, such as a pandas
            DataFrame or Series.

    Returns:
        A dict with a ``"shape"`` entry and, for DataFrames only, a
        ``"columns"`` entry holding the column labels as a plain list.
    """
    summary: Dict[str, Any] = {"shape": data.shape}
    if not isinstance(data, pd.DataFrame):
        return summary
    # Column labels are only reported for DataFrames.
    summary["columns"] = [*data.columns]
    return summary


@step
def load_data() -> Tuple[
    Annotated[pd.DataFrame, "X_train"],
    Annotated[pd.DataFrame, "X_test"],
    Annotated[pd.Series, "y_train"],
    Annotated[pd.Series, "y_test"],
]:
    """Fetch the iris dataset and return a train/test split of it.

    The split holds out 20% of the rows for testing and uses a fixed
    ``random_state`` of 42. A ``dataset_info`` metadata entry (built by
    ``safe_metadata``) is logged for each of the four output artifacts.

    Returns:
        ``X_train``, ``X_test``, ``y_train``, ``y_test`` in that order.
    """
    dataset = load_iris(as_frame=True)
    X_train, X_test, y_train, y_test = train_test_split(
        dataset.data,
        dataset.target,
        test_size=0.2,
        random_state=42,
    )

    # Keyed by output artifact name; dict order keeps the logging order stable.
    splits = {
        "X_train": X_train,
        "X_test": X_test,
        "y_train": y_train,
        "y_test": y_test,
    }
    for artifact_name, split in splits.items():
        log_metadata(
            artifact_name=artifact_name,
            metadata={"dataset_info": safe_metadata(split)},
            infer_artifact=True,
        )

    return X_train, X_test, y_train, y_test

In [None]:
import pandas as pd
from sklearn.svm import SVC
from zenml import step, ArtifactConfig, log_metadata
from typing_extensions import Annotated
from zenml.config import ResourceSettings
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import SagemakerStepOperatorSettings
from zenml.enums import ArtifactType

# NOTE(review): "aws-sagemaker-pipelines" also appears elsewhere in this
# notebook as a stack name; confirm it is the step operator component's name.
@step(
    enable_cache=False,
    step_operator="aws-sagemaker-pipelines",
    settings={
        "step_operator.sagemaker": SagemakerStepOperatorSettings(
            estimator_args={"instance_type": "ml.p3.2xlarge"},
        ),
    },
)
def train_model(
    X_train: pd.DataFrame,
    y_train: pd.Series,
) -> Annotated[SVC, ArtifactConfig(name="model", artifact_type=ArtifactType.MODEL)]:
    """Fit an RBF-kernel SVM on the training split and log metadata about it.

    Two metadata entries are logged: training accuracy plus basic model
    info (with ``infer_model=True``), and model details including the
    per-class support vector counts for the ``model`` output artifact.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.

    Returns:
        The fitted classifier, returned as the ``model`` artifact.
    """
    classifier = SVC(kernel="rbf", probability=True)
    classifier.fit(X_train, y_train)

    # Values shared by both metadata entries below.
    model_type = type(classifier).__name__
    kernel = classifier.kernel
    train_accuracy = float(classifier.score(X_train, y_train))

    log_metadata(
        metadata={
            "training_metrics": {"train_accuracy": train_accuracy},
            "model_info": {"model_type": model_type, "kernel": kernel},
        },
        infer_model=True,
    )

    log_metadata(
        metadata={
            "model_details": {
                "type": model_type,
                "kernel": kernel,
                "n_support": classifier.n_support_.tolist(),
            },
        },
        artifact_name="model",
        infer_artifact=True,
    )

    return classifier

In [None]:
import pandas as pd
import numpy as np
from sklearn.svm import SVC
from zenml import step, log_metadata
from typing import Tuple
from typing_extensions import Annotated

@step
def evaluate_model(
    model: SVC,
    X_test: pd.DataFrame,
    y_test: pd.Series,
) -> Tuple[
    Annotated[np.ndarray, "predictions"],
    Annotated[np.ndarray, "probabilities"]
]:
    """Score the fitted model on the test split and return its outputs.

    Test accuracy is logged with ``infer_model=True``; summary information
    is logged for each of the two output artifacts.

    Args:
        model: Fitted classifier to evaluate.
        X_test: Test feature matrix.
        y_test: Test labels.

    Returns:
        ``predictions`` (from ``model.predict``) and ``probabilities``
        (from ``model.predict_proba``), in that order.
    """
    test_accuracy = float(model.score(X_test, y_test))
    predictions = model.predict(X_test)
    probabilities = model.predict_proba(X_test)

    log_metadata(
        metadata={"evaluation_metrics": {"test_accuracy": test_accuracy}},
        infer_model=True,
    )

    # One metadata payload per output artifact, logged in declaration order.
    per_artifact_metadata = {
        "predictions": {
            "prediction_info": {
                "shape": predictions.shape,
                "unique_values": np.unique(predictions).tolist(),
            },
        },
        "probabilities": {
            "probability_info": {
                "shape": probabilities.shape,
                "min": float(np.min(probabilities)),
                "max": float(np.max(probabilities)),
            },
        },
    }
    for artifact_name, payload in per_artifact_metadata.items():
        log_metadata(
            metadata=payload,
            artifact_name=artifact_name,
            infer_artifact=True,
        )

    return predictions, probabilities

In [None]:
import os
import io
import pandas as pd
import shap
from sklearn.svm import SVC
from zenml import step, log_metadata
from typing import Dict
from typing_extensions import Annotated
import matplotlib.pyplot as plt

from zenml.enums import ArtifactType, VisualizationType
from zenml.io import fileio
from zenml.materializers.base_materializer import BaseMaterializer


class SHAPVisualization:
    """Plain container pairing SHAP values with their feature names."""

    def __init__(self, shap_values, feature_names):
        """Store both arguments on the instance without modification.

        Args:
            shap_values: SHAP values to keep.
            feature_names: Feature names that go with ``shap_values``.
        """
        self.shap_values, self.feature_names = shap_values, feature_names

def _shap_summary(shap_values) -> Dict[str, object]:
    """Build the ``shap_info`` metadata for either SHAP output format."""
    if isinstance(shap_values, (list, tuple)):
        # List format: one (n_samples, n_features) array per class.
        return {
            "shape": [arr.shape for arr in shap_values],
            "n_classes": len(shap_values),
            "n_features": shap_values[0].shape[1],
        }

    # Single-array format: (n_samples, n_features, n_classes), or
    # (n_samples, n_features) when there is only one output.
    dims = tuple(shap_values.shape)
    if len(dims) == 3:
        n_samples, n_features, n_classes = dims
    else:
        n_samples, n_features = dims
        n_classes = 1
    return {
        # Reported per class so the metadata layout matches the list format.
        "shape": [(n_samples, n_features)] * n_classes,
        "n_classes": n_classes,
        "n_features": n_features,
    }


@step
def explain_model(
    model: SVC,
    X_train: pd.DataFrame
) -> Annotated[SHAPVisualization, "shap_visualization"]:
    """Compute SHAP values for the model and wrap them for visualization.

    A ``shap.KernelExplainer`` is built around ``model.predict_proba`` with
    a background set of 100 rows drawn by ``shap.sample``; SHAP values are
    then computed for the first 100 rows of ``X_train``. A ``shap_info``
    metadata entry is logged for the ``shap_visualization`` artifact.

    The metadata is derived in a way that works whether ``shap_values`` is
    a list of per-class arrays or a single stacked array. Indexing a stacked
    array as if it were a list would report the sample count as
    ``n_classes`` and the class count as ``n_features``.

    Args:
        model: Fitted classifier exposing ``predict_proba``.
        X_train: Training features used for background and explanation rows.

    Returns:
        A ``SHAPVisualization`` holding the raw SHAP values and the column
        labels of ``X_train``.
    """
    explainer = shap.KernelExplainer(
        model.predict_proba,
        shap.sample(X_train, 100)
    )
    shap_values = explainer.shap_values(X_train.iloc[:100])

    log_metadata(
        metadata={"shap_info": _shap_summary(shap_values)},
        artifact_name="shap_visualization",
        infer_artifact=True,
    )

    return SHAPVisualization(shap_values, X_train.columns)

In [None]:
import pandas as pd
from scipy.stats import ks_2samp
from zenml import step
from zenml import log_artifact_metadata
from typing import Dict
from typing_extensions import Annotated

@step
def detect_data_drift(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
) -> Annotated[Dict[str, float], "drift_metrics"]:
    """Compare train and test feature distributions column by column.

    A two-sample Kolmogorov-Smirnov test (``scipy.stats.ks_2samp``) is run
    for every column of ``X_train`` against the same column of ``X_test``.
    Columns whose p-value is below 0.05 are logged as
    ``high_drift_features`` on the ``drift_metrics`` artifact.

    Args:
        X_train: Training feature matrix; its columns drive the comparison.
        X_test: Test feature matrix; must contain the same columns.

    Returns:
        Mapping from column label to the KS-test p-value as a plain float.

    Raises:
        KeyError: If a column of ``X_train`` is missing from ``X_test``.
    """
    # The surrounding cell imports only `log_artifact_metadata`, so import the
    # function that is actually called here rather than relying on an
    # earlier notebook cell having been executed.
    from zenml import log_metadata

    # Cast to built-in float so the artifact matches its Dict[str, float]
    # annotation instead of carrying NumPy scalar values.
    drift_metrics = {
        column: float(ks_2samp(X_train[column], X_test[column]).pvalue)
        for column in X_train.columns
    }

    high_drift_features = [
        column for column, p_value in drift_metrics.items() if p_value < 0.05
    ]
    log_metadata(
        metadata={
            "drift_summary": {
                "high_drift_features": high_drift_features
            }
        },
        artifact_name="drift_metrics",
        infer_artifact=True,
    )

    return drift_metrics

In [None]:
from zenml import pipeline, Model
from zenml.config import DockerSettings

@pipeline(
    model=Model(name="high_risk_classification"),
    settings={
        # "docker": DockerSettings(python_package_installer="uv",
        # requirements="requirements.txt"),
        # "resources": ResourceSettings(memory="8GB"),
    },
)
def iris_classification_pipeline():
    """Wire the notebook's steps together into one pipeline.

    Data is loaded and split, a model is trained on the training split,
    and that model is then evaluated on the test split and explained on
    the training split. Drift detection compares the two feature splits.
    """
    train_features, test_features, train_labels, test_labels = load_data()
    classifier = train_model(X_train=train_features, y_train=train_labels)

    # The remaining steps only consume artifacts; their outputs are not reused.
    evaluate_model(model=classifier, X_test=test_features, y_test=test_labels)
    explain_model(model=classifier, X_train=train_features)
    detect_data_drift(X_train=train_features, X_test=test_features)

In [None]:
# Run the pipeline: call the decorated pipeline function and keep whatever it
# returns in `pipeline_run` for later inspection.
pipeline_run = iris_classification_pipeline()