# Week 5 Assignments (Part 1)

**Please use the "mlops_eng2" Conda environment for this week's assignments.**

In this week's assignments, you will build and run a KFP pipeline that trains and deploys a LightGBM regression model to predict public bike sharing demand. The assignments have two parts. In the first part (this notebook), you will create several KFP components. In the [second part](./week5_assignments_part2.ipynb), you will create a KFP pipeline from those components.

**Guidelines for submitting the assignments**:
- For each assignment, a code skeleton is provided. Please put your solutions in between the `### START CODE HERE` and `### END CODE HERE` code comments. Please **do not change any code other than those between the `### START CODE HERE` and `### END CODE HERE` comments**. Otherwise your notebook may not pass the tests used in grading. 
- At the end of the second part of the assignments, you will compile your KFP pipeline and save it to a YAML file `pipeline.yaml`. 
- Please return the assignment notebooks (`week5_assignment-part1.ipynb` and  `week5_assignments_part2.ipynb`) and the `pipeline.yaml` file generated in the second part.
- For some assignments, you'll also need to capture some screenshots. Please put your screenshots in a PDF and also submit the PDF along with the other files mentioned above. 

In [62]:
# Import packages
import kfp
from kfp import dsl
from kfp import kubernetes
from kfp.dsl import component, Input, Output, Dataset
from kfp.compiler import Compiler
from typing import NamedTuple, Dict, Any

import mlflow
import lightgbm
import lightgbm as lgb
import optuna
import boto3
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from unittest.mock import create_autospec
import pandas as pd
import os
import shutil
import logging
import warnings
from pathlib import Path

from utils.send_requests import send_requests

assert kfp.__version__ == "2.0.1", "Incorrect version of kfp"
assert lightgbm.__version__ == "3.3.5", "Incorrect version of lightgbm"

# Suppress logging and warnings when grading the notebook
# This is just for grading purposes and doesn't affect you
if os.environ.get("NBGRADER_EXECUTION") in ["autograde", "validate"]:
    logging.getLogger("").setLevel(logging.ERROR)
    loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
    for logger in loggers:
        logger.setLevel(logging.ERROR)
    mlflow.utils.logging_utils.disable_logging()
    warnings.filterwarnings("ignore")

## Assignment 1: Create KFP components (10 points)
You will need to create five KFP components, each worth 2 points.

### 1a) Create a pull data component

This KFP component:

1) downloads the dataset (a CSV file) as a Pandas DataFrame from a URL given as an input,

2) saves the DataFrame to an output of type Dataset so that the dataset can be used by other KFP components.

The dataset can be found [here](https://raw.githubusercontent.com/yumoL/mlops_eng_course_datasets/master/intro/bike-demanding/train_full.csv). It's the same bike sharing demand dataset used in some of the previous weeks. Each column in the dataset is explained below:

**Variables**:

| Column name |  Explanation | type |
|-------------|---------------|----|
| datetime    | hourly date + timestamp| object
| season      | 1 = spring, 2 = summer, 3 = fall, 4 = winter | integer
| holiday     | whether the day is considered a holiday | integer
| workingday  | 1 if the day is neither a weekend nor a holiday, otherwise 0. | integer
| weather     | 1: Clear, Few clouds, Partly cloudy, Partly cloudy; 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist; 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds; 4: Heavy Rain + Ice Pellets + Thunderstorm + Mist, Snow + Fog | integer
| temp        | temperature in Celsius | float
| atemp       | "feels like" temperature in Celsius | float
| humidity    | relative humidity | integer
| windspeed   | wind speed | float

**Targets**: 

| Column name | Explanation                                     | Type
|-------------|-------------------------------------------------| ----|
| casual      | number of non-registered user rentals initiated | integer
| registered  | number of registered user rentals initiated     | integer
| count       | number of total rentals                         | integer
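
Step 2 writes the DataFrame to the output path. One detail worth checking is `index=False`: without it, pandas writes the row index as an extra unnamed column, so the saved file would have 13 columns instead of the 12 the test below expects. A minimal illustration on a made-up two-row frame:

```python
import io

import pandas as pd

# A tiny frame with two of the dataset's columns (values are made up).
df = pd.DataFrame({"season": [1, 1], "count": [16, 40]})

# With the default index=True, the row index is written as an extra column...
with_index = pd.read_csv(io.StringIO(df.to_csv()))
# ...whereas index=False writes only the data columns.
without_index = pd.read_csv(io.StringIO(df.to_csv(index=False)))

print(list(with_index.columns))     # ['Unnamed: 0', 'season', 'count']
print(list(without_index.columns))  # ['season', 'count']
```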


In [63]:
@component(
    base_image="python:3.11",
    packages_to_install=["pandas~=2.2.0"],
)
def pull_data(url: str, data: Output[Dataset]):
    """
    Args:
        url: Dataset URL
        data: Output of type Dataset where the downloaded dataset is saved
    """
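    ### START CODE HERE
    # Imports must live inside the function body: KFP runs only this function's
    # source code in the container, so module-level imports are not available there.
    import pandas as pd

    df = pd.read_csv(url)
    df.to_csv(data.path, index=False)
    ### END CODE HERE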

There doesn't seem to be a simple way to directly test a KFP component created by the `@component` decorator. A workaround is to call the inner decorated Python function through the component's `python_func` attribute.
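
The workaround relies on the fact that the function body only touches its artifact arguments through attributes such as `.path` (and `.metadata`). The sketch below makes that explicit by using `types.SimpleNamespace` instead of `create_autospec`; `save_frame` is a hypothetical function standing in for a component body, not part of the assignment.

```python
import os
import tempfile
from types import SimpleNamespace

import pandas as pd


def save_frame(df: pd.DataFrame, data) -> None:
    """Hypothetical stand-in for a component body: it only uses `data.path`."""
    df.to_csv(data.path, index=False)


with tempfile.TemporaryDirectory() as tmp:
    # Any object with a `path` attribute can play the role of Output[Dataset] here.
    fake_output = SimpleNamespace(path=os.path.join(tmp, "raw_data.csv"), metadata={})
    save_frame(pd.DataFrame({"temp": [9.84, 9.02]}), fake_output)
    # Read the file back before the temporary directory is removed.
    restored = pd.read_csv(fake_output.path)

print(restored.shape)  # (2, 1)
```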

Let's test if your `pull_data` works correctly.

In [64]:
# Mock an output Dataset
Path("raw_data.csv").unlink(missing_ok=True)
raw_dataset = create_autospec(Dataset, metadata=dict(), path="raw_data.csv")
open("raw_data.csv", "w").close() # Just make sure we have the write permission to the file

pull_data.python_func(
    url="https://raw.githubusercontent.com/yumoL/mlops_eng_course_datasets/master/intro/bike-demanding/train_full.csv", 
    data=raw_dataset)

raw_df = pd.read_csv("raw_data.csv")


Expected output:

![](./images/raw-df.png)

In [65]:
assert raw_df.shape == (10886, 12), "The shape of the DataFrame is incorrect."

### 1b) Create a data preprocessing component

This KFP component: 

1. reads a dataset from an input of type Dataset,

1. converts the "datetime" column into pandas datetime objects,

1. creates three more features (hour, day and month) from the "datetime" column,

1. removes the "datetime", "casual" and "registered" columns,

1. splits the dataset into a training and a test dataset, using the last 168 rows of the dataset as the test dataset. (The dataset contains data from two years and the data were generated on an hourly basis so we use the last seven days (24*7=168) as the test data.)

1. further splits the training and test datasets into features and targets (the target column is "count") and saves them into four separate outputs of type Dataset.

The resulting feature datasets should have 11 columns: season, holiday, workingday, weather, temp, atemp, humidity, windspeed, hour, day, and month. The target datasets should have 1 column: count.

The following links may be helpful:
- [pandas.to_datetime](https://pandas.pydata.org/pandas-docs/version/1.5/reference/api/pandas.to_datetime.html)
- [pandas.Series.dt.*](https://pandas.pydata.org/pandas-docs/version/1.5/reference/api/pandas.Series.dt.hour.html)
- [pandas.DataFrame.drop](https://pandas.pydata.org/pandas-docs/version/1.5/reference/api/pandas.DataFrame.drop.html)
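
The steps above can be sketched end to end on a small synthetic frame to check the split arithmetic. The values and the reduced set of columns are made up for illustration; with 240 hourly rows, holding out the last 168 leaves 72 training rows.

```python
import pandas as pd

# Synthetic hourly data (made-up values) covering 10 days = 240 rows.
n_rows = 240
df = pd.DataFrame(
    {
        "datetime": pd.date_range("2012-12-01", periods=n_rows, freq=pd.Timedelta(hours=1)).astype(str),
        "temp": 10.0,
        "casual": 1,
        "registered": 2,
        "count": 3,
    }
)

# Parse the string timestamps and derive the calendar features.
df["datetime"] = pd.to_datetime(df["datetime"])
df["hour"] = df["datetime"].dt.hour
df["day"] = df["datetime"].dt.day
df["month"] = df["datetime"].dt.month
df = df.drop(columns=["datetime", "casual", "registered"])

# The last 7 * 24 = 168 hourly rows form the test set.
test_size = 24 * 7
train_df, test_df = df.iloc[:-test_size], df.iloc[-test_size:]
train_x, train_y = train_df.drop(columns=["count"]), train_df[["count"]]

print(train_df.shape, test_df.shape)  # (72, 5) (168, 5)
print(list(train_x.columns))  # ['temp', 'hour', 'day', 'month']
```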

In [66]:
@component(
    base_image="python:3.11",
    packages_to_install=["pandas~=2.2.0"],
)
def preprocess_data(
    data: Input[Dataset],
    train_x_csv: Output[Dataset],
    train_y_csv: Output[Dataset],
    test_x_csv: Output[Dataset],
    test_y_csv: Output[Dataset],
):
    """
    Args:
        data: Input of type Dataset where the dataset is read from
        train_x_csv: Output of type Dataset where the training features are saved
        train_y_csv: Output of type Dataset where the training target is saved
        test_x_csv: Output of type Dataset where the test features are saved
        test_y_csv: Output of type Dataset where the test target is saved
    """
    target = "count"

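    ### START CODE HERE
    # Imports must live inside the component function so they exist in the container
    import pandas as pd

    # Read the dataset
    df = pd.read_csv(data.path)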

    # Convert "datetime" to Pandas datetime object
    df["datetime"] = pd.to_datetime(df["datetime"])

    # Create new features from the "datetime" column
    df["hour"] = df["datetime"].dt.hour
    df["day"] = df["datetime"].dt.day
    df["month"] = df["datetime"].dt.month

    # Drop unnecessary columns
    df = df.drop(columns=["datetime", "casual", "registered"])

    # Split into training and test datasets
    train_df = df.iloc[:-168]
    test_df = df.iloc[-168:]

    # Split features and target
    train_x = train_df.drop(columns=[target])
    train_y = train_df[[target]]
    test_x = test_df.drop(columns=[target])
    test_y = test_df[[target]]

    # Save datasets to output paths
    train_x.to_csv(train_x_csv.path, index=False)
    train_y.to_csv(train_y_csv.path, index=False)
    test_x.to_csv(test_x_csv.path, index=False)
    test_y.to_csv(test_y_csv.path, index=False)
    ### END CODE HERE

In [67]:
for file in ["train_x.csv", "train_y.csv", "test_x.csv", "test_y.csv"]:
    Path(file).unlink(missing_ok=True)
train_x_csv = create_autospec(Dataset, metadata=dict(), path="train_x.csv")
train_y_csv = create_autospec(Dataset, metadata=dict(), path="train_y.csv")
test_x_csv = create_autospec(Dataset, metadata=dict(), path="test_x.csv")
test_y_csv = create_autospec(Dataset, metadata=dict(), path="test_y.csv")

for file in [train_x_csv, train_y_csv, test_x_csv, test_y_csv]:
    open(file.path, "w").close() # Just to make sure we can write to the file
    
preprocess_data.python_func(
    data=raw_dataset,
    train_x_csv=train_x_csv,
    train_y_csv=train_y_csv,
    test_x_csv=test_x_csv,
    test_y_csv=test_y_csv,
)

train_x_df = pd.read_csv("train_x.csv")
train_y_df = pd.read_csv("train_y.csv")
test_x_df = pd.read_csv("test_x.csv")
test_y_df = pd.read_csv("test_y.csv")


In [68]:
assert train_x_df.shape == (
    10718,
    11,
), "The shape of the train_x DataFrame is incorrect."
assert train_y_df.shape == (
    10718,
    1,
), "The shape of the train_y DataFrame is incorrect."
assert test_x_df.shape == (168, 11), "The shape of the test_x DataFrame is incorrect."
assert test_y_df.shape == (168, 1), "The shape of the test_y DataFrame is incorrect."

expected_feature_columns = [
    "season",
    "holiday",
    "workingday",
    "weather",
    "temp",
    "atemp",
    "humidity",
    "windspeed",
    "hour",
    "day",
    "month",
]
expected_target_column = "count"
assert set(train_x_df.columns) == set(
    expected_feature_columns
), "The columns of the training feature DataFrame are incorrect."
assert train_y_df.columns == [
    expected_target_column
], "The column of the training target DataFrame is incorrect."
assert set(test_x_df.columns) == set(
    expected_feature_columns
), "The columns of the test feature DataFrame are incorrect."
assert test_y_df.columns == [
    expected_target_column
], "The column of the test target DataFrame is incorrect."

### 1c) Create an HPO (hyperparameter optimization) component
We're going to train a LightGBM regression model to predict bike sharing demand. This KFP component uses Optuna to perform hyperparameter optimization for the LightGBM model.

This KFP component first reads the feature and target datasets of both training and test data from four inputs of type Dataset, respectively. Then it performs hyperparameter optimization for the LightGBM model using Optuna. (You can check week3 materials if you need a refresher on how to use Optuna). 

The goal of the optimization is to minimize the MAE (mean absolute error) of the model on the test dataset. The hyperparameters to be tuned and their search ranges are shown below; the value of `random_state` is fixed. Sample hyperparameter values in the linear domain unless stated otherwise. Please specify the hyperparameters in the same order as in the table, and when defining the search space, use the names given in the "Hyperparameter" column.

| Hyperparameter    | Explanation                                                                 | type    | range                                                                    |
|:-------------------|:-----------------------------------------------------------------------------|:---------|:--------------------------------------------------------------------------|
| learning_rate     | The step size of the gradient descent. It controls how quickly the model fits and then overfits the training data.              | float   | [0.001, 0.1] (sampled from the logarithmic domain) |
| colsample_bytree  | The fraction of features to use when training each tree.                  | float   | [0.5, 1.0]                                                               |
| num_leaves        | Max number of leaves in a single tree.                                      | integer | [2, 2^10]                                                                |
| random_state      | The seed for random number generation for reproducibility.                                   | integer | given as an argument `random_seed` |
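
`learning_rate` is the only hyperparameter sampled in the logarithmic domain; in Optuna this corresponds to `trial.suggest_float(..., log=True)`. Conceptually, a log-scaled range draws uniformly between log(low) and log(high) and exponentiates the result. The sketch below illustrates only that idea with the standard library; it is not how Optuna's TPESampler picks values.

```python
import math
import random


def sample_log_uniform(rng: random.Random, low: float, high: float) -> float:
    """Draw from [low, high] uniformly in log space (what log=True means conceptually)."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


rng = random.Random(42)
samples = [sample_log_uniform(rng, 0.001, 0.1) for _ in range(10_000)]

# Each decade, [0.001, 0.01) and [0.01, 0.1], receives roughly half of the samples,
# whereas linear sampling would put roughly 90% of them above 0.01.
share_low_decade = sum(s < 0.01 for s in samples) / len(samples)
print(round(share_low_decade, 2))  # roughly 0.5
```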

When running the study:
- Use [TPESampler](https://optuna.readthedocs.io/en/stable/reference/samplers/generated/optuna.samplers.TPESampler.html) as the sampler for the hyperparameter sampling and use the value of the `random_seed` argument as the seed of the sampler.
- The Optuna study performs `hpo_trials` trials (`hpo_trials` is also an argument).
- You don't need to persist the study anywhere.

Finally, the component should return a namedtuple with the following fields:
- `hyperparams`: the best hyperparameters found by the optimization
- `best_mae`: the best MAE found by the optimization
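
`study.best_params` only contains the tuned hyperparameters, so `random_state` has to be merged in before the namedtuple is built. A minimal sketch using the functional `NamedTuple` syntax from the component's return annotation; the numbers are placeholders, not results from this dataset:

```python
from typing import Any, Dict, NamedTuple

# Functional NamedTuple syntax, mirroring the return annotation of `hpo`.
Output = NamedTuple("Output", [("hyperparams", Dict[str, Any]), ("best_mae", float)])

# Placeholder values standing in for study.best_params and study.best_value.
best_params = {"learning_rate": 0.02, "colsample_bytree": 0.6, "num_leaves": 161}
best_value = 80.9
random_seed = 42

# random_state is fixed rather than tuned, so it is added after the study.
hyperparams = {**best_params, "random_state": random_seed}
result = Output(hyperparams=hyperparams, best_mae=best_value)

print(result.best_mae)  # 80.9
print(result.hyperparams["random_state"])  # 42
```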

In [69]:
@component(
    base_image="python:3.11",
    packages_to_install=[
        "pandas~=2.2.0",
        "numpy~=1.26.4",
        "lightgbm==3.3.5",
        "optuna==3.5.0",
        "scikit-learn~=1.4.0",
    ],
)
def hpo(
    train_x_csv: Input[Dataset],
    train_y_csv: Input[Dataset],
    test_x_csv: Input[Dataset],
    test_y_csv: Input[Dataset],
    hpo_trials: int = 2,
    random_seed: int = 42
) -> NamedTuple(
    "Output",
    [
        ("hyperparams", Dict[str, Any]),
        ("best_mae", float),
    ],
):
    """
    Args:
        train_x_csv: Input where the training feature data is saved
        train_y_csv: Input where the training target data is saved
        test_x_csv: Input where the test feature data is saved
        test_y_csv: Input where the test target data is saved
        hpo_trials: The number of trials that the Optuna study should run
        random_seed: The random seed used for model training and the TPESampler
    Returns:
        namedtuple("Output", ["hyperparams", "best_mae"]) where hyperparams is the best hyperparameter combination found by the optimization 
        and best_mae is the best MAE found by the optimization. The hyperparams is a dictionary where the keys are the hyperparameter names 
        and values the hyperparameter values. It should also contain the random_state used in model training. 
        The returned namedtuple should be like:
        Output(hyperparams={'learning_rate': ..., 'colsample_bytree': ..., 'num_leaves': ..., 'random_state': <random_seed>}, best_mae=...)  
    """
    # TODO:
    # 1. Read the feature and target datasets 
    # 2. Define the objective function
    # 3. Create an Optuna study and run it. Assign the study to the "study" variable, i.e., study=optuna.create_study(...)
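    ### START CODE HERE
    # Imports must live inside the component function so they exist in the container
    import pandas as pd
    import lightgbm as lgb
    import optuna
    from sklearn.metrics import mean_absolute_error

    # Load the datasets
    train_x = pd.read_csv(train_x_csv.path)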
    train_y = pd.read_csv(train_y_csv.path).squeeze()  # Convert to a Series
    test_x = pd.read_csv(test_x_csv.path)
    test_y = pd.read_csv(test_y_csv.path).squeeze()    # Convert to a Series

    # Define the Optuna objective function
    def objective(trial):
        params = {
            "learning_rate": trial.suggest_float("learning_rate", 0.001, 0.1, log=True),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1),
            "num_leaves": trial.suggest_int("num_leaves", 2, 1024),
            "random_state": random_seed,
            
        }

        # Train LightGBM model
        model = lgb.LGBMRegressor(**params)
        model.fit(train_x, train_y, eval_set=[(test_x, test_y)], eval_metric="mae", early_stopping_rounds=10, verbose=False)

        # Predict and calculate MAE
        predictions = model.predict(test_x)
        mae = mean_absolute_error(test_y, predictions)
        return mae

    # Create the Optuna study
    study = optuna.create_study(
        direction="minimize",
        sampler=optuna.samplers.TPESampler(seed=random_seed)
    )

    # Optimize the study
    study.optimize(objective, n_trials=hpo_trials)
    ### END CODE HERE
    
    # Insert the random_state into the hyperparams when preparing the output namedtuple
    hyperparams = {key: value for key, value in study.best_params.items()}
    hyperparams["random_state"] = random_seed

    # TODO: Construct and return the namedtuple
    
    ### START CODE HERE
    # Return the results
    best_mae = study.best_value
    Output = NamedTuple(
        "Output",
        [("hyperparams", Dict[str, Any]), ("best_mae", float)],
    )
    return Output(hyperparams, best_mae)

    ### END CODE HERE

In [70]:
hpo_output = hpo.python_func(
    train_x_csv=train_x_csv, 
    train_y_csv=train_y_csv, 
    test_x_csv=test_x_csv, 
    test_y_csv=test_y_csv, 
    hpo_trials=2, 
    random_seed=42
)
print(f"The output returned by the HPO component is {hpo_output}")


[I 2024-12-05 21:17:23,722] A new study created in memory with name: no-name-40e266f7-c5ae-4089-98d0-ec3400a6c8bf
[I 2024-12-05 21:17:26,822] Trial 0 finished with value: 91.7245589134533 and parameters: {'learning_rate': 0.005611516415334507, 'colsample_bytree': 0.9753571532049581, 'num_leaves': 750}. Best is trial 0 with value: 91.7245589134533.
[I 2024-12-05 21:17:27,992] Trial 1 finished with value: 80.8831514088002 and parameters: {'learning_rate': 0.015751320499779727, 'colsample_bytree': 0.5780093202212182, 'num_leaves': 161}. Best is trial 1 with value: 80.8831514088002.


The output returned by the HPO component is Output(hyperparams={'learning_rate': 0.015751320499779727, 'colsample_bytree': 0.5780093202212182, 'num_leaves': 161, 'random_state': 42}, best_mae=80.8831514088002)


In [71]:
hyperparams = hpo_output.hyperparams
for param_name in ["learning_rate", "colsample_bytree", "num_leaves", "random_state"]:
    assert param_name in hyperparams, f"{param_name} is not in the hyperparameters"
assert hpo_output.best_mae < 81, "Too large MAE."

Expected output:

```text
Output(hyperparams={'learning_rate': 0.015751320499779727, 'colsample_bytree': 0.5780093202212182, 'num_leaves': 161, 'random_state': 42}, best_mae=80.8831514088002)
```

### 1d) Create a train component

This component:

1) loads the feature and target datasets of both training and test data from four inputs of type Dataset, respectively,

2) uses the training dataset and the given hyperparameters to train a [LightGBM regression model](https://lightgbm.readthedocs.io/en/latest/pythonapi/lightgbm.LGBMRegressor.html). The hyperparameters are given in a dictionary, e.g., `{"num_leaves": 1023, "learning_rate": 0.05, ...}`. In the dictionary, each key is an argument name accepted by the LGBMRegressor class and the value is the value assigned to the corresponding argument,

3) evaluates the trained model on the test dataset using the following metrics: **root mean squared error (RMSE), mean absolute error (MAE), and R² (`r2_score`)**,

4) logs the used hyperparameters and evaluation metrics to an MLflow Run. The following information is given as the component inputs: the name of the MLflow Experiment under which the MLflow Run should be stored, the URIs of the MLflow service and the artifact store,

5) registers the model in MLflow; the artifact path relative to the MLflow Run is also given as an input,

6) returns the (absolute) S3 URI of the saved model in MLflow's artifact store, e.g., `s3://mlflow/<mlflow-experiment-id>/<mlflow-run-id>/artifacts/bike-demand`.

**Notes**:
- When logging hyperparameters, please use the keys of the hyperparameter dictionary as the parameter names.
- When logging metrics, please use **"rmse", "mae", "r2"** as the metric names. 
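
For reference, the three metrics reduce to simple formulas over the prediction errors. The sketch below computes them in plain Python on made-up numbers; in the component itself, scikit-learn's `mean_squared_error`, `mean_absolute_error` and `r2_score` do the same job.

```python
import math
from typing import Sequence


def regression_metrics(y_true: Sequence[float], y_pred: Sequence[float]) -> dict:
    """Compute RMSE, MAE and R^2 from their definitions."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    ss_res = sum(e * e for e in errors)          # residual sum of squares
    rmse = math.sqrt(ss_res / n)
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    # Keys match the metric names the notes above ask for.
    return {"rmse": rmse, "mae": mae, "r2": r2}


metrics = regression_metrics([10.0, 20.0, 30.0, 40.0], [12.0, 18.0, 33.0, 37.0])
print(metrics)
```

Here the errors are -2, 2, -3 and 3, so MAE is 2.5, RMSE is sqrt(6.5) ≈ 2.55, and R² is 1 - 26/500 = 0.948.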

In [80]:
@component(
    base_image="python:3.11",
    packages_to_install=["pandas~=2.2.0", "numpy~=1.26.4", "lightgbm~=3.3.5", "scikit-learn~=1.4.0", "mlflow==2.9.2", "boto3~=1.34.40"],
)
def train(
    train_x_csv: Input[Dataset],
    train_y_csv: Input[Dataset],
    test_x_csv: Input[Dataset],
    test_y_csv: Input[Dataset],
    mlflow_experiment_name: str,
    mlflow_tracking_uri: str,
    mlflow_s3_endpoint_url: str,
    model_artifact_path: str,
    hyperparams: Dict[str, Any],
) -> str: 
    """
    Args:
        train_x_csv: Input where the training feature data is saved
        train_y_csv: Input where the training target data is saved
        test_x_csv: Input where the test feature data is saved
        test_y_csv: Input where the test target data is saved
        mlflow_experiment_name: Name of the MLflow experiment
        mlflow_tracking_uri: URI of MLflow's tracking server
        mlflow_s3_endpoint_url: URL of MLflow's artifact store
        model_artifact_path: The path where the artifacts of the model are stored in MLflow's artifact store. It's relative to the MLflow Run.
        hyperparams: Hyperparameters that need to be configured. The hyperparameters will be passed as a dictionary like {"num_leaves": 1023, "learning_rate": 0.05}
    
    Returns: 
        The S3 URI of the saved model in MLflow's artifact store, e.g., s3://mlflow/13/e5559bc.../artifacts/bike-demand
    """
    ### START CODE HERE
    # Imports must live inside the component function so they exist in the container
    import os

    import mlflow
    import lightgbm as lgb
    import pandas as pd
    from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

    # Load the datasets
    train_x = pd.read_csv(train_x_csv.path)
    train_y = pd.read_csv(train_y_csv.path).squeeze()  # Convert to a Series
    test_x = pd.read_csv(test_x_csv.path)
    test_y = pd.read_csv(test_y_csv.path).squeeze()    # Convert to a Series

    # Point MLflow's S3 client at the given artifact store before any upload happens.
    os.environ["MLFLOW_S3_ENDPOINT_URL"] = mlflow_s3_endpoint_url
    # Fall back to the default MinIO credentials only if none are set in the environment.
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioadmin")
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioadmin")

    # Set MLflow tracking URI and experiment name
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(mlflow_experiment_name)

    # Start an MLflow run
    with mlflow.start_run():
        # Create and train the LightGBM model
        model = lgb.LGBMRegressor(**hyperparams)
        model.fit(train_x, train_y)

        # Make predictions on the test set
        test_pred = model.predict(test_x)

        # Calculate evaluation metrics
        rmse = mean_squared_error(test_y, test_pred) ** 0.5  # RMSE
        mae = mean_absolute_error(test_y, test_pred)  # MAE
        r2 = r2_score(test_y, test_pred)  # R²

        # Log hyperparameters, using the dictionary keys as parameter names
        mlflow.log_params(hyperparams)

        # Log metrics
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("r2", r2)

        # Log the model and register it under the experiment name
        mlflow.lightgbm.log_model(model, artifact_path=model_artifact_path, registered_model_name=mlflow_experiment_name)

        # Return the absolute S3 URI of the saved model
        return mlflow.get_artifact_uri(artifact_path=model_artifact_path)
    ### END CODE HERE

In [81]:
# These are needed for testing the function outside the MLOps platform
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://mlflow-minio.local"


train_output = train.python_func(
    train_x_csv=train_x_csv,
    train_y_csv=train_y_csv,
    test_x_csv=test_x_csv,
    test_y_csv=test_y_csv,
    mlflow_experiment_name="test",
    mlflow_tracking_uri="http://mlflow-server.local",
    mlflow_s3_endpoint_url="http://mlflow-minio.local",
    model_artifact_path="test-model",
    hyperparams=hpo_output.hyperparams,
)

print(f"The output returned by the Train component is {train_output}")

# Expected output:
# The output returned by the Train component is s3://mlflow/<mlflow-experiment-id>/<mlflow-run-id>/artifacts/test-model


Registered model 'test' already exists. Creating a new version of this model...
2024/12/05 22:02:30 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: test, version 6
Created version '6' of model 'test'.


The output returned by the Train component is s3://mlflow/9/f8e906d06e184bb0a13f34c2306177d9/artifacts/test-model


In [82]:
# Test the "train" function
import mlflow
mlflow_run_id = train_output.split("/")[-3]
mlflow_client = mlflow.MlflowClient()
mlflow_run = mlflow_client.get_run(mlflow_run_id)

# Check if the logged params and metrics are correct
assert set(["mae", "r2", "rmse"]) == set(mlflow_run.data.metrics.keys()), "The metrics are not logged correctly."
assert set(["colsample_bytree", "learning_rate", "num_leaves", "random_state"]) == set(mlflow_run.data.params.keys()), "The hyperparameters are not logged correctly."

# Check if a model is uploaded
model_versions = mlflow_client.search_model_versions(filter_string=f"run_id='{mlflow_run_id}'")
assert len(model_versions) > 0
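
The test cell above takes the run ID as `train_output.split("/")[-3]`, which only works while the artifact path is a single segment such as `test-model`; if `model_artifact_path` contained a slash, index -3 would no longer be the run ID. A slightly more defensive sketch parses the URI from the front instead. It assumes the `s3://<bucket>/<experiment-id>/<run-id>/artifacts/<artifact-path>` layout seen in the output above, and `parse_model_uri` is a hypothetical helper, not part of MLflow.

```python
from urllib.parse import urlparse


def parse_model_uri(uri: str) -> dict:
    """Split s3://<bucket>/<experiment-id>/<run-id>/artifacts/<artifact-path>."""
    parsed = urlparse(uri)
    experiment_id, run_id, artifacts_dir, *artifact_path = parsed.path.lstrip("/").split("/")
    if artifacts_dir != "artifacts":
        raise ValueError(f"Unexpected MLflow artifact URI: {uri}")
    return {
        "bucket": parsed.netloc,
        "experiment_id": experiment_id,
        "run_id": run_id,
        "artifact_path": "/".join(artifact_path),
    }


info = parse_model_uri("s3://mlflow/9/f8e906d06e184bb0a13f34c2306177d9/artifacts/test-model")
print(info["run_id"])  # f8e906d06e184bb0a13f34c2306177d9
```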

At [http://mlflow-server.local](http://mlflow-server.local), you should see an MLflow Experiment named "test" with an MLflow Run under it. You should also see a registered model associated with the MLflow Run:

<img src="./images/mlflow-test-run.png" width="1000" />
<img src="./images/mlflow-test-model.png" width="1000" />

### 1e) Create a deploy model component

This component uses the KServe Python SDK to deploy the trained model to KServe in the "kserve-inference" namespace. The component should create a new inference service or update an existing one. In the tutorial, you may have noticed that the model deployment component can finish even though the deployed inference service is not yet ready. Here, **the component should keep running until the deployed inference service is ready or a timeout of 6 minutes is reached.**

The name of the inference service and the S3 URI of the model are given as inputs.

**Hint**: 
- Using the LightGBM server provided by KServe doesn't work because the model saved by MLflow is in the pickled format, which is different from the format supported by KServe's LightGBM server. You can check [here](https://github.com/kserve/kserve/issues/2483) on how to use KServe SDK to deploy a model saved by MLflow.
- [kserve.wait_isvc_ready](https://kserve.github.io/website/0.10/sdk_docs/docs/KServeClient/#wait_isvc_ready)
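
Conceptually, "keep running until ready or until a timeout" is a polling loop with a deadline; `wait_isvc_ready` accepts a `timeout_seconds` argument for this purpose. The standard-library sketch below shows the pattern only. `wait_until_ready` and `is_ready` are hypothetical names; in a real component, `is_ready` would wrap a status check against the inference service.

```python
import time
from typing import Callable


def wait_until_ready(
    is_ready: Callable[[], bool],
    timeout_seconds: float = 360.0,
    polling_interval: float = 10.0,
) -> bool:
    """Poll `is_ready` until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        if is_ready():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(polling_interval, remaining))


# Usage with a fake readiness check that succeeds on the third poll.
calls = {"n": 0}


def fake_is_ready() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until_ready(fake_is_ready, timeout_seconds=1.0, polling_interval=0.01))  # True
```

Using `time.monotonic()` rather than wall-clock time keeps the deadline correct even if the system clock is adjusted while waiting.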

In [83]:

import time

@component(
    base_image="python:3.11",
    packages_to_install=["kserve==0.11.2"],
)
def deploy_model(model_name: str, storage_uri: str):
    """
    Args:
        model_name: the name of the deployed inference service
        storage_uri: the URI of the saved model in MLflow's artifact store
    """
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1ModelSpec
    from kserve import V1beta1ModelFormat
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    namespace = "kserve-inference"
    service_account_name = "kserve-sa"
    api_version = constants.KSERVE_V1BETA1
    logger.info(f"MODEL URI: {storage_uri}")
    
    modelspec = V1beta1ModelSpec(
        storage_uri=storage_uri,
        model_format=V1beta1ModelFormat(name="mlflow"),
        protocol_version="v2"
    )
    
    ### START CODE HERE
    import time  # the component runs in its own container, so import it here as well

    # Define the inference service specification
    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=model_name,
            namespace=namespace
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                model=modelspec,
                service_account_name=service_account_name
            )
        )
    )

    # Create the KServe client. Without arguments, it uses the in-cluster configuration
    # when running inside Kubernetes (e.g., in a KFP pod) and ~/.kube/config otherwise.
    kserve_client = KServeClient()

    try:
        # Delete the existing inference service if it exists
        try:
            kserve_client.delete(model_name, namespace=namespace)
            logger.info(f"Deleted existing inference service '{model_name}'.")
            # Fixed pause so the old resources are gone before re-creating the service
            time.sleep(100)
        except Exception as delete_err:
            logger.warning(f"Failed to delete existing inference service: {delete_err}")

        # Create a new inference service
        kserve_client.create(isvc)
        logger.info(f"Inference service '{model_name}' creation initiated.")

        # Block until the service is ready, giving up after 6 minutes (360 s)
        kserve_client.wait_isvc_ready(model_name, namespace=namespace, timeout_seconds=360)
        logger.info(f"Inference service '{model_name}' is ready.")

    except Exception as e:
        logger.error(f"Failed to create inference service: {e}")
        raise
    ### END CODE HERE
    

In [84]:

# This function needs some time (<5min) to finish because it needs to wait for the inference service to be ready
deploy_model.python_func(model_name="test-bike-demand", storage_uri=train_output)


INFO:__main__:MODEL URI: s3://mlflow/9/f8e906d06e184bb0a13f34c2306177d9/artifacts/test-model
INFO:__main__:Deleted existing inference service 'test-bike-demand'.
INFO:__main__:Inference service 'test-bike-demand' creation initiated.
INFO:__main__:Inference service 'test-bike-demand' is ready.


In [85]:
# Send a request to the inference service
# The inference service should be ready as soon as the deploy_model component has finished
response = send_requests(isvc_name="test-bike-demand")
print(response.json())

# Expected output:
# {"model_name":"test-bike-demand","id":"13791e9c-3eb9-41b1-98c4-8f7da9f08d60","parameters":{},
# "outputs":[{"name":"output-1","shape":[2,1],"datatype":"FP64","data":[35.894812901164,31.72387585260099]}]}

assert response.json()["outputs"][0]["shape"] == [2, 1], "The inference service doesn't seem to be deployed correctly"

{'model_name': 'test-bike-demand', 'id': '2bdb31af-b79b-498c-a865-9ef48c1779c1', 'parameters': {}, 'outputs': [{'name': 'output-1', 'shape': [2, 1], 'datatype': 'FP64', 'data': [66.75263617512559, 64.23432649358075]}]}


In [86]:
# Train another model
train_output2 = train.python_func(
    train_x_csv=train_x_csv,
    train_y_csv=train_y_csv,
    test_x_csv=test_x_csv,
    test_y_csv=test_y_csv,
    mlflow_experiment_name="test",
    mlflow_tracking_uri="http://mlflow-server.local",
    mlflow_s3_endpoint_url="http://mlflow-minio.local",
    model_artifact_path="test-model",
    hyperparams={"learning_rate": 0.1, "colsample_bytree": 0.8, "num_leaves": 100, "random_state": 42},
)

deploy_model.python_func(model_name="test-bike-demand", storage_uri=train_output2)


Registered model 'test' already exists. Creating a new version of this model...
2024/12/05 22:05:27 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: test, version 7
Created version '7' of model 'test'.
INFO:__main__:MODEL URI: s3://mlflow/9/1449bbed1a3e4a89951dd718601f7d90/artifacts/test-model
INFO:__main__:Deleted existing inference service 'test-bike-demand'.
INFO:__main__:Inference service 'test-bike-demand' creation initiated.
INFO:__main__:Inference service 'test-bike-demand' is ready.


In [87]:
# Send a request to the inference service
# The inference service should be ready as soon as the deploy_model component has finished
response2 = send_requests(isvc_name="test-bike-demand")
assert (
    response2.json()["outputs"][0]["data"] != response.json()["outputs"][0]["data"]
), "The predictions should be different from the ones made by the previous inference service."

In [88]:
# Delete the testing inference service
!kubectl -n kserve-inference delete isvc test-bike-demand

inferenceservice.serving.kserve.io "test-bike-demand" deleted


In [89]:
# Clean up by deleting the MLflow runs and models created in the test
import subprocess
mlflow_client = mlflow.MlflowClient()

mlflow_exp = mlflow_client.get_experiment_by_name("test")
mlflow_runs = mlflow_client.search_runs(experiment_ids=[mlflow_exp.experiment_id])
for mlflow_run in mlflow_runs:
    model_versions = mlflow_client.search_model_versions(
        f"run_id='{mlflow_run.info.run_id}'"
    )
    # Delete the registered model corresponding to the run if any
    if len(model_versions) > 0:
        mv = model_versions[0]
        mlflow_client.delete_model_version(mv.name, mv.version)
    # Permanently delete the run
    mlflow_client.delete_run(mlflow_run.info.run_id)
    subprocess.run(["utils/delete_mlflow_run.sh", mlflow_run.info.run_id])

Run with ID 1449bbed1a3e4a89951dd718601f7d90 has been permanently deleted.
Run with ID f8e906d06e184bb0a13f34c2306177d9 has been permanently deleted.
Run with ID 50b4c4954cec430d8ad0b115c3f15f33 has been permanently deleted.


Now, let's compile the KFP components you created and save them to YAML files in the `./components` directory. The names of the YAML files should be the same as your component function names. For example, the YAML file for the `pull_data` component is named `pull_data.yaml`.

In [90]:
# Create the component folder
component_dir_name = "components"
if os.path.exists(component_dir_name):
    print("Remove the existing directory")
    shutil.rmtree(component_dir_name)
os.mkdir(component_dir_name)

kfp_compiler = Compiler()
kfp_components = {"pull_data": pull_data, "preprocess_data": preprocess_data, "hpo": hpo, "train": train, "deploy_model": deploy_model}
for func_name, kfp_component in kfp_components.items():
    output_path = os.path.join(component_dir_name, f"{func_name}.yaml")
    kfp_compiler.compile(kfp_component, output_path)
    print(f"Generated {output_path}")

Remove the existing directory
Generated components/pull_data.yaml
Generated components/preprocess_data.yaml
Generated components/hpo.yaml
Generated components/train.yaml
Generated components/deploy_model.yaml


You can continue to the [second part](./week5_assignments_part2.ipynb).