# Testcase: Subset of Collaborators

# Getting Started

We start by specifying the module to which cells marked with the `#| export` directive will be automatically exported.

In the following cell, `#| default_exp experiment` indicates that the exported file will be named 'experiment'. This name can be modified based on the user's requirements and preferences.

In [None]:
#| default_exp experiment

In [None]:
# | export

from metaflow import Flow

from openfl.experimental.workflow.interface.fl_spec import FLSpec
from openfl.experimental.workflow.placement.placement import aggregator, collaborator


class bcolors:  # NOQA: N801
    """ANSI escape sequences used to colour and style the test's console output."""

    # Foreground colours
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"

    # Text attributes
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Reset: ends any colour/attribute started by the codes above
    ENDC = "\033[0m"


Let us now define the flow for the subset-of-collaborators testcase.

In [None]:
#| export

class TestFlowSubsetCollaborators(FLSpec):
    """
    Test flow that runs its collaborator step on only a subset of the
    runtime's collaborators and records which of them executed.

    Steps: start -> test_valid_collaborators (per selected collaborator)
    -> join -> end.
    """

    def __init__(self, **kwargs) -> None:
        # All keyword arguments are handed to FLSpec unchanged.
        super().__init__(**kwargs)

    @aggregator
    def start(self):
        """
        Select the first two collaborators of the runtime and fan out to them.
        """
        banner = (
            f"{bcolors.OKBLUE}Testing FederatedFlow - Starting Test for "
            f"validating Subset of collaborators  {bcolors.ENDC}"
        )
        print(banner)

        self.collaborators = self.runtime.collaborators

        # The subset is the first two entries of the collaborator list.
        # The attribute name (including its spelling) must stay in sync with
        # the `foreach` argument below, which refers to it by name.
        self.subset_collabrators = self.collaborators[:2]

        selected_count = len(self.subset_collabrators)
        total_count = len(self.collaborators)
        print(
            f"... Executing flow for {selected_count} collaborators out of Total: "
            f"{total_count}"
        )

        self.next(self.test_valid_collaborators, foreach="subset_collabrators")

    @collaborator
    def test_valid_collaborators(self):
        """
        Record the name of the collaborator executing this step.
        """
        print(
            "executing collaborator step test_valid_collaborators for "
            f"collaborator {self.name}."
        )
        # Stored so that `join` can collect it from every branch.
        self.collaborator_ran = self.name
        self.next(self.join)

    @aggregator
    def join(self, inputs):
        """
        Gather the names of the collaborators that ran the previous step.
        """
        print("inside join")
        self.collaborators_ran = [branch.collaborator_ran for branch in inputs]
        self.next(self.end)

    @aggregator
    def end(self):
        """
        Final step: announce completion and run the testcase validation.
        """
        print(f"End of the test case {TestFlowSubsetCollaborators.__name__} reached.")
        testcase()


def testcase():
    """
    Validate the latest run of TestFlowSubsetCollaborators.

    Two checks are performed against the run read back through the
    Metaflow client:

    1. The collaborator step produced as many tasks as there are expected
       collaborators.
    2. Every expected collaborator appears in the ``collaborators_ran``
       artifact stored by the join step.

    The outcome of each check is printed (passed messages first, then
    failed ones).

    Raises:
        AssertionError: if at least one check failed.
    """
    tc_pass_fail = {"passed": [], "failed": []}
    subset_collaborators = ["envoy_one", "envoy_two"]
    expected_count = len(subset_collaborators)

    latest_run = Flow("TestFlowSubsetCollaborators/").latest_run

    # Enumerate the run's steps once instead of once per lookup.
    # NOTE(review): the positions below rely on the order in which the run
    # yields its steps at the time this is called (from the flow's `end`
    # step): index 0 is taken to be the aggregator `join` step and index 1
    # the collaborator `test_valid_collaborators` step - confirm.
    run_steps = list(latest_run)
    join_step = run_steps[0]
    collaborator_step = run_steps[1]

    collaborators_ran = list(join_step)[0].data.collaborators_ran
    print(f"collaborators_ran: {collaborators_ran}")

    # Count the collaborator step's tasks once; the count is reused in the
    # comparison and in the report message.
    task_count = len(list(collaborator_step))

    if task_count != expected_count:
        tc_pass_fail["failed"].append(
            f"{bcolors.FAIL}...Flow only ran for {task_count} "
            f"instead of the {expected_count} expected "
            f"collaborators- Testcase Failed.{bcolors.ENDC} "
        )
    else:
        tc_pass_fail["passed"].append(
            f"{bcolors.OKGREEN}Found {task_count} tasks for each of the "
            f"{expected_count} collaborators - "
            f"Testcase Passed.{bcolors.ENDC}"
        )

    # Expected collaborators that the join step did not report.
    missing = [
        collaborator_name
        for collaborator_name in subset_collaborators
        if collaborator_name not in collaborators_ran
    ]
    for collaborator_name in missing:
        tc_pass_fail["failed"].append(
            f"{bcolors.FAIL}...Flow did not execute for "
            f"collaborator {collaborator_name}"
            f" - Testcase Failed.{bcolors.ENDC}"
        )

    if not missing:
        tc_pass_fail["passed"].append(
            f"{bcolors.OKGREEN}Flow executed for all collaborators"
            f"- Testcase Passed.{bcolors.ENDC}"
        )

    # Report: one message per line, passed group first, then failed group.
    for values in tc_pass_fail.values():
        print(*values, sep="\n")

    print(
        f"{bcolors.OKBLUE}Testing FederatedFlow - Ending test for validating "
        f"the subset of collaborators. {bcolors.ENDC}"
    )

    failures = tc_pass_fail["failed"]
    if failures:
        raise AssertionError(
            f"{bcolors.FAIL}\n {len(failures)} Test "
            f"case(s) failed ... {bcolors.ENDC}"
        )


## Workspace creation

In [None]:
#| export

from openfl.experimental.workflow.runtime import FederatedRuntime

# Connection settings passed to FederatedRuntime as its `director` argument.
# NOTE(review): the certificate/key entries are all None - presumably this
# means an unsecured (non-TLS) connection to the director; confirm.
director_info = {
    'director_node_fqdn':'localhost',
    'director_port':50050,
    'cert_chain': None,
    'api_cert': None,
    'api_private_key': None,
}

# The runtime is given four collaborators; the flow's `start` step only uses
# the first two of them (envoy_one, envoy_two), which are the names that
# `testcase()` expects to find.
federated_runtime = FederatedRuntime(
    collaborators= ['envoy_one', 'envoy_two', 'envoy_three', 'envoy_four'], 
    director=director_info, 
    notebook_path='./testflow_subset_of_collaborators.ipynb'
)

In [None]:
# Ask the runtime for its envoys (presumably those known to the director - confirm).
federated_runtime.get_envoys()

In [None]:
#| export

# Create the flow with checkpoint=True and attach the federated runtime to it.
flflow = TestFlowSubsetCollaborators(checkpoint=True)
flflow.runtime = federated_runtime

In [None]:
# Execute the flow on the attached runtime.
flflow.run()

In [None]:
# Show the flow object's instance attributes after the run.
vars(flflow)