# Use MPI and Scalapy to distribute all of MICA workflow to work on multiple nodes

prep.py component \
Read input file and slice into manageable sizes

In [1]:
from MICA.lib import utils

In [2]:
#from IPython import parallel
import ipyparallel as ipp

# Launch an ipython parallel cluster
Run this on hpc node to launch a cluster with mpi engines

In [3]:
#this currently needs to be launched from terminal

#import subprocess 
#We need to launch an ipython parallel cluster

#!ipcluster start --n=4
#subprocess.Popen(['ipcluster','start','--n=4'])
#subprocess.Popen(['ipcluster', 'start', '--engines=MPIEngineSetLauncher', '--log-level', 'DEBUG', '--n=4'])


In [4]:
#!ipcluster start --engines=MPIEngineSetLauncher --log-level DEBUG --n=4 &

In [5]:
# Create a parallel client so that we can use %%px cell magic
# With rc and dview, we can interact between mpi ranks and the thread running this notebook
rc = ipp.Client()
dview = rc[:]
rc.ids

[0, 1, 2, 3]

In [6]:
%%px
#load all necessary libraries onto each rank

from scipy.sparse import csr_matrix
from mpi4py import MPI
import sys
import numba
import pandas as pd
import scanpy as sc
import scipy as sci
import numpy as np
import anndata
import time
from sklearn.decomposition import PCA
import fast_histogram
import logging
logging.basicConfig(level=logging.INFO)
from MICA.lib import utils

In [7]:
%%px

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
name = MPI.Get_processor_name()
sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n" 
    % (rank, size, name))

[stdout:0] Hello, World! I am process 0 of 4 on noderome152.
[stdout:1] Hello, World! I am process 1 of 4 on noderome152.
[stdout:2] Hello, World! I am process 2 of 4 on noderome152.
[stdout:3] Hello, World! I am process 3 of 4 on noderome152.


## Test this code on the terminal (outside of jupyter)
The following cell creates a python script and then we run it via command line on the terminal

In [8]:
%%px
#this is a defined elsewhere as a standalone python executable
#we are duplicating here to experiment with format for distrubuted computing
def prep_dist(input_file, out_name, slice_unit):
    """ Preprocess input file to create sliced matrices.

    Reads file into HDF5-format, adds parameters, slices data in file, generates several files with
    different combinations of slices.

    Args:
        input_file (str): path to input text-file
        out_name   (str): common rootname of generated output files
        slice_unit (int): size of each slice of cell data in input text-file
    """
    logging.basicConfig(level=logging.INFO)
    
    #Read in whole file stored in anndata csr format
    adf=utils.read_anndata_file(input_file)
    #if adf==None :
    #    raise Exception("Input file ",input_file," not found.")
        
    print("initial data size=",adf.shape)

    #anndata object can be split easily by subsetting

    #patch_file
    """ Prepares the HDF5 file for slicing. Completes the "temporary" HDF5-format file.
    Reads input file into several data frames. Indexes attributes as row, col and slice.
    Indexes columns and rows of data. Converts all data frames into an output HDF5 file
    with separate keys for each piece of data.
    Args:
        df_file       (str): path to HDF5-format file
        out_file_name (str): path to complete HDF5-format file
    """
    #patch adds parameters to anndata object
    #ceb I'm not sure what value the patch function adds
    
    # prepare h5 files (whole)
    #h5_tmp = out_name + ".h5ad.tmp"
    #utils.patch_anndata_csr_file(h5_tmp, out_name)

    #adata_sub = utils.patch_anndata_csr_file(adf, out_name)

    #os.remove(h5_tmp)
    
    #def patch_file(df_file,  out_file_name):
    #df = pd.HDFStore(df_file)["slice_0"]
    #df.to_hdf(out_file_name + ".whole.h5", "slice_0")
    #pd.DataFrame(data=np.array(df.shape + (1,)), index=["row", "col", "slice"]).to_hdf(
    #    out_file_name + ".whole.h5", "attr"
    #)
    #pd.DataFrame(data=df.columns, index=df.columns).to_hdf(
    #    out_file_name + ".whole.h5", "cols"
    #)
    #pd.DataFrame(data=df.index, index=df.index).to_hdf(
    #    out_file_name + ".whole.h5", "rows"
    #)    
    
    
    #Slice_file
    #def slice_file(df_file,  out_file_name, slice_size="1000"):
    """ Slices the HDF5 file.
    Determines the number of slices for the data, based on slice_size.
    Calculates start and end indices for slicing, creating a new dataframe
    based on those indices and appends them to the sliced output file using
    a unique-identifier in the format 'slice_00x' as key.
    Args:
        df_file       (str): path to HDF5-format file
        out_file_name (str): path to sliced output HDF5 file
        slice_size    (str): number of items in each slice
    """
    
    #h5_whole = out_name + ".whole.h5ad"
    slice_size = int(slice_unit)
    
    #df = pd.HDFStore(df_file)["slice_0"]
    
    #compute number of slices needed to break dataset in to slice_size row blocks
    b = int(np.ceil(float(adf.shape[0]) / float(slice_size)))
    #determine how many digits are in b so we can pad spaces for the string output
    digit = int(np.floor(np.log10(b)) + 1)
    #loop over slice numbers
    for i in range(b):
        #slice name is equal to batch index
        slice_name = str(i).zfill(digit)
        #compute batch row indices
        start = i * slice_size
        end = np.min([(i + 1) * slice_size, adf.shape[0]])
        #copy slice to array of slices
        adf_sub=adf[start:end,:]
        #write to file so we don't have to keep each slice in memory
        output_file_name = out_name + ".slice_" + slice_name +".h5ad"
        print("output_file_name: ",output_file_name)
        adf_sub.write(output_file_name)
        
    #return nrows and nslices
    return adf.shape[0], adf.shape[1], b
    
    #ceb this function creates pairwise block comparison files for each b(b-1)/2 block comparisons to be done.
    #calc_mi function is called to run independently on each of these comparison files.
    #We can replace this if we use mpi to spawn ranks and then have each rank perform one or more of these block comparisons.
    # The mpi rank will read the two files that it needs, so we can reduce the number of files generated
    # We can repurpose the following loop to direct the matrix file to the mpi ranks
    

  
    #in_.close()    
    #logging.info('MICA-prep step completed successfully.')

In [9]:
%%px
#ceb create csr version of numba_histogram2d, also compute_bin with knowledge that minx will always be zero

@numba.jit(nopython=True)
def numba_nan_fill(x):
    shape = x.shape
    x = x.ravel()
    x[np.isnan(x)] = 0.0
    x = x.reshape(shape)
    return x

@numba.jit(nopython=True)
def numba_inf_fill(x):
    shape = x.shape
    x = x.ravel()
    x[np.isinf(x)] = 0.0
    x = x.reshape(shape)
    return x

@numba.jit(nopython=True, fastmath=True)
def compute_bin_upperbound(x, max, num_bins):
    """ Compute bin index for a give number.
        Assume that min is always zero
    """
    # special case to mirror NumPy behavior for last bin
    if x == max:
        return num_bins - 1 # a_max always in last bin

    bin = int(num_bins * x / max)

    if bin >= num_bins:
        return None
    else:
        return bin

@numba.jit(nopython=True, fastmath=True)
def numba_histogram2d_csr(arr1, cols1, arr2, cols2, ncols, num_bins):
    """ Compute the bi-dimensional histogram of two data samples.
    Args:
        arr1 (array_like, shape (N,)): An array containing the x coordinates of the points to be histogrammed.
        arr2 (array_like, shape (N,)): An array containing the y coordinates of the points to be histogrammed.
        num_bins (int): int
    Return:
        hist (2D ndarray)
    """
    #for csr arrays we have to compute zero bins ahead of time 
        
    bin_indices1 = np.zeros((ncols,), dtype=np.int16)
    max1 = arr1.max()
    #note that bin_indices has same size/indices as full array x and y
    for i, x in enumerate(arr1.flat):
        #assume zero min
        bin_indices1[cols1[i]] = compute_bin_upperbound(x, max1, num_bins)

    bin_indices2 = np.zeros((ncols,), dtype=np.int16)
    max2 = arr2.max()
    for i, y in enumerate(arr2.flat):
        #assume zero min
        bin_indices2[cols2[i]] = compute_bin_upperbound(y, max2, num_bins)

    hist = np.zeros((num_bins, num_bins), dtype=np.int16)
    for i, b in enumerate(bin_indices1):
        hist[b, bin_indices2[i]] += 1
        
    return hist

In [10]:
%%px
#import numba
@numba.jit(nopython=True, fastmath=True)
def numba_calc_mi_dis_csr(arr1, cols1, arr2, cols2, bins, m):
    """ Calculates a mutual information distance D(X, Y) = H(X, Y) - I(X, Y) using bin-based method

    It takes gene expression data from single cells, and compares them using standard calculation for
    mutual information and joint entropy. It builds a 2d histogram, which is used to calculate P(arr1, arr2).

    Args:
        arr1 (float) nparray of csr: gene expression data for cell 1
        cols1 (int): column indices for csr arr1
        arr2 (float) nparray of csr: gene expression data for cell 2
        cols2 (int): column indices for csr arr
        bins           (int): number of bins
        m              (int): number of genes
    Returns:
        a float between 0 and 1
    """
    hist = numba_histogram2d_csr(arr1, cols1, arr2, cols2, m, bins)
    
    sm = np.sum(hist, axis=1)
    tm = np.sum(hist, axis=0)
    sm = sm / float(sm.sum())
    tm = tm / float(tm.sum())

    sm_tm = np.zeros((bins, bins), dtype=np.float32)
    for i, s in enumerate(sm):
        for j, t in enumerate(tm):
            sm_tm[i, j] = s * t

    fq = hist / float(m)
    div = np.true_divide(fq, sm_tm)
    numba_nan_fill(div)
    ent = np.log(div)
    numba_inf_fill(ent)
    agg = np.multiply(fq, ent)
    joint_ent = -np.multiply(fq, numba_inf_fill(np.log(fq))).sum()
    return joint_ent - agg.sum()

In [11]:
%%px
#numba compilation cannot interpret the creation of a 2d array inside of this function so we pass in and out SM_block instead of returning it
#import numba
@numba.jit(nopython=True, fastmath=True)
def process_matrices(Arows,Amat_data,Amat_indptr,Amat_indices,
                     Brows,Bmat_data,Bmat_indptr,Bmat_indices,
                     num_bins,num_genes,
                     SM_block, symmetric=False):
    #(mat1.n_obs,mat1.X.data,mat1.X.indptr,mat1.X.indices, mat2.n_obs,mat2.X.data,mat2.X.indptr,mat2.X.indices,SM,num_bins,mat2.n_vars,symmetric)

    #Arows = mat1.n_obs
    #Brows = mat2.n_obs
    #num_genes = mat1.n_vars
    
    #SM_block = np.ndarray(shape=(Arows,Brows))#, dtype=float, order='F')
    #SM_block = SM_block.reshape(Arows,Brows)
    for i in range(Arows):
        Arowstart = Amat_indptr[i]
        Arowend   = Amat_indptr[i+1]
        Arow_cols = Amat_indices[Arowstart:Arowend]
        Arow_data = Amat_data[Arowstart:Arowend]

        Bstart=0
        Bend=Brows
        #if(symmetric):Bstart=i #upper triangluar
        if(symmetric):Bend=i+1 #lower triangular
        for j in range(Bstart,Bend): 
            Browstart = Bmat_indptr[j]
            Browend   = Bmat_indptr[j+1]
            Brow_cols = Bmat_indices[Browstart:Browend]
            Brow_data = Bmat_data[Browstart:Browend]               
            SM_block[i,j] = numba_calc_mi_dis_csr(Arow_data, Arow_cols, Brow_data, Brow_cols, num_bins, num_genes)
            #SM_block[i*Arows+j] = numba_calc_mi_dis_csr(Arow_data, Arow_cols, Brow_data, Brow_cols, num_bins, num_genes)

            
    return #SM_block


In [12]:
%%px
#from mpi4py import MPI
from scipy.sparse import csr_matrix
#import sys
import pandas as pd

#create a 2d list to hold blocks of similarity matrix
#this should be a global var
#SM = [[None for j in range(b)] for i in range(b)] 

def calc_distance_metric_distributed(in_file_path, project_name, nrows, ncols, nslices, SM):
    
    """ Prepares the already sliced input file for further calculation in MICA.
    Enters pairs of slices (matrices) into temporary HDF5-format files. It enters them
    individually, using their unique key. It also enters the parameter data for every single 
    pair into the key "params", which consists of: [key1, key2, num_bins, num_genes,
    pair_index, project_name, num_slices]
    Args:
        in_file      (str): path to sliced HDF5-format file
        project_name (str): project name used to generate path for final outputs
        nrows : number of rows in global matrix
        ncols : number of vars in global matrix
    """

    
    #create a 2d list to hold blocks of similarity matrix
    #this should be a global var
    #SM = [[None for j in range(b)] for i in range(b)] 
    
    
    #input file is full input that has been segmented into b blocks of rows
    
    #nranks would ideally be equal to  b(b+1)/2
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    myrank = comm.Get_rank()
    name = MPI.Get_processor_name()
    #sys.stdout.write("Hello, World! I am process %d of %d on %s.\n" % (myrank, size, name))
    
    #in_ = pd.HDFStore(in_file, "r")  # slice.h5
    #b = int(np.ceil(float(nrows) / float(slice_size)))
    digit = int(np.floor(np.log10(nslices)) + 1)

    num_bins = int(np.floor(nrows ** (1 / 3.0)))  # number of bins
    #print("bins= ",bins)
    #b = in_["attr"].loc["slice", 0]  # number of sliced matrix
    b = nslices #number of row blocks (cells)
    m = ncols  # number of cols per row (genes)

    n_block_comparisons = int((b * (b + 1)) / 2)  # total number of row block comparisons needed to compute entire global similarity matrix
    n_jobs_per_rank = n_block_comparisons/size
    if (myrank == 0): print("block comparsons = %d. jobs per rank = %d\n" % (n_block_comparisons, n_jobs_per_rank))

    #build list of row block comparisons that current mpi rank will process
    myslices=[]
    for i in range(b):
        for j in range(i,b): # j in range [i,b]
            idx = int(i * b + j - (i * (i + 1)) / 2)
            targetrank = idx//n_jobs_per_rank
            if (targetrank == myrank): myslices.append((i,j))            

    #from list just generated, only do work assigned to your rank
    for index, tuple in enumerate(myslices):
        #print("tuple=",tuple)
        i=tuple[0] #block row
        j=tuple[1] #block col      

        #get 1st slice (row block) file
        slice_name = str(i).zfill(digit)
        ##ceb we only want to read this once per i,j combination
        input_file = in_file_path + project_name + ".slice_" + slice_name +".h5ad"
        #print("infile seg1: ",input_file)
        mat1 = utils.read_anndata_file(input_file)
    

        #get 2nd slice (row block) file
        slice_name = str(j).zfill(digit)
        input_file = in_file_path + project_name + ".slice_" + slice_name +".h5ad"
        #print("infile seg2: ",input_file)
        mat2 = utils.read_anndata_file(input_file)    

        #check to see if block comparison will result in a symmetric SM matrix
        # so that we can reduce the number of computations in half
        symmetric=False
        if i==j: symmetric=True
        
        print("rank: ",myrank," comparison between segs:",i," x ",j," symmetric=",symmetric)
            
        #compute distance metrics between row blocks
        
        Arows = mat1.n_obs
        Brows = mat2.n_obs
        num_genes = mat1.n_vars #we will assume Acols==Bcols==num_genes
        
        #need SM for each block pair    
        #creates local array of zeros and assigns to global 2d list
        #create matrix of zeros with row order indexing
        SM[i][j] = np.zeros(shape=(Arows,Brows), dtype = float, order = 'C')
        #create a 1d array for easy transfer via mpi
        #SM[i][j] = np.zeros(shape=(Arows*Brows), dtype = float, order = 'C')

 
        #This numba function cannot create a numpy array internally so we return SM[i,j] as a variable
        process_matrices(mat1.n_obs,mat1.X.data,mat1.X.indptr,mat1.X.indices, 
                         mat2.n_obs,mat2.X.data,mat2.X.indptr,mat2.X.indices,
                         num_bins, num_genes, SM[i][j],
                         symmetric #if i==j we can eliminate half of computations
                        )

        #convert to csr
        #we may want to assign this to scalapack distributed matrix here
        #SM[i][j]=csr_matrix(SM[i][j])

    return

In [13]:
%%px
import os
cwd=os.getcwd()
if rank==0:
    print(cwd)
    
data_file_path = cwd+'/test_data/inputs/10x/PBMC/3k/pre-processed/'
input_file_name = data_file_path + 'pbmc3k_preprocessed.h5ad'
project_name = 'pbmc3k'
output_file_name = data_file_path+project_name



[stdout:0] /research/rgs01/home/clusterHome/cburdysh/MICA_Project/MICA_distributed/MICA


In [14]:
%%px
#set slice size (max size of row blocks)
slice_size = 500

In [15]:
%%px
if rank==0:
    print (input_file_name)

[stdout:0] /research/rgs01/home/clusterHome/cburdysh/MICA_Project/MICA_distributed/MICA/test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k_preprocessed.h5ad


In [16]:
%%px
#Run prep.py only on one processor to create the slice files

g_nrows=0 #global number of rows (cells)
ncols=0
nslices=0
if rank==0: 
    g_nrows, ncols, nslices = prep_dist(input_file_name, output_file_name, slice_size)

# this uses the iparallel dview object to distribute these variables from the notebook thread to all mpi ranks
# but the ranks can't communicate back to the notebook thread
#dview.push(dict(nrows=nrows, ncols=ncols, nslices=nslices))

#broadcast resultant variables from root to the other ranks
g_nrows = comm.bcast(g_nrows, root=0)
ncols = comm.bcast(ncols, root=0)
nslices = comm.bcast(nslices, root=0)

[stdout:0] 
initial data size= (2496, 10499)
output_file_name:  /research/rgs01/home/clusterHome/cburdysh/MICA_Project/MICA_distributed/MICA/test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k.slice_0.h5ad
output_file_name:  /research/rgs01/home/clusterHome/cburdysh/MICA_Project/MICA_distributed/MICA/test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k.slice_1.h5ad
output_file_name:  /research/rgs01/home/clusterHome/cburdysh/MICA_Project/MICA_distributed/MICA/test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k.slice_2.h5ad
output_file_name:  /research/rgs01/home/clusterHome/cburdysh/MICA_Project/MICA_distributed/MICA/test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k.slice_3.h5ad
output_file_name:  /research/rgs01/home/clusterHome/cburdysh/MICA_Project/MICA_distributed/MICA/test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k.slice_4.h5ad


[stderr:0] 
  if not is_categorical(df_full[k]):
  if is_string_dtype(df[key]) and not is_categorical(df[key])


In [17]:
%%px
if rank==0:
    print("global nrows, ncols, slices: ",g_nrows, ncols, nslices)

[stdout:0] global nrows, ncols, slices:  2496 10499 5


Read in anndata preprocessed files (in distributed mode, by node number)

In [18]:
%%px
#create a 2d list to hold blocks of similarity matrix
#this should be stored in a distributed scalapack matrix
b=nslices #row blocks
SM = [[None for j in range(b)] for i in range(b)] 

start = time.time()
calc_distance_metric_distributed(data_file_path, project_name, g_nrows, ncols, nslices, SM)
end = time.time()
print("Elapsed = %s" % (end - start))


[stdout:0] 
block comparsons = 15. jobs per rank = 3

rank:  0  comparison between segs: 0  x  0  symmetric= True
rank:  0  comparison between segs: 0  x  1  symmetric= False
rank:  0  comparison between segs: 0  x  2  symmetric= False
rank:  0  comparison between segs: 0  x  3  symmetric= False
Elapsed = 35.57201290130615
[stdout:1] 
rank:  1  comparison between segs: 0  x  4  symmetric= False
rank:  1  comparison between segs: 1  x  1  symmetric= True
rank:  1  comparison between segs: 1  x  2  symmetric= False
rank:  1  comparison between segs: 1  x  3  symmetric= False
Elapsed = 36.69508695602417
[stdout:2] 
rank:  2  comparison between segs: 1  x  4  symmetric= False
rank:  2  comparison between segs: 2  x  2  symmetric= True
rank:  2  comparison between segs: 2  x  3  symmetric= False
rank:  2  comparison between segs: 2  x  4  symmetric= False
Elapsed = 36.95906114578247
[stdout:3] 
rank:  3  comparison between segs: 3  x  3  symmetric= True
rank:  3  comparison between segs: 3 

In [19]:
%%px
#Note diagonal is small. Should we expect diagonal to be unity since MI of X,X^T should be 1?
if rank==0:
    print( np.shape(SM[0][0]) )
    print( (SM[0][0]))



[stdout:0] 
(500, 500)
[[-5.15595322e-09  0.00000000e+00  0.00000000e+00 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 [ 6.57519216e-01  1.31353685e-08  0.00000000e+00 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 [ 6.16255376e-01  7.22905476e-01 -1.01818249e-08 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 ...
 [ 6.00543187e-01  6.95598780e-01  6.67865236e-01 ...  1.78994189e-08
   0.00000000e+00  0.00000000e+00]
 [ 5.21585065e-01  6.27581208e-01  5.83600189e-01 ...  5.78146991e-01
  -1.53148856e-08  0.00000000e+00]
 [ 5.47296983e-01  6.46476002e-01  6.09194393e-01 ...  6.03402223e-01
   5.24108924e-01  1.59023507e-08]]


## Create distributed matrix for scalapack and copy distributed blocks into object

In [20]:
%%px 
from scipy.sparse import csr_matrix #may not use csr as it complicates copy to distributed scalapack and is not used in scalapack apparently
import collections
for i in range(b):
    for j in range(i,b): # j in range [i,b]
        if isinstance(SM[i][j], collections.Iterable):
            #lm=SM[i][j]#.ravel()
            #print(SM[i][j])
            #B=csr_matrix(SM[0][0]) 
            #print("Rank:",rank, " SM[",i,"][",j,"] nnz=",SM[i][j].getnnz)
            #print("Rank:",rank, " SM[",i,"][",j,"] nnz=",SM[i][j])
            print("Rank:",rank, " SM[",i,"][",j,"] size=",np.shape(SM[i][j]))



[stdout:0] 
Rank: 0  SM[ 0 ][ 0 ] size= (500, 500)
Rank: 0  SM[ 0 ][ 1 ] size= (500, 500)
Rank: 0  SM[ 0 ][ 2 ] size= (500, 500)
Rank: 0  SM[ 0 ][ 3 ] size= (500, 500)
[stdout:1] 
Rank: 1  SM[ 0 ][ 4 ] size= (500, 496)
Rank: 1  SM[ 1 ][ 1 ] size= (500, 500)
Rank: 1  SM[ 1 ][ 2 ] size= (500, 500)
Rank: 1  SM[ 1 ][ 3 ] size= (500, 500)
[stdout:2] 
Rank: 2  SM[ 1 ][ 4 ] size= (500, 496)
Rank: 2  SM[ 2 ][ 2 ] size= (500, 500)
Rank: 2  SM[ 2 ][ 3 ] size= (500, 500)
Rank: 2  SM[ 2 ][ 4 ] size= (500, 496)
[stdout:3] 
Rank: 3  SM[ 3 ][ 3 ] size= (500, 500)
Rank: 3  SM[ 3 ][ 4 ] size= (500, 496)
Rank: 3  SM[ 4 ][ 4 ] size= (496, 496)


## Populate a global array with all of the MI data from each rank

Preferably, we would like each rank to contribute of their block MI matrices to the global matrix,
but currently the distributed global matrix has to be constructed from a global (not distributed) array

First, each rank loads it's matrix contribution into a global block matrix, \
And then we use MPI to transfer these contributions to rank 0 from all other ranks.

In [21]:
#We may be able to have each rank copy their data directly to the distributed global matrix (that we have yet to initialize) 
#so that we can avoid having to agglomerate matrix data on a single rank
#dA = dA.np2self(local_a, srow, scol, rank=0)

In [22]:
%%px
#copy SM[i][j] from each rank to root rank 0
n_jobs_per_rank=(int((b * (b + 1)) / 2))/comm.Get_size()
import collections
for i in range(b):
    for j in range(i,b): # j in range [i,b]
        #if SM[i][j]!=None:
        #if SM[i][j]!=0:
        idx = int(i * b + j - (i * (i + 1)) / 2)
        srank = idx//n_jobs_per_rank
        if rank!=0 and isinstance(SM[i][j], collections.Iterable):
            comm.send(SM[i][j],dest=0,tag=idx)
        elif rank==0 and not isinstance(SM[i][j], collections.Iterable):
            SM[i][j]=comm.recv( source=srank, tag=idx)
            

In [23]:
%%px
#Check to see of data was transferred to rank 0
if rank==0:
    print(np.shape(SM[4][4]))
    print(SM[4][4])
    

[stdout:0] 
(496, 496)
[[-1.08149925e-08  0.00000000e+00  0.00000000e+00 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 [ 6.18987683e-01 -1.94897120e-08  0.00000000e+00 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 [ 4.56650333e-01  4.91898546e-01 -2.23886729e-09 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 ...
 [ 5.30388228e-01  5.69486206e-01  3.70204082e-01 ...  1.50523602e-08
   0.00000000e+00  0.00000000e+00]
 [ 4.89730330e-01  5.22489321e-01  3.17985325e-01 ...  3.97571517e-01
  -2.86551806e-08  0.00000000e+00]
 [ 5.45617907e-01  5.72776800e-01  4.09006844e-01 ...  4.85972394e-01
   4.41741719e-01  2.10293315e-08]]


In [24]:
%%px
#For examples: https://www.programmersought.com/article/9237705297/
#git clone https://github.com/jrs65/scalapy.git
#import scalapy
#from scalapy import blacs
from scalapy import *

In [None]:
#%%px
#from inspect import getmembers, isfunction, ismodule
#if rank == 0:
#    print([o[0] for o in getmembers(scalapy) if ismodule(o[1])])

In [None]:
#%%px
#from inspect import getmembers, isfunction, ismodule
#import scalapy
#if rank==0:
#    print(getmembers(scalapy.blacs, isfunction))
#    #print(getmembers(scalapy.routines, isfunction))

In [25]:
%%px
#total number of global blocks = total number of block comparisons
#Should be greater than number of ranks to improve load balancing
# b is number of slices (blocks of rows) original data has been discretized into.
#The size of these blocks can be variable
global_number_of_matrix_blocks= int((b * (b + 1)) / 2) 

#We'll use a 2d process grid to distribute blocks so we want to have num_ranks divisivle by 2

In [None]:
#bcast SM[i][j] from each rank to root rank 0 so that we can load global matrix array


## Convert array into distributed block cyclic global matrix
Simply load a numpy vector with data from each block matrix to use in scalapak distributed matrix initialization

In [26]:
%%px
import collections

#create global 2d array to store MI info which will be sent to create block cyclic distributed matrix
#I would like to know how to create this matrix without having to have the memory of a global array on each rank
global_MI=np.zeros([g_nrows,g_nrows])

blocksize=slice_size
if rank==0:
    #load rank data into global matrix
    for i in range(b):
        for j in range(i,b): # j in range [i,b]
            #if SM[i][j]!=None:
            if isinstance(SM[i][j], collections.Iterable):
                lA = SM[i][j]
                #print( "rank=",rank,"SM[",i,",",j," shape=",np.shape(lA) )
                lr=np.shape(lA)[0]
                lc=np.shape(lA)[1]
                for ii in range(lr):
                    for jj in range(lc):
                        global_MI[i*blocksize+ii, j*blocksize+jj] = lA[ii,jj]


In [27]:
%%px
if rank==0:
    print(global_MI[0,0])
    print(global_MI[2495,2495])

[stdout:0] 
-5.155953219926346e-09
2.1029331498390036e-08


In [28]:
%%px
from scalapy import blacs
import os
import numpy as np
import scipy.linalg as la
from mpi4py import MPI
from scalapy import core
import scalapy.routines as rt

#distribute MI components to ranks as scalapack distributed matrix
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size #total number of ranks

global_num_rows =g_nrows
global_num_cols =g_nrows
local_num_rows =g_nrows/b

block_size=64 #default is 32
#Define process grid with process rows and process cols
#We'll use a 2d process grid to distribute blocks so we want to have num_ranks divisible by 2
assert((size % 2)==0)
#ideally we would like BR and BC to the square root of the num_ranks to get a square process matrix
PR=int(np.sqrt(size))
PC=PR
#if we can't create a square matrix, get next best dimensions
if PR*PR!=size:
    PC=size//PR
if rank==0:
    print("PR=",PR, "PC=",PC)

#create mpi comm context to send to blacs communicator    
#process context
cntxt = blacs.sys2blacs_handle(comm)

#initialize a process grid to distribute matrix blocks to.
#blacs.gridinit(ctxt,order='R',BR,BC)

#blacs.gridinit(pc,order='R',BR,BC)
#core.initmpi([PR, PC], block_shape=[local_num_rows,local_num_rows])
core.initmpi([PR, PC],block_shape=[block_size,block_size])

#instead of creating a global array on every processor, we want to load a 
#global array with local data
#MI=core.DistributedMatrix(global_shape=[global_num_rows,global_num_cols])
#global_MI=np.zeros([g_nrows,g_nrows])
#dMI=core.DistributedMatrix.from_global_array(global_MI,rank=0)

#convert to fortran array indexing to match scalapack functions
global_MI=np.asfortranarray(global_MI)
dMI=core.DistributedMatrix.from_global_array(global_MI)


print ('rank %d has global_shape of dMI = %s' % (rank, dMI.global_shape))
print ('rank %d has local_shape of dMI = %s' % (rank, dMI.local_shape))
print ('rank %d has block_shape of dMI = %s' % (rank, dMI.block_shape))


#blacs.gridinit(ctxt, b, b)
#ranklist = [(0, 0), 
#            (0, 1), (1, 1),
#            (0, 2), (1, 2), (2, 2),
#            (0, 3), (1, 3), (2, 3), (3, 3),
#            (0, 4), (1, 4), (2, 4), (3, 4), (4, 4)
#           ]

[stdout:0] 
PR= 2 PC= 2
rank 0 has global_shape of dMI = (2496, 2496)
rank 0 has local_shape of dMI = (1280, 1280)
rank 0 has block_shape of dMI = (64, 64)
[stdout:1] 
rank 1 has global_shape of dMI = (2496, 2496)
rank 1 has local_shape of dMI = (1280, 1216)
rank 1 has block_shape of dMI = (64, 64)
[stdout:2] 
rank 2 has global_shape of dMI = (2496, 2496)
rank 2 has local_shape of dMI = (1216, 1280)
rank 2 has block_shape of dMI = (64, 64)
[stdout:3] 
rank 3 has global_shape of dMI = (2496, 2496)
rank 3 has local_shape of dMI = (1216, 1216)
rank 3 has block_shape of dMI = (64, 64)


In [None]:
#We want to preserve this result by writing to file using MPI_IO
#mpi_writematrix(fname, local_array, comm, gshape, dtype,
#                    blocksize, process_grid, order='F', displacement=0):

## Compute Frobenius norm of distributed matrix

In [34]:
%%px
from scalapy import lowlevel as ll
#norm mutualinfomation matrix

#"""Normalizes mutual information metric in the merged matrix
#    
#    Args:
#        in_mat_file   (str): path to merged matrix
#        out_file_name (str): name of output file
#    """
#    #hdf = pd.HDFStore(in_mat_file)
#    #df = hdf["mi"]
#    #diag = np.asmatrix(np.diag(df))
#    #if in_mat_file == out_file_name + "_dist.h5":
#    #    hdf.put("norm_mi", df / np.sqrt(np.matmul(diag.T, diag)))
#        
#    #get diag
    
#    #compute dot product of diag
#A2 = scalapy.dot(D, D, transA='T')    
    #divide MI matrix by dot product of diag
    
#Some functions like plansy are fortran functions and not subroutines. Currently only subroutines are parsed by the scalapy conversion scripts
#So we might have to compute things like Norm manually until we create a new parser to handle conversion of fortran functions

#PDLANGE( 'F', M, N, A, IA, JA, DESCA, WORK ) #compute frobenius norm
#PCLANSY( 'F', UPLO, N, A, IA, JA,DESCA, WORK ) #compute frobenius norm on triangular matrix
#info = ll.pslansy
if rank==0:
    print(dir(ll))
    
 

[stdout:0] ['WorkArray', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '_call_routine', '_doc_pblas', '_doc_redist', '_doc_scl', '_encode_strings', '_expand_dm', '_expand_work', '_mod_dict', '_pblas', '_redist', '_scl', '_wrap_routine', 'bdlaexc', 'bdtrexc', 'bslaexc', 'bstrexc', 'cdbtf2', 'cdbtrf', 'cdttrf', 'cdttrsv', 'clahqr2', 'clamsh', 'clanv2', 'claref', 'core', 'cpttrsv', 'csteqr2', 'ddbtf2', 'ddbtrf', 'ddttrf', 'ddttrsv', 'division', 'dlamsh', 'dlapst', 'dlar1va', 'dlaref', 'dlarrd2', 'dlarre2', 'dlarre2a', 'dlarrf2', 'dlarrv2', 'dlasorte', 'dlasrt2', 'dpttrsv', 'dstegr2', 'dstegr2a', 'dstein2', 'dsteqr2', 'expand_args', 'np', 'pblas', 'pcagemv', 'pcahemv', 'pcamax', 'pcatrmv', 'pcaxpy', 'pccopy', 'pcdbsv', 'pcdbtrf', 'pcdbtrs', 'pcdbtrsv', 'pcdotc', 'pcdotu', 'pcdtsv', 'pcdttrf', 'pcdttrs', 'pcdttrsv', 'pcgbsv', 'pcgbtrf', 'pcgbtrs', 'pcgeadd', 'pcgebd2', 'pcgebrd', 'pcgecon', 'pcgeequ', 'pcgehd2', 'pcgehr

In [None]:
#compute eigenvalues,vectors
#evals1, evecs1 = scalapy.eigh(dMI, overwrite_a=False)

In [None]:
%%px
##test blacs
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
from scalapy import blacs
ctxt = blacs.sys2blacs_handle(comm)
blacs.gridinit(ctxt, 2, 2)
ranklist = [(0, 0), (0, 1), (1, 0), (1, 1)]
gi = blacs.gridinfo(ctxt)
gshape = gi[:2]
gpos = gi[2:]
assert gshape == (2, 2)
assert gpos == ranklist[rank]

In [None]:
%%px
##test scalapay on distributed matrix
 
import os
import numpy as np
import scipy.linalg as la
from mpi4py import MPI
from scalapy import core
import scalapy.routines as rt
 
 
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
 
if size != 4:
    raise Exception("Must run with 4 processes")
 
# define a function to compare whether two arrays are equal
allclose = lambda a, b: np.allclose(a, b, rtol=1e-4, atol=1e-6)
 
# initialize a global ProcessContext object,
# which includes the initialization of a 2 x 2 process grid
core.initmpi([2, 2], block_shape=[16, 16])

 
N = 300
# create a N x N numpy array with random numbers
gA = np.random.standard_normal((N, N)).astype(np.float64)
gA = np.asfortranarray(gA)
# create a DistributedMatrix from gA
dA = core.DistributedMatrix.from_global_array(gA, rank=0)
print ('rank %d has global_shape of dA = %s' % (rank, dA.global_shape))
print ('rank %d has local_shape of dA = %s' % (rank, dA.local_shape))
print ('rank %d has block_shape of dA = %s' % (rank, dA.block_shape))
 
# compute the inverse of dA
invA, ipiv = rt.inv(dA)
# convert to a global numpy array hold by rank 0 only
ginvA = invA.to_global_array(rank=0)
 
if rank == 0:
    # compare the result with that of scipy.linalg.inv
    print ('result equals that of scipy: ', allclose(ginvA, la.inv(gA)))
 
# write dA to file
file_name = 'dA.dat'
dA.to_file(file_name)
# now read it from file and check it equals the original DistributedMatrix
dA1 = core.DistributedMatrix.from_file(file_name, dA.global_shape, dA.dtype, dA.block_shape, dA.context)
print ('rank %d has dA.local_array == dA1.local_array: %s' % (rank, allclose(dA.local_array, dA1.local_array)))
 
# remove the file
if rank == 0:
    os.remove(file_name)

## Write distributed matrix to files

In [None]:
%%px 
from scipy.sparse import csr_matrix
if comm.Get_rank() == 0:
    SM_csr=csr_matrix(SM[0][0],(1000,1000),shape=(1000, 1000))


In [None]:
%%px
#show that diagonal blocks are symmetric
if comm.Get_rank() == 0:
    i=1
    j=100
    print((SM[0][0])[i,j])
    print((SM[0][0])[j,i])

In [None]:
#adata_sub1=adata[0:1000,:]
#adata_sub2=adata[1000:2000,:]
#adata_sub3=adata[2000:,:]

#print(adata_sub1.shape)
#print(adata_sub2.shape)
#print(adata_sub3.shape)

In [None]:
#We can split the anndata object by rows in the following way:
#print(adata.X.indptr)
#print(adata_sub1.X.indptr)
#print(adata_sub2.X.indptr)
#print(adata_sub3.X.indptr)

In [None]:
#Convert to dataframe for comparison
aframe = adata.to_df()
bframe = bdata.to_df()
aframe.shape

In [None]:
#print(frame.iloc[2000,:])
#(frame == 0).astype(int).sum(axis=1)
#print(frame[frame == 0].count(axis=1)/len(frame.columns))

In [None]:
#frame.to_csv('../../test_data/outputs/kgraph/pmbc3k.csv')

In [None]:
def calc_distance_mat_dist(mat1, mat2, paras, method):
    """ Calculates a distance metric in between two matrices (slices)

    Calculates a distance metric using the preferred method of comparison. Iterates over each cell's
    gene expression data and populates a new matrix with rows and columns as cells from the input
    matrices. The resulting matrix is then converted to an HDF5-format file.

    Args:
        mat1  (anndata dataframe): a sliced part of the original matrix, with some fraction of the
                                  total cells as rows from original file and all gene expression
                                  attributes as columns
        mat2  (anndata dataframe): similar to mat1
        paras (anndata dataframe): a dataframe that holds an array of parameters from the whole dataset
        method             (str): the method to be used for the distance calculation (
                                        mutual information: "mi")
    """

    bins = int(paras.loc["num_bins", 0])
    m = int(paras.loc["n_genes", 0])
    key = paras.loc["MI_indx", 0]

    project_name = paras.loc["project_name", 0]
    out_file_name = project_name + "_" + key + ".h5"
    print(out_file_name)

    if method == "mi":

        df = pd.DataFrame(data=0, index=mat1.index, columns=mat2.index) 
        start = time.time()
        
        for c in mat2.index:
            df.loc[mat1.index, c] = mat1.apply(
                calc_mi, axis=1, args=(mat2.loc[c, :], bins, m)
            )
            
        end = time.time()

    else:
        sys.exit("Distance Metrics not supported!\n")

        
    df.to_hdf(out_file_name, str(key))  # key: MI_indx
    paras.to_hdf(out_file_name, "params")


In [None]:
    #tmp hardcode input file
    inputfileA = '../../test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k_preprocessed.h5ad'
    inputfileB = '../../test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k_preprocessed.h5ad'

    #For given rank id, read in A and B matrix slices 
    matA = anndata.read_h5ad(inputfileA)
    matB = anndata.read_h5ad(inputfileB)
    metrics = metric.lower()
    params = matA["params"]

In [None]:
#reproducing standalone calc_scatter function here to experiment with distributed processing
def calc_scatter(input_file, metric='mi'):
    """ Calls calc_distance_mat utility function and calculates a metric in between cells that is chosen by the user

    Args:
        input_file (str): path to input HDF5-format file
        metric     (str): metric for calculation (mutual info, euclidean dist, pearson or spearman correlations
    """

    #mat = pd.HDFStore(input_file)
    #metrics = metric.lower()
    #params = mat["params"]
    #mat1 = mat[params.loc["key1", 0]]
    #mat2 = mat[params.loc["key2", 0]]
    #mat.close()
    
    #ceb set up an MPI rank list and coordinator to read in appropriate matrix slice files, 
    # compute distance matrix blocks and store results in distributed distance matrix

    #tmp hardcode input file
    inputfileA = '../../test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k_preprocessed.h5ad'
    inputfileB = '../../test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k_preprocessed.h5ad'

    #For given rank id, read in A and B matrix slices 
    matA = anndata.read_h5ad(inputfileA)
    matB = anndata.read_h5ad(inputfileB)
    metrics = metric.lower()
    params = matA["params"]

    #utils.calc_distance_mat(matA, matB, params, method=metrics)
    calc_distance_mat_distributed(matA, matB, params, method=metrics)

In [None]:
inputfile = '../../test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k_preprocessed.h5ad'
calc_scatter(inputfile,'mi')

In [None]:
#%memit
start = time.time()
embedding = PCA(n_components=100)
frame_dr = embedding.fit_transform(frame)
frame_dr.shape
end = time.time()
runtime = end - start
msg = "The runtime for PCA took {} seconds to complete".format(runtime)
logging.info(msg)



In [None]:
%load_ext memory_profiler

In [None]:
def calc_mi_f(arr1, arr2, bins, m):
    """ Calculates mutual information in between two cells, considering their gene expression levels

    This function is called by calc_distance_mat. It takes gene expression data from single cells,
    and compares them using standard calculation for mutual information. It builds a 2d histogram,
    which is used to calculate P(arr1, arr2)

    Args:
        arr1 (pandas series): gene expression data for a given cell in matrix_1
        arr2 (pandas series):
        bins           (int):
        m              (int):
    """
    fq = fast_histogram.histogram2d(arr1, arr2, range=[[arr1.min(), arr1.max()+1e-9], [arr2.min(), arr2.max()+1e-9]],
                                    bins=(bins, bins)) / float(m)
    sm = np.sum(fq * float(m), axis=1)
    tm = np.sum(fq * float(m), axis=0)
    sm = np.asmatrix(sm / float(sm.sum()))
    tm = np.asmatrix(tm / float(tm.sum()))
    sm_tm = np.matmul(np.transpose(sm), tm)
    div = np.divide(fq, sm_tm, where=sm_tm != 0, out=np.zeros_like(fq))
    ent = np.log(div, where=div != 0, out=np.zeros_like(div))
    agg = np.multiply(fq, ent, out=np.zeros_like(fq), where=fq != 0)
    return agg.sum()

In [None]:
num_bins = int((frame_dr.shape[0]) ** (1 / 3.0))
num_genes = frame_dr.shape[1]

In [None]:
%timeit calc_mi_f(frame_dr[0], frame_dr[1], num_bins, num_genes)

In [None]:
arr = frame_dr[0]
fast_histogram.histogram1d(arr, bins=num_bins, range=[arr.min(), arr.max()+1e-9]) / num_genes


In [None]:
num_cells = frame_dr.shape[0]
marginals = np.empty((num_cells, num_bins))
for index, cell in enumerate(frame_dr):
    ht1d = fast_histogram.histogram1d(cell, bins=num_bins, range=[cell.min(), cell.max()+1e-9]) / num_genes
    marginals[index] = ht1d
print(marginals[0])
print(marginals[1])
np.transpose(np.asmatrix(marginals[0]))

In [None]:
def calc_mi_f2(arr1, arr2, marginals, index1, index2, bins, m):
    """ Calculates mutual information in between two cells, considering their gene expression levels

    This function is called by calc_distance_mat. It takes gene expression data from single cells,
    and compares them using standard calculation for mutual information. It builds a 2d histogram,
    which is used to calculate P(arr1, arr2)

    Args:
        arr1 (pandas series): gene expression data for a given cell in matrix_1
        arr2 (pandas series):
        bins           (int):
        m              (int):
    """
    fq = fast_histogram.histogram2d(arr1, arr2, range=[[arr1.min(), arr1.max()+1e-9], [arr2.min(), arr2.max()+1e-9]],
                                    bins=(bins, bins)) / float(m)
    sm_tm = np.matmul(np.transpose(np.asmatrix(marginals[index1])), np.asmatrix(marginals[index2]))
    div = np.divide(fq, sm_tm, where=sm_tm != 0, out=np.zeros_like(fq))
    ent = np.log(div, where=div != 0, out=np.zeros_like(div))
    agg = np.multiply(fq, ent, out=np.zeros_like(fq), where=fq != 0)
    return agg.sum()


In [None]:
#%timeit
calc_mi_f2(frame_dr[0], frame_dr[1], marginals, 0, 1, num_bins, num_genes)

In [None]:
def calc_marginals(frame_dr, num_bins, num_genes):
    num_cells = frame_dr.shape[0]
    marginals = np.empty((num_cells, num_bins))
    for index, cell in enumerate(frame_dr):
        ht1d = fast_histogram.histogram1d(cell, bins=num_bins, range=[cell.min(), cell.max() + 1e-9]) / num_genes
        marginals[index] = ht1d
    np.transpose(np.asmatrix(marginals[0]))
    return marginals

In [None]:
def calc_norm_mi_marginal(arr1, arr2, marginals, index1, index2, bins, m):
    """ Calculates a normalized mutual information distance in between two cells

    It takes gene expression data from single cells, and compares them using standard calculation for
    mutual information. It builds a 2d histogram, which is used to calculate P(arr1, arr2)

    Args:
        arr1 (pandas series): gene expression data for a given cell in matrix_1
        arr2 (pandas series):
        bins           (int):
        m              (int):
    """
    fq = fast_histogram.histogram2d(arr1, arr2, range=[[arr1.min(), arr1.max()+1e-9], [arr2.min(), arr2.max()+1e-9]],
                                    bins=(bins, bins)) / float(m)
    sm_tm = np.matmul(np.transpose(np.asmatrix(marginals[index1])), np.asmatrix(marginals[index2]))
    div = np.divide(fq, sm_tm, where=sm_tm != 0, out=np.zeros_like(fq))
    ent = np.log(div, where=div != 0, out=np.zeros_like(div))
    agg = np.multiply(fq, ent, out=np.zeros_like(fq), where=fq != 0)
    joint_ent = -np.multiply(fq, np.log(fq, where=fq != 0, out=np.zeros_like(fq)),
                             out=np.zeros_like(fq), where=fq != 0).sum()
    return (joint_ent - agg.sum()) / joint_ent

In [None]:
# %timeit
calc_norm_mi_marginal(frame_dr[0], frame_dr[1], marginals, 0, 1, num_bins, num_genes)

In [None]:
calc_norm_mi_marginal(frame_dr[0], frame_dr[2], marginals, 0, 2, num_bins, num_genes)



In [None]:
def calc_norm_mi(arr1, arr2, bins, m):
    """ Calculates a normalized mutual information distance in between two cells

    It takes gene expression data from single cells, and compares them using standard calculation for
    mutual information. It builds a 2d histogram, which is used to calculate P(arr1, arr2)

    Args:
        arr1 (pandas series): gene expression data for a given cell in matrix_1
        arr2 (pandas series):
        bins           (int):
        m              (int):
    """
    fq = fast_histogram.histogram2d(arr1, arr2, range=[[arr1.min(), arr1.max()+1e-9], [arr2.min(), arr2.max()+1e-9]],
                                    bins=(bins, bins)) / float(m)
    sm = np.sum(fq * float(m), axis=1)
    tm = np.sum(fq * float(m), axis=0)
    sm = np.asmatrix(sm / float(sm.sum()))
    tm = np.asmatrix(tm / float(tm.sum()))
    sm_tm = np.matmul(np.transpose(sm), tm)
    div = np.divide(fq, sm_tm, where=sm_tm != 0, out=np.zeros_like(fq))
    ent = np.log(div, where=div != 0, out=np.zeros_like(div))
    agg = np.multiply(fq, ent, out=np.zeros_like(fq), where=fq != 0)
    joint_ent = -np.multiply(fq, np.log(fq, where=fq != 0, out=np.zeros_like(fq)),
                             out=np.zeros_like(fq), where=fq != 0).sum()
    return (joint_ent - agg.sum()) / joint_ent

In [None]:
%timeit calc_norm_mi(frame_dr[0], frame_dr[1], num_bins, num_genes)

In [None]:
from sklearn.neighbors import DistanceMetric
dist = DistanceMetric.get_metric('euclidean')
X = [frame_dr[0], frame_dr[1]]

In [None]:
%timeit dist.pairwise(X)

In [None]:
def read_preprocessed_mat(in_file):
    """Read in preprocessed matrix file into a dataframe."""
    if in_file.endswith('.txt'):
        frame = pd.read_csv(in_file, sep="\t", index_col=0).iloc[:, 0:]
    if in_file.endswith('.h5ad') or in_file.endswith('.h5'):
        adata = anndata.read_h5ad(in_file)
        frame = adata.to_df()
    return frame

In [None]:
adata = anndata.read_h5ad('/Users/lding/Git/MICA/test_data/inputs/10x/PBMC/3k/pre-processed/pbmc3k_preprocessed.h5ad')

In [None]:
print(adata.X.shape)

In [None]:
start = time.time()
indices, dists, forest = sc.neighbors.compute_neighbors_umap(adata.X, n_neighbors=10)
end = time.time()
runtime = end - start
msg = "The runtime for compute_neighbors_umap took {} seconds to complete".format(runtime)
logging.info(msg)

In [None]:
def calc_norm_mi(arr1, arr2, bins, m):
    """ Calculates a normalized mutual information distance D(X, Y) = 1 - I(X, Y)/H(X, Y) using bin-based method

    It takes gene expression data from single cells, and compares them using standard calculation for
    mutual information and joint entropy. It builds a 2d histogram, which is used to calculate P(arr1, arr2).

    Args:
        arr1 (pandas series): gene expression data for cell 1
        arr2 (pandas series): gene expression data for cell 2
        marginals  (ndarray): marginal probability matrix
        index1         (int): index of cell 1
        index2         (int): index of cell 2
        bins           (int): number of bins
        m              (int): number of genes
    Returns:
        a float between 0 and 1
    """
    fq = fast_histogram.histogram2d(arr1, arr2, range=[[arr1.min(), arr1.max()+1e-9], [arr2.min(), arr2.max()+1e-9]],
                                    bins=(bins, bins)) / float(m)
    sm = np.sum(fq * float(m), axis=1)
    tm = np.sum(fq * float(m), axis=0)
    sm = np.asmatrix(sm / float(sm.sum()))
    tm = np.asmatrix(tm / float(tm.sum()))
    sm_tm = np.matmul(np.transpose(sm), tm)
    div = np.divide(fq, sm_tm, where=sm_tm != 0, out=np.zeros_like(fq))
    ent = np.log(div, where=div != 0, out=np.zeros_like(div))
    agg = np.multiply(fq, ent, out=np.zeros_like(fq), where=fq != 0)
    joint_ent = -np.multiply(fq, np.log(fq, where=fq != 0, out=np.zeros_like(fq)),
                             out=np.zeros_like(fq), where=fq != 0).sum()
    return (joint_ent - agg.sum()) / joint_ent

In [None]:
num_bins = int((adata.X.shape[0]) ** (1 / 3.0))
num_genes = adata.X.shape[1]
metric_params = {"bins": num_bins, "m": num_genes}

In [None]:
start = time.time()
indices, dists, forest = sc.neighbors.compute_neighbors_umap(adata.X, n_neighbors=10, metric=calc_norm_mi,
                                                             metric_kwds=metric_params)
end = time.time()
runtime = end - start
msg = "The runtime for compute_neighbors_umap took {} seconds to complete".format(runtime)
logging.info(msg)

In [None]:
adata = anndata.read_h5ad('/Users/lding/Documents/MICA/Datasets/filtered_gene_bc_matrices/hg19/pbmc33k_preprocessed.h5ad')
frame = adata.to_df()
frame.shape

In [None]:
frame.to_csv('/Users/lding/Documents/MICA/kgraph/pmbc33k.csv')

In [None]:
np.arange(1, 12)