
metaflow-flyte

CI PyPI License: Apache-2.0 Python 3.10+

Schedule and monitor your Metaflow pipelines through Flyte without rewriting them.

The problem

You've built pipelines in Metaflow and now need Flyte's scheduling, UI, and observability — but rewriting your flows in Flytekit means losing Metaflow's versioning, artifact store, and local execution model. Running both side-by-side means maintaining two copies of every pipeline.

Quick start

pip install metaflow-flyte

# Generate the Flyte workflow file
python my_flow.py --datastore=s3 flyte create my_flow_remote.py \
    --image my-registry/my-image:latest

# Run locally (no cluster required)
pyflyte run my_flow_remote.py my_flow

# Register and run on a Flyte cluster
pyflyte register --project flytesnacks --domain development my_flow_remote.py
pyflyte run --remote --project flytesnacks --domain development \
    my_flow_remote.py my_flow

Install

pip install metaflow-flyte

Or from source:

git clone https://github.com/npow/metaflow-flyte.git
cd metaflow-flyte
pip install -e ".[dev]"

Usage

Generate and register a Flyte workflow

python my_flow.py --datastore=s3 flyte create my_flow_remote.py \
    --image my-registry/my-image:latest

pyflyte register --project myproject --domain development my_flow_remote.py

All graph shapes are supported

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)
    @step
    def end(self): pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...

Parametrised flows

Parameters defined with metaflow.Parameter are forwarded automatically as Flyte workflow arguments:

python param_flow.py --datastore=s3 flyte create param_flow_remote.py \
    --image my-registry/my-image:latest

Pass parameters at runtime:

pyflyte run --remote ... param_flow_remote.py param_flow --greeting "Hello"

Step decorators (--with)

Inject Metaflow step decorators at deploy time without modifying the flow source:

python my_flow.py --datastore=s3 flyte create my_flow_remote.py \
    --image my-registry/my-image:latest \
    --with=kubernetes:cpu=4,memory=8000

Retries

@retry on any step is picked up automatically. The generated Flyte task gets the corresponding retries parameter:

class MyFlow(FlowSpec):
    @retry(times=3)
    @step
    def train(self):
        ...

Scheduled flows

If your flow has a @schedule decorator, the generated file includes a Flyte LaunchPlan with the corresponding cron schedule automatically.

Project namespace

If the flow uses @project(name=...), the generated workflow targets that Flyte project automatically:

@project(name="recommendations")
class TrainFlow(FlowSpec): ...

How it works

metaflow-flyte compiles your Metaflow flow's DAG into a self-contained Flyte workflow file. Each Metaflow step becomes a @task. The generated file:

  • runs each step as a subprocess via the standard metaflow step CLI
  • derives a stable run_id from the Flyte execution ID so all steps share one Metaflow run
  • passes --input-paths correctly for joins and foreach fan-outs
  • emits Metaflow artifact retrieval snippets to the Flyte UI Deck after each step
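The per-step wiring can be sketched roughly as follows. Function names and the exact command shape here are illustrative of the bullets above, not the literal generated code:

```python
import shlex

def derive_run_id(flyte_execution_id: str) -> str:
    """One stable run_id per Flyte execution, shared by every step,
    so all steps land in the same Metaflow run."""
    return f"flyte-{flyte_execution_id}"

def step_command(flow_file: str, step_name: str, run_id: str,
                 task_id: str, input_paths: str) -> str:
    """Build the standard `metaflow step` CLI invocation that each
    generated Flyte task runs as a subprocess."""
    return shlex.join([
        "python", flow_file, "--datastore=s3",
        "step", step_name,
        "--run-id", run_id,
        "--task-id", task_id,
        "--input-paths", input_paths,  # parent pathspecs for joins/foreach
    ])

run_id = derive_run_id("atlw559q7zhbg2mw92sq")
print(step_command("my_flow.py", "process", run_id, "1",
                   f"{run_id}/start/0"))
```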

Executions list

All workflow executions are visible in the Flyte console with status, duration, and launch plan:

Flyte console showing multiple succeeded executions

Linear flow

A simple 3-step linear flow (start → process → end) runs as 4 Flyte tasks — one to generate the shared run ID, then one per Metaflow step:

Linear flow execution with all tasks succeeded

Branch flow

Branch flows with split/join (start → branch_a + branch_b → join → end) run the parallel steps concurrently as separate Flyte tasks:

Branch flow execution showing parallel steps

Foreach flow

Foreach fan-outs use a Flyte @dynamic task to spawn one task per item at runtime. The _foreach_*_dynamic Sub-Workflow node fans out and collects results:

Foreach flow execution showing dynamic sub-workflow

Task detail and Flyte Deck

Click any task in the execution view to open the detail panel. Each Metaflow step produces a Flyte Deck accessible via the "Flyte Deck" button:

Task detail panel showing Flyte Deck button

Metaflow artifact retrieval

The metaflow tab in the Flyte Deck shows the exact Python code to retrieve each artifact from this specific task — using the full FlowName/run_id/step_name/task_id pathspec:

Metaflow deck tab showing artifact retrieval code

For tasks that produce multiple artifacts, each one is listed with its access expression:

Metaflow deck showing message and result artifacts

Parametrised flows show the parameter values alongside the artifacts:

Param flow deck showing greeting and message artifacts

# Retrieve artifacts from any completed Metaflow step
from metaflow import Task
task = Task('LinearFlow/flyte-atlw559q7zhbg2mw92sq/process/62e7a9511b5646b6')

task.data.message   # access the 'message' artifact
task.data.result    # access the 'result' artifact

Configuration

The generated file bakes in datastore and image settings at creation time so every task subprocess uses the same configuration.

# Use S3 datastore with a custom image
python my_flow.py \
  --datastore=s3 \
  flyte create my_flow_remote.py \
  --image my-registry/my-image:latest \
  --project flytesnacks \
  --domain development

Docker image requirements

Your Docker image must contain:

  1. The flow Python file at the same absolute path as on your local machine
  2. All Python dependencies (metaflow, flytekit, boto3, etc.)
  3. USERNAME environment variable set (e.g. ENV USERNAME=metaflow)
  4. S3/datastore credentials and endpoint configuration

Example Dockerfile:

FROM python:3.11-slim
RUN pip install "metaflow>=2.9" "flytekit>=1.10" "boto3>=1.26"
COPY my_flow.py /path/to/my_flow.py
ENV USERNAME=metaflow

Development

git clone https://github.com/npow/metaflow-flyte.git
cd metaflow-flyte
pip install -e ".[dev]"

# Compilation tests only (fast, ~25s)
pytest tests/ -m "not integration and not e2e"

# Compilation + local pyflyte run (~3 min)
pytest tests/ -m "not e2e"

# E2e (requires a running Flyte cluster)
pytest tests/ -m "e2e"

The test suite covers three tiers:

  • Tier 1: compile all graph shapes → assert generated file content
  • Tier 2: pyflyte run locally → verify Metaflow artifacts written to disk
  • Tier 3: register + run on a live Flyte cluster → verify S3 artifacts and deck output

License

Apache 2.0
