In [None]:
# Copyright (c) Facebook, Inc. and its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# BERT Pipeline: PyTorch BERT News Classification

This notebook shows an end-to-end PyTorch BERT news classification example using Kubeflow Pipelines.


An example notebook that demonstrates how to:

* Get different tasks needed for the pipeline
* Create a Kubeflow pipeline
* Include PyTorch KFP components to preprocess, train, visualize and deploy the model in the pipeline
* Submit a job for execution
* Query the final deployed model (prediction and explanation)
* Interpret the model using Captum Insights


In [1]:
# Optional one-time environment setup (uncomment to run):
# ! pip uninstall -y kfp
# ! pip install --no-cache-dir  captum
import os

# Outbound HTTP(S) proxy used for anything that needs the internet (datasets,
# pip packages, pre-trained models). Define it once here; override it with the
# PROXY_URL environment variable instead of editing the literal.
PROXY_URL = os.environ.get("PROXY_URL", "http://172.16.153.24:7890")
proxies = {"http": PROXY_URL, "https": PROXY_URL}
print(proxies)

# NOTE: this variable holds the *path* of the pipelines service-account token
# file (see the printed value), not the token itself. Presumably used by
# kfp.Client() for in-cluster auth -- TODO confirm. Fails fast if unset.
TOKEN = os.environ['KF_PIPELINES_SA_TOKEN_PATH']
print(TOKEN)

# Route this kernel's own HTTP(S) traffic through the proxy.
os.environ['http_proxy'] = proxies['http']
os.environ['https_proxy'] = proxies['https']


{'http': 'http://172.16.153.24:7890', 'https': 'http://172.16.153.24:7890'}
/var/run/secrets/kubeflow/pipelines/token


In [2]:
import kfp
import json
import os
from kfp.onprem import use_k8s_secret
from kfp import components
from kfp.components import load_component_from_file, load_component_from_url
from kfp import dsl
from kfp import compiler

# Show the installed KFP SDK version. This notebook was last run with 1.8.13
# (output below) and relies on 1.x APIs such as kfp.onprem.use_k8s_secret.
kfp.__version__

'1.8.13'

# Enter your gateway and the cookie
[Use this Chrome extension to get the session cookie](https://chrome.google.com/webstore/detail/editthiscookie/fngmhnnpilhplaeedifhccceomclgfbg?hl=en)

![image.png](./image.png)

## Update values for the ingress gateway and auth session

In [3]:
# --- Cluster / registry settings -------------------------------------------
# In-cluster address of the Istio ingress gateway used for inference requests.
INGRESS_GATEWAY = 'http://istio-ingressgateway.istio-system.svc.cluster.local'
NAMESPACE = "muzhi1991"                      # Kubeflow profile namespace
EXPERIMENT = "Default"                       # KFP experiment name
REGISTRY = "k3d-myregistry.localhost:5000"   # registry hosting the custom images
# AUTH / COOKIE for the ingress are set later, right before the prediction request.

from kubernetes.client.models import V1EnvVar

# Proxy env vars attached to pipeline steps that need internet access.
http_proxy_var = V1EnvVar(value=proxies['http'], name='http_proxy')
https_proxy_var = V1EnvVar(value=proxies['https'], name='https_proxy')

## Set log bucket and TensorBoard image

In [4]:
# MinIO (S3-compatible) endpoint and the bucket that receives TensorBoard logs.
MINIO_ENDPOINT = "http://minio-service.kubeflow:9000"
LOG_BUCKET = "mlpipeline"
# Custom TensorBoard image (includes the PyTorch profiler plugin). Image
# references always use "/" separators, so build the reference with an f-string
# rather than os.path.join, which follows the local OS path rules.
TENSORBOARD_IMAGE = f"{REGISTRY.rstrip('/')}/muzhi1991/tboard:DBE73959"
print(TENSORBOARD_IMAGE)

k3d-myregistry.localhost:5000/muzhi1991/tboard:DBE73959


In [5]:
# No host is passed: presumably the client relies on in-cluster configuration and
# the service-account token path printed earlier -- TODO confirm.
client = kfp.Client()

In [6]:
# Use the experiment returned by create_experiment rather than guessing which
# entry of list_experiments() is ours: the listing is paged and its order is not
# "most recently created last". In KFP SDK 1.x, create_experiment returns the
# existing experiment when the name is already taken (verify for other versions).
my_experiment = client.create_experiment(EXPERIMENT, namespace=NAMESPACE)
# Full listing kept only for the inspection cell below.
experiments = client.list_experiments(namespace=NAMESPACE)
my_experiment

{'created_at': datetime.datetime(2022, 9, 25, 8, 39, 7, tzinfo=tzutc()),
 'description': None,
 'id': '9381fa9e-7b57-4045-8272-2fbda9821fb7',
 'name': 'Default',
 'resource_references': [{'key': {'id': 'muzhi1991', 'type': 'NAMESPACE'},
                          'name': None,
                          'relationship': 'OWNER'}],
 'storage_state': 'STORAGESTATE_AVAILABLE'}

In [34]:
# Inspect every experiment in the namespace (useful to confirm which one the run uses).
experiments.experiments

[{'created_at': datetime.datetime(2022, 9, 17, 8, 41, 13, tzinfo=tzutc()),
  'description': None,
  'id': '0681ebb1-f908-417b-a4ee-0cc24153f573',
  'name': 'test-data-process',
  'resource_references': [{'key': {'id': 'muzhi1991', 'type': 'NAMESPACE'},
                           'name': None,
                           'relationship': 'OWNER'}],
  'storage_state': 'STORAGESTATE_AVAILABLE'},
 {'created_at': datetime.datetime(2022, 9, 23, 15, 23, 15, tzinfo=tzutc()),
  'description': None,
  'id': '8035f383-4968-4553-86ed-00ca56cd94bc',
  'name': 'xgboost_train_test',
  'resource_references': [{'key': {'id': 'muzhi1991', 'type': 'NAMESPACE'},
                           'name': None,
                           'relationship': 'OWNER'}],
  'storage_state': 'STORAGESTATE_AVAILABLE'},
 {'created_at': datetime.datetime(2022, 9, 25, 8, 39, 7, tzinfo=tzutc()),
  'description': None,
  'id': '9381fa9e-7b57-4045-8272-2fbda9821fb7',
  'name': 'Default',
  'resource_references': [{'key': {'id': 'mu

## Set Inference parameters

In [7]:
# Name of the KServe InferenceService the pipeline deploys, and the model name
# used in the v2 ".../v2/models/<name>/infer|explain" request paths.
DEPLOY_NAME, MODEL_NAME = "bertserve", "bert"

In [8]:
# Regenerate the component YAML specs from their templates (uncomment to run);
# the output of a previous run is shown below.
# ! python utils/generate_templates.py bert/template_mapping.json

/home/jovyan/project/pytorch-bert/utils/../pytorch_kfp_components/templates
Processing copy_component.yaml
Processing train_component.yaml
Processing ax_generate_trials_component.yaml
Processing prediction_component.yaml
Processing ax_train_component.yaml
Processing minio_component.yaml
Processing ax_complete_trials_component.yaml
Processing preprocess_component.yaml
Processing tensorboard_component.yaml


In [9]:
# Load the pipeline components from their YAML specs. The yaml/ files are
# presumably the ones produced by utils/generate_templates.py (previous cell);
# the KServe deployer spec is loaded from ./components.
_load_component = components.load_component_from_file

prepare_tensorboard_op = _load_component("yaml/tensorboard_component.yaml")
prep_op = _load_component("yaml/preprocess_component.yaml")
train_op = _load_component("yaml/train_component.yaml")
deploy_op = _load_component("./components/kserve/component.yaml")
minio_op = _load_component("yaml/minio_component.yaml")

In [10]:
# The placeholder string that is substituted per workflow (output below); the
# pipeline uses it to give each run its own log, model and config paths.
dsl.RUN_ID_PLACEHOLDER

'{{workflow.uid}}'

## Define pipeline

In [29]:
def _tensorboard_pod_template_spec(minio_endpoint):
    """Return the JSON pod template used to launch the TensorBoard viewer.

    The environment lets TensorBoard read event files from MinIO through its
    S3 API. Credentials come from the ``mlpipeline-minio-artifact`` secret
    (Kubeflow installs it by default -- check that it exists in the namespace).
    The ``AWS_*`` names are used because the S3 client library expects them
    (MinIO is S3-compatible); ``S3_USE_HTTPS=0`` / ``S3_VERIFY_SSL=0`` select
    plain HTTP to ``S3_ENDPOINT``.

    Args:
        minio_endpoint: MinIO URL (pipeline parameter) written into ``S3_ENDPOINT``.

    Returns:
        The pod template serialized with ``json.dumps``.
    """

    def secret_env(env_name, secret_key):
        # Env var whose value is read from one key of the MinIO artifact secret.
        return {
            "name": env_name,
            "valueFrom": {
                "secretKeyRef": {
                    "name": "mlpipeline-minio-artifact",
                    "key": secret_key,
                }
            },
        }

    env = [
        secret_env("AWS_ACCESS_KEY_ID", "accesskey"),
        secret_env("AWS_SECRET_ACCESS_KEY", "secretkey"),
        {"name": "AWS_REGION", "value": "minio"},
        {"name": "S3_ENDPOINT", "value": f"{minio_endpoint}"},
        {"name": "S3_USE_HTTPS", "value": "0"},
        {"name": "S3_VERIFY_SSL", "value": "0"},
    ]
    return json.dumps({"spec": {"containers": [{"env": env}]}})


def _inference_service_yaml(name, namespace, storage_uri, http_proxy, https_proxy,
                            gpu_count=None, accelerator=None):
    """Render the KServe ``InferenceService`` manifest for the PyTorch predictor.

    Deployment notes:

    1. The ``sa`` service account provides the MinIO credentials needed to
       download the .mar referenced by ``storageUri``; see
       common/minio/kserve-minio-account.yaml.
    2. ``sidecar.istio.io/inject: "false"`` is required, otherwise requests fail
       with a 403 RBAC error; see https://github.com/kserve/kserve/issues/1076.
    3. The proxy env vars are needed because TorchServe downloads the Python
       packages listed in the model's requirements.txt.
    4. Pre-download ``bert-base-uncased`` inside the container with
       ``BertModel.from_pretrained("bert-base-uncased")``; otherwise it is
       downloaded on the first request, which times out. (The handler does not
       really need the original BERT weights; it could load our model directly.)

    Args:
        name: InferenceService name.
        namespace: Namespace to deploy into.
        storage_uri: URI of the uploaded model store (.mar + config).
        http_proxy: Value for the predictor's ``http_proxy`` env var.
        https_proxy: Value for the predictor's ``https_proxy`` env var.
        gpu_count: If set, adds an ``nvidia.com/gpu`` limit (GPU path untested).
        accelerator: If set, adds a GKE accelerator ``nodeSelector`` on the
            predictor (GPU path untested).

    Returns:
        The manifest as a YAML string.
    """
    gpu_limit = f"\n          nvidia.com/gpu: {gpu_count}" if gpu_count else ""
    # nodeSelector belongs to the predictor (pod-level), not to the container.
    node_selector = (
        f"\n    nodeSelector:\n      cloud.google.com/gke-accelerator: {accelerator}"
        if accelerator
        else ""
    )
    return f"""\
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: {name}
  namespace: {namespace}
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  predictor:
    serviceAccountName: sa
    pytorch:
      protocolVersion: v2
      storageUri: {storage_uri}
      env:
      - name: http_proxy
        value: {http_proxy}
      - name: https_proxy
        value: {https_proxy}
      resources:
        requests:
          cpu: 4
          memory: 8Gi
        limits:
          cpu: 4
          memory: 8Gi{gpu_limit}{node_selector}
"""


@dsl.pipeline(name="Training pipeline", description="Sample training job test")
def pytorch_bert(  # pylint: disable=too-many-arguments
    minio_endpoint=MINIO_ENDPOINT,
    log_bucket=LOG_BUCKET,
    log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
    mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store",
    config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config",
    model_uri=f"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}",
    tf_image=TENSORBOARD_IMAGE,
    deploy=DEPLOY_NAME,
    namespace=NAMESPACE,
    confusion_matrix_log_dir=f"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/",
    num_samples=1000,
    max_epochs=1
):
    """This method defines the pipeline tasks and operations.

    Steps: TensorBoard viewer -> preprocess -> train -> push TensorBoard events,
    .mar and config.properties to MinIO -> deploy a KServe InferenceService from
    the uploaded model store.
    """
    # --- TensorBoard ---------------------------------------------------------
    # Uses a custom TensorBoard image that bundles the PyTorch profiler plugin
    # (built from common/tensorboard). Images are built from the notebook with
    # kaniko via `python utils/build_image.py <dir with Dockerfile> <image name>`.
    # Start it from the run's Visualizations panel in the Kubeflow Pipelines UI.
    # Data only shows up after training finishes and the events are uploaded to
    # MinIO (no live view).
    prepare_tb_task = prepare_tensorboard_op(
        log_dir_uri=f"s3://{log_bucket}/{log_dir}",
        image=tf_image,
        pod_template_spec=_tensorboard_pod_template_spec(minio_endpoint),
    ).set_display_name("Visualization")

    # --- Preprocess ----------------------------------------------------------
    # NOTE: this and the following steps request a GPU. The code itself does not
    # need one, but the PyTorch base image used by these components does not
    # start without a GPU. PyTorch is only there because of the
    # pytorch_kfp_components dependency (here used just to emit visualization
    # data; it also has utilities for building model archives).
    # The proxy is needed because the step downloads the dataset.
    prep_task = (
        prep_op()
        .after(prepare_tb_task)
        .set_gpu_limit(1)
        .add_env_variable(http_proxy_var)
        .add_env_variable(https_proxy_var)
        .set_display_name("Preprocess & Transform")
    )

    # --- Training ------------------------------------------------------------
    # Custom training image: pytorch_kfp_components + PyTorch Lightning +
    # transformers. pytorch_kfp_components wraps Lightning in its own Trainer.
    # Pitfall: the pip release of pytorch_kfp_components is outdated and buggy;
    # install it from source.
    # ptl_args are PyTorch Lightning arguments: accelerator=gpu, devices=1.
    # transformers downloads the pre-trained model, hence the proxy env vars.
    confusion_matrix_url = f"minio://{log_bucket}/{confusion_matrix_log_dir}"
    script_args = f"model_name=bert.pth," \
                  f"num_samples={num_samples}," \
                  f"confusion_matrix_url={confusion_matrix_url}"
    # For GPU , set device count and strategy type ,strategy=ddp
    ptl_args = f"max_epochs={max_epochs},accelerator=gpu,profiler=pytorch,devices=1"
    train_task = (
        train_op(
            input_data=prep_task.outputs["output_data"],
            tensorboard_root=f"s3://{log_bucket}/{log_dir}",
            script_args=script_args,
            ptl_arguments=ptl_args
        )
        .after(prep_task)
        .set_gpu_limit(1)
        .add_env_variable(http_proxy_var)
        .add_env_variable(https_proxy_var)
        .set_display_name("Training")
        # For allocating resources, uncomment below lines
        # .set_memory_request('600M')
        # .set_memory_limit('1200M')
        # .set_cpu_request('700m')
        # .set_cpu_limit('1400m')
        # For GPU uncomment below line and set GPU limit and node selector
        # .set_gpu_limit(1).add_node_selector_constraint('cloud.google.com/gke-accelerator','nvidia-tesla-p4')
    )

    # --- Upload training outputs to MinIO -------------------------------------
    # These also request a GPU only because of the PyTorch base image.
    # TensorBoard events go to the same bucket the TensorBoard viewer reads
    # (s3://{log_bucket}/{log_dir}), so use log_bucket rather than a literal.
    (
        minio_op(
            bucket_name=log_bucket,
            folder_name=log_dir,
            input_path=train_task.outputs["tensorboard_root"],
            filename="",
        ).set_gpu_limit(1).after(train_task).set_display_name("Tensorboard Events Pusher")
    )
    # The .mar and config.properties go to "mlpipeline" because the default
    # model_uri (s3://mlpipeline/mar/<run id>) points at that bucket.
    minio_mar_upload = (
        minio_op(
            bucket_name="mlpipeline",
            folder_name=mar_path,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="bert_test.mar",
        ).set_gpu_limit(1).after(train_task).set_display_name("Mar Pusher")
    )
    (
        minio_op(
            bucket_name="mlpipeline",
            folder_name=config_prop_path,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="config.properties",
        ).set_gpu_limit(1).after(train_task).set_display_name("Config Pusher")
    )

    # --- Deploy ----------------------------------------------------------------
    model_uri = str(model_uri)
    # pylint: disable=unused-variable
    isvc_yaml = _inference_service_yaml(
        deploy, namespace, model_uri, http_proxy_var.value, https_proxy_var.value
    )
    # GPU variant (untested): pass isvc_gpu_yaml to the deployer instead.
    isvc_gpu_yaml = _inference_service_yaml(
        deploy, namespace, model_uri, http_proxy_var.value, https_proxy_var.value,
        gpu_count="1", accelerator="nvidia-tesla-p4",
    )
    deploy_task = (
        deploy_op(action="apply", inferenceservice_yaml=isvc_yaml)
        .after(minio_mar_upload)
        .set_display_name("Deployer")
    )

    # Expose the MinIO credentials to every step as MINIO_ACCESS_KEY / MINIO_SECRET_KEY.
    dsl.get_pipeline_conf().add_op_transformer(
        use_k8s_secret(
            secret_name="mlpipeline-minio-artifact",
            k8s_secret_key_to_env={
                "secretkey": "MINIO_SECRET_KEY",
                "accesskey": "MINIO_ACCESS_KEY",
            },
        )
    )

In [12]:
# Compile pipeline
# Compile the pipeline function into the package file submitted by the next cell;
# type_check=True asks the compiler to check component input/output types.
compiler.Compiler().compile(pytorch_bert, 'pytorch.tar.gz', type_check=True)



In [13]:
# Execute pipeline
# Submit a run named 'pytorch-bert' from the compiled package under the selected experiment.
run = client.run_pipeline(my_experiment.id, 'pytorch-bert', 'pytorch.tar.gz')

## Wait for the inference service below to reach the `READY True` state.

In [16]:
os.environ['http_proxy']=''
os.environ['https_proxy']=''
!kubectl get isvc $DEPLOY
os.environ['http_proxy']=proxies['http']
os.environ['https_proxy']=proxies['https']

NAME           URL                                         READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                    AGE
sklearn-iris   http://sklearn-iris.muzhi1991.example.com   True           100                              sklearn-iris-predictor-default-00001   8d
bertserve      http://bertserve.muzhi1991.example.com      True           100                              bertserve-predictor-default-00001      75m


# Get InferenceService name

In [14]:
INFERENCE_SERVICE_LIST = ! kubectl get isvc {DEPLOY_NAME} -n {NAMESPACE} -o json | python3 -c "import sys, json; print(json.load(sys.stdin)['status']['url'])"| tr -d '"' | cut -d "/" -f 3
INFERENCE_SERVICE_NAME = INFERENCE_SERVICE_LIST[0]
INFERENCE_SERVICE_NAME

'Unable to connect to the server: context deadline exceeded'

# Prediction Request

In [17]:
# Request body (KServe v2 inference protocol) sent with `curl -d` in the requests below.
! cat ./bert/sample.txt

{
  "id": "d3b15cad-50a2-4eaf-80ce-8b0a428bd298",
  "inputs": [{
    "name": "4b7c7d4a-51e4-43c8-af61-04639f6ef4bc",
    "shape": -1,
    "datatype": "BYTES",
    "data": "Bloomberg has reported on the economy"
  }
  ]
}

In [30]:
os.environ['http_proxy']=''
os.environ['https_proxy']=''

AUTH='MTY2NDM3ODc0N3xOd3dBTkRNME1saEhOa1pPV0VRMVZrVlBSRUZhU1VZM1JWSkJRVkZOUWs5QlJVSTJSbE5MTlVoRFFWQTNNalpOVjA1QlZ6WkxTVkU9fGLpp2jV9jmPaxSd9BTi3FOvBGzYXq98DbyAD-0cLjUL'
COOKIE="authservice_session="+AUTH
INGRESS_GATEWAY='http://istio-ingressgateway.istio-system.svc.cluster.local'
!echo "$INGRESS_GATEWAY/v2/models/$MODEL_NAME/infer"
!echo "Host: $INFERENCE_SERVICE_NAME"
!echo "Cookie: $COOKIE"
!curl -v -H "Host: $INFERENCE_SERVICE_NAME" -H "Cookie: $COOKIE" "$INGRESS_GATEWAY/v2/models/$MODEL_NAME/infer" -d @./bert/sample.txt > bert_prediction_output.json

os.environ['http_proxy']=proxies['http']
os.environ['https_proxy']=proxies['https']

http://istio-ingressgateway.istio-system.svc.cluster.local/v2/models/bert/infer
Host: bertserve.muzhi1991.example.com
Cookie: authservice_session=MTY2NDM3ODc0N3xOd3dBTkRNME1saEhOa1pPV0VRMVZrVlBSRUZhU1VZM1JWSkJRVkZOUWs5QlJVSTJSbE5MTlVoRFFWQTNNalpOVjA1QlZ6WkxTVkU9fGLpp2jV9jmPaxSd9BTi3FOvBGzYXq98DbyAD-0cLjUL
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0*   Trying 10.43.45.42:80...
* TCP_NODELAY set
* Connected to istio-ingressgateway.istio-system.svc.cluster.local (10.43.45.42) port 80 (#0)
> POST /v2/models/bert/infer HTTP/1.1
> Host: bertserve.muzhi1991.example.com
> User-Agent: curl/7.68.0
> Accept: */*
> Cookie: authservice_session=MTY2NDM3ODc0N3xOd3dBTkRNME1saEhOa1pPV0VRMVZrVlBSRUZhU1VZM1JWSkJRVkZOUWs5QlJVSTJSbE5MTlVoRFFWQTNNalpOVjA1QlZ6WkxTVkU9fGLpp2jV9jmPaxSd9BTi3FOvBGzYXq98DbyAD-0cLjUL
> Conte

In [31]:
# Raw v2 inference response saved by the prediction request.
! cat bert_prediction_output.json

{"id": "d3b15cad-50a2-4eaf-80ce-8b0a428bd298", "model_name": "bert_test", "model_version": "1", "outputs": [{"name": "predict", "shape": [], "datatype": "BYTES", "data": ["\"Business\""]}]}

# Explanation Request

In [34]:
os.environ['http_proxy']=''
os.environ['https_proxy']=''
!curl -v -H "Host: $INFERENCE_SERVICE_NAME" -H "Cookie: $COOKIE" "$INGRESS_GATEWAY/v2/models/$MODEL_NAME/explain" -d @./bert/sample.txt  > bert_explaination_output.json
os.environ['http_proxy']=proxies['http']
os.environ['https_proxy']=proxies['https']

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0*   Trying 10.43.45.42:80...
* TCP_NODELAY set
* Connected to istio-ingressgateway.istio-system.svc.cluster.local (10.43.45.42) port 80 (#0)
> POST /v2/models/bert/explain HTTP/1.1
> Host: bertserve.muzhi1991.example.com
> User-Agent: curl/7.68.0
> Accept: */*
> Cookie: authservice_session=MTY2NDM3ODc0N3xOd3dBTkRNME1saEhOa1pPV0VRMVZrVlBSRUZhU1VZM1JWSkJRVkZOUWs5QlJVSTJSbE5MTlVoRFFWQTNNalpOVjA1QlZ6WkxTVkU9fGLpp2jV9jmPaxSd9BTi3FOvBGzYXq98DbyAD-0cLjUL
> Content-Length: 211
> Content-Type: application/x-www-form-urlencoded
> 
} [211 bytes data]
* upload completely sent off: 211 out of 211 bytes
100   211    0     0  100   211      0     22  0:00:09  0:00:09 --:--:--     0* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< content-length: 421
< conten

In [35]:
# Raw explanation response (per-word attributions) saved by the explanation request.
! cat bert_explaination_output.json

{"id": "d3b15cad-50a2-4eaf-80ce-8b0a428bd298", "model_name": "bert_test", "model_version": "1", "outputs": [{"name": "explain", "shape": [], "datatype": "BYTES", "data": [{"words": ["bloomberg", "has", "reported", "on", "the", "economy"], "importances": [-0.3494510843097899, 0.10290922115798615, -0.0038743719593886034, -0.3523651655766479, -0.33892890561924205, 0.7926188290352265], "delta": -0.0065188932579783865}]}]}

In [36]:
# Parse the saved explanation response; the `with` block closes the file handle
# (the bare open(...).read() left it to the garbage collector).
with open("./bert_explaination_output.json", "r") as explanation_file:
    explanations_json = json.load(explanation_file)
explanations_json

{'id': 'd3b15cad-50a2-4eaf-80ce-8b0a428bd298',
 'model_name': 'bert_test',
 'model_version': '1',
 'outputs': [{'name': 'explain',
   'shape': [],
   'datatype': 'BYTES',
   'data': [{'words': ['bloomberg', 'has', 'reported', 'on', 'the', 'economy'],
     'importances': [-0.3494510843097899,
      0.10290922115798615,
      -0.0038743719593886034,
      -0.3523651655766479,
      -0.33892890561924205,
      0.7926188290352265],
     'delta': -0.0065188932579783865}]}]}

In [37]:
# Parse the saved prediction response; the `with` block closes the file handle.
with open("./bert_prediction_output.json", "r") as prediction_file:
    prediction_json = json.load(prediction_file)

In [39]:
import torch

# The explain response holds a single record with words, per-word importances
# and the attribution convergence delta.
explanation_record = explanations_json["outputs"][0]["data"][0]
tokens = explanation_record['words']
delta = explanation_record['delta']
attributions = torch.tensor(explanation_record['importances'])

# The predict response only carries the label, JSON-quoted (e.g. '"Business"'),
# and no probability, so pred_prob is a display-only placeholder.
pred_prob = 0.75
pred_class = str(prediction_json["outputs"][0]["data"][0]).strip('"')
true_class = "Business"
attr_class = "world"

# Visualization of Predictions

In [41]:
from captum.attr import visualization

# One Captum record for the explained sentence; the attribution score column is
# the sum of the per-word attributions.
vis_data_records = [
    visualization.VisualizationDataRecord(
        attributions,
        pred_prob,
        pred_class,
        true_class,
        attr_class,
        attributions.sum(),
        tokens,
        delta,
    )
]

In [42]:
# Render Captum's word-importance table for the records built above (output below).
vis = visualization.visualize_text(vis_data_records)

True Label,Predicted Label,Attribution Label,Attribution Score,Word Importance
Business,Business (0.75),world,-0.15,bloomberg has reported on the economy
,,,,


### The visualization appears as below
![viz1.png](./viz1.png)

## Cleanup Script

In [None]:
! kubectl delete --all isvc -n $NAMESPACE

In [None]:
! kubectl delete pod --field-selector=status.phase==Succeeded -n $NAMESPACE