# Intro to Machine Learning Pipelines

<a target="_blank" href="https://colab.research.google.com/github/unionai-oss/intro-to-ml-pipelines/blob/main/intro-to-ml-pipelines.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

This workshop runs as a Google Colab or Jupyter notebook.

In [None]:
print("Hello, world!")

# Workshop Setup

Sign up for Union and Hugging Face while the libraries below are installing:

- Union sign up: https://signup.union.ai/
- Union Platform: https://serverless.union.ai/
- Hugging Face Sign up: https://huggingface.co/

Install the Python libraries by running the code cell below:

In [None]:
%%bash
pip install -U scikit-learn==1.4.1.post1 matplotlib==3.8.3 seaborn==0.13.2 union==0.1.85 keyrings.alt==5.0.0 huggingface_hub==0.24.0 joblib==1.3.2 pyarrow==17.0.0 python-dotenv

Authenticate with Union:

In [None]:
!union create login --serverless --auth device-flow

## Build a Simple Workflow
- [Task Docs](https://docs.union.ai/byoc/core-concepts/tasks/)
- [Workflow Docs](https://docs.union.ai/byoc/core-concepts/workflows/)



In [None]:
%%writefile simple_wf.py

# Import libraries and modules
# task
from flytekit import task, workflow

@task
def hello_world(name: str) -> str:
    """Return a greeting addressed to ``name``."""
    greeting = "Hello {}".format(name)
    return greeting

# workflow
@workflow
def main(name: str) -> str:
    """Workflow entry point: run the ``hello_world`` task on ``name``."""
    greeting = hello_world(name=name)
    return greeting


In [None]:
# Run locally
!union run simple_wf.py main --name Flyte

In [None]:
# Run on Union
!union run --remote simple_wf.py main --name Flyte

In [None]:
!union run simple_wf.py main --help

In [None]:
!union run --help

# Machine Learning Workflow

- Create a Container image
- Download Dataset
- Process Data & Visualize
- Train Machine Learning Model
- Evaluate Machine Learning Model
- Save Model to Hugging Face Model Hub

https://huggingface.co/

In [None]:
# Secrets in Union
# HF Secret
!union create secret hf_token

In [None]:
!union get secret

In [None]:
# !union delete secret hf_token

In [16]:
%%writefile workflow.py
# Import libraries and modules
import html
import io
from textwrap import dedent
from typing import List

import joblib
import matplotlib as mpl
import matplotlib.pyplot as plt
import pandas as pd
import base64
import seaborn as sns
import os
from dotenv import load_dotenv
from flytekit import (Deck, ImageSpec, Resources, Secret, current_context,
                      task, workflow)
from huggingface_hub import HfApi
from sklearn.datasets import load_iris
from sklearn.metrics import ConfusionMatrixDisplay, classification_report
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

# Define the container image to use for the tasks on Union with ImageSpec.
# The pins mirror the notebook's `pip install` cell (plus an explicit flytekit
# pin) so that local and remote runs use the same library versions.
# NOTE(review): python-dotenv is unpinned here and in the pip cell; consider
# pinning it as well for reproducible image builds.
image = ImageSpec(
    packages=[
        "union==0.1.85",
        "flytekit==1.13.8",
        "scikit-learn==1.4.1.post1",
        "matplotlib==3.8.3",
        "seaborn==0.13.2",
        "joblib==1.3.2",
        "huggingface_hub==0.24.0",
        "pyarrow==17.0.0",
        "python-dotenv"
    ],
)

# Load a local .env file (if one exists) into the environment at import time.
# upload_model_to_hf checks the HF_TOKEN environment variable before falling
# back to the Union secret.
load_dotenv()

# Helper function to convert a matplotlib figure into an HTML string
def _convert_fig_into_html(fig: mpl.figure.Figure) -> str:
    """Render ``fig`` as a PNG and embed it in an inline base64 ``<img>`` tag.

    Any object exposing ``savefig(buffer, format=...)`` is accepted, e.g. the
    grid object returned by ``sns.pairplot`` as well as a plain Figure.
    """
    with io.BytesIO() as buffer:
        fig.savefig(buffer, format="png")
        encoded = base64.b64encode(buffer.getvalue()).decode()
    return '<img src="data:image/png;base64,{}" alt="Rendered Image" />'.format(encoded)


# Task: Download the dataset
@task(
    cache=True,
    cache_version="7",
    container_image=image,
    requests=Resources(cpu="2", mem="2Gi"),
)
def download_dataset() -> pd.DataFrame:
    """Load the scikit-learn Iris dataset as a DataFrame.

    Returns:
        One column per feature (named after ``feature_names``) followed by a
        ``target`` column holding the class label.
    """
    bunch = load_iris()
    features = pd.DataFrame(data=bunch.data, columns=bunch.feature_names)
    return features.assign(target=bunch.target)

# Task: Process the dataset
@task(
    enable_deck=True,
    container_image=image,
    requests=Resources(cpu="2", mem="2Gi"),
)
def process_dataset(data_df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split the data 80/20 into train/test sets and render a pairplot deck.

    The split is stratified on ``target`` with a fixed seed (42), so it is
    reproducible. The pairplot covers the full, unsplit dataset and is added
    to a deck named "Metrics".

    Returns:
        ``(train_df, test_df)``.
    """
    split_kwargs = dict(test_size=0.2, random_state=42, stratify=data_df['target'])
    train_part, test_part = train_test_split(data_df, **split_kwargs)

    grid = sns.pairplot(data_df, hue="target")
    Deck("Metrics").append(_convert_fig_into_html(grid))

    return train_part, test_part


# Task: Train a model
@task(
    container_image=image,
    requests=Resources(cpu="3", mem="3Gi"),
)
def train_model(dataset: pd.DataFrame, n_neighbors: int = 3) -> KNeighborsClassifier:
    """Fit a k-nearest-neighbours classifier on ``dataset``.

    Args:
        dataset: Training rows; every column except ``target`` is a feature.
        n_neighbors: Number of neighbours ``k`` used by the classifier.

    Returns:
        The fitted classifier.
    """
    features = dataset.drop(columns="target")
    labels = dataset["target"]
    model = KNeighborsClassifier(n_neighbors=n_neighbors)
    model.fit(features, labels)
    return model

# Evaluate the model using the test dataset
@task(
    container_image=image,
    enable_deck=True,
    requests=Resources(cpu="2", mem="2Gi"),
)
def evaluate_model(model: KNeighborsClassifier, dataset: pd.DataFrame) -> KNeighborsClassifier:
    """Score ``model`` on held-out data and publish the results as a deck.

    The "Metrics" deck receives a confusion-matrix image followed by the
    scikit-learn classification report. The model is returned unchanged so
    downstream tasks can depend on this evaluation step.
    """
    ctx = current_context()

    features = dataset.drop(columns="target")
    truth = dataset["target"]
    predicted = model.predict(features)

    # Confusion matrix rendered into its own figure.
    figure, axes = plt.subplots()
    ConfusionMatrixDisplay.from_predictions(truth, predicted, ax=axes)

    deck = Deck("Metrics")
    deck.append(_convert_fig_into_html(figure))

    # Escape the plain-text report so it renders verbatim inside <pre>.
    escaped_report = html.escape(classification_report(truth, predicted))
    html_report = dedent(
        f"""\
    <h2>Classification report</h2>
    <pre>{escaped_report}</pre>"""
    )
    deck.append(html_report)

    # NOTE(review): Deck(...) may already register itself with the current
    # context; inserting at index 0 presumably puts "Metrics" first in the
    # rendered output. Confirm against flytekit's Deck implementation.
    ctx.decks.insert(0, deck)

    return model

# Upload the model to Hugging Face Hub
@task(
    container_image=image,
    requests=Resources(cpu="1", mem="1Gi"),
    secret_requests=[Secret(group=None, key="hf_token")],
)
def upload_model_to_hf(model: KNeighborsClassifier, repo_name: str, model_name: str) -> str:
    """Serialize ``model`` with joblib and push it to a Hugging Face repository.

    The token comes from the ``HF_TOKEN`` environment variable when it is set
    to a non-empty value. Otherwise it is read from the ``hf_token`` Union
    secret requested in the decorator.

    Args:
        model: Trained classifier to upload.
        repo_name: Target repository id, e.g. ``"<user>/<repo>"``.
        model_name: File name used both for the local dump and in the repo.

    Returns:
        A human-readable message containing ``<repo_name>/<model_name>``.
    """
    ctx = current_context()

    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        print("Using Hugging Face token from env.")
    else:
        # An unset *or empty* HF_TOKEN falls back to the Union secret instead
        # of sending an empty string as the token.
        hf_token = ctx.secrets.get(key="hf_token")
        print("Using Hugging Face token from Union secrets.")

    api = HfApi()
    # exist_ok=True lets repeated runs reuse an existing repository.
    api.create_repo(repo_name, token=hf_token, exist_ok=True)

    # model_name is used as a file path, so a bare name is written to the
    # task's current working directory.
    joblib.dump(model, model_name)

    api.upload_file(
        path_or_fileobj=model_name,
        path_in_repo=model_name,
        repo_id=repo_name,
        commit_message="Upload model",
        repo_type=None,
        token=hf_token,
    )
    # Separate the repo id and file name so the message shows a valid path.
    return f"Model uploaded to Hugging Face Hub: {repo_name}/{model_name}"

# Task: Model prediction with custom input data, good for testing
@task(
    container_image=image,
    enable_deck=True,
    requests=Resources(cpu="2", mem="2Gi"),
)
def model_predict(model: KNeighborsClassifier, pred_data: List[List[float]]) -> List[int]:
    """Predict class labels for ad-hoc feature rows.

    Args:
        model: Fitted classifier.
        pred_data: One inner list of feature values per sample, in the column
            order the model was trained on.

    Returns:
        Predicted labels. ``.tolist()`` converts the array returned by
        ``predict`` into plain Python values for the ``List[int]`` output.
    """
    return model.predict(pred_data).tolist()


# Main workflow that orchestrates the tasks
@workflow
def main(repo_name: str, model_name: str, n_neighbors: int = 3,
          pred_data: List[List[float]] = [[1.5, 2.3, 1.3, 2.4]]) -> KNeighborsClassifier:
    """Train, evaluate, smoke-test and publish a KNN classifier on Iris.

    Args:
        repo_name: Hugging Face repository id to upload to (``user/repo``).
        model_name: File name for the serialized model inside that repo.
        n_neighbors: ``k`` for the KNN classifier.
        pred_data: Feature rows fed to ``model_predict`` as a quick check.

    Returns:
        The trained model.
    """
    # NOTE(review): the list default is created once at definition time; it
    # is never mutated here, so it is safe, but keep it read-only.
    data_df = download_dataset()
    # Keyword arguments, matching every other task call in this workflow.
    train, test = process_dataset(data_df=data_df)
    model = train_model(dataset=train, n_neighbors=n_neighbors)
    evaluated_model = evaluate_model(model=model, dataset=test)
    model_predict(model=model, pred_data=pred_data)
    # Passing evaluate_model's output (not `model`) ties the upload to the
    # evaluation step.
    upload_model_to_hf(model=evaluated_model,
                       repo_name=repo_name,
                       model_name=model_name)
    return model


Overwriting workflow.py


In [None]:
! union run workflow.py main --help

In [None]:
# Run workflow remotely (Union serverless) or your own compute
!union run --remote workflow.py main --repo_name YOURUSERNAME/REPOID --model_name model.pkl

View the workflow execution in the Union dashboard.

## Download Model from Hugging Face Hub

In [None]:
import os

from huggingface_hub import hf_hub_download
import joblib

# Prefer a token from the HF_TOKEN environment variable. Unset or empty maps
# to None rather than sending an empty string as the token. A token is
# typically only needed for private/gated repos; paste one here if required.
hf_token = os.getenv("HF_TOKEN") or None

# Replace YOURUSERNAME/REPOID with the repo_name used for the workflow run.
hf_model = hf_hub_download(repo_id="YOURUSERNAME/REPOID",
                           filename="model.pkl",
                           token=hf_token)

# SECURITY: joblib.load unpickles the file, which can execute arbitrary code.
# Only load models from repositories you trust.
model = joblib.load(hf_model)


In [None]:
model

In [None]:
print(model.predict([[3,5,6,5]]))
print(model.predict([[.4,.2,.1,.6]]))

## Remote API

In [None]:
# initialize remote context
from union.remote import UnionRemote
remote = UnionRemote()

In [None]:
# search for the 100 most recent executions of the workflow
recent_executions = remote.recent_executions(limit=100)
executions = [
    e for e in recent_executions if e.spec.launch_plan.name == "workflow.main"
]

In [None]:
executions

In [None]:
#get latest execution id
recent_ex_id = executions[0].id.name
print(recent_ex_id)

In [None]:
execution = remote.fetch_execution(name=recent_ex_id)

In [None]:
execution

In [None]:
execution.outputs

In [None]:
model_uri = execution.outputs["o0"].remote_source
print(model_uri)

In [None]:
# fetch a task (no version given: presumably the latest registered version)
predict_task = remote.fetch_task(name="workflow.model_predict")

# Fetch a specific version of a task
# task = remote.fetch_task(name="workflow.model_predict", version="_XyPQzsykVFTceNWitQmAg")

In [None]:
# view task details
predict_task

In [None]:
from flytekit.types.file import FlyteFile
# Inputs for workflow.model_predict: feature rows plus the model file produced
# by the workflow run (model_uri comes from an earlier cell).
inputs = {
    "pred_data": [[1.5,2.3,1.3,2.4]],
    "model": FlyteFile(model_uri)
}

# Execute the task without waiting; see the commented line for a blocking call.
execution = remote.execute(predict_task, inputs=inputs)
# execution = remote.execute(task, inputs=inputs, wait=True) # wait for execution to finish

url = remote.generate_console_url(execution)
print(f"🚀 Union Serverless execution url: {url}")

In [None]:
synced_execution = remote.sync(execution)

In [None]:
synced_execution

In [None]:
outputs = synced_execution.outputs["o0"]
print(outputs)

### Putting it all together

In [None]:
from union.remote import UnionRemote
from flytekit.types.file import FlyteFile

remote = UnionRemote()

def get_latest_execution_model(limit=100):
    """Return the remote URI of the model output of the newest ``workflow.main`` run.

    Args:
        limit: How many recent executions to search.

    Returns:
        The ``remote_source`` of output ``o0`` (the model returned by the
        workflow).

    Raises:
        IndexError: If none of the ``limit`` most recent executions ran
            ``workflow.main``.
    """
    main_runs = [
        e for e in remote.recent_executions(limit=limit)
        if e.spec.launch_plan.name == "workflow.main"
    ]
    if not main_runs:
        # Same exception type as the original bare `executions[0]`, but with
        # an actionable message.
        raise IndexError(
            f"No 'workflow.main' execution found among the {limit} most recent executions."
        )

    # Assumes recent_executions lists the newest execution first; confirm.
    # NOTE(review): outputs are only available once that run has succeeded.
    execution = remote.fetch_execution(name=main_runs[0].id.name)
    return execution.outputs["o0"].remote_source

def make_prediction(model_uri, pred_data):
    """Run the registered ``workflow.model_predict`` task remotely and wait for it.

    Args:
        model_uri: Remote location of a serialized model, e.g. the value
            returned by ``get_latest_execution_model``.
        pred_data: Feature rows, one inner list per sample.

    Returns:
        The task's ``o0`` output (the predicted labels).
    """
    task_entity = remote.fetch_task(name="workflow.model_predict")
    run = remote.execute(
        task_entity,
        inputs={"pred_data": pred_data, "model": FlyteFile(model_uri)},
        wait=True,
    )
    return run.outputs["o0"]

model_uri = get_latest_execution_model()

In [None]:
make_prediction(model_uri, [[-3.0,-5.3,-6.3,-5.0]])

Note: some things to consider when using the remote API for inference:
- Using [Union Artifacts](https://www.union.ai/blog-post/data-aware-event-driven-ai-orchestration-with-artifacts) to store the model and dataset
- Making inference a separate workflow
- Currently good for batch inference
- We're working on new features that will make models more readily available for inference in Union!

Keep Learning:

