# Machine Translation with Transformer

In [83]:
import sys
sys.path.append('../..')

In [84]:
import os
import yaml
import time
import glob
from dataclasses import asdict
from app.cli.modules.node import get_config_db, get_node_manager
from app.cli.modules.role import get_role_manager
from app.cli.modules.cluster import get_cluster_config_db, get_cluster_manager
from clap.utils import float_time_to_string, path_extend, yaml_load
from clap.executor import SSHCommandExecutor, AnsiblePlaybookExecutor


In [85]:
configuration_db = get_config_db()
cluster_config_db = get_cluster_config_db()
node_manager = get_node_manager()
role_manager = get_role_manager()
cluster_manager = get_cluster_manager()
# The private path (usually ~/.clap/private/) is needed by other methods
private_path = node_manager.private_path

Redefinition of setup setup-initial. Skipping


`cluster_config_db` loads every cluster configuration found in `~/.clap/configs/clusters/` and stores them in its `clusters` member. `clusters` is a dictionary whose keys are cluster configuration names and whose values are dataclasses of type `ClusterConfig`.

Let's list all cluster configurations and get the one named `my-cluster`.

In [86]:
print(list(cluster_config_db.clusters.keys()))

['my-cluster', 'example-cluster']


In [87]:
npb_cluster_config = cluster_config_db.clusters['my-cluster']
print(npb_cluster_config)

ClusterConfig(cluster_config_id='my-cluster', options=None, before_all=[SetupConfig(roles=[RoleAdd(name='gan', extra={})], actions=[RoleActionType(role='gan', action='update-packages', extra={})])], before=[], after_all=[SetupConfig(roles=[], actions=[RoleActionType(role='gan', action='install-packages', extra={'packages': 'python3-pip, build-essential, cmake, openmpi-bin, openmpi-common, openmpi-doc, libopenmpi-dev'})]), SetupConfig(roles=[], actions=[RoleActionType(role='gan', action='run-command', extra={'cmd': 'sudo apt-get -y install python-is-python3'})]), SetupConfig(roles=[], actions=[RoleActionType(role='gan', action='run-command', extra={'cmd': 'sudo pip install mxnet gluonnlp sacremoses'})]), SetupConfig(roles=[], actions=[RoleActionType(role='gan', action='run-command', extra={'cmd': 'git clone https://github.com/robertopossidente/optimizer-clap-app.git'})]), SetupConfig(roles=[], actions=[RoleActionType(role='gan', action='run-command', extra={'cmd': 'sudo mkdir -p /etc/an

The configuration is a dataclass, so it can be fully converted to a dict with the `asdict` function.

In [88]:
npb_cluster_config_dict = asdict(npb_cluster_config)
print(yaml.dump(npb_cluster_config_dict, indent=4))

after: []
after_all:
-   actions:
    -   action: install-packages
        extra:
            packages: python3-pip, build-essential, cmake, openmpi-bin, openmpi-common,
                openmpi-doc, libopenmpi-dev
        role: gan
    roles: []
-   actions:
    -   action: run-command
        extra:
            cmd: sudo apt-get -y install python-is-python3
        role: gan
    roles: []
-   actions:
    -   action: run-command
        extra:
            cmd: sudo pip install mxnet gluonnlp sacremoses
        role: gan
    roles: []
-   actions:
    -   action: run-command
        extra:
            cmd: git clone https://github.com/robertopossidente/optimizer-clap-app.git
        role: gan
    roles: []
-   actions:
    -   action: run-command
        extra:
            cmd: sudo mkdir -p /etc/ansible/facts.d/ && sudo touch /etc/ansible/facts.d/times.fact
                && sudo chown ubuntu:ubuntu /etc/ansible/facts.d/times.fact
        role: gan
    roles: []
-   actions:
    -   
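As a minimal sketch of what `asdict` does with nested dataclasses, the example below uses two hypothetical stand-in classes (`ActionSketch` and `SetupSketch` are illustrative names, not CLAP's own types). It only shows that the conversion is recursive, which is why the whole `ClusterConfig` can be dumped as YAML in one call.

```python
from dataclasses import dataclass, field, asdict
from typing import Dict, List


# Hypothetical stand-ins for CLAP's configuration dataclasses (assumption).
@dataclass
class ActionSketch:
    role: str
    action: str
    extra: Dict[str, str] = field(default_factory=dict)


@dataclass
class SetupSketch:
    roles: List[str] = field(default_factory=list)
    actions: List[ActionSketch] = field(default_factory=list)


setup = SetupSketch(actions=[ActionSketch(role='gan', action='update-packages')])

# asdict walks the dataclass recursively: nested dataclasses become dicts too.
setup_dict = asdict(setup)
print(setup_dict)
```

The nested `ActionSketch` instance comes back as a plain dictionary, so the result can be passed directly to a serializer such as `yaml.dump`.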

We can start a cluster from a cluster configuration using the `start_cluster` method of the `ClusterManager` class. The method returns a cluster id that is used by the other methods.

In [89]:
cluster_id = cluster_manager.start_cluster(npb_cluster_config)
print(cluster_id)

the implicit localhost does not match 'all'

PLAY [localhost] ***************************************************************

TASK [Starting 1 type-a instances (timeout 600 seconds)] ***********************
changed: [localhost]

PLAY RECAP *********************************************************************
localhost                  : ok=1    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0

the implicit localhost does not match 'all'

PLAY [localhost] ***************************************************************

TASK [Tagging instances] *******************************************************
changed: [localhost] => (item={'id': 'i-00ec9ce079a006a84', 'name': 'MarySlaughter-50e04022'})

PLAY RECAP *********************************************************************
localhost                  : ok=1    changed=1    unreachable=0    failed=0

We can get the full information about a cluster using the `get_cluster_by_id` method of the `ClusterManager` class. It returns a dataclass of type `ClusterDescriptor` that holds all the information about a cluster. To get all clusters in the repository, the `get_all_clusters` method returns a list of `ClusterDescriptor`.

Let's print the `ClusterDescriptor` of the recently created cluster `cluster-da580f1038254cfa98b203ca109ecb53` in YAML format.

Given a cluster id, we can get all CLAP nodes that belong to the cluster using the `get_all_cluster_nodes` method of the `ClusterManager` class. It returns a list of node ids, which can be used with several CLAP modules, such as the `NodeManager` and `RoleManager` classes.

In [90]:
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
print(cluster_nodes)

['50e04022b4264d5685b1630a874af2f5', 'f0dd6a63228048cf8600ad8ab3da7510']


The `get_cluster_nodes_types` method of the `ClusterManager` class returns a dictionary whose keys are the cluster node types (e.g., `type-a`) and whose values are lists with the ids of the nodes of that type.

In [91]:
cluster_nodes_with_type = cluster_manager.get_cluster_nodes_types(cluster_id)
print(cluster_nodes_with_type)

{'type-a': ['50e04022b4264d5685b1630a874af2f5'], 'type-b': ['f0dd6a63228048cf8600ad8ab3da7510']}


In [92]:
cluster_nodes_with_type = cluster_manager.get_cluster_nodes_types(cluster_id)
print(yaml.dump(cluster_nodes_with_type))

type-a:
- 50e04022b4264d5685b1630a874af2f5
type-b:
- f0dd6a63228048cf8600ad8ab3da7510
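When the node type is needed for a given node id, the type-to-ids dictionary can be inverted. The sketch below is plain Python and does not call CLAP; `invert_node_types` is a hypothetical helper name, and the input is copied from the output shown above.

```python
from typing import Dict, List


def invert_node_types(nodes_by_type: Dict[str, List[str]]) -> Dict[str, str]:
    """Map each node id to the node type it belongs to."""
    return {
        node_id: node_type
        for node_type, node_ids in nodes_by_type.items()
        for node_id in node_ids
    }


# Same shape as the dictionary returned by get_cluster_nodes_types above.
nodes_by_type = {
    'type-a': ['50e04022b4264d5685b1630a874af2f5'],
    'type-b': ['f0dd6a63228048cf8600ad8ab3da7510'],
}

type_by_node = invert_node_types(nodes_by_type)
print(type_by_node['f0dd6a63228048cf8600ad8ab3da7510'])  # prints: type-b
```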



In [93]:
cluster_manager.setup_cluster(cluster_id, nodes_being_added=None, max_workers=1, start_at_stage='before_all')
#cluster_manager.setup_cluster(cluster_id, nodes_being_added=None, max_workers=1, start_at_stage='after_all')


PLAY [all] *********************************************************************

TASK [Gathering Facts] *********************************************************
ok: [f0dd6a63228048cf8600ad8ab3da7510]
ok: [50e04022b4264d5685b1630a874af2f5]

TASK [Perform package list update] *********************************************
changed: [f0dd6a63228048cf8600ad8ab3da7510]
changed: [50e04022b4264d5685b1630a874af2f5]

PLAY RECAP *********************************************************************
50e04022b4264d5685b1630a874af2f5 : ok=2    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0
f0dd6a63228048cf8600ad8ab3da7510 : ok=2    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0


PLAY [all] *********************************************************************

TASK [Gathering Facts] *****************************************

In [94]:
cluster_nodes_with_type = cluster_manager.get_cluster_nodes_types(cluster_id)
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
host_nodes = node_manager.get_nodes_by_id(cluster_nodes)
print(host_nodes)
for node in host_nodes:
    print(f"{node.node_id}: Status: {node.status}; IP: {node.ip}")
# Build the horovodrun host list with one slot per node, e.g. "ip1:1,ip2:1"
hosts_ips = ','.join(f"{node.ip}:1" for node in host_nodes)
print(hosts_ips)

master_id = cluster_nodes_with_type['type-a']
print(master_id)
master_node = node_manager.get_nodes_by_id(master_id)
print(master_node)

[NodeDescriptor(node_id='50e04022b4264d5685b1630a874af2f5', configuration=InstanceInfo(provider=ProviderConfigAWS(provider_config_id='aws-config-us-east-1', region='us-east-1', access_keyfile='aws-keypair.pub', secret_access_keyfile='aws-keypair.pem', vpc=None, url='https://ec2.us-east-1.amazonaws.com', provider='aws'), login=LoginConfig(login_config_id='login-ubuntu', user='ubuntu', keypair_name='gan-clap-keypair', keypair_public_file='gan-clap-keypair.pub', keypair_private_file='gan-clap-keypair.pem', ssh_port=22, sudo=True, sudo_user='root'), instance=InstanceConfigAWS(instance_config_id='type-a', provider='aws-config-us-east-1', login='login-ubuntu', flavor='t2.medium', image_id='ami-09e67e426f25ce0d7', security_group='gan-clap-sg-allopen', boot_disk_size=16, boot_disk_device=None, boot_disk_type=None, boot_disk_iops=None, boot_disk_snapshot=None, placement_group=None, price=None, timeout=None, network_ids=[])), nickname='MarySlaughter', ip='54.167.37.64', type='cloud', cloud_insta

In [95]:
command_to_execute = """
horovodrun -np 2 -H {} python /home/ubuntu/optimizer-clap-app/machine-translation/my-train.py """.format(hosts_ips)
print(command_to_execute)


horovodrun -np 2 -H 54.167.37.64:1,3.219.34.243:1 python /home/ubuntu/optimizer-clap-app/machine-translation/my-train.py 
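The command above hard-codes `-np 2` while the host list is built from the cluster nodes, so the two can drift apart if the cluster grows. A minimal sketch of deriving both from the same list of IPs follows; `build_horovod_command` and its `slots_per_host` parameter are hypothetical names introduced for illustration.

```python
from typing import List


def build_horovod_command(ips: List[str], script: str, slots_per_host: int = 1) -> str:
    """Build a horovodrun command whose -np matches the total slots in -H."""
    hosts = ','.join(f"{ip}:{slots_per_host}" for ip in ips)
    total_processes = len(ips) * slots_per_host
    return f"horovodrun -np {total_processes} -H {hosts} python {script}"


command = build_horovod_command(
    ['54.167.37.64', '3.219.34.243'],
    '/home/ubuntu/optimizer-clap-app/machine-translation/my-train.py',
)
print(command)
```

With the two IPs from this run, the helper reproduces the command printed above.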


In [96]:
'''command_to_execute = """
mpirun -np 1 -H localhost:1 -bind-to none -map-by slot python /home/ubuntu/optimizer-clap-app/machine-translation/my-train.py 2>&1 > log.txt 
echo Launch Machine Translation by ssh
"""
print(command_to_execute)'''
#command_to_execute = """horovodrun"""
executor = SSHCommandExecutor(command_to_execute, master_node, private_path)
result = executor.run()

for node_id, res in result.items():
    print(f"Node id {node_id}, executed the command: {res.ok}, ret code: {res.ret_code}")
    # The result is a dataclass, so it can be converted to a dictionary
    res_dict = asdict(res)
    print('-----')
    # Dump dictionary in YAML format
    print(yaml.dump(res_dict, indent=4, sort_keys=True))

Node id 50e04022b4264d5685b1630a874af2f5, executed the command: True, ret code: 0
-----
error: null
ok: true
ret_code: 0
stderr_lines:
- '[1,1]<stderr>:INFO:root:Namespace(batch_size=64, dtype=''float32'', epochs=5, lr=0.01,
    momentum=0.9, no_cuda=True)

    '
- '[1,0]<stderr>:INFO:root:Namespace(batch_size=64, dtype=''float32'', epochs=5, lr=0.01,
    momentum=0.9, no_cuda=True)

    '
stdout_lines:
- '[1,1]<stdout>:Loading dataset...

    '
- '[1,0]<stdout>:Loading dataset...

    '
- '[1,1]<stdout>:Loading dataset...

    '
- '[1,0]<stdout>:Loading dataset...

    '
- '[1,1]<stdout>:Loading dataset...

    '
- '[1,0]<stdout>:Loading dataset...

    '
- '[1,0]<stdout>:Numero de Epochs = 1

    '
- '[1,0]<stdout>:Epoch 0 - train_one_epoch started

    '
- '[1,0]<stdout>:[MO833] Rank,0,Initialization Time: 0.000199

    '
- '[1,1]<stdout>:Numero de Epochs = 1

    '
- '[1,1]<stdout>:Epoch 0 - train_one_epoch started

    '
- '[1,1]<stdout>:[MO833] Rank,1,Initialization Time: 0.00003

In [97]:
cat /home/ubuntu/.clap/roles/roles/getfacts.yml

---
- hosts: all
  gather_facts: True    # Query a set of variables in remote hosts
  gather_subset: min
  tasks:
  - name: Get iteration time from a fact
    set_fact:
      iteration_time: "{{ ansible_local.times.iteration_time }}"
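The playbook reads `ansible_local.times.iteration_time`, which Ansible fills from a local fact file named `times.fact` in `/etc/ansible/facts.d/` on each host (the cluster setup above creates that file). How the training script writes it is not shown here, so the sketch below is an assumption: it writes the value as JSON, one of the formats Ansible accepts for static fact files. `write_times_fact` is a hypothetical helper, and a temporary directory stands in for `/etc/ansible/facts.d/`.

```python
import json
import os
import tempfile


def write_times_fact(iteration_time: float, facts_dir: str) -> str:
    """Write a JSON fact file that would be exposed as ansible_local.times."""
    os.makedirs(facts_dir, exist_ok=True)
    fact_path = os.path.join(facts_dir, 'times.fact')
    with open(fact_path, 'w') as fact_file:
        json.dump({'iteration_time': iteration_time}, fact_file)
    return fact_path


# A temporary directory is used here instead of /etc/ansible/facts.d/ (assumption).
demo_dir = tempfile.mkdtemp()
fact_path = write_times_fact(1.675, demo_dir)
with open(fact_path) as fact_file:
    stored = json.load(fact_file)
print(stored)
```

The file name (without the `.fact` extension) becomes the key under `ansible_local`, which is why the playbook refers to `ansible_local.times`.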

In [98]:
nodes = node_manager.get_nodes_by_id(cluster_nodes)
for node in nodes:
    print (f"{node.node_id}: Status: {node.status}; IP: {node.ip}, Type: {node.type}")

playbook_file = path_extend('/home/ubuntu/.clap/roles/roles/getfacts.yml')
inventory = AnsiblePlaybookExecutor.create_inventory(nodes, private_path)
executor = AnsiblePlaybookExecutor(playbook_file, private_path, inventory=inventory)

result = executor.run()

print(f"Did the playbook execute? {result.ok}")
print(f"Ansible playbook return code: {result.ret_code}")
print("Let's check how nodes executed: ")
for node_id, status in result.hosts.items():
    print(f"    Node {node_id}: {status}")
print("Let's check variables set using set_fact module: ")
times = {}
for node_id, facts in result.vars.items():
    print(f"    Node {node_id}: {facts}")
    times[node_id] = facts['iteration_time']
print(times)
# Dump dictionary in YAML format
print(yaml.dump(times, indent=4, sort_keys=True))

# Save the collected iteration times to a timestamped YAML file
pi_logs_dir = '/home/ubuntu/CLAP/'
print(pi_logs_dir)
timestamp_dir = pi_logs_dir + 'timestamp/'
print(timestamp_dir)
os.makedirs(timestamp_dir, exist_ok=True)
timestamp_file = timestamp_dir + str(int(time.time())) + '.out'
print(timestamp_file)
with open(timestamp_file, 'a') as outfile:
    yaml.dump(times, outfile, default_flow_style=False)


50e04022b4264d5685b1630a874af2f5: Status: reachable; IP: 54.167.37.64, Type: cloud
f0dd6a63228048cf8600ad8ab3da7510: Status: reachable; IP: 3.219.34.243, Type: cloud

PLAY [all] *********************************************************************

TASK [Gathering Facts] *********************************************************
50e04022b4264d5685b1630a874af2f5 should use /usr/bin/python3, but is using
/usr/bin/python for backward compatibility with prior Ansible releases. A
future Ansible release will default to using the discovered platform python for
this host. See https://docs.ansible.com/ansible/2.11/reference_appendices/interpreter_discovery.html
for more information. This feature will be removed in
ok: [50e04022b4264d5685b1630a874af2f5]
f0dd6a63228048cf8600ad8ab3da7510 should use /usr/bin/python3, but is using
/usr/bin/python for backward compatibility with prior Ansible releases. A

In [99]:
root_dir='/home/ubuntu/CLAP/'
experiment_id = 'Transformer' 
vm_price_file = '/home/ubuntu/CLAP/vm_prices.yaml'
report_time = 10

instance_costs = yaml_load(vm_price_file)
node_prices={}
for node in nodes:
    print('---------')
    print(f"Node Id: {node.node_id}, created at {float_time_to_string(node.creation_time)}; Status: {node.status}")
    print('---------')
    # The node descriptor is a dataclass, so it can be converted to a dict
    node_dict = asdict(node)
    # Printing dict in YAML format
    #print(yaml.dump(node_dict, indent=4))
    #print('**********')
    instance_flavor = node_dict['configuration']['instance']['flavor']
    node_prices[node.node_id] = float(times[node.node_id]) * float(instance_costs[instance_flavor])
    print(f"Instance Flavor: {instance_flavor}, Instance Cost: {instance_costs[instance_flavor]}, Iteration Time: {times[node.node_id]}, Node Price: {node_prices[node.node_id]}")
    print('---------')
print('Node Prices')
print(str(node_prices))


---------
Node Id: 50e04022b4264d5685b1630a874af2f5, created at 08-07-21 07:59:37; Status: reachable
---------
Instance Flavor: t2.medium, Instance Cost: 0.0464, Iteration Time: 1.6753449440002441, Node Price: 0.07773600540161132
---------
---------
Node Id: f0dd6a63228048cf8600ad8ab3da7510, created at 08-07-21 08:00:12; Status: reachable
---------
Instance Flavor: t2.large, Instance Cost: 0.0555, Iteration Time: 1.6740305423736572, Node Price: 0.09290869510173798
---------
Node Prices
{'50e04022b4264d5685b1630a874af2f5': 0.07773600540161132, 'f0dd6a63228048cf8600ad8ab3da7510': 0.09290869510173798}
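The node price computed above is the product of the node's iteration time and the price entry for its instance flavor. The sketch below repeats that arithmetic with the values printed in this run; `node_price` is a hypothetical helper, and the units of the price entries depend on `vm_prices.yaml`, which is not shown here.

```python
import math
from typing import Dict


def node_price(iteration_time: float, instance_cost: float) -> float:
    """Price metric used above: iteration time multiplied by the flavor's cost."""
    return float(iteration_time) * float(instance_cost)


# Values copied from the output above.
instance_costs: Dict[str, float] = {'t2.medium': 0.0464, 't2.large': 0.0555}
observed = {
    '50e04022b4264d5685b1630a874af2f5': ('t2.medium', 1.6753449440002441),
    'f0dd6a63228048cf8600ad8ab3da7510': ('t2.large', 1.6740305423736572),
}

prices = {
    node_id: node_price(iteration_time, instance_costs[flavor])
    for node_id, (flavor, iteration_time) in observed.items()
}
for node_id, price in prices.items():
    print(f"{node_id}: {price}")
```

Both nodes take nearly the same time per iteration, so the difference in the metric comes almost entirely from the flavor's cost.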


In [100]:
import app.cli.modules.optimizer as opt
#opt.optimize_it(cluster_id, experiment_id, vm_price_file,root_dir, report_time) 

metrics = node_prices

cluster = cluster_manager.get_cluster_by_id(cluster_id)
cluster_dict = asdict(cluster)
print('cluster -> yaml.dump \n')
print(yaml.dump(cluster_dict, indent=4))
cluster_nodes = cluster_manager.get_all_cluster_nodes(cluster_id)
print('cluster_nodes \n')
print(cluster_nodes)
cluster_nodes_with_type = cluster_manager.get_cluster_nodes_types(cluster_id)
print('cluster_nodes_with_type \n')
print(cluster_nodes_with_type)
print('cluster_nodes_with_type -> yaml.dump \n')
print(yaml.dump(cluster_nodes_with_type))

nodes = node_manager.get_nodes_by_id(cluster_nodes)

higher_price = 0
lower_price = 1000
higher_price_node_id=[]
lower_price_node_id=[]
for node in nodes:
    node_dict = asdict(node)
    if(float(metrics[node.node_id]) > float(higher_price)):
        higher_price = float(metrics[node.node_id])
        higher_price_node_id.clear()
        higher_price_node_id.append(node.node_id)
        high_instance_type = node_dict['configuration']['instance']['instance_config_id']
        high_instance_flavor = node_dict['configuration']['instance']['flavor']
    if(float(metrics[node.node_id]) < float(lower_price)):
        lower_price = float(metrics[node.node_id])
        lower_price_node_id.clear()
        lower_price_node_id.append(node.node_id)
        low_instance_type = node_dict['configuration']['instance']['instance_config_id']
        low_instance_flavor = node_dict['configuration']['instance']['flavor']

print(f"High Price Node ID: {higher_price_node_id}, Instance Type: {high_instance_type}") 
print(f"Lower Price Node ID: {lower_price_node_id}, Instance Type: {low_instance_type}")

new_node_id = cluster_manager.grow(cluster_id, node_type=low_instance_type, count=1, min_count=1)
print(f"New Node: {new_node_id}")

new_nodes = node_manager.get_nodes_by_id(new_node_id)
for node in new_nodes:
    print(f"[INIT] {float_time_to_string(node.creation_time)} Created node: {node.node_id} of type {low_instance_flavor}")

#if(high_instance_flavor!=low_instance_flavor)
# Stop the most expensive node only once the new node is reported alive
alive_node = node_manager.is_alive(new_node_id)
for node_id, alive_flag in alive_node.items():
    if alive_flag:
        print(f"[INIT] New Node: {new_node_id}")
        new_nodes_types = {
            low_instance_type: new_node_id
        }
        #cluster_manager.setup_cluster(cluster_id, nodes_being_added=new_nodes_types, 
        #                                                  start_at_stage='before_all')
        stopped_node_id = node_manager.stop_nodes(higher_price_node_id)
        print(f"[STOP] {float_time_to_string(time.time())} Stopped node: {higher_price_node_id} of type {high_instance_flavor}")
        print(f"[STOP] Stopped Node: {stopped_node_id}")
        result = True
    else:
        result = False




cluster -> yaml.dump 

cluster_config:
    after: []
    after_all:
    -   actions:
        -   action: install-packages
            extra:
                packages: python3-pip, build-essential, cmake, openmpi-bin, openmpi-common,
                    openmpi-doc, libopenmpi-dev
            role: gan
        roles: []
    -   actions:
        -   action: run-command
            extra:
                cmd: sudo apt-get -y install python-is-python3
            role: gan
        roles: []
    -   actions:
        -   action: run-command
            extra:
                cmd: sudo pip install mxnet gluonnlp sacremoses
            role: gan
        roles: []
    -   actions:
        -   action: run-command
            extra:
                cmd: git clone https://github.com/robertopossidente/optimizer-clap-app.git
            role: gan
        roles: []
    -   actions:
        -   action: run-command
            extra:
                cmd: sudo mkdir -p /etc/ansible/facts.d/ && sudo touc
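The loop in the cell above finds the most and least expensive nodes by comparing against the sentinel values 0 and 1000. A minimal sketch of the same selection with `min` and `max` follows; it avoids the sentinels and uses the node prices printed earlier. It is an alternative formulation, not the code that produced the output above.

```python
from typing import Dict

# Node prices copied from the earlier output.
node_prices: Dict[str, float] = {
    '50e04022b4264d5685b1630a874af2f5': 0.07773600540161132,
    'f0dd6a63228048cf8600ad8ab3da7510': 0.09290869510173798,
}

# min/max with key=dict.get compare nodes by price and return the node id.
lower_price_node = min(node_prices, key=node_prices.get)
higher_price_node = max(node_prices, key=node_prices.get)

print(f"Lower price node: {lower_price_node}")
print(f"Higher price node: {higher_price_node}")
```

On ties, `min` and `max` return the first matching key, which matches the strict comparisons used in the loop.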

## Stopping cluster

Finally, we can stop the cluster (and all of its nodes) using the `stop_cluster` method. This also removes the cluster from the cluster repository.

Other similar methods are:
* `resume_cluster`: resumes all paused nodes of a cluster
* `pause_cluster`: pauses all nodes of a cluster
* `is_alive`: checks whether all cluster nodes are alive

In [101]:
#cluster_manager.stop_cluster(cluster_id)

the implicit localhost does not match 'all'

PLAY [localhost] ***************************************************************

TASK [Stopping nodes MarioTurrubiates, MarySlaughter] **************************
changed: [localhost]

PLAY RECAP *********************************************************************
localhost                  : ok=1    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0



['50e04022b4264d5685b1630a874af2f5', '3afb704c35094654befcac2eab851f54']

In [102]:
'''clusters = cluster_manager.get_all_clusters()
for cluster in clusters:
    # get_all_clusters returns ClusterDescriptor objects; stop_cluster expects the id
    cluster_manager.stop_cluster(cluster.cluster_id)'''


In [103]:
#node_manager.stop_nodes(['2c4610c8c02b41f983d96fcf928e0a28'])
