In [1]:
import networkx as nx
import numpy as np
import pandas as pd
from scipy import sparse
from scipy.sparse import csr_matrix
from scipy.sparse import coo_matrix
from time import perf_counter
import threading
from multiprocessing import Process, Pool
import math

In [2]:
## use multi-thread method to implement graph partition, function defination
def partion_multi_process (processID, graph_array, index_list, length):
    t_m_p = perf_counter()
    csr_row = np.int32(np.zeros(length + 1))
    csr_col = []
    temp_list = []
    for i in range(len(graph_array)):
        if ((int(graph_array[i][1]) >= index_list[processID]) and (int(graph_array[i][1]) < index_list[processID+1])):
            ## csr
            temp_list.append([graph_array[i][0], graph_array[i][1]])
            csr_row[graph_array[i][0]+1] += 1
    if (temp_list == []):
        return [], []

    temp_list = np.array(temp_list)
    temp_list = temp_list[np.lexsort([temp_list.T[1]])] ## sort array by incremental order
    temp_list = temp_list[np.lexsort([temp_list.T[0]])] ## sort array by incremental order

    csr_row = np.add.accumulate(csr_row)
    csr_col = temp_list[:, 1]

    t_m_e = perf_counter() - t_m_p
    print("partition finish, time : ", t_m_e)
    
    return csr_row, csr_col

In [3]:

## filename = './datasets/as-skitter.txt'
filename = './datasets/test.txt' ## seem our method performs better in amamzon dataset
## filename = './datasets/roadNet-CA.txt'

print("Load data from hard-disk ... ")
txt_array_t = np.int64(np.loadtxt(filename))
txt_array = txt_array_t[:,:2]
print(txt_array)

Load data from hard-disk ... 
[[0 1]
 [0 2]
 [0 3]
 [0 4]
 [0 5]
 [1 2]
 [2 3]
 [2 4]
 [4 5]]


In [4]:
## if source_id > dest_id, exchange them; only suitable for undirected graph.
for i in range (txt_array.shape[0]) :
    if (txt_array[i][0] >= txt_array[i][1]) :
        temp = txt_array[i][1]
        txt_array[i][1] = txt_array[i][0]
        txt_array[i][0] = temp

txt_array = txt_array[np.lexsort([txt_array.T[1]])] ## sort array by incremental order
txt_array = txt_array[np.lexsort([txt_array.T[0]])] ## sort array by incremental order

print("Delete self-edges ... ")
list_a = [] ## delete self-edges
for i in range (txt_array.shape[0]) :
    if (txt_array[i][0] == txt_array[i][1]) :
        list_a.append(i)
## print(list_a)
## print(txt_array)
txt_array = np.delete(txt_array, list_a, axis = 0)

print("Delete duplicated edges ... ")
list_b = []  ## delete duplicated edges 
for i in range (1, txt_array.shape[0]) :
    if ((txt_array[i][0] == txt_array[i-1][0]) & (txt_array[i][1] == txt_array[i-1][1])) :
        list_b.append(i)
## print(list_a)
txt_array = np.delete(txt_array, list_b, axis = 0)
dest_array = txt_array[:, 1]
dest_array = np.bincount(dest_array)
dest_array = np.add.accumulate(dest_array)
## print (dest_array.shape[0])
# print (dest_array)

print("Load data done ")

Delete self-edges ... 
Delete duplicated edges ... 
Load data done 


In [5]:
## define the partition index, need to add partition function here
t_partition_start = perf_counter()
adj_matrix_dim = np.int64(txt_array.max()) + 1 ## get the max id for csr row size
print ("vertex range : 0 -", adj_matrix_dim, end = " ")
partition_num = 2 ## can be set a variable, equals to thread numbers.
print ("using process number :", partition_num)
partition_index = np.zeros(partition_num + 1, dtype=np.int32)
for i in range(partition_num - 1):
    index_t = np.int32(len(txt_array)*(i+1)/partition_num)
    abs_array = np.absolute(dest_array - index_t)
    partition_index[i+1] = abs_array.argmin()
    ## partition_index[i+1] = np.int32((adj_matrix_dim)*(i+1)/partition_num)
    ## partition_index[i+1] = np.int32(adj_matrix_dim*(math.pow(0.717, partition_num-1-i)))
partition_index[partition_num] = adj_matrix_dim
print("partition index array ", partition_index)

vertex range : 0 - 6 using process number : 2
partition index array  [0 2 6]


In [6]:
## multi process pool.
process_pool = Pool(partition_num)
result_pool = []
global _shared_array
_shared_array = txt_array
for i in range(partition_num):
    result_pool.append(process_pool.apply_async(func=partion_multi_process, args=(i, _shared_array, partition_index, adj_matrix_dim)))

process_pool.close()
process_pool.join()

graph_csr_row = []
graph_csr_col = []

for i in range(partition_num):
    temp_a, temp_b = result_pool[i].get()
    graph_csr_row.append(temp_a)
    graph_csr_col.append(temp_b)
    ## print(result_pool[i].get())

partition finish, time : partition finish, time :  0.001255318522453308 
0.001018540933728218


In [7]:
print(graph_csr_row)
print(graph_csr_col)

[array([0, 1, 1, 1, 1, 1, 1]), array([0, 4, 5, 7, 7, 8, 8])]
[array([1]), array([2, 3, 4, 5, 2, 3, 4, 5])]


In [9]:
print(len(graph_csr_row))
for i in range (0, len(graph_csr_row)):
    fh = open("csr_row%s.txt" % i, "w")
    for j in range(0, len(graph_csr_row[i])):
        fh.write(str(int(graph_csr_row[i][j])))
        fh.write('\n')
    fh.close()

print(len(graph_csr_col))
for k in range (0, len(graph_csr_col)):
    fk = open("csr_col%s.txt" % k, "w")
    for h in range(0, len(graph_csr_col[k])):
        fk.write(str(int(graph_csr_col[k][h])))
        fk.write('\n')
    fk.close()

2
2
