# In [None]:
import kfp
from kfp import dsl
from kfp.dsl import InputPath, OutputPath

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["requests==2.32.3", "pandas==2.2.2"]
)
def download_dataset(
    url: str,
    dataset_path: OutputPath()
) -> None:
    """Download a CSV file over HTTP and write it to the output path.

    Args:
        url: Address of a comma-separated file whose first row is the header.
        dataset_path: Destination path; the parsed table is written there as
            CSV without the index column.

    Raises:
        requests.HTTPError: If the server responds with an error status.
        requests.Timeout: If connecting or reading exceeds the timeout.
    """
    # Imports stay inside the body, like in the other components of this file.
    from io import StringIO

    import pandas as pd
    import requests

    # (connect, read) timeout in seconds. Without it requests waits forever
    # on an unresponsive server and the pipeline step never finishes.
    response = requests.get(url, timeout=(10, 60))
    response.raise_for_status()

    # Parse and re-serialise so the output is a normalised CSV.
    dataset = pd.read_csv(StringIO(response.text), header=0, sep=",")
    dataset.to_csv(dataset_path, index=False)

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas==2.2.2", "pyarrow==15.0.2"]
)
def preprocess_dataset(
    dataset: InputPath(),
    output_file: OutputPath()
) -> None:
    """Normalise the CSV's column names and re-save the table as Parquet.

    Args:
        dataset: Path of the input CSV; its first row is read as the header.
        output_file: Path the Parquet file is written to.
    """
    import pandas as pd

    def _tidy(label):
        # Lower-case the header and replace every space with an underscore.
        return label.lower().replace(" ", "_")

    frame = pd.read_csv(dataset, header=0)
    frame.columns = list(map(_tidy, frame.columns))
    frame.to_parquet(output_file)

@dsl.component(
    base_image="pytorch/pytorch:2.1.0-cuda11.8-cudnn8-devel",
    packages_to_install=[
        "pandas==2.2.2", "torch==2.1.0", "mlflow==2.15.1", "scikit-learn==1.5.0", "torch-model-archiver==0.8.2", "minio==7.2.5"
    ]
)
def train_pytorch_model(
    dataset: InputPath(),
    run_name: str,
    model_name: str,
    weights_path: OutputPath(),        # PyTorch weights
    torchscript_path: OutputPath(),    # TorchScript model (.pt)
    model_uri_path: OutputPath()       # Model artifact URI as text
) -> None:
    """Train a linear regressor on the 'quality' column, package it and publish it.

    Steps, in order:
      1. Read the Parquet dataset, standardise the features and split 75/25.
      2. Fit a single ``nn.Linear`` layer with full-batch SGD for 100 epochs.
      3. Score the held-out split (MSE).
      4. Save the ``state_dict`` and a traced TorchScript module.
      5. Build a TorchServe ``.mar`` archive with an inline handler and write
         a ``config.properties`` next to it.
      6. Upload both files to MinIO.
      7. Log the model, a parameter and the test MSE to MLflow and write the
         model artifact URI to ``model_uri_path``.

    Args:
        dataset: Path of a Parquet file containing a ``quality`` column; all
            other columns are used as features.
        run_name: Name given to the MLflow run.
        model_name: Name of the ``.mar`` archive and of the MLflow registered model.
        weights_path: Output path for the model's ``state_dict``.
        torchscript_path: Output path for the traced TorchScript module.
        model_uri_path: Output path for a text file holding the MLflow model URI.

    Environment:
        MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY and MINIO_BUCKET
        override the built-in connection defaults.

    Raises:
        subprocess.CalledProcessError: If ``torch-model-archiver`` exits non-zero.
    """
    import os
    import pathlib
    import subprocess

    import mlflow
    import pandas as pd
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    # Load data
    df = pd.read_parquet(dataset)
    X = df.drop(columns=['quality']).values
    y = df['quality'].values.reshape(-1, 1)

    # Preprocess
    # NOTE(review): the scaler is fitted on all rows before the split, and it
    # is neither saved nor applied by the serving handler below, so served
    # requests would need to arrive already standardised - confirm intent.
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

    # Convert to PyTorch tensors
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32)
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.float32)

    # Move to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    X_train, y_train = X_train.to(device), y_train.to(device)
    X_test, y_test = X_test.to(device), y_test.to(device)

    # Define model
    model = nn.Linear(X_train.shape[1], 1)
    model = model.to(device)

    # Train (full batch, fixed number of epochs)
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    for epoch in range(100):
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

    # Score the held-out split so the run records how well the model generalises.
    with torch.no_grad():
        test_mse = criterion(model(X_test), y_test).item()

    # Move to CPU *before* serialising so the saved weights do not reference
    # CUDA tensors and can be loaded on a machine without a GPU.
    model = model.cpu()

    # Save PyTorch weights (state_dict)
    torch.save(model.state_dict(), weights_path)

    # Save TorchScript model locally
    scripted_model = torch.jit.trace(model, X_train[:1].cpu())
    scripted_model.save(torchscript_path)

    # --- Archive the model using torch-model-archiver ---
    # Handler source is written to disk because the archiver takes a file path.
    handler_code = """
import torch
import os
import json
from ts.torch_handler.base_handler import BaseHandler

class WineQualityHandler(BaseHandler):
    def __init__(self):
        super().__init__()
        self.initialized = False

    def initialize(self, context):
        # Standard TorchServe model loading for TorchScript
        self.manifest = context.manifest
        properties = context.system_properties
        model_dir = properties.get("model_dir")
        serialized_file = self.manifest['model']['serializedFile']
        model_pt_path = os.path.join(model_dir, serialized_file)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = torch.jit.load(model_pt_path, map_location=self.device)
        self.model.eval()
        self.initialized = True

    def preprocess(self, data):
        # Accepts JSON {"data": [[feature1, feature2, ...], ...]}
        # or {"data": [feature1, feature2, ...]} for single sample
        if isinstance(data, list) and len(data) > 0:
            input_data = data[0].get('data')
            if isinstance(input_data[0], list):
                # Batch
                tensor = torch.tensor(input_data, dtype=torch.float32)
            else:
                # Single sample
                tensor = torch.tensor([input_data], dtype=torch.float32)
            return tensor.to(self.device)
        else:
            raise ValueError("Input data must be a JSON list with a 'data' key.")

    def inference(self, input_tensor):
        with torch.no_grad():
            output = self.model(input_tensor)
        return output

    def postprocess(self, inference_output):
        # Convert output tensor to list
        return inference_output.cpu().numpy().tolist()

    def handle(self, data, context):
        # Main entry point for TorchServe
        input_tensor = self.preprocess(data)
        output = self.inference(input_tensor)
        result = self.postprocess(output)
        return result
"""
    handler_path = "wine_quality_handler.py"
    with open(handler_path, "w", encoding="utf-8") as f:
        f.write(handler_code)

    mar_name = f"{model_name}.mar"
    mar_path = f"model-store/{mar_name}"
    os.makedirs("model-store", exist_ok=True)

    # Run torch-model-archiver; check=True turns a failed build into an exception.
    subprocess.run([
        "torch-model-archiver",
        "--model-name", model_name,
        "--version", "1.0",
        "--serialized-file", str(torchscript_path),
        "--handler", handler_path,
        "--export-path", "model-store",
        "--force"
    ], check=True)

    # --- TorchServe config.properties ---
    config_dir = pathlib.Path("config")
    config_dir.mkdir(parents=True, exist_ok=True)
    config_path = config_dir / "config.properties"
    config_content = (
        "model_store=model-store\n"
        f"load_models={mar_name}\n"
        "enable_envvars_config=true\n"
        "disable_token_authorization=true\n"
    )
    with open(config_path, "w", encoding="utf-8") as f:
        f.write(config_content)

    # --- Upload .mar and config.properties to MinIO ---
    from minio import Minio

    # MinIO connection settings (set via env or hardcoded for demo)
    # NOTE(review): the fallback credentials are hard-coded; supply real ones
    # through the environment outside of a demo setup.
    minio_endpoint = os.getenv("MINIO_ENDPOINT", "minio-service.kubeflow:9000")
    minio_access_key = os.getenv("MINIO_ACCESS_KEY", "minio")
    minio_secret_key = os.getenv("MINIO_SECRET_KEY", "minio123")
    minio_bucket = os.getenv("MINIO_BUCKET", "mlpipeline")
    minio_secure = False  # Set to True if using HTTPS

    minio_client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=minio_secure
    )

    # Ensure bucket exists
    if not minio_client.bucket_exists(minio_bucket):
        minio_client.make_bucket(minio_bucket)

    # Upload .mar to model-store/ in MinIO
    minio_client.fput_object(
        minio_bucket,
        f"model-store/{mar_name}",
        mar_path
    )

    # Upload config.properties to config/ in MinIO
    minio_client.fput_object(
        minio_bucket,
        "config/config.properties",
        str(config_path)
    )

    # Log to MLflow (optional, for experiment tracking)
    # NOTE(review): training has already finished at this point, so autolog
    # cannot observe the loop above - confirm whether this call is still wanted.
    mlflow.pytorch.autolog()
    with mlflow.start_run(run_name=run_name) as run:
        mlflow.log_param("model_type", "PyTorchLinearRegression")
        mlflow.log_metric("test_mse", test_mse)
        mlflow.pytorch.log_model(model, "model", registered_model_name=model_name)
        model_uri = f"{run.info.artifact_uri}/model"
        with open(model_uri_path, "w", encoding="utf-8") as f:
            f.write(model_uri)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["kserve==0.15.0", "kubernetes==29.0.0", "nvgpu==0.10.0"]
)
def serve_pytorch_model(
    service_name: str = "wine-quality-pytorch",
    namespace: str = "kubeflow-user-example-com",
    storage_uri: str = "s3://mlpipeline/"
) -> None:
    """
    Deploys a trained PyTorch model with KServe (TorchServe runtime) from MinIO.

    The default storage location is the bucket root, which matches where the
    training component in this file uploads its files:
      s3://mlpipeline/model-store/<model_name>.mar
      s3://mlpipeline/config/config.properties

    Args:
        service_name: Name of the InferenceService to create.
        namespace: Kubernetes namespace the InferenceService is created in.
        storage_uri: Location KServe pulls the model store and config from;
            point it at a sub-folder if the artifacts do not live at the
            bucket root.
    """
    from kserve import KServeClient, V1beta1InferenceService, V1beta1InferenceServiceSpec, V1beta1PredictorSpec, V1beta1TorchServeSpec
    from kubernetes import client as k8s_client
    from kubernetes.config import ConfigException, load_incluster_config, load_kube_config

    # Prefer in-cluster credentials; fall back to a local kubeconfig so the
    # function also works when run outside a pod.
    try:
        load_incluster_config()
    except ConfigException:
        load_kube_config()

    kserve_client = KServeClient()

    isvc = V1beta1InferenceService(
        api_version="serving.kserve.io/v1beta1",
        kind="InferenceService",
        metadata=k8s_client.V1ObjectMeta(
            name=service_name,
            namespace=namespace,
            annotations={"pipelines.kubeflow.org/owned_by_kfp_run": "false", "sidecar.istio.io/inject": "false"}
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                # Service account expected to carry the MinIO/S3 credentials.
                service_account_name="sa-minio-kserve",
                pytorch=V1beta1TorchServeSpec(
                    storage_uri=storage_uri,
                    image="pytorch/torchserve",
                    runtime_version="0.12.0-gpu",
                    resources=k8s_client.V1ResourceRequirements(
                        limits = {"nvidia.com/gpu":"1"}
                    )
                )
            )
        )
    )

    kserve_client.create(isvc, watch=True)
    print(f"Applied InferenceService '{service_name}' in namespace '{namespace}'.")
    print("Waiting for InferenceService to become ready...")
    kserve_client.wait_isvc_ready(name=service_name, namespace=namespace, timeout_seconds=600)
    isvc_status = kserve_client.get(name=service_name, namespace=namespace)
    print(f"InferenceService '{service_name}' is ready.")
    print(f"Prediction URL: {isvc_status['status']['url']}")

@dsl.pipeline(name="wine-quality-pytorch-pipeline")
def wine_quality_pipeline(
    url: str = "https://raw.githubusercontent.com/plotly/datasets/master/winequality-red.csv",
    run_name: str = "wine-quality-pytorch-run",
    model_name: str = "wine-quality-pytorch-model"
):
    """Download the dataset, preprocess it, train the model, then deploy it."""
    raw_csv = download_dataset(url=url).outputs["dataset_path"]
    cleaned = preprocess_dataset(dataset=raw_csv).outputs['output_file']
    training = train_pytorch_model(
        dataset=cleaned,
        run_name=run_name,
        model_name=model_name,
    )
    # Serving consumes no output of the training task, so the ordering has to
    # be declared explicitly.
    serve_pytorch_model().after(training)

if __name__ == '__main__':
    # Script entry point: submit the pipeline as a run through a
    # default-configured KFP client.
    import kfp

    kfp.Client().create_run_from_pipeline_func(
        wine_quality_pipeline,
        experiment_name="wine_quality_test",
    )
