In [1]:
import os
os.environ["MKL_THREADING_LAYER"] = "GNU"
import numpy as np
import pandas as pd
import seaborn as sns
import pymc3 as pm
import scipy.stats as ss
from sampled import sampled
import theano
import theano.tensor as tt
import pickle
theano.config.exception_verbosity='high'

#disable logging temporarily
import logging
logger = logging.getLogger("pymc3")
logger.propagate = False

In [2]:
class LustrePartition:
    """Bernoulli health variables for one Lustre partition, added to a shared PyMC3 model.

    Each partition contributes, inside ``model``:

    * ``<name>_oss``: one Bernoulli per OSS server (success probability ``s_prob``),
    * ``<name>_ost``: one Bernoulli per OST (success probability ``d_prob``),
    * ``<name>_mds``: a single Bernoulli for the MDS server (``s_prob``).

    OSS servers are grouped in consecutive pairs (0-1, 2-3, ...). ``ha_pairs``
    is a ``(1, num_osts)`` tensor whose entry ``i`` is 1 only when OST ``i`` is
    up and at least one OSS of its pair is up.
    """

    def __init__(self, name, num_osts, s_prob, d_prob, model):
        """Create the partition's random variables inside ``model``.

        Args:
            name: prefix used for the PyMC3 variable names.
            num_osts: number of OSTs; the same number of OSS servers is created.
                Must be non-negative and even because servers are paired.
            s_prob: Bernoulli ``p`` for the OSS and MDS servers.
            d_prob: Bernoulli ``p`` for the OSTs.
            model: the ``pm.Model`` the variables are registered in.

        Raises:
            ValueError: if ``num_osts`` is negative or odd. Without this check
                the last server of an odd-sized partition would look up a
                partner index that does not exist.
        """
        # Fail fast, before touching the model, so a bad size never leaves
        # half-registered variables behind.
        if num_osts < 0 or num_osts % 2 != 0:
            raise ValueError(
                "num_osts must be a non-negative even number (OSS servers are "
                "paired), got %r" % (num_osts,)
            )

        self.name = name
        self.num_osts = num_osts
        self.num_oss_servers = num_osts
        self.num_mds = 1

        # topology (filled in by the owner of the model)
        self.net1_paths = None
        self.net2_paths = None

        # potential (filled in when observations are applied)
        self.net1_potential = None
        self.net2_potential = None

        #pm.Beta('c_prob', alpha=10.0, beta =0.1)
        with model:
            self.oss_servers = pm.Bernoulli(name + '_oss', p = s_prob, shape = self.num_oss_servers)
            self.osts = pm.Bernoulli(name + '_ost', p = d_prob, shape = self.num_osts)
            self.mds_server = pm.Bernoulli(name + '_mds',   p = s_prob)
            # Start from a zero vector and fill each slot with the symbolic
            # availability expression of the corresponding HA path.
            self.ha_disk_paths = tt._shared(np.asarray([0.0]*self.num_oss_servers, dtype=theano.config.floatX))
            for ha_disk in range(self.num_oss_servers):
                self.ha_disk_paths = tt.set_subtensor(self.ha_disk_paths[ha_disk], self.__create_ha__(ha_disk))
            # Row vector so it can be the right-hand operand of a dot product
            # with a column of client/network states.
            self.ha_pairs = tt.reshape(self.ha_disk_paths, (1,self.num_oss_servers))

    def __create_ha__(self, ha_disk):
        """Return the symbolic availability of the path through OST ``ha_disk``.

        The two OSS servers of a pair act in parallel (either one suffices) and
        the OST is in series with them: ``(1 - (1-oss_a)*(1-oss_b)) * ost``.
        """
        # Even indices pair with the next server, odd indices with the previous one.
        partner = ha_disk + 1 if ha_disk % 2 == 0 else ha_disk - 1
        either_server_up = 1.0 - ((1.0-self.oss_servers[ha_disk]) *  (1.0-self.oss_servers[partner]))
        return either_server_up * self.osts[ha_disk]


class Kaleidoscope:
    """Joint PyMC3 model of clients, two networks and three Lustre partitions.

    ``__init__`` builds the model, ``infer`` attaches observation potentials
    and samples, and the ``print_health_stats*`` methods report every
    component whose posterior mean is low.
    """

    # Reporting thresholds applied to the posterior mean of each 0/1 variable.
    _COARSE_THRESHOLD = 0.8     # clients, networks, MDS: reported when mean <  0.8
    _STORAGE_THRESHOLD = 0.95   # OSS servers and OSTs:   reported when mean <= 0.95

    def __init__(self):
        """Create the PyMC3 model with all random variables and path tensors."""
        # pymc3 params
        self.num_draws = 100
        self.num_tune = 0
        self.num_cores = 10
        self.c_prob = 0.999
        self.s_prob = 0.999
        self.d_prob = 0.999
        self.n_prob = 0.99
        self.pm_model = pm.Model()
        self.health_status = None  # sampling trace, set by infer()

        # monitoring params
        self.num_clients = 6 # store ping clients

        # Blue Waters design
        self.lustre_partitions = {
            "scratch":  LustrePartition("scratch", 360, self.s_prob, self.d_prob, self.pm_model),
            "projects": LustrePartition("projects", 36, self.s_prob, self.d_prob, self.pm_model),
            "home":     LustrePartition("home", 36, self.s_prob, self.d_prob, self.pm_model)
        }
        # define lustre components
        with self.pm_model:
            # define networks
            self.ib_net = pm.Bernoulli('ib_net', p = self.n_prob)
            self.gem_net = pm.Bernoulli('gem_net', p = self.n_prob)

            # define clients
            self.clients = pm.Bernoulli("clients", p = self.c_prob, shape = self.num_clients)

            # All clients but the last use network 1; the last client is the
            # only one on network 2.
            last_client = self.num_clients - 1
            for store_partition in self.lustre_partitions.values():
                # define the topology network of storage system # c -> n (..) -> mds -> ha -> mdt
                # (clients x 1) column times (1 x osts) row gives one entry
                # per (client, OST) path.
                net1_column = tt.reshape(
                    self.clients[:last_client] * self.ib_net * store_partition.mds_server,
                    (last_client, 1)
                )
                store_partition.net1_paths = pm.math.dot(net1_column, store_partition.ha_pairs)

                # only one client connected on this network
                net2_column = tt.reshape(
                    self.clients[last_client] * self.ib_net * self.gem_net * store_partition.mds_server,
                    (1, 1)
                )
                store_partition.net2_paths = pm.math.dot(net2_column, store_partition.ha_pairs)

    def __apply_observations__(self, p_mat, data_success, data_fail):
        """Return the element-wise log score of the observations for ``p_mat``.

        With ``p`` the flattened ``p_mat`` and ``s``/``f`` the success/failure
        data, each viewed as a ``(1, N)`` row, the result is
        ``log((p*s + 1e-9) + (1-p)*f + 1e-9)``. The two ``1e-9`` terms keep
        the argument of the log strictly positive.

        NOTE(review): this is the log of a count-weighted sum rather than a
        binomial log-likelihood; confirm that is the intended scoring.
        """
        path_up = tt.reshape(pm.math.flatten(p_mat), (1, -1))
        path_down = tt.reshape(pm.math.flatten(1.-p_mat), (1,-1))
        successes = tt.reshape(tt._shared(data_success), (1, -1))
        failures = tt.reshape(tt._shared(data_fail), (1,-1))
        return tt.log(
            (path_up * successes + 1e-9) +
            path_down * failures + 1e-9
        )

    def infer(self, o_data):
        """Attach observation potentials for each partition and sample.

        Args:
            o_data: mapping of partition name to a dict with keys ``'d1_s'``,
                ``'d1_f'`` (network 1 success/failure data) and ``'d2_s'``,
                ``'d2_f'`` (network 2). Partitions unknown to this model are
                ignored, but their four keys must still be present.

        The resulting trace is stored in ``self.health_status``.
        """
        with self.pm_model:
            for partition in o_data:
                d1_s, d1_f, d2_s, d2_f = o_data[partition]['d1_s'], o_data[partition]['d1_f'], o_data[partition]['d2_s'], o_data[partition]['d2_f']
                # likelihood function
                if partition in self.lustre_partitions:
                    store_partition = self.lustre_partitions[partition]
                    store_partition.net1_potential = pm.Potential(partition + '_paths_net1', self.__apply_observations__(store_partition.net1_paths, d1_s, d1_f))
                    store_partition.net2_potential = pm.Potential(partition + '_paths_net2', self.__apply_observations__(store_partition.net2_paths, d2_s, d2_f))
            self.health_status = pm.sample(draws=self.num_draws, tune = self.num_tune, cores=self.num_cores, progressbar=False)

    @staticmethod
    def _posterior_mean(samples):
        """Average ``samples`` over the draw axis (axis 0).

        Returns a scalar for a 1-D array and one value per column otherwise.
        """
        return np.sum(samples, axis=0) / float(len(samples))

    def _print_unhealthy(self, prefix=()):
        """Print every component whose posterior mean is at or below its threshold.

        ``prefix`` is a tuple of values printed before each line (used to tag
        lines with a timestamp). Reads the trace stored by ``infer``.
        """
        trace = self.health_status

        def emit(label, value):
            print(*prefix, label, value)

        # client status
        client_means = self._posterior_mean(trace.get_values('clients'))
        for i in range(self.num_clients):
            if client_means[i] < self._COARSE_THRESHOLD:
                emit("client " + str(i) +  " :", client_means[i])

        # network status
        ib_status = self._posterior_mean(trace.get_values('ib_net'))
        if ib_status < self._COARSE_THRESHOLD:
            emit("ib_net ", ib_status)
        # Read the 'gem_net' variable here: reading 'ib_net' again would hide
        # every Gemini network failure.
        gem_status = self._posterior_mean(trace.get_values('gem_net'))
        if gem_status < self._COARSE_THRESHOLD:
            emit("gem ", gem_status)

        # partition status, only failed ones
        for partition, store_partition in self.lustre_partitions.items():
            mds_status = self._posterior_mean(trace.get_values(partition + '_mds'))
            if mds_status < self._COARSE_THRESHOLD:
                emit(partition + "_mds " +  " :", mds_status)

            oss_means = self._posterior_mean(trace.get_values(partition + '_oss'))
            for i in range(store_partition.num_oss_servers):
                if oss_means[i] <= self._STORAGE_THRESHOLD:
                    emit(partition + "_oss " + str(i) +  " :", oss_means[i])

            ost_means = self._posterior_mean(trace.get_values(partition + '_ost'))
            for i in range(store_partition.num_osts):
                if ost_means[i] <= self._STORAGE_THRESHOLD:
                    emit(partition + "_ost " + str(i) +  " :", ost_means[i])

    def print_health_stats(self):
        """Print the components that look unhealthy in the last trace."""
        self._print_unhealthy()

    def print_health_stats_trace(self, ts):
        """Same as ``print_health_stats`` but every line is prefixed with ``ts``."""
        self._print_unhealthy((ts,))
        

In [3]:
class DataTrace:
    """Replay recorded success/failure data through a fresh ``Kaleidoscope`` per timestamp.

    Data is read from ``<ddir>/<partition>/<trace>.success`` and
    ``<ddir>/<partition>/<trace>.failed`` pickle files.
    """

    def __init__(self, ddir):
        """Remember the data directory and set up the per-partition slots.

        Args:
            ddir: directory containing one sub-directory per partition.
        """
        self.num_clients_net1 = 5
        self.num_clients_net2 = 1
        # 'd_s'/'d_f' hold the loaded pickles; 'd1_*'/'d2_*' hold the
        # network-1 / network-2 slices for the timestamp being processed.
        self.lustre_partitions = {
            "scratch":  {"num_oss" : 360, 'd_s': None, 'd_f': None, "d1_s": None, "d1_f": None, "d2_s": None, "d2_f": None },
            "projects": {"num_oss" : 36, 'd_s': None, 'd_f': None, "d1_s": None, "d1_f": None, "d2_s": None, "d2_f": None },
            "home":     {"num_oss" : 36, 'd_s': None, 'd_f': None, "d1_s": None, "d1_f": None, "d2_s": None, "d2_f": None}
        }
        self.ddir = ddir

    @staticmethod
    def _load_pickle(path):
        """Load one pickle file, closing the file handle afterwards."""
        # SECURITY: pickle.load can execute arbitrary code; only point this
        # at trace files from a trusted source.
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    def run_trace(self):
        """Run inference for every timestamp of every trace found under ``scratch``.

        For each trace name the ``.success``/``.failed`` pickles of all
        partitions are loaded. For each timestamp of the scratch data, every
        partition's data is split into its network-1 and network-2 parts and
        a new ``Kaleidoscope`` is inferred and printed. Timestamps missing
        from any partition are skipped.
        """
        # Every trace exists as both "<name>.success" and "<name>.failed", so
        # the stripped names must be de-duplicated or each trace runs twice.
        # Sorting also makes the processing order deterministic.
        scratch_dir = self.ddir + os.sep + "scratch"
        trace_names = sorted({f.split(".")[0] for f in os.listdir(scratch_dir)})

        net1_clients = self.num_clients_net1
        all_clients = self.num_clients_net1 + self.num_clients_net2

        for f in trace_names:
            for partition in self.lustre_partitions:
                store_partition = self.lustre_partitions[partition]
                fname = self.ddir + os.sep + partition + os.sep + f
                store_partition['d_s'] = self._load_pickle(fname + ".success")
                store_partition['d_f'] = self._load_pickle(fname + ".failed")

            for ts in self.lustre_partitions['scratch']['d_s']:
                try:
                    for partition in self.lustre_partitions:
                        store_partition = self.lustre_partitions[partition]
                        num_oss = store_partition['num_oss']
                        # assumes each entry is a flat sequence laid out
                        # client-major, network-1 clients first, TODO confirm
                        successes = store_partition['d_s'][ts]
                        failures = store_partition['d_f'][ts]

                        net1_end = net1_clients * num_oss
                        net2_end = all_clients * num_oss
                        store_partition['d1_s'] = successes[0:net1_end]
                        store_partition['d1_f'] = failures[0:net1_end]
                        store_partition['d2_s'] = successes[net1_end:net2_end]
                        store_partition['d2_f'] = failures[net1_end:net2_end]
                except LookupError:
                    # ts is not present in one of the partitions: discard this
                    # timestamp. Narrower than a bare except so Ctrl-C and
                    # genuine bugs are no longer swallowed.
                    continue
                k = Kaleidoscope()
                k.infer(self.lustre_partitions)
                k.print_health_stats_trace(ts)

In [4]:
class Test:
    """Synthetic check: fabricate observations with one failed OST index and run inference.

    Constructing an instance builds the data for every partition, runs a
    ``Kaleidoscope`` inference on it and prints the unhealthy components.
    """

    def __init__(self, failed_componnet_id):
        """Generate the data, run inference and print the result.

        Args:
            failed_componnet_id: index of the component marked as failed in
                every partition.
        """
        self.num_clients_net1 = 5
        self.num_clients_net2 = 1
        self.failed_componnet_id = failed_componnet_id
        self.lustre_partitions = {
            "scratch":  {"num_oss" : 360, 'd1_s': None, 'd1_f': None, 'd2_s': None, 'd2_f': None},
            "projects": {"num_oss" : 36, 'd1_s': None, 'd1_f': None, 'd2_s': None, 'd2_f': None},
            "home":     {"num_oss" : 36, 'd1_s': None, 'd1_f': None, 'd2_s': None, 'd2_f': None}
        }
        for partition in self.lustre_partitions:
            store_partition = self.lustre_partitions[partition]
            store_partition['d1_s'], store_partition['d1_f'], store_partition['d2_s'], store_partition['d2_f'] = \
                self.__generate_test_data__(store_partition['num_oss'])
        k = Kaleidoscope()
        k.infer(self.lustre_partitions)
        k.print_health_stats()

    def __generate_test_data__(self, num_oss):
        """Build dummy success/failure data with one failed component.

        Every (client, component) entry reports ``num_samples`` successes and
        ``eps`` failures, except the entries for ``self.failed_componnet_id``,
        where the two values are swapped.

        Args:
            num_oss: number of components per client.

        Returns:
            ``(success_net1, fail_net1, success_net2, fail_net2)``: flat float
            arrays of length ``num_clients * num_oss``, laid out client-major
            (index ``client * num_oss + component``).
        """
        # dummy data of observations
        num_samples = 10
        eps = 1e-9

        def observations(num_clients, healthy_value, failed_value):
            # dtype=float is essential: an integer array would silently
            # truncate eps to 0 when it is written into the failed slot.
            grid = np.full((num_clients, num_oss), healthy_value, dtype=float)
            grid[:, self.failed_componnet_id] = failed_value
            return grid.ravel()

        dummy_data_success_net1 = observations(self.num_clients_net1, num_samples, eps)
        dummy_data_fail_net1 = observations(self.num_clients_net1, eps, num_samples)
        dummy_data_success_net2 = observations(self.num_clients_net2, num_samples, eps)
        dummy_data_fail_net2 = observations(self.num_clients_net2, eps, num_samples)
        return dummy_data_success_net1, dummy_data_fail_net1, dummy_data_success_net2, dummy_data_fail_net2

In [5]:
# dt = DataTrace( "/gpfs/gpfs0/home/sjha8/projects/klscope/data_mat")
# dt.run_trace()

In [6]:
%timeit
# failing ost 3 on each partition
testgen = Test(3)

Only 100 samples in chain.
Multiprocess sampling (10 chains in 10 jobs)
BinaryGibbsMetropolis: [scratch_oss, scratch_ost, scratch_mds, projects_oss, projects_ost, projects_mds, home_oss, home_ost, home_mds, ib_net, gem_net, clients]
The rhat statistic is larger than 1.05 for some parameters. This indicates slight problems during sampling.
The estimated number of effective samples is smaller than 200 for some parameters.


scratch_ost 3 : 0.002
projects_ost 3 : 0.005
home_ost 3 : 0.007
