# About this Jupyter Notebook

@author: Yingding Wang\
@updated: 08.09.2023

This notebook defines and runs a Kubeflow pipeline with the KFP Python SDK v1. The pipeline uses Llama 2 and T5-based de_en translation models to extract information from unstructured PDF data.

The prompt is specially constructed to extract "patient name" and "patient age" information from a doctor's letter.
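
As an illustration only, a prompt of this kind could be assembled with the Llama 2 chat template (`[INST]` and `<<SYS>>` tags). The helper name `build_extraction_prompt`, the system instruction, and the sample letter are assumptions made for this sketch; they are not the prompt used in the pipeline run in this notebook.

```python
def build_extraction_prompt(letter_text: str) -> str:
    """Wrap a doctor's letter in the Llama 2 chat template (hypothetical helper)."""
    # Assumed system instruction; adjust the wording to the actual use case.
    system = (
        "You extract information from doctor's letters. "
        "Answer only with the fields 'patient name' and 'patient age'."
    )
    user = (
        "Extract the patient name and patient age from this letter:\n"
        f"{letter_text.strip()}"
    )
    # Llama 2 chat models expect the system prompt inside <<SYS>> tags
    # and the whole turn wrapped in [INST] ... [/INST].
    return f"[INST] <<SYS>>\n{system}\n<</SYS>>\n\n{user} [/INST]"


# Made-up sample text, not real patient data.
sample_letter = "Dear colleague, I am writing about Mr. John Doe, aged 54, who ..."
prompt = build_extraction_prompt(sample_letter)
print(prompt)
```

Constraining the answer format in the system instruction is one possible design choice; it tends to make the generated text easier to parse downstream.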

## Install KFP Python SDK to build a V1 pipeline
* Build KF pipeline with python SDK: https://www.kubeflow.org/docs/components/pipelines/sdk/build-pipeline/
* Current KFP python SDK version on pypi.org: https://pypi.org/project/kfp/ 

In [1]:
import sys

In [2]:
# !{sys.executable} -m pip install --upgrade --user kfp==2.0.0b13
#!{sys.executable} -m pip install --upgrade --user kfp==1.8.22

## Restart the Kernel

After the installation of KFP python SDK, the notebook kernel must be restarted.
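
After the restart, one way to confirm that the kernel sees the intended SDK generation is to read the installed version from the package metadata. This is a minimal sketch; the helper names `major_version` and `installed_major` are made up for this example.

```python
from importlib import metadata
from typing import Optional


def major_version(version: str) -> int:
    """Return the leading integer of a version string such as '1.8.22' or '2.0.0b13'."""
    return int(version.split(".")[0])


def installed_major(package: str) -> Optional[int]:
    """Return the major version of an installed package, or None if it is not installed."""
    try:
        return major_version(metadata.version(package))
    except metadata.PackageNotFoundError:
        return None


kfp_major = installed_major("kfp")
if kfp_major is None:
    print("kfp is not installed in this kernel")
elif kfp_major != 1:
    print(f"kfp major version is {kfp_major}, but this notebook targets SDK v1")
else:
    print("kfp SDK v1 is available")
```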

## Getting familiar with Jupyter Notebook ENV 

In [3]:
from platform import python_version
print (f"current platform python version: {python_version()}")

current platform python version: 3.8.10


In [4]:
# run kubectl to see the resource quota in the namespace
!kubectl describe quota

Name:                                                         kf-resource-quota
Namespace:                                                    kubeflow-kindfor
Resource                                                      Used     Hard
--------                                                      ----     ----
basic-csi.storageclass.storage.k8s.io/persistentvolumeclaims  4        15
basic-csi.storageclass.storage.k8s.io/requests.storage        115Gi    150Gi
cpu                                                           2300m    128
longhorn.storageclass.storage.k8s.io/persistentvolumeclaims   1        15
longhorn.storageclass.storage.k8s.io/requests.storage         250Gi    500Gi
memory                                                        25518Mi  512Gi
requests.nvidia.com/mig-1g.10gb                               0        2
requests.nvidia.com/mig-1g.20gb                               0        1
requests.nvidia.com/mig-2g.20gb                               1        1


In [5]:
# examine the KFP Python SDK version inside Kubeflow v1.5.1
!{sys.executable} -m pip list | grep kfp

kfp                       1.8.22
kfp-pipeline-spec         0.1.16
kfp-server-api            1.8.5


## Setup global variables

In [6]:
import kfp
client = kfp.Client()
NAMESPACE = client.get_user_namespace()
EXPERIMENT_NAME = 'scivias' # Name of the experiment in the KF webapp UI
EXPERIMENT_DESC = 'extract information from doctors letter'
PREFIX = "llm"
DATA_ROOT = "/mnt"
DATA_SUB_PATH = "core-kind/yinwang"
DEFAULT_MODEL_TYPE = "7B"

print(NAMESPACE)

kubeflow-kindfor


In [7]:
from dataclasses import dataclass


@dataclass
class Settings:
    base_torch_image: str = "harbor-dmz.srv.med.uni-muenchen.de/core-general/ngc:0.0.0"

    
settings = Settings() 
print(f"{settings}")

Settings(base_torch_image='harbor-dmz.srv.med.uni-muenchen.de/core-general/ngc:0.0.0')


### Creating KubeFlow component from python function

In [8]:
# import kfp dsl components
import kfp.dsl as dsl
from functools import partial
from kfp.dsl import (
    pipeline,
    ContainerOp,
    PipelineVolume
)
from kfp.components import (
    InputPath,
    OutputPath,
    create_component_from_func
)

### Create llm inference component

#### Subprocess call to parse the nvidia-smi output

* `subprocess.run` (Python 3.5+): https://stackoverflow.com/questions/4760215/running-shell-command-and-capturing-the-output
* `subprocess.Popen` vs. `subprocess.call`: https://stackoverflow.com/questions/7681715/whats-the-difference-between-subprocess-popen-and-call-how-can-i-use-them
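
The capture-and-parse pattern can be tried without a GPU. In the sketch below, a Python child process stands in for `nvidia-smi -L` and prints two sample lines; the UUIDs are made up, and `run_and_capture` and `parse_uuid` are hypothetical helper names.

```python
import re
import subprocess
import sys


def run_and_capture(cmd: list) -> list:
    """Run a command (blocking) and return its non-empty stdout lines."""
    result = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
    text = result.stdout.decode("utf-8")
    return [line.strip() for line in text.splitlines() if line.strip()]


def parse_uuid(line: str) -> str:
    """Return the text between 'UUID: ' and ')' or '' if the line has no UUID."""
    match = re.search(r"UUID:\s(.+?)\)", line)
    return match.group(1) if match else ""


# Stand-in for the output of `nvidia-smi -L` (made-up UUIDs).
sample = (
    "GPU 0: NVIDIA A100 80GB PCIe (UUID: GPU-11111111-aaaa)\n"
    "  MIG 1g.20gb     Device  0: (UUID: MIG-22222222-bbbb)\n"
)
lines = run_and_capture([sys.executable, "-c", f"print({sample!r}, end='')"])

# The first line is the physical GPU; the remaining lines are its MIG devices.
mig_uuids = ",".join(parse_uuid(line) for line in lines[1:])
print(mig_uuids)
```

Returning an empty string for lines without a UUID keeps the join step simple, at the cost of hiding malformed lines; raising an error instead would be an equally reasonable choice.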

In [9]:
@partial(
    create_component_from_func,
    output_component_file=f"{PREFIX}_inference_component.yaml",
    base_image=settings.base_torch_image, # use pt base image
    packages_to_install=[
        # f"tensorflow-datasets=={settings.tf_datasets}",
    ] # adding additional libs
)
def llm_inference(data_root: str, data_sub_path: str, model_type: str, prompt: str) -> str:
    import subprocess
    import os, time, sys, re
    
    
    class GPUInfoHelper():
        def __init__(self):
            pass


        def byte_gb_info(self, byte_mem) -> str:
            """convert a byte count to a human-readable GB string"""
            # :.2f formats the float with two decimal digits
            # https://zetcode.com/python/fstring/
            return f"{(byte_mem/1024**3):.2f} GB"


        def accelerator_mem_info(self, device_idx: int):
            # total
            t = torch.cuda.get_device_properties(device_idx).total_memory
            # usable
            r = torch.cuda.memory_reserved(device_idx)
            # allocated
            a = torch.cuda.memory_allocated(device_idx)
            # still free
            f = r-a  
            print( # "GPU memory info:\n" + 
                  f"Physical  memory : {self.byte_gb_info(t)}\n" + 
                  f"Reserved  memory : {self.byte_gb_info(r)}\n" + 
                  f"Allocated memory : {self.byte_gb_info(a)}\n" + 
                  f"Free      memory : {self.byte_gb_info(f)}")


        def accelerator_compute_info(self, device_idx: int) -> None:
            name = torch.cuda.get_device_properties(device_idx).name
            count = torch.cuda.get_device_properties(device_idx).multi_processor_count
            print(f"Device_name      : {name} \n" +
                  f"Multi_processor  : {count}")    


        def gpu_usage(self) -> None:        
            num_of_gpus = torch.cuda.device_count()
            # this shows only the gpu device, not the MIG
            print(f"num_of_gpus: {num_of_gpus}")
            for device_idx in range(num_of_gpus):
                print("-"*20)
                self.accelerator_compute_info(device_idx)                 
                self.accelerator_mem_info(device_idx)
                print("-"*20)
    
    
    def display_container_info():
        print("-"*10)
        print(f"python version: {sys.version}")
        print(f"torch version: {torch.__version__}")
        print("-"*10)
    
    
    def nvidia_device_uuid(input: str):
        """parse the nvidia devices uuid from the nvidia device info str
        """
        try:
            # r'' before the search pattern indicates it is a raw string, 
            # otherwise "" instead of single quote
            uuid = re.search(r'UUID\:\s(.+?)\)', input).group(1)
        except AttributeError:
            # "UUID\:\s" and "\)" not found
            uuid = ""
        return uuid
    
    
    def nvidia_device_info() -> list:
        """get the output lines of `nvidia-smi -L`, which carry the GPU and MIG device uuids
        """
        # blocking call
        result = subprocess.run(["nvidia-smi", "-L"], stdout=subprocess.PIPE)
        # decode the byte object, returns string with \n
        cmd_out_str = result.stdout.decode('utf-8')
        return [line.strip() for line in cmd_out_str.split('\n') if len(line) > 0]
        
    
    def nvidia_mig_uuids() -> str:
        """get a comma separated str of nvidia MIGs devices
        """
        info_list = nvidia_device_info()
        # skip the first GPU ID, get the MIGs IDS
        uuid_list = [nvidia_device_uuid(e) for e in info_list[1:]]
        # if multi gpus need to join the device together for pytorch
        return ",".join(uuid_list)
    
    
    def init_cuda_torch(uuids: str, data_path: str) -> None:
        """setup the default env variables for transformers
        
        Args:
          uuids: a comma-separated str of nvidia gpu/mig uuids
          data_path: folder whose "models" subfolder is used as XDG_CACHE_HOME
        """
        os.environ["WORLD_SIZE"] = "1" 
        os.environ["CUDA_VISIBLE_DEVICES"] = uuids 
        os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:512" #512
        os.environ['XDG_CACHE_HOME']=f"{data_path}/models"
        
        
    def show_folder_files(folder: str) -> None:
        print(os.listdir(folder))
        
        
    def huggingface_access_token(data_path: str) -> str:
        token_file_path = f"{data_path}/.cache/huggingface/token"
        token = ""
        with open(token_file_path, "r") as file:
            token = file.read().replace('\n', '')
        return token
        
        
    '''Global variable'''
    model_map = {
        "7B": "meta-llama/Llama-2-7b-chat-hf",
        "13B" : "meta-llama/Llama-2-13b-chat-hf",
        "70B" : "meta-llama/Llama-2-70b-hf" 
    }
    data_path = f"{data_root}/{data_sub_path}"
    # fall back to the 7B model name (not the key "7B") for unknown model types
    model_name = model_map.get(model_type, model_map["7B"])
    nvidia_state = GPUInfoHelper()
    
    '''Initialization'''
    UUIDs = nvidia_mig_uuids()
    init_cuda_torch(UUIDs, data_path)
    import torch
    display_container_info()
    print(UUIDs)
    
    show_folder_files(data_path)
    
    '''Transformers must be imported after the init_cuda_torch to get env set'''
    import transformers
    from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
    
    def chat_gen(
        generator: transformers.pipelines.text_generation.TextGenerationPipeline, 
        tokenizer: transformers.models.llama.tokenization_llama_fast.LlamaTokenizerFast
    ):    
        def local(input: str, print_mode: bool = True) -> list:
            start = time.time()
            sequences = generator(
                input,
                do_sample=True,
                top_k=10,
                num_return_sequences=1,
                eos_token_id=tokenizer.eos_token_id,
                # max_length=200,
                max_new_tokens=200,
            )
            result = []
            for seq in sequences:
                result.append(f"Result: \n{seq['generated_text']}")

            end = time.time()
            duration = end - start
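            if print_mode:
                for s in result:
                    print(s)

                print("-"*20)
                print(f"walltime: {duration} in secs.")
                # gpu_usage is a method of the GPUInfoHelper instance
                nvidia_state.gpu_usage()
            # always return the generated sequences so callers never receive None
            return result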
        return local
    
    
    token = huggingface_access_token(data_path)
    print(f"Loading LLM model {model_name} ...")
    tokenizer = AutoTokenizer.from_pretrained(model_name, token=token)
    
    generator = pipeline(
        "text-generation",
        model=model_name,
        torch_dtype=torch.float16,
        device_map="auto",
        token=token,
        # use_auth_token=token,
    )
    # print gpu mem usage after loading the llm
    nvidia_state.gpu_usage()
    # create convenient chat function
    chat = chat_gen(generator, tokenizer)
    
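    talk_back = chat(prompt)
    # the component output is declared as str, so join the generated sequences
    # (empty string if none were returned)
    return "\n".join(talk_back or [])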
    
    

### Create data processing component

### Define Helper Function
Difference between 2Gi and 2G:
* https://stackoverflow.com/questions/50804915/kubernetes-size-definitions-whats-the-difference-of-gi-and-g/50805048#50805048
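
The difference is the base: Kubernetes reads `G` as a decimal suffix (10^9 bytes) and `Gi` as a binary suffix (2^30 bytes). The arithmetic can be sketched as follows; `quantity_to_bytes` is a hypothetical helper that handles only a handful of suffixes, not the full Kubernetes quantity grammar.

```python
# Decimal (SI) and binary (IEC) suffixes used in Kubernetes memory quantities.
SUFFIXES = {
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3,
    "k": 1000, "M": 1000**2, "G": 1000**3,
}


def quantity_to_bytes(quantity: str) -> int:
    """Convert a quantity such as '2Gi' or '2G' to a number of bytes."""
    # Check the two-letter binary suffixes before the one-letter decimal ones.
    for suffix in sorted(SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * SUFFIXES[suffix]
    return int(quantity)  # no suffix: plain byte count


binary = quantity_to_bytes("2Gi")
decimal = quantity_to_bytes("2G")
print(f"2Gi = {binary} bytes")
print(f"2G  = {decimal} bytes")
print(f"difference = {binary - decimal} bytes")
```

With these definitions, a `2Gi` request is about 7% larger than a `2G` request.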

Set MIG GPU requests:
* https://github.com/kubeflow/pipelines/issues/6858#issuecomment-1007511676

```python
containerOp.add_resource_request(gpu_resource, gpu_req)
containerOp.add_resource_limit(gpu_resource, gpu_lim)
```

In [10]:
def pod_resource_transformer(op: ContainerOp, mem_req="200Mi", cpu_req="2000m", mem_lim="4000Mi", cpu_lim='4000m', gpu_req=None, gpu_lim=None):
    """
    set the resource requests and limits for a container op
    op.set_memory_limit('1000Mi') is roughly 1 GB
    op.set_cpu_limit('1000m') = 1 cpu core
    the GPU request and limit are only added when both gpu_req and gpu_lim are given
    """
    gpu_resource = "nvidia.com/mig-1g.20gb"
    new_op = op.set_memory_request(mem_req)\
        .set_memory_limit(mem_lim)\
        .set_cpu_request(cpu_req)\
        .set_cpu_limit(cpu_lim)
    if (gpu_req is not None) and (gpu_lim is not None):
        new_op.add_resource_request(gpu_resource, gpu_req)
        new_op.add_resource_limit(gpu_resource, gpu_lim)
    return new_op

## Define Pipeline
* Intro Kubeflow pipeline: https://v1-5-branch.kubeflow.org/docs/components/pipelines/introduction/
* Kubeflow pipeline SDK v1: https://v1-5-branch.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/

In [11]:
@pipeline(
    name = EXPERIMENT_NAME,
    description = EXPERIMENT_DESC
)
def custom_pipeline(data_root: str= "/mnt", data_sub_path: str="core-kind/yinwang", model_type: str="7B"):
    """
    Args:
      data_root: the mount path of shared data volume.
      data_sub_path: the relative path to the data folder, without leading ./
      model_type: the Llama 2 model size key, one of "7B", "13B", "70B".
    """
    
    '''local variable'''
    no_artifact_cache = "P0D"
    artifact_cache_today = "P1D"
    cache_setting = no_artifact_cache
    prompt = "how are you buddy?"
    
    '''Pipeline Volume'''
    # predefined pvc in namespace
    shared_volume = PipelineVolume("llm-models")
    
    '''pipeline'''   
    inference_task = llm_inference(
        data_root=data_root, 
        data_sub_path=data_sub_path, 
        model_type=model_type,
        prompt=prompt
    )
    # request 24000Mi RAM, 1 CPU (limit 2 CPUs) and 1 MIG GPU slice
    inference_task = pod_resource_transformer(inference_task, mem_req="24000Mi", cpu_req="1000m", mem_lim="24000Mi", cpu_lim="2000m", gpu_req=1, gpu_lim=1)
    # P1D would keep cached results for one day; P0D (used here) disables caching
    inference_task.execution_options.caching_strategy.max_cache_staleness = cache_setting
    inference_task.add_pvolumes({data_root: shared_volume})
    inference_task.set_display_name("llama2 inference")
    
    

### (optional) pipeline compile step
Use the following command to compile the pipeline to a YAML file.

In [12]:
PIPE_LINE_FILE_NAME=f"{PREFIX}_kfp1_info_extraction_pipeline"
kfp.compiler.Compiler().compile(custom_pipeline, f"{PIPE_LINE_FILE_NAME}.yaml")

### Create Experiment Run

Create a run label with the current date and time:
```python
from datetime import datetime
from pytz import timezone as ptimezone
ts = datetime.strftime(datetime.now(ptimezone("Europe/Berlin")), "%Y-%m-%d %H-%M-%S")
print(ts)
```

Reference:
* https://stackoverflow.com/questions/25837452/python-get-current-time-in-right-timezone/25887393#25887393
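
On Python 3.9 or newer, the standard-library `zoneinfo` module can take the place of `pytz` for this purpose. The following is a sketch under that assumption (the kernel shown in this notebook runs Python 3.8, where `zoneinfo` is not available); `format_in_zone` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone, tzinfo
from zoneinfo import ZoneInfo  # standard library since Python 3.9


def format_in_zone(moment: datetime, tz: tzinfo,
                   format_str: str = "%Y-%m-%d %H-%M-%S") -> str:
    """Convert an aware datetime to the given time zone and format it."""
    return moment.astimezone(tz).strftime(format_str)


def get_local_time_str(target_tz_str: str = "Europe/Berlin") -> str:
    """Current time in the named zone; needs a time zone database on the system."""
    return format_in_zone(datetime.now(timezone.utc), ZoneInfo(target_tz_str))


# Fixed-offset demonstration that does not depend on a time zone database:
# 10:08:45 UTC shown in a UTC+2 zone.
utc_moment = datetime(2022, 7, 7, 10, 8, 45, tzinfo=timezone.utc)
stamp = format_in_zone(utc_moment, timezone(timedelta(hours=2)))
print(stamp)
```

Calling `get_local_time_str()` then yields a label in the same format for the current time in Europe/Berlin.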

In [13]:
from datetime import datetime
from pytz import timezone as ptimezone

def get_local_time_str(target_tz_str: str = "Europe/Berlin", format_str: str = "%Y-%m-%d %H-%M-%S") -> str:
    """
    this method exists because the local timezone is misconfigured on the server
    @param target_tz_str: target timezone name, default "Europe/Berlin"
    @param format_str: strftime format, "%Y-%m-%d %H-%M-%S" returns e.g. 2022-07-07 12-08-45
    """
    target_tz = ptimezone(target_tz_str) # create timezone, in python3.9 use standard lib ZoneInfo
    # utc_dt = datetime.now(datetime.timezone.utc)
    target_dt = datetime.now(target_tz)
    return datetime.strftime(target_dt, format_str)

### Config pipeline run
* Setting imagePullSecrets for a pipeline with the SDK: https://github.com/kubeflow/pipelines/issues/5843#issuecomment-859799181

In [14]:
# from kubernetes import client as k8s_client
pipeline_config = dsl.PipelineConf()

# pipeline_config.set_image_pull_secrets([k8s_client.V1ObjectReference(name=K8_GIT_SECRET_NAME, namespace=NAMESPACE)])
pipeline_config.set_image_pull_policy("Always")
# pipeline_config.set_image_pull_policy("IfNotPresent")

pipeline_args = {
    'data_root' : DATA_ROOT,
    'data_sub_path' : DATA_SUB_PATH,
    'model_type': DEFAULT_MODEL_TYPE,
}

In [15]:
RUN_NAME = f"{PREFIX}_extract_info_kfp1 {get_local_time_str()}"

# client = kfp.Client()
client.create_run_from_pipeline_func(
    pipeline_func=custom_pipeline,
    arguments = pipeline_args, #{}
    run_name = RUN_NAME,
    pipeline_conf=pipeline_config,
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
)

RunPipelineResult(run_id=f4a6f068-5157-496e-9c16-116ac2281e39)