# Upload model to MinIO artifact store

In [None]:
from kfp.components import create_component_from_func, InputPath
from typing import NamedTuple

%load_ext lab_black

# Container image handed to create_component_from_func as the component's base image.
BASE_IMAGE = (
    "quay.io/ibm/kubeflow-notebook-image-ppc64le:"
    "elyra3.14.1-py3.9-tf2.10.1-pt1.10.2-v0"
)


def upload_model(
    file_dir: InputPath(str),
    minio_url: str = "minio-service.kubeflow:9000",
    minio_secret: str = "mlpipeline-minio-artifact",
    export_bucket: str = "models",
    model_name: str = "my-model",
    model_version: int = 1,
    model_format: str = "onnx",
) -> NamedTuple("UploadOutput", [("s3_address", str), ("triton_s3_address", str)]):
    """Uploads a model file to MinIO artifact store.

    The file is stored in ``export_bucket`` under the object name
    ``{model_format}/{model_name}/{model_version}/model.{model_format}``.
    The bucket is created first if it does not exist.

    Args:
        file_dir: Local path of the model file to upload.
        minio_url: ``host:port`` of the MinIO endpoint (plain HTTP is used).
        minio_secret: Name of the Kubernetes secret, looked up in the pod's
            own namespace, whose ``accesskey`` and ``secretkey`` entries hold
            the MinIO credentials.
        export_bucket: Bucket that receives the model.
        model_name: First path segment below the format prefix.
        model_version: Second path segment below the format prefix.
        model_format: Used both as the top-level prefix inside the bucket and
            as the file extension of the uploaded object.

    Returns:
        UploadOutput: ``s3_address`` is
        ``s3://{minio_url}/{export_bucket}/{model_format}`` and
        ``triton_s3_address`` is that address followed by the model path.

    Raises:
        Exception: If reading the Kubernetes secret fails with an
            ``ApiException`` (the original error is chained as the cause).
    """

    # All imports stay inside the function: it is turned into a standalone
    # component, so it cannot rely on names from the surrounding module.
    from collections import namedtuple
    from kubernetes import client, config
    import logging
    from minio import Minio
    import sys

    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(levelname)s %(asctime)s: %(message)s",
    )
    logger = logging.getLogger()

    def get_minio_client(minio_secret):
        """Build a MinIO client from the credentials stored in ``minio_secret``."""
        import base64
        from kubernetes.client.rest import ApiException

        def get_current_namespace():
            """Return the namespace recorded in the pod's service account mount."""
            SA_NAMESPACE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
            with open(SA_NAMESPACE) as f:
                # Strip so stray whitespace/newlines never end up in the API call.
                return f.read().strip()

        def decode(text):
            """Decode a base64-encoded secret value into a UTF-8 string."""
            return base64.b64decode(text).decode("utf-8")

        config.load_incluster_config()
        api_client = client.ApiClient()

        try:
            secret = client.CoreV1Api(api_client).read_namespaced_secret(
                minio_secret, get_current_namespace()
            )
        except ApiException as e:
            if e.status == 404:
                # Report the secret that was actually requested, not the default.
                logger.error(
                    f"Failed to get secret '{minio_secret}', which is needed for communicating with MinIO!"
                )
            raise Exception(e) from e

        minio_user = decode(secret.data["accesskey"])
        minio_pass = decode(secret.data["secretkey"])

        return Minio(
            minio_url, access_key=minio_user, secret_key=minio_pass, secure=False
        )

    logger.info(f"Establishing MinIO connection to '{minio_url}'...")
    minio_client = get_minio_client(minio_secret)

    # Create export bucket if it does not yet exist
    if not minio_client.bucket_exists(export_bucket):
        logger.info(f"Creating bucket '{export_bucket}'...")
        minio_client.make_bucket(bucket_name=export_bucket)

    model_path = f"{model_name}/{model_version}/model.{model_format}"
    s3_address = f"s3://{minio_url}/{export_bucket}/{model_format}"
    triton_s3_address = f"{s3_address}/{model_path}"

    logger.info(
        f"Saving {model_format} file to MinIO (s3 address: {s3_address})..."
    )
    minio_client.fput_object(
        bucket_name=export_bucket,  # bucket name in Minio
        # Object name inside the bucket; per the original author's note, for
        # Triton the file name MUST be model.onnx.
        object_name=f"{model_format}/{model_path}",
        file_path=file_dir,  # file path / name in local system
    )

    logger.info("Finished.")
    out_tuple = namedtuple("UploadOutput", ["s3_address", "triton_s3_address"])
    return out_tuple(s3_address, triton_s3_address)


# Wrap upload_model as a pipeline component running in BASE_IMAGE and
# write its specification to component.yaml.
upload_model_comp = create_component_from_func(
    func=upload_model,
    base_image=BASE_IMAGE,
    output_component_file="component.yaml",
)