#### Tips
data operation (clean, transform) not only on master node, split them at first to all nodes
- split process from beginning and check the cpu consumption of each step

should be better on some algorithm like k-fold (faster by parallel computing): 
- do it!!
- use gather to create train or just bcast all data on each node at first

find a better way to measure CPU rather than fork        
- multipleprocess

save latency and cpu measure as a function can be imported

#### MPI Random Forest Classifier - Latency

In [6]:
# package
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
# data frame operation
import numpy as np
# latency timer

from mpi4py import MPI

In [None]:
def MPI_rf():
    # mpi4py: MPI.COMM_WORLD: communicator including all nodes
    #.Get_rank()/.Get_size()
    comm = MPI.COMM_WORLD 
    rank = comm.Get_rank()
    size = comm.Get_size()
    # for recording
    file_name = "./" + str(dt.datetime.now().strftime('%d%H%M')) +"_mpi_sklearn_rf_"
    # for node 1-8, 1680!
    L = 1680 # length before scatter
    l = int(L / size) # length after scatter
    # on all nodes, create variables for later use
    X_train = None
    Y_train = None
    Pred = None
    
    # on master, import/transform datas
    if rank == 0:
        # Load data from a text file, specify seperator as ','
        train = np.genfromtxt('temptrain.csv', delimiter=',')
        ## ?? train1 = train.reshape(157774)
        # subset a sample
        samples = 2000
        sample = train[1:samples]
        
        d1 = sample[0:samples-3]
        d2 = sample[1:samples-2]
        d3 = sample[2:samples-1]
        # .reshape(-1, 1) change shape to one column(do not specify row)
        # np.concatenate((arr1, arr2,...), axis=1): append arrays by row
        X_train = np.concatenate((d1.reshape(-1,1),
                                  d2.reshape(-1,1),
                                  d3.reshape(-1,1)),axis=1)
        Y_train = sample[2:samples]
        # Y_train = Y_train.T
        
        #confirm number of nodes
        print("\n"+str(size)+"\n\n")
        # data to be scatterd
        X_train = X_train[:L,:]  
        Y_train = Y_train[:L]
        # create to gather data
        Pred = np.zeros([l, size])
        
    # create to received data on each processl
    part_x = np.zeros([l,3])
    part_y = np.zeros(l) 
    
    # timer for communication and modelling
    t_start = MPI.Wtime()
    # scatter to each process
    comm.Scatter(X_train, part_x, root = 0) 
    comm.Scatter(Y_train, part_y, root = 0) 
    #define fit model
    rfc = RandomForestClassifier(max_depth= 6, min_samples_leaf=9, 
                                 n_estimators = 50, 
                                 min_samples_split=15, 
                                 max_features=0.6, oob_score=True)
    rfc.fit(part_x, part_y) 
    pred = rfc.predict(part_x)
    # gather pred to process 0
    comm.Gather(pred, Pred, root=0)
    # modeling done
    t_end = MPI.Wtime()
    latency = t_end - t_start
    
    # record latency on each node
    file = open(file_name+"latency.txt", "a") 
    file.write('%d,%d,%20.4f\n' % (size, rank, latency))
    # Evaluation
    if rank == 0 :
        score = f1_score(Y_train, Pred.reshape(L), average='micro')
        # open(file, mode='a'): open for writing, appending
        file = open(file_name+"score.txt", "a") 
        file.write('%d, %f\n' % (size, score))
        print(score)

In [None]:
# if this file is executed directly, run MPI_rf()                
if __name__ == '__main__':
    MPI_rf()      

In [51]:
!for i in {1..8}; do mpiexec -n $i python MPI_rf_latency.py; done

----------------------------------------------------------------------------
Open MPI has detected that a parameter given to a command line
option does not match the expected format:

  Option: n
  Param:  {1..8}

This is frequently caused by omitting to provide the parameter
to an option that requires one. Please check the command line and try again.
----------------------------------------------------------------------------


#### MPI Random Forest Classifer - CPU

In [None]:
from sklearn import datasets 
from sklearn.ensemble import RandomForestClassifier 
from sklearn.metrics import f1_score
from os import fork
from os import kill 

from mpi4py import MPI
import numpy as np 
import datetime as dt
import time 
import csv 
#psutil: process and system utilities
import psutil
import signal

#import ctypes
#libc = ctypes.CDLL('libc.so.6') 

def cpu(file_name, node, size):
    #cpu count
    #cores = psutil.cpu_count()
    cpu_file = open(file_name+str(node)+"_cpu.txt", "a")
    #cpu_file.write('#Number of processes : %d\n'%(size))
    while 1:
        # cpu used percent
        cpu_percent = psutil.cpu_percent()
        #virtual memory info: total, available, percent, used, free, active
        memory_percent = psutil.virtual_memory()[2]
        #disk usage info: total, used, free, percent
        disk_percent = psutil.disk_usage('/')[3]
        # package counts
        net_io_count = psutil.net_io_counters(pernic=True)
        time.sleep(0.01) 
        cpu_file.write('%d,%2.2f,%2.2f,%2.2f\n'
                       %(size, cpu_percent, memory_percent, disk_percent))  

def getSize():
    iris = datasets.load_iris() 
    return len(iris.data)  

def MPI_temp():
    comm = MPI.COMM_WORLD 
    rank = comm.Get_rank()
    size = comm.Get_size()
    # for node 1-8, 1680!
    L = 84000 # length before scatter
    l = int(L / size) # length after scatter
    # on all nodes, create variables for later use
    X_train = None
    Y_train = None
    Pred = None
    stop = 0
    file_name = "./sample"+str(L)+"_mpi_sklearn_rf_" 
    
    if rank == 0:       
        # Load data from a text file, specify seperator as ','
        train = np.genfromtxt('temptrain.csv', delimiter=',')
        ## ?? train1 = train.reshape(157774)
        samples = 100000
        sample1 = train[1:samples]
    
        d1 = sample1[0:samples-3]
        d2 = sample1[1:samples-2]
        d3 = sample1[2:samples-1]
        # .reshape(-1, 1) change shape to one column(do not specify row)
        # np.concatenate((arr1, arr2,...), axis=1): append arrays by row
        X_train = np.concatenate((d1.reshape(-1,1),
                                  d2.reshape(-1,1),
                                  d3.reshape(-1,1)),axis=1)
        Y_train = sample1[2:samples]
        #Y_train = Y_train.T
        print("\n"+str(size)+"\n\n")
        X_train = X_train[:L,:]  
        Y_train = Y_train[:L] 
        Pred = np.zeros([l, size])
        
    # define received data on each process(according to size)
    part_x = np.zeros([l,3])
    part_y = np.zeros(l) 
    
    # fork() copy a process
    newpid = fork()
    # only on child process(fork() returns 0) of rank 0, call cup()
    if newpid == 0:
        libc.prctl(1, 15) 
        cpu(file_name, rank, size)
        raise SystemExit
    # on parent process(fork() returns new ID of process)
    else: 
        t_start = MPI.Wtime()
        comm.Scatter(X_train, part_x, root = 0) 
        comm.Scatter(Y_train, part_y, root = 0)       
        clf = RandomForestClassifier(max_depth= 6, min_samples_leaf=9,
                                     n_estimators = 50,
                                     min_samples_split=15,
                                     max_features=0.6, oob_score=True)
        clf.fit(part_x, part_y) 
        pred = clf.predict(part_x) 
        comm.Gather(pred, Pred, root=0)
        t_end = MPI.Wtime()
        latency = t_end - t_start
        # record latency on each node
        file = open(file_name+"latency.txt", "a") 
        file.write('%d,%d,%.4f\n' % (size, rank, latency))
        if rank == 0:
            score = f1_score(Y_train, Pred.reshape(L), average='micro')
            # open(file, mode='a'): open for writing, appending
            file = open(file_name+"score.txt", "a") 
            file.write('%d, %f\n' % (size, score))

if __name__ == '__main__':
    MPI_temp()      