# Tutorial 1

**In this tutorial you will:**
- learn the Flow basics and implement your first Flow [Section 1](#1-writing-your-first-atomicFlow-reversenumberatomicflow)
- understand the basics of the Flows Engine and serve your first Flow [Section 2](#2-Flow-engine)
- learn how to call served Flows [Section 2](#24-getting-an-instance-of-the-served-flow-via-a-proxy)
- learn how to compose Flows into more complex Flows [Section 3](#3writing-your-first-compositeFlow-reversenumbersequentialflow)

In [1]:
%load_ext autoreload
%autoreload 2
#imports
from aiflows.utils.general_helpers import read_yaml_file
from aiflows.utils import serving
from aiflows.utils import colink_utils
from aiflows.workers import run_dispatch_worker_thread
from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage
import sys
import os
sys.path.append("..")
from utils import compile_and_writefile, dict_to_yaml

# Specify path of your Flow modules
FLOW_MODULES_PATH = "./"



## 1. Writing your First AtomicFlow: ReverseNumberAtomicFlow

In this section we will introduce the concept of a Flow and implement a simple Flow that reverses a number passed in the input message.

In a nutshell, Flows are objects that perform a semantically meaningful unit of work and communicate via messages.

There are two types of Flows: 
1) **Atomic Flows:** Perform the computation directly
2) **Composite Flows:** Orchestrate other Flows in performing the computation

When a message is sent to a Flow, the Flow processes the message according to the logic specified in its `run` method. We will now go through the implementation of the `ReverseNumberAtomicFlow`: a Flow that, given a number, reverses it (e.g., 12345 -> 54321).

In Section 3, we provide an example implementation of a Composite Flow.

### 1.1 Writing the ReverseNumberAtomicFlow

To implement a Flow, you must define its `run` method. This method takes an `input_message` as its input, which is an instance of the `FlowMessage` class. Within the `FlowMessage`, there are two important attributes to consider:

1. `data`: A dictionary containing the data that the Flow will process.
2. `reply_data`: A dictionary holding the information needed to send a reply message back to the sender of the input message. Modifying this attribute directly isn't recommended. Instead, use the provided methods for modifying it (these methods are introduced throughout the notebook).

When defining a Flow, it's crucial to specify the `input_interface` and `output_interface` in the configuration file. These interfaces specify the expected keys in the `data` attribute of the `input_message` and what will be returned in the `data` attribute when sending the message back. This ensures proper calling of your Flow by other users. For instance, in the `ReverseNumberAtomicFlow`, the `input_interface` expects a key `number` in the `data` attribute of the `input_message`, while the `output_interface` will include a key `reversed_number` in the `data` attribute of the sent message.

Once the number is reversed, package the response dictionary into a message using the `package_output_message` method. This method packages the dictionary into a message and adjusts the `reply_data` attribute of the message to match that of the input message, ensuring it is sent back to the original sender. Finally, the reply is sent back using the `send_message` method.
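The request/reply pattern described above can be pictured with a toy model. This is only a sketch: `ToyMessage` and `toy_package_output_message` are hypothetical stand-ins written for illustration, not the aiflows `FlowMessage` class or its `package_output_message` method, and the contents of `reply_data` shown here are assumed.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ToyMessage:
    """Hypothetical stand-in for a Flow message (not the aiflows class)."""
    data: Dict[str, Any]
    reply_data: Dict[str, Any] = field(default_factory=dict)


def toy_package_output_message(input_message: ToyMessage, response: Dict[str, Any]) -> ToyMessage:
    """Wrap `response` in a message that carries the sender's reply information."""
    # Copy the reply information so the reply can be routed back to the sender.
    return ToyMessage(data=response, reply_data=dict(input_message.reply_data))


# The "reply_to" key is an assumption made for this sketch only.
request = ToyMessage(data={"number": 12345}, reply_data={"reply_to": "caller-queue"})
reply = toy_package_output_message(request, {"reversed_number": 54321})

print(reply.data)        # {'reversed_number': 54321}
print(reply.reply_data)  # {'reply_to': 'caller-queue'}
```

The point of the sketch is that the Flow never builds the routing information itself; it only supplies the response dictionary.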


In [2]:
%%compile_and_writefile ./ReverseNumberFlowModule/ReverseNumberAtomicFlow.py


from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage

class ReverseNumberAtomicFlow(AtomicFlow):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    # Customize the logic within this function as needed for your specific Flow requirements.
    def run(self, input_message: FlowMessage):
        ## ~~~~ Getting the input ~~~~

        # Get the input dictionary from the input message
        input_data = input_message.data
        
        # Get input number from data dictionary
        input_number = input_data["number"]
        
        ## ~~~~ Main logic ~~~~
        ########## YOUR CODE GOES HERE ##########
        # TODO: Reverse the input number (e.g. 1234 -> 4321)
        reversed_number = int(str(input_number)[::-1])
        ########## YOUR CODE GOES HERE ##########
        
        # ~~~ Preparing the response message ~~~
        response = {"reversed_number": reversed_number}
        
        # This method packages the `response` in a FlowMessage object 
        # containing the necessary metadata to send the message back
        # to the sender of the input message  
        reply = self.package_output_message(
            input_message=input_message,
            response=response,
        )
        
        # ~~~ Sending the response ~~~
        self.send_message(
            reply
        )



Note: Please be aware that the cell magic `compile_and_writefile` is saving the cell above at `./ReverseNumberFlowModule/ReverseNumberAtomicFlow.py`.
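The one-line reversal used in the cell above, `int(str(input_number)[::-1])`, has two edge cases worth knowing: trailing zeros disappear after reversal, and a negative integer raises a `ValueError` because the sign ends up at the end of the string. The sketch below is a hypothetical variant that keeps the sign; the function name `reverse_number` is an assumption and is not part of the tutorial's Flow.

```python
def reverse_number(number: int) -> int:
    """Reverse the decimal digits of an integer, keeping its sign."""
    sign = -1 if number < 0 else 1
    # Reverse the digits of the absolute value, then restore the sign.
    return sign * int(str(abs(number))[::-1])


print(reverse_number(12345))  # 54321
print(reverse_number(1200))   # 21 (leading zeros of "0021" are dropped)
print(reverse_number(-120))   # -21
```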



### 1.2 Creating the Default Configuration File for the Reverse Number Flow Class

Each Flow class has a corresponding default configuration file, which lists the parameters that configure the Flow. Inside the Flow class, each field of the configuration file is accessible via `self.flow_config["FIELD_NAME"]`. For instance, to retrieve the `input_interface` field, you would use `self.flow_config["input_interface"]`.

Now, let's write the configuration file for the `ReverseNumberAtomicFlow`. Here we define it as a dictionary and then save it to a `.yaml` file, but you can also write the configuration directly in a `.yaml` file.

**IMPORTANT: Ensure that the configuration file is named identically to the Flow class and is placed in the same directory as the Flow. Otherwise, it won't be considered as the default configuration file of that Flow.**

In [3]:
default_config_reverse_number = \
{
    "name": "ReverseNumber",
    "description": "A Flow that takes in a number and reverses it.",

    "_target_": "ReverseNumberFlowModule.ReverseNumberAtomicFlow.ReverseNumberAtomicFlow.instantiate_from_default_config",

    "input_interface": "number",
    "output_interface": "reversed_number",
}
dict_to_yaml(default_config_reverse_number, "./ReverseNumberFlowModule/ReverseNumberAtomicFlow.yaml")

Let's dissect the configuration file for the `ReverseNumberAtomicFlow`:

- **`name` and `description`**: These parameters are straightforward. When defining a Flow, these must always be specified. Although they don't directly affect the functionality of the Flow, they are essential for documentation purposes.

- **`_target_`**: This parameter specifies how the Flow will be instantiated by `hydra`, a framework for configuring complex applications. It should always specify the path to the Flow class followed by `instantiate_from_default_config`. This standardizes the instantiation process from the default configuration file. For example, the value of `_target_` is `ReverseNumberFlowModule.ReverseNumberAtomicFlow.ReverseNumberAtomicFlow.instantiate_from_default_config`, where `ReverseNumberFlowModule/ReverseNumberAtomicFlow.py` is the path to the file containing the Flow class, and the class name in `ReverseNumberAtomicFlow.py` is `ReverseNumberAtomicFlow`.

- **`input_interface`**: This parameter outlines the keys expected in the `data` attribute of the input message. Here, the `number` key is expected.

- **`output_interface`**: Specifies the keys present in the `data` attribute of the message sent back. In this case, the `reversed_number` key will be included.
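To see how the `_target_` string reads, the sketch below splits it into its three parts. It only illustrates the naming convention described above; it is not how `hydra` resolves the path internally, and `split_target` is a hypothetical helper.

```python
from typing import Tuple


def split_target(target: str) -> Tuple[str, str, str]:
    """Split a dotted target into (module path, class name, method name)."""
    module_path, class_name, method_name = target.rsplit(".", 2)
    return module_path, class_name, method_name


target = (
    "ReverseNumberFlowModule.ReverseNumberAtomicFlow."
    "ReverseNumberAtomicFlow.instantiate_from_default_config"
)
module_path, class_name, method_name = split_target(target)

print(module_path)                           # ReverseNumberFlowModule.ReverseNumberAtomicFlow
print(module_path.replace(".", "/") + ".py") # ReverseNumberFlowModule/ReverseNumberAtomicFlow.py
print(class_name)                            # ReverseNumberAtomicFlow
print(method_name)                           # instantiate_from_default_config
```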

## 2. Flow Engine

In this section we introduce the basics of the Flows Engine, and guide you through the process of serving and accessing served Flows.

The Flows Engine is built on top of [CoLink](https://github.com/CoLearn-Dev/colink-sdk-python-dev). As mentioned in their documentation:

> CoLink provides a unified interface for the user, storage, communication, and computation. Extending gRPC, CoLink simplifies the development of multi-party protocols and allow implementations in different programming languages to work together consistently. With a unified interface that increases potential data contributors, CoLink has the potential to enable larger-scale decentralized data collaboration and unlock the true value of data.

In simpler terms, CoLink allows aiflows to run multiple Flows, which can be spread across multiple machines to complete a task. It enables communication between Flows hosted on different machines. Users can serve their Flows, and others can access them by obtaining an instance of the served Flow and interacting with it through a proxy.

In the following sections, we will guide you through:
- Connecting to an existing colink server or starting a local one
- Serving a Flow
- Starting a worker to run Flows
- Getting an instance of the served Flow via a proxy
- Calling the Flow through the proxy

### 2.1 Connecting to an Existing Colink Server or Starting a Local One

You have the option to either connect to an existing CoLink server or start a local one. Local servers are particularly useful for testing and development purposes. On the other hand, connecting to an existing server is beneficial when you need to access Flows served by others or serve your own Flows to others.

For this AMLD workshop, we have initiated a CoLink server for you to connect to, allowing you to serve your Flows and access Flows served by others. However, for the purpose of this tutorial, we will guide you through starting a local server. Instructions for connecting to an existing server will be provided later during the workshop.

To start a local server run the cell below (the server will run as long as your kernel is running).

In [4]:
# Start the colink server and create a user (you)
# Returns the colink object, which contains the information of the user and the server
cl = colink_utils.start_colink_server()

### 2.2 Serving A Flow

To serve a Flow, you need to define its class and `run` method, along with its default configuration file (as we've done in the previous section for the `ReverseNumberAtomicFlow`). Once the Flow is defined, you can serve it using the `serve_flow` function. Essentially, this function saves the path to the Flow's class to the CoLink server's storage to be able to instantiate instances of it when requested.

The key parameters of the `serve_flow` function are as follows:
- `cl`: The CoLink object containing user and server information (this object is created when you connect to a server).
- `flow_class_name`: Specifies the class of the Flow to be served (path to the class + class name).
- `flow_endpoint`: The endpoint where the Flow will be served. This serves as a unique identifier for accessing the Flow. If the endpoint is already in use by another Flow, your Flow will not be served.
- `singleton`: (Default is False) If set to True, only one instance of the Flow is created and returned whenever a user requests it. In other words, multiple users "talk" to the same Flow (e.g., all users "talk" to the same chatbot). If set to False, each user gets a different Flow instance (e.g., each user "talks" to a different chatbot of the same class).
- `parallel_dispatch`: (Default is False) If set to True, the Flow can handle multiple requests simultaneously. If set to False, the Flow can only handle one request at a time. Note that setting it to True makes the Flow stateless.

For example, in the cell below, we serve the `ReverseNumberAtomicFlow` at the endpoint "reverse_number" as a singleton.




In [5]:
# Serve the Flow
serving.serve_flow(
    cl=cl,
    flow_class_name="ReverseNumberFlowModule.ReverseNumberAtomicFlow.ReverseNumberAtomicFlow",
    flow_endpoint="reverse_number",
    singleton=True,
)

[2024-04-08 10:37:15,305][aiflows.utils.serving:116][INFO] - Started serving ReverseNumberFlowModule.ReverseNumberAtomicFlow.ReverseNumberAtomicFlow at flows:reverse_number.
[2024-04-08 10:37:15,316][aiflows.utils.serving:117][INFO] - dispatch_point: coflows_dispatch
[2024-04-08 10:37:15,318][aiflows.utils.serving:118][INFO] - parallel_dispatch: False
[2024-04-08 10:37:15,320][aiflows.utils.serving:119][INFO] - singleton: True


True
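The `singleton` and endpoint semantics described above can be pictured with a toy registry. This is a sketch under stated assumptions: `ToyRegistry` and `ToyFlow` are hypothetical classes, and the real engine stores this information in CoLink storage rather than in Python dictionaries.

```python
from typing import Callable, Dict, Tuple


class ToyFlow:
    """Hypothetical Flow class used only for this illustration."""


class ToyRegistry:
    """Toy model of serving: maps endpoints to a factory and a singleton flag."""

    def __init__(self) -> None:
        self._served: Dict[str, Tuple[Callable[[], object], bool]] = {}
        self._singletons: Dict[str, object] = {}

    def serve(self, endpoint: str, factory: Callable[[], object], singleton: bool = False) -> bool:
        # An endpoint that is already in use cannot be served again.
        if endpoint in self._served:
            return False
        self._served[endpoint] = (factory, singleton)
        return True

    def get_instance(self, endpoint: str) -> object:
        factory, singleton = self._served[endpoint]
        if not singleton:
            return factory()  # every request gets a fresh instance
        if endpoint not in self._singletons:
            self._singletons[endpoint] = factory()
        return self._singletons[endpoint]  # every request shares one instance


registry = ToyRegistry()
print(registry.serve("shared_flow", ToyFlow, singleton=True))  # True
print(registry.serve("shared_flow", ToyFlow))                  # False (endpoint taken)
print(registry.serve("private_flow", ToyFlow))                 # True

print(registry.get_instance("shared_flow") is registry.get_instance("shared_flow"))    # True
print(registry.get_instance("private_flow") is registry.get_instance("private_flow"))  # False
```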

### 2.3 Starting a Worker to Run Flows

As a server of the Flow, you must be the one to execute it when called. To execute a Flow, you need to start a worker. The worker is responsible for running the Flow's logic and will be activated whenever there are new messages to process. It's important to note that a single worker can handle multiple Flows; they are not limited to a single Flow exclusively.

To start a worker, run the cell below:

In [6]:
# Start a worker thread to handle incoming messages
run_dispatch_worker_thread(cl)

[2024-04-08 10:37:15,389][aiflows.workers.dispatch_worker:236][INFO] - Dispatch worker started in attached thread.
[2024-04-08 10:37:15,391][aiflows.workers.dispatch_worker:237][INFO] - dispatch_point: coflows_dispatch
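To picture what a worker does, here is a toy dispatch loop in which one worker thread serves two different Flows. It is a sketch only: the task format, the `toy_worker` function, and the `double_number` endpoint are hypothetical and do not reflect how the aiflows dispatch worker is implemented.

```python
import queue
import threading
from typing import Any, Callable, Dict, List, Tuple


def toy_worker(
    tasks: "queue.Queue",
    handlers: Dict[str, Callable[[Any], Any]],
    results: List[Tuple[str, Any]],
) -> None:
    """Process tasks one at a time until a `None` sentinel arrives."""
    while True:
        task = tasks.get()
        if task is None:
            break
        endpoint, payload = task
        # Look up the Flow logic by endpoint and run it on the payload.
        results.append((endpoint, handlers[endpoint](payload)))


handlers = {
    "reverse_number": lambda n: int(str(n)[::-1]),
    "double_number": lambda n: 2 * n,  # hypothetical second Flow
}
tasks: "queue.Queue" = queue.Queue()
results: List[Tuple[str, Any]] = []

worker = threading.Thread(target=toy_worker, args=(tasks, handlers, results))
worker.start()
tasks.put(("reverse_number", 12345))
tasks.put(("double_number", 21))
tasks.put(None)  # tell the worker to stop
worker.join()

print(results)  # [('reverse_number', 54321), ('double_number', 42)]
```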


### 2.4 Getting an Instance of the Served Flow via a Proxy

Now that the Flow is served, you can access it by obtaining an instance of the served Flow through a proxy. The proxy is an `AtomicFlow` object that allows you to send messages to the requested Flow instance and receive responses from it. To obtain an instance of the served Flow, you can use the `get_flow_instance` function.

The main parameters of the `get_flow_instance` function are:

- `cl`: The CoLink object.
- `flow_endpoint`: The endpoint where the Flow is served (this should match the endpoint used to serve the Flow).
- `user_id`: The user ID of the user serving the Flow. Since we're serving the Flow on a local server, you can simply set it to "local".
- `config_overrides`: (Default is None) Any configuration overrides to be applied to the Flow. This is useful when you want to modify the default configuration of the Flow. For example, you might want to change the language of a chatbot. The configuration overrides are merged with the default configuration of the Flow, with the overrides taking precedence.
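The "overrides take precedence" rule for `config_overrides` can be sketched as a dictionary merge. This is an illustration only: `merge_overrides` is a hypothetical helper, the config keys are made up, and whether aiflows merges nested dictionaries recursively in exactly this way is an assumption.

```python
import copy
from typing import Any, Dict


def merge_overrides(default: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new config where values in `overrides` win over `default`."""
    merged = copy.deepcopy(default)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Merge nested dictionaries key by key instead of replacing them.
            merged[key] = merge_overrides(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Hypothetical chatbot configuration, used only for this sketch.
default_config = {"name": "ChatBot", "backend": {"language": "en", "temperature": 0.2}}
overrides = {"backend": {"language": "fr"}}

merged = merge_overrides(default_config, overrides)
print(merged)  # {'name': 'ChatBot', 'backend': {'language': 'fr', 'temperature': 0.2}}
```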

In [7]:
# Get an instance of the Flow
proxy_reverse_number_flow = serving.get_flow_instance(
    cl=cl,
    flow_endpoint="reverse_number", ### TODO - SPECIFY THE CORRECT FLOW ENDPOINT
    user_id="local",
)

[2024-04-08 10:37:15,730][aiflows.utils.serving:336][INFO] - Mounted 93f25a1c-2088-4fe3-9161-2d108242eaa8 at flows:reverse_number:mounts:local:93f25a1c-2088-4fe3-9161-2d108242eaa8


### 2.5 Calling the Reverse Number Flow via the Proxy

Now that you have a proxy to the served Flow, you can send messages to it and receive responses. To send a message to the Flow, you need to package your data dictionary with the `package_input_message` method and then send a message via the proxy using the `get_reply_future` method. `get_reply_future` sends a message to the Flow with the data dictionary and the necessary information (in the `reply_data` attribute of the message) to get a reply back. It returns a future object that you can use to retrieve the reply from the Flow. The message can be retrieved from the future object using either the `get_data` method (which returns the `data` attribute of the reply message) or the `get_message` method. Both of these methods will block until the reply is received.

Without further ado, let's send a message to the `ReverseNumberAtomicFlow` to reverse the number 12345!


In [8]:
input_data = {"id": 0, "number": 12345}

# Package your data in a Flow Message

## Option 1: Via the FlowMessage class
# input_message = FlowMessage(
#     data=input_data,
# )

## Option 2 (preferred): Via the package input message method
input_message = proxy_reverse_number_flow.package_input_message(input_data)

# Send a message to reverse number and ask to get an answer back in a future

future = proxy_reverse_number_flow.get_reply_future(input_message)

# Get the response from the future
# To get the response as a data dictionary
reply_data = future.get_data()
# To get the response as a FlowMessage object
reply_message = future.get_message()

print("Data sent:\n",  input_data, "\n")
print("REPLY:\n", reply_data, "\n")



[2024-04-08 10:37:15,936][aiflows.workers.dispatch_worker:119][INFO] - 
~~~ Dispatch task ~~~
[2024-04-08 10:37:15,942][aiflows.workers.dispatch_worker:161][INFO] - flow_endpoint: reverse_number
[2024-04-08 10:37:15,960][aiflows.workers.dispatch_worker:162][INFO] - flow_id: 93f25a1c-2088-4fe3-9161-2d108242eaa8
[2024-04-08 10:37:15,963][aiflows.workers.dispatch_worker:163][INFO] - owner_id: local
[2024-04-08 10:37:15,964][aiflows.workers.dispatch_worker:164][INFO] - message_paths: ['push_tasks:6bcddaf9-ffe7-40bc-977b-54a0d466b773:msg']
[2024-04-08 10:37:15,965][aiflows.workers.dispatch_worker:165][INFO] - parallel_dispatch: False
[2024-04-08 10:37:16,035][aiflows.workers.dispatch_worker:188][INFO] - Input message source: Proxy_reverse_number

Data sent:
 {'id': 0, 'number': 12345} 

REPLY:
 {'reversed_number': 54321} 
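If futures are new to you, the standard library offers a close analogy. In the sketch below, `pool.submit` plays the role of `get_reply_future` (it returns immediately) and `future.result()` plays the role of `get_data` (it blocks until the reply is ready). This is an analogy using `concurrent.futures`, not the aiflows future class.

```python
from concurrent.futures import ThreadPoolExecutor


def reverse_number(number: int) -> int:
    """Same logic as the Flow: reverse the digits of a non-negative integer."""
    return int(str(number)[::-1])


with ThreadPoolExecutor(max_workers=1) as pool:
    # Returns a future right away; the work happens on the pool's thread.
    future = pool.submit(reverse_number, 12345)
    # Blocks until the result is available.
    reply = {"reversed_number": future.result()}

print(reply)  # {'reversed_number': 54321}
```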



## 3.Writing your First CompositeFlow: ReverseNumberSequentialFlow

In this section we will write a simple Composite Flow that reverses a number passed in the input message twice (e.g., 12345-> 54321 -> 12345).

As mentioned in Section 1, **Composite Flows** are used to orchestrate other Flows in performing the computation. Just like **Atomic Flows**, Composite Flows have a `run` method that takes an input message as its input. The `run` method of a Composite Flow typically calls other Flows via their proxies, processes the responses, and sends a reply back. The Flows that a Composite Flow orchestrates are referred to as its subflows.


### 3.1 Writing the ReverseNumberSequentialFlow Flow Class


#### 3.1.1 ReverseNumberSequential

In [9]:
%%compile_and_writefile ReverseNumberFlowModule/ReverseNumberSequentialFlowBlocking.py


from aiflows.base_flows import CompositeFlow
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
class ReverseNumberSequentialFlowBlocking(CompositeFlow):
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        #~~~~~~~~~~~ Key Transformation solution 1 ~~~~~~~~~~~
        self.input_interface_second_reverse_flow = KeyInterface(
            keys_to_rename= {"reversed_number": "number"},
            keys_to_select= ["number"],
        )
    
        self.ouput_interface_reply = KeyInterface(
            keys_to_rename= {"reversed_number": "output_number"},
            keys_to_select = ["output_number"],
        )
                
    # Customize the logic within this function as needed for your specific flow requirements.
    def run(self, input_message: FlowMessage):
        
        # Call first reverse number
        future_first_reverse = self.subflows["first_reverse_flow"].get_reply_future(
                input_message
            )
        # Get response from first reverse number
        first_reverse_response = future_first_reverse.get_message()
        
        # Prepare response for the second reverse number
        first_reverse_response = self.input_interface_second_reverse_flow(first_reverse_response) 
        
        # Call second reverse number:
        future_second_reverse = self.subflows["second_reverse_flow"].get_reply_future(
            first_reverse_response
        )
        
        second_reverse_response = future_second_reverse.get_data()
        
        # The key interface transformation can be applied to both messages and dictionaries
        # Prepare the response for the initial message
        response = self.ouput_interface_reply(second_reverse_response)
        
        reply = self.package_output_message(
            input_message = input_message,
            response = response
        )
        
        self.send_message(
            reply
        )
        

Curious about how `KeyInterface` works? The `KeyInterface` class is helpful when you need to apply transformations to the data dictionary when passing from one Flow to another and to pass a dictionary that satisfies the `input_interface` of the Flow you wish to call. It allows you to define these transformations in a structured way. While you can apply these transformations directly in the `run` method of the Composite Flow, the `KeyInterface` class provides a more organized approach.

Check out the [Appendix](#4-appendix) for a detailed explanation of the `KeyInterface` class.
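As a rough picture of the two `KeyInterface` parameters used above, the sketch below renames keys and then keeps only the selected ones on a plain dictionary. `rename_and_select` is a hypothetical helper written for this illustration; the order of operations (rename first, then select) is an assumption inferred from how the parameters are used in this tutorial, not a description of the aiflows implementation.

```python
from typing import Any, Dict, List


def rename_and_select(
    data: Dict[str, Any],
    keys_to_rename: Dict[str, str],
    keys_to_select: List[str],
) -> Dict[str, Any]:
    """Rename keys according to `keys_to_rename`, then keep only `keys_to_select`."""
    renamed = {keys_to_rename.get(key, key): value for key, value in data.items()}
    return {key: renamed[key] for key in keys_to_select if key in renamed}


# Output of the first reversal, turned into input for the second reversal.
first_reply = {"id": 0, "reversed_number": 54321}
print(rename_and_select(first_reply, {"reversed_number": "number"}, ["number"]))
# {'number': 54321}

# Output of the second reversal, turned into the final reply.
second_reply = {"reversed_number": 12345}
print(rename_and_select(second_reply, {"reversed_number": "output_number"}, ["output_number"]))
# {'output_number': 12345}
```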

In [10]:
default_config_reverse_number_sequential_blocking = \
{
    "name": "ReverseNumberTwiceBlocking",
    "description": "A sequential Flow that reverses a number twice.",

    # TODO: Define the target
    "_target_": \
        "ReverseNumberFlowModule.ReverseNumberSequentialFlowBlocking.ReverseNumberSequentialFlowBlocking.instantiate_from_default_config",

    "input_interface": "number",
    "output_interface": "output_number",
    
    "subflows_config": {
        "first_reverse_flow": {
            "user_id": "local",
            "flow_endpoint": "reverse_number",
            "name": "Proxy First Reverse",
            "description": "A proxy Flow that calls reverse number to reverse number.",
        },
        "second_reverse_flow": {
            "user_id": "local",
            "flow_endpoint": "reverse_number",
            "name": "Proxy Second Reverse",
            "description": "A proxy Flow that calls reverse number to reverse number AGAIN.",
        },
    }
}
dict_to_yaml(default_config_reverse_number_sequential_blocking, "ReverseNumberFlowModule/ReverseNumberSequentialFlowBlocking.yaml")

Let's dissect the new configuration file for the `ReverseNumberSequentialFlowBlocking`:

- **`name`, `description`, `_target_`, `input_interface`, and `output_interface`**: These parameters are described in Section 1.2.

- **`subflows_config`**: Each subflow is a key-value pair within a dictionary. The key is the name assigned to the subflow, and the value is a dictionary holding the subflow's configuration. Here, the subflows use their default configuration, with overrides for the name and description of each Flow. Noteworthy keys:
  - `flow_endpoint` and `user_id` match the `reverse_number` endpoint and `local` user ID, respectively, which were used to serve the `ReverseNumberAtomicFlow`. This allows us to get an instance of the Flow we served earlier when calling `get_flow_instance` (we don't have to serve `ReverseNumberAtomicFlow` again).
  - `_target_`: The target here is `aiflows.base_flows.AtomicFlow.instantiate_from_default_config`, which is an atomic Flow. Remember that proxy Flows are atomic Flows, so we can use the same target for all of them. You can also use the target of the Flow you are calling, but this is not necessary; when you call a Flow served by another user, you might not have access to its target, in which case you can use the target of the atomic Flow.

In [11]:
#Serve the Flow (just like before)
serving.serve_flow(
    cl=cl,
    flow_class_name="ReverseNumberFlowModule.ReverseNumberSequentialFlowBlocking.ReverseNumberSequentialFlowBlocking",
    flow_endpoint="ReverseNumberSequentialFlowBlocking",
)

[2024-04-08 10:37:16,525][aiflows.utils.serving:116][INFO] - Started serving ReverseNumberFlowModule.ReverseNumberSequentialFlowBlocking.ReverseNumberSequentialFlowBlocking at flows:ReverseNumberSequentialFlowBlocking.
[2024-04-08 10:37:16,527][aiflows.utils.serving:117][INFO] - dispatch_point: coflows_dispatch
[2024-04-08 10:37:16,532][aiflows.utils.serving:118][INFO] - parallel_dispatch: False
[2024-04-08 10:37:16,533][aiflows.utils.serving:119][INFO] - singleton: False


True

In [12]:
# Start a second worker thread (since the Flow uses blocking calls, 2 workers are needed)
run_dispatch_worker_thread(cl)

[2024-04-08 10:37:16,620][aiflows.workers.dispatch_worker:236][INFO] - Dispatch worker started in attached thread.
[2024-04-08 10:37:16,649][aiflows.workers.dispatch_worker:237][INFO] - dispatch_point: coflows_dispatch


In [13]:
# Get an instance of the Flow and run it !!
proxy_reverse_sequential_blocking = serving.get_flow_instance(
    cl=cl,
    flow_endpoint="ReverseNumberSequentialFlowBlocking", #TODO: SPECIFY THE ENDPOINT
    user_id="local",
    config_overrides=default_config_reverse_number_sequential_blocking,
)

input_data = {"id": 0, "number": 12345}

# Package your data in a Flow Message
## Via the package input message method
input_message = proxy_reverse_sequential_blocking.package_input_message(input_data)

# Send a message to reverse number and ask to get an answer back in a future

future = proxy_reverse_sequential_blocking.get_reply_future(input_message)

# Get the response from the future
# To get the response as a data dictionary
reply_data = future.get_data()
# To get the response as a FlowMessage object
reply_message = future.get_message()

print("Data sent:\n",  input_data, "\n")
print("REPLY:\n", reply_data, "\n")

[2024-04-08 10:37:16,949][aiflows.utils.serving:543][INFO] - Fetched singleton 93f25a1c-2088-4fe3-9161-2d108242eaa8
[2024-04-08 10:37:16,974][aiflows.utils.serving:543][INFO] - Fetched singleton 93f25a1c-2088-4fe3-9161-2d108242eaa8
[2024-04-08 10:37:17,020][aiflows.utils.serving:336][INFO] - Mounted 6fff0431-7381-435c-8319-bbb62c87bd14 at flows:ReverseNumberSequentialFlowBlocking:mounts:local:6fff0431-7381-435c-8319-bbb62c87bd14
Data sent:
 {'id': 0, 'number': 12345} 

REPLY:
 {'output_number': 12345} 



[2024-04-08 10:37:17,261][aiflows.workers.dispatch_worker:119][INFO] - 
~~~ Dispatch task ~~~
[2024-04-08 10:37:17,327][aiflows.workers.dispatch_worker:161][INFO] - flow_endpoint: reverse_number
[2024-04-08 10:37:17,328][aiflows.workers.dispatch_worker:162][INFO] - flow_id: 93f25a1c-2088-4fe3-9161-2d108242eaa8
[2024-04-08 10:37:17,330][aiflows.workers.dispatch_worker:163][INFO] - owner_id: local
[2024-04-08 10:37:17,331][aiflows.workers.dispatch_worker:164][INFO] - message_paths: ['push_tasks:0f1ce4f5-b121-4eca-b3fc-b46854d5c9a6:msg']
[2024-04-08 10:37:17,333][aiflows.workers.dispatch_worker:165][INFO] - parallel_dispatch: False
[2024-04-08 10:37:17,436][aiflows.workers.dispatch_worker:188][INFO] - Input message source: first_reverse_flow

#### 3.1.2 ReverseNumberSequential with Non-Blocking Calls

In the previous example, we used blocking calls to the subflows. In this example, we will use non-blocking calls. The description below explains the differences between the two.

#### **Blocking vs Non-Blocking Calls**

There are two ways to call a subflow from a Composite Flow via its proxy:
1. **Blocking Calls**: The Composite Flow sends a message to the subflow and waits for the response before proceeding (and therefore stays in the `run` method until the response is received). Blocking calls are made using the `get_reply_future` method introduced in Section 2.5. `get_reply_future` returns a future object that you can use to retrieve the reply from the Flow via `future.get_data()` or `future.get_message()`; these "get" methods are the ones that block.
2. **Non-Blocking Calls**: The Composite Flow sends a message to the subflow and proceeds without waiting for the response. The response is expected to arrive in the input message queue of the Composite Flow. Therefore, you must exit the `run` method, and the Flow will be called again when the response is received. Non-blocking calls are made using the `get_reply` method, as in the implementation below.

##### Pros and Cons of Blocking vs Non-Blocking Calls

- **Blocking calls** might be more intuitive to use and understand, but they are less efficient because the Composite Flow is blocked while waiting for the response, which means one worker is blocked for the duration of the call. Therefore, you must have at least 2 worker threads running to be able to make a blocking call to a subflow and still be able to process other messages.

- **Non-blocking calls** are more efficient because the Composite Flow is not blocked while waiting for the response (one worker can run the whole Flow). However, they are more complex to use because you must manage the state of the Flow and the responses.

In the end, the choice between blocking and non-blocking calls depends on the specific use case, the requirements of the Flow and your preference.
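The "at least 2 workers" requirement for blocking calls can be reproduced with a thread pool from the standard library. In this sketch the outer task plays the Composite Flow and the inner task plays the subflow; the 0.2 second timeout and the helper names are assumptions made for the demonstration, and the pool is only an analogy for aiflows workers.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from typing import Optional


def reverse_number(number: int) -> int:
    return int(str(number)[::-1])


def composite(pool: ThreadPoolExecutor, number: int) -> Optional[int]:
    """Blocking call: submit the subtask, then wait for its result."""
    inner = pool.submit(reverse_number, number)
    try:
        return inner.result(timeout=0.2)
    except TimeoutError:
        # No free worker picked up the subtask while we were blocking.
        return None


def run_with_workers(num_workers: int, number: int) -> Optional[int]:
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return pool.submit(composite, pool, number).result()


print(run_with_workers(1, 12345))  # None (the only worker is busy waiting)
print(run_with_workers(2, 12345))  # 54321
```

With a single worker, the subtask can only start once the blocked outer task gives up, which is why the blocking Composite Flow above needs a second worker thread.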

Below we showcase the implementation of the sequential Flow with non-blocking calls to the `ReverseNumberAtomicFlow` (class `ReverseNumberSequentialFlowNonBlocking`); the blocking version was shown in Section 3.1.1.

Note: Any data that you must save from one `run` method pass to another must be saved in the `self.flow_state` attribute of the Flow. This attribute is a dictionary that persists between calls to the `run` method. You can use it to save any data you need to keep between calls.

In [14]:
%%compile_and_writefile ReverseNumberFlowModule/ReverseNumberSequentialFlowNonBlocking.py


from aiflows.base_flows import CompositeFlow
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
class ReverseNumberSequentialFlowNonBlocking(CompositeFlow):
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        #~~~~~~~~~~~ Key Transformation solution 1 ~~~~~~~~~~~
        self.input_interface_second_reverse_flow = KeyInterface(
            keys_to_rename= {"reversed_number": "number"},
            keys_to_select= ["number"],
        )
    
        self.ouput_interface_reply = KeyInterface(
            keys_to_rename= {"reversed_number": "output_number"},
            keys_to_select = ["output_number"],
        )
        
        self.next_state_dict = {
            "first_reverse_flow": "second_reverse_flow",
            "second_reverse_flow": "reply_to_message",
            "reply_to_message": "first_reverse_flow"
        }
    def set_up_flow_state(self):
        super().set_up_flow_state()
        self.flow_state = {"current_state": "first_reverse_flow"}
        
    def get_next_state(self):
        return self.next_state_dict[self.flow_state["current_state"]]
        
    # Customize the logic within this function as needed for your specific Flow requirements.
    def run(self, input_message: FlowMessage):
        
        # Run here is a bit like a switch statement where we decide which Flow to call next.
        # We then call the next Flow and pass the input message to it. We expect its reply to arrive
        # back in the input queue (which will call the run method again)
        current_state = self.flow_state["current_state"]
        
        # Case where we need to reverse the number for the first time
        if current_state == "first_reverse_flow":
            #Save the initial message to the state
            self.flow_state["initial_message"] = input_message
            
            # Prepares message by editing `reply_data` so that it can get the reply from the first Flow
            # back in the input queue of the composite flow 
            message_for_first_reverse_flow = self.package_input_message(input_message.data)
            # Call the first Flow with non blocking call
            self.subflows["first_reverse_flow"].get_reply(
                message_for_first_reverse_flow
            )
        
        #Case where we need to reverse the number for the second time
        elif current_state == "second_reverse_flow":
            
            # Applies a transformation to the input message (renames the keys of the dictionary so that
            # they match the format required by the second Flow)
            message = self.input_interface_second_reverse_flow(input_message)
            
            # Prepares the message by editing `reply_data` so that the reply from the second Flow
            # comes back to the input queue of the composite Flow
            message_for_second_reverse_flow = self.package_input_message(message.data)
            # Call the second Flow with a non-blocking call
            self.subflows["second_reverse_flow"].get_reply(
                message_for_second_reverse_flow
            )
        
        # Case where we need to reply to the initial message (we've already reversed the number twice)
        else:
            message = self.ouput_interface_reply(input_message)
            
            # Package the output message to send back.
            # This method packages `response` in a FlowMessage object
            # containing the metadata needed to send the message back
            # to the sender of the input message.
            reply = self.package_output_message(
                input_message = self.flow_state["initial_message"],
                response = message.data
            )
            #send back the reply to initial caller of the Flow
            self.send_message(reply)
            
        self.flow_state["current_state"] = self.get_next_state()
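
To see the control flow in isolation, here is a minimal sketch of the same three-state machine that does not use aiflows at all. `ToySequentialFlow`, its `inbox`, and `reverse_number` are hypothetical stand-ins introduced only for illustration: the `inbox` deque plays the role of the Flow's input queue, and a subflow's reply is simulated by appending to it.

```python
from collections import deque


def reverse_number(number: int) -> int:
    """Hypothetical stand-in for the served reverse_number Flow."""
    return int(str(number)[::-1])


class ToySequentialFlow:
    """Toy imitation of the non-blocking composite Flow (not aiflows code)."""

    NEXT_STATE = {
        "first_reverse_flow": "second_reverse_flow",
        "second_reverse_flow": "reply_to_message",
        "reply_to_message": "first_reverse_flow",
    }

    def __init__(self):
        self.state = "first_reverse_flow"
        self.inbox = deque()  # plays the role of the Flow's input queue
        self.replies = []     # plays the role of replies sent to the caller
        self.run_calls = 0

    def run(self, data: dict) -> None:
        self.run_calls += 1
        if self.state == "first_reverse_flow":
            # "Non-blocking call": the subflow's reply lands in our own inbox
            self.inbox.append({"reversed_number": reverse_number(data["number"])})
        elif self.state == "second_reverse_flow":
            # Rename "reversed_number" -> "number" before calling the second subflow
            number = data["reversed_number"]
            self.inbox.append({"reversed_number": reverse_number(number)})
        else:
            # Rename "reversed_number" -> "output_number" and reply to the caller
            self.replies.append({"output_number": data["reversed_number"]})
        self.state = self.NEXT_STATE[self.state]

    def send(self, data: dict) -> None:
        """Deliver a message, then keep calling `run` while messages are queued."""
        self.inbox.append(data)
        while self.inbox:
            self.run(self.inbox.popleft())


flow = ToySequentialFlow()
flow.send({"number": 12345})
print(flow.replies, flow.run_calls, flow.state)
```

In this toy version, `run` is invoked three times for a single request (once per state), and the state wraps back to `first_reverse_flow` so the object is ready for the next request.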

The config is identical to the one above (except for `_target_`), so we won't go through it again.

In [15]:
default_config_reverse_number_sequential_nonblocking = \
{
    "name": "ReverseNumberTwice",
    "description": "A sequential Flow that reverses a number twice.",

    # Target: the `instantiate_from_default_config` method of the Flow class
    "_target_": \
        "ReverseNumberFlowModule.ReverseNumberSequentialFlowNonBlocking.ReverseNumberSequentialFlowNonBlocking.instantiate_from_default_config",

    "input_interface": "number",
    "output_interface": "output_number",
    
    "subflows_config": {
        "first_reverse_flow": {
            "user_id": "local",
            "flow_endpoint": "reverse_number",
            "name": "A proxy Flow that calls reverse number to reverse number AGAIN.",
            "description": "A proxy Flow that calls reverse number to reverse number.",
        },
        "second_reverse_flow": {
            "user_id": "local",
            "flow_endpoint": "reverse_number",
            "name": "Proxy Second Reverse",
            "description": "A proxy Flow that calls reverse number to reverse number AGAIN.",
        },
    }
}
dict_to_yaml(default_config_reverse_number_sequential_nonblocking, "ReverseNumberFlowModule/ReverseNumberSequentialFlowNonBlocking.yaml")

In [16]:
# Serve the Flow (just like before)
serving.serve_flow(
    cl=cl,
    flow_class_name="ReverseNumberFlowModule.ReverseNumberSequentialFlowNonBlocking.ReverseNumberSequentialFlowNonBlocking",
    flow_endpoint="ReverseNumberSequentialFlowNonBlocking",
)

[2024-04-08 10:37:17,949][aiflows.utils.serving:116][INFO] - Started serving ReverseNumberFlowModule.ReverseNumberSequentialFlowNonBlocking.ReverseNumberSequentialFlowNonBlocking at flows:ReverseNumberSequentialFlowNonBlocking.
[2024-04-08 10:37:17,949][aiflows.utils.serving:117][INFO] - dispatch_point: coflows_dispatch
[2024-04-08 10:37:17,952][aiflows.utils.serving:118][INFO] - parallel_dispatch: False
[2024-04-08 10:37:17,958][aiflows.utils.serving:119][INFO] - singleton: False


True

In [17]:

# Get an instance of the Flow and run it !!
proxy_reverse_sequential_nonblocking = serving.get_flow_instance(
    cl=cl,
    flow_endpoint="ReverseNumberSequentialFlowNonBlocking",
    user_id="local",
    config_overrides=default_config_reverse_number_sequential_nonblocking,
)

input_data = {"id": 0, "number": 12345}

# Package your data in a Flow Message
## Via the package input message method
input_message = proxy_reverse_sequential_nonblocking.package_input_message(input_data)

# Send a message to reverse number and ask to get an answer back in a future

future = proxy_reverse_sequential_nonblocking.get_reply_future(input_message)

# Get the response from the future
# To get the response as a data dictionary
reply_data = future.get_data()
# To get the response as a FlowMessage object
reply_message = future.get_message()

print("Data sent:\n",  input_data, "\n")
print("REPLY:\n", reply_data, "\n")

[2024-04-08 10:37:18,095][aiflows.utils.serving:543][INFO] - Fetched singleton 93f25a1c-2088-4fe3-9161-2d108242eaa8
[2024-04-08 10:37:18,125][aiflows.utils.serving:543][INFO] - Fetched singleton 93f25a1c-2088-4fe3-9161-2d108242eaa8
[2024-04-08 10:37:18,150][aiflows.utils.serving:336][INFO] - Mounted c7cbc9be-ef29-424e-a264-3913278e01cb at flows:ReverseNumberSequentialFlowNonBlocking:mounts:local:c7cbc9be-ef29-424e-a264-3913278e01cb
Data sent:
 {'id': 0, 'number': 12345} 

REPLY:
 {'output_number': 12345} 



[2024-04-08 10:37:18,401][aiflows.workers.dispatch_worker:119][INFO] - 
~~~ Dispatch task ~~~
[2024-04-08 10:37:18,412][aiflows.workers.dispatch_worker:161][INFO] - flow_endpoint: reverse_number
[2024-04-08 10:37:18,413][aiflows.workers.dispatch_worker:162][INFO] - flow_id: 93f25a1c-2088-4fe3-9161-2d108242eaa8
[2024-04-08 10:37:18,413][aiflows.workers.dispatch_worker:163][INFO] - owner_id: local
[2024-04-08 10:37:18,415][aiflows.workers.dispatch_worker:164][INFO] - message_paths: ['push_tasks:e6985d8d-b0fe-4aa1-950a-84378efd58e0:msg']
[2024-04-08 10:37:18,417][aiflows.workers.dispatch_worker:165][INFO] - parallel_dispatch: False
[2024-04-08 10:37:18,457][aiflows.workers.dispatch_worker:188][INFO] - Input message source: first_reverse_flow

## 4. Appendix

#### **Key Interfaces**

The `KeyInterface` class is useful when the data dictionary must be transformed as it passes from one Flow to another, so that it satisfies the `input_interface` of the Flow you wish to call. You could apply these transformations directly in the `run` method of the Composite Flow, but `KeyInterface` lets you define them in a more structured way. It takes the following parameters, each of which defines one transformation, and applies them in the order listed here:
- `keys_to_rename`: A dictionary where the keys are the keys to rename and the values are the new names of the keys. For example, `{"old_key": "new_key"}` transforms `{"old_key": 1}` to `{"new_key": 1}`.

- `keys_to_copy`: A dictionary where the keys are the keys to copy and the values are the new names of the keys. For example, `{"key_a": "key_b"}` transforms `{"key_a": 1}` to `{"key_a": 1, "key_b": 1}`.
- `keys_to_set`: A dictionary where the keys are the keys to set and the values are the values to set. For example, `{"key_a": 1}` transforms `{}` to `{"key_a": 1}`.

- `additional_transformations`: A list of Callables where you can define custom transformations. Each Callable should take `data_dict` as an argument and return a dictionary as output. For example:
    ```python
    def increment_key_a(data_dict, **kwargs):
        return {**data_dict, "key_a": data_dict["key_a"] + 1}
    ```

- `keys_to_select`: A list of keys to select from the dictionary. For example, `["key_a"]` transforms `{"key_a": 1, "key_b": 2}` to `{"key_a": 1}`.

- `keys_to_delete`: A list of keys to delete from the dictionary. For example, `["key_a"]` transforms `{"key_a": 1, "key_b": 2}` to `{"key_b": 2}`.
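
The transformations described above can be imitated in a few lines of plain Python. The sketch below is not the aiflows implementation: `apply_key_transformations` is a hypothetical helper written for illustration, and it assumes the steps run in the order they are listed (rename, copy, set, additional transformations, select, delete).

```python
from typing import Any, Callable, Dict, List, Optional


def apply_key_transformations(
    data: Dict[str, Any],
    keys_to_rename: Optional[Dict[str, str]] = None,
    keys_to_copy: Optional[Dict[str, str]] = None,
    keys_to_set: Optional[Dict[str, Any]] = None,
    additional_transformations: Optional[List[Callable[[dict], dict]]] = None,
    keys_to_select: Optional[List[str]] = None,
    keys_to_delete: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Hypothetical plain-Python imitation of the steps listed above."""
    result = dict(data)  # work on a copy so the input is left untouched
    for old, new in (keys_to_rename or {}).items():
        if old in result:
            result[new] = result.pop(old)
    for source, target in (keys_to_copy or {}).items():
        if source in result:
            result[target] = result[source]
    result.update(keys_to_set or {})
    for transform in additional_transformations or []:
        result = transform(result)
    if keys_to_select is not None:
        result = {key: result[key] for key in keys_to_select if key in result}
    for key in keys_to_delete or []:
        result.pop(key, None)
    return result


# Same transformation as `input_interface_second_reverse_flow` in Section 3:
# rename "reversed_number" to "number", then keep only "number".
second_flow_input = apply_key_transformations(
    {"id": 0, "reversed_number": 54321},
    keys_to_rename={"reversed_number": "number"},
    keys_to_select=["number"],
)
print(second_flow_input)  # {'number': 54321}
```

Because renaming happens before selection in this sketch, the selected key must be the new name (`"number"`), which matches how the interfaces are declared in the composite Flow.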

