## Introduction

This guide demonstrates how to set up a `node`, a `registry`, and a `provider` service with the `kodo` framework, and how these services interact. Together they manage distributed flows in a modular, scalable architecture. A `node` offers agentic flows and connects to a `registry`, which brokers nodes and flows. A `hybrid` service acts as both a `node` and a `provider`: it contributes its own flows to a registry while continuing to operate independently.

The examples below detail how to configure and start each component, including their connections and expected behaviors, illustrated with logging output.

In [4]:
import importlib
import multiprocessing as mp

import httpx
import kodo.service.node

# reload() expects a module, not a function: reload the module, then re-import run_service
importlib.reload(kodo.service.node)
from kodo.service.node import run_service

## Start `node` with 5 Flows

Available options for `run_service`:
* `loader: Optional[str] = "kodo.worker.loader:default_loader"`
* `url: Optional[str] = "http://localhost:3366"`
* `organization: Optional[str] = None`
* `connect: Optional[List[str]] = None`
* `registry: Optional[bool] = True`
* `feed: Optional[bool] = True`
* `cache_data: Optional[str] = "./data/cache.json"`
* `cache_reset: Optional[bool] = False`
* `screen_level: Optional[str] = "INFO"`
* `log_file: Optional[str] = "./data/kodo.log"`
* `log_file_level: Optional[str] = "DEBUG"`
* `exec_data: Optional[str] = "./data/exec"`
* `timeout: Optional[int] = 30`
* `retry: Optional[int] = 9`
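
To keep the defaults above in one place, the options could be mirrored in a small dataclass that emits only the values that differ from the defaults. This is a minimal sketch: `ServiceOptions` and `overrides` are hypothetical names, not part of `kodo`, and the defaults are copied from the list above.

```python
from dataclasses import dataclass, fields
from typing import List, Optional


@dataclass
class ServiceOptions:
    """Hypothetical helper mirroring the option list above; not part of kodo."""

    loader: Optional[str] = "kodo.worker.loader:default_loader"
    url: Optional[str] = "http://localhost:3366"
    organization: Optional[str] = None
    connect: Optional[List[str]] = None
    registry: Optional[bool] = True
    feed: Optional[bool] = True
    cache_data: Optional[str] = "./data/cache.json"
    cache_reset: Optional[bool] = False
    screen_level: Optional[str] = "INFO"
    log_file: Optional[str] = "./data/kodo.log"
    log_file_level: Optional[str] = "DEBUG"
    exec_data: Optional[str] = "./data/exec"
    timeout: Optional[int] = 30
    retry: Optional[int] = 9

    def overrides(self) -> dict:
        """Return only the options that differ from the listed defaults."""
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) != f.default
        }


opts = ServiceOptions(url="http://localhost:3367", cache_reset=True)
service_kwargs = opts.overrides()
```

The resulting dictionary has the same shape as the `kwargs` passed to `mp.Process(target=run_service, ...)` in the cells below.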


In [2]:
node = mp.Process(
    target=run_service, 
    kwargs={
        "loader": "/Users/rostf/repos/kodo-core/tests/assets/node.yaml",
        # "cache_reset": True,
        # "retry": -1  # try forever
    })
node.start()

INFO     2025-01-13 14:22:34,703 http://localhost:3366: startup with flows: 5, providers: 0, connection: 1, log level: debug
DEBUG    2025-01-13 14:22:34,706 http://localhost:3366: semaphore +1 to 1 for connect
INFO     2025-01-13 14:22:34,706 http://localhost:3366: node startup complete
INFO     2025-01-13 14:22:34,708 http://localhost:3366: http://localhost:3367/connect with nodes: 1, flows: 5
DEBUG    2025-01-13 14:22:44,797 http://localhost:3366: return from home


## Start `registry`

After registry startup, the node above connects to it. This can take up to 8 
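
The node can be started before the registry because it keeps trying to connect. The `retry` option bounds the number of attempts, and the cells in this guide use `-1` for "try forever". The loop below is only a sketch of that idea: `connect_with_retry`, the fixed `delay`, and the reading of `retry` as "attempts after the first one" are assumptions, and `kodo`'s actual schedule may differ.

```python
import time
from typing import Callable


def connect_with_retry(attempt: Callable[[], bool], retry: int = 9,
                       delay: float = 0.0) -> bool:
    """Call `attempt` until it succeeds or the retry budget is used up.

    retry == -1 means "try forever". Hypothetical helper, not kodo's code.
    """
    failures = 0
    while True:
        if attempt():
            return True
        failures += 1
        # Assumption: `retry` counts the attempts allowed after the first one.
        if retry != -1 and failures > retry:
            return False
        time.sleep(delay)


# Simulate a registry that only becomes reachable on the third attempt.
calls = []


def registry_reachable() -> bool:
    calls.append(1)
    return len(calls) >= 3


connected = connect_with_retry(registry_reachable, retry=5)
```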

In [3]:
registry = mp.Process(
    target=run_service, 
    kwargs={
        "loader": "/Users/rostf/repos/kodo-core/tests/assets/registry.yaml",
        "cache_reset": True,
    })
registry.start()

INFO     2025-01-13 14:09:47,760 http://localhost:3367: startup with flows: 0, providers: 0, connection: 0, log level: debug
INFO     2025-01-13 14:09:47,764 http://localhost:3367: registry startup complete (feed is True)
DEBUG    2025-01-13 14:09:51,539 http://localhost:3367: saved cache data/3367.json
INFO     2025-01-13 14:09:51,539 http://localhost:3367: connect from http://localhost:3366 with nodes: 1, flows: 5, feed False to organization Plan.Net Journey Agentic Node
DEBUG    2025-01-13 14:09:51,539 http://localhost:3366: succeed with http://localhost:3367/connect
DEBUG    2025-01-13 14:09:51,539 http://localhost:3366: semaphore -1 to 0
DEBUG    2025-01-13 14:09:51,539 http://localhost:3366: http://localhost:3367/connect complete


## Start `hybrid` (node _and_ provider service)

In [5]:
hybrid = mp.Process(
    target=run_service, 
    kwargs={
        "loader": "/Users/rostf/repos/kodo-core/tests/assets/agent50.yaml",
        "cache_reset": True
    })
hybrid.start()

INFO     2025-01-13 14:10:26,521 http://localhost:3368: startup with flows: 50, providers: 0, connection: 1, log level: debug
DEBUG    2025-01-13 14:10:26,523 http://localhost:3368: semaphore +1 to 1 for connect
INFO     2025-01-13 14:10:26,523 http://localhost:3368: registry startup complete (feed is False)
INFO     2025-01-13 14:10:26,525 http://localhost:3368: http://localhost:3367/connect with nodes: 1, flows: 50
DEBUG    2025-01-13 14:10:26,552 http://localhost:3367: saved cache data/3367.json
INFO     2025-01-13 14:10:26,552 http://localhost:3367: connect from http://localhost:3368 with nodes: 1, flows: 50, feed False to organization Mediaplus Agentic Node
DEBUG    2025-01-13 14:10:26,553 http://localhost:3368: succeed with http://localhost:3367/connect
DEBUG    2025-01-13 14:10:26,553 http://localhost:3368: semaphore -1 to 0
DEBUG    2025-01-13 14:10:26,553 http://localhost:3368: http://localhost:3367/connect complete


## Summary

A node offers flows and connects to a registry. A registry brokers nodes/flows. A provider is a registry which provides nodes/flows to other registries but does not integrate the nodes/flows from peer registries (`feed is False`).
```mermaid
    flowchart LR
        flow5[5 Flows] --> node
        node --> registry
        flow50[50 Flows] --> provider
        provider --> registry
```
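
The three roles can be told apart by the `registry` and `feed` flags that each service reports on its `/` endpoint. The helper below is a sketch of that mapping. `classify_role` is a hypothetical name, and the mapping is inferred from the summary above rather than taken from `kodo`.

```python
def classify_role(registry: bool, feed: bool) -> str:
    """Map the two flags to the role names used in this guide."""
    if not registry:
        return "node"  # offers flows, brokers nothing
    # A registry that does not integrate peer registries acts as a provider.
    return "registry" if feed else "provider"


roles = {
    "http://localhost:3366": classify_role(registry=False, feed=False),
    "http://localhost:3367": classify_role(registry=True, feed=True),
    "http://localhost:3368": classify_role(registry=True, feed=False),
}
```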

In [6]:
node_url = "http://localhost:3366"
registry_url = "http://localhost:3367"
hybrid_url = "http://localhost:3368"

assert httpx.get(f"{node_url}/").json()["registry"] is False
assert httpx.get(f"{registry_url}/").json()["registry"] is True and httpx.get(f"{registry_url}/").json()["feed"] is True
assert httpx.get(f"{hybrid_url}/").json()["registry"] is True and httpx.get(f"{hybrid_url}/").json()["feed"] is False

DEBUG    2025-01-13 14:11:16,826 http://localhost:3366: return from home
DEBUG    2025-01-13 14:11:16,851 http://localhost:3367: return from home
DEBUG    2025-01-13 14:11:16,872 http://localhost:3367: return from home
DEBUG    2025-01-13 14:11:16,891 http://localhost:3368: return from home
DEBUG    2025-01-13 14:11:16,913 http://localhost:3368: return from home
DEBUG    2025-01-13 14:12:06,491 http://localhost:3366: return from home
DEBUG    2025-01-13 14:12:22,721 http://localhost:3367: return from home
DEBUG    2025-01-13 14:12:48,748 http://localhost:3368: return from home


Retrieving the flows from `node`, `registry` and `provider` yields the following result.

In [7]:
assert httpx.get(f"{node_url}/flows").json()["total"] == 5
assert httpx.get(f"{registry_url}/flows").json()["total"] == 55
assert httpx.get(f"{hybrid_url}/flows").json()["total"] == 50

DEBUG    2025-01-13 14:12:58,778 http://localhost:3366: return /flows with page 0/0 and 5/5 records
DEBUG    2025-01-13 14:12:58,810 http://localhost:3367: return /flows with page 0/5 and 10/55 records
DEBUG    2025-01-13 14:12:58,833 http://localhost:3368: return /flows with page 0/5 and 10/50 records
DEBUG    2025-01-13 14:13:24,662 http://localhost:3368: return from home
DEBUG    2025-01-13 14:13:33,300 http://localhost:3368: return /flows with page 0/5 and 10/50 records
DEBUG    2025-01-13 14:13:40,051 http://localhost:3367: return /flows with page 0/5 and 10/55 records
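
The `/flows` log lines above are consistent with a page size of 10 and a last-page index computed by integer division. The sketch below reproduces those log messages. Both the page size and the formula are inferred from the output, not from `kodo`'s source, and `page_summary` is a hypothetical name.

```python
def page_summary(total: int, per_page: int = 10, page: int = 0) -> str:
    """Rebuild the '/flows' log message for a given record count."""
    last_page = total // per_page  # inferred: 55 // 10 == 5 and 50 // 10 == 5
    start = page * per_page
    shown = max(0, min(per_page, total - start))  # records on this page
    return f"page {page}/{last_page} and {shown}/{total} records"


summaries = [page_summary(n) for n in (5, 55, 50, 0)]
```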


## Details

The `registry` caches the nodes and flows it has learned about. After a restart, the `registry` still delivers them from the cache, even if a node has died in the meantime.
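
The effect of `cache_data` and `cache_reset` can be pictured as a JSON file that is read on startup unless a reset is requested. This is a simplified sketch: the file layout (provider URL mapped to a flow count) and the helper names are assumptions, not `kodo`'s actual cache format.

```python
import json
import tempfile
from pathlib import Path


def save_cache(path: Path, providers: dict) -> None:
    """Persist the known providers (hypothetical layout: url -> flow count)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(providers))


def load_cache(path: Path, cache_reset: bool = False) -> dict:
    """Return cached providers, or an empty dict on reset or missing file."""
    if cache_reset or not path.exists():
        return {}
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as tmp:
    cache_file = Path(tmp) / "3367.json"
    save_cache(cache_file, {"http://localhost:3366": 5,
                            "http://localhost:3368": 50})
    kept = load_cache(cache_file)                     # cache_reset=False
    wiped = load_cache(cache_file, cache_reset=True)  # cache_reset=True
```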

In [8]:
node.terminate()
registry.terminate()

INFO     2025-01-13 14:13:54,757 http://localhost:3366: shutdown now
INFO     2025-01-13 14:13:54,757 http://localhost:3367: shutdown now


### Registry restart

In [10]:
registry = mp.Process(
    target=run_service, 
    kwargs={
        "loader": "/Users/rostf/repos/kodo-core/tests/assets/registry.yaml",
        "cache_reset": False,  # Default Value is False
        "retry": -1  # retry forever
    })
registry.start()

INFO     2025-01-13 14:14:21,410 http://localhost:3367: startup with flows: 0, providers: 2, connection: 0, log level: debug
DEBUG    2025-01-13 14:14:21,412 http://localhost:3367: semaphore +1 to 1 for reconnect
DEBUG    2025-01-13 14:14:21,412 http://localhost:3367: semaphore +1 to 2 for reconnect
INFO     2025-01-13 14:14:21,412 http://localhost:3367: registry startup complete (feed is True)
INFO     2025-01-13 14:14:21,414 http://localhost:3367: http://localhost:3366/reconnect
INFO     2025-01-13 14:14:21,430 http://localhost:3367: http://localhost:3368/reconnect
DEBUG    2025-01-13 14:14:21,447 http://localhost:3368: semaphore +1 to 1 for connect
INFO     2025-01-13 14:14:21,447 http://localhost:3368: http://localhost:3367/connect with nodes: 1, flows: 50
DEBUG    2025-01-13 14:14:21,447 http://localhost:3367: succeed with http://localhost:3368/reconnect
DEBUG    2025-01-13 14:14:21,448 http://localhost:3367: semaphore -1 to 1
DEBUG    2025-01-13 14:14:21,448 http://localhost:3367

In [11]:
assert httpx.get(f"{registry_url}/flows").json()["total"] == 55

DEBUG    2025-01-13 14:14:29,255 http://localhost:3367: return /flows with page 0/5 and 10/55 records


If a node restarts with different settings or a different set of flows, the registry's `/reconnect` call prompts the node to connect again, which brings the registry back in sync with the node.
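
One way to picture this synchronization is that a fresh connect replaces whatever the registry had stored for that node. The sketch below models this with a plain dictionary. `sync_provider`, `total_flows`, and the flow names are hypothetical, and the replace-on-connect behavior is an assumption based on the totals reported in this guide.

```python
from typing import Dict, List


def sync_provider(providers: Dict[str, List[str]], url: str,
                  flows: List[str]) -> Dict[str, List[str]]:
    """Return a new registry view in which `url` advertises exactly `flows`."""
    updated = dict(providers)
    updated[url] = list(flows)  # the old entry is replaced, not merged
    return updated


def total_flows(providers: Dict[str, List[str]]) -> int:
    return sum(len(flows) for flows in providers.values())


before = {
    "http://localhost:3366": [f"node-flow-{i}" for i in range(5)],
    "http://localhost:3368": [f"hybrid-flow-{i}" for i in range(50)],
}
# The node comes back without any flows, as in the restart below.
after = sync_provider(before, "http://localhost:3366", [])
```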

In [12]:
node = mp.Process(
    target=run_service, 
    kwargs={
        "url": "http://localhost:3366",
        "registry": False,
        "feed": False,
        "organization": "Node with New Name",
        "cache_data": "./data/3366.json",
        "connect": ["http://localhost:3367"],  # `connect` is typed as a list of URLs
        "cache_reset": False,
        "screen_level": "DEBUG",
        "retry": -1,  # try forever
    })
node.start()

INFO     2025-01-13 14:15:22,134 http://localhost:3366: startup with flows: 0, providers: 0, connection: 1, log level: DEBUG
DEBUG    2025-01-13 14:15:22,136 http://localhost:3366: semaphore +1 to 1 for connect
INFO     2025-01-13 14:15:22,136 http://localhost:3366: node startup complete
INFO     2025-01-13 14:15:22,138 http://localhost:3366: http://localhost:3367/connect with nodes: 0, flows: 0
DEBUG    2025-01-13 14:15:22,163 http://localhost:3367: saved cache data/3367.json
INFO     2025-01-13 14:15:22,163 http://localhost:3367: connect from http://localhost:3366 with nodes: 0, flows: 0, feed False to organization Node with New Name
DEBUG    2025-01-13 14:15:22,164 http://localhost:3366: succeed with http://localhost:3367/connect
DEBUG    2025-01-13 14:15:22,164 http://localhost:3366: semaphore -1 to 0
DEBUG    2025-01-13 14:15:22,164 http://localhost:3366: http://localhost:3367/connect complete
DEBUG    2025-01-13 14:15:25,272 http://localhost:3366: semaphore +1 to 1 for connect
IN

In [13]:
assert httpx.get(f"{node_url}/").json()["organization"] == "Node with New Name"

DEBUG    2025-01-13 14:15:42,020 http://localhost:3366: return from home
DEBUG    2025-01-13 14:15:57,053 http://localhost:3366: return /flows with page 0/0 and 0/0 records
DEBUG    2025-01-13 14:16:05,908 http://localhost:3366: return from home
DEBUG    2025-01-13 14:16:12,112 http://localhost:3367: return /flows with page 0/5 and 10/50 records
DEBUG    2025-01-13 14:16:36,973 http://localhost:3367: return from home
DEBUG    2025-01-13 14:16:53,709 http://localhost:3366: return from home


In [14]:
assert httpx.get(f"{node_url}/flows").json()["total"] == 0
assert httpx.get(f"{registry_url}/flows").json()["total"] == 50
assert httpx.get(f"{hybrid_url}/flows").json()["total"] == 50

DEBUG    2025-01-13 14:16:58,433 http://localhost:3366: return /flows with page 0/0 and 0/0 records
DEBUG    2025-01-13 14:16:58,460 http://localhost:3367: return /flows with page 0/5 and 10/50 records
DEBUG    2025-01-13 14:16:58,482 http://localhost:3368: return /flows with page 0/5 and 10/50 records


## Disconnect

A node, registry, or provider can explicitly _disconnect_ from a registry. This removes its nodes and flows from that registry. The disconnected service itself continues to serve its own flows.
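
The distinction between the registry's view and the provider's own flows can be sketched with a small in-memory model. `RegistryModel` and `ProviderModel` are hypothetical classes for illustration only. They do not reflect how `kodo` stores its state.

```python
class RegistryModel:
    """Hypothetical registry: maps provider URL to its number of flows."""

    def __init__(self) -> None:
        self.providers = {}

    def total(self) -> int:
        return sum(self.providers.values())


class ProviderModel:
    """Hypothetical provider that keeps serving its own flows."""

    def __init__(self, url: str, flows: int) -> None:
        self.url = url
        self.flows = flows

    def connect(self, registry: RegistryModel) -> None:
        registry.providers[self.url] = self.flows

    def disconnect(self, registry: RegistryModel) -> None:
        # Only the registry's entry is dropped; local flows are untouched.
        registry.providers.pop(self.url, None)


registry_view = RegistryModel()
hybrid_model = ProviderModel("http://localhost:3368", flows=50)
hybrid_model.connect(registry_view)
connected_total = registry_view.total()
hybrid_model.disconnect(registry_view)
disconnected_total = registry_view.total()
```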

In [15]:
resp = httpx.delete(f"{hybrid_url}/connect")
assert resp.status_code == 204

INFO     2025-01-13 14:17:08,410 http://localhost:3367: /disconnect from http://localhost:3368
INFO     2025-01-13 14:17:08,410 http://localhost:3367: removed provider: http://localhost:3368
DEBUG    2025-01-13 14:17:08,423 http://localhost:3367: saved cache data/3367.json


In [16]:
assert httpx.get(f"{node_url}/flows").json()["total"] == 0
assert httpx.get(f"{registry_url}/flows").json()["total"] == 0
assert httpx.get(f"{hybrid_url}/flows").json()["total"] == 50

DEBUG    2025-01-13 14:17:13,712 http://localhost:3366: return /flows with page 0/0 and 0/0 records
DEBUG    2025-01-13 14:17:13,736 http://localhost:3367: return /flows with page 0/0 and 0/0 records
DEBUG    2025-01-13 14:17:13,757 http://localhost:3368: return /flows with page 0/5 and 10/50 records


In [16]:
node.terminate()

INFO     2025-01-13 10:04:08,036 http://localhost:3366: shutdown now


In [None]:
registry.terminate()

In [None]:
hybrid.terminate()