# Test to run the experiments in the cloud

In [None]:
import mechanize
import numpy
import os
import queue
import random
import shutil
import socket
import string
import threading
import time
import urllib.request
from abc import ABC, abstractmethod
import pandas
import libcloud
import paramiko
from dataclasses import dataclass
from libcloud.compute.providers import get_driver
from libcloud.compute.types import Provider
from paramiko.buffered_pipe import PipeTimeout

### Configure the SSH user (Massoud's account, because that is the user whose home directory contains the `bd` folder)

You need to create a service account (`GCLOUD_ACCOUNT`) with an extra key, as explained here: https://libcloud.readthedocs.io/en/stable/compute/drivers/gce.html
Then set `GCLOUD_KEY_PATH` to the path of that key file so the driver can use it. `PKEY` is the path to your private RSA key; you will find the key in your `.ssh` folder,
normally with the name gcloud-compute-platform (the example below uses `./google_compute_engine`).

In [None]:
# --- Google Cloud service account used by the libcloud driver ---
GCLOUD_PROJECT = 'qpe-big-dl'  # GCloud project id
GCLOUD_ACCOUNT = 'libcloud@qpe-big-dl.iam.gserviceaccount.com'
GCLOUD_KEY_PATH = './qpe.json'  # The path to the Service Account Key (a JSON file)

# --- SSH settings ---
# Account name used to build the remote /home/<user>/bd/... paths.
SSH_USER = 'am72ghiassi'
# Private RSA key file loaded when opening the SSH connection.
PKEY = './google_compute_engine'

# --- Experiment input ---
DESIGN_CSV = ''  # The CSV with the experiment design

In [None]:
# Look up the libcloud driver class for Google Compute Engine.
ComputeEngine = get_driver(Provider.GCE)

# Instantiate the driver with the service account, its key file and the project id.
driver = ComputeEngine(GCLOUD_ACCOUNT, GCLOUD_KEY_PATH, project=GCLOUD_PROJECT)


### Create the Node definitions

In [None]:

class Node(ABC):
    """Base class for an existing cloud instance that is controlled over SSH.

    On construction the node is looked up by name through the driver, an SSH
    connection is opened (with retries) and ``start_type`` is called so the
    concrete subclass can start whatever it needs on the machine.
    """

    # Number of SSH connection attempts made before giving up.
    CONNECT_ATTEMPTS = 5
    # Seconds to wait between two failed connection attempts.
    RETRY_DELAY = 5

    def __init__(self, driver, name, master=False, masterNode=None):
        """Locate the node called ``name``, connect to it and start it.

        Args:
            driver: Object exposing ``list_nodes()``; each returned node must
                provide ``name``, ``public_ips`` and ``private_ips``.
            name: Name of the instance to attach to.
            master: ``True`` when this node is the master itself.
            masterNode: The master this node belongs to; required unless
                ``master`` is ``True``.

        Raises:
            ValueError: If the node is not a master and no ``masterNode``
                was given.
            RuntimeError: If no node with that name exists or the SSH
                connection could not be opened after all attempts.
        """
        print(f'Starting node with name {name}')
        self.driver = driver
        self.name = name
        # Initialise the connection state first so that close_ssh()/__del__
        # are safe even when construction fails part-way through.
        self.ssh = None
        self.connected = False
        self.public_ip = None
        self.private_ip = None

        if not master and masterNode is None:
            raise ValueError("Slave nodes need a master")
        self.master = masterNode

        for n in self.driver.list_nodes():
            if n.name == self.name:
                print(f'Found node {n} with name {n.name} and IPs {n.public_ips}, {n.private_ips}')
                self.public_ip = n.public_ips[0]
                self.private_ip = n.private_ips[0]
        if self.public_ip is None:
            # Fail with a clear message instead of an AttributeError later on.
            raise RuntimeError(f"Can't find node {self.name}")

        for attempt in range(self.CONNECT_ATTEMPTS):
            try:
                self.open_ssh()
            except Exception as e:
                print(e)
                # No point in waiting after the final attempt.
                if attempt < self.CONNECT_ATTEMPTS - 1:
                    time.sleep(self.RETRY_DELAY)
            else:
                break
        # Only give up once every attempt has been used.
        if not self.connected:
            raise RuntimeError(f"Can't connect to node {self.name}")
        self.start_type()

    def open_ssh(self):
        """Open an SSH connection to the node's public IP using the key in PKEY."""
        client = paramiko.SSHClient()
        try:
            client.load_system_host_keys()
            # Unknown host keys are accepted with a warning.
            client.set_missing_host_key_policy(paramiko.WarningPolicy())
            self.k = paramiko.RSAKey.from_private_key_file(PKEY)
            # NOTE(review): the login user is hard-coded and differs from
            # SSH_USER -- confirm this is intended.
            client.connect(self.public_ip, username='diego', pkey=self.k)
        except Exception:
            # Do not leak a half-open client when the attempt fails.
            client.close()
            raise
        self.ssh = client
        self.connected = True
        print(f'Node {self.name} connected via ssh')

    def __del__(self):
        self.close_ssh()

    def close_ssh(self):
        """Close the SSH connection if one was ever opened."""
        self.connected = False
        # getattr: the attribute may be missing if __init__ never ran.
        ssh = getattr(self, 'ssh', None)
        if ssh is not None:
            ssh.close()

    @abstractmethod
    def start_type(self):
        """Start the role-specific service on the node (called from __init__)."""
        pass

@dataclass
class JobOptions:
    """Options of a training job handed to ``MasterNode.submit``."""
    # Forwarded to the job as the --batchSize argument.
    batch_size: int
    # Forwarded to the job as the --endTriggerNum argument.
    max_epochs: int

class MasterNode(Node):
    """Node that runs the Spark master and submits/cancels jobs on it."""

    def start_type(self):
        """Run Spark's ``start-master.sh`` on the node and print any error output."""
        stdin, stdout, stderr = self.ssh.exec_command(
            f'/home/{SSH_USER}/bd/spark/sbin/start-master.sh')
        # A stream can only be read once, so keep the data before testing it;
        # a second read() would return empty bytes.
        err = stderr.read()
        if len(err) > 0:
            print(stdout.read())
            print(err)

    def submit(self, options: JobOptions, filename, timeout):
        """Submit the LeNet-5 MNIST training job through ``spark-submit``.

        The call blocks until the remote command has closed its stderr
        stream, i.e. normally until the job has finished.

        Args:
            options: Batch size and number of epochs for the job.
            filename: Remote path the command's stdout is redirected to.
            timeout: Currently not used by this method; kept so existing
                callers keep working.
        """
        # Backslash-newline inside the literal joins the lines, so the shell
        # receives one single command line.
        command = f"""/home/{SSH_USER}/bd/spark/bin/spark-submit --master spark://{self.private_ip}:7077 --driver-cores 1 \
                    --driver-memory 1G --total-executor-cores 2 --executor-cores 1 --executor-memory 1G \
                    --py-files /home/{SSH_USER}/bd/spark/lib/bigdl-0.11.0-python-api.zip,/home/{SSH_USER}/bd/mnist/lenet5.py \
                    --properties-file /home/{SSH_USER}/bd/spark/conf/spark-bigdl.conf \
                    --jars /home/{SSH_USER}/bd/spark/lib/bigdl-SPARK_2.3-0.11.0-jar-with-dependencies.jar \
                    --conf spark.driver.extraClassPath=/home/{SSH_USER}/bd/spark/lib/bigdl-SPARK_2.3-0.11.0-jar-with-dependencies.jar \
                    --conf spark.executor.extraClassPath=bigdl-SPARK_2.3-0.11.0-jar-with-dependencies.jar /home/{SSH_USER}/bd/mnist/lenet5.py \
                    --action train --dataPath /tmp/mnist --batchSize {options.batch_size} --endTriggerNum {options.max_epochs} > {filename}"""

        print(command)

        stdin, stdout, stderr = self.ssh.exec_command(command)

        # Read stderr once and reuse the bytes so they are actually printed.
        err = stderr.read()
        if len(err) > 0:
            print(err)
            print(stdout.read())

    def cancel(self):
        """Kill the running application through the form on the master web UI.

        Opens the page on port 8080 of the public IP and submits the form
        whose action is ``app/kill/``. If no such form can be selected the
        problem is printed and nothing is submitted.
        """
        br = mechanize.Browser()
        br.open(f"http://{self.public_ip}:8080")

        def select_form(form):
            return form.attrs.get('action', None) == 'app/kill/'

        try:
            br.select_form(predicate=select_form)
        except mechanize._mechanize.FormNotFoundError:
            print("FormNotFoundError")
        except Exception as e:
            print("An error occurred during cancelloing.")
            print(e)
        else:
            # Only submit when a kill form was actually selected.
            br.submit()


class SlaveNode(Node):
    """Node that runs a Spark worker attached to its master node."""

    def start_type(self):
        """Run Spark's ``start-slave.sh`` against the master's private IP.

        Any error output of the script is printed together with its stdout.
        """
        stdin, stdout, stderr = self.ssh.exec_command(f'/home/{SSH_USER}/bd/spark/sbin/start-slave.sh spark://{self.master.private_ip}:7077')
        # A stream can only be read once, so keep the data before testing it;
        # a second read() would return empty bytes.
        err = stderr.read()
        if len(err) > 0:
            print(stdout.read())
            print(err)

#### Create the nodes (master and slaves)

You only have to provide the name of the node: it is looked up automatically and all the
necessary daemons are started on it.

In [156]:
# Attach to the existing instance named 'bigdl' as the master node.
# Constructing the object looks the node up, opens SSH and runs start-master.sh.
master = MasterNode(driver, 'bigdl', master=True)

Starting node with name bigdl
Found node <Node: uuid=cca95b2c0671f4fddfc8dd6f0e127b94cd440f20, name=bigdl, state=RUNNING, public_ips=['34.122.201.129'], private_ips=['10.128.0.4'], provider=Google Compute Engine ...> with name bigdl and IPs ['34.122.201.129'], ['10.128.0.4']
Node bigdl connected via ssh


In [157]:
# Attach to the two worker instances; each one runs start-slave.sh
# pointing at the private IP of `master`.
s1 = SlaveNode(driver, 'instance-1', master=False, masterNode=master)
s2 = SlaveNode(driver, 'instance-2', master=False, masterNode=master)



Starting node with name instance-1
Found node <Node: uuid=2f0b3b48e49b0ef04c01e71aaa9ef20d8388451b, name=instance-1, state=RUNNING, public_ips=['34.122.96.93'], private_ips=['10.128.0.5'], provider=Google Compute Engine ...> with name instance-1 and IPs ['34.122.96.93'], ['10.128.0.5']
Node instance-1 connected via ssh
Starting node with name instance-2
Found node <Node: uuid=15d8728539302907d0a183542c07f7f925cbbb5f, name=instance-2, state=RUNNING, public_ips=['34.72.116.119'], private_ips=['10.128.0.6'], provider=Google Compute Engine ...> with name instance-2 and IPs ['34.72.116.119'], ['10.128.0.6']
Node instance-2 connected via ssh


  key.get_name(), hostname, hexlify(key.get_fingerprint())
  key.get_name(), hostname, hexlify(key.get_fingerprint())


In [None]:
# Submit a job with batch size 128 and 1 epoch for testing
opt = JobOptions(128, 1)

# The remote command's stdout is redirected to this file on the master;
# the last argument (200) is the `timeout` parameter of submit().
master.submit(opt, '/home/diego/test.out', 200)
