## Setup and Import Libraries

In [1]:
import datetime
import vertexai
from kfp import dsl
from kfp import compiler
from utils import authenticate
from google.cloud.aiplatform import PipelineJob
from google.oauth2 import service_account
from google.cloud import aiplatform, storage

import warnings
warnings.filterwarnings('ignore')

In [2]:
credentials, PROJECT_ID = authenticate()

In [3]:
REGION = 'us-central1'

In [4]:
vertexai.init(
    project=PROJECT_ID,
    location=REGION,
    credentials=credentials
)

## Kubeflow Pipelines

- Kubeflow Pipelines is built around two key concepts: components and pipelines.
- Components are self-contained pieces of code that each perform one step of your ML workflow. For example, the first step could preprocess data and the second step could train a model.

### Simple Pipeline Example

#### Build the Pipeline

In [8]:
### Simple example: component 1
@dsl.component
def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    
    return hello_text

- Since we "wrapped" this `say_hello` function in the decorator `@dsl.component`, the function will not actually return a string.
- The function will return a `PipelineTask` object.

In [9]:
hello_task = say_hello(name="Erwin")
print(hello_task)

<kfp.dsl.pipeline_task.PipelineTask object at 0x000002C4D946CA60>


- The object that we'll use to pass the information in `hello_text` to other components in the pipeline is `PipelineTask.output`, which will be a built-in data type:
> `['String', 'Integer', 'Float', 'Boolean', 'List', 'Dict']`

In [10]:
print(hello_task.output)

{{channel:task=say-hello;name=Output;type=String;}}
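The placeholder printed above is easier to reason about with a toy model. The sketch below is plain Python and is not how KFP is implemented; `toy_component`, `ToyTask`, and the placeholder text are hypothetical names invented for illustration. It shows the one idea that matters here: calling a decorated function records a step and hands back a reference to its future output, rather than running the function body.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class ToyTask:
    """Hypothetical stand-in for a pipeline task: it records a call, it does not run it."""
    name: str
    func: Callable[..., Any]
    kwargs: Dict[str, Any]

    @property
    def output(self) -> str:
        # A reference to a value that will only exist at run time.
        return f"<output of {self.name}>"


def toy_component(func: Callable[..., Any]) -> Callable[..., ToyTask]:
    """Toy decorator: calling the wrapped function returns a ToyTask, not the result."""
    def wrapper(*args: Any, **kwargs: Any) -> ToyTask:
        if args:
            # Toy version of a keyword-arguments-only rule.
            raise TypeError("use keyword arguments")
        return ToyTask(name=func.__name__, func=func, kwargs=kwargs)
    return wrapper


@toy_component
def say_hello(name: str) -> str:
    return f"Hello, {name}!"


task = say_hello(name="Erwin")
print(type(task).__name__)  # ToyTask
print(task.output)          # <output of say_hello>
```

The string `Hello, Erwin!` is never produced in this sketch unless something later calls `task.func(**task.kwargs)`, which is the role the pipeline backend plays for real components.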


- Note that when passing values to a `dsl.component` function, you have to specify the argument names (keyword arguments); positional arguments are not allowed.

In [11]:
# this will give an error and ask you to specify the parameter name
hello_task = say_hello("Erwin")

TypeError: Components must be instantiated using keyword arguments. Positional parameters are not allowed (found 1 such parameters for component "say-hello").

- The second component is dependent on the first component
- Take the output of the first component and pass it to the second component.

In [12]:
### Simple example: component 2
@dsl.component
def how_are_you(hello_text: str) -> str:
    
    how_are_you = f"{hello_text}. How are you?"
    
    return how_are_you

- Notice that when we pass in the return value from the `say_hello` function, we want to pass in the PipelineTask.output object, and not the PipelineTask object itself.

In [13]:
how_task = how_are_you(hello_text=hello_task.output)
print(how_task)
print(how_task.output)

<kfp.dsl.pipeline_task.PipelineTask object at 0x000002C4D94A9A20>
{{channel:task=how-are-you;name=Output;type=String;}}


In [14]:
# This will give an error and ask you to pass in a built-in data type
how_task = how_are_you(hello_text=hello_task)
print(how_task)
print(how_task.output)

ValueError: Constant argument inputs must be one of type ['String', 'Integer', 'Float', 'Boolean', 'List', 'Dict'] Got: <kfp.dsl.pipeline_task.PipelineTask object at 0x000002C4D946CA60> of type <class 'kfp.dsl.pipeline_task.PipelineTask'>.

- Define the pipeline.
- Notice how the input to `say_hello` is just `recipient`, since that is already a built-in data type (a String).
- Recall that to get the value from a PipelineTask object, you'll use `PipelineTask.output` to pass in that value to another Pipeline Component function.
- Notice that the pipeline function should also return `PipelineTask.output`.

In [15]:
### Simple example: pipeline
@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
    
    # notice, just recipient and not recipient.output
    hello_task = say_hello(name=recipient)
    
    # notice .output
    how_task = how_are_you(hello_text=hello_task.output)
    
    # notice .output
    return how_task.output 

- If you run this pipeline function, you'll see that the return value (`task.output` was a String) is again wrapped inside a PipelineTask object.

In [16]:
pipeline_output = hello_pipeline(recipient="Erwin")
print(pipeline_output)

<kfp.dsl.pipeline_task.PipelineTask object at 0x000002C4D952B7C0>


- Note that if you try to return a `PipelineTask` object instead of `PipelineTask.output`, you'll get an error message.

In [17]:
### Pipeline with wrong return value type
@dsl.pipeline
def hello_pipeline_with_error(recipient: str) -> str:
    hello_task = say_hello(name=recipient)
    how_task = how_are_you(hello_text=hello_task.output)

    return how_task 
    # returning the PipelineTask object itself will give you an error

ValueError: Got unknown pipeline output: <kfp.dsl.pipeline_task.PipelineTask object at 0x000002C4D952A8C0>

#### Implement the Pipeline


- A pipeline is a set of components that you orchestrate.
- It lets you define the order of execution and how data flows from one step to another.
- Compile the pipeline into a YAML file, `pipeline.yaml`.

In [18]:
compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')
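One way to picture what the pipeline definition encodes: passing `hello_task.output` into `how_are_you` makes the second step depend on the first, and the order of execution follows from those dependencies. The sketch below is only an illustration using Python's standard `graphlib` module (Python 3.9+); the `dependencies` dictionary is written by hand here, and this is not how KFP or Vertex AI schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Each key is a step; its value is the set of steps whose output it consumes.
# Step names follow the task names printed earlier (say-hello, how-are-you).
dependencies = {
    "say-hello": set(),
    "how-are-you": {"say-hello"},
}

# A topological order lists every step after all the steps it depends on.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['say-hello', 'how-are-you']
```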

In [19]:
pipeline_arguments = {
    "recipient": "World!",
}

In [20]:
!type pipeline.yaml

# PIPELINE DEFINITION
# Name: hello-pipeline
# Inputs:
#    recipient: str
# Outputs:
#    Output: str
components:
  comp-how-are-you:
    executorLabel: exec-how-are-you
    inputDefinitions:
      parameters:
        hello_text:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-say-hello:
    executorLabel: exec-say-hello
    inputDefinitions:
      parameters:
        name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-how-are-you:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - how_are_you
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
   

In [21]:
# job = PipelineJob(
#     template_path="pipeline.yaml",
#     display_name="demo_pipeline",
#     parameter_values=pipeline_arguments,
#     project=PROJECT_ID,
#     location=REGION,
#     pipeline_root="./",
# )

# job.submit()

# job.state

## Real-life Pipeline Example

### Automation and Orchestration of a Supervised Tuning Pipeline

- Reuse an existing Kubeflow Pipeline for Parameter-Efficient Fine-Tuning (PEFT) for a foundation model from Google, called [PaLM 2](https://ai.google/discover/palm2/). 
- The advantage of reusing a pipeline is that you do not have to build it from scratch; you only need to specify some of the parameters.

In [22]:
TRAINING_DATA_URI = "./tune_data_stack_overflow_python_qa.jsonl" 
EVAUATION_DATA_URI = "./tune_eval_data_stack_overflow_python_qa.jsonl"  
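Before paying for a tuning run, it can help to sanity-check the JSONL files. The sketch below assumes each line is a JSON object with `input_text` and `output_text` fields; those field names, `REQUIRED_KEYS`, `count_valid_records`, and the sample records are assumptions made for illustration and are not taken from this notebook. Adjust the required keys to match your own data.

```python
import json
import tempfile
from pathlib import Path

REQUIRED_KEYS = {"input_text", "output_text"}  # assumed field names


def count_valid_records(path: str) -> int:
    """Return the number of records, raising ValueError on the first bad line."""
    count = 0
    with open(path, encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            if not line.strip():
                continue  # ignore blank lines
            record = json.loads(line)  # JSONDecodeError is a ValueError subclass
            missing = REQUIRED_KEYS - set(record)
            if missing:
                raise ValueError(f"line {line_number}: missing {sorted(missing)}")
            count += 1
    return count


# Tiny made-up sample so the sketch runs without the real dataset.
sample = [
    {"input_text": "How do I reverse a list?", "output_text": "Use my_list[::-1]."},
    {"input_text": "How do I read a file?", "output_text": "Use open() in a with block."},
]
with tempfile.TemporaryDirectory() as tmp:
    sample_path = Path(tmp) / "sample.jsonl"
    sample_path.write_text("\n".join(json.dumps(r) for r in sample), encoding="utf-8")
    n_records = count_valid_records(str(sample_path))

print(n_records)  # 2
```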

- Provide the model with a version.
- Versioning the model allows for:
  - Reproducibility: Reproduce your results and ensure your models perform as expected.
  - Auditing: Track changes to your models.
  - Rollbacks: Roll back to a previous version of your model.

In [23]:
template_path = 'https://us-kfp.pkg.dev/ml-pipeline/\
large-language-model-pipelines/tune-large-model/v2.0.0'
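The backslash at the end of the first line in the cell above continues the string literal onto the next line, so `template_path` is a single URL with no newline in it. This only works if the continuation line starts at column 0; any leading spaces would end up inside the URL. As a small sketch, the same value can be written with adjacent string literals inside parentheses, which tolerates indentation (`template_path_alt` is a name introduced here for comparison only):

```python
# Backslash continuation inside the literal, as in the cell above.
template_path = 'https://us-kfp.pkg.dev/ml-pipeline/\
large-language-model-pipelines/tune-large-model/v2.0.0'

# Equivalent form using adjacent string literals inside parentheses.
template_path_alt = (
    'https://us-kfp.pkg.dev/ml-pipeline/'
    'large-language-model-pipelines/tune-large-model/v2.0.0'
)

print(template_path == template_path_alt)  # True
```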

In [24]:
date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

In [25]:
MODEL_NAME = f"deep-learning-ai-model-{date}"
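The timestamp suffix is what makes each model name a distinct version. The sketch below uses a fixed, made-up timestamp so the output is reproducible (the notebook itself uses `datetime.datetime.now()`), and shows that the timestamp can be parsed back out of the name later, for example to sort versions by age. `TIMESTAMP_FORMAT`, `PREFIX`, and `fixed` are names introduced here for illustration.

```python
import datetime

TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S"
PREFIX = "deep-learning-ai-model-"

# Fixed timestamp for a reproducible example.
fixed = datetime.datetime(2024, 1, 31, 9, 5, 7)
date = fixed.strftime(TIMESTAMP_FORMAT)
model_name = f"{PREFIX}{date}"
print(model_name)  # deep-learning-ai-model-2024-01-31_09-05-07

# Recover the timestamp from the name.
suffix = model_name[len(PREFIX):]
recovered = datetime.datetime.strptime(suffix, TIMESTAMP_FORMAT)
print(recovered == fixed)  # True
```

Because the format runs from year down to second with zero padding, sorting the names alphabetically also sorts them chronologically.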

- This example uses two PaLM model parameters:
  - `TRAINING_STEPS`: Number of training steps to use when tuning the model. For extractive QA, you can set it between 100 and 500.
  - `EVALUATION_INTERVAL`: How frequently the model is evaluated against the *evaluation set* during tuning, to assess its performance and identify issues. The default is 20, which means the model is evaluated on the evaluation dataset after every 20 training steps.

In [26]:
TRAINING_STEPS = 200
EVALUATION_INTERVAL = 20
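To make the two values concrete, the arithmetic below lists the training steps at which an evaluation would run. It assumes evaluations happen at exact multiples of the interval and include the final step; the pipeline's actual schedule may differ, so treat this as a back-of-the-envelope sketch.

```python
TRAINING_STEPS = 200
EVALUATION_INTERVAL = 20

# Steps 20, 40, ..., up to and including TRAINING_STEPS (assumed schedule).
evaluation_steps = list(
    range(EVALUATION_INTERVAL, TRAINING_STEPS + 1, EVALUATION_INTERVAL)
)

print(len(evaluation_steps))                        # 10
print(evaluation_steps[:3], evaluation_steps[-1])   # [20, 40, 60] 200
```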

- Define the arguments, that is, the inputs that go into the pipeline.

In [27]:
pipeline_arguments = {
    "model_display_name": MODEL_NAME,
    "location": REGION,
    "large_model_reference": "text-bison@001",
    "project": PROJECT_ID,
    "train_steps": TRAINING_STEPS,
    "dataset_uri": TRAINING_DATA_URI,
    "evaluation_interval": EVALUATION_INTERVAL,
    "evaluation_data_uri": EVAUATION_DATA_URI,
}

The code for the real-life example above is provided below. Keep in mind that **running this pipeline is time-consuming and expensive**:

In [28]:
# pipeline_root = "./"

# job = PipelineJob(
#         template_path=template_path,
#         display_name=f"deep_learning_ai_pipeline-{date}",
#         parameter_values=pipeline_arguments,
#         location=REGION,
#         pipeline_root=pipeline_root,
#         enable_caching=True,
# )

# job.submit()

# job.state