Using Kubeflow Pipelines to create and automate an ML pipeline

We could also use Apache Airflow; both are orchestration tools that help us design and automate the ML workflow.
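
To make "orchestration" concrete: at its core, an orchestrator runs each step only after the steps it depends on have finished. The plain-Python sketch below uses only the standard library (`graphlib.TopologicalSorter`, Python 3.9+), not the Kubeflow or Airflow API, and the five step names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical ML workflow: each step maps to the set of steps it depends on.
workflow = {
    "preprocess": {"load_data"},
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
    "deploy": {"evaluate"},
}

# An orchestrator must run the steps in an order that respects every dependency.
order = list(TopologicalSorter(workflow).static_order())
print(order)  # ['load_data', 'preprocess', 'train', 'evaluate', 'deploy']
```

Real orchestrators layer scheduling, retries, and distributed execution on top of this dependency ordering.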

In [None]:
from kfp import dsl  # the DSL is like a drawing board for designing our workflow
from kfp import compiler

# Ignore FutureWarnings in kfp
import warnings
warnings.filterwarnings("ignore", 
                        category=FutureWarning, 
                        module='kfp.*')

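Kubeflow Pipelines has two building blocks: components and pipelines. A pipeline connects components and defines how the output of one step feeds into the next.

Components are self-contained pieces of code that each perform one step in our ML workflow, such as data preprocessing or training the model.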
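
For a lightweight Python component, Kubeflow Pipelines runs only the function's own source code inside a container, so everything the function needs, including its imports, has to live inside the function body. The plain-Python sketch below shows that shape for a hypothetical text-cleaning step, `preprocess_text`; in the notebook you would put `@dsl.component` above it to make it a component.

```python
def preprocess_text(raw: str) -> str:
    # Imports live inside the body: only this function's source is shipped
    # to the component's container, so module-level imports are not available.
    import re

    collapsed = re.sub(r"\s+", " ", raw)  # collapse runs of whitespace
    return collapsed.strip().lower()


print(preprocess_text("  How do I   sort a LIST? "))  # how do i sort a list?
```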

Building a simple workflow 

In [None]:
### Simple example: component 1
@dsl.component
def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    
    return hello_text

In [None]:
### Simple example: component 2
@dsl.component
def how_are_you(hello_text: str) -> str:
    
    how_are_you = f"{hello_text}. How are you?"
    
    return how_are_you

In [None]:
### Simple example: pipeline
@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
    
    # recipient is a pipeline parameter, so pass it directly (no .output)
    hello_task = say_hello(name=recipient)
    
    # hello_task.output refers to the value returned by say_hello
    how_task = how_are_you(hello_text=hello_task.output)
    
    # the pipeline returns the output of its last task
    return how_task.output 
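
Inside a pipeline, `.output` is only a placeholder that is filled in at run time, so it can help to dry-run the same logic as plain Python first. This sketch copies the two component bodies into ordinary functions; the `_local` names are hypothetical and chosen so they do not shadow the components defined above.

```python
def say_hello_local(name: str) -> str:
    return f"Hello, {name}!"


def how_are_you_local(hello_text: str) -> str:
    return f"{hello_text}. How are you?"


def hello_pipeline_local(recipient: str) -> str:
    # Locally each call returns a real string; in the KFP pipeline the
    # second step receives hello_task.output instead.
    hello_text = say_hello_local(recipient)
    return how_are_you_local(hello_text)


print(hello_pipeline_local("World!"))  # Hello, World!!. How are you?
```

With `"World!"` as the recipient, the value passed later in this notebook, the greeting ends up with a double exclamation mark.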

In [None]:
compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

These arguments are injected into the pipeline when it executes:

In [None]:
pipeline_arguments = {
    "recipient": "World!",
}

To view the compiled pipeline definition (YAML):

In [None]:
!cat pipeline.yaml

Submit the compiled pipeline to Vertex AI Pipelines for execution

In [None]:
### import `PipelineJob`
from google.cloud.aiplatform import PipelineJob

job = PipelineJob(
        ### path of the yaml file to execute
        template_path="pipeline.yaml",
        ### name of the pipeline
        display_name="deep_learning_ai_pipeline",
        ### pipeline arguments (inputs)
        ### {"recipient": "World!"} for this example
        parameter_values=pipeline_arguments,
        ### region of execution
        location="us-central1",
        ### root is where temporary files are
        ### stored by the execution engine; on Vertex AI this must be
        ### a Cloud Storage path (gs://...), so "./" is only a placeholder
        pipeline_root="./",
)

### submit for execution
job.submit()

### check to see the status of the job
job.state
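
`job.submit()` returns without waiting for the run to finish, so `job.state` only reports a snapshot. A minimal polling loop is sketched below. It takes any zero-argument callable that returns a state name; the terminal state names are assumed to match the `PIPELINE_STATE_*` values of Vertex AI's `PipelineState` enum, and `wait_until_done` is a hypothetical helper, not part of the Vertex AI SDK.

```python
import time
from typing import Callable

# Assumed terminal states, modeled on Vertex AI's PipelineState enum names.
TERMINAL_STATES = {
    "PIPELINE_STATE_SUCCEEDED",
    "PIPELINE_STATE_FAILED",
    "PIPELINE_STATE_CANCELLED",
}


def wait_until_done(get_state: Callable[[], str], poll_seconds: float = 30.0) -> str:
    """Call get_state() repeatedly until it returns a terminal state."""
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)  # back off before asking again


# Stand-in for a real job: states reported on successive polls.
fake_states = iter(["PIPELINE_STATE_PENDING", "PIPELINE_STATE_RUNNING",
                    "PIPELINE_STATE_SUCCEEDED"])
print(wait_until_done(lambda: next(fake_states), poll_seconds=0))
```

Against the job above you might call `wait_until_done(lambda: job.state.name)`, assuming `job.state` is an enum member whose `.name` is the state string. Alternatively, calling `job.run()` instead of `job.submit()` submits the job and blocks until it finishes.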

### Real-life pipeline example

Reuse an existing Kubeflow pipeline to fine-tune a Google foundation model, PaLM 2.

In [None]:
### these are the same 
### jsonl files from the previous lab

### time stamps have been removed so that 
### the files are consistent for all learners
TRAINING_DATA_URI = "./tune_data_stack_overflow_python_qa.jsonl" 
EVAUATION_DATA_URI = "./tune_eval_data_stack_overflow_python_qa.jsonl"  
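
Before handing the JSONL files to the tuning pipeline, a quick local check can catch malformed lines. This sketch assumes each record is a JSON object with `input_text` and `output_text` keys, the prompt/response layout used for PaLM 2 text-model tuning; adjust `REQUIRED_KEYS` if your files differ. `validate_jsonl` is a hypothetical helper.

```python
import io
import json

REQUIRED_KEYS = {"input_text", "output_text"}  # assumed record schema


def validate_jsonl(stream) -> int:
    """Return the number of records; raise ValueError on a malformed line."""
    count = 0
    for line_no, line in enumerate(stream, start=1):
        if not line.strip():
            continue  # ignore blank lines
        record = json.loads(line)
        if not isinstance(record, dict):
            raise ValueError(f"line {line_no}: expected a JSON object")
        missing = REQUIRED_KEYS - set(record)
        if missing:
            raise ValueError(f"line {line_no}: missing {sorted(missing)}")
        count += 1
    return count


sample = io.StringIO(
    '{"input_text": "How do I reverse a list?", "output_text": "Use reversed()."}\n'
    '{"input_text": "What is a dict?", "output_text": "A mapping type."}\n'
)
print(validate_jsonl(sample))  # 2
```

To check the real file: `with open(TRAINING_DATA_URI) as f: print(validate_jsonl(f))`.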

Reuse the published tuning pipeline template, and version the model name using the date and time

In [None]:
### path to the pipeline file to reuse
### the file is provided in your workspace as well
template_path = ('https://us-kfp.pkg.dev/ml-pipeline/'
                 'large-language-model-pipelines/tune-large-model/v2.0.0')

In [None]:
import datetime

date = datetime.datetime.now().strftime("%H:%d:%m:%Y")

MODEL_NAME = f"deep-learning-ai-model-{date}"
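
The format string `"%H:%d:%m:%Y"` puts the hour first and omits minutes, so two runs started within the same hour get the same name, and names do not sort chronologically. One possible alternative, sketched with a hypothetical `versioned_name` helper and a fixed timestamp so the result is reproducible, puts the year first and includes minutes and seconds.

```python
import datetime


def versioned_name(prefix: str, now: datetime.datetime) -> str:
    # Year first and zero-padded, so names sort chronologically as strings;
    # minutes and seconds keep runs within the same hour distinct.
    return f"{prefix}-{now.strftime('%Y%m%d-%H%M%S')}"


print(versioned_name("deep-learning-ai-model",
                     datetime.datetime(2024, 3, 5, 14, 7, 9)))
# deep-learning-ai-model-20240305-140709
```

In the notebook you would pass `datetime.datetime.now()` instead of the fixed timestamp.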

In [None]:
TRAINING_STEPS = 200
EVALUATION_INTERVAL = 20

from utils import authenticate
credentials, PROJECT_ID = authenticate() 

REGION = "us-central1"
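
With these settings the number of evaluation rounds works out to 200 / 20 = 10, assuming the tuning pipeline evaluates every `EVALUATION_INTERVAL` steps up to and including the last step (an assumption about the pipeline, not something this notebook confirms). The schedule can be listed explicitly:

```python
train_steps = 200         # same value as TRAINING_STEPS
evaluation_interval = 20  # same value as EVALUATION_INTERVAL

# Assumed schedule: evaluate after every `evaluation_interval` steps,
# including the final step.
eval_steps = list(range(evaluation_interval, train_steps + 1, evaluation_interval))

print(len(eval_steps))  # 10
print(eval_steps)       # [20, 40, 60, 80, 100, 120, 140, 160, 180, 200]
```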

Define the pipeline arguments

In [None]:
pipeline_arguments = {
    "model_display_name": MODEL_NAME,
    "location": REGION,
    "large_model_reference": "text-bison@001",
    "project": PROJECT_ID,
    "train_steps": TRAINING_STEPS,
    "dataset_uri": TRAINING_DATA_URI,
    "evaluation_interval": EVALUATION_INTERVAL,
    "evaluation_data_uri": EVAUATION_DATA_URI,
}
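
The `./` dataset paths point at the notebook's local disk, which the Vertex AI workers running the pipeline cannot see; for a real run they would need to be Cloud Storage (`gs://`) URIs. A small hypothetical check like this one can flag local paths before submission.

```python
def non_gcs_uris(arguments: dict, uri_keys: tuple) -> list:
    """Return the keys in uri_keys whose values are not gs:// URIs."""
    return [
        key for key in uri_keys
        if not str(arguments.get(key, "")).startswith("gs://")
    ]


example_arguments = {
    "dataset_uri": "./tune_data_stack_overflow_python_qa.jsonl",
    "evaluation_data_uri": "gs://example-bucket/eval.jsonl",  # hypothetical bucket
}
print(non_gcs_uris(example_arguments, ("dataset_uri", "evaluation_data_uri")))
# ['dataset_uri']
```

In the notebook: `non_gcs_uris(pipeline_arguments, ("dataset_uri", "evaluation_data_uri"))`.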

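Submit the tuning pipeline to Vertex AI

In [None]:
### where the execution engine stores temporary files; on Vertex AI
### this must be a Cloud Storage path (gs://...), so "./" is a placeholder
pipeline_root = "./"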

job = PipelineJob(
        ### path of the yaml file to execute
        template_path=template_path,
        ### name of the pipeline
        display_name=f"deep_learning_ai_pipeline-{date}",
        ### pipeline arguments (inputs)
        parameter_values=pipeline_arguments,
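        ### region of execution
        location=REGION,
        ### project and credentials returned by authenticate()
        project=PROJECT_ID,
        credentials=credentials,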
        ### root is where temporary files are being 
        ### stored by the execution engine
        pipeline_root=pipeline_root,
        ### enable_caching=True will save the outputs 
        ### of components for re-use, and will only re-run those
        ### components for which the code or data has changed.
        enable_caching=True,
)

### submit for execution
job.submit()

### check to see the status of the job
job.state
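
The idea behind `enable_caching=True` can be illustrated with a toy cache: a step's result is reused when both its code and its inputs are unchanged. This is a simplified, hypothetical stand-in (the "code" is approximated by the function's bytecode and constants), not how Kubeflow or Vertex AI actually compute cache keys.

```python
import hashlib

_cache: dict = {}
calls: list = []  # records real executions, to make cache hits visible


def cache_key(func, kwargs: dict) -> str:
    # Approximate "the component's code" by its compiled bytecode and constants.
    code = func.__code__.co_code + repr(func.__code__.co_consts).encode()
    inputs = repr(sorted(kwargs.items())).encode()
    return hashlib.sha256(code + inputs).hexdigest()


def run_cached(func, **kwargs):
    """Run func(**kwargs) unless an identical call already ran."""
    key = cache_key(func, kwargs)
    if key not in _cache:
        _cache[key] = func(**kwargs)
    return _cache[key]


def greet(name: str) -> str:
    calls.append(name)
    return f"Hello, {name}!"


run_cached(greet, name="World")     # executes greet
run_cached(greet, name="World")     # cache hit: same code, same input
run_cached(greet, name="Kubeflow")  # new input, so greet executes again
print(len(calls))  # 2
```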