<H1 style="text-align: center;font-size:60px; color:#3E4E59"> Création d'un cluster avec dask </H1>

<H3 style='font-size:20px;color:#F25652'>Attention vos machines doivent avoir miniconda dans votre home et un environnement appelé python36 <br/>
Merci de ne pas utliser mon espace de travail</H3>
<H3 style='font-size:20px;color:#D9BB93;text-align: right'>Charles THEROND</H3>



In [1]:
import subprocess
import pandas as pd
import random
import threading

<H2 style="color:#3E4E59;font-size:40px">Création de la table contenant les adresses machines</H2>

In [2]:
def get_address(tab,salle='131'):
    '''
    Permet d 'ajouter à la table le nom des machines d'une salle donnée.
    
    INPUT:
    tab     ->  table contenant ( ou non ) la liste des adresses d'une salle.
    salle   ->  numéro de la salle pour laquelle nous souhaitons generer les IP.
    
    OUTPUT:
    tab     ->  liste contenant les adresses de la nouvelle salle ajoutée.  
    '''
    for i in range(1,31):
        if i<10:
            i=str('0'+str(i))
        tab.append(str('c'+salle+'-'+str(i)+'.enst.fr'))
    return tab

In [3]:
def update_env(add,out_list,login):
    '''
    Teste la connexion à une machine pour l'ajouter dans la chaine de cluster
    Attention votre machine doit avoir miniconda dans votre home et un environnement appelé python36
    
    INPUT:
    add         ->  adresse IP sur laquelle tester la connexion
    out_list    ->  liste des résultats pour multi-threading
    login       ->  pseudo de l'utilisateur
    '''
    try:
        result=[add]
        retour = subprocess.check_output(
            str("ssh -t "+login+"@"+add+" 'export PATH=\"/cal/homes/"+login+"/miniconda2/bin:$PATH\";source activate python36;conda env list ' "), 
            shell=True,
            timeout=20)
        result+= str(retour).split("\\n")
        
        out_list.append(str(result).split("\\n"))
        return result
    except:
        out_list.append( str('inexistant ou injoignable '+add))
        return str('inexistant ou injoignable '+add)

In [4]:
def get_free_machine(AD,login,verbose=False):
    '''
    Fonction visant a paralleliser la recherche de machine à l'aide de thread
    
    INPUT:
    AD          ->  Liste des machines sur laquelle tester la connexion
    verbose     ->  Boolean pour afficher plus de contenu
    login       ->  pseudo de l'utilisateur
    
    OUTPUT:
    Table des adresses uniques de machines utilisable pour le cluster
    
    '''
    import threading
    out_list = list()
    jobs=[]
    for address in AD:
        thread = threading.Thread(target=update_env,args=(address, out_list,login))
        jobs.append(thread)
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    if verbose:
        print("----------------------------------------")
        print("----------MACHINE INDISPONIBLE----------") 
        echec=[]
    ip_tab=[]
   
    for val in out_list:
        if val[0] =='i':
            echec.append(val[-16:])
        else:
            ip =val[0].split(",")[0].replace("'","").replace("[","")
            ip_tab.append(ip)
    if verbose:
        print(echec)
        print("----------------------------------------")
        print("-----------MACHINE DU CLUSTER-----------") 
        print(list(set(ip_tab)))
    return list(set(ip_tab))

<H2 style="color:#3E4E59;font-size:30px"> Utilisation</H2>

In [5]:
ADRESSE=get_address([],'127')
ADRESSE=get_address(ADRESSE,'128')
ADRESSE=get_address(ADRESSE,'126')

ADRESSE1=get_address([],'125')
ADRESSE1=get_address(ADRESSE1,'129')
ADRESSE1=get_address(ADRESSE1,'133')

cluster0 = get_free_machine(ADRESSE,"ctherond",True)
#cluster1 = get_free_machine(ADRESSE1,"ctherond")

----------------------------------------
----------MACHINE INDISPONIBLE----------
[' c128-02.enst.fr', ' c128-10.enst.fr', ' c128-14.enst.fr', ' c128-21.enst.fr', ' c128-23.enst.fr', ' c128-28.enst.fr', ' c128-30.enst.fr', ' c126-05.enst.fr', ' c126-08.enst.fr', ' c126-22.enst.fr', ' c127-17.enst.fr', ' c127-01.enst.fr', ' c127-06.enst.fr', ' c127-07.enst.fr', ' c127-08.enst.fr', ' c127-13.enst.fr', ' c127-23.enst.fr', ' c127-24.enst.fr', ' c127-26.enst.fr', ' c127-30.enst.fr', ' c128-01.enst.fr', ' c128-05.enst.fr', ' c128-12.enst.fr', ' c128-18.enst.fr', ' c128-25.enst.fr', ' c126-01.enst.fr', ' c126-04.enst.fr', ' c126-02.enst.fr', ' c126-03.enst.fr', ' c126-07.enst.fr', ' c126-09.enst.fr', ' c126-10.enst.fr', ' c126-12.enst.fr', ' c126-11.enst.fr', ' c126-14.enst.fr', ' c126-13.enst.fr', ' c126-15.enst.fr', ' c126-16.enst.fr', ' c126-18.enst.fr', ' c126-17.enst.fr', ' c126-19.enst.fr', ' c126-21.enst.fr', ' c126-23.enst.fr', ' c126-24.enst.fr', ' c126-25.enst.fr', ' c126-27.enst.fr

<H2 style="color:#3E4E59;font-size:40px">Lancement et extinction d'un cluster</H2>

<H2 style="color:#3E4E59;font-size:30px">Lancement du cluster</H2>

In [6]:
def start_master(add,login):
    '''
    Essayer de lancer le master sur l'adresse add à partir de repertoire du login
    '''
    try:
        print(add,"master")
        print("http://"+add+":8787/status")
        cmd="ssh -t "+login+"@"
        cmd+=add
        cmd+=' "export PATH="/cal/homes/'+login+'/miniconda2/bin:$PATH";source activate python36; nohup dask-scheduler " '
        retour = subprocess.check_output(
            cmd, 
            shell=True,
            timeout=2)
        return "ok"
    except:
        return str('echec starting master '+add)
def start_worker(master,add,login):
    '''
    Essaye de lancer un worker à l'adresse add sur le master master à partir du repertoire du login
    '''
    try:
        print(add,'worker')
        retour = subprocess.check_output(
            str('ssh -t '+login+'@'+add+' "export PATH="/cal/homes/'+login+'/miniconda2/bin:$PATH";source activate python36;nohup dask-worker '+master+'\:8786 "'), 
            shell=True,
            timeout=10)
        return 'ok'
    except:
        return str('echec starting worker '+add)  

def start_all(tab,login):
    '''
    Fonction visant à paralleliser le lancement du cluster à partir de tab sur l'espace login
    
    INPUT:
    tab     ->  liste des adresses sur lesquelles lancer le cluster
    login   ->  nom de l'espace sur le quelle lancer le cluster sur chaque machine
    '''
    master=""
    jobs=[]
    for ind in range(len(tab)):
        if ind==0:
            start_master(tab[ind],login)
            master=tab[ind]
        else:
            thread = threading.Thread(target=start_worker,args=(master,tab[ind],login))
            jobs.append(thread)
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()

<H2 style="color:#3E4E59;font-size:30px">Extinction du cluster</H2>

In [7]:
def checkstop(add,login,Kill=False,verbose=False):
    '''
    Fonction permetant la vérification et l'arret des processus Dask sur la machine add
    
    INPUT:
    add         ->  Adresse sur la quelle éteindre effectuer la vérification ou l'arret
    login       ->  Espace sur lequel effectuer les tache
    Kill        ->  Boolean déterminant si on souhaite arreter les processus dask
    verbose     ->  Pour avoir plus de contenu
    '''
    try:
        result=""
        retour = subprocess.check_output(
            str("ssh -t "+login+"@"+add+" 'ps -ef | grep dask' "), 
            shell=True,
            timeout=10)
        for row in str(retour).split("\\n"):
            if 'python36/bin/python' in row:
                result1=row.split(" ")[1]
                result2=row.split(" ")[2]
                if not(Kill):
                    print('Le processus',result1,result2,'sur',add,"est en execution")
                if Kill :
                    subprocess.call(str("ssh -t  "+login+"@"+add+" 'kill "+result1+" ' "), 
                    shell=True,timeout=2)
                    subprocess.call(str("ssh -t  "+login+"@"+add+" 'kill "+result2+" ' "), 
                    shell=True,timeout=2)
                    if verbose:
                        print("Le processus",result1,result2,"sur",add,"est mort")
        if Kill & verbose:
            checkstop(add,login,False,True)
        return "L'arret c'est bien déroulé"
    except:
        return str('erreur de processus')
def checkstop_all(tab,login,K=False,VV=False):
    '''
    Fonction visant l'extinction parallelisée des machines de tab reliées au login
    '''
    jobs=[]
    for ind in range(1,len(tab)):
        thread = threading.Thread(target=checkstop,args=(tab[ind],login,K,VV))
        jobs.append(thread)
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    checkstop(tab[0],login,K,VV)

<H2 style="color:#3E4E59;font-size:30px">Utilisation</H2>

In [8]:
start_all(cluster0,"ctherond")

c127-21.enst.fr master
http://c127-21.enst.fr:8787/status
c128-08.enst.fr worker
c128-19.enst.fr worker
c127-11.enst.fr worker
c128-06.enst.fr worker
c128-26.enst.fr worker
c127-12.enst.fr worker
c127-15.enst.fr worker
c128-13.enst.fr c126-06.enst.frworker worker
c128-24.enst.fr
 worker
c127-29.enst.fr worker
c127-10.enst.fr workerc127-05.enst.fr worker

c127-18.enst.fr worker
c128-17.enst.fr worker
c127-03.enst.fr worker
c128-16.enst.fr worker
c126-20.enst.fr worker
c127-09.enst.fr worker
c128-09.enst.fr worker
c128-04.enst.fr worker
c127-20.enst.fr worker
c127-02.enst.fr worker
c128-11.enst.fr worker
c127-14.enst.fr worker
c127-27.enst.fr worker
c128-22.enst.fr c128-29.enst.fr worker
workerc128-07.enst.fr
 worker
c127-28.enst.fr worker
c127-19.enst.fr worker
c128-03.enst.fr worker
c128-27.enst.fr c127-04.enst.frworker worker

c128-15.enst.fr worker
c127-25.enst.fr worker
c127-22.enst.fr c127-16.enst.frworker
 worker
c128-20.enst.fr worker


In [9]:
checkstop_all(cluster0,"ctherond",True,True)

Le processus 12861  sur c126-06.enst.fr est mort
Le processus  7590 sur c127-09.enst.fr est mort
Le processus  7880 sur c127-14.enst.fr est mort
Le processus 12258  sur c128-19.enst.fr est mort
Le processus  8518 sur c127-12.enst.fr est mort
Le processus 10094  sur c127-29.enst.fr est mort
Le processus 18991  sur c128-07.enst.fr est mort
Le processus 18043  sur c128-24.enst.fr est mort
Le processusLe processus  9883 sur c127-11.enst.fr est mort
  4862 sur c127-10.enst.fr est mort
Le processus 12087  sur c127-15.enst.fr est mort
Le processusLe processus 14678  sur   7689 sur c127-27.enst.fr est mort
c128-04.enst.fr est mort
Le processus 15129  sur c128-17.enst.fr est mort
Le processus 10511  sur c127-05.enst.fr est mort
Le processus 16555  sur c128-08.enst.fr est mort
Le processus 11867  sur c127-03.enst.fr est mort
Le processus 13881  sur c127-04.enst.fr est mort
Le processus 16638  sur c128-22.enst.fr est mort
Le processus 16701  sur c128-13.enst.fr est mort
Le processus  8177 sur c12

In [10]:
checkstop_all?

[0;31mSignature:[0m [0mcheckstop_all[0m[0;34m([0m[0mtab[0m[0;34m,[0m [0mlogin[0m[0;34m,[0m [0mK[0m[0;34m=[0m[0;32mFalse[0m[0;34m,[0m [0mVV[0m[0;34m=[0m[0;32mFalse[0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m Fonction visant l'extinction parallelisée des machines de tab reliées au login
[0;31mFile:[0m      ~/Documents/<ipython-input-7-1fd496f7cf1a>
[0;31mType:[0m      function
