### KFP를 활용해 Training PipeLine 만들기

In [119]:
#### New
from functools import partial
from kfp.components import create_component_from_func, InputPath, OutputArtifact, OutputPath,func_to_container_op
from kubernetes import client as k8s_client
from kubernetes.client import V1Volume
from kfp.dsl import ContainerOp
from kubernetes.client.models import V1EnvVar, V1SecretKeySelector


@partial(
    create_component_from_func,
    base_image="python:3.9",
    packages_to_install=["pandas"],
)
def load_data_create_component_from_func(
    # train_path: OutputArtifact("pvc"),
    # evaluation_path: OutputArtifact("pvc"),
):

    import pandas as pd
    import os 

    print('listdir : \n', os.listdir())

    print("load_pandas")

    # load data from github
    df_train = pd.read_csv(
        "https://raw.github.com/yangoos57/Learning_kubeflow/main/mini_project/data/train.csv"
    )
    df_evaluation = pd.read_csv(
        "https://raw.github.com/yangoos57/Learning_kubeflow/main/mini_project/data/validation.csv"
    )
    print("Complete_loading_data_to_pandas")

    df_train.to_csv('pvc/train_csv', index=False)
    df_evaluation.to_csv('pvc/val.csv', index=False)

    # df_train.to_csv(train_path, index=False)
    # df_evaluation.to_csv(evaluation_path, index=False)

    print("complete Loading Data")


In [124]:
@create_component_from_func
def creat_marfile():

    config = dict(
            inference_address="http://0.0.0.0:8085",
            management_address="http://0.0.0.0:8085",
            metrics_address="http://0.0.0.0:8082",
            grpc_inference_port=7070,
            grpc_management_port=7071,
            enable_envvars_config="true",
            install_py_dep_per_model="true",
            model_store="model-store",
            model_snapshot={
                "name": "startup.cfg",
                "modelCount": 1,
                "models": {
                    "torch-model": {
                        "1.0": {
                            "defaultVersion": 'true',
                            "marName": "torch-model.mar",
                            "minWorkers": 1,
                            "maxWorkers": 5,
                            "batchSize": 1,
                            "maxBatchDelay": 10,
                            "responseTimeout": 60,
                        }
                    }
                },
            },
        )

    with open("pvc/torch_model/config.properties", "w") as f:
        for i,j in config.items() :
            f.write(f'{i}={j}\n')
        f.close()


In [121]:
from kfp.dsl import pipeline
from kfp import onprem
import kfp.dsl as dsl


@pipeline(name="test_pipeline")
def test_pipeline():

    # container_ops
    data = load_data_create_component_from_func()
    data.apply(onprem.mount_pvc(pvc_name='lee', volume_name='test-lee', volume_mount_path="pvc"))

    # op = ContainerOp(
    #     name="MARfile_Test",
    #     image="python:3.9",
    #     command=["/bin/sh", "-c"],
    #     arguments=["cd pvc && touch test.file && ls -a"],
    #     # pvolumes={"/data": data.pvolume},
    #     # artifact_argument_paths=[data.outputs["train"]],
    # ).apply(onprem.mount_pvc(pvc_name='lee', volume_name='test-lee', volume_mount_path="pvc"))


import kfp

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(test_pipeline, "test_pipeline_op2.yaml")




### minio 연동

In [84]:
from minio import Minio
from minio.error import S3Error

url = 'localhost:9000/'
client = Minio(
        url,
        access_key="minio",
        secret_key="minio123",
        secure=False
    )

In [1]:
import kfp
client = kfp.Client(host='https://localhost:8080/pipeline')
client.create_run_from_pipeline_func(test_pipeline,arguments={})

client

MaxRetryError: HTTPSConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /pipeline/apis/v1beta1/healthz (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1123)')))

In [11]:
import json
log_dir_uri=f"s3://{'torch'}",
minio_endpoint = "http://minio-service.kubeflow:9000"
pod_template_spec=json.dumps({
            "spec": {
                "containers": [{
                    "env": [
                        {
                            "name": "AWS_ACCESS_KEY_ID",
                            "value": "minio"
                        },
                        {
                            "name": "AWS_SECRET_ACCESS_KEY",
                            "value" : "minio123"
                        },
                        {
                            "name": "S3_ENDPOINT",
                            "value": f"{minio_endpoint}",
                        },
                        {
                            "name": "S3_USE_HTTPS",
                            "value": "0"
                        },
                        {
                            "name": "S3_VERIFY_SSL",
                            "value": "0"
                        },
                    ]
                }]
            }
        }),

In [12]:
pod_template_spec

('{"spec": {"containers": [{"env": [{"name": "AWS_ACCESS_KEY_ID", "key": "minio"}, {"name": "AWS_SECRET_ACCESS_KEY", "key": "minio123"}, {"name": "S3_ENDPOINT", "value": "http://minio-service.kubeflow:9000"}, {"name": "S3_USE_HTTPS", "value": "0"}, {"name": "S3_VERIFY_SSL", "value": "0"}]}]}}',)

In [4]:
@partial(
    create_component_from_func,
    base_image="679oose/basepython:1.0"
)
def train_model(
    train_path:InputPath("csv"),
    evaluation_path: InputPath("csv"),
    model_save_path: OutputPath("folder"),
):

    from transformers import (
        DistilBertForSequenceClassification,
        DistilBertTokenizer,
        Trainer,
        TrainingArguments,
        TrainerCallback
    )
    from datasets import Dataset

    # loading data
    train_dataset = Dataset.from_csv(train_path)
    evaluation_dataset = Dataset.from_csv(evaluation_path)

    # tokenizing
    tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

    def tokenize_function(item):
        return tokenizer(item["text"], padding="max_length", max_length=128, truncation=True)

    train = train_dataset.map(tokenize_function)
    evaluation = evaluation_dataset.map(tokenize_function)

    print('complete Tokenizing')

    model = DistilBertForSequenceClassification.from_pretrained(
        "distilbert-base-uncased", num_labels=len(set(train_dataset["label"]))
    )
    tra_arg = TrainingArguments(
        output_dir="test",
        num_train_epochs=1,
        logging_steps=5,
        evaluation_strategy="epoch",
        disable_tqdm=True,
        save_strategy = "no"
    )

    class myCallback(TrainerCallback):

        def on_log(self, args, state, control, logs=None, **kwargs):
            print(f'{state.global_step} Steps ')

    trainer = Trainer(
        model=model,
        args=tra_arg,
        train_dataset=train,
        eval_dataset=evaluation,
        callbacks=[myCallback]
    )

    trainer.train()
    trainer.save_model(model_save_path)



In [6]:
from kfp.dsl import pipeline

@pipeline(name="NLP_Pipeline")
def NLP_Pipeline():
    data = load_data()
    train_model(data.outputs['train'],data.outputs['evaluation'])


import kfp
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(NLP_Pipeline, "NLP_Pipeline_1.2.yaml")