# Working with clusters

This notebook shows how to work with clusters in CLAP.
We will use the cluster `npb-cluster`, defined in the `examples/cli/1. Creating a cluster.ipynb` notebook (a prerequisite for this one).

This notebook covers:
* How to start and set up a cluster
* How to grow a cluster (add more nodes to it) and how to shrink it
* How to get a cluster's nodes

In [7]:
import sys
sys.path.append('../..')

Install the `matplotlib` package:

In [2]:
!pip install matplotlib



In [27]:
import yaml
import time
import glob
from dataclasses import asdict
from app.cli.modules.node import get_config_db, get_node_manager
from app.cli.modules.role import get_role_manager
from app.cli.modules.cluster import get_cluster_config_db, get_cluster_manager
from clap.utils import float_time_to_string, path_extend
from clap.executor import SSHCommandExecutor, AnsiblePlaybookExecutor

import matplotlib.pyplot as plt
%matplotlib inline

In [30]:
configuration_db = get_config_db()
cluster_config_db = get_cluster_config_db()
node_manager = get_node_manager()
role_manager = get_role_manager()
cluster_manager = get_cluster_manager()
# The private path (usually ~/.clap/private/) is used by other methods
private_path = node_manager.private_path

`cluster_config_db` loads all cluster configurations in `~/.clap/configs/clusters/` and stores them in its `clusters` member.
`clusters` is a dictionary where the keys are the cluster configuration names and the values are dataclasses of type `ClusterConfig`.
- Note: by default, CLAP looks for information in the `~/.clap/` directory. However, if the `CLAP_PATH` environment variable is defined, CLAP looks for configurations and other information in `${CLAP_PATH}` instead.
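The lookup rule in the note can be sketched in a few lines of Python. This is a minimal illustration, not CLAP's own code: `resolve_clap_path` and `list_cluster_config_files` are hypothetical helpers, and treating cluster configurations as `.yml`/`.yaml` files is an assumption of this sketch.

```python
import os
from pathlib import Path
from typing import List


def resolve_clap_path() -> Path:
    """Return ${CLAP_PATH} if it is set (and non-empty), else ~/.clap.

    Hypothetical helper mirroring the lookup rule described in the note.
    """
    clap_path = os.environ.get('CLAP_PATH')
    if clap_path:
        return Path(clap_path).expanduser()
    return Path('~/.clap').expanduser()


def list_cluster_config_files(base: Path) -> List[Path]:
    """List YAML files in <base>/configs/clusters/ (assumed .yml/.yaml extension)."""
    clusters_dir = base / 'configs' / 'clusters'
    if not clusters_dir.is_dir():
        return []
    return sorted(p for p in clusters_dir.iterdir() if p.suffix in ('.yml', '.yaml'))


print(resolve_clap_path())
print(list_cluster_config_files(resolve_clap_path()))
```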

Let's list all cluster configurations and get the configuration named `npb-cluster`.

In [31]:
print(list(cluster_config_db.clusters.keys()))

['npb-cluster']


In [32]:
npb_cluster_config = cluster_config_db.clusters['npb-cluster']
print(npb_cluster_config)

ClusterConfig(cluster_config_id='npb-cluster', options=None, before_all=[], before=[], after_all=[SetupConfig(roles=[], actions=[RoleActionType(role='npb', action='run', extra={})])], after=[], nodes={'npb-type-b': NodeConfig(type='type-b', count=2, min_count=2, setups=[SetupConfig(roles=[RoleAdd(name='npb', extra={'pubkey': '~/.ssh/id_rsa.pub', 'privkey': '~/.ssh/id_rsa'})], actions=[])])})


The configuration is a dataclass, so it can be fully converted to a dict with the `asdict` function.

In [33]:
npb_cluster_config_dict = asdict(npb_cluster_config)
print(yaml.dump(npb_cluster_config_dict, indent=4))

after: []
after_all:
-   actions:
    -   action: run
        extra: {}
        role: npb
    roles: []
before: []
before_all: []
cluster_config_id: npb-cluster
nodes:
    npb-type-b:
        count: 2
        min_count: 2
        setups:
        -   actions: []
            roles:
            -   extra:
                    privkey: ~/.ssh/id_rsa
                    pubkey: ~/.ssh/id_rsa.pub
                name: npb
        type: type-b
options: null
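`asdict` converts nested dataclasses recursively, including those stored inside dicts and lists, which is why `nodes` and `setups` appear as plain mappings in the YAML above. A self-contained illustration using simplified, hypothetical stand-ins for CLAP's `ClusterConfig` and `NodeConfig` (the real classes have more fields, as the printed configuration shows):

```python
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Optional


# Simplified, hypothetical stand-ins for CLAP's configuration dataclasses.
@dataclass
class NodeConfig:
    type: str
    count: int
    min_count: Optional[int] = None


@dataclass
class ClusterConfig:
    cluster_config_id: str
    nodes: Dict[str, NodeConfig] = field(default_factory=dict)
    before_all: List[str] = field(default_factory=list)


config = ClusterConfig(
    cluster_config_id='npb-cluster',
    nodes={'npb-type-b': NodeConfig(type='type-b', count=2, min_count=2)},
)

# asdict() recurses into nested dataclasses, dicts and lists.
config_dict = asdict(config)
print(config_dict)
# {'cluster_config_id': 'npb-cluster', 'nodes': {'npb-type-b': {'type': 'type-b', 'count': 2, 'min_count': 2}}, 'before_all': []}
```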



We can start a cluster from a cluster configuration using the `start_cluster` method of the `ClusterManager` class. It returns a cluster id that is used by the other methods.

In [34]:
cluster_id = cluster_manager.start_cluster(npb_cluster_config)
print(cluster_id)

the implicit localhost does not match 'all'

PLAY [localhost] ***************************************************************

TASK [Starting 2 type-b instances (timeout 600 seconds)] ***********************
changed: [localhost]

PLAY RECAP *********************************************************************
localhost                  : ok=1    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0

the implicit localhost does not match 'all'

PLAY [localhost] ***************************************************************

TASK [Tagging instances] *******************************************************
changed: [localhost] => (item={'id': 'i-033256a5f6d5c06ad', 'name': 'CarolArchey-a8c82747'})
changed: [localhost] => (item={'id': 'i-04f8158d480ca9de6', 'name': 'NancyHackwell-0e9db9af'})

PLAY RECAP *********************************************************************

Error executing command in node 0e9db9af: [Errno None] Unable to connect to port 22 on 52.201.245.163
Error executing command in 0e9db9af: [Errno None] Unable to connect to port 22 on 52.201.245.163.


cluster-da580f1038254cfa98b203ca109ecb53


We can get full information about a cluster using the `get_cluster_by_id` method of the `ClusterManager` class. It returns a dataclass of type `ClusterDescriptor` that holds all the information about a cluster. To get all clusters in the repository, `get_all_clusters` returns a list of `ClusterDescriptor` objects.

Let's print the `ClusterDescriptor` of the recently created cluster, `cluster-da580f1038254cfa98b203ca109ecb53`, in YAML format.

In [None]:
cluster = cluster_manager.get_cluster_by_id(cluster_id)
cluster_dict = asdict(cluster)
print(yaml.dump(cluster_dict, indent=4))

Given a cluster id, we can get all CLAP nodes that belong to the cluster using the `get_all_cluster_nodes` method of the `ClusterManager` class. It returns a list of node ids, which can be used with several CLAP modules, such as the `NodeManager` and `RoleManager` classes.

In [None]:
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
print(cluster_nodes)

The `get_cluster_nodes_types` method of the `ClusterManager` class returns a dictionary where the keys are the cluster node types (e.g., `npb-type-b`) and the values are lists of the ids of the nodes of that type.

In [None]:
cluster_nodes_with_type = cluster_manager.get_cluster_nodes_types(cluster_id)
print(cluster_nodes_with_type)
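Since `get_cluster_nodes_types` returns a plain dictionary, it is easy to reshape. A minimal sketch with two hypothetical helpers, one mapping each node id back to its type and one counting nodes per type; the example ids are illustrative only.

```python
from typing import Dict, List


def invert_node_types(nodes_by_type: Dict[str, List[str]]) -> Dict[str, str]:
    """Map each node id to its cluster node type."""
    return {
        node_id: node_type
        for node_type, node_ids in nodes_by_type.items()
        for node_id in node_ids
    }


def count_nodes_per_type(nodes_by_type: Dict[str, List[str]]) -> Dict[str, int]:
    """Count how many nodes each cluster node type currently has."""
    return {node_type: len(ids) for node_type, ids in nodes_by_type.items()}


# Illustrative ids, shaped like the output of get_cluster_nodes_types.
example = {'npb-type-b': ['a8c82747', '0e9db9af']}
print(invert_node_types(example))     # {'a8c82747': 'npb-type-b', '0e9db9af': 'npb-type-b'}
print(count_nodes_per_type(example))  # {'npb-type-b': 2}
```

With a live cluster, pass `cluster_manager.get_cluster_nodes_types(cluster_id)` instead of `example`.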

## Executing Benchmark with 2 nodes 

So far, the cluster has only been started. We can set up all nodes of the cluster using the `setup_cluster` method, which runs all setups on all nodes in the cluster. For this cluster configuration, the setup installs the required packages and the application on the nodes, then runs the application to completion. At the end of the setup, the application is terminated (see `examples/cli/1. Creating a cluster.ipynb` for more details).

In [None]:
cluster_manager.setup_cluster(cluster_id)

We will fetch the results using the `result` action of the `npb` role. This action requires an extra variable called `output` (the directory where the result files are saved), which can be passed through the `extra_args` parameter of `perform_action`.

In [None]:
output_dir = f'~/experiment-results-{time.time()}/'  # don't forget the trailing slash (/)!
role_name = 'npb'
action = 'result'
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)

extras = {
    'output': output_dir
}

playbook_result = role_manager.perform_action(role_name, action, cluster_nodes, extra_args=extras)
print(f'Result executed? {playbook_result.ok}')
print(f'Return code {playbook_result.ret_code}')
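This cell is repeated after every resize of the cluster later in the notebook, so it can be wrapped in a function. A minimal sketch, assuming only the calls already used here (`get_all_cluster_nodes`, and `perform_action` with `extra_args`, returning an object with `ok` and `ret_code`). `fetch_npb_results` is a hypothetical helper; unlike the cell above, it expands `~` locally (Python's `glob` does not expand `~` on its own) and raises an error instead of printing when the action fails.

```python
import os
import time


def fetch_npb_results(cluster_manager, role_manager, cluster_id, prefix='~/experiment-results'):
    """Run the npb role's `result` action on every node of `cluster_id`.

    Hypothetical helper; returns (output_dir, playbook_result).
    """
    # Timestamped directory, expanded locally; os.path.join(path, '') appends
    # the trailing separator that the notebook cells warn not to forget.
    output_dir = os.path.join(os.path.expanduser(f'{prefix}-{time.time()}'), '')
    cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
    playbook_result = role_manager.perform_action(
        'npb', 'result', cluster_nodes, extra_args={'output': output_dir})
    if not playbook_result.ok:
        raise RuntimeError(
            f'result action failed (return code {playbook_result.ret_code})')
    return output_dir, playbook_result
```

Usage: `output_dir, playbook_result = fetch_npb_results(cluster_manager, role_manager, cluster_id)`.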

Let's see which files were fetched...

In [None]:
result_files = glob.glob(path_extend(output_dir, '*'))
print(yaml.dump(result_files))

And parse them...

In [None]:
per_proc_times = dict()
time_to_exec = None
number_processes = None

with open(result_files[0], 'r') as f:
    for line in f:
        if 'Time in seconds' in line:
            time_to_exec = float(line.split('Time in seconds =')[1].strip())
        if 'Total processes' in line:
            number_processes = int(line.split('Total processes =')[1].strip())

per_proc_times[number_processes] = time_to_exec
print(f"Time to execute with {number_processes} processes: {time_to_exec} seconds")
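The parsing cell is also repeated for each run. A minimal sketch of the same logic as a reusable function, assuming, as the cell above does, that the result file contains lines such as `Time in seconds = ...` and `Total processes = ...`; `parse_npb_result` and `record_result` are hypothetical names.

```python
from typing import Dict, Optional, Tuple


def parse_npb_result(path: str) -> Tuple[Optional[int], Optional[float]]:
    """Return (number_of_processes, seconds) parsed from one NPB result file."""
    n_procs, seconds = None, None
    with open(path) as f:
        for line in f:
            # Same rule as the cell above: take the value after the '=' sign.
            if 'Time in seconds' in line:
                seconds = float(line.split('=', 1)[1].strip())
            elif 'Total processes' in line:
                n_procs = int(line.split('=', 1)[1].strip())
    return n_procs, seconds


def record_result(path: str, times: Dict[int, float]) -> None:
    """Store the parsed time under its process count; fail if either is missing."""
    n_procs, seconds = parse_npb_result(path)
    if n_procs is None or seconds is None:
        raise ValueError(f'{path}: could not find process count and time')
    times[n_procs] = seconds
```

Usage: `record_result(result_files[0], per_proc_times)`.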

## Executing Benchmark with 4 nodes 

We can grow the cluster using the `grow` method of the `ClusterManager` class. It takes the cluster id, the cluster node type, and the number of nodes to start (`count`) as parameters; the cell below also sets `min_count`. It returns a list of node ids for the newly created nodes.

In [None]:
node_type = 'npb-type-b'
new_node_ids = cluster_manager.grow(cluster_id, node_type, count=2, min_count=2)
print(f"New nodes: {new_node_ids}")

In [None]:
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
print(cluster_nodes)

In [None]:
cluster_nodes_with_type = cluster_manager.get_cluster_nodes_types(cluster_id)
print(yaml.dump(cluster_nodes_with_type))

When setting up a cluster that has grown, you may want to pass only the nodes that were just added, so that their setups are not executed again on the nodes that were already in the cluster.

Note: setups from the `before_all` and `after_all` phases are executed on all of the cluster's nodes.

In [None]:
new_nodes_types = {
    node_type: new_node_ids
}
cluster_manager.setup_cluster(cluster_id, nodes_being_added=new_nodes_types)
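If the return value of `grow` is not at hand, the `nodes_being_added` mapping can also be derived by comparing two snapshots of `get_cluster_nodes_types`, taken before and after growing. A minimal sketch with a hypothetical `nodes_added` helper, assuming both snapshots have the shape described earlier (node type mapped to a list of node ids):

```python
from typing import Dict, List


def nodes_added(before: Dict[str, List[str]],
                after: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Return {node_type: [ids present in `after` but not in `before`]}.

    Node types with no new ids are omitted.
    """
    added = {}
    for node_type, node_ids in after.items():
        known = set(before.get(node_type, []))
        new_ids = [n for n in node_ids if n not in known]
        if new_ids:
            added[node_type] = new_ids
    return added
```

For example, take `before = cluster_manager.get_cluster_nodes_types(cluster_id)` before calling `grow`, `after` likewise once it returns, and pass `nodes_added(before, after)` as `nodes_being_added`.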

Let's fetch the results...

In [None]:
output_dir = f'~/experiment-results-{time.time()}/'  # don't forget the trailing slash (/)!
role_name = 'npb'
action = 'result'
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)

extras = {
    'output': output_dir
}

playbook_result = role_manager.perform_action(role_name, action, cluster_nodes, extra_args=extras)
print(f'Result executed? {playbook_result.ok}')
print(f'Return code {playbook_result.ret_code}')

In [None]:
result_files = glob.glob(path_extend(output_dir, '*'))
print(yaml.dump(result_files))

And parse some output...

In [None]:
time_to_exec = None
number_processes = None

with open(result_files[0], 'r') as f:
    for line in f:
        if 'Time in seconds' in line:
            time_to_exec = float(line.split('Time in seconds =')[1].strip())
        if 'Total processes' in line:
            number_processes = int(line.split('Total processes =')[1].strip())
            
per_proc_times[number_processes] = time_to_exec
print(f"Time to execute with {number_processes} processes: {time_to_exec} seconds")

## Executing Benchmark with 1 node

To remove nodes from a cluster, you can simply stop them using the `stop_nodes` method of the `NodeManager` class. If you don't want to stop a node, just remove the cluster tag from it using the `remove_tags` method of the `NodeManager` class. The cluster tag looks like this: `.cluster:cluster-da580f1038254cfa98b203ca109ecb53`. Below, three of the four nodes are stopped, leaving one.
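Choosing which nodes to stop can be made explicit with a small helper. A minimal sketch: `split_nodes` is hypothetical and keeps the first `keep` ids, whereas the cell that stops nodes here uses `cluster_nodes[0:3]` and keeps the last one; either choice works as long as the ids to stop are the ones passed to `stop_nodes`.

```python
from typing import List, Sequence, Tuple


def split_nodes(node_ids: Sequence[str], keep: int) -> Tuple[List[str], List[str]]:
    """Split node ids into (ids to keep, ids to stop), keeping the first `keep`."""
    if not 0 <= keep <= len(node_ids):
        raise ValueError(f'keep must be between 0 and {len(node_ids)}, got {keep}')
    return list(node_ids[:keep]), list(node_ids[keep:])
```

Usage: `to_keep, to_stop = split_nodes(cluster_manager.get_all_cluster_nodes(cluster_id), keep=1)`, then `node_manager.stop_nodes(to_stop)`.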

In [None]:
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
print(cluster_nodes)

In [None]:
stopped_nodes = node_manager.stop_nodes(cluster_nodes[0:3])
print(stopped_nodes)

In [None]:
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
print(cluster_nodes)

And running the application again...

In [None]:
cluster_manager.setup_cluster(cluster_id)

Fetching some results...

In [None]:
output_dir = f'~/experiment-results-{time.time()}/'  # don't forget the trailing slash (/)!
role_name = 'npb'
action = 'result'
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)

extras = {
    'output': output_dir
}

playbook_result = role_manager.perform_action(role_name, action, cluster_nodes, extra_args=extras)
print(f'Result executed? {playbook_result.ok}')
print(f'Return code {playbook_result.ret_code}')

In [None]:
result_files = glob.glob(path_extend(output_dir, '*'))
print(yaml.dump(result_files))

And parsing them...

In [None]:
time_to_exec = None
number_processes = None

with open(result_files[1], 'r') as f:
    for line in f:
        if 'Time in seconds' in line:
            time_to_exec = float(line.split('Time in seconds =')[1].strip())
        if 'Total processes' in line:
            number_processes = int(line.split('Total processes =')[1].strip())
            
per_proc_times[number_processes] = time_to_exec
print(f"Time to execute with {number_processes} processes: {time_to_exec} seconds")

### Final application execution times with 1, 2 and 4 nodes

In [None]:
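# Plot execution time against the number of processes
xs = sorted(per_proc_times)
ys = [per_proc_times[n_procs] for n_procs in xs]
plt.plot(xs, ys, marker='o')
plt.xlabel('Number of processes')
plt.ylabel('Execution time (s)')
plt.title('NPB execution time')
plt.show()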
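Besides raw times, the speedup relative to the smallest run, $S(p) = T(p_{\min}) / T(p)$, can make scaling easier to read. A minimal sketch with a hypothetical `speedups` helper; the timings in `example` are made up for illustration and are not results from this notebook.

```python
from typing import Dict


def speedups(times: Dict[int, float]) -> Dict[int, float]:
    """Speedup of each run relative to the run with the fewest processes."""
    if not times:
        return {}
    base = times[min(times)]  # T(p_min)
    return {p: base / t for p, t in sorted(times.items())}


example = {1: 40.0, 2: 20.0, 4: 12.5}  # made-up timings, for illustration only
print(speedups(example))  # {1: 1.0, 2: 2.0, 4: 3.2}
```

Usage: `speedups(per_proc_times)`.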

## Stopping cluster

Finally, we can stop the cluster (and all of its nodes) using the `stop_cluster` method. This also removes the cluster from the cluster repository.

Other similar methods are:
* `resume_cluster`: resumes all paused nodes of a cluster
* `pause_cluster`: pauses all nodes of a cluster
* `is_alive`: checks whether all cluster nodes are alive
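Nodes are not always reachable right after they start (note the SSH `Unable to connect to port 22` error printed by `start_cluster` earlier), so a polling loop can help before running setups. A minimal, generic sketch; `wait_until` is a hypothetical helper, and the predicate you pass depends on what `is_alive` returns, which this notebook does not show. For instance, if it returned a mapping of node ids to booleans (an assumption), you could call `wait_until(lambda: all(cluster_manager.is_alive(cluster_id).values()))`.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool],
               timeout: float = 300.0, interval: float = 10.0) -> bool:
    """Call `predicate` every `interval` seconds until it returns True.

    Returns True on success, or False once `timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))
```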

In [None]:
cluster_manager.stop_cluster(cluster_id)