| | |
|---|---|
| What | Declarative agent flows in YAML, with checkpointing and resume |
| Why | Production agents need to survive process restarts, and their flows need to be diffable in code review |
| Status | 🟢 Active — used internally on a couple of pipelines |
| Stack | Python 3.9+, PyYAML, Jinja2 |

    pip install -r requirements.txt
    pip install -e .
    flowmind run examples/hello.yaml

Define a flow as data:

    name: hello
    entry: greet
    nodes:
      - id: greet
        step: set
        args:
          greeting: "hello"
          who: "world"
        next: speak
      - id: speak
        step: print
        args:
          msg: "{{ greeting }}, {{ who }}!"

Run it:

    from flowmind import load_flow, Runtime

    flow = load_flow("examples/hello.yaml")
    state = Runtime(flow, checkpoint_dir="state/").run()
    print(state["greeting"], state["who"])

Three reasons to define flows in YAML, all production-flavoured:
- Diffs. A non-technical reviewer can read a YAML pull request and tell you that step 7 changed. They can't read a Python DSL.
- Hot-swap. The flow file is configuration. Change it without redeploying the worker.
- Single source of truth. The flow doubles as the spec and the documentation.
That said — every step is still Python. You're not stuck with the built-ins.
| Step | What it does |
|---|---|
| `set` | Write literal values to state |
| `print` | Print a Jinja-rendered message (debugging) |
| `noop` | Terminal placeholder |
| `http_get` | GET a URL, parse JSON, store under `store_as` |
| `python` | `eval` a tiny expression, sandboxed (`__builtins__` cleared) |

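
The `python` step deserves a closer look before flows are opened up to untrusted authors. Below is a minimal sketch of what "eval with `__builtins__` cleared" typically looks like; `eval_expression` is a hypothetical helper written for illustration, not FlowMind's actual implementation. Clearing the builtins stops an expression from resolving names such as `open` or `__import__`, but it is not a complete security boundary on its own, because attribute access on ordinary objects can still reach interpreter internals.

```python
def eval_expression(expr, state):
    """Evaluate a small expression with state keys visible as variables.

    Hypothetical helper: the globals carry an empty __builtins__ mapping,
    so names like open() or __import__() cannot be resolved.
    """
    restricted_globals = {"__builtins__": {}}
    return eval(expr, restricted_globals, dict(state))


state = {"score": 21, "bonus": 2}
doubled = eval_expression("score * bonus", state)

try:
    eval_expression("open('/etc/passwd')", state)
    blocked = False
except NameError:
    # open is not defined once the builtins are cleared
    blocked = True
```

For expressions that come from outside the team, a parser-based allowlist or a separate process is likely the safer choice.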
Register your own:

    from flowmind import Step, register


    @register("call_llm")
    class CallLLM(Step):
        def run(self, state):
            from openai import OpenAI

            resp = OpenAI().chat.completions.create(
                model=self.kwargs["model"],
                messages=[{"role": "user", "content": state["prompt"]}],
            )
            return {"answer": resp.choices[0].message.content}

Linear flows are boring. Branches are first-class:

    branches:
      - id: classify
        cases:
          - when: "score >= 80"
            next: high_path
          - when: "score >= 50"
            next: mid_path
        default: low_path

`when` is any Jinja expression with the full state in scope.
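
To make the branch semantics concrete, here is a rough sketch of how a `classify` branch could be resolved. It assumes cases are tried top to bottom and the first truthy `when` wins, which the overlapping `score >= 80` / `score >= 50` example suggests but the text does not state outright. `pick_branch` and `evaluate` are hypothetical names, and plain `eval` stands in for Jinja so the sketch has no dependencies.

```python
def pick_branch(branch, state, evaluate):
    """Return the id of the next node for a branch definition.

    Assumption: cases are checked in order and the first match wins;
    if none match, the branch's `default` is used.
    """
    for case in branch["cases"]:
        if evaluate(case["when"], state):
            return case["next"]
    return branch["default"]


def evaluate(expr, state):
    # Stand-in for a Jinja expression: eval with builtins cleared,
    # state keys exposed as variables.
    return eval(expr, {"__builtins__": {}}, dict(state))


classify = {
    "id": "classify",
    "cases": [
        {"when": "score >= 80", "next": "high_path"},
        {"when": "score >= 50", "next": "mid_path"},
    ],
    "default": "low_path",
}

next_node = pick_branch(classify, {"score": 65}, evaluate)
```

Under the first-match assumption, case order matters: swapping the two cases would send a score of 90 down `mid_path`.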
Pass `checkpoint_dir=` to the `Runtime`. After every step it writes `{flow_name}.state.json`. Resume from the last completed node:
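
    # Assumes State is exported from the top-level package alongside Runtime.
    from flowmind import Runtime, State, load_flow

    flow = load_flow("examples/hello.yaml")
    state = State.load("state/hello.state.json")
    Runtime(flow, state=state, checkpoint_dir="state/").resume(
        from_node=state["_last_node"]
    )

This is the killer feature when an agent's tool call costs real money and the worker dies halfway through.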
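
For readers wondering what a per-step checkpoint involves, the sketch below writes and reloads a `{flow_name}.state.json` file using only the standard library. The file name and the `_last_node` key come from the examples above; `save_checkpoint`, `load_checkpoint`, and the write-then-rename strategy are assumptions for illustration and may differ from what FlowMind does internally.

```python
import json
import os
import tempfile


def save_checkpoint(checkpoint_dir, flow_name, state, last_node):
    """Write {flow_name}.state.json atomically and return its path."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    path = os.path.join(checkpoint_dir, f"{flow_name}.state.json")
    payload = dict(state, _last_node=last_node)
    # Write to a temp file in the same directory, then rename over the
    # target, so a crash mid-write cannot leave a truncated checkpoint.
    fd, tmp_path = tempfile.mkstemp(dir=checkpoint_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(payload, handle)
    os.replace(tmp_path, path)
    return path


def load_checkpoint(path):
    """Read a checkpoint back into a plain dict."""
    with open(path) as handle:
        return json.load(handle)


with tempfile.TemporaryDirectory() as tmp:
    saved = save_checkpoint(tmp, "hello", {"greeting": "hello"}, "greet")
    restored = load_checkpoint(saved)
```

One consequence of storing state as JSON is that every value a step returns has to be JSON-serializable.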
A node can declare a fallback:

    - id: try_fetch
      step: http_get
      args:
        url: "https://api.example.com/v1/thing"
      on_error: fallback_local
      next: parse

If the step raises, the runtime jumps to `on_error` instead of crashing. The error and its origin land in `state.history()`.
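
The fallback behaviour described above can be pictured as a small interpreter loop. This is a sketch under assumptions, not the real runtime: `run_flow` is a hypothetical function, steps are plain callables that return a dict of updates, and a local `history` list stands in for `state.history()`.

```python
def run_flow(nodes, entry, steps, state):
    """Walk nodes by id, following `next`, or `on_error` when a step raises."""
    by_id = {node["id"]: node for node in nodes}
    history = []  # hypothetical stand-in for state.history()
    current = entry
    while current is not None:
        node = by_id[current]
        try:
            state.update(steps[node["step"]](state, **node.get("args", {})))
            history.append({"node": current, "ok": True})
            current = node.get("next")
        except Exception as exc:
            history.append({"node": current, "ok": False, "error": repr(exc)})
            if "on_error" not in node:
                raise  # no fallback declared: let the failure propagate
            current = node["on_error"]
    return state, history


def failing_fetch(state, url):
    raise ConnectionError(f"cannot reach {url}")


def use_cache(state):
    return {"thing": "cached"}


nodes = [
    {"id": "try_fetch", "step": "http_get",
     "args": {"url": "https://api.example.com/v1/thing"},
     "on_error": "fallback_local", "next": "parse"},
    {"id": "parse", "step": "noop"},
    {"id": "fallback_local", "step": "cache"},
]
steps = {"http_get": failing_fetch, "cache": use_cache, "noop": lambda state: {}}
final_state, history = run_flow(nodes, "try_fetch", steps, {})
```

Here the fetch fails, so `parse` is never visited and the flow ends at `fallback_local`.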
- No DAG fan-out / fan-in. Each step has one successor. If you need a DAG, you probably want a real workflow engine (Prefect, Temporal, Airflow). FlowMind is for the 90% of agent flows that are basically a state machine.
- No retry loops. Use `on_error` to route to a retry node yourself.
- No async. Steps are synchronous. Easier to reason about, especially with checkpoints.
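
The "route to a retry node yourself" advice can be sketched as a counter-gated loop. Everything here is hypothetical: `retry_gate` plays the role of a custom step that `on_error` points at, the `failures` and `retry_target` keys are made up for the example, and the `while` loop imitates what the runtime would do when following `on_error` and a branch.

```python
def retry_gate(state, max_failures=3):
    """Hypothetical step: record a failure and choose where to go next."""
    failures = state.get("failures", 0) + 1
    target = "try_fetch" if failures < max_failures else "give_up"
    return {"failures": failures, "retry_target": target}


calls = {"n": 0}


def flaky_fetch(state):
    # Fails on the first two calls, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("upstream timed out")
    return {"thing": "ok"}


state = {}
node = "try_fetch"
while node not in ("done", "give_up"):
    if node == "try_fetch":
        try:
            state.update(flaky_fetch(state))
            node = "done"
        except TimeoutError:
            node = "retry_gate"  # what `on_error: retry_gate` would do
    elif node == "retry_gate":
        state.update(retry_gate(state))
        node = state["retry_target"]  # what a branch on the counter would do
```

Keeping the counter in state means it is checkpointed with everything else, so a resumed flow would not reset its retry budget.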
Apache-2.0.