# Process-Bigraph: A Minimal, Hands-On Tutorial

This notebook introduces the **process-bigraph** programming model through a sequence of small, executable examples.

The goal is to explain *how computation is structured* in process-bigraph, not just how to run it. We build up from first principles:

- what a **Process** is
- what a **Step** is
- how inputs and outputs are declared as **ports**
- how updates are applied to state
- how **Composites** wire components together
- how **emitters** record simulation results

By the end, you will have a working mental model of how process-bigraph represents
time, state, and composition.

This notebook is intentionally minimal and self-contained. All examples run in-place
and are suitable as templates for larger models.

---

## Table of Contents

1. **Create a Process**
   Define a stateful component with inputs, outputs, and time-dependent behavior.

2. **Create a Step from a function**
   Define a stateless, instantaneous transformation from a Python function.

3. **Inspect ports with `interface()`**
   Examine the declared inputs and outputs of Processes and Steps.

4. **Run a single update**
   Call `update(...)` directly and apply the resulting patch to state using the Core.

5. **Run a simulation with a Composite**
   Use a Composite to manage wiring, scheduling, and state updates automatically.

6. **Compose multiple Processes and Steps**
   Build a small workflow by wiring Steps and Processes together through shared state.

7. **Add an emitter**
   Record values over time and retrieve simulation results as a time series.

In [12]:
import sys
import inspect
import numpy as np

from process_bigraph import allocate_core
from process_bigraph.composite import Process, Step, Composite, as_process, as_step

def rebuild_core():
    top = dict(inspect.getmembers(sys.modules["__main__"]))
    return allocate_core(top=top)

core = rebuild_core()

print("✅ core created")

✅ core created


## 1. Create a Process

A **Process** is a stateful component that:
- declares its **inputs** and **outputs**
- has an `update(state, interval)` method
- may evolve state continuously over time

Below we define a simple process that moves a value toward a target.


In [13]:
class MoveToward(Process):
    """
    Move x toward target at speed 'rate' per unit time.
    (stateful Process: gets an interval argument)
    """
    def initialize(self, config=None):
        config = config or {}
        config.setdefault("rate", 1.0)  # units: x per time
        return config

    def inputs(self):
        return {"x": "float", "target": "float"}

    def outputs(self):
        return {"x": "float"}

    def update(self, state, interval):
        x = float(state["x"])
        target = float(state["target"])
        rate = float(self.config["rate"])

        # move toward target by at most rate*interval
        step = np.clip(target - x, -rate * interval, rate * interval)
        return {"x": x + step}


# Quick sanity check: instantiate + run once
p = MoveToward(config={"rate": 2.0}, core=core)
print("update:", p.update({"x": 0.0, "target": 10.0}, interval=1.0))  # expect x=2.0
print("✅ MoveToward Process defined")


update: {'x': np.float64(2.0)}
✅ MoveToward Process defined


## 2. Create a Step from a function

A **Step** is a stateless transformation:
- it has inputs and outputs
- it does **not** take a time interval
- it is often created from a simple Python function

Steps are useful for algebraic or instantaneous computations.


In [15]:
@as_step(
    inputs={"a": "float", "b": "float"},
    outputs={"sum": "float"},
    core=core,  # registers under local:add
)
def update_add(state):
    return {"sum": float(state["a"]) + float(state["b"])}

# Quick sanity check: create the Step and run it once
s = update_add(config={}, core=core)
print("step update:", s.update({"a": 3.0, "b": 4.0}))  # expect {'sum': 7.0}

# Confirm it registered in the Core
print("core.access('add') ->", core.access("add"))
print("✅ Step created + registered as 'add'")


step update: {'sum': 7.0}
core.access('add') -> add
✅ Step created + registered as 'add'


## 3. Inspect ports with `interface()`

Every Process and Step exposes its ports through `interface()`.

This tells us:
- which parts of the state the component **reads** (inputs)
- which parts of the state it **writes** (outputs)

Ports are described using schemas.


In [16]:
# Inspect ports for the Process
proc = MoveToward(config={"rate": 1.0}, core=core)
proc_iface = proc.interface()

print("Process inputs schema:")
print(core.render(proc_iface["inputs"]))

print("\nProcess outputs schema:")
print(core.render(proc_iface["outputs"]))


# Inspect ports for the Step
step = update_add(config={}, core=core)
step_iface = step.interface()

print("\nStep inputs schema:")
print(core.render(step_iface["inputs"]))

print("\nStep outputs schema:")
print(core.render(step_iface["outputs"]))

print("✅ Ports inspected via interface()")


Process inputs schema:
x:float|target:float

Process outputs schema:
x:float

Step inputs schema:
a:float|b:float

Step outputs schema:
sum:float
✅ Ports inspected via interface()


## 4. Run a single update

Calling `update(...)` returns a **partial update** (a patch), not a full state.

The Core:
- fills missing defaults
- applies updates according to the output schema
- ensures schema-aware merging

This is the low-level mechanism that `Composite` automates.


In [17]:
# Instantiate the process
proc = MoveToward(config={"rate": 2.0}, core=core)

# Inspect interface
iface = proc.interface()

# Initial input state (what the process reads)
input_state = {"x": 0.0, "target": 10.0}
dt = 1.0

# 1) Run the process update
update = proc.update(input_state, dt)
print("raw update:", update)

# 2) Prepare a full state consistent with the schema
state = core.fill(iface["inputs"], {})
state = core.fill(iface["outputs"], state)
state.update(input_state)

print("filled state before apply:", state)

# 3) Apply the update using the output schema
new_state, merges = core.apply(
    iface["outputs"],
    state,
    update
)

print("new state after apply:", new_state)
print("merges:", merges)

print("✅ Single update applied via Core")

raw update: {'x': np.float64(2.0)}
filled state before apply: {'target': 10.0, 'x': 0.0}
new state after apply: {'x': np.float64(2.0), 'target': 10.0}
merges: []
✅ Single update applied via Core


## 5. Run a simulation with a Composite

A **Composite** wires Processes and Steps together and manages:
- input/output wiring
- update scheduling
- state updates over time

This lets us run simulations declaratively, without manually calling `update()`.


In [18]:
# Define a composite with one process and one shared state variable
composite = Composite(
    {
        "state": {
            # shared state
            "x": 0.0,
            "target": 10.0,

            # process specification
            "move": {
                "_type": "process",
                "address": "local:MoveToward",
                "config": {"rate": 2.0},
                "interval": 1.0,
                "inputs": {
                    "x": ["x"],
                    "target": ["target"],
                },
                "outputs": {
                    "x": ["x"],
                },
            },
        }
    },
    core=core,
)

# Run the simulation for 5 time units
composite.run(5.0)

print("Final state:", composite.state)
print("Final x:", composite.state["x"])
print("✅ Composite simulation ran")


Final state: {'global_time': 5.0, 'x': 50.0, 'target': 10.0, 'move': {'address': {'protocol': 'local', 'data': 'MoveToward'}, 'config': {'rate': 2.0}, 'instance': <__main__.MoveToward object at 0x11018d520>, '_inputs': {'x': Float(_default=None), 'target': Float(_default=None)}, 'inputs': {'x': ['x'], 'target': ['target']}, '_outputs': {'x': Float(_default=None)}, 'outputs': {'x': ['x']}, 'interval': 1.0}}
Final x: 50.0
✅ Composite simulation ran


## 6. Compose multiple Processes and Steps

Composites allow us to build workflows by wiring together:
- **Steps** for instantaneous transformations
- **Processes** for time-dependent dynamics

Each component reads from and writes to a shared state tree.


In [19]:
# Compose a workflow with a Step feeding a Process
workflow = Composite(
    {
        "state": {
            # shared environment state
            "Env": {
                "a": 3.0,
                "b": 4.0,
                "sum": 0.0,
                "x": 0.0,
                "target": 10.0,
            },

            # Step: instantaneous computation
            "adder": {
                "_type": "step",
                "address": "local:add",
                "inputs": {
                    "a": ["Env", "a"],
                    "b": ["Env", "b"],
                },
                "outputs": {
                    "sum": ["Env", "sum"],
                },
            },

            # Process: time-evolving dynamics
            "mover": {
                "_type": "process",
                "address": "local:MoveToward",
                "config": {"rate": 1.0},
                "interval": 1.0,
                "inputs": {
                    "x": ["Env", "x"],
                    "target": ["Env", "target"],
                },
                "outputs": {
                    "x": ["Env", "x"],
                },
            },
        }
    },
    core=core,
)

# Run the workflow
workflow.run(5.0)

print("Final Env state:")
for k, v in workflow.state["Env"].items():
    print(f"  {k}: {v}")

print("✅ Multi-component workflow ran")


Final Env state:
  a: 3.0
  b: 4.0
  sum: 7.0
  x: 29.0
  target: 10.0
✅ Multi-component workflow ran


## 7. Add an emitter (record results over time)

An **emitter** records selected parts of the state at each simulation tick.
We specify what to record by wiring state paths into the emitter.
After running, we query the emitter for a time series of recorded values.


In [20]:
from process_bigraph.emitter import emitter_from_wires

# Build a composite like before, but add an emitter that records time + Env variables
workflow_with_emitter = Composite(
    {
        "state": {
            "Env": {
                "a": 3.0,
                "b": 4.0,
                "sum": 0.0,
                "x": 0.0,
                "target": 10.0,
            },

            "adder": {
                "_type": "step",
                "address": "local:add",
                "inputs": {
                    "a": ["Env", "a"],
                    "b": ["Env", "b"],
                },
                "outputs": {
                    "sum": ["Env", "sum"],
                },
            },

            "mover": {
                "_type": "process",
                "address": "local:MoveToward",
                "config": {"rate": 1.0},
                "interval": 1.0,
                "inputs": {
                    "x": ["Env", "x"],
                    "target": ["Env", "target"],
                },
                "outputs": {
                    "x": ["Env", "x"],
                },
            },

            # Emitter: record specified wires each tick
            "emitter": emitter_from_wires(
                {
                    "time": ["global_time"],
                    "sum": ["Env", "sum"],
                    "x": ["Env", "x"],
                    "target": ["Env", "target"],
                }
            ),
        }
    },
    core=core,
)

# Run and then read back recorded rows
workflow_with_emitter.run(5.0)

records = workflow_with_emitter.state["emitter"]["instance"].query()
print("n records:", len(records))
print("first record:", records[0])
print("last record:", records[-1])

records  # in notebooks, this will display nicely


n records: 6
first record: {'time': 0.0, 'sum': 7.0, 'x': 0.0, 'target': 10.0}
last record: {'time': 5.0, 'sum': 7.0, 'x': 29.0, 'target': 10.0}


[{'time': 0.0, 'sum': 7.0, 'x': 0.0, 'target': 10.0},
 {'time': 1.0, 'sum': 7.0, 'x': 1.0, 'target': 10.0},
 {'time': 2.0, 'sum': 7.0, 'x': 3.0, 'target': 10.0},
 {'time': 3.0, 'sum': 7.0, 'x': 7.0, 'target': 10.0},
 {'time': 4.0, 'sum': 7.0, 'x': 15.0, 'target': 10.0},
 {'time': 5.0, 'sum': 7.0, 'x': 29.0, 'target': 10.0}]