# 노트북에서 모델 학습 및 서빙 API 생성 파이프라인 만들기

In [1]:
import os
import uuid
from kakaocloud_kbm import KbmPipelineClient
import kfp
from kfp import kubernetes
from kfp import components
import kfp.dsl as dsl
from kfp.dsl import Output, Input, Artifact, Model, Dataset, InputPath, OutputPath
import kfp.compiler as compiler

* 'json_loads' has been removed




In [2]:
# KBM Kubeflow SDK
# Connection settings are published as environment variables; the login cell
# further down reads the same three variables back.
# NOTE(review): presumably KbmPipelineClient() also picks them up - confirm.
os.environ.update({
    # Must be the domain entered when Kubeflow was created, not an IP address.
    "KUBEFLOW_HOST": "https://{{ KUBEFLOW 도메인 주소}}",
    "KUBEFLOW_USERNAME": "{{ KUBEFLOW 계정 이메일 }}",
    "KUBEFLOW_PASSWORD": "{{ KUBEFLOW 계정 비밀번호 }}",
})

client = KbmPipelineClient(
    # verify_ssl=False ## enable when no domain / TLS has been configured
)



In [3]:
# Variables
# The namespace is the third '/'-separated field of NB_PREFIX.
_nb_prefix_fields = os.environ['NB_PREFIX'].split('/')
KBM_NAMESPACE = _nb_prefix_fields[2]

COMPONENT_PATH = 'components'
SERVE_ENPOINT_PATH = os.path.join(COMPONENT_PATH, 'yelp_review_nlp_serve_model')
TRAIN_CR_IMAGE = "bigdata-150.kr-central-2.kcr.dev/kc-kubeflow/kmlp-pytorch:v1.8.0.py38.cuda.1a"

# An 8-character suffix shared by every resource name created in this session.
TASK_UUID = uuid.uuid1().hex[:8]
PVC_NAME = "test-nlp-pvc-" + TASK_UUID
MODEL_NAME = "torch-model-" + TASK_UUID
KBM_MODEL_SERV_NAME = "torchserve-" + TASK_UUID
EPOCH_NUM = 10

for _label, _value in (
    ("Model Name", MODEL_NAME),
    ("Model PVC Name", PVC_NAME),
    ("Model Server Name", KBM_MODEL_SERV_NAME),
):
    print(f"{_label} : {_value}")

Model Name : torch-model-cdad658c
Model PVC Name : test-nlp-pvc-cdad658c
Model Server Name : torchserve-cdad658c


## 파이프라인 컴포넌트 빌드하기

In [4]:
%%bash -s "{SERVE_ENPOINT_PATH}"

# $1 is the component directory passed in from the notebook.
# Quoted so a path containing whitespace or glob characters is not word-split.
mkdir -p "${1}"
echo "${1}"

components/yelp_review_nlp_serve_model


### 데이터 수집 컴포넌트

In [5]:
@dsl.component(packages_to_install=['requests'])
def download_dataset(
    kc_kbm_os_train_url: str,
    kc_kbm_os_valid_url: str,
    kc_kbm_os_handler_url: str,
    kc_kbm_os_kserve_url: str
):
    """Download the training inputs and serving assets into /data.

    Each URL falls back to a built-in default when it is empty. The files are
    stored as train.csv, validation.csv, handler.py and kserve_component.yaml.

    Args:
        kc_kbm_os_train_url: URL of the training CSV.
        kc_kbm_os_valid_url: URL of the validation CSV.
        kc_kbm_os_handler_url: URL of the handler.py file.
        kc_kbm_os_kserve_url: URL of the kserve_component.yaml file.
    """
    import os
    from requests import get

    def download(url, dist_dir, file_name=None):
        """Fetch url and write the response body to dist_dir/file_name."""
        if not file_name:
            file_name = url.split('/')[-1]

        # Request before opening the target file and fail on HTTP errors, so an
        # error page is never silently stored in place of the real payload.
        response = get(url, timeout=(10, 300))
        response.raise_for_status()

        with open(os.path.join(dist_dir, file_name), "wb") as file:
            file.write(response.content)

    pvc_data_path = "/data"
    default_base_url = 'https://objectstorage.kr-central-2.kakaocloud.com/v1/252267c6b6f745eba8b850ec047b673e/kbm-files/guide_docs/hands_on/yelp_review_data_nlp'

    # (requested URL, default URL used when the request is empty, stored file name)
    downloads = [
        (kc_kbm_os_train_url, f'{default_base_url}/data/train.csv', "train.csv"),
        (kc_kbm_os_valid_url, f'{default_base_url}/data/validation.csv', "validation.csv"),
        (kc_kbm_os_handler_url, f'{default_base_url}/handler.py', "handler.py"),
        (kc_kbm_os_kserve_url, f'{default_base_url}/kserve_component.yaml', "kserve_component.yaml"),
    ]

    for requested_url, default_url, file_name in downloads:
        download(requested_url or default_url, pvc_data_path, file_name)

    print(os.listdir(pvc_data_path))


  return component_factory.create_component_from_func(


### 자연어 처리 모델 학습 컴포넌트

In [6]:
@dsl.component(
    packages_to_install=[
        'transformers==4.5.1',
        'packaging==21.3',
        # 'transformers[torch]',
        'datasets==1.4.1',
        # 'datasets'
    ],
    base_image=TRAIN_CR_IMAGE,
    output_component_file=f'{SERVE_ENPOINT_PATH}/train_component.yaml'
)
def train_nlp(
    epoch_num: str,
    model_name: str
):
    """Fine-tune a DistilBERT text classifier and write its TorchServe config.

    Reads train.csv and validation.csv from /data, fine-tunes
    "distilbert-base-uncased" on the first 32 training rows, saves the model
    and tokenizer under /data/torch_model, and writes
    /data/torch_model/config/config.properties.

    Args:
        epoch_num: Number of training epochs, passed as a string and
            converted with int().
        model_name: Model name written into the model_snapshot entry of
            config.properties (as the model key and as "<model_name>.mar").
    """
    import json
    import os

    import pandas as pd
    from datasets import Dataset
    from transformers import (
        DistilBertForSequenceClassification,
        DistilBertTokenizer,
        Trainer,
        TrainingArguments,
        TrainerCallback,
    )

    pvc_data_path = "/data"
    model_dir = f"{pvc_data_path}/torch_model"
    config_dir = f"{model_dir}/config"

    print("listdir : \n ", os.listdir())
    print("getcwd : \n ", os.getcwd())

    os.chdir("/")

    train_frame = pd.read_csv(f"{pvc_data_path}/train.csv")
    # Count the classes on the full training file. Counting them on the 32-row
    # sample would size the classifier head too small whenever the sample
    # happens to miss a label.
    num_labels = int(train_frame["label"].nunique())
    # Train on at most the first 32 rows; tolerate files shorter than that.
    sample_size = min(32, len(train_frame))
    train_dataset = Dataset.from_pandas(train_frame).select(range(sample_size))
    evaluation_dataset = Dataset.from_pandas(pd.read_csv(f"{pvc_data_path}/validation.csv"))

    # tokenizing
    tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

    def tokenize_function(item):
        return tokenizer(
            item["text"],
            padding="max_length",
            max_length=128,
            truncation=True
        )

    train = train_dataset.map(tokenize_function)
    evaluation = evaluation_dataset.map(tokenize_function)

    print("complete Tokenizing")

    model = DistilBertForSequenceClassification.from_pretrained(
        "distilbert-base-uncased", num_labels=num_labels
    )

    tra_arg = TrainingArguments(
        per_device_train_batch_size=8,
        output_dir=model_dir,
        num_train_epochs=int(epoch_num),
        logging_steps=2,
        # evaluation_strategy="epoch",
        disable_tqdm=True,
        save_strategy="no",
    )

    class StepLoggingCallback(TrainerCallback):
        """Print the global step each time the trainer logs."""

        def on_log(self, args, state, control, logs=None, **kwargs):
            print(f"{state.global_step} Steps ")

    trainer = Trainer(
        model=model,
        args=tra_arg,
        train_dataset=train,
        eval_dataset=evaluation,
        callbacks=[StepLoggingCallback],
    )

    trainer.train()
    # Saving Tokenizer, Model
    trainer.save_model(f"{model_dir}/")
    tokenizer.save_pretrained(model_dir)

    print("Saving Model & Tokenizer Complete !!")

    # config for torchserve
    config = dict(
        inference_address="http://0.0.0.0:8085",
        management_address="http://0.0.0.0:8085",
        metrics_address="http://0.0.0.0:8082",
        grpc_inference_port=7070,
        grpc_management_port=7071,
        enable_envvars_config="true",
        install_py_dep_per_model="true",
        model_store="model-store",
        model_snapshot=json.dumps({
            "name": "startup.cfg",
            "modelCount": 1,
            "models": {
                f"{model_name}": {  # Model Name
                    "1.0": {
                        "defaultVersion": "true",
                        "marName": f"{model_name}.mar",
                        "minWorkers": 1,
                        "maxWorkers": 5,
                        "batchSize": 1,
                        "maxBatchDelay": 10,
                        "responseTimeout": 60,
                    }
                }
            },
        }),
    )

    # Logged here, where the file is actually written.
    print("Saving config.properties !!")

    # creating config & config folder
    os.makedirs(config_dir, exist_ok=True)

    with open(f"{config_dir}/config.properties", "w") as f:
        f.writelines(f"{key}={value}\n" for key, value in config.items())


  @dsl.component(


### 서빙을 위한 MAR파일 생성 컴포넌트

In [7]:
@dsl.container_component
def create_marfile():
    """Package the trained model in /data/torch_model into a .mar archive.

    Runs in a python:3.9 container: installs the TorchServe archiver tools,
    builds "<MODEL_NAME>.mar" from pytorch_model.bin, ../handler.py,
    config.json and vocab.txt, moves it into model-store/, and rewrites the
    model_store entry of config/config.properties to /mnt/models/model-store.
    """
    steps = [
        # Stop at the first failing command so a broken archive step fails the
        # task instead of surfacing later as a serving error.
        "set -e",
        "ls -al /data",
        "cd /data/torch_model",
        "ls -al",
        "pwd",
        "pip install torchserve torch-model-archiver torch-workflow-archiver",
        f"torch-model-archiver --model-name {MODEL_NAME} --version 1.0 --serialized-file pytorch_model.bin --handler ../handler.py --extra-files config.json,vocab.txt --force",
        # -p keeps a re-run on the same volume from failing.
        "mkdir -p model-store",
        f"mv -f {MODEL_NAME}.mar model-store",
        # The previous pattern used "\b" inside a non-raw Python string, which
        # is a backspace character, so sed never matched. Anchoring on the
        # whole property line needs no escapes and is safe to apply twice.
        "sed -i 's|^model_store=model-store$|model_store=/mnt/models/model-store|' /data/torch_model/config/config.properties",
    ]
    return dsl.ContainerSpec(
        image='python:3.9',
        command=["/bin/sh"],
        args=["-c", "; ".join(steps)],
    )

### 모델(서빙API) 생성 컴포넌트

In [8]:
%%writefile {SERVE_ENPOINT_PATH}/kserve_component.yaml
# Component definition for the KServe deployer container.
# Every input below is forwarded to kservedeployer.py as a command-line flag
# (see implementation.container.args).
name: Serve a model with KServe 
description: Serve Models using KServe 
# All inputs carry a default value.
inputs:
  - {name: Action,                    type: String, default: 'create',                            description: 'Action to execute on KServe'}
  - {name: Model Name,                type: String, default: '',                                  description: 'Name to give to the deployed model'}
  - {name: Model URI,                 type: String, default: '',                                  description: 'Path of the S3 or GCS compatible directory containing the model.'}
  - {name: Canary Traffic Percent,    type: String, default: '100',                               description: 'The traffic split percentage between the candidate model and the last ready model'}
  - {name: Namespace,                 type: String, default: '',                                  description: 'Kubernetes namespace where the KServe service is deployed.'}
  - {name: Framework,                 type: String, default: '',                                  description: 'Machine Learning Framework for Model Serving.'}
  - {name: Runtime Version,           type: String, default: 'latest',                            description: 'Runtime Version of Machine Learning Framework'}
  - {name: Resource Requests,         type: String, default: '{"cpu": "0.5", "memory": "512Mi"}', description: 'CPU and Memory requests for Model Serving'}
  - {name: Resource Limits,           type: String, default: '{"cpu": "1", "memory": "1Gi"}',     description: 'CPU and Memory limits for Model Serving'}
  - {name: Custom Model Spec,         type: String, default: '{}',                                description: 'Custom model runtime container spec in JSON'}
  - {name: Autoscaling Target,        type: String, default: '0',                                 description: 'Autoscaling Target Number'}
  - {name: Service Account,           type: String, default: '',                                  description: 'ServiceAccount to use to run the InferenceService pod'}
  - {name: Enable Istio Sidecar,      type: Bool,   default: 'True',                              description: 'Whether to enable istio sidecar injection'}
  - {name: InferenceService YAML,     type: String, default: '{}',                                description: 'Raw InferenceService serialized YAML for deployment'}
  - {name: Watch Timeout,             type: String, default: '300',                               description: "Timeout seconds for watching until InferenceService becomes ready."}
  - {name: Min Replicas,              type: String, default: '-1',                                description: 'Minimum number of InferenceService replicas'}
  - {name: Max Replicas,              type: String, default: '-1',                                description: 'Maximum number of InferenceService replicas'}
  - {name: Request Timeout,           type: String, default: '60',                                description: "Specifies the number of seconds to wait before timing out a request to the component."}
  - {name: Enable ISVC Status,        type: Bool,   default: 'True',                              description: "Specifies whether to store the inference service status as the output parameter"}

# Single output; its file path is handed to the script through --output-path.
outputs:
  - {name: InferenceService Status,   type: String,                                               description: 'Status JSON output of InferenceService'}
implementation:
  container:
    image: bigdata-150.kr-central-2.kcr.dev/kc-kubeflow/kserve-component:v0.11.1.kbm.1a
    command: ['python']
    # Runs "python -u kservedeployer.py" with one flag per input.
    args: [
      -u, kservedeployer.py,
      --action,                 {inputValue: Action},
      --model-name,             {inputValue: Model Name},
      --model-uri,              {inputValue: Model URI},
      --canary-traffic-percent, {inputValue: Canary Traffic Percent},
      --namespace,              {inputValue: Namespace},
      --framework,              {inputValue: Framework},
      --runtime-version,        {inputValue: Runtime Version},
      --resource-requests,      {inputValue: Resource Requests},
      --resource-limits,        {inputValue: Resource Limits},
      --custom-model-spec,      {inputValue: Custom Model Spec},
      --autoscaling-target,     {inputValue: Autoscaling Target},
      --service-account,        {inputValue: Service Account},
      --enable-istio-sidecar,   {inputValue: Enable Istio Sidecar},
      --output-path,            {outputPath: InferenceService Status},
      --inferenceservice-yaml,  {inputValue: InferenceService YAML},
      --watch-timeout,          {inputValue: Watch Timeout},
      --min-replicas,           {inputValue: Min Replicas},
      --max-replicas,           {inputValue: Max Replicas},
      --request-timeout,        {inputValue: Request Timeout},
      --enable-isvc-status,     {inputValue: Enable ISVC Status}
    ]

Overwriting components/yelp_review_nlp_serve_model/kserve_component.yaml


In [9]:
from kfp.components import load_component_from_file

def create_inference_model():
    """Create the KServe deployment task for the trained model.

    Loads the component definition from kserve_component.yaml under
    SERVE_ENPOINT_PATH and calls it with action "apply", pointing at the
    torch_model directory on the PVC named PVC_NAME.

    Returns:
        The task produced by the loaded component, with a CPU limit of "2"
        and a memory limit of "4G" applied.
    """
    component_file = f'{SERVE_ENPOINT_PATH}/kserve_component.yaml'
    deploy_component = load_component_from_file(component_file)

    deploy_task = deploy_component(
        action="apply",
        model_name=KBM_MODEL_SERV_NAME,
        model_uri=f"pvc://{PVC_NAME}/torch_model",
        namespace=KBM_NAMESPACE,
        framework="pytorch",
    )

    deploy_task.set_cpu_limit(cpu="2")
    deploy_task.set_memory_limit(memory="4G")
    return deploy_task

## 파이프라인 생성

In [10]:
@dsl.pipeline(
    name="Yelp Review NLP Model Pipeline"
)
def yelp_review_nlp_model_Pipeline(
    kc_kbm_os_train_url: str = 'https://objectstorage.kr-central-2.kakaocloud.com/v1/252267c6b6f745eba8b850ec047b673e/kbm-files/guide_docs/hands_on/yelp_review_data_nlp/data/train.csv',
    kc_kbm_os_valid_url: str = 'https://objectstorage.kr-central-2.kakaocloud.com/v1/252267c6b6f745eba8b850ec047b673e/kbm-files/guide_docs/hands_on/yelp_review_data_nlp/data/validation.csv',
    kc_kbm_os_handler_url: str = 'https://objectstorage.kr-central-2.kakaocloud.com/v1/252267c6b6f745eba8b850ec047b673e/kbm-files/guide_docs/hands_on/yelp_review_data_nlp/handler.py',
    kc_kbm_os_kserve_url: str = 'https://objectstorage.kr-central-2.kakaocloud.com/v1/252267c6b6f745eba8b850ec047b673e/kbm-files/guide_docs/hands_on/yelp_review_data_nlp/kserve_component.yaml',
    model_name: str = "torch-model",
    epoch_num: str = "10"
):
    # Four tasks run strictly in sequence (download -> train -> archive ->
    # serve); each one mounts the same PVC at /data.
    shared_mount_path = '/data'

    shared_volume = kubernetes.CreatePVC(
        pvc_name=PVC_NAME,
        access_modes=['ReadWriteOnce'],
        size='10Gi',
        storage_class_name=''
    )
    shared_volume_name = shared_volume.outputs['name']

    def attach_shared_volume(task):
        # Mount the pipeline PVC into the given task.
        kubernetes.mount_pvc(
            task,
            pvc_name=shared_volume_name,
            mount_path=shared_mount_path,
        )

    ### Data download
    fetch_task = download_dataset(
        kc_kbm_os_train_url=kc_kbm_os_train_url,
        kc_kbm_os_valid_url=kc_kbm_os_valid_url,
        kc_kbm_os_handler_url=kc_kbm_os_handler_url,
        kc_kbm_os_kserve_url=kc_kbm_os_kserve_url
    )
    fetch_task.set_cpu_request(cpu="1")
    fetch_task.set_memory_request(memory="2G")
    fetch_task.set_caching_options(enable_caching=False)
    attach_shared_volume(fetch_task)

    ### Model training
    train_task = train_nlp(
        epoch_num=epoch_num,
        model_name=model_name
    )
    train_task.set_cpu_request(cpu="4")
    train_task.set_memory_request(memory="8G")
    train_task.set_accelerator_type("nvidia.com/mig-1g.10gb")
    train_task.set_accelerator_limit(1)
    attach_shared_volume(train_task)
    train_task.set_display_name("Finetuning Text Classification Model")
    train_task.set_caching_options(enable_caching=False)
    train_task.after(fetch_task)

    ### MAR file creation
    archive_task = create_marfile()
    archive_task.set_caching_options(enable_caching=False)
    archive_task.set_cpu_limit(cpu="1")
    archive_task.set_memory_limit(memory="2G")
    attach_shared_volume(archive_task)
    archive_task.set_display_name("Creating Marfile")
    archive_task.after(train_task)

    ### Model serving
    serve_task = create_inference_model()
    serve_task.set_cpu_limit(cpu="4")
    serve_task.set_memory_limit(memory="8G")
    attach_shared_volume(serve_task)
    serve_task.after(archive_task)
    

In [11]:
# Experiment and run names are both derived from the pipeline's display name.
_pipeline_title = yelp_review_nlp_model_Pipeline.name
experiment_name = f"{_pipeline_title} test experiment"
run_name = f"{_pipeline_title} run"

# Only these two pipeline parameters are overridden; the URL parameters keep
# the defaults declared on the pipeline function.
arguments = dict(
    model_name=MODEL_NAME,
    epoch_num=str(EPOCH_NUM),
)

client.create_run_from_pipeline_func(
    yelp_review_nlp_model_Pipeline,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)

RunPipelineResult(run_id=a83cf803-c77f-402f-9a5d-e0584728f6b3)

## 파이프라인 실행 확인

위 링크 또는 Kubeflow Dashboard > Runs에서 생성하신 Run을 확인하고

Run 실행이 완료되고 난 후, 아래 테스트 코드를 실행하시기 바랍니다.

## 모델 서빙 API 테스트

In [12]:
# Imported under aliases: a plain "from kubernetes import client" would rebind
# the notebook-level `client` (the KbmPipelineClient created earlier) and break
# any later re-run of the pipeline submission cell.
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubernetes.config import ConfigException

try:
    # Load configuration inside the Pod
    k8s_config.load_incluster_config()
except ConfigException:
    # Load configuration for testing
    k8s_config.load_kube_config()

kube_core_client = k8s_client.CoreV1Api()

# Look up the private predictor Service of the InferenceService deployed above.
all_services = kube_core_client.list_namespaced_service(
    namespace=KBM_NAMESPACE,
    label_selector=f"component=predictor,serving.kserve.io/inferenceservice={KBM_MODEL_SERV_NAME},networking.internal.knative.dev/serviceType=Private"
)

# An empty result would otherwise surface as a bare IndexError.
if not all_services.items:
    raise RuntimeError(
        f"No predictor service found for InferenceService "
        f"'{KBM_MODEL_SERV_NAME}' in namespace '{KBM_NAMESPACE}'. "
        "Check that the pipeline run has completed."
    )

serv_api_ip = all_services.items[0].spec.cluster_ip

In [13]:
import requests

# Read the connection settings directly: a missing variable now fails here
# with a KeyError naming it, instead of a NameError further down.
host = os.environ["KUBEFLOW_HOST"]
username = os.environ["KUBEFLOW_USERNAME"]
password = os.environ["KUBEFLOW_PASSWORD"]

session = requests.Session()

# NOTE(security): certificate verification is switched off for https hosts.
# Kept as-is for environments with self-signed certificates; remove this when
# the Kubeflow domain has a trusted certificate.
_kargs = {}
if host.startswith("https"):
    _kargs["verify"] = False

# The first request is used only to discover the login URL (response.url).
response = session.get(
    host, **_kargs
)

headers = {
    "Content-Type": "application/x-www-form-urlencoded",
}

session.post(response.url, headers=headers, data={"login": username, "password": password}, **_kargs)

# A rejected login leaves no session cookie; report that explicitly rather
# than raising a bare KeyError.
session_cookie = session.cookies.get_dict().get("authservice_session")
if session_cookie is None:
    raise RuntimeError(
        "Kubeflow login did not return an 'authservice_session' cookie. "
        "Check KUBEFLOW_HOST, KUBEFLOW_USERNAME and KUBEFLOW_PASSWORD."
    )

In [14]:
input_text_data = "Hello World!"

# "scheme://domain/..." split on "/" puts the domain at index 2.
_kubeflow_domain = host.split("/")[2]
_virtual_host = f"{KBM_MODEL_SERV_NAME}.{KBM_NAMESPACE}.{_kubeflow_domain}"
_predict_url = f"{host}/v1/models/{MODEL_NAME}:predict"

data = {"instances": [{"data": input_text_data}]}

# The request goes to the Kubeflow host; the Host header carries the model
# server's own name, and the session cookie carries the login from above.
x = requests.post(
    url=_predict_url,
    json=data,
    headers={"Host": _virtual_host},
    cookies={'authservice_session': session_cookie},
    **_kargs
)

print(f"입력값: {data}")
print(f"결괏값: {x.text}")

입력값: {'instances': [{'data': 'Hello World!'}]}
결괏값: {"predictions":[4]}
