# Use this notebook for developing mlframe prototype 2

## Object-oriented approach. Testing on DL with benchmarks.

In [1]:
import multiprocessing
from Queue import Queue, Empty
import subprocess
import os
import time
import re
import sys

In [2]:
def printObj(obj):
    """Print each instance attribute of obj on its own line as "name = value"."""
    attributes = obj.__dict__
    for name in attributes:
        # Wrapping in a tuple keeps tuple-valued attributes from being unpacked by %.
        print("%s = %s" % (name, attributes[name]))

In [51]:
# Class for executing local and remote commands in background processes.
# The command must be stored under the "command" key of a multiprocessing.Manager.dict().
# The command must be a string with the command and its arguments separated by spaces.
# Should be created through the BashExecutor class.
# Usage sample:
# d = BashExecutor(command)
# d.start()
#  command - string representation of the command and arguments.
# exec_remote.sh must output the subprocess exit code in the form:
# exitcode=N
# where N is the number.
class BackgroundExecutor(multiprocessing.Process):
    """Run one command in a separate process and record its output and exit code.

    The command and its results are exchanged through the mapping ``d``:

    * ``d["command"]`` - command line; arguments are separated by single spaces.
    * ``d["stdout"]``  - must exist; every output line (stdout and stderr merged)
      is appended to it.
    * ``d["exitcode"]`` - must exist; the exit code is stored only while the
      value is still the empty string.

    Parameters:
        d: mapping described above (BashExecutor passes a Manager dict).
        debug: print progress messages when true.
        hostname: stored on the instance; not used by this class.
        callback: optional callable invoked as ``callback(callback_params)``
            once the command has exited.
        callback_params: single argument handed to ``callback``.

    Raises:
        KeyError: if ``d`` has no "command" entry.
    """

    def __init__(self, d=None, debug=False, hostname="", callback=None, callback_params=None):
        super(BackgroundExecutor, self).__init__()
        # A fresh dict per call instead of one default shared by all instances.
        if d is None:
            d = {}
        self.debug = debug
        self.hostname = hostname
        self.exitcode_pat = re.compile(r"^exitcode=(\d+)")
        self.d = d
        self.command = d["command"].split(" ")
        self.callback = callback
        self.callback_params = callback_params
        # Set by run(); None means the command has not been started yet.
        self.proc = None
        if debug:
            print("In  %s  command=' %s '" % (self.name, self.command))
            if callback is not None:
                print("Callback: %s %s" % (callback, callback_params))

    def poll(self):
        """Check the child for an exit code; store it and fire the callback.

        Returns the exit code, or None when the command has not been started
        or is still running.
        """
        if self.proc is None:
            return None
        exitcode = self.proc.poll()
        if exitcode is not None:
            self.setExitCode(exitcode)
            if self.callback is not None:
                if self.debug:
                    print("Calling callback %s %s" % (str(self.callback), str(self.callback_params)))
                self.callback(self.callback_params)
        return exitcode

    def setExitCode(self, ec):
        """Store ec in d["exitcode"] unless an exit code was already recorded."""
        if self.d["exitcode"] == "":
            if self.debug:
                print("exit code: %s" % (ec,))
            self.d["exitcode"] = ec

    def run(self):
        """Start the command, echo and collect its output, then record the exit code."""
        if self.debug:
            print("In {}. Calling {}".format(self.name, self.command))

        proc = subprocess.Popen(self.command, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, bufsize=1, shell=False)
        self.proc = proc
        if self.debug:
            print("%s process started" % (self.name,))

        # readline() blocks until a full line is available and returns b'' at
        # EOF, so no sleep is needed between reads.
        for raw in iter(proc.stdout.readline, b''):
            # Python 2 pipes already yield str; only Python 3 bytes need decoding.
            line = raw if isinstance(raw, str) else raw.decode("utf-8", "replace")
            self.d["stdout"] = self.d["stdout"] + line
            sys.stdout.write(line)
        proc.stdout.close()

        # EOF on the pipe does not guarantee the child has been reaped yet;
        # wait so that poll() always sees an exit code and runs the callback.
        proc.wait()
        self.poll()

    def nameYourself(self):
        """Print name, parent pid and identity of the current process."""
        cp = multiprocessing.current_process()
        print("name %s" % (cp._name,))
        print("parent pid %s" % (cp._parent_pid,))
        print("id %s" % (cp._identity,))

    def getExitcode(self):
        """Return the recorded exit code ("" until one has been stored)."""
        return self.d["exitcode"]
            

In [52]:
# Class for running BackgroundExecutor with a multiprocessing.Manager.dict object,
# which stores the string representation of the command and, after execution, its output and exit code.
class BashExecutor:
    """Front end for BackgroundExecutor.

    Creates a Manager dict holding the command line ("command"), the collected
    output ("stdout") and the exit code ("exitcode", "" until known) and hands
    it to a BackgroundExecutor, which is started with start().
    """

    def __init__(self, command, debug=False, callback=None, callback_params=None):
        self.debug = debug
        manager = multiprocessing.Manager()
        shared = manager.dict()
        shared["command"] = command
        shared["stdout"] = ""
        shared["exitcode"] = ""
        self.d = shared
        if debug:
            print("In BashExecutor:")
            print("Command set to ' %s '" % (shared["command"],))
            print("command type: %s" % (type(shared["command"]),))
            print("Callback: %s %s" % (callback, callback_params))
        self.BE = BackgroundExecutor(shared, debug, callback=callback, callback_params=callback_params)

    def start(self):
        """Start the background process that runs the command."""
        self.BE.start()

    def getExitcode(self):
        """Return the exit code, or "" while none has been recorded."""
        return self.d["exitcode"]

    def getStdout(self):
        """Return the output collected so far."""
        return self.d["stdout"]

    def getCommand(self):
        """Return the command line as it was given."""
        return self.d["command"]

    def _describe(self):
        # Command line, followed by " (exitcode)" once an exit code is stored.
        text = self.d["command"]
        code = self.d["exitcode"]
        if code == "":
            return text
        return text + " (" + str(code) + ")"

    def __str__(self):
        return self._describe()

    def __repr__(self):
        return self._describe()
        

In [83]:
# Class for storing host-related data: hostname, address, access key and username.
# Stores the commands (BashExecutor instances) executed on the host.
# Has methods for pinging the host, testing the ssh connection and executing commands over ssh.
class Host(object):
    """Connection data for one host and the list of commands launched on it.

    ssh and scp are addressed to ``[user@]hostname``; ``address`` is used only
    by ping().

    Parameters:
        hostname: name given to ssh/scp.
        address: name or IP address given to ping.
        user: remote user name; omitted from the ssh target when empty.
        key: path of a private key passed with ``-i``; a leading "~" is expanded.
        debug: print the commands that are built and pass debug on to the executors.
        ssh_options: value appended to the ssh command as ``-o <value>``.
        scp_options: value appended to the scp command as ``-o <value>``.

    Attributes:
        ssh_command, scp_command: base command lines as space-separated strings.
        host: ssh/scp target, ``[user@]hostname``.
        commands: BashExecutor objects in launch order; ping() and execute()
            return indexes into this list.
    """

    def __init__(self, hostname, address="localhost", user="", key="", debug=False, ssh_options="", scp_options=""):
        self.hostname = hostname
        self.address = address
        self.user = user
        # expanduser only rewrites a leading "~" and does not require $HOME to be set.
        key = os.path.expanduser(key)
        self.key = key
        self.debug = debug
        self.ssh_command = "ssh"
        self.scp_command = "scp"
        if key != "":
            self.ssh_command += " -i " + key
            self.scp_command += " -i " + key
        if ssh_options != "":
            self.ssh_command += " -o " + ssh_options
        if scp_options != "":
            self.scp_command += " -o " + scp_options

        self.host = ""
        if user != "":
            self.host += user + "@"
        self.host += hostname
        self.commands = []
        if debug:
            print("hostname,address,user,key: %s %s %s %s"
                  % (self.hostname, self.address, self.user, self.key))
            print("ssh command: %s host: %s" % (self.ssh_command.replace(' ', '.'), self.host))

    def ping(self, N=5):
        """Ping the host's address N times in the background.

        Returns the index of the new command in self.commands.
        """
        return self._launch("ping -c " + str(N) + " " + self.address)

    def execute(self, command, options=""):
        """Run a command on the host over ssh in the background.

        If the first word of ``command`` names a file in the local scripts
        directory (<parent of cwd>/mlframe/scripts), that file is copied to the
        host first, run as ``./<command>`` and removed once it has finished.

        Parameters:
            command: command line, words separated by spaces.
            options: extra ssh options inserted before the host.

        Returns the index of the new command in self.commands, or -1 when the
        script file could not be copied.
        """
        # Decide if command is a script name or just a command
        package_directory = os.path.dirname(os.getcwd())
        scripts_location = os.path.realpath(os.path.join(package_directory, "mlframe", "scripts"))
        command_file = command.split(" ")[0]
        command_script_path = os.path.join(scripts_location, command_file)
        callback = None
        callback_params = None
        if os.path.isfile(command_script_path):
            if self.debug:
                print("script file exists: %s" % (command_script_path,))
            if self.copyScriptFile(command_script_path) != 0:
                # Error copying script file
                return -1
            # Run the copied script from the remote login directory and
            # delete it when the command has exited.
            command = "./" + command
            callback = self.removeScriptFile
            callback_params = command_file

        if self.debug:
            print("Command: %s" % (command,))

        if options != "":
            options = " " + options
        command = self.ssh_command + options + " " + self.host + " " + command

        if self.debug:
            print("Executing: %s" % (command.replace(' ', '.'),))
            if callback is not None:
                print("Callback: %s %s" % (callback, callback_params))
        return self._launch(command, callback, callback_params)

    def removeScriptFile(self, script_file, options=""):
        """Delete script_file on the host with ``ssh ... rm``; blocks until done.

        Returns the exit code of the ssh command.
        """
        argv = self._remove_argv(script_file, options)
        if self.debug:
            print("Remove command: %s" % (" ".join(argv),))
        failure = "Error deleting %s on %s" % (script_file, self.hostname)
        return self._run_blocking(argv, failure)

    # Copy script file to the host.
    # Called every time before remote script execution.
    def copyScriptFile(self, script_file, options=""):
        """Copy script_file to the remote login directory with scp; blocks until done.

        A 5-second connect timeout is always applied. Returns the scp exit code.
        """
        argv = self._copy_argv(script_file, options)
        if self.debug:
            print("Copy command: %s" % (" ".join(argv),))
        failure = "Error copying %s to %s" % (script_file, self.hostname)
        return self._run_blocking(argv, failure)

    def connect_test(self):
        """Run ``hostname`` on the host with a 5-second connect timeout.

        Returns the index of the new command in self.commands.
        """
        options = "-o ConnectTimeout=5"
        return self.execute("hostname", options=options)

    def _launch(self, command_line, callback=None, callback_params=None):
        # Start command_line in the background and return its index in self.commands.
        comm = BashExecutor(command_line, debug=self.debug,
                            callback=callback, callback_params=callback_params)
        index = len(self.commands)
        self.commands.append(comm)
        comm.start()
        return index

    def _remove_argv(self, script_file, options=""):
        # Argument list for deleting script_file on the host.
        return self.ssh_command.split() + options.split() + [self.host, "rm", script_file]

    def _copy_argv(self, script_file, options=""):
        # Argument list for copying script_file to the host. The path is kept as
        # one argument so that local paths containing spaces survive.
        return (self.scp_command.split() + ["-o", "ConnectTimeout=5"] + options.split()
                + [script_file, self.host + ":"])

    def _run_blocking(self, argv, failure):
        # Run argv to completion; on a non-zero exit code report it with both output streams.
        proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
        std, stderr = proc.communicate()
        exitcode = proc.returncode
        if exitcode != 0:
            print("%s %s" % (failure, exitcode))
            print(self._as_text(std))
            print("! %s" % (self._as_text(stderr),))
        return exitcode

    @staticmethod
    def _as_text(data):
        # Pipe output is str on Python 2 and bytes on Python 3; return str either way.
        if data is None or isinstance(data, str):
            return data
        return data.decode("utf-8", "replace")
        

In [85]:
# Host named "DL" for both ssh and ping; debug=True echoes the commands that get built.
dl_serv = Host("DL","DL",debug=True)

hostname,address,user,key: DL DL  
ssh command: ssh host: DL


In [86]:
# The first word, test.sh, is looked up in the local scripts directory; when found,
# execute() copies it to the host, runs it with the arguments "ab cs" and removes it afterwards.
dl_serv.execute("test.sh ab cs")

script file exists: /Users/peterbryzgalov/work/ML/mlframework/mlframe/scripts/test.sh
Copy command: scp -o ConnectTimeout=5 /Users/peterbryzgalov/work/ML/mlframework/mlframe/scripts/test.sh DL:
Command: ./test.sh ab cs
Executing: ssh.DL../test.sh.ab.cs
Callback: <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bd510>> test.sh
In BashExecutor:
Command set to ' ssh DL ./test.sh ab cs '
command type: <type 'str'>
Callback: <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bd510>> test.sh
In  BackgroundExecutor-58  command=' ['ssh', 'DL', './test.sh', 'ab', 'cs'] '
Callback: <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bd510>> test.sh


0

In BackgroundExecutor-58. Calling ['ssh', 'DL', './test.sh', 'ab', 'cs']
BackgroundExecutor-58 process started
Running test command on DL-Server pars: ab cs
DL-Server(ab):1
(ab):1
DL-Server(ab) err:1
DL-Server(ab):2
(ab):2
DL-Server(ab) err:2
DL-Server(ab):3
(ab):3
DL-Server(ab) err:3
DL-Server(ab):4
(ab):4
DL-Server(ab):5
(ab):5
DL-Server(ab) err:4
DL-Server(ab) err:5
exit code: 125 Calling callback <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bd510>> test.sh
Remove command: ssh DL rm test.sh


In [87]:
# Private key handed to ssh/scp with -i; Host expands "~" to the home directory.
key = "~/.ssh/id_rsa_com"
mouse = Host("mouse-pub","mouse-pub",user="peter",key=key,debug=True)

hostname,address,user,key: mouse-pub mouse-pub peter /Users/peterbryzgalov/.ssh/id_rsa_com
ssh command: ssh.-i./Users/peterbryzgalov/.ssh/id_rsa_com host: peter@mouse-pub


In [88]:
# Run the test.sh script without arguments on mouse.
mouse.execute("test.sh")

script file exists: /Users/peterbryzgalov/work/ML/mlframework/mlframe/scripts/test.sh
Copy command: scp -i /Users/peterbryzgalov/.ssh/id_rsa_com -o ConnectTimeout=5 /Users/peterbryzgalov/work/ML/mlframework/mlframe/scripts/test.sh peter@mouse-pub:
Command: ./test.sh
Executing: ssh.-i./Users/peterbryzgalov/.ssh/id_rsa_com.peter@mouse-pub../test.sh
Callback: <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bdcd0>> test.sh
In BashExecutor:
Command set to ' ssh -i /Users/peterbryzgalov/.ssh/id_rsa_com peter@mouse-pub ./test.sh '
command type: <type 'str'>
Callback: <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bdcd0>> test.sh
In  BackgroundExecutor-60  command=' ['ssh', '-i', '/Users/peterbryzgalov/.ssh/id_rsa_com', 'peter@mouse-pub', './test.sh'] '
Callback: <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bdcd0>> test.sh


0

In BackgroundExecutor-60. Calling ['ssh', '-i', '/Users/peterbryzgalov/.ssh/id_rsa_com', 'peter@mouse-pub', './test.sh']
BackgroundExecutor-60 process started
Running test command on mouse pars: 
mouse():1
():1
mouse() err:1
mouse():2
():2
mouse():3
():3
mouse() err:2
mouse() err:3
mouse():4
():4
mouse():5
():5
mouse() err:4
mouse() err:5
exit code: 125 Calling callback <bound method Host.removeScriptFile of <__main__.Host object at 0x1090bdcd0>> test.sh
Remove command: ssh -i /Users/peterbryzgalov/.ssh/id_rsa_com peter@mouse-pub rm test.sh


In [89]:
# Runs "hostname" over ssh with a 5-second connect timeout; returns the command's index.
dl_serv.connect_test()

Command: hostname
Executing: ssh.-o.ConnectTimeout=5.DL.hostname
In BashExecutor:
Command set to ' ssh -o ConnectTimeout=5 DL hostname '
command type: <type 'str'>
Callback: None None
In  BackgroundExecutor-62  command=' ['ssh', '-o', 'ConnectTimeout=5', 'DL', 'hostname'] '


1

In BackgroundExecutor-62. Calling ['ssh', '-o', 'ConnectTimeout=5', 'DL', 'hostname']
BackgroundExecutor-62 process started
DL-Server
exit code: 0

In [90]:
# Unless a file named nvidia-smi exists in the local scripts directory, nothing is
# copied and the command line is passed to ssh as it is.
dl_serv.execute("nvidia-smi dmon -c 2 -s u")

Command: nvidia-smi dmon -c 2 -s u
Executing: ssh.DL.nvidia-smi.dmon.-c.2.-s.u
In BashExecutor:
Command set to ' ssh DL nvidia-smi dmon -c 2 -s u '
command type: <type 'str'>
Callback: None None
In  BackgroundExecutor-64  command=' ['ssh', 'DL', 'nvidia-smi', 'dmon', '-c', '2', '-s', 'u'] '


2

In BackgroundExecutor-64. Calling ['ssh', 'DL', 'nvidia-smi', 'dmon', '-c', '2', '-s', 'u']
BackgroundExecutor-64 process started
# gpu    sm   mem   enc   dec
# Idx     %     %     %     %
    0     0     0     0     0
    1     0     0     0     0
    2     0     0     0     0
    3     0     1     0     0
    4     0     0     0     0
    5     0     0     0     0
    6     0     0     0     0
    7     0     0     0     0
    0     0     0     0     0
    1     0     0     0     0
    2     0     0     0     0
    3     0     0     0     0
    4     0     0     0     0
    5     0     0     0     0
    6     0     0     0     0
    7     0     0     0     0
exit code: 0

In [91]:
# Launch three background commands on mouse and keep the index each call returns.
test = [0, 1, 2]
print(test)
test[0] = mouse.ping(1)
test[1] = mouse.connect_test()
test[2] = mouse.execute("hostname && date")

[0, 1, 2]
In BashExecutor:
Command set to ' ping -c 1 mouse-pub '
command type: <type 'str'>
Callback: None None
In  BackgroundExecutor-66  command=' ['ping', '-c', '1', 'mouse-pub'] '
Command: hostname
Executing: ssh.-i./Users/peterbryzgalov/.ssh/id_rsa_com.-o.ConnectTimeout=5.peter@mouse-pub.hostname
In BackgroundExecutor-66. Calling ['ping', '-c', '1', 'mouse-pub']
BackgroundExecutor-66 process started
ping: cannot resolve mouse-pub: Unknown host
In BashExecutor:
Command set to ' ssh -i /Users/peterbryzgalov/.ssh/id_rsa_com -o ConnectTimeout=5 peter@mouse-pub hostname '
command type: <type 'str'>
Callback: None None
In  BackgroundExecutor-68  command=' ['ssh', '-i', '/Users/peterbryzgalov/.ssh/id_rsa_com', '-o', 'ConnectTimeout=5', 'peter@mouse-pub', 'hostname'] '
Command: hostname && date
Executing: ssh.-i./Users/peterbryzgalov/.ssh/id_rsa_com.peter@mouse-pub.hostname.&&.date
In BackgroundExecutor-68. Calling ['ssh', '-i', '/Users/peterbryzgalov/.ssh/id_rsa_com', '-o', 'ConnectTime

In [93]:
# For every stored command index show the command line, the captured output and the exit code.
for tst in test:
    print tst
    i=tst
    print mouse.commands[i].getCommand()
    # Trailing comma: print adds no newline of its own after the captured output.
    print mouse.commands[i].getStdout(),
    print "exit",mouse.commands[i].getExitcode()

1
ping -c 1 mouse-pub
ping: cannot resolve mouse-pub: Unknown host
exit 68
2
ssh -i /Users/peterbryzgalov/.ssh/id_rsa_com -o ConnectTimeout=5 peter@mouse-pub hostname
mouse
exit 0
3
ssh -i /Users/peterbryzgalov/.ssh/id_rsa_com peter@mouse-pub hostname && date
mouse
Sat Mar 31 18:42:11 JST 2018
exit 0


In [58]:
# Each entry is shown through BashExecutor.__repr__: the command line, followed by
# the exit code in parentheses once one has been recorded.
print mouse.commands

[ping -c 1 mouse.local (0), ssh -i /Users/peterbryzgalov/.ssh/id_rsa_com -o ConnectTimeout=5 peter@mouse.local hostname (0), ssh -i /Users/peterbryzgalov/.ssh/id_rsa_com peter@mouse.local hostname && date (0)]


In [94]:
# ssh/scp will target "ubuntu@muse" (the hostname); the IP address is used only by ping().
muse = Host("muse","52.158.238.181", user="ubuntu", key=key,debug=False)

In [95]:
# Both calls only start a background process and return the command's index,
# so the output of the two commands may appear in any order.
muse.ping()
muse.connect_test()

1

ssh: Could not resolve hostname muse: nodename nor servname provided, or not known
PING 52.158.238.181 (52.158.238.181): 56 data bytes
Request timeout for icmp_seq 0
Request timeout for icmp_seq 1
Request timeout for icmp_seq 2
Request timeout for icmp_seq 3

--- 52.158.238.181 ping statistics ---
5 packets transmitted, 0 packets received, 100.0% packet loss


In [96]:
# ssh/scp will target "i96005@reedbush"; the full domain name is used only by ping().
reedbush = Host("reedbush","reedbush.cc.u-tokyo.ac.jp",key="~/.ssh/id_rsa_com",user="i96005")

In [97]:
# execute() returns -1 without running anything when the script file cannot be copied.
reedbush.execute("test.sh")

Error copying /Users/peterbryzgalov/work/ML/mlframework/mlframe/scripts/test.sh to reedbush 1

Connection closed by 130.69.241.14 port 22
lost connection



-1

In [98]:
# Ping the host's address 3 times (ping -c 3); returns the command's index.
reedbush.ping(3)

0

PING reedbush.cc.u-tokyo.ac.jp (130.69.241.14): 56 data bytes
64 bytes from 130.69.241.14: icmp_seq=0 ttl=56 time=32.878 ms
64 bytes from 130.69.241.14: icmp_seq=1 ttl=56 time=11.704 ms
64 bytes from 130.69.241.14: icmp_seq=2 ttl=56 time=23.768 ms

--- reedbush.cc.u-tokyo.ac.jp ping statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 11.704/22.783/32.878/8.672 ms
