In [1]:
from base.director import Director
from base.blocks import Action, Split, Condition, Termination, Save
import asyncio

# auto-reload for development
%load_ext autoreload 
%autoreload 2

In [25]:
class Action1(Action):
    """First demo action: marks the incoming payload as handled by Action1."""

    async def forward(self, data, args: dict = None):
        """Return ``data`` followed by the Action1 completion marker.

        ``args`` is accepted to match the block interface but is not read here.
        """
        return f"{data} Action1 Complete,"

class Action2(Action):
    """Second demo action: marks the incoming payload as handled by Action2."""

    async def forward(self, data, args: dict = None):
        """Return ``data`` followed by the Action2 completion marker.

        ``args`` is accepted to match the block interface but is not read here.
        """
        return f"{data} Action2 Complete,"

class Action3(Action):
    """Third demo action: appends a caller-supplied argument to the payload."""

    async def forward(self, data, args: dict = None):
        """Return ``data`` with the Action3 marker and the ``"hello world"`` argument.

        Args:
            data: Payload produced by the upstream block; interpolated into the result.
            args: Optional mapping of run arguments. The value stored under the
                ``"hello world"`` key is appended to the result.

        Returns:
            The formatted result string. When ``args`` is ``None`` or lacks the
            ``"hello world"`` key, an empty string is appended instead.
        """
        # ``args`` defaults to None, so subscripting it directly raised a
        # TypeError whenever the pipeline was run without extra arguments.
        input_arg = (args or {}).get("hello world", "")
        return f"{data} Action3 Complete adding {input_arg}"

class CustomSplit(Split):
    """Demo split: fans the incoming payload out into two identical branches."""

    async def forward(self, data, args: dict = None):
        """Return a two-element list in which both entries are ``data``.

        ``args`` is accepted to match the block interface but is not read here.
        """
        return [data] * 2

class CustomCondition(Condition):
    """Demo condition: passes only for payloads longer than 50 items/characters."""

    async def forward(self, data, args: dict = None):
        """Return ``True`` when ``len(data)`` exceeds 50, otherwise ``False``.

        ``args`` is accepted to match the block interface but is not read here.
        """
        return len(data) > 50

# Director that dispatches events to the subscribed blocks.
director = Director(max_concurrent_actions=100, depth_first=True)

# One block instance per pipeline stage. The first constructor argument is the
# block's name (presumably also the event it emits when done; confirm in base.blocks).
perform_action1 = Action1("Action 1")
perform_action2 = Action2("Action 2")
perform_action2b = CustomSplit("Action 2b")
perform_action3 = Action3("Action 3")
condition_1 = CustomCondition("Condition 1")
save_agent = Save("Example Output", "example_output.txt")

# Routing table: (event to listen for, block subscribed to it), registered in order.
for trigger_event, listener in (
    ("Initial Call", perform_action1),
    ("Action 1", perform_action2),
    ("Action 2", perform_action2b),
    ("Action 2b", perform_action3),
    ("Action 2b", condition_1),
    ("Condition 1", save_agent),
):
    director.subscribe(trigger_event, listener)

In [26]:
args = {"hello world": "hello world"}
await director("Initial Call", "Testing the director with", flatten_results = True, completion_results = True, args = args)

[{'event_name': 'Action 3',
  'result': 'Testing the director with Action1 Complete, Action2 Complete, Action3 Complete adding hello world'},
 {'event_name': 'Action 3',
  'result': 'Testing the director with Action1 Complete, Action2 Complete, Action3 Complete adding hello world'}]

In [20]:
# Display what the Save block has recorded so far.
# NOTE(review): delete_history=True presumably clears the stored history after
# reading, which would make this cell non-idempotent on re-run — confirm against
# Save.get_history.
save_agent.get_history(include_timestamp=True, delete_history=True)

[]

In [27]:
# Inspect the director's log (a bounded deque of messages) to trace how the
# events were processed.
director.logs

deque(["Processing events for 'Initial Call'",
       "Processing events for 'Action 1'",
       "Processing events for 'Action 2'",
       "Splitting 'Action 2b'",
       "Processing events for 'Action 2b'",
       "No listeners for 'Action 3'",
       "Condition 'Condition 1' failed",
       "Events for 'Action 2b' completed",
       "Processing events for 'Action 2b'",
       "No listeners for 'Action 3'",
       "Condition 'Condition 1' failed",
       "Events for 'Action 2b' completed",
       "Events for 'Action 2' completed",
       "Events for 'Action 1' completed",
       "Events for 'Initial Call' completed"],
      maxlen=1000000)

In [6]:
import requests
import json

# URL of the endpoint. 127.0.0.1 is used instead of 0.0.0.0: the latter is a
# "bind to all interfaces" address for servers and is not a portable client
# destination (connecting to it fails on some platforms).
url = "http://127.0.0.1:8000/api/Initial Call"

# Data to be sent - adjust this according to your actual data requirements
data = {
    "key": "value"
}

# Send a POST request. Without a timeout, requests waits indefinitely if the
# server never answers, which would hang the notebook kernel.
response = requests.post(url, json=data, timeout=10)

# Print the status first so it is visible even for error responses, then fail
# with a clear HTTPError on 4xx/5xx instead of a confusing JSON decode error.
print("Status Code:", response.status_code)
response.raise_for_status()
print("Response Body:", response.json())


Status Code: 200
Response Body: [{'event_name': 'Action 1', 'result': "{'key': 'value'} Action1 Complete,"}, {'event_name': 'Action 2', 'result': "{'key': 'value'} Action1 Complete, Action2 Complete,"}, {'event_name': 'Action 2b', 'result': ["{'key': 'value'} Action1 Complete, Action2 Complete,", "{'key': 'value'} Action1 Complete, Action2 Complete,"]}, {'event_name': 'Action 3', 'result': "{'key': 'value'} Action1 Complete, Action2 Complete, Action3 Complete "}, {'event_name': 'Action 2b', 'result': ["{'key': 'value'} Action1 Complete, Action2 Complete,", "{'key': 'value'} Action1 Complete, Action2 Complete,"]}, {'event_name': 'Action 3', 'result': "{'key': 'value'} Action1 Complete, Action2 Complete, Action3 Complete "}]
