# 🔄 Build an ML Pipeline with scikit-learn & Union

<a target="_blank" href="https://colab.research.google.com/github/unionai-oss/scikit-learn-ml-pipelines/blob/main/tutorial.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

This tutorial will walk you through building an end-to-end machine learning pipeline using scikit-learn and Union's AI workflow and inference platform. We'll download a dataset, train a machine learning model, deploy it, and track its artifacts using Union's powerful MLOps features. Although this example may seem relatively simple, all the concepts and tools used here can be applied to more complex machine learning and AI projects.


By just adding a few lines of code to your Python functions, you'll be able to create a reproducible ML pipeline, taking advantage of Union's features:

- Reproducible AI workflows: Ensure your ML pipeline produces the same environments every time.
- Versioning of code and artifacts: Track changes in your code and models automatically.
- Data Caching for faster iterations: Reuse results from previous executions to save time.
- Declarative Infrastructure: Define your ML infrastructure needs directly in your code without worrying about provisioning.
- Artifact Management for models and data: Automatically manage your model files and datasets.
- Container Image Builder: Build and deploy your code in a consistent environment.
- Local Development: Test your workflows locally before deploying them to the cloud.
- Actors for long-running stateful containers: Handle tasks that require continuous state or interaction.
- And more...

```python
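import pandas as pd
from flytekit import Resources, task, workflow
from sklearn.base import BaseEstimator

# `image` is an ImageSpec describing the container environment (see "Create a Simple Workflow").

@task(
    cache=True,
    cache_version="4",
    container_image=image,
    requests=Resources(cpu="2", mem="2Gi"),
)
def download_data() -> pd.DataFrame:
    ...

@task(
    container_image=image,
    requests=Resources(cpu="2", mem="20Gi", gpu="1"),
)
def train_model(data: pd.DataFrame) -> BaseEstimator:
    ...

@workflow
def pipeline_workflow():
    data = download_data()
    train_model(data=data)
    ...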

```
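
To make the `cache=True` and `cache_version` arguments in the snippet above more concrete, here is a minimal sketch of the general idea behind task-output caching: a result is stored under a key derived from the task name, a version string, and the inputs, so changing any of them triggers a fresh run. This is a conceptual illustration in plain Python, not Union's implementation; `cached_task`, `_cache_key`, `_CACHE`, and `download_rows` are hypothetical names.

```python
import functools
import hashlib
import json

# Hypothetical in-memory store standing in for a real cache backend.
_CACHE = {}


def _cache_key(name, version, kwargs):
    """Hash the task name, cache version, and inputs into one key."""
    payload = json.dumps(
        {"task": name, "version": version, "inputs": kwargs}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached_task(cache_version):
    """Reuse a stored result when name, version, and inputs all match."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(**kwargs):
            key = _cache_key(fn.__name__, cache_version, kwargs)
            if key not in _CACHE:
                _CACHE[key] = fn(**kwargs)  # cache miss: run the function
            return _CACHE[key]
        return wrapper
    return decorator


calls = []  # records every real execution so cache hits are visible


@cached_task(cache_version="4")
def download_rows(n):
    calls.append(n)
    return list(range(n))


first = download_rows(n=3)   # runs the function
second = download_rows(n=3)  # same inputs and version: served from the cache
third = download_rows(n=5)   # different input: runs again
```

In this sketch, bumping `cache_version` changes the key, which is the usual way to invalidate old results after the task's logic changes.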


## 🧰 Setup 

Sign up for a Union Serverless account at [Union.ai](https://union.ai) by clicking the "Get Started" button. No card required, and you'll get $30 in free credits to get started. Signing up can take a few minutes.

Or you can use your [Union BYOC Enterprise](https://www.union.ai/pricing) login if you have one.

### 📦 Install Python Packages & Clone Repo

Install the packages listed in [requirements.txt](requirements.txt) into your local environment with your preferred package manager, for example `pip install -r requirements.txt`.

To clone the repo, run the following command in your environment: `git clone `

If you're running this notebook in a Google Colab environment, you can install the packages and clone the GitHub repo directly in the notebook by running the following cell:


In [15]:
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    !git clone https://github.com/unionai-oss/scikit-learn-ml-pipelines
    %cd scikit-learn-ml-pipelines
    !pip install -r requirements.txt

### 🔐 Authenticate

If you're using [Union BYOC Enterprise](https://www.union.ai/pricing), use: `union create login --host <union-host-url>`

Otherwise, authenticate to [Union Serverless](https://www.union.ai/) by running the command below. If you don't have an account, create one for free at [Union.ai](https://union.ai).
 

In [None]:
!union create login --serverless --auth device-flow

🔐 Configuration saved to /Users/sageelliott/.union/config.yaml
Login successful into serverless


## 🧩 Create a Simple Workflow

Before we build our ML pipeline, let's build a simple workflow to understand the basics of Union's workflow system.

`ImageSpec` - Allows you to specify the environment in which your task will run directly in your Python code. This includes the Python packages, CUDA version, and any additional environment setup you need. When a task is run, Union will automatically build a container image with the specified environment if it doesn't already exist and run the task in that container.

`Tasks` - Tasks are the building blocks of workflows. They allow you to define a unit of work and what infrastructure to use.

`Workflows` - A workflow is a collection of tasks that also defines how data flows between them. Workflows can be run locally or in the cloud.

Both tasks and workflows are strongly typed.
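
As an illustration of what strong typing buys you, the sketch below checks keyword arguments against a function's type annotations before calling it, so a wrong input fails early with a clear message. It is a simplified stand-in written with the standard library, not how flytekit performs its type checking; `check_inputs` and `greet` are hypothetical names, and only plain classes such as `str` and `int` are handled.

```python
from typing import get_type_hints


def check_inputs(fn, **kwargs):
    """Raise TypeError if a keyword argument does not match fn's annotation."""
    hints = get_type_hints(fn)
    for arg_name, value in kwargs.items():
        expected = hints.get(arg_name)
        # Only plain classes are checked in this sketch (no generics).
        if isinstance(expected, type) and not isinstance(value, expected):
            raise TypeError(
                f"{fn.__name__}() expected {arg_name} to be {expected.__name__}, "
                f"got {type(value).__name__}"
            )
    return fn(**kwargs)


def greet(name: str) -> str:
    """Returns a greeting."""
    return f"Hello, {name}!"


greeting = check_inputs(greet, name="Flyte")

try:
    check_inputs(greet, name=42)
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```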


In [32]:
import flytekit as fl
import pandas as pd
import sys


image = fl.ImageSpec(
    name="notebook-example",
    packages=[
        "flytekit==1.15.0",
        "union==0.1.144",
    ],
    python_version=f"{sys.version_info.major}.{sys.version_info.minor}",
)

# task = fl.task(container_image=image)

@fl.task(container_image=image)
def hello_world(name: str) -> str:
    """Returns a greeting."""
    return f"Hello, {name}!"

@fl.workflow
def my_workflow(name: str = "union.ai") -> str:
    return hello_world(name=name)

In [33]:
from union.remote import UnionRemote
serverless = UnionRemote()

In [34]:
exe = serverless.execute(my_workflow, inputs={"name": "Flyte"})
exe

Image notebook-example:2_vWc6HpkiQPaUm3f1PbWg was not found or has expired.
🐳 Submitting a new build...
👍 Build submitted!
⏳ Waiting for build to finish at: https://serverless.union.ai/org/sagecodes/projects/system/domains/production/executions/ah4xl7829bdjp9wb4jns
✅ Build completed in 0:00:24!


In [35]:
exe.wait(poll_interval=1)
# The workflow returns a string, available under the default output name 'o0'.
greeting = exe.outputs['o0']
greeting

'Hello, Flyte!'

## 🔀 ML Model Training Pipeline

In this section we'll run tasks and workflows defined in Python files under the relevant folders.

Navigate to the `tasks` and `workflows` folders to see the code. If you're following along in a hosted Jupyter notebook, you should be able to view the code by clicking the folder icon (usually on the left side of the screen).

First we'll create a machine learning pipeline that trains a model on the iris dataset.

Our workflow will have the following steps:
- Load the iris dataset
- Split the dataset into training and testing sets
- Train a Random Forest model
- Evaluate the model
- Save model as an artifact
- Run a prediction with new data

Note: In more complex projects, data pipelines could be separate from model training pipelines. In this example we'll keep it simple and combine them into one workflow.
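
For orientation, the steps listed above can be sketched in plain scikit-learn as shown below. This is not the code in the repository's `tasks` and `workflows` folders. It is a minimal local sketch that assumes an 80/20 split, 100 trees, fixed random seeds, and a temporary `joblib` file standing in for a Union artifact.

```python
import os
import tempfile

import joblib
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Load the iris dataset as a feature matrix X and a label vector y.
X, y = load_iris(return_X_y=True)

# Split into training and testing sets (the 80/20 ratio is an assumption).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Train a Random Forest model.
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate the model on the held-out data.
accuracy = accuracy_score(y_test, model.predict(X_test))

# Save the model to a file; in the tutorial this role is played by a Union artifact.
model_path = os.path.join(tempfile.mkdtemp(), "model.joblib")
joblib.dump(model, model_path)

# Reload the model and predict on one new sample
# (sepal length, sepal width, petal length, petal width, in cm).
loaded = joblib.load(model_path)
prediction = loaded.predict([[5.1, 3.5, 1.4, 0.2]])
print(f"accuracy={accuracy:.2f}, prediction={prediction}")
```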

Navigate to the [workflows/workflows.py](workflows/workflows.py) file and find the `train_iris_classification()` function to see the code for the workflow. This workflow uses tasks defined in the [/tasks](tasks/data.py) folder and builds a container image from [container.py](containers.py).

In [21]:
!union run --remote workflows/workflows.py train_iris_classification

Running Execution on Remote.
Image flytekit:17ryEc3h1qj5D_caqMlI6w found. Skip building.
[✔] Go to https://serverless.union.ai/org/sagecodes/projects/default/domains/development/executions/av498gzzlfcvqcflfsg2 to see execution in the console.

The `--remote` flag runs the workflow in the cloud. To run the workflow locally, remove the flag: `union run workflows/workflows.py train_iris_classification`.

You will often want to run a workflow locally to test it before running it in the cloud. This is especially useful when you're developing a new workflow or debugging an existing one.

It can be useful to do some things differently when running locally, such as using a subset of the data or saving files in a different format for debugging. To run a section of code only during local execution, check for the `FLYTE_INTERNAL_EXECUTION_ID` environment variable. If it's not present, the code is running locally.

```python
import os

if "FLYTE_INTERNAL_EXECUTION_ID" not in os.environ:
    # Only run this code locally
    ...
```
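
Building on the check above, one way to apply it is to wrap it in a small helper and use it to shrink the dataset during local runs. The sketch below is only an illustration: `is_local_execution`, `select_rows`, and the 100-row limit are hypothetical, and the environment is passed in as a dictionary so the behavior can be tried without a remote execution.

```python
import os


def is_local_execution(environ=None):
    """Return True when FLYTE_INTERNAL_EXECUTION_ID is absent (local run)."""
    environ = os.environ if environ is None else environ
    return "FLYTE_INTERNAL_EXECUTION_ID" not in environ


def select_rows(rows, environ=None, local_limit=100):
    """Use a small subset locally and the full dataset remotely."""
    if is_local_execution(environ):
        return rows[:local_limit]
    return rows


rows = list(range(1000))

# Simulate both situations by passing an explicit environment mapping.
local_rows = select_rows(rows, environ={})
remote_rows = select_rows(rows, environ={"FLYTE_INTERNAL_EXECUTION_ID": "abc123"})
```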

Take a look at your pipeline in the Union UI. You can relaunch workflows/tasks, view logs, and see the artifacts generated by the workflow.



### Union Remote



In [22]:
from union.remote import UnionRemote
remote = UnionRemote()

def get_latest_execution_model(limit=100):

  recent_executions = remote.recent_executions(limit=limit)
  executions = [
      e for e in recent_executions if e.spec.launch_plan.name == "workflows.workflows.train_iris_classification"
  ]

  recent_ex_id = executions[0].id.name
  execution = remote.fetch_execution(name=recent_ex_id)
  model_uri = execution.outputs["o0"].remote_source

  return model_uri

In [23]:
model_uri = get_latest_execution_model()
print(model_uri)
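
Note that `get_latest_execution_model` indexes `executions[0]`, which raises an `IndexError` when no matching execution is found within `limit`. The sketch below shows the same selection logic with an explicit guard. It uses `SimpleNamespace` stand-ins instead of real `UnionRemote` results and assumes, as the helper above does, that the list is ordered most recent first; `latest_matching_execution` and `fake_execution` are hypothetical names.

```python
from types import SimpleNamespace


def latest_matching_execution(executions, launch_plan_name):
    """Return the first execution for the launch plan, or raise a clear error."""
    matches = [e for e in executions if e.spec.launch_plan.name == launch_plan_name]
    if not matches:
        raise LookupError(f"No recent execution found for {launch_plan_name!r}")
    return matches[0]


def fake_execution(exec_name, launch_plan_name):
    """Build a stand-in object with only the attributes the helper reads."""
    return SimpleNamespace(
        id=SimpleNamespace(name=exec_name),
        spec=SimpleNamespace(launch_plan=SimpleNamespace(name=launch_plan_name)),
    )


# Stand-in data, ordered most recent first.
recent = [
    fake_execution("exec-3", "workflows.workflows.batch_prediction_knn"),
    fake_execution("exec-2", "workflows.workflows.train_iris_classification"),
    fake_execution("exec-1", "workflows.workflows.train_iris_classification"),
]

latest = latest_matching_execution(
    recent, "workflows.workflows.train_iris_classification"
)

try:
    latest_matching_execution(recent, "workflows.workflows.missing")
    missing_error = None
except LookupError as exc:
    missing_error = str(exc)
```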

In [5]:
from flytekit import FlyteFile  
def make_prediction(model_uri, pred_data):

  predict_task = remote.fetch_task(name="tasks.predict.batch_knn_predict")


  inputs = {
      "pred_data": pred_data,
      "model": FlyteFile(model_uri)
  }

  # Execute the task
  execution = remote.execute(predict_task, inputs=inputs, wait=True)

  response = execution.outputs["o0"]

  return response


In [6]:
print(make_prediction(model_uri, [[-3.0,-5.3,-6.3,-5.0]]))

[0]


In [12]:
#todo: load model locally


## 🚀 Model Serving & Artifacts

In this tutorial we'll show you the common ways to serve a model using Union, but you can also download or move the model to your own infrastructure.

- Use regular containers for batch inference
- Use Actors (long-running, stateful containers) for near real-time inference
- Serve the model and application interface within Union

### Batch Prediction ML Workflow
The training workflow produced a model artifact that we can use to make predictions on new data.

Let's run our first prediction workflow. This workflow ... Next, we'll see how actors can run long-running tasks for faster predictions.

In [None]:
# !union run --remote workflows/workflows.py batch_prediction_knn

In [28]:
!union register workflows/workflows.py

Running pyflyte register from /Users/sageelliott/Documents/gitrepos/tut-sklearn-pipelines with images ImageConfig(default_image=Image(name='default', fqn='cr.union.ai/v1/unionai/union', tag='py3.11-0.1.144', digest=None), images=[Image(name='default', fqn='cr.union.ai/v1/unionai/union', tag='py3.11-0.1.144', digest=None)]) and image destination folder /root on 1 package(s) ('/Users/sageelliott/Documents/gitrepos/tut-sklearn-pipelines/workflows/workflows.py',)
Registering against serverless-1.us-east-2.s.union.ai
Detected Root /Users/sageelliott/Documents/gitrepos/tut-sklearn-pipelines, using this to create deployable package...
Loading packages ['workflows.workflows'] under source root /Users/sageelliott/Documents/gitrepos/tut-sklearn-pipelines
No output path provided, using a temporary directory at /var/folders/nv/hcrpygqd6xvd6m2cf6w3pbvc0000gn/T/tmp7la_ztqg instead
Computed version is FUmTmF2y099t7bgASyTmzg
Image flytekit:17ryEc3h1

In [29]:
from union.remote import UnionRemote
# Create a remote connection
remote = UnionRemote()

In [9]:
def predict_with_container(data):

    inputs = {"pred_data": data}

    workflow = remote.fetch_workflow(name="workflows.workflows.batch_prediction_knn")
    execution = remote.execute(workflow, inputs=inputs, wait=True) # wait=True will block until the execution is complete

    # print(execution.outputs)

    return execution.outputs['o0']

In [10]:
print(predict_with_container([[5.1, 3.5, 1.4, 0.2]]))

[0]


### ⚡ Enabling Near Real-time Predictions with Actors

Union [Actors](https://docs.union.ai/serverless/user-guide/core-concepts/actors/#actors) reduce the cost of cold starts by maintaining long-running, stateful environments that stay ready for use until a defined time-to-live (TTL) expires. Because the environment persists between calls, repeated predictions skip redundant initialization.
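
As a rough mental model of the behavior described above, the sketch below keeps expensive state alive between calls and only reinitializes it after a TTL has passed without use. It is a toy illustration in plain Python, not Union's Actors API; `WarmWorker`, `add_offset`, and the fake clock are all hypothetical.

```python
import time


class WarmWorker:
    """Toy stand-in for a long-running environment with a time-to-live."""

    def __init__(self, load_fn, ttl_seconds, clock=time.monotonic):
        self._load_fn = load_fn  # expensive initialization step
        self._ttl = ttl_seconds
        self._clock = clock
        self._state = None
        self._last_used = None
        self.cold_starts = 0

    def run(self, handler, *args):
        now = self._clock()
        expired = self._last_used is not None and now - self._last_used > self._ttl
        if self._state is None or expired:
            self._state = self._load_fn()  # cold start: pay the setup cost
            self.cold_starts += 1
        self._last_used = now
        return handler(self._state, *args)


# A fake clock keeps the example deterministic.
fake_time = [0.0]
worker = WarmWorker(
    load_fn=lambda: {"offset": 10},
    ttl_seconds=30,
    clock=lambda: fake_time[0],
)


def add_offset(state, x):
    return state["offset"] + x


results = [worker.run(add_offset, 1)]      # first call: cold start
fake_time[0] = 5.0
results.append(worker.run(add_offset, 2))  # within the TTL: state is reused
fake_time[0] = 100.0
results.append(worker.run(add_offset, 3))  # TTL expired: cold start again
```

Only two of the three calls pay the initialization cost here, which is the effect the repeated `predict_with_actors` calls below are meant to show.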



In [30]:
def predict_with_actors(data):

    inputs = {"pred_data": data}

    workflow = remote.fetch_workflow(name="workflows.workflows.actor_prediction_knn")
    execution = remote.execute(workflow, inputs=inputs, wait=True) # wait=True will block until the execution is complete

    # print(execution.outputs)

    return execution.outputs['o0']

In [31]:
print(predict_with_actors([[5.1, 3.5, 1.4, 0.2]]))

In [13]:
print(predict_with_actors([[5.1, 3.5, 1.4, 0.2]]))

[0]


In [14]:
print(predict_with_actors([[5.1, 3.5, 1.4, 0.2]]))

[0]


In [None]:
# !union run --remote workflows/workflows.py actor_prediction_knn

### Build an application with Streamlit & Union Serving

Full app serving coming soon! 

In [None]:
# Enable union serving
# !export ENABLE_UNION_SERVING=1

In [25]:
!union deploy apps app.py simple-streamlit-iris

Image union-serve-iris-streamlit:xVX848FiI6_Ko8MTry9Ipw found. Skip building.
✨ Deploying Application: simple-streamlit-iris
🔎 Console URL: https://serverless.union.ai/org/sagecodes/projects/default/domains/development/apps/simple-streamlit-iris
[Status] Pending: OutOfDate: The Route is still working to reflect the latest desired specification.
[Status] Pending: RevisionMissing: Configuration "default-development-simple-streamlit-iris" is waiting for a Revision to become ready.

In [None]:
# generate api key to use Union remote on external hosting.
!union create api-key admin --name gradio-hf-app

## Learn More About Union and Building AI Pipelines:

We hope you had fun and learned something new from this tutorial on building ML pipelines with Union! Creating reproducible AI workflows is a powerful way to increase productivity and collaboration across your team, and it is an essential part of MLOps for deploying and managing machine learning models in production.

To learn more about Union and building AI pipelines: 
- Check out the [Union Documentation](https://docs.union.ai/).
- Contact us at [Union.ai](https://union.ai) for a demo or to learn more about Union Enterprise.
- Join our Slack community to ask questions and share your projects with other Union users.



